d1389afc8342270a5df2ff7cc2e3177a33f7ea7e
[safe/jmp/linux-2.6] / net / sunrpc / xprtrdma / transport.c
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39
40 /*
41  * transport.c
42  *
43  * This file contains the top-level implementation of an RPC RDMA
44  * transport.
45  *
46  * Naming convention: functions beginning with xprt_ are part of the
47  * transport switch. All others are RPC RDMA internal.
48  */
49
50 #include <linux/module.h>
51 #include <linux/init.h>
52 #include <linux/seq_file.h>
53
54 #include "xprt_rdma.h"
55
56 #ifdef RPC_DEBUG
57 # define RPCDBG_FACILITY        RPCDBG_TRANS
58 #endif
59
60 MODULE_LICENSE("Dual BSD/GPL");
61
62 MODULE_DESCRIPTION("RPC/RDMA Transport for Linux kernel NFS");
63 MODULE_AUTHOR("Network Appliance, Inc.");
64
65 /*
66  * tunables
67  */
68
69 static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
70 static unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
71 static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
72 static unsigned int xprt_rdma_inline_write_padding;
73 #if !RPCRDMA_PERSISTENT_REGISTRATION
74 static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_REGISTER; /* FMR? */
75 #else
76 static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_ALLPHYSICAL;
77 #endif
78
#ifdef RPC_DEBUG

/* Limits referenced by the .extra1/.extra2 members of the entries below */
static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
static unsigned int zero;
static unsigned int max_padding = PAGE_SIZE;
static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
static unsigned int max_memreg = RPCRDMA_LAST - 1;

/* Set by xprt_rdma_init(), cleared again by xprt_rdma_cleanup() */
static struct ctl_table_header *sunrpc_table_header;

/* One entry per tunable; children of the "sunrpc" directory below */
static ctl_table xr_tunables_table[] = {
	{	/* slot table size, range-checked */
		.procname	= "rdma_slot_table_entries",
		.ctl_name	= CTL_UNNUMBERED,
		.mode		= 0644,
		.data		= &xprt_rdma_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{	/* inline read limit, no range check */
		.procname	= "rdma_max_inline_read",
		.ctl_name	= CTL_UNNUMBERED,
		.mode		= 0644,
		.data		= &xprt_rdma_max_inline_read,
		.maxlen		= sizeof(unsigned int),
		.proc_handler	= &proc_dointvec,
		.strategy	= &sysctl_intvec,
	},
	{	/* inline write limit, no range check */
		.procname	= "rdma_max_inline_write",
		.ctl_name	= CTL_UNNUMBERED,
		.mode		= 0644,
		.data		= &xprt_rdma_max_inline_write,
		.maxlen		= sizeof(unsigned int),
		.proc_handler	= &proc_dointvec,
		.strategy	= &sysctl_intvec,
	},
	{	/* inline write padding, 0 .. PAGE_SIZE */
		.procname	= "rdma_inline_write_padding",
		.ctl_name	= CTL_UNNUMBERED,
		.mode		= 0644,
		.data		= &xprt_rdma_inline_write_padding,
		.maxlen		= sizeof(unsigned int),
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &zero,
		.extra2		= &max_padding,
	},
	{	/* memory registration strategy, range-checked */
		.procname	= "rdma_memreg_strategy",
		.ctl_name	= CTL_UNNUMBERED,
		.mode		= 0644,
		.data		= &xprt_rdma_memreg_strategy,
		.maxlen		= sizeof(unsigned int),
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &min_memreg,
		.extra2		= &max_memreg,
	},
	{	/* terminator */
		.ctl_name = 0,
	},
};

/* Top-level "sunrpc" directory that hosts the tunables above */
static ctl_table sunrpc_table[] = {
	{
		.procname	= "sunrpc",
		.ctl_name	= CTL_SUNRPC,
		.mode		= 0555,
		.child		= xr_tunables_table
	},
	{	/* terminator */
		.ctl_name = 0,
	},
};

#endif
160
161 static struct rpc_xprt_ops xprt_rdma_procs;     /* forward reference */
162
163 static void
164 xprt_rdma_format_addresses(struct rpc_xprt *xprt)
165 {
166         struct sockaddr_in *addr = (struct sockaddr_in *)
167                                         &rpcx_to_rdmad(xprt).addr;
168         char *buf;
169
170         buf = kzalloc(20, GFP_KERNEL);
171         if (buf)
172                 snprintf(buf, 20, NIPQUAD_FMT, NIPQUAD(addr->sin_addr.s_addr));
173         xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
174
175         buf = kzalloc(8, GFP_KERNEL);
176         if (buf)
177                 snprintf(buf, 8, "%u", ntohs(addr->sin_port));
178         xprt->address_strings[RPC_DISPLAY_PORT] = buf;
179
180         xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";
181
182         buf = kzalloc(48, GFP_KERNEL);
183         if (buf)
184                 snprintf(buf, 48, "addr="NIPQUAD_FMT" port=%u proto=%s",
185                         NIPQUAD(addr->sin_addr.s_addr),
186                         ntohs(addr->sin_port), "rdma");
187         xprt->address_strings[RPC_DISPLAY_ALL] = buf;
188
189         buf = kzalloc(10, GFP_KERNEL);
190         if (buf)
191                 snprintf(buf, 10, "%02x%02x%02x%02x",
192                         NIPQUAD(addr->sin_addr.s_addr));
193         xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
194
195         buf = kzalloc(8, GFP_KERNEL);
196         if (buf)
197                 snprintf(buf, 8, "%4hx", ntohs(addr->sin_port));
198         xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
199
200         buf = kzalloc(30, GFP_KERNEL);
201         if (buf)
202                 snprintf(buf, 30, NIPQUAD_FMT".%u.%u",
203                         NIPQUAD(addr->sin_addr.s_addr),
204                         ntohs(addr->sin_port) >> 8,
205                         ntohs(addr->sin_port) & 0xff);
206         xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
207
208         /* netid */
209         xprt->address_strings[RPC_DISPLAY_NETID] = "rdma";
210 }
211
212 static void
213 xprt_rdma_free_addresses(struct rpc_xprt *xprt)
214 {
215         kfree(xprt->address_strings[RPC_DISPLAY_ADDR]);
216         kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
217         kfree(xprt->address_strings[RPC_DISPLAY_ALL]);
218         kfree(xprt->address_strings[RPC_DISPLAY_HEX_ADDR]);
219         kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
220         kfree(xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR]);
221 }
222
223 static void
224 xprt_rdma_connect_worker(struct work_struct *work)
225 {
226         struct rpcrdma_xprt *r_xprt =
227                 container_of(work, struct rpcrdma_xprt, rdma_connect.work);
228         struct rpc_xprt *xprt = &r_xprt->xprt;
229         int rc = 0;
230
231         if (!xprt->shutdown) {
232                 xprt_clear_connected(xprt);
233
234                 dprintk("RPC:       %s: %sconnect\n", __func__,
235                                 r_xprt->rx_ep.rep_connected != 0 ? "re" : "");
236                 rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
237                 if (rc)
238                         goto out;
239         }
240         goto out_clear;
241
242 out:
243         xprt_wake_pending_tasks(xprt, rc);
244
245 out_clear:
246         dprintk("RPC:       %s: exit\n", __func__);
247         xprt_clear_connecting(xprt);
248 }
249
250 /*
251  * xprt_rdma_destroy
252  *
253  * Destroy the xprt.
254  * Free all memory associated with the object, including its own.
255  * NOTE: none of the *destroy methods free memory for their top-level
256  * objects, even though they may have allocated it (they do free
257  * private memory). It's up to the caller to handle it. In this
258  * case (RDMA transport), all structure memory is inlined with the
259  * struct rpcrdma_xprt.
260  */
261 static void
262 xprt_rdma_destroy(struct rpc_xprt *xprt)
263 {
264         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
265         int rc;
266
267         dprintk("RPC:       %s: called\n", __func__);
268
269         cancel_delayed_work(&r_xprt->rdma_connect);
270         flush_scheduled_work();
271
272         xprt_clear_connected(xprt);
273
274         rpcrdma_buffer_destroy(&r_xprt->rx_buf);
275         rc = rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
276         if (rc)
277                 dprintk("RPC:       %s: rpcrdma_ep_destroy returned %i\n",
278                         __func__, rc);
279         rpcrdma_ia_close(&r_xprt->rx_ia);
280
281         xprt_rdma_free_addresses(xprt);
282
283         kfree(xprt->slot);
284         xprt->slot = NULL;
285         kfree(xprt);
286
287         dprintk("RPC:       %s: returning\n", __func__);
288
289         module_put(THIS_MODULE);
290 }
291
292 static const struct rpc_timeout xprt_rdma_default_timeout = {
293         .to_initval = 60 * HZ,
294         .to_maxval = 60 * HZ,
295 };
296
297 /**
298  * xprt_setup_rdma - Set up transport to use RDMA
299  *
300  * @args: rpc transport arguments
301  */
302 static struct rpc_xprt *
303 xprt_setup_rdma(struct xprt_create *args)
304 {
305         struct rpcrdma_create_data_internal cdata;
306         struct rpc_xprt *xprt;
307         struct rpcrdma_xprt *new_xprt;
308         struct rpcrdma_ep *new_ep;
309         struct sockaddr_in *sin;
310         int rc;
311
312         if (args->addrlen > sizeof(xprt->addr)) {
313                 dprintk("RPC:       %s: address too large\n", __func__);
314                 return ERR_PTR(-EBADF);
315         }
316
317         xprt = kzalloc(sizeof(struct rpcrdma_xprt), GFP_KERNEL);
318         if (xprt == NULL) {
319                 dprintk("RPC:       %s: couldn't allocate rpcrdma_xprt\n",
320                         __func__);
321                 return ERR_PTR(-ENOMEM);
322         }
323
324         xprt->max_reqs = xprt_rdma_slot_table_entries;
325         xprt->slot = kcalloc(xprt->max_reqs,
326                                 sizeof(struct rpc_rqst), GFP_KERNEL);
327         if (xprt->slot == NULL) {
328                 dprintk("RPC:       %s: couldn't allocate %d slots\n",
329                         __func__, xprt->max_reqs);
330                 kfree(xprt);
331                 return ERR_PTR(-ENOMEM);
332         }
333
334         /* 60 second timeout, no retries */
335         xprt->timeout = &xprt_rdma_default_timeout;
336         xprt->bind_timeout = (60U * HZ);
337         xprt->connect_timeout = (60U * HZ);
338         xprt->reestablish_timeout = (5U * HZ);
339         xprt->idle_timeout = (5U * 60 * HZ);
340
341         xprt->resvport = 0;             /* privileged port not needed */
342         xprt->tsh_size = 0;             /* RPC-RDMA handles framing */
343         xprt->max_payload = RPCRDMA_MAX_DATA_SEGS * PAGE_SIZE;
344         xprt->ops = &xprt_rdma_procs;
345
346         /*
347          * Set up RDMA-specific connect data.
348          */
349
350         /* Put server RDMA address in local cdata */
351         memcpy(&cdata.addr, args->dstaddr, args->addrlen);
352
353         /* Ensure xprt->addr holds valid server TCP (not RDMA)
354          * address, for any side protocols which peek at it */
355         xprt->prot = IPPROTO_TCP;
356         xprt->addrlen = args->addrlen;
357         memcpy(&xprt->addr, &cdata.addr, xprt->addrlen);
358
359         sin = (struct sockaddr_in *)&cdata.addr;
360         if (ntohs(sin->sin_port) != 0)
361                 xprt_set_bound(xprt);
362
363         dprintk("RPC:       %s: %u.%u.%u.%u:%u\n", __func__,
364                         NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port));
365
366         /* Set max requests */
367         cdata.max_requests = xprt->max_reqs;
368
369         /* Set some length limits */
370         cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */
371         cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */
372
373         cdata.inline_wsize = xprt_rdma_max_inline_write;
374         if (cdata.inline_wsize > cdata.wsize)
375                 cdata.inline_wsize = cdata.wsize;
376
377         cdata.inline_rsize = xprt_rdma_max_inline_read;
378         if (cdata.inline_rsize > cdata.rsize)
379                 cdata.inline_rsize = cdata.rsize;
380
381         cdata.padding = xprt_rdma_inline_write_padding;
382
383         /*
384          * Create new transport instance, which includes initialized
385          *  o ia
386          *  o endpoint
387          *  o buffers
388          */
389
390         new_xprt = rpcx_to_rdmax(xprt);
391
392         rc = rpcrdma_ia_open(new_xprt, (struct sockaddr *) &cdata.addr,
393                                 xprt_rdma_memreg_strategy);
394         if (rc)
395                 goto out1;
396
397         /*
398          * initialize and create ep
399          */
400         new_xprt->rx_data = cdata;
401         new_ep = &new_xprt->rx_ep;
402         new_ep->rep_remote_addr = cdata.addr;
403
404         rc = rpcrdma_ep_create(&new_xprt->rx_ep,
405                                 &new_xprt->rx_ia, &new_xprt->rx_data);
406         if (rc)
407                 goto out2;
408
409         /*
410          * Allocate pre-registered send and receive buffers for headers and
411          * any inline data. Also specify any padding which will be provided
412          * from a preregistered zero buffer.
413          */
414         rc = rpcrdma_buffer_create(&new_xprt->rx_buf, new_ep, &new_xprt->rx_ia,
415                                 &new_xprt->rx_data);
416         if (rc)
417                 goto out3;
418
419         /*
420          * Register a callback for connection events. This is necessary because
421          * connection loss notification is async. We also catch connection loss
422          * when reaping receives.
423          */
424         INIT_DELAYED_WORK(&new_xprt->rdma_connect, xprt_rdma_connect_worker);
425         new_ep->rep_func = rpcrdma_conn_func;
426         new_ep->rep_xprt = xprt;
427
428         xprt_rdma_format_addresses(xprt);
429
430         if (!try_module_get(THIS_MODULE))
431                 goto out4;
432
433         return xprt;
434
435 out4:
436         xprt_rdma_free_addresses(xprt);
437         rc = -EINVAL;
438 out3:
439         (void) rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
440 out2:
441         rpcrdma_ia_close(&new_xprt->rx_ia);
442 out1:
443         kfree(xprt->slot);
444         kfree(xprt);
445         return ERR_PTR(rc);
446 }
447
448 /*
449  * Close a connection, during shutdown or timeout/reconnect
450  */
451 static void
452 xprt_rdma_close(struct rpc_xprt *xprt)
453 {
454         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
455
456         dprintk("RPC:       %s: closing\n", __func__);
457         xprt_disconnect_done(xprt);
458         (void) rpcrdma_ep_disconnect(&r_xprt->rx_ep, &r_xprt->rx_ia);
459 }
460
461 static void
462 xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
463 {
464         struct sockaddr_in *sap;
465
466         sap = (struct sockaddr_in *)&xprt->addr;
467         sap->sin_port = htons(port);
468         sap = (struct sockaddr_in *)&rpcx_to_rdmad(xprt).addr;
469         sap->sin_port = htons(port);
470         dprintk("RPC:       %s: %u\n", __func__, port);
471 }
472
473 static void
474 xprt_rdma_connect(struct rpc_task *task)
475 {
476         struct rpc_xprt *xprt = (struct rpc_xprt *)task->tk_xprt;
477         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
478
479         if (!xprt_test_and_set_connecting(xprt)) {
480                 if (r_xprt->rx_ep.rep_connected != 0) {
481                         /* Reconnect */
482                         schedule_delayed_work(&r_xprt->rdma_connect,
483                                 xprt->reestablish_timeout);
484                 } else {
485                         schedule_delayed_work(&r_xprt->rdma_connect, 0);
486                         if (!RPC_IS_ASYNC(task))
487                                 flush_scheduled_work();
488                 }
489         }
490 }
491
492 static int
493 xprt_rdma_reserve_xprt(struct rpc_task *task)
494 {
495         struct rpc_xprt *xprt = task->tk_xprt;
496         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
497         int credits = atomic_read(&r_xprt->rx_buf.rb_credits);
498
499         /* == RPC_CWNDSCALE @ init, but *after* setup */
500         if (r_xprt->rx_buf.rb_cwndscale == 0UL) {
501                 r_xprt->rx_buf.rb_cwndscale = xprt->cwnd;
502                 dprintk("RPC:       %s: cwndscale %lu\n", __func__,
503                         r_xprt->rx_buf.rb_cwndscale);
504                 BUG_ON(r_xprt->rx_buf.rb_cwndscale <= 0);
505         }
506         xprt->cwnd = credits * r_xprt->rx_buf.rb_cwndscale;
507         return xprt_reserve_xprt_cong(task);
508 }
509
510 /*
511  * The RDMA allocate/free functions need the task structure as a place
512  * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
513  * sequence. For this reason, the recv buffers are attached to send
514  * buffers for portions of the RPC. Note that the RPC layer allocates
515  * both send and receive buffers in the same call. We may register
516  * the receive buffer portion when using reply chunks.
517  */
518 static void *
519 xprt_rdma_allocate(struct rpc_task *task, size_t size)
520 {
521         struct rpc_xprt *xprt = task->tk_xprt;
522         struct rpcrdma_req *req, *nreq;
523
524         req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
525         BUG_ON(NULL == req);
526
527         if (size > req->rl_size) {
528                 dprintk("RPC:       %s: size %zd too large for buffer[%zd]: "
529                         "prog %d vers %d proc %d\n",
530                         __func__, size, req->rl_size,
531                         task->tk_client->cl_prog, task->tk_client->cl_vers,
532                         task->tk_msg.rpc_proc->p_proc);
533                 /*
534                  * Outgoing length shortage. Our inline write max must have
535                  * been configured to perform direct i/o.
536                  *
537                  * This is therefore a large metadata operation, and the
538                  * allocate call was made on the maximum possible message,
539                  * e.g. containing long filename(s) or symlink data. In
540                  * fact, while these metadata operations *might* carry
541                  * large outgoing payloads, they rarely *do*. However, we
542                  * have to commit to the request here, so reallocate and
543                  * register it now. The data path will never require this
544                  * reallocation.
545                  *
546                  * If the allocation or registration fails, the RPC framework
547                  * will (doggedly) retry.
548                  */
549                 if (rpcx_to_rdmax(xprt)->rx_ia.ri_memreg_strategy ==
550                                 RPCRDMA_BOUNCEBUFFERS) {
551                         /* forced to "pure inline" */
552                         dprintk("RPC:       %s: too much data (%zd) for inline "
553                                         "(r/w max %d/%d)\n", __func__, size,
554                                         rpcx_to_rdmad(xprt).inline_rsize,
555                                         rpcx_to_rdmad(xprt).inline_wsize);
556                         size = req->rl_size;
557                         rpc_exit(task, -EIO);           /* fail the operation */
558                         rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
559                         goto out;
560                 }
561                 if (task->tk_flags & RPC_TASK_SWAPPER)
562                         nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);
563                 else
564                         nreq = kmalloc(sizeof *req + size, GFP_NOFS);
565                 if (nreq == NULL)
566                         goto outfail;
567
568                 if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia,
569                                 nreq->rl_base, size + sizeof(struct rpcrdma_req)
570                                 - offsetof(struct rpcrdma_req, rl_base),
571                                 &nreq->rl_handle, &nreq->rl_iov)) {
572                         kfree(nreq);
573                         goto outfail;
574                 }
575                 rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size;
576                 nreq->rl_size = size;
577                 nreq->rl_niovs = 0;
578                 nreq->rl_nchunks = 0;
579                 nreq->rl_buffer = (struct rpcrdma_buffer *)req;
580                 nreq->rl_reply = req->rl_reply;
581                 memcpy(nreq->rl_segments,
582                         req->rl_segments, sizeof nreq->rl_segments);
583                 /* flag the swap with an unused field */
584                 nreq->rl_iov.length = 0;
585                 req->rl_reply = NULL;
586                 req = nreq;
587         }
588         dprintk("RPC:       %s: size %zd, request 0x%p\n", __func__, size, req);
589 out:
590         return req->rl_xdr_buf;
591
592 outfail:
593         rpcrdma_buffer_put(req);
594         rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
595         return NULL;
596 }
597
598 /*
599  * This function returns all RDMA resources to the pool.
600  */
601 static void
602 xprt_rdma_free(void *buffer)
603 {
604         struct rpcrdma_req *req;
605         struct rpcrdma_xprt *r_xprt;
606         struct rpcrdma_rep *rep;
607         int i;
608
609         if (buffer == NULL)
610                 return;
611
612         req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]);
613         r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
614         rep = req->rl_reply;
615
616         dprintk("RPC:       %s: called on 0x%p%s\n",
617                 __func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");
618
619         /*
620          * Finish the deregistration. When using mw bind, this was
621          * begun in rpcrdma_reply_handler(). In all other modes, we
622          * do it here, in thread context. The process is considered
623          * complete when the rr_func vector becomes NULL - this
624          * was put in place during rpcrdma_reply_handler() - the wait
625          * call below will not block if the dereg is "done". If
626          * interrupted, our framework will clean up.
627          */
628         for (i = 0; req->rl_nchunks;) {
629                 --req->rl_nchunks;
630                 i += rpcrdma_deregister_external(
631                         &req->rl_segments[i], r_xprt, NULL);
632         }
633
634         if (rep && wait_event_interruptible(rep->rr_unbind, !rep->rr_func)) {
635                 rep->rr_func = NULL;    /* abandon the callback */
636                 req->rl_reply = NULL;
637         }
638
639         if (req->rl_iov.length == 0) {  /* see allocate above */
640                 struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer;
641                 oreq->rl_reply = req->rl_reply;
642                 (void) rpcrdma_deregister_internal(&r_xprt->rx_ia,
643                                                    req->rl_handle,
644                                                    &req->rl_iov);
645                 kfree(req);
646                 req = oreq;
647         }
648
649         /* Put back request+reply buffers */
650         rpcrdma_buffer_put(req);
651 }
652
653 /*
654  * send_request invokes the meat of RPC RDMA. It must do the following:
655  *  1.  Marshal the RPC request into an RPC RDMA request, which means
656  *      putting a header in front of data, and creating IOVs for RDMA
657  *      from those in the request.
658  *  2.  In marshaling, detect opportunities for RDMA, and use them.
659  *  3.  Post a recv message to set up asynch completion, then send
660  *      the request (rpcrdma_ep_post).
661  *  4.  No partial sends are possible in the RPC-RDMA protocol (as in UDP).
662  */
663
664 static int
665 xprt_rdma_send_request(struct rpc_task *task)
666 {
667         struct rpc_rqst *rqst = task->tk_rqstp;
668         struct rpc_xprt *xprt = task->tk_xprt;
669         struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
670         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
671
672         /* marshal the send itself */
673         if (req->rl_niovs == 0 && rpcrdma_marshal_req(rqst) != 0) {
674                 r_xprt->rx_stats.failed_marshal_count++;
675                 dprintk("RPC:       %s: rpcrdma_marshal_req failed\n",
676                         __func__);
677                 return -EIO;
678         }
679
680         if (req->rl_reply == NULL)              /* e.g. reconnection */
681                 rpcrdma_recv_buffer_get(req);
682
683         if (req->rl_reply) {
684                 req->rl_reply->rr_func = rpcrdma_reply_handler;
685                 /* this need only be done once, but... */
686                 req->rl_reply->rr_xprt = xprt;
687         }
688
689         if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req)) {
690                 xprt_disconnect_done(xprt);
691                 return -ENOTCONN;       /* implies disconnect */
692         }
693
694         rqst->rq_bytes_sent = 0;
695         return 0;
696 }
697
698 static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
699 {
700         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
701         long idle_time = 0;
702
703         if (xprt_connected(xprt))
704                 idle_time = (long)(jiffies - xprt->last_used) / HZ;
705
706         seq_printf(seq,
707           "\txprt:\trdma %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu "
708           "%lu %lu %lu %Lu %Lu %Lu %Lu %lu %lu %lu\n",
709
710            0,   /* need a local port? */
711            xprt->stat.bind_count,
712            xprt->stat.connect_count,
713            xprt->stat.connect_time,
714            idle_time,
715            xprt->stat.sends,
716            xprt->stat.recvs,
717            xprt->stat.bad_xids,
718            xprt->stat.req_u,
719            xprt->stat.bklog_u,
720
721            r_xprt->rx_stats.read_chunk_count,
722            r_xprt->rx_stats.write_chunk_count,
723            r_xprt->rx_stats.reply_chunk_count,
724            r_xprt->rx_stats.total_rdma_request,
725            r_xprt->rx_stats.total_rdma_reply,
726            r_xprt->rx_stats.pullup_copy_count,
727            r_xprt->rx_stats.fixup_copy_count,
728            r_xprt->rx_stats.hardway_register_count,
729            r_xprt->rx_stats.failed_marshal_count,
730            r_xprt->rx_stats.bad_reply_count);
731 }
732
733 /*
734  * Plumbing for rpc transport switch and kernel module
735  */
736
737 static struct rpc_xprt_ops xprt_rdma_procs = {
738         .reserve_xprt           = xprt_rdma_reserve_xprt,
739         .release_xprt           = xprt_release_xprt_cong, /* sunrpc/xprt.c */
740         .release_request        = xprt_release_rqst_cong,       /* ditto */
741         .set_retrans_timeout    = xprt_set_retrans_timeout_def, /* ditto */
742         .rpcbind                = rpcb_getport_async,   /* sunrpc/rpcb_clnt.c */
743         .set_port               = xprt_rdma_set_port,
744         .connect                = xprt_rdma_connect,
745         .buf_alloc              = xprt_rdma_allocate,
746         .buf_free               = xprt_rdma_free,
747         .send_request           = xprt_rdma_send_request,
748         .close                  = xprt_rdma_close,
749         .destroy                = xprt_rdma_destroy,
750         .print_stats            = xprt_rdma_print_stats
751 };
752
753 static struct xprt_class xprt_rdma = {
754         .list                   = LIST_HEAD_INIT(xprt_rdma.list),
755         .name                   = "rdma",
756         .owner                  = THIS_MODULE,
757         .ident                  = XPRT_TRANSPORT_RDMA,
758         .setup                  = xprt_setup_rdma,
759 };
760
761 static void __exit xprt_rdma_cleanup(void)
762 {
763         int rc;
764
765         dprintk("RPCRDMA Module Removed, deregister RPC RDMA transport\n");
766 #ifdef RPC_DEBUG
767         if (sunrpc_table_header) {
768                 unregister_sysctl_table(sunrpc_table_header);
769                 sunrpc_table_header = NULL;
770         }
771 #endif
772         rc = xprt_unregister_transport(&xprt_rdma);
773         if (rc)
774                 dprintk("RPC:       %s: xprt_unregister returned %i\n",
775                         __func__, rc);
776 }
777
778 static int __init xprt_rdma_init(void)
779 {
780         int rc;
781
782         rc = xprt_register_transport(&xprt_rdma);
783
784         if (rc)
785                 return rc;
786
787         dprintk(KERN_INFO "RPCRDMA Module Init, register RPC RDMA transport\n");
788
789         dprintk(KERN_INFO "Defaults:\n");
790         dprintk(KERN_INFO "\tSlots %d\n"
791                 "\tMaxInlineRead %d\n\tMaxInlineWrite %d\n",
792                 xprt_rdma_slot_table_entries,
793                 xprt_rdma_max_inline_read, xprt_rdma_max_inline_write);
794         dprintk(KERN_INFO "\tPadding %d\n\tMemreg %d\n",
795                 xprt_rdma_inline_write_padding, xprt_rdma_memreg_strategy);
796
797 #ifdef RPC_DEBUG
798         if (!sunrpc_table_header)
799                 sunrpc_table_header = register_sysctl_table(sunrpc_table);
800 #endif
801         return 0;
802 }
803
/* Module entry and exit points */
module_init(xprt_rdma_init);
module_exit(xprt_rdma_cleanup);