nfs41: New xs_tcp_read_data()
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index b9b94f4..e3e3a57 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -3,8 +3,8 @@
  *
  * Client-side transport implementation for sockets.
  *
- * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
- * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
+ * TCP callback races fixes (C) 1998 Red Hat
+ * TCP send fixes (C) 1998 Red Hat
  * TCP NFS related read + write fixes
  *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
  *
@@ -34,6 +34,9 @@
 #include <linux/sunrpc/sched.h>
 #include <linux/sunrpc/xprtsock.h>
 #include <linux/file.h>
+#ifdef CONFIG_NFS_V4_1
+#include <linux/sunrpc/bc_xprt.h>
+#endif
 
 #include <net/sock.h>
 #include <net/checksum.h>
@@ -49,6 +52,9 @@ unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;
 unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
 unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
 
+#define XS_TCP_LINGER_TO       (15U * HZ)
+static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
+
 /*
  * We can register our own files under /proc/sys/sunrpc by
  * calling register_sysctl_table() again.  The files in that
@@ -117,6 +123,14 @@ static ctl_table xs_tunables_table[] = {
                .extra2         = &xprt_max_resvport_limit
        },
        {
+               .procname       = "tcp_fin_timeout",
+               .data           = &xs_tcp_fin_timeout,
+               .maxlen         = sizeof(xs_tcp_fin_timeout),
+               .mode           = 0644,
+               .proc_handler   = &proc_dointvec_jiffies,
+               .strategy       = sysctl_jiffies
+       },
+       {
                .ctl_name = 0,
        },
 };
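The new table entry exposes xs_tcp_fin_timeout, which defaults to XS_TCP_LINGER_TO (15 * HZ), as a tunable. Because the handler is proc_dointvec_jiffies, userspace reads and writes the value in seconds while the kernel stores jiffies internally. Below is a minimal userspace sketch, illustrative only and not part of the patch; the path assumes the table is registered under /proc/sys/sunrpc, as the file's own comment above describes.

/* Read the RPC client's TCP FIN linger timeout (seconds). */
#include <stdio.h>

int main(void)
{
	FILE *f = fopen("/proc/sys/sunrpc/tcp_fin_timeout", "r");
	unsigned int secs;

	if (f == NULL || fscanf(f, "%u", &secs) != 1) {
		perror("tcp_fin_timeout");
		return 1;
	}
	fclose(f);
	printf("RPC TCP FIN linger timeout: %u seconds\n", secs);
	return 0;
}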
@@ -136,12 +150,6 @@ static ctl_table sunrpc_table[] = {
 #endif
 
 /*
- * How many times to try sending a request on a socket before waiting
- * for the socket buffer to clear.
- */
-#define XS_SENDMSG_RETRY       (10U)
-
-/*
  * Time out for an RPC UDP socket connect.  UDP socket connects are
  * synchronous, but we set a timeout anyway in case of resource
  * exhaustion on the local host.
@@ -255,6 +263,7 @@ struct sock_xprt {
        void                    (*old_data_ready)(struct sock *, int);
        void                    (*old_state_change)(struct sock *);
        void                    (*old_write_space)(struct sock *);
+       void                    (*old_error_report)(struct sock *);
 };
 
 /*
@@ -264,6 +273,13 @@ struct sock_xprt {
 #define TCP_RCV_COPY_FRAGHDR   (1UL << 1)
 #define TCP_RCV_COPY_XID       (1UL << 2)
 #define TCP_RCV_COPY_DATA      (1UL << 3)
+#define TCP_RCV_READ_CALLDIR   (1UL << 4)
+#define TCP_RCV_COPY_CALLDIR   (1UL << 5)
+
+/*
+ * TCP RPC flags
+ */
+#define TCP_RPC_REPLY          (1UL << 6)
 
 static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
 {
@@ -289,8 +305,7 @@ static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt,
 
        buf = kzalloc(20, GFP_KERNEL);
        if (buf) {
-               snprintf(buf, 20, NIPQUAD_FMT,
-                               NIPQUAD(addr->sin_addr.s_addr));
+               snprintf(buf, 20, "%pI4", &addr->sin_addr.s_addr);
        }
        xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
 
@@ -305,8 +320,8 @@ static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt,
 
        buf = kzalloc(48, GFP_KERNEL);
        if (buf) {
-               snprintf(buf, 48, "addr="NIPQUAD_FMT" port=%u proto=%s",
-                       NIPQUAD(addr->sin_addr.s_addr),
+               snprintf(buf, 48, "addr=%pI4 port=%u proto=%s",
+                       &addr->sin_addr.s_addr,
                        ntohs(addr->sin_port),
                        protocol);
        }
@@ -328,8 +343,8 @@ static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt,
 
        buf = kzalloc(30, GFP_KERNEL);
        if (buf) {
-               snprintf(buf, 30, NIPQUAD_FMT".%u.%u",
-                               NIPQUAD(addr->sin_addr.s_addr),
+               snprintf(buf, 30, "%pI4.%u.%u",
+                               &addr->sin_addr.s_addr,
                                ntohs(addr->sin_port) >> 8,
                                ntohs(addr->sin_port) & 0xff);
        }
@@ -347,8 +362,7 @@ static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt,
 
        buf = kzalloc(40, GFP_KERNEL);
        if (buf) {
-               snprintf(buf, 40, NIP6_FMT,
-                               NIP6(addr->sin6_addr));
+               snprintf(buf, 40, "%pI6",&addr->sin6_addr);
        }
        xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
 
@@ -363,18 +377,17 @@ static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt,
 
        buf = kzalloc(64, GFP_KERNEL);
        if (buf) {
-               snprintf(buf, 64, "addr="NIP6_FMT" port=%u proto=%s",
-                               NIP6(addr->sin6_addr),
+               snprintf(buf, 64, "addr=%pI6 port=%u proto=%s",
+                               &addr->sin6_addr,
                                ntohs(addr->sin6_port),
                                protocol);
        }
        xprt->address_strings[RPC_DISPLAY_ALL] = buf;
 
        buf = kzalloc(36, GFP_KERNEL);
-       if (buf) {
-               snprintf(buf, 36, NIP6_SEQFMT,
-                               NIP6(addr->sin6_addr));
-       }
+       if (buf)
+               snprintf(buf, 36, "%pi6", &addr->sin6_addr);
+
        xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
 
        buf = kzalloc(8, GFP_KERNEL);
@@ -386,10 +399,10 @@ static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt,
 
        buf = kzalloc(50, GFP_KERNEL);
        if (buf) {
-               snprintf(buf, 50, NIP6_FMT".%u.%u",
-                               NIP6(addr->sin6_addr),
-                               ntohs(addr->sin6_port) >> 8,
-                               ntohs(addr->sin6_port) & 0xff);
+               snprintf(buf, 50, "%pI6.%u.%u",
+                        &addr->sin6_addr,
+                        ntohs(addr->sin6_port) >> 8,
+                        ntohs(addr->sin6_port) & 0xff);
        }
        xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
 
@@ -398,12 +411,16 @@ static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt,
 
 static void xs_free_peer_addresses(struct rpc_xprt *xprt)
 {
-       kfree(xprt->address_strings[RPC_DISPLAY_ADDR]);
-       kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
-       kfree(xprt->address_strings[RPC_DISPLAY_ALL]);
-       kfree(xprt->address_strings[RPC_DISPLAY_HEX_ADDR]);
-       kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
-       kfree(xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR]);
+       unsigned int i;
+
+       for (i = 0; i < RPC_DISPLAY_MAX; i++)
+               switch (i) {
+               case RPC_DISPLAY_PROTO:
+               case RPC_DISPLAY_NETID:
+                       continue;
+               default:
+                       kfree(xprt->address_strings[i]);
+               }
 }
 
 #define XS_SENDMSG_FLAGS       (MSG_DONTWAIT | MSG_NOSIGNAL)
@@ -471,7 +488,7 @@ static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen,
        int err, sent = 0;
 
        if (unlikely(!sock))
-               return -ENOTCONN;
+               return -ENOTSOCK;
 
        clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
        if (base != 0) {
@@ -512,35 +529,53 @@ out:
        return sent;
 }
 
+static void xs_nospace_callback(struct rpc_task *task)
+{
+       struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);
+
+       transport->inet->sk_write_pending--;
+       clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
+}
+
 /**
  * xs_nospace - place task on wait queue if transmit was incomplete
  * @task: task to put to sleep
  *
  */
-static void xs_nospace(struct rpc_task *task)
+static int xs_nospace(struct rpc_task *task)
 {
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
+       int ret = 0;
 
        dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
                        task->tk_pid, req->rq_slen - req->rq_bytes_sent,
                        req->rq_slen);
 
-       if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
-               /* Protect against races with write_space */
-               spin_lock_bh(&xprt->transport_lock);
-
-               /* Don't race with disconnect */
-               if (!xprt_connected(xprt))
-                       task->tk_status = -ENOTCONN;
-               else if (test_bit(SOCK_NOSPACE, &transport->sock->flags))
-                       xprt_wait_for_buffer_space(task);
+       /* Protect against races with write_space */
+       spin_lock_bh(&xprt->transport_lock);
+
+       /* Don't race with disconnect */
+       if (xprt_connected(xprt)) {
+               if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
+                       ret = -EAGAIN;
+                       /*
+                        * Notify TCP that we're limited by the application
+                        * window size
+                        */
+                       set_bit(SOCK_NOSPACE, &transport->sock->flags);
+                       transport->inet->sk_write_pending++;
+                       /* ...and wait for more buffer space */
+                       xprt_wait_for_buffer_space(task, xs_nospace_callback);
+               }
+       } else {
+               clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
+               ret = -ENOTCONN;
+       }
 
-               spin_unlock_bh(&xprt->transport_lock);
-       } else
-               /* Keep holding the socket if it is blocked */
-               rpc_delay(task, HZ>>4);
+       spin_unlock_bh(&xprt->transport_lock);
+       return ret;
 }
 
 /**
@@ -566,7 +601,8 @@ static int xs_udp_send_request(struct rpc_task *task)
                                req->rq_svec->iov_base,
                                req->rq_svec->iov_len);
 
-       req->rq_xtime = jiffies;
+       if (!xprt_bound(xprt))
+               return -ENOTCONN;
        status = xs_sendpages(transport->sock,
                              xs_addr(xprt),
                              xprt->addrlen, xdr,
@@ -582,23 +618,28 @@ static int xs_udp_send_request(struct rpc_task *task)
                /* Still some bytes left; set up for a retry later. */
                status = -EAGAIN;
        }
+       if (!transport->sock)
+               goto out;
 
        switch (status) {
-       case -ENETUNREACH:
-       case -EPIPE:
-       case -ECONNREFUSED:
-               /* When the server has died, an ICMP port unreachable message
-                * prompts ECONNREFUSED. */
+       case -ENOTSOCK:
+               status = -ENOTCONN;
+               /* Should we call xs_close() here? */
                break;
        case -EAGAIN:
-               xs_nospace(task);
+               status = xs_nospace(task);
                break;
        default:
                dprintk("RPC:       sendmsg returned unrecognized error %d\n",
                        -status);
-               break;
+       case -ENETUNREACH:
+       case -EPIPE:
+       case -ECONNREFUSED:
+               /* When the server has died, an ICMP port unreachable message
+                * prompts ECONNREFUSED. */
+               clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
        }
-
+out:
        return status;
 }
 
@@ -646,7 +687,6 @@ static int xs_tcp_send_request(struct rpc_task *task)
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
        struct xdr_buf *xdr = &req->rq_snd_buf;
        int status;
-       unsigned int retry = 0;
 
        xs_encode_tcp_record_marker(&req->rq_snd_buf);
 
@@ -658,7 +698,6 @@ static int xs_tcp_send_request(struct rpc_task *task)
         * to cope with writespace callbacks arriving _after_ we have
         * called sendmsg(). */
        while (1) {
-               req->rq_xtime = jiffies;
                status = xs_sendpages(transport->sock,
                                        NULL, 0, xdr, req->rq_bytes_sent);
 
@@ -677,28 +716,33 @@ static int xs_tcp_send_request(struct rpc_task *task)
                        return 0;
                }
 
+               if (status != 0)
+                       continue;
                status = -EAGAIN;
-               if (retry++ > XS_SENDMSG_RETRY)
-                       break;
+               break;
        }
+       if (!transport->sock)
+               goto out;
 
        switch (status) {
-       case -EAGAIN:
-               xs_nospace(task);
-               break;
-       case -ECONNREFUSED:
-       case -ECONNRESET:
-       case -ENOTCONN:
-       case -EPIPE:
+       case -ENOTSOCK:
                status = -ENOTCONN;
+               /* Should we call xs_close() here? */
+               break;
+       case -EAGAIN:
+               status = xs_nospace(task);
                break;
        default:
                dprintk("RPC:       sendmsg returned unrecognized error %d\n",
                        -status);
+       case -ECONNRESET:
+       case -EPIPE:
                xs_tcp_shutdown(xprt);
-               break;
+       case -ECONNREFUSED:
+       case -ENOTCONN:
+               clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
        }
-
+out:
        return status;
 }
 
@@ -729,45 +773,78 @@ out_release:
        xprt_release_xprt(xprt, task);
 }
 
-/**
- * xs_close - close a socket
- * @xprt: transport
- *
- * This is used when all requests are complete; ie, no DRC state remains
- * on the server we want to save.
- */
-static void xs_close(struct rpc_xprt *xprt)
+static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
+{
+       transport->old_data_ready = sk->sk_data_ready;
+       transport->old_state_change = sk->sk_state_change;
+       transport->old_write_space = sk->sk_write_space;
+       transport->old_error_report = sk->sk_error_report;
+}
+
+static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
+{
+       sk->sk_data_ready = transport->old_data_ready;
+       sk->sk_state_change = transport->old_state_change;
+       sk->sk_write_space = transport->old_write_space;
+       sk->sk_error_report = transport->old_error_report;
+}
+
+static void xs_reset_transport(struct sock_xprt *transport)
 {
-       struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
        struct socket *sock = transport->sock;
        struct sock *sk = transport->inet;
 
-       if (!sk)
-               goto clear_close_wait;
-
-       dprintk("RPC:       xs_close xprt %p\n", xprt);
+       if (sk == NULL)
+               return;
 
        write_lock_bh(&sk->sk_callback_lock);
        transport->inet = NULL;
        transport->sock = NULL;
 
        sk->sk_user_data = NULL;
-       sk->sk_data_ready = transport->old_data_ready;
-       sk->sk_state_change = transport->old_state_change;
-       sk->sk_write_space = transport->old_write_space;
+
+       xs_restore_old_callbacks(transport, sk);
        write_unlock_bh(&sk->sk_callback_lock);
 
        sk->sk_no_check = 0;
 
        sock_release(sock);
-clear_close_wait:
+}
+
+/**
+ * xs_close - close a socket
+ * @xprt: transport
+ *
+ * This is used when all requests are complete; ie, no DRC state remains
+ * on the server we want to save.
+ *
+ * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
+ * xs_reset_transport() zeroing the socket from underneath a writer.
+ */
+static void xs_close(struct rpc_xprt *xprt)
+{
+       struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
+
+       dprintk("RPC:       xs_close xprt %p\n", xprt);
+
+       xs_reset_transport(transport);
+
        smp_mb__before_clear_bit();
+       clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
        clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
        clear_bit(XPRT_CLOSING, &xprt->state);
        smp_mb__after_clear_bit();
        xprt_disconnect_done(xprt);
 }
 
+static void xs_tcp_close(struct rpc_xprt *xprt)
+{
+       if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state))
+               xs_close(xprt);
+       else
+               xs_tcp_shutdown(xprt);
+}
+
 /**
  * xs_destroy - prepare to shutdown a transport
  * @xprt: doomed transport
@@ -889,7 +966,7 @@ static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_rea
        transport->tcp_offset = 0;
 
        /* Sanity check of the record length */
-       if (unlikely(transport->tcp_reclen < 4)) {
+       if (unlikely(transport->tcp_reclen < 8)) {
                dprintk("RPC:       invalid TCP record fragment length\n");
                xprt_force_disconnect(xprt);
                return;
@@ -924,33 +1001,77 @@ static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_r
        if (used != len)
                return;
        transport->tcp_flags &= ~TCP_RCV_COPY_XID;
-       transport->tcp_flags |= TCP_RCV_COPY_DATA;
+       transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
        transport->tcp_copied = 4;
-       dprintk("RPC:       reading reply for XID %08x\n",
+       dprintk("RPC:       reading %s XID %08x\n",
+                       (transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
+                                                             : "request with",
                        ntohl(transport->tcp_xid));
        xs_tcp_check_fraghdr(transport);
 }
 
-static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
+static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
+                                      struct xdr_skb_reader *desc)
 {
-       struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
-       struct rpc_rqst *req;
+       size_t len, used;
+       u32 offset;
+       __be32  calldir;
+
+       /*
+        * We want transport->tcp_offset to be 8 at the end of this routine
+        * (4 bytes for the xid and 4 bytes for the call/reply flag).
+        * When this function is called for the first time,
+        * transport->tcp_offset is 4 (after having already read the xid).
+        */
+       offset = transport->tcp_offset - sizeof(transport->tcp_xid);
+       len = sizeof(calldir) - offset;
+       dprintk("RPC:       reading CALL/REPLY flag (%Zu bytes)\n", len);
+       used = xdr_skb_read_bits(desc, &calldir, len);
+       transport->tcp_offset += used;
+       if (used != len)
+               return;
+       transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
+       transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
+       transport->tcp_flags |= TCP_RCV_COPY_DATA;
+       /*
+        * We don't yet have the XDR buffer, so we will write the calldir
+        * out after we get the buffer from the 'struct rpc_rqst'
+        */
+       if (ntohl(calldir) == RPC_REPLY)
+               transport->tcp_flags |= TCP_RPC_REPLY;
+       else
+               transport->tcp_flags &= ~TCP_RPC_REPLY;
+       dprintk("RPC:       reading %s CALL/REPLY flag %08x\n",
+                       (transport->tcp_flags & TCP_RPC_REPLY) ?
+                               "reply for" : "request with", calldir);
+       xs_tcp_check_fraghdr(transport);
+}
+
+static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
+                                    struct xdr_skb_reader *desc,
+                                    struct rpc_rqst *req)
+{
+       struct sock_xprt *transport =
+                               container_of(xprt, struct sock_xprt, xprt);
        struct xdr_buf *rcvbuf;
        size_t len;
        ssize_t r;
 
-       /* Find and lock the request corresponding to this xid */
-       spin_lock(&xprt->transport_lock);
-       req = xprt_lookup_rqst(xprt, transport->tcp_xid);
-       if (!req) {
-               transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
-               dprintk("RPC:       XID %08x request not found!\n",
-                               ntohl(transport->tcp_xid));
-               spin_unlock(&xprt->transport_lock);
-               return;
+       rcvbuf = &req->rq_private_buf;
+
+       if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
+               /*
+                * Save the RPC direction in the XDR buffer
+                */
+               __be32  calldir = transport->tcp_flags & TCP_RPC_REPLY ?
+                                       htonl(RPC_REPLY) : 0;
+
+               memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
+                       &calldir, sizeof(calldir));
+               transport->tcp_copied += sizeof(calldir);
+               transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
        }
 
-       rcvbuf = &req->rq_private_buf;
        len = desc->count;
        if (len > transport->tcp_reclen - transport->tcp_offset) {
                struct xdr_skb_reader my_desc;
@@ -987,7 +1108,7 @@ static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_rea
                                "tcp_offset = %u, tcp_reclen = %u\n",
                                xprt, transport->tcp_copied,
                                transport->tcp_offset, transport->tcp_reclen);
-               goto out;
+               return;
        }
 
        dprintk("RPC:       XID %08x read %Zd bytes\n",
@@ -1003,11 +1124,125 @@ static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_rea
                        transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
        }
 
-out:
+       return;
+}
+
+/*
+ * Finds the request corresponding to the RPC xid and invokes the common
+ * tcp read code to read the data.
+ */
+static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
+                                   struct xdr_skb_reader *desc)
+{
+       struct sock_xprt *transport =
+                               container_of(xprt, struct sock_xprt, xprt);
+       struct rpc_rqst *req;
+
+       dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));
+
+       /* Find and lock the request corresponding to this xid */
+       spin_lock(&xprt->transport_lock);
+       req = xprt_lookup_rqst(xprt, transport->tcp_xid);
+       if (!req) {
+               dprintk("RPC:       XID %08x request not found!\n",
+                               ntohl(transport->tcp_xid));
+               spin_unlock(&xprt->transport_lock);
+               return -1;
+       }
+
+       xs_tcp_read_common(xprt, desc, req);
+
        if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
                xprt_complete_rqst(req->rq_task, transport->tcp_copied);
+
        spin_unlock(&xprt->transport_lock);
-       xs_tcp_check_fraghdr(transport);
+       return 0;
+}
+
+#if defined(CONFIG_NFS_V4_1)
+/*
+ * Obtains an rpc_rqst previously allocated and invokes the common
+ * tcp read code to read the data.  The result is placed in the callback
+ * queue.
+ * If we're unable to obtain the rpc_rqst we schedule the closing of the
+ * connection and return -1.
+ */
+static inline int xs_tcp_read_callback(struct rpc_xprt *xprt,
+                                      struct xdr_skb_reader *desc)
+{
+       struct sock_xprt *transport =
+                               container_of(xprt, struct sock_xprt, xprt);
+       struct rpc_rqst *req;
+
+       req = xprt_alloc_bc_request(xprt);
+       if (req == NULL) {
+               printk(KERN_WARNING "Callback slot table overflowed\n");
+               xprt_force_disconnect(xprt);
+               return -1;
+       }
+
+       req->rq_xid = transport->tcp_xid;
+       dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
+       xs_tcp_read_common(xprt, desc, req);
+
+       if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) {
+               struct svc_serv *bc_serv = xprt->bc_serv;
+
+               /*
+                * Add callback request to callback list.  The callback
+                * service sleeps on the sv_cb_waitq waiting for new
+                * requests.  Wake it up after enqueuing the
+                * request.
+                */
+               dprintk("RPC:       add callback request to list\n");
+               spin_lock(&bc_serv->sv_cb_lock);
+               list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
+               spin_unlock(&bc_serv->sv_cb_lock);
+               wake_up(&bc_serv->sv_cb_waitq);
+       }
+
+       req->rq_private_buf.len = transport->tcp_copied;
+
+       return 0;
+}
+
+static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
+                                       struct xdr_skb_reader *desc)
+{
+       struct sock_xprt *transport =
+                               container_of(xprt, struct sock_xprt, xprt);
+
+       return (transport->tcp_flags & TCP_RPC_REPLY) ?
+               xs_tcp_read_reply(xprt, desc) :
+               xs_tcp_read_callback(xprt, desc);
+}
+#else
+static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
+                                       struct xdr_skb_reader *desc)
+{
+       return xs_tcp_read_reply(xprt, desc);
+}
+#endif /* CONFIG_NFS_V4_1 */
+
+/*
+ * Read data off the transport.  This can be either an RPC_CALL or an
+ * RPC_REPLY.  Relay the processing to helper functions.
+ */
+static void xs_tcp_read_data(struct rpc_xprt *xprt,
+                                   struct xdr_skb_reader *desc)
+{
+       struct sock_xprt *transport =
+                               container_of(xprt, struct sock_xprt, xprt);
+
+       if (_xs_tcp_read_data(xprt, desc) == 0)
+               xs_tcp_check_fraghdr(transport);
+       else {
+               /*
+                * The transport_lock protects the request handling.
+                * There's no need to hold it to update the tcp_flags.
+                */
+               transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
+       }
 }
 
 static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
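The routines above are the heart of this commit: once the record header, XID, and the new call-direction word have been parsed, xs_tcp_read_data() dispatches either to xs_tcp_read_reply(), which matches an outstanding request by XID, or (under CONFIG_NFS_V4_1) to xs_tcp_read_callback(), which queues a backchannel request for the callback service. The call-direction word sits immediately after the XID on the wire, which is also why the fragment-length sanity check earlier in the file was raised from 4 to 8 bytes. The userspace sketch below of that framing is illustrative only and assumes the standard ONC RPC record-marking layout; the struct and helper names are invented for the example.

/*
 * Illustrative sketch (not part of the patch): a 4-byte record-marking
 * header, then the 4-byte XID, then the 4-byte message direction
 * (0 = CALL, 1 = REPLY).
 */
#include <arpa/inet.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

struct rpc_frag_info {
	uint32_t reclen;	/* fragment length, low 31 bits of the marker */
	int last_frag;		/* high bit of the marker */
	uint32_t xid;
	uint32_t direction;	/* 0 = CALL (backchannel), 1 = REPLY */
};

static int parse_rpc_fragment(const unsigned char *buf, size_t len,
			      struct rpc_frag_info *out)
{
	uint32_t marker, xid, dir;

	if (len < 12)		/* marker + xid + direction */
		return -1;
	memcpy(&marker, buf, 4);
	memcpy(&xid, buf + 4, 4);
	memcpy(&dir, buf + 8, 4);
	marker = ntohl(marker);
	out->last_frag = !!(marker & 0x80000000u);
	out->reclen = marker & 0x7fffffffu;
	out->xid = ntohl(xid);
	out->direction = ntohl(dir);
	return 0;
}

int main(void)
{
	/* Last fragment, 8 bytes of payload (XID + direction), a REPLY. */
	unsigned char buf[12] = {
		0x80, 0x00, 0x00, 0x08,		/* record marker */
		0xde, 0xad, 0xbe, 0xef,		/* XID */
		0x00, 0x00, 0x00, 0x01,		/* RPC_REPLY */
	};
	struct rpc_frag_info info;

	if (parse_rpc_fragment(buf, sizeof(buf), &info) == 0)
		printf("xid %08x: %s, %u-byte fragment%s\n", info.xid,
		       info.direction ? "reply" : "call", info.reclen,
		       info.last_frag ? " (last)" : "");
	return 0;
}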
@@ -1047,9 +1282,14 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
                        xs_tcp_read_xid(transport, &desc);
                        continue;
                }
+               /* Read in the call/reply flag */
+               if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) {
+                       xs_tcp_read_calldir(transport, &desc);
+                       continue;
+               }
                /* Read in the request data */
                if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
-                       xs_tcp_read_request(xprt, &desc);
+                       xs_tcp_read_data(xprt, &desc);
                        continue;
                }
                /* Skip over any trailing bytes on short reads */
@@ -1069,6 +1309,7 @@ static void xs_tcp_data_ready(struct sock *sk, int bytes)
 {
        struct rpc_xprt *xprt;
        read_descriptor_t rd_desc;
+       int read;
 
        dprintk("RPC:       xs_tcp_data_ready...\n");
 
@@ -1080,12 +1321,55 @@ static void xs_tcp_data_ready(struct sock *sk, int bytes)
 
        /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
        rd_desc.arg.data = xprt;
-       rd_desc.count = 65536;
-       tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
+       do {
+               rd_desc.count = 65536;
+               read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
+       } while (read > 0);
 out:
        read_unlock(&sk->sk_callback_lock);
 }
 
+/*
+ * Do the equivalent of linger/linger2 handling for dealing with
+ * broken servers that don't close the socket in a timely
+ * fashion
+ */
+static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt,
+               unsigned long timeout)
+{
+       struct sock_xprt *transport;
+
+       if (xprt_test_and_set_connecting(xprt))
+               return;
+       set_bit(XPRT_CONNECTION_ABORT, &xprt->state);
+       transport = container_of(xprt, struct sock_xprt, xprt);
+       queue_delayed_work(rpciod_workqueue, &transport->connect_worker,
+                          timeout);
+}
+
+static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt)
+{
+       struct sock_xprt *transport;
+
+       transport = container_of(xprt, struct sock_xprt, xprt);
+
+       if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) ||
+           !cancel_delayed_work(&transport->connect_worker))
+               return;
+       clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
+       xprt_clear_connecting(xprt);
+}
+
+static void xs_sock_mark_closed(struct rpc_xprt *xprt)
+{
+       smp_mb__before_clear_bit();
+       clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
+       clear_bit(XPRT_CLOSING, &xprt->state);
+       smp_mb__after_clear_bit();
+       /* Mark transport as closed and wake up all pending tasks */
+       xprt_disconnect_done(xprt);
+}
+
 /**
  * xs_tcp_state_change - callback to handle TCP socket state changes
  * @sk: socket whose state has changed
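The linger helpers added above give the transport its own bound, xs_tcp_fin_timeout, on how long a closing connection may wait for the peer to finish the close handshake before the connect worker aborts it; the state-change handler below schedules that timeout from TCP_FIN_WAIT1 and TCP_LAST_ACK and cancels it once TCP_CLOSE is reached. As the comment above notes, this mirrors TCP's own linger/linger2 handling. For comparison only, a userspace application can bound the FIN_WAIT2 lifetime of its own sockets with the TCP_LINGER2 socket option; the sketch below is an analogy, not what the RPC client code does.

/* Userspace analogue: cap FIN_WAIT2 lifetime after close(). */
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stdio.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void)
{
	int fd = socket(AF_INET, SOCK_STREAM, 0);
	int secs = 15;	/* mirrors the default XS_TCP_LINGER_TO of 15 * HZ */

	if (fd < 0) {
		perror("socket");
		return 1;
	}
	if (setsockopt(fd, IPPROTO_TCP, TCP_LINGER2, &secs, sizeof(secs)) < 0) {
		perror("TCP_LINGER2");
		close(fd);
		return 1;
	}
	printf("FIN_WAIT2 lifetime capped at %d seconds\n", secs);
	close(fd);
	return 0;
}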
@@ -1118,24 +1402,26 @@ static void xs_tcp_state_change(struct sock *sk)
                        transport->tcp_flags =
                                TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
 
-                       xprt_wake_pending_tasks(xprt, 0);
+                       xprt_wake_pending_tasks(xprt, -EAGAIN);
                }
                spin_unlock_bh(&xprt->transport_lock);
                break;
        case TCP_FIN_WAIT1:
                /* The client initiated a shutdown of the socket */
+               xprt->connect_cookie++;
                xprt->reestablish_timeout = 0;
                set_bit(XPRT_CLOSING, &xprt->state);
                smp_mb__before_clear_bit();
                clear_bit(XPRT_CONNECTED, &xprt->state);
                clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
                smp_mb__after_clear_bit();
+               xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
                break;
        case TCP_CLOSE_WAIT:
                /* The server initiated a shutdown of the socket */
-               set_bit(XPRT_CLOSING, &xprt->state);
                xprt_force_disconnect(xprt);
        case TCP_SYN_SENT:
+               xprt->connect_cookie++;
        case TCP_CLOSING:
                /*
                 * If the server closed down the connection, make sure that
@@ -1145,23 +1431,57 @@ static void xs_tcp_state_change(struct sock *sk)
                        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
                break;
        case TCP_LAST_ACK:
+               set_bit(XPRT_CLOSING, &xprt->state);
+               xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
                smp_mb__before_clear_bit();
                clear_bit(XPRT_CONNECTED, &xprt->state);
                smp_mb__after_clear_bit();
                break;
        case TCP_CLOSE:
-               smp_mb__before_clear_bit();
-               clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
-               clear_bit(XPRT_CLOSING, &xprt->state);
-               smp_mb__after_clear_bit();
-               /* Mark transport as closed and wake up all pending tasks */
-               xprt_disconnect_done(xprt);
+               xs_tcp_cancel_linger_timeout(xprt);
+               xs_sock_mark_closed(xprt);
        }
  out:
        read_unlock(&sk->sk_callback_lock);
 }
 
 /**
+ * xs_error_report - callback mainly for catching socket errors
+ * @sk: socket
+ */
+static void xs_error_report(struct sock *sk)
+{
+       struct rpc_xprt *xprt;
+
+       read_lock(&sk->sk_callback_lock);
+       if (!(xprt = xprt_from_sock(sk)))
+               goto out;
+       dprintk("RPC:       %s client %p...\n"
+                       "RPC:       error %d\n",
+                       __func__, xprt, sk->sk_err);
+       xprt_wake_pending_tasks(xprt, -EAGAIN);
+out:
+       read_unlock(&sk->sk_callback_lock);
+}
+
+static void xs_write_space(struct sock *sk)
+{
+       struct socket *sock;
+       struct rpc_xprt *xprt;
+
+       if (unlikely(!(sock = sk->sk_socket)))
+               return;
+       clear_bit(SOCK_NOSPACE, &sock->flags);
+
+       if (unlikely(!(xprt = xprt_from_sock(sk))))
+               return;
+       if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
+               return;
+
+       xprt_write_space(xprt);
+}
+
+/**
  * xs_udp_write_space - callback invoked when socket buffer space
  *                             becomes available
  * @sk: socket whose state has changed
@@ -1176,21 +1496,9 @@ static void xs_udp_write_space(struct sock *sk)
        read_lock(&sk->sk_callback_lock);
 
        /* from net/core/sock.c:sock_def_write_space */
-       if (sock_writeable(sk)) {
-               struct socket *sock;
-               struct rpc_xprt *xprt;
-
-               if (unlikely(!(sock = sk->sk_socket)))
-                       goto out;
-               if (unlikely(!(xprt = xprt_from_sock(sk))))
-                       goto out;
-               if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
-                       goto out;
+       if (sock_writeable(sk))
+               xs_write_space(sk);
 
-               xprt_write_space(xprt);
-       }
-
- out:
        read_unlock(&sk->sk_callback_lock);
 }
 
@@ -1209,21 +1517,9 @@ static void xs_tcp_write_space(struct sock *sk)
        read_lock(&sk->sk_callback_lock);
 
        /* from net/core/stream.c:sk_stream_write_space */
-       if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
-               struct socket *sock;
-               struct rpc_xprt *xprt;
-
-               if (unlikely(!(sock = sk->sk_socket)))
-                       goto out;
-               if (unlikely(!(xprt = xprt_from_sock(sk))))
-                       goto out;
-               if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
-                       goto out;
+       if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk))
+               xs_write_space(sk);
 
-               xprt_write_space(xprt);
-       }
-
- out:
        read_unlock(&sk->sk_callback_lock);
 }
 
@@ -1354,8 +1650,8 @@ static int xs_bind4(struct sock_xprt *transport, struct socket *sock)
                if (port > last)
                        nloop++;
        } while (err == -EADDRINUSE && nloop != 2);
-       dprintk("RPC:       %s "NIPQUAD_FMT":%u: %s (%d)\n",
-                       __FUNCTION__, NIPQUAD(myaddr.sin_addr),
+       dprintk("RPC:       %s %pI4:%u: %s (%d)\n",
+                       __func__, &myaddr.sin_addr,
                        port, err ? "failed" : "ok", err);
        return err;
 }
@@ -1387,8 +1683,8 @@ static int xs_bind6(struct sock_xprt *transport, struct socket *sock)
                if (port > last)
                        nloop++;
        } while (err == -EADDRINUSE && nloop != 2);
-       dprintk("RPC:       xs_bind6 "NIP6_FMT":%u: %s (%d)\n",
-               NIP6(myaddr.sin6_addr), port, err ? "failed" : "ok", err);
+       dprintk("RPC:       xs_bind6 %pI6:%u: %s (%d)\n",
+               &myaddr.sin6_addr, port, err ? "failed" : "ok", err);
        return err;
 }
 
@@ -1432,12 +1728,12 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 
                write_lock_bh(&sk->sk_callback_lock);
 
+               xs_save_old_callbacks(transport, sk);
+
                sk->sk_user_data = xprt;
-               transport->old_data_ready = sk->sk_data_ready;
-               transport->old_state_change = sk->sk_state_change;
-               transport->old_write_space = sk->sk_write_space;
                sk->sk_data_ready = xs_udp_data_ready;
                sk->sk_write_space = xs_udp_write_space;
+               sk->sk_error_report = xs_error_report;
                sk->sk_no_check = UDP_CSUM_NORCV;
                sk->sk_allocation = GFP_ATOMIC;
 
@@ -1466,13 +1762,14 @@ static void xs_udp_connect_worker4(struct work_struct *work)
        struct socket *sock = transport->sock;
        int err, status = -EIO;
 
-       if (xprt->shutdown || !xprt_bound(xprt))
+       if (xprt->shutdown)
                goto out;
 
        /* Start by resetting any existing state */
-       xs_close(xprt);
+       xs_reset_transport(transport);
 
-       if ((err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
+       err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock);
+       if (err < 0) {
                dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
                goto out;
        }
@@ -1489,8 +1786,8 @@ static void xs_udp_connect_worker4(struct work_struct *work)
        xs_udp_finish_connecting(xprt, sock);
        status = 0;
 out:
-       xprt_wake_pending_tasks(xprt, status);
        xprt_clear_connecting(xprt);
+       xprt_wake_pending_tasks(xprt, status);
 }
 
 /**
@@ -1507,13 +1804,14 @@ static void xs_udp_connect_worker6(struct work_struct *work)
        struct socket *sock = transport->sock;
        int err, status = -EIO;
 
-       if (xprt->shutdown || !xprt_bound(xprt))
+       if (xprt->shutdown)
                goto out;
 
        /* Start by resetting any existing state */
-       xs_close(xprt);
+       xs_reset_transport(transport);
 
-       if ((err = sock_create_kern(PF_INET6, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
+       err = sock_create_kern(PF_INET6, SOCK_DGRAM, IPPROTO_UDP, &sock);
+       if (err < 0) {
                dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
                goto out;
        }
@@ -1530,18 +1828,17 @@ static void xs_udp_connect_worker6(struct work_struct *work)
        xs_udp_finish_connecting(xprt, sock);
        status = 0;
 out:
-       xprt_wake_pending_tasks(xprt, status);
        xprt_clear_connecting(xprt);
+       xprt_wake_pending_tasks(xprt, status);
 }
 
 /*
  * We need to preserve the port number so the reply cache on the server can
  * find our cached RPC replies when we get around to reconnecting.
  */
-static void xs_tcp_reuse_connection(struct rpc_xprt *xprt)
+static void xs_abort_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
 {
        int result;
-       struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
        struct sockaddr any;
 
        dprintk("RPC:       disconnecting xprt %p to reuse port\n", xprt);
@@ -1553,11 +1850,24 @@ static void xs_tcp_reuse_connection(struct rpc_xprt *xprt)
        memset(&any, 0, sizeof(any));
        any.sa_family = AF_UNSPEC;
        result = kernel_connect(transport->sock, &any, sizeof(any), 0);
-       if (result)
+       if (!result)
+               xs_sock_mark_closed(xprt);
+       else
                dprintk("RPC:       AF_UNSPEC connect return code %d\n",
                                result);
 }
 
+static void xs_tcp_reuse_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
+{
+       unsigned int state = transport->inet->sk_state;
+
+       if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED)
+               return;
+       if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT))
+               return;
+       xs_abort_connection(xprt, transport);
+}
+
 static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 {
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
@@ -1567,13 +1877,13 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 
                write_lock_bh(&sk->sk_callback_lock);
 
+               xs_save_old_callbacks(transport, sk);
+
                sk->sk_user_data = xprt;
-               transport->old_data_ready = sk->sk_data_ready;
-               transport->old_state_change = sk->sk_state_change;
-               transport->old_write_space = sk->sk_write_space;
                sk->sk_data_ready = xs_tcp_data_ready;
                sk->sk_state_change = xs_tcp_state_change;
                sk->sk_write_space = xs_tcp_write_space;
+               sk->sk_error_report = xs_error_report;
                sk->sk_allocation = GFP_ATOMIC;
 
                /* socket options */
@@ -1591,6 +1901,9 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
                write_unlock_bh(&sk->sk_callback_lock);
        }
 
+       if (!xprt_bound(xprt))
+               return -ENOTCONN;
+
        /* Tell the socket layer to start connecting... */
        xprt->stat.connect_count++;
        xprt->stat.connect_start = jiffies;
@@ -1598,37 +1911,42 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 }
 
 /**
- * xs_tcp_connect_worker4 - connect a TCP socket to a remote endpoint
- * @work: RPC transport to connect
+ * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
+ * @xprt: RPC transport to connect
+ * @transport: socket transport to connect
+ * @create_sock: function to create a socket of the correct type
  *
  * Invoked by a work queue tasklet.
  */
-static void xs_tcp_connect_worker4(struct work_struct *work)
+static void xs_tcp_setup_socket(struct rpc_xprt *xprt,
+               struct sock_xprt *transport,
+               struct socket *(*create_sock)(struct rpc_xprt *,
+                       struct sock_xprt *))
 {
-       struct sock_xprt *transport =
-               container_of(work, struct sock_xprt, connect_worker.work);
-       struct rpc_xprt *xprt = &transport->xprt;
        struct socket *sock = transport->sock;
-       int err, status = -EIO;
+       int status = -EIO;
 
-       if (xprt->shutdown || !xprt_bound(xprt))
+       if (xprt->shutdown)
                goto out;
 
        if (!sock) {
-               /* start from scratch */
-               if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
-                       dprintk("RPC:       can't create TCP transport socket (%d).\n", -err);
+               clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
+               sock = create_sock(xprt, transport);
+               if (IS_ERR(sock)) {
+                       status = PTR_ERR(sock);
                        goto out;
                }
-               xs_reclassify_socket4(sock);
+       } else {
+               int abort_and_exit;
 
-               if (xs_bind4(transport, sock) < 0) {
-                       sock_release(sock);
-                       goto out;
-               }
-       } else
+               abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT,
+                               &xprt->state);
                /* "close" the socket, preserving the local port */
-               xs_tcp_reuse_connection(xprt);
+               xs_tcp_reuse_connection(xprt, transport);
+
+               if (abort_and_exit)
+                       goto out_eagain;
+       }
 
        dprintk("RPC:       worker connecting xprt %p to address: %s\n",
                        xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
@@ -1637,83 +1955,109 @@ static void xs_tcp_connect_worker4(struct work_struct *work)
        dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
                        xprt, -status, xprt_connected(xprt),
                        sock->sk->sk_state);
-       if (status < 0) {
-               switch (status) {
-                       case -EINPROGRESS:
-                       case -EALREADY:
-                               goto out_clear;
-                       case -ECONNREFUSED:
-                       case -ECONNRESET:
-                               /* retry with existing socket, after a delay */
-                               break;
-                       default:
-                               /* get rid of existing socket, and retry */
-                               xs_tcp_shutdown(xprt);
-               }
+       switch (status) {
+       default:
+               printk("%s: connect returned unhandled error %d\n",
+                       __func__, status);
+       case -EADDRNOTAVAIL:
+               /* We're probably in TIME_WAIT. Get rid of existing socket,
+                * and retry
+                */
+               set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
+               xprt_force_disconnect(xprt);
+       case -ECONNREFUSED:
+       case -ECONNRESET:
+       case -ENETUNREACH:
+               /* retry with existing socket, after a delay */
+       case 0:
+       case -EINPROGRESS:
+       case -EALREADY:
+               xprt_clear_connecting(xprt);
+               return;
        }
+out_eagain:
+       status = -EAGAIN;
 out:
-       xprt_wake_pending_tasks(xprt, status);
-out_clear:
        xprt_clear_connecting(xprt);
+       xprt_wake_pending_tasks(xprt, status);
+}
+
+static struct socket *xs_create_tcp_sock4(struct rpc_xprt *xprt,
+               struct sock_xprt *transport)
+{
+       struct socket *sock;
+       int err;
+
+       /* start from scratch */
+       err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
+       if (err < 0) {
+               dprintk("RPC:       can't create TCP transport socket (%d).\n",
+                               -err);
+               goto out_err;
+       }
+       xs_reclassify_socket4(sock);
+
+       if (xs_bind4(transport, sock) < 0) {
+               sock_release(sock);
+               goto out_err;
+       }
+       return sock;
+out_err:
+       return ERR_PTR(-EIO);
 }
 
 /**
- * xs_tcp_connect_worker6 - connect a TCP socket to a remote endpoint
+ * xs_tcp_connect_worker4 - connect a TCP socket to a remote endpoint
  * @work: RPC transport to connect
  *
  * Invoked by a work queue tasklet.
  */
-static void xs_tcp_connect_worker6(struct work_struct *work)
+static void xs_tcp_connect_worker4(struct work_struct *work)
 {
        struct sock_xprt *transport =
                container_of(work, struct sock_xprt, connect_worker.work);
        struct rpc_xprt *xprt = &transport->xprt;
-       struct socket *sock = transport->sock;
-       int err, status = -EIO;
 
-       if (xprt->shutdown || !xprt_bound(xprt))
-               goto out;
+       xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock4);
+}
 
-       if (!sock) {
-               /* start from scratch */
-               if ((err = sock_create_kern(PF_INET6, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
-                       dprintk("RPC:       can't create TCP transport socket (%d).\n", -err);
-                       goto out;
-               }
-               xs_reclassify_socket6(sock);
+static struct socket *xs_create_tcp_sock6(struct rpc_xprt *xprt,
+               struct sock_xprt *transport)
+{
+       struct socket *sock;
+       int err;
+
+       /* start from scratch */
+       err = sock_create_kern(PF_INET6, SOCK_STREAM, IPPROTO_TCP, &sock);
+       if (err < 0) {
+               dprintk("RPC:       can't create TCP transport socket (%d).\n",
+                               -err);
+               goto out_err;
+       }
+       xs_reclassify_socket6(sock);
 
-               if (xs_bind6(transport, sock) < 0) {
-                       sock_release(sock);
-                       goto out;
-               }
-       } else
-               /* "close" the socket, preserving the local port */
-               xs_tcp_reuse_connection(xprt);
+       if (xs_bind6(transport, sock) < 0) {
+               sock_release(sock);
+               goto out_err;
+       }
+       return sock;
+out_err:
+       return ERR_PTR(-EIO);
+}
 
-       dprintk("RPC:       worker connecting xprt %p to address: %s\n",
-                       xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
+/**
+ * xs_tcp_connect_worker6 - connect a TCP socket to a remote endpoint
+ * @work: RPC transport to connect
+ *
+ * Invoked by a work queue tasklet.
+ */
+static void xs_tcp_connect_worker6(struct work_struct *work)
+{
+       struct sock_xprt *transport =
+               container_of(work, struct sock_xprt, connect_worker.work);
+       struct rpc_xprt *xprt = &transport->xprt;
 
-       status = xs_tcp_finish_connecting(xprt, sock);
-       dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
-                       xprt, -status, xprt_connected(xprt), sock->sk->sk_state);
-       if (status < 0) {
-               switch (status) {
-                       case -EINPROGRESS:
-                       case -EALREADY:
-                               goto out_clear;
-                       case -ECONNREFUSED:
-                       case -ECONNRESET:
-                               /* retry with existing socket, after a delay */
-                               break;
-                       default:
-                               /* get rid of existing socket, and retry */
-                               xs_tcp_shutdown(xprt);
-               }
-       }
-out:
-       xprt_wake_pending_tasks(xprt, status);
-out_clear:
-       xprt_clear_connecting(xprt);
+       xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock6);
 }
 
 /**
@@ -1758,9 +2102,6 @@ static void xs_tcp_connect(struct rpc_task *task)
 {
        struct rpc_xprt *xprt = task->tk_xprt;
 
-       /* Initiate graceful shutdown of the socket if not already done */
-       if (test_bit(XPRT_CONNECTED, &xprt->state))
-               xs_tcp_shutdown(xprt);
        /* Exit if we need to wait for socket shutdown to complete */
        if (test_bit(XPRT_CLOSING, &xprt->state))
                return;
@@ -1842,7 +2183,7 @@ static struct rpc_xprt_ops xs_tcp_ops = {
        .buf_free               = rpc_free,
        .send_request           = xs_tcp_send_request,
        .set_retrans_timeout    = xprt_set_retrans_timeout_def,
-       .close                  = xs_tcp_shutdown,
+       .close                  = xs_tcp_close,
        .destroy                = xs_destroy,
        .print_stats            = xs_tcp_print_stats,
 };