SUNRPC: Fix a race in rpciod_down()
authorTrond Myklebust <Trond.Myklebust@netapp.com>
Thu, 19 Jul 2007 20:32:20 +0000 (16:32 -0400)
committerTrond Myklebust <Trond.Myklebust@netapp.com>
Tue, 7 Aug 2007 19:13:16 +0000 (15:13 -0400)
The commit 4ada539ed77c7a2bbcb75cafbbd7bd8d2b9bef7b lead to the unpleasant
possibility of an asynchronous rpc_task being required to call
rpciod_down() when it is complete. This again means that the rpciod
workqueue may get to call destroy_workqueue on itself -> hang...

Change rpciod_up/rpciod_down to just get/put the module, and then
create/destroy the workqueues on module load/unload.

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
net/sunrpc/sched.c

index b5723c2..954d7ec 100644 (file)
@@ -50,8 +50,6 @@ static RPC_WAITQ(delay_queue, "delayq");
 /*
  * rpciod-related stuff
  */
-static DEFINE_MUTEX(rpciod_mutex);
-static atomic_t rpciod_users = ATOMIC_INIT(0);
 struct workqueue_struct *rpciod_workqueue;
 
 /*
@@ -961,60 +959,49 @@ void rpc_killall_tasks(struct rpc_clnt *clnt)
        spin_unlock(&clnt->cl_lock);
 }
 
+int rpciod_up(void)
+{
+       return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
+}
+
+void rpciod_down(void)
+{
+       module_put(THIS_MODULE);
+}
+
 /*
- * Start up the rpciod process if it's not already running.
+ * Start up the rpciod workqueue.
  */
-int
-rpciod_up(void)
+static int rpciod_start(void)
 {
        struct workqueue_struct *wq;
-       int error = 0;
-
-       if (atomic_inc_not_zero(&rpciod_users))
-               return 0;
-
-       mutex_lock(&rpciod_mutex);
 
-       /* Guard against races with rpciod_down() */
-       if (rpciod_workqueue != NULL)
-               goto out_ok;
        /*
         * Create the rpciod thread and wait for it to start.
         */
        dprintk("RPC:       creating workqueue rpciod\n");
-       error = -ENOMEM;
        wq = create_workqueue("rpciod");
-       if (wq == NULL)
-               goto out;
-
        rpciod_workqueue = wq;
-       error = 0;
-out_ok:
-       atomic_inc(&rpciod_users);
-out:
-       mutex_unlock(&rpciod_mutex);
-       return error;
+       return rpciod_workqueue != NULL;
 }
 
-void
-rpciod_down(void)
+static void rpciod_stop(void)
 {
-       if (!atomic_dec_and_test(&rpciod_users))
-               return;
+       struct workqueue_struct *wq = NULL;
 
-       mutex_lock(&rpciod_mutex);
+       if (rpciod_workqueue == NULL)
+               return;
        dprintk("RPC:       destroying workqueue rpciod\n");
 
-       if (atomic_read(&rpciod_users) == 0 && rpciod_workqueue != NULL) {
-               destroy_workqueue(rpciod_workqueue);
-               rpciod_workqueue = NULL;
-       }
-       mutex_unlock(&rpciod_mutex);
+       wq = rpciod_workqueue;
+       rpciod_workqueue = NULL;
+       destroy_workqueue(wq);
 }
 
 void
 rpc_destroy_mempool(void)
 {
+       rpciod_stop();
        if (rpc_buffer_mempool)
                mempool_destroy(rpc_buffer_mempool);
        if (rpc_task_mempool)
@@ -1048,6 +1035,8 @@ rpc_init_mempool(void)
                                                      rpc_buffer_slabp);
        if (!rpc_buffer_mempool)
                goto err_nomem;
+       if (!rpciod_start())
+               goto err_nomem;
        return 0;
 err_nomem:
        rpc_destroy_mempool();