RPC/RDMA: optionally emit useful transport info upon connect/disconnect.
net/sunrpc/xprtrdma/verbs.c
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49
50 #include <linux/pci.h>  /* for Tavor hack below */
51
52 #include "xprt_rdma.h"
53
54 /*
55  * Globals/Macros
56  */
57
58 #ifdef RPC_DEBUG
59 # define RPCDBG_FACILITY        RPCDBG_TRANS
60 #endif
61
62 /*
63  * internal functions
64  */
65
66 /*
67  * handle replies in tasklet context, using a single, global list
68  * rdma tasklet function -- just turn around and call the func
69  * for all replies on the list
70  */
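/*
 * Flow (as implemented below): the CQ completion upcall calls
 * rpcrdma_event_process(), which queues each rpcrdma_rep on the global
 * rpcrdma_tasklets_g list via rpcrdma_schedule_tasklet(). The tasklet,
 * rpcrdma_run_tasklet(), then drains that list and invokes each reply's
 * rr_func handler, or returns the buffer to the pool if no handler is set.
 */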
71
72 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
73 static LIST_HEAD(rpcrdma_tasklets_g);
74
75 static void
76 rpcrdma_run_tasklet(unsigned long data)
77 {
78         struct rpcrdma_rep *rep;
79         void (*func)(struct rpcrdma_rep *);
80         unsigned long flags;
81
82         data = data;    /* the tasklet's data argument is unused */
83         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
84         while (!list_empty(&rpcrdma_tasklets_g)) {
85                 rep = list_entry(rpcrdma_tasklets_g.next,
86                                  struct rpcrdma_rep, rr_list);
87                 list_del(&rep->rr_list);
88                 func = rep->rr_func;
89                 rep->rr_func = NULL;
90                 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
91
92                 if (func)
93                         func(rep);
94                 else
95                         rpcrdma_recv_buffer_put(rep);
96
97                 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
98         }
99         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
100 }
101
102 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
103
104 static inline void
105 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
106 {
107         unsigned long flags;
108
109         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
110         list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
111         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
112         tasklet_schedule(&rpcrdma_tasklet_g);
113 }
114
115 static void
116 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
117 {
118         struct rpcrdma_ep *ep = context;
119
120         dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
121                 __func__, event->event, event->device->name, context);
122         if (ep->rep_connected == 1) {
123                 ep->rep_connected = -EIO;
124                 ep->rep_func(ep);
125                 wake_up_all(&ep->rep_connect_wait);
126         }
127 }
128
129 static void
130 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
131 {
132         struct rpcrdma_ep *ep = context;
133
134         dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
135                 __func__, event->event, event->device->name, context);
136         if (ep->rep_connected == 1) {
137                 ep->rep_connected = -EIO;
138                 ep->rep_func(ep);
139                 wake_up_all(&ep->rep_connect_wait);
140         }
141 }
142
143 static inline void
144 rpcrdma_event_process(struct ib_wc *wc)
145 {
146         struct rpcrdma_rep *rep =
147                         (struct rpcrdma_rep *)(unsigned long) wc->wr_id;
148
149         dprintk("RPC:       %s: event rep %p status %X opcode %X length %u\n",
150                 __func__, rep, wc->status, wc->opcode, wc->byte_len);
151
152         if (!rep) /* send or bind completion that we don't care about */
153                 return;
154
155         if (wc->status != IB_WC_SUCCESS) {
156                 dprintk("RPC:       %s: %s WC status %X, connection lost\n",
157                         __func__, (wc->opcode & IB_WC_RECV) ? "recv" : "send",
158                          wc->status);
159                 rep->rr_len = ~0U;
160                 rpcrdma_schedule_tasklet(rep);
161                 return;
162         }
163
164         switch (wc->opcode) {
165         case IB_WC_RECV:
166                 rep->rr_len = wc->byte_len;
167                 ib_dma_sync_single_for_cpu(
168                         rdmab_to_ia(rep->rr_buffer)->ri_id->device,
169                         rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
170                 /* Keep (only) the most recent credits, after checking validity */
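                /* The credit value comes from the rm_credit field of the
                 * RPC/RDMA header; the 16-byte check ensures the receive
                 * covers the four fixed header words (xid, vers, credit,
                 * type) before rm_credit is trusted.
                 */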
171                 if (rep->rr_len >= 16) {
172                         struct rpcrdma_msg *p =
173                                         (struct rpcrdma_msg *) rep->rr_base;
174                         unsigned int credits = ntohl(p->rm_credit);
175                         if (credits == 0) {
176                                 dprintk("RPC:       %s: server"
177                                         " dropped credits to 0!\n", __func__);
178                                 /* don't deadlock */
179                                 credits = 1;
180                         } else if (credits > rep->rr_buffer->rb_max_requests) {
181                                 dprintk("RPC:       %s: server"
182                                         " over-crediting: %d (%d)\n",
183                                         __func__, credits,
184                                         rep->rr_buffer->rb_max_requests);
185                                 credits = rep->rr_buffer->rb_max_requests;
186                         }
187                         atomic_set(&rep->rr_buffer->rb_credits, credits);
188                 }
189                 /* fall through */
190         case IB_WC_BIND_MW:
191                 rpcrdma_schedule_tasklet(rep);
192                 break;
193         default:
194                 dprintk("RPC:       %s: unexpected WC event %X\n",
195                         __func__, wc->opcode);
196                 break;
197         }
198 }
199
200 static inline int
201 rpcrdma_cq_poll(struct ib_cq *cq)
202 {
203         struct ib_wc wc;
204         int rc;
205
206         for (;;) {
207                 rc = ib_poll_cq(cq, 1, &wc);
208                 if (rc < 0) {
209                         dprintk("RPC:       %s: ib_poll_cq failed %i\n",
210                                 __func__, rc);
211                         return rc;
212                 }
213                 if (rc == 0)
214                         break;
215
216                 rpcrdma_event_process(&wc);
217         }
218
219         return 0;
220 }
221
222 /*
223  * rpcrdma_cq_event_upcall
224  *
225  * This upcall handles recv, send, bind and unbind events.
226  * It is reentrant but processes single events in order to maintain
227  * ordering of receives to keep server credits.
228  *
229  * It is the responsibility of the scheduled tasklet to return
230  * recv buffers to the pool. NOTE: this affects synchronization of
231  * connection shutdown. That is, the structures required for
232  * the completion of the reply handler must remain intact until
233  * all memory has been reclaimed.
234  *
235  * Note that send events are suppressed and do not result in an upcall.
236  */
237 static void
238 rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
239 {
240         int rc;
241
242         rc = rpcrdma_cq_poll(cq);
243         if (rc)
244                 return;
245
246         rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
247         if (rc) {
248                 dprintk("RPC:       %s: ib_req_notify_cq failed %i\n",
249                         __func__, rc);
250                 return;
251         }
252
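        /*
         * Poll once more after re-arming: completions that arrived between
         * the poll loop above and the ib_req_notify_cq() call would
         * otherwise be missed until the next CQ event.
         */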
253         rpcrdma_cq_poll(cq);
254 }
255
256 #ifdef RPC_DEBUG
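/*
 * Human-readable names for the RDMA_CM_EVENT_* values reported by
 * rpcrdma_conn_upcall(); this table must stay in sync with
 * enum rdma_cm_event.
 */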
257 static const char * const conn[] = {
258         "address resolved",
259         "address error",
260         "route resolved",
261         "route error",
262         "connect request",
263         "connect response",
264         "connect error",
265         "unreachable",
266         "rejected",
267         "established",
268         "disconnected",
269         "device removal"
270 };
271 #endif
272
273 static int
274 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
275 {
276         struct rpcrdma_xprt *xprt = id->context;
277         struct rpcrdma_ia *ia = &xprt->rx_ia;
278         struct rpcrdma_ep *ep = &xprt->rx_ep;
279         struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
280         struct ib_qp_attr attr;
281         struct ib_qp_init_attr iattr;
282         int connstate = 0;
283
284         switch (event->event) {
285         case RDMA_CM_EVENT_ADDR_RESOLVED:
286         case RDMA_CM_EVENT_ROUTE_RESOLVED:
287                 ia->ri_async_rc = 0;
288                 complete(&ia->ri_done);
289                 break;
290         case RDMA_CM_EVENT_ADDR_ERROR:
291                 ia->ri_async_rc = -EHOSTUNREACH;
292                 dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
293                         __func__, ep);
294                 complete(&ia->ri_done);
295                 break;
296         case RDMA_CM_EVENT_ROUTE_ERROR:
297                 ia->ri_async_rc = -ENETUNREACH;
298                 dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
299                         __func__, ep);
300                 complete(&ia->ri_done);
301                 break;
302         case RDMA_CM_EVENT_ESTABLISHED:
303                 connstate = 1;
304                 ib_query_qp(ia->ri_id->qp, &attr,
305                         IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
306                         &iattr);
307                 dprintk("RPC:       %s: %d responder resources"
308                         " (%d initiator)\n",
309                         __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
310                 goto connected;
311         case RDMA_CM_EVENT_CONNECT_ERROR:
312                 connstate = -ENOTCONN;
313                 goto connected;
314         case RDMA_CM_EVENT_UNREACHABLE:
315                 connstate = -ENETDOWN;
316                 goto connected;
317         case RDMA_CM_EVENT_REJECTED:
318                 connstate = -ECONNREFUSED;
319                 goto connected;
320         case RDMA_CM_EVENT_DISCONNECTED:
321                 connstate = -ECONNABORTED;
322                 goto connected;
323         case RDMA_CM_EVENT_DEVICE_REMOVAL:
324                 connstate = -ENODEV;
325 connected:
326                 dprintk("RPC:       %s: %s: %u.%u.%u.%u:%u"
327                         " (ep 0x%p event 0x%x)\n",
328                         __func__,
329                         (event->event < ARRAY_SIZE(conn)) ? conn[event->event] :
330                                                 "unknown connection error",
331                         NIPQUAD(addr->sin_addr.s_addr),
332                         ntohs(addr->sin_port),
333                         ep, event->event);
334                 atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
335                 dprintk("RPC:       %s: %sconnected\n",
336                                         __func__, connstate > 0 ? "" : "dis");
337                 ep->rep_connected = connstate;
338                 ep->rep_func(ep);
339                 wake_up_all(&ep->rep_connect_wait);
340                 break;
341         default:
342                 dprintk("RPC:       %s: unexpected CM event %d\n",
343                         __func__, event->event);
344                 break;
345         }
346
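        /*
         * When RPC_DEBUG is enabled, emit a one-line summary of the
         * transport state change: on connect, the peer address, device
         * name, memory registration strategy, request slot count and
         * negotiated IRD; on close, the peer address and the final
         * connection status.
         */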
347 #ifdef RPC_DEBUG
348         if (connstate == 1) {
349                 int ird = attr.max_dest_rd_atomic;
350                 int tird = ep->rep_remote_cma.responder_resources;
351                 printk(KERN_INFO "rpcrdma: connection to %u.%u.%u.%u:%u "
352                         "on %s, memreg %d slots %d ird %d%s\n",
353                         NIPQUAD(addr->sin_addr.s_addr),
354                         ntohs(addr->sin_port),
355                         ia->ri_id->device->name,
356                         ia->ri_memreg_strategy,
357                         xprt->rx_buf.rb_max_requests,
358                         ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
359         } else if (connstate < 0) {
360                 printk(KERN_INFO "rpcrdma: connection to %u.%u.%u.%u:%u "
361                         "closed (%d)\n",
362                         NIPQUAD(addr->sin_addr.s_addr),
363                         ntohs(addr->sin_port),
364                         connstate);
365         }
366 #endif
367
368         return 0;
369 }
370
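/*
 * rpcrdma_create_id - create and resolve a connection identifier
 *
 * Creates an rdma_cm_id bound to rpcrdma_conn_upcall(), then synchronously
 * resolves the destination address and route, waiting up to
 * RDMA_RESOLVE_TIMEOUT (plus one jiffy) for each step. The CM upcall records
 * the outcome in ia->ri_async_rc and completes ia->ri_done; a timeout leaves
 * the preset -ETIMEDOUT in place. Returns the id, or an ERR_PTR on failure.
 */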
371 static struct rdma_cm_id *
372 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
373                         struct rpcrdma_ia *ia, struct sockaddr *addr)
374 {
375         struct rdma_cm_id *id;
376         int rc;
377
378         init_completion(&ia->ri_done);
379
380         id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP);
381         if (IS_ERR(id)) {
382                 rc = PTR_ERR(id);
383                 dprintk("RPC:       %s: rdma_create_id() failed %i\n",
384                         __func__, rc);
385                 return id;
386         }
387
388         ia->ri_async_rc = -ETIMEDOUT;
389         rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
390         if (rc) {
391                 dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
392                         __func__, rc);
393                 goto out;
394         }
395         wait_for_completion_interruptible_timeout(&ia->ri_done,
396                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
397         rc = ia->ri_async_rc;
398         if (rc)
399                 goto out;
400
401         ia->ri_async_rc = -ETIMEDOUT;
402         rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
403         if (rc) {
404                 dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
405                         __func__, rc);
406                 goto out;
407         }
408         wait_for_completion_interruptible_timeout(&ia->ri_done,
409                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
410         rc = ia->ri_async_rc;
411         if (rc)
412                 goto out;
413
414         return id;
415
416 out:
417         rdma_destroy_id(id);
418         return ERR_PTR(rc);
419 }
420
421 /*
422  * Drain any cq, prior to teardown.
423  */
424 static void
425 rpcrdma_clean_cq(struct ib_cq *cq)
426 {
427         struct ib_wc wc;
428         int count = 0;
429
430         while (1 == ib_poll_cq(cq, 1, &wc))
431                 ++count;
432
433         if (count)
434                 dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
435                         __func__, count, wc.opcode);
436 }
437
438 /*
439  * Exported functions.
440  */
441
442 /*
443  * Open and initialize an Interface Adapter.
444  *  o initializes fields of struct rpcrdma_ia, including
445  *    interface and provider attributes and protection zone.
446  */
447 int
448 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
449 {
450         int rc, mem_priv;
451         struct ib_device_attr devattr;
452         struct rpcrdma_ia *ia = &xprt->rx_ia;
453
454         ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
455         if (IS_ERR(ia->ri_id)) {
456                 rc = PTR_ERR(ia->ri_id);
457                 goto out1;
458         }
459
460         ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
461         if (IS_ERR(ia->ri_pd)) {
462                 rc = PTR_ERR(ia->ri_pd);
463                 dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
464                         __func__, rc);
465                 goto out2;
466         }
467
468         /*
469          * Query the device to determine if the requested memory
470          * registration strategy is supported. If it isn't, set the
471          * strategy to a globally supported model.
472          */
473         rc = ib_query_device(ia->ri_id->device, &devattr);
474         if (rc) {
475                 dprintk("RPC:       %s: ib_query_device failed %d\n",
476                         __func__, rc);
477                 goto out2;
478         }
479
480         if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
481                 ia->ri_have_dma_lkey = 1;
482                 ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
483         }
484
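        /*
         * Fallback ladder, based on the capability flags queried above:
         * the MEMWINDOWS modes drop back to RPCRDMA_REGISTER when memory
         * windows are unsupported; MTHCAFMR and FRMR drop back to
         * RPCRDMA_ALLPHYSICAL (when persistent registration is compiled in)
         * or RPCRDMA_REGISTER when the required FMR or fast-registration
         * support is missing.
         */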
485         switch (memreg) {
486         case RPCRDMA_MEMWINDOWS:
487         case RPCRDMA_MEMWINDOWS_ASYNC:
488                 if (!(devattr.device_cap_flags & IB_DEVICE_MEM_WINDOW)) {
489                         dprintk("RPC:       %s: MEMWINDOWS registration "
490                                 "specified but not supported by adapter, "
491                                 "using slower RPCRDMA_REGISTER\n",
492                                 __func__);
493                         memreg = RPCRDMA_REGISTER;
494                 }
495                 break;
496         case RPCRDMA_MTHCAFMR:
497                 if (!ia->ri_id->device->alloc_fmr) {
498 #if RPCRDMA_PERSISTENT_REGISTRATION
499                         dprintk("RPC:       %s: MTHCAFMR registration "
500                                 "specified but not supported by adapter, "
501                                 "using riskier RPCRDMA_ALLPHYSICAL\n",
502                                 __func__);
503                         memreg = RPCRDMA_ALLPHYSICAL;
504 #else
505                         dprintk("RPC:       %s: MTHCAFMR registration "
506                                 "specified but not supported by adapter, "
507                                 "using slower RPCRDMA_REGISTER\n",
508                                 __func__);
509                         memreg = RPCRDMA_REGISTER;
510 #endif
511                 }
512                 break;
513         case RPCRDMA_FRMR:
514                 /* Requires both frmr reg and local dma lkey */
515                 if ((devattr.device_cap_flags &
516                      (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
517                     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
518 #if RPCRDMA_PERSISTENT_REGISTRATION
519                         dprintk("RPC:       %s: FRMR registration "
520                                 "specified but not supported by adapter, "
521                                 "using riskier RPCRDMA_ALLPHYSICAL\n",
522                                 __func__);
523                         memreg = RPCRDMA_ALLPHYSICAL;
524 #else
525                         dprintk("RPC:       %s: FRMR registration "
526                                 "specified but not supported by adapter, "
527                                 "using slower RPCRDMA_REGISTER\n",
528                                 __func__);
529                         memreg = RPCRDMA_REGISTER;
530 #endif
531                 }
532                 break;
533         }
534
535         /*
536          * Optionally obtain an underlying physical identity mapping in
537          * order to do a memory window-based bind. This base registration
538          * is protected from remote access - that is enabled only by binding
539          * for the specific bytes targeted during each RPC operation, and
540          * revoked after the corresponding completion similar to a storage
541          * adapter.
542          */
543         switch (memreg) {
544         case RPCRDMA_BOUNCEBUFFERS:
545         case RPCRDMA_REGISTER:
546         case RPCRDMA_FRMR:
547                 break;
548 #if RPCRDMA_PERSISTENT_REGISTRATION
549         case RPCRDMA_ALLPHYSICAL:
550                 mem_priv = IB_ACCESS_LOCAL_WRITE |
551                                 IB_ACCESS_REMOTE_WRITE |
552                                 IB_ACCESS_REMOTE_READ;
553                 goto register_setup;
554 #endif
555         case RPCRDMA_MEMWINDOWS_ASYNC:
556         case RPCRDMA_MEMWINDOWS:
557                 mem_priv = IB_ACCESS_LOCAL_WRITE |
558                                 IB_ACCESS_MW_BIND;
559                 goto register_setup;
560         case RPCRDMA_MTHCAFMR:
561                 if (ia->ri_have_dma_lkey)
562                         break;
563                 mem_priv = IB_ACCESS_LOCAL_WRITE;
564         register_setup:
565                 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
566                 if (IS_ERR(ia->ri_bind_mem)) {
567                         printk(KERN_ALERT "%s: ib_get_dma_mr for "
568                                 "phys register failed with %lX\n\t"
569                                 "Will continue with degraded performance\n",
570                                 __func__, PTR_ERR(ia->ri_bind_mem));
571                         memreg = RPCRDMA_REGISTER;
572                         ia->ri_bind_mem = NULL;
573                 }
574                 break;
575         default:
576                 printk(KERN_ERR "%s: invalid memory registration mode %d\n",
577                                 __func__, memreg);
578                 rc = -EINVAL;
579                 goto out2;
580         }
581         dprintk("RPC:       %s: memory registration strategy is %d\n",
582                 __func__, memreg);
583
584         /* Else will do memory reg/dereg for each chunk */
585         ia->ri_memreg_strategy = memreg;
586
587         return 0;
588 out2:
589         rdma_destroy_id(ia->ri_id);
590         ia->ri_id = NULL;
591 out1:
592         return rc;
593 }
594
595 /*
596  * Clean up/close an IA.
597  *   o if event handles and PD have been initialized, free them.
598  *   o close the IA
599  */
600 void
601 rpcrdma_ia_close(struct rpcrdma_ia *ia)
602 {
603         int rc;
604
605         dprintk("RPC:       %s: entering\n", __func__);
606         if (ia->ri_bind_mem != NULL) {
607                 rc = ib_dereg_mr(ia->ri_bind_mem);
608                 dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
609                         __func__, rc);
610         }
611         if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
612                 if (ia->ri_id->qp)
613                         rdma_destroy_qp(ia->ri_id);
614                 rdma_destroy_id(ia->ri_id);
615                 ia->ri_id = NULL;
616         }
617         if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
618                 rc = ib_dealloc_pd(ia->ri_pd);
619                 dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
620                         __func__, rc);
621         }
622 }
623
624 /*
625  * Create unconnected endpoint.
626  */
627 int
628 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
629                                 struct rpcrdma_create_data_internal *cdata)
630 {
631         struct ib_device_attr devattr;
632         int rc, err;
633
634         rc = ib_query_device(ia->ri_id->device, &devattr);
635         if (rc) {
636                 dprintk("RPC:       %s: ib_query_device failed %d\n",
637                         __func__, rc);
638                 return rc;
639         }
640
641         /* check provider's send/recv wr limits */
642         if (cdata->max_requests > devattr.max_qp_wr)
643                 cdata->max_requests = devattr.max_qp_wr;
644
645         ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
646         ep->rep_attr.qp_context = ep;
647         /* send_cq and recv_cq initialized below */
648         ep->rep_attr.srq = NULL;
649         ep->rep_attr.cap.max_send_wr = cdata->max_requests;
650         switch (ia->ri_memreg_strategy) {
651         case RPCRDMA_FRMR:
652                 /* Add room for frmr register and invalidate WRs */
653                 ep->rep_attr.cap.max_send_wr *= 3;
654                 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
655                         return -EINVAL;
656                 break;
657         case RPCRDMA_MEMWINDOWS_ASYNC:
658         case RPCRDMA_MEMWINDOWS:
659                 /* Add room for mw_binds+unbinds - overkill! */
660                 ep->rep_attr.cap.max_send_wr++;
661                 ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
662                 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
663                         return -EINVAL;
664                 break;
665         default:
666                 break;
667         }
668         ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
669         ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
670         ep->rep_attr.cap.max_recv_sge = 1;
671         ep->rep_attr.cap.max_inline_data = 0;
672         ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
673         ep->rep_attr.qp_type = IB_QPT_RC;
674         ep->rep_attr.port_num = ~0;
675
676         dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
677                 "iovs: send %d recv %d\n",
678                 __func__,
679                 ep->rep_attr.cap.max_send_wr,
680                 ep->rep_attr.cap.max_recv_wr,
681                 ep->rep_attr.cap.max_send_sge,
682                 ep->rep_attr.cap.max_recv_sge);
683
684         /* set trigger for requesting send completion */
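        /*
         * rep_cqinit is apparently the number of sends that may be posted
         * unsignaled before a completion must be requested (see INIT_CQCOUNT
         * and the corresponding countdown logic in xprt_rdma.h): roughly
         * half the send queue, less the worst-case bind traffic for the
         * memory-window modes. A value of 0 signals every send.
         */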
685         ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /*  - 1*/;
686         switch (ia->ri_memreg_strategy) {
687         case RPCRDMA_MEMWINDOWS_ASYNC:
688         case RPCRDMA_MEMWINDOWS:
689                 ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
690                 break;
691         default:
692                 break;
693         }
694         if (ep->rep_cqinit <= 2)
695                 ep->rep_cqinit = 0;
696         INIT_CQCOUNT(ep);
697         ep->rep_ia = ia;
698         init_waitqueue_head(&ep->rep_connect_wait);
699
700         /*
701          * Create a single cq for receive dto and mw_bind (only ever
702          * care about unbind, really). Send completions are suppressed.
703          * Use single threaded tasklet upcalls to maintain ordering.
704          */
705         ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
706                                   rpcrdma_cq_async_error_upcall, NULL,
707                                   ep->rep_attr.cap.max_recv_wr +
708                                   ep->rep_attr.cap.max_send_wr + 1, 0);
709         if (IS_ERR(ep->rep_cq)) {
710                 rc = PTR_ERR(ep->rep_cq);
711                 dprintk("RPC:       %s: ib_create_cq failed: %i\n",
712                         __func__, rc);
713                 goto out1;
714         }
715
716         rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
717         if (rc) {
718                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
719                         __func__, rc);
720                 goto out2;
721         }
722
723         ep->rep_attr.send_cq = ep->rep_cq;
724         ep->rep_attr.recv_cq = ep->rep_cq;
725
726         /* Initialize cma parameters */
727
728         /* RPC/RDMA does not use private data */
729         ep->rep_remote_cma.private_data = NULL;
730         ep->rep_remote_cma.private_data_len = 0;
731
732         /* Client offers RDMA Read but does not initiate */
733         ep->rep_remote_cma.initiator_depth = 0;
734         if (ia->ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS)
735                 ep->rep_remote_cma.responder_resources = 0;
736         else if (devattr.max_qp_rd_atom > 32)   /* arbitrary but <= 255 */
737                 ep->rep_remote_cma.responder_resources = 32;
738         else
739                 ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
740
741         ep->rep_remote_cma.retry_count = 7;
742         ep->rep_remote_cma.flow_control = 0;
743         ep->rep_remote_cma.rnr_retry_count = 0;
744
745         return 0;
746
747 out2:
748         err = ib_destroy_cq(ep->rep_cq);
749         if (err)
750                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
751                         __func__, err);
752 out1:
753         return rc;
754 }
755
756 /*
757  * rpcrdma_ep_destroy
758  *
759  * Disconnect and destroy endpoint. After this, the only
760  * valid operations on the ep are to free it (if dynamically
761  * allocated) or re-create it.
762  *
763  * The caller's error handling must be sure to not leak the endpoint
764  * if this function fails.
765  */
766 int
767 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
768 {
769         int rc;
770
771         dprintk("RPC:       %s: entering, connected is %d\n",
772                 __func__, ep->rep_connected);
773
774         if (ia->ri_id->qp) {
775                 rc = rpcrdma_ep_disconnect(ep, ia);
776                 if (rc)
777                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
778                                 " returned %i\n", __func__, rc);
779                 rdma_destroy_qp(ia->ri_id);
780                 ia->ri_id->qp = NULL;
781         }
782
783         /* padding - could be done in rpcrdma_buffer_destroy... */
784         if (ep->rep_pad_mr) {
785                 rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
786                 ep->rep_pad_mr = NULL;
787         }
788
789         rpcrdma_clean_cq(ep->rep_cq);
790         rc = ib_destroy_cq(ep->rep_cq);
791         if (rc)
792                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
793                         __func__, rc);
794
795         return rc;
796 }
797
798 /*
799  * Connect unconnected endpoint.
800  */
801 int
802 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
803 {
804         struct rdma_cm_id *id;
805         int rc = 0;
806         int retry_count = 0;
807         int reconnect = (ep->rep_connected != 0);
808
809         if (reconnect) {
810                 struct rpcrdma_xprt *xprt;
811 retry:
812                 rc = rpcrdma_ep_disconnect(ep, ia);
813                 if (rc && rc != -ENOTCONN)
814                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
815                                 " status %i\n", __func__, rc);
816                 rpcrdma_clean_cq(ep->rep_cq);
817
818                 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
819                 id = rpcrdma_create_id(xprt, ia,
820                                 (struct sockaddr *)&xprt->rx_data.addr);
821                 if (IS_ERR(id)) {
822                         rc = PTR_ERR(id);
823                         goto out;
824                 }
825                 /* TEMP TEMP TEMP - fail if new device:
826                  * Deregister/remarshal *all* requests!
827                  * Close and recreate adapter, pd, etc!
828                  * Re-determine all attributes still sane!
829                  * More stuff I haven't thought of!
830                  * Rrrgh!
831                  */
832                 if (ia->ri_id->device != id->device) {
833                         printk("RPC:       %s: can't reconnect on "
834                                 "different device!\n", __func__);
835                         rdma_destroy_id(id);
836                         rc = -ENETDOWN;
837                         goto out;
838                 }
839                 /* END TEMP */
840                 rdma_destroy_qp(ia->ri_id);
841                 rdma_destroy_id(ia->ri_id);
842                 ia->ri_id = id;
843         }
844
845         rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
846         if (rc) {
847                 dprintk("RPC:       %s: rdma_create_qp failed %i\n",
848                         __func__, rc);
849                 goto out;
850         }
851
852 /* XXX Tavor device performs badly with 2K MTU! */
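/* The quirk below matches Mellanox/Topspin Tavor HCAs by PCI vendor and
 * device ID and clamps the path MTU to 1024 via ib_modify_qp(). */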
853 if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
854         struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
855         if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
856             (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
857              pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
858                 struct ib_qp_attr attr = {
859                         .path_mtu = IB_MTU_1024
860                 };
861                 rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
862         }
863 }
864
865         ep->rep_connected = 0;
866
867         rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
868         if (rc) {
869                 dprintk("RPC:       %s: rdma_connect() failed with %i\n",
870                                 __func__, rc);
871                 goto out;
872         }
873
874         if (reconnect)
875                 return 0;
876
877         wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
878
879         /*
880          * Check state. A non-peer reject indicates no listener
881          * (ECONNREFUSED), which may be a transient state. All
882          * others indicate a transport condition for which a
883          * best-effort recovery has already been attempted.
884          */
885         if (ep->rep_connected == -ECONNREFUSED
886             && ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
887                 dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
888                 goto retry;
889         }
890         if (ep->rep_connected <= 0) {
891                 /* Sometimes, the only way to reliably connect to remote
892                  * CMs is to use the same nonzero values for ORD and IRD. */
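                /* (Presumably this is aimed at providers, such as some
                 * iWARP implementations, that negotiate ORD/IRD as part of
                 * connection establishment.) */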
893                 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
894                     (ep->rep_remote_cma.responder_resources == 0 ||
895                      ep->rep_remote_cma.initiator_depth !=
896                                 ep->rep_remote_cma.responder_resources)) {
897                         if (ep->rep_remote_cma.responder_resources == 0)
898                                 ep->rep_remote_cma.responder_resources = 1;
899                         ep->rep_remote_cma.initiator_depth =
900                                 ep->rep_remote_cma.responder_resources;
901                         goto retry;
902                 }
903                 rc = ep->rep_connected;
904         } else {
905                 dprintk("RPC:       %s: connected\n", __func__);
906         }
907
908 out:
909         if (rc)
910                 ep->rep_connected = rc;
911         return rc;
912 }
913
914 /*
915  * rpcrdma_ep_disconnect
916  *
917  * This is separate from destroy to facilitate the ability
918  * to reconnect without recreating the endpoint.
919  *
920  * This call is not reentrant, and must not be made in parallel
921  * on the same endpoint.
922  */
923 int
924 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
925 {
926         int rc;
927
928         rpcrdma_clean_cq(ep->rep_cq);
929         rc = rdma_disconnect(ia->ri_id);
930         if (!rc) {
931                 /* returns without wait if not connected */
932                 wait_event_interruptible(ep->rep_connect_wait,
933                                                         ep->rep_connected != 1);
934                 dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
935                         (ep->rep_connected == 1) ? "still " : "dis");
936         } else {
937                 dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
938                 ep->rep_connected = rc;
939         }
940         return rc;
941 }
942
943 /*
944  * Initialize buffer memory
945  */
946 int
947 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
948         struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
949 {
950         char *p;
951         size_t len;
952         int i, rc;
953         struct rpcrdma_mw *r;
954
955         buf->rb_max_requests = cdata->max_requests;
956         spin_lock_init(&buf->rb_lock);
957         atomic_set(&buf->rb_credits, 1);
958
959         /* Need to allocate:
960          *   1.  arrays for send and recv pointers
961          *   2.  arrays of struct rpcrdma_req to fill in pointers
962          *   3.  array of struct rpcrdma_rep for replies
963          *   4.  padding, if any
964          *   5.  mw's, fmr's or frmr's, if any
965          * Send/recv buffers in req/rep need to be registered
966          */
967
968         len = buf->rb_max_requests *
969                 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
970         len += cdata->padding;
971         switch (ia->ri_memreg_strategy) {
972         case RPCRDMA_FRMR:
973                 len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
974                                 sizeof(struct rpcrdma_mw);
975                 break;
976         case RPCRDMA_MTHCAFMR:
977                 /* TBD we are perhaps overallocating here */
978                 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
979                                 sizeof(struct rpcrdma_mw);
980                 break;
981         case RPCRDMA_MEMWINDOWS_ASYNC:
982         case RPCRDMA_MEMWINDOWS:
983                 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
984                                 sizeof(struct rpcrdma_mw);
985                 break;
986         default:
987                 break;
988         }
989
990         /* allocate 1, 4 and 5 in one shot */
991         p = kzalloc(len, GFP_KERNEL);
992         if (p == NULL) {
993                 dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
994                         __func__, len);
995                 rc = -ENOMEM;
996                 goto out;
997         }
998         buf->rb_pool = p;       /* for freeing it later */
999
1000         buf->rb_send_bufs = (struct rpcrdma_req **) p;
1001         p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1002         buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1003         p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1004
1005         /*
1006          * Register the zeroed pad buffer, if any.
1007          */
1008         if (cdata->padding) {
1009                 rc = rpcrdma_register_internal(ia, p, cdata->padding,
1010                                             &ep->rep_pad_mr, &ep->rep_pad);
1011                 if (rc)
1012                         goto out;
1013         }
1014         p += cdata->padding;
1015
1016         /*
1017          * Allocate the fmr's, or mw's for mw_bind chunk registration.
1018          * We "cycle" the mw's in order to minimize rkey reuse,
1019          * and also reduce unbind-to-bind collision.
1020          */
1021         INIT_LIST_HEAD(&buf->rb_mws);
1022         r = (struct rpcrdma_mw *)p;
1023         switch (ia->ri_memreg_strategy) {
1024         case RPCRDMA_FRMR:
1025                 for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1026                         r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1027                                                          RPCRDMA_MAX_SEGS);
1028                         if (IS_ERR(r->r.frmr.fr_mr)) {
1029                                 rc = PTR_ERR(r->r.frmr.fr_mr);
1030                                 dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1031                                         " failed %i\n", __func__, rc);
1032                                 goto out;
1033                         }
1034                         r->r.frmr.fr_pgl =
1035                                 ib_alloc_fast_reg_page_list(ia->ri_id->device,
1036                                                             RPCRDMA_MAX_SEGS);
1037                         if (IS_ERR(r->r.frmr.fr_pgl)) {
1038                                 rc = PTR_ERR(r->r.frmr.fr_pgl);
1039                                 dprintk("RPC:       %s: "
1040                                         "ib_alloc_fast_reg_page_list "
1041                                         "failed %i\n", __func__, rc);
1042                                 goto out;
1043                         }
1044                         list_add(&r->mw_list, &buf->rb_mws);
1045                         ++r;
1046                 }
1047                 break;
1048         case RPCRDMA_MTHCAFMR:
1049                 /* TBD we are perhaps overallocating here */
1050                 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1051                         static struct ib_fmr_attr fa =
1052                                 { RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1053                         r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1054                                 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1055                                 &fa);
1056                         if (IS_ERR(r->r.fmr)) {
1057                                 rc = PTR_ERR(r->r.fmr);
1058                                 dprintk("RPC:       %s: ib_alloc_fmr"
1059                                         " failed %i\n", __func__, rc);
1060                                 goto out;
1061                         }
1062                         list_add(&r->mw_list, &buf->rb_mws);
1063                         ++r;
1064                 }
1065                 break;
1066         case RPCRDMA_MEMWINDOWS_ASYNC:
1067         case RPCRDMA_MEMWINDOWS:
1068                 /* Allocate one extra request's worth, for full cycling */
1069                 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1070                         r->r.mw = ib_alloc_mw(ia->ri_pd);
1071                         if (IS_ERR(r->r.mw)) {
1072                                 rc = PTR_ERR(r->r.mw);
1073                                 dprintk("RPC:       %s: ib_alloc_mw"
1074                                         " failed %i\n", __func__, rc);
1075                                 goto out;
1076                         }
1077                         list_add(&r->mw_list, &buf->rb_mws);
1078                         ++r;
1079                 }
1080                 break;
1081         default:
1082                 break;
1083         }
1084
1085         /*
1086          * Allocate/init the request/reply buffers. Doing this
1087          * using kmalloc for now -- one for each buf.
1088          */
1089         for (i = 0; i < buf->rb_max_requests; i++) {
1090                 struct rpcrdma_req *req;
1091                 struct rpcrdma_rep *rep;
1092
1093                 len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
1094                 /* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
1095                 /* Typical ~2400b, so rounding up saves work later */
1096                 if (len < 4096)
1097                         len = 4096;
1098                 req = kmalloc(len, GFP_KERNEL);
1099                 if (req == NULL) {
1100                         dprintk("RPC:       %s: request buffer %d alloc"
1101                                 " failed\n", __func__, i);
1102                         rc = -ENOMEM;
1103                         goto out;
1104                 }
1105                 memset(req, 0, sizeof(struct rpcrdma_req));
1106                 buf->rb_send_bufs[i] = req;
1107                 buf->rb_send_bufs[i]->rl_buffer = buf;
1108
1109                 rc = rpcrdma_register_internal(ia, req->rl_base,
1110                                 len - offsetof(struct rpcrdma_req, rl_base),
1111                                 &buf->rb_send_bufs[i]->rl_handle,
1112                                 &buf->rb_send_bufs[i]->rl_iov);
1113                 if (rc)
1114                         goto out;
1115
1116                 buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1117
1118                 len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1119                 rep = kmalloc(len, GFP_KERNEL);
1120                 if (rep == NULL) {
1121                         dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1122                                 __func__, i);
1123                         rc = -ENOMEM;
1124                         goto out;
1125                 }
1126                 memset(rep, 0, sizeof(struct rpcrdma_rep));
1127                 buf->rb_recv_bufs[i] = rep;
1128                 buf->rb_recv_bufs[i]->rr_buffer = buf;
1129                 init_waitqueue_head(&rep->rr_unbind);
1130
1131                 rc = rpcrdma_register_internal(ia, rep->rr_base,
1132                                 len - offsetof(struct rpcrdma_rep, rr_base),
1133                                 &buf->rb_recv_bufs[i]->rr_handle,
1134                                 &buf->rb_recv_bufs[i]->rr_iov);
1135                 if (rc)
1136                         goto out;
1137
1138         }
1139         dprintk("RPC:       %s: max_requests %d\n",
1140                 __func__, buf->rb_max_requests);
1141         /* done */
1142         return 0;
1143 out:
1144         rpcrdma_buffer_destroy(buf);
1145         return rc;
1146 }
1147
1148 /*
1149  * Unregister and destroy buffer memory. Need to deal with
1150  * partial initialization, so it's callable from failed create.
1151  * Must be called before destroying endpoint, as registrations
1152  * reference it.
1153  */
1154 void
1155 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1156 {
1157         int rc, i;
1158         struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1159         struct rpcrdma_mw *r;
1160
1161         /* clean up in reverse order from create
1162          *   1.  recv mr memory (mr free, then kfree)
1163          *   1a. bind mw memory
1164          *   2.  send mr memory (mr free, then kfree)
1165          *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1166          *   4.  arrays
1167          */
1168         dprintk("RPC:       %s: entering\n", __func__);
1169
1170         for (i = 0; i < buf->rb_max_requests; i++) {
1171                 if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1172                         rpcrdma_deregister_internal(ia,
1173                                         buf->rb_recv_bufs[i]->rr_handle,
1174                                         &buf->rb_recv_bufs[i]->rr_iov);
1175                         kfree(buf->rb_recv_bufs[i]);
1176                 }
1177                 if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1178                         while (!list_empty(&buf->rb_mws)) {
1179                                 r = list_entry(buf->rb_mws.next,
1180                                         struct rpcrdma_mw, mw_list);
1181                                 list_del(&r->mw_list);
1182                                 switch (ia->ri_memreg_strategy) {
1183                                 case RPCRDMA_FRMR:
1184                                         rc = ib_dereg_mr(r->r.frmr.fr_mr);
1185                                         if (rc)
1186                                                 dprintk("RPC:       %s:"
1187                                                         " ib_dereg_mr"
1188                                                         " failed %i\n",
1189                                                         __func__, rc);
1190                                         ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1191                                         break;
1192                                 case RPCRDMA_MTHCAFMR:
1193                                         rc = ib_dealloc_fmr(r->r.fmr);
1194                                         if (rc)
1195                                                 dprintk("RPC:       %s:"
1196                                                         " ib_dealloc_fmr"
1197                                                         " failed %i\n",
1198                                                         __func__, rc);
1199                                         break;
1200                                 case RPCRDMA_MEMWINDOWS_ASYNC:
1201                                 case RPCRDMA_MEMWINDOWS:
1202                                         rc = ib_dealloc_mw(r->r.mw);
1203                                         if (rc)
1204                                                 dprintk("RPC:       %s:"
1205                                                         " ib_dealloc_mw"
1206                                                         " failed %i\n",
1207                                                         __func__, rc);
1208                                         break;
1209                                 default:
1210                                         break;
1211                                 }
1212                         }
1213                         rpcrdma_deregister_internal(ia,
1214                                         buf->rb_send_bufs[i]->rl_handle,
1215                                         &buf->rb_send_bufs[i]->rl_iov);
1216                         kfree(buf->rb_send_bufs[i]);
1217                 }
1218         }
1219
1220         kfree(buf->rb_pool);
1221 }
1222
1223 /*
1224  * Get a set of request/reply buffers.
1225  *
1226  * Reply buffer (if needed) is attached to send buffer upon return.
1227  * Rule:
1228  *    rb_send_index and rb_recv_index MUST always be pointing to the
1229  *    *next* available buffer (non-NULL). They are incremented after
1230  *    removing buffers, and decremented *before* returning them.
1231  */
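/*
 * For example, with rb_max_requests == 2: two gets advance rb_send_index
 * from 0 to 2 (pool exhausted); a put pre-decrements it back to 1 and
 * stores the returned req in slot 1, restoring the invariant above.
 */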
1232 struct rpcrdma_req *
1233 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1234 {
1235         struct rpcrdma_req *req;
1236         unsigned long flags;
1237         int i;
1238         struct rpcrdma_mw *r;
1239
1240         spin_lock_irqsave(&buffers->rb_lock, flags);
1241         if (buffers->rb_send_index == buffers->rb_max_requests) {
1242                 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1243                 dprintk("RPC:       %s: out of request buffers\n", __func__);
1244                 return NULL;
1245         }
1246
1247         req = buffers->rb_send_bufs[buffers->rb_send_index];
1248         if (buffers->rb_send_index < buffers->rb_recv_index) {
1249                 dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1250                         __func__,
1251                         buffers->rb_recv_index - buffers->rb_send_index);
1252                 req->rl_reply = NULL;
1253         } else {
1254                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1255                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1256         }
1257         buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1258         if (!list_empty(&buffers->rb_mws)) {
1259                 i = RPCRDMA_MAX_SEGS - 1;
1260                 do {
1261                         r = list_entry(buffers->rb_mws.next,
1262                                         struct rpcrdma_mw, mw_list);
1263                         list_del(&r->mw_list);
1264                         req->rl_segments[i].mr_chunk.rl_mw = r;
1265                 } while (--i >= 0);
1266         }
1267         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1268         return req;
1269 }
1270
1271 /*
1272  * Put request/reply buffers back into pool.
1273  * Pre-decrement counter/array index.
1274  */
1275 void
1276 rpcrdma_buffer_put(struct rpcrdma_req *req)
1277 {
1278         struct rpcrdma_buffer *buffers = req->rl_buffer;
1279         struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1280         int i;
1281         unsigned long flags;
1282
1283         BUG_ON(req->rl_nchunks != 0);
1284         spin_lock_irqsave(&buffers->rb_lock, flags);
1285         buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1286         req->rl_niovs = 0;
1287         if (req->rl_reply) {
1288                 buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1289                 init_waitqueue_head(&req->rl_reply->rr_unbind);
1290                 req->rl_reply->rr_func = NULL;
1291                 req->rl_reply = NULL;
1292         }
1293         switch (ia->ri_memreg_strategy) {
1294         case RPCRDMA_FRMR:
1295         case RPCRDMA_MTHCAFMR:
1296         case RPCRDMA_MEMWINDOWS_ASYNC:
1297         case RPCRDMA_MEMWINDOWS:
1298                 /*
1299                  * Cycle mw's back in reverse order, and "spin" them.
1300                  * This delays and scrambles reuse as much as possible.
1301                  */
1302                 i = 1;
1303                 do {
1304                         struct rpcrdma_mw **mw;
1305                         mw = &req->rl_segments[i].mr_chunk.rl_mw;
1306                         list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1307                         *mw = NULL;
1308                 } while (++i < RPCRDMA_MAX_SEGS);
1309                 list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1310                                         &buffers->rb_mws);
1311                 req->rl_segments[0].mr_chunk.rl_mw = NULL;
1312                 break;
1313         default:
1314                 break;
1315         }
1316         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1317 }
1318
1319 /*
1320  * Recover reply buffers from pool.
1321  * This happens when recovering from error conditions.
1322  * Post-increment counter/array index.
1323  */
1324 void
1325 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1326 {
1327         struct rpcrdma_buffer *buffers = req->rl_buffer;
1328         unsigned long flags;
1329
1330         if (req->rl_iov.length == 0)    /* special case xprt_rdma_allocate() */
1331                 buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1332         spin_lock_irqsave(&buffers->rb_lock, flags);
1333         if (buffers->rb_recv_index < buffers->rb_max_requests) {
1334                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1335                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1336         }
1337         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1338 }
1339
1340 /*
1341  * Put reply buffers back into pool when not attached to
1342  * request. This happens in error conditions, and when
1343  * aborting unbinds. Pre-decrement counter/array index.
1344  */
1345 void
1346 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1347 {
1348         struct rpcrdma_buffer *buffers = rep->rr_buffer;
1349         unsigned long flags;
1350
1351         rep->rr_func = NULL;
1352         spin_lock_irqsave(&buffers->rb_lock, flags);
1353         buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1354         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1355 }
1356
1357 /*
1358  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1359  */
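/*
 * rpcrdma_register_internal() DMA-maps the buffer and then picks an lkey in
 * order of preference: the device's global DMA lkey, the pre-registered
 * "bind" MR set up in rpcrdma_ia_open(), or, failing both, a one-off
 * ib_reg_phys_mr() registration that is torn down later by
 * rpcrdma_deregister_internal().
 */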
1360
1361 int
1362 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1363                                 struct ib_mr **mrp, struct ib_sge *iov)
1364 {
1365         struct ib_phys_buf ipb;
1366         struct ib_mr *mr;
1367         int rc;
1368
1369         /*
1370          * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1371          */
1372         iov->addr = ib_dma_map_single(ia->ri_id->device,
1373                         va, len, DMA_BIDIRECTIONAL);
1374         iov->length = len;
1375
1376         if (ia->ri_have_dma_lkey) {
1377                 *mrp = NULL;
1378                 iov->lkey = ia->ri_dma_lkey;
1379                 return 0;
1380         } else if (ia->ri_bind_mem != NULL) {
1381                 *mrp = NULL;
1382                 iov->lkey = ia->ri_bind_mem->lkey;
1383                 return 0;
1384         }
1385
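	/* Neither a device-wide DMA lkey nor a pre-registered MR is
	 * available: register this kmalloc'ed (physically contiguous)
	 * buffer as a single phys buf region of its own.
	 */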
1386         ipb.addr = iov->addr;
1387         ipb.size = iov->length;
1388         mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1389                         IB_ACCESS_LOCAL_WRITE, &iov->addr);
1390
1391         dprintk("RPC:       %s: phys convert: 0x%llx "
1392                         "registered 0x%llx length %d\n",
1393                         __func__, (unsigned long long)ipb.addr,
1394                         (unsigned long long)iov->addr, len);
1395
1396         if (IS_ERR(mr)) {
1397                 *mrp = NULL;
1398                 rc = PTR_ERR(mr);
1399                 dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1400         } else {
1401                 *mrp = mr;
1402                 iov->lkey = mr->lkey;
1403                 rc = 0;
1404         }
1405
1406         return rc;
1407 }
1408
1409 int
1410 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1411                                 struct ib_mr *mr, struct ib_sge *iov)
1412 {
1413         int rc;
1414
1415         ib_dma_unmap_single(ia->ri_id->device,
1416                         iov->addr, iov->length, DMA_BIDIRECTIONAL);
1417
1418         if (mr == NULL)
1419                 return 0;
1420
1421         rc = ib_dereg_mr(mr);
1422         if (rc)
1423                 dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1424         return rc;
1425 }
1426
1427 /*
1428  * Wrappers for chunk registration, shared by read/write chunk code.
1429  */
1430
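/*
 * Map one mr_seg for DMA.  Page-backed segments go through
 * ib_dma_map_page(); segments described by a kernel virtual address
 * (mr_page == NULL) go through ib_dma_map_single().  The DMA direction
 * depends on whether the remote peer will write into or read from the
 * memory.
 */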
1431 static void
1432 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1433 {
1434         seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1435         seg->mr_dmalen = seg->mr_len;
1436         if (seg->mr_page)
1437                 seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1438                                 seg->mr_page, offset_in_page(seg->mr_offset),
1439                                 seg->mr_dmalen, seg->mr_dir);
1440         else
1441                 seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1442                                 seg->mr_offset,
1443                                 seg->mr_dmalen, seg->mr_dir);
1444 }
1445
1446 static void
1447 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1448 {
1449         if (seg->mr_page)
1450                 ib_dma_unmap_page(ia->ri_id->device,
1451                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1452         else
1453                 ib_dma_unmap_single(ia->ri_id->device,
1454                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1455 }
1456
1457 static int
1458 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1459                         int *nsegs, int writing, struct rpcrdma_ia *ia,
1460                         struct rpcrdma_xprt *r_xprt)
1461 {
1462         struct rpcrdma_mr_seg *seg1 = seg;
1463         struct ib_send_wr frmr_wr, *bad_wr;
1464         u8 key;
1465         int len, pageoff;
1466         int i, rc;
1467
1468         pageoff = offset_in_page(seg1->mr_offset);
1469         seg1->mr_offset -= pageoff;     /* start of page */
1470         seg1->mr_len += pageoff;
1471         len = -pageoff;
1472         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1473                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1474         for (i = 0; i < *nsegs;) {
1475                 rpcrdma_map_one(ia, seg, writing);
1476                 seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->page_list[i] = seg->mr_dma;
1477                 len += seg->mr_len;
1478                 ++seg;
1479                 ++i;
1480                 /* Check for holes: stop when the next segment does not start, or this one does not end, on a page boundary */
1481                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1482                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1483                         break;
1484         }
1485         dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1486                 __func__, seg1->mr_chunk.rl_mw, i);
1487
1488         /* Bump the key: increment the rkey's low-order byte so this registration carries a fresh key */
1489         key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
1490         ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
1491
1492         /* Prepare FRMR WR */
1493         memset(&frmr_wr, 0, sizeof frmr_wr);
1494         frmr_wr.opcode = IB_WR_FAST_REG_MR;
1495         frmr_wr.send_flags = 0;                 /* unsignaled */
1496         frmr_wr.wr.fast_reg.iova_start = (unsigned long)seg1->mr_dma;
1497         frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
1498         frmr_wr.wr.fast_reg.page_list_len = i;
1499         frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1500         frmr_wr.wr.fast_reg.length = i << PAGE_SHIFT;
1501         frmr_wr.wr.fast_reg.access_flags = (writing ?
1502                                 IB_ACCESS_REMOTE_WRITE : IB_ACCESS_REMOTE_READ);
1503         frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1504         DECR_CQCOUNT(&r_xprt->rx_ep);
1505
1506         rc = ib_post_send(ia->ri_id->qp, &frmr_wr, &bad_wr);
1507
1508         if (rc) {
1509                 dprintk("RPC:       %s: failed ib_post_send for register,"
1510                         " status %i\n", __func__, rc);
1511                 while (i--)
1512                         rpcrdma_unmap_one(ia, --seg);
1513         } else {
1514                 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1515                 seg1->mr_base = seg1->mr_dma + pageoff;
1516                 seg1->mr_nsegs = i;
1517                 seg1->mr_len = len;
1518         }
1519         *nsegs = i;
1520         return rc;
1521 }
1522
1523 static int
1524 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1525                         struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1526 {
1527         struct rpcrdma_mr_seg *seg1 = seg;
1528         struct ib_send_wr invalidate_wr, *bad_wr;
1529         int rc;
1530
1531         while (seg1->mr_nsegs--)
1532                 rpcrdma_unmap_one(ia, seg++);
1533
1534         memset(&invalidate_wr, 0, sizeof invalidate_wr);
1535         invalidate_wr.opcode = IB_WR_LOCAL_INV;
1536         invalidate_wr.send_flags = 0;                   /* unsignaled */
1537         invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1538         DECR_CQCOUNT(&r_xprt->rx_ep);
1539
1540         rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1541         if (rc)
1542                 dprintk("RPC:       %s: failed ib_post_send for invalidate,"
1543                         " status %i\n", __func__, rc);
1544         return rc;
1545 }
1546
1547 static int
1548 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1549                         int *nsegs, int writing, struct rpcrdma_ia *ia)
1550 {
1551         struct rpcrdma_mr_seg *seg1 = seg;
1552         u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1553         int len, pageoff, i, rc;
1554
1555         pageoff = offset_in_page(seg1->mr_offset);
1556         seg1->mr_offset -= pageoff;     /* start of page */
1557         seg1->mr_len += pageoff;
1558         len = -pageoff;
1559         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1560                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1561         for (i = 0; i < *nsegs;) {
1562                 rpcrdma_map_one(ia, seg, writing);
1563                 physaddrs[i] = seg->mr_dma;
1564                 len += seg->mr_len;
1565                 ++seg;
1566                 ++i;
1567                 /* Check for holes */
1568                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1569                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1570                         break;
1571         }
1572         rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1573                                 physaddrs, i, seg1->mr_dma);
1574         if (rc) {
1575                 dprintk("RPC:       %s: failed ib_map_phys_fmr "
1576                         "%u@0x%llx+%i (%d)... status %i\n", __func__,
1577                         len, (unsigned long long)seg1->mr_dma,
1578                         pageoff, i, rc);
1579                 while (i--)
1580                         rpcrdma_unmap_one(ia, --seg);
1581         } else {
1582                 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1583                 seg1->mr_base = seg1->mr_dma + pageoff;
1584                 seg1->mr_nsegs = i;
1585                 seg1->mr_len = len;
1586         }
1587         *nsegs = i;
1588         return rc;
1589 }
1590
1591 static int
1592 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1593                         struct rpcrdma_ia *ia)
1594 {
1595         struct rpcrdma_mr_seg *seg1 = seg;
1596         LIST_HEAD(l);
1597         int rc;
1598
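	/* ib_unmap_fmr() operates on a list of FMRs; build a
	 * single-entry list for this one.
	 */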
1599         list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1600         rc = ib_unmap_fmr(&l);
1601         while (seg1->mr_nsegs--)
1602                 rpcrdma_unmap_one(ia, seg++);
1603         if (rc)
1604                 dprintk("RPC:       %s: failed ib_unmap_fmr,"
1605                         " status %i\n", __func__, rc);
1606         return rc;
1607 }
1608
1609 static int
1610 rpcrdma_register_memwin_external(struct rpcrdma_mr_seg *seg,
1611                         int *nsegs, int writing, struct rpcrdma_ia *ia,
1612                         struct rpcrdma_xprt *r_xprt)
1613 {
1614         int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1615                                   IB_ACCESS_REMOTE_READ);
1616         struct ib_mw_bind param;
1617         int rc;
1618
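	/* A memory window bind covers exactly one mapped segment,
	 * so only a single segment is consumed per call.
	 */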
1619         *nsegs = 1;
1620         rpcrdma_map_one(ia, seg, writing);
1621         param.mr = ia->ri_bind_mem;
1622         param.wr_id = 0ULL;     /* no send cookie */
1623         param.addr = seg->mr_dma;
1624         param.length = seg->mr_len;
1625         param.send_flags = 0;
1626         param.mw_access_flags = mem_priv;
1627
1628         DECR_CQCOUNT(&r_xprt->rx_ep);
1629         rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1630         if (rc) {
1631                 dprintk("RPC:       %s: failed ib_bind_mw "
1632                         "%u@0x%llx status %i\n",
1633                         __func__, seg->mr_len,
1634                         (unsigned long long)seg->mr_dma, rc);
1635                 rpcrdma_unmap_one(ia, seg);
1636         } else {
1637                 seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
1638                 seg->mr_base = param.addr;
1639                 seg->mr_nsegs = 1;
1640         }
1641         return rc;
1642 }
1643
1644 static int
1645 rpcrdma_deregister_memwin_external(struct rpcrdma_mr_seg *seg,
1646                         struct rpcrdma_ia *ia,
1647                         struct rpcrdma_xprt *r_xprt, void **r)
1648 {
1649         struct ib_mw_bind param;
1651         int rc;
1652
1653         BUG_ON(seg->mr_nsegs != 1);
1654         param.mr = ia->ri_bind_mem;
1655         param.addr = 0ULL;      /* unbind */
1656         param.length = 0;
1657         param.mw_access_flags = 0;
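	/* If the caller passed a reply context in *r, post the unbind
	 * signaled and carry the context in wr_id so the completion
	 * path can run the reply callback; otherwise unbind unsignaled
	 * like an ordinary bind operation.
	 */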
1658         if (*r) {
1659                 param.wr_id = (u64) (unsigned long) *r;
1660                 param.send_flags = IB_SEND_SIGNALED;
1661                 INIT_CQCOUNT(&r_xprt->rx_ep);
1662         } else {
1663                 param.wr_id = 0ULL;
1664                 param.send_flags = 0;
1665                 DECR_CQCOUNT(&r_xprt->rx_ep);
1666         }
1667         rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1668         rpcrdma_unmap_one(ia, seg);
1669         if (rc)
1670                 dprintk("RPC:       %s: failed ib_(un)bind_mw,"
1671                         " status %i\n", __func__, rc);
1672         else
1673                 *r = NULL;      /* will upcall on completion */
1674         return rc;
1675 }
1676
1677 static int
1678 rpcrdma_register_default_external(struct rpcrdma_mr_seg *seg,
1679                         int *nsegs, int writing, struct rpcrdma_ia *ia)
1680 {
1681         int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1682                                   IB_ACCESS_REMOTE_READ);
1683         struct rpcrdma_mr_seg *seg1 = seg;
1684         struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
1685         int len, i, rc = 0;
1686
1687         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1688                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1689         for (len = 0, i = 0; i < *nsegs;) {
1690                 rpcrdma_map_one(ia, seg, writing);
1691                 ipb[i].addr = seg->mr_dma;
1692                 ipb[i].size = seg->mr_len;
1693                 len += seg->mr_len;
1694                 ++seg;
1695                 ++i;
1696                 /* Check for holes */
1697                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1698                     offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1699                         break;
1700         }
1701         seg1->mr_base = seg1->mr_dma;
1702         seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
1703                                 ipb, i, mem_priv, &seg1->mr_base);
1704         if (IS_ERR(seg1->mr_chunk.rl_mr)) {
1705                 rc = PTR_ERR(seg1->mr_chunk.rl_mr);
1706                 dprintk("RPC:       %s: failed ib_reg_phys_mr "
1707                         "%u@0x%llx (%d)... status %i\n",
1708                         __func__, len,
1709                         (unsigned long long)seg1->mr_dma, i, rc);
1710                 while (i--)
1711                         rpcrdma_unmap_one(ia, --seg);
1712         } else {
1713                 seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
1714                 seg1->mr_nsegs = i;
1715                 seg1->mr_len = len;
1716         }
1717         *nsegs = i;
1718         return rc;
1719 }
1720
1721 static int
1722 rpcrdma_deregister_default_external(struct rpcrdma_mr_seg *seg,
1723                         struct rpcrdma_ia *ia)
1724 {
1725         struct rpcrdma_mr_seg *seg1 = seg;
1726         int rc;
1727
1728         rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
1729         seg1->mr_chunk.rl_mr = NULL;
1730         while (seg1->mr_nsegs--)
1731                 rpcrdma_unmap_one(ia, seg++);
1732         if (rc)
1733                 dprintk("RPC:       %s: failed ib_dereg_mr,"
1734                         " status %i\n", __func__, rc);
1735         return rc;
1736 }
1737
1738 int
1739 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1740                         int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1741 {
1742         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1743         int rc = 0;
1744
1745         switch (ia->ri_memreg_strategy) {
1746
1747 #if RPCRDMA_PERSISTENT_REGISTRATION
1748         case RPCRDMA_ALLPHYSICAL:
1749                 rpcrdma_map_one(ia, seg, writing);
1750                 seg->mr_rkey = ia->ri_bind_mem->rkey;
1751                 seg->mr_base = seg->mr_dma;
1752                 seg->mr_nsegs = 1;
1753                 nsegs = 1;
1754                 break;
1755 #endif
1756
1757         /* Registration using fast registration memory regions (FRMR) */
1758         case RPCRDMA_FRMR:
1759                 rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1760                 break;
1761
1762         /* Registration using fast memory regions (FMR) */
1763         case RPCRDMA_MTHCAFMR:
1764                 rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1765                 break;
1766
1767         /* Registration using memory windows */
1768         case RPCRDMA_MEMWINDOWS_ASYNC:
1769         case RPCRDMA_MEMWINDOWS:
1770                 rc = rpcrdma_register_memwin_external(seg, &nsegs, writing, ia, r_xprt);
1771                 break;
1772
1773         /* Default: register a physical MR (ib_reg_phys_mr) for each chunk */
1774         default:
1775                 rc = rpcrdma_register_default_external(seg, &nsegs, writing, ia);
1776                 break;
1777         }
1778         if (rc)
1779                 return -1;
1780
1781         return nsegs;
1782 }
1783
1784 int
1785 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1786                 struct rpcrdma_xprt *r_xprt, void *r)
1787 {
1788         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1789         int nsegs = seg->mr_nsegs, rc;
1790
1791         switch (ia->ri_memreg_strategy) {
1792
1793 #if RPCRDMA_PERSISTENT_REGISTRATION
1794         case RPCRDMA_ALLPHYSICAL:
1795                 BUG_ON(nsegs != 1);
1796                 rpcrdma_unmap_one(ia, seg);
1797                 rc = 0;
1798                 break;
1799 #endif
1800
1801         case RPCRDMA_FRMR:
1802                 rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1803                 break;
1804
1805         case RPCRDMA_MTHCAFMR:
1806                 rc = rpcrdma_deregister_fmr_external(seg, ia);
1807                 break;
1808
1809         case RPCRDMA_MEMWINDOWS_ASYNC:
1810         case RPCRDMA_MEMWINDOWS:
1811                 rc = rpcrdma_deregister_memwin_external(seg, ia, r_xprt, &r);
1812                 break;
1813
1814         default:
1815                 rc = rpcrdma_deregister_default_external(seg, ia);
1816                 break;
1817         }
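	/* If the reply context was not handed off to a signaled unbind
	 * above, run the reply callback synchronously here.
	 */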
1818         if (r) {
1819                 struct rpcrdma_rep *rep = r;
1820                 void (*func)(struct rpcrdma_rep *) = rep->rr_func;
1821                 rep->rr_func = NULL;
1822                 func(rep);      /* dereg done, callback now */
1823         }
1824         return nsegs;
1825 }
1826
1827 /*
1828  * Prepost any receive buffer, then post send.
1829  *
1830  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1831  */
1832 int
1833 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1834                 struct rpcrdma_ep *ep,
1835                 struct rpcrdma_req *req)
1836 {
1837         struct ib_send_wr send_wr, *send_wr_fail;
1838         struct rpcrdma_rep *rep = req->rl_reply;
1839         int rc;
1840
1841         if (rep) {
1842                 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1843                 if (rc)
1844                         goto out;
1845                 req->rl_reply = NULL;
1846         }
1847
1848         send_wr.next = NULL;
1849         send_wr.wr_id = 0ULL;   /* no send cookie */
1850         send_wr.sg_list = req->rl_send_iov;
1851         send_wr.num_sge = req->rl_niovs;
1852         send_wr.opcode = IB_WR_SEND;
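	/* Make the outgoing sges visible to the device: the header sges
	 * [0] and [1] always, and sge[3] only when present; sge[2] holds
	 * the constant pad and is never synced.
	 */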
1853         if (send_wr.num_sge == 4)       /* no need to sync any pad (constant) */
1854                 ib_dma_sync_single_for_device(ia->ri_id->device,
1855                         req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1856                         DMA_TO_DEVICE);
1857         ib_dma_sync_single_for_device(ia->ri_id->device,
1858                 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1859                 DMA_TO_DEVICE);
1860         ib_dma_sync_single_for_device(ia->ri_id->device,
1861                 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1862                 DMA_TO_DEVICE);
1863
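	/* Sends are normally unsignaled.  When the endpoint's budget of
	 * unsignaled work requests is used up, reset it and request a
	 * signaled completion so the provider can reap its send queue.
	 */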
1864         if (DECR_CQCOUNT(ep) > 0)
1865                 send_wr.send_flags = 0;
1866         else { /* Provider must take a send completion every now and then */
1867                 INIT_CQCOUNT(ep);
1868                 send_wr.send_flags = IB_SEND_SIGNALED;
1869         }
1870
1871         rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1872         if (rc)
1873                 dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1874                         rc);
1875 out:
1876         return rc;
1877 }
1878
1879 /*
1880  * (Re)post a receive buffer.
1881  */
1882 int
1883 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1884                      struct rpcrdma_ep *ep,
1885                      struct rpcrdma_rep *rep)
1886 {
1887         struct ib_recv_wr recv_wr, *recv_wr_fail;
1888         int rc;
1889
1890         recv_wr.next = NULL;
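	/* Stash the rep pointer in wr_id so the receive completion
	 * handler can recover it.
	 */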
1891         recv_wr.wr_id = (u64) (unsigned long) rep;
1892         recv_wr.sg_list = &rep->rr_iov;
1893         recv_wr.num_sge = 1;
1894
1895         ib_dma_sync_single_for_cpu(ia->ri_id->device,
1896                 rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1897
1898         DECR_CQCOUNT(ep);
1899         rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1900
1901         if (rc)
1902                 dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1903                         rc);
1904         return rc;
1905 }