[PATCH] RPC: transport switch function naming
[safe/jmp/linux-2.6] / net / sunrpc / xprtsock.c
index fa1180a..80222de 100644 (file)
 #include <net/udp.h>
 #include <net/tcp.h>
 
+/*
+ * Maximum port number to use when requesting a reserved port.
+ */
+#define XS_MAX_RESVPORT                (800U)
+
 #ifdef RPC_DEBUG
 # undef  RPC_DEBUG_DATA
-# define RPCDBG_FACILITY       RPCDBG_XPRT
+# define RPCDBG_FACILITY       RPCDBG_TRANS
 #endif
 
-#define XPRT_MAX_RESVPORT      (800)
-
 #ifdef RPC_DEBUG_DATA
-/*
- * Print the buffer contents (first 128 bytes only--just enough for
- * diropres return).
- */
-static void
-xprt_pktdump(char *msg, u32 *packet, unsigned int count)
+static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
 {
-       u8      *buf = (u8 *) packet;
-       int     j;
+       u8 *buf = (u8 *) packet;
+       int j;
 
        dprintk("RPC:      %s\n", msg);
        for (j = 0; j < count && j < 128; j += 4) {
@@ -64,25 +62,22 @@ xprt_pktdump(char *msg, u32 *packet, unsigned int count)
        dprintk("\n");
 }
 #else
-static inline void
-xprt_pktdump(char *msg, u32 *packet, unsigned int count)
+static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
 {
        /* NOP */
 }
 #endif
 
-/*
- * Look up RPC transport given an INET socket
+/**
+ * xs_sendpages - write pages directly to a socket
+ * @sock: socket to send on
+ * @addr: UDP only -- address of destination
+ * @addrlen: UDP only -- length of destination address
+ * @xdr: buffer containing this request
+ * @base: starting position in the buffer
+ *
  */
-static inline struct rpc_xprt *
-xprt_from_sock(struct sock *sk)
-{
-       return (struct rpc_xprt *) sk->sk_user_data;
-}
-
-static int
-xdr_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen,
-               struct xdr_buf *xdr, unsigned int base, int msgflags)
+static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, int msgflags)
 {
        struct page **ppage = xdr->pages;
        unsigned int len, pglen = xdr->page_len;
@@ -125,7 +120,7 @@ xdr_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen,
        }
        if (base || xdr->page_base) {
                pglen -= base;
-               base  += xdr->page_base;
+               base += xdr->page_base;
                ppage += base >> PAGE_CACHE_SHIFT;
                base &= ~PAGE_CACHE_MASK;
        }
@@ -176,23 +171,25 @@ out:
        return ret;
 }
 
-/*
- * Write data to socket.
+/**
+ * xs_sendmsg - write an RPC request to a socket
+ * @xprt: generic transport
+ * @req: the RPC request to write
+ *
  */
-static inline int
-xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
+static int xs_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
 {
-       struct socket   *sock = xprt->sock;
-       struct xdr_buf  *xdr = &req->rq_snd_buf;
+       struct socket *sock = xprt->sock;
+       struct xdr_buf *xdr = &req->rq_snd_buf;
        struct sockaddr *addr = NULL;
        int addrlen = 0;
-       unsigned int    skip;
-       int             result;
+       unsigned int skip;
+       int result;
 
        if (!sock)
                return -ENOTCONN;
 
-       xprt_pktdump("packet data:",
+       xs_pktdump("packet data:",
                                req->rq_svec->iov_base,
                                req->rq_svec->iov_len);
 
@@ -201,13 +198,13 @@ xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
                addr = (struct sockaddr *) &xprt->addr;
                addrlen = sizeof(xprt->addr);
        }
-       /* Dont repeat bytes */
+       /* Don't repeat bytes */
        skip = req->rq_bytes_sent;
 
        clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
-       result = xdr_sendpages(sock, addr, addrlen, xdr, skip, MSG_DONTWAIT);
+       result = xs_sendpages(sock, addr, addrlen, xdr, skip, MSG_DONTWAIT);
 
-       dprintk("RPC:      xprt_sendmsg(%d) = %d\n", xdr->len - skip, result);
+       dprintk("RPC:      xs_sendmsg(%d) = %d\n", xdr->len - skip, result);
 
        if (result >= 0)
                return result;
@@ -215,8 +212,7 @@ xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
        switch (result) {
        case -ECONNREFUSED:
                /* When the server has died, an ICMP port unreachable message
-                * prompts ECONNREFUSED.
-                */
+                * prompts ECONNREFUSED. */
        case -EAGAIN:
                break;
        case -ECONNRESET:
@@ -227,13 +223,25 @@ xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
                        result = -ENOTCONN;
                break;
        default:
-               printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
+               break;
        }
        return result;
 }
 
-static int
-xprt_send_request(struct rpc_task *task)
+/**
+ * xs_send_request - write an RPC request to a socket
+ * @task: address of RPC task that manages the state of an RPC request
+ *
+ * Return values:
+ *      0:  The request has been sent
+ * EAGAIN:  The socket was blocked, please call again later to
+ *          complete the request
+ *  other:  Some other error occured, the request was not sent
+ *
+ * XXX: In the case of soft timeouts, should we eventually give up
+ *      if the socket is not able to make progress?
+ */
+static int xs_send_request(struct rpc_task *task)
 {
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
@@ -242,18 +250,18 @@ xprt_send_request(struct rpc_task *task)
        /* set up everything as needed. */
        /* Write the record marker */
        if (xprt->stream) {
-               u32     *marker = req->rq_svec[0].iov_base;
+               u32 *marker = req->rq_svec[0].iov_base;
 
                *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
        }
 
        /* Continue transmitting the packet/record. We must be careful
         * to cope with writespace callbacks arriving _after_ we have
-        * called xprt_sendmsg().
+        * called sendmsg().
         */
        while (1) {
                req->rq_xtime = jiffies;
-               status = xprt_sendmsg(xprt, req);
+               status = xs_sendmsg(xprt, req);
 
                if (status < 0)
                        break;
@@ -285,7 +293,7 @@ xprt_send_request(struct rpc_task *task)
 
        if (status == -EAGAIN) {
                if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
-                       /* Protect against races with xprt_write_space */
+                       /* Protect against races with xs_write_space */
                        spin_lock_bh(&xprt->sock_lock);
                        /* Don't race with disconnect */
                        if (!xprt_connected(xprt))
@@ -303,65 +311,77 @@ xprt_send_request(struct rpc_task *task)
        return status;
 }
 
-/*
- * Close down a transport socket
+/**
+ * xs_close - close a socket
+ * @xprt: transport
+ *
  */
-static void
-xprt_close(struct rpc_xprt *xprt)
+static void xs_close(struct rpc_xprt *xprt)
 {
-       struct socket   *sock = xprt->sock;
-       struct sock     *sk = xprt->inet;
+       struct socket *sock = xprt->sock;
+       struct sock *sk = xprt->inet;
 
        if (!sk)
                return;
 
+       dprintk("RPC:      xs_close xprt %p\n", xprt);
+
        write_lock_bh(&sk->sk_callback_lock);
        xprt->inet = NULL;
        xprt->sock = NULL;
 
-       sk->sk_user_data    = NULL;
-       sk->sk_data_ready   = xprt->old_data_ready;
+       sk->sk_user_data = NULL;
+       sk->sk_data_ready = xprt->old_data_ready;
        sk->sk_state_change = xprt->old_state_change;
-       sk->sk_write_space  = xprt->old_write_space;
+       sk->sk_write_space = xprt->old_write_space;
        write_unlock_bh(&sk->sk_callback_lock);
 
-       sk->sk_no_check  = 0;
+       sk->sk_no_check = 0;
 
        sock_release(sock);
 }
 
-static void xprt_socket_destroy(struct rpc_xprt *xprt)
+/**
+ * xs_destroy - prepare to shutdown a transport
+ * @xprt: doomed transport
+ *
+ */
+static void xs_destroy(struct rpc_xprt *xprt)
 {
+       dprintk("RPC:      xs_destroy xprt %p\n", xprt);
+
        cancel_delayed_work(&xprt->sock_connect);
        flush_scheduled_work();
 
        xprt_disconnect(xprt);
-       xprt_close(xprt);
+       xs_close(xprt);
        kfree(xprt->slot);
 }
 
-/*
- * Input handler for RPC replies. Called from a bottom half and hence
- * atomic.
+static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
+{
+       return (struct rpc_xprt *) sk->sk_user_data;
+}
+
+/**
+ * xs_udp_data_ready - "data ready" callback for UDP sockets
+ * @sk: socket with data to read
+ * @len: how much data to read
+ *
  */
-static void
-udp_data_ready(struct sock *sk, int len)
+static void xs_udp_data_ready(struct sock *sk, int len)
 {
-       struct rpc_task *task;
-       struct rpc_xprt *xprt;
+       struct rpc_task *task;
+       struct rpc_xprt *xprt;
        struct rpc_rqst *rovr;
-       struct sk_buff  *skb;
+       struct sk_buff *skb;
        int err, repsize, copied;
        u32 _xid, *xp;
 
        read_lock(&sk->sk_callback_lock);
-       dprintk("RPC:      udp_data_ready...\n");
-       if (!(xprt = xprt_from_sock(sk))) {
-               printk("RPC:      udp_data_ready request not found!\n");
+       dprintk("RPC:      xs_udp_data_ready...\n");
+       if (!(xprt = xprt_from_sock(sk)))
                goto out;
-       }
-
-       dprintk("RPC:      udp_data_ready client %p\n", xprt);
 
        if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
                goto out;
@@ -371,7 +391,7 @@ udp_data_ready(struct sock *sk, int len)
 
        repsize = skb->len - sizeof(struct udphdr);
        if (repsize < 4) {
-               printk("RPC: impossible RPC reply size %d!\n", repsize);
+               dprintk("RPC:      impossible RPC reply size %d!\n", repsize);
                goto dropit;
        }
 
@@ -410,11 +430,7 @@ udp_data_ready(struct sock *sk, int len)
        read_unlock(&sk->sk_callback_lock);
 }
 
-/*
- * Copy from an skb into memory and shrink the skb.
- */
-static inline size_t
-tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
+static inline size_t xs_tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
 {
        if (len > desc->count)
                len = desc->count;
@@ -430,18 +446,14 @@ tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
        return len;
 }
 
-/*
- * TCP read fragment marker
- */
-static inline void
-tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
+static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
 {
        size_t len, used;
        char *p;
 
        p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
        len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
-       used = tcp_copy_data(desc, p, len);
+       used = xs_tcp_copy_data(desc, p, len);
        xprt->tcp_offset += used;
        if (used != len)
                return;
@@ -455,15 +467,15 @@ tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
        xprt->tcp_offset = 0;
        /* Sanity check of the record length */
        if (xprt->tcp_reclen < 4) {
-               printk(KERN_ERR "RPC: Invalid TCP record fragment length\n");
+               dprintk("RPC:      invalid TCP record fragment length\n");
                xprt_disconnect(xprt);
+               return;
        }
        dprintk("RPC:      reading TCP record fragment of length %d\n",
                        xprt->tcp_reclen);
 }
 
-static void
-tcp_check_recm(struct rpc_xprt *xprt)
+static void xs_tcp_check_recm(struct rpc_xprt *xprt)
 {
        dprintk("RPC:      xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n",
                        xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_flags);
@@ -478,11 +490,7 @@ tcp_check_recm(struct rpc_xprt *xprt)
        }
 }
 
-/*
- * TCP read xid
- */
-static inline void
-tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
+static inline void xs_tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
 {
        size_t len, used;
        char *p;
@@ -490,7 +498,7 @@ tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
        len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
        dprintk("RPC:      reading XID (%Zu bytes)\n", len);
        p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
-       used = tcp_copy_data(desc, p, len);
+       used = xs_tcp_copy_data(desc, p, len);
        xprt->tcp_offset += used;
        if (used != len)
                return;
@@ -499,14 +507,10 @@ tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
        xprt->tcp_copied = 4;
        dprintk("RPC:      reading reply for XID %08x\n",
                                                ntohl(xprt->tcp_xid));
-       tcp_check_recm(xprt);
+       xs_tcp_check_recm(xprt);
 }
 
-/*
- * TCP read and complete request
- */
-static inline void
-tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
+static inline void xs_tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
 {
        struct rpc_rqst *req;
        struct xdr_buf *rcvbuf;
@@ -533,12 +537,12 @@ tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
                memcpy(&my_desc, desc, sizeof(my_desc));
                my_desc.count = len;
                r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
-                                         &my_desc, tcp_copy_data);
+                                         &my_desc, xs_tcp_copy_data);
                desc->count -= r;
                desc->offset += r;
        } else
                r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
-                                         desc, tcp_copy_data);
+                                         desc, xs_tcp_copy_data);
 
        if (r > 0) {
                xprt->tcp_copied += r;
@@ -581,14 +585,10 @@ out:
                xprt_complete_rqst(xprt, req, xprt->tcp_copied);
        }
        spin_unlock(&xprt->sock_lock);
-       tcp_check_recm(xprt);
+       xs_tcp_check_recm(xprt);
 }
 
-/*
- * TCP discard extra bytes from a short read
- */
-static inline void
-tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
+static inline void xs_tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
 {
        size_t len;
 
@@ -599,16 +599,10 @@ tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
        desc->offset += len;
        xprt->tcp_offset += len;
        dprintk("RPC:      discarded %Zu bytes\n", len);
-       tcp_check_recm(xprt);
+       xs_tcp_check_recm(xprt);
 }
 
-/*
- * TCP record receive routine
- * We first have to grab the record marker, then the XID, then the data.
- */
-static int
-tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
-               unsigned int offset, size_t len)
+static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
 {
        struct rpc_xprt *xprt = rd_desc->arg.data;
        skb_reader_t desc = {
@@ -616,64 +610,72 @@ tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
                .offset = offset,
                .count  = len,
                .csum   = 0
-               };
+       };
 
-       dprintk("RPC:      tcp_data_recv\n");
+       dprintk("RPC:      xs_tcp_data_recv started\n");
        do {
                /* Read in a new fragment marker if necessary */
                /* Can we ever really expect to get completely empty fragments? */
                if (xprt->tcp_flags & XPRT_COPY_RECM) {
-                       tcp_read_fraghdr(xprt, &desc);
+                       xs_tcp_read_fraghdr(xprt, &desc);
                        continue;
                }
                /* Read in the xid if necessary */
                if (xprt->tcp_flags & XPRT_COPY_XID) {
-                       tcp_read_xid(xprt, &desc);
+                       xs_tcp_read_xid(xprt, &desc);
                        continue;
                }
                /* Read in the request data */
                if (xprt->tcp_flags & XPRT_COPY_DATA) {
-                       tcp_read_request(xprt, &desc);
+                       xs_tcp_read_request(xprt, &desc);
                        continue;
                }
                /* Skip over any trailing bytes on short reads */
-               tcp_read_discard(xprt, &desc);
+               xs_tcp_read_discard(xprt, &desc);
        } while (desc.count);
-       dprintk("RPC:      tcp_data_recv done\n");
+       dprintk("RPC:      xs_tcp_data_recv done\n");
        return len - desc.count;
 }
 
-static void tcp_data_ready(struct sock *sk, int bytes)
+/**
+ * xs_tcp_data_ready - "data ready" callback for TCP sockets
+ * @sk: socket with data to read
+ * @bytes: how much data to read
+ *
+ */
+static void xs_tcp_data_ready(struct sock *sk, int bytes)
 {
        struct rpc_xprt *xprt;
        read_descriptor_t rd_desc;
 
        read_lock(&sk->sk_callback_lock);
-       dprintk("RPC:      tcp_data_ready...\n");
-       if (!(xprt = xprt_from_sock(sk))) {
-               printk("RPC:      tcp_data_ready socket info not found!\n");
+       dprintk("RPC:      xs_tcp_data_ready...\n");
+       if (!(xprt = xprt_from_sock(sk)))
                goto out;
-       }
        if (xprt->shutdown)
                goto out;
 
-       /* We use rd_desc to pass struct xprt to tcp_data_recv */
+       /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
        rd_desc.arg.data = xprt;
        rd_desc.count = 65536;
-       tcp_read_sock(sk, &rd_desc, tcp_data_recv);
+       tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
 out:
        read_unlock(&sk->sk_callback_lock);
 }
 
-static void
-tcp_state_change(struct sock *sk)
+/**
+ * xs_tcp_state_change - callback to handle TCP socket state changes
+ * @sk: socket whose state has changed
+ *
+ */
+static void xs_tcp_state_change(struct sock *sk)
 {
-       struct rpc_xprt *xprt;
+       struct rpc_xprt *xprt;
 
        read_lock(&sk->sk_callback_lock);
        if (!(xprt = xprt_from_sock(sk)))
                goto out;
-       dprintk("RPC:      tcp_state_change client %p...\n", xprt);
+       dprintk("RPC:      xs_tcp_state_change client %p...\n", xprt);
        dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
                                sk->sk_state, xprt_connected(xprt),
                                sock_flag(sk, SOCK_DEAD),
@@ -703,17 +705,20 @@ tcp_state_change(struct sock *sk)
        read_unlock(&sk->sk_callback_lock);
 }
 
-/*
+/**
+ * xs_write_space - callback invoked when socket buffer space becomes
+ *                         available
+ * @sk: socket whose state has changed
+ *
  * Called when more output buffer space is available for this socket.
  * We try not to wake our writers until they can make "significant"
  * progress, otherwise we'll waste resources thrashing sock_sendmsg
  * with a bunch of small requests.
  */
-static void
-xprt_write_space(struct sock *sk)
+static void xs_write_space(struct sock *sk)
 {
-       struct rpc_xprt *xprt;
-       struct socket   *sock;
+       struct rpc_xprt *xprt;
+       struct socket *sock;
 
        read_lock(&sk->sk_callback_lock);
        if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->sk_socket))
@@ -743,11 +748,15 @@ out:
        read_unlock(&sk->sk_callback_lock);
 }
 
-/*
- * Set socket buffer length
+/**
+ * xs_set_buffer_size - set send and receive limits
+ * @xprt: generic transport
+ *
+ * Set socket send and receive limits based on the
+ * sndsize and rcvsize fields in the generic transport
+ * structure. This applies only to UDP sockets.
  */
-static void
-xprt_sock_setbufsize(struct rpc_xprt *xprt)
+static void xs_set_buffer_size(struct rpc_xprt *xprt)
 {
        struct sock *sk = xprt->inet;
 
@@ -764,15 +773,12 @@ xprt_sock_setbufsize(struct rpc_xprt *xprt)
        }
 }
 
-/*
- * Bind to a reserved port
- */
-static inline int xprt_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
+static int xs_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
 {
        struct sockaddr_in myaddr = {
                .sin_family = AF_INET,
        };
-       int             err, port;
+       int err, port;
 
        /* Were we already bound to a given port? Try to reuse it */
        port = xprt->port;
@@ -782,20 +788,47 @@ static inline int xprt_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
                                                sizeof(myaddr));
                if (err == 0) {
                        xprt->port = port;
+                       dprintk("RPC:      xs_bindresvport bound to port %u\n",
+                                       port);
                        return 0;
                }
                if (--port == 0)
-                       port = XPRT_MAX_RESVPORT;
+                       port = XS_MAX_RESVPORT;
        } while (err == -EADDRINUSE && port != xprt->port);
 
-       printk("RPC: Can't bind to reserved port (%d).\n", -err);
+       dprintk("RPC:      can't bind to reserved port (%d).\n", -err);
        return err;
 }
 
-static void
-xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
+static struct socket *xs_create(struct rpc_xprt *xprt, int proto, int resvport)
 {
-       struct sock     *sk = sock->sk;
+       struct socket *sock;
+       int type, err;
+
+       dprintk("RPC:      xs_create(%s %d)\n",
+                          (proto == IPPROTO_UDP)? "udp" : "tcp", proto);
+
+       type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
+
+       if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) {
+               dprintk("RPC:      can't create socket (%d).\n", -err);
+               return NULL;
+       }
+
+       /* If the caller has the capability, bind to a reserved port */
+       if (resvport && xs_bindresvport(xprt, sock) < 0)
+               goto failed;
+
+       return sock;
+
+failed:
+       sock_release(sock);
+       return NULL;
+}
+
+static void xs_bind(struct rpc_xprt *xprt, struct socket *sock)
+{
+       struct sock *sk = sock->sk;
 
        if (xprt->inet)
                return;
@@ -806,16 +839,16 @@ xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
        xprt->old_state_change = sk->sk_state_change;
        xprt->old_write_space = sk->sk_write_space;
        if (xprt->prot == IPPROTO_UDP) {
-               sk->sk_data_ready = udp_data_ready;
+               sk->sk_data_ready = xs_udp_data_ready;
                sk->sk_no_check = UDP_CSUM_NORCV;
                xprt_set_connected(xprt);
        } else {
                tcp_sk(sk)->nonagle = 1;        /* disable Nagle's algorithm */
-               sk->sk_data_ready = tcp_data_ready;
-               sk->sk_state_change = tcp_state_change;
+               sk->sk_data_ready = xs_tcp_data_ready;
+               sk->sk_state_change = xs_tcp_state_change;
                xprt_clear_connected(xprt);
        }
-       sk->sk_write_space = xprt_write_space;
+       sk->sk_write_space = xs_write_space;
 
        /* Reset to new socket */
        xprt->sock = sock;
@@ -825,39 +858,13 @@ xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
        return;
 }
 
-/*
- * Datastream sockets are created here, but xprt_connect will create
- * and connect stream sockets.
+/**
+ * xs_connect_worker - try to connect a socket to a remote endpoint
+ * @args: RPC transport to connect
+ *
+ * Invoked by a work queue tasklet.
  */
-static struct socket * xprt_create_socket(struct rpc_xprt *xprt, int proto, int resvport)
-{
-       struct socket   *sock;
-       int             type, err;
-
-       dprintk("RPC:      xprt_create_socket(%s %d)\n",
-                          (proto == IPPROTO_UDP)? "udp" : "tcp", proto);
-
-       type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
-
-       if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) {
-               printk("RPC: can't create socket (%d).\n", -err);
-               return NULL;
-       }
-
-       /* If the caller has the capability, bind to a reserved port */
-       if (resvport && xprt_bindresvport(xprt, sock) < 0) {
-               printk("RPC: can't bind to reserved port.\n");
-               goto failed;
-       }
-
-       return sock;
-
-failed:
-       sock_release(sock);
-       return NULL;
-}
-
-static void xprt_socket_connect(void *args)
+static void xs_connect_worker(void *args)
 {
        struct rpc_xprt *xprt = (struct rpc_xprt *)args;
        struct socket *sock = xprt->sock;
@@ -866,18 +873,20 @@ static void xprt_socket_connect(void *args)
        if (xprt->shutdown || xprt->addr.sin_port == 0)
                goto out;
 
+       dprintk("RPC:      xs_connect_worker xprt %p\n", xprt);
+
        /*
         * Start by resetting any existing state
         */
-       xprt_close(xprt);
-       sock = xprt_create_socket(xprt, xprt->prot, xprt->resvport);
+       xs_close(xprt);
+       sock = xs_create(xprt, xprt->prot, xprt->resvport);
        if (sock == NULL) {
                /* couldn't create socket or bind to reserved port;
                 * this is likely a permanent error, so cause an abort */
                goto out;
        }
-       xprt_bind_socket(xprt, sock);
-       xprt_sock_setbufsize(xprt);
+       xs_bind(xprt, sock);
+       xs_set_buffer_size(xprt);
 
        status = 0;
        if (!xprt->stream)
@@ -908,20 +917,23 @@ out_clear:
        smp_mb__after_clear_bit();
 }
 
-static void
-xprt_connect_sock(struct rpc_task *task)
+/**
+ * xs_connect - connect a socket to a remote endpoint
+ * @task: address of RPC task that manages state of connect request
+ *
+ * TCP: If the remote end dropped the connection, delay reconnecting.
+ */
+static void xs_connect(struct rpc_task *task)
 {
        struct rpc_xprt *xprt = task->tk_xprt;
 
        if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate)) {
-               /* Note: if we are here due to a dropped connection
-                *       we delay reconnecting by RPC_REESTABLISH_TIMEOUT/HZ
-                *       seconds
-                */
-               if (xprt->sock != NULL)
+               if (xprt->sock != NULL) {
+                       dprintk("RPC:      xs_connect delayed xprt %p\n", xprt);
                        schedule_delayed_work(&xprt->sock_connect,
                                        RPC_REESTABLISH_TIMEOUT);
-               else {
+               } else {
+                       dprintk("RPC:      xs_connect scheduled xprt %p\n", xprt);
                        schedule_work(&xprt->sock_connect);
                        /* flush_scheduled_work can sleep... */
                        if (!RPC_IS_ASYNC(task))
@@ -930,29 +942,23 @@ xprt_connect_sock(struct rpc_task *task)
        }
 }
 
-/*
- * Set default timeout parameters
- */
-static void
-xprt_default_timeout(struct rpc_timeout *to, int proto)
-{
-       if (proto == IPPROTO_UDP)
-               xprt_set_timeout(to, 5,  5 * HZ);
-       else
-               xprt_set_timeout(to, 2, 60 * HZ);
-}
-
-static struct rpc_xprt_ops xprt_socket_ops = {
-       .set_buffer_size        = xprt_sock_setbufsize,
-       .connect                = xprt_connect_sock,
-       .send_request           = xprt_send_request,
-       .close                  = xprt_close,
-       .destroy                = xprt_socket_destroy,
+static struct rpc_xprt_ops xs_ops = {
+       .set_buffer_size        = xs_set_buffer_size,
+       .connect                = xs_connect,
+       .send_request           = xs_send_request,
+       .close                  = xs_close,
+       .destroy                = xs_destroy,
 };
 
 extern unsigned int xprt_udp_slot_table_entries;
 extern unsigned int xprt_tcp_slot_table_entries;
 
+/**
+ * xs_setup_udp - Set up transport to use a UDP socket
+ * @xprt: transport to set up
+ * @to:   timeout parameters
+ *
+ */
 int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
 {
        size_t slot_table_size;
@@ -967,7 +973,7 @@ int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
        memset(xprt->slot, 0, slot_table_size);
 
        xprt->prot = IPPROTO_UDP;
-       xprt->port = XPRT_MAX_RESVPORT;
+       xprt->port = XS_MAX_RESVPORT;
        xprt->stream = 0;
        xprt->nocong = 0;
        xprt->cwnd = RPC_INITCWND;
@@ -975,18 +981,24 @@ int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
        /* XXX: header size can vary due to auth type, IPv6, etc. */
        xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
 
-       INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt);
+       INIT_WORK(&xprt->sock_connect, xs_connect_worker, xprt);
 
-       xprt->ops = &xprt_socket_ops;
+       xprt->ops = &xs_ops;
 
        if (to)
                xprt->timeout = *to;
        else
-               xprt_default_timeout(to, xprt->prot);
+               xprt_set_timeout(&xprt->timeout, 5, 5 * HZ);
 
        return 0;
 }
 
+/**
+ * xs_setup_tcp - Set up transport to use a TCP socket
+ * @xprt: transport to set up
+ * @to: timeout parameters
+ *
+ */
 int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to)
 {
        size_t slot_table_size;
@@ -1001,21 +1013,21 @@ int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to)
        memset(xprt->slot, 0, slot_table_size);
 
        xprt->prot = IPPROTO_TCP;
-       xprt->port = XPRT_MAX_RESVPORT;
+       xprt->port = XS_MAX_RESVPORT;
        xprt->stream = 1;
        xprt->nocong = 1;
        xprt->cwnd = RPC_MAXCWND(xprt);
        xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
        xprt->max_payload = (1U << 31) - 1;
 
-       INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt);
+       INIT_WORK(&xprt->sock_connect, xs_connect_worker, xprt);
 
-       xprt->ops = &xprt_socket_ops;
+       xprt->ops = &xs_ops;
 
        if (to)
                xprt->timeout = *to;
        else
-               xprt_default_timeout(to, xprt->prot);
+               xprt_set_timeout(&xprt->timeout, 2, 60 * HZ);
 
        return 0;
 }