SUNRPC: Make RPC portmapper use per-transport storage
authorChuck Lever <chuck.lever@oracle.com>
Wed, 23 Aug 2006 00:06:15 +0000 (20:06 -0400)
committerTrond Myklebust <Trond.Myklebust@netapp.com>
Sat, 23 Sep 2006 03:24:39 +0000 (23:24 -0400)
Move connection and bind state that was maintained in the rpc_clnt
structure to the rpc_xprt structure.  This will allow the creation of
a clean API for plugging in different types of bind mechanisms.

This brings improvements such as the elimination of a single spin lock to
control serialization for all in-kernel RPC binding.  A set of per-xprt
bitops is used to serialize tasks during RPC binding, just like it now
works for making RPC transport connections.

Test-plan:
Destructive testing (unplugging the network temporarily).  Connectathon
with UDP and TCP.  NFSv2/3 and NFSv4 mounting should be carefully checked.
Probably need to rig a server where certain services aren't running, or
that returns an error for some typical operation.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
include/linux/sunrpc/clnt.h
include/linux/sunrpc/xprt.h
net/sunrpc/clnt.c
net/sunrpc/pmap_clnt.c
net/sunrpc/xprt.c

index 8fe9f35..00e9dba 100644 (file)
 #include <linux/sunrpc/timer.h>
 #include <asm/signal.h>
 
-/*
- * This defines an RPC port mapping
- */
-struct rpc_portmap {
-       __u32                   pm_prog;
-       __u32                   pm_vers;
-       __u32                   pm_prot;
-       __u16                   pm_port;
-       unsigned char           pm_binding : 1; /* doing a getport() */
-       struct rpc_wait_queue   pm_bindwait;    /* waiting on getport() */
-};
-
 struct rpc_inode;
 
 /*
@@ -40,7 +28,9 @@ struct rpc_clnt {
        atomic_t                cl_users;       /* number of references */
        struct rpc_xprt *       cl_xprt;        /* transport */
        struct rpc_procinfo *   cl_procinfo;    /* procedure info */
-       u32                     cl_maxproc;     /* max procedure number */
+       u32                     cl_prog,        /* RPC program number */
+                               cl_vers,        /* RPC version number */
+                               cl_maxproc;     /* max procedure number */
 
        char *                  cl_server;      /* server machine name */
        char *                  cl_protname;    /* protocol name */
@@ -55,7 +45,6 @@ struct rpc_clnt {
                                cl_dead     : 1;/* abandoned */
 
        struct rpc_rtt *        cl_rtt;         /* RTO estimator data */
-       struct rpc_portmap *    cl_pmap;        /* port mapping */
 
        int                     cl_nodelen;     /* nodename length */
        char                    cl_nodename[UNX_MAXNODENAME];
@@ -64,14 +53,8 @@ struct rpc_clnt {
        struct dentry *         cl_dentry;      /* inode */
        struct rpc_clnt *       cl_parent;      /* Points to parent of clones */
        struct rpc_rtt          cl_rtt_default;
-       struct rpc_portmap      cl_pmap_default;
        char                    cl_inline_name[32];
 };
-#define cl_timeout             cl_xprt->timeout
-#define cl_prog                        cl_pmap->pm_prog
-#define cl_vers                        cl_pmap->pm_vers
-#define cl_port                        cl_pmap->pm_port
-#define cl_prot                        cl_pmap->pm_prot
 
 /*
  * General RPC program info
index a711067..4ce8261 100644 (file)
@@ -138,6 +138,7 @@ struct rpc_xprt {
        unsigned int            tsh_size;       /* size of transport specific
                                                   header */
 
+       struct rpc_wait_queue   binding;        /* requests waiting on rpcbind */
        struct rpc_wait_queue   sending;        /* requests waiting to send */
        struct rpc_wait_queue   resend;         /* requests waiting to resend */
        struct rpc_wait_queue   pending;        /* requests in flight */
@@ -270,6 +271,7 @@ int                 xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to);
 #define XPRT_CONNECTING                (2)
 #define XPRT_CLOSE_WAIT                (3)
 #define XPRT_BOUND             (4)
+#define XPRT_BINDING           (5)
 
 static inline void xprt_set_connected(struct rpc_xprt *xprt)
 {
@@ -328,6 +330,18 @@ static inline void xprt_clear_bound(struct rpc_xprt *xprt)
        clear_bit(XPRT_BOUND, &xprt->state);
 }
 
+static inline void xprt_clear_binding(struct rpc_xprt *xprt)
+{
+       smp_mb__before_clear_bit();
+       clear_bit(XPRT_BINDING, &xprt->state);
+       smp_mb__after_clear_bit();
+}
+
+static inline int xprt_test_and_set_binding(struct rpc_xprt *xprt)
+{
+       return test_and_set_bit(XPRT_BINDING, &xprt->state);
+}
+
 #endif /* __KERNEL__*/
 
 #endif /* _LINUX_SUNRPC_XPRT_H */
index 0b8d03d..cee5041 100644 (file)
@@ -147,13 +147,10 @@ rpc_new_client(struct rpc_xprt *xprt, char *servname,
        clnt->cl_procinfo = version->procs;
        clnt->cl_maxproc  = version->nrprocs;
        clnt->cl_protname = program->name;
-       clnt->cl_pmap     = &clnt->cl_pmap_default;
        clnt->cl_prog     = program->number;
        clnt->cl_vers     = version->number;
-       clnt->cl_prot     = xprt->prot;
        clnt->cl_stats    = program->stats;
        clnt->cl_metrics  = rpc_alloc_iostats(clnt);
-       rpc_init_wait_queue(&clnt->cl_pmap_default.pm_bindwait, "bindwait");
 
        if (!xprt_bound(clnt->cl_xprt))
                clnt->cl_autobind = 1;
@@ -243,8 +240,6 @@ rpc_clone_client(struct rpc_clnt *clnt)
        atomic_set(&new->cl_users, 0);
        new->cl_parent = clnt;
        atomic_inc(&clnt->cl_count);
-       /* Duplicate portmapper */
-       rpc_init_wait_queue(&new->cl_pmap_default.pm_bindwait, "bindwait");
        /* Turn off autobind on clones */
        new->cl_autobind = 0;
        new->cl_oneshot = 0;
@@ -254,8 +249,7 @@ rpc_clone_client(struct rpc_clnt *clnt)
        rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
        if (new->cl_auth)
                atomic_inc(&new->cl_auth->au_count);
-       new->cl_pmap            = &new->cl_pmap_default;
-       new->cl_metrics         = rpc_alloc_iostats(clnt);
+       new->cl_metrics = rpc_alloc_iostats(clnt);
        return new;
 out_no_clnt:
        printk(KERN_INFO "RPC: out of memory in %s\n", __FUNCTION__);
index 209ffdf..59d5424 100644 (file)
 #define PMAP_UNSET             2
 #define PMAP_GETPORT           3
 
+struct portmap_args {
+       u32                     pm_prog;
+       u32                     pm_vers;
+       u32                     pm_prot;
+       unsigned short          pm_port;
+       struct rpc_task *       pm_task;
+};
+
 static struct rpc_procinfo     pmap_procedures[];
 static struct rpc_clnt *       pmap_create(char *, struct sockaddr_in *, int, int);
-static void                    pmap_getport_done(struct rpc_task *);
+static void                    pmap_getport_done(struct rpc_task *, void *);
 static struct rpc_program      pmap_program;
-static DEFINE_SPINLOCK(pmap_lock);
+
+static void pmap_getport_prepare(struct rpc_task *task, void *calldata)
+{
+       struct portmap_args *map = calldata;
+       struct rpc_message msg = {
+               .rpc_proc       = &pmap_procedures[PMAP_GETPORT],
+               .rpc_argp       = map,
+               .rpc_resp       = &map->pm_port,
+       };
+
+       rpc_call_setup(task, &msg, 0);
+}
+
+static inline struct portmap_args *pmap_map_alloc(void)
+{
+       return kmalloc(sizeof(struct portmap_args), GFP_NOFS);
+}
+
+static inline void pmap_map_free(struct portmap_args *map)
+{
+       kfree(map);
+}
+
+static void pmap_map_release(void *data)
+{
+       pmap_map_free(data);
+}
+
+static const struct rpc_call_ops pmap_getport_ops = {
+       .rpc_call_prepare       = pmap_getport_prepare,
+       .rpc_call_done          = pmap_getport_done,
+       .rpc_release            = pmap_map_release,
+};
+
+static inline void pmap_wake_portmap_waiters(struct rpc_xprt *xprt)
+{
+       xprt_clear_binding(xprt);
+       rpc_wake_up(&xprt->binding);
+}
 
 /*
  * Obtain the port for a given RPC service on a given host. This one can
@@ -37,67 +83,71 @@ static DEFINE_SPINLOCK(pmap_lock);
 void
 rpc_getport(struct rpc_task *task, struct rpc_clnt *clnt)
 {
-       struct rpc_portmap *map = clnt->cl_pmap;
-       struct sockaddr_in *sap = &clnt->cl_xprt->addr;
-       struct rpc_message msg = {
-               .rpc_proc       = &pmap_procedures[PMAP_GETPORT],
-               .rpc_argp       = map,
-               .rpc_resp       = &clnt->cl_port,
-               .rpc_cred       = NULL
-       };
+       struct rpc_xprt *xprt = task->tk_xprt;
+       struct sockaddr_in *sap = &xprt->addr;
+       struct portmap_args *map;
        struct rpc_clnt *pmap_clnt;
-       struct rpc_task *child;
+       struct rpc_task *child;
 
-       dprintk("RPC: %4d rpc_getport(%s, %d, %d, %d)\n",
+       dprintk("RPC: %4d rpc_getport(%s, %u, %u, %d)\n",
                        task->tk_pid, clnt->cl_server,
-                       map->pm_prog, map->pm_vers, map->pm_prot);
+                       clnt->cl_prog, clnt->cl_vers, xprt->prot);
 
        /* Autobind on cloned rpc clients is discouraged */
        BUG_ON(clnt->cl_parent != clnt);
 
-       spin_lock(&pmap_lock);
-       if (map->pm_binding) {
-               rpc_sleep_on(&map->pm_bindwait, task, NULL, NULL);
-               spin_unlock(&pmap_lock);
+       if (xprt_test_and_set_binding(xprt)) {
+               task->tk_status = -EACCES;      /* tell caller to check again */
+               rpc_sleep_on(&xprt->binding, task, NULL, NULL);
                return;
        }
-       map->pm_binding = 1;
-       spin_unlock(&pmap_lock);
+
+       /* Someone else may have bound if we slept */
+       if (xprt_bound(xprt)) {
+               task->tk_status = 0;
+               goto bailout_nofree;
+       }
+
+       map = pmap_map_alloc();
+       if (!map) {
+               task->tk_status = -ENOMEM;
+               goto bailout_nofree;
+       }
+       map->pm_prog = clnt->cl_prog;
+       map->pm_vers = clnt->cl_vers;
+       map->pm_prot = xprt->prot;
+       map->pm_port = 0;
+       map->pm_task = task;
 
        pmap_clnt = pmap_create(clnt->cl_server, sap, map->pm_prot, 0);
        if (IS_ERR(pmap_clnt)) {
                task->tk_status = PTR_ERR(pmap_clnt);
                goto bailout;
        }
-       task->tk_status = 0;
 
-       /*
-        * Note: rpc_new_child will release client after a failure.
-        */
-       if (!(child = rpc_new_child(pmap_clnt, task)))
+       child = rpc_run_task(pmap_clnt, RPC_TASK_ASYNC, &pmap_getport_ops, map);
+       if (IS_ERR(child)) {
+               task->tk_status = -EIO;
                goto bailout;
+       }
+       rpc_release_task(child);
 
-       /* Setup the call info struct */
-       rpc_call_setup(child, &msg, 0);
+       rpc_sleep_on(&xprt->binding, task, NULL, NULL);
 
-       /* ... and run the child task */
        task->tk_xprt->stat.bind_count++;
-       rpc_run_child(task, child, pmap_getport_done);
        return;
 
 bailout:
-       spin_lock(&pmap_lock);
-       map->pm_binding = 0;
-       rpc_wake_up(&map->pm_bindwait);
-       spin_unlock(&pmap_lock);
-       rpc_exit(task, -EIO);
+       pmap_map_free(map);
+bailout_nofree:
+       pmap_wake_portmap_waiters(xprt);
 }
 
 #ifdef CONFIG_ROOT_NFS
 int
 rpc_getport_external(struct sockaddr_in *sin, __u32 prog, __u32 vers, int prot)
 {
-       struct rpc_portmap map = {
+       struct portmap_args map = {
                .pm_prog        = prog,
                .pm_vers        = vers,
                .pm_prot        = prot,
@@ -133,32 +183,32 @@ rpc_getport_external(struct sockaddr_in *sin, __u32 prog, __u32 vers, int prot)
 #endif
 
 static void
-pmap_getport_done(struct rpc_task *task)
+pmap_getport_done(struct rpc_task *child, void *data)
 {
-       struct rpc_clnt *clnt = task->tk_client;
+       struct portmap_args *map = data;
+       struct rpc_task *task = map->pm_task;
        struct rpc_xprt *xprt = task->tk_xprt;
-       struct rpc_portmap *map = clnt->cl_pmap;
-
-       dprintk("RPC: %4d pmap_getport_done(status %d, port %d)\n",
-                       task->tk_pid, task->tk_status, clnt->cl_port);
+       int status = child->tk_status;
 
-       if (task->tk_status < 0) {
-               /* Make the calling task exit with an error */
+       if (status < 0) {
+               /* Portmapper not available */
                xprt->ops->set_port(xprt, 0);
-               task->tk_action = rpc_exit_task;
-       } else if (clnt->cl_port == 0) {
-               /* Program not registered */
+               task->tk_status = status;
+       } else if (map->pm_port == 0) {
+               /* Requested RPC service wasn't registered */
                xprt->ops->set_port(xprt, 0);
-               rpc_exit(task, -EACCES);
+               task->tk_status = -EACCES;
        } else {
-               xprt->ops->set_port(xprt, clnt->cl_port);
+               /* Succeeded */
+               xprt->ops->set_port(xprt, map->pm_port);
                xprt_set_bound(xprt);
-               clnt->cl_port = htons(clnt->cl_port);
+               task->tk_status = 0;
        }
-       spin_lock(&pmap_lock);
-       map->pm_binding = 0;
-       rpc_wake_up(&map->pm_bindwait);
-       spin_unlock(&pmap_lock);
+
+       dprintk("RPC: %4d pmap_getport_done(status %d, port %u)\n",
+                       child->tk_pid, child->tk_status, map->pm_port);
+
+       pmap_wake_portmap_waiters(xprt);
 }
 
 /*
@@ -172,7 +222,7 @@ rpc_register(u32 prog, u32 vers, int prot, unsigned short port, int *okay)
                .sin_family     = AF_INET,
                .sin_addr.s_addr = htonl(INADDR_LOOPBACK),
        };
-       struct rpc_portmap      map = {
+       struct portmap_args     map = {
                .pm_prog        = prog,
                .pm_vers        = vers,
                .pm_prot        = prot,
@@ -239,7 +289,7 @@ pmap_create(char *hostname, struct sockaddr_in *srvaddr, int proto, int privileg
  * XDR encode/decode functions for PMAP
  */
 static int
-xdr_encode_mapping(struct rpc_rqst *req, u32 *p, struct rpc_portmap *map)
+xdr_encode_mapping(struct rpc_rqst *req, u32 *p, struct portmap_args *map)
 {
        dprintk("RPC: xdr_encode_mapping(%d, %d, %d, %d)\n",
                map->pm_prog, map->pm_vers, map->pm_prot, map->pm_port);
index e239ef9..b45abd0 100644 (file)
@@ -928,6 +928,7 @@ static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc
        xprt->last_used = jiffies;
        xprt->cwnd = RPC_INITCWND;
 
+       rpc_init_wait_queue(&xprt->binding, "xprt_binding");
        rpc_init_wait_queue(&xprt->pending, "xprt_pending");
        rpc_init_wait_queue(&xprt->sending, "xprt_sending");
        rpc_init_wait_queue(&xprt->resend, "xprt_resend");