RPC/RDMA: harden connection logic against missing/late rdma_cm upcalls.
net/sunrpc/xprtrdma/verbs.c
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49
50 #include <linux/pci.h>  /* for Tavor hack below */
51
52 #include "xprt_rdma.h"
53
54 /*
55  * Globals/Macros
56  */
57
58 #ifdef RPC_DEBUG
59 # define RPCDBG_FACILITY        RPCDBG_TRANS
60 #endif
61
62 /*
63  * internal functions
64  */
65
66 /*
67  * handle replies in tasklet context, using a single, global list
68  * rdma tasklet function -- just turn around and call the func
69  * for all replies on the list
70  */
71
72 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
73 static LIST_HEAD(rpcrdma_tasklets_g);
74
75 static void
76 rpcrdma_run_tasklet(unsigned long data)
77 {
78         struct rpcrdma_rep *rep;
79         void (*func)(struct rpcrdma_rep *);
80         unsigned long flags;
81
82         data = data;    /* the tasklet data argument is unused */
83         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
84         while (!list_empty(&rpcrdma_tasklets_g)) {
85                 rep = list_entry(rpcrdma_tasklets_g.next,
86                                  struct rpcrdma_rep, rr_list);
87                 list_del(&rep->rr_list);
88                 func = rep->rr_func;
89                 rep->rr_func = NULL;
90                 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
91
92                 if (func)
93                         func(rep);
94                 else
95                         rpcrdma_recv_buffer_put(rep);
96
97                 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
98         }
99         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
100 }
101
102 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
103
104 static inline void
105 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
106 {
107         unsigned long flags;
108
109         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
110         list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
111         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
112         tasklet_schedule(&rpcrdma_tasklet_g);
113 }
114
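/*
 * QP async error upcall. If the endpoint is currently connected,
 * mark it failed (-EIO), notify the transport via rep_func, and
 * wake anyone sleeping on rep_connect_wait.
 */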
115 static void
116 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
117 {
118         struct rpcrdma_ep *ep = context;
119
120         dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
121                 __func__, event->event, event->device->name, context);
122         if (ep->rep_connected == 1) {
123                 ep->rep_connected = -EIO;
124                 ep->rep_func(ep);
125                 wake_up_all(&ep->rep_connect_wait);
126         }
127 }
128
129 static void
130 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
131 {
132         struct rpcrdma_ep *ep = context;
133
134         dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
135                 __func__, event->event, event->device->name, context);
136         if (ep->rep_connected == 1) {
137                 ep->rep_connected = -EIO;
138                 ep->rep_func(ep);
139                 wake_up_all(&ep->rep_connect_wait);
140         }
141 }
142
143 static inline
144 void rpcrdma_event_process(struct ib_wc *wc)
145 {
146         struct rpcrdma_rep *rep =
147                         (struct rpcrdma_rep *)(unsigned long) wc->wr_id;
148
149         dprintk("RPC:       %s: event rep %p status %X opcode %X length %u\n",
150                 __func__, rep, wc->status, wc->opcode, wc->byte_len);
151
152         if (!rep) /* send or bind completion that we don't care about */
153                 return;
154
155         if (wc->status != IB_WC_SUCCESS) {
156                 dprintk("RPC:       %s: %s WC status %X, connection lost\n",
157                         __func__, (wc->opcode & IB_WC_RECV) ? "recv" : "send",
158                          wc->status);
159                 rep->rr_len = ~0U;
160                 rpcrdma_schedule_tasklet(rep);
161                 return;
162         }
163
164         switch (wc->opcode) {
165         case IB_WC_RECV:
166                 rep->rr_len = wc->byte_len;
167                 ib_dma_sync_single_for_cpu(
168                         rdmab_to_ia(rep->rr_buffer)->ri_id->device,
169                         rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
170                 /* Keep (only) the most recent credits, after checking their validity */
171                 if (rep->rr_len >= 16) {
172                         struct rpcrdma_msg *p =
173                                         (struct rpcrdma_msg *) rep->rr_base;
174                         unsigned int credits = ntohl(p->rm_credit);
175                         if (credits == 0) {
176                                 dprintk("RPC:       %s: server"
177                                         " dropped credits to 0!\n", __func__);
178                                 /* don't deadlock */
179                                 credits = 1;
180                         } else if (credits > rep->rr_buffer->rb_max_requests) {
181                                 dprintk("RPC:       %s: server"
182                                         " over-crediting: %d (%d)\n",
183                                         __func__, credits,
184                                         rep->rr_buffer->rb_max_requests);
185                                 credits = rep->rr_buffer->rb_max_requests;
186                         }
187                         atomic_set(&rep->rr_buffer->rb_credits, credits);
188                 }
189                 /* fall through */
190         case IB_WC_BIND_MW:
191                 rpcrdma_schedule_tasklet(rep);
192                 break;
193         default:
194                 dprintk("RPC:       %s: unexpected WC event %X\n",
195                         __func__, wc->opcode);
196                 break;
197         }
198 }
199
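/*
 * Drain all available work completions from the CQ, handing each
 * one to rpcrdma_event_process(). Returns 0, or the error from
 * ib_poll_cq().
 */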
200 static inline int
201 rpcrdma_cq_poll(struct ib_cq *cq)
202 {
203         struct ib_wc wc;
204         int rc;
205
206         for (;;) {
207                 rc = ib_poll_cq(cq, 1, &wc);
208                 if (rc < 0) {
209                         dprintk("RPC:       %s: ib_poll_cq failed %i\n",
210                                 __func__, rc);
211                         return rc;
212                 }
213                 if (rc == 0)
214                         break;
215
216                 rpcrdma_event_process(&wc);
217         }
218
219         return 0;
220 }
221
222 /*
223  * rpcrdma_cq_event_upcall
224  *
225  * This upcall handles recv, send, bind and unbind events.
226  * It is reentrant but processes single events in order to maintain
227  * ordering of receives to keep server credits.
228  *
229  * It is the responsibility of the scheduled tasklet to return
230  * recv buffers to the pool. NOTE: this affects synchronization of
231  * connection shutdown. That is, the structures required for
232  * the completion of the reply handler must remain intact until
233  * all memory has been reclaimed.
234  *
235  * Note that send events are suppressed and do not result in an upcall.
236  */
237 static void
238 rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
239 {
240         int rc;
241
242         rc = rpcrdma_cq_poll(cq);
243         if (rc)
244                 return;
245
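        /*
         * Re-arm the CQ, then poll once more: a completion that arrived
         * between the first poll and the re-arm would otherwise sit
         * unprocessed until the next CQ event.
         */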
246         rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
247         if (rc) {
248                 dprintk("RPC:       %s: ib_req_notify_cq failed %i\n",
249                         __func__, rc);
250                 return;
251         }
252
253         rpcrdma_cq_poll(cq);
254 }
255
256 #ifdef RPC_DEBUG
257 static const char * const conn[] = {
258         "address resolved",
259         "address error",
260         "route resolved",
261         "route error",
262         "connect request",
263         "connect response",
264         "connect error",
265         "unreachable",
266         "rejected",
267         "established",
268         "disconnected",
269         "device removal"
270 };
271 #endif
272
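/*
 * Connection manager event handler. Address and route resolution
 * results are recorded in ri_async_rc and signalled via ri_done;
 * connection state changes are mapped onto ep->rep_connected, the
 * transport is notified via rep_func, and rep_connect_wait is woken
 * so rpcrdma_ep_connect() can react.
 */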
273 static int
274 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
275 {
276         struct rpcrdma_xprt *xprt = id->context;
277         struct rpcrdma_ia *ia = &xprt->rx_ia;
278         struct rpcrdma_ep *ep = &xprt->rx_ep;
279         struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
280         struct ib_qp_attr attr;
281         struct ib_qp_init_attr iattr;
282         int connstate = 0;
283
284         switch (event->event) {
285         case RDMA_CM_EVENT_ADDR_RESOLVED:
286         case RDMA_CM_EVENT_ROUTE_RESOLVED:
287                 ia->ri_async_rc = 0;
288                 complete(&ia->ri_done);
289                 break;
290         case RDMA_CM_EVENT_ADDR_ERROR:
291                 ia->ri_async_rc = -EHOSTUNREACH;
292                 dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
293                         __func__, ep);
294                 complete(&ia->ri_done);
295                 break;
296         case RDMA_CM_EVENT_ROUTE_ERROR:
297                 ia->ri_async_rc = -ENETUNREACH;
298                 dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
299                         __func__, ep);
300                 complete(&ia->ri_done);
301                 break;
302         case RDMA_CM_EVENT_ESTABLISHED:
303                 connstate = 1;
304                 ib_query_qp(ia->ri_id->qp, &attr,
305                         IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
306                         &iattr);
307                 dprintk("RPC:       %s: %d responder resources"
308                         " (%d initiator)\n",
309                         __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
310                 goto connected;
311         case RDMA_CM_EVENT_CONNECT_ERROR:
312                 connstate = -ENOTCONN;
313                 goto connected;
314         case RDMA_CM_EVENT_UNREACHABLE:
315                 connstate = -ENETDOWN;
316                 goto connected;
317         case RDMA_CM_EVENT_REJECTED:
318                 connstate = -ECONNREFUSED;
319                 goto connected;
320         case RDMA_CM_EVENT_DISCONNECTED:
321                 connstate = -ECONNABORTED;
322                 goto connected;
323         case RDMA_CM_EVENT_DEVICE_REMOVAL:
324                 connstate = -ENODEV;
325 connected:
326                 dprintk("RPC:       %s: %s: %u.%u.%u.%u:%u"
327                         " (ep 0x%p event 0x%x)\n",
328                         __func__,
329                         (event->event <= 11) ? conn[event->event] :
330                                                 "unknown connection error",
331                         NIPQUAD(addr->sin_addr.s_addr),
332                         ntohs(addr->sin_port),
333                         ep, event->event);
334                 atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
335                 dprintk("RPC:       %s: %sconnected\n",
336                                         __func__, connstate > 0 ? "" : "dis");
337                 ep->rep_connected = connstate;
338                 ep->rep_func(ep);
339                 wake_up_all(&ep->rep_connect_wait);
340                 break;
341         default:
342                 dprintk("RPC:       %s: unexpected CM event %d\n",
343                         __func__, event->event);
344                 break;
345         }
346
347         return 0;
348 }
349
350 static struct rdma_cm_id *
351 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
352                         struct rpcrdma_ia *ia, struct sockaddr *addr)
353 {
354         struct rdma_cm_id *id;
355         int rc;
356
357         init_completion(&ia->ri_done);
358
359         id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP);
360         if (IS_ERR(id)) {
361                 rc = PTR_ERR(id);
362                 dprintk("RPC:       %s: rdma_create_id() failed %i\n",
363                         __func__, rc);
364                 return id;
365         }
366
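        /*
         * Preset ri_async_rc to -ETIMEDOUT and bound the wait slightly
         * beyond the CM's own resolution timeout: if the rdma_cm upcall
         * is missing or late, the attempt fails with a timeout instead
         * of blocking forever.
         */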
367         ia->ri_async_rc = -ETIMEDOUT;
368         rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
369         if (rc) {
370                 dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
371                         __func__, rc);
372                 goto out;
373         }
374         wait_for_completion_interruptible_timeout(&ia->ri_done,
375                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
376         rc = ia->ri_async_rc;
377         if (rc)
378                 goto out;
379
380         ia->ri_async_rc = -ETIMEDOUT;
381         rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
382         if (rc) {
383                 dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
384                         __func__, rc);
385                 goto out;
386         }
387         wait_for_completion_interruptible_timeout(&ia->ri_done,
388                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
389         rc = ia->ri_async_rc;
390         if (rc)
391                 goto out;
392
393         return id;
394
395 out:
396         rdma_destroy_id(id);
397         return ERR_PTR(rc);
398 }
399
400 /*
401  * Drain any cq, prior to teardown.
402  */
403 static void
404 rpcrdma_clean_cq(struct ib_cq *cq)
405 {
406         struct ib_wc wc;
407         int count = 0;
408
409         while (1 == ib_poll_cq(cq, 1, &wc))
410                 ++count;
411
412         if (count)
413                 dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
414                         __func__, count, wc.opcode);
415 }
416
417 /*
418  * Exported functions.
419  */
420
421 /*
422  * Open and initialize an Interface Adapter.
423  *  o initializes fields of struct rpcrdma_ia, including
424  *    interface and provider attributes and protection zone.
425  */
426 int
427 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
428 {
429         int rc, mem_priv;
430         struct ib_device_attr devattr;
431         struct rpcrdma_ia *ia = &xprt->rx_ia;
432
433         ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
434         if (IS_ERR(ia->ri_id)) {
435                 rc = PTR_ERR(ia->ri_id);
436                 goto out1;
437         }
438
439         ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
440         if (IS_ERR(ia->ri_pd)) {
441                 rc = PTR_ERR(ia->ri_pd);
442                 dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
443                         __func__, rc);
444                 goto out2;
445         }
446
447         /*
448          * Query the device to determine if the requested memory
449          * registration strategy is supported. If it isn't, set the
450          * strategy to a globally supported model.
451          */
452         rc = ib_query_device(ia->ri_id->device, &devattr);
453         if (rc) {
454                 dprintk("RPC:       %s: ib_query_device failed %d\n",
455                         __func__, rc);
456                 goto out2;
457         }
458
459         if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
460                 ia->ri_have_dma_lkey = 1;
461                 ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
462         }
463
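        /*
         * Downgrade an unsupported registration strategy: MEMWINDOWS
         * falls back to REGISTER; MTHCAFMR and FRMR fall back to
         * ALLPHYSICAL when persistent registration is compiled in,
         * otherwise to REGISTER.
         */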
464         switch (memreg) {
465         case RPCRDMA_MEMWINDOWS:
466         case RPCRDMA_MEMWINDOWS_ASYNC:
467                 if (!(devattr.device_cap_flags & IB_DEVICE_MEM_WINDOW)) {
468                         dprintk("RPC:       %s: MEMWINDOWS registration "
469                                 "specified but not supported by adapter, "
470                                 "using slower RPCRDMA_REGISTER\n",
471                                 __func__);
472                         memreg = RPCRDMA_REGISTER;
473                 }
474                 break;
475         case RPCRDMA_MTHCAFMR:
476                 if (!ia->ri_id->device->alloc_fmr) {
477 #if RPCRDMA_PERSISTENT_REGISTRATION
478                         dprintk("RPC:       %s: MTHCAFMR registration "
479                                 "specified but not supported by adapter, "
480                                 "using riskier RPCRDMA_ALLPHYSICAL\n",
481                                 __func__);
482                         memreg = RPCRDMA_ALLPHYSICAL;
483 #else
484                         dprintk("RPC:       %s: MTHCAFMR registration "
485                                 "specified but not supported by adapter, "
486                                 "using slower RPCRDMA_REGISTER\n",
487                                 __func__);
488                         memreg = RPCRDMA_REGISTER;
489 #endif
490                 }
491                 break;
492         case RPCRDMA_FRMR:
493                 /* Requires both frmr reg and local dma lkey */
494                 if ((devattr.device_cap_flags &
495                      (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
496                     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
497 #if RPCRDMA_PERSISTENT_REGISTRATION
498                         dprintk("RPC:       %s: FRMR registration "
499                                 "specified but not supported by adapter, "
500                                 "using riskier RPCRDMA_ALLPHYSICAL\n",
501                                 __func__);
502                         memreg = RPCRDMA_ALLPHYSICAL;
503 #else
504                         dprintk("RPC:       %s: FRMR registration "
505                                 "specified but not supported by adapter, "
506                                 "using slower RPCRDMA_REGISTER\n",
507                                 __func__);
508                         memreg = RPCRDMA_REGISTER;
509 #endif
510                 }
511                 break;
512         }
513
514         /*
515          * Optionally obtain an underlying physical identity mapping in
516          * order to do a memory window-based bind. This base registration
517          * is protected from remote access - that is enabled only by binding
518          * for the specific bytes targeted during each RPC operation, and
519          * revoked after the corresponding completion similar to a storage
520          * adapter.
521          */
522         switch (memreg) {
523         case RPCRDMA_BOUNCEBUFFERS:
524         case RPCRDMA_REGISTER:
525         case RPCRDMA_FRMR:
526                 break;
527 #if RPCRDMA_PERSISTENT_REGISTRATION
528         case RPCRDMA_ALLPHYSICAL:
529                 mem_priv = IB_ACCESS_LOCAL_WRITE |
530                                 IB_ACCESS_REMOTE_WRITE |
531                                 IB_ACCESS_REMOTE_READ;
532                 goto register_setup;
533 #endif
534         case RPCRDMA_MEMWINDOWS_ASYNC:
535         case RPCRDMA_MEMWINDOWS:
536                 mem_priv = IB_ACCESS_LOCAL_WRITE |
537                                 IB_ACCESS_MW_BIND;
538                 goto register_setup;
539         case RPCRDMA_MTHCAFMR:
540                 if (ia->ri_have_dma_lkey)
541                         break;
542                 mem_priv = IB_ACCESS_LOCAL_WRITE;
543         register_setup:
544                 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
545                 if (IS_ERR(ia->ri_bind_mem)) {
546                         printk(KERN_ALERT "%s: ib_get_dma_mr for "
547                                 "phys register failed with %lX\n\t"
548                                 "Will continue with degraded performance\n",
549                                 __func__, PTR_ERR(ia->ri_bind_mem));
550                         memreg = RPCRDMA_REGISTER;
551                         ia->ri_bind_mem = NULL;
552                 }
553                 break;
554         default:
555                 printk(KERN_ERR "%s: invalid memory registration mode %d\n",
556                                 __func__, memreg);
557                 rc = -EINVAL;
558                 goto out2;
559         }
560         dprintk("RPC:       %s: memory registration strategy is %d\n",
561                 __func__, memreg);
562
563         /* Else will do memory reg/dereg for each chunk */
564         ia->ri_memreg_strategy = memreg;
565
566         return 0;
567 out2:
568         rdma_destroy_id(ia->ri_id);
569         ia->ri_id = NULL;
570 out1:
571         return rc;
572 }
573
574 /*
575  * Clean up/close an IA.
576  *   o if event handles and PD have been initialized, free them.
577  *   o close the IA
578  */
579 void
580 rpcrdma_ia_close(struct rpcrdma_ia *ia)
581 {
582         int rc;
583
584         dprintk("RPC:       %s: entering\n", __func__);
585         if (ia->ri_bind_mem != NULL) {
586                 rc = ib_dereg_mr(ia->ri_bind_mem);
587                 dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
588                         __func__, rc);
589         }
590         if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
591                 if (ia->ri_id->qp)
592                         rdma_destroy_qp(ia->ri_id);
593                 rdma_destroy_id(ia->ri_id);
594                 ia->ri_id = NULL;
595         }
596         if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
597                 rc = ib_dealloc_pd(ia->ri_pd);
598                 dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
599                         __func__, rc);
600         }
601 }
602
603 /*
604  * Create unconnected endpoint.
605  */
606 int
607 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
608                                 struct rpcrdma_create_data_internal *cdata)
609 {
610         struct ib_device_attr devattr;
611         int rc, err;
612
613         rc = ib_query_device(ia->ri_id->device, &devattr);
614         if (rc) {
615                 dprintk("RPC:       %s: ib_query_device failed %d\n",
616                         __func__, rc);
617                 return rc;
618         }
619
620         /* check provider's send/recv wr limits */
621         if (cdata->max_requests > devattr.max_qp_wr)
622                 cdata->max_requests = devattr.max_qp_wr;
623
624         ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
625         ep->rep_attr.qp_context = ep;
626         /* send_cq and recv_cq initialized below */
627         ep->rep_attr.srq = NULL;
628         ep->rep_attr.cap.max_send_wr = cdata->max_requests;
629         switch (ia->ri_memreg_strategy) {
630         case RPCRDMA_FRMR:
631                 /* Add room for frmr register and invalidate WRs */
632                 ep->rep_attr.cap.max_send_wr *= 3;
633                 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
634                         return -EINVAL;
635                 break;
636         case RPCRDMA_MEMWINDOWS_ASYNC:
637         case RPCRDMA_MEMWINDOWS:
638                 /* Add room for mw_binds+unbinds - overkill! */
639                 ep->rep_attr.cap.max_send_wr++;
640                 ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
641                 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
642                         return -EINVAL;
643                 break;
644         default:
645                 break;
646         }
647         ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
648         ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
649         ep->rep_attr.cap.max_recv_sge = 1;
650         ep->rep_attr.cap.max_inline_data = 0;
651         ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
652         ep->rep_attr.qp_type = IB_QPT_RC;
653         ep->rep_attr.port_num = ~0;
654
655         dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
656                 "iovs: send %d recv %d\n",
657                 __func__,
658                 ep->rep_attr.cap.max_send_wr,
659                 ep->rep_attr.cap.max_recv_wr,
660                 ep->rep_attr.cap.max_send_sge,
661                 ep->rep_attr.cap.max_recv_sge);
662
663         /* set trigger for requesting send completion */
664         ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /*  - 1*/;
665         switch (ia->ri_memreg_strategy) {
666         case RPCRDMA_MEMWINDOWS_ASYNC:
667         case RPCRDMA_MEMWINDOWS:
668                 ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
669                 break;
670         default:
671                 break;
672         }
673         if (ep->rep_cqinit <= 2)
674                 ep->rep_cqinit = 0;
675         INIT_CQCOUNT(ep);
676         ep->rep_ia = ia;
677         init_waitqueue_head(&ep->rep_connect_wait);
678
679         /*
680          * Create a single cq for receive dto and mw_bind (only ever
681          * care about unbind, really). Send completions are suppressed.
682          * Use single threaded tasklet upcalls to maintain ordering.
683          */
684         ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
685                                   rpcrdma_cq_async_error_upcall, NULL,
686                                   ep->rep_attr.cap.max_recv_wr +
687                                   ep->rep_attr.cap.max_send_wr + 1, 0);
688         if (IS_ERR(ep->rep_cq)) {
689                 rc = PTR_ERR(ep->rep_cq);
690                 dprintk("RPC:       %s: ib_create_cq failed: %i\n",
691                         __func__, rc);
692                 goto out1;
693         }
694
695         rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
696         if (rc) {
697                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
698                         __func__, rc);
699                 goto out2;
700         }
701
702         ep->rep_attr.send_cq = ep->rep_cq;
703         ep->rep_attr.recv_cq = ep->rep_cq;
704
705         /* Initialize cma parameters */
706
707         /* RPC/RDMA does not use private data */
708         ep->rep_remote_cma.private_data = NULL;
709         ep->rep_remote_cma.private_data_len = 0;
710
711         /* Client offers RDMA Read but does not initiate */
712         ep->rep_remote_cma.initiator_depth = 0;
713         if (ia->ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS)
714                 ep->rep_remote_cma.responder_resources = 0;
715         else if (devattr.max_qp_rd_atom > 32)   /* arbitrary but <= 255 */
716                 ep->rep_remote_cma.responder_resources = 32;
717         else
718                 ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
719
720         ep->rep_remote_cma.retry_count = 7;
721         ep->rep_remote_cma.flow_control = 0;
722         ep->rep_remote_cma.rnr_retry_count = 0;
723
724         return 0;
725
726 out2:
727         err = ib_destroy_cq(ep->rep_cq);
728         if (err)
729                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
730                         __func__, err);
731 out1:
732         return rc;
733 }
734
735 /*
736  * rpcrdma_ep_destroy
737  *
738  * Disconnect and destroy endpoint. After this, the only
739  * valid operations on the ep are to free it (if dynamically
740  * allocated) or re-create it.
741  *
742  * The caller's error handling must be sure to not leak the endpoint
743  * if this function fails.
744  */
745 int
746 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
747 {
748         int rc;
749
750         dprintk("RPC:       %s: entering, connected is %d\n",
751                 __func__, ep->rep_connected);
752
753         if (ia->ri_id->qp) {
754                 rc = rpcrdma_ep_disconnect(ep, ia);
755                 if (rc)
756                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
757                                 " returned %i\n", __func__, rc);
758                 rdma_destroy_qp(ia->ri_id);
759                 ia->ri_id->qp = NULL;
760         }
761
762         /* padding - could be done in rpcrdma_buffer_destroy... */
763         if (ep->rep_pad_mr) {
764                 rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
765                 ep->rep_pad_mr = NULL;
766         }
767
768         rpcrdma_clean_cq(ep->rep_cq);
769         rc = ib_destroy_cq(ep->rep_cq);
770         if (rc)
771                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
772                         __func__, rc);
773
774         return rc;
775 }
776
777 /*
778  * Connect unconnected endpoint.
779  */
780 int
781 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
782 {
783         struct rdma_cm_id *id;
784         int rc = 0;
785         int retry_count = 0;
786         int reconnect = (ep->rep_connected != 0);
787
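        /*
         * On reconnect, resolve a fresh rdma_cm_id, verify it still maps
         * to the same device, then destroy the old QP and id and swap in
         * the new id before creating a new QP below.
         */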
788         if (reconnect) {
789                 struct rpcrdma_xprt *xprt;
790 retry:
791                 rc = rpcrdma_ep_disconnect(ep, ia);
792                 if (rc && rc != -ENOTCONN)
793                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
794                                 " status %i\n", __func__, rc);
795                 rpcrdma_clean_cq(ep->rep_cq);
796
797                 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
798                 id = rpcrdma_create_id(xprt, ia,
799                                 (struct sockaddr *)&xprt->rx_data.addr);
800                 if (IS_ERR(id)) {
801                         rc = PTR_ERR(id);
802                         goto out;
803                 }
804                 /* TEMP TEMP TEMP - fail if new device:
805                  * Deregister/remarshal *all* requests!
806                  * Close and recreate adapter, pd, etc!
807                  * Re-determine all attributes still sane!
808                  * More stuff I haven't thought of!
809                  * Rrrgh!
810                  */
811                 if (ia->ri_id->device != id->device) {
812                         printk("RPC:       %s: can't reconnect on "
813                                 "different device!\n", __func__);
814                         rdma_destroy_id(id);
815                         rc = -ENETDOWN;
816                         goto out;
817                 }
818                 /* END TEMP */
819                 rdma_destroy_qp(ia->ri_id);
820                 rdma_destroy_id(ia->ri_id);
821                 ia->ri_id = id;
822         }
823
824         rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
825         if (rc) {
826                 dprintk("RPC:       %s: rdma_create_qp failed %i\n",
827                         __func__, rc);
828                 goto out;
829         }
830
831 /* XXX Tavor device performs badly with 2K MTU! */
832 if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
833         struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
834         if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
835             (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
836              pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
837                 struct ib_qp_attr attr = {
838                         .path_mtu = IB_MTU_1024
839                 };
840                 rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
841         }
842 }
843
844         ep->rep_connected = 0;
845
846         rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
847         if (rc) {
848                 dprintk("RPC:       %s: rdma_connect() failed with %i\n",
849                                 __func__, rc);
850                 goto out;
851         }
852
853         if (reconnect)
854                 return 0;
855
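        /* Initial connect: wait for the CM upcall to set rep_connected. */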
856         wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
857
858         /*
859          * Check state. A non-peer reject indicates no listener
860          * (ECONNREFUSED), which may be a transient state. All
861          * others indicate a transport condition for which the CM has
862          * already made a best-effort connection attempt.
863          */
864         if (ep->rep_connected == -ECONNREFUSED
865             && ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
866                 dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
867                 goto retry;
868         }
869         if (ep->rep_connected <= 0) {
870                 /* Sometimes, the only way to reliably connect to remote
871                  * CMs is to use the same nonzero values for ORD and IRD. */
872                 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
873                     (ep->rep_remote_cma.responder_resources == 0 ||
874                      ep->rep_remote_cma.initiator_depth !=
875                                 ep->rep_remote_cma.responder_resources)) {
876                         if (ep->rep_remote_cma.responder_resources == 0)
877                                 ep->rep_remote_cma.responder_resources = 1;
878                         ep->rep_remote_cma.initiator_depth =
879                                 ep->rep_remote_cma.responder_resources;
880                         goto retry;
881                 }
882                 rc = ep->rep_connected;
883         } else {
884                 dprintk("RPC:       %s: connected\n", __func__);
885         }
886
887 out:
888         if (rc)
889                 ep->rep_connected = rc;
890         return rc;
891 }
892
893 /*
894  * rpcrdma_ep_disconnect
895  *
896  * This is separate from destroy to facilitate the ability
897  * to reconnect without recreating the endpoint.
898  *
899  * This call is not reentrant, and must not be made in parallel
900  * on the same endpoint.
901  */
902 int
903 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
904 {
905         int rc;
906
907         rpcrdma_clean_cq(ep->rep_cq);
908         rc = rdma_disconnect(ia->ri_id);
909         if (!rc) {
910                 /* returns without wait if not connected */
911                 wait_event_interruptible(ep->rep_connect_wait,
912                                                         ep->rep_connected != 1);
913                 dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
914                         (ep->rep_connected == 1) ? "still " : "dis");
915         } else {
916                 dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
917                 ep->rep_connected = rc;
918         }
919         return rc;
920 }
921
922 /*
923  * Initialize buffer memory
924  */
925 int
926 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
927         struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
928 {
929         char *p;
930         size_t len;
931         int i, rc;
932         struct rpcrdma_mw *r;
933
934         buf->rb_max_requests = cdata->max_requests;
935         spin_lock_init(&buf->rb_lock);
936         atomic_set(&buf->rb_credits, 1);
937
938         /* Need to allocate:
939          *   1.  arrays for send and recv pointers
940          *   2.  arrays of struct rpcrdma_req to fill in pointers
941          *   3.  array of struct rpcrdma_rep for replies
942          *   4.  padding, if any
943          *   5.  mw's, fmr's or frmr's, if any
944          * Send/recv buffers in req/rep need to be registered
945          */
946
947         len = buf->rb_max_requests *
948                 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
949         len += cdata->padding;
950         switch (ia->ri_memreg_strategy) {
951         case RPCRDMA_FRMR:
952                 len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
953                                 sizeof(struct rpcrdma_mw);
954                 break;
955         case RPCRDMA_MTHCAFMR:
956                 /* TBD we are perhaps overallocating here */
957                 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
958                                 sizeof(struct rpcrdma_mw);
959                 break;
960         case RPCRDMA_MEMWINDOWS_ASYNC:
961         case RPCRDMA_MEMWINDOWS:
962                 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
963                                 sizeof(struct rpcrdma_mw);
964                 break;
965         default:
966                 break;
967         }
968
969         /* allocate 1, 4 and 5 in one shot */
970         p = kzalloc(len, GFP_KERNEL);
971         if (p == NULL) {
972                 dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
973                         __func__, len);
974                 rc = -ENOMEM;
975                 goto out;
976         }
977         buf->rb_pool = p;       /* for freeing it later */
978
979         buf->rb_send_bufs = (struct rpcrdma_req **) p;
980         p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
981         buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
982         p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
983
984         /*
985          * Register the zeroed pad buffer, if any.
986          */
987         if (cdata->padding) {
988                 rc = rpcrdma_register_internal(ia, p, cdata->padding,
989                                             &ep->rep_pad_mr, &ep->rep_pad);
990                 if (rc)
991                         goto out;
992         }
993         p += cdata->padding;
994
995         /*
996          * Allocate the fmr's, or mw's for mw_bind chunk registration.
997          * We "cycle" the mw's in order to minimize rkey reuse,
998          * and also reduce unbind-to-bind collision.
999          */
1000         INIT_LIST_HEAD(&buf->rb_mws);
1001         r = (struct rpcrdma_mw *)p;
1002         switch (ia->ri_memreg_strategy) {
1003         case RPCRDMA_FRMR:
1004                 for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1005                         r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1006                                                          RPCRDMA_MAX_SEGS);
1007                         if (IS_ERR(r->r.frmr.fr_mr)) {
1008                                 rc = PTR_ERR(r->r.frmr.fr_mr);
1009                                 dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1010                                         " failed %i\n", __func__, rc);
1011                                 goto out;
1012                         }
1013                         r->r.frmr.fr_pgl =
1014                                 ib_alloc_fast_reg_page_list(ia->ri_id->device,
1015                                                             RPCRDMA_MAX_SEGS);
1016                         if (IS_ERR(r->r.frmr.fr_pgl)) {
1017                                 rc = PTR_ERR(r->r.frmr.fr_pgl);
1018                                 dprintk("RPC:       %s: "
1019                                         "ib_alloc_fast_reg_page_list "
1020                                         "failed %i\n", __func__, rc);
1021                                 goto out;
1022                         }
1023                         list_add(&r->mw_list, &buf->rb_mws);
1024                         ++r;
1025                 }
1026                 break;
1027         case RPCRDMA_MTHCAFMR:
1028                 /* TBD we are perhaps overallocating here */
1029                 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1030                         static struct ib_fmr_attr fa =
1031                                 { RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1032                         r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1033                                 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1034                                 &fa);
1035                         if (IS_ERR(r->r.fmr)) {
1036                                 rc = PTR_ERR(r->r.fmr);
1037                                 dprintk("RPC:       %s: ib_alloc_fmr"
1038                                         " failed %i\n", __func__, rc);
1039                                 goto out;
1040                         }
1041                         list_add(&r->mw_list, &buf->rb_mws);
1042                         ++r;
1043                 }
1044                 break;
1045         case RPCRDMA_MEMWINDOWS_ASYNC:
1046         case RPCRDMA_MEMWINDOWS:
1047                 /* Allocate one extra request's worth, for full cycling */
1048                 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1049                         r->r.mw = ib_alloc_mw(ia->ri_pd);
1050                         if (IS_ERR(r->r.mw)) {
1051                                 rc = PTR_ERR(r->r.mw);
1052                                 dprintk("RPC:       %s: ib_alloc_mw"
1053                                         " failed %i\n", __func__, rc);
1054                                 goto out;
1055                         }
1056                         list_add(&r->mw_list, &buf->rb_mws);
1057                         ++r;
1058                 }
1059                 break;
1060         default:
1061                 break;
1062         }
1063
1064         /*
1065          * Allocate/init the request/reply buffers. Doing this
1066          * using kmalloc for now -- one for each buf.
1067          */
1068         for (i = 0; i < buf->rb_max_requests; i++) {
1069                 struct rpcrdma_req *req;
1070                 struct rpcrdma_rep *rep;
1071
1072                 len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
1073                 /* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
1074                 /* Typical ~2400b, so rounding up saves work later */
1075                 if (len < 4096)
1076                         len = 4096;
1077                 req = kmalloc(len, GFP_KERNEL);
1078                 if (req == NULL) {
1079                         dprintk("RPC:       %s: request buffer %d alloc"
1080                                 " failed\n", __func__, i);
1081                         rc = -ENOMEM;
1082                         goto out;
1083                 }
1084                 memset(req, 0, sizeof(struct rpcrdma_req));
1085                 buf->rb_send_bufs[i] = req;
1086                 buf->rb_send_bufs[i]->rl_buffer = buf;
1087
1088                 rc = rpcrdma_register_internal(ia, req->rl_base,
1089                                 len - offsetof(struct rpcrdma_req, rl_base),
1090                                 &buf->rb_send_bufs[i]->rl_handle,
1091                                 &buf->rb_send_bufs[i]->rl_iov);
1092                 if (rc)
1093                         goto out;
1094
1095                 buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1096
1097                 len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1098                 rep = kmalloc(len, GFP_KERNEL);
1099                 if (rep == NULL) {
1100                         dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1101                                 __func__, i);
1102                         rc = -ENOMEM;
1103                         goto out;
1104                 }
1105                 memset(rep, 0, sizeof(struct rpcrdma_rep));
1106                 buf->rb_recv_bufs[i] = rep;
1107                 buf->rb_recv_bufs[i]->rr_buffer = buf;
1108                 init_waitqueue_head(&rep->rr_unbind);
1109
1110                 rc = rpcrdma_register_internal(ia, rep->rr_base,
1111                                 len - offsetof(struct rpcrdma_rep, rr_base),
1112                                 &buf->rb_recv_bufs[i]->rr_handle,
1113                                 &buf->rb_recv_bufs[i]->rr_iov);
1114                 if (rc)
1115                         goto out;
1116
1117         }
1118         dprintk("RPC:       %s: max_requests %d\n",
1119                 __func__, buf->rb_max_requests);
1120         /* done */
1121         return 0;
1122 out:
1123         rpcrdma_buffer_destroy(buf);
1124         return rc;
1125 }
1126
1127 /*
1128  * Unregister and destroy buffer memory. Need to deal with
1129  * partial initialization, so it's callable from failed create.
1130  * Must be called before destroying endpoint, as registrations
1131  * reference it.
1132  */
1133 void
1134 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1135 {
1136         int rc, i;
1137         struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1138         struct rpcrdma_mw *r;
1139
1140         /* clean up in reverse order from create
1141          *   1.  recv mr memory (mr free, then kfree)
1142          *   1a. bind mw memory
1143          *   2.  send mr memory (mr free, then kfree)
1144          *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1145          *   4.  arrays
1146          */
1147         dprintk("RPC:       %s: entering\n", __func__);
1148
1149         for (i = 0; i < buf->rb_max_requests; i++) {
1150                 if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1151                         rpcrdma_deregister_internal(ia,
1152                                         buf->rb_recv_bufs[i]->rr_handle,
1153                                         &buf->rb_recv_bufs[i]->rr_iov);
1154                         kfree(buf->rb_recv_bufs[i]);
1155                 }
1156                 if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1157                         while (!list_empty(&buf->rb_mws)) {
1158                                 r = list_entry(buf->rb_mws.next,
1159                                         struct rpcrdma_mw, mw_list);
1160                                 list_del(&r->mw_list);
1161                                 switch (ia->ri_memreg_strategy) {
1162                                 case RPCRDMA_FRMR:
1163                                         rc = ib_dereg_mr(r->r.frmr.fr_mr);
1164                                         if (rc)
1165                                                 dprintk("RPC:       %s:"
1166                                                         " ib_dereg_mr"
1167                                                         " failed %i\n",
1168                                                         __func__, rc);
1169                                         ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1170                                         break;
1171                                 case RPCRDMA_MTHCAFMR:
1172                                         rc = ib_dealloc_fmr(r->r.fmr);
1173                                         if (rc)
1174                                                 dprintk("RPC:       %s:"
1175                                                         " ib_dealloc_fmr"
1176                                                         " failed %i\n",
1177                                                         __func__, rc);
1178                                         break;
1179                                 case RPCRDMA_MEMWINDOWS_ASYNC:
1180                                 case RPCRDMA_MEMWINDOWS:
1181                                         rc = ib_dealloc_mw(r->r.mw);
1182                                         if (rc)
1183                                                 dprintk("RPC:       %s:"
1184                                                         " ib_dealloc_mw"
1185                                                         " failed %i\n",
1186                                                         __func__, rc);
1187                                         break;
1188                                 default:
1189                                         break;
1190                                 }
1191                         }
1192                         rpcrdma_deregister_internal(ia,
1193                                         buf->rb_send_bufs[i]->rl_handle,
1194                                         &buf->rb_send_bufs[i]->rl_iov);
1195                         kfree(buf->rb_send_bufs[i]);
1196                 }
1197         }
1198
1199         kfree(buf->rb_pool);
1200 }
1201
1202 /*
1203  * Get a set of request/reply buffers.
1204  *
1205  * Reply buffer (if needed) is attached to send buffer upon return.
1206  * Rule:
1207  *    rb_send_index and rb_recv_index MUST always be pointing to the
1208  *    *next* available buffer (non-NULL). They are incremented after
1209  *    removing buffers, and decremented *before* returning them.
1210  */
1211 struct rpcrdma_req *
1212 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1213 {
1214         struct rpcrdma_req *req;
1215         unsigned long flags;
1216         int i;
1217         struct rpcrdma_mw *r;
1218
1219         spin_lock_irqsave(&buffers->rb_lock, flags);
1220         if (buffers->rb_send_index == buffers->rb_max_requests) {
1221                 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1222                 dprintk("RPC:       %s: out of request buffers\n", __func__);
1223                 return ((struct rpcrdma_req *)NULL);
1224         }
1225
1226         req = buffers->rb_send_bufs[buffers->rb_send_index];
1227         if (buffers->rb_send_index < buffers->rb_recv_index) {
1228                 dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1229                         __func__,
1230                         buffers->rb_recv_index - buffers->rb_send_index);
1231                 req->rl_reply = NULL;
1232         } else {
1233                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1234                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1235         }
1236         buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1237         if (!list_empty(&buffers->rb_mws)) {
1238                 i = RPCRDMA_MAX_SEGS - 1;
1239                 do {
1240                         r = list_entry(buffers->rb_mws.next,
1241                                         struct rpcrdma_mw, mw_list);
1242                         list_del(&r->mw_list);
1243                         req->rl_segments[i].mr_chunk.rl_mw = r;
1244                 } while (--i >= 0);
1245         }
1246         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1247         return req;
1248 }
1249
1250 /*
1251  * Put request/reply buffers back into pool.
1252  * Pre-decrement counter/array index.
1253  */
1254 void
1255 rpcrdma_buffer_put(struct rpcrdma_req *req)
1256 {
1257         struct rpcrdma_buffer *buffers = req->rl_buffer;
1258         struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1259         int i;
1260         unsigned long flags;
1261
1262         BUG_ON(req->rl_nchunks != 0);
1263         spin_lock_irqsave(&buffers->rb_lock, flags);
1264         buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1265         req->rl_niovs = 0;
1266         if (req->rl_reply) {
1267                 buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1268                 init_waitqueue_head(&req->rl_reply->rr_unbind);
1269                 req->rl_reply->rr_func = NULL;
1270                 req->rl_reply = NULL;
1271         }
1272         switch (ia->ri_memreg_strategy) {
1273         case RPCRDMA_FRMR:
1274         case RPCRDMA_MTHCAFMR:
1275         case RPCRDMA_MEMWINDOWS_ASYNC:
1276         case RPCRDMA_MEMWINDOWS:
1277                 /*
1278                  * Cycle mw's back in reverse order, and "spin" them.
1279                  * This delays and scrambles reuse as much as possible.
1280                  */
1281                 i = 1;
1282                 do {
1283                         struct rpcrdma_mw **mw;
1284                         mw = &req->rl_segments[i].mr_chunk.rl_mw;
1285                         list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1286                         *mw = NULL;
1287                 } while (++i < RPCRDMA_MAX_SEGS);
1288                 list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1289                                         &buffers->rb_mws);
1290                 req->rl_segments[0].mr_chunk.rl_mw = NULL;
1291                 break;
1292         default:
1293                 break;
1294         }
1295         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1296 }
1297
1298 /*
1299  * Recover reply buffers from pool.
1300  * This happens when recovering from error conditions.
1301  * Post-increment counter/array index.
1302  */
1303 void
1304 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1305 {
1306         struct rpcrdma_buffer *buffers = req->rl_buffer;
1307         unsigned long flags;
1308
1309         if (req->rl_iov.length == 0)    /* special case xprt_rdma_allocate() */
1310                 buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1311         spin_lock_irqsave(&buffers->rb_lock, flags);
1312         if (buffers->rb_recv_index < buffers->rb_max_requests) {
1313                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1314                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1315         }
1316         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1317 }
1318
1319 /*
1320  * Put reply buffers back into pool when not attached to
1321  * request. This happens in error conditions, and when
1322  * aborting unbinds. Pre-decrement counter/array index.
1323  */
1324 void
1325 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1326 {
1327         struct rpcrdma_buffer *buffers = rep->rr_buffer;
1328         unsigned long flags;
1329
1330         rep->rr_func = NULL;
1331         spin_lock_irqsave(&buffers->rb_lock, flags);
1332         buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1333         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1334 }
1335
1336 /*
1337  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1338  */
1339
1340 int
1341 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1342                                 struct ib_mr **mrp, struct ib_sge *iov)
1343 {
1344         struct ib_phys_buf ipb;
1345         struct ib_mr *mr;
1346         int rc;
1347
1348         /*
1349          * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1350          */
1351         iov->addr = ib_dma_map_single(ia->ri_id->device,
1352                         va, len, DMA_BIDIRECTIONAL);
1353         iov->length = len;
1354
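        /*
         * Prefer the device's global DMA lkey, then the preregistered
         * "bind" MR; fall back to a per-buffer physical registration
         * only when neither is available.
         */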
1355         if (ia->ri_have_dma_lkey) {
1356                 *mrp = NULL;
1357                 iov->lkey = ia->ri_dma_lkey;
1358                 return 0;
1359         } else if (ia->ri_bind_mem != NULL) {
1360                 *mrp = NULL;
1361                 iov->lkey = ia->ri_bind_mem->lkey;
1362                 return 0;
1363         }
1364
1365         ipb.addr = iov->addr;
1366         ipb.size = iov->length;
1367         mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1368                         IB_ACCESS_LOCAL_WRITE, &iov->addr);
1369
1370         dprintk("RPC:       %s: phys convert: 0x%llx "
1371                         "registered 0x%llx length %d\n",
1372                         __func__, (unsigned long long)ipb.addr,
1373                         (unsigned long long)iov->addr, len);
1374
1375         if (IS_ERR(mr)) {
1376                 *mrp = NULL;
1377                 rc = PTR_ERR(mr);
1378                 dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1379         } else {
1380                 *mrp = mr;
1381                 iov->lkey = mr->lkey;
1382                 rc = 0;
1383         }
1384
1385         return rc;
1386 }
1387
1388 int
1389 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1390                                 struct ib_mr *mr, struct ib_sge *iov)
1391 {
1392         int rc;
1393
1394         ib_dma_unmap_single(ia->ri_id->device,
1395                         iov->addr, iov->length, DMA_BIDIRECTIONAL);
1396
1397         if (mr == NULL)
1398                 return 0;
1399
1400         rc = ib_dereg_mr(mr);
1401         if (rc)
1402                 dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1403         return rc;
1404 }
1405
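/*
 * Editor's illustration (hypothetical helper, not in the original
 * source): registering a kmalloc'ed scratch buffer for local DMA with
 * the wrappers above, then tearing the mapping down again.
 */
static int example_internal_reg(struct rpcrdma_ia *ia, int len)
{
        struct ib_sge iov;
        struct ib_mr *mr;
        void *va;
        int rc;

        va = kmalloc(len, GFP_KERNEL);
        if (va == NULL)
                return -ENOMEM;

        /* DMA-maps va and fills in iov; mr is NULL on the lkey fast paths */
        rc = rpcrdma_register_internal(ia, va, len, &mr, &iov);
        if (rc == 0)
                rc = rpcrdma_deregister_internal(ia, mr, &iov);

        kfree(va);
        return rc;
}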
1406 /*
1407  * Wrappers for chunk registration, shared by read/write chunk code.
1408  */
1409
1410 static void
1411 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1412 {
1413         seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1414         seg->mr_dmalen = seg->mr_len;
1415         if (seg->mr_page)
1416                 seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1417                                 seg->mr_page, offset_in_page(seg->mr_offset),
1418                                 seg->mr_dmalen, seg->mr_dir);
1419         else
1420                 seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1421                                 seg->mr_offset,
1422                                 seg->mr_dmalen, seg->mr_dir);
1423 }
1424
1425 static void
1426 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1427 {
1428         if (seg->mr_page)
1429                 ib_dma_unmap_page(ia->ri_id->device,
1430                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1431         else
1432                 ib_dma_unmap_single(ia->ri_id->device,
1433                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1434 }
1435
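/*
 * FRMR path: gather the segments' DMA addresses into the MR's page
 * list, bump the low byte of the rkey so stale rkeys are rejected,
 * then post an unsignaled FAST_REG_MR work request on the send queue.
 */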
1436 static int
1437 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1438                         int *nsegs, int writing, struct rpcrdma_ia *ia,
1439                         struct rpcrdma_xprt *r_xprt)
1440 {
1441         struct rpcrdma_mr_seg *seg1 = seg;
1442         struct ib_send_wr frmr_wr, *bad_wr;
1443         u8 key;
1444         int len, pageoff;
1445         int i, rc;
1446
1447         pageoff = offset_in_page(seg1->mr_offset);
1448         seg1->mr_offset -= pageoff;     /* start of page */
1449         seg1->mr_len += pageoff;
1450         len = -pageoff;
1451         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1452                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1453         for (i = 0; i < *nsegs;) {
1454                 rpcrdma_map_one(ia, seg, writing);
1455                 seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->page_list[i] = seg->mr_dma;
1456                 len += seg->mr_len;
1457                 ++seg;
1458                 ++i;
1459                 /* Check for holes */
1460                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1461                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1462                         break;
1463         }
1464         dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1465                 __func__, seg1->mr_chunk.rl_mw, i);
1466
1467         /* Bump the key */
1468         key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
1469         ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
1470
1471         /* Prepare FRMR WR */
1472         memset(&frmr_wr, 0, sizeof frmr_wr);
1473         frmr_wr.opcode = IB_WR_FAST_REG_MR;
1474         frmr_wr.send_flags = 0;                 /* unsignaled */
1475         frmr_wr.wr.fast_reg.iova_start = (unsigned long)seg1->mr_dma;
1476         frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
1477         frmr_wr.wr.fast_reg.page_list_len = i;
1478         frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1479         frmr_wr.wr.fast_reg.length = i << PAGE_SHIFT;
1480         frmr_wr.wr.fast_reg.access_flags = (writing ?
1481                                 IB_ACCESS_REMOTE_WRITE : IB_ACCESS_REMOTE_READ);
1482         frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1483         DECR_CQCOUNT(&r_xprt->rx_ep);
1484
1485         rc = ib_post_send(ia->ri_id->qp, &frmr_wr, &bad_wr);
1486
1487         if (rc) {
1488                 dprintk("RPC:       %s: failed ib_post_send for register,"
1489                         " status %i\n", __func__, rc);
1490                 while (i--)
1491                         rpcrdma_unmap_one(ia, --seg);
1492         } else {
1493                 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1494                 seg1->mr_base = seg1->mr_dma + pageoff;
1495                 seg1->mr_nsegs = i;
1496                 seg1->mr_len = len;
1497         }
1498         *nsegs = i;
1499         return rc;
1500 }
1501
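/*
 * Tear down an FRMR registration: unmap each segment, then post an
 * unsignaled LOCAL_INV work request to invalidate the rkey.
 */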
1502 static int
1503 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1504                         struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1505 {
1506         struct rpcrdma_mr_seg *seg1 = seg;
1507         struct ib_send_wr invalidate_wr, *bad_wr;
1508         int rc;
1509
1510         while (seg1->mr_nsegs--)
1511                 rpcrdma_unmap_one(ia, seg++);
1512
1513         memset(&invalidate_wr, 0, sizeof invalidate_wr);
1514         invalidate_wr.opcode = IB_WR_LOCAL_INV;
1515         invalidate_wr.send_flags = 0;                   /* unsignaled */
1516         invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1517         DECR_CQCOUNT(&r_xprt->rx_ep);
1518
1519         rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1520         if (rc)
1521                 dprintk("RPC:       %s: failed ib_post_send for invalidate,"
1522                         " status %i\n", __func__, rc);
1523         return rc;
1524 }
1525
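/*
 * FMR path: map the segments' DMA addresses with ib_map_phys_fmr().
 * Mapping stops early at a page-alignment hole, so the caller may get
 * back fewer segments than it offered.
 */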
1526 static int
1527 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1528                         int *nsegs, int writing, struct rpcrdma_ia *ia)
1529 {
1530         struct rpcrdma_mr_seg *seg1 = seg;
1531         u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1532         int len, pageoff, i, rc;
1533
1534         pageoff = offset_in_page(seg1->mr_offset);
1535         seg1->mr_offset -= pageoff;     /* start of page */
1536         seg1->mr_len += pageoff;
1537         len = -pageoff;
1538         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1539                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1540         for (i = 0; i < *nsegs;) {
1541                 rpcrdma_map_one(ia, seg, writing);
1542                 physaddrs[i] = seg->mr_dma;
1543                 len += seg->mr_len;
1544                 ++seg;
1545                 ++i;
1546                 /* Check for holes */
1547                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1548                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1549                         break;
1550         }
1551         rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1552                                 physaddrs, i, seg1->mr_dma);
1553         if (rc) {
1554                 dprintk("RPC:       %s: failed ib_map_phys_fmr "
1555                         "%u@0x%llx+%i (%d)... status %i\n", __func__,
1556                         len, (unsigned long long)seg1->mr_dma,
1557                         pageoff, i, rc);
1558                 while (i--)
1559                         rpcrdma_unmap_one(ia, --seg);
1560         } else {
1561                 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1562                 seg1->mr_base = seg1->mr_dma + pageoff;
1563                 seg1->mr_nsegs = i;
1564                 seg1->mr_len = len;
1565         }
1566         *nsegs = i;
1567         return rc;
1568 }
1569
1570 static int
1571 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1572                         struct rpcrdma_ia *ia)
1573 {
1574         struct rpcrdma_mr_seg *seg1 = seg;
1575         LIST_HEAD(l);
1576         int rc;
1577
1578         list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1579         rc = ib_unmap_fmr(&l);
1580         while (seg1->mr_nsegs--)
1581                 rpcrdma_unmap_one(ia, seg++);
1582         if (rc)
1583                 dprintk("RPC:       %s: failed ib_unmap_fmr,"
1584                         " status %i\n", __func__, rc);
1585         return rc;
1586 }
1587
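/*
 * Memory-window path: bind a window from the pre-registered
 * ia->ri_bind_mem MR over a single segment with ib_bind_mw().
 * Only one segment is registered per call (*nsegs is forced to 1).
 */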
1588 static int
1589 rpcrdma_register_memwin_external(struct rpcrdma_mr_seg *seg,
1590                         int *nsegs, int writing, struct rpcrdma_ia *ia,
1591                         struct rpcrdma_xprt *r_xprt)
1592 {
1593         int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1594                                   IB_ACCESS_REMOTE_READ);
1595         struct ib_mw_bind param;
1596         int rc;
1597
1598         *nsegs = 1;
1599         rpcrdma_map_one(ia, seg, writing);
1600         param.mr = ia->ri_bind_mem;
1601         param.wr_id = 0ULL;     /* no send cookie */
1602         param.addr = seg->mr_dma;
1603         param.length = seg->mr_len;
1604         param.send_flags = 0;
1605         param.mw_access_flags = mem_priv;
1606
1607         DECR_CQCOUNT(&r_xprt->rx_ep);
1608         rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1609         if (rc) {
1610                 dprintk("RPC:       %s: failed ib_bind_mw "
1611                         "%u@0x%llx status %i\n",
1612                         __func__, seg->mr_len,
1613                         (unsigned long long)seg->mr_dma, rc);
1614                 rpcrdma_unmap_one(ia, seg);
1615         } else {
1616                 seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
1617                 seg->mr_base = param.addr;
1618                 seg->mr_nsegs = 1;
1619         }
1620         return rc;
1621 }
1622
1623 static int
1624 rpcrdma_deregister_memwin_external(struct rpcrdma_mr_seg *seg,
1625                         struct rpcrdma_ia *ia,
1626                         struct rpcrdma_xprt *r_xprt, void **r)
1627 {
1628         struct ib_mw_bind param;
1629         LIST_HEAD(l);
1630         int rc;
1631
1632         BUG_ON(seg->mr_nsegs != 1);
1633         param.mr = ia->ri_bind_mem;
1634         param.addr = 0ULL;      /* unbind */
1635         param.length = 0;
1636         param.mw_access_flags = 0;
1637         if (*r) {
1638                 param.wr_id = (u64) (unsigned long) *r;
1639                 param.send_flags = IB_SEND_SIGNALED;
1640                 INIT_CQCOUNT(&r_xprt->rx_ep);
1641         } else {
1642                 param.wr_id = 0ULL;
1643                 param.send_flags = 0;
1644                 DECR_CQCOUNT(&r_xprt->rx_ep);
1645         }
1646         rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1647         rpcrdma_unmap_one(ia, seg);
1648         if (rc)
1649                 dprintk("RPC:       %s: failed ib_(un)bind_mw,"
1650                         " status %i\n", __func__, rc);
1651         else
1652                 *r = NULL;      /* will upcall on completion */
1653         return rc;
1654 }
1655
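/*
 * Default path: register the gathered segments with ib_reg_phys_mr()
 * on every call.  This is the fallback used when no faster strategy
 * was selected.
 */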
1656 static int
1657 rpcrdma_register_default_external(struct rpcrdma_mr_seg *seg,
1658                         int *nsegs, int writing, struct rpcrdma_ia *ia)
1659 {
1660         int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1661                                   IB_ACCESS_REMOTE_READ);
1662         struct rpcrdma_mr_seg *seg1 = seg;
1663         struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
1664         int len, i, rc = 0;
1665
1666         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1667                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1668         for (len = 0, i = 0; i < *nsegs;) {
1669                 rpcrdma_map_one(ia, seg, writing);
1670                 ipb[i].addr = seg->mr_dma;
1671                 ipb[i].size = seg->mr_len;
1672                 len += seg->mr_len;
1673                 ++seg;
1674                 ++i;
1675                 /* Check for holes */
1676                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1677                     offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1678                         break;
1679         }
1680         seg1->mr_base = seg1->mr_dma;
1681         seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
1682                                 ipb, i, mem_priv, &seg1->mr_base);
1683         if (IS_ERR(seg1->mr_chunk.rl_mr)) {
1684                 rc = PTR_ERR(seg1->mr_chunk.rl_mr);
1685                 dprintk("RPC:       %s: failed ib_reg_phys_mr "
1686                         "%u@0x%llx (%d)... status %i\n",
1687                         __func__, len,
1688                         (unsigned long long)seg1->mr_dma, i, rc);
1689                 while (i--)
1690                         rpcrdma_unmap_one(ia, --seg);
1691         } else {
1692                 seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
1693                 seg1->mr_nsegs = i;
1694                 seg1->mr_len = len;
1695         }
1696         *nsegs = i;
1697         return rc;
1698 }
1699
1700 static int
1701 rpcrdma_deregister_default_external(struct rpcrdma_mr_seg *seg,
1702                         struct rpcrdma_ia *ia)
1703 {
1704         struct rpcrdma_mr_seg *seg1 = seg;
1705         int rc;
1706
1707         rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
1708         seg1->mr_chunk.rl_mr = NULL;
1709         while (seg1->mr_nsegs--)
1710                 rpcrdma_unmap_one(ia, seg++);
1711         if (rc)
1712                 dprintk("RPC:       %s: failed ib_dereg_mr,"
1713                         " status %i\n", __func__, rc);
1714         return rc;
1715 }
1716
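/*
 * rpcrdma_register_external - register a chunk for remote access.
 *
 * Dispatches on the memory registration strategy chosen when the
 * transport was created.  Returns the number of segments actually
 * registered (which may be fewer than nsegs), or -1 on failure.
 */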
1717 int
1718 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1719                         int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1720 {
1721         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1722         int rc = 0;
1723
1724         switch (ia->ri_memreg_strategy) {
1725
1726 #if RPCRDMA_PERSISTENT_REGISTRATION
1727         case RPCRDMA_ALLPHYSICAL:
1728                 rpcrdma_map_one(ia, seg, writing);
1729                 seg->mr_rkey = ia->ri_bind_mem->rkey;
1730                 seg->mr_base = seg->mr_dma;
1731                 seg->mr_nsegs = 1;
1732                 nsegs = 1;
1733                 break;
1734 #endif
1735
1736         /* Registration using FRMR (fast registration memory regions) */
1737         case RPCRDMA_FRMR:
1738                 rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1739                 break;
1740
1741         /* Registration using FMR (fast memory regions) */
1742         case RPCRDMA_MTHCAFMR:
1743                 rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1744                 break;
1745
1746         /* Registration using memory windows */
1747         case RPCRDMA_MEMWINDOWS_ASYNC:
1748         case RPCRDMA_MEMWINDOWS:
1749                 rc = rpcrdma_register_memwin_external(seg, &nsegs, writing, ia, r_xprt);
1750                 break;
1751
1752         /* Default registration each time */
1753         default:
1754                 rc = rpcrdma_register_default_external(seg, &nsegs, writing, ia);
1755                 break;
1756         }
1757         if (rc)
1758                 return -1;
1759
1760         return nsegs;
1761 }
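/*
 * Editor's illustration (hypothetical caller, not in the original
 * source): registering one chunk and noting how many segments the
 * strategy actually consumed.
 */
static int example_register_chunk(struct rpcrdma_xprt *r_xprt,
                                  struct rpcrdma_mr_seg *seg,
                                  int nsegs, int writing)
{
        int n;

        n = rpcrdma_register_external(seg, nsegs, writing, r_xprt);
        if (n <= 0)
                return -EIO;    /* registration failed */

        /* seg[0].mr_rkey/mr_base/mr_len now describe the chunk */
        return n;
}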
1762
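/*
 * rpcrdma_deregister_external - undo rpcrdma_register_external().
 *
 * If r points to a waiting rpcrdma_rep, its rr_func callback is
 * invoked here once deregistration completes; the memory-window
 * strategies instead request a signaled unbind and defer that
 * callback to the completion handler.
 */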
1763 int
1764 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1765                 struct rpcrdma_xprt *r_xprt, void *r)
1766 {
1767         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1768         int nsegs = seg->mr_nsegs, rc;
1769
1770         switch (ia->ri_memreg_strategy) {
1771
1772 #if RPCRDMA_PERSISTENT_REGISTRATION
1773         case RPCRDMA_ALLPHYSICAL:
1774                 BUG_ON(nsegs != 1);
1775                 rpcrdma_unmap_one(ia, seg);
1776                 rc = 0;
1777                 break;
1778 #endif
1779
1780         case RPCRDMA_FRMR:
1781                 rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1782                 break;
1783
1784         case RPCRDMA_MTHCAFMR:
1785                 rc = rpcrdma_deregister_fmr_external(seg, ia);
1786                 break;
1787
1788         case RPCRDMA_MEMWINDOWS_ASYNC:
1789         case RPCRDMA_MEMWINDOWS:
1790                 rc = rpcrdma_deregister_memwin_external(seg, ia, r_xprt, &r);
1791                 break;
1792
1793         default:
1794                 rc = rpcrdma_deregister_default_external(seg, ia);
1795                 break;
1796         }
1797         if (r) {
1798                 struct rpcrdma_rep *rep = r;
1799                 void (*func)(struct rpcrdma_rep *) = rep->rr_func;
1800                 rep->rr_func = NULL;
1801                 func(rep);      /* dereg done, callback now */
1802         }
1803         return nsegs;
1804 }
1805
1806 /*
1807  * Prepost any receive buffer, then post send.
1808  *
1809  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1810  */
1811 int
1812 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1813                 struct rpcrdma_ep *ep,
1814                 struct rpcrdma_req *req)
1815 {
1816         struct ib_send_wr send_wr, *send_wr_fail;
1817         struct rpcrdma_rep *rep = req->rl_reply;
1818         int rc;
1819
1820         if (rep) {
1821                 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1822                 if (rc)
1823                         goto out;
1824                 req->rl_reply = NULL;
1825         }
1826
1827         send_wr.next = NULL;
1828         send_wr.wr_id = 0ULL;   /* no send cookie */
1829         send_wr.sg_list = req->rl_send_iov;
1830         send_wr.num_sge = req->rl_niovs;
1831         send_wr.opcode = IB_WR_SEND;
1832         if (send_wr.num_sge == 4)       /* no need to sync any pad (constant) */
1833                 ib_dma_sync_single_for_device(ia->ri_id->device,
1834                         req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1835                         DMA_TO_DEVICE);
1836         ib_dma_sync_single_for_device(ia->ri_id->device,
1837                 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1838                 DMA_TO_DEVICE);
1839         ib_dma_sync_single_for_device(ia->ri_id->device,
1840                 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1841                 DMA_TO_DEVICE);
1842
1843         if (DECR_CQCOUNT(ep) > 0)
1844                 send_wr.send_flags = 0;
1845         else { /* Provider must take a send completion every now and then */
1846                 INIT_CQCOUNT(ep);
1847                 send_wr.send_flags = IB_SEND_SIGNALED;
1848         }
1849
1850         rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1851         if (rc)
1852                 dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1853                         rc);
1854 out:
1855         return rc;
1856 }
1857
1858 /*
1859  * (Re)post a receive buffer.
1860  */
1861 int
1862 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1863                      struct rpcrdma_ep *ep,
1864                      struct rpcrdma_rep *rep)
1865 {
1866         struct ib_recv_wr recv_wr, *recv_wr_fail;
1867         int rc;
1868
1869         recv_wr.next = NULL;
1870         recv_wr.wr_id = (u64) (unsigned long) rep;
1871         recv_wr.sg_list = &rep->rr_iov;
1872         recv_wr.num_sge = 1;
1873
1874         ib_dma_sync_single_for_cpu(ia->ri_id->device,
1875                 rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1876
1877         DECR_CQCOUNT(ep);
1878         rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1879
1880         if (rc)
1881                 dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1882                         rc);
1883         return rc;
1884 }
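/*
 * Editor's illustration (hypothetical caller, not in the original
 * source): the transmit path attaches a reply buffer to the request
 * so that rpcrdma_ep_post() preposts the receive before the send.
 */
static int example_send_request(struct rpcrdma_xprt *r_xprt,
                                struct rpcrdma_req *req)
{
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rpcrdma_ep *ep = &r_xprt->rx_ep;

        /* make sure a receive buffer is attached; ep_post() posts it */
        if (req->rl_reply == NULL)
                rpcrdma_recv_buffer_get(req);

        return rpcrdma_ep_post(ia, ep, req);
}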