Merge git://git.kernel.org/pub/scm/linux/kernel/git/davem/net-next-2.6
[safe/jmp/linux-2.6] / net / sunrpc / clnt.c
index 22092b9..756fc32 100644 (file)
 
 #include <linux/module.h>
 #include <linux/types.h>
+#include <linux/kallsyms.h>
 #include <linux/mm.h>
+#include <linux/namei.h>
+#include <linux/mount.h>
 #include <linux/slab.h>
-#include <linux/smp_lock.h>
 #include <linux/utsname.h>
 #include <linux/workqueue.h>
+#include <linux/in6.h>
 
 #include <linux/sunrpc/clnt.h>
 #include <linux/sunrpc/rpc_pipe_fs.h>
 #include <linux/sunrpc/metrics.h>
+#include <linux/sunrpc/bc_xprt.h>
 
+#include "sunrpc.h"
 
 #ifdef RPC_DEBUG
 # define RPCDBG_FACILITY       RPCDBG_CALL
@@ -42,7 +47,7 @@
 
 #define dprint_status(t)                                       \
        dprintk("RPC: %5u %s (status %d)\n", t->tk_pid,         \
-                       __FUNCTION__, t->tk_status)
+                       __func__, t->tk_status)
 
 /*
  * All RPC clients are linked into this list
@@ -57,11 +62,13 @@ static void call_start(struct rpc_task *task);
 static void    call_reserve(struct rpc_task *task);
 static void    call_reserveresult(struct rpc_task *task);
 static void    call_allocate(struct rpc_task *task);
-static void    call_encode(struct rpc_task *task);
 static void    call_decode(struct rpc_task *task);
 static void    call_bind(struct rpc_task *task);
 static void    call_bind_status(struct rpc_task *task);
 static void    call_transmit(struct rpc_task *task);
+#if defined(CONFIG_NFS_V4_1)
+static void    call_bc_transmit(struct rpc_task *task);
+#endif /* CONFIG_NFS_V4_1 */
 static void    call_status(struct rpc_task *task);
 static void    call_transmit_status(struct rpc_task *task);
 static void    call_refresh(struct rpc_task *task);
@@ -69,10 +76,10 @@ static void call_refreshresult(struct rpc_task *task);
 static void    call_timeout(struct rpc_task *task);
 static void    call_connect(struct rpc_task *task);
 static void    call_connect_status(struct rpc_task *task);
-static __be32 *        call_header(struct rpc_task *task);
-static __be32 *        call_verify(struct rpc_task *task);
 
-static int     rpc_ping(struct rpc_clnt *clnt, int flags);
+static __be32  *rpc_encode_header(struct rpc_task *task);
+static __be32  *rpc_verify_header(struct rpc_task *task);
+static int     rpc_ping(struct rpc_clnt *clnt);
 
 static void rpc_register_client(struct rpc_clnt *clnt)
 {
@@ -92,37 +99,54 @@ static int
 rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
 {
        static uint32_t clntid;
+       struct nameidata nd;
+       struct path path;
+       char name[15];
+       struct qstr q = {
+               .name = name,
+       };
        int error;
 
-       clnt->cl_vfsmnt = ERR_PTR(-ENOENT);
-       clnt->cl_dentry = ERR_PTR(-ENOENT);
+       clnt->cl_path.mnt = ERR_PTR(-ENOENT);
+       clnt->cl_path.dentry = ERR_PTR(-ENOENT);
        if (dir_name == NULL)
                return 0;
 
-       clnt->cl_vfsmnt = rpc_get_mount();
-       if (IS_ERR(clnt->cl_vfsmnt))
-               return PTR_ERR(clnt->cl_vfsmnt);
+       path.mnt = rpc_get_mount();
+       if (IS_ERR(path.mnt))
+               return PTR_ERR(path.mnt);
+       error = vfs_path_lookup(path.mnt->mnt_root, path.mnt, dir_name, 0, &nd);
+       if (error)
+               goto err;
 
        for (;;) {
-               snprintf(clnt->cl_pathname, sizeof(clnt->cl_pathname),
-                               "%s/clnt%x", dir_name,
-                               (unsigned int)clntid++);
-               clnt->cl_pathname[sizeof(clnt->cl_pathname) - 1] = '\0';
-               clnt->cl_dentry = rpc_mkdir(clnt->cl_pathname, clnt);
-               if (!IS_ERR(clnt->cl_dentry))
-                       return 0;
-               error = PTR_ERR(clnt->cl_dentry);
+               q.len = snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
+               name[sizeof(name) - 1] = '\0';
+               q.hash = full_name_hash(q.name, q.len);
+               path.dentry = rpc_create_client_dir(nd.path.dentry, &q, clnt);
+               if (!IS_ERR(path.dentry))
+                       break;
+               error = PTR_ERR(path.dentry);
                if (error != -EEXIST) {
-                       printk(KERN_INFO "RPC: Couldn't create pipefs entry %s, error %d\n",
-                                       clnt->cl_pathname, error);
-                       rpc_put_mount();
-                       return error;
+                       printk(KERN_INFO "RPC: Couldn't create pipefs entry"
+                                       " %s/%s, error %d\n",
+                                       dir_name, name, error);
+                       goto err_path_put;
                }
        }
+       path_put(&nd.path);
+       clnt->cl_path = path;
+       return 0;
+err_path_put:
+       path_put(&nd.path);
+err:
+       rpc_put_mount();
+       return error;
 }
 
-static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, struct rpc_program *program, u32 vers, rpc_authflavor_t flavor)
+static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, struct rpc_xprt *xprt)
 {
+       struct rpc_program      *program = args->program;
        struct rpc_version      *version;
        struct rpc_clnt         *clnt = NULL;
        struct rpc_auth         *auth;
@@ -131,13 +155,13 @@ static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, s
 
        /* sanity check the name before trying to print it */
        err = -EINVAL;
-       len = strlen(servname);
+       len = strlen(args->servername);
        if (len > RPC_MAXNETNAMELEN)
                goto out_no_rpciod;
        len++;
 
        dprintk("RPC:       creating %s client for %s (xprt %p)\n",
-                       program->name, servname, xprt);
+                       program->name, args->servername, xprt);
 
        err = rpciod_up();
        if (err)
@@ -145,7 +169,11 @@ static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, s
        err = -EINVAL;
        if (!xprt)
                goto out_no_xprt;
-       if (vers >= program->nrvers || !(version = program->version[vers]))
+
+       if (args->version >= program->nrvers)
+               goto out_err;
+       version = program->version[args->version];
+       if (version == NULL)
                goto out_err;
 
        err = -ENOMEM;
@@ -157,18 +185,18 @@ static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, s
        clnt->cl_server = clnt->cl_inline_name;
        if (len > sizeof(clnt->cl_inline_name)) {
                char *buf = kmalloc(len, GFP_KERNEL);
-               if (buf != 0)
+               if (buf != NULL)
                        clnt->cl_server = buf;
                else
                        len = sizeof(clnt->cl_inline_name);
        }
-       strlcpy(clnt->cl_server, servname, len);
+       strlcpy(clnt->cl_server, args->servername, len);
 
        clnt->cl_xprt     = xprt;
        clnt->cl_procinfo = version->procs;
        clnt->cl_maxproc  = version->nrprocs;
        clnt->cl_protname = program->name;
-       clnt->cl_prog     = program->number;
+       clnt->cl_prog     = args->prognumber ? : program->number;
        clnt->cl_vers     = version->number;
        clnt->cl_stats    = program->stats;
        clnt->cl_metrics  = rpc_alloc_iostats(clnt);
@@ -182,8 +210,21 @@ static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, s
        if (!xprt_bound(clnt->cl_xprt))
                clnt->cl_autobind = 1;
 
+       clnt->cl_timeout = xprt->timeout;
+       if (args->timeout != NULL) {
+               memcpy(&clnt->cl_timeout_default, args->timeout,
+                               sizeof(clnt->cl_timeout_default));
+               clnt->cl_timeout = &clnt->cl_timeout_default;
+       }
+
        clnt->cl_rtt = &clnt->cl_rtt_default;
-       rpc_init_rtt(&clnt->cl_rtt_default, xprt->timeout.to_initval);
+       rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
+       clnt->cl_principal = NULL;
+       if (args->client_name) {
+               clnt->cl_principal = kstrdup(args->client_name, GFP_KERNEL);
+               if (!clnt->cl_principal)
+                       goto out_no_principal;
+       }
 
        kref_init(&clnt->cl_kref);
 
@@ -191,28 +232,30 @@ static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, s
        if (err < 0)
                goto out_no_path;
 
-       auth = rpcauth_create(flavor, clnt);
+       auth = rpcauth_create(args->authflavor, clnt);
        if (IS_ERR(auth)) {
                printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
-                               flavor);
+                               args->authflavor);
                err = PTR_ERR(auth);
                goto out_no_auth;
        }
 
        /* save the nodename */
-       clnt->cl_nodelen = strlen(utsname()->nodename);
+       clnt->cl_nodelen = strlen(init_utsname()->nodename);
        if (clnt->cl_nodelen > UNX_MAXNODENAME)
                clnt->cl_nodelen = UNX_MAXNODENAME;
-       memcpy(clnt->cl_nodename, utsname()->nodename, clnt->cl_nodelen);
+       memcpy(clnt->cl_nodename, init_utsname()->nodename, clnt->cl_nodelen);
        rpc_register_client(clnt);
        return clnt;
 
 out_no_auth:
-       if (!IS_ERR(clnt->cl_dentry)) {
-               rpc_rmdir(clnt->cl_dentry);
+       if (!IS_ERR(clnt->cl_path.dentry)) {
+               rpc_remove_client_dir(clnt->cl_path.dentry);
                rpc_put_mount();
        }
 out_no_path:
+       kfree(clnt->cl_principal);
+out_no_principal:
        rpc_free_iostats(clnt->cl_metrics);
 out_no_stats:
        if (clnt->cl_server != clnt->cl_inline_name)
@@ -245,26 +288,43 @@ struct rpc_clnt *rpc_create(struct rpc_create_args *args)
                .srcaddr = args->saddress,
                .dstaddr = args->address,
                .addrlen = args->addrsize,
-               .timeout = args->timeout
+               .bc_xprt = args->bc_xprt,
        };
-       char servername[20];
-
-       xprt = xprt_create_transport(&xprtargs);
-       if (IS_ERR(xprt))
-               return (struct rpc_clnt *)xprt;
+       char servername[48];
 
        /*
         * If the caller chooses not to specify a hostname, whip
         * up a string representation of the passed-in address.
         */
        if (args->servername == NULL) {
-               struct sockaddr_in *addr =
-                                       (struct sockaddr_in *) args->address;
-               snprintf(servername, sizeof(servername), NIPQUAD_FMT,
-                       NIPQUAD(addr->sin_addr.s_addr));
+               servername[0] = '\0';
+               switch (args->address->sa_family) {
+               case AF_INET: {
+                       struct sockaddr_in *sin =
+                                       (struct sockaddr_in *)args->address;
+                       snprintf(servername, sizeof(servername), "%pI4",
+                                &sin->sin_addr.s_addr);
+                       break;
+               }
+               case AF_INET6: {
+                       struct sockaddr_in6 *sin =
+                                       (struct sockaddr_in6 *)args->address;
+                       snprintf(servername, sizeof(servername), "%pI6",
+                                &sin->sin6_addr);
+                       break;
+               }
+               default:
+                       /* caller wants default server name, but
+                        * address family isn't recognized. */
+                       return ERR_PTR(-EINVAL);
+               }
                args->servername = servername;
        }
 
+       xprt = xprt_create_transport(&xprtargs);
+       if (IS_ERR(xprt))
+               return (struct rpc_clnt *)xprt;
+
        /*
         * By default, kernel RPC client connects from a reserved port.
         * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
@@ -275,13 +335,12 @@ struct rpc_clnt *rpc_create(struct rpc_create_args *args)
        if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
                xprt->resvport = 0;
 
-       clnt = rpc_new_client(xprt, args->servername, args->program,
-                               args->version, args->authflavor);
+       clnt = rpc_new_client(args, xprt);
        if (IS_ERR(clnt))
                return clnt;
 
        if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
-               int err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
+               int err = rpc_ping(clnt);
                if (err != 0) {
                        rpc_shutdown_client(clnt);
                        return ERR_PTR(err);
@@ -292,12 +351,12 @@ struct rpc_clnt *rpc_create(struct rpc_create_args *args)
        if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
                clnt->cl_softrtry = 0;
 
-       if (args->flags & RPC_CLNT_CREATE_INTR)
-               clnt->cl_intr = 1;
        if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
                clnt->cl_autobind = 1;
        if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
                clnt->cl_discrtry = 1;
+       if (!(args->flags & RPC_CLNT_CREATE_QUIET))
+               clnt->cl_chatty = 1;
 
        return clnt;
 }
@@ -322,10 +381,15 @@ rpc_clone_client(struct rpc_clnt *clnt)
        new->cl_autobind = 0;
        INIT_LIST_HEAD(&new->cl_tasks);
        spin_lock_init(&new->cl_lock);
-       rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
+       rpc_init_rtt(&new->cl_rtt_default, clnt->cl_timeout->to_initval);
        new->cl_metrics = rpc_alloc_iostats(clnt);
        if (new->cl_metrics == NULL)
                goto out_no_stats;
+       if (clnt->cl_principal) {
+               new->cl_principal = kstrdup(clnt->cl_principal, GFP_KERNEL);
+               if (new->cl_principal == NULL)
+                       goto out_no_principal;
+       }
        kref_init(&new->cl_kref);
        err = rpc_setup_pipedir(new, clnt->cl_program->pipe_dir_name);
        if (err != 0)
@@ -338,11 +402,13 @@ rpc_clone_client(struct rpc_clnt *clnt)
        rpciod_up();
        return new;
 out_no_path:
+       kfree(new->cl_principal);
+out_no_principal:
        rpc_free_iostats(new->cl_metrics);
 out_no_stats:
        kfree(new);
 out_no_clnt:
-       dprintk("RPC:       %s: returned error %d\n", __FUNCTION__, err);
+       dprintk("RPC:       %s: returned error %d\n", __func__, err);
        return ERR_PTR(err);
 }
 EXPORT_SYMBOL_GPL(rpc_clone_client);
@@ -376,8 +442,8 @@ rpc_free_client(struct kref *kref)
 
        dprintk("RPC:       destroying %s client for %s\n",
                        clnt->cl_protname, clnt->cl_server);
-       if (!IS_ERR(clnt->cl_dentry)) {
-               rpc_rmdir(clnt->cl_dentry);
+       if (!IS_ERR(clnt->cl_path.dentry)) {
+               rpc_remove_client_dir(clnt->cl_path.dentry);
                rpc_put_mount();
        }
        if (clnt->cl_parent != clnt) {
@@ -389,6 +455,7 @@ rpc_free_client(struct kref *kref)
 out_free:
        rpc_unregister_client(clnt);
        rpc_free_iostats(clnt->cl_metrics);
+       kfree(clnt->cl_principal);
        clnt->cl_metrics = NULL;
        xprt_put(clnt->cl_xprt);
        rpciod_down();
@@ -434,9 +501,9 @@ rpc_release_client(struct rpc_clnt *clnt)
 
 /**
  * rpc_bind_new_program - bind a new RPC program to an existing client
- * @old - old rpc_client
- * @program - rpc program to set
- * @vers - rpc program version
+ * @old: old rpc_client
+ * @program: rpc program to set
+ * @vers: rpc program version
  *
  * Clones the rpc client and sets up a new RPC program. This is mainly
  * of use for enabling different RPC programs to share the same transport.
@@ -461,7 +528,7 @@ struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
        clnt->cl_prog     = program->number;
        clnt->cl_vers     = version->number;
        clnt->cl_stats    = program->stats;
-       err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
+       err = rpc_ping(clnt);
        if (err != 0) {
                rpc_shutdown_client(clnt);
                clnt = ERR_PTR(err);
@@ -483,79 +550,24 @@ static const struct rpc_call_ops rpc_default_ops = {
        .rpc_call_done = rpc_default_callback,
 };
 
-/*
- *     Export the signal mask handling for synchronous code that
- *     sleeps on RPC calls
+/**
+ * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
+ * @task_setup_data: pointer to task initialisation data
  */
-#define RPC_INTR_SIGNALS (sigmask(SIGHUP) | sigmask(SIGINT) | sigmask(SIGQUIT) | sigmask(SIGTERM))
-
-static void rpc_save_sigmask(sigset_t *oldset, int intr)
-{
-       unsigned long   sigallow = sigmask(SIGKILL);
-       sigset_t sigmask;
-
-       /* Block all signals except those listed in sigallow */
-       if (intr)
-               sigallow |= RPC_INTR_SIGNALS;
-       siginitsetinv(&sigmask, sigallow);
-       sigprocmask(SIG_BLOCK, &sigmask, oldset);
-}
-
-static inline void rpc_task_sigmask(struct rpc_task *task, sigset_t *oldset)
-{
-       rpc_save_sigmask(oldset, !RPC_TASK_UNINTERRUPTIBLE(task));
-}
-
-static inline void rpc_restore_sigmask(sigset_t *oldset)
+struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
 {
-       sigprocmask(SIG_SETMASK, oldset, NULL);
-}
-
-void rpc_clnt_sigmask(struct rpc_clnt *clnt, sigset_t *oldset)
-{
-       rpc_save_sigmask(oldset, clnt->cl_intr);
-}
-EXPORT_SYMBOL_GPL(rpc_clnt_sigmask);
+       struct rpc_task *task;
 
-void rpc_clnt_sigunmask(struct rpc_clnt *clnt, sigset_t *oldset)
-{
-       rpc_restore_sigmask(oldset);
-}
-EXPORT_SYMBOL_GPL(rpc_clnt_sigunmask);
-
-static
-struct rpc_task *rpc_do_run_task(struct rpc_clnt *clnt,
-               struct rpc_message *msg,
-               int flags,
-               const struct rpc_call_ops *ops,
-               void *data)
-{
-       struct rpc_task *task, *ret;
-       sigset_t oldset;
-
-       task = rpc_new_task(clnt, flags, ops, data);
-       if (task == NULL) {
-               rpc_release_calldata(ops, data);
-               return ERR_PTR(-ENOMEM);
-       }
+       task = rpc_new_task(task_setup_data);
+       if (IS_ERR(task))
+               goto out;
 
-       /* Mask signals on synchronous RPC calls and RPCSEC_GSS upcalls */
-       rpc_task_sigmask(task, &oldset);
-       if (msg != NULL) {
-               rpc_call_setup(task, msg, 0);
-               if (task->tk_status != 0) {
-                       ret = ERR_PTR(task->tk_status);
-                       rpc_put_task(task);
-                       goto out;
-               }
-       }
        atomic_inc(&task->tk_count);
        rpc_execute(task);
-       ret = task;
 out:
-       rpc_restore_sigmask(&oldset);
-       return ret;
+       return task;
 }
+EXPORT_SYMBOL_GPL(rpc_run_task);
 
 /**
  * rpc_call_sync - Perform a synchronous RPC call
@@ -563,14 +575,20 @@ out:
  * @msg: RPC call parameters
  * @flags: RPC call flags
  */
-int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
+int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
 {
        struct rpc_task *task;
+       struct rpc_task_setup task_setup_data = {
+               .rpc_client = clnt,
+               .rpc_message = msg,
+               .callback_ops = &rpc_default_ops,
+               .flags = flags,
+       };
        int status;
 
        BUG_ON(flags & RPC_TASK_ASYNC);
 
-       task = rpc_do_run_task(clnt, msg, flags, &rpc_default_ops, NULL);
+       task = rpc_run_task(&task_setup_data);
        if (IS_ERR(task))
                return PTR_ERR(task);
        status = task->tk_status;
@@ -584,16 +602,23 @@ EXPORT_SYMBOL_GPL(rpc_call_sync);
  * @clnt: pointer to RPC client
  * @msg: RPC call parameters
  * @flags: RPC call flags
- * @ops: RPC call ops
+ * @tk_ops: RPC call ops
  * @data: user call data
  */
 int
-rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
+rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
               const struct rpc_call_ops *tk_ops, void *data)
 {
        struct rpc_task *task;
+       struct rpc_task_setup task_setup_data = {
+               .rpc_client = clnt,
+               .rpc_message = msg,
+               .callback_ops = tk_ops,
+               .callback_data = data,
+               .flags = flags|RPC_TASK_ASYNC,
+       };
 
-       task = rpc_do_run_task(clnt, msg, flags|RPC_TASK_ASYNC, tk_ops, data);
+       task = rpc_run_task(&task_setup_data);
        if (IS_ERR(task))
                return PTR_ERR(task);
        rpc_put_task(task);
@@ -601,44 +626,63 @@ rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
 }
 EXPORT_SYMBOL_GPL(rpc_call_async);
 
+#if defined(CONFIG_NFS_V4_1)
 /**
- * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
- * @clnt: pointer to RPC client
- * @flags: RPC flags
- * @ops: RPC call ops
- * @data: user call data
+ * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
+ * rpc_execute against it
+ * @req: RPC request
+ * @tk_ops: RPC call ops
  */
-struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
-                                       const struct rpc_call_ops *tk_ops,
-                                       void *data)
+struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
+                               const struct rpc_call_ops *tk_ops)
 {
-       return rpc_do_run_task(clnt, NULL, flags, tk_ops, data);
+       struct rpc_task *task;
+       struct xdr_buf *xbufp = &req->rq_snd_buf;
+       struct rpc_task_setup task_setup_data = {
+               .callback_ops = tk_ops,
+       };
+
+       dprintk("RPC: rpc_run_bc_task req= %p\n", req);
+       /*
+        * Create an rpc_task to send the data
+        */
+       task = rpc_new_task(&task_setup_data);
+       if (IS_ERR(task)) {
+               xprt_free_bc_request(req);
+               goto out;
+       }
+       task->tk_rqstp = req;
+
+       /*
+        * Set up the xdr_buf length.
+        * This also indicates that the buffer is XDR encoded already.
+        */
+       xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
+                       xbufp->tail[0].iov_len;
+
+       task->tk_action = call_bc_transmit;
+       atomic_inc(&task->tk_count);
+       BUG_ON(atomic_read(&task->tk_count) != 2);
+       rpc_execute(task);
+
+out:
+       dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
+       return task;
 }
-EXPORT_SYMBOL_GPL(rpc_run_task);
+#endif /* CONFIG_NFS_V4_1 */
 
 void
-rpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags)
+rpc_call_start(struct rpc_task *task)
 {
-       task->tk_msg   = *msg;
-       task->tk_flags |= flags;
-       /* Bind the user cred */
-       if (task->tk_msg.rpc_cred != NULL)
-               rpcauth_holdcred(task);
-       else
-               rpcauth_bindcred(task);
-
-       if (task->tk_status == 0)
-               task->tk_action = call_start;
-       else
-               task->tk_action = rpc_exit_task;
+       task->tk_action = call_start;
 }
-EXPORT_SYMBOL_GPL(rpc_call_setup);
+EXPORT_SYMBOL_GPL(rpc_call_start);
 
 /**
  * rpc_peeraddr - extract remote peer address from clnt's xprt
  * @clnt: RPC client structure
  * @buf: target buffer
- * @size: length of target buffer
+ * @bufsize: length of target buffer
  *
  * Returns the number of bytes that are actually in the stored address.
  */
@@ -661,7 +705,8 @@ EXPORT_SYMBOL_GPL(rpc_peeraddr);
  * @format: address format
  *
  */
-char *rpc_peeraddr2str(struct rpc_clnt *clnt, enum rpc_display_format_t format)
+const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
+                            enum rpc_display_format_t format)
 {
        struct rpc_xprt *xprt = clnt->cl_xprt;
 
@@ -708,6 +753,19 @@ void rpc_force_rebind(struct rpc_clnt *clnt)
 EXPORT_SYMBOL_GPL(rpc_force_rebind);
 
 /*
+ * Restart an (async) RPC call from the call_prepare state.
+ * Usually called from within the exit handler.
+ */
+void
+rpc_restart_call_prepare(struct rpc_task *task)
+{
+       if (RPC_ASSASSINATED(task))
+               return;
+       task->tk_action = rpc_prepare_task;
+}
+EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
+
+/*
  * Restart an (async) RPC call. Usually called from within the
  * exit handler.
  */
@@ -721,6 +779,21 @@ rpc_restart_call(struct rpc_task *task)
 }
 EXPORT_SYMBOL_GPL(rpc_restart_call);
 
+#ifdef RPC_DEBUG
+static const char *rpc_proc_name(const struct rpc_task *task)
+{
+       const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
+
+       if (proc) {
+               if (proc->p_name)
+                       return proc->p_name;
+               else
+                       return "NULL";
+       } else
+               return "no proc";
+}
+#endif
+
 /*
  * 0.  Initial state
  *
@@ -732,9 +805,9 @@ call_start(struct rpc_task *task)
 {
        struct rpc_clnt *clnt = task->tk_client;
 
-       dprintk("RPC: %5u call_start %s%d proc %d (%s)\n", task->tk_pid,
+       dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
                        clnt->cl_protname, clnt->cl_vers,
-                       task->tk_msg.rpc_proc->p_proc,
+                       rpc_proc_name(task),
                        (RPC_IS_ASYNC(task) ? "async" : "sync"));
 
        /* Increment call count */
@@ -783,7 +856,7 @@ call_reserveresult(struct rpc_task *task)
                }
 
                printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
-                               __FUNCTION__, status);
+                               __func__, status);
                rpc_exit(task, -EIO);
                return;
        }
@@ -794,7 +867,7 @@ call_reserveresult(struct rpc_task *task)
         */
        if (task->tk_rqstp) {
                printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
-                               __FUNCTION__, status);
+                               __func__, status);
                xprt_release(task);
        }
 
@@ -806,7 +879,7 @@ call_reserveresult(struct rpc_task *task)
                break;
        default:
                printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
-                               __FUNCTION__, status);
+                               __func__, status);
                break;
        }
        rpc_exit(task, status);
@@ -874,6 +947,7 @@ static inline void
 rpc_task_force_reencode(struct rpc_task *task)
 {
        task->tk_rqstp->rq_snd_buf.len = 0;
+       task->tk_rqstp->rq_bytes_sent = 0;
 }
 
 static inline void
@@ -892,7 +966,7 @@ rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
  * 3.  Encode arguments of an RPC call
  */
 static void
-call_encode(struct rpc_task *task)
+rpc_xdr_encode(struct rpc_task *task)
 {
        struct rpc_rqst *req = task->tk_rqstp;
        kxdrproc_t      encode;
@@ -907,23 +981,19 @@ call_encode(struct rpc_task *task)
                         (char *)req->rq_buffer + req->rq_callsize,
                         req->rq_rcvsize);
 
-       /* Encode header and provided arguments */
-       encode = task->tk_msg.rpc_proc->p_encode;
-       if (!(p = call_header(task))) {
-               printk(KERN_INFO "RPC: call_header failed, exit EIO\n");
+       p = rpc_encode_header(task);
+       if (p == NULL) {
+               printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
                rpc_exit(task, -EIO);
                return;
        }
+
+       encode = task->tk_msg.rpc_proc->p_encode;
        if (encode == NULL)
                return;
 
        task->tk_status = rpcauth_wrap_req(task, encode, req, p,
                        task->tk_msg.rpc_argp);
-       if (task->tk_status == -ENOMEM) {
-               /* XXX: Is this sane? */
-               rpc_delay(task, 3*HZ);
-               task->tk_status = -EAGAIN;
-       }
 }
 
 /*
@@ -960,11 +1030,9 @@ call_bind_status(struct rpc_task *task)
        }
 
        switch (task->tk_status) {
-       case -EAGAIN:
-               dprintk("RPC: %5u rpcbind waiting for another request "
-                               "to finish\n", task->tk_pid);
-               /* avoid busy-waiting here -- could be a network outage. */
-               rpc_delay(task, 5*HZ);
+       case -ENOMEM:
+               dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
+               rpc_delay(task, HZ >> 2);
                goto retry_timeout;
        case -EACCES:
                dprintk("RPC: %5u remote rpcbind: RPC program/version "
@@ -982,7 +1050,7 @@ call_bind_status(struct rpc_task *task)
                goto retry_timeout;
        case -EPFNOSUPPORT:
                /* server doesn't support any rpcbind version we know of */
-               dprintk("RPC: %5u remote rpcbind service unavailable\n",
+               dprintk("RPC: %5u unrecognized remote rpcbind service\n",
                                task->tk_pid);
                break;
        case -EPROTONOSUPPORT:
@@ -991,6 +1059,21 @@ call_bind_status(struct rpc_task *task)
                task->tk_status = 0;
                task->tk_action = call_bind;
                return;
+       case -ECONNREFUSED:             /* connection problems */
+       case -ECONNRESET:
+       case -ENOTCONN:
+       case -EHOSTDOWN:
+       case -EHOSTUNREACH:
+       case -ENETUNREACH:
+       case -EPIPE:
+               dprintk("RPC: %5u remote rpcbind unreachable: %d\n",
+                               task->tk_pid, task->tk_status);
+               if (!RPC_IS_SOFTCONN(task)) {
+                       rpc_delay(task, 5*HZ);
+                       goto retry_timeout;
+               }
+               status = task->tk_status;
+               break;
        default:
                dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
                                task->tk_pid, -task->tk_status);
@@ -1036,27 +1119,20 @@ call_connect_status(struct rpc_task *task)
        dprint_status(task);
 
        task->tk_status = 0;
-       if (status >= 0) {
+       if (status >= 0 || status == -EAGAIN) {
                clnt->cl_stats->netreconn++;
                task->tk_action = call_transmit;
                return;
        }
 
-       /* Something failed: remote service port may have changed */
-       rpc_force_rebind(clnt);
-
        switch (status) {
-       case -ENOTCONN:
-       case -EAGAIN:
-               task->tk_action = call_bind;
-               if (!RPC_IS_SOFT(task))
-                       return;
                /* if soft mounted, test if we've timed out */
        case -ETIMEDOUT:
                task->tk_action = call_timeout;
-               return;
+               break;
+       default:
+               rpc_exit(task, -EIO);
        }
-       rpc_exit(task, -EIO);
 }
 
 /*
@@ -1077,10 +1153,16 @@ call_transmit(struct rpc_task *task)
        /* Encode here so that rpcsec_gss can use correct sequence number. */
        if (rpc_task_need_encode(task)) {
                BUG_ON(task->tk_rqstp->rq_bytes_sent != 0);
-               call_encode(task);
+               rpc_xdr_encode(task);
                /* Did the encode result in an error condition? */
-               if (task->tk_status != 0)
+               if (task->tk_status != 0) {
+                       /* Was the error nonfatal? */
+                       if (task->tk_status == -EAGAIN)
+                               rpc_delay(task, HZ >> 4);
+                       else
+                               rpc_exit(task, task->tk_status);
                        return;
+               }
        }
        xprt_transmit(task);
        if (task->tk_status < 0)
@@ -1090,10 +1172,10 @@ call_transmit(struct rpc_task *task)
         * in order to allow access to the socket to other RPC requests.
         */
        call_transmit_status(task);
-       if (task->tk_msg.rpc_proc->p_decode != NULL)
+       if (rpc_reply_expected(task))
                return;
        task->tk_action = rpc_exit_task;
-       rpc_wake_up_task(task);
+       rpc_wake_up_queued_task(&task->tk_xprt->pending, task);
 }
 
 /*
@@ -1103,15 +1185,112 @@ static void
 call_transmit_status(struct rpc_task *task)
 {
        task->tk_action = call_status;
+
        /*
-        * Special case: if we've been waiting on the socket's write_space()
-        * callback, then don't call xprt_end_transmit().
+        * Common case: success.  Force the compiler to put this
+        * test first.
         */
-       if (task->tk_status == -EAGAIN)
+       if (task->tk_status == 0) {
+               xprt_end_transmit(task);
+               rpc_task_force_reencode(task);
+               return;
+       }
+
+       switch (task->tk_status) {
+       case -EAGAIN:
+               break;
+       default:
+               dprint_status(task);
+               xprt_end_transmit(task);
+               rpc_task_force_reencode(task);
+               break;
+               /*
+                * Special cases: if we've been waiting on the
+                * socket's write_space() callback, or if the
+                * socket just returned a connection error,
+                * then hold onto the transport lock.
+                */
+       case -ECONNREFUSED:
+       case -EHOSTDOWN:
+       case -EHOSTUNREACH:
+       case -ENETUNREACH:
+               if (RPC_IS_SOFTCONN(task)) {
+                       xprt_end_transmit(task);
+                       rpc_exit(task, task->tk_status);
+                       break;
+               }
+       case -ECONNRESET:
+       case -ENOTCONN:
+       case -EPIPE:
+               rpc_task_force_reencode(task);
+       }
+}
+
+#if defined(CONFIG_NFS_V4_1)
+/*
+ * 5b. Send the backchannel RPC reply.  On error, drop the reply.  In
+ * addition, disconnect on connectivity errors.
+ */
+static void
+call_bc_transmit(struct rpc_task *task)
+{
+       struct rpc_rqst *req = task->tk_rqstp;
+
+       BUG_ON(task->tk_status != 0);
+       task->tk_status = xprt_prepare_transmit(task);
+       if (task->tk_status == -EAGAIN) {
+               /*
+                * Could not reserve the transport. Try again after the
+                * transport is released.
+                */
+               task->tk_status = 0;
+               task->tk_action = call_bc_transmit;
                return;
+       }
+
+       task->tk_action = rpc_exit_task;
+       if (task->tk_status < 0) {
+               printk(KERN_NOTICE "RPC: Could not send backchannel reply "
+                       "error: %d\n", task->tk_status);
+               return;
+       }
+
+       xprt_transmit(task);
        xprt_end_transmit(task);
-       rpc_task_force_reencode(task);
+       dprint_status(task);
+       switch (task->tk_status) {
+       case 0:
+               /* Success */
+               break;
+       case -EHOSTDOWN:
+       case -EHOSTUNREACH:
+       case -ENETUNREACH:
+       case -ETIMEDOUT:
+               /*
+                * Problem reaching the server.  Disconnect and let the
+                * forechannel reestablish the connection.  The server will
+                * have to retransmit the backchannel request and we'll
+                * reprocess it.  Since these ops are idempotent, there's no
+                * need to cache our reply at this time.
+                */
+               printk(KERN_NOTICE "RPC: Could not send backchannel reply "
+                       "error: %d\n", task->tk_status);
+               xprt_conditional_disconnect(task->tk_xprt,
+                       req->rq_connect_cookie);
+               break;
+       default:
+               /*
+                * We were unable to reply and will have to drop the
+                * request.  The server should reconnect and retransmit.
+                */
+               BUG_ON(task->tk_status == -EAGAIN);
+               printk(KERN_NOTICE "RPC: Could not send backchannel reply "
+                       "error: %d\n", task->tk_status);
+               break;
+       }
+       rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
 }
+#endif /* CONFIG_NFS_V4_1 */
 
 /*
  * 6.  Sort out the RPC call status
@@ -1123,8 +1302,8 @@ call_status(struct rpc_task *task)
        struct rpc_rqst *req = task->tk_rqstp;
        int             status;
 
-       if (req->rq_received > 0 && !req->rq_bytes_sent)
-               task->tk_status = req->rq_received;
+       if (req->rq_reply_bytes_recvd > 0 && !req->rq_bytes_sent)
+               task->tk_status = req->rq_reply_bytes_recvd;
 
        dprint_status(task);
 
@@ -1147,11 +1326,15 @@ call_status(struct rpc_task *task)
        case -ETIMEDOUT:
                task->tk_action = call_timeout;
                if (task->tk_client->cl_discrtry)
-                       xprt_force_disconnect(task->tk_xprt);
+                       xprt_conditional_disconnect(task->tk_xprt,
+                                       req->rq_connect_cookie);
                break;
+       case -ECONNRESET:
        case -ECONNREFUSED:
-       case -ENOTCONN:
                rpc_force_rebind(clnt);
+               rpc_delay(task, 3*HZ);
+       case -EPIPE:
+       case -ENOTCONN:
                task->tk_action = call_bind;
                break;
        case -EAGAIN:
@@ -1162,7 +1345,8 @@ call_status(struct rpc_task *task)
                rpc_exit(task, status);
                break;
        default:
-               printk("%s: RPC call returned error %d\n",
+               if (clnt->cl_chatty)
+                       printk("%s: RPC call returned error %d\n",
                               clnt->cl_protname, -status);
                rpc_exit(task, status);
        }
@@ -1186,8 +1370,13 @@ call_timeout(struct rpc_task *task)
        dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
        task->tk_timeouts++;
 
+       if (RPC_IS_SOFTCONN(task)) {
+               rpc_exit(task, -ETIMEDOUT);
+               return;
+       }
        if (RPC_IS_SOFT(task)) {
-               printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
+               if (clnt->cl_chatty)
+                       printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
                                clnt->cl_protname, clnt->cl_server);
                rpc_exit(task, -EIO);
                return;
@@ -1195,10 +1384,16 @@ call_timeout(struct rpc_task *task)
 
        if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
                task->tk_flags |= RPC_CALL_MAJORSEEN;
-               printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
+               if (clnt->cl_chatty)
+                       printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
                        clnt->cl_protname, clnt->cl_server);
        }
        rpc_force_rebind(clnt);
+       /*
+        * Did our request time out due to an RPCSEC_GSS out-of-sequence
+        * event? RFC2203 requires the server to drop all such requests.
+        */
+       rpcauth_invalcred(task);
 
 retry:
        clnt->cl_stats->rpcretrans++;
@@ -1221,12 +1416,24 @@ call_decode(struct rpc_task *task)
                        task->tk_pid, task->tk_status);
 
        if (task->tk_flags & RPC_CALL_MAJORSEEN) {
-               printk(KERN_NOTICE "%s: server %s OK\n",
-                       clnt->cl_protname, clnt->cl_server);
+               if (clnt->cl_chatty)
+                       printk(KERN_NOTICE "%s: server %s OK\n",
+                               clnt->cl_protname, clnt->cl_server);
                task->tk_flags &= ~RPC_CALL_MAJORSEEN;
        }
 
-       if (task->tk_status < 12) {
+       /*
+        * Ensure that we see all writes made by xprt_complete_rqst()
+        * before it changed req->rq_reply_bytes_recvd.
+        */
+       smp_rmb();
+       req->rq_rcv_buf.len = req->rq_private_buf.len;
+
+       /* Check that the softirq receive buffer is valid */
+       WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
+                               sizeof(req->rq_rcv_buf)) != 0);
+
+       if (req->rq_rcv_buf.len < 12) {
                if (!RPC_IS_SOFT(task)) {
                        task->tk_action = call_bind;
                        clnt->cl_stats->rpcretrans++;
@@ -1238,19 +1445,7 @@ call_decode(struct rpc_task *task)
                goto out_retry;
        }
 
-       /*
-        * Ensure that we see all writes made by xprt_complete_rqst()
-        * before it changed req->rq_received.
-        */
-       smp_rmb();
-       req->rq_rcv_buf.len = req->rq_private_buf.len;
-
-       /* Check that the softirq receive buffer is valid */
-       WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
-                               sizeof(req->rq_rcv_buf)) != 0);
-
-       /* Verify the RPC header */
-       p = call_verify(task);
+       p = rpc_verify_header(task);
        if (IS_ERR(p)) {
                if (p == ERR_PTR(-EAGAIN))
                        goto out_retry;
@@ -1267,10 +1462,14 @@ call_decode(struct rpc_task *task)
                        task->tk_status);
        return;
 out_retry:
-       req->rq_received = req->rq_private_buf.len = 0;
        task->tk_status = 0;
-       if (task->tk_client->cl_discrtry)
-               xprt_force_disconnect(task->tk_xprt);
+       /* Note: rpc_verify_header() may have freed the RPC slot */
+       if (task->tk_rqstp == req) {
+               req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0;
+               if (task->tk_client->cl_discrtry)
+                       xprt_conditional_disconnect(task->tk_xprt,
+                                       req->rq_connect_cookie);
+       }
 }
 
 /*
@@ -1308,14 +1507,10 @@ call_refreshresult(struct rpc_task *task)
        task->tk_action = call_refresh;
        if (status != -ETIMEDOUT)
                rpc_delay(task, 3*HZ);
-       return;
 }
 
-/*
- * Call header serialization
- */
 static __be32 *
-call_header(struct rpc_task *task)
+rpc_encode_header(struct rpc_task *task)
 {
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
@@ -1335,11 +1530,8 @@ call_header(struct rpc_task *task)
        return p;
 }
 
-/*
- * Reply header verification
- */
 static __be32 *
-call_verify(struct rpc_task *task)
+rpc_verify_header(struct rpc_task *task)
 {
        struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
        int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
@@ -1354,19 +1546,20 @@ call_verify(struct rpc_task *task)
                 *   undefined results
                 */
                dprintk("RPC: %5u %s: XDR representation not a multiple of"
-                      " 4 bytes: 0x%x\n", task->tk_pid, __FUNCTION__,
+                      " 4 bytes: 0x%x\n", task->tk_pid, __func__,
                       task->tk_rqstp->rq_rcv_buf.len);
                goto out_eio;
        }
        if ((len -= 3) < 0)
                goto out_overflow;
-       p += 1; /* skip XID */
 
+       p += 1; /* skip XID */
        if ((n = ntohl(*p++)) != RPC_REPLY) {
                dprintk("RPC: %5u %s: not an RPC reply: %x\n",
-                               task->tk_pid, __FUNCTION__, n);
+                       task->tk_pid, __func__, n);
                goto out_garbage;
        }
+
        if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
                if (--len < 0)
                        goto out_overflow;
@@ -1376,13 +1569,13 @@ call_verify(struct rpc_task *task)
                        case RPC_MISMATCH:
                                dprintk("RPC: %5u %s: RPC call version "
                                                "mismatch!\n",
-                                               task->tk_pid, __FUNCTION__);
+                                               task->tk_pid, __func__);
                                error = -EPROTONOSUPPORT;
                                goto out_err;
                        default:
                                dprintk("RPC: %5u %s: RPC call rejected, "
                                                "unknown error: %x\n",
-                                               task->tk_pid, __FUNCTION__, n);
+                                               task->tk_pid, __func__, n);
                                goto out_eio;
                }
                if (--len < 0)
@@ -1396,7 +1589,7 @@ call_verify(struct rpc_task *task)
                                break;
                        task->tk_cred_retry--;
                        dprintk("RPC: %5u %s: retry stale creds\n",
-                                       task->tk_pid, __FUNCTION__);
+                                       task->tk_pid, __func__);
                        rpcauth_invalcred(task);
                        /* Ensure we obtain a new XID! */
                        xprt_release(task);
@@ -1409,25 +1602,25 @@ call_verify(struct rpc_task *task)
                                break;
                        task->tk_garb_retry--;
                        dprintk("RPC: %5u %s: retry garbled creds\n",
-                                       task->tk_pid, __FUNCTION__);
+                                       task->tk_pid, __func__);
                        task->tk_action = call_bind;
                        goto out_retry;
                case RPC_AUTH_TOOWEAK:
-                       printk(KERN_NOTICE "call_verify: server %s requires stronger "
+                       printk(KERN_NOTICE "RPC: server %s requires stronger "
                               "authentication.\n", task->tk_client->cl_server);
                        break;
                default:
                        dprintk("RPC: %5u %s: unknown auth error: %x\n",
-                                       task->tk_pid, __FUNCTION__, n);
+                                       task->tk_pid, __func__, n);
                        error = -EIO;
                }
                dprintk("RPC: %5u %s: call rejected %d\n",
-                               task->tk_pid, __FUNCTION__, n);
+                               task->tk_pid, __func__, n);
                goto out_err;
        }
        if (!(p = rpcauth_checkverf(task, p))) {
                dprintk("RPC: %5u %s: auth check failed\n",
-                               task->tk_pid, __FUNCTION__);
+                               task->tk_pid, __func__);
                goto out_garbage;               /* bad verifier, retry */
        }
        len = p - (__be32 *)iov->iov_base - 1;
@@ -1438,24 +1631,24 @@ call_verify(struct rpc_task *task)
                return p;
        case RPC_PROG_UNAVAIL:
                dprintk("RPC: %5u %s: program %u is unsupported by server %s\n",
-                               task->tk_pid, __FUNCTION__,
+                               task->tk_pid, __func__,
                                (unsigned int)task->tk_client->cl_prog,
                                task->tk_client->cl_server);
                error = -EPFNOSUPPORT;
                goto out_err;
        case RPC_PROG_MISMATCH:
                dprintk("RPC: %5u %s: program %u, version %u unsupported by "
-                               "server %s\n", task->tk_pid, __FUNCTION__,
+                               "server %s\n", task->tk_pid, __func__,
                                (unsigned int)task->tk_client->cl_prog,
                                (unsigned int)task->tk_client->cl_vers,
                                task->tk_client->cl_server);
                error = -EPROTONOSUPPORT;
                goto out_err;
        case RPC_PROC_UNAVAIL:
-               dprintk("RPC: %5u %s: proc %p unsupported by program %u, "
+               dprintk("RPC: %5u %s: proc %s unsupported by program %u, "
                                "version %u on server %s\n",
-                               task->tk_pid, __FUNCTION__,
-                               task->tk_msg.rpc_proc,
+                               task->tk_pid, __func__,
+                               rpc_proc_name(task),
                                task->tk_client->cl_prog,
                                task->tk_client->cl_vers,
                                task->tk_client->cl_server);
@@ -1463,11 +1656,11 @@ call_verify(struct rpc_task *task)
                goto out_err;
        case RPC_GARBAGE_ARGS:
                dprintk("RPC: %5u %s: server saw garbage\n",
-                               task->tk_pid, __FUNCTION__);
+                               task->tk_pid, __func__);
                break;                  /* retry */
        default:
                dprintk("RPC: %5u %s: server accept status: %x\n",
-                               task->tk_pid, __FUNCTION__, n);
+                               task->tk_pid, __func__, n);
                /* Also retry */
        }
 
@@ -1476,7 +1669,7 @@ out_garbage:
        if (task->tk_garb_retry) {
                task->tk_garb_retry--;
                dprintk("RPC: %5u %s: retrying\n",
-                               task->tk_pid, __FUNCTION__);
+                               task->tk_pid, __func__);
                task->tk_action = call_bind;
 out_retry:
                return ERR_PTR(-EAGAIN);
@@ -1486,11 +1679,11 @@ out_eio:
 out_err:
        rpc_exit(task, error);
        dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
-                       __FUNCTION__, error);
+                       __func__, error);
        return ERR_PTR(error);
 out_overflow:
        dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
-                       __FUNCTION__);
+                       __func__);
        goto out_garbage;
 }
 
@@ -1509,14 +1702,14 @@ static struct rpc_procinfo rpcproc_null = {
        .p_decode = rpcproc_decode_null,
 };
 
-static int rpc_ping(struct rpc_clnt *clnt, int flags)
+static int rpc_ping(struct rpc_clnt *clnt)
 {
        struct rpc_message msg = {
                .rpc_proc = &rpcproc_null,
        };
        int err;
        msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
-       err = rpc_call_sync(clnt, &msg, flags);
+       err = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN);
        put_rpccred(msg.rpc_cred);
        return err;
 }
@@ -1527,49 +1720,64 @@ struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int
                .rpc_proc = &rpcproc_null,
                .rpc_cred = cred,
        };
-       return rpc_do_run_task(clnt, &msg, flags, &rpc_default_ops, NULL);
+       struct rpc_task_setup task_setup_data = {
+               .rpc_client = clnt,
+               .rpc_message = &msg,
+               .callback_ops = &rpc_default_ops,
+               .flags = flags,
+       };
+       return rpc_run_task(&task_setup_data);
 }
 EXPORT_SYMBOL_GPL(rpc_call_null);
 
 #ifdef RPC_DEBUG
+static void rpc_show_header(void)
+{
+       printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
+               "-timeout ---ops--\n");
+}
+
+static void rpc_show_task(const struct rpc_clnt *clnt,
+                         const struct rpc_task *task)
+{
+       const char *rpc_waitq = "none";
+       char *p, action[KSYM_SYMBOL_LEN];
+
+       if (RPC_IS_QUEUED(task))
+               rpc_waitq = rpc_qname(task->tk_waitqueue);
+
+       /* map tk_action pointer to a function name; then trim off
+        * the "+0x0 [sunrpc]" */
+       sprint_symbol(action, (unsigned long)task->tk_action);
+       p = strchr(action, '+');
+       if (p)
+               *p = '\0';
+
+       printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%s q:%s\n",
+               task->tk_pid, task->tk_flags, task->tk_status,
+               clnt, task->tk_rqstp, task->tk_timeout, task->tk_ops,
+               clnt->cl_protname, clnt->cl_vers, rpc_proc_name(task),
+               action, rpc_waitq);
+}
+
 void rpc_show_tasks(void)
 {
        struct rpc_clnt *clnt;
-       struct rpc_task *t;
+       struct rpc_task *task;
+       int header = 0;
 
        spin_lock(&rpc_client_lock);
-       if (list_empty(&all_clients))
-               goto out;
-       printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
-               "-rpcwait -action- ---ops--\n");
        list_for_each_entry(clnt, &all_clients, cl_clients) {
-               if (list_empty(&clnt->cl_tasks))
-                       continue;
                spin_lock(&clnt->cl_lock);
-               list_for_each_entry(t, &clnt->cl_tasks, tk_task) {
-                       const char *rpc_waitq = "none";
-                       int proc;
-
-                       if (t->tk_msg.rpc_proc)
-                               proc = t->tk_msg.rpc_proc->p_proc;
-                       else
-                               proc = -1;
-
-                       if (RPC_IS_QUEUED(t))
-                               rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);
-
-                       printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n",
-                               t->tk_pid, proc,
-                               t->tk_flags, t->tk_status,
-                               t->tk_client,
-                               (t->tk_client ? t->tk_client->cl_prog : 0),
-                               t->tk_rqstp, t->tk_timeout,
-                               rpc_waitq,
-                               t->tk_action, t->tk_ops);
+               list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
+                       if (!header) {
+                               rpc_show_header();
+                               header++;
+                       }
+                       rpc_show_task(clnt, task);
                }
                spin_unlock(&clnt->cl_lock);
        }
-out:
        spin_unlock(&rpc_client_lock);
 }
 #endif