nfs41: New xs_tcp_read_data()
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 1127eb9..e3e3a57 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -34,6 +34,9 @@
 #include <linux/sunrpc/sched.h>
 #include <linux/sunrpc/xprtsock.h>
 #include <linux/file.h>
+#ifdef CONFIG_NFS_V4_1
+#include <linux/sunrpc/bc_xprt.h>
+#endif
 
 #include <net/sock.h>
 #include <net/checksum.h>
@@ -49,6 +52,9 @@ unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;
 unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
 unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
 
+#define XS_TCP_LINGER_TO       (15U * HZ)
+static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
+
 /*
  * We can register our own files under /proc/sys/sunrpc by
  * calling register_sysctl_table() again.  The files in that
@@ -117,6 +123,14 @@ static ctl_table xs_tunables_table[] = {
                .extra2         = &xprt_max_resvport_limit
        },
        {
+               .procname       = "tcp_fin_timeout",
+               .data           = &xs_tcp_fin_timeout,
+               .maxlen         = sizeof(xs_tcp_fin_timeout),
+               .mode           = 0644,
+               .proc_handler   = &proc_dointvec_jiffies,
+               .strategy       = sysctl_jiffies
+       },
+       {
                .ctl_name = 0,
        },
 };
@@ -259,6 +273,13 @@ struct sock_xprt {
 #define TCP_RCV_COPY_FRAGHDR   (1UL << 1)
 #define TCP_RCV_COPY_XID       (1UL << 2)
 #define TCP_RCV_COPY_DATA      (1UL << 3)
+#define TCP_RCV_READ_CALLDIR   (1UL << 4)
+#define TCP_RCV_COPY_CALLDIR   (1UL << 5)
+
+/*
+ * TCP RPC flags
+ */
+#define TCP_RPC_REPLY          (1UL << 6)
 
 static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
 {
@@ -521,11 +542,12 @@ static void xs_nospace_callback(struct rpc_task *task)
  * @task: task to put to sleep
  *
  */
-static void xs_nospace(struct rpc_task *task)
+static int xs_nospace(struct rpc_task *task)
 {
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
+       int ret = 0;
 
        dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
                        task->tk_pid, req->rq_slen - req->rq_bytes_sent,
@@ -537,6 +559,7 @@ static void xs_nospace(struct rpc_task *task)
        /* Don't race with disconnect */
        if (xprt_connected(xprt)) {
                if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
+                       ret = -EAGAIN;
                        /*
                         * Notify TCP that we're limited by the application
                         * window size
@@ -548,10 +571,11 @@ static void xs_nospace(struct rpc_task *task)
                }
        } else {
                clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
-               task->tk_status = -ENOTCONN;
+               ret = -ENOTCONN;
        }
 
        spin_unlock_bh(&xprt->transport_lock);
+       return ret;
 }
 
 /**
@@ -594,6 +618,8 @@ static int xs_udp_send_request(struct rpc_task *task)
                /* Still some bytes left; set up for a retry later. */
                status = -EAGAIN;
        }
+       if (!transport->sock)
+               goto out;
 
        switch (status) {
        case -ENOTSOCK:
@@ -601,21 +627,19 @@ static int xs_udp_send_request(struct rpc_task *task)
                /* Should we call xs_close() here? */
                break;
        case -EAGAIN:
-               xs_nospace(task);
+               status = xs_nospace(task);
                break;
+       default:
+               dprintk("RPC:       sendmsg returned unrecognized error %d\n",
+                       -status);
        case -ENETUNREACH:
        case -EPIPE:
        case -ECONNREFUSED:
                /* When the server has died, an ICMP port unreachable message
                 * prompts ECONNREFUSED. */
                clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
-               break;
-       default:
-               clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
-               dprintk("RPC:       sendmsg returned unrecognized error %d\n",
-                       -status);
        }
-
+out:
        return status;
 }
 
@@ -697,6 +721,8 @@ static int xs_tcp_send_request(struct rpc_task *task)
                status = -EAGAIN;
                break;
        }
+       if (!transport->sock)
+               goto out;
 
        switch (status) {
        case -ENOTSOCK:
@@ -704,23 +730,19 @@ static int xs_tcp_send_request(struct rpc_task *task)
                /* Should we call xs_close() here? */
                break;
        case -EAGAIN:
-               xs_nospace(task);
+               status = xs_nospace(task);
                break;
+       default:
+               dprintk("RPC:       sendmsg returned unrecognized error %d\n",
+                       -status);
        case -ECONNRESET:
+       case -EPIPE:
                xs_tcp_shutdown(xprt);
        case -ECONNREFUSED:
        case -ENOTCONN:
-       case -EPIPE:
-               status = -ENOTCONN;
-               clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
-               break;
-       default:
-               dprintk("RPC:       sendmsg returned unrecognized error %d\n",
-                       -status);
                clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
-               xs_tcp_shutdown(xprt);
        }
-
+out:
        return status;
 }
 
@@ -795,6 +817,9 @@ static void xs_reset_transport(struct sock_xprt *transport)
  *
  * This is used when all requests are complete; ie, no DRC state remains
  * on the server we want to save.
+ *
+ * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
+ * xs_reset_transport() zeroing the socket from underneath a writer.
  */
 static void xs_close(struct rpc_xprt *xprt)
 {
@@ -805,12 +830,21 @@ static void xs_close(struct rpc_xprt *xprt)
        xs_reset_transport(transport);
 
        smp_mb__before_clear_bit();
+       clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
        clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
        clear_bit(XPRT_CLOSING, &xprt->state);
        smp_mb__after_clear_bit();
        xprt_disconnect_done(xprt);
 }
 
+static void xs_tcp_close(struct rpc_xprt *xprt)
+{
+       if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state))
+               xs_close(xprt);
+       else
+               xs_tcp_shutdown(xprt);
+}
+
 /**
  * xs_destroy - prepare to shutdown a transport
  * @xprt: doomed transport
@@ -932,7 +966,7 @@ static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_rea
        transport->tcp_offset = 0;
 
        /* Sanity check of the record length */
-       if (unlikely(transport->tcp_reclen < 4)) {
+       if (unlikely(transport->tcp_reclen < 8)) {
                dprintk("RPC:       invalid TCP record fragment length\n");
                xprt_force_disconnect(xprt);
                return;
@@ -967,33 +1001,77 @@ static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_r
        if (used != len)
                return;
        transport->tcp_flags &= ~TCP_RCV_COPY_XID;
-       transport->tcp_flags |= TCP_RCV_COPY_DATA;
+       transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
        transport->tcp_copied = 4;
-       dprintk("RPC:       reading reply for XID %08x\n",
+       dprintk("RPC:       reading %s XID %08x\n",
+                       (transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
+                                                             : "request with",
                        ntohl(transport->tcp_xid));
        xs_tcp_check_fraghdr(transport);
 }
 
-static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
+static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
+                                      struct xdr_skb_reader *desc)
 {
-       struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
-       struct rpc_rqst *req;
+       size_t len, used;
+       u32 offset;
+       __be32  calldir;
+
+       /*
+        * We want transport->tcp_offset to be 8 at the end of this routine
+        * (4 bytes for the xid and 4 bytes for the call/reply flag).
+        * When this function is called for the first time,
+        * transport->tcp_offset is 4 (after having already read the xid).
+        */
+       offset = transport->tcp_offset - sizeof(transport->tcp_xid);
+       len = sizeof(calldir) - offset;
+       dprintk("RPC:       reading CALL/REPLY flag (%Zu bytes)\n", len);
+       used = xdr_skb_read_bits(desc, &calldir, len);
+       transport->tcp_offset += used;
+       if (used != len)
+               return;
+       transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
+       transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
+       transport->tcp_flags |= TCP_RCV_COPY_DATA;
+       /*
+        * We don't yet have the XDR buffer, so we will write the calldir
+        * out after we get the buffer from the 'struct rpc_rqst'
+        */
+       if (ntohl(calldir) == RPC_REPLY)
+               transport->tcp_flags |= TCP_RPC_REPLY;
+       else
+               transport->tcp_flags &= ~TCP_RPC_REPLY;
+       dprintk("RPC:       reading %s CALL/REPLY flag %08x\n",
+                       (transport->tcp_flags & TCP_RPC_REPLY) ?
+                               "reply for" : "request with", calldir);
+       xs_tcp_check_fraghdr(transport);
+}
+
+static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
+                                    struct xdr_skb_reader *desc,
+                                    struct rpc_rqst *req)
+{
+       struct sock_xprt *transport =
+                               container_of(xprt, struct sock_xprt, xprt);
        struct xdr_buf *rcvbuf;
        size_t len;
        ssize_t r;
 
-       /* Find and lock the request corresponding to this xid */
-       spin_lock(&xprt->transport_lock);
-       req = xprt_lookup_rqst(xprt, transport->tcp_xid);
-       if (!req) {
-               transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
-               dprintk("RPC:       XID %08x request not found!\n",
-                               ntohl(transport->tcp_xid));
-               spin_unlock(&xprt->transport_lock);
-               return;
+       rcvbuf = &req->rq_private_buf;
+
+       if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
+               /*
+                * Save the RPC direction in the XDR buffer
+                */
+               __be32  calldir = transport->tcp_flags & TCP_RPC_REPLY ?
+                                       htonl(RPC_REPLY) : 0;
+
+               memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
+                       &calldir, sizeof(calldir));
+               transport->tcp_copied += sizeof(calldir);
+               transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
        }
 
-       rcvbuf = &req->rq_private_buf;
        len = desc->count;
        if (len > transport->tcp_reclen - transport->tcp_offset) {
                struct xdr_skb_reader my_desc;
@@ -1030,7 +1108,7 @@ static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_rea
                                "tcp_offset = %u, tcp_reclen = %u\n",
                                xprt, transport->tcp_copied,
                                transport->tcp_offset, transport->tcp_reclen);
-               goto out;
+               return;
        }
 
        dprintk("RPC:       XID %08x read %Zd bytes\n",
@@ -1046,11 +1124,125 @@ static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_rea
                        transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
        }
 
-out:
+       return;
+}
+
+/*
+ * Finds the request corresponding to the RPC xid and invokes the common
+ * tcp read code to read the data.
+ */
+static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
+                                   struct xdr_skb_reader *desc)
+{
+       struct sock_xprt *transport =
+                               container_of(xprt, struct sock_xprt, xprt);
+       struct rpc_rqst *req;
+
+       dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));
+
+       /* Find and lock the request corresponding to this xid */
+       spin_lock(&xprt->transport_lock);
+       req = xprt_lookup_rqst(xprt, transport->tcp_xid);
+       if (!req) {
+               dprintk("RPC:       XID %08x request not found!\n",
+                               ntohl(transport->tcp_xid));
+               spin_unlock(&xprt->transport_lock);
+               return -1;
+       }
+
+       xs_tcp_read_common(xprt, desc, req);
+
        if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
                xprt_complete_rqst(req->rq_task, transport->tcp_copied);
+
        spin_unlock(&xprt->transport_lock);
-       xs_tcp_check_fraghdr(transport);
+       return 0;
+}
+
+#if defined(CONFIG_NFS_V4_1)
+/*
+ * Obtains an rpc_rqst previously allocated and invokes the common
+ * tcp read code to read the data.  The result is placed in the callback
+ * queue.
+ * If we're unable to obtain the rpc_rqst we schedule the closing of the
+ * connection and return -1.
+ */
+static inline int xs_tcp_read_callback(struct rpc_xprt *xprt,
+                                      struct xdr_skb_reader *desc)
+{
+       struct sock_xprt *transport =
+                               container_of(xprt, struct sock_xprt, xprt);
+       struct rpc_rqst *req;
+
+       req = xprt_alloc_bc_request(xprt);
+       if (req == NULL) {
+               printk(KERN_WARNING "Callback slot table overflowed\n");
+               xprt_force_disconnect(xprt);
+               return -1;
+       }
+
+       req->rq_xid = transport->tcp_xid;
+       dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
+       xs_tcp_read_common(xprt, desc, req);
+
+       if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) {
+               struct svc_serv *bc_serv = xprt->bc_serv;
+
+               /*
+                * Add callback request to callback list.  The callback
+                * service sleeps on the sv_cb_waitq waiting for new
+                * requests.  Wake it up after enqueuing the
+                * request.
+                */
+               dprintk("RPC:       add callback request to list\n");
+               spin_lock(&bc_serv->sv_cb_lock);
+               list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
+               spin_unlock(&bc_serv->sv_cb_lock);
+               wake_up(&bc_serv->sv_cb_waitq);
+       }
+
+       req->rq_private_buf.len = transport->tcp_copied;
+
+       return 0;
+}
+
+static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
+                                       struct xdr_skb_reader *desc)
+{
+       struct sock_xprt *transport =
+                               container_of(xprt, struct sock_xprt, xprt);
+
+       return (transport->tcp_flags & TCP_RPC_REPLY) ?
+               xs_tcp_read_reply(xprt, desc) :
+               xs_tcp_read_callback(xprt, desc);
+}
+#else
+static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
+                                       struct xdr_skb_reader *desc)
+{
+       return xs_tcp_read_reply(xprt, desc);
+}
+#endif /* CONFIG_NFS_V4_1 */
+
+/*
+ * Read data off the transport.  This can be either an RPC_CALL or an
+ * RPC_REPLY.  Relay the processing to helper functions.
+ */
+static void xs_tcp_read_data(struct rpc_xprt *xprt,
+                                   struct xdr_skb_reader *desc)
+{
+       struct sock_xprt *transport =
+                               container_of(xprt, struct sock_xprt, xprt);
+
+       if (_xs_tcp_read_data(xprt, desc) == 0)
+               xs_tcp_check_fraghdr(transport);
+       else {
+               /*
+                * The transport_lock protects the request handling.
+                * There's no need to hold it to update the tcp_flags.
+                */
+               transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
+       }
 }
 
 static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
@@ -1090,9 +1282,14 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
                        xs_tcp_read_xid(transport, &desc);
                        continue;
                }
+               /* Read in the call/reply flag */
+               if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) {
+                       xs_tcp_read_calldir(transport, &desc);
+                       continue;
+               }
                /* Read in the request data */
                if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
-                       xs_tcp_read_request(xprt, &desc);
+                       xs_tcp_read_data(xprt, &desc);
                        continue;
                }
                /* Skip over any trailing bytes on short reads */
@@ -1132,6 +1329,47 @@ out:
        read_unlock(&sk->sk_callback_lock);
 }
 
+/*
+ * Do the equivalent of linger/linger2 handling for dealing with
+ * broken servers that don't close the socket in a timely
+ * fashion
+ */
+static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt,
+               unsigned long timeout)
+{
+       struct sock_xprt *transport;
+
+       if (xprt_test_and_set_connecting(xprt))
+               return;
+       set_bit(XPRT_CONNECTION_ABORT, &xprt->state);
+       transport = container_of(xprt, struct sock_xprt, xprt);
+       queue_delayed_work(rpciod_workqueue, &transport->connect_worker,
+                          timeout);
+}
+
+static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt)
+{
+       struct sock_xprt *transport;
+
+       transport = container_of(xprt, struct sock_xprt, xprt);
+
+       if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) ||
+           !cancel_delayed_work(&transport->connect_worker))
+               return;
+       clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
+       xprt_clear_connecting(xprt);
+}
+
+static void xs_sock_mark_closed(struct rpc_xprt *xprt)
+{
+       smp_mb__before_clear_bit();
+       clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
+       clear_bit(XPRT_CLOSING, &xprt->state);
+       smp_mb__after_clear_bit();
+       /* Mark transport as closed and wake up all pending tasks */
+       xprt_disconnect_done(xprt);
+}
+
 /**
  * xs_tcp_state_change - callback to handle TCP socket state changes
  * @sk: socket whose state has changed
@@ -1164,7 +1402,7 @@ static void xs_tcp_state_change(struct sock *sk)
                        transport->tcp_flags =
                                TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
 
-                       xprt_wake_pending_tasks(xprt, 0);
+                       xprt_wake_pending_tasks(xprt, -EAGAIN);
                }
                spin_unlock_bh(&xprt->transport_lock);
                break;
@@ -1177,10 +1415,10 @@ static void xs_tcp_state_change(struct sock *sk)
                clear_bit(XPRT_CONNECTED, &xprt->state);
                clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
                smp_mb__after_clear_bit();
+               xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
                break;
        case TCP_CLOSE_WAIT:
                /* The server initiated a shutdown of the socket */
-               set_bit(XPRT_CLOSING, &xprt->state);
                xprt_force_disconnect(xprt);
        case TCP_SYN_SENT:
                xprt->connect_cookie++;
@@ -1193,44 +1431,56 @@ static void xs_tcp_state_change(struct sock *sk)
                        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
                break;
        case TCP_LAST_ACK:
+               set_bit(XPRT_CLOSING, &xprt->state);
+               xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
                smp_mb__before_clear_bit();
                clear_bit(XPRT_CONNECTED, &xprt->state);
                smp_mb__after_clear_bit();
                break;
        case TCP_CLOSE:
-               smp_mb__before_clear_bit();
-               clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
-               clear_bit(XPRT_CLOSING, &xprt->state);
-               smp_mb__after_clear_bit();
-               /* Mark transport as closed and wake up all pending tasks */
-               xprt_disconnect_done(xprt);
+               xs_tcp_cancel_linger_timeout(xprt);
+               xs_sock_mark_closed(xprt);
        }
  out:
        read_unlock(&sk->sk_callback_lock);
 }
 
 /**
- * xs_tcp_error_report - callback mainly for catching RST events
+ * xs_error_report - callback mainly for catching socket errors
  * @sk: socket
  */
-static void xs_tcp_error_report(struct sock *sk)
+static void xs_error_report(struct sock *sk)
 {
        struct rpc_xprt *xprt;
 
        read_lock(&sk->sk_callback_lock);
-       if (sk->sk_err != ECONNRESET || sk->sk_state != TCP_ESTABLISHED)
-               goto out;
        if (!(xprt = xprt_from_sock(sk)))
                goto out;
        dprintk("RPC:       %s client %p...\n"
                        "RPC:       error %d\n",
                        __func__, xprt, sk->sk_err);
-
-       xprt_force_disconnect(xprt);
+       xprt_wake_pending_tasks(xprt, -EAGAIN);
 out:
        read_unlock(&sk->sk_callback_lock);
 }
 
+static void xs_write_space(struct sock *sk)
+{
+       struct socket *sock;
+       struct rpc_xprt *xprt;
+
+       if (unlikely(!(sock = sk->sk_socket)))
+               return;
+       clear_bit(SOCK_NOSPACE, &sock->flags);
+
+       if (unlikely(!(xprt = xprt_from_sock(sk))))
+               return;
+       if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
+               return;
+
+       xprt_write_space(xprt);
+}
+
 /**
  * xs_udp_write_space - callback invoked when socket buffer space
  *                             becomes available
@@ -1246,23 +1496,9 @@ static void xs_udp_write_space(struct sock *sk)
        read_lock(&sk->sk_callback_lock);
 
        /* from net/core/sock.c:sock_def_write_space */
-       if (sock_writeable(sk)) {
-               struct socket *sock;
-               struct rpc_xprt *xprt;
+       if (sock_writeable(sk))
+               xs_write_space(sk);
 
-               if (unlikely(!(sock = sk->sk_socket)))
-                       goto out;
-               clear_bit(SOCK_NOSPACE, &sock->flags);
-
-               if (unlikely(!(xprt = xprt_from_sock(sk))))
-                       goto out;
-               if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
-                       goto out;
-
-               xprt_write_space(xprt);
-       }
-
- out:
        read_unlock(&sk->sk_callback_lock);
 }
 
@@ -1281,23 +1517,9 @@ static void xs_tcp_write_space(struct sock *sk)
        read_lock(&sk->sk_callback_lock);
 
        /* from net/core/stream.c:sk_stream_write_space */
-       if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
-               struct socket *sock;
-               struct rpc_xprt *xprt;
-
-               if (unlikely(!(sock = sk->sk_socket)))
-                       goto out;
-               clear_bit(SOCK_NOSPACE, &sock->flags);
+       if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk))
+               xs_write_space(sk);
 
-               if (unlikely(!(xprt = xprt_from_sock(sk))))
-                       goto out;
-               if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
-                       goto out;
-
-               xprt_write_space(xprt);
-       }
-
- out:
        read_unlock(&sk->sk_callback_lock);
 }
 
@@ -1511,6 +1733,7 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
                sk->sk_user_data = xprt;
                sk->sk_data_ready = xs_udp_data_ready;
                sk->sk_write_space = xs_udp_write_space;
+               sk->sk_error_report = xs_error_report;
                sk->sk_no_check = UDP_CSUM_NORCV;
                sk->sk_allocation = GFP_ATOMIC;
 
@@ -1563,8 +1786,8 @@ static void xs_udp_connect_worker4(struct work_struct *work)
        xs_udp_finish_connecting(xprt, sock);
        status = 0;
 out:
-       xprt_wake_pending_tasks(xprt, status);
        xprt_clear_connecting(xprt);
+       xprt_wake_pending_tasks(xprt, status);
 }
 
 /**
@@ -1605,18 +1828,17 @@ static void xs_udp_connect_worker6(struct work_struct *work)
        xs_udp_finish_connecting(xprt, sock);
        status = 0;
 out:
-       xprt_wake_pending_tasks(xprt, status);
        xprt_clear_connecting(xprt);
+       xprt_wake_pending_tasks(xprt, status);
 }
 
 /*
  * We need to preserve the port number so the reply cache on the server can
  * find our cached RPC replies when we get around to reconnecting.
  */
-static void xs_tcp_reuse_connection(struct rpc_xprt *xprt)
+static void xs_abort_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
 {
        int result;
-       struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
        struct sockaddr any;
 
        dprintk("RPC:       disconnecting xprt %p to reuse port\n", xprt);
@@ -1628,11 +1850,24 @@ static void xs_tcp_reuse_connection(struct rpc_xprt *xprt)
        memset(&any, 0, sizeof(any));
        any.sa_family = AF_UNSPEC;
        result = kernel_connect(transport->sock, &any, sizeof(any), 0);
-       if (result)
+       if (!result)
+               xs_sock_mark_closed(xprt);
+       else
                dprintk("RPC:       AF_UNSPEC connect return code %d\n",
                                result);
 }
 
+static void xs_tcp_reuse_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
+{
+       unsigned int state = transport->inet->sk_state;
+
+       if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED)
+               return;
+       if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT))
+               return;
+       xs_abort_connection(xprt, transport);
+}
+
 static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 {
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
@@ -1648,7 +1883,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
                sk->sk_data_ready = xs_tcp_data_ready;
                sk->sk_state_change = xs_tcp_state_change;
                sk->sk_write_space = xs_tcp_write_space;
-               sk->sk_error_report = xs_tcp_error_report;
+               sk->sk_error_report = xs_error_report;
                sk->sk_allocation = GFP_ATOMIC;
 
                /* socket options */
@@ -1676,37 +1911,42 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 }
 
 /**
- * xs_tcp_connect_worker4 - connect a TCP socket to a remote endpoint
- * @work: RPC transport to connect
+ * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
+ * @xprt: RPC transport to connect
+ * @transport: socket transport to connect
+ * @create_sock: function to create a socket of the correct type
  *
  * Invoked by a work queue tasklet.
  */
-static void xs_tcp_connect_worker4(struct work_struct *work)
+static void xs_tcp_setup_socket(struct rpc_xprt *xprt,
+               struct sock_xprt *transport,
+               struct socket *(*create_sock)(struct rpc_xprt *,
+                       struct sock_xprt *))
 {
-       struct sock_xprt *transport =
-               container_of(work, struct sock_xprt, connect_worker.work);
-       struct rpc_xprt *xprt = &transport->xprt;
        struct socket *sock = transport->sock;
-       int err, status = -EIO;
+       int status = -EIO;
 
        if (xprt->shutdown)
                goto out;
 
        if (!sock) {
-               /* start from scratch */
-               if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
-                       dprintk("RPC:       can't create TCP transport socket (%d).\n", -err);
+               clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
+               sock = create_sock(xprt, transport);
+               if (IS_ERR(sock)) {
+                       status = PTR_ERR(sock);
                        goto out;
                }
-               xs_reclassify_socket4(sock);
+       } else {
+               int abort_and_exit;
 
-               if (xs_bind4(transport, sock) < 0) {
-                       sock_release(sock);
-                       goto out;
-               }
-       } else
+               abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT,
+                               &xprt->state);
                /* "close" the socket, preserving the local port */
-               xs_tcp_reuse_connection(xprt);
+               xs_tcp_reuse_connection(xprt, transport);
+
+               if (abort_and_exit)
+                       goto out_eagain;
+       }
 
        dprintk("RPC:       worker connecting xprt %p to address: %s\n",
                        xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
@@ -1715,83 +1955,109 @@ static void xs_tcp_connect_worker4(struct work_struct *work)
        dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
                        xprt, -status, xprt_connected(xprt),
                        sock->sk->sk_state);
-       if (status < 0) {
-               switch (status) {
-                       case -EINPROGRESS:
-                       case -EALREADY:
-                               goto out_clear;
-                       case -ECONNREFUSED:
-                       case -ECONNRESET:
-                               /* retry with existing socket, after a delay */
-                               break;
-                       default:
-                               /* get rid of existing socket, and retry */
-                               xs_tcp_shutdown(xprt);
-               }
+       switch (status) {
+       default:
+               printk("%s: connect returned unhandled error %d\n",
+                       __func__, status);
+       case -EADDRNOTAVAIL:
+               /* We're probably in TIME_WAIT. Get rid of existing socket,
+                * and retry
+                */
+               set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
+               xprt_force_disconnect(xprt);
+       case -ECONNREFUSED:
+       case -ECONNRESET:
+       case -ENETUNREACH:
+               /* retry with existing socket, after a delay */
+       case 0:
+       case -EINPROGRESS:
+       case -EALREADY:
+               xprt_clear_connecting(xprt);
+               return;
        }
+out_eagain:
+       status = -EAGAIN;
 out:
-       xprt_wake_pending_tasks(xprt, status);
-out_clear:
        xprt_clear_connecting(xprt);
+       xprt_wake_pending_tasks(xprt, status);
+}
+
+static struct socket *xs_create_tcp_sock4(struct rpc_xprt *xprt,
+               struct sock_xprt *transport)
+{
+       struct socket *sock;
+       int err;
+
+       /* start from scratch */
+       err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
+       if (err < 0) {
+               dprintk("RPC:       can't create TCP transport socket (%d).\n",
+                               -err);
+               goto out_err;
+       }
+       xs_reclassify_socket4(sock);
+
+       if (xs_bind4(transport, sock) < 0) {
+               sock_release(sock);
+               goto out_err;
+       }
+       return sock;
+out_err:
+       return ERR_PTR(-EIO);
 }
 
 /**
- * xs_tcp_connect_worker6 - connect a TCP socket to a remote endpoint
+ * xs_tcp_connect_worker4 - connect a TCP socket to a remote endpoint
  * @work: RPC transport to connect
  *
  * Invoked by a work queue tasklet.
  */
-static void xs_tcp_connect_worker6(struct work_struct *work)
+static void xs_tcp_connect_worker4(struct work_struct *work)
 {
        struct sock_xprt *transport =
                container_of(work, struct sock_xprt, connect_worker.work);
        struct rpc_xprt *xprt = &transport->xprt;
-       struct socket *sock = transport->sock;
-       int err, status = -EIO;
 
-       if (xprt->shutdown)
-               goto out;
-
-       if (!sock) {
-               /* start from scratch */
-               if ((err = sock_create_kern(PF_INET6, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
-                       dprintk("RPC:       can't create TCP transport socket (%d).\n", -err);
-                       goto out;
-               }
-               xs_reclassify_socket6(sock);
+       xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock4);
+}
 
-               if (xs_bind6(transport, sock) < 0) {
-                       sock_release(sock);
-                       goto out;
-               }
-       } else
-               /* "close" the socket, preserving the local port */
-               xs_tcp_reuse_connection(xprt);
+static struct socket *xs_create_tcp_sock6(struct rpc_xprt *xprt,
+               struct sock_xprt *transport)
+{
+       struct socket *sock;
+       int err;
 
-       dprintk("RPC:       worker connecting xprt %p to address: %s\n",
-                       xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
+       /* start from scratch */
+       err = sock_create_kern(PF_INET6, SOCK_STREAM, IPPROTO_TCP, &sock);
+       if (err < 0) {
+               dprintk("RPC:       can't create TCP transport socket (%d).\n",
+                               -err);
+               goto out_err;
+       }
+       xs_reclassify_socket6(sock);
 
-       status = xs_tcp_finish_connecting(xprt, sock);
-       dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
-                       xprt, -status, xprt_connected(xprt), sock->sk->sk_state);
-       if (status < 0) {
-               switch (status) {
-                       case -EINPROGRESS:
-                       case -EALREADY:
-                               goto out_clear;
-                       case -ECONNREFUSED:
-                       case -ECONNRESET:
-                               /* retry with existing socket, after a delay */
-                               break;
-                       default:
-                               /* get rid of existing socket, and retry */
-                               xs_tcp_shutdown(xprt);
-               }
+       if (xs_bind6(transport, sock) < 0) {
+               sock_release(sock);
+               goto out_err;
        }
-out:
-       xprt_wake_pending_tasks(xprt, status);
-out_clear:
-       xprt_clear_connecting(xprt);
+       return sock;
+out_err:
+       return ERR_PTR(-EIO);
+}
+
+/**
+ * xs_tcp_connect_worker6 - connect a TCP socket to a remote endpoint
+ * @work: RPC transport to connect
+ *
+ * Invoked by a work queue tasklet.
+ */
+static void xs_tcp_connect_worker6(struct work_struct *work)
+{
+       struct sock_xprt *transport =
+               container_of(work, struct sock_xprt, connect_worker.work);
+       struct rpc_xprt *xprt = &transport->xprt;
+
+       xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock6);
 }
 
 /**
@@ -1836,9 +2102,6 @@ static void xs_tcp_connect(struct rpc_task *task)
 {
        struct rpc_xprt *xprt = task->tk_xprt;
 
-       /* Initiate graceful shutdown of the socket if not already done */
-       if (test_bit(XPRT_CONNECTED, &xprt->state))
-               xs_tcp_shutdown(xprt);
        /* Exit if we need to wait for socket shutdown to complete */
        if (test_bit(XPRT_CLOSING, &xprt->state))
                return;
@@ -1920,7 +2183,7 @@ static struct rpc_xprt_ops xs_tcp_ops = {
        .buf_free               = rpc_free,
        .send_request           = xs_tcp_send_request,
        .set_retrans_timeout    = xprt_set_retrans_timeout_def,
-       .close                  = xs_tcp_shutdown,
+       .close                  = xs_tcp_close,
        .destroy                = xs_destroy,
        .print_stats            = xs_tcp_print_stats,
 };
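
Illustrative note: the receive-path changes above hinge on reading one extra word per record. After the 4-byte XID, the new code also reads the 4-byte CALL/REPLY direction word, so replies can be matched against pending requests while NFSv4.1 backchannel calls are queued for the callback service; this is also why xs_tcp_read_fraghdr() now rejects records shorter than 8 bytes and why xs_tcp_read_data() dispatches to xs_tcp_read_reply() or xs_tcp_read_callback(). Below is a minimal user-space sketch of that classification step, not kernel code: RPC_CALL/RPC_REPLY mirror the RPC message-direction values from RFC 5531, while the function and variable names are hypothetical and not part of the patch.

#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>		/* ntohl() */

enum { RPC_CALL = 0, RPC_REPLY = 1 };

/* Classify a record: returns 0 on success, -1 if the record is too short
 * to carry both the XID and the direction word (the 8-byte minimum the
 * patch now enforces). */
static int classify_record(const unsigned char *rec, size_t len,
			   uint32_t *xid, int *is_reply)
{
	uint32_t raw_xid, raw_dir;

	if (len < 8)
		return -1;

	memcpy(&raw_xid, rec, 4);	/* bytes 0-3: XID */
	memcpy(&raw_dir, rec + 4, 4);	/* bytes 4-7: CALL/REPLY flag */

	*xid = ntohl(raw_xid);
	*is_reply = (ntohl(raw_dir) == RPC_REPLY);
	return 0;
}

int main(void)
{
	/* XID 0x12345678 followed by direction word 1 (REPLY). */
	unsigned char rec[8] = { 0x12, 0x34, 0x56, 0x78, 0, 0, 0, 1 };
	uint32_t xid;
	int is_reply;

	if (classify_record(rec, sizeof(rec), &xid, &is_reply) == 0)
		printf("XID %08x: %s\n", xid,
		       is_reply ? "reply, matched against a pending request"
				: "call, queued for the NFSv4.1 backchannel");
	return 0;
}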