SUNRPC: Return EAGAIN instead of ENOTCONN when waking up xprt->pending
[safe/jmp/linux-2.6] / net / sunrpc / xprtsock.c
index 4a567a9..8e58b0b 100644 (file)
@@ -3,8 +3,8 @@
  *
  * Client-side transport implementation for sockets.
  *
- * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
- * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
+ * TCP callback races fixes (C) 1998 Red Hat
+ * TCP send fixes (C) 1998 Red Hat
  * TCP NFS related read + write fixes
  *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
  *
@@ -249,6 +249,7 @@ struct sock_xprt {
        void                    (*old_data_ready)(struct sock *, int);
        void                    (*old_state_change)(struct sock *);
        void                    (*old_write_space)(struct sock *);
+       void                    (*old_error_report)(struct sock *);
 };
 
 /*
@@ -283,8 +284,7 @@ static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt,
 
        buf = kzalloc(20, GFP_KERNEL);
        if (buf) {
-               snprintf(buf, 20, NIPQUAD_FMT,
-                               NIPQUAD(addr->sin_addr.s_addr));
+               snprintf(buf, 20, "%pI4", &addr->sin_addr.s_addr);
        }
        xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
 
@@ -299,8 +299,8 @@ static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt,
 
        buf = kzalloc(48, GFP_KERNEL);
        if (buf) {
-               snprintf(buf, 48, "addr="NIPQUAD_FMT" port=%u proto=%s",
-                       NIPQUAD(addr->sin_addr.s_addr),
+               snprintf(buf, 48, "addr=%pI4 port=%u proto=%s",
+                       &addr->sin_addr.s_addr,
                        ntohs(addr->sin_port),
                        protocol);
        }
@@ -322,8 +322,8 @@ static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt,
 
        buf = kzalloc(30, GFP_KERNEL);
        if (buf) {
-               snprintf(buf, 30, NIPQUAD_FMT".%u.%u",
-                               NIPQUAD(addr->sin_addr.s_addr),
+               snprintf(buf, 30, "%pI4.%u.%u",
+                               &addr->sin_addr.s_addr,
                                ntohs(addr->sin_port) >> 8,
                                ntohs(addr->sin_port) & 0xff);
        }
@@ -341,8 +341,7 @@ static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt,
 
        buf = kzalloc(40, GFP_KERNEL);
        if (buf) {
-               snprintf(buf, 40, NIP6_FMT,
-                               NIP6(addr->sin6_addr));
+               snprintf(buf, 40, "%pI6",&addr->sin6_addr);
        }
        xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
 
@@ -357,18 +356,17 @@ static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt,
 
        buf = kzalloc(64, GFP_KERNEL);
        if (buf) {
-               snprintf(buf, 64, "addr="NIP6_FMT" port=%u proto=%s",
-                               NIP6(addr->sin6_addr),
+               snprintf(buf, 64, "addr=%pI6 port=%u proto=%s",
+                               &addr->sin6_addr,
                                ntohs(addr->sin6_port),
                                protocol);
        }
        xprt->address_strings[RPC_DISPLAY_ALL] = buf;
 
        buf = kzalloc(36, GFP_KERNEL);
-       if (buf) {
-               snprintf(buf, 36, NIP6_SEQFMT,
-                               NIP6(addr->sin6_addr));
-       }
+       if (buf)
+               snprintf(buf, 36, "%pi6", &addr->sin6_addr);
+
        xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
 
        buf = kzalloc(8, GFP_KERNEL);
@@ -380,10 +378,10 @@ static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt,
 
        buf = kzalloc(50, GFP_KERNEL);
        if (buf) {
-               snprintf(buf, 50, NIP6_FMT".%u.%u",
-                               NIP6(addr->sin6_addr),
-                               ntohs(addr->sin6_port) >> 8,
-                               ntohs(addr->sin6_port) & 0xff);
+               snprintf(buf, 50, "%pI6.%u.%u",
+                        &addr->sin6_addr,
+                        ntohs(addr->sin6_port) >> 8,
+                        ntohs(addr->sin6_port) & 0xff);
        }
        xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
 
@@ -469,7 +467,7 @@ static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen,
        int err, sent = 0;
 
        if (unlikely(!sock))
-               return -ENOTCONN;
+               return -ENOTSOCK;
 
        clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
        if (base != 0) {
@@ -579,7 +577,8 @@ static int xs_udp_send_request(struct rpc_task *task)
                                req->rq_svec->iov_base,
                                req->rq_svec->iov_len);
 
-       req->rq_xtime = jiffies;
+       if (!xprt_bound(xprt))
+               return -ENOTCONN;
        status = xs_sendpages(transport->sock,
                              xs_addr(xprt),
                              xprt->addrlen, xdr,
@@ -595,24 +594,28 @@ static int xs_udp_send_request(struct rpc_task *task)
                /* Still some bytes left; set up for a retry later. */
                status = -EAGAIN;
        }
+       if (!transport->sock)
+               goto out;
 
        switch (status) {
+       case -ENOTSOCK:
+               status = -ENOTCONN;
+               /* Should we call xs_close() here? */
+               break;
        case -EAGAIN:
                xs_nospace(task);
                break;
+       default:
+               dprintk("RPC:       sendmsg returned unrecognized error %d\n",
+                       -status);
        case -ENETUNREACH:
        case -EPIPE:
        case -ECONNREFUSED:
                /* When the server has died, an ICMP port unreachable message
                 * prompts ECONNREFUSED. */
                clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
-               break;
-       default:
-               clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
-               dprintk("RPC:       sendmsg returned unrecognized error %d\n",
-                       -status);
        }
-
+out:
        return status;
 }
 
@@ -671,7 +674,6 @@ static int xs_tcp_send_request(struct rpc_task *task)
         * to cope with writespace callbacks arriving _after_ we have
         * called sendmsg(). */
        while (1) {
-               req->rq_xtime = jiffies;
                status = xs_sendpages(transport->sock,
                                        NULL, 0, xdr, req->rq_bytes_sent);
 
@@ -695,25 +697,28 @@ static int xs_tcp_send_request(struct rpc_task *task)
                status = -EAGAIN;
                break;
        }
+       if (!transport->sock)
+               goto out;
 
        switch (status) {
+       case -ENOTSOCK:
+               status = -ENOTCONN;
+               /* Should we call xs_close() here? */
+               break;
        case -EAGAIN:
                xs_nospace(task);
                break;
-       case -ECONNREFUSED:
-       case -ECONNRESET:
-       case -ENOTCONN:
-       case -EPIPE:
-               status = -ENOTCONN;
-               clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
-               break;
        default:
                dprintk("RPC:       sendmsg returned unrecognized error %d\n",
                        -status);
-               clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
+       case -ECONNRESET:
                xs_tcp_shutdown(xprt);
+       case -ECONNREFUSED:
+       case -ENOTCONN:
+       case -EPIPE:
+               clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
        }
-
+out:
        return status;
 }
 
@@ -744,38 +749,59 @@ out_release:
        xprt_release_xprt(xprt, task);
 }
 
-/**
- * xs_close - close a socket
- * @xprt: transport
- *
- * This is used when all requests are complete; ie, no DRC state remains
- * on the server we want to save.
- */
-static void xs_close(struct rpc_xprt *xprt)
+static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
+{
+       transport->old_data_ready = sk->sk_data_ready;
+       transport->old_state_change = sk->sk_state_change;
+       transport->old_write_space = sk->sk_write_space;
+       transport->old_error_report = sk->sk_error_report;
+}
+
+static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
+{
+       sk->sk_data_ready = transport->old_data_ready;
+       sk->sk_state_change = transport->old_state_change;
+       sk->sk_write_space = transport->old_write_space;
+       sk->sk_error_report = transport->old_error_report;
+}
+
+static void xs_reset_transport(struct sock_xprt *transport)
 {
-       struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
        struct socket *sock = transport->sock;
        struct sock *sk = transport->inet;
 
-       if (!sk)
-               goto clear_close_wait;
-
-       dprintk("RPC:       xs_close xprt %p\n", xprt);
+       if (sk == NULL)
+               return;
 
        write_lock_bh(&sk->sk_callback_lock);
        transport->inet = NULL;
        transport->sock = NULL;
 
        sk->sk_user_data = NULL;
-       sk->sk_data_ready = transport->old_data_ready;
-       sk->sk_state_change = transport->old_state_change;
-       sk->sk_write_space = transport->old_write_space;
+
+       xs_restore_old_callbacks(transport, sk);
        write_unlock_bh(&sk->sk_callback_lock);
 
        sk->sk_no_check = 0;
 
        sock_release(sock);
-clear_close_wait:
+}
+
+/**
+ * xs_close - close a socket
+ * @xprt: transport
+ *
+ * This is used when all requests are complete; ie, no DRC state remains
+ * on the server we want to save.
+ */
+static void xs_close(struct rpc_xprt *xprt)
+{
+       struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
+
+       dprintk("RPC:       xs_close xprt %p\n", xprt);
+
+       xs_reset_transport(transport);
+
        smp_mb__before_clear_bit();
        clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
        clear_bit(XPRT_CLOSING, &xprt->state);
@@ -1136,12 +1162,13 @@ static void xs_tcp_state_change(struct sock *sk)
                        transport->tcp_flags =
                                TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
 
-                       xprt_wake_pending_tasks(xprt, 0);
+                       xprt_wake_pending_tasks(xprt, -EAGAIN);
                }
                spin_unlock_bh(&xprt->transport_lock);
                break;
        case TCP_FIN_WAIT1:
                /* The client initiated a shutdown of the socket */
+               xprt->connect_cookie++;
                xprt->reestablish_timeout = 0;
                set_bit(XPRT_CLOSING, &xprt->state);
                smp_mb__before_clear_bit();
@@ -1151,9 +1178,9 @@ static void xs_tcp_state_change(struct sock *sk)
                break;
        case TCP_CLOSE_WAIT:
                /* The server initiated a shutdown of the socket */
-               set_bit(XPRT_CLOSING, &xprt->state);
                xprt_force_disconnect(xprt);
        case TCP_SYN_SENT:
+               xprt->connect_cookie++;
        case TCP_CLOSING:
                /*
                 * If the server closed down the connection, make sure that
@@ -1163,6 +1190,7 @@ static void xs_tcp_state_change(struct sock *sk)
                        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
                break;
        case TCP_LAST_ACK:
+               set_bit(XPRT_CLOSING, &xprt->state);
                smp_mb__before_clear_bit();
                clear_bit(XPRT_CONNECTED, &xprt->state);
                smp_mb__after_clear_bit();
@@ -1180,6 +1208,25 @@ static void xs_tcp_state_change(struct sock *sk)
 }
 
 /**
+ * xs_error_report - callback mainly for catching socket errors
+ * @sk: socket
+ */
+static void xs_error_report(struct sock *sk)
+{
+       struct rpc_xprt *xprt;
+
+       read_lock(&sk->sk_callback_lock);
+       if (!(xprt = xprt_from_sock(sk)))
+               goto out;
+       dprintk("RPC:       %s client %p...\n"
+                       "RPC:       error %d\n",
+                       __func__, xprt, sk->sk_err);
+       xprt_wake_pending_tasks(xprt, -EAGAIN);
+out:
+       read_unlock(&sk->sk_callback_lock);
+}
+
+/**
  * xs_udp_write_space - callback invoked when socket buffer space
  *                             becomes available
  * @sk: socket whose state has changed
@@ -1376,8 +1423,8 @@ static int xs_bind4(struct sock_xprt *transport, struct socket *sock)
                if (port > last)
                        nloop++;
        } while (err == -EADDRINUSE && nloop != 2);
-       dprintk("RPC:       %s "NIPQUAD_FMT":%u: %s (%d)\n",
-                       __FUNCTION__, NIPQUAD(myaddr.sin_addr),
+       dprintk("RPC:       %s %pI4:%u: %s (%d)\n",
+                       __func__, &myaddr.sin_addr,
                        port, err ? "failed" : "ok", err);
        return err;
 }
@@ -1409,8 +1456,8 @@ static int xs_bind6(struct sock_xprt *transport, struct socket *sock)
                if (port > last)
                        nloop++;
        } while (err == -EADDRINUSE && nloop != 2);
-       dprintk("RPC:       xs_bind6 "NIP6_FMT":%u: %s (%d)\n",
-               NIP6(myaddr.sin6_addr), port, err ? "failed" : "ok", err);
+       dprintk("RPC:       xs_bind6 %pI6:%u: %s (%d)\n",
+               &myaddr.sin6_addr, port, err ? "failed" : "ok", err);
        return err;
 }
 
@@ -1454,12 +1501,12 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 
                write_lock_bh(&sk->sk_callback_lock);
 
+               xs_save_old_callbacks(transport, sk);
+
                sk->sk_user_data = xprt;
-               transport->old_data_ready = sk->sk_data_ready;
-               transport->old_state_change = sk->sk_state_change;
-               transport->old_write_space = sk->sk_write_space;
                sk->sk_data_ready = xs_udp_data_ready;
                sk->sk_write_space = xs_udp_write_space;
+               sk->sk_error_report = xs_error_report;
                sk->sk_no_check = UDP_CSUM_NORCV;
                sk->sk_allocation = GFP_ATOMIC;
 
@@ -1488,13 +1535,14 @@ static void xs_udp_connect_worker4(struct work_struct *work)
        struct socket *sock = transport->sock;
        int err, status = -EIO;
 
-       if (xprt->shutdown || !xprt_bound(xprt))
+       if (xprt->shutdown)
                goto out;
 
        /* Start by resetting any existing state */
-       xs_close(xprt);
+       xs_reset_transport(transport);
 
-       if ((err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
+       err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock);
+       if (err < 0) {
                dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
                goto out;
        }
@@ -1529,13 +1577,14 @@ static void xs_udp_connect_worker6(struct work_struct *work)
        struct socket *sock = transport->sock;
        int err, status = -EIO;
 
-       if (xprt->shutdown || !xprt_bound(xprt))
+       if (xprt->shutdown)
                goto out;
 
        /* Start by resetting any existing state */
-       xs_close(xprt);
+       xs_reset_transport(transport);
 
-       if ((err = sock_create_kern(PF_INET6, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
+       err = sock_create_kern(PF_INET6, SOCK_DGRAM, IPPROTO_UDP, &sock);
+       if (err < 0) {
                dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
                goto out;
        }
@@ -1560,10 +1609,9 @@ out:
  * We need to preserve the port number so the reply cache on the server can
  * find our cached RPC replies when we get around to reconnecting.
  */
-static void xs_tcp_reuse_connection(struct rpc_xprt *xprt)
+static void xs_abort_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
 {
        int result;
-       struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
        struct sockaddr any;
 
        dprintk("RPC:       disconnecting xprt %p to reuse port\n", xprt);
@@ -1580,6 +1628,17 @@ static void xs_tcp_reuse_connection(struct rpc_xprt *xprt)
                                result);
 }
 
+static void xs_tcp_reuse_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
+{
+       unsigned int state = transport->inet->sk_state;
+
+       if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED)
+               return;
+       if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT))
+               return;
+       xs_abort_connection(xprt, transport);
+}
+
 static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 {
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
@@ -1589,13 +1648,13 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 
                write_lock_bh(&sk->sk_callback_lock);
 
+               xs_save_old_callbacks(transport, sk);
+
                sk->sk_user_data = xprt;
-               transport->old_data_ready = sk->sk_data_ready;
-               transport->old_state_change = sk->sk_state_change;
-               transport->old_write_space = sk->sk_write_space;
                sk->sk_data_ready = xs_tcp_data_ready;
                sk->sk_state_change = xs_tcp_state_change;
                sk->sk_write_space = xs_tcp_write_space;
+               sk->sk_error_report = xs_error_report;
                sk->sk_allocation = GFP_ATOMIC;
 
                /* socket options */
@@ -1613,6 +1672,9 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
                write_unlock_bh(&sk->sk_callback_lock);
        }
 
+       if (!xprt_bound(xprt))
+               return -ENOTCONN;
+
        /* Tell the socket layer to start connecting... */
        xprt->stat.connect_count++;
        xprt->stat.connect_start = jiffies;
@@ -1633,7 +1695,7 @@ static void xs_tcp_connect_worker4(struct work_struct *work)
        struct socket *sock = transport->sock;
        int err, status = -EIO;
 
-       if (xprt->shutdown || !xprt_bound(xprt))
+       if (xprt->shutdown)
                goto out;
 
        if (!sock) {
@@ -1650,7 +1712,7 @@ static void xs_tcp_connect_worker4(struct work_struct *work)
                }
        } else
                /* "close" the socket, preserving the local port */
-               xs_tcp_reuse_connection(xprt);
+               xs_tcp_reuse_connection(xprt, transport);
 
        dprintk("RPC:       worker connecting xprt %p to address: %s\n",
                        xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
@@ -1659,20 +1721,22 @@ static void xs_tcp_connect_worker4(struct work_struct *work)
        dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
                        xprt, -status, xprt_connected(xprt),
                        sock->sk->sk_state);
-       if (status < 0) {
-               switch (status) {
-                       case -EINPROGRESS:
-                       case -EALREADY:
-                               goto out_clear;
-                       case -ECONNREFUSED:
-                       case -ECONNRESET:
-                               /* retry with existing socket, after a delay */
-                               break;
-                       default:
-                               /* get rid of existing socket, and retry */
-                               xs_tcp_shutdown(xprt);
-               }
+       switch (status) {
+       case 0:
+       case -EINPROGRESS:
+       case -EALREADY:
+               goto out_clear;
+       case -ECONNREFUSED:
+       case -ECONNRESET:
+               /* retry with existing socket, after a delay */
+               break;
+       default:
+               /* get rid of existing socket, and retry */
+               xs_tcp_shutdown(xprt);
+               printk("%s: connect returned unhandled error %d\n",
+                               __func__, status);
        }
+       status = -EAGAIN;
 out:
        xprt_wake_pending_tasks(xprt, status);
 out_clear:
@@ -1693,7 +1757,7 @@ static void xs_tcp_connect_worker6(struct work_struct *work)
        struct socket *sock = transport->sock;
        int err, status = -EIO;
 
-       if (xprt->shutdown || !xprt_bound(xprt))
+       if (xprt->shutdown)
                goto out;
 
        if (!sock) {
@@ -1710,7 +1774,7 @@ static void xs_tcp_connect_worker6(struct work_struct *work)
                }
        } else
                /* "close" the socket, preserving the local port */
-               xs_tcp_reuse_connection(xprt);
+               xs_tcp_reuse_connection(xprt, transport);
 
        dprintk("RPC:       worker connecting xprt %p to address: %s\n",
                        xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
@@ -1718,20 +1782,22 @@ static void xs_tcp_connect_worker6(struct work_struct *work)
        status = xs_tcp_finish_connecting(xprt, sock);
        dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
                        xprt, -status, xprt_connected(xprt), sock->sk->sk_state);
-       if (status < 0) {
-               switch (status) {
-                       case -EINPROGRESS:
-                       case -EALREADY:
-                               goto out_clear;
-                       case -ECONNREFUSED:
-                       case -ECONNRESET:
-                               /* retry with existing socket, after a delay */
-                               break;
-                       default:
-                               /* get rid of existing socket, and retry */
-                               xs_tcp_shutdown(xprt);
-               }
+       switch (status) {
+       case 0:
+       case -EINPROGRESS:
+       case -EALREADY:
+               goto out_clear;
+       case -ECONNREFUSED:
+       case -ECONNRESET:
+               /* retry with existing socket, after a delay */
+               break;
+       default:
+               /* get rid of existing socket, and retry */
+               xs_tcp_shutdown(xprt);
+               printk("%s: connect returned unhandled error %d\n",
+                               __func__, status);
        }
+       status = -EAGAIN;
 out:
        xprt_wake_pending_tasks(xprt, status);
 out_clear:
@@ -1780,9 +1846,6 @@ static void xs_tcp_connect(struct rpc_task *task)
 {
        struct rpc_xprt *xprt = task->tk_xprt;
 
-       /* Initiate graceful shutdown of the socket if not already done */
-       if (test_bit(XPRT_CONNECTED, &xprt->state))
-               xs_tcp_shutdown(xprt);
        /* Exit if we need to wait for socket shutdown to complete */
        if (test_bit(XPRT_CLOSING, &xprt->state))
                return;