[PATCH] RPC: skip over transport-specific heads automatically
authorChuck Lever <cel@netapp.com>
Thu, 25 Aug 2005 23:25:49 +0000 (16:25 -0700)
committerTrond Myklebust <Trond.Myklebust@netapp.com>
Fri, 23 Sep 2005 16:38:33 +0000 (12:38 -0400)
 Add a generic mechanism for skipping over transport-specific headers
 when constructing an RPC request.  This removes another "xprt->stream"
 dependency.

 Test-plan:
 Write-intensive workload on a single mount point (try both UDP and
 TCP).

Signed-off-by: Chuck Lever <cel@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
include/linux/sunrpc/msg_prot.h
include/linux/sunrpc/xprt.h
net/sunrpc/auth_gss/auth_gss.c
net/sunrpc/clnt.c
net/sunrpc/xprtsock.c

index 15f1153..f43f237 100644 (file)
@@ -76,5 +76,30 @@ enum rpc_auth_stat {
 
 #define RPC_MAXNETNAMELEN      256
 
+/*
+ * From RFC 1831:
+ *
+ * "A record is composed of one or more record fragments.  A record
+ *  fragment is a four-byte header followed by 0 to (2**31) - 1 bytes of
+ *  fragment data.  The bytes encode an unsigned binary number; as with
+ *  XDR integers, the byte order is from highest to lowest.  The number
+ *  encodes two values -- a boolean which indicates whether the fragment
+ *  is the last fragment of the record (bit value 1 implies the fragment
+ *  is the last fragment) and a 31-bit unsigned binary value which is the
+ *  length in bytes of the fragment's data.  The boolean value is the
+ *  highest-order bit of the header; the length is the 31 low-order bits.
+ *  (Note that this record specification is NOT in XDR standard form!)"
+ *
+ * The Linux RPC client always sends its requests in a single record
+ * fragment, limiting the maximum payload size for stream transports to
+ * 2GB.
+ */
+
+typedef u32    rpc_fraghdr;
+
+#define        RPC_LAST_STREAM_FRAGMENT        (1U << 31)
+#define        RPC_FRAGMENT_SIZE_MASK          (~RPC_LAST_STREAM_FRAGMENT)
+#define        RPC_MAX_FRAGMENT_SIZE           ((1U << 31) - 1)
+
 #endif /* __KERNEL__ */
 #endif /* _LINUX_SUNRPC_MSGPROT_H_ */
index e73174c..966c456 100644 (file)
@@ -155,6 +155,8 @@ struct rpc_xprt {
 
        size_t                  max_payload;    /* largest RPC payload size,
                                                   in bytes */
+       unsigned int            tsh_size;       /* size of transport specific
+                                                  header */
 
        struct rpc_wait_queue   sending;        /* requests waiting to send */
        struct rpc_wait_queue   resend;         /* requests waiting to resend */
@@ -236,6 +238,11 @@ int                        xprt_adjust_timeout(struct rpc_rqst *req);
 void                   xprt_release(struct rpc_task *task);
 int                    xprt_destroy(struct rpc_xprt *xprt);
 
+static inline u32 *xprt_skip_transport_header(struct rpc_xprt *xprt, u32 *p)
+{
+       return p + xprt->tsh_size;
+}
+
 /*
  * Transport switch helper functions
  */
index 53a030a..d2b08f1 100644 (file)
@@ -844,10 +844,8 @@ gss_marshal(struct rpc_task *task, u32 *p)
 
        /* We compute the checksum for the verifier over the xdr-encoded bytes
         * starting with the xid and ending at the end of the credential: */
-       iov.iov_base = req->rq_snd_buf.head[0].iov_base;
-       if (task->tk_client->cl_xprt->stream)
-               /* See clnt.c:call_header() */
-               iov.iov_base += 4;
+       iov.iov_base = xprt_skip_transport_header(task->tk_xprt,
+                                       req->rq_snd_buf.head[0].iov_base);
        iov.iov_len = (u8 *)p - (u8 *)iov.iov_base;
        xdr_buf_from_iov(&iov, &verf_buf);
 
index 4677959..cc1b773 100644 (file)
@@ -1075,13 +1075,12 @@ static u32 *
 call_header(struct rpc_task *task)
 {
        struct rpc_clnt *clnt = task->tk_client;
-       struct rpc_xprt *xprt = clnt->cl_xprt;
        struct rpc_rqst *req = task->tk_rqstp;
        u32             *p = req->rq_svec[0].iov_base;
 
        /* FIXME: check buffer size? */
-       if (xprt->stream)
-               *p++ = 0;               /* fill in later */
+
+       p = xprt_skip_transport_header(task->tk_xprt, p);
        *p++ = req->rq_xid;             /* XID */
        *p++ = htonl(RPC_CALL);         /* CALL */
        *p++ = htonl(RPC_VERSION);      /* RPC version */
index 5798830..aaf053b 100644 (file)
@@ -282,6 +282,13 @@ static int xs_udp_send_request(struct rpc_task *task)
        return status;
 }
 
+static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
+{
+       u32 reclen = buf->len - sizeof(rpc_fraghdr);
+       rpc_fraghdr *base = buf->head[0].iov_base;
+       *base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen);
+}
+
 /**
  * xs_tcp_send_request - write an RPC request to a TCP socket
  * @task: address of RPC task that manages the state of an RPC request
@@ -301,11 +308,9 @@ static int xs_tcp_send_request(struct rpc_task *task)
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        struct xdr_buf *xdr = &req->rq_snd_buf;
-       u32 *marker = req->rq_svec[0].iov_base;
        int status, retry = 0;
 
-       /* Write the record marker */
-       *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
+       xs_encode_tcp_record_marker(&req->rq_snd_buf);
 
        xs_pktdump("packet data:",
                                req->rq_svec->iov_base,
@@ -503,16 +508,19 @@ static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc
        xprt->tcp_offset += used;
        if (used != len)
                return;
+
        xprt->tcp_reclen = ntohl(xprt->tcp_recm);
-       if (xprt->tcp_reclen & 0x80000000)
+       if (xprt->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
                xprt->tcp_flags |= XPRT_LAST_FRAG;
        else
                xprt->tcp_flags &= ~XPRT_LAST_FRAG;
-       xprt->tcp_reclen &= 0x7fffffff;
+       xprt->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
+
        xprt->tcp_flags &= ~XPRT_COPY_RECM;
        xprt->tcp_offset = 0;
+
        /* Sanity check of the record length */
-       if (xprt->tcp_reclen < 4) {
+       if (unlikely(xprt->tcp_reclen < 4)) {
                dprintk("RPC:      invalid TCP record fragment length\n");
                xprt_disconnect(xprt);
                return;
@@ -1065,6 +1073,7 @@ int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
 
        xprt->prot = IPPROTO_UDP;
        xprt->port = XS_MAX_RESVPORT;
+       xprt->tsh_size = 0;
        xprt->stream = 0;
        xprt->nocong = 0;
        xprt->cwnd = RPC_INITCWND;
@@ -1105,11 +1114,12 @@ int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to)
 
        xprt->prot = IPPROTO_TCP;
        xprt->port = XS_MAX_RESVPORT;
+       xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
        xprt->stream = 1;
        xprt->nocong = 1;
        xprt->cwnd = RPC_MAXCWND(xprt);
        xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
-       xprt->max_payload = (1U << 31) - 1;
+       xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
 
        INIT_WORK(&xprt->connect_worker, xs_tcp_connect_worker, xprt);