* Kai Petzke <wpp@marie.physik.tu-berlin.de>
* Theodore Ts'o <tytso@mit.edu>
*
- * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>.
+ * Made to use alloc_percpu by Christoph Lameter.
*/
#include <linux/module.h>
#include <linux/freezer.h>
#include <linux/kallsyms.h>
#include <linux/debug_locks.h>
+#include <linux/lockdep.h>
/*
* The per-CPU workqueue (if single thread, we always use the first
struct workqueue_struct *wq;
struct task_struct *thread;
- int should_stop;
int run_depth; /* Detect run_workqueue() recursion depth */
} ____cacheline_aligned;
const char *name;
int singlethread;
int freezeable; /* Freeze threads during suspend */
+#ifdef CONFIG_LOCKDEP
+ struct lockdep_map lockdep_map;
+#endif
};
-/* All the per-cpu workqueues on the system, for hotplug cpu to add/remove
- threads to each one as cpus come/go. */
-static DEFINE_MUTEX(workqueue_mutex);
+/* Serializes the accesses to the list of workqueues. */
+static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);
static int singlethread_cpu __read_mostly;
static cpumask_t cpu_singlethread_map __read_mostly;
-/* optimization, we could use cpu_possible_map */
+/*
+ * _cpu_down() first removes CPU from cpu_online_map, then CPU_DEAD
+ * flushes cwq->worklist. This means that flush_workqueue/wait_on_work
+ * which comes in between can't use for_each_online_cpu(). We could
+ * use cpu_possible_map, the cpumask below is more a documentation
+ * than optimization.
+ */
static cpumask_t cpu_populated_map __read_mostly;
/* If it's single threaded, it isn't in the list of workqueues. */
*
* Returns 0 if @work was already on a queue, non-zero otherwise.
*
- * We queue the work to the CPU it was submitted, but there is no
- * guarantee that it will be processed by that CPU.
+ * We queue the work to the CPU on which it was submitted, but if the CPU dies
+ * it can be processed by another CPU.
*/
-int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
+int queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
int ret = 0;
}
EXPORT_SYMBOL_GPL(queue_work);
-void delayed_work_timer_fn(unsigned long __data)
+static void delayed_work_timer_fn(unsigned long __data)
{
struct delayed_work *dwork = (struct delayed_work *)__data;
struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work);
*
* Returns 0 if @work was already on a queue, non-zero otherwise.
*/
-int fastcall queue_delayed_work(struct workqueue_struct *wq,
+int queue_delayed_work(struct workqueue_struct *wq,
struct delayed_work *dwork, unsigned long delay)
{
- timer_stats_timer_set_start_info(&dwork->timer);
if (delay == 0)
return queue_work(wq, &dwork->work);
BUG_ON(timer_pending(timer));
BUG_ON(!list_empty(&work->entry));
+ timer_stats_timer_set_start_info(&dwork->timer);
+
/* This stores cwq for the moment, for the timer_fn */
set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id()));
timer->expires = jiffies + delay;
if (cwq->run_depth > 3) {
/* morton gets to eat his hat */
printk("%s: recursion depth exceeded: %d\n",
- __FUNCTION__, cwq->run_depth);
+ __func__, cwq->run_depth);
dump_stack();
}
while (!list_empty(&cwq->worklist)) {
struct work_struct *work = list_entry(cwq->worklist.next,
struct work_struct, entry);
work_func_t f = work->func;
+#ifdef CONFIG_LOCKDEP
+ /*
+ * It is permissible to free the struct work_struct
+ * from inside the function that is called from it,
+ * this we need to take into account for lockdep too.
+ * To avoid bogus "held lock freed" warnings as well
+ * as problems when looking into work->lockdep_map,
+ * make a copy and use that here.
+ */
+ struct lockdep_map lockdep_map = work->lockdep_map;
+#endif
cwq->current_work = work;
list_del_init(cwq->worklist.next);
BUG_ON(get_wq_data(work) != cwq);
work_clear_pending(work);
+ lock_acquire(&cwq->wq->lockdep_map, 0, 0, 0, 2, _THIS_IP_);
+ lock_acquire(&lockdep_map, 0, 0, 0, 2, _THIS_IP_);
f(work);
+ lock_release(&lockdep_map, 1, _THIS_IP_);
+ lock_release(&cwq->wq->lockdep_map, 1, _THIS_IP_);
if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
"%s/0x%08x/%d\n",
current->comm, preempt_count(),
- current->pid);
+ task_pid_nr(current));
printk(KERN_ERR " last function: ");
print_symbol("%s\n", (unsigned long)f);
debug_show_held_locks(current);
spin_unlock_irq(&cwq->lock);
}
-/*
- * NOTE: the caller must not touch *cwq if this func returns true
- */
-static int cwq_should_stop(struct cpu_workqueue_struct *cwq)
-{
- int should_stop = cwq->should_stop;
-
- if (unlikely(should_stop)) {
- spin_lock_irq(&cwq->lock);
- should_stop = cwq->should_stop && list_empty(&cwq->worklist);
- if (should_stop)
- cwq->thread = NULL;
- spin_unlock_irq(&cwq->lock);
- }
-
- return should_stop;
-}
-
static int worker_thread(void *__cwq)
{
struct cpu_workqueue_struct *cwq = __cwq;
DEFINE_WAIT(wait);
- if (!cwq->wq->freezeable)
- current->flags |= PF_NOFREEZE;
+ if (cwq->wq->freezeable)
+ set_freezable();
set_user_nice(current, -5);
for (;;) {
prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
- if (!freezing(current) && !cwq->should_stop
- && list_empty(&cwq->worklist))
+ if (!freezing(current) &&
+ !kthread_should_stop() &&
+ list_empty(&cwq->worklist))
schedule();
finish_wait(&cwq->more_work, &wait);
try_to_freeze();
- if (cwq_should_stop(cwq))
+ if (kthread_should_stop())
break;
run_workqueue(cwq);
insert_work(cwq, &barr->work, tail);
}
-static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
+static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
+ int active;
+
if (cwq->thread == current) {
/*
* Probably keventd trying to flush its own queue. So simply run
* it by hand rather than deadlocking.
*/
run_workqueue(cwq);
+ active = 1;
} else {
struct wq_barrier barr;
- int active = 0;
+ active = 0;
spin_lock_irq(&cwq->lock);
if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
insert_wq_barrier(cwq, &barr, 1);
if (active)
wait_for_completion(&barr.done);
}
+
+ return active;
}
/**
* This function used to run the workqueues itself. Now we just wait for the
* helper threads to do it.
*/
-void fastcall flush_workqueue(struct workqueue_struct *wq)
+void flush_workqueue(struct workqueue_struct *wq)
{
const cpumask_t *cpu_map = wq_cpu_map(wq);
int cpu;
might_sleep();
+ lock_acquire(&wq->lockdep_map, 0, 0, 0, 2, _THIS_IP_);
+ lock_release(&wq->lockdep_map, 1, _THIS_IP_);
for_each_cpu_mask(cpu, *cpu_map)
flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
}
EXPORT_SYMBOL_GPL(flush_workqueue);
/*
- * Upon a successful return, the caller "owns" WORK_STRUCT_PENDING bit,
+ * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit,
* so this work can't be re-armed in any way.
*/
static int try_to_grab_pending(struct work_struct *work)
{
struct cpu_workqueue_struct *cwq;
- int ret = 0;
+ int ret = -1;
if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work)))
- return 1;
+ return 0;
/*
* The queueing is in progress, or it is already queued. Try to
might_sleep();
+ lock_acquire(&work->lockdep_map, 0, 0, 0, 2, _THIS_IP_);
+ lock_release(&work->lockdep_map, 1, _THIS_IP_);
+
cwq = get_wq_data(work);
if (!cwq)
return;
wait_on_cpu_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
}
+static int __cancel_work_timer(struct work_struct *work,
+ struct timer_list* timer)
+{
+ int ret;
+
+ do {
+ ret = (timer && likely(del_timer(timer)));
+ if (!ret)
+ ret = try_to_grab_pending(work);
+ wait_on_work(work);
+ } while (unlikely(ret < 0));
+
+ work_clear_pending(work);
+ return ret;
+}
+
/**
* cancel_work_sync - block until a work_struct's callback has terminated
* @work: the work which is to be flushed
*
+ * Returns true if @work was pending.
+ *
* cancel_work_sync() will cancel the work if it is queued. If the work's
* callback appears to be running, cancel_work_sync() will block until it
* has completed.
* The caller must ensure that workqueue_struct on which this work was last
* queued can't be destroyed before this function returns.
*/
-void cancel_work_sync(struct work_struct *work)
+int cancel_work_sync(struct work_struct *work)
{
- while (!try_to_grab_pending(work))
- cpu_relax();
- wait_on_work(work);
- work_clear_pending(work);
+ return __cancel_work_timer(work, NULL);
}
EXPORT_SYMBOL_GPL(cancel_work_sync);
/**
- * cancel_rearming_delayed_work - reliably kill off a delayed work.
+ * cancel_delayed_work_sync - reliably kill off a delayed work.
* @dwork: the delayed work struct
*
+ * Returns true if @dwork was pending.
+ *
* It is possible to use this function if @dwork rearms itself via queue_work()
* or queue_delayed_work(). See also the comment for cancel_work_sync().
*/
-void cancel_rearming_delayed_work(struct delayed_work *dwork)
+int cancel_delayed_work_sync(struct delayed_work *dwork)
{
- while (!del_timer(&dwork->timer) &&
- !try_to_grab_pending(&dwork->work))
- cpu_relax();
- wait_on_work(&dwork->work);
- work_clear_pending(&dwork->work);
+ return __cancel_work_timer(&dwork->work, &dwork->timer);
}
-EXPORT_SYMBOL(cancel_rearming_delayed_work);
+EXPORT_SYMBOL(cancel_delayed_work_sync);
static struct workqueue_struct *keventd_wq __read_mostly;
*
* This puts a job in the kernel-global workqueue.
*/
-int fastcall schedule_work(struct work_struct *work)
+int schedule_work(struct work_struct *work)
{
return queue_work(keventd_wq, work);
}
* After waiting for a given time this puts a job in the kernel-global
* workqueue.
*/
-int fastcall schedule_delayed_work(struct delayed_work *dwork,
+int schedule_delayed_work(struct delayed_work *dwork,
unsigned long delay)
{
- timer_stats_timer_set_start_info(&dwork->timer);
return queue_delayed_work(keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work);
* Returns zero on success.
* Returns -ve errno on failure.
*
- * Appears to be racy against CPU hotplug.
- *
* schedule_on_each_cpu() is very slow.
*/
int schedule_on_each_cpu(work_func_t func)
if (!works)
return -ENOMEM;
- preempt_disable(); /* CPU hotplug */
+ get_online_cpus();
for_each_online_cpu(cpu) {
struct work_struct *work = per_cpu_ptr(works, cpu);
set_bit(WORK_STRUCT_PENDING, work_data_bits(work));
__queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu), work);
}
- preempt_enable();
flush_workqueue(keventd_wq);
+ put_online_cpus();
free_percpu(works);
return 0;
}
int current_is_keventd(void)
{
struct cpu_workqueue_struct *cwq;
- int cpu = smp_processor_id(); /* preempt-safe: keventd is per-cpu */
+ int cpu = raw_smp_processor_id(); /* preempt-safe: keventd is per-cpu */
int ret = 0;
BUG_ON(!keventd_wq);
return PTR_ERR(p);
cwq->thread = p;
- cwq->should_stop = 0;
return 0;
}
}
}
-struct workqueue_struct *__create_workqueue(const char *name,
- int singlethread, int freezeable)
+struct workqueue_struct *__create_workqueue_key(const char *name,
+ int singlethread,
+ int freezeable,
+ struct lock_class_key *key,
+ const char *lock_name)
{
struct workqueue_struct *wq;
struct cpu_workqueue_struct *cwq;
}
wq->name = name;
+ lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
wq->singlethread = singlethread;
wq->freezeable = freezeable;
INIT_LIST_HEAD(&wq->list);
err = create_workqueue_thread(cwq, singlethread_cpu);
start_workqueue_thread(cwq, -1);
} else {
- mutex_lock(&workqueue_mutex);
+ get_online_cpus();
+ spin_lock(&workqueue_lock);
list_add(&wq->list, &workqueues);
+ spin_unlock(&workqueue_lock);
for_each_possible_cpu(cpu) {
cwq = init_cpu_workqueue(wq, cpu);
err = create_workqueue_thread(cwq, cpu);
start_workqueue_thread(cwq, cpu);
}
- mutex_unlock(&workqueue_mutex);
+ put_online_cpus();
}
if (err) {
}
return wq;
}
-EXPORT_SYMBOL_GPL(__create_workqueue);
+EXPORT_SYMBOL_GPL(__create_workqueue_key);
-static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
+static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
{
- struct wq_barrier barr;
- int alive = 0;
-
- spin_lock_irq(&cwq->lock);
- if (cwq->thread != NULL) {
- insert_wq_barrier(cwq, &barr, 1);
- cwq->should_stop = 1;
- alive = 1;
- }
- spin_unlock_irq(&cwq->lock);
+ /*
+ * Our caller is either destroy_workqueue() or CPU_DEAD,
+ * get_online_cpus() protects cwq->thread.
+ */
+ if (cwq->thread == NULL)
+ return;
- if (alive) {
- wait_for_completion(&barr.done);
+ lock_acquire(&cwq->wq->lockdep_map, 0, 0, 0, 2, _THIS_IP_);
+ lock_release(&cwq->wq->lockdep_map, 1, _THIS_IP_);
- while (unlikely(cwq->thread != NULL))
- cpu_relax();
- /*
- * Wait until cwq->thread unlocks cwq->lock,
- * it won't touch *cwq after that.
- */
- smp_rmb();
- spin_unlock_wait(&cwq->lock);
- }
+ flush_cpu_workqueue(cwq);
+ /*
+ * If the caller is CPU_DEAD and cwq->worklist was not empty,
+ * a concurrent flush_workqueue() can insert a barrier after us.
+ * However, in that case run_workqueue() won't return and check
+ * kthread_should_stop() until it flushes all work_struct's.
+ * When ->worklist becomes empty it is safe to exit because no
+ * more work_structs can be queued on this cwq: flush_workqueue
+ * checks list_empty(), and a "normal" queue_work() can't use
+ * a dead CPU.
+ */
+ kthread_stop(cwq->thread);
+ cwq->thread = NULL;
}
/**
void destroy_workqueue(struct workqueue_struct *wq)
{
const cpumask_t *cpu_map = wq_cpu_map(wq);
- struct cpu_workqueue_struct *cwq;
int cpu;
- mutex_lock(&workqueue_mutex);
+ get_online_cpus();
+ spin_lock(&workqueue_lock);
list_del(&wq->list);
- mutex_unlock(&workqueue_mutex);
+ spin_unlock(&workqueue_lock);
- for_each_cpu_mask(cpu, *cpu_map) {
- cwq = per_cpu_ptr(wq->cpu_wq, cpu);
- cleanup_workqueue_thread(cwq, cpu);
- }
+ for_each_cpu_mask(cpu, *cpu_map)
+ cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu));
+ put_online_cpus();
free_percpu(wq->cpu_wq);
kfree(wq);
struct cpu_workqueue_struct *cwq;
struct workqueue_struct *wq;
- switch (action) {
- case CPU_LOCK_ACQUIRE:
- mutex_lock(&workqueue_mutex);
- return NOTIFY_OK;
-
- case CPU_LOCK_RELEASE:
- mutex_unlock(&workqueue_mutex);
- return NOTIFY_OK;
+ action &= ~CPU_TASKS_FROZEN;
+ switch (action) {
case CPU_UP_PREPARE:
cpu_set(cpu, cpu_populated_map);
}
case CPU_UP_PREPARE:
if (!create_workqueue_thread(cwq, cpu))
break;
- printk(KERN_ERR "workqueue for %i failed\n", cpu);
+ printk(KERN_ERR "workqueue [%s] for %i failed\n",
+ wq->name, cpu);
return NOTIFY_BAD;
case CPU_ONLINE:
case CPU_UP_CANCELED:
start_workqueue_thread(cwq, -1);
case CPU_DEAD:
- cleanup_workqueue_thread(cwq, cpu);
+ cleanup_workqueue_thread(cwq);
break;
}
}
+ switch (action) {
+ case CPU_UP_CANCELED:
+ case CPU_DEAD:
+ cpu_clear(cpu, cpu_populated_map);
+ }
+
return NOTIFY_OK;
}