reimplement flush_workqueue()
1 /*
2  * linux/kernel/workqueue.c
3  *
4  * Generic mechanism for defining kernel helper threads for running
5  * arbitrary tasks in process context.
6  *
7  * Started by Ingo Molnar, Copyright (C) 2002
8  *
9  * Derived from the taskqueue/keventd code by:
10  *
11  *   David Woodhouse <dwmw2@infradead.org>
12  *   Andrew Morton <andrewm@uow.edu.au>
13  *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
14  *   Theodore Ts'o <tytso@mit.edu>
15  *
16  * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>.
17  */
18
19 #include <linux/module.h>
20 #include <linux/kernel.h>
21 #include <linux/sched.h>
22 #include <linux/init.h>
23 #include <linux/signal.h>
24 #include <linux/completion.h>
25 #include <linux/workqueue.h>
26 #include <linux/slab.h>
27 #include <linux/cpu.h>
28 #include <linux/notifier.h>
29 #include <linux/kthread.h>
30 #include <linux/hardirq.h>
31 #include <linux/mempolicy.h>
32 #include <linux/freezer.h>
33 #include <linux/kallsyms.h>
34 #include <linux/debug_locks.h>
35
36 /*
37  * The per-CPU workqueue (if single thread, we always use the first
38  * possible cpu).
39  */
40 struct cpu_workqueue_struct {
41
42         spinlock_t lock;
43
44         struct list_head worklist;
45         wait_queue_head_t more_work;
46
47         struct workqueue_struct *wq;
48         struct task_struct *thread;
49
50         int run_depth;          /* Detect run_workqueue() recursion depth */
51
52         int freezeable;         /* Freeze the thread during suspend */
53 } ____cacheline_aligned;
54
55 /*
56  * The externally visible workqueue abstraction is an array of
57  * per-CPU workqueues:
58  */
59 struct workqueue_struct {
60         struct cpu_workqueue_struct *cpu_wq;
61         const char *name;
62         struct list_head list;  /* Empty if single thread */
63 };
64
65 /* All the per-cpu workqueues on the system, so CPU hotplug can add and
66    remove threads for each one as CPUs come and go. */
67 static DEFINE_MUTEX(workqueue_mutex);
68 static LIST_HEAD(workqueues);
69
70 static int singlethread_cpu;
71
72 /* If it's single threaded, it isn't in the list of workqueues. */
73 static inline int is_single_threaded(struct workqueue_struct *wq)
74 {
75         return list_empty(&wq->list);
76 }
77
78 /*
79  * Set the workqueue on which a work item is to be run
80  * - Must *only* be called if the pending flag is set
81  */
82 static inline void set_wq_data(struct work_struct *work, void *wq)
83 {
84         unsigned long new;
85
86         BUG_ON(!work_pending(work));
87
88         new = (unsigned long) wq | (1UL << WORK_STRUCT_PENDING);
89         new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);
90         atomic_long_set(&work->data, new);
91 }
92
93 static inline void *get_wq_data(struct work_struct *work)
94 {
95         return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
96 }
97
98 static int __run_work(struct cpu_workqueue_struct *cwq, struct work_struct *work)
99 {
100         int ret = 0;
101         unsigned long flags;
102
103         spin_lock_irqsave(&cwq->lock, flags);
104         /*
105          * We need to re-validate the work info after we've gotten
106          * the cpu_workqueue lock. We can run the work now iff:
107          *
108          *  - the wq_data still matches the cpu_workqueue_struct
109          *  - AND the work is still marked pending
110          *  - AND the work is still on a list (which will be this
111          *    workqueue_struct list)
112          *
113          * All these conditions are important, because we
114          * need to protect against the work being run right
115          * now on another CPU (all but the last one might be
116          * true if it's currently running and has not been
117          * released yet, for example).
118          */
119         if (get_wq_data(work) == cwq
120             && work_pending(work)
121             && !list_empty(&work->entry)) {
122                 work_func_t f = work->func;
123                 list_del_init(&work->entry);
124                 spin_unlock_irqrestore(&cwq->lock, flags);
125
126                 if (!test_bit(WORK_STRUCT_NOAUTOREL, work_data_bits(work)))
127                         work_release(work);
128                 f(work);
129
130                 spin_lock_irqsave(&cwq->lock, flags);
131                 ret = 1;
132         }
133         spin_unlock_irqrestore(&cwq->lock, flags);
134         return ret;
135 }
136
137 /**
138  * run_scheduled_work - run scheduled work synchronously
139  * @work: work to run
140  *
141  * This checks if the work was pending, and runs it
142  * synchronously if so. It returns a boolean to indicate
143  * whether it had any scheduled work to run or not.
144  *
145  * NOTE! This _only_ works for normal work_structs. You
146  * CANNOT use this for delayed work, because the wq data
147  * for delayed work will not point properly to the per-
148  * CPU workqueue struct, but will change!
149  */
150 int fastcall run_scheduled_work(struct work_struct *work)
151 {
152         for (;;) {
153                 struct cpu_workqueue_struct *cwq;
154
155                 if (!work_pending(work))
156                         return 0;
157                 if (list_empty(&work->entry))
158                         return 0;
159                 /* NOTE! This depends intimately on __queue_work! */
160                 cwq = get_wq_data(work);
161                 if (!cwq)
162                         return 0;
163                 if (__run_work(cwq, work))
164                         return 1;
165         }
166 }
167 EXPORT_SYMBOL(run_scheduled_work);
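
/*
 * Usage sketch (illustrative, not part of the original file): forcing a
 * previously queued, non-delayed work item to run synchronously via
 * run_scheduled_work().  The names example_work and example_work_fn are
 * hypothetical.
 */
static void example_work_fn(struct work_struct *work)
{
        /* runs in process context, either in the caller or in a worker */
}

static DECLARE_WORK(example_work, example_work_fn);

static void example_run_now(void)
{
        schedule_work(&example_work);

        /* returns 1 if the work was still pending and has now been run */
        if (!run_scheduled_work(&example_work))
                printk(KERN_DEBUG "example_work had already run\n");
}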
168
169 /* Preempt must be disabled. */
170 static void __queue_work(struct cpu_workqueue_struct *cwq,
171                          struct work_struct *work)
172 {
173         unsigned long flags;
174
175         spin_lock_irqsave(&cwq->lock, flags);
176         set_wq_data(work, cwq);
177         list_add_tail(&work->entry, &cwq->worklist);
178         wake_up(&cwq->more_work);
179         spin_unlock_irqrestore(&cwq->lock, flags);
180 }
181
182 /**
183  * queue_work - queue work on a workqueue
184  * @wq: workqueue to use
185  * @work: work to queue
186  *
187  * Returns 0 if @work was already on a queue, non-zero otherwise.
188  *
189  * We queue the work to the CPU on which it was submitted, but there is no
190  * guarantee that it will be processed by that CPU.
191  */
192 int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
193 {
194         int ret = 0, cpu = get_cpu();
195
196         if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
197                 if (unlikely(is_single_threaded(wq)))
198                         cpu = singlethread_cpu;
199                 BUG_ON(!list_empty(&work->entry));
200                 __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
201                 ret = 1;
202         }
203         put_cpu();
204         return ret;
205 }
206 EXPORT_SYMBOL_GPL(queue_work);
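
/*
 * Usage sketch (illustrative, not part of the original file): deferring
 * work from interrupt context onto a dedicated workqueue.  example_wq,
 * example_irq_work and example_irq_work_fn are hypothetical; example_wq
 * would be created with create_workqueue() (see __create_workqueue below).
 */
static struct workqueue_struct *example_wq;

static void example_irq_work_fn(struct work_struct *work)
{
        /* deferred handling, runs in a worker thread's process context */
}

static DECLARE_WORK(example_irq_work, example_irq_work_fn);

static void example_defer_from_irq(void)
{
        /* returns 0 if the work item is already pending on a queue */
        if (!queue_work(example_wq, &example_irq_work))
                printk(KERN_DEBUG "example_irq_work already queued\n");
}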
207
208 void delayed_work_timer_fn(unsigned long __data)
209 {
210         struct delayed_work *dwork = (struct delayed_work *)__data;
211         struct workqueue_struct *wq = get_wq_data(&dwork->work);
212         int cpu = smp_processor_id();
213
214         if (unlikely(is_single_threaded(wq)))
215                 cpu = singlethread_cpu;
216
217         __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), &dwork->work);
218 }
219
220 /**
221  * queue_delayed_work - queue work on a workqueue after delay
222  * @wq: workqueue to use
223  * @dwork: delayable work to queue
224  * @delay: number of jiffies to wait before queueing
225  *
226  * Returns 0 if @dwork was already on a queue, non-zero otherwise.
227  */
228 int fastcall queue_delayed_work(struct workqueue_struct *wq,
229                         struct delayed_work *dwork, unsigned long delay)
230 {
231         int ret = 0;
232         struct timer_list *timer = &dwork->timer;
233         struct work_struct *work = &dwork->work;
234
235         timer_stats_timer_set_start_info(timer);
236         if (delay == 0)
237                 return queue_work(wq, work);
238
239         if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
240                 BUG_ON(timer_pending(timer));
241                 BUG_ON(!list_empty(&work->entry));
242
243                 /* This stores wq for the moment, for the timer_fn */
244                 set_wq_data(work, wq);
245                 timer->expires = jiffies + delay;
246                 timer->data = (unsigned long)dwork;
247                 timer->function = delayed_work_timer_fn;
248                 add_timer(timer);
249                 ret = 1;
250         }
251         return ret;
252 }
253 EXPORT_SYMBOL_GPL(queue_delayed_work);
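
/*
 * Usage sketch (illustrative, not part of the original file): queueing
 * delayed work that will run roughly one second from now on the
 * hypothetical example_wq from the sketch above.
 */
static void example_poll_fn(struct work_struct *work)
{
        /* could re-queue itself here to poll periodically */
}

static DECLARE_DELAYED_WORK(example_poll, example_poll_fn);

static void example_start_poll(void)
{
        /* HZ jiffies is one second; returns 0 if already pending */
        queue_delayed_work(example_wq, &example_poll, HZ);
}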
254
255 /**
256  * queue_delayed_work_on - queue work on specific CPU after delay
257  * @cpu: CPU number to execute work on
258  * @wq: workqueue to use
259  * @dwork: work to queue
260  * @delay: number of jiffies to wait before queueing
261  *
262  * Returns 0 if @dwork was already on a queue, non-zero otherwise.
263  */
264 int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
265                         struct delayed_work *dwork, unsigned long delay)
266 {
267         int ret = 0;
268         struct timer_list *timer = &dwork->timer;
269         struct work_struct *work = &dwork->work;
270
271         if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
272                 BUG_ON(timer_pending(timer));
273                 BUG_ON(!list_empty(&work->entry));
274
275                 /* This stores wq for the moment, for the timer_fn */
276                 set_wq_data(work, wq);
277                 timer->expires = jiffies + delay;
278                 timer->data = (unsigned long)dwork;
279                 timer->function = delayed_work_timer_fn;
280                 add_timer_on(timer, cpu);
281                 ret = 1;
282         }
283         return ret;
284 }
285 EXPORT_SYMBOL_GPL(queue_delayed_work_on);
286
287 static void run_workqueue(struct cpu_workqueue_struct *cwq)
288 {
289         unsigned long flags;
290
291         /*
292          * Keep taking work off the queue until
293          * done.
294          */
295         spin_lock_irqsave(&cwq->lock, flags);
296         cwq->run_depth++;
297         if (cwq->run_depth > 3) {
298                 /* morton gets to eat his hat */
299                 printk("%s: recursion depth exceeded: %d\n",
300                         __FUNCTION__, cwq->run_depth);
301                 dump_stack();
302         }
303         while (!list_empty(&cwq->worklist)) {
304                 struct work_struct *work = list_entry(cwq->worklist.next,
305                                                 struct work_struct, entry);
306                 work_func_t f = work->func;
307
308                 list_del_init(cwq->worklist.next);
309                 spin_unlock_irqrestore(&cwq->lock, flags);
310
311                 BUG_ON(get_wq_data(work) != cwq);
312                 if (!test_bit(WORK_STRUCT_NOAUTOREL, work_data_bits(work)))
313                         work_release(work);
314                 f(work);
315
316                 if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
317                         printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
318                                         "%s/0x%08x/%d\n",
319                                         current->comm, preempt_count(),
320                                         current->pid);
321                         printk(KERN_ERR "    last function: ");
322                         print_symbol("%s\n", (unsigned long)f);
323                         debug_show_held_locks(current);
324                         dump_stack();
325                 }
326
327                 spin_lock_irqsave(&cwq->lock, flags);
328         }
329         cwq->run_depth--;
330         spin_unlock_irqrestore(&cwq->lock, flags);
331 }
332
333 static int worker_thread(void *__cwq)
334 {
335         struct cpu_workqueue_struct *cwq = __cwq;
336         DECLARE_WAITQUEUE(wait, current);
337         struct k_sigaction sa;
338         sigset_t blocked;
339
340         if (!cwq->freezeable)
341                 current->flags |= PF_NOFREEZE;
342
343         set_user_nice(current, -5);
344
345         /* Block and flush all signals */
346         sigfillset(&blocked);
347         sigprocmask(SIG_BLOCK, &blocked, NULL);
348         flush_signals(current);
349
350         /*
351          * We inherited MPOL_INTERLEAVE from the booting kernel.
352          * Set MPOL_DEFAULT to ensure node local allocations.
353          */
354         numa_default_policy();
355
356         /* SIG_IGN makes children autoreap: see do_notify_parent(). */
357         sa.sa.sa_handler = SIG_IGN;
358         sa.sa.sa_flags = 0;
359         siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
360         do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);
361
362         set_current_state(TASK_INTERRUPTIBLE);
363         while (!kthread_should_stop()) {
364                 if (cwq->freezeable)
365                         try_to_freeze();
366
367                 add_wait_queue(&cwq->more_work, &wait);
368                 if (list_empty(&cwq->worklist))
369                         schedule();
370                 else
371                         __set_current_state(TASK_RUNNING);
372                 remove_wait_queue(&cwq->more_work, &wait);
373
374                 if (!list_empty(&cwq->worklist))
375                         run_workqueue(cwq);
376                 set_current_state(TASK_INTERRUPTIBLE);
377         }
378         __set_current_state(TASK_RUNNING);
379         return 0;
380 }
381
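/*
 * Barrier work item, queued at the tail of a cpu_workqueue by
 * flush_cpu_workqueue().  When the worker thread reaches it, ->done is
 * completed, so the flusher can sleep until everything queued before the
 * barrier has run.
 */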
382 struct wq_barrier {
383         struct work_struct      work;
384         struct completion       done;
385 };
386
387 static void wq_barrier_func(struct work_struct *work)
388 {
389         struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
390         complete(&barr->done);
391 }
392
393 static inline void init_wq_barrier(struct wq_barrier *barr)
394 {
395         INIT_WORK(&barr->work, wq_barrier_func);
396         __set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));
397
398         init_completion(&barr->done);
399 }
400
401 static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
402 {
403         if (cwq->thread == current) {
404                 /*
405                  * Probably keventd trying to flush its own queue. So simply run
406                  * it by hand rather than deadlocking.
407                  */
408                 mutex_unlock(&workqueue_mutex);
409                 run_workqueue(cwq);
410                 mutex_lock(&workqueue_mutex);
411         } else {
412                 struct wq_barrier barr;
413
414                 init_wq_barrier(&barr);
415                 __queue_work(cwq, &barr.work);
416
417                 mutex_unlock(&workqueue_mutex);
418                 wait_for_completion(&barr.done);
419                 mutex_lock(&workqueue_mutex);
420         }
421 }
422
423 /**
424  * flush_workqueue - ensure that any scheduled work has run to completion.
425  * @wq: workqueue to flush
426  *
427  * Forces execution of the workqueue and blocks until its completion.
428  * This is typically used in driver shutdown handlers.
429  *
430  * We sleep until all work items which were queued on entry have been handled,
431  * but we are not livelocked by new incoming ones.
432  *
433  * This function used to run the workqueues itself.  Now we just wait for the
434  * helper threads to do it.
435  */
436 void fastcall flush_workqueue(struct workqueue_struct *wq)
437 {
438         mutex_lock(&workqueue_mutex);
439         if (is_single_threaded(wq)) {
440                 /* Always use first cpu's area. */
441                 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
442         } else {
443                 int cpu;
444
445                 for_each_online_cpu(cpu)
446                         flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
447         }
448         mutex_unlock(&workqueue_mutex);
449 }
450 EXPORT_SYMBOL_GPL(flush_workqueue);
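
/*
 * Usage sketch (illustrative, not part of the original file): a driver
 * shutdown path typically stops queueing new work and then flushes, so
 * nothing still references driver state afterwards.  example_wq is the
 * hypothetical workqueue from the sketches above.
 */
static void example_shutdown(void)
{
        /* the caller must ensure no new work gets queued from here on */
        flush_workqueue(example_wq);
        /* every work item queued before the flush has now completed */
}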
451
452 static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
453                                                    int cpu, int freezeable)
454 {
455         struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
456         struct task_struct *p;
457
458         spin_lock_init(&cwq->lock);
459         cwq->wq = wq;
460         cwq->thread = NULL;
461         cwq->freezeable = freezeable;
462         INIT_LIST_HEAD(&cwq->worklist);
463         init_waitqueue_head(&cwq->more_work);
464
465         if (is_single_threaded(wq))
466                 p = kthread_create(worker_thread, cwq, "%s", wq->name);
467         else
468                 p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
469         if (IS_ERR(p))
470                 return NULL;
471         cwq->thread = p;
472         return p;
473 }
474
475 struct workqueue_struct *__create_workqueue(const char *name,
476                                             int singlethread, int freezeable)
477 {
478         int cpu, destroy = 0;
479         struct workqueue_struct *wq;
480         struct task_struct *p;
481
482         wq = kzalloc(sizeof(*wq), GFP_KERNEL);
483         if (!wq)
484                 return NULL;
485
486         wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
487         if (!wq->cpu_wq) {
488                 kfree(wq);
489                 return NULL;
490         }
491
492         wq->name = name;
493         mutex_lock(&workqueue_mutex);
494         if (singlethread) {
495                 INIT_LIST_HEAD(&wq->list);
496                 p = create_workqueue_thread(wq, singlethread_cpu, freezeable);
497                 if (!p)
498                         destroy = 1;
499                 else
500                         wake_up_process(p);
501         } else {
502                 list_add(&wq->list, &workqueues);
503                 for_each_online_cpu(cpu) {
504                         p = create_workqueue_thread(wq, cpu, freezeable);
505                         if (p) {
506                                 kthread_bind(p, cpu);
507                                 wake_up_process(p);
508                         } else
509                                 destroy = 1;
510                 }
511         }
512         mutex_unlock(&workqueue_mutex);
513
514         /*
515          * Was there any error during startup? If yes then clean up:
516          */
517         if (destroy) {
518                 destroy_workqueue(wq);
519                 wq = NULL;
520         }
521         return wq;
522 }
523 EXPORT_SYMBOL_GPL(__create_workqueue);
524
525 static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
526 {
527         struct cpu_workqueue_struct *cwq;
528         unsigned long flags;
529         struct task_struct *p;
530
531         cwq = per_cpu_ptr(wq->cpu_wq, cpu);
532         spin_lock_irqsave(&cwq->lock, flags);
533         p = cwq->thread;
534         cwq->thread = NULL;
535         spin_unlock_irqrestore(&cwq->lock, flags);
536         if (p)
537                 kthread_stop(p);
538 }
539
540 /**
541  * destroy_workqueue - safely terminate a workqueue
542  * @wq: target workqueue
543  *
544  * Safely destroy a workqueue. All work currently pending will be done first.
545  */
546 void destroy_workqueue(struct workqueue_struct *wq)
547 {
548         int cpu;
549
550         flush_workqueue(wq);
551
552         /* We don't need the distraction of CPUs appearing and vanishing. */
553         mutex_lock(&workqueue_mutex);
554         if (is_single_threaded(wq))
555                 cleanup_workqueue_thread(wq, singlethread_cpu);
556         else {
557                 for_each_online_cpu(cpu)
558                         cleanup_workqueue_thread(wq, cpu);
559                 list_del(&wq->list);
560         }
561         mutex_unlock(&workqueue_mutex);
562         free_percpu(wq->cpu_wq);
563         kfree(wq);
564 }
565 EXPORT_SYMBOL_GPL(destroy_workqueue);
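
/*
 * Lifecycle sketch (illustrative, not part of the original file): the
 * create_workqueue() and create_singlethread_workqueue() wrappers in
 * <linux/workqueue.h> expand to __create_workqueue() above; pairing them
 * with destroy_workqueue() gives the usual init/exit pattern.
 * example_init, example_exit and example_wq are hypothetical.
 */
static int example_init(void)
{
        example_wq = create_singlethread_workqueue("example");
        if (!example_wq)
                return -ENOMEM;
        return 0;
}

static void example_exit(void)
{
        destroy_workqueue(example_wq);  /* flushes pending work first */
        example_wq = NULL;
}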
566
567 static struct workqueue_struct *keventd_wq;
568
569 /**
570  * schedule_work - put work task in global workqueue
571  * @work: job to be done
572  *
573  * This puts a job in the kernel-global workqueue.
574  */
575 int fastcall schedule_work(struct work_struct *work)
576 {
577         return queue_work(keventd_wq, work);
578 }
579 EXPORT_SYMBOL(schedule_work);
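
/*
 * Usage sketch (illustrative, not part of the original file): most code
 * does not need a private workqueue and simply defers to keventd with
 * schedule_work().  example_event and example_event_fn are hypothetical.
 */
static void example_event_fn(struct work_struct *work)
{
        /* runs in one of the "events/N" worker threads */
}

static DECLARE_WORK(example_event, example_event_fn);

static void example_notify(void)
{
        schedule_work(&example_event);
}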
580
581 /**
582  * schedule_delayed_work - put work task in global workqueue after delay
583  * @dwork: job to be done
584  * @delay: number of jiffies to wait or 0 for immediate execution
585  *
586  * After waiting for a given time this puts a job in the kernel-global
587  * workqueue.
588  */
589 int fastcall schedule_delayed_work(struct delayed_work *dwork,
590                                         unsigned long delay)
591 {
592         timer_stats_timer_set_start_info(&dwork->timer);
593         return queue_delayed_work(keventd_wq, dwork, delay);
594 }
595 EXPORT_SYMBOL(schedule_delayed_work);
596
597 /**
598  * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
599  * @cpu: cpu to use
600  * @dwork: job to be done
601  * @delay: number of jiffies to wait
602  *
603  * After waiting for a given time this puts a job in the kernel-global
604  * workqueue on the specified CPU.
605  */
606 int schedule_delayed_work_on(int cpu,
607                         struct delayed_work *dwork, unsigned long delay)
608 {
609         return queue_delayed_work_on(cpu, keventd_wq, dwork, delay);
610 }
611 EXPORT_SYMBOL(schedule_delayed_work_on);
612
613 /**
614  * schedule_on_each_cpu - call a function on each online CPU from keventd
615  * @func: the function to call
616  *
617  * Returns zero on success.
618  * Returns -ve errno on failure.
619  *
620  * Appears to be racy against CPU hotplug.
621  *
622  * schedule_on_each_cpu() is very slow.
623  */
624 int schedule_on_each_cpu(work_func_t func)
625 {
626         int cpu;
627         struct work_struct *works;
628
629         works = alloc_percpu(struct work_struct);
630         if (!works)
631                 return -ENOMEM;
632
633         preempt_disable();              /* CPU hotplug */
634         for_each_online_cpu(cpu) {
635                 struct work_struct *work = per_cpu_ptr(works, cpu);
636
637                 INIT_WORK(work, func);
638                 set_bit(WORK_STRUCT_PENDING, work_data_bits(work));
639                 __queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu), work);
640         }
641         preempt_enable();
642         flush_workqueue(keventd_wq);
643         free_percpu(works);
644         return 0;
645 }
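
/*
 * Usage sketch (illustrative, not part of the original file): running a
 * short function once on every online CPU and waiting for all of them,
 * e.g. to drain per-CPU caches.  example_drain_cpu is hypothetical.
 */
static void example_drain_cpu(struct work_struct *unused)
{
        /* runs on each online CPU, in that CPU's keventd thread */
}

static void example_drain_all(void)
{
        /* returns 0 on success, negative errno on failure */
        if (schedule_on_each_cpu(example_drain_cpu))
                printk(KERN_WARNING "example: drain failed\n");
}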
646
647 void flush_scheduled_work(void)
648 {
649         flush_workqueue(keventd_wq);
650 }
651 EXPORT_SYMBOL(flush_scheduled_work);
652
653 /**
654  * cancel_rearming_delayed_workqueue - reliably kill off a delayed work whose handler rearms the delayed work.
655  * @wq:   the controlling workqueue structure
656  * @dwork: the delayed work struct
657  */
658 void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq,
659                                        struct delayed_work *dwork)
660 {
661         while (!cancel_delayed_work(dwork))
662                 flush_workqueue(wq);
663 }
664 EXPORT_SYMBOL(cancel_rearming_delayed_workqueue);
665
666 /**
667  * cancel_rearming_delayed_work - reliably kill off a delayed keventd work whose handler rearms the delayed work.
668  * @dwork: the delayed work struct
669  */
670 void cancel_rearming_delayed_work(struct delayed_work *dwork)
671 {
672         cancel_rearming_delayed_workqueue(keventd_wq, dwork);
673 }
674 EXPORT_SYMBOL(cancel_rearming_delayed_work);
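
/*
 * Usage sketch (illustrative, not part of the original file): a delayed
 * work item on keventd that rearms itself from its own handler, and how to
 * stop it reliably.  example_rearm and example_rearm_fn are hypothetical.
 */
static void example_rearm_fn(struct work_struct *work);
static DECLARE_DELAYED_WORK(example_rearm, example_rearm_fn);

static void example_rearm_fn(struct work_struct *work)
{
        /* do the periodic work, then rearm for one second later */
        schedule_delayed_work(&example_rearm, HZ);
}

static void example_rearm_stop(void)
{
        /*
         * A single cancel_delayed_work() can race with the handler
         * rearming itself; cancel_rearming_delayed_work() keeps cancelling
         * and flushing keventd until the work item is really gone.
         */
        cancel_rearming_delayed_work(&example_rearm);
}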
675
676 /**
677  * execute_in_process_context - reliably execute the routine with user context
678  * @fn:         the function to execute
679  * @ew:         guaranteed storage for the execute work structure (must
680  *              be available when the work executes)
681  *
682  * Executes the function immediately if process context is available,
683  * otherwise schedules the function for delayed execution.
684  *
685  * Returns:     0 - function was executed
686  *              1 - function was scheduled for execution
687  */
688 int execute_in_process_context(work_func_t fn, struct execute_work *ew)
689 {
690         if (!in_interrupt()) {
691                 fn(&ew->work);
692                 return 0;
693         }
694
695         INIT_WORK(&ew->work, fn);
696         schedule_work(&ew->work);
697
698         return 1;
699 }
700 EXPORT_SYMBOL_GPL(execute_in_process_context);
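
/*
 * Usage sketch (illustrative, not part of the original file): freeing an
 * object from a context that may be atomic.  The names are hypothetical;
 * note the execute_work storage lives inside the object being freed, so it
 * remains valid until example_release() runs.
 */
struct example_object {
        struct execute_work     ew;
        /* ... payload ... */
};

static void example_release(struct work_struct *work)
{
        struct example_object *obj =
                container_of(work, struct example_object, ew.work);

        kfree(obj);
}

static void example_free(struct example_object *obj)
{
        /* returns 0 if example_release() ran now, 1 if it was deferred */
        execute_in_process_context(example_release, &obj->ew);
}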
701
702 int keventd_up(void)
703 {
704         return keventd_wq != NULL;
705 }
706
707 int current_is_keventd(void)
708 {
709         struct cpu_workqueue_struct *cwq;
710         int cpu = smp_processor_id();   /* preempt-safe: keventd is per-cpu */
711         int ret = 0;
712
713         BUG_ON(!keventd_wq);
714
715         cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
716         if (current == cwq->thread)
717                 ret = 1;
718
719         return ret;
720
721 }
722
723 /* Take the work from this (downed) CPU. */
724 static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
725 {
726         struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
727         struct list_head list;
728         struct work_struct *work;
729
730         spin_lock_irq(&cwq->lock);
731         list_replace_init(&cwq->worklist, &list);
732
733         while (!list_empty(&list)) {
734                 printk("Taking work for %s\n", wq->name);
735                 work = list_entry(list.next, struct work_struct, entry);
736                 list_del(&work->entry);
737                 __queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work);
738         }
739         spin_unlock_irq(&cwq->lock);
740 }
741
742 /* We're holding the cpucontrol mutex here */
743 static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
744                                   unsigned long action,
745                                   void *hcpu)
746 {
747         unsigned int hotcpu = (unsigned long)hcpu;
748         struct workqueue_struct *wq;
749
750         switch (action) {
751         case CPU_UP_PREPARE:
752                 mutex_lock(&workqueue_mutex);
753                 /* Create a new workqueue thread for it. */
754                 list_for_each_entry(wq, &workqueues, list) {
755                         if (!create_workqueue_thread(wq, hotcpu, 0)) {
756                                 printk("workqueue for %i failed\n", hotcpu);
757                                 return NOTIFY_BAD;
758                         }
759                 }
760                 break;
761
762         case CPU_ONLINE:
763                 /* Kick off worker threads. */
764                 list_for_each_entry(wq, &workqueues, list) {
765                         struct cpu_workqueue_struct *cwq;
766
767                         cwq = per_cpu_ptr(wq->cpu_wq, hotcpu);
768                         kthread_bind(cwq->thread, hotcpu);
769                         wake_up_process(cwq->thread);
770                 }
771                 mutex_unlock(&workqueue_mutex);
772                 break;
773
774         case CPU_UP_CANCELED:
775                 list_for_each_entry(wq, &workqueues, list) {
776                         if (!per_cpu_ptr(wq->cpu_wq, hotcpu)->thread)
777                                 continue;
778                         /* Unbind so it can run. */
779                         kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread,
780                                      any_online_cpu(cpu_online_map));
781                         cleanup_workqueue_thread(wq, hotcpu);
782                 }
783                 mutex_unlock(&workqueue_mutex);
784                 break;
785
786         case CPU_DOWN_PREPARE:
787                 mutex_lock(&workqueue_mutex);
788                 break;
789
790         case CPU_DOWN_FAILED:
791                 mutex_unlock(&workqueue_mutex);
792                 break;
793
794         case CPU_DEAD:
795                 list_for_each_entry(wq, &workqueues, list)
796                         cleanup_workqueue_thread(wq, hotcpu);
797                 list_for_each_entry(wq, &workqueues, list)
798                         take_over_work(wq, hotcpu);
799                 mutex_unlock(&workqueue_mutex);
800                 break;
801         }
802
803         return NOTIFY_OK;
804 }
805
806 void init_workqueues(void)
807 {
808         singlethread_cpu = first_cpu(cpu_possible_map);
809         hotcpu_notifier(workqueue_cpu_callback, 0);
810         keventd_wq = create_workqueue("events");
811         BUG_ON(!keventd_wq);
812 }
813