lib: Introduce generic list_sort function
[safe/jmp/linux-2.6] / fs / btrfs / async-thread.c
index 4b4372d..c0861e7 100644 (file)
@@ -64,6 +64,51 @@ struct btrfs_worker_thread {
 };
 
 /*
+ * btrfs_start_workers uses kthread_run, which can block waiting for memory
+ * for a very long time.  It will actually throttle on page writeback,
+ * and so it may not make progress until after our btrfs worker threads
+ * process all of the pending work structs in their queue
+ *
+ * This means we can't use btrfs_start_workers from inside a btrfs worker
+ * thread that is used as part of cleaning dirty memory, which pretty much
+ * involves all of the worker threads.
+ *
+ * Instead we have a helper queue who never has more than one thread
+ * where we scheduler thread start operations.  This worker_start struct
+ * is used to contain the work and hold a pointer to the queue that needs
+ * another worker.
+ */
+struct worker_start {
+       struct btrfs_work work;
+       struct btrfs_workers *queue;
+};
+
+static void start_new_worker_func(struct btrfs_work *work)
+{
+       struct worker_start *start;
+       start = container_of(work, struct worker_start, work);
+       btrfs_start_workers(start->queue, 1);
+       kfree(start);
+}
+
+static int start_new_worker(struct btrfs_workers *queue)
+{
+       struct worker_start *start;
+       int ret;
+
+       start = kzalloc(sizeof(*start), GFP_NOFS);
+       if (!start)
+               return -ENOMEM;
+
+       start->work.func = start_new_worker_func;
+       start->queue = queue;
+       ret = btrfs_queue_worker(queue->atomic_worker_start, &start->work);
+       if (ret)
+               kfree(start);
+       return ret;
+}
+
+/*
  * helper function to move a thread onto the idle list after it
  * has finished some requests.
  */
@@ -74,7 +119,12 @@ static void check_idle_worker(struct btrfs_worker_thread *worker)
                unsigned long flags;
                spin_lock_irqsave(&worker->workers->lock, flags);
                worker->idle = 1;
-               list_move(&worker->worker_list, &worker->workers->idle_list);
+
+               /* the list may be empty if the worker is just starting */
+               if (!list_empty(&worker->worker_list)) {
+                       list_move(&worker->worker_list,
+                                &worker->workers->idle_list);
+               }
                spin_unlock_irqrestore(&worker->workers->lock, flags);
        }
 }
@@ -90,8 +140,11 @@ static void check_busy_worker(struct btrfs_worker_thread *worker)
                unsigned long flags;
                spin_lock_irqsave(&worker->workers->lock, flags);
                worker->idle = 0;
-               list_move_tail(&worker->worker_list,
-                              &worker->workers->worker_list);
+
+               if (!list_empty(&worker->worker_list)) {
+                       list_move_tail(&worker->worker_list,
+                                     &worker->workers->worker_list);
+               }
                spin_unlock_irqrestore(&worker->workers->lock, flags);
        }
 }
@@ -110,11 +163,13 @@ static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
                goto out;
 
        workers->atomic_start_pending = 0;
-       if (workers->num_workers >= workers->max_workers)
+       if (workers->num_workers + workers->num_workers_starting >=
+           workers->max_workers)
                goto out;
 
+       workers->num_workers_starting += 1;
        spin_unlock_irqrestore(&workers->lock, flags);
-       btrfs_start_workers(workers, 1);
+       start_new_worker(workers);
        return;
 
 out:
@@ -177,18 +232,19 @@ static int try_worker_shutdown(struct btrfs_worker_thread *worker)
        int freeit = 0;
 
        spin_lock_irq(&worker->lock);
-       spin_lock_irq(&worker->workers->lock);
+       spin_lock(&worker->workers->lock);
        if (worker->workers->num_workers > 1 &&
            worker->idle &&
            !worker->working &&
            !list_empty(&worker->worker_list) &&
            list_empty(&worker->prio_pending) &&
-           list_empty(&worker->pending)) {
+           list_empty(&worker->pending) &&
+           atomic_read(&worker->num_pending) == 0) {
                freeit = 1;
                list_del_init(&worker->worker_list);
                worker->workers->num_workers--;
        }
-       spin_unlock_irq(&worker->workers->lock);
+       spin_unlock(&worker->workers->lock);
        spin_unlock_irq(&worker->lock);
 
        if (freeit)
@@ -196,31 +252,73 @@ static int try_worker_shutdown(struct btrfs_worker_thread *worker)
        return freeit;
 }
 
+static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
+                                       struct list_head *prio_head,
+                                       struct list_head *head)
+{
+       struct btrfs_work *work = NULL;
+       struct list_head *cur = NULL;
+
+       if(!list_empty(prio_head))
+               cur = prio_head->next;
+
+       smp_mb();
+       if (!list_empty(&worker->prio_pending))
+               goto refill;
+
+       if (!list_empty(head))
+               cur = head->next;
+
+       if (cur)
+               goto out;
+
+refill:
+       spin_lock_irq(&worker->lock);
+       list_splice_tail_init(&worker->prio_pending, prio_head);
+       list_splice_tail_init(&worker->pending, head);
+
+       if (!list_empty(prio_head))
+               cur = prio_head->next;
+       else if (!list_empty(head))
+               cur = head->next;
+       spin_unlock_irq(&worker->lock);
+
+       if (!cur)
+               goto out_fail;
+
+out:
+       work = list_entry(cur, struct btrfs_work, list);
+
+out_fail:
+       return work;
+}
+
 /*
  * main loop for servicing work items
  */
 static int worker_loop(void *arg)
 {
        struct btrfs_worker_thread *worker = arg;
-       struct list_head *cur;
+       struct list_head head;
+       struct list_head prio_head;
        struct btrfs_work *work;
+
+       INIT_LIST_HEAD(&head);
+       INIT_LIST_HEAD(&prio_head);
+
        do {
-               spin_lock_irq(&worker->lock);
-again_locked:
+again:
                while (1) {
-                       if (!list_empty(&worker->prio_pending))
-                               cur = worker->prio_pending.next;
-                       else if (!list_empty(&worker->pending))
-                               cur = worker->pending.next;
-                       else
+
+
+                       work = get_next_work(worker, &prio_head, &head);
+                       if (!work)
                                break;
 
-                       work = list_entry(cur, struct btrfs_work, list);
                        list_del(&work->list);
                        clear_bit(WORK_QUEUED_BIT, &work->flags);
 
                        work->worker = worker;
-                       spin_unlock_irq(&worker->lock);
 
                        work->func(work);
 
@@ -233,9 +331,11 @@ again_locked:
 
                        check_pending_worker_creates(worker);
 
-                       spin_lock_irq(&worker->lock);
-                       check_idle_worker(worker);
                }
+
+               spin_lock_irq(&worker->lock);
+               check_idle_worker(worker);
+
                if (freezing(current)) {
                        worker->working = 0;
                        spin_unlock_irq(&worker->lock);
@@ -274,8 +374,10 @@ again_locked:
                                spin_lock_irq(&worker->lock);
                                set_current_state(TASK_INTERRUPTIBLE);
                                if (!list_empty(&worker->pending) ||
-                                   !list_empty(&worker->prio_pending))
-                                       goto again_locked;
+                                   !list_empty(&worker->prio_pending)) {
+                                       spin_unlock_irq(&worker->lock);
+                                       goto again;
+                               }
 
                                /*
                                 * this makes sure we get a wakeup when someone
@@ -335,9 +437,11 @@ int btrfs_stop_workers(struct btrfs_workers *workers)
 /*
  * simple init on struct btrfs_workers
  */
-void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
+void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
+                       struct btrfs_workers *async_helper)
 {
        workers->num_workers = 0;
+       workers->num_workers_starting = 0;
        INIT_LIST_HEAD(&workers->worker_list);
        INIT_LIST_HEAD(&workers->idle_list);
        INIT_LIST_HEAD(&workers->order_list);
@@ -349,14 +453,15 @@ void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
        workers->name = name;
        workers->ordered = 0;
        workers->atomic_start_pending = 0;
-       workers->atomic_worker_start = 0;
+       workers->atomic_worker_start = async_helper;
 }
 
 /*
  * starts new worker threads.  This does not enforce the max worker
  * count in case you need to temporarily go past it.
  */
-int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
+static int __btrfs_start_workers(struct btrfs_workers *workers,
+                                int num_workers)
 {
        struct btrfs_worker_thread *worker;
        int ret = 0;
@@ -389,6 +494,8 @@ int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
                list_add_tail(&worker->worker_list, &workers->idle_list);
                worker->idle = 1;
                workers->num_workers++;
+               workers->num_workers_starting--;
+               WARN_ON(workers->num_workers_starting < 0);
                spin_unlock_irq(&workers->lock);
        }
        return 0;
@@ -397,6 +504,14 @@ fail:
        return ret;
 }
 
+int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
+{
+       spin_lock_irq(&workers->lock);
+       workers->num_workers_starting += num_workers;
+       spin_unlock_irq(&workers->lock);
+       return __btrfs_start_workers(workers, num_workers);
+}
+
 /*
  * run through the list and find a worker thread that doesn't have a lot
  * to do right now.  This can return null if we aren't yet at the thread
@@ -406,7 +521,10 @@ static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
 {
        struct btrfs_worker_thread *worker;
        struct list_head *next;
-       int enforce_min = workers->num_workers < workers->max_workers;
+       int enforce_min;
+
+       enforce_min = (workers->num_workers + workers->num_workers_starting) <
+               workers->max_workers;
 
        /*
         * if we find an idle thread, don't move it to the end of the
@@ -431,7 +549,6 @@ static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
         */
        next = workers->worker_list.next;
        worker = list_entry(next, struct btrfs_worker_thread, worker_list);
-       atomic_inc(&worker->num_pending);
        worker->sequence++;
 
        if (worker->sequence % workers->idle_thresh == 0)
@@ -455,20 +572,21 @@ again:
        worker = next_worker(workers);
 
        if (!worker) {
-               if (workers->num_workers >= workers->max_workers) {
+               if (workers->num_workers + workers->num_workers_starting >=
+                   workers->max_workers) {
                        goto fallback;
                } else if (workers->atomic_worker_start) {
                        workers->atomic_start_pending = 1;
                        goto fallback;
                } else {
+                       workers->num_workers_starting++;
                        spin_unlock_irqrestore(&workers->lock, flags);
                        /* we're below the limit, start another worker */
-                       btrfs_start_workers(workers, 1);
+                       __btrfs_start_workers(workers, 1);
                        goto again;
                }
        }
-       spin_unlock_irqrestore(&workers->lock, flags);
-       return worker;
+       goto found;
 
 fallback:
        fallback = NULL;
@@ -483,6 +601,12 @@ fallback:
        BUG_ON(!fallback);
        worker = list_entry(fallback,
                  struct btrfs_worker_thread, worker_list);
+found:
+       /*
+        * this makes sure the worker doesn't exit before it is placed
+        * onto a busy/idle list
+        */
+       atomic_inc(&worker->num_pending);
        spin_unlock_irqrestore(&workers->lock, flags);
        return worker;
 }
@@ -515,7 +639,7 @@ int btrfs_requeue_work(struct btrfs_work *work)
                spin_lock(&worker->workers->lock);
                worker->idle = 0;
                list_move_tail(&worker->worker_list,
-                              &worker->workers->worker_list);
+                             &worker->workers->worker_list);
                spin_unlock(&worker->workers->lock);
        }
        if (!worker->working) {
@@ -573,7 +697,6 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
                list_add_tail(&work->list, &worker->prio_pending);
        else
                list_add_tail(&work->list, &worker->pending);
-       atomic_inc(&worker->num_pending);
        check_busy_worker(worker);
 
        /*