RPC/RDMA: avoid an oops due to disconnect racing with async upcalls.
[safe/jmp/linux-2.6] net/sunrpc/xprtrdma/verbs.c
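As the handlers below show, the QP and CQ async error upcalls act only while ep->rep_connected == 1, so an asynchronous error upcall that races with a disconnect does not call ep->rep_func() on, or wake waiters of, an endpoint that is already being torn down.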
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49
50 #include <linux/pci.h>  /* for Tavor hack below */
51
52 #include "xprt_rdma.h"
53
54 /*
55  * Globals/Macros
56  */
57
58 #ifdef RPC_DEBUG
59 # define RPCDBG_FACILITY        RPCDBG_TRANS
60 #endif
61
62 /*
63  * internal functions
64  */
65
66 /*
67  * Handle replies in tasklet context, using a single, global list.
68  * The rdma tasklet function simply turns around and calls the reply
69  * handler for each reply on the list.
70  */
71
72 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
73 static LIST_HEAD(rpcrdma_tasklets_g);
74
75 static void
76 rpcrdma_run_tasklet(unsigned long data)
77 {
78         struct rpcrdma_rep *rep;
79         void (*func)(struct rpcrdma_rep *);
80         unsigned long flags;
81
82         data = data;    /* tasklet argument is unused; self-assignment quiets the compiler */
83         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
84         while (!list_empty(&rpcrdma_tasklets_g)) {
85                 rep = list_entry(rpcrdma_tasklets_g.next,
86                                  struct rpcrdma_rep, rr_list);
87                 list_del(&rep->rr_list);
88                 func = rep->rr_func;
89                 rep->rr_func = NULL;
90                 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
91
92                 if (func)
93                         func(rep);
94                 else
95                         rpcrdma_recv_buffer_put(rep);
96
97                 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
98         }
99         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
100 }
101
102 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
103
104 static inline void
105 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
106 {
107         unsigned long flags;
108
109         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
110         list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
111         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
112         tasklet_schedule(&rpcrdma_tasklet_g);
113 }
114
115 static void
116 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
117 {
118         struct rpcrdma_ep *ep = context;
119
120         dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
121                 __func__, event->event, event->device->name, context);
122         if (ep->rep_connected == 1) {
123                 ep->rep_connected = -EIO;
124                 ep->rep_func(ep);
125                 wake_up_all(&ep->rep_connect_wait);
126         }
127 }
128
129 static void
130 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
131 {
132         struct rpcrdma_ep *ep = context;
133
134         dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
135                 __func__, event->event, event->device->name, context);
136         if (ep->rep_connected == 1) {
137                 ep->rep_connected = -EIO;
138                 ep->rep_func(ep);
139                 wake_up_all(&ep->rep_connect_wait);
140         }
141 }
142
143 static inline
144 void rpcrdma_event_process(struct ib_wc *wc)
145 {
146         struct rpcrdma_rep *rep =
147                         (struct rpcrdma_rep *)(unsigned long) wc->wr_id;
148
149         dprintk("RPC:       %s: event rep %p status %X opcode %X length %u\n",
150                 __func__, rep, wc->status, wc->opcode, wc->byte_len);
151
152         if (!rep) /* send or bind completion that we don't care about */
153                 return;
154
155         if (IB_WC_SUCCESS != wc->status) {
156                 dprintk("RPC:       %s: %s WC status %X, connection lost\n",
157                         __func__, (wc->opcode & IB_WC_RECV) ? "recv" : "send",
158                          wc->status);
159                 rep->rr_len = ~0U;
160                 rpcrdma_schedule_tasklet(rep);
161                 return;
162         }
163
164         switch (wc->opcode) {
165         case IB_WC_RECV:
166                 rep->rr_len = wc->byte_len;
167                 ib_dma_sync_single_for_cpu(
168                         rdmab_to_ia(rep->rr_buffer)->ri_id->device,
169                         rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
170                 /* Keep (only) the most recent credits, after checking validity */
171                 if (rep->rr_len >= 16) {
172                         struct rpcrdma_msg *p =
173                                         (struct rpcrdma_msg *) rep->rr_base;
174                         unsigned int credits = ntohl(p->rm_credit);
175                         if (credits == 0) {
176                                 dprintk("RPC:       %s: server"
177                                         " dropped credits to 0!\n", __func__);
178                                 /* don't deadlock */
179                                 credits = 1;
180                         } else if (credits > rep->rr_buffer->rb_max_requests) {
181                                 dprintk("RPC:       %s: server"
182                                         " over-crediting: %d (%d)\n",
183                                         __func__, credits,
184                                         rep->rr_buffer->rb_max_requests);
185                                 credits = rep->rr_buffer->rb_max_requests;
186                         }
187                         atomic_set(&rep->rr_buffer->rb_credits, credits);
188                 }
189                 /* fall through */
190         case IB_WC_BIND_MW:
191                 rpcrdma_schedule_tasklet(rep);
192                 break;
193         default:
194                 dprintk("RPC:       %s: unexpected WC event %X\n",
195                         __func__, wc->opcode);
196                 break;
197         }
198 }
199
200 static inline int
201 rpcrdma_cq_poll(struct ib_cq *cq)
202 {
203         struct ib_wc wc;
204         int rc;
205
206         for (;;) {
207                 rc = ib_poll_cq(cq, 1, &wc);
208                 if (rc < 0) {
209                         dprintk("RPC:       %s: ib_poll_cq failed %i\n",
210                                 __func__, rc);
211                         return rc;
212                 }
213                 if (rc == 0)
214                         break;
215
216                 rpcrdma_event_process(&wc);
217         }
218
219         return 0;
220 }
221
222 /*
223  * rpcrdma_cq_event_upcall
224  *
225  * This upcall handles recv, send, bind and unbind events.
226  * It is reentrant but processes single events in order to maintain
227  * ordering of receives to keep server credits.
228  *
229  * It is the responsibility of the scheduled tasklet to return
230  * recv buffers to the pool. NOTE: this affects synchronization of
231  * connection shutdown. That is, the structures required for
232  * the completion of the reply handler must remain intact until
233  * all memory has been reclaimed.
234  *
235  * Note that most send completions are suppressed; sends are signaled only periodically (see rep_cqinit).
236  */
237 static void
238 rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
239 {
240         int rc;
241
242         rc = rpcrdma_cq_poll(cq);
243         if (rc)
244                 return;
245
246         rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
247         if (rc) {
248                 dprintk("RPC:       %s: ib_req_notify_cq failed %i\n",
249                         __func__, rc);
250                 return;
251         }
252
253         rpcrdma_cq_poll(cq);
254 }
255
256 #ifdef RPC_DEBUG
257 static const char * const conn[] = {
258         "address resolved",
259         "address error",
260         "route resolved",
261         "route error",
262         "connect request",
263         "connect response",
264         "connect error",
265         "unreachable",
266         "rejected",
267         "established",
268         "disconnected",
269         "device removal"
270 };
271 #endif
272
273 static int
274 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
275 {
276         struct rpcrdma_xprt *xprt = id->context;
277         struct rpcrdma_ia *ia = &xprt->rx_ia;
278         struct rpcrdma_ep *ep = &xprt->rx_ep;
279         struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
280         struct ib_qp_attr attr;
281         struct ib_qp_init_attr iattr;
282         int connstate = 0;
283
284         switch (event->event) {
285         case RDMA_CM_EVENT_ADDR_RESOLVED:
286         case RDMA_CM_EVENT_ROUTE_RESOLVED:
287                 complete(&ia->ri_done);
288                 break;
289         case RDMA_CM_EVENT_ADDR_ERROR:
290                 ia->ri_async_rc = -EHOSTUNREACH;
291                 dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
292                         __func__, ep);
293                 complete(&ia->ri_done);
294                 break;
295         case RDMA_CM_EVENT_ROUTE_ERROR:
296                 ia->ri_async_rc = -ENETUNREACH;
297                 dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
298                         __func__, ep);
299                 complete(&ia->ri_done);
300                 break;
301         case RDMA_CM_EVENT_ESTABLISHED:
302                 connstate = 1;
303                 ib_query_qp(ia->ri_id->qp, &attr,
304                         IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
305                         &iattr);
306                 dprintk("RPC:       %s: %d responder resources"
307                         " (%d initiator)\n",
308                         __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
309                 goto connected;
310         case RDMA_CM_EVENT_CONNECT_ERROR:
311                 connstate = -ENOTCONN;
312                 goto connected;
313         case RDMA_CM_EVENT_UNREACHABLE:
314                 connstate = -ENETDOWN;
315                 goto connected;
316         case RDMA_CM_EVENT_REJECTED:
317                 connstate = -ECONNREFUSED;
318                 goto connected;
319         case RDMA_CM_EVENT_DISCONNECTED:
320                 connstate = -ECONNABORTED;
321                 goto connected;
322         case RDMA_CM_EVENT_DEVICE_REMOVAL:
323                 connstate = -ENODEV;
324 connected:
325                 dprintk("RPC:       %s: %s: %u.%u.%u.%u:%u"
326                         " (ep 0x%p event 0x%x)\n",
327                         __func__,
328                         (event->event <= 11) ? conn[event->event] :
329                                                 "unknown connection error",
330                         NIPQUAD(addr->sin_addr.s_addr),
331                         ntohs(addr->sin_port),
332                         ep, event->event);
333                 atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
334                 dprintk("RPC:       %s: %sconnected\n",
335                                         __func__, connstate > 0 ? "" : "dis");
336                 ep->rep_connected = connstate;
337                 ep->rep_func(ep);
338                 wake_up_all(&ep->rep_connect_wait);
339                 break;
340         default:
341                 ia->ri_async_rc = -EINVAL;
342                 dprintk("RPC:       %s: unexpected CM event %X\n",
343                         __func__, event->event);
344                 complete(&ia->ri_done);
345                 break;
346         }
347
348         return 0;
349 }
350
351 static struct rdma_cm_id *
352 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
353                         struct rpcrdma_ia *ia, struct sockaddr *addr)
354 {
355         struct rdma_cm_id *id;
356         int rc;
357
358         id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP);
359         if (IS_ERR(id)) {
360                 rc = PTR_ERR(id);
361                 dprintk("RPC:       %s: rdma_create_id() failed %i\n",
362                         __func__, rc);
363                 return id;
364         }
365
366         ia->ri_async_rc = 0;
367         rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
368         if (rc) {
369                 dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
370                         __func__, rc);
371                 goto out;
372         }
373         wait_for_completion(&ia->ri_done);
374         rc = ia->ri_async_rc;
375         if (rc)
376                 goto out;
377
378         ia->ri_async_rc = 0;
379         rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
380         if (rc) {
381                 dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
382                         __func__, rc);
383                 goto out;
384         }
385         wait_for_completion(&ia->ri_done);
386         rc = ia->ri_async_rc;
387         if (rc)
388                 goto out;
389
390         return id;
391
392 out:
393         rdma_destroy_id(id);
394         return ERR_PTR(rc);
395 }
396
397 /*
398  * Drain any cq, prior to teardown.
399  */
400 static void
401 rpcrdma_clean_cq(struct ib_cq *cq)
402 {
403         struct ib_wc wc;
404         int count = 0;
405
406         while (1 == ib_poll_cq(cq, 1, &wc))
407                 ++count;
408
409         if (count)
410                 dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
411                         __func__, count, wc.opcode);
412 }
413
414 /*
415  * Exported functions.
416  */
417
418 /*
419  * Open and initialize an Interface Adapter.
420  *  o initializes fields of struct rpcrdma_ia, including
421  *    interface and provider attributes and protection zone.
422  */
423 int
424 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
425 {
426         int rc, mem_priv;
427         struct ib_device_attr devattr;
428         struct rpcrdma_ia *ia = &xprt->rx_ia;
429
430         init_completion(&ia->ri_done);
431
432         ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
433         if (IS_ERR(ia->ri_id)) {
434                 rc = PTR_ERR(ia->ri_id);
435                 goto out1;
436         }
437
438         ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
439         if (IS_ERR(ia->ri_pd)) {
440                 rc = PTR_ERR(ia->ri_pd);
441                 dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
442                         __func__, rc);
443                 goto out2;
444         }
445
446         /*
447          * Query the device to determine if the requested memory
448          * registration strategy is supported. If it isn't, set the
449          * strategy to a globally supported model.
450          */
451         rc = ib_query_device(ia->ri_id->device, &devattr);
452         if (rc) {
453                 dprintk("RPC:       %s: ib_query_device failed %d\n",
454                         __func__, rc);
455                 goto out2;
456         }
457
458         if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
459                 ia->ri_have_dma_lkey = 1;
460                 ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
461         }
462
463         switch (memreg) {
464         case RPCRDMA_MEMWINDOWS:
465         case RPCRDMA_MEMWINDOWS_ASYNC:
466                 if (!(devattr.device_cap_flags & IB_DEVICE_MEM_WINDOW)) {
467                         dprintk("RPC:       %s: MEMWINDOWS registration "
468                                 "specified but not supported by adapter, "
469                                 "using slower RPCRDMA_REGISTER\n",
470                                 __func__);
471                         memreg = RPCRDMA_REGISTER;
472                 }
473                 break;
474         case RPCRDMA_MTHCAFMR:
475                 if (!ia->ri_id->device->alloc_fmr) {
476 #if RPCRDMA_PERSISTENT_REGISTRATION
477                         dprintk("RPC:       %s: MTHCAFMR registration "
478                                 "specified but not supported by adapter, "
479                                 "using riskier RPCRDMA_ALLPHYSICAL\n",
480                                 __func__);
481                         memreg = RPCRDMA_ALLPHYSICAL;
482 #else
483                         dprintk("RPC:       %s: MTHCAFMR registration "
484                                 "specified but not supported by adapter, "
485                                 "using slower RPCRDMA_REGISTER\n",
486                                 __func__);
487                         memreg = RPCRDMA_REGISTER;
488 #endif
489                 }
490                 break;
491         case RPCRDMA_FRMR:
492                 /* Requires both frmr reg and local dma lkey */
493                 if ((devattr.device_cap_flags &
494                      (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
495                     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
496 #if RPCRDMA_PERSISTENT_REGISTRATION
497                         dprintk("RPC:       %s: FRMR registration "
498                                 "specified but not supported by adapter, "
499                                 "using riskier RPCRDMA_ALLPHYSICAL\n",
500                                 __func__);
501                         memreg = RPCRDMA_ALLPHYSICAL;
502 #else
503                         dprintk("RPC:       %s: FRMR registration "
504                                 "specified but not supported by adapter, "
505                                 "using slower RPCRDMA_REGISTER\n",
506                                 __func__);
507                         memreg = RPCRDMA_REGISTER;
508 #endif
509                 }
510                 break;
511         }
512
513         /*
514          * Optionally obtain an underlying physical identity mapping in
515          * order to do a memory window-based bind. This base registration
516          * is protected from remote access - that is enabled only by binding
517          * for the specific bytes targeted during each RPC operation, and
518          * revoked after the corresponding completion similar to a storage
519          * revoked after the corresponding completion, much as a storage
520          * adapter does.
521         switch (memreg) {
522         case RPCRDMA_BOUNCEBUFFERS:
523         case RPCRDMA_REGISTER:
524         case RPCRDMA_FRMR:
525                 break;
526 #if RPCRDMA_PERSISTENT_REGISTRATION
527         case RPCRDMA_ALLPHYSICAL:
528                 mem_priv = IB_ACCESS_LOCAL_WRITE |
529                                 IB_ACCESS_REMOTE_WRITE |
530                                 IB_ACCESS_REMOTE_READ;
531                 goto register_setup;
532 #endif
533         case RPCRDMA_MEMWINDOWS_ASYNC:
534         case RPCRDMA_MEMWINDOWS:
535                 mem_priv = IB_ACCESS_LOCAL_WRITE |
536                                 IB_ACCESS_MW_BIND;
537                 goto register_setup;
538         case RPCRDMA_MTHCAFMR:
539                 if (ia->ri_have_dma_lkey)
540                         break;
541                 mem_priv = IB_ACCESS_LOCAL_WRITE;
542         register_setup:
543                 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
544                 if (IS_ERR(ia->ri_bind_mem)) {
545                         printk(KERN_ALERT "%s: ib_get_dma_mr for "
546                                 "phys register failed with %lX\n\t"
547                                 "Will continue with degraded performance\n",
548                                 __func__, PTR_ERR(ia->ri_bind_mem));
549                         memreg = RPCRDMA_REGISTER;
550                         ia->ri_bind_mem = NULL;
551                 }
552                 break;
553         default:
554                 printk(KERN_ERR "%s: invalid memory registration mode %d\n",
555                                 __func__, memreg);
556                 rc = -EINVAL;
557                 goto out2;
558         }
559         dprintk("RPC:       %s: memory registration strategy is %d\n",
560                 __func__, memreg);
561
562         /* Else will do memory reg/dereg for each chunk */
563         ia->ri_memreg_strategy = memreg;
564
565         return 0;
566 out2:
567         rdma_destroy_id(ia->ri_id);
568         ia->ri_id = NULL;
569 out1:
570         return rc;
571 }
572
573 /*
574  * Clean up/close an IA.
575  *   o if event handles and PD have been initialized, free them.
576  *   o close the IA
577  */
578 void
579 rpcrdma_ia_close(struct rpcrdma_ia *ia)
580 {
581         int rc;
582
583         dprintk("RPC:       %s: entering\n", __func__);
584         if (ia->ri_bind_mem != NULL) {
585                 rc = ib_dereg_mr(ia->ri_bind_mem);
586                 dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
587                         __func__, rc);
588         }
589         if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
590                 if (ia->ri_id->qp)
591                         rdma_destroy_qp(ia->ri_id);
592                 rdma_destroy_id(ia->ri_id);
593                 ia->ri_id = NULL;
594         }
595         if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
596                 rc = ib_dealloc_pd(ia->ri_pd);
597                 dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
598                         __func__, rc);
599         }
600 }
601
602 /*
603  * Create unconnected endpoint.
604  */
605 int
606 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
607                                 struct rpcrdma_create_data_internal *cdata)
608 {
609         struct ib_device_attr devattr;
610         int rc, err;
611
612         rc = ib_query_device(ia->ri_id->device, &devattr);
613         if (rc) {
614                 dprintk("RPC:       %s: ib_query_device failed %d\n",
615                         __func__, rc);
616                 return rc;
617         }
618
619         /* check provider's send/recv wr limits */
620         if (cdata->max_requests > devattr.max_qp_wr)
621                 cdata->max_requests = devattr.max_qp_wr;
622
623         ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
624         ep->rep_attr.qp_context = ep;
625         /* send_cq and recv_cq initialized below */
626         ep->rep_attr.srq = NULL;
627         ep->rep_attr.cap.max_send_wr = cdata->max_requests;
628         switch (ia->ri_memreg_strategy) {
629         case RPCRDMA_FRMR:
630                 /* Add room for frmr register and invalidate WRs */
631                 ep->rep_attr.cap.max_send_wr *= 3;
632                 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
633                         return -EINVAL;
634                 break;
635         case RPCRDMA_MEMWINDOWS_ASYNC:
636         case RPCRDMA_MEMWINDOWS:
637                 /* Add room for mw_binds+unbinds - overkill! */
638                 ep->rep_attr.cap.max_send_wr++;
639                 ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
640                 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
641                         return -EINVAL;
642                 break;
643         default:
644                 break;
645         }
646         ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
647         ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
648         ep->rep_attr.cap.max_recv_sge = 1;
649         ep->rep_attr.cap.max_inline_data = 0;
650         ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
651         ep->rep_attr.qp_type = IB_QPT_RC;
652         ep->rep_attr.port_num = ~0;
653
654         dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
655                 "iovs: send %d recv %d\n",
656                 __func__,
657                 ep->rep_attr.cap.max_send_wr,
658                 ep->rep_attr.cap.max_recv_wr,
659                 ep->rep_attr.cap.max_send_sge,
660                 ep->rep_attr.cap.max_recv_sge);
661
662         /* set trigger for requesting send completion */
663         ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /*  - 1*/;
664         switch (ia->ri_memreg_strategy) {
665         case RPCRDMA_MEMWINDOWS_ASYNC:
666         case RPCRDMA_MEMWINDOWS:
667                 ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
668                 break;
669         default:
670                 break;
671         }
672         if (ep->rep_cqinit <= 2)
673                 ep->rep_cqinit = 0;
674         INIT_CQCOUNT(ep);
675         ep->rep_ia = ia;
676         init_waitqueue_head(&ep->rep_connect_wait);
677
678         /*
679          * Create a single cq for receive dto and mw_bind (only ever
680          * care about unbind, really). Send completions are suppressed.
681          * Use single threaded tasklet upcalls to maintain ordering.
682          */
683         ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
684                                   rpcrdma_cq_async_error_upcall, NULL,
685                                   ep->rep_attr.cap.max_recv_wr +
686                                   ep->rep_attr.cap.max_send_wr + 1, 0);
687         if (IS_ERR(ep->rep_cq)) {
688                 rc = PTR_ERR(ep->rep_cq);
689                 dprintk("RPC:       %s: ib_create_cq failed: %i\n",
690                         __func__, rc);
691                 goto out1;
692         }
693
694         rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
695         if (rc) {
696                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
697                         __func__, rc);
698                 goto out2;
699         }
700
701         ep->rep_attr.send_cq = ep->rep_cq;
702         ep->rep_attr.recv_cq = ep->rep_cq;
703
704         /* Initialize cma parameters */
705
706         /* RPC/RDMA does not use private data */
707         ep->rep_remote_cma.private_data = NULL;
708         ep->rep_remote_cma.private_data_len = 0;
709
710         /* Client offers RDMA Read but does not initiate */
711         ep->rep_remote_cma.initiator_depth = 0;
712         if (ia->ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS)
713                 ep->rep_remote_cma.responder_resources = 0;
714         else if (devattr.max_qp_rd_atom > 32)   /* arbitrary but <= 255 */
715                 ep->rep_remote_cma.responder_resources = 32;
716         else
717                 ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
718
719         ep->rep_remote_cma.retry_count = 7;
720         ep->rep_remote_cma.flow_control = 0;
721         ep->rep_remote_cma.rnr_retry_count = 0;
722
723         return 0;
724
725 out2:
726         err = ib_destroy_cq(ep->rep_cq);
727         if (err)
728                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
729                         __func__, err);
730 out1:
731         return rc;
732 }
733
734 /*
735  * rpcrdma_ep_destroy
736  *
737  * Disconnect and destroy endpoint. After this, the only
738  * valid operations on the ep are to free it (if dynamically
739  * allocated) or re-create it.
740  *
741  * The caller's error handling must be sure to not leak the endpoint
742  * if this function fails.
743  */
744 int
745 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
746 {
747         int rc;
748
749         dprintk("RPC:       %s: entering, connected is %d\n",
750                 __func__, ep->rep_connected);
751
752         if (ia->ri_id->qp) {
753                 rc = rpcrdma_ep_disconnect(ep, ia);
754                 if (rc)
755                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
756                                 " returned %i\n", __func__, rc);
757                 rdma_destroy_qp(ia->ri_id);
758                 ia->ri_id->qp = NULL;
759         }
760
761         /* padding - could be done in rpcrdma_buffer_destroy... */
762         if (ep->rep_pad_mr) {
763                 rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
764                 ep->rep_pad_mr = NULL;
765         }
766
767         rpcrdma_clean_cq(ep->rep_cq);
768         rc = ib_destroy_cq(ep->rep_cq);
769         if (rc)
770                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
771                         __func__, rc);
772
773         return rc;
774 }
775
776 /*
777  * Connect unconnected endpoint.
778  */
779 int
780 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
781 {
782         struct rdma_cm_id *id;
783         int rc = 0;
784         int retry_count = 0;
785         int reconnect = (ep->rep_connected != 0);
786
787         if (reconnect) {
788                 struct rpcrdma_xprt *xprt;
789 retry:
790                 rc = rpcrdma_ep_disconnect(ep, ia);
791                 if (rc && rc != -ENOTCONN)
792                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
793                                 " status %i\n", __func__, rc);
794                 rpcrdma_clean_cq(ep->rep_cq);
795
796                 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
797                 id = rpcrdma_create_id(xprt, ia,
798                                 (struct sockaddr *)&xprt->rx_data.addr);
799                 if (IS_ERR(id)) {
800                         rc = PTR_ERR(id);
801                         goto out;
802                 }
803                 /* TEMP TEMP TEMP - fail if new device:
804                  * Deregister/remarshal *all* requests!
805                  * Close and recreate adapter, pd, etc!
806                  * Re-determine all attributes still sane!
807                  * More stuff I haven't thought of!
808                  * Rrrgh!
809                  */
810                 if (ia->ri_id->device != id->device) {
811                         printk("RPC:       %s: can't reconnect on "
812                                 "different device!\n", __func__);
813                         rdma_destroy_id(id);
814                         rc = -ENETDOWN;
815                         goto out;
816                 }
817                 /* END TEMP */
818                 rdma_destroy_id(ia->ri_id);
819                 ia->ri_id = id;
820         }
821
822         rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
823         if (rc) {
824                 dprintk("RPC:       %s: rdma_create_qp failed %i\n",
825                         __func__, rc);
826                 goto out;
827         }
828
829 /* XXX Tavor device performs badly with 2K MTU! */
830 if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
831         struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
832         if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
833             (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
834              pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
835                 struct ib_qp_attr attr = {
836                         .path_mtu = IB_MTU_1024
837                 };
838                 rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
839         }
840 }
841
842         ep->rep_connected = 0;
843
844         rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
845         if (rc) {
846                 dprintk("RPC:       %s: rdma_connect() failed with %i\n",
847                                 __func__, rc);
848                 goto out;
849         }
850
851         if (reconnect)
852                 return 0;
853
854         wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
855
856         /*
857          * Check state. A non-peer reject indicates no listener
858          * (ECONNREFUSED), which may be a transient state. All
859          * other failures indicate a transport condition that has
860          * already received best-effort handling.
861          */
862         if (ep->rep_connected == -ECONNREFUSED
863             && ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
864                 dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
865                 goto retry;
866         }
867         if (ep->rep_connected <= 0) {
868                 /* Sometimes, the only way to reliably connect to remote
869                  * CMs is to use the same nonzero values for ORD and IRD. */
870                 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
871                     (ep->rep_remote_cma.responder_resources == 0 ||
872                      ep->rep_remote_cma.initiator_depth !=
873                                 ep->rep_remote_cma.responder_resources)) {
874                         if (ep->rep_remote_cma.responder_resources == 0)
875                                 ep->rep_remote_cma.responder_resources = 1;
876                         ep->rep_remote_cma.initiator_depth =
877                                 ep->rep_remote_cma.responder_resources;
878                         goto retry;
879                 }
880                 rc = ep->rep_connected;
881         } else {
882                 dprintk("RPC:       %s: connected\n", __func__);
883         }
884
885 out:
886         if (rc)
887                 ep->rep_connected = rc;
888         return rc;
889 }
890
891 /*
892  * rpcrdma_ep_disconnect
893  *
894  * This is separate from destroy to facilitate the ability
895  * to reconnect without recreating the endpoint.
896  *
897  * This call is not reentrant, and must not be made in parallel
898  * on the same endpoint.
899  */
900 int
901 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
902 {
903         int rc;
904
905         rpcrdma_clean_cq(ep->rep_cq);
906         rc = rdma_disconnect(ia->ri_id);
907         if (!rc) {
908                 /* returns without wait if not connected */
909                 wait_event_interruptible(ep->rep_connect_wait,
910                                                         ep->rep_connected != 1);
911                 dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
912                         (ep->rep_connected == 1) ? "still " : "dis");
913         } else {
914                 dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
915                 ep->rep_connected = rc;
916         }
917         return rc;
918 }
919
920 /*
921  * Initialize buffer memory
922  */
923 int
924 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
925         struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
926 {
927         char *p;
928         size_t len;
929         int i, rc;
930         struct rpcrdma_mw *r;
931
932         buf->rb_max_requests = cdata->max_requests;
933         spin_lock_init(&buf->rb_lock);
934         atomic_set(&buf->rb_credits, 1);
935
936         /* Need to allocate:
937          *   1.  arrays for send and recv pointers
938          *   2.  arrays of struct rpcrdma_req to fill in pointers
939          *   3.  array of struct rpcrdma_rep for replies
940          *   4.  padding, if any
941          *   5.  mw's, fmr's or frmr's, if any
942          * Send/recv buffers in req/rep need to be registered
943          */
944
945         len = buf->rb_max_requests *
946                 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
947         len += cdata->padding;
948         switch (ia->ri_memreg_strategy) {
949         case RPCRDMA_FRMR:
950                 len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
951                                 sizeof(struct rpcrdma_mw);
952                 break;
953         case RPCRDMA_MTHCAFMR:
954                 /* TBD we are perhaps overallocating here */
955                 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
956                                 sizeof(struct rpcrdma_mw);
957                 break;
958         case RPCRDMA_MEMWINDOWS_ASYNC:
959         case RPCRDMA_MEMWINDOWS:
960                 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
961                                 sizeof(struct rpcrdma_mw);
962                 break;
963         default:
964                 break;
965         }
966
967         /* allocate 1, 4 and 5 in one shot */
968         p = kzalloc(len, GFP_KERNEL);
969         if (p == NULL) {
970                 dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
971                         __func__, len);
972                 rc = -ENOMEM;
973                 goto out;
974         }
975         buf->rb_pool = p;       /* for freeing it later */
976
977         buf->rb_send_bufs = (struct rpcrdma_req **) p;
978         p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
979         buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
980         p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
981
982         /*
983          * Register the zeroed pad buffer, if any.
984          */
985         if (cdata->padding) {
986                 rc = rpcrdma_register_internal(ia, p, cdata->padding,
987                                             &ep->rep_pad_mr, &ep->rep_pad);
988                 if (rc)
989                         goto out;
990         }
991         p += cdata->padding;
992
993         /*
994          * Allocate the fmr's, or mw's for mw_bind chunk registration.
995          * We "cycle" the mw's in order to minimize rkey reuse,
996          * and also reduce unbind-to-bind collision.
997          */
998         INIT_LIST_HEAD(&buf->rb_mws);
999         r = (struct rpcrdma_mw *)p;
1000         switch (ia->ri_memreg_strategy) {
1001         case RPCRDMA_FRMR:
1002                 for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1003                         r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1004                                                          RPCRDMA_MAX_SEGS);
1005                         if (IS_ERR(r->r.frmr.fr_mr)) {
1006                                 rc = PTR_ERR(r->r.frmr.fr_mr);
1007                                 dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1008                                         " failed %i\n", __func__, rc);
1009                                 goto out;
1010                         }
1011                         r->r.frmr.fr_pgl =
1012                                 ib_alloc_fast_reg_page_list(ia->ri_id->device,
1013                                                             RPCRDMA_MAX_SEGS);
1014                         if (IS_ERR(r->r.frmr.fr_pgl)) {
1015                                 rc = PTR_ERR(r->r.frmr.fr_pgl);
1016                                 dprintk("RPC:       %s: "
1017                                         "ib_alloc_fast_reg_page_list "
1018                                         "failed %i\n", __func__, rc);
1019                                 goto out;
1020                         }
1021                         list_add(&r->mw_list, &buf->rb_mws);
1022                         ++r;
1023                 }
1024                 break;
1025         case RPCRDMA_MTHCAFMR:
1026                 /* TBD we are perhaps overallocating here */
1027                 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1028                         static struct ib_fmr_attr fa =
1029                                 { RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1030                         r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1031                                 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1032                                 &fa);
1033                         if (IS_ERR(r->r.fmr)) {
1034                                 rc = PTR_ERR(r->r.fmr);
1035                                 dprintk("RPC:       %s: ib_alloc_fmr"
1036                                         " failed %i\n", __func__, rc);
1037                                 goto out;
1038                         }
1039                         list_add(&r->mw_list, &buf->rb_mws);
1040                         ++r;
1041                 }
1042                 break;
1043         case RPCRDMA_MEMWINDOWS_ASYNC:
1044         case RPCRDMA_MEMWINDOWS:
1045                 /* Allocate one extra request's worth, for full cycling */
1046                 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1047                         r->r.mw = ib_alloc_mw(ia->ri_pd);
1048                         if (IS_ERR(r->r.mw)) {
1049                                 rc = PTR_ERR(r->r.mw);
1050                                 dprintk("RPC:       %s: ib_alloc_mw"
1051                                         " failed %i\n", __func__, rc);
1052                                 goto out;
1053                         }
1054                         list_add(&r->mw_list, &buf->rb_mws);
1055                         ++r;
1056                 }
1057                 break;
1058         default:
1059                 break;
1060         }
1061
1062         /*
1063          * Allocate/init the request/reply buffers. Doing this
1064          * using kmalloc for now -- one for each buf.
1065          */
1066         for (i = 0; i < buf->rb_max_requests; i++) {
1067                 struct rpcrdma_req *req;
1068                 struct rpcrdma_rep *rep;
1069
1070                 len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
1071                 /* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
1072                 /* Typical ~2400b, so rounding up saves work later */
1073                 if (len < 4096)
1074                         len = 4096;
1075                 req = kmalloc(len, GFP_KERNEL);
1076                 if (req == NULL) {
1077                         dprintk("RPC:       %s: request buffer %d alloc"
1078                                 " failed\n", __func__, i);
1079                         rc = -ENOMEM;
1080                         goto out;
1081                 }
1082                 memset(req, 0, sizeof(struct rpcrdma_req));
1083                 buf->rb_send_bufs[i] = req;
1084                 buf->rb_send_bufs[i]->rl_buffer = buf;
1085
1086                 rc = rpcrdma_register_internal(ia, req->rl_base,
1087                                 len - offsetof(struct rpcrdma_req, rl_base),
1088                                 &buf->rb_send_bufs[i]->rl_handle,
1089                                 &buf->rb_send_bufs[i]->rl_iov);
1090                 if (rc)
1091                         goto out;
1092
1093                 buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1094
1095                 len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1096                 rep = kmalloc(len, GFP_KERNEL);
1097                 if (rep == NULL) {
1098                         dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1099                                 __func__, i);
1100                         rc = -ENOMEM;
1101                         goto out;
1102                 }
1103                 memset(rep, 0, sizeof(struct rpcrdma_rep));
1104                 buf->rb_recv_bufs[i] = rep;
1105                 buf->rb_recv_bufs[i]->rr_buffer = buf;
1106                 init_waitqueue_head(&rep->rr_unbind);
1107
1108                 rc = rpcrdma_register_internal(ia, rep->rr_base,
1109                                 len - offsetof(struct rpcrdma_rep, rr_base),
1110                                 &buf->rb_recv_bufs[i]->rr_handle,
1111                                 &buf->rb_recv_bufs[i]->rr_iov);
1112                 if (rc)
1113                         goto out;
1114
1115         }
1116         dprintk("RPC:       %s: max_requests %d\n",
1117                 __func__, buf->rb_max_requests);
1118         /* done */
1119         return 0;
1120 out:
1121         rpcrdma_buffer_destroy(buf);
1122         return rc;
1123 }
1124
1125 /*
1126  * Unregister and destroy buffer memory. Need to deal with
1127  * partial initialization, so it's callable from failed create.
1128  * Must be called before destroying endpoint, as registrations
1129  * reference it.
1130  */
1131 void
1132 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1133 {
1134         int rc, i;
1135         struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1136         struct rpcrdma_mw *r;
1137
1138         /* clean up in reverse order from create
1139          *   1.  recv mr memory (mr free, then kfree)
1140          *   1a. bind mw memory
1141          *   2.  send mr memory (mr free, then kfree)
1142          *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1143          *   4.  arrays
1144          */
1145         dprintk("RPC:       %s: entering\n", __func__);
1146
1147         for (i = 0; i < buf->rb_max_requests; i++) {
1148                 if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1149                         rpcrdma_deregister_internal(ia,
1150                                         buf->rb_recv_bufs[i]->rr_handle,
1151                                         &buf->rb_recv_bufs[i]->rr_iov);
1152                         kfree(buf->rb_recv_bufs[i]);
1153                 }
1154                 if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1155                         while (!list_empty(&buf->rb_mws)) {
1156                                 r = list_entry(buf->rb_mws.next,
1157                                         struct rpcrdma_mw, mw_list);
1158                                 list_del(&r->mw_list);
1159                                 switch (ia->ri_memreg_strategy) {
1160                                 case RPCRDMA_FRMR:
1161                                         rc = ib_dereg_mr(r->r.frmr.fr_mr);
1162                                         if (rc)
1163                                                 dprintk("RPC:       %s:"
1164                                                         " ib_dereg_mr"
1165                                                         " failed %i\n",
1166                                                         __func__, rc);
1167                                         ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1168                                         break;
1169                                 case RPCRDMA_MTHCAFMR:
1170                                         rc = ib_dealloc_fmr(r->r.fmr);
1171                                         if (rc)
1172                                                 dprintk("RPC:       %s:"
1173                                                         " ib_dealloc_fmr"
1174                                                         " failed %i\n",
1175                                                         __func__, rc);
1176                                         break;
1177                                 case RPCRDMA_MEMWINDOWS_ASYNC:
1178                                 case RPCRDMA_MEMWINDOWS:
1179                                         rc = ib_dealloc_mw(r->r.mw);
1180                                         if (rc)
1181                                                 dprintk("RPC:       %s:"
1182                                                         " ib_dealloc_mw"
1183                                                         " failed %i\n",
1184                                                         __func__, rc);
1185                                         break;
1186                                 default:
1187                                         break;
1188                                 }
1189                         }
1190                         rpcrdma_deregister_internal(ia,
1191                                         buf->rb_send_bufs[i]->rl_handle,
1192                                         &buf->rb_send_bufs[i]->rl_iov);
1193                         kfree(buf->rb_send_bufs[i]);
1194                 }
1195         }
1196
1197         kfree(buf->rb_pool);
1198 }
1199
1200 /*
1201  * Get a set of request/reply buffers.
1202  *
1203  * Reply buffer (if needed) is attached to send buffer upon return.
1204  * Rule:
1205  *    rb_send_index and rb_recv_index MUST always be pointing to the
1206  *    *next* available buffer (non-NULL). They are incremented after
1207  *    removing buffers, and decremented *before* returning them.
1208  */
1209 struct rpcrdma_req *
1210 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1211 {
1212         struct rpcrdma_req *req;
1213         unsigned long flags;
1214         int i;
1215         struct rpcrdma_mw *r;
1216
1217         spin_lock_irqsave(&buffers->rb_lock, flags);
1218         if (buffers->rb_send_index == buffers->rb_max_requests) {
1219                 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1220                 dprintk("RPC:       %s: out of request buffers\n", __func__);
1221                 return ((struct rpcrdma_req *)NULL);
1222         }
1223
1224         req = buffers->rb_send_bufs[buffers->rb_send_index];
1225         if (buffers->rb_send_index < buffers->rb_recv_index) {
1226                 dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1227                         __func__,
1228                         buffers->rb_recv_index - buffers->rb_send_index);
1229                 req->rl_reply = NULL;
1230         } else {
1231                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1232                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1233         }
1234         buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1235         if (!list_empty(&buffers->rb_mws)) {
1236                 i = RPCRDMA_MAX_SEGS - 1;
1237                 do {
1238                         r = list_entry(buffers->rb_mws.next,
1239                                         struct rpcrdma_mw, mw_list);
1240                         list_del(&r->mw_list);
1241                         req->rl_segments[i].mr_chunk.rl_mw = r;
1242                 } while (--i >= 0);
1243         }
1244         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1245         return req;
1246 }
1247
1248 /*
1249  * Put request/reply buffers back into pool.
1250  * Pre-decrement counter/array index.
1251  */
1252 void
1253 rpcrdma_buffer_put(struct rpcrdma_req *req)
1254 {
1255         struct rpcrdma_buffer *buffers = req->rl_buffer;
1256         struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1257         int i;
1258         unsigned long flags;
1259
1260         BUG_ON(req->rl_nchunks != 0);
1261         spin_lock_irqsave(&buffers->rb_lock, flags);
1262         buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1263         req->rl_niovs = 0;
1264         if (req->rl_reply) {
1265                 buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1266                 init_waitqueue_head(&req->rl_reply->rr_unbind);
1267                 req->rl_reply->rr_func = NULL;
1268                 req->rl_reply = NULL;
1269         }
1270         switch (ia->ri_memreg_strategy) {
1271         case RPCRDMA_FRMR:
1272         case RPCRDMA_MTHCAFMR:
1273         case RPCRDMA_MEMWINDOWS_ASYNC:
1274         case RPCRDMA_MEMWINDOWS:
1275                 /*
1276                  * Cycle mw's back in reverse order, and "spin" them.
1277                  * This delays and scrambles reuse as much as possible.
1278                  */
1279                 i = 1;
1280                 do {
1281                         struct rpcrdma_mw **mw;
1282                         mw = &req->rl_segments[i].mr_chunk.rl_mw;
1283                         list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1284                         *mw = NULL;
1285                 } while (++i < RPCRDMA_MAX_SEGS);
1286                 list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1287                                         &buffers->rb_mws);
1288                 req->rl_segments[0].mr_chunk.rl_mw = NULL;
1289                 break;
1290         default:
1291                 break;
1292         }
1293         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1294 }
1295
1296 /*
1297  * Recover reply buffers from pool.
1298  * This happens when recovering from error conditions.
1299  * Post-increment counter/array index.
1300  */
1301 void
1302 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1303 {
1304         struct rpcrdma_buffer *buffers = req->rl_buffer;
1305         unsigned long flags;
1306
1307         if (req->rl_iov.length == 0)    /* special case xprt_rdma_allocate() */
1308                 buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1309         spin_lock_irqsave(&buffers->rb_lock, flags);
1310         if (buffers->rb_recv_index < buffers->rb_max_requests) {
1311                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1312                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1313         }
1314         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1315 }
1316
1317 /*
1318  * Put reply buffers back into pool when not attached to
1319  * request. This happens in error conditions, and when
1320  * aborting unbinds. Pre-decrement counter/array index.
1321  */
1322 void
1323 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1324 {
1325         struct rpcrdma_buffer *buffers = rep->rr_buffer;
1326         unsigned long flags;
1327
1328         rep->rr_func = NULL;
1329         spin_lock_irqsave(&buffers->rb_lock, flags);
1330         buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1331         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1332 }
1333
1334 /*
1335  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1336  */
1337
1338 int
1339 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1340                                 struct ib_mr **mrp, struct ib_sge *iov)
1341 {
1342         struct ib_phys_buf ipb;
1343         struct ib_mr *mr;
1344         int rc;
1345
1346         /*
1347          * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1348          */
1349         iov->addr = ib_dma_map_single(ia->ri_id->device,
1350                         va, len, DMA_BIDIRECTIONAL);
1351         iov->length = len;
1352
1353         if (ia->ri_have_dma_lkey) {
1354                 *mrp = NULL;
1355                 iov->lkey = ia->ri_dma_lkey;
1356                 return 0;
1357         } else if (ia->ri_bind_mem != NULL) {
1358                 *mrp = NULL;
1359                 iov->lkey = ia->ri_bind_mem->lkey;
1360                 return 0;
1361         }
1362
1363         ipb.addr = iov->addr;
1364         ipb.size = iov->length;
1365         mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1366                         IB_ACCESS_LOCAL_WRITE, &iov->addr);
1367
1368         dprintk("RPC:       %s: phys convert: 0x%llx "
1369                         "registered 0x%llx length %d\n",
1370                         __func__, (unsigned long long)ipb.addr,
1371                         (unsigned long long)iov->addr, len);
1372
1373         if (IS_ERR(mr)) {
1374                 *mrp = NULL;
1375                 rc = PTR_ERR(mr);
1376                 dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1377         } else {
1378                 *mrp = mr;
1379                 iov->lkey = mr->lkey;
1380                 rc = 0;
1381         }
1382
1383         return rc;
1384 }
1385
1386 int
1387 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1388                                 struct ib_mr *mr, struct ib_sge *iov)
1389 {
1390         int rc;
1391
1392         ib_dma_unmap_single(ia->ri_id->device,
1393                         iov->addr, iov->length, DMA_BIDIRECTIONAL);
1394
1395         if (mr == NULL)
1396                 return 0;
1397
1398         rc = ib_dereg_mr(mr);
1399         if (rc)
1400                 dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1401         return rc;
1402 }
1403
1404 /*
1405  * Wrappers for chunk registration, shared by read/write chunk code.
1406  */
1407
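     /*
      * rpcrdma_map_one/rpcrdma_unmap_one: DMA-map one chunk segment. A
      * segment backed by a page is mapped with ib_dma_map_page(), otherwise
      * (kernel virtual address) with ib_dma_map_single(). The DMA direction
      * follows the RDMA data direction: FROM_DEVICE for segments the peer
      * will write, TO_DEVICE for segments the peer will read.
      */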
1408 static void
1409 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1410 {
1411         seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1412         seg->mr_dmalen = seg->mr_len;
1413         if (seg->mr_page)
1414                 seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1415                                 seg->mr_page, offset_in_page(seg->mr_offset),
1416                                 seg->mr_dmalen, seg->mr_dir);
1417         else
1418                 seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1419                                 seg->mr_offset,
1420                                 seg->mr_dmalen, seg->mr_dir);
1421 }
1422
1423 static void
1424 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1425 {
1426         if (seg->mr_page)
1427                 ib_dma_unmap_page(ia->ri_id->device,
1428                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1429         else
1430                 ib_dma_unmap_single(ia->ri_id->device,
1431                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1432 }
1433
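     /*
      * Register a chunk with a fast-register memory region (FRMR).
      *
      * Consecutive segments are DMA-mapped and collected into the MR's page
      * list, stopping at the first segment boundary that is not page aligned.
      * The MR's key is bumped with ib_update_fast_reg_key() so each
      * registration carries a fresh rkey, and an unsignaled IB_WR_FAST_REG_MR
      * work request is posted to bind the page list to that rkey. On failure
      * the segments are unmapped again; on success seg1 carries the
      * rkey/base/length of the whole chunk.
      */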
1434 static int
1435 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1436                         int *nsegs, int writing, struct rpcrdma_ia *ia,
1437                         struct rpcrdma_xprt *r_xprt)
1438 {
1439         struct rpcrdma_mr_seg *seg1 = seg;
1440         struct ib_send_wr frmr_wr, *bad_wr;
1441         u8 key;
1442         int len, pageoff;
1443         int i, rc;
1444
1445         pageoff = offset_in_page(seg1->mr_offset);
1446         seg1->mr_offset -= pageoff;     /* start of page */
1447         seg1->mr_len += pageoff;
1448         len = -pageoff;
1449         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1450                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1451         for (i = 0; i < *nsegs;) {
1452                 rpcrdma_map_one(ia, seg, writing);
1453                 seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->page_list[i] = seg->mr_dma;
1454                 len += seg->mr_len;
1455                 ++seg;
1456                 ++i;
1457                 /* Check for holes */
1458                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1459                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1460                         break;
1461         }
1462         dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1463                 __func__, seg1->mr_chunk.rl_mw, i);
1464
1465         /* Bump the key */
1466         key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
1467         ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
1468
1469         /* Prepare FRMR WR */
1470         memset(&frmr_wr, 0, sizeof frmr_wr);
1471         frmr_wr.opcode = IB_WR_FAST_REG_MR;
1472         frmr_wr.send_flags = 0;                 /* unsignaled */
1473         frmr_wr.wr.fast_reg.iova_start = (unsigned long)seg1->mr_dma;
1474         frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
1475         frmr_wr.wr.fast_reg.page_list_len = i;
1476         frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1477         frmr_wr.wr.fast_reg.length = i << PAGE_SHIFT;
1478         frmr_wr.wr.fast_reg.access_flags = (writing ?
1479                                 IB_ACCESS_REMOTE_WRITE : IB_ACCESS_REMOTE_READ);
1480         frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1481         DECR_CQCOUNT(&r_xprt->rx_ep);
1482
1483         rc = ib_post_send(ia->ri_id->qp, &frmr_wr, &bad_wr);
1484
1485         if (rc) {
1486                 dprintk("RPC:       %s: failed ib_post_send for register,"
1487                         " status %i\n", __func__, rc);
1488                 while (i--)
1489                         rpcrdma_unmap_one(ia, --seg);
1490         } else {
1491                 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1492                 seg1->mr_base = seg1->mr_dma + pageoff;
1493                 seg1->mr_nsegs = i;
1494                 seg1->mr_len = len;
1495         }
1496         *nsegs = i;
1497         return rc;
1498 }
1499
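     /*
      * Invalidate an FRMR registration: unmap the segments and post an
      * unsignaled IB_WR_LOCAL_INV work request for the chunk's rkey.
      */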
1500 static int
1501 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1502                         struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1503 {
1504         struct rpcrdma_mr_seg *seg1 = seg;
1505         struct ib_send_wr invalidate_wr, *bad_wr;
1506         int rc;
1507
1508         while (seg1->mr_nsegs--)
1509                 rpcrdma_unmap_one(ia, seg++);
1510
1511         memset(&invalidate_wr, 0, sizeof invalidate_wr);
1512         invalidate_wr.opcode = IB_WR_LOCAL_INV;
1513         invalidate_wr.send_flags = 0;                   /* unsignaled */
1514         invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1515         DECR_CQCOUNT(&r_xprt->rx_ep);
1516
1517         rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1518         if (rc)
1519                 dprintk("RPC:       %s: failed ib_post_send for invalidate,"
1520                         " status %i\n", __func__, rc);
1521         return rc;
1522 }
1523
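     /*
      * Register a chunk with a fast memory region (FMR): DMA-map consecutive
      * segments (stopping at unaligned holes, as in the FRMR case), then map
      * their physical addresses into the pre-allocated FMR with
      * ib_map_phys_fmr().
      */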
1524 static int
1525 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1526                         int *nsegs, int writing, struct rpcrdma_ia *ia)
1527 {
1528         struct rpcrdma_mr_seg *seg1 = seg;
1529         u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1530         int len, pageoff, i, rc;
1531
1532         pageoff = offset_in_page(seg1->mr_offset);
1533         seg1->mr_offset -= pageoff;     /* start of page */
1534         seg1->mr_len += pageoff;
1535         len = -pageoff;
1536         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1537                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1538         for (i = 0; i < *nsegs;) {
1539                 rpcrdma_map_one(ia, seg, writing);
1540                 physaddrs[i] = seg->mr_dma;
1541                 len += seg->mr_len;
1542                 ++seg;
1543                 ++i;
1544                 /* Check for holes */
1545                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1546                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1547                         break;
1548         }
1549         rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1550                                 physaddrs, i, seg1->mr_dma);
1551         if (rc) {
1552                 dprintk("RPC:       %s: failed ib_map_phys_fmr "
1553                         "%u@0x%llx+%i (%d)... status %i\n", __func__,
1554                         len, (unsigned long long)seg1->mr_dma,
1555                         pageoff, i, rc);
1556                 while (i--)
1557                         rpcrdma_unmap_one(ia, --seg);
1558         } else {
1559                 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1560                 seg1->mr_base = seg1->mr_dma + pageoff;
1561                 seg1->mr_nsegs = i;
1562                 seg1->mr_len = len;
1563         }
1564         *nsegs = i;
1565         return rc;
1566 }
1567
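     /*
      * Unmap an FMR registration with ib_unmap_fmr(), then release the DMA
      * mappings of the underlying segments.
      */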
1568 static int
1569 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1570                         struct rpcrdma_ia *ia)
1571 {
1572         struct rpcrdma_mr_seg *seg1 = seg;
1573         LIST_HEAD(l);
1574         int rc;
1575
1576         list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1577         rc = ib_unmap_fmr(&l);
1578         while (seg1->mr_nsegs--)
1579                 rpcrdma_unmap_one(ia, seg++);
1580         if (rc)
1581                 dprintk("RPC:       %s: failed ib_unmap_fmr,"
1582                         " status %i\n", __func__, rc);
1583         return rc;
1584 }
1585
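     /*
      * Register a chunk by binding a memory window over the persistent
      * "bind" MR. Only a single segment can be bound, so *nsegs is forced to
      * one. The bind work request is posted unsignaled; completion accounting
      * goes through the endpoint's CQ count.
      */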
1586 static int
1587 rpcrdma_register_memwin_external(struct rpcrdma_mr_seg *seg,
1588                         int *nsegs, int writing, struct rpcrdma_ia *ia,
1589                         struct rpcrdma_xprt *r_xprt)
1590 {
1591         int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1592                                   IB_ACCESS_REMOTE_READ);
1593         struct ib_mw_bind param;
1594         int rc;
1595
1596         *nsegs = 1;
1597         rpcrdma_map_one(ia, seg, writing);
1598         param.mr = ia->ri_bind_mem;
1599         param.wr_id = 0ULL;     /* no send cookie */
1600         param.addr = seg->mr_dma;
1601         param.length = seg->mr_len;
1602         param.send_flags = 0;
1603         param.mw_access_flags = mem_priv;
1604
1605         DECR_CQCOUNT(&r_xprt->rx_ep);
1606         rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1607         if (rc) {
1608                 dprintk("RPC:       %s: failed ib_bind_mw "
1609                         "%u@0x%llx status %i\n",
1610                         __func__, seg->mr_len,
1611                         (unsigned long long)seg->mr_dma, rc);
1612                 rpcrdma_unmap_one(ia, seg);
1613         } else {
1614                 seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
1615                 seg->mr_base = param.addr;
1616                 seg->mr_nsegs = 1;
1617         }
1618         return rc;
1619 }
1620
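     /*
      * Unbind a memory window (a zero-length bind). If the caller passed a
      * reply pointer in *r, the unbind is posted signaled and *r is cleared,
      * so the reply's callback runs later from the completion upcall rather
      * than synchronously in the caller. Otherwise the unbind is unsignaled.
      */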
1621 static int
1622 rpcrdma_deregister_memwin_external(struct rpcrdma_mr_seg *seg,
1623                         struct rpcrdma_ia *ia,
1624                         struct rpcrdma_xprt *r_xprt, void **r)
1625 {
1626         struct ib_mw_bind param;
1627         LIST_HEAD(l);
1628         int rc;
1629
1630         BUG_ON(seg->mr_nsegs != 1);
1631         param.mr = ia->ri_bind_mem;
1632         param.addr = 0ULL;      /* unbind */
1633         param.length = 0;
1634         param.mw_access_flags = 0;
1635         if (*r) {
1636                 param.wr_id = (u64) (unsigned long) *r;
1637                 param.send_flags = IB_SEND_SIGNALED;
1638                 INIT_CQCOUNT(&r_xprt->rx_ep);
1639         } else {
1640                 param.wr_id = 0ULL;
1641                 param.send_flags = 0;
1642                 DECR_CQCOUNT(&r_xprt->rx_ep);
1643         }
1644         rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1645         rpcrdma_unmap_one(ia, seg);
1646         if (rc)
1647                 dprintk("RPC:       %s: failed ib_(un)bind_mw,"
1648                         " status %i\n", __func__, rc);
1649         else
1650                 *r = NULL;      /* will upcall on completion */
1651         return rc;
1652 }
1653
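     /*
      * Default registration: build a physical buffer list from the mapped
      * segments (again stopping at unaligned holes) and register it for the
      * chunk with ib_reg_phys_mr().
      */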
1654 static int
1655 rpcrdma_register_default_external(struct rpcrdma_mr_seg *seg,
1656                         int *nsegs, int writing, struct rpcrdma_ia *ia)
1657 {
1658         int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1659                                   IB_ACCESS_REMOTE_READ);
1660         struct rpcrdma_mr_seg *seg1 = seg;
1661         struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
1662         int len, i, rc = 0;
1663
1664         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1665                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1666         for (len = 0, i = 0; i < *nsegs;) {
1667                 rpcrdma_map_one(ia, seg, writing);
1668                 ipb[i].addr = seg->mr_dma;
1669                 ipb[i].size = seg->mr_len;
1670                 len += seg->mr_len;
1671                 ++seg;
1672                 ++i;
1673                 /* Check for holes */
1674                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1675                     offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1676                         break;
1677         }
1678         seg1->mr_base = seg1->mr_dma;
1679         seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
1680                                 ipb, i, mem_priv, &seg1->mr_base);
1681         if (IS_ERR(seg1->mr_chunk.rl_mr)) {
1682                 rc = PTR_ERR(seg1->mr_chunk.rl_mr);
1683                 dprintk("RPC:       %s: failed ib_reg_phys_mr "
1684                         "%u@0x%llx (%d)... status %i\n",
1685                         __func__, len,
1686                         (unsigned long long)seg1->mr_dma, i, rc);
1687                 while (i--)
1688                         rpcrdma_unmap_one(ia, --seg);
1689         } else {
1690                 seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
1691                 seg1->mr_nsegs = i;
1692                 seg1->mr_len = len;
1693         }
1694         *nsegs = i;
1695         return rc;
1696 }
1697
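     /*
      * Deregister a chunk registered via ib_reg_phys_mr(), then unmap its
      * segments.
      */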
1698 static int
1699 rpcrdma_deregister_default_external(struct rpcrdma_mr_seg *seg,
1700                         struct rpcrdma_ia *ia)
1701 {
1702         struct rpcrdma_mr_seg *seg1 = seg;
1703         int rc;
1704
1705         rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
1706         seg1->mr_chunk.rl_mr = NULL;
1707         while (seg1->mr_nsegs--)
1708                 rpcrdma_unmap_one(ia, seg++);
1709         if (rc)
1710                 dprintk("RPC:       %s: failed ib_dereg_mr,"
1711                         " status %i\n", __func__, rc);
1712         return rc;
1713 }
1714
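     /*
      * rpcrdma_register_external: register a run of chunk segments using the
      * memory registration strategy selected at IA create time. Returns the
      * number of segments actually registered (possibly fewer than requested,
      * e.g. when a page-alignment hole is hit), or -1 on failure.
      *
      * Illustrative caller pattern only -- the real caller is the chunk
      * marshaling code, and the names here are abbreviated for the sketch:
      *
      *	n = rpcrdma_register_external(seg, nsegs, writing, r_xprt);
      *	if (n <= 0)
      *		return -EIO;
      *	(seg->mr_rkey, seg->mr_base and seg->mr_len now describe the chunk)
      *	seg += n; nsegs -= n;
      */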
1715 int
1716 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1717                         int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1718 {
1719         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1720         int rc = 0;
1721
1722         switch (ia->ri_memreg_strategy) {
1723
1724 #if RPCRDMA_PERSISTENT_REGISTRATION
1725         case RPCRDMA_ALLPHYSICAL:
1726                 rpcrdma_map_one(ia, seg, writing);
1727                 seg->mr_rkey = ia->ri_bind_mem->rkey;
1728                 seg->mr_base = seg->mr_dma;
1729                 seg->mr_nsegs = 1;
1730                 nsegs = 1;
1731                 break;
1732 #endif
1733
1734         /* Registration using fast registration memory regions (FRMR) */
1735         case RPCRDMA_FRMR:
1736                 rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1737                 break;
1738
1739         /* Registration using fast memory regions (FMR) */
1740         case RPCRDMA_MTHCAFMR:
1741                 rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1742                 break;
1743
1744         /* Registration using memory windows */
1745         case RPCRDMA_MEMWINDOWS_ASYNC:
1746         case RPCRDMA_MEMWINDOWS:
1747                 rc = rpcrdma_register_memwin_external(seg, &nsegs, writing, ia, r_xprt);
1748                 break;
1749
1750         /* Default registration each time */
1751         default:
1752                 rc = rpcrdma_register_default_external(seg, &nsegs, writing, ia);
1753                 break;
1754         }
1755         if (rc)
1756                 return -1;
1757
1758         return nsegs;
1759 }
1760
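     /*
      * rpcrdma_deregister_external: release a registration made by
      * rpcrdma_register_external(). If a reply buffer is passed in r and the
      * deregistration completes synchronously, its rr_func callback is
      * invoked here; the memory-window strategy may instead clear r and defer
      * the callback to the unbind completion upcall. Returns the number of
      * segments released.
      */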
1761 int
1762 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1763                 struct rpcrdma_xprt *r_xprt, void *r)
1764 {
1765         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1766         int nsegs = seg->mr_nsegs, rc;
1767
1768         switch (ia->ri_memreg_strategy) {
1769
1770 #if RPCRDMA_PERSISTENT_REGISTRATION
1771         case RPCRDMA_ALLPHYSICAL:
1772                 BUG_ON(nsegs != 1);
1773                 rpcrdma_unmap_one(ia, seg);
1774                 rc = 0;
1775                 break;
1776 #endif
1777
1778         case RPCRDMA_FRMR:
1779                 rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1780                 break;
1781
1782         case RPCRDMA_MTHCAFMR:
1783                 rc = rpcrdma_deregister_fmr_external(seg, ia);
1784                 break;
1785
1786         case RPCRDMA_MEMWINDOWS_ASYNC:
1787         case RPCRDMA_MEMWINDOWS:
1788                 rc = rpcrdma_deregister_memwin_external(seg, ia, r_xprt, &r);
1789                 break;
1790
1791         default:
1792                 rc = rpcrdma_deregister_default_external(seg, ia);
1793                 break;
1794         }
1795         if (r) {
1796                 struct rpcrdma_rep *rep = r;
1797                 void (*func)(struct rpcrdma_rep *) = rep->rr_func;
1798                 rep->rr_func = NULL;
1799                 if (func)
                             func(rep);      /* dereg done, callback now */
                     else
                             /* rr_func was already cleared, e.g. by a racing
                              * disconnect upcall; just return the buffer */
                             rpcrdma_recv_buffer_put(rep);
1800         }
1801         return nsegs;
1802 }
1803
1804 /*
1805  * Prepost any receive buffer, then post send.
1806  *
1807  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1808  */
1809 int
1810 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1811                 struct rpcrdma_ep *ep,
1812                 struct rpcrdma_req *req)
1813 {
1814         struct ib_send_wr send_wr, *send_wr_fail;
1815         struct rpcrdma_rep *rep = req->rl_reply;
1816         int rc;
1817
1818         if (rep) {
1819                 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1820                 if (rc)
1821                         goto out;
1822                 req->rl_reply = NULL;
1823         }
1824
1825         send_wr.next = NULL;
1826         send_wr.wr_id = 0ULL;   /* no send cookie */
1827         send_wr.sg_list = req->rl_send_iov;
1828         send_wr.num_sge = req->rl_niovs;
1829         send_wr.opcode = IB_WR_SEND;
1830         if (send_wr.num_sge == 4)       /* no need to sync any pad (constant) */
1831                 ib_dma_sync_single_for_device(ia->ri_id->device,
1832                         req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1833                         DMA_TO_DEVICE);
1834         ib_dma_sync_single_for_device(ia->ri_id->device,
1835                 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1836                 DMA_TO_DEVICE);
1837         ib_dma_sync_single_for_device(ia->ri_id->device,
1838                 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1839                 DMA_TO_DEVICE);
1840
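             /*
              * Sends are posted unsignaled until the endpoint's CQ budget
              * (rep_cqcount) is exhausted; then the counter is reset and this
              * send is posted signaled, so the provider gets a completion
              * every now and then.
              */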
1841         if (DECR_CQCOUNT(ep) > 0)
1842                 send_wr.send_flags = 0;
1843         else { /* Provider must take a send completion every now and then */
1844                 INIT_CQCOUNT(ep);
1845                 send_wr.send_flags = IB_SEND_SIGNALED;
1846         }
1847
1848         rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1849         if (rc)
1850                 dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1851                         rc);
1852 out:
1853         return rc;
1854 }
1855
1856 /*
1857  * (Re)post a receive buffer.
1858  */
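     /*
      * The rep pointer is carried as the work request id, so the receive
      * completion upcall can find the reply buffer directly.
      */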
1859 int
1860 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1861                      struct rpcrdma_ep *ep,
1862                      struct rpcrdma_rep *rep)
1863 {
1864         struct ib_recv_wr recv_wr, *recv_wr_fail;
1865         int rc;
1866
1867         recv_wr.next = NULL;
1868         recv_wr.wr_id = (u64) (unsigned long) rep;
1869         recv_wr.sg_list = &rep->rr_iov;
1870         recv_wr.num_sge = 1;
1871
1872         ib_dma_sync_single_for_cpu(ia->ri_id->device,
1873                 rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1874
1875         DECR_CQCOUNT(ep);
1876         rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1877
1878         if (rc)
1879                 dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1880                         rc);
1881         return rc;
1882 }