diff --git a/fs/btrfs/async-thread.c b/fs/btrfs/async-thread.c
index 4e780b2..462859a 100644
  * Boston, MA 021110-1307, USA.
  */
 
-#include <linux/version.h>
 #include <linux/kthread.h>
+#include <linux/slab.h>
 #include <linux/list.h>
 #include <linux/spinlock.h>
-# include <linux/freezer.h>
+#include <linux/freezer.h>
 #include "async-thread.h"
 
+#define WORK_QUEUED_BIT 0
+#define WORK_DONE_BIT 1
+#define WORK_ORDER_DONE_BIT 2
+#define WORK_HIGH_PRIO_BIT 3
+
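The four bits above live in the flags word of every struct btrfs_work and are driven with the kernel's atomic bitops. WORK_QUEUED_BIT in particular acts as a claim: whoever flips it from 0 to 1 wins the right to put the work on a list, which is how btrfs_queue_worker() and btrfs_requeue_work() below avoid double-queueing. A standalone sketch of that claim pattern, using C11 atomics and invented names rather than the kernel bitop API:

    #include <stdatomic.h>
    #include <stdio.h>

    #define WORK_QUEUED_BIT 0

    struct work {
            atomic_ulong flags;
    };

    /* returns 1 only for the caller that actually set the bit */
    static int claim_for_queue(struct work *w)
    {
            unsigned long bit = 1UL << WORK_QUEUED_BIT;

            return !(atomic_fetch_or(&w->flags, bit) & bit);
    }

    int main(void)
    {
            struct work w = { .flags = 0 };

            printf("first claim: %d\n", claim_for_queue(&w));  /* 1: queue it */
            printf("second claim: %d\n", claim_for_queue(&w)); /* 0: already queued */
            return 0;
    }
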
 /*
  * container for the kthread task pointer and the list of pending work
  * One of these is allocated per thread.
@@ -33,6 +38,7 @@ struct btrfs_worker_thread {
 
        /* list of struct btrfs_work that are waiting for service */
        struct list_head pending;
+       struct list_head prio_pending;
 
        /* list of worker threads from struct btrfs_workers */
        struct list_head worker_list;
@@ -43,6 +49,9 @@ struct btrfs_worker_thread {
        /* number of things on the pending list */
        atomic_t num_pending;
 
+       /* reference counter for this struct */
+       atomic_t refs;
+
        unsigned long sequence;
 
        /* protects the pending list. */
@@ -56,6 +65,51 @@ struct btrfs_worker_thread {
 };
 
 /*
+ * btrfs_start_workers uses kthread_run, which can block waiting for memory
+ * for a very long time.  It will actually throttle on page writeback,
+ * and so it may not make progress until after our btrfs worker threads
+ * process all of the pending work structs in their queue.
+ *
+ * This means we can't use btrfs_start_workers from inside a btrfs worker
+ * thread that is used as part of cleaning dirty memory, which pretty much
+ * involves all of the worker threads.
+ *
+ * Instead we have a helper queue that never has more than one thread,
+ * where we schedule thread start operations.  This worker_start struct
+ * is used to contain the work and hold a pointer to the queue that needs
+ * another worker.
+ */
+struct worker_start {
+       struct btrfs_work work;
+       struct btrfs_workers *queue;
+};
+
+static void start_new_worker_func(struct btrfs_work *work)
+{
+       struct worker_start *start;
+       start = container_of(work, struct worker_start, work);
+       btrfs_start_workers(start->queue, 1);
+       kfree(start);
+}
+
+static int start_new_worker(struct btrfs_workers *queue)
+{
+       struct worker_start *start;
+       int ret;
+
+       start = kzalloc(sizeof(*start), GFP_NOFS);
+       if (!start)
+               return -ENOMEM;
+
+       start->work.func = start_new_worker_func;
+       start->queue = queue;
+       ret = btrfs_queue_worker(queue->atomic_worker_start, &start->work);
+       if (ret)
+               kfree(start);
+       return ret;
+}
+
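As the comment above explains, the helper queue never has more than one thread, and that thread is the only place kthread_run() gets called on behalf of the other pools; code that must not block merely flags that a start is wanted (see check_pending_worker_creates() below). A rough userspace analogue of the same shape, with POSIX threads and invented names (compile with -pthread); note that back-to-back requests coalesce, just as atomic_start_pending requests do:

    #include <pthread.h>
    #include <stdio.h>
    #include <unistd.h>

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
    static int start_pending;               /* analogue of atomic_start_pending */

    static void *worker(void *arg)
    {
            printf("worker %ld running\n", (long)arg);
            return NULL;
    }

    /* the helper thread is the only place new threads are ever created */
    static void *helper(void *arg)
    {
            long next_id = 1;

            (void)arg;
            pthread_mutex_lock(&lock);
            for (;;) {
                    while (!start_pending)
                            pthread_cond_wait(&cond, &lock);
                    start_pending = 0;
                    pthread_mutex_unlock(&lock);

                    pthread_t t;
                    pthread_create(&t, NULL, worker, (void *)next_id++);
                    pthread_detach(t);

                    pthread_mutex_lock(&lock);
            }
            return NULL;
    }

    /* callable from code that must not block on thread creation itself */
    static void request_new_worker(void)
    {
            pthread_mutex_lock(&lock);
            start_pending = 1;              /* back-to-back requests coalesce */
            pthread_cond_signal(&cond);
            pthread_mutex_unlock(&lock);
    }

    int main(void)
    {
            pthread_t h;

            pthread_create(&h, NULL, helper, NULL);
            request_new_worker();
            sleep(1);                       /* let the helper do its work */
            return 0;
    }
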
+/*
  * helper function to move a thread onto the idle list after it
  * has finished some requests.
  */
@@ -66,7 +120,12 @@ static void check_idle_worker(struct btrfs_worker_thread *worker)
                unsigned long flags;
                spin_lock_irqsave(&worker->workers->lock, flags);
                worker->idle = 1;
-               list_move(&worker->worker_list, &worker->workers->idle_list);
+
+               /* the list may be empty if the worker is just starting */
+               if (!list_empty(&worker->worker_list)) {
+                       list_move(&worker->worker_list,
+                                &worker->workers->idle_list);
+               }
                spin_unlock_irqrestore(&worker->workers->lock, flags);
        }
 }
@@ -82,44 +141,260 @@ static void check_busy_worker(struct btrfs_worker_thread *worker)
                unsigned long flags;
                spin_lock_irqsave(&worker->workers->lock, flags);
                worker->idle = 0;
-               list_move_tail(&worker->worker_list,
-                              &worker->workers->worker_list);
+
+               if (!list_empty(&worker->worker_list)) {
+                       list_move_tail(&worker->worker_list,
+                                     &worker->workers->worker_list);
+               }
                spin_unlock_irqrestore(&worker->workers->lock, flags);
        }
 }
 
+static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
+{
+       struct btrfs_workers *workers = worker->workers;
+       unsigned long flags;
+
+       rmb();
+       if (!workers->atomic_start_pending)
+               return;
+
+       spin_lock_irqsave(&workers->lock, flags);
+       if (!workers->atomic_start_pending)
+               goto out;
+
+       workers->atomic_start_pending = 0;
+       if (workers->num_workers + workers->num_workers_starting >=
+           workers->max_workers)
+               goto out;
+
+       workers->num_workers_starting += 1;
+       spin_unlock_irqrestore(&workers->lock, flags);
+       start_new_worker(workers);
+       return;
+
+out:
+       spin_unlock_irqrestore(&workers->lock, flags);
+}
+
+static noinline int run_ordered_completions(struct btrfs_workers *workers,
+                                           struct btrfs_work *work)
+{
+       if (!workers->ordered)
+               return 0;
+
+       set_bit(WORK_DONE_BIT, &work->flags);
+
+       spin_lock(&workers->order_lock);
+
+       while (1) {
+               if (!list_empty(&workers->prio_order_list)) {
+                       work = list_entry(workers->prio_order_list.next,
+                                         struct btrfs_work, order_list);
+               } else if (!list_empty(&workers->order_list)) {
+                       work = list_entry(workers->order_list.next,
+                                         struct btrfs_work, order_list);
+               } else {
+                       break;
+               }
+               if (!test_bit(WORK_DONE_BIT, &work->flags))
+                       break;
+
+               /* we are going to call the ordered done function, but
+                * we leave the work item on the list as a barrier so
+                * that later work items that are done don't have their
+                * functions called before this one returns
+                */
+               if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
+                       break;
+
+               spin_unlock(&workers->order_lock);
+
+               work->ordered_func(work);
+
+               /* now take the lock again and call the freeing code */
+               spin_lock(&workers->order_lock);
+               list_del(&work->order_list);
+               work->ordered_free(work);
+       }
+
+       spin_unlock(&workers->order_lock);
+       return 0;
+}
+
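run_ordered_completions() lets work items finish in any order while still running ordered_func() and ordered_free() strictly in submission order: the first entry on the order list whose WORK_DONE_BIT is not yet set acts as a barrier for everything queued behind it, and WORK_ORDER_DONE_BIT keeps two threads from running the same completion. The prefix-barrier idea, reduced to a standalone loop with invented names and no locking:

    #include <stdio.h>

    struct item {
            int done;        /* analogue of WORK_DONE_BIT */
            int order_done;  /* analogue of WORK_ORDER_DONE_BIT */
    };

    static void run_ordered(struct item *q, int n)
    {
            for (int i = 0; i < n; i++) {
                    if (!q[i].done)
                            break;          /* barrier: later items must wait */
                    if (q[i].order_done)
                            continue;       /* handled on an earlier pass */
                    q[i].order_done = 1;
                    printf("ordered completion for item %d\n", i);
            }
    }

    int main(void)
    {
            struct item q[4] = { { 0 } };

            q[2].done = 1;          /* item 2 finished first ... */
            run_ordered(q, 4);      /* ... but prints nothing: item 0 isn't done */

            q[0].done = 1;
            q[1].done = 1;
            run_ordered(q, 4);      /* items 0, 1 and 2 complete, in order */
            return 0;
    }

Each call only advances over the longest fully done prefix, which is exactly why a finished-but-not-first item has to wait for the ones submitted before it.
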
+static void put_worker(struct btrfs_worker_thread *worker)
+{
+       if (atomic_dec_and_test(&worker->refs))
+               kfree(worker);
+}
+
+static int try_worker_shutdown(struct btrfs_worker_thread *worker)
+{
+       int freeit = 0;
+
+       spin_lock_irq(&worker->lock);
+       spin_lock(&worker->workers->lock);
+       if (worker->workers->num_workers > 1 &&
+           worker->idle &&
+           !worker->working &&
+           !list_empty(&worker->worker_list) &&
+           list_empty(&worker->prio_pending) &&
+           list_empty(&worker->pending) &&
+           atomic_read(&worker->num_pending) == 0) {
+               freeit = 1;
+               list_del_init(&worker->worker_list);
+               worker->workers->num_workers--;
+       }
+       spin_unlock(&worker->workers->lock);
+       spin_unlock_irq(&worker->lock);
+
+       if (freeit)
+               put_worker(worker);
+       return freeit;
+}
+
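A worker can now be freed from two directions: by the worker thread itself via try_worker_shutdown(), or by btrfs_stop_workers() during teardown. The struct therefore carries a reference count, and only whoever drops the last reference calls kfree(). The core of that scheme in standalone C11, with invented names:

    #include <stdatomic.h>
    #include <stdio.h>
    #include <stdlib.h>

    struct obj {
            atomic_int refs;
    };

    static void put_obj(struct obj *o)
    {
            /* atomic_fetch_sub returns the old value: 1 means last reference */
            if (atomic_fetch_sub(&o->refs, 1) == 1) {
                    printf("freeing\n");
                    free(o);
            }
    }

    int main(void)
    {
            struct obj *o = malloc(sizeof(*o));

            if (!o)
                    return 1;
            atomic_init(&o->refs, 2);  /* e.g. the worker list and the thread */
            put_obj(o);                /* list reference dropped */
            put_obj(o);                /* last reference: object is freed */
            return 0;
    }
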
+static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
+                                       struct list_head *prio_head,
+                                       struct list_head *head)
+{
+       struct btrfs_work *work = NULL;
+       struct list_head *cur = NULL;
+
+       if (!list_empty(prio_head))
+               cur = prio_head->next;
+
+       smp_mb();
+       if (!list_empty(&worker->prio_pending))
+               goto refill;
+
+       if (!list_empty(head))
+               cur = head->next;
+
+       if (cur)
+               goto out;
+
+refill:
+       spin_lock_irq(&worker->lock);
+       list_splice_tail_init(&worker->prio_pending, prio_head);
+       list_splice_tail_init(&worker->pending, head);
+
+       if (!list_empty(prio_head))
+               cur = prio_head->next;
+       else if (!list_empty(head))
+               cur = head->next;
+       spin_unlock_irq(&worker->lock);
+
+       if (!cur)
+               goto out_fail;
+
+out:
+       work = list_entry(cur, struct btrfs_work, list);
+
+out_fail:
+       return work;
+}
+
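get_next_work() drains both pending lists onto private list heads with a single acquisition of worker->lock and then walks the batch with the lock dropped; high-priority work is always consulted first, and the smp_mb() plus refill path picks up priority items that arrived after the last splice. The lock-amortisation half of that, as a tiny standalone sketch (plain singly linked list, invented names, compile with -pthread):

    #include <pthread.h>
    #include <stdio.h>

    struct node {
            int val;
            struct node *next;
    };

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static struct node *shared_head;

    /* one lock round-trip drains everything queued so far */
    static struct node *splice_all(void)
    {
            struct node *batch;

            pthread_mutex_lock(&lock);
            batch = shared_head;
            shared_head = NULL;
            pthread_mutex_unlock(&lock);
            return batch;
    }

    int main(void)
    {
            struct node c = { 3, NULL }, b = { 2, &c }, a = { 1, &b };

            shared_head = &a;
            for (struct node *n = splice_all(); n; n = n->next)
                    printf("processing %d\n", n->val);  /* no lock held here */
            return 0;
    }
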
 /*
  * main loop for servicing work items
  */
 static int worker_loop(void *arg)
 {
        struct btrfs_worker_thread *worker = arg;
-       struct list_head *cur;
+       struct list_head head;
+       struct list_head prio_head;
        struct btrfs_work *work;
+
+       INIT_LIST_HEAD(&head);
+       INIT_LIST_HEAD(&prio_head);
+
        do {
-               spin_lock_irq(&worker->lock);
-               while(!list_empty(&worker->pending)) {
-                       cur = worker->pending.next;
-                       work = list_entry(cur, struct btrfs_work, list);
+again:
+               while (1) {
+                       work = get_next_work(worker, &prio_head, &head);
+                       if (!work)
+                               break;
+
                        list_del(&work->list);
-                       clear_bit(0, &work->flags);
+                       clear_bit(WORK_QUEUED_BIT, &work->flags);
 
                        work->worker = worker;
-                       spin_unlock_irq(&worker->lock);
 
                        work->func(work);
 
                        atomic_dec(&worker->num_pending);
-                       spin_lock_irq(&worker->lock);
-                       check_idle_worker(worker);
+                       /*
+                        * unless this is an ordered work queue,
+                        * 'work' was probably freed by func above.
+                        */
+                       run_ordered_completions(worker->workers, work);
+
+                       check_pending_worker_creates(worker);
+
                }
-               worker->working = 0;
+
+               spin_lock_irq(&worker->lock);
+               check_idle_worker(worker);
+
                if (freezing(current)) {
+                       worker->working = 0;
+                       spin_unlock_irq(&worker->lock);
                        refrigerator();
                } else {
-                       set_current_state(TASK_INTERRUPTIBLE);
                        spin_unlock_irq(&worker->lock);
-                       schedule();
+                       if (!kthread_should_stop()) {
+                               cpu_relax();
+                               /*
+                                * we've dropped the lock, did someone else
+                                * jump in?
+                                */
+                               smp_mb();
+                               if (!list_empty(&worker->pending) ||
+                                   !list_empty(&worker->prio_pending))
+                                       continue;
+
+                               /*
+                                * this short schedule allows more work to
+                                * come in without the queue functions
+                                * needing to go through wake_up_process()
+                                *
+                                * worker->working is still 1, so nobody
+                                * is going to try and wake us up
+                                */
+                               schedule_timeout(1);
+                               smp_mb();
+                               if (!list_empty(&worker->pending) ||
+                                   !list_empty(&worker->prio_pending))
+                                       continue;
+
+                               if (kthread_should_stop())
+                                       break;
+
+                               /* still no more work? sleep for real */
+                               spin_lock_irq(&worker->lock);
+                               set_current_state(TASK_INTERRUPTIBLE);
+                               if (!list_empty(&worker->pending) ||
+                                   !list_empty(&worker->prio_pending)) {
+                                       spin_unlock_irq(&worker->lock);
+                                       goto again;
+                               }
+
+                               /*
+                                * this makes sure we get a wakeup when someone
+                                * adds something new to the queue
+                                */
+                               worker->working = 0;
+                               spin_unlock_irq(&worker->lock);
+
+                               if (!kthread_should_stop()) {
+                                       schedule_timeout(HZ * 120);
+                                       if (!worker->working &&
+                                           try_worker_shutdown(worker)) {
+                                               return 0;
+                                       }
+                               }
+                       }
                        __set_current_state(TASK_RUNNING);
                }
        } while (!kthread_should_stop());
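The sleep path above is built to avoid lost wakeups: the worker re-checks both pending lists after setting TASK_INTERRUPTIBLE, and it only clears worker->working while holding worker->lock, the same lock under which btrfs_queue_worker() sets working and calls wake_up_process(). In userspace the equivalent handshake is a condition variable whose predicate is re-checked under the mutex; a minimal sketch with invented names (compile with -pthread):

    #include <pthread.h>
    #include <stdio.h>

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
    static int pending;     /* queued work items */
    static int done;        /* no more work will arrive */

    static void *worker(void *arg)
    {
            (void)arg;
            pthread_mutex_lock(&lock);
            for (;;) {
                    /* re-check the predicate before every sleep */
                    while (pending == 0 && !done)
                            pthread_cond_wait(&cond, &lock);
                    if (pending == 0 && done)
                            break;
                    pending--;
                    pthread_mutex_unlock(&lock);
                    printf("handled one work item\n");  /* work runs unlocked */
                    pthread_mutex_lock(&lock);
            }
            pthread_mutex_unlock(&lock);
            return NULL;
    }

    int main(void)
    {
            pthread_t t;

            pthread_create(&t, NULL, worker, NULL);
            for (int i = 0; i < 3; i++) {       /* the submit side */
                    pthread_mutex_lock(&lock);
                    pending++;
                    pthread_cond_signal(&cond); /* wake while holding the lock */
                    pthread_mutex_unlock(&lock);
            }
            pthread_mutex_lock(&lock);
            done = 1;
            pthread_cond_signal(&cond);
            pthread_mutex_unlock(&lock);
            pthread_join(t, NULL);
            return 0;
    }
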
@@ -133,38 +408,61 @@ int btrfs_stop_workers(struct btrfs_workers *workers)
 {
        struct list_head *cur;
        struct btrfs_worker_thread *worker;
+       int can_stop;
 
+       spin_lock_irq(&workers->lock);
        list_splice_init(&workers->idle_list, &workers->worker_list);
-       while(!list_empty(&workers->worker_list)) {
+       while (!list_empty(&workers->worker_list)) {
                cur = workers->worker_list.next;
                worker = list_entry(cur, struct btrfs_worker_thread,
                                    worker_list);
-               kthread_stop(worker->task);
-               list_del(&worker->worker_list);
-               kfree(worker);
+
+               atomic_inc(&worker->refs);
+               workers->num_workers -= 1;
+               if (!list_empty(&worker->worker_list)) {
+                       list_del_init(&worker->worker_list);
+                       put_worker(worker);
+                       can_stop = 1;
+               } else
+                       can_stop = 0;
+               spin_unlock_irq(&workers->lock);
+               if (can_stop)
+                       kthread_stop(worker->task);
+               spin_lock_irq(&workers->lock);
+               put_worker(worker);
        }
+       spin_unlock_irq(&workers->lock);
        return 0;
 }
 
 /*
  * simple init on struct btrfs_workers
  */
-void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
+void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
+                       struct btrfs_workers *async_helper)
 {
        workers->num_workers = 0;
+       workers->num_workers_starting = 0;
        INIT_LIST_HEAD(&workers->worker_list);
        INIT_LIST_HEAD(&workers->idle_list);
+       INIT_LIST_HEAD(&workers->order_list);
+       INIT_LIST_HEAD(&workers->prio_order_list);
        spin_lock_init(&workers->lock);
+       spin_lock_init(&workers->order_lock);
        workers->max_workers = max;
        workers->idle_thresh = 32;
        workers->name = name;
+       workers->ordered = 0;
+       workers->atomic_start_pending = 0;
+       workers->atomic_worker_start = async_helper;
 }
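With the extra parameter, callers are expected to create one small helper pool first and hand it to every other pool as async_helper. The fragment below is only an illustration built from the signatures visible in this patch, not the actual fs/btrfs/disk-io.c wiring, and max_active is a placeholder value:

    struct btrfs_workers generic_worker;
    struct btrfs_workers workers;

    /* the helper pool itself gets no helper and stays at one thread */
    btrfs_init_workers(&generic_worker, "genwork", 1, NULL);

    /* other pools ask generic_worker to spawn threads on their behalf */
    btrfs_init_workers(&workers, "worker", max_active, &generic_worker);
    workers.ordered = 1;  /* run ordered_func()/ordered_free() in submit order */

    btrfs_start_workers(&generic_worker, 1);
    btrfs_start_workers(&workers, 1);
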
 
 /*
  * starts new worker threads.  This does not enforce the max worker
  * count in case you need to temporarily go past it.
  */
-int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
+static int __btrfs_start_workers(struct btrfs_workers *workers,
+                                int num_workers)
 {
        struct btrfs_worker_thread *worker;
        int ret = 0;
@@ -178,23 +476,27 @@ int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
                }
 
                INIT_LIST_HEAD(&worker->pending);
+               INIT_LIST_HEAD(&worker->prio_pending);
                INIT_LIST_HEAD(&worker->worker_list);
                spin_lock_init(&worker->lock);
+
                atomic_set(&worker->num_pending, 0);
+               atomic_set(&worker->refs, 1);
+               worker->workers = workers;
                worker->task = kthread_run(worker_loop, worker,
                                           "btrfs-%s-%d", workers->name,
                                           workers->num_workers + i);
-               worker->workers = workers;
                if (IS_ERR(worker->task)) {
-                       kfree(worker);
                        ret = PTR_ERR(worker->task);
+                       kfree(worker);
                        goto fail;
                }
-
                spin_lock_irq(&workers->lock);
                list_add_tail(&worker->worker_list, &workers->idle_list);
                worker->idle = 1;
                workers->num_workers++;
+               workers->num_workers_starting--;
+               WARN_ON(workers->num_workers_starting < 0);
                spin_unlock_irq(&workers->lock);
        }
        return 0;
@@ -203,6 +505,14 @@ fail:
        return ret;
 }
 
+int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
+{
+       spin_lock_irq(&workers->lock);
+       workers->num_workers_starting += num_workers;
+       spin_unlock_irq(&workers->lock);
+       return __btrfs_start_workers(workers, num_workers);
+}
+
 /*
  * run through the list and find a worker thread that doesn't have a lot
  * to do right now.  This can return null if we aren't yet at the thread
@@ -212,7 +522,10 @@ static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
 {
        struct btrfs_worker_thread *worker;
        struct list_head *next;
-       int enforce_min = workers->num_workers < workers->max_workers;
+       int enforce_min;
+
+       enforce_min = (workers->num_workers + workers->num_workers_starting) <
+               workers->max_workers;
 
        /*
         * if we find an idle thread, don't move it to the end of the
@@ -231,50 +544,71 @@ static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
 
        /*
         * if we pick a busy task, move the task to the end of the list.
-        * hopefully this will keep things somewhat evenly balanced
+        * hopefully this will keep things somewhat evenly balanced.
+        * Do the move in batches based on the sequence number.  This groups
+        * requests submitted at roughly the same time onto the same worker.
         */
        next = workers->worker_list.next;
        worker = list_entry(next, struct btrfs_worker_thread, worker_list);
-       atomic_inc(&worker->num_pending);
        worker->sequence++;
+
        if (worker->sequence % workers->idle_thresh == 0)
                list_move_tail(next, &workers->worker_list);
        return worker;
 }
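Because the rotation now happens only when worker->sequence hits a multiple of idle_thresh, a burst of submissions lands on one worker instead of being sprayed across the busy list, which helps cache locality and keeps related ordered work together. A toy model of the effect, approximating the list_move_tail() rotation as round-robin over a fixed pool (invented names):

    #include <stdio.h>

    int main(void)
    {
            int nr_workers = 3, idle_thresh = 4, head = 0;
            int sequence[3] = { 0 };

            for (int job = 0; job < 12; job++) {
                    int picked = head;

                    sequence[picked]++;
                    if (sequence[picked] % idle_thresh == 0)
                            head = (head + 1) % nr_workers; /* "move to tail" */
                    printf("job %2d -> worker %d\n", job, picked);
            }
            return 0;
    }

With idle_thresh set to 4 the output comes out in batches: jobs 0-3 on worker 0, jobs 4-7 on worker 1, jobs 8-11 on worker 2.
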
 
+/*
+ * selects a worker thread to take the next job.  This will either find
+ * an idle worker, start a new worker up to the max count, or just return
+ * one of the existing busy workers.
+ */
 static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
 {
        struct btrfs_worker_thread *worker;
        unsigned long flags;
+       struct list_head *fallback;
 
 again:
        spin_lock_irqsave(&workers->lock, flags);
        worker = next_worker(workers);
-       spin_unlock_irqrestore(&workers->lock, flags);
 
        if (!worker) {
-               spin_lock_irqsave(&workers->lock, flags);
-               if (workers->num_workers >= workers->max_workers) {
-                       struct list_head *fallback = NULL;
-                       /*
-                        * we have failed to find any workers, just
-                        * return the force one
-                        */
-                       if (!list_empty(&workers->worker_list))
-                               fallback = workers->worker_list.next;
-                       if (!list_empty(&workers->idle_list))
-                               fallback = workers->idle_list.next;
-                       BUG_ON(!fallback);
-                       worker = list_entry(fallback,
-                                 struct btrfs_worker_thread, worker_list);
-                       spin_unlock_irqrestore(&workers->lock, flags);
+               if (workers->num_workers + workers->num_workers_starting >=
+                   workers->max_workers) {
+                       goto fallback;
+               } else if (workers->atomic_worker_start) {
+                       workers->atomic_start_pending = 1;
+                       goto fallback;
                } else {
+                       workers->num_workers_starting++;
                        spin_unlock_irqrestore(&workers->lock, flags);
                        /* we're below the limit, start another worker */
-                       btrfs_start_workers(workers, 1);
+                       __btrfs_start_workers(workers, 1);
                        goto again;
                }
        }
+       goto found;
+
+fallback:
+       fallback = NULL;
+       /*
+        * we have failed to find any workers, just
+        * return the first one we can find.
+        */
+       if (!list_empty(&workers->worker_list))
+               fallback = workers->worker_list.next;
+       if (!list_empty(&workers->idle_list))
+               fallback = workers->idle_list.next;
+       BUG_ON(!fallback);
+       worker = list_entry(fallback,
+                 struct btrfs_worker_thread, worker_list);
+found:
+       /*
+        * this makes sure the worker doesn't exit before it is placed
+        * onto a busy/idle list
+        */
+       atomic_inc(&worker->num_pending);
+       spin_unlock_irqrestore(&workers->lock, flags);
        return worker;
 }
 
@@ -287,19 +621,46 @@ int btrfs_requeue_work(struct btrfs_work *work)
 {
        struct btrfs_worker_thread *worker = work->worker;
        unsigned long flags;
+       int wake = 0;
 
-       if (test_and_set_bit(0, &work->flags))
+       if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
                goto out;
 
        spin_lock_irqsave(&worker->lock, flags);
+       if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
+               list_add_tail(&work->list, &worker->prio_pending);
+       else
+               list_add_tail(&work->list, &worker->pending);
        atomic_inc(&worker->num_pending);
-       list_add_tail(&work->list, &worker->pending);
-       check_busy_worker(worker);
+
+       /* by definition we're busy, take ourselves off the idle
+        * list
+        */
+       if (worker->idle) {
+               spin_lock(&worker->workers->lock);
+               worker->idle = 0;
+               list_move_tail(&worker->worker_list,
+                             &worker->workers->worker_list);
+               spin_unlock(&worker->workers->lock);
+       }
+       if (!worker->working) {
+               wake = 1;
+               worker->working = 1;
+       }
+
+       if (wake)
+               wake_up_process(worker->task);
        spin_unlock_irqrestore(&worker->lock, flags);
 out:
+
        return 0;
 }
 
+void btrfs_set_work_high_prio(struct btrfs_work *work)
+{
+       set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
+}
+
 /*
  * places a struct btrfs_work into the pending queue of one of the kthreads
  */
@@ -310,15 +671,34 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
        int wake = 0;
 
        /* don't requeue something already on a list */
-       if (test_and_set_bit(0, &work->flags))
+       if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
                goto out;
 
        worker = find_worker(workers);
+       if (workers->ordered) {
+               /*
+                * you're not allowed to do ordered queues from an
+                * interrupt handler
+                */
+               spin_lock(&workers->order_lock);
+               if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
+                       list_add_tail(&work->order_list,
+                                     &workers->prio_order_list);
+               } else {
+                       list_add_tail(&work->order_list, &workers->order_list);
+               }
+               spin_unlock(&workers->order_lock);
+       } else {
+               INIT_LIST_HEAD(&work->order_list);
+       }
 
        spin_lock_irqsave(&worker->lock, flags);
-       atomic_inc(&worker->num_pending);
+
+       if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
+               list_add_tail(&work->list, &worker->prio_pending);
+       else
+               list_add_tail(&work->list, &worker->pending);
        check_busy_worker(worker);
-       list_add_tail(&work->list, &worker->pending);
 
        /*
         * avoid calling into wake_up_process if this thread has already
@@ -328,10 +708,10 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
                wake = 1;
        worker->working = 1;
 
-       spin_unlock_irqrestore(&worker->lock, flags);
-
        if (wake)
                wake_up_process(worker->task);
+       spin_unlock_irqrestore(&worker->lock, flags);
+
 out:
        return 0;
 }