+ cpu_maps_update_done();
+ }
+
+ if (err) {
+ destroy_workqueue(wq);
+ wq = NULL;
+ }
+ return wq;
+}
+EXPORT_SYMBOL_GPL(__create_workqueue_key);
+
+static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
+{
+ /*
+ * Our caller is either destroy_workqueue() or CPU_POST_DEAD,
+ * cpu_add_remove_lock protects cwq->thread.
+ */
+ if (cwq->thread == NULL)
+ return;
+
+ lock_map_acquire(&cwq->wq->lockdep_map);
+ lock_map_release(&cwq->wq->lockdep_map);
+
+ flush_cpu_workqueue(cwq);
+ /*
+ * If the caller is CPU_POST_DEAD and cwq->worklist was not empty,
+ * a concurrent flush_workqueue() can insert a barrier after us.
+ * However, in that case run_workqueue() won't return and check
+ * kthread_should_stop() until it flushes all work_struct's.
+ * When ->worklist becomes empty it is safe to exit because no
+ * more work_structs can be queued on this cwq: flush_workqueue
+ * checks list_empty(), and a "normal" queue_work() can't use
+ * a dead CPU.
+ */
+ kthread_stop(cwq->thread);
+ cwq->thread = NULL;
+}
+
+/**
+ * destroy_workqueue - safely terminate a workqueue
+ * @wq: target workqueue
+ *
+ * Safely destroy a workqueue. All work currently pending will be done first.
+ */
+void destroy_workqueue(struct workqueue_struct *wq)
+{
+ const struct cpumask *cpu_map = wq_cpu_map(wq);
+ int cpu;
+
+ cpu_maps_update_begin();
+ spin_lock(&workqueue_lock);
+ list_del(&wq->list);
+ spin_unlock(&workqueue_lock);
+
+ for_each_cpu_mask_nr(cpu, *cpu_map)
+ cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu));
+ cpu_maps_update_done();
+
+ free_percpu(wq->cpu_wq);
+ kfree(wq);
+}
+EXPORT_SYMBOL_GPL(destroy_workqueue);