#endif
/*
- * How many times to try sending a request on a socket before waiting
- * for the socket buffer to clear.
- */
-#define XS_SENDMSG_RETRY (10U)
-
-/*
* Time out for an RPC UDP socket connect. UDP socket connects are
* synchronous, but we set a timeout anyway in case of resource
* exhaustion on the local host.
return (struct sockaddr_in6 *) &xprt->addr;
}
-static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt)
+static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt,
+ const char *protocol,
+ const char *netid)
{
struct sockaddr_in *addr = xs_addr_in(xprt);
char *buf;
}
xprt->address_strings[RPC_DISPLAY_PORT] = buf;
- buf = kzalloc(8, GFP_KERNEL);
- if (buf) {
- if (xprt->prot == IPPROTO_UDP)
- snprintf(buf, 8, "udp");
- else
- snprintf(buf, 8, "tcp");
- }
- xprt->address_strings[RPC_DISPLAY_PROTO] = buf;
+ xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
buf = kzalloc(48, GFP_KERNEL);
if (buf) {
snprintf(buf, 48, "addr="NIPQUAD_FMT" port=%u proto=%s",
NIPQUAD(addr->sin_addr.s_addr),
ntohs(addr->sin_port),
- xprt->prot == IPPROTO_UDP ? "udp" : "tcp");
+ protocol);
}
xprt->address_strings[RPC_DISPLAY_ALL] = buf;
}
xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
- xprt->address_strings[RPC_DISPLAY_NETID] =
- kstrdup(xprt->prot == IPPROTO_UDP ?
- RPCBIND_NETID_UDP : RPCBIND_NETID_TCP, GFP_KERNEL);
+ xprt->address_strings[RPC_DISPLAY_NETID] = netid;
}
-static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt)
+static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt,
+ const char *protocol,
+ const char *netid)
{
struct sockaddr_in6 *addr = xs_addr_in6(xprt);
char *buf;
}
xprt->address_strings[RPC_DISPLAY_PORT] = buf;
- buf = kzalloc(8, GFP_KERNEL);
- if (buf) {
- if (xprt->prot == IPPROTO_UDP)
- snprintf(buf, 8, "udp");
- else
- snprintf(buf, 8, "tcp");
- }
- xprt->address_strings[RPC_DISPLAY_PROTO] = buf;
+ xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
buf = kzalloc(64, GFP_KERNEL);
if (buf) {
snprintf(buf, 64, "addr="NIP6_FMT" port=%u proto=%s",
NIP6(addr->sin6_addr),
ntohs(addr->sin6_port),
- xprt->prot == IPPROTO_UDP ? "udp" : "tcp");
+ protocol);
}
xprt->address_strings[RPC_DISPLAY_ALL] = buf;
}
xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
- xprt->address_strings[RPC_DISPLAY_NETID] =
- kstrdup(xprt->prot == IPPROTO_UDP ?
- RPCBIND_NETID_UDP6 : RPCBIND_NETID_TCP6, GFP_KERNEL);
+ xprt->address_strings[RPC_DISPLAY_NETID] = netid;
}
static void xs_free_peer_addresses(struct rpc_xprt *xprt)
{
- int i;
+ unsigned int i;
for (i = 0; i < RPC_DISPLAY_MAX; i++)
- kfree(xprt->address_strings[i]);
+ switch (i) {
+ case RPC_DISPLAY_PROTO:
+ case RPC_DISPLAY_NETID:
+ continue;
+ default:
+ kfree(xprt->address_strings[i]);
+ }
}
#define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL)
return sent;
}
+static void xs_nospace_callback(struct rpc_task *task)
+{
+ struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);
+
+ transport->inet->sk_write_pending--;
+ clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
+}
+
/**
* xs_nospace - place task on wait queue if transmit was incomplete
* @task: task to put to sleep
task->tk_pid, req->rq_slen - req->rq_bytes_sent,
req->rq_slen);
- if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
- /* Protect against races with write_space */
- spin_lock_bh(&xprt->transport_lock);
-
- /* Don't race with disconnect */
- if (!xprt_connected(xprt))
- task->tk_status = -ENOTCONN;
- else if (test_bit(SOCK_NOSPACE, &transport->sock->flags))
- xprt_wait_for_buffer_space(task);
+ /* Protect against races with write_space */
+ spin_lock_bh(&xprt->transport_lock);
+
+ /* Don't race with disconnect */
+ if (xprt_connected(xprt)) {
+ if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
+ /*
+ * Notify TCP that we're limited by the application
+ * window size
+ */
+ set_bit(SOCK_NOSPACE, &transport->sock->flags);
+ transport->inet->sk_write_pending++;
+ /* ...and wait for more buffer space */
+ xprt_wait_for_buffer_space(task, xs_nospace_callback);
+ }
+ } else {
+ clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
+ task->tk_status = -ENOTCONN;
+ }
- spin_unlock_bh(&xprt->transport_lock);
- } else
- /* Keep holding the socket if it is blocked */
- rpc_delay(task, HZ>>4);
+ spin_unlock_bh(&xprt->transport_lock);
}
/**
dprintk("RPC: xs_udp_send_request(%u) = %d\n",
xdr->len - req->rq_bytes_sent, status);
- if (likely(status >= (int) req->rq_slen))
- return 0;
-
- /* Still some bytes left; set up for a retry later. */
- if (status > 0)
+ if (status >= 0) {
+ task->tk_bytes_sent += status;
+ if (status >= req->rq_slen)
+ return 0;
+ /* Still some bytes left; set up for a retry later. */
status = -EAGAIN;
+ }
switch (status) {
+ case -EAGAIN:
+ xs_nospace(task);
+ break;
case -ENETUNREACH:
case -EPIPE:
case -ECONNREFUSED:
/* When the server has died, an ICMP port unreachable message
* prompts ECONNREFUSED. */
- break;
- case -EAGAIN:
- xs_nospace(task);
+ clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
break;
default:
+ clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
dprintk("RPC: sendmsg returned unrecognized error %d\n",
-status);
- break;
}
return status;
}
+/**
+ * xs_tcp_shutdown - gracefully shut down a TCP socket
+ * @xprt: transport
+ *
+ * Initiates a graceful shutdown of the TCP socket by calling the
+ * equivalent of shutdown(SHUT_WR);
+ */
+static void xs_tcp_shutdown(struct rpc_xprt *xprt)
+{
+ struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
+ struct socket *sock = transport->sock;
+
+ if (sock != NULL)
+ kernel_sock_shutdown(sock, SHUT_WR);
+}
+
static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
{
u32 reclen = buf->len - sizeof(rpc_fraghdr);
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
struct xdr_buf *xdr = &req->rq_snd_buf;
int status;
- unsigned int retry = 0;
xs_encode_tcp_record_marker(&req->rq_snd_buf);
return 0;
}
+ if (status != 0)
+ continue;
status = -EAGAIN;
- if (retry++ > XS_SENDMSG_RETRY)
- break;
+ break;
}
switch (status) {
case -ENOTCONN:
case -EPIPE:
status = -ENOTCONN;
+ clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
break;
default:
dprintk("RPC: sendmsg returned unrecognized error %d\n",
-status);
- xprt_disconnect(xprt);
- break;
+ clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
+ xs_tcp_shutdown(xprt);
}
return status;
clear_close_wait:
smp_mb__before_clear_bit();
clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
+ clear_bit(XPRT_CLOSING, &xprt->state);
smp_mb__after_clear_bit();
+ xprt_disconnect_done(xprt);
}
/**
cancel_rearming_delayed_work(&transport->connect_worker);
- xprt_disconnect(xprt);
xs_close(xprt);
xs_free_peer_addresses(xprt);
kfree(xprt->slot);
copied = repsize;
/* Suck it into the iovec, verify checksum if not done by hw. */
- if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb))
+ if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
+ UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
goto out_unlock;
+ }
+
+ UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);
/* Something worked... */
dst_confirm(skb->dst);
/* Sanity check of the record length */
if (unlikely(transport->tcp_reclen < 4)) {
dprintk("RPC: invalid TCP record fragment length\n");
- xprt_disconnect(xprt);
+ xprt_force_disconnect(xprt);
return;
}
dprintk("RPC: reading TCP record fragment of length %d\n",
{
struct rpc_xprt *xprt;
read_descriptor_t rd_desc;
+ int read;
dprintk("RPC: xs_tcp_data_ready...\n");
/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
rd_desc.arg.data = xprt;
- rd_desc.count = 65536;
- tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
+ do {
+ rd_desc.count = 65536;
+ read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
+ } while (read > 0);
out:
read_unlock(&sk->sk_callback_lock);
}
transport->tcp_flags =
TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
- xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
xprt_wake_pending_tasks(xprt, 0);
}
spin_unlock_bh(&xprt->transport_lock);
break;
- case TCP_SYN_SENT:
- case TCP_SYN_RECV:
+ case TCP_FIN_WAIT1:
+ /* The client initiated a shutdown of the socket */
+ xprt->reestablish_timeout = 0;
+ set_bit(XPRT_CLOSING, &xprt->state);
+ smp_mb__before_clear_bit();
+ clear_bit(XPRT_CONNECTED, &xprt->state);
+ clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
+ smp_mb__after_clear_bit();
break;
case TCP_CLOSE_WAIT:
- /* Try to schedule an autoclose RPC calls */
- set_bit(XPRT_CLOSE_WAIT, &xprt->state);
- if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
- queue_work(rpciod_workqueue, &xprt->task_cleanup);
- default:
- xprt_disconnect(xprt);
+ /* The server initiated a shutdown of the socket */
+ set_bit(XPRT_CLOSING, &xprt->state);
+ xprt_force_disconnect(xprt);
+ case TCP_SYN_SENT:
+ case TCP_CLOSING:
+ /*
+ * If the server closed down the connection, make sure that
+ * we back off before reconnecting
+ */
+ if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
+ xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
+ break;
+ case TCP_LAST_ACK:
+ smp_mb__before_clear_bit();
+ clear_bit(XPRT_CONNECTED, &xprt->state);
+ smp_mb__after_clear_bit();
+ break;
+ case TCP_CLOSE:
+ smp_mb__before_clear_bit();
+ clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
+ clear_bit(XPRT_CLOSING, &xprt->state);
+ smp_mb__after_clear_bit();
+ /* Mark transport as closed and wake up all pending tasks */
+ xprt_disconnect_done(xprt);
}
out:
read_unlock(&sk->sk_callback_lock);
if (unlikely(!(sock = sk->sk_socket)))
goto out;
+ clear_bit(SOCK_NOSPACE, &sock->flags);
+
if (unlikely(!(xprt = xprt_from_sock(sk))))
goto out;
- if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
+ if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
goto out;
xprt_write_space(xprt);
if (unlikely(!(sock = sk->sk_socket)))
goto out;
+ clear_bit(SOCK_NOSPACE, &sock->flags);
+
if (unlikely(!(xprt = xprt_from_sock(sk))))
goto out;
- if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
+ if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
goto out;
xprt_write_space(xprt);
}
}
+static unsigned short xs_get_srcport(struct sock_xprt *transport, struct socket *sock)
+{
+ unsigned short port = transport->port;
+
+ if (port == 0 && transport->xprt.resvport)
+ port = xs_get_random_port();
+ return port;
+}
+
+static unsigned short xs_next_srcport(struct sock_xprt *transport, struct socket *sock, unsigned short port)
+{
+ if (transport->port != 0)
+ transport->port = 0;
+ if (!transport->xprt.resvport)
+ return 0;
+ if (port <= xprt_min_resvport || port > xprt_max_resvport)
+ return xprt_max_resvport;
+ return --port;
+}
+
static int xs_bind4(struct sock_xprt *transport, struct socket *sock)
{
struct sockaddr_in myaddr = {
.sin_family = AF_INET,
};
struct sockaddr_in *sa;
- int err;
- unsigned short port = transport->port;
+ int err, nloop = 0;
+ unsigned short port = xs_get_srcport(transport, sock);
+ unsigned short last;
- if (!transport->xprt.resvport)
- port = 0;
sa = (struct sockaddr_in *)&transport->addr;
myaddr.sin_addr = sa->sin_addr;
do {
myaddr.sin_port = htons(port);
err = kernel_bind(sock, (struct sockaddr *) &myaddr,
sizeof(myaddr));
- if (!transport->xprt.resvport)
+ if (port == 0)
break;
if (err == 0) {
transport->port = port;
break;
}
- if (port <= xprt_min_resvport)
- port = xprt_max_resvport;
- else
- port--;
- } while (err == -EADDRINUSE && port != transport->port);
+ last = port;
+ port = xs_next_srcport(transport, sock, port);
+ if (port > last)
+ nloop++;
+ } while (err == -EADDRINUSE && nloop != 2);
dprintk("RPC: %s "NIPQUAD_FMT":%u: %s (%d)\n",
__FUNCTION__, NIPQUAD(myaddr.sin_addr),
port, err ? "failed" : "ok", err);
.sin6_family = AF_INET6,
};
struct sockaddr_in6 *sa;
- int err;
- unsigned short port = transport->port;
+ int err, nloop = 0;
+ unsigned short port = xs_get_srcport(transport, sock);
+ unsigned short last;
- if (!transport->xprt.resvport)
- port = 0;
sa = (struct sockaddr_in6 *)&transport->addr;
myaddr.sin6_addr = sa->sin6_addr;
do {
myaddr.sin6_port = htons(port);
err = kernel_bind(sock, (struct sockaddr *) &myaddr,
sizeof(myaddr));
- if (!transport->xprt.resvport)
+ if (port == 0)
break;
if (err == 0) {
transport->port = port;
break;
}
- if (port <= xprt_min_resvport)
- port = xprt_max_resvport;
- else
- port--;
- } while (err == -EADDRINUSE && port != transport->port);
+ last = port;
+ port = xs_next_srcport(transport, sock, port);
+ if (port > last)
+ nloop++;
+ } while (err == -EADDRINUSE && nloop != 2);
dprintk("RPC: xs_bind6 "NIP6_FMT":%u: %s (%d)\n",
NIP6(myaddr.sin6_addr), port, err ? "failed" : "ok", err);
return err;
{
struct sock *sk = sock->sk;
- BUG_ON(sk->sk_lock.owner != NULL);
+ BUG_ON(sock_owned_by_user(sk));
sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC",
&xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]);
}
{
struct sock *sk = sock->sk;
- BUG_ON(sk->sk_lock.owner != NULL);
+ BUG_ON(sock_owned_by_user(sk));
sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC",
&xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]);
}
break;
default:
/* get rid of existing socket, and retry */
- xs_close(xprt);
- break;
+ xs_tcp_shutdown(xprt);
}
}
out:
break;
default:
/* get rid of existing socket, and retry */
- xs_close(xprt);
- break;
+ xs_tcp_shutdown(xprt);
}
}
out:
}
}
+static void xs_tcp_connect(struct rpc_task *task)
+{
+ struct rpc_xprt *xprt = task->tk_xprt;
+
+ /* Initiate graceful shutdown of the socket if not already done */
+ if (test_bit(XPRT_CONNECTED, &xprt->state))
+ xs_tcp_shutdown(xprt);
+ /* Exit if we need to wait for socket shutdown to complete */
+ if (test_bit(XPRT_CLOSING, &xprt->state))
+ return;
+ xs_connect(task);
+}
+
/**
* xs_udp_print_stats - display UDP socket-specifc stats
* @xprt: rpc_xprt struct containing statistics
.release_xprt = xs_tcp_release_xprt,
.rpcbind = rpcb_getport_async,
.set_port = xs_set_port,
- .connect = xs_connect,
+ .connect = xs_tcp_connect,
.buf_alloc = rpc_malloc,
.buf_free = rpc_free,
.send_request = xs_tcp_send_request,
.set_retrans_timeout = xprt_set_retrans_timeout_def,
- .close = xs_close,
+ .close = xs_tcp_shutdown,
.destroy = xs_destroy,
.print_stats = xs_tcp_print_stats,
};
xprt->addrlen = args->addrlen;
if (args->srcaddr)
memcpy(&new->addr, args->srcaddr, args->addrlen);
- new->port = xs_get_random_port();
return xprt;
}
+static const struct rpc_timeout xs_udp_default_timeout = {
+ .to_initval = 5 * HZ,
+ .to_maxval = 30 * HZ,
+ .to_increment = 5 * HZ,
+ .to_retries = 5,
+};
+
/**
* xs_setup_udp - Set up transport to use a UDP socket
* @args: rpc transport creation arguments
*
*/
-struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
+static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
{
struct sockaddr *addr = args->dstaddr;
struct rpc_xprt *xprt;
xprt->ops = &xs_udp_ops;
- if (args->timeout)
- xprt->timeout = *args->timeout;
- else
- xprt_set_timeout(&xprt->timeout, 5, 5 * HZ);
+ xprt->timeout = &xs_udp_default_timeout;
switch (addr->sa_family) {
case AF_INET:
INIT_DELAYED_WORK(&transport->connect_worker,
xs_udp_connect_worker4);
- xs_format_ipv4_peer_addresses(xprt);
+ xs_format_ipv4_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
break;
case AF_INET6:
if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
INIT_DELAYED_WORK(&transport->connect_worker,
xs_udp_connect_worker6);
- xs_format_ipv6_peer_addresses(xprt);
+ xs_format_ipv6_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
break;
default:
kfree(xprt);
return ERR_PTR(-EINVAL);
}
+static const struct rpc_timeout xs_tcp_default_timeout = {
+ .to_initval = 60 * HZ,
+ .to_maxval = 60 * HZ,
+ .to_retries = 2,
+};
+
/**
* xs_setup_tcp - Set up transport to use a TCP socket
* @args: rpc transport creation arguments
*
*/
-struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
+static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
{
struct sockaddr *addr = args->dstaddr;
struct rpc_xprt *xprt;
xprt->idle_timeout = XS_IDLE_DISC_TO;
xprt->ops = &xs_tcp_ops;
-
- if (args->timeout)
- xprt->timeout = *args->timeout;
- else
- xprt_set_timeout(&xprt->timeout, 2, 60 * HZ);
+ xprt->timeout = &xs_tcp_default_timeout;
switch (addr->sa_family) {
case AF_INET:
xprt_set_bound(xprt);
INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker4);
- xs_format_ipv4_peer_addresses(xprt);
+ xs_format_ipv4_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
break;
case AF_INET6:
if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
xprt_set_bound(xprt);
INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker6);
- xs_format_ipv6_peer_addresses(xprt);
+ xs_format_ipv6_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
break;
default:
kfree(xprt);