svcrdma: Modify the RPC reply path to use FRMR when available
authorTom Tucker <tom@opengridcomputing.com>
Fri, 3 Oct 2008 20:45:03 +0000 (15:45 -0500)
committerTom Tucker <tom@opengridcomputing.com>
Mon, 6 Oct 2008 19:46:05 +0000 (14:46 -0500)
Use FRMR to map local RPC reply data. This allows RDMA_WRITE to send reply
data using a single WR. The FRMR is invalidated by linking the LOCAL_INV WR
to the RDMA_SEND message used to complete the reply.

Signed-off-by: Tom Tucker <tom@opengridcomputing.com>
net/sunrpc/xprtrdma/svc_rdma_sendto.c
net/sunrpc/xprtrdma/svc_rdma_transport.c

index 84d3283..9a7a8e7 100644 (file)
  * array is only concerned with the reply we are assured that we have
  * on extra page for the RPCRMDA header.
  */
-static void xdr_to_sge(struct svcxprt_rdma *xprt,
-                      struct xdr_buf *xdr,
-                      struct svc_rdma_req_map *vec)
+int fast_reg_xdr(struct svcxprt_rdma *xprt,
+                struct xdr_buf *xdr,
+                struct svc_rdma_req_map *vec)
+{
+       int sge_no;
+       u32 sge_bytes;
+       u32 page_bytes;
+       u32 page_off;
+       int page_no = 0;
+       u8 *frva;
+       struct svc_rdma_fastreg_mr *frmr;
+
+       frmr = svc_rdma_get_frmr(xprt);
+       if (IS_ERR(frmr))
+               return -ENOMEM;
+       vec->frmr = frmr;
+
+       /* Skip the RPCRDMA header */
+       sge_no = 1;
+
+       /* Map the head. */
+       frva = (void *)((unsigned long)(xdr->head[0].iov_base) & PAGE_MASK);
+       vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
+       vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
+       vec->count = 2;
+       sge_no++;
+
+       /* Build the FRMR */
+       frmr->kva = frva;
+       frmr->direction = DMA_TO_DEVICE;
+       frmr->access_flags = 0;
+       frmr->map_len = PAGE_SIZE;
+       frmr->page_list_len = 1;
+       frmr->page_list->page_list[page_no] =
+               ib_dma_map_single(xprt->sc_cm_id->device,
+                                 (void *)xdr->head[0].iov_base,
+                                 PAGE_SIZE, DMA_TO_DEVICE);
+       if (ib_dma_mapping_error(xprt->sc_cm_id->device,
+                                frmr->page_list->page_list[page_no]))
+               goto fatal_err;
+       atomic_inc(&xprt->sc_dma_used);
+
+       page_off = xdr->page_base;
+       page_bytes = xdr->page_len + page_off;
+       if (!page_bytes)
+               goto encode_tail;
+
+       /* Map the pages */
+       vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off;
+       vec->sge[sge_no].iov_len = page_bytes;
+       sge_no++;
+       while (page_bytes) {
+               struct page *page;
+
+               page = xdr->pages[page_no++];
+               sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
+               page_bytes -= sge_bytes;
+
+               frmr->page_list->page_list[page_no] =
+                       ib_dma_map_page(xprt->sc_cm_id->device, page, 0,
+                                         PAGE_SIZE, DMA_TO_DEVICE);
+               if (ib_dma_mapping_error(xprt->sc_cm_id->device,
+                                        frmr->page_list->page_list[page_no]))
+                       goto fatal_err;
+
+               atomic_inc(&xprt->sc_dma_used);
+               page_off = 0; /* reset for next time through loop */
+               frmr->map_len += PAGE_SIZE;
+               frmr->page_list_len++;
+       }
+       vec->count++;
+
+ encode_tail:
+       /* Map tail */
+       if (0 == xdr->tail[0].iov_len)
+               goto done;
+
+       vec->count++;
+       vec->sge[sge_no].iov_len = xdr->tail[0].iov_len;
+
+       if (((unsigned long)xdr->tail[0].iov_base & PAGE_MASK) ==
+           ((unsigned long)xdr->head[0].iov_base & PAGE_MASK)) {
+               /*
+                * If head and tail use the same page, we don't need
+                * to map it again.
+                */
+               vec->sge[sge_no].iov_base = xdr->tail[0].iov_base;
+       } else {
+               void *va;
+
+               /* Map another page for the tail */
+               page_off = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
+               va = (void *)((unsigned long)xdr->tail[0].iov_base & PAGE_MASK);
+               vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off;
+
+               frmr->page_list->page_list[page_no] =
+                       ib_dma_map_single(xprt->sc_cm_id->device, va, PAGE_SIZE,
+                                         DMA_TO_DEVICE);
+               if (ib_dma_mapping_error(xprt->sc_cm_id->device,
+                                        frmr->page_list->page_list[page_no]))
+                       goto fatal_err;
+               atomic_inc(&xprt->sc_dma_used);
+               frmr->map_len += PAGE_SIZE;
+               frmr->page_list_len++;
+       }
+
+ done:
+       if (svc_rdma_fastreg(xprt, frmr))
+               goto fatal_err;
+
+       return 0;
+
+ fatal_err:
+       printk("svcrdma: Error fast registering memory for xprt %p\n", xprt);
+       svc_rdma_put_frmr(xprt, frmr);
+       return -EIO;
+}
+
+static int map_xdr(struct svcxprt_rdma *xprt,
+                  struct xdr_buf *xdr,
+                  struct svc_rdma_req_map *vec)
 {
        int sge_max = (xdr->len+PAGE_SIZE-1) / PAGE_SIZE + 3;
        int sge_no;
@@ -83,6 +201,9 @@ static void xdr_to_sge(struct svcxprt_rdma *xprt,
        BUG_ON(xdr->len !=
               (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len));
 
+       if (xprt->sc_frmr_pg_list_len)
+               return fast_reg_xdr(xprt, xdr, vec);
+
        /* Skip the first sge, this is for the RPCRDMA header */
        sge_no = 1;
 
@@ -116,9 +237,12 @@ static void xdr_to_sge(struct svcxprt_rdma *xprt,
 
        BUG_ON(sge_no > sge_max);
        vec->count = sge_no;
+       return 0;
 }
 
 /* Assumptions:
+ * - We are using FRMR
+ *     - or -
  * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
  */
 static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
@@ -158,30 +282,35 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
        sge_no = 0;
 
        /* Copy the remaining SGE */
-       while (bc != 0 && xdr_sge_no < vec->count) {
-               sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
-               sge_bytes = min((size_t)bc,
-                               (size_t)(vec->sge[xdr_sge_no].iov_len-sge_off));
+       while (bc != 0) {
+               sge_bytes = min_t(size_t,
+                         bc, vec->sge[xdr_sge_no].iov_len-sge_off);
                sge[sge_no].length = sge_bytes;
-               atomic_inc(&xprt->sc_dma_used);
-               sge[sge_no].addr =
-                       ib_dma_map_single(xprt->sc_cm_id->device,
-                                         (void *)
-                                         vec->sge[xdr_sge_no].iov_base + sge_off,
-                                         sge_bytes, DMA_TO_DEVICE);
-               if (dma_mapping_error(xprt->sc_cm_id->device->dma_device,
-                                       sge[sge_no].addr))
-                       goto err;
+               if (!vec->frmr) {
+                       sge[sge_no].addr =
+                               ib_dma_map_single(xprt->sc_cm_id->device,
+                                                 (void *)
+                                                 vec->sge[xdr_sge_no].iov_base + sge_off,
+                                                 sge_bytes, DMA_TO_DEVICE);
+                       if (ib_dma_mapping_error(xprt->sc_cm_id->device,
+                                                sge[sge_no].addr))
+                               goto err;
+                       atomic_inc(&xprt->sc_dma_used);
+                       sge[sge_no].lkey = xprt->sc_dma_lkey;
+               } else {
+                       sge[sge_no].addr = (unsigned long)
+                               vec->sge[xdr_sge_no].iov_base + sge_off;
+                       sge[sge_no].lkey = vec->frmr->mr->lkey;
+               }
+               ctxt->count++;
+               ctxt->frmr = vec->frmr;
                sge_off = 0;
                sge_no++;
-               ctxt->count++;
                xdr_sge_no++;
+               BUG_ON(xdr_sge_no > vec->count);
                bc -= sge_bytes;
        }
 
-       BUG_ON(bc != 0);
-       BUG_ON(xdr_sge_no > vec->count);
-
        /* Prepare WRITE WR */
        memset(&write_wr, 0, sizeof write_wr);
        ctxt->wr_op = IB_WR_RDMA_WRITE;
@@ -226,7 +355,10 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
        res_ary = (struct rpcrdma_write_array *)
                &rdma_resp->rm_body.rm_chunks[1];
 
-       max_write = xprt->sc_max_sge * PAGE_SIZE;
+       if (vec->frmr)
+               max_write = vec->frmr->map_len;
+       else
+               max_write = xprt->sc_max_sge * PAGE_SIZE;
 
        /* Write chunks start at the pagelist */
        for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
@@ -297,7 +429,10 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
        res_ary = (struct rpcrdma_write_array *)
                &rdma_resp->rm_body.rm_chunks[2];
 
-       max_write = xprt->sc_max_sge * PAGE_SIZE;
+       if (vec->frmr)
+               max_write = vec->frmr->map_len;
+       else
+               max_write = xprt->sc_max_sge * PAGE_SIZE;
 
        /* xdr offset starts at RPC message */
        for (xdr_off = 0, chunk_no = 0;
@@ -307,7 +442,6 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
                ch = &arg_ary->wc_array[chunk_no].wc_target;
                write_len = min(xfer_len, ch->rs_length);
 
-
                /* Prepare the reply chunk given the length actually
                 * written */
                rs_offset = get_unaligned(&(ch->rs_offset));
@@ -366,6 +500,7 @@ static int send_reply(struct svcxprt_rdma *rdma,
                      int byte_count)
 {
        struct ib_send_wr send_wr;
+       struct ib_send_wr inv_wr;
        int sge_no;
        int sge_bytes;
        int page_no;
@@ -385,27 +520,45 @@ static int send_reply(struct svcxprt_rdma *rdma,
        /* Prepare the context */
        ctxt->pages[0] = page;
        ctxt->count = 1;
+       ctxt->frmr = vec->frmr;
+       if (vec->frmr)
+               set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
+       else
+               clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
 
        /* Prepare the SGE for the RPCRDMA Header */
-       atomic_inc(&rdma->sc_dma_used);
        ctxt->sge[0].addr =
                ib_dma_map_page(rdma->sc_cm_id->device,
                                page, 0, PAGE_SIZE, DMA_TO_DEVICE);
+       if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr))
+               goto err;
+       atomic_inc(&rdma->sc_dma_used);
+
        ctxt->direction = DMA_TO_DEVICE;
+
        ctxt->sge[0].length = svc_rdma_xdr_get_reply_hdr_len(rdma_resp);
-       ctxt->sge[0].lkey = rdma->sc_phys_mr->lkey;
+       ctxt->sge[0].lkey = rdma->sc_dma_lkey;
 
        /* Determine how many of our SGE are to be transmitted */
        for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) {
                sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
                byte_count -= sge_bytes;
-               atomic_inc(&rdma->sc_dma_used);
-               ctxt->sge[sge_no].addr =
-                       ib_dma_map_single(rdma->sc_cm_id->device,
-                                         vec->sge[sge_no].iov_base,
-                                         sge_bytes, DMA_TO_DEVICE);
+               if (!vec->frmr) {
+                       ctxt->sge[sge_no].addr =
+                               ib_dma_map_single(rdma->sc_cm_id->device,
+                                                 vec->sge[sge_no].iov_base,
+                                                 sge_bytes, DMA_TO_DEVICE);
+                       if (ib_dma_mapping_error(rdma->sc_cm_id->device,
+                                                ctxt->sge[sge_no].addr))
+                               goto err;
+                       atomic_inc(&rdma->sc_dma_used);
+                       ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;
+               } else {
+                       ctxt->sge[sge_no].addr = (unsigned long)
+                               vec->sge[sge_no].iov_base;
+                       ctxt->sge[sge_no].lkey = vec->frmr->mr->lkey;
+               }
                ctxt->sge[sge_no].length = sge_bytes;
-               ctxt->sge[sge_no].lkey = rdma->sc_phys_mr->lkey;
        }
        BUG_ON(byte_count != 0);
 
@@ -417,11 +570,16 @@ static int send_reply(struct svcxprt_rdma *rdma,
                ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
                ctxt->count++;
                rqstp->rq_respages[page_no] = NULL;
-               /* If there are more pages than SGE, terminate SGE list */
+               /*
+                * If there are more pages than SGE, terminate SGE
+                * list so that svc_rdma_unmap_dma doesn't attempt to
+                * unmap garbage.
+                */
                if (page_no+1 >= sge_no)
                        ctxt->sge[page_no+1].length = 0;
        }
        BUG_ON(sge_no > rdma->sc_max_sge);
+       BUG_ON(sge_no > ctxt->count);
        memset(&send_wr, 0, sizeof send_wr);
        ctxt->wr_op = IB_WR_SEND;
        send_wr.wr_id = (unsigned long)ctxt;
@@ -429,12 +587,26 @@ static int send_reply(struct svcxprt_rdma *rdma,
        send_wr.num_sge = sge_no;
        send_wr.opcode = IB_WR_SEND;
        send_wr.send_flags =  IB_SEND_SIGNALED;
+       if (vec->frmr) {
+               /* Prepare INVALIDATE WR */
+               memset(&inv_wr, 0, sizeof inv_wr);
+               inv_wr.opcode = IB_WR_LOCAL_INV;
+               inv_wr.send_flags = IB_SEND_SIGNALED;
+               inv_wr.ex.invalidate_rkey =
+                       vec->frmr->mr->lkey;
+               send_wr.next = &inv_wr;
+       }
 
        ret = svc_rdma_send(rdma, &send_wr);
        if (ret)
-               svc_rdma_put_context(ctxt, 1);
+               goto err;
 
-       return ret;
+       return 0;
+
+ err:
+       svc_rdma_put_frmr(rdma, vec->frmr);
+       svc_rdma_put_context(ctxt, 1);
+       return -EIO;
 }
 
 void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
@@ -477,8 +649,9 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
        ctxt = svc_rdma_get_context(rdma);
        ctxt->direction = DMA_TO_DEVICE;
        vec = svc_rdma_get_req_map();
-       xdr_to_sge(rdma, &rqstp->rq_res, vec);
-
+       ret = map_xdr(rdma, &rqstp->rq_res, vec);
+       if (ret)
+               goto err0;
        inline_bytes = rqstp->rq_res.len;
 
        /* Create the RDMA response header */
@@ -498,7 +671,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
        if (ret < 0) {
                printk(KERN_ERR "svcrdma: failed to send write chunks, rc=%d\n",
                       ret);
-               goto error;
+               goto err1;
        }
        inline_bytes -= ret;
 
@@ -508,7 +681,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
        if (ret < 0) {
                printk(KERN_ERR "svcrdma: failed to send reply chunks, rc=%d\n",
                       ret);
-               goto error;
+               goto err1;
        }
        inline_bytes -= ret;
 
@@ -517,9 +690,11 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
        svc_rdma_put_req_map(vec);
        dprintk("svcrdma: send_reply returns %d\n", ret);
        return ret;
- error:
+
+ err1:
+       put_page(res_page);
+ err0:
        svc_rdma_put_req_map(vec);
        svc_rdma_put_context(ctxt, 0);
-       put_page(res_page);
        return ret;
 }
index fb0dff5..98f945c 100644 (file)
@@ -335,6 +335,8 @@ static void process_context(struct svcxprt_rdma *xprt,
 
        switch (ctxt->wr_op) {
        case IB_WR_SEND:
+               if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags))
+                       svc_rdma_put_frmr(xprt, ctxt->frmr);
                svc_rdma_put_context(ctxt, 1);
                break;