svcrdma: Use ib verbs version of dma_unmap
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index e85ac77..0b72c4c 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -103,8 +103,8 @@ static int rdma_bump_context_cache(struct svcxprt_rdma *xprt)
                spin_lock_bh(&xprt->sc_ctxt_lock);
                if (ctxt) {
                        at_least_one = 1;
-                       ctxt->next = xprt->sc_ctxt_head;
-                       xprt->sc_ctxt_head = ctxt;
+                       INIT_LIST_HEAD(&ctxt->free_list);
+                       list_add(&ctxt->free_list, &xprt->sc_ctxt_free);
                } else {
                        /* kmalloc failed...give up for now */
                        xprt->sc_ctxt_cnt--;
@@ -123,7 +123,7 @@ struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
 
        while (1) {
                spin_lock_bh(&xprt->sc_ctxt_lock);
-               if (unlikely(xprt->sc_ctxt_head == NULL)) {
+               if (unlikely(list_empty(&xprt->sc_ctxt_free))) {
                        /* Try to bump my cache. */
                        spin_unlock_bh(&xprt->sc_ctxt_lock);
 
@@ -136,12 +136,15 @@ struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
                        schedule_timeout_uninterruptible(msecs_to_jiffies(500));
                        continue;
                }
-               ctxt = xprt->sc_ctxt_head;
-               xprt->sc_ctxt_head = ctxt->next;
+               ctxt = list_entry(xprt->sc_ctxt_free.next,
+                                 struct svc_rdma_op_ctxt,
+                                 free_list);
+               list_del_init(&ctxt->free_list);
                spin_unlock_bh(&xprt->sc_ctxt_lock);
                ctxt->xprt = xprt;
                INIT_LIST_HEAD(&ctxt->dto_q);
                ctxt->count = 0;
+               atomic_inc(&xprt->sc_ctxt_used);
                break;
        }
        return ctxt;
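
The context cache is converted from a hand-rolled singly linked list (sc_ctxt_head/ctxt->next) to a standard struct list_head free list, and a new sc_ctxt_used counter tracks contexts currently out of the cache. A condensed sketch of the resulting get path, with field and function names as in the patch and the retry/cache-bump loop omitted:

    struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
    {
            struct svc_rdma_op_ctxt *ctxt = NULL;

            spin_lock_bh(&xprt->sc_ctxt_lock);
            if (!list_empty(&xprt->sc_ctxt_free)) {
                    /* pop the first free context */
                    ctxt = list_entry(xprt->sc_ctxt_free.next,
                                      struct svc_rdma_op_ctxt, free_list);
                    list_del_init(&ctxt->free_list);
                    atomic_inc(&xprt->sc_ctxt_used);    /* now in flight */
            }
            spin_unlock_bh(&xprt->sc_ctxt_lock);
            return ctxt;    /* the real code loops and bumps the cache
                             * instead of returning NULL */
    }

svc_rdma_put_context() is the mirror image: list_add() back onto sc_ctxt_free under the same lock, then atomic_dec(&xprt->sc_ctxt_used).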
@@ -159,14 +162,15 @@ void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
                        put_page(ctxt->pages[i]);
 
        for (i = 0; i < ctxt->count; i++)
-               dma_unmap_single(xprt->sc_cm_id->device->dma_device,
-                                ctxt->sge[i].addr,
-                                ctxt->sge[i].length,
-                                ctxt->direction);
+               ib_dma_unmap_single(xprt->sc_cm_id->device,
+                                   ctxt->sge[i].addr,
+                                   ctxt->sge[i].length,
+                                   ctxt->direction);
+
        spin_lock_bh(&xprt->sc_ctxt_lock);
-       ctxt->next = xprt->sc_ctxt_head;
-       xprt->sc_ctxt_head = ctxt;
+       list_add(&ctxt->free_list, &xprt->sc_ctxt_free);
        spin_unlock_bh(&xprt->sc_ctxt_lock);
+       atomic_dec(&xprt->sc_ctxt_used);
 }
 
 /* ib_cq event handler */
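
This hunk is what the subject line refers to: rather than reaching through sc_cm_id->device->dma_device and calling dma_unmap_single() directly, the code uses the ib_dma_* wrappers from <rdma/ib_verbs.h>, which honor any DMA mapping operations the ib_device provides. A minimal sketch of the paired map/unmap through the wrappers (the map side and the buf/len/sge variables are illustrative, not part of this hunk):

    #include <rdma/ib_verbs.h>

    /* map a kernel buffer and record it in the SGE */
    sge->addr = ib_dma_map_single(xprt->sc_cm_id->device,
                                  buf, len, DMA_FROM_DEVICE);
    if (ib_dma_mapping_error(xprt->sc_cm_id->device, sge->addr))
            return -ENOMEM;
    sge->length = len;

    /* ... after the WR completes ... */
    ib_dma_unmap_single(xprt->sc_cm_id->device,
                        sge->addr, sge->length, DMA_FROM_DEVICE);

Note the device argument changes from dma_device (a struct device *) to the struct ib_device * itself.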
@@ -248,11 +252,15 @@ static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
        struct svcxprt_rdma *xprt = cq_context;
        unsigned long flags;
 
+       /* Guard against unconditional flush call for destroyed QP */
+       if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount) == 0)
+               return;
+
        /*
         * Set the bit regardless of whether or not it's on the list
         * because it may be on the list already due to an SQ
         * completion.
-       */
+        */
        set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
 
        /*
@@ -275,6 +283,8 @@ static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
  *
  * Take all completing WC off the CQE and enqueue the associated DTO
  * context on the dto_q for the transport.
+ *
+ * Note that caller must hold a transport reference.
  */
 static void rq_cq_reap(struct svcxprt_rdma *xprt)
 {
@@ -288,20 +298,23 @@ static void rq_cq_reap(struct svcxprt_rdma *xprt)
        ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
        atomic_inc(&rdma_stat_rq_poll);
 
-       spin_lock_bh(&xprt->sc_rq_dto_lock);
        while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
                ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
                ctxt->wc_status = wc.status;
                ctxt->byte_len = wc.byte_len;
                if (wc.status != IB_WC_SUCCESS) {
                        /* Close the transport */
+                       dprintk("svcrdma: transport closing, putting ctxt %p\n", ctxt);
                        set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
                        svc_rdma_put_context(ctxt, 1);
+                       svc_xprt_put(&xprt->sc_xprt);
                        continue;
                }
+               spin_lock_bh(&xprt->sc_rq_dto_lock);
                list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
+               spin_unlock_bh(&xprt->sc_rq_dto_lock);
+               svc_xprt_put(&xprt->sc_xprt);
        }
-       spin_unlock_bh(&xprt->sc_rq_dto_lock);
 
        if (ctxt)
                atomic_inc(&rdma_stat_rq_prod);
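
The receive reap still re-arms the CQ with ib_req_notify_cq() before draining it with ib_poll_cq(), but sc_rq_dto_lock is now taken per completion instead of around the whole poll loop, and every reaped receive drops the transport reference that svc_rdma_post_recv() now takes (see below). The resulting loop, stripped to its shape:

    struct ib_wc wc;

    /* re-arm first, so a completion that arrives while we drain
     * re-triggers the handler rather than being missed */
    ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);

    while (ib_poll_cq(xprt->sc_rq_cq, 1, &wc) > 0) {
            struct svc_rdma_op_ctxt *ctxt =
                    (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;

            if (wc.status != IB_WC_SUCCESS) {
                    set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
                    svc_rdma_put_context(ctxt, 1);
                    svc_xprt_put(&xprt->sc_xprt);   /* ref from post_recv */
                    continue;
            }
            spin_lock_bh(&xprt->sc_rq_dto_lock);    /* lock per entry */
            list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
            spin_unlock_bh(&xprt->sc_rq_dto_lock);
            svc_xprt_put(&xprt->sc_xprt);
    }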
@@ -318,6 +331,8 @@ static void rq_cq_reap(struct svcxprt_rdma *xprt)
 
 /*
  * Send Queue Completion Handler - potentially called on interrupt context.
+ *
+ * Note that caller must hold a transport reference.
  */
 static void sq_cq_reap(struct svcxprt_rdma *xprt)
 {
@@ -352,14 +367,16 @@ static void sq_cq_reap(struct svcxprt_rdma *xprt)
 
                case IB_WR_RDMA_READ:
                        if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
+                               struct svc_rdma_op_ctxt *read_hdr = ctxt->read_hdr;
+                               BUG_ON(!read_hdr);
                                set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
-                               set_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
                                spin_lock_bh(&xprt->sc_read_complete_lock);
-                               list_add_tail(&ctxt->dto_q,
+                               list_add_tail(&read_hdr->dto_q,
                                              &xprt->sc_read_complete_q);
                                spin_unlock_bh(&xprt->sc_read_complete_lock);
                                svc_xprt_enqueue(&xprt->sc_xprt);
                        }
+                       svc_rdma_put_context(ctxt, 0);
                        break;
 
                default:
@@ -368,6 +385,7 @@ static void sq_cq_reap(struct svcxprt_rdma *xprt)
                               wc.opcode, wc.status);
                        break;
                }
+               svc_xprt_put(&xprt->sc_xprt);
        }
 
        if (ctxt)
@@ -379,11 +397,15 @@ static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
        struct svcxprt_rdma *xprt = cq_context;
        unsigned long flags;
 
+       /* Guard against unconditional flush call for destroyed QP */
+       if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount) == 0)
+               return;
+
        /*
         * Set the bit regardless of whether or not it's on the list
         * because it may be on the list already due to an RQ
         * completion.
-       */
+        */
        set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);
 
        /*
@@ -410,28 +432,29 @@ static void create_context_cache(struct svcxprt_rdma *xprt,
        xprt->sc_ctxt_max = ctxt_max;
        xprt->sc_ctxt_bump = ctxt_bump;
        xprt->sc_ctxt_cnt = 0;
-       xprt->sc_ctxt_head = NULL;
+       atomic_set(&xprt->sc_ctxt_used, 0);
+
+       INIT_LIST_HEAD(&xprt->sc_ctxt_free);
        for (i = 0; i < ctxt_count; i++) {
                ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
                if (ctxt) {
-                       ctxt->next = xprt->sc_ctxt_head;
-                       xprt->sc_ctxt_head = ctxt;
+                       INIT_LIST_HEAD(&ctxt->free_list);
+                       list_add(&ctxt->free_list, &xprt->sc_ctxt_free);
                        xprt->sc_ctxt_cnt++;
                }
        }
 }
 
-static void destroy_context_cache(struct svc_rdma_op_ctxt *ctxt)
+static void destroy_context_cache(struct svcxprt_rdma *xprt)
 {
-       struct svc_rdma_op_ctxt *next;
-       if (!ctxt)
-               return;
-
-       do {
-               next = ctxt->next;
+       while (!list_empty(&xprt->sc_ctxt_free)) {
+               struct svc_rdma_op_ctxt *ctxt;
+               ctxt = list_entry(xprt->sc_ctxt_free.next,
+                                 struct svc_rdma_op_ctxt,
+                                 free_list);
+               list_del_init(&ctxt->free_list);
                kfree(ctxt);
-               ctxt = next;
-       } while (next);
+       }
 }
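
destroy_context_cache() now takes the transport and drains the free list directly. The same teardown could also be written with list_for_each_entry_safe(); the sketch below is only an equivalent form of the idiom, not what the patch does:

    static void destroy_context_cache(struct svcxprt_rdma *xprt)
    {
            struct svc_rdma_op_ctxt *ctxt, *next;

            /* _safe variant: ctxt is freed inside the loop body */
            list_for_each_entry_safe(ctxt, next, &xprt->sc_ctxt_free,
                                     free_list) {
                    list_del(&ctxt->free_list);
                    kfree(ctxt);
            }
    }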
 
 static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
@@ -468,7 +491,7 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
                                     reqs +
                                     cma_xprt->sc_sq_depth +
                                     RPCRDMA_MAX_THREADS + 1); /* max */
-               if (!cma_xprt->sc_ctxt_head) {
+               if (list_empty(&cma_xprt->sc_ctxt_free)) {
                        kfree(cma_xprt);
                        return NULL;
                }
@@ -523,9 +546,12 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
        recv_wr.num_sge = ctxt->count;
        recv_wr.wr_id = (u64)(unsigned long)ctxt;
 
+       svc_xprt_get(&xprt->sc_xprt);
        ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
-       if (ret)
+       if (ret) {
+               svc_xprt_put(&xprt->sc_xprt);
                svc_rdma_put_context(ctxt, 1);
+       }
        return ret;
 }
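
Work requests now pin the transport: take a reference before posting, let the completion path (rq_cq_reap/sq_cq_reap above) drop it, and drop it on the spot if the post itself fails so the reference cannot leak. svc_rdma_send() gets the same treatment around ib_post_send() at the end of this patch. In outline:

    svc_xprt_get(&xprt->sc_xprt);           /* ref lives as long as the WR */
    ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
    if (ret) {
            svc_xprt_put(&xprt->sc_xprt);   /* WR never made it to the HW */
            svc_rdma_put_context(ctxt, 1);
    }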
 
@@ -667,31 +693,27 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
 
        cma_xprt = rdma_create_xprt(serv, 1);
        if (!cma_xprt)
-               return ERR_PTR(ENOMEM);
+               return ERR_PTR(-ENOMEM);
        xprt = &cma_xprt->sc_xprt;
 
        listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP);
        if (IS_ERR(listen_id)) {
-               svc_xprt_put(&cma_xprt->sc_xprt);
-               dprintk("svcrdma: rdma_create_id failed = %ld\n",
-                       PTR_ERR(listen_id));
-               return (void *)listen_id;
+               ret = PTR_ERR(listen_id);
+               dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
+               goto err0;
        }
+
        ret = rdma_bind_addr(listen_id, sa);
        if (ret) {
-               rdma_destroy_id(listen_id);
-               svc_xprt_put(&cma_xprt->sc_xprt);
                dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
-               return ERR_PTR(ret);
+               goto err1;
        }
        cma_xprt->sc_cm_id = listen_id;
 
        ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
        if (ret) {
-               rdma_destroy_id(listen_id);
-               svc_xprt_put(&cma_xprt->sc_xprt);
                dprintk("svcrdma: rdma_listen failed = %d\n", ret);
-               return ERR_PTR(ret);
+               goto err1;
        }
 
        /*
@@ -702,6 +724,12 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
        svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);
 
        return &cma_xprt->sc_xprt;
+
+ err1:
+       rdma_destroy_id(listen_id);
+ err0:
+       kfree(cma_xprt);
+       return ERR_PTR(ret);
 }
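
The listener setup is reworked into the usual goto error ladder, and the bogus positive ERR_PTR(ENOMEM) becomes ERR_PTR(-ENOMEM). Each failure jumps to the label that undoes exactly what has been set up so far. The skeleton of the idiom, with placeholder step names (setup_a/setup_b/teardown_a/obj are not from this file):

    ret = setup_a();                /* e.g. rdma_create_id() */
    if (ret)
            goto err0;              /* only the allocation to undo */

    ret = setup_b();                /* e.g. rdma_bind_addr(), rdma_listen() */
    if (ret)
            goto err1;              /* setup_a() must be undone too */

    return obj;

     err1:
            teardown_a();           /* e.g. rdma_destroy_id() */
     err0:
            kfree(obj);
            return ERR_PTR(ret);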
 
 /*
@@ -832,7 +860,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
                newxprt->sc_sq_depth = qp_attr.cap.max_send_wr;
                newxprt->sc_max_requests = qp_attr.cap.max_recv_wr;
        }
-       svc_xprt_get(&newxprt->sc_xprt);
        newxprt->sc_qp = newxprt->sc_cm_id->qp;
 
        /* Register all of physical memory */
@@ -906,10 +933,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
        dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
        /* Take a reference in case the DTO handler runs */
        svc_xprt_get(&newxprt->sc_xprt);
-       if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp)) {
+       if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
                ib_destroy_qp(newxprt->sc_qp);
-               svc_xprt_put(&newxprt->sc_xprt);
-       }
        rdma_destroy_id(newxprt->sc_cm_id);
        /* This call to put will destroy the transport */
        svc_xprt_put(&newxprt->sc_xprt);
@@ -921,10 +946,7 @@ static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
 }
 
 /*
- * When connected, an svc_xprt has at least three references:
- *
- * - A reference held by the QP. We still hold that here because this
- *   code deletes the QP and puts the reference.
+ * When connected, an svc_xprt has at least two references:
  *
  * - A reference held by the cm_id between the ESTABLISHED and
  *   DISCONNECTED events. If the remote peer disconnected first, this
@@ -933,7 +955,7 @@ static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
  * - A reference held by the svc_recv code that called this function
  *   as part of close processing.
  *
- * At a minimum two references should still be held.
+ * At a minimum one reference should still be held.
  */
 static void svc_rdma_detach(struct svc_xprt *xprt)
 {
@@ -943,23 +965,53 @@ static void svc_rdma_detach(struct svc_xprt *xprt)
 
        /* Disconnect and flush posted WQE */
        rdma_disconnect(rdma->sc_cm_id);
-
-       /* Destroy the QP if present (not a listener) */
-       if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) {
-               ib_destroy_qp(rdma->sc_qp);
-               svc_xprt_put(xprt);
-       }
-
-       /* Destroy the CM ID */
-       rdma_destroy_id(rdma->sc_cm_id);
 }
 
-static void svc_rdma_free(struct svc_xprt *xprt)
+static void __svc_rdma_free(struct work_struct *work)
 {
-       struct svcxprt_rdma *rdma = (struct svcxprt_rdma *)xprt;
+       struct svcxprt_rdma *rdma =
+               container_of(work, struct svcxprt_rdma, sc_work);
        dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);
+
        /* We should only be called from kref_put */
-       BUG_ON(atomic_read(&xprt->xpt_ref.refcount) != 0);
+       BUG_ON(atomic_read(&rdma->sc_xprt.xpt_ref.refcount) != 0);
+
+       /*
+        * Destroy queued, but not processed read completions. Note
+        * that this cleanup has to be done before destroying the
+        * cm_id because the device ptr is needed to unmap the dma in
+        * svc_rdma_put_context.
+        */
+       spin_lock_bh(&rdma->sc_read_complete_lock);
+       while (!list_empty(&rdma->sc_read_complete_q)) {
+               struct svc_rdma_op_ctxt *ctxt;
+               ctxt = list_entry(rdma->sc_read_complete_q.next,
+                                 struct svc_rdma_op_ctxt,
+                                 dto_q);
+               list_del_init(&ctxt->dto_q);
+               svc_rdma_put_context(ctxt, 1);
+       }
+       spin_unlock_bh(&rdma->sc_read_complete_lock);
+
+       /* Destroy queued, but not processed recv completions */
+       spin_lock_bh(&rdma->sc_rq_dto_lock);
+       while (!list_empty(&rdma->sc_rq_dto_q)) {
+               struct svc_rdma_op_ctxt *ctxt;
+               ctxt = list_entry(rdma->sc_rq_dto_q.next,
+                                 struct svc_rdma_op_ctxt,
+                                 dto_q);
+               list_del_init(&ctxt->dto_q);
+               svc_rdma_put_context(ctxt, 1);
+       }
+       spin_unlock_bh(&rdma->sc_rq_dto_lock);
+
+       /* Warn if we leaked a resource or under-referenced */
+       WARN_ON(atomic_read(&rdma->sc_ctxt_used) != 0);
+
+       /* Destroy the QP if present (not a listener) */
+       if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
+               ib_destroy_qp(rdma->sc_qp);
+
        if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
                ib_destroy_cq(rdma->sc_sq_cq);
 
@@ -972,10 +1024,21 @@ static void svc_rdma_free(struct svc_xprt *xprt)
        if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
                ib_dealloc_pd(rdma->sc_pd);
 
-       destroy_context_cache(rdma->sc_ctxt_head);
+       /* Destroy the CM ID */
+       rdma_destroy_id(rdma->sc_cm_id);
+
+       destroy_context_cache(rdma);
        kfree(rdma);
 }
 
+static void svc_rdma_free(struct svc_xprt *xprt)
+{
+       struct svcxprt_rdma *rdma =
+               container_of(xprt, struct svcxprt_rdma, sc_xprt);
+       INIT_WORK(&rdma->sc_work, __svc_rdma_free);
+       schedule_work(&rdma->sc_work);
+}
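
Transport teardown is split in two: svc_rdma_free() only queues work, and __svc_rdma_free() runs from a workqueue in process context, where destroying the QP, CQs, PD and cm_id is safe even though the final svc_xprt_put() may have come from the dto tasklet. The deferral pattern in isolation (my_xprt is a stand-in type, not from this file):

    #include <linux/workqueue.h>

    struct my_xprt {                        /* stand-in for svcxprt_rdma */
            struct work_struct sc_work;
            /* ... verbs resources ... */
    };

    static void __my_xprt_free(struct work_struct *work)
    {
            struct my_xprt *x = container_of(work, struct my_xprt, sc_work);

            /* process context: teardown that may sleep goes here */
            kfree(x);
    }

    static void my_xprt_free(struct my_xprt *x)     /* may run in softirq */
    {
            INIT_WORK(&x->sc_work, __my_xprt_free);
            schedule_work(&x->sc_work);
    }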
+
 static int svc_rdma_has_wspace(struct svc_xprt *xprt)
 {
        struct svcxprt_rdma *rdma =
@@ -1029,14 +1092,17 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
                        continue;
                }
                /* Bumped used SQ WR count and post */
+               svc_xprt_get(&xprt->sc_xprt);
                ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
                if (!ret)
                        atomic_inc(&xprt->sc_sq_count);
-               else
+               else {
+                       svc_xprt_put(&xprt->sc_xprt);
                        dprintk("svcrdma: failed to post SQ WR rc=%d, "
                               "sc_sq_count=%d, sc_sq_depth=%d\n",
                               ret, atomic_read(&xprt->sc_sq_count),
                               xprt->sc_sq_depth);
+               }
                spin_unlock_bh(&xprt->sc_lock);
                break;
        }