Btrfs: rework async worker pools: idle lists, ordered completions, named threads
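
Rework the btrfs async worker pools:

 - each worker thread now carries a pointer back to its btrfs_workers pool
 - workers are kept on separate idle and busy lists; check_idle_worker()
   and check_busy_worker() move a thread between them around the
   idle_thresh watermark, and new work is handed to an idle thread first
 - when every thread is busy, next_worker() rotates through the busy list
   in batches of idle_thresh submissions (counted in worker->sequence) so
   requests submitted close together tend to land on the same thread
 - ordered work queues: when workers->ordered is set, each item is also
   put on workers->order_list and run_ordered_completions() calls
   ordered_func()/ordered_free() in submission order once the main func
   has finished
 - the open-coded bit 0 in work->flags becomes the named WORK_QUEUED_BIT,
   WORK_DONE_BIT and WORK_ORDER_DONE_BIT flags
 - worker kthreads are named btrfs-<pool>-<nr> via the new name argument
   to btrfs_init_workers(), and the worker struct is freed when
   kthread_run() fails
 - btrfs_stop_workers() splices the idle list back onto the worker list so
   idle threads are stopped too, and worker_loop() skips schedule() once
   kthread_should_stop() is set

The snippet below is an illustrative sketch only, not a caller taken from
the btrfs tree.  It assumes the btrfs_work fields this patch references
(func, ordered_func, ordered_free, flags) and that a caller enables
ordering by setting workers->ordered directly, which the patch itself does
not show.

	#include <linux/slab.h>
	#include "async-thread.h"

	static void example_work(struct btrfs_work *work)
	{
		/* heavy lifting; may run on any worker thread, in any order */
	}

	static void example_work_done(struct btrfs_work *work)
	{
		/* runs in submission order once example_work() has finished */
	}

	static void example_work_free(struct btrfs_work *work)
	{
		kfree(work);	/* ordered_free is the last user of 'work' */
	}

	static int example_submit(void)
	{
		static struct btrfs_workers workers;
		struct btrfs_work *work;
		int ret;

		btrfs_init_workers(&workers, "example", 4);
		workers.ordered = 1;	/* run completions in queue order */

		ret = btrfs_start_workers(&workers, 4);
		if (ret)
			return ret;

		work = kzalloc(sizeof(*work), GFP_NOFS);
		if (!work)
			return -ENOMEM;

		work->func = example_work;
		work->ordered_func = example_work_done;
		work->ordered_free = example_work_free;
		btrfs_queue_worker(&workers, work);
		return 0;
	}
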
diff --git a/fs/btrfs/async-thread.c b/fs/btrfs/async-thread.c
index 8b9e2cf..8e2fec0 100644
--- a/fs/btrfs/async-thread.c
+++ b/fs/btrfs/async-thread.c
 #include <linux/kthread.h>
 #include <linux/list.h>
 #include <linux/spinlock.h>
-
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,20)
 # include <linux/freezer.h>
-#else
-# include <linux/sched.h>
-#endif
-
 #include "async-thread.h"
 
+/* bits in btrfs_work->flags */
+#define WORK_QUEUED_BIT 0	/* queued on a worker's pending list */
+#define WORK_DONE_BIT 1		/* func has run (ordered queues only) */
+#define WORK_ORDER_DONE_BIT 2	/* ordered_func has been started */
+
 /*
  * container for the kthread task pointer and the list of pending work
  * One of these is allocated per thread.
  */
 struct btrfs_worker_thread {
+       /* pool we belong to */
+       struct btrfs_workers *workers;
+
        /* list of struct btrfs_work that are waiting for service */
        struct list_head pending;
 
@@ -46,14 +47,93 @@ struct btrfs_worker_thread {
        /* number of things on the pending list */
        atomic_t num_pending;
 
+       unsigned long sequence;
+
        /* protects the pending list. */
        spinlock_t lock;
 
        /* set to non-zero when this thread is already awake and kicking */
        int working;
+
+       /* are we currently idle */
+       int idle;
 };
 
 /*
+ * helper function to move a thread onto the idle list after it
+ * has finished some requests.
+ */
+static void check_idle_worker(struct btrfs_worker_thread *worker)
+{
+       if (!worker->idle && atomic_read(&worker->num_pending) <
+           worker->workers->idle_thresh / 2) {
+               unsigned long flags;
+               spin_lock_irqsave(&worker->workers->lock, flags);
+               worker->idle = 1;
+               list_move(&worker->worker_list, &worker->workers->idle_list);
+               spin_unlock_irqrestore(&worker->workers->lock, flags);
+       }
+}
+
+/*
+ * helper function to move a thread off the idle list after new
+ * pending work is added.
+ */
+static void check_busy_worker(struct btrfs_worker_thread *worker)
+{
+       if (worker->idle && atomic_read(&worker->num_pending) >=
+           worker->workers->idle_thresh) {
+               unsigned long flags;
+               spin_lock_irqsave(&worker->workers->lock, flags);
+               worker->idle = 0;
+               list_move_tail(&worker->worker_list,
+                              &worker->workers->worker_list);
+               spin_unlock_irqrestore(&worker->workers->lock, flags);
+       }
+}
+
+static noinline int run_ordered_completions(struct btrfs_workers *workers,
+                                           struct btrfs_work *work)
+{
+       unsigned long flags;
+
+       if (!workers->ordered)
+               return 0;
+
+       set_bit(WORK_DONE_BIT, &work->flags);
+
+       spin_lock_irqsave(&workers->lock, flags);
+
+       while (!list_empty(&workers->order_list)) {
+               work = list_entry(workers->order_list.next,
+                                 struct btrfs_work, order_list);
+
+               if (!test_bit(WORK_DONE_BIT, &work->flags))
+                       break;
+
+               /* we are going to call the ordered done function, but
+                * we leave the work item on the list as a barrier so
+                * that later work items that are done don't have their
+                * functions called before this one returns
+                */
+               if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
+                       break;
+
+               spin_unlock_irqrestore(&workers->lock, flags);
+
+               work->ordered_func(work);
+
+               /* now take the lock again and call the freeing code */
+               spin_lock_irqsave(&workers->lock, flags);
+               list_del(&work->order_list);
+               work->ordered_free(work);
+       }
+
+       spin_unlock_irqrestore(&workers->lock, flags);
+       return 0;
+}
+
+/*
  * main loop for servicing work items
  */
 static int worker_loop(void *arg)
@@ -63,11 +143,11 @@ static int worker_loop(void *arg)
        struct btrfs_work *work;
        do {
                spin_lock_irq(&worker->lock);
-               while(!list_empty(&worker->pending)) {
+               while (!list_empty(&worker->pending)) {
                        cur = worker->pending.next;
                        work = list_entry(cur, struct btrfs_work, list);
                        list_del(&work->list);
-                       clear_bit(0, &work->flags);
+                       clear_bit(WORK_QUEUED_BIT, &work->flags);
 
                        work->worker = worker;
                        spin_unlock_irq(&worker->lock);
@@ -75,7 +155,15 @@ static int worker_loop(void *arg)
                        work->func(work);
 
                        atomic_dec(&worker->num_pending);
+                       /*
+                        * unless this is an ordered work queue,
+                        * 'work' was probably freed by func above.
+                        */
+                       run_ordered_completions(worker->workers, work);
+
                        spin_lock_irq(&worker->lock);
+                       check_idle_worker(worker);
+
                }
                worker->working = 0;
                if (freezing(current)) {
@@ -83,7 +171,8 @@ static int worker_loop(void *arg)
                } else {
                        set_current_state(TASK_INTERRUPTIBLE);
                        spin_unlock_irq(&worker->lock);
-                       schedule();
+                       if (!kthread_should_stop())
+                               schedule();
                        __set_current_state(TASK_RUNNING);
                }
        } while (!kthread_should_stop());
@@ -98,7 +187,8 @@ int btrfs_stop_workers(struct btrfs_workers *workers)
        struct list_head *cur;
        struct btrfs_worker_thread *worker;
 
-       while(!list_empty(&workers->worker_list)) {
+       list_splice_init(&workers->idle_list, &workers->worker_list);
+       while (!list_empty(&workers->worker_list)) {
                cur = workers->worker_list.next;
                worker = list_entry(cur, struct btrfs_worker_thread,
                                    worker_list);
@@ -112,13 +202,17 @@ int btrfs_stop_workers(struct btrfs_workers *workers)
 /*
  * simple init on struct btrfs_workers
  */
-void btrfs_init_workers(struct btrfs_workers *workers, int max)
+void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
 {
        workers->num_workers = 0;
        INIT_LIST_HEAD(&workers->worker_list);
-       workers->last = NULL;
+       INIT_LIST_HEAD(&workers->idle_list);
+       INIT_LIST_HEAD(&workers->order_list);
        spin_lock_init(&workers->lock);
        workers->max_workers = max;
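+       /* threads go busy at this many pending items, idle below half */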
+       workers->idle_thresh = 32;
+       workers->name = name;
+       workers->ordered = 0;
 }
 
 /*
@@ -142,15 +236,19 @@ int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
                INIT_LIST_HEAD(&worker->worker_list);
                spin_lock_init(&worker->lock);
                atomic_set(&worker->num_pending, 0);
-               worker->task = kthread_run(worker_loop, worker, "btrfs");
+               worker->workers = workers;
+               worker->task = kthread_run(worker_loop, worker,
+                                          "btrfs-%s-%d", workers->name,
+                                          workers->num_workers + i);
                if (IS_ERR(worker->task)) {
                        ret = PTR_ERR(worker->task);
+                       kfree(worker);
                        goto fail;
                }
 
                spin_lock_irq(&workers->lock);
-               list_add_tail(&worker->worker_list, &workers->worker_list);
-               workers->last = worker;
+               list_add_tail(&worker->worker_list, &workers->idle_list);
+               worker->idle = 1;
                workers->num_workers++;
                spin_unlock_irq(&workers->lock);
        }
@@ -169,45 +267,44 @@ static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
 {
        struct btrfs_worker_thread *worker;
        struct list_head *next;
-       struct list_head *start;
        int enforce_min = workers->num_workers < workers->max_workers;
 
-       /* start with the last thread if it isn't busy */
-       worker = workers->last;
-       if (atomic_read(&worker->num_pending) < 64)
-               goto done;
-
-       next = worker->worker_list.next;
-       start = &worker->worker_list;
-
        /*
-        * check all the workers for someone that is bored.  FIXME, do
-        * something smart here
+        * if we find an idle thread, don't move it to the end of the
+        * idle list.  This improves the chance that the next submission
+        * will reuse the same thread, and maybe catch it while it is still
+        * working
         */
-       while(next != start) {
-               if (next == &workers->worker_list) {
-                       next = workers->worker_list.next;
-                       continue;
-               }
+       if (!list_empty(&workers->idle_list)) {
+               next = workers->idle_list.next;
                worker = list_entry(next, struct btrfs_worker_thread,
                                    worker_list);
-               if (atomic_read(&worker->num_pending) < 64 || !enforce_min)
-                       goto done;
-               next = next->next;
+               return worker;
        }
+       if (enforce_min || list_empty(&workers->worker_list))
+               return NULL;
+
        /*
-        * nobody was bored, if we're already at the max thread count,
-        * use the last thread
+        * if we pick a busy task, move the task to the end of the list.
+        * hopefully this will keep things somewhat evenly balanced.
+        * Do the move in batches based on the sequence number.  This groups
+        * requests submitted at roughly the same time onto the same worker.
         */
-       if (!enforce_min || atomic_read(&workers->last->num_pending) < 64) {
-               return workers->last;
-       }
-       return NULL;
-done:
-       workers->last = worker;
+       next = workers->worker_list.next;
+       worker = list_entry(next, struct btrfs_worker_thread, worker_list);
+       atomic_inc(&worker->num_pending);
+       worker->sequence++;
+
+       if (worker->sequence % workers->idle_thresh == 0)
+               list_move_tail(next, &workers->worker_list);
        return worker;
 }
 
+/*
+ * selects a worker thread to take the next job.  This will either find
+ * an idle worker, start a new worker up to the max count, or just return
+ * one of the existing busy workers.
+ */
 static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
 {
        struct btrfs_worker_thread *worker;
@@ -221,11 +318,17 @@ again:
        if (!worker) {
                spin_lock_irqsave(&workers->lock, flags);
                if (workers->num_workers >= workers->max_workers) {
+                       struct list_head *fallback = NULL;
                        /*
                         * we have failed to find any workers, just
                         * return the force one
                         */
-                       worker = list_entry(workers->worker_list.next,
+                       if (!list_empty(&workers->worker_list))
+                               fallback = workers->worker_list.next;
+                       if (!list_empty(&workers->idle_list))
+                               fallback = workers->idle_list.next;
+                       BUG_ON(!fallback);
+                       worker = list_entry(fallback,
                                  struct btrfs_worker_thread, worker_list);
                        spin_unlock_irqrestore(&workers->lock, flags);
                } else {
@@ -248,13 +351,26 @@ int btrfs_requeue_work(struct btrfs_work *work)
        struct btrfs_worker_thread *worker = work->worker;
        unsigned long flags;
 
-       if (test_and_set_bit(0, &work->flags))
+       if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
                goto out;
 
        spin_lock_irqsave(&worker->lock, flags);
        atomic_inc(&worker->num_pending);
        list_add_tail(&work->list, &worker->pending);
+
+       /* by definition we're busy, take ourselves off the idle
+        * list
+        */
+       if (worker->idle) {
+               /* worker->lock already disabled irqs, don't clobber flags */
+               spin_lock(&worker->workers->lock);
+               worker->idle = 0;
+               list_move_tail(&worker->worker_list,
+                              &worker->workers->worker_list);
+               spin_unlock(&worker->workers->lock);
+       }
+
        spin_unlock_irqrestore(&worker->lock, flags);
+
 out:
        return 0;
 }
@@ -269,13 +385,21 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
        int wake = 0;
 
        /* don't requeue something already on a list */
-       if (test_and_set_bit(0, &work->flags))
+       if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
                goto out;
 
        worker = find_worker(workers);
+       if (workers->ordered) {
+               spin_lock_irqsave(&workers->lock, flags);
+               list_add_tail(&work->order_list, &workers->order_list);
+               spin_unlock_irqrestore(&workers->lock, flags);
+       } else {
+               INIT_LIST_HEAD(&work->order_list);
+       }
 
        spin_lock_irqsave(&worker->lock, flags);
        atomic_inc(&worker->num_pending);
+       check_busy_worker(worker);
        list_add_tail(&work->list, &worker->pending);
 
        /*