SUNRPC: Fix an unnecessary implicit type cast in rpcrdma_count_chunks()
net/sunrpc/xprtrdma/verbs.c
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49
50 #include <linux/pci.h>  /* for Tavor hack below */
51
52 #include "xprt_rdma.h"
53
54 /*
55  * Globals/Macros
56  */
57
58 #ifdef RPC_DEBUG
59 # define RPCDBG_FACILITY        RPCDBG_TRANS
60 #endif
61
62 /*
63  * internal functions
64  */
65
66 /*
67  * handle replies in tasklet context, using a single, global list
68  * rdma tasklet function -- just turn around and call the func
69  * for all replies on the list
70  */
71
72 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
73 static LIST_HEAD(rpcrdma_tasklets_g);
74
75 static void
76 rpcrdma_run_tasklet(unsigned long data)
77 {
78         struct rpcrdma_rep *rep;
79         void (*func)(struct rpcrdma_rep *);
80         unsigned long flags;
81
82         data = data;    /* tasklet data is unused; replies come from the global list */
83         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
84         while (!list_empty(&rpcrdma_tasklets_g)) {
85                 rep = list_entry(rpcrdma_tasklets_g.next,
86                                  struct rpcrdma_rep, rr_list);
87                 list_del(&rep->rr_list);
88                 func = rep->rr_func;
89                 rep->rr_func = NULL;
90                 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
91
92                 if (func)
93                         func(rep);
94                 else
95                         rpcrdma_recv_buffer_put(rep);
96
97                 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
98         }
99         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
100 }
101
102 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
103
104 static inline void
105 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
106 {
107         unsigned long flags;
108
109         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
110         list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
111         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
112         tasklet_schedule(&rpcrdma_tasklet_g);
113 }
114
115 static void
116 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
117 {
118         struct rpcrdma_ep *ep = context;
119
120         dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
121                 __func__, event->event, event->device->name, context);
122         if (ep->rep_connected == 1) {
123                 ep->rep_connected = -EIO;
124                 ep->rep_func(ep);
125                 wake_up_all(&ep->rep_connect_wait);
126         }
127 }
128
129 static void
130 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
131 {
132         struct rpcrdma_ep *ep = context;
133
134         dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
135                 __func__, event->event, event->device->name, context);
136         if (ep->rep_connected == 1) {
137                 ep->rep_connected = -EIO;
138                 ep->rep_func(ep);
139                 wake_up_all(&ep->rep_connect_wait);
140         }
141 }
142
143 static inline
144 void rpcrdma_event_process(struct ib_wc *wc)
145 {
146         struct rpcrdma_rep *rep =
147                         (struct rpcrdma_rep *)(unsigned long) wc->wr_id;
148
149         dprintk("RPC:       %s: event rep %p status %X opcode %X length %u\n",
150                 __func__, rep, wc->status, wc->opcode, wc->byte_len);
151
152         if (!rep) /* send or bind completion that we don't care about */
153                 return;
154
155         if (IB_WC_SUCCESS != wc->status) {
156                 dprintk("RPC:       %s: %s WC status %X, connection lost\n",
157                         __func__, (wc->opcode & IB_WC_RECV) ? "recv" : "send",
158                          wc->status);
159                 rep->rr_len = ~0U;
160                 rpcrdma_schedule_tasklet(rep);
161                 return;
162         }
163
164         switch (wc->opcode) {
165         case IB_WC_RECV:
166                 rep->rr_len = wc->byte_len;
167                 ib_dma_sync_single_for_cpu(
168                         rdmab_to_ia(rep->rr_buffer)->ri_id->device,
169                         rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
170                 /* Keep (only) the most recent credits, after checking validity */
171                 if (rep->rr_len >= 16) {
172                         struct rpcrdma_msg *p =
173                                         (struct rpcrdma_msg *) rep->rr_base;
174                         unsigned int credits = ntohl(p->rm_credit);
175                         if (credits == 0) {
176                                 dprintk("RPC:       %s: server"
177                                         " dropped credits to 0!\n", __func__);
178                                 /* don't deadlock */
179                                 credits = 1;
180                         } else if (credits > rep->rr_buffer->rb_max_requests) {
181                                 dprintk("RPC:       %s: server"
182                                         " over-crediting: %d (%d)\n",
183                                         __func__, credits,
184                                         rep->rr_buffer->rb_max_requests);
185                                 credits = rep->rr_buffer->rb_max_requests;
186                         }
187                         atomic_set(&rep->rr_buffer->rb_credits, credits);
188                 }
189                 /* fall through */
190         case IB_WC_BIND_MW:
191                 rpcrdma_schedule_tasklet(rep);
192                 break;
193         default:
194                 dprintk("RPC:       %s: unexpected WC event %X\n",
195                         __func__, wc->opcode);
196                 break;
197         }
198 }
199
200 static inline int
201 rpcrdma_cq_poll(struct ib_cq *cq)
202 {
203         struct ib_wc wc;
204         int rc;
205
206         for (;;) {
207                 rc = ib_poll_cq(cq, 1, &wc);
208                 if (rc < 0) {
209                         dprintk("RPC:       %s: ib_poll_cq failed %i\n",
210                                 __func__, rc);
211                         return rc;
212                 }
213                 if (rc == 0)
214                         break;
215
216                 rpcrdma_event_process(&wc);
217         }
218
219         return 0;
220 }
221
222 /*
223  * rpcrdma_cq_event_upcall
224  *
225  * This upcall handles recv, send, bind and unbind events.
226  * It is reentrant but processes one event at a time, preserving the
227  * ordering of receives on which server credit accounting depends.
228  *
229  * It is the responsibility of the scheduled tasklet to return
230  * recv buffers to the pool. NOTE: this affects synchronization of
231  * connection shutdown. That is, the structures required for
232  * the completion of the reply handler must remain intact until
233  * all memory has been reclaimed.
234  *
235  * Note that send events are suppressed and do not result in an upcall.
236  */
237 static void
238 rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
239 {
240         int rc;
241
242         rc = rpcrdma_cq_poll(cq);
243         if (rc)
244                 return;
245
246         rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
247         if (rc) {
248                 dprintk("RPC:       %s: ib_req_notify_cq failed %i\n",
249                         __func__, rc);
250                 return;
251         }
252
253         rpcrdma_cq_poll(cq);
254 }
255
256 #ifdef RPC_DEBUG
257 static const char * const conn[] = {
258         "address resolved",
259         "address error",
260         "route resolved",
261         "route error",
262         "connect request",
263         "connect response",
264         "connect error",
265         "unreachable",
266         "rejected",
267         "established",
268         "disconnected",
269         "device removal"
270 };
271 #endif
272
273 static int
274 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
275 {
276         struct rpcrdma_xprt *xprt = id->context;
277         struct rpcrdma_ia *ia = &xprt->rx_ia;
278         struct rpcrdma_ep *ep = &xprt->rx_ep;
279         struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
280         struct ib_qp_attr attr;
281         struct ib_qp_init_attr iattr;
282         int connstate = 0;
283
284         switch (event->event) {
285         case RDMA_CM_EVENT_ADDR_RESOLVED:
286         case RDMA_CM_EVENT_ROUTE_RESOLVED:
287                 complete(&ia->ri_done);
288                 break;
289         case RDMA_CM_EVENT_ADDR_ERROR:
290                 ia->ri_async_rc = -EHOSTUNREACH;
291                 dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
292                         __func__, ep);
293                 complete(&ia->ri_done);
294                 break;
295         case RDMA_CM_EVENT_ROUTE_ERROR:
296                 ia->ri_async_rc = -ENETUNREACH;
297                 dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
298                         __func__, ep);
299                 complete(&ia->ri_done);
300                 break;
301         case RDMA_CM_EVENT_ESTABLISHED:
302                 connstate = 1;
303                 ib_query_qp(ia->ri_id->qp, &attr,
304                         IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
305                         &iattr);
306                 dprintk("RPC:       %s: %d responder resources"
307                         " (%d initiator)\n",
308                         __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
309                 goto connected;
310         case RDMA_CM_EVENT_CONNECT_ERROR:
311                 connstate = -ENOTCONN;
312                 goto connected;
313         case RDMA_CM_EVENT_UNREACHABLE:
314                 connstate = -ENETDOWN;
315                 goto connected;
316         case RDMA_CM_EVENT_REJECTED:
317                 connstate = -ECONNREFUSED;
318                 goto connected;
319         case RDMA_CM_EVENT_DISCONNECTED:
320                 connstate = -ECONNABORTED;
321                 goto connected;
322         case RDMA_CM_EVENT_DEVICE_REMOVAL:
323                 connstate = -ENODEV;
324 connected:
325                 dprintk("RPC:       %s: %s: %u.%u.%u.%u:%u"
326                         " (ep 0x%p event 0x%x)\n",
327                         __func__,
328                         (event->event <= 11) ? conn[event->event] :
329                                                 "unknown connection error",
330                         NIPQUAD(addr->sin_addr.s_addr),
331                         ntohs(addr->sin_port),
332                         ep, event->event);
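                /* Reset the credit count; the rm_credit field in the next
                 * reply received will refresh it (see the receive path above).
                 */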
333                 atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
334                 dprintk("RPC:       %s: %sconnected\n",
335                                         __func__, connstate > 0 ? "" : "dis");
336                 ep->rep_connected = connstate;
337                 ep->rep_func(ep);
338                 wake_up_all(&ep->rep_connect_wait);
339                 break;
340         default:
341                 ia->ri_async_rc = -EINVAL;
342                 dprintk("RPC:       %s: unexpected CM event %X\n",
343                         __func__, event->event);
344                 complete(&ia->ri_done);
345                 break;
346         }
347
348         return 0;
349 }
350
351 static struct rdma_cm_id *
352 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
353                         struct rpcrdma_ia *ia, struct sockaddr *addr)
354 {
355         struct rdma_cm_id *id;
356         int rc;
357
358         id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP);
359         if (IS_ERR(id)) {
360                 rc = PTR_ERR(id);
361                 dprintk("RPC:       %s: rdma_create_id() failed %i\n",
362                         __func__, rc);
363                 return id;
364         }
365
366         ia->ri_async_rc = 0;
367         rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
368         if (rc) {
369                 dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
370                         __func__, rc);
371                 goto out;
372         }
373         wait_for_completion(&ia->ri_done);
374         rc = ia->ri_async_rc;
375         if (rc)
376                 goto out;
377
378         ia->ri_async_rc = 0;
379         rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
380         if (rc) {
381                 dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
382                         __func__, rc);
383                 goto out;
384         }
385         wait_for_completion(&ia->ri_done);
386         rc = ia->ri_async_rc;
387         if (rc)
388                 goto out;
389
390         return id;
391
392 out:
393         rdma_destroy_id(id);
394         return ERR_PTR(rc);
395 }
396
397 /*
398  * Drain any cq, prior to teardown.
399  */
400 static void
401 rpcrdma_clean_cq(struct ib_cq *cq)
402 {
403         struct ib_wc wc;
404         int count = 0;
405
406         while (1 == ib_poll_cq(cq, 1, &wc))
407                 ++count;
408
409         if (count)
410                 dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
411                         __func__, count, wc.opcode);
412 }
413
414 /*
415  * Exported functions.
416  */
417
418 /*
419  * Open and initialize an Interface Adapter.
420  *  o initializes fields of struct rpcrdma_ia, including
421  *    interface and provider attributes and protection zone.
422  */
423 int
424 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
425 {
426         int rc;
427         struct rpcrdma_ia *ia = &xprt->rx_ia;
428
429         init_completion(&ia->ri_done);
430
431         ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
432         if (IS_ERR(ia->ri_id)) {
433                 rc = PTR_ERR(ia->ri_id);
434                 goto out1;
435         }
436
437         ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
438         if (IS_ERR(ia->ri_pd)) {
439                 rc = PTR_ERR(ia->ri_pd);
440                 dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
441                         __func__, rc);
442                 goto out2;
443         }
444
445         /*
446          * Optionally obtain an underlying physical identity mapping in
447          * order to do a memory window-based bind. This base registration
448          * is protected from remote access; remote access is enabled only
449          * by binding the specific bytes targeted during each RPC
450          * operation, and is revoked after the corresponding completion,
451          * similar to a storage adapter.
452          */
453         if (memreg > RPCRDMA_REGISTER) {
454                 int mem_priv = IB_ACCESS_LOCAL_WRITE;
455                 switch (memreg) {
456 #if RPCRDMA_PERSISTENT_REGISTRATION
457                 case RPCRDMA_ALLPHYSICAL:
458                         mem_priv |= IB_ACCESS_REMOTE_WRITE;
459                         mem_priv |= IB_ACCESS_REMOTE_READ;
460                         break;
461 #endif
462                 case RPCRDMA_MEMWINDOWS_ASYNC:
463                 case RPCRDMA_MEMWINDOWS:
464                         mem_priv |= IB_ACCESS_MW_BIND;
465                         break;
466                 default:
467                         break;
468                 }
469                 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
470                 if (IS_ERR(ia->ri_bind_mem)) {
471                         printk(KERN_ALERT "%s: ib_get_dma_mr for "
472                                 "phys register failed with %lX\n\t"
473                                 "Will continue with degraded performance\n",
474                                 __func__, PTR_ERR(ia->ri_bind_mem));
475                         memreg = RPCRDMA_REGISTER;
476                         ia->ri_bind_mem = NULL;
477                 }
478         }
479
480         /* Else will do memory reg/dereg for each chunk */
481         ia->ri_memreg_strategy = memreg;
482
483         return 0;
484 out2:
485         rdma_destroy_id(ia->ri_id);
486 out1:
487         return rc;
488 }
489
490 /*
491  * Clean up/close an IA.
492  *   o if event handles and PD have been initialized, free them.
493  *   o close the IA
494  */
495 void
496 rpcrdma_ia_close(struct rpcrdma_ia *ia)
497 {
498         int rc;
499
500         dprintk("RPC:       %s: entering\n", __func__);
501         if (ia->ri_bind_mem != NULL) {
502                 rc = ib_dereg_mr(ia->ri_bind_mem);
503                 dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
504                         __func__, rc);
505         }
506         if (ia->ri_id != NULL && !IS_ERR(ia->ri_id) && ia->ri_id->qp)
507                 rdma_destroy_qp(ia->ri_id);
508         if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
509                 rc = ib_dealloc_pd(ia->ri_pd);
510                 dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
511                         __func__, rc);
512         }
513         if (ia->ri_id != NULL && !IS_ERR(ia->ri_id))
514                 rdma_destroy_id(ia->ri_id);
515 }
516
517 /*
518  * Create unconnected endpoint.
519  */
520 int
521 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
522                                 struct rpcrdma_create_data_internal *cdata)
523 {
524         struct ib_device_attr devattr;
525         int rc;
526
527         rc = ib_query_device(ia->ri_id->device, &devattr);
528         if (rc) {
529                 dprintk("RPC:       %s: ib_query_device failed %d\n",
530                         __func__, rc);
531                 return rc;
532         }
533
534         /* check provider's send/recv wr limits */
535         if (cdata->max_requests > devattr.max_qp_wr)
536                 cdata->max_requests = devattr.max_qp_wr;
537
538         ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
539         ep->rep_attr.qp_context = ep;
540         /* send_cq and recv_cq initialized below */
541         ep->rep_attr.srq = NULL;
542         ep->rep_attr.cap.max_send_wr = cdata->max_requests;
543         switch (ia->ri_memreg_strategy) {
544         case RPCRDMA_MEMWINDOWS_ASYNC:
545         case RPCRDMA_MEMWINDOWS:
546                 /* Add room for mw_binds+unbinds - overkill! */
547                 ep->rep_attr.cap.max_send_wr++;
548                 ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
549                 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
550                         return -EINVAL;
551                 break;
552         default:
553                 break;
554         }
555         ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
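        /* Two send SGEs cover the RPC/RDMA header and the RPC message;
         * when a zero pad is in use, two more carry the pad and the
         * portion of the message that follows it.
         */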
556         ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
557         ep->rep_attr.cap.max_recv_sge = 1;
558         ep->rep_attr.cap.max_inline_data = 0;
559         ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
560         ep->rep_attr.qp_type = IB_QPT_RC;
561         ep->rep_attr.port_num = ~0;
562
563         dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
564                 "iovs: send %d recv %d\n",
565                 __func__,
566                 ep->rep_attr.cap.max_send_wr,
567                 ep->rep_attr.cap.max_recv_wr,
568                 ep->rep_attr.cap.max_send_sge,
569                 ep->rep_attr.cap.max_recv_sge);
570
571         /* set trigger for requesting send completion */
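        /* Sends are normally posted unsignaled; DECR_CQCOUNT() counts them
         * down and a signaled completion is requested only once every
         * rep_cqinit sends, bounding completion traffic on the shared CQ.
         */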
572         ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /*  - 1*/;
573         switch (ia->ri_memreg_strategy) {
574         case RPCRDMA_MEMWINDOWS_ASYNC:
575         case RPCRDMA_MEMWINDOWS:
576                 ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
577                 break;
578         default:
579                 break;
580         }
581         if (ep->rep_cqinit <= 2)
582                 ep->rep_cqinit = 0;
583         INIT_CQCOUNT(ep);
584         ep->rep_ia = ia;
585         init_waitqueue_head(&ep->rep_connect_wait);
586
587         /*
588          * Create a single cq for receive dto and mw_bind (only ever
589          * care about unbind, really). Send completions are suppressed.
590          * Use single threaded tasklet upcalls to maintain ordering.
591          */
592         ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
593                                   rpcrdma_cq_async_error_upcall, NULL,
594                                   ep->rep_attr.cap.max_recv_wr +
595                                   ep->rep_attr.cap.max_send_wr + 1, 0);
596         if (IS_ERR(ep->rep_cq)) {
597                 rc = PTR_ERR(ep->rep_cq);
598                 dprintk("RPC:       %s: ib_create_cq failed: %i\n",
599                         __func__, rc);
600                 goto out1;
601         }
602
603         rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
604         if (rc) {
605                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
606                         __func__, rc);
607                 goto out2;
608         }
609
610         ep->rep_attr.send_cq = ep->rep_cq;
611         ep->rep_attr.recv_cq = ep->rep_cq;
612
613         /* Initialize cma parameters */
614
615         /* RPC/RDMA does not use private data */
616         ep->rep_remote_cma.private_data = NULL;
617         ep->rep_remote_cma.private_data_len = 0;
618
619         /* Client offers RDMA Read but does not initiate */
620         switch (ia->ri_memreg_strategy) {
621         case RPCRDMA_BOUNCEBUFFERS:
622                 ep->rep_remote_cma.responder_resources = 0;
623                 break;
624         case RPCRDMA_MTHCAFMR:
625         case RPCRDMA_REGISTER:
626                 ep->rep_remote_cma.responder_resources = cdata->max_requests *
627                                 (RPCRDMA_MAX_DATA_SEGS / 8);
628                 break;
629         case RPCRDMA_MEMWINDOWS:
630         case RPCRDMA_MEMWINDOWS_ASYNC:
631 #if RPCRDMA_PERSISTENT_REGISTRATION
632         case RPCRDMA_ALLPHYSICAL:
633 #endif
634                 ep->rep_remote_cma.responder_resources = cdata->max_requests *
635                                 (RPCRDMA_MAX_DATA_SEGS / 2);
636                 break;
637         default:
638                 break;
639         }
640         if (ep->rep_remote_cma.responder_resources > devattr.max_qp_rd_atom)
641                 ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
642         ep->rep_remote_cma.initiator_depth = 0;
643
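        /* 7 is the maximum retry count allowed by the RDMA CM */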
644         ep->rep_remote_cma.retry_count = 7;
645         ep->rep_remote_cma.flow_control = 0;
646         ep->rep_remote_cma.rnr_retry_count = 0;
647
648         return 0;
649
650 out2:
651         if (ib_destroy_cq(ep->rep_cq))
652                 dprintk("RPC:       %s: ib_destroy_cq failed\n", __func__);
653 out1:
654         return rc;
655 }
656
657 /*
658  * rpcrdma_ep_destroy
659  *
660  * Disconnect and destroy endpoint. After this, the only
661  * valid operations on the ep are to free it (if dynamically
662  * allocated) or re-create it.
663  *
664  * The caller's error handling must be sure to not leak the endpoint
665  * if this function fails.
666  */
667 int
668 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
669 {
670         int rc;
671
672         dprintk("RPC:       %s: entering, connected is %d\n",
673                 __func__, ep->rep_connected);
674
675         if (ia->ri_id->qp) {
676                 rc = rpcrdma_ep_disconnect(ep, ia);
677                 if (rc)
678                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
679                                 " returned %i\n", __func__, rc);
680         }
681
682         ep->rep_func = NULL;
683
684         /* padding - could be done in rpcrdma_buffer_destroy... */
685         if (ep->rep_pad_mr) {
686                 rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
687                 ep->rep_pad_mr = NULL;
688         }
689
690         if (ia->ri_id->qp) {
691                 rdma_destroy_qp(ia->ri_id);
692                 ia->ri_id->qp = NULL;
693         }
694
695         rpcrdma_clean_cq(ep->rep_cq);
696         rc = ib_destroy_cq(ep->rep_cq);
697         if (rc)
698                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
699                         __func__, rc);
700
701         return rc;
702 }
703
704 /*
705  * Connect unconnected endpoint.
706  */
707 int
708 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
709 {
710         struct rdma_cm_id *id;
711         int rc = 0;
712         int retry_count = 0;
713         int reconnect = (ep->rep_connected != 0);
714
715         if (reconnect) {
716                 struct rpcrdma_xprt *xprt;
717 retry:
718                 rc = rpcrdma_ep_disconnect(ep, ia);
719                 if (rc && rc != -ENOTCONN)
720                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
721                                 " status %i\n", __func__, rc);
722                 rpcrdma_clean_cq(ep->rep_cq);
723
724                 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
725                 id = rpcrdma_create_id(xprt, ia,
726                                 (struct sockaddr *)&xprt->rx_data.addr);
727                 if (IS_ERR(id)) {
728                         rc = PTR_ERR(id);
729                         goto out;
730                 }
731                 /* TEMP TEMP TEMP - fail if new device:
732                  * Deregister/remarshal *all* requests!
733                  * Close and recreate adapter, pd, etc!
734                  * Re-determine all attributes still sane!
735                  * More stuff I haven't thought of!
736                  * Rrrgh!
737                  */
738                 if (ia->ri_id->device != id->device) {
739                         printk(KERN_ERR "RPC:       %s: can't reconnect on "
740                                 "different device!\n", __func__);
741                         rdma_destroy_id(id);
742                         rc = -ENETDOWN;
743                         goto out;
744                 }
745                 /* END TEMP */
746                 rdma_destroy_id(ia->ri_id);
747                 ia->ri_id = id;
748         }
749
750         rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
751         if (rc) {
752                 dprintk("RPC:       %s: rdma_create_qp failed %i\n",
753                         __func__, rc);
754                 goto out;
755         }
756
757 /* XXX Tavor device performs badly with 2K MTU! */
758 if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
759         struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
760         if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
761             (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
762              pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
763                 struct ib_qp_attr attr = {
764                         .path_mtu = IB_MTU_1024
765                 };
766                 rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
767         }
768 }
769
770         /* Theoretically a client initiator_depth > 0 is not needed,
771          * but many peers fail to complete the connection unless
772          * initiator_depth == responder_resources! */
773         if (ep->rep_remote_cma.initiator_depth !=
774                                 ep->rep_remote_cma.responder_resources)
775                 ep->rep_remote_cma.initiator_depth =
776                         ep->rep_remote_cma.responder_resources;
777
778         ep->rep_connected = 0;
779
780         rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
781         if (rc) {
782                 dprintk("RPC:       %s: rdma_connect() failed with %i\n",
783                                 __func__, rc);
784                 goto out;
785         }
786
787         if (reconnect)
788                 return 0;
789
790         wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
791
792         /*
793          * Check state. A non-peer reject indicates no listener
794          * (ECONNREFUSED), which may be a transient state. All
795          * others indicate a transport condition for which a
796          * best-effort recovery has already been attempted.
797          */
798         if (ep->rep_connected == -ECONNREFUSED
799             && ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
800                 dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
801                 goto retry;
802         }
803         if (ep->rep_connected <= 0) {
804                 /* Sometimes, the only way to reliably connect to remote
805                  * CMs is to use the same nonzero values for ORD and IRD. */
806                 ep->rep_remote_cma.initiator_depth =
807                                         ep->rep_remote_cma.responder_resources;
808                 if (ep->rep_remote_cma.initiator_depth == 0)
809                         ++ep->rep_remote_cma.initiator_depth;
810                 if (ep->rep_remote_cma.responder_resources == 0)
811                         ++ep->rep_remote_cma.responder_resources;
812                 if (retry_count++ == 0)
813                         goto retry;
814                 rc = ep->rep_connected;
815         } else {
816                 dprintk("RPC:       %s: connected\n", __func__);
817         }
818
819 out:
820         if (rc)
821                 ep->rep_connected = rc;
822         return rc;
823 }
824
825 /*
826  * rpcrdma_ep_disconnect
827  *
828  * This is separate from destroy to allow reconnecting without
829  * recreating the endpoint.
830  *
831  * This call is not reentrant, and must not be made in parallel
832  * on the same endpoint.
833  */
834 int
835 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
836 {
837         int rc;
838
839         rpcrdma_clean_cq(ep->rep_cq);
840         rc = rdma_disconnect(ia->ri_id);
841         if (!rc) {
842                 /* returns without wait if not connected */
843                 wait_event_interruptible(ep->rep_connect_wait,
844                                                         ep->rep_connected != 1);
845                 dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
846                         (ep->rep_connected == 1) ? "still " : "dis");
847         } else {
848                 dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
849                 ep->rep_connected = rc;
850         }
851         return rc;
852 }
853
854 /*
855  * Initialize buffer memory
856  */
857 int
858 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
859         struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
860 {
861         char *p;
862         size_t len;
863         int i, rc;
864
865         buf->rb_max_requests = cdata->max_requests;
866         spin_lock_init(&buf->rb_lock);
867         atomic_set(&buf->rb_credits, 1);
868
869         /* Need to allocate:
870          *   1.  arrays for send and recv pointers
871          *   2.  arrays of struct rpcrdma_req to fill in pointers
872          *   3.  array of struct rpcrdma_rep for replies
873          *   4.  padding, if any
874          *   5.  mw's, if any
875          * Send/recv buffers in req/rep need to be registered
876          */
877
878         len = buf->rb_max_requests *
879                 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
880         len += cdata->padding;
881         switch (ia->ri_memreg_strategy) {
882         case RPCRDMA_MTHCAFMR:
883                 /* TBD we are perhaps overallocating here */
884                 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
885                                 sizeof(struct rpcrdma_mw);
886                 break;
887         case RPCRDMA_MEMWINDOWS_ASYNC:
888         case RPCRDMA_MEMWINDOWS:
889                 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
890                                 sizeof(struct rpcrdma_mw);
891                 break;
892         default:
893                 break;
894         }
895
896         /* allocate 1, 4 and 5 in one shot */
897         p = kzalloc(len, GFP_KERNEL);
898         if (p == NULL) {
899                 dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
900                         __func__, len);
901                 rc = -ENOMEM;
902                 goto out;
903         }
904         buf->rb_pool = p;       /* for freeing it later */
905
906         buf->rb_send_bufs = (struct rpcrdma_req **) p;
907         p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
908         buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
909         p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
910
911         /*
912          * Register the zeroed pad buffer, if any.
913          */
914         if (cdata->padding) {
915                 rc = rpcrdma_register_internal(ia, p, cdata->padding,
916                                             &ep->rep_pad_mr, &ep->rep_pad);
917                 if (rc)
918                         goto out;
919         }
920         p += cdata->padding;
921
922         /*
923          * Allocate the fmr's, or mw's for mw_bind chunk registration.
924          * We "cycle" the mw's in order to minimize rkey reuse,
925          * and also reduce unbind-to-bind collision.
926          */
927         INIT_LIST_HEAD(&buf->rb_mws);
928         switch (ia->ri_memreg_strategy) {
929         case RPCRDMA_MTHCAFMR:
930                 {
931                 struct rpcrdma_mw *r = (struct rpcrdma_mw *)p;
932                 struct ib_fmr_attr fa = {
933                         RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT
934                 };
935                 /* TBD we are perhaps overallocating here */
936                 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
937                         r->r.fmr = ib_alloc_fmr(ia->ri_pd,
938                                 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
939                                 &fa);
940                         if (IS_ERR(r->r.fmr)) {
941                                 rc = PTR_ERR(r->r.fmr);
942                                 dprintk("RPC:       %s: ib_alloc_fmr"
943                                         " failed %i\n", __func__, rc);
944                                 goto out;
945                         }
946                         list_add(&r->mw_list, &buf->rb_mws);
947                         ++r;
948                 }
949                 }
950                 break;
951         case RPCRDMA_MEMWINDOWS_ASYNC:
952         case RPCRDMA_MEMWINDOWS:
953                 {
954                 struct rpcrdma_mw *r = (struct rpcrdma_mw *)p;
955                 /* Allocate one extra request's worth, for full cycling */
956                 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
957                         r->r.mw = ib_alloc_mw(ia->ri_pd);
958                         if (IS_ERR(r->r.mw)) {
959                                 rc = PTR_ERR(r->r.mw);
960                                 dprintk("RPC:       %s: ib_alloc_mw"
961                                         " failed %i\n", __func__, rc);
962                                 goto out;
963                         }
964                         list_add(&r->mw_list, &buf->rb_mws);
965                         ++r;
966                 }
967                 }
968                 break;
969         default:
970                 break;
971         }
972
973         /*
974          * Allocate/init the request/reply buffers. Doing this
975          * using kmalloc for now -- one for each buf.
976          */
977         for (i = 0; i < buf->rb_max_requests; i++) {
978                 struct rpcrdma_req *req;
979                 struct rpcrdma_rep *rep;
980
981                 len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
982                 /* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
983                 /* Typical ~2400b, so rounding up saves work later */
984                 if (len < 4096)
985                         len = 4096;
986                 req = kmalloc(len, GFP_KERNEL);
987                 if (req == NULL) {
988                         dprintk("RPC:       %s: request buffer %d alloc"
989                                 " failed\n", __func__, i);
990                         rc = -ENOMEM;
991                         goto out;
992                 }
993                 memset(req, 0, sizeof(struct rpcrdma_req));
994                 buf->rb_send_bufs[i] = req;
995                 buf->rb_send_bufs[i]->rl_buffer = buf;
996
997                 rc = rpcrdma_register_internal(ia, req->rl_base,
998                                 len - offsetof(struct rpcrdma_req, rl_base),
999                                 &buf->rb_send_bufs[i]->rl_handle,
1000                                 &buf->rb_send_bufs[i]->rl_iov);
1001                 if (rc)
1002                         goto out;
1003
1004                 buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1005
1006                 len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1007                 rep = kmalloc(len, GFP_KERNEL);
1008                 if (rep == NULL) {
1009                         dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1010                                 __func__, i);
1011                         rc = -ENOMEM;
1012                         goto out;
1013                 }
1014                 memset(rep, 0, sizeof(struct rpcrdma_rep));
1015                 buf->rb_recv_bufs[i] = rep;
1016                 buf->rb_recv_bufs[i]->rr_buffer = buf;
1017                 init_waitqueue_head(&rep->rr_unbind);
1018
1019                 rc = rpcrdma_register_internal(ia, rep->rr_base,
1020                                 len - offsetof(struct rpcrdma_rep, rr_base),
1021                                 &buf->rb_recv_bufs[i]->rr_handle,
1022                                 &buf->rb_recv_bufs[i]->rr_iov);
1023                 if (rc)
1024                         goto out;
1025
1026         }
1027         dprintk("RPC:       %s: max_requests %d\n",
1028                 __func__, buf->rb_max_requests);
1029         /* done */
1030         return 0;
1031 out:
1032         rpcrdma_buffer_destroy(buf);
1033         return rc;
1034 }
1035
1036 /*
1037  * Unregister and destroy buffer memory. Need to deal with
1038  * partial initialization, so it's callable from failed create.
1039  * Must be called before destroying endpoint, as registrations
1040  * reference it.
1041  */
1042 void
1043 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1044 {
1045         int rc, i;
1046         struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1047
1048         /* clean up in reverse order from create
1049          *   1.  recv mr memory (mr free, then kfree)
1050          *   1a. bind mw memory
1051          *   2.  send mr memory (mr free, then kfree)
1052          *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1053          *   4.  arrays
1054          */
1055         dprintk("RPC:       %s: entering\n", __func__);
1056
1057         for (i = 0; i < buf->rb_max_requests; i++) {
1058                 if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1059                         rpcrdma_deregister_internal(ia,
1060                                         buf->rb_recv_bufs[i]->rr_handle,
1061                                         &buf->rb_recv_bufs[i]->rr_iov);
1062                         kfree(buf->rb_recv_bufs[i]);
1063                 }
1064                 if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1065                         while (!list_empty(&buf->rb_mws)) {
1066                                 struct rpcrdma_mw *r;
1067                                 r = list_entry(buf->rb_mws.next,
1068                                         struct rpcrdma_mw, mw_list);
1069                                 list_del(&r->mw_list);
1070                                 switch (ia->ri_memreg_strategy) {
1071                                 case RPCRDMA_MTHCAFMR:
1072                                         rc = ib_dealloc_fmr(r->r.fmr);
1073                                         if (rc)
1074                                                 dprintk("RPC:       %s:"
1075                                                         " ib_dealloc_fmr"
1076                                                         " failed %i\n",
1077                                                         __func__, rc);
1078                                         break;
1079                                 case RPCRDMA_MEMWINDOWS_ASYNC:
1080                                 case RPCRDMA_MEMWINDOWS:
1081                                         rc = ib_dealloc_mw(r->r.mw);
1082                                         if (rc)
1083                                                 dprintk("RPC:       %s:"
1084                                                         " ib_dealloc_mw"
1085                                                         " failed %i\n",
1086                                                         __func__, rc);
1087                                         break;
1088                                 default:
1089                                         break;
1090                                 }
1091                         }
1092                         rpcrdma_deregister_internal(ia,
1093                                         buf->rb_send_bufs[i]->rl_handle,
1094                                         &buf->rb_send_bufs[i]->rl_iov);
1095                         kfree(buf->rb_send_bufs[i]);
1096                 }
1097         }
1098
1099         kfree(buf->rb_pool);
1100 }
1101
1102 /*
1103  * Get a set of request/reply buffers.
1104  *
1105  * Reply buffer (if needed) is attached to send buffer upon return.
1106  * Rule:
1107  *    rb_send_index and rb_recv_index MUST always be pointing to the
1108  *    *next* available buffer (non-NULL). They are incremented after
1109  *    removing buffers, and decremented *before* returning them.
1110  */
1111 struct rpcrdma_req *
1112 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1113 {
1114         struct rpcrdma_req *req;
1115         unsigned long flags;
1116
1117         spin_lock_irqsave(&buffers->rb_lock, flags);
1118         if (buffers->rb_send_index == buffers->rb_max_requests) {
1119                 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1120                 dprintk("RPC:       %s: out of request buffers\n", __func__);
1121                 return NULL;
1122         }
1123
1124         req = buffers->rb_send_bufs[buffers->rb_send_index];
1125         if (buffers->rb_send_index < buffers->rb_recv_index) {
1126                 dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1127                         __func__,
1128                         buffers->rb_recv_index - buffers->rb_send_index);
1129                 req->rl_reply = NULL;
1130         } else {
1131                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1132                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1133         }
1134         buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1135         if (!list_empty(&buffers->rb_mws)) {
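                /* Stock this request with one mw per segment so that chunk
                 * registration does not need to allocate; rpcrdma_buffer_put()
                 * cycles them back onto rb_mws.
                 */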
1136                 int i = RPCRDMA_MAX_SEGS - 1;
1137                 do {
1138                         struct rpcrdma_mw *r;
1139                         r = list_entry(buffers->rb_mws.next,
1140                                         struct rpcrdma_mw, mw_list);
1141                         list_del(&r->mw_list);
1142                         req->rl_segments[i].mr_chunk.rl_mw = r;
1143                 } while (--i >= 0);
1144         }
1145         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1146         return req;
1147 }
1148
1149 /*
1150  * Put request/reply buffers back into pool.
1151  * Pre-decrement counter/array index.
1152  */
1153 void
1154 rpcrdma_buffer_put(struct rpcrdma_req *req)
1155 {
1156         struct rpcrdma_buffer *buffers = req->rl_buffer;
1157         struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1158         int i;
1159         unsigned long flags;
1160
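        /* Any chunks registered for this request must already have been
         * deregistered before it is returned to the pool.
         */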
1161         BUG_ON(req->rl_nchunks != 0);
1162         spin_lock_irqsave(&buffers->rb_lock, flags);
1163         buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1164         req->rl_niovs = 0;
1165         if (req->rl_reply) {
1166                 buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1167                 init_waitqueue_head(&req->rl_reply->rr_unbind);
1168                 req->rl_reply->rr_func = NULL;
1169                 req->rl_reply = NULL;
1170         }
1171         switch (ia->ri_memreg_strategy) {
1172         case RPCRDMA_MTHCAFMR:
1173         case RPCRDMA_MEMWINDOWS_ASYNC:
1174         case RPCRDMA_MEMWINDOWS:
1175                 /*
1176                  * Cycle mw's back in reverse order, and "spin" them.
1177                  * This delays and scrambles reuse as much as possible.
1178                  */
1179                 i = 1;
1180                 do {
1181                         struct rpcrdma_mw **mw;
1182                         mw = &req->rl_segments[i].mr_chunk.rl_mw;
1183                         list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1184                         *mw = NULL;
1185                 } while (++i < RPCRDMA_MAX_SEGS);
1186                 list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1187                                         &buffers->rb_mws);
1188                 req->rl_segments[0].mr_chunk.rl_mw = NULL;
1189                 break;
1190         default:
1191                 break;
1192         }
1193         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1194 }
1195
1196 /*
1197  * Recover reply buffers from pool.
1198  * This happens when recovering from error conditions.
1199  * Post-increment counter/array index.
1200  */
1201 void
1202 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1203 {
1204         struct rpcrdma_buffer *buffers = req->rl_buffer;
1205         unsigned long flags;
1206
1207         if (req->rl_iov.length == 0)    /* special case xprt_rdma_allocate() */
1208                 buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1209         spin_lock_irqsave(&buffers->rb_lock, flags);
1210         if (buffers->rb_recv_index < buffers->rb_max_requests) {
1211                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1212                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1213         }
1214         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1215 }
1216
1217 /*
1218  * Put reply buffers back into pool when not attached to
1219  * request. This happens in error conditions, and when
1220  * aborting unbinds. Pre-decrement counter/array index.
1221  */
1222 void
1223 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1224 {
1225         struct rpcrdma_buffer *buffers = rep->rr_buffer;
1226         unsigned long flags;
1227
1228         rep->rr_func = NULL;
1229         spin_lock_irqsave(&buffers->rb_lock, flags);
1230         buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1231         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1232 }
1233
1234 /*
1235  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1236  */
1237
1238 int
1239 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1240                                 struct ib_mr **mrp, struct ib_sge *iov)
1241 {
1242         struct ib_phys_buf ipb;
1243         struct ib_mr *mr;
1244         int rc;
1245
1246         /*
1247          * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1248          */
1249         iov->addr = ib_dma_map_single(ia->ri_id->device,
1250                         va, len, DMA_BIDIRECTIONAL);
1251         iov->length = len;
1252
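        /*
         * If a DMA MR covering all of local memory was obtained when the
         * IA was opened (ri_bind_mem), use its lkey directly rather than
         * registering a separate MR for this buffer.
         */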
1253         if (ia->ri_bind_mem != NULL) {
1254                 *mrp = NULL;
1255                 iov->lkey = ia->ri_bind_mem->lkey;
1256                 return 0;
1257         }
1258
1259         ipb.addr = iov->addr;
1260         ipb.size = iov->length;
1261         mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1262                         IB_ACCESS_LOCAL_WRITE, &iov->addr);
1263
1264         dprintk("RPC:       %s: phys convert: 0x%llx "
1265                         "registered 0x%llx length %d\n",
1266                         __func__, (unsigned long long)ipb.addr,
1267                         (unsigned long long)iov->addr, len);
1268
1269         if (IS_ERR(mr)) {
1270                 *mrp = NULL;
1271                 rc = PTR_ERR(mr);
1272                 dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1273         } else {
1274                 *mrp = mr;
1275                 iov->lkey = mr->lkey;
1276                 rc = 0;
1277         }
1278
1279         return rc;
1280 }
1281
1282 int
1283 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1284                                 struct ib_mr *mr, struct ib_sge *iov)
1285 {
1286         int rc;
1287
1288         ib_dma_unmap_single(ia->ri_id->device,
1289                         iov->addr, iov->length, DMA_BIDIRECTIONAL);
1290
1291         if (NULL == mr)
1292                 return 0;
1293
1294         rc = ib_dereg_mr(mr);
1295         if (rc)
1296                 dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1297         return rc;
1298 }
1299
1300 /*
1301  * Wrappers for chunk registration, shared by read/write chunk code.
1302  */
1303
1304 static void
1305 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1306 {
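        /* "writing" means the peer will RDMA Write into this memory
         * (a write chunk), so data flows from the device into host memory.
         */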
1307         seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1308         seg->mr_dmalen = seg->mr_len;
1309         if (seg->mr_page)
1310                 seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1311                                 seg->mr_page, offset_in_page(seg->mr_offset),
1312                                 seg->mr_dmalen, seg->mr_dir);
1313         else
1314                 seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1315                                 seg->mr_offset,
1316                                 seg->mr_dmalen, seg->mr_dir);
1317 }
1318
1319 static void
1320 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1321 {
1322         if (seg->mr_page)
1323                 ib_dma_unmap_page(ia->ri_id->device,
1324                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1325         else
1326                 ib_dma_unmap_single(ia->ri_id->device,
1327                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1328 }
1329
1330 int
1331 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1332                         int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1333 {
1334         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1335         int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1336                                   IB_ACCESS_REMOTE_READ);
1337         struct rpcrdma_mr_seg *seg1 = seg;
1338         int i;
1339         int rc = 0;
1340
1341         switch (ia->ri_memreg_strategy) {
1342
1343 #if RPCRDMA_PERSISTENT_REGISTRATION
1344         case RPCRDMA_ALLPHYSICAL:
1345                 rpcrdma_map_one(ia, seg, writing);
1346                 seg->mr_rkey = ia->ri_bind_mem->rkey;
1347                 seg->mr_base = seg->mr_dma;
1348                 seg->mr_nsegs = 1;
1349                 nsegs = 1;
1350                 break;
1351 #endif
1352
1353         /* Registration using fast memory registration */
1354         case RPCRDMA_MTHCAFMR:
1355                 {
1356                 u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1357                 int len, pageoff = offset_in_page(seg->mr_offset);
1358                 seg1->mr_offset -= pageoff;     /* start of page */
1359                 seg1->mr_len += pageoff;
1360                 len = -pageoff;
1361                 if (nsegs > RPCRDMA_MAX_DATA_SEGS)
1362                         nsegs = RPCRDMA_MAX_DATA_SEGS;
1363                 for (i = 0; i < nsegs;) {
1364                         rpcrdma_map_one(ia, seg, writing);
1365                         physaddrs[i] = seg->mr_dma;
1366                         len += seg->mr_len;
1367                         ++seg;
1368                         ++i;
1369                         /* Check for holes */
1370                         if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
1371                             offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1372                                 break;
1373                 }
1374                 nsegs = i;
1375                 rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1376                                         physaddrs, nsegs, seg1->mr_dma);
1377                 if (rc) {
1378                         dprintk("RPC:       %s: failed ib_map_phys_fmr "
1379                                 "%u@0x%llx+%i (%d)... status %i\n", __func__,
1380                                 len, (unsigned long long)seg1->mr_dma,
1381                                 pageoff, nsegs, rc);
1382                         while (nsegs--)
1383                                 rpcrdma_unmap_one(ia, --seg);
1384                 } else {
1385                         seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1386                         seg1->mr_base = seg1->mr_dma + pageoff;
1387                         seg1->mr_nsegs = nsegs;
1388                         seg1->mr_len = len;
1389                 }
1390                 }
1391                 break;
1392
1393         /* Registration using memory windows */
1394         case RPCRDMA_MEMWINDOWS_ASYNC:
1395         case RPCRDMA_MEMWINDOWS:
1396                 {
1397                 struct ib_mw_bind param;
1398                 rpcrdma_map_one(ia, seg, writing);
1399                 param.mr = ia->ri_bind_mem;
1400                 param.wr_id = 0ULL;     /* no send cookie */
1401                 param.addr = seg->mr_dma;
1402                 param.length = seg->mr_len;
1403                 param.send_flags = 0;
1404                 param.mw_access_flags = mem_priv;
1405
1406                 DECR_CQCOUNT(&r_xprt->rx_ep);
1407                 rc = ib_bind_mw(ia->ri_id->qp,
1408                                         seg->mr_chunk.rl_mw->r.mw, &param);
1409                 if (rc) {
1410                         dprintk("RPC:       %s: failed ib_bind_mw "
1411                                 "%u@0x%llx status %i\n",
1412                                 __func__, seg->mr_len,
1413                                 (unsigned long long)seg->mr_dma, rc);
1414                         rpcrdma_unmap_one(ia, seg);
1415                 } else {
1416                         seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
1417                         seg->mr_base = param.addr;
1418                         seg->mr_nsegs = 1;
1419                         nsegs = 1;
1420                 }
1421                 }
1422                 break;
1423
1424         /* Default: register (and later destroy) a physical MR for each request */
1425         default:
1426                 {
1427                 struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
1428                 int len = 0;
1429                 if (nsegs > RPCRDMA_MAX_DATA_SEGS)
1430                         nsegs = RPCRDMA_MAX_DATA_SEGS;
1431                 for (i = 0; i < nsegs;) {
1432                         rpcrdma_map_one(ia, seg, writing);
1433                         ipb[i].addr = seg->mr_dma;
1434                         ipb[i].size = seg->mr_len;
1435                         len += seg->mr_len;
1436                         ++seg;
1437                         ++i;
1438                         /* Stop at the first hole, as in the FMR case above */
1439                         if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
1440                             offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1441                                 break;
1442                 }
1443                 nsegs = i;
1444                 seg1->mr_base = seg1->mr_dma;
1445                 seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
1446                                         ipb, nsegs, mem_priv, &seg1->mr_base);
1447                 if (IS_ERR(seg1->mr_chunk.rl_mr)) {
1448                         rc = PTR_ERR(seg1->mr_chunk.rl_mr);
1449                         dprintk("RPC:       %s: failed ib_reg_phys_mr "
1450                                 "%d@0x%llx (%d)... status %i\n",
1451                                 __func__, len,
1452                                 (unsigned long long)seg1->mr_dma, nsegs, rc);
1453                         while (nsegs--)
1454                                 rpcrdma_unmap_one(ia, --seg);
1455                 } else {
1456                         seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
1457                         seg1->mr_nsegs = nsegs;
1458                         seg1->mr_len = len;
1459                 }
1460                 }
1461                 break;
1462         }
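        /* On success, return the number of segments coalesced into this
         * registration; on failure return -1 (any partial mappings were
         * already unwound in the error paths above). */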
1463         if (rc)
1464                 return -1;
1465
1466         return nsegs;
1467 }
1468
1469 int
1470 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1471                 struct rpcrdma_xprt *r_xprt, void *r)
1472 {
1473         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1474         struct rpcrdma_mr_seg *seg1 = seg;
1475         int nsegs = seg->mr_nsegs, rc;
1476
1477         switch (ia->ri_memreg_strategy) {
1478
1479 #if RPCRDMA_PERSISTENT_REGISTRATION
1480         case RPCRDMA_ALLPHYSICAL:
1481                 BUG_ON(nsegs != 1);
1482                 rpcrdma_unmap_one(ia, seg);
1483                 rc = 0;
1484                 break;
1485 #endif
1486
1487         case RPCRDMA_MTHCAFMR:
1488                 {
1489                 LIST_HEAD(l);
1490                 list_add(&seg->mr_chunk.rl_mw->r.fmr->list, &l);
1491                 rc = ib_unmap_fmr(&l);
1492                 while (seg1->mr_nsegs--)
1493                         rpcrdma_unmap_one(ia, seg++);
1494                 }
1495                 if (rc)
1496                         dprintk("RPC:       %s: failed ib_unmap_fmr,"
1497                                 " status %i\n", __func__, rc);
1498                 break;
1499
1500         case RPCRDMA_MEMWINDOWS_ASYNC:
1501         case RPCRDMA_MEMWINDOWS:
1502                 {
1503                 struct ib_mw_bind param;
1504                 BUG_ON(nsegs != 1);
1505                 param.mr = ia->ri_bind_mem;
1506                 param.addr = 0ULL;      /* unbind */
1507                 param.length = 0;
1508                 param.mw_access_flags = 0;
1509                 if (r) {
1510                         param.wr_id = (u64) (unsigned long) r;
1511                         param.send_flags = IB_SEND_SIGNALED;
1512                         INIT_CQCOUNT(&r_xprt->rx_ep);
1513                 } else {
1514                         param.wr_id = 0ULL;
1515                         param.send_flags = 0;
1516                         DECR_CQCOUNT(&r_xprt->rx_ep);
1517                 }
1518                 rc = ib_bind_mw(ia->ri_id->qp,
1519                                 seg->mr_chunk.rl_mw->r.mw, &param);
1520                 rpcrdma_unmap_one(ia, seg);
1521                 }
1522                 if (rc)
1523                         dprintk("RPC:       %s: failed ib_(un)bind_mw,"
1524                                 " status %i\n", __func__, rc);
1525                 else
1526                         r = NULL;       /* will upcall on completion */
1527                 break;
1528
1529         default:
1530                 rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
1531                 seg1->mr_chunk.rl_mr = NULL;
1532                 while (seg1->mr_nsegs--)
1533                         rpcrdma_unmap_one(ia, seg++);
1534                 if (rc)
1535                         dprintk("RPC:       %s: failed ib_dereg_mr,"
1536                                 " status %i\n", __func__, rc);
1537                 break;
1538         }
1539         if (r) {
1540                 struct rpcrdma_rep *rep = r;
1541                 void (*func)(struct rpcrdma_rep *) = rep->rr_func;
1542                 rep->rr_func = NULL;
1543                 func(rep);      /* dereg done, callback now */
1544         }
1545         return nsegs;
1546 }
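/*
 * A minimal usage sketch (illustrative only): rpcrdma_example_chunk() and
 * use_chunk() are hypothetical names, not existing symbols, and the sketch
 * assumes the rpcrdma_register_external() signature declared earlier in
 * this file (seg, nsegs, writing, r_xprt).  Passing NULL as the third
 * argument to rpcrdma_deregister_external() requests a synchronous
 * teardown with no reply upcall.
 *
 *	static int rpcrdma_example_chunk(struct rpcrdma_xprt *r_xprt,
 *					 struct rpcrdma_mr_seg *seg,
 *					 int nsegs, int writing)
 *	{
 *		int n;
 *
 *		n = rpcrdma_register_external(seg, nsegs, writing, r_xprt);
 *		if (n <= 0)
 *			return -EIO;
 *
 *		use_chunk(seg->mr_rkey, seg->mr_base, seg->mr_len);
 *
 *		rpcrdma_deregister_external(seg, r_xprt, NULL);
 *		return n;
 *	}
 */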
1547
1548 /*
1549  * Prepost any receive buffer, then post send.
1550  *
1551  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1552  */
1553 int
1554 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1555                 struct rpcrdma_ep *ep,
1556                 struct rpcrdma_req *req)
1557 {
1558         struct ib_send_wr send_wr, *send_wr_fail;
1559         struct rpcrdma_rep *rep = req->rl_reply;
1560         int rc;
1561
1562         if (rep) {
1563                 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1564                 if (rc)
1565                         goto out;
1566                 req->rl_reply = NULL;
1567         }
1568
1569         send_wr.next = NULL;
1570         send_wr.wr_id = 0ULL;   /* no send cookie */
1571         send_wr.sg_list = req->rl_send_iov;
1572         send_wr.num_sge = req->rl_niovs;
1573         send_wr.opcode = IB_WR_SEND;
1574         send_wr.imm_data = 0;
1575         if (send_wr.num_sge == 4)       /* pad at [2] is constant, never synced; sync trailing [3] */
1576                 ib_dma_sync_single_for_device(ia->ri_id->device,
1577                         req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1578                         DMA_TO_DEVICE);
1579         ib_dma_sync_single_for_device(ia->ri_id->device,
1580                 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1581                 DMA_TO_DEVICE);
1582         ib_dma_sync_single_for_device(ia->ri_id->device,
1583                 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1584                 DMA_TO_DEVICE);
1585
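        /* Send-completion budget: leave sends unsignaled while the CQCOUNT
         * budget lasts, then reset it and request one signaled send so the
         * provider can reap completed send WRs. */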
1586         if (DECR_CQCOUNT(ep) > 0)
1587                 send_wr.send_flags = 0;
1588         else { /* Provider must take a send completion every now and then */
1589                 INIT_CQCOUNT(ep);
1590                 send_wr.send_flags = IB_SEND_SIGNALED;
1591         }
1592
1593         rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1594         if (rc)
1595                 dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1596                         rc);
1597 out:
1598         return rc;
1599 }
1600
1601 /*
1602  * (Re)post a receive buffer.
1603  */
1604 int
1605 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1606                      struct rpcrdma_ep *ep,
1607                      struct rpcrdma_rep *rep)
1608 {
1609         struct ib_recv_wr recv_wr, *recv_wr_fail;
1610         int rc;
1611
1612         recv_wr.next = NULL;
1613         recv_wr.wr_id = (u64) (unsigned long) rep;
1614         recv_wr.sg_list = &rep->rr_iov;
1615         recv_wr.num_sge = 1;
1616
1617         ib_dma_sync_single_for_cpu(ia->ri_id->device,
1618                 rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1619
1620         DECR_CQCOUNT(ep);
1621         rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1622
1623         if (rc)
1624                 dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1625                         rc);
1626         return rc;
1627 }
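/*
 * A minimal re-posting sketch (illustrative; rpcrdma_example_recycle() is
 * a hypothetical helper, not an existing symbol): once a reply has been
 * processed, the same rpcrdma_rep can be donated back to the hardware
 * with rpcrdma_ep_post_recv().
 *
 *	static void rpcrdma_example_recycle(struct rpcrdma_xprt *r_xprt,
 *					    struct rpcrdma_rep *rep)
 *	{
 *		if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
 *			dprintk("RPC:       %s: repost failed\n", __func__);
 *	}
 */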