X-Git-Url: http://ftp.safe.ca/?a=blobdiff_plain;f=net%2Fsunrpc%2Fxprtsock.c;h=83c73c4d017ad9ec91b4f1895344749c51b9839b;hb=23ac6581702ac6d029643328a7e6ea3baf834c5e;hp=fbc8725c20cbcfaed6d458e79e470393bb980ffc;hpb=55420c24a0d4d1fce70ca713f84aa00b6b74a70e;p=safe%2Fjmp%2Flinux-2.6 diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index fbc8725..83c73c4 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -34,6 +34,9 @@ #include #include #include +#ifdef CONFIG_NFS_V4_1 +#include +#endif #include #include @@ -270,6 +273,13 @@ struct sock_xprt { #define TCP_RCV_COPY_FRAGHDR (1UL << 1) #define TCP_RCV_COPY_XID (1UL << 2) #define TCP_RCV_COPY_DATA (1UL << 3) +#define TCP_RCV_READ_CALLDIR (1UL << 4) +#define TCP_RCV_COPY_CALLDIR (1UL << 5) + +/* + * TCP RPC flags + */ +#define TCP_RPC_REPLY (1UL << 6) static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt) { @@ -807,6 +817,9 @@ static void xs_reset_transport(struct sock_xprt *transport) * * This is used when all requests are complete; ie, no DRC state remains * on the server we want to save. + * + * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with + * xs_reset_transport() zeroing the socket from underneath a writer. */ static void xs_close(struct rpc_xprt *xprt) { @@ -824,6 +837,14 @@ static void xs_close(struct rpc_xprt *xprt) xprt_disconnect_done(xprt); } +static void xs_tcp_close(struct rpc_xprt *xprt) +{ + if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state)) + xs_close(xprt); + else + xs_tcp_shutdown(xprt); +} + /** * xs_destroy - prepare to shutdown a transport * @xprt: doomed transport @@ -907,7 +928,7 @@ static void xs_udp_data_ready(struct sock *sk, int len) UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS); /* Something worked... */ - dst_confirm(skb->dst); + dst_confirm(skb_dst(skb)); xprt_adjust_cwnd(task, copied); xprt_update_rtt(task); @@ -945,7 +966,7 @@ static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_rea transport->tcp_offset = 0; /* Sanity check of the record length */ - if (unlikely(transport->tcp_reclen < 4)) { + if (unlikely(transport->tcp_reclen < 8)) { dprintk("RPC: invalid TCP record fragment length\n"); xprt_force_disconnect(xprt); return; @@ -980,33 +1001,77 @@ static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_r if (used != len) return; transport->tcp_flags &= ~TCP_RCV_COPY_XID; - transport->tcp_flags |= TCP_RCV_COPY_DATA; + transport->tcp_flags |= TCP_RCV_READ_CALLDIR; transport->tcp_copied = 4; - dprintk("RPC: reading reply for XID %08x\n", + dprintk("RPC: reading %s XID %08x\n", + (transport->tcp_flags & TCP_RPC_REPLY) ? "reply for" + : "request with", ntohl(transport->tcp_xid)); xs_tcp_check_fraghdr(transport); } -static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_reader *desc) +static inline void xs_tcp_read_calldir(struct sock_xprt *transport, + struct xdr_skb_reader *desc) { - struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - struct rpc_rqst *req; + size_t len, used; + u32 offset; + __be32 calldir; + + /* + * We want transport->tcp_offset to be 8 at the end of this routine + * (4 bytes for the xid and 4 bytes for the call/reply flag). + * When this function is called for the first time, + * transport->tcp_offset is 4 (after having already read the xid). + */ + offset = transport->tcp_offset - sizeof(transport->tcp_xid); + len = sizeof(calldir) - offset; + dprintk("RPC: reading CALL/REPLY flag (%Zu bytes)\n", len); + used = xdr_skb_read_bits(desc, &calldir, len); + transport->tcp_offset += used; + if (used != len) + return; + transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR; + transport->tcp_flags |= TCP_RCV_COPY_CALLDIR; + transport->tcp_flags |= TCP_RCV_COPY_DATA; + /* + * We don't yet have the XDR buffer, so we will write the calldir + * out after we get the buffer from the 'struct rpc_rqst' + */ + if (ntohl(calldir) == RPC_REPLY) + transport->tcp_flags |= TCP_RPC_REPLY; + else + transport->tcp_flags &= ~TCP_RPC_REPLY; + dprintk("RPC: reading %s CALL/REPLY flag %08x\n", + (transport->tcp_flags & TCP_RPC_REPLY) ? + "reply for" : "request with", calldir); + xs_tcp_check_fraghdr(transport); +} + +static inline void xs_tcp_read_common(struct rpc_xprt *xprt, + struct xdr_skb_reader *desc, + struct rpc_rqst *req) +{ + struct sock_xprt *transport = + container_of(xprt, struct sock_xprt, xprt); struct xdr_buf *rcvbuf; size_t len; ssize_t r; - /* Find and lock the request corresponding to this xid */ - spin_lock(&xprt->transport_lock); - req = xprt_lookup_rqst(xprt, transport->tcp_xid); - if (!req) { - transport->tcp_flags &= ~TCP_RCV_COPY_DATA; - dprintk("RPC: XID %08x request not found!\n", - ntohl(transport->tcp_xid)); - spin_unlock(&xprt->transport_lock); - return; + rcvbuf = &req->rq_private_buf; + + if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) { + /* + * Save the RPC direction in the XDR buffer + */ + __be32 calldir = transport->tcp_flags & TCP_RPC_REPLY ? + htonl(RPC_REPLY) : 0; + + memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied, + &calldir, sizeof(calldir)); + transport->tcp_copied += sizeof(calldir); + transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR; } - rcvbuf = &req->rq_private_buf; len = desc->count; if (len > transport->tcp_reclen - transport->tcp_offset) { struct xdr_skb_reader my_desc; @@ -1043,7 +1108,7 @@ static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_rea "tcp_offset = %u, tcp_reclen = %u\n", xprt, transport->tcp_copied, transport->tcp_offset, transport->tcp_reclen); - goto out; + return; } dprintk("RPC: XID %08x read %Zd bytes\n", @@ -1059,11 +1124,125 @@ static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_rea transport->tcp_flags &= ~TCP_RCV_COPY_DATA; } -out: + return; +} + +/* + * Finds the request corresponding to the RPC xid and invokes the common + * tcp read code to read the data. + */ +static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, + struct xdr_skb_reader *desc) +{ + struct sock_xprt *transport = + container_of(xprt, struct sock_xprt, xprt); + struct rpc_rqst *req; + + dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid)); + + /* Find and lock the request corresponding to this xid */ + spin_lock(&xprt->transport_lock); + req = xprt_lookup_rqst(xprt, transport->tcp_xid); + if (!req) { + dprintk("RPC: XID %08x request not found!\n", + ntohl(transport->tcp_xid)); + spin_unlock(&xprt->transport_lock); + return -1; + } + + xs_tcp_read_common(xprt, desc, req); + if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) xprt_complete_rqst(req->rq_task, transport->tcp_copied); + spin_unlock(&xprt->transport_lock); - xs_tcp_check_fraghdr(transport); + return 0; +} + +#if defined(CONFIG_NFS_V4_1) +/* + * Obtains an rpc_rqst previously allocated and invokes the common + * tcp read code to read the data. The result is placed in the callback + * queue. + * If we're unable to obtain the rpc_rqst we schedule the closing of the + * connection and return -1. + */ +static inline int xs_tcp_read_callback(struct rpc_xprt *xprt, + struct xdr_skb_reader *desc) +{ + struct sock_xprt *transport = + container_of(xprt, struct sock_xprt, xprt); + struct rpc_rqst *req; + + req = xprt_alloc_bc_request(xprt); + if (req == NULL) { + printk(KERN_WARNING "Callback slot table overflowed\n"); + xprt_force_disconnect(xprt); + return -1; + } + + req->rq_xid = transport->tcp_xid; + dprintk("RPC: read callback XID %08x\n", ntohl(req->rq_xid)); + xs_tcp_read_common(xprt, desc, req); + + if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) { + struct svc_serv *bc_serv = xprt->bc_serv; + + /* + * Add callback request to callback list. The callback + * service sleeps on the sv_cb_waitq waiting for new + * requests. Wake it up after adding enqueing the + * request. + */ + dprintk("RPC: add callback request to list\n"); + spin_lock(&bc_serv->sv_cb_lock); + list_add(&req->rq_bc_list, &bc_serv->sv_cb_list); + spin_unlock(&bc_serv->sv_cb_lock); + wake_up(&bc_serv->sv_cb_waitq); + } + + req->rq_private_buf.len = transport->tcp_copied; + + return 0; +} + +static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, + struct xdr_skb_reader *desc) +{ + struct sock_xprt *transport = + container_of(xprt, struct sock_xprt, xprt); + + return (transport->tcp_flags & TCP_RPC_REPLY) ? + xs_tcp_read_reply(xprt, desc) : + xs_tcp_read_callback(xprt, desc); +} +#else +static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, + struct xdr_skb_reader *desc) +{ + return xs_tcp_read_reply(xprt, desc); +} +#endif /* CONFIG_NFS_V4_1 */ + +/* + * Read data off the transport. This can be either an RPC_CALL or an + * RPC_REPLY. Relay the processing to helper functions. + */ +static void xs_tcp_read_data(struct rpc_xprt *xprt, + struct xdr_skb_reader *desc) +{ + struct sock_xprt *transport = + container_of(xprt, struct sock_xprt, xprt); + + if (_xs_tcp_read_data(xprt, desc) == 0) + xs_tcp_check_fraghdr(transport); + else { + /* + * The transport_lock protects the request handling. + * There's no need to hold it to update the tcp_flags. + */ + transport->tcp_flags &= ~TCP_RCV_COPY_DATA; + } } static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc) @@ -1103,9 +1282,14 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns xs_tcp_read_xid(transport, &desc); continue; } + /* Read in the call/reply flag */ + if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) { + xs_tcp_read_calldir(transport, &desc); + continue; + } /* Read in the request data */ if (transport->tcp_flags & TCP_RCV_COPY_DATA) { - xs_tcp_read_request(xprt, &desc); + xs_tcp_read_data(xprt, &desc); continue; } /* Skip over any trailing bytes on short reads */ @@ -1280,6 +1464,23 @@ out: read_unlock(&sk->sk_callback_lock); } +static void xs_write_space(struct sock *sk) +{ + struct socket *sock; + struct rpc_xprt *xprt; + + if (unlikely(!(sock = sk->sk_socket))) + return; + clear_bit(SOCK_NOSPACE, &sock->flags); + + if (unlikely(!(xprt = xprt_from_sock(sk)))) + return; + if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0) + return; + + xprt_write_space(xprt); +} + /** * xs_udp_write_space - callback invoked when socket buffer space * becomes available @@ -1295,23 +1496,9 @@ static void xs_udp_write_space(struct sock *sk) read_lock(&sk->sk_callback_lock); /* from net/core/sock.c:sock_def_write_space */ - if (sock_writeable(sk)) { - struct socket *sock; - struct rpc_xprt *xprt; - - if (unlikely(!(sock = sk->sk_socket))) - goto out; - clear_bit(SOCK_NOSPACE, &sock->flags); - - if (unlikely(!(xprt = xprt_from_sock(sk)))) - goto out; - if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0) - goto out; + if (sock_writeable(sk)) + xs_write_space(sk); - xprt_write_space(xprt); - } - - out: read_unlock(&sk->sk_callback_lock); } @@ -1330,23 +1517,9 @@ static void xs_tcp_write_space(struct sock *sk) read_lock(&sk->sk_callback_lock); /* from net/core/stream.c:sk_stream_write_space */ - if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { - struct socket *sock; - struct rpc_xprt *xprt; - - if (unlikely(!(sock = sk->sk_socket))) - goto out; - clear_bit(SOCK_NOSPACE, &sock->flags); + if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) + xs_write_space(sk); - if (unlikely(!(xprt = xprt_from_sock(sk)))) - goto out; - if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0) - goto out; - - xprt_write_space(xprt); - } - - out: read_unlock(&sk->sk_callback_lock); } @@ -1783,6 +1956,16 @@ static void xs_tcp_setup_socket(struct rpc_xprt *xprt, xprt, -status, xprt_connected(xprt), sock->sk->sk_state); switch (status) { + default: + printk("%s: connect returned unhandled error %d\n", + __func__, status); + case -EADDRNOTAVAIL: + /* We're probably in TIME_WAIT. Get rid of existing socket, + * and retry + */ + set_bit(XPRT_CONNECTION_CLOSE, &xprt->state); + xprt_force_disconnect(xprt); + break; case -ECONNREFUSED: case -ECONNRESET: case -ENETUNREACH: @@ -1793,10 +1976,6 @@ static void xs_tcp_setup_socket(struct rpc_xprt *xprt, xprt_clear_connecting(xprt); return; } - /* get rid of existing socket, and retry */ - xs_tcp_shutdown(xprt); - printk("%s: connect returned unhandled error %d\n", - __func__, status); out_eagain: status = -EAGAIN; out: @@ -2005,7 +2184,10 @@ static struct rpc_xprt_ops xs_tcp_ops = { .buf_free = rpc_free, .send_request = xs_tcp_send_request, .set_retrans_timeout = xprt_set_retrans_timeout_def, - .close = xs_tcp_shutdown, +#if defined(CONFIG_NFS_V4_1) + .release_request = bc_release_request, +#endif /* CONFIG_NFS_V4_1 */ + .close = xs_tcp_close, .destroy = xs_destroy, .print_stats = xs_tcp_print_stats, };