SUNRPC: Make create_client() take a reference to the rpciod workqueue
[safe/jmp/linux-2.6] / net / sunrpc / clnt.c
index 6d7221f..fe838e9 100644 (file)
@@ -36,8 +36,6 @@
 #include <linux/sunrpc/metrics.h>
 
 
-#define RPC_SLACK_SPACE                (1024)  /* total overkill */
-
 #ifdef RPC_DEBUG
 # define RPCDBG_FACILITY       RPCDBG_CALL
 #endif
@@ -113,6 +111,9 @@ static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, s
        dprintk("RPC:       creating %s client for %s (xprt %p)\n",
                        program->name, servname, xprt);
 
+       err = rpciod_up();
+       if (err)
+               goto out_no_rpciod;
        err = -EINVAL;
        if (!xprt)
                goto out_no_xprt;
@@ -123,8 +124,6 @@ static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, s
        clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
        if (!clnt)
                goto out_err;
-       atomic_set(&clnt->cl_users, 0);
-       atomic_set(&clnt->cl_count, 1);
        clnt->cl_parent = clnt;
 
        clnt->cl_server = clnt->cl_inline_name;
@@ -150,6 +149,8 @@ static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, s
        if (clnt->cl_metrics == NULL)
                goto out_no_stats;
        clnt->cl_program  = program;
+       INIT_LIST_HEAD(&clnt->cl_tasks);
+       spin_lock_init(&clnt->cl_lock);
 
        if (!xprt_bound(clnt->cl_xprt))
                clnt->cl_autobind = 1;
@@ -157,6 +158,8 @@ static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, s
        clnt->cl_rtt = &clnt->cl_rtt_default;
        rpc_init_rtt(&clnt->cl_rtt_default, xprt->timeout.to_initval);
 
+       kref_init(&clnt->cl_kref);
+
        err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
        if (err < 0)
                goto out_no_path;
@@ -174,6 +177,7 @@ static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, s
        if (clnt->cl_nodelen > UNX_MAXNODENAME)
                clnt->cl_nodelen = UNX_MAXNODENAME;
        memcpy(clnt->cl_nodename, utsname()->nodename, clnt->cl_nodelen);
+       rpc_register_client(clnt);
        return clnt;
 
 out_no_auth:
@@ -190,6 +194,8 @@ out_no_stats:
 out_err:
        xprt_put(xprt);
 out_no_xprt:
+       rpciod_down();
+out_no_rpciod:
        return ERR_PTR(err);
 }
 
@@ -247,8 +253,6 @@ struct rpc_clnt *rpc_create(struct rpc_create_args *args)
                clnt->cl_intr = 1;
        if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
                clnt->cl_autobind = 1;
-       if (args->flags & RPC_CLNT_CREATE_ONESHOT)
-               clnt->cl_oneshot = 1;
        if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
                clnt->cl_discrtry = 1;
 
@@ -270,24 +274,25 @@ rpc_clone_client(struct rpc_clnt *clnt)
        new = kmemdup(clnt, sizeof(*new), GFP_KERNEL);
        if (!new)
                goto out_no_clnt;
-       atomic_set(&new->cl_count, 1);
-       atomic_set(&new->cl_users, 0);
+       new->cl_parent = clnt;
+       /* Turn off autobind on clones */
+       new->cl_autobind = 0;
+       INIT_LIST_HEAD(&new->cl_tasks);
+       spin_lock_init(&new->cl_lock);
+       rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
        new->cl_metrics = rpc_alloc_iostats(clnt);
        if (new->cl_metrics == NULL)
                goto out_no_stats;
+       kref_init(&new->cl_kref);
        err = rpc_setup_pipedir(new, clnt->cl_program->pipe_dir_name);
        if (err != 0)
                goto out_no_path;
-       new->cl_parent = clnt;
-       atomic_inc(&clnt->cl_count);
-       new->cl_xprt = xprt_get(clnt->cl_xprt);
-       /* Turn off autobind on clones */
-       new->cl_autobind = 0;
-       new->cl_oneshot = 0;
-       new->cl_dead = 0;
-       rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
        if (new->cl_auth)
                atomic_inc(&new->cl_auth->au_count);
+       xprt_get(clnt->cl_xprt);
+       kref_get(&clnt->cl_kref);
+       rpc_register_client(new);
+       rpciod_up();
        return new;
 out_no_path:
        rpc_free_iostats(new->cl_metrics);
@@ -300,47 +305,29 @@ out_no_clnt:
 
 /*
  * Properly shut down an RPC client, terminating all outstanding
- * requests. Note that we must be certain that cl_oneshot and
- * cl_dead are cleared, or else the client would be destroyed
- * when the last task releases it.
+ * requests.
  */
-int
-rpc_shutdown_client(struct rpc_clnt *clnt)
+void rpc_shutdown_client(struct rpc_clnt *clnt)
 {
-       dprintk("RPC:       shutting down %s client for %s, tasks=%d\n",
-                       clnt->cl_protname, clnt->cl_server,
-                       atomic_read(&clnt->cl_users));
-
-       while (atomic_read(&clnt->cl_users) > 0) {
-               /* Don't let rpc_release_client destroy us */
-               clnt->cl_oneshot = 0;
-               clnt->cl_dead = 0;
+       dprintk("RPC:       shutting down %s client for %s\n",
+                       clnt->cl_protname, clnt->cl_server);
+
+       while (!list_empty(&clnt->cl_tasks)) {
                rpc_killall_tasks(clnt);
                wait_event_timeout(destroy_wait,
-                       !atomic_read(&clnt->cl_users), 1*HZ);
+                       list_empty(&clnt->cl_tasks), 1*HZ);
        }
 
-       if (atomic_read(&clnt->cl_users) < 0) {
-               printk(KERN_ERR "RPC: rpc_shutdown_client clnt %p tasks=%d\n",
-                               clnt, atomic_read(&clnt->cl_users));
-#ifdef RPC_DEBUG
-               rpc_show_tasks();
-#endif
-               BUG();
-       }
-
-       return rpc_destroy_client(clnt);
+       rpc_release_client(clnt);
 }
 
 /*
- * Delete an RPC client
+ * Free an RPC client
  */
-int
-rpc_destroy_client(struct rpc_clnt *clnt)
+static void
+rpc_free_client(struct kref *kref)
 {
-       if (!atomic_dec_and_test(&clnt->cl_count))
-               return 1;
-       BUG_ON(atomic_read(&clnt->cl_users) != 0);
+       struct rpc_clnt *clnt = container_of(kref, struct rpc_clnt, cl_kref);
 
        dprintk("RPC:       destroying %s client for %s\n",
                        clnt->cl_protname, clnt->cl_server);
@@ -353,33 +340,31 @@ rpc_destroy_client(struct rpc_clnt *clnt)
                rpc_put_mount();
        }
        if (clnt->cl_parent != clnt) {
-               rpc_destroy_client(clnt->cl_parent);
+               rpc_release_client(clnt->cl_parent);
                goto out_free;
        }
        if (clnt->cl_server != clnt->cl_inline_name)
                kfree(clnt->cl_server);
 out_free:
+       rpc_unregister_client(clnt);
        rpc_free_iostats(clnt->cl_metrics);
        clnt->cl_metrics = NULL;
        xprt_put(clnt->cl_xprt);
+       rpciod_down();
        kfree(clnt);
-       return 0;
 }
 
 /*
- * Release an RPC client
+ * Release reference to the RPC client
  */
 void
 rpc_release_client(struct rpc_clnt *clnt)
 {
-       dprintk("RPC:       rpc_release_client(%p, %d)\n",
-                       clnt, atomic_read(&clnt->cl_users));
+       dprintk("RPC:       rpc_release_client(%p)\n", clnt);
 
-       if (!atomic_dec_and_test(&clnt->cl_users))
-               return;
-       wake_up(&destroy_wait);
-       if (clnt->cl_oneshot || clnt->cl_dead)
-               rpc_destroy_client(clnt);
+       if (list_empty(&clnt->cl_tasks))
+               wake_up(&destroy_wait);
+       kref_put(&clnt->cl_kref, rpc_free_client);
 }
 
 /**
@@ -479,10 +464,6 @@ int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
        sigset_t        oldset;
        int             status;
 
-       /* If this client is slain all further I/O fails */
-       if (clnt->cl_dead)
-               return -EIO;
-
        BUG_ON(flags & RPC_TASK_ASYNC);
 
        task = rpc_new_task(clnt, flags, &rpc_default_ops, NULL);
@@ -515,11 +496,6 @@ rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
        sigset_t        oldset;
        int             status;
 
-       /* If this client is slain all further I/O fails */
-       status = -EIO;
-       if (clnt->cl_dead)
-               goto out_release;
-
        flags |= RPC_TASK_ASYNC;
 
        /* Create/initialize a new RPC task */
@@ -747,21 +723,38 @@ call_reserveresult(struct rpc_task *task)
 static void
 call_allocate(struct rpc_task *task)
 {
+       unsigned int slack = task->tk_auth->au_cslack;
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = task->tk_xprt;
-       unsigned int    bufsiz;
+       struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
 
        dprint_status(task);
 
+       task->tk_status = 0;
        task->tk_action = call_bind;
+
        if (req->rq_buffer)
                return;
 
-       /* FIXME: compute buffer requirements more exactly using
-        * auth->au_wslack */
-       bufsiz = task->tk_msg.rpc_proc->p_bufsiz + RPC_SLACK_SPACE;
+       if (proc->p_proc != 0) {
+               BUG_ON(proc->p_arglen == 0);
+               if (proc->p_decode != NULL)
+                       BUG_ON(proc->p_replen == 0);
+       }
 
-       if (xprt->ops->buf_alloc(task, bufsiz << 1) != NULL)
+       /*
+        * Calculate the size (in quads) of the RPC call
+        * and reply headers, and convert both values
+        * to byte sizes.
+        */
+       req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
+       req->rq_callsize <<= 2;
+       req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
+       req->rq_rcvsize <<= 2;
+
+       req->rq_buffer = xprt->ops->buf_alloc(task,
+                                       req->rq_callsize + req->rq_rcvsize);
+       if (req->rq_buffer != NULL)
                return;
 
        dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
@@ -788,6 +781,17 @@ rpc_task_force_reencode(struct rpc_task *task)
        task->tk_rqstp->rq_snd_buf.len = 0;
 }
 
+static inline void
+rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
+{
+       buf->head[0].iov_base = start;
+       buf->head[0].iov_len = len;
+       buf->tail[0].iov_len = 0;
+       buf->page_len = 0;
+       buf->len = 0;
+       buf->buflen = len;
+}
+
 /*
  * 3.  Encode arguments of an RPC call
  */
@@ -795,28 +799,17 @@ static void
 call_encode(struct rpc_task *task)
 {
        struct rpc_rqst *req = task->tk_rqstp;
-       struct xdr_buf *sndbuf = &req->rq_snd_buf;
-       struct xdr_buf *rcvbuf = &req->rq_rcv_buf;
-       unsigned int    bufsiz;
        kxdrproc_t      encode;
        __be32          *p;
 
        dprint_status(task);
 
-       /* Default buffer setup */
-       bufsiz = req->rq_bufsize >> 1;
-       sndbuf->head[0].iov_base = (void *)req->rq_buffer;
-       sndbuf->head[0].iov_len  = bufsiz;
-       sndbuf->tail[0].iov_len  = 0;
-       sndbuf->page_len         = 0;
-       sndbuf->len              = 0;
-       sndbuf->buflen           = bufsiz;
-       rcvbuf->head[0].iov_base = (void *)((char *)req->rq_buffer + bufsiz);
-       rcvbuf->head[0].iov_len  = bufsiz;
-       rcvbuf->tail[0].iov_len  = 0;
-       rcvbuf->page_len         = 0;
-       rcvbuf->len              = 0;
-       rcvbuf->buflen           = bufsiz;
+       rpc_xdr_buf_init(&req->rq_snd_buf,
+                        req->rq_buffer,
+                        req->rq_callsize);
+       rpc_xdr_buf_init(&req->rq_rcv_buf,
+                        (char *)req->rq_buffer + req->rq_callsize,
+                        req->rq_rcvsize);
 
        /* Encode header and provided arguments */
        encode = task->tk_msg.rpc_proc->p_encode;
@@ -887,9 +880,11 @@ call_bind_status(struct rpc_task *task)
                                task->tk_pid);
                break;
        case -EPROTONOSUPPORT:
-               dprintk("RPC: %5u remote rpcbind version 2 unavailable\n",
+               dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
                                task->tk_pid);
-               break;
+               task->tk_status = 0;
+               task->tk_action = call_bind;
+               return;
        default:
                dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
                                task->tk_pid, -task->tk_status);
@@ -1046,6 +1041,8 @@ call_status(struct rpc_task *task)
                rpc_delay(task, 3*HZ);
        case -ETIMEDOUT:
                task->tk_action = call_timeout;
+               if (task->tk_client->cl_discrtry)
+                       xprt_disconnect(task->tk_xprt);
                break;
        case -ECONNREFUSED:
        case -ENOTCONN:
@@ -1169,6 +1166,8 @@ call_decode(struct rpc_task *task)
 out_retry:
        req->rq_received = req->rq_private_buf.len = 0;
        task->tk_status = 0;
+       if (task->tk_client->cl_discrtry)
+               xprt_disconnect(task->tk_xprt);
 }
 
 /*