SUNRPC: Don't disconnect if a connection is still in progress.
[safe/jmp/linux-2.6] / net / sunrpc / xprtsock.c
index 8bd3b0f..9d1898f 100644 (file)
@@ -3,8 +3,8 @@
  *
  * Client-side transport implementation for sockets.
  *
- * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
- * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
+ * TCP callback races fixes (C) 1998 Red Hat
+ * TCP send fixes (C) 1998 Red Hat
  * TCP NFS related read + write fixes
  *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
  *
@@ -136,12 +136,6 @@ static ctl_table sunrpc_table[] = {
 #endif
 
 /*
- * How many times to try sending a request on a socket before waiting
- * for the socket buffer to clear.
- */
-#define XS_SENDMSG_RETRY       (10U)
-
-/*
  * Time out for an RPC UDP socket connect.  UDP socket connects are
  * synchronous, but we set a timeout anyway in case of resource
  * exhaustion on the local host.
@@ -255,6 +249,7 @@ struct sock_xprt {
        void                    (*old_data_ready)(struct sock *, int);
        void                    (*old_state_change)(struct sock *);
        void                    (*old_write_space)(struct sock *);
+       void                    (*old_error_report)(struct sock *);
 };
 
 /*
@@ -289,8 +284,7 @@ static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt,
 
        buf = kzalloc(20, GFP_KERNEL);
        if (buf) {
-               snprintf(buf, 20, NIPQUAD_FMT,
-                               NIPQUAD(addr->sin_addr.s_addr));
+               snprintf(buf, 20, "%pI4", &addr->sin_addr.s_addr);
        }
        xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
 
@@ -305,8 +299,8 @@ static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt,
 
        buf = kzalloc(48, GFP_KERNEL);
        if (buf) {
-               snprintf(buf, 48, "addr="NIPQUAD_FMT" port=%u proto=%s",
-                       NIPQUAD(addr->sin_addr.s_addr),
+               snprintf(buf, 48, "addr=%pI4 port=%u proto=%s",
+                       &addr->sin_addr.s_addr,
                        ntohs(addr->sin_port),
                        protocol);
        }
@@ -328,8 +322,8 @@ static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt,
 
        buf = kzalloc(30, GFP_KERNEL);
        if (buf) {
-               snprintf(buf, 30, NIPQUAD_FMT".%u.%u",
-                               NIPQUAD(addr->sin_addr.s_addr),
+               snprintf(buf, 30, "%pI4.%u.%u",
+                               &addr->sin_addr.s_addr,
                                ntohs(addr->sin_port) >> 8,
                                ntohs(addr->sin_port) & 0xff);
        }
@@ -347,8 +341,7 @@ static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt,
 
        buf = kzalloc(40, GFP_KERNEL);
        if (buf) {
-               snprintf(buf, 40, NIP6_FMT,
-                               NIP6(addr->sin6_addr));
+               snprintf(buf, 40, "%pI6",&addr->sin6_addr);
        }
        xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
 
@@ -363,18 +356,17 @@ static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt,
 
        buf = kzalloc(64, GFP_KERNEL);
        if (buf) {
-               snprintf(buf, 64, "addr="NIP6_FMT" port=%u proto=%s",
-                               NIP6(addr->sin6_addr),
+               snprintf(buf, 64, "addr=%pI6 port=%u proto=%s",
+                               &addr->sin6_addr,
                                ntohs(addr->sin6_port),
                                protocol);
        }
        xprt->address_strings[RPC_DISPLAY_ALL] = buf;
 
        buf = kzalloc(36, GFP_KERNEL);
-       if (buf) {
-               snprintf(buf, 36, NIP6_SEQFMT,
-                               NIP6(addr->sin6_addr));
-       }
+       if (buf)
+               snprintf(buf, 36, "%pi6", &addr->sin6_addr);
+
        xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
 
        buf = kzalloc(8, GFP_KERNEL);
@@ -386,10 +378,10 @@ static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt,
 
        buf = kzalloc(50, GFP_KERNEL);
        if (buf) {
-               snprintf(buf, 50, NIP6_FMT".%u.%u",
-                               NIP6(addr->sin6_addr),
-                               ntohs(addr->sin6_port) >> 8,
-                               ntohs(addr->sin6_port) & 0xff);
+               snprintf(buf, 50, "%pI6.%u.%u",
+                        &addr->sin6_addr,
+                        ntohs(addr->sin6_port) >> 8,
+                        ntohs(addr->sin6_port) & 0xff);
        }
        xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
 
@@ -475,7 +467,7 @@ static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen,
        int err, sent = 0;
 
        if (unlikely(!sock))
-               return -ENOTCONN;
+               return -ENOTSOCK;
 
        clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
        if (base != 0) {
@@ -516,6 +508,14 @@ out:
        return sent;
 }
 
+static void xs_nospace_callback(struct rpc_task *task)
+{
+       struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);
+
+       transport->inet->sk_write_pending--;
+       clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
+}
+
 /**
  * xs_nospace - place task on wait queue if transmit was incomplete
  * @task: task to put to sleep
@@ -531,20 +531,27 @@ static void xs_nospace(struct rpc_task *task)
                        task->tk_pid, req->rq_slen - req->rq_bytes_sent,
                        req->rq_slen);
 
-       if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
-               /* Protect against races with write_space */
-               spin_lock_bh(&xprt->transport_lock);
-
-               /* Don't race with disconnect */
-               if (!xprt_connected(xprt))
-                       task->tk_status = -ENOTCONN;
-               else if (test_bit(SOCK_NOSPACE, &transport->sock->flags))
-                       xprt_wait_for_buffer_space(task);
+       /* Protect against races with write_space */
+       spin_lock_bh(&xprt->transport_lock);
+
+       /* Don't race with disconnect */
+       if (xprt_connected(xprt)) {
+               if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
+                       /*
+                        * Notify TCP that we're limited by the application
+                        * window size
+                        */
+                       set_bit(SOCK_NOSPACE, &transport->sock->flags);
+                       transport->inet->sk_write_pending++;
+                       /* ...and wait for more buffer space */
+                       xprt_wait_for_buffer_space(task, xs_nospace_callback);
+               }
+       } else {
+               clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
+               task->tk_status = -ENOTCONN;
+       }
 
-               spin_unlock_bh(&xprt->transport_lock);
-       } else
-               /* Keep holding the socket if it is blocked */
-               rpc_delay(task, HZ>>4);
+       spin_unlock_bh(&xprt->transport_lock);
 }
 
 /**
@@ -570,7 +577,8 @@ static int xs_udp_send_request(struct rpc_task *task)
                                req->rq_svec->iov_base,
                                req->rq_svec->iov_len);
 
-       req->rq_xtime = jiffies;
+       if (!xprt_bound(xprt))
+               return -ENOTCONN;
        status = xs_sendpages(transport->sock,
                              xs_addr(xprt),
                              xprt->addrlen, xdr,
@@ -588,19 +596,24 @@ static int xs_udp_send_request(struct rpc_task *task)
        }
 
        switch (status) {
+       case -ENOTSOCK:
+               status = -ENOTCONN;
+               /* Should we call xs_close() here? */
+               break;
+       case -EAGAIN:
+               xs_nospace(task);
+               break;
        case -ENETUNREACH:
        case -EPIPE:
        case -ECONNREFUSED:
                /* When the server has died, an ICMP port unreachable message
                 * prompts ECONNREFUSED. */
-               break;
-       case -EAGAIN:
-               xs_nospace(task);
+               clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
                break;
        default:
+               clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
                dprintk("RPC:       sendmsg returned unrecognized error %d\n",
                        -status);
-               break;
        }
 
        return status;
@@ -650,7 +663,6 @@ static int xs_tcp_send_request(struct rpc_task *task)
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
        struct xdr_buf *xdr = &req->rq_snd_buf;
        int status;
-       unsigned int retry = 0;
 
        xs_encode_tcp_record_marker(&req->rq_snd_buf);
 
@@ -662,7 +674,6 @@ static int xs_tcp_send_request(struct rpc_task *task)
         * to cope with writespace callbacks arriving _after_ we have
         * called sendmsg(). */
        while (1) {
-               req->rq_xtime = jiffies;
                status = xs_sendpages(transport->sock,
                                        NULL, 0, xdr, req->rq_bytes_sent);
 
@@ -681,26 +692,33 @@ static int xs_tcp_send_request(struct rpc_task *task)
                        return 0;
                }
 
+               if (status != 0)
+                       continue;
                status = -EAGAIN;
-               if (retry++ > XS_SENDMSG_RETRY)
-                       break;
+               break;
        }
 
        switch (status) {
+       case -ENOTSOCK:
+               status = -ENOTCONN;
+               /* Should we call xs_close() here? */
+               break;
        case -EAGAIN:
                xs_nospace(task);
                break;
-       case -ECONNREFUSED:
        case -ECONNRESET:
+               xs_tcp_shutdown(xprt);
+       case -ECONNREFUSED:
        case -ENOTCONN:
        case -EPIPE:
                status = -ENOTCONN;
+               clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
                break;
        default:
                dprintk("RPC:       sendmsg returned unrecognized error %d\n",
                        -status);
+               clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
                xs_tcp_shutdown(xprt);
-               break;
        }
 
        return status;
@@ -733,38 +751,59 @@ out_release:
        xprt_release_xprt(xprt, task);
 }
 
-/**
- * xs_close - close a socket
- * @xprt: transport
- *
- * This is used when all requests are complete; ie, no DRC state remains
- * on the server we want to save.
- */
-static void xs_close(struct rpc_xprt *xprt)
+static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
+{
+       transport->old_data_ready = sk->sk_data_ready;
+       transport->old_state_change = sk->sk_state_change;
+       transport->old_write_space = sk->sk_write_space;
+       transport->old_error_report = sk->sk_error_report;
+}
+
+static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
+{
+       sk->sk_data_ready = transport->old_data_ready;
+       sk->sk_state_change = transport->old_state_change;
+       sk->sk_write_space = transport->old_write_space;
+       sk->sk_error_report = transport->old_error_report;
+}
+
+static void xs_reset_transport(struct sock_xprt *transport)
 {
-       struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
        struct socket *sock = transport->sock;
        struct sock *sk = transport->inet;
 
-       if (!sk)
-               goto clear_close_wait;
-
-       dprintk("RPC:       xs_close xprt %p\n", xprt);
+       if (sk == NULL)
+               return;
 
        write_lock_bh(&sk->sk_callback_lock);
        transport->inet = NULL;
        transport->sock = NULL;
 
        sk->sk_user_data = NULL;
-       sk->sk_data_ready = transport->old_data_ready;
-       sk->sk_state_change = transport->old_state_change;
-       sk->sk_write_space = transport->old_write_space;
+
+       xs_restore_old_callbacks(transport, sk);
        write_unlock_bh(&sk->sk_callback_lock);
 
        sk->sk_no_check = 0;
 
        sock_release(sock);
-clear_close_wait:
+}
+
+/**
+ * xs_close - close a socket
+ * @xprt: transport
+ *
+ * This is used when all requests are complete; ie, no DRC state remains
+ * on the server we want to save.
+ */
+static void xs_close(struct rpc_xprt *xprt)
+{
+       struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
+
+       dprintk("RPC:       xs_close xprt %p\n", xprt);
+
+       xs_reset_transport(transport);
+
        smp_mb__before_clear_bit();
        clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
        clear_bit(XPRT_CLOSING, &xprt->state);
@@ -1131,6 +1170,7 @@ static void xs_tcp_state_change(struct sock *sk)
                break;
        case TCP_FIN_WAIT1:
                /* The client initiated a shutdown of the socket */
+               xprt->connect_cookie++;
                xprt->reestablish_timeout = 0;
                set_bit(XPRT_CLOSING, &xprt->state);
                smp_mb__before_clear_bit();
@@ -1140,9 +1180,9 @@ static void xs_tcp_state_change(struct sock *sk)
                break;
        case TCP_CLOSE_WAIT:
                /* The server initiated a shutdown of the socket */
-               set_bit(XPRT_CLOSING, &xprt->state);
                xprt_force_disconnect(xprt);
        case TCP_SYN_SENT:
+               xprt->connect_cookie++;
        case TCP_CLOSING:
                /*
                 * If the server closed down the connection, make sure that
@@ -1152,6 +1192,7 @@ static void xs_tcp_state_change(struct sock *sk)
                        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
                break;
        case TCP_LAST_ACK:
+               set_bit(XPRT_CLOSING, &xprt->state);
                smp_mb__before_clear_bit();
                clear_bit(XPRT_CONNECTED, &xprt->state);
                smp_mb__after_clear_bit();
@@ -1169,6 +1210,28 @@ static void xs_tcp_state_change(struct sock *sk)
 }
 
 /**
+ * xs_tcp_error_report - callback mainly for catching RST events
+ * @sk: socket
+ */
+static void xs_tcp_error_report(struct sock *sk)
+{
+       struct rpc_xprt *xprt;
+
+       read_lock(&sk->sk_callback_lock);
+       if (sk->sk_err != ECONNRESET || sk->sk_state != TCP_ESTABLISHED)
+               goto out;
+       if (!(xprt = xprt_from_sock(sk)))
+               goto out;
+       dprintk("RPC:       %s client %p...\n"
+                       "RPC:       error %d\n",
+                       __func__, xprt, sk->sk_err);
+
+       xprt_force_disconnect(xprt);
+out:
+       read_unlock(&sk->sk_callback_lock);
+}
+
+/**
  * xs_udp_write_space - callback invoked when socket buffer space
  *                             becomes available
  * @sk: socket whose state has changed
@@ -1189,9 +1252,11 @@ static void xs_udp_write_space(struct sock *sk)
 
                if (unlikely(!(sock = sk->sk_socket)))
                        goto out;
+               clear_bit(SOCK_NOSPACE, &sock->flags);
+
                if (unlikely(!(xprt = xprt_from_sock(sk))))
                        goto out;
-               if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
+               if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
                        goto out;
 
                xprt_write_space(xprt);
@@ -1222,9 +1287,11 @@ static void xs_tcp_write_space(struct sock *sk)
 
                if (unlikely(!(sock = sk->sk_socket)))
                        goto out;
+               clear_bit(SOCK_NOSPACE, &sock->flags);
+
                if (unlikely(!(xprt = xprt_from_sock(sk))))
                        goto out;
-               if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
+               if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
                        goto out;
 
                xprt_write_space(xprt);
@@ -1361,8 +1428,8 @@ static int xs_bind4(struct sock_xprt *transport, struct socket *sock)
                if (port > last)
                        nloop++;
        } while (err == -EADDRINUSE && nloop != 2);
-       dprintk("RPC:       %s "NIPQUAD_FMT":%u: %s (%d)\n",
-                       __FUNCTION__, NIPQUAD(myaddr.sin_addr),
+       dprintk("RPC:       %s %pI4:%u: %s (%d)\n",
+                       __func__, &myaddr.sin_addr,
                        port, err ? "failed" : "ok", err);
        return err;
 }
@@ -1394,8 +1461,8 @@ static int xs_bind6(struct sock_xprt *transport, struct socket *sock)
                if (port > last)
                        nloop++;
        } while (err == -EADDRINUSE && nloop != 2);
-       dprintk("RPC:       xs_bind6 "NIP6_FMT":%u: %s (%d)\n",
-               NIP6(myaddr.sin6_addr), port, err ? "failed" : "ok", err);
+       dprintk("RPC:       xs_bind6 %pI6:%u: %s (%d)\n",
+               &myaddr.sin6_addr, port, err ? "failed" : "ok", err);
        return err;
 }
 
@@ -1439,10 +1506,9 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 
                write_lock_bh(&sk->sk_callback_lock);
 
+               xs_save_old_callbacks(transport, sk);
+
                sk->sk_user_data = xprt;
-               transport->old_data_ready = sk->sk_data_ready;
-               transport->old_state_change = sk->sk_state_change;
-               transport->old_write_space = sk->sk_write_space;
                sk->sk_data_ready = xs_udp_data_ready;
                sk->sk_write_space = xs_udp_write_space;
                sk->sk_no_check = UDP_CSUM_NORCV;
@@ -1473,13 +1539,14 @@ static void xs_udp_connect_worker4(struct work_struct *work)
        struct socket *sock = transport->sock;
        int err, status = -EIO;
 
-       if (xprt->shutdown || !xprt_bound(xprt))
+       if (xprt->shutdown)
                goto out;
 
        /* Start by resetting any existing state */
-       xs_close(xprt);
+       xs_reset_transport(transport);
 
-       if ((err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
+       err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock);
+       if (err < 0) {
                dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
                goto out;
        }
@@ -1514,13 +1581,14 @@ static void xs_udp_connect_worker6(struct work_struct *work)
        struct socket *sock = transport->sock;
        int err, status = -EIO;
 
-       if (xprt->shutdown || !xprt_bound(xprt))
+       if (xprt->shutdown)
                goto out;
 
        /* Start by resetting any existing state */
-       xs_close(xprt);
+       xs_reset_transport(transport);
 
-       if ((err = sock_create_kern(PF_INET6, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
+       err = sock_create_kern(PF_INET6, SOCK_DGRAM, IPPROTO_UDP, &sock);
+       if (err < 0) {
                dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
                goto out;
        }
@@ -1545,10 +1613,9 @@ out:
  * We need to preserve the port number so the reply cache on the server can
  * find our cached RPC replies when we get around to reconnecting.
  */
-static void xs_tcp_reuse_connection(struct rpc_xprt *xprt)
+static void xs_abort_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
 {
        int result;
-       struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
        struct sockaddr any;
 
        dprintk("RPC:       disconnecting xprt %p to reuse port\n", xprt);
@@ -1565,6 +1632,17 @@ static void xs_tcp_reuse_connection(struct rpc_xprt *xprt)
                                result);
 }
 
+static void xs_tcp_reuse_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
+{
+       unsigned int state = transport->inet->sk_state;
+
+       if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED)
+               return;
+       if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT))
+               return;
+       xs_abort_connection(xprt, transport);
+}
+
 static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 {
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
@@ -1574,13 +1652,13 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 
                write_lock_bh(&sk->sk_callback_lock);
 
+               xs_save_old_callbacks(transport, sk);
+
                sk->sk_user_data = xprt;
-               transport->old_data_ready = sk->sk_data_ready;
-               transport->old_state_change = sk->sk_state_change;
-               transport->old_write_space = sk->sk_write_space;
                sk->sk_data_ready = xs_tcp_data_ready;
                sk->sk_state_change = xs_tcp_state_change;
                sk->sk_write_space = xs_tcp_write_space;
+               sk->sk_error_report = xs_tcp_error_report;
                sk->sk_allocation = GFP_ATOMIC;
 
                /* socket options */
@@ -1598,6 +1676,9 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
                write_unlock_bh(&sk->sk_callback_lock);
        }
 
+       if (!xprt_bound(xprt))
+               return -ENOTCONN;
+
        /* Tell the socket layer to start connecting... */
        xprt->stat.connect_count++;
        xprt->stat.connect_start = jiffies;
@@ -1618,7 +1699,7 @@ static void xs_tcp_connect_worker4(struct work_struct *work)
        struct socket *sock = transport->sock;
        int err, status = -EIO;
 
-       if (xprt->shutdown || !xprt_bound(xprt))
+       if (xprt->shutdown)
                goto out;
 
        if (!sock) {
@@ -1635,7 +1716,7 @@ static void xs_tcp_connect_worker4(struct work_struct *work)
                }
        } else
                /* "close" the socket, preserving the local port */
-               xs_tcp_reuse_connection(xprt);
+               xs_tcp_reuse_connection(xprt, transport);
 
        dprintk("RPC:       worker connecting xprt %p to address: %s\n",
                        xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
@@ -1678,7 +1759,7 @@ static void xs_tcp_connect_worker6(struct work_struct *work)
        struct socket *sock = transport->sock;
        int err, status = -EIO;
 
-       if (xprt->shutdown || !xprt_bound(xprt))
+       if (xprt->shutdown)
                goto out;
 
        if (!sock) {
@@ -1695,7 +1776,7 @@ static void xs_tcp_connect_worker6(struct work_struct *work)
                }
        } else
                /* "close" the socket, preserving the local port */
-               xs_tcp_reuse_connection(xprt);
+               xs_tcp_reuse_connection(xprt, transport);
 
        dprintk("RPC:       worker connecting xprt %p to address: %s\n",
                        xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
@@ -1765,9 +1846,6 @@ static void xs_tcp_connect(struct rpc_task *task)
 {
        struct rpc_xprt *xprt = task->tk_xprt;
 
-       /* Initiate graceful shutdown of the socket if not already done */
-       if (test_bit(XPRT_CONNECTED, &xprt->state))
-               xs_tcp_shutdown(xprt);
        /* Exit if we need to wait for socket shutdown to complete */
        if (test_bit(XPRT_CLOSING, &xprt->state))
                return;