76eef19cf995f1cf61efc83af0916629a9a5ffde
[safe/jmp/linux-2.6] / net / sunrpc / clnt.c
1 /*
2  *  linux/net/sunrpc/clnt.c
3  *
4  *  This file contains the high-level RPC interface.
5  *  It is modeled as a finite state machine to support both synchronous
6  *  and asynchronous requests.
7  *
8  *  -   RPC header generation and argument serialization.
9  *  -   Credential refresh.
10  *  -   TCP connect handling.
11  *  -   Retry of operation when it is suspected the operation failed because
12  *      of uid squashing on the server, or when the credentials were stale
13  *      and need to be refreshed, or when a packet was damaged in transit.
14  *      This may be have to be moved to the VFS layer.
15  *
16  *  NB: BSD uses a more intelligent approach to guessing when a request
17  *  or reply has been lost by keeping the RTO estimate for each procedure.
18  *  We currently make do with a constant timeout value.
19  *
20  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
21  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
22  */
23
24 #include <asm/system.h>
25
26 #include <linux/module.h>
27 #include <linux/types.h>
28 #include <linux/mm.h>
29 #include <linux/slab.h>
30 #include <linux/smp_lock.h>
31 #include <linux/utsname.h>
32 #include <linux/workqueue.h>
33
34 #include <linux/sunrpc/clnt.h>
35 #include <linux/sunrpc/rpc_pipe_fs.h>
36 #include <linux/sunrpc/metrics.h>
37
38
39 #ifdef RPC_DEBUG
40 # define RPCDBG_FACILITY        RPCDBG_CALL
41 #endif
42
/*
 * Debug helper: log the task's pid, the current function's name and the
 * task's tk_status.  Expands to nothing unless RPC_DEBUG is enabled,
 * since dprintk() is compiled out otherwise.
 */
#define dprint_status(t)                                        \
        dprintk("RPC: %5u %s (status %d)\n", t->tk_pid,         \
                        __FUNCTION__, t->tk_status)
46
/*
 * All RPC clients are linked into this list
 */
static LIST_HEAD(all_clients);
/* Protects all_clients; taken by rpc_register_client/rpc_unregister_client. */
static DEFINE_SPINLOCK(rpc_client_lock);

/* Woken by rpc_release_client() once a client's task list drains;
 * rpc_shutdown_client() sleeps on it. */
static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);

55
/*
 * Forward declarations for the RPC call finite-state-machine steps
 * (call_start -> call_reserve -> ... -> call_decode) and helpers.
 */
static void     call_start(struct rpc_task *task);
static void     call_reserve(struct rpc_task *task);
static void     call_reserveresult(struct rpc_task *task);
static void     call_allocate(struct rpc_task *task);
static void     call_encode(struct rpc_task *task);
static void     call_decode(struct rpc_task *task);
static void     call_bind(struct rpc_task *task);
static void     call_bind_status(struct rpc_task *task);
static void     call_transmit(struct rpc_task *task);
static void     call_status(struct rpc_task *task);
static void     call_transmit_status(struct rpc_task *task);
static void     call_refresh(struct rpc_task *task);
static void     call_refreshresult(struct rpc_task *task);
static void     call_timeout(struct rpc_task *task);
static void     call_connect(struct rpc_task *task);
static void     call_connect_status(struct rpc_task *task);
static __be32 * call_header(struct rpc_task *task);
static __be32 * call_verify(struct rpc_task *task);

74
/* Add a new client to the global all_clients list (under rpc_client_lock). */
static void rpc_register_client(struct rpc_clnt *clnt)
{
        spin_lock(&rpc_client_lock);
        list_add(&clnt->cl_clients, &all_clients);
        spin_unlock(&rpc_client_lock);
}
81
/* Remove a client from the global all_clients list (under rpc_client_lock). */
static void rpc_unregister_client(struct rpc_clnt *clnt)
{
        spin_lock(&rpc_client_lock);
        list_del(&clnt->cl_clients);
        spin_unlock(&rpc_client_lock);
}
88
/*
 * Create this client's directory in rpc_pipefs ("<dir_name>/clnt%x").
 * A NULL dir_name means the program does not use rpc_pipefs; that is
 * not an error.
 *
 * Returns 0 on success or a negative errno.  On success, cl_vfsmnt and
 * cl_dentry hold references that must later be dropped via rpc_rmdir()
 * and rpc_put_mount(); on failure both remain ERR_PTRs.
 */
static int
rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
{
        static uint32_t clntid;         /* ever-increasing suffix for "clnt%x" */
        int error;

        clnt->cl_vfsmnt = ERR_PTR(-ENOENT);
        clnt->cl_dentry = ERR_PTR(-ENOENT);
        if (dir_name == NULL)
                return 0;

        clnt->cl_vfsmnt = rpc_get_mount();
        if (IS_ERR(clnt->cl_vfsmnt))
                return PTR_ERR(clnt->cl_vfsmnt);

        /* Keep trying new ids until mkdir succeeds or fails with
         * something other than -EEXIST (a name collision just means
         * another client already took that id). */
        for (;;) {
                snprintf(clnt->cl_pathname, sizeof(clnt->cl_pathname),
                                "%s/clnt%x", dir_name,
                                (unsigned int)clntid++);
                clnt->cl_pathname[sizeof(clnt->cl_pathname) - 1] = '\0';
                clnt->cl_dentry = rpc_mkdir(clnt->cl_pathname, clnt);
                if (!IS_ERR(clnt->cl_dentry))
                        return 0;
                error = PTR_ERR(clnt->cl_dentry);
                if (error != -EEXIST) {
                        printk(KERN_INFO "RPC: Couldn't create pipefs entry %s, error %d\n",
                                        clnt->cl_pathname, error);
                        rpc_put_mount();
                        return error;
                }
        }
}
121
122 static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, struct rpc_program *program, u32 vers, rpc_authflavor_t flavor)
123 {
124         struct rpc_version      *version;
125         struct rpc_clnt         *clnt = NULL;
126         struct rpc_auth         *auth;
127         int err;
128         int len;
129
130         dprintk("RPC:       creating %s client for %s (xprt %p)\n",
131                         program->name, servname, xprt);
132
133         err = rpciod_up();
134         if (err)
135                 goto out_no_rpciod;
136         err = -EINVAL;
137         if (!xprt)
138                 goto out_no_xprt;
139         if (vers >= program->nrvers || !(version = program->version[vers]))
140                 goto out_err;
141
142         err = -ENOMEM;
143         clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
144         if (!clnt)
145                 goto out_err;
146         clnt->cl_parent = clnt;
147
148         clnt->cl_server = clnt->cl_inline_name;
149         len = strlen(servname) + 1;
150         if (len > sizeof(clnt->cl_inline_name)) {
151                 char *buf = kmalloc(len, GFP_KERNEL);
152                 if (buf != 0)
153                         clnt->cl_server = buf;
154                 else
155                         len = sizeof(clnt->cl_inline_name);
156         }
157         strlcpy(clnt->cl_server, servname, len);
158
159         clnt->cl_xprt     = xprt;
160         clnt->cl_procinfo = version->procs;
161         clnt->cl_maxproc  = version->nrprocs;
162         clnt->cl_protname = program->name;
163         clnt->cl_prog     = program->number;
164         clnt->cl_vers     = version->number;
165         clnt->cl_stats    = program->stats;
166         clnt->cl_metrics  = rpc_alloc_iostats(clnt);
167         err = -ENOMEM;
168         if (clnt->cl_metrics == NULL)
169                 goto out_no_stats;
170         clnt->cl_program  = program;
171         INIT_LIST_HEAD(&clnt->cl_tasks);
172         spin_lock_init(&clnt->cl_lock);
173
174         if (!xprt_bound(clnt->cl_xprt))
175                 clnt->cl_autobind = 1;
176
177         clnt->cl_rtt = &clnt->cl_rtt_default;
178         rpc_init_rtt(&clnt->cl_rtt_default, xprt->timeout.to_initval);
179
180         kref_init(&clnt->cl_kref);
181
182         err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
183         if (err < 0)
184                 goto out_no_path;
185
186         auth = rpcauth_create(flavor, clnt);
187         if (IS_ERR(auth)) {
188                 printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
189                                 flavor);
190                 err = PTR_ERR(auth);
191                 goto out_no_auth;
192         }
193
194         /* save the nodename */
195         clnt->cl_nodelen = strlen(utsname()->nodename);
196         if (clnt->cl_nodelen > UNX_MAXNODENAME)
197                 clnt->cl_nodelen = UNX_MAXNODENAME;
198         memcpy(clnt->cl_nodename, utsname()->nodename, clnt->cl_nodelen);
199         rpc_register_client(clnt);
200         return clnt;
201
202 out_no_auth:
203         if (!IS_ERR(clnt->cl_dentry)) {
204                 rpc_rmdir(clnt->cl_dentry);
205                 rpc_put_mount();
206         }
207 out_no_path:
208         rpc_free_iostats(clnt->cl_metrics);
209 out_no_stats:
210         if (clnt->cl_server != clnt->cl_inline_name)
211                 kfree(clnt->cl_server);
212         kfree(clnt);
213 out_err:
214         xprt_put(xprt);
215 out_no_xprt:
216         rpciod_down();
217 out_no_rpciod:
218         return ERR_PTR(err);
219 }
220
221 /*
222  * rpc_create - create an RPC client and transport with one call
223  * @args: rpc_clnt create argument structure
224  *
225  * Creates and initializes an RPC transport and an RPC client.
226  *
227  * It can ping the server in order to determine if it is up, and to see if
228  * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
229  * this behavior so asynchronous tasks can also use rpc_create.
230  */
231 struct rpc_clnt *rpc_create(struct rpc_create_args *args)
232 {
233         struct rpc_xprt *xprt;
234         struct rpc_clnt *clnt;
235
236         xprt = xprt_create_transport(args->protocol, args->address,
237                                         args->addrsize, args->timeout);
238         if (IS_ERR(xprt))
239                 return (struct rpc_clnt *)xprt;
240
241         /*
242          * By default, kernel RPC client connects from a reserved port.
243          * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
244          * but it is always enabled for rpciod, which handles the connect
245          * operation.
246          */
247         xprt->resvport = 1;
248         if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
249                 xprt->resvport = 0;
250
251         dprintk("RPC:       creating %s client for %s (xprt %p)\n",
252                         args->program->name, args->servername, xprt);
253
254         clnt = rpc_new_client(xprt, args->servername, args->program,
255                                 args->version, args->authflavor);
256         if (IS_ERR(clnt))
257                 return clnt;
258
259         if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
260                 int err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
261                 if (err != 0) {
262                         rpc_shutdown_client(clnt);
263                         return ERR_PTR(err);
264                 }
265         }
266
267         clnt->cl_softrtry = 1;
268         if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
269                 clnt->cl_softrtry = 0;
270
271         if (args->flags & RPC_CLNT_CREATE_INTR)
272                 clnt->cl_intr = 1;
273         if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
274                 clnt->cl_autobind = 1;
275         if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
276                 clnt->cl_discrtry = 1;
277
278         return clnt;
279 }
280 EXPORT_SYMBOL_GPL(rpc_create);
281
/*
 * This function clones the RPC client structure. It allows us to share the
 * same transport while varying parameters such as the authentication
 * flavour.
 *
 * The clone starts as a shallow copy (kmemdup) of @clnt, then gets its
 * own task list, lock, RTT estimator, iostats, pipefs directory and
 * kref.  It takes references on the shared resources: the auth handle,
 * the transport, the parent client, and rpciod.  The clone deliberately
 * shares cl_server with the parent; rpc_free_client() only frees
 * cl_server for the original (cl_parent == clnt), so no double free.
 */
struct rpc_clnt *
rpc_clone_client(struct rpc_clnt *clnt)
{
        struct rpc_clnt *new;
        int err = -ENOMEM;

        new = kmemdup(clnt, sizeof(*new), GFP_KERNEL);
        if (!new)
                goto out_no_clnt;
        new->cl_parent = clnt;
        /* Turn off autobind on clones */
        new->cl_autobind = 0;
        INIT_LIST_HEAD(&new->cl_tasks);
        spin_lock_init(&new->cl_lock);
        rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
        new->cl_metrics = rpc_alloc_iostats(clnt);
        if (new->cl_metrics == NULL)
                goto out_no_stats;
        kref_init(&new->cl_kref);
        err = rpc_setup_pipedir(new, clnt->cl_program->pipe_dir_name);
        if (err != 0)
                goto out_no_path;
        if (new->cl_auth)
                atomic_inc(&new->cl_auth->au_count);
        xprt_get(clnt->cl_xprt);
        kref_get(&clnt->cl_kref);
        rpc_register_client(new);
        rpciod_up();
        return new;
out_no_path:
        rpc_free_iostats(new->cl_metrics);
out_no_stats:
        kfree(new);
out_no_clnt:
        dprintk("RPC:       %s: returned error %d\n", __FUNCTION__, err);
        return ERR_PTR(err);
}
324
/*
 * Properly shut down an RPC client, terminating all outstanding
 * requests.
 *
 * Repeatedly kills all tasks on the client and waits (with a 1 second
 * timeout per round, so newly submitted tasks are also caught) until
 * cl_tasks is empty, then drops the caller's reference.
 */
void rpc_shutdown_client(struct rpc_clnt *clnt)
{
        dprintk("RPC:       shutting down %s client for %s\n",
                        clnt->cl_protname, clnt->cl_server);

        while (!list_empty(&clnt->cl_tasks)) {
                rpc_killall_tasks(clnt);
                wait_event_timeout(destroy_wait,
                        list_empty(&clnt->cl_tasks), 1*HZ);
        }

        rpc_release_client(clnt);
}
342
/*
 * Free an RPC client
 *
 * kref release callback: tears down everything rpc_new_client() (or
 * rpc_clone_client()) set up, in reverse order.  For a clone
 * (cl_parent != clnt) the shared cl_server buffer is left alone and the
 * parent's reference is dropped instead.
 */
static void
rpc_free_client(struct kref *kref)
{
        struct rpc_clnt *clnt = container_of(kref, struct rpc_clnt, cl_kref);

        dprintk("RPC:       destroying %s client for %s\n",
                        clnt->cl_protname, clnt->cl_server);
        if (clnt->cl_auth) {
                rpcauth_destroy(clnt->cl_auth);
                clnt->cl_auth = NULL;
        }
        if (!IS_ERR(clnt->cl_dentry)) {
                rpc_rmdir(clnt->cl_dentry);
                rpc_put_mount();
        }
        if (clnt->cl_parent != clnt) {
                /* Clone: cl_server belongs to the parent; just drop our ref. */
                rpc_release_client(clnt->cl_parent);
                goto out_free;
        }
        if (clnt->cl_server != clnt->cl_inline_name)
                kfree(clnt->cl_server);
out_free:
        rpc_unregister_client(clnt);
        rpc_free_iostats(clnt->cl_metrics);
        clnt->cl_metrics = NULL;
        xprt_put(clnt->cl_xprt);
        rpciod_down();
        kfree(clnt);
}
375
/*
 * Release reference to the RPC client
 *
 * Wakes rpc_shutdown_client() waiters if the task list has drained,
 * then drops one kref (freeing the client via rpc_free_client() when
 * it was the last reference).
 */
void
rpc_release_client(struct rpc_clnt *clnt)
{
        dprintk("RPC:       rpc_release_client(%p)\n", clnt);

        if (list_empty(&clnt->cl_tasks))
                wake_up(&destroy_wait);
        kref_put(&clnt->cl_kref, rpc_free_client);
}
388
/**
 * rpc_bind_new_program - bind a new RPC program to an existing client
 * @old: old rpc_client
 * @program: rpc program to set
 * @vers: rpc program version
 *
 * Clones the rpc client and sets up a new RPC program. This is mainly
 * of use for enabling different RPC programs to share the same transport.
 * The Sun NFSv2/v3 ACL protocol can do this.
 *
 * Returns the new client, or an ERR_PTR if cloning or the verification
 * ping fails.
 */
struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
                                      struct rpc_program *program,
                                      int vers)
{
        struct rpc_clnt *clnt;
        struct rpc_version *version;
        int err;

        BUG_ON(vers >= program->nrvers || !program->version[vers]);
        version = program->version[vers];
        clnt = rpc_clone_client(old);
        if (IS_ERR(clnt))
                goto out;
        clnt->cl_procinfo = version->procs;
        clnt->cl_maxproc  = version->nrprocs;
        clnt->cl_protname = program->name;
        clnt->cl_prog     = program->number;
        clnt->cl_vers     = version->number;
        clnt->cl_stats    = program->stats;
        /* Verify the server actually speaks this program/version. */
        err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
        if (err != 0) {
                rpc_shutdown_client(clnt);
                clnt = ERR_PTR(err);
        }
out:
        return clnt;
}
426
/*
 * Default callback for async RPC calls
 *
 * Intentionally a no-op: used when the caller only wants the task to
 * run to completion with no completion processing.
 */
static void
rpc_default_callback(struct rpc_task *task, void *data)
{
}

static const struct rpc_call_ops rpc_default_ops = {
        .rpc_call_done = rpc_default_callback,
};
438
/*
 *      Export the signal mask handling for synchronous code that
 *      sleeps on RPC calls
 */
/* Signals allowed to interrupt an interruptible RPC wait. */
#define RPC_INTR_SIGNALS (sigmask(SIGHUP) | sigmask(SIGINT) | sigmask(SIGQUIT) | sigmask(SIGTERM))

/*
 * Block every signal except SIGKILL (and, if @intr, the RPC_INTR_SIGNALS
 * set), saving the previous mask in *@oldset for later restoration via
 * rpc_restore_sigmask().
 */
static void rpc_save_sigmask(sigset_t *oldset, int intr)
{
        unsigned long   sigallow = sigmask(SIGKILL);
        sigset_t sigmask;

        /* Block all signals except those listed in sigallow */
        if (intr)
                sigallow |= RPC_INTR_SIGNALS;
        siginitsetinv(&sigmask, sigallow);
        sigprocmask(SIG_BLOCK, &sigmask, oldset);
}
456
/* Save/adjust the signal mask for a task; interruptible unless the task
 * was marked RPC_TASK_UNINTERRUPTIBLE. */
static inline void rpc_task_sigmask(struct rpc_task *task, sigset_t *oldset)
{
        rpc_save_sigmask(oldset, !RPC_TASK_UNINTERRUPTIBLE(task));
}
461
/* Restore the signal mask previously saved by rpc_save_sigmask(). */
static inline void rpc_restore_sigmask(sigset_t *oldset)
{
        sigprocmask(SIG_SETMASK, oldset, NULL);
}
466
/* Public wrapper: block signals per the client's cl_intr setting. */
void rpc_clnt_sigmask(struct rpc_clnt *clnt, sigset_t *oldset)
{
        rpc_save_sigmask(oldset, clnt->cl_intr);
}
471
/* Public wrapper: undo rpc_clnt_sigmask(). */
void rpc_clnt_sigunmask(struct rpc_clnt *clnt, sigset_t *oldset)
{
        rpc_restore_sigmask(oldset);
}
476
/*
 * Common engine behind rpc_call_sync/rpc_call_async/rpc_run_task:
 * allocate a task, optionally set up the message (@msg may be NULL for
 * bare rpc_run_task users), and start executing it with signals masked.
 *
 * On success returns the task with an extra reference held for the
 * caller (the atomic_inc below), which the caller must drop with
 * rpc_put_task().  On failure returns an ERR_PTR; calldata is released.
 */
static
struct rpc_task *rpc_do_run_task(struct rpc_clnt *clnt,
                struct rpc_message *msg,
                int flags,
                const struct rpc_call_ops *ops,
                void *data)
{
        struct rpc_task *task, *ret;
        sigset_t oldset;

        task = rpc_new_task(clnt, flags, ops, data);
        if (task == NULL) {
                rpc_release_calldata(ops, data);
                return ERR_PTR(-ENOMEM);
        }

        /* Mask signals on synchronous RPC calls and RPCSEC_GSS upcalls */
        rpc_task_sigmask(task, &oldset);
        if (msg != NULL) {
                rpc_call_setup(task, msg, 0);
                if (task->tk_status != 0) {
                        /* Setup failed (e.g. cred binding); hand back the
                         * error and drop our only reference. */
                        ret = ERR_PTR(task->tk_status);
                        rpc_put_task(task);
                        goto out;
                }
        }
        /* Extra reference for the caller; rpc_execute consumes one. */
        atomic_inc(&task->tk_count);
        rpc_execute(task);
        ret = task;
out:
        rpc_restore_sigmask(&oldset);
        return ret;
}
510
511 /**
512  * rpc_call_sync - Perform a synchronous RPC call
513  * @clnt: pointer to RPC client
514  * @msg: RPC call parameters
515  * @flags: RPC call flags
516  */
517 int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
518 {
519         struct rpc_task *task;
520         int status;
521
522         BUG_ON(flags & RPC_TASK_ASYNC);
523
524         task = rpc_do_run_task(clnt, msg, flags, &rpc_default_ops, NULL);
525         if (IS_ERR(task))
526                 return PTR_ERR(task);
527         status = task->tk_status;
528         rpc_put_task(task);
529         return status;
530 }
531
532 /**
533  * rpc_call_async - Perform an asynchronous RPC call
534  * @clnt: pointer to RPC client
535  * @msg: RPC call parameters
536  * @flags: RPC call flags
537  * @ops: RPC call ops
538  * @data: user call data
539  */
540 int
541 rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
542                const struct rpc_call_ops *tk_ops, void *data)
543 {
544         struct rpc_task *task;
545
546         task = rpc_do_run_task(clnt, msg, flags|RPC_TASK_ASYNC, tk_ops, data);
547         if (IS_ERR(task))
548                 return PTR_ERR(task);
549         rpc_put_task(task);
550         return 0;
551 }
552
/**
 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
 * @clnt: pointer to RPC client
 * @flags: RPC flags
 * @ops: RPC call ops
 * @data: user call data
 *
 * Returns the running task (caller must rpc_put_task() it) or an
 * ERR_PTR.  The message is left for @ops to set up.
 */
struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
                                        const struct rpc_call_ops *tk_ops,
                                        void *data)
{
        return rpc_do_run_task(clnt, NULL, flags, tk_ops, data);
}
EXPORT_SYMBOL(rpc_run_task);
567
/*
 * Attach an RPC message to a task and bind a credential, then choose the
 * task's first FSM state: call_start on success, rpc_exit_task if cred
 * binding already set an error in tk_status.
 */
void
rpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags)
{
        task->tk_msg   = *msg;
        task->tk_flags |= flags;
        /* Bind the user cred */
        if (task->tk_msg.rpc_cred != NULL)
                rpcauth_holdcred(task);
        else
                rpcauth_bindcred(task);

        if (task->tk_status == 0)
                task->tk_action = call_start;
        else
                task->tk_action = rpc_exit_task;
}
584
/**
 * rpc_peeraddr - extract remote peer address from clnt's xprt
 * @clnt: RPC client structure
 * @buf: target buffer
 * @bufsize: length of target buffer
 *
 * Copies at most @bufsize bytes of the transport's stored address into
 * @buf.  Returns the full stored address length (xprt->addrlen), which
 * may exceed @bufsize if the buffer was too small for the whole address.
 */
size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
{
        size_t bytes;
        struct rpc_xprt *xprt = clnt->cl_xprt;

        bytes = sizeof(xprt->addr);
        if (bytes > bufsize)
                bytes = bufsize;
        memcpy(buf, &clnt->cl_xprt->addr, bytes);
        return xprt->addrlen;
}
EXPORT_SYMBOL_GPL(rpc_peeraddr);
605
606 /**
607  * rpc_peeraddr2str - return remote peer address in printable format
608  * @clnt: RPC client structure
609  * @format: address format
610  *
611  */
612 char *rpc_peeraddr2str(struct rpc_clnt *clnt, enum rpc_display_format_t format)
613 {
614         struct rpc_xprt *xprt = clnt->cl_xprt;
615
616         if (xprt->address_strings[format] != NULL)
617                 return xprt->address_strings[format];
618         else
619                 return "unprintable";
620 }
621 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
622
623 void
624 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
625 {
626         struct rpc_xprt *xprt = clnt->cl_xprt;
627         if (xprt->ops->set_buffer_size)
628                 xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
629 }
630
/*
 * Return size of largest payload RPC client can support, in bytes
 *
 * For stream transports, this is one RPC record fragment (see RFC
 * 1831), as we don't support multi-record requests yet.  For datagram
 * transports, this is the size of an IP packet minus the IP, UDP, and
 * RPC header sizes.
 */
size_t rpc_max_payload(struct rpc_clnt *clnt)
{
        return clnt->cl_xprt->max_payload;
}
EXPORT_SYMBOL_GPL(rpc_max_payload);
644
/**
 * rpc_force_rebind - force transport to check that remote port is unchanged
 * @clnt: client to rebind
 *
 * Only meaningful for autobinding clients; clearing the bound flag makes
 * the next call re-run the rpcbind step.
 */
void rpc_force_rebind(struct rpc_clnt *clnt)
{
        if (clnt->cl_autobind)
                xprt_clear_bound(clnt->cl_xprt);
}
EXPORT_SYMBOL_GPL(rpc_force_rebind);
656
657 /*
658  * Restart an (async) RPC call. Usually called from within the
659  * exit handler.
660  */
661 void
662 rpc_restart_call(struct rpc_task *task)
663 {
664         if (RPC_ASSASSINATED(task))
665                 return;
666
667         task->tk_action = call_start;
668 }
669
/*
 * 0.  Initial state
 *
 *     Other FSM states can be visited zero or more times, but
 *     this state is visited exactly once for each RPC.
 *     Bumps the per-procedure and per-client call counters, then
 *     moves on to slot reservation.
 */
static void
call_start(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;

        dprintk("RPC: %5u call_start %s%d proc %d (%s)\n", task->tk_pid,
                        clnt->cl_protname, clnt->cl_vers,
                        task->tk_msg.rpc_proc->p_proc,
                        (RPC_IS_ASYNC(task) ? "async" : "sync"));

        /* Increment call count */
        task->tk_msg.rpc_proc->p_count++;
        clnt->cl_stats->rpccnt++;
        task->tk_action = call_reserve;
}
691
/*
 * 1.   Reserve an RPC call slot
 *
 *      Detours to call_refresh first if the bound credential has gone
 *      stale.  xprt_reserve() reports back through call_reserveresult.
 */
static void
call_reserve(struct rpc_task *task)
{
        dprint_status(task);

        if (!rpcauth_uptodatecred(task)) {
                task->tk_action = call_refresh;
                return;
        }

        task->tk_status  = 0;
        task->tk_action  = call_reserveresult;
        xprt_reserve(task);
}
709
/*
 * 1b.  Grok the result of xprt_reserve()
 *
 *      Success with a slot -> call_allocate.  -EAGAIN retries the
 *      reservation; anything else (including the inconsistent states
 *      flagged with printk) terminates the task.
 */
static void
call_reserveresult(struct rpc_task *task)
{
        int status = task->tk_status;

        dprint_status(task);

        /*
         * After a call to xprt_reserve(), we must have either
         * a request slot or else an error status.
         */
        task->tk_status = 0;
        if (status >= 0) {
                if (task->tk_rqstp) {
                        task->tk_action = call_allocate;
                        return;
                }

                printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
                                __FUNCTION__, status);
                rpc_exit(task, -EIO);
                return;
        }

        /*
         * Even though there was an error, we may have acquired
         * a request slot somehow.  Make sure not to leak it.
         */
        if (task->tk_rqstp) {
                printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
                                __FUNCTION__, status);
                xprt_release(task);
        }

        switch (status) {
        case -EAGAIN:   /* woken up; retry */
                task->tk_action = call_reserve;
                return;
        case -EIO:      /* probably a shutdown */
                break;
        default:
                printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
                                __FUNCTION__, status);
                break;
        }
        rpc_exit(task, status);
}
760
/*
 * 2.   Allocate the buffer. For details, see sched.c:rpc_malloc.
 *      (Note: buffer memory is freed in xprt_release).
 *
 *      Sizes the single send+receive buffer from the procedure's
 *      arg/reply lengths plus the auth flavor's credential slack.
 *      On allocation failure, async (or unsignalled sync) tasks give
 *      the slot back and retry after a short delay; a signalled sync
 *      task exits with -ERESTARTSYS.
 */
static void
call_allocate(struct rpc_task *task)
{
        unsigned int slack = task->tk_auth->au_cslack;
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_procinfo *proc = task->tk_msg.rpc_proc;

        dprint_status(task);

        task->tk_status = 0;
        task->tk_action = call_bind;

        /* Buffer already allocated on a previous pass through here. */
        if (req->rq_buffer)
                return;

        if (proc->p_proc != 0) {
                BUG_ON(proc->p_arglen == 0);
                if (proc->p_decode != NULL)
                        BUG_ON(proc->p_replen == 0);
        }

        /*
         * Calculate the size (in quads) of the RPC call
         * and reply headers, and convert both values
         * to byte sizes.
         */
        req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
        req->rq_callsize <<= 2;
        req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
        req->rq_rcvsize <<= 2;

        req->rq_buffer = xprt->ops->buf_alloc(task,
                                        req->rq_callsize + req->rq_rcvsize);
        if (req->rq_buffer != NULL)
                return;

        dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);

        if (RPC_IS_ASYNC(task) || !signalled()) {
                xprt_release(task);
                task->tk_action = call_reserve;
                rpc_delay(task, HZ>>4);
                return;
        }

        rpc_exit(task, -ERESTARTSYS);
}
813
/* True if the request has not been XDR-encoded yet (empty send buffer). */
static inline int
rpc_task_need_encode(struct rpc_task *task)
{
        return task->tk_rqstp->rq_snd_buf.len == 0;
}
819
/* Zero the send buffer length so the next pass re-encodes the request. */
static inline void
rpc_task_force_reencode(struct rpc_task *task)
{
        task->tk_rqstp->rq_snd_buf.len = 0;
}
825
826 static inline void
827 rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
828 {
829         buf->head[0].iov_base = start;
830         buf->head[0].iov_len = len;
831         buf->tail[0].iov_len = 0;
832         buf->page_len = 0;
833         buf->len = 0;
834         buf->buflen = len;
835 }
836
/*
 * 3.   Encode arguments of an RPC call
 *
 *      Splits the single buffer from call_allocate into send and
 *      receive xdr_bufs, writes the RPC call header, then runs the
 *      procedure's encode routine under the BKL via rpcauth_wrap_req.
 *      -ENOMEM from encoding is converted to a delayed -EAGAIN retry.
 */
static void
call_encode(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        kxdrproc_t      encode;
        __be32          *p;

        dprint_status(task);

        rpc_xdr_buf_init(&req->rq_snd_buf,
                         req->rq_buffer,
                         req->rq_callsize);
        rpc_xdr_buf_init(&req->rq_rcv_buf,
                         (char *)req->rq_buffer + req->rq_callsize,
                         req->rq_rcvsize);

        /* Encode header and provided arguments */
        encode = task->tk_msg.rpc_proc->p_encode;
        if (!(p = call_header(task))) {
                printk(KERN_INFO "RPC: call_header failed, exit EIO\n");
                rpc_exit(task, -EIO);
                return;
        }
        if (encode == NULL)
                return;

        lock_kernel();
        task->tk_status = rpcauth_wrap_req(task, encode, req, p,
                        task->tk_msg.rpc_argp);
        unlock_kernel();
        if (task->tk_status == -ENOMEM) {
                /* XXX: Is this sane? */
                rpc_delay(task, 3*HZ);
                task->tk_status = -EAGAIN;
        }
}
876
/*
 * 4.   Get the server port number if not yet set
 *
 *      If the transport is already bound, go straight to call_connect;
 *      otherwise kick off an rpcbind query and wait for the result in
 *      call_bind_status.
 */
static void
call_bind(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        dprint_status(task);

        task->tk_action = call_connect;
        if (!xprt_bound(xprt)) {
                task->tk_action = call_bind_status;
                task->tk_timeout = xprt->bind_timeout;
                xprt->ops->rpcbind(task);
        }
}
894
/*
 * 4a.  Sort out bind result
 *
 *      Success -> call_connect.  -EACCES (program/version not
 *      registered) delays then retries via call_timeout; -ETIMEDOUT
 *      goes to call_timeout directly; -EPROTONOSUPPORT retries the
 *      bind itself; the rest terminate the task (status defaults to
 *      -EACCES, downgraded to -EIO for unrecognized errors).
 */
static void
call_bind_status(struct rpc_task *task)
{
        int status = -EACCES;

        if (task->tk_status >= 0) {
                dprint_status(task);
                task->tk_status = 0;
                task->tk_action = call_connect;
                return;
        }

        switch (task->tk_status) {
        case -EACCES:
                dprintk("RPC: %5u remote rpcbind: RPC program/version "
                                "unavailable\n", task->tk_pid);
                rpc_delay(task, 3*HZ);
                goto retry_timeout;
        case -ETIMEDOUT:
                dprintk("RPC: %5u rpcbind request timed out\n",
                                task->tk_pid);
                goto retry_timeout;
        case -EPFNOSUPPORT:
                dprintk("RPC: %5u remote rpcbind service unavailable\n",
                                task->tk_pid);
                break;
        case -EPROTONOSUPPORT:
                dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
                                task->tk_pid);
                task->tk_status = 0;
                task->tk_action = call_bind;
                return;
        default:
                dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
                                task->tk_pid, -task->tk_status);
                status = -EIO;
        }

        rpc_exit(task, status);
        return;

retry_timeout:
        task->tk_action = call_timeout;
}
942
/*
 * 4b.  Connect to the RPC server
 *
 *      Already-connected transports skip ahead to call_transmit.
 *      Otherwise start the (possibly asynchronous) connect and handle
 *      its outcome in call_connect_status; a pre-existing error in
 *      tk_status is delivered there without starting a new connect.
 */
static void
call_connect(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        dprintk("RPC: %5u call_connect xprt %p %s connected\n",
                        task->tk_pid, xprt,
                        (xprt_connected(xprt) ? "is" : "is not"));

        task->tk_action = call_transmit;
        if (!xprt_connected(xprt)) {
                task->tk_action = call_connect_status;
                if (task->tk_status < 0)
                        return;
                xprt_connect(task);
        }
}
963
964 /*
965  * 4c.  Sort out connect result
966  */
967 static void
968 call_connect_status(struct rpc_task *task)
969 {
970         struct rpc_clnt *clnt = task->tk_client;
971         int status = task->tk_status;
972
973         dprint_status(task);
974
975         task->tk_status = 0;
976         if (status >= 0) {
977                 clnt->cl_stats->netreconn++;
978                 task->tk_action = call_transmit;
979                 return;
980         }
981
982         /* Something failed: remote service port may have changed */
983         rpc_force_rebind(clnt);
984
985         switch (status) {
986         case -ENOTCONN:
987         case -EAGAIN:
988                 task->tk_action = call_bind;
989                 if (!RPC_IS_SOFT(task))
990                         return;
991                 /* if soft mounted, test if we've timed out */
992         case -ETIMEDOUT:
993                 task->tk_action = call_timeout;
994                 return;
995         }
996         rpc_exit(task, -EIO);
997 }
998
/*
 * 5.	Transmit the RPC request, and wait for reply
 *
 * Prepares the transport, encodes the request if needed, and hands the
 * buffer to the transport layer.  Any failure along the way leaves an
 * error in tk_status for call_status (or call_transmit_status) to
 * handle; statement order here is significant.
 */
static void
call_transmit(struct rpc_task *task)
{
	dprint_status(task);

	task->tk_action = call_status;
	if (task->tk_status < 0)
		return;
	task->tk_status = xprt_prepare_transmit(task);
	if (task->tk_status != 0)
		return;
	task->tk_action = call_transmit_status;
	/* Encode here so that rpcsec_gss can use correct sequence number. */
	if (rpc_task_need_encode(task)) {
		/* Nothing may have been sent yet if we still need to encode. */
		BUG_ON(task->tk_rqstp->rq_bytes_sent != 0);
		call_encode(task);
		/* Did the encode result in an error condition? */
		if (task->tk_status != 0)
			return;
	}
	xprt_transmit(task);
	if (task->tk_status < 0)
		return;
	/*
	 * On success, ensure that we call xprt_end_transmit() before sleeping
	 * in order to allow access to the socket to other RPC requests.
	 */
	call_transmit_status(task);
	if (task->tk_msg.rpc_proc->p_decode != NULL)
		return;
	/* No decode procedure means no reply is expected: finish the task. */
	task->tk_action = rpc_exit_task;
	rpc_wake_up_task(task);
}
1035
1036 /*
1037  * 5a.  Handle cleanup after a transmission
1038  */
1039 static void
1040 call_transmit_status(struct rpc_task *task)
1041 {
1042         task->tk_action = call_status;
1043         /*
1044          * Special case: if we've been waiting on the socket's write_space()
1045          * callback, then don't call xprt_end_transmit().
1046          */
1047         if (task->tk_status == -EAGAIN)
1048                 return;
1049         xprt_end_transmit(task);
1050         rpc_task_force_reencode(task);
1051 }
1052
/*
 * 6.	Sort out the RPC call status
 *
 * Dispatch on the transmit/receive status: hand a received reply to
 * call_decode, or select the appropriate recovery action (retry,
 * rebind, timeout) for each error class.
 */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rqst *req = task->tk_rqstp;
	int		status;

	/* If a reply already arrived and the request is fully sent, report
	 * the receive status instead of any stale transmit status. */
	if (req->rq_received > 0 && !req->rq_bytes_sent)
		task->tk_status = req->rq_received;

	dprint_status(task);

	status = task->tk_status;
	if (status >= 0) {
		task->tk_action = call_decode;
		return;
	}

	task->tk_status = 0;
	switch(status) {
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
		/*
		 * Delay any retries for 3 seconds, then handle as if it
		 * were a timeout.
		 */
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		/* cl_discrtry: disconnect before retrying over this xprt */
		if (task->tk_client->cl_discrtry)
			xprt_disconnect(task->tk_xprt);
		break;
	case -ECONNREFUSED:
	case -ENOTCONN:
		/* Remote service port may have changed: rebind first. */
		rpc_force_rebind(clnt);
		task->tk_action = call_bind;
		break;
	case -EAGAIN:
		task->tk_action = call_transmit;
		break;
	case -EIO:
		/* shutdown or soft timeout */
		rpc_exit(task, status);
		break;
	default:
		printk("%s: RPC call returned error %d\n",
			       clnt->cl_protname, -status);
		rpc_exit(task, status);
	}
}
1107
1108 /*
1109  * 6a.  Handle RPC timeout
1110  *      We do not release the request slot, so we keep using the
1111  *      same XID for all retransmits.
1112  */
1113 static void
1114 call_timeout(struct rpc_task *task)
1115 {
1116         struct rpc_clnt *clnt = task->tk_client;
1117
1118         if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
1119                 dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
1120                 goto retry;
1121         }
1122
1123         dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
1124         task->tk_timeouts++;
1125
1126         if (RPC_IS_SOFT(task)) {
1127                 printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
1128                                 clnt->cl_protname, clnt->cl_server);
1129                 rpc_exit(task, -EIO);
1130                 return;
1131         }
1132
1133         if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
1134                 task->tk_flags |= RPC_CALL_MAJORSEEN;
1135                 printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
1136                         clnt->cl_protname, clnt->cl_server);
1137         }
1138         rpc_force_rebind(clnt);
1139
1140 retry:
1141         clnt->cl_stats->rpcretrans++;
1142         task->tk_action = call_bind;
1143         task->tk_status = 0;
1144 }
1145
/*
 * 7.	Decode the RPC reply
 *
 * Validate the received reply, verify its RPC header via call_verify(),
 * and run the procedure's decode routine on the payload.  On a retryable
 * condition the reply is discarded and the task rewinds to call_bind or
 * call_timeout.
 */
static void
call_decode(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rqst *req = task->tk_rqstp;
	kxdrproc_t	decode = task->tk_msg.rpc_proc->p_decode;
	__be32		*p;

	dprintk("RPC: %5u call_decode (status %d)\n",
			task->tk_pid, task->tk_status);

	/* We previously warned "server not responding"; report recovery. */
	if (task->tk_flags & RPC_CALL_MAJORSEEN) {
		printk(KERN_NOTICE "%s: server %s OK\n",
			clnt->cl_protname, clnt->cl_server);
		task->tk_flags &= ~RPC_CALL_MAJORSEEN;
	}

	/* Fewer than 12 bytes cannot hold a complete RPC reply header
	 * (xid + msg type + reply status): retry, or time out if soft. */
	if (task->tk_status < 12) {
		if (!RPC_IS_SOFT(task)) {
			task->tk_action = call_bind;
			clnt->cl_stats->rpcretrans++;
			goto out_retry;
		}
		dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
				clnt->cl_protname, task->tk_status);
		task->tk_action = call_timeout;
		goto out_retry;
	}

	/*
	 * Ensure that we see all writes made by xprt_complete_rqst()
	 * before it changed req->rq_received.
	 */
	smp_rmb();
	req->rq_rcv_buf.len = req->rq_private_buf.len;

	/* Check that the softirq receive buffer is valid */
	WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
				sizeof(req->rq_rcv_buf)) != 0);

	/* Verify the RPC header */
	p = call_verify(task);
	if (IS_ERR(p)) {
		if (p == ERR_PTR(-EAGAIN))
			goto out_retry;
		/* call_verify() has already exited the task on fatal errors */
		return;
	}

	task->tk_action = rpc_exit_task;

	if (decode) {
		/* Run the procedure-specific decode under the BKL. */
		lock_kernel();
		task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
						      task->tk_msg.rpc_resp);
		unlock_kernel();
	}
	dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
			task->tk_status);
	return;
out_retry:
	/* Discard this reply so the retransmit path starts clean. */
	req->rq_received = req->rq_private_buf.len = 0;
	task->tk_status = 0;
	if (task->tk_client->cl_discrtry)
		xprt_disconnect(task->tk_xprt);
}
1214
1215 /*
1216  * 8.   Refresh the credentials if rejected by the server
1217  */
1218 static void
1219 call_refresh(struct rpc_task *task)
1220 {
1221         dprint_status(task);
1222
1223         xprt_release(task);     /* Must do to obtain new XID */
1224         task->tk_action = call_refreshresult;
1225         task->tk_status = 0;
1226         task->tk_client->cl_stats->rpcauthrefresh++;
1227         rpcauth_refreshcred(task);
1228 }
1229
1230 /*
1231  * 8a.  Process the results of a credential refresh
1232  */
1233 static void
1234 call_refreshresult(struct rpc_task *task)
1235 {
1236         int status = task->tk_status;
1237
1238         dprint_status(task);
1239
1240         task->tk_status = 0;
1241         task->tk_action = call_reserve;
1242         if (status >= 0 && rpcauth_uptodatecred(task))
1243                 return;
1244         if (status == -EACCES) {
1245                 rpc_exit(task, -EACCES);
1246                 return;
1247         }
1248         task->tk_action = call_refresh;
1249         if (status != -ETIMEDOUT)
1250                 rpc_delay(task, 3*HZ);
1251         return;
1252 }
1253
/*
 * Call header serialization
 *
 * Writes the RPC call header (XID, CALL message type, RPC version,
 * program, version, procedure) followed by the marshalled credential
 * into the first send iovec, and records the header length in rq_slen.
 * Returns a pointer just past the header, where the argument data goes.
 */
static __be32 *
call_header(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rqst *req = task->tk_rqstp;
	__be32		*p = req->rq_svec[0].iov_base;

	/* FIXME: check buffer size? */

	/* Leave room for any transport-level framing before the RPC header. */
	p = xprt_skip_transport_header(task->tk_xprt, p);
	*p++ = req->rq_xid;		/* XID */
	*p++ = htonl(RPC_CALL);		/* CALL */
	*p++ = htonl(RPC_VERSION);	/* RPC version */
	*p++ = htonl(clnt->cl_prog);	/* program number */
	*p++ = htonl(clnt->cl_vers);	/* program version */
	*p++ = htonl(task->tk_msg.rpc_proc->p_proc);	/* procedure */
	p = rpcauth_marshcred(task, p);
	req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
	return p;
}
1277
/*
 * Reply header verification
 *
 * Parses and validates the RPC reply header in the receive buffer.
 * Returns a pointer to the start of the reply payload on success.
 * Returns ERR_PTR(-EAGAIN) when the reply should be discarded and the
 * call retried; for fatal conditions it calls rpc_exit() on the task
 * and returns the corresponding ERR_PTR() value.
 */
static __be32 *
call_verify(struct rpc_task *task)
{
	struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
	int len = task->tk_rqstp->rq_rcv_buf.len >> 2;	/* length in 32-bit words */
	__be32	*p = iov->iov_base;
	u32 n;
	int error = -EACCES;

	if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
		/* RFC-1014 says that the representation of XDR data must be a
		 * multiple of four bytes
		 * - if it isn't pointer subtraction in the NFS client may give
		 *   undefined results
		 */
		printk(KERN_WARNING
		       "call_verify: XDR representation not a multiple of"
		       " 4 bytes: 0x%x\n", task->tk_rqstp->rq_rcv_buf.len);
		goto out_eio;
	}
	/* Need at least XID + message type + reply status (3 words). */
	if ((len -= 3) < 0)
		goto out_overflow;
	p += 1; /* skip XID */

	if ((n = ntohl(*p++)) != RPC_REPLY) {
		printk(KERN_WARNING "call_verify: not an RPC reply: %x\n", n);
		goto out_garbage;
	}
	if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
		/* Call was rejected outright: decode the reject status. */
		if (--len < 0)
			goto out_overflow;
		switch ((n = ntohl(*p++))) {
			case RPC_AUTH_ERROR:
				break;
			case RPC_MISMATCH:
				dprintk("RPC: %5u %s: RPC call version "
						"mismatch!\n",
						task->tk_pid, __FUNCTION__);
				error = -EPROTONOSUPPORT;
				goto out_err;
			default:
				dprintk("RPC: %5u %s: RPC call rejected, "
						"unknown error: %x\n",
						task->tk_pid, __FUNCTION__, n);
				goto out_eio;
		}
		/* Authentication error: some causes are worth retrying. */
		if (--len < 0)
			goto out_overflow;
		switch ((n = ntohl(*p++))) {
		case RPC_AUTH_REJECTEDCRED:
		case RPC_AUTH_REJECTEDVERF:
		case RPCSEC_GSS_CREDPROBLEM:
		case RPCSEC_GSS_CTXPROBLEM:
			if (!task->tk_cred_retry)
				break;
			task->tk_cred_retry--;
			dprintk("RPC: %5u %s: retry stale creds\n",
					task->tk_pid, __FUNCTION__);
			rpcauth_invalcred(task);
			task->tk_action = call_refresh;
			goto out_retry;
		case RPC_AUTH_BADCRED:
		case RPC_AUTH_BADVERF:
			/* possibly garbled cred/verf? */
			if (!task->tk_garb_retry)
				break;
			task->tk_garb_retry--;
			dprintk("RPC: %5u %s: retry garbled creds\n",
					task->tk_pid, __FUNCTION__);
			task->tk_action = call_bind;
			goto out_retry;
		case RPC_AUTH_TOOWEAK:
			printk(KERN_NOTICE "call_verify: server %s requires stronger "
			       "authentication.\n", task->tk_client->cl_server);
			break;
		default:
			printk(KERN_WARNING "call_verify: unknown auth error: %x\n", n);
			error = -EIO;
		}
		dprintk("RPC: %5u %s: call rejected %d\n",
				task->tk_pid, __FUNCTION__, n);
		goto out_err;
	}
	/* Accepted: check the verifier and advance past it. */
	if (!(p = rpcauth_checkverf(task, p))) {
		printk(KERN_WARNING "call_verify: auth check failed\n");
		goto out_garbage;		/* bad verifier, retry */
	}
	len = p - (__be32 *)iov->iov_base - 1;
	if (len < 0)
		goto out_overflow;
	/* Finally, the accept status itself. */
	switch ((n = ntohl(*p++))) {
	case RPC_SUCCESS:
		return p;
	case RPC_PROG_UNAVAIL:
		dprintk("RPC: %5u %s: program %u is unsupported by server %s\n",
				task->tk_pid, __FUNCTION__,
				(unsigned int)task->tk_client->cl_prog,
				task->tk_client->cl_server);
		error = -EPFNOSUPPORT;
		goto out_err;
	case RPC_PROG_MISMATCH:
		dprintk("RPC: %5u %s: program %u, version %u unsupported by "
				"server %s\n", task->tk_pid, __FUNCTION__,
				(unsigned int)task->tk_client->cl_prog,
				(unsigned int)task->tk_client->cl_vers,
				task->tk_client->cl_server);
		error = -EPROTONOSUPPORT;
		goto out_err;
	case RPC_PROC_UNAVAIL:
		dprintk("RPC: %5u %s: proc %p unsupported by program %u, "
				"version %u on server %s\n",
				task->tk_pid, __FUNCTION__,
				task->tk_msg.rpc_proc,
				task->tk_client->cl_prog,
				task->tk_client->cl_vers,
				task->tk_client->cl_server);
		error = -EOPNOTSUPP;
		goto out_err;
	case RPC_GARBAGE_ARGS:
		dprintk("RPC: %5u %s: server saw garbage\n",
				task->tk_pid, __FUNCTION__);
		break;			/* retry */
	default:
		printk(KERN_WARNING "call_verify: server accept status: %x\n", n);
		/* Also retry */
	}

out_garbage:
	/* Garbled reply: retry while the budget lasts, else fail with EIO.
	 * Note that the out_retry label below lives inside this branch. */
	task->tk_client->cl_stats->rpcgarbage++;
	if (task->tk_garb_retry) {
		task->tk_garb_retry--;
		dprintk("RPC: %5u %s: retrying\n",
				task->tk_pid, __FUNCTION__);
		task->tk_action = call_bind;
out_retry:
		return ERR_PTR(-EAGAIN);
	}
	printk(KERN_WARNING "RPC %s: retry failed, exit EIO\n", __FUNCTION__);
out_eio:
	error = -EIO;
out_err:
	rpc_exit(task, error);
	return ERR_PTR(error);
out_overflow:
	printk(KERN_WARNING "RPC %s: server reply was truncated.\n", __FUNCTION__);
	goto out_garbage;
}
1428
/* No-op XDR encoder for the NULL procedure: the call carries no arguments. */
static int rpcproc_encode_null(void *rqstp, __be32 *data, void *obj)
{
	return 0;
}
1433
/* No-op XDR decoder for the NULL procedure: the reply carries no results. */
static int rpcproc_decode_null(void *rqstp, __be32 *data, void *obj)
{
	return 0;
}
1438
/* Procedure descriptor for the RPC NULL procedure, used by rpc_ping(). */
static struct rpc_procinfo rpcproc_null = {
	.p_encode = rpcproc_encode_null,
	.p_decode = rpcproc_decode_null,
};
1443
1444 int rpc_ping(struct rpc_clnt *clnt, int flags)
1445 {
1446         struct rpc_message msg = {
1447                 .rpc_proc = &rpcproc_null,
1448         };
1449         int err;
1450         msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
1451         err = rpc_call_sync(clnt, &msg, flags);
1452         put_rpccred(msg.rpc_cred);
1453         return err;
1454 }
1455
1456 #ifdef RPC_DEBUG
/*
 * Debugging aid: dump every task of every registered RPC client to the
 * console, one line per task.  Walks all_clients under rpc_client_lock
 * and each client's task list under its cl_lock.
 */
void rpc_show_tasks(void)
{
	struct rpc_clnt *clnt;
	struct rpc_task *t;

	spin_lock(&rpc_client_lock);
	if (list_empty(&all_clients))
		goto out;
	printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
		"-rpcwait -action- ---ops--\n");
	list_for_each_entry(clnt, &all_clients, cl_clients) {
		if (list_empty(&clnt->cl_tasks))
			continue;
		/* cl_lock guards this client's cl_tasks list */
		spin_lock(&clnt->cl_lock);
		list_for_each_entry(t, &clnt->cl_tasks, tk_task) {
			const char *rpc_waitq = "none";

			/* Name of the wait queue the task sleeps on, if any */
			if (RPC_IS_QUEUED(t))
				rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);

			printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n",
				t->tk_pid,
				(t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
				t->tk_flags, t->tk_status,
				t->tk_client,
				(t->tk_client ? t->tk_client->cl_prog : 0),
				t->tk_rqstp, t->tk_timeout,
				rpc_waitq,
				t->tk_action, t->tk_ops);
		}
		spin_unlock(&clnt->cl_lock);
	}
out:
	spin_unlock(&rpc_client_lock);
}
1492 #endif