sunrpc/cache: fix module refcnt leak in a failure path
1 /*
2  * net/sunrpc/cache.c
3  *
4  * Generic code for various authentication-related caches
5  * used by sunrpc clients and servers.
6  *
7  * Copyright (C) 2002 Neil Brown <neilb@cse.unsw.edu.au>
8  *
9  * Released under terms in GPL version 2.  See COPYING.
10  *
11  */
12
13 #include <linux/types.h>
14 #include <linux/fs.h>
15 #include <linux/file.h>
16 #include <linux/slab.h>
17 #include <linux/signal.h>
18 #include <linux/sched.h>
19 #include <linux/kmod.h>
20 #include <linux/list.h>
21 #include <linux/module.h>
22 #include <linux/ctype.h>
23 #include <asm/uaccess.h>
24 #include <linux/poll.h>
25 #include <linux/seq_file.h>
26 #include <linux/proc_fs.h>
27 #include <linux/net.h>
28 #include <linux/workqueue.h>
29 #include <linux/mutex.h>
30 #include <linux/pagemap.h>
31 #include <asm/ioctls.h>
32 #include <linux/sunrpc/types.h>
33 #include <linux/sunrpc/cache.h>
34 #include <linux/sunrpc/stats.h>
35 #include <linux/sunrpc/rpc_pipe_fs.h>
36
37 #define  RPCDBG_FACILITY RPCDBG_CACHE
38
39 static int cache_defer_req(struct cache_req *req, struct cache_head *item);
40 static void cache_revisit_request(struct cache_head *item);
41
42 static void cache_init(struct cache_head *h)
43 {
44         time_t now = get_seconds();
45         h->next = NULL;
46         h->flags = 0;
47         kref_init(&h->ref);
48         h->expiry_time = now + CACHE_NEW_EXPIRY;
49         h->last_refresh = now;
50 }
51
52 static inline int cache_is_expired(struct cache_detail *detail, struct cache_head *h)
53 {
54         return  (h->expiry_time < get_seconds()) ||
55                 (detail->flush_time > h->last_refresh);
56 }
57
58 struct cache_head *sunrpc_cache_lookup(struct cache_detail *detail,
59                                        struct cache_head *key, int hash)
60 {
61         struct cache_head **head,  **hp;
62         struct cache_head *new = NULL, *freeme = NULL;
63
64         head = &detail->hash_table[hash];
65
66         read_lock(&detail->hash_lock);
67
68         for (hp=head; *hp != NULL ; hp = &(*hp)->next) {
69                 struct cache_head *tmp = *hp;
70                 if (detail->match(tmp, key)) {
71                         if (cache_is_expired(detail, tmp))
72                                 /* This entry is expired, we will discard it. */
73                                 break;
74                         cache_get(tmp);
75                         read_unlock(&detail->hash_lock);
76                         return tmp;
77                 }
78         }
79         read_unlock(&detail->hash_lock);
80         /* Didn't find anything, insert an empty entry */
81
82         new = detail->alloc();
83         if (!new)
84                 return NULL;
85         /* must fully initialise 'new', else
86          * we might lose it if we need to
87          * cache_put it soon.
88          */
89         cache_init(new);
90         detail->init(new, key);
91
92         write_lock(&detail->hash_lock);
93
94         /* check if entry appeared while we slept */
95         for (hp=head; *hp != NULL ; hp = &(*hp)->next) {
96                 struct cache_head *tmp = *hp;
97                 if (detail->match(tmp, key)) {
98                         if (cache_is_expired(detail, tmp)) {
99                                 *hp = tmp->next;
100                                 tmp->next = NULL;
101                                 detail->entries --;
102                                 freeme = tmp;
103                                 break;
104                         }
105                         cache_get(tmp);
106                         write_unlock(&detail->hash_lock);
107                         cache_put(new, detail);
108                         return tmp;
109                 }
110         }
111         new->next = *head;
112         *head = new;
113         detail->entries++;
114         cache_get(new);
115         write_unlock(&detail->hash_lock);
116
117         if (freeme)
118                 cache_put(freeme, detail);
119         return new;
120 }
121 EXPORT_SYMBOL_GPL(sunrpc_cache_lookup);
122
123
124 static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch);
125
126 static void cache_fresh_locked(struct cache_head *head, time_t expiry)
127 {
128         head->expiry_time = expiry;
129         head->last_refresh = get_seconds();
130         set_bit(CACHE_VALID, &head->flags);
131 }
132
133 static void cache_fresh_unlocked(struct cache_head *head,
134                                  struct cache_detail *detail)
135 {
136         if (test_and_clear_bit(CACHE_PENDING, &head->flags)) {
137                 cache_revisit_request(head);
138                 cache_dequeue(detail, head);
139         }
140 }
141
142 struct cache_head *sunrpc_cache_update(struct cache_detail *detail,
143                                        struct cache_head *new, struct cache_head *old, int hash)
144 {
145         /* The 'old' entry is to be replaced by 'new'.
146          * If 'old' is not VALID, we update it directly,
147          * otherwise we need to replace it
148          */
149         struct cache_head **head;
150         struct cache_head *tmp;
151
152         if (!test_bit(CACHE_VALID, &old->flags)) {
153                 write_lock(&detail->hash_lock);
154                 if (!test_bit(CACHE_VALID, &old->flags)) {
155                         if (test_bit(CACHE_NEGATIVE, &new->flags))
156                                 set_bit(CACHE_NEGATIVE, &old->flags);
157                         else
158                                 detail->update(old, new);
159                         cache_fresh_locked(old, new->expiry_time);
160                         write_unlock(&detail->hash_lock);
161                         cache_fresh_unlocked(old, detail);
162                         return old;
163                 }
164                 write_unlock(&detail->hash_lock);
165         }
166         /* We need to insert a new entry */
167         tmp = detail->alloc();
168         if (!tmp) {
169                 cache_put(old, detail);
170                 return NULL;
171         }
172         cache_init(tmp);
173         detail->init(tmp, old);
174         head = &detail->hash_table[hash];
175
176         write_lock(&detail->hash_lock);
177         if (test_bit(CACHE_NEGATIVE, &new->flags))
178                 set_bit(CACHE_NEGATIVE, &tmp->flags);
179         else
180                 detail->update(tmp, new);
181         tmp->next = *head;
182         *head = tmp;
183         detail->entries++;
184         cache_get(tmp);
185         cache_fresh_locked(tmp, new->expiry_time);
186         cache_fresh_locked(old, 0);
187         write_unlock(&detail->hash_lock);
188         cache_fresh_unlocked(tmp, detail);
189         cache_fresh_unlocked(old, detail);
190         cache_put(old, detail);
191         return tmp;
192 }
193 EXPORT_SYMBOL_GPL(sunrpc_cache_update);
194
195 static int cache_make_upcall(struct cache_detail *cd, struct cache_head *h)
196 {
197         if (!cd->cache_upcall)
198                 return -EINVAL;
199         return cd->cache_upcall(cd, h);
200 }
201
202 static inline int cache_is_valid(struct cache_detail *detail, struct cache_head *h)
203 {
204         if (!test_bit(CACHE_VALID, &h->flags))
205                 return -EAGAIN;
206         else {
207                 /* entry is valid */
208                 if (test_bit(CACHE_NEGATIVE, &h->flags))
209                         return -ENOENT;
210                 else
211                         return 0;
212         }
213 }
214
215 /*
216  * This is the generic cache management routine for all
217  * the authentication caches.
218  * It checks the currency of a cache item and will (later)
219  * initiate an upcall to fill it if needed.
220  *
221  *
222  * Returns 0 if the cache_head can be used, or cache_puts it and returns
223  * -EAGAIN if upcall is pending and request has been queued
224  * -ETIMEDOUT if upcall failed or request could not be queued, or
225  *           upcall completed but item is still invalid (implying that
226  *           the cache item has been replaced with a newer one).
227  * -ENOENT if cache entry was negative
228  */
229 int cache_check(struct cache_detail *detail,
230                     struct cache_head *h, struct cache_req *rqstp)
231 {
232         int rv;
233         long refresh_age, age;
234
235         /* First decide return status as best we can */
236         rv = cache_is_valid(detail, h);
237
238         /* now see if we want to start an upcall */
239         refresh_age = (h->expiry_time - h->last_refresh);
240         age = get_seconds() - h->last_refresh;
241
242         if (rqstp == NULL) {
243                 if (rv == -EAGAIN)
244                         rv = -ENOENT;
245         } else if (rv == -EAGAIN || age > refresh_age/2) {
246                 dprintk("RPC:       Want update, refage=%ld, age=%ld\n",
247                                 refresh_age, age);
248                 if (!test_and_set_bit(CACHE_PENDING, &h->flags)) {
249                         switch (cache_make_upcall(detail, h)) {
250                         case -EINVAL:
251                                 clear_bit(CACHE_PENDING, &h->flags);
252                                 cache_revisit_request(h);
253                                 if (rv == -EAGAIN) {
254                                         set_bit(CACHE_NEGATIVE, &h->flags);
255                                         cache_fresh_locked(h, get_seconds()+CACHE_NEW_EXPIRY);
256                                         cache_fresh_unlocked(h, detail);
257                                         rv = -ENOENT;
258                                 }
259                                 break;
260
261                         case -EAGAIN:
262                                 clear_bit(CACHE_PENDING, &h->flags);
263                                 cache_revisit_request(h);
264                                 break;
265                         }
266                 }
267         }
268
269         if (rv == -EAGAIN) {
270                 if (cache_defer_req(rqstp, h) < 0) {
271                         /* Request is not deferred */
272                         rv = cache_is_valid(detail, h);
273                         if (rv == -EAGAIN)
274                                 rv = -ETIMEDOUT;
275                 }
276         }
277         if (rv)
278                 cache_put(h, detail);
279         return rv;
280 }
281 EXPORT_SYMBOL_GPL(cache_check);
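
/*
 * Illustrative caller sketch (an assumption for documentation, not code
 * from this file): a typical caller already holds a reference on 'h' and
 * maps the return value roughly as follows:
 *
 *	switch (cache_check(detail, h, &rqstp->rq_chandle)) {
 *	case 0:		 use h, then cache_put(h, detail) when done
 *	case -EAGAIN:	 upcall pending, request deferred; drop it for now
 *	case -ENOENT:	 negative entry; fail (reference already dropped)
 *	case -ETIMEDOUT: upcall failed; fail (reference already dropped)
 *	}
 */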
282
283 /*
284  * caches need to be periodically cleaned.
285  * For this we maintain a list of cache_detail and
286  * a current pointer into that list and into the table
287  * for that entry.
288  *
289  * Each time clean_cache is called it finds the next non-empty entry
290  * in the current table and walks the list in that entry
291  * looking for entries that can be removed.
292  *
293  * An entry gets removed if:
294  * - The expiry is before current time
295  * - The last_refresh time is before the flush_time for that cache
296  *
297  * later we might drop old entries with non-NEVER expiry if that table
298  * is getting 'full' for some definition of 'full'
299  *
300  * The question of "how often to scan a table" is an interesting one
301  * and is answered in part by the use of the "nextcheck" field in the
302  * cache_detail.
303  * When a scan of a table begins, the nextcheck field is set to a time
304  * that is well into the future.
305  * While scanning, if an expiry time is found that is earlier than the
306  * current nextcheck time, nextcheck is set to that expiry time.
307  * If the flush_time is ever set to a time earlier than the nextcheck
308  * time, the nextcheck time is then set to that flush_time.
309  *
310  * A table is then only scanned if the current time is at least
311  * the nextcheck time.
312  *
313  */
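
/*
 * Worked example (illustrative): if a scan starts at time T with
 * nextcheck initialised to T+1800 and walks a bucket holding entries
 * that expire at T+5 and T+90, nextcheck ends up as T+6 (one second past
 * the earliest expiry seen), so the table becomes eligible for another
 * scan soon after that first entry expires.
 */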
314
315 static LIST_HEAD(cache_list);
316 static DEFINE_SPINLOCK(cache_list_lock);
317 static struct cache_detail *current_detail;
318 static int current_index;
319
320 static void do_cache_clean(struct work_struct *work);
321 static DECLARE_DELAYED_WORK(cache_cleaner, do_cache_clean);
322
323 static void sunrpc_init_cache_detail(struct cache_detail *cd)
324 {
325         rwlock_init(&cd->hash_lock);
326         INIT_LIST_HEAD(&cd->queue);
327         spin_lock(&cache_list_lock);
328         cd->nextcheck = 0;
329         cd->entries = 0;
330         atomic_set(&cd->readers, 0);
331         cd->last_close = 0;
332         cd->last_warn = -1;
333         list_add(&cd->others, &cache_list);
334         spin_unlock(&cache_list_lock);
335
336         /* start the cleaning process */
337         schedule_delayed_work(&cache_cleaner, 0);
338 }
339
340 static void sunrpc_destroy_cache_detail(struct cache_detail *cd)
341 {
342         cache_purge(cd);
343         spin_lock(&cache_list_lock);
344         write_lock(&cd->hash_lock);
345         if (cd->entries || atomic_read(&cd->inuse)) {
346                 write_unlock(&cd->hash_lock);
347                 spin_unlock(&cache_list_lock);
348                 goto out;
349         }
350         if (current_detail == cd)
351                 current_detail = NULL;
352         list_del_init(&cd->others);
353         write_unlock(&cd->hash_lock);
354         spin_unlock(&cache_list_lock);
355         if (list_empty(&cache_list)) {
356                 /* the module must be being unloaded, so it's safe to kill the worker */
357                 cancel_delayed_work_sync(&cache_cleaner);
358         }
359         return;
360 out:
361         printk(KERN_ERR "nfsd: failed to unregister %s cache\n", cd->name);
362 }
363
364 /* clean cache tries to find something to clean
365  * and cleans it.
366  * It returns 1 if it cleaned something,
367  *            0 if it didn't find anything this time
368  *           -1 if it fell off the end of the list.
369  */
370 static int cache_clean(void)
371 {
372         int rv = 0;
373         struct list_head *next;
374
375         spin_lock(&cache_list_lock);
376
377         /* find a suitable table if we don't already have one */
378         while (current_detail == NULL ||
379             current_index >= current_detail->hash_size) {
380                 if (current_detail)
381                         next = current_detail->others.next;
382                 else
383                         next = cache_list.next;
384                 if (next == &cache_list) {
385                         current_detail = NULL;
386                         spin_unlock(&cache_list_lock);
387                         return -1;
388                 }
389                 current_detail = list_entry(next, struct cache_detail, others);
390                 if (current_detail->nextcheck > get_seconds())
391                         current_index = current_detail->hash_size;
392                 else {
393                         current_index = 0;
394                         current_detail->nextcheck = get_seconds()+30*60;
395                 }
396         }
397
398         /* find a non-empty bucket in the table */
399         while (current_detail &&
400                current_index < current_detail->hash_size &&
401                current_detail->hash_table[current_index] == NULL)
402                 current_index++;
403
404         /* find a cleanable entry in the bucket and clean it, or set to next bucket */
405
406         if (current_detail && current_index < current_detail->hash_size) {
407                 struct cache_head *ch, **cp;
408                 struct cache_detail *d;
409
410                 write_lock(&current_detail->hash_lock);
411
412                 /* Ok, now to clean this strand */
413
414                 cp = & current_detail->hash_table[current_index];
415                 for (ch = *cp ; ch ; cp = & ch->next, ch = *cp) {
416                         if (current_detail->nextcheck > ch->expiry_time)
417                                 current_detail->nextcheck = ch->expiry_time+1;
418                         if (!cache_is_expired(current_detail, ch))
419                                 continue;
420
421                         *cp = ch->next;
422                         ch->next = NULL;
423                         current_detail->entries--;
424                         rv = 1;
425                         break;
426                 }
427
428                 write_unlock(&current_detail->hash_lock);
429                 d = current_detail;
430                 if (!ch)
431                         current_index ++;
432                 spin_unlock(&cache_list_lock);
433                 if (ch) {
434                         if (test_and_clear_bit(CACHE_PENDING, &ch->flags))
435                                 cache_dequeue(current_detail, ch);
436                         cache_revisit_request(ch);
437                         cache_put(ch, d);
438                 }
439         } else
440                 spin_unlock(&cache_list_lock);
441
442         return rv;
443 }
444
445 /*
446  * We want to regularly clean the cache, so we need to schedule some work ...
447  */
448 static void do_cache_clean(struct work_struct *work)
449 {
450         int delay = 5;
451         if (cache_clean() == -1)
452                 delay = round_jiffies_relative(30*HZ);
453
454         if (list_empty(&cache_list))
455                 delay = 0;
456
457         if (delay)
458                 schedule_delayed_work(&cache_cleaner, delay);
459 }
460
461
462 /*
463  * Clean all caches promptly.  This just calls cache_clean
464  * repeatedly until we are sure that every cache has had a chance to
465  * be fully cleaned
466  */
467 void cache_flush(void)
468 {
469         while (cache_clean() != -1)
470                 cond_resched();
471         while (cache_clean() != -1)
472                 cond_resched();
473 }
474 EXPORT_SYMBOL_GPL(cache_flush);
475
476 void cache_purge(struct cache_detail *detail)
477 {
478         detail->flush_time = LONG_MAX;
479         detail->nextcheck = get_seconds();
480         cache_flush();
481         detail->flush_time = 1;
482 }
483 EXPORT_SYMBOL_GPL(cache_purge);
484
485
486 /*
487  * Deferral and Revisiting of Requests.
488  *
489  * If a cache lookup finds a pending entry, we
490  * need to defer the request and revisit it later.
491  * All deferred requests are stored in a hash table,
492  * indexed by "struct cache_head *".
493  * As it may be wasteful to store a whole request
494  * structure, we allow the request to provide a
495  * deferred form, which must contain a
496  * 'struct cache_deferred_req'
497  * This cache_deferred_req contains a method to allow
498  * it to be revisited when cache info is available
499  */
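
/*
 * Illustrative deferred form (an assumed example, not code from this
 * file): a request type typically embeds the cache_deferred_req in its
 * own deferred structure and recovers it in the revisit callback:
 *
 *	struct my_deferred_req {
 *		struct cache_deferred_req handle;
 *		// ... snapshot of the original request ...
 *	};
 *
 *	static void my_revisit(struct cache_deferred_req *dreq, int too_many)
 *	{
 *		struct my_deferred_req *md =
 *			container_of(dreq, struct my_deferred_req, handle);
 *		if (too_many)
 *			;	// discard the saved request
 *		else
 *			;	// requeue it now that the cache item is usable
 *	}
 */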
500
501 #define DFR_HASHSIZE    (PAGE_SIZE/sizeof(struct list_head))
502 #define DFR_HASH(item)  ((((long)item)>>4 ^ (((long)item)>>13)) % DFR_HASHSIZE)
503
504 #define DFR_MAX 300     /* ??? */
505
506 static DEFINE_SPINLOCK(cache_defer_lock);
507 static LIST_HEAD(cache_defer_list);
508 static struct list_head cache_defer_hash[DFR_HASHSIZE];
509 static int cache_defer_cnt;
510
511 static int cache_defer_req(struct cache_req *req, struct cache_head *item)
512 {
513         struct cache_deferred_req *dreq, *discard;
514         int hash = DFR_HASH(item);
515
516         if (cache_defer_cnt >= DFR_MAX) {
517                 /* too much in the cache, randomly drop this one,
518                  * or continue and drop the oldest below
519                  */
520                 if (net_random()&1)
521                         return -ENOMEM;
522         }
523         dreq = req->defer(req);
524         if (dreq == NULL)
525                 return -ENOMEM;
526
527         dreq->item = item;
528
529         spin_lock(&cache_defer_lock);
530
531         list_add(&dreq->recent, &cache_defer_list);
532
533         if (cache_defer_hash[hash].next == NULL)
534                 INIT_LIST_HEAD(&cache_defer_hash[hash]);
535         list_add(&dreq->hash, &cache_defer_hash[hash]);
536
537         /* it is in, now maybe clean up */
538         discard = NULL;
539         if (++cache_defer_cnt > DFR_MAX) {
540                 discard = list_entry(cache_defer_list.prev,
541                                      struct cache_deferred_req, recent);
542                 list_del_init(&discard->recent);
543                 list_del_init(&discard->hash);
544                 cache_defer_cnt--;
545         }
546         spin_unlock(&cache_defer_lock);
547
548         if (discard)
549                 /* there was one too many */
550                 discard->revisit(discard, 1);
551
552         if (!test_bit(CACHE_PENDING, &item->flags)) {
553                 /* must have just been validated... */
554                 cache_revisit_request(item);
555                 return -EAGAIN;
556         }
557         return 0;
558 }
559
560 static void cache_revisit_request(struct cache_head *item)
561 {
562         struct cache_deferred_req *dreq;
563         struct list_head pending;
564
565         struct list_head *lp;
566         int hash = DFR_HASH(item);
567
568         INIT_LIST_HEAD(&pending);
569         spin_lock(&cache_defer_lock);
570
571         lp = cache_defer_hash[hash].next;
572         if (lp) {
573                 while (lp != &cache_defer_hash[hash]) {
574                         dreq = list_entry(lp, struct cache_deferred_req, hash);
575                         lp = lp->next;
576                         if (dreq->item == item) {
577                                 list_del_init(&dreq->hash);
578                                 list_move(&dreq->recent, &pending);
579                                 cache_defer_cnt--;
580                         }
581                 }
582         }
583         spin_unlock(&cache_defer_lock);
584
585         while (!list_empty(&pending)) {
586                 dreq = list_entry(pending.next, struct cache_deferred_req, recent);
587                 list_del_init(&dreq->recent);
588                 dreq->revisit(dreq, 0);
589         }
590 }
591
592 void cache_clean_deferred(void *owner)
593 {
594         struct cache_deferred_req *dreq, *tmp;
595         struct list_head pending;
596
597
598         INIT_LIST_HEAD(&pending);
599         spin_lock(&cache_defer_lock);
600
601         list_for_each_entry_safe(dreq, tmp, &cache_defer_list, recent) {
602                 if (dreq->owner == owner) {
603                         list_del_init(&dreq->hash);
604                         list_move(&dreq->recent, &pending);
605                         cache_defer_cnt--;
606                 }
607         }
608         spin_unlock(&cache_defer_lock);
609
610         while (!list_empty(&pending)) {
611                 dreq = list_entry(pending.next, struct cache_deferred_req, recent);
612                 list_del_init(&dreq->recent);
613                 dreq->revisit(dreq, 1);
614         }
615 }
616
617 /*
618  * communicate with user-space
619  *
620  * We have a magic /proc file - /proc/net/rpc/<cachename>/channel.
621  * On read, you get a full request, or block.
622  * On write, an update request is processed.
623  * Poll works if anything to read, and always allows write.
624  *
625  * Implemented by linked list of requests.  Each open file has
626  * a ->private that also exists in this list.  New requests are added
627  * to the end and may wake up any preceding readers.
628  * New readers are added to the head.  If, on read, an item is found with
629  * CACHE_UPCALLING clear, we free it from the list.
630  *
631  */
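
/*
 * Illustrative user-space listener (an assumed sketch, not part of the
 * kernel): the daemon that services a cache polls the channel file,
 * reads one complete request per read() and writes one complete,
 * newline-terminated reply per write():
 *
 *	int fd = open("/proc/net/rpc/<cachename>/channel", O_RDWR);
 *	char req[8192], reply[8192];
 *	for (;;) {
 *		ssize_t n = read(fd, req, sizeof(req));
 *		if (n <= 0)
 *			continue;
 *		// resolve the request, build a single one-line reply
 *		write(fd, reply, strlen(reply));
 *	}
 */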
632
633 static DEFINE_SPINLOCK(queue_lock);
634 static DEFINE_MUTEX(queue_io_mutex);
635
636 struct cache_queue {
637         struct list_head        list;
638         int                     reader; /* if 0, then request */
639 };
640 struct cache_request {
641         struct cache_queue      q;
642         struct cache_head       *item;
643         char                    * buf;
644         int                     len;
645         int                     readers;
646 };
647 struct cache_reader {
648         struct cache_queue      q;
649         int                     offset; /* if non-0, we have a refcnt on next request */
650 };
651
652 static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
653                           loff_t *ppos, struct cache_detail *cd)
654 {
655         struct cache_reader *rp = filp->private_data;
656         struct cache_request *rq;
657         struct inode *inode = filp->f_path.dentry->d_inode;
658         int err;
659
660         if (count == 0)
661                 return 0;
662
663         mutex_lock(&inode->i_mutex); /* protect against multiple concurrent
664                               * readers on this file */
665  again:
666         spin_lock(&queue_lock);
667         /* need to find next request */
668         while (rp->q.list.next != &cd->queue &&
669                list_entry(rp->q.list.next, struct cache_queue, list)
670                ->reader) {
671                 struct list_head *next = rp->q.list.next;
672                 list_move(&rp->q.list, next);
673         }
674         if (rp->q.list.next == &cd->queue) {
675                 spin_unlock(&queue_lock);
676                 mutex_unlock(&inode->i_mutex);
677                 BUG_ON(rp->offset);
678                 return 0;
679         }
680         rq = container_of(rp->q.list.next, struct cache_request, q.list);
681         BUG_ON(rq->q.reader);
682         if (rp->offset == 0)
683                 rq->readers++;
684         spin_unlock(&queue_lock);
685
686         if (rp->offset == 0 && !test_bit(CACHE_PENDING, &rq->item->flags)) {
687                 err = -EAGAIN;
688                 spin_lock(&queue_lock);
689                 list_move(&rp->q.list, &rq->q.list);
690                 spin_unlock(&queue_lock);
691         } else {
692                 if (rp->offset + count > rq->len)
693                         count = rq->len - rp->offset;
694                 err = -EFAULT;
695                 if (copy_to_user(buf, rq->buf + rp->offset, count))
696                         goto out;
697                 rp->offset += count;
698                 if (rp->offset >= rq->len) {
699                         rp->offset = 0;
700                         spin_lock(&queue_lock);
701                         list_move(&rp->q.list, &rq->q.list);
702                         spin_unlock(&queue_lock);
703                 }
704                 err = 0;
705         }
706  out:
707         if (rp->offset == 0) {
708                 /* need to release rq */
709                 spin_lock(&queue_lock);
710                 rq->readers--;
711                 if (rq->readers == 0 &&
712                     !test_bit(CACHE_PENDING, &rq->item->flags)) {
713                         list_del(&rq->q.list);
714                         spin_unlock(&queue_lock);
715                         cache_put(rq->item, cd);
716                         kfree(rq->buf);
717                         kfree(rq);
718                 } else
719                         spin_unlock(&queue_lock);
720         }
721         if (err == -EAGAIN)
722                 goto again;
723         mutex_unlock(&inode->i_mutex);
724         return err ? err :  count;
725 }
726
727 static ssize_t cache_do_downcall(char *kaddr, const char __user *buf,
728                                  size_t count, struct cache_detail *cd)
729 {
730         ssize_t ret;
731
732         if (copy_from_user(kaddr, buf, count))
733                 return -EFAULT;
734         kaddr[count] = '\0';
735         ret = cd->cache_parse(cd, kaddr, count);
736         if (!ret)
737                 ret = count;
738         return ret;
739 }
740
741 static ssize_t cache_slow_downcall(const char __user *buf,
742                                    size_t count, struct cache_detail *cd)
743 {
744         static char write_buf[8192]; /* protected by queue_io_mutex */
745         ssize_t ret = -EINVAL;
746
747         if (count >= sizeof(write_buf))
748                 goto out;
749         mutex_lock(&queue_io_mutex);
750         ret = cache_do_downcall(write_buf, buf, count, cd);
751         mutex_unlock(&queue_io_mutex);
752 out:
753         return ret;
754 }
755
756 static ssize_t cache_downcall(struct address_space *mapping,
757                               const char __user *buf,
758                               size_t count, struct cache_detail *cd)
759 {
760         struct page *page;
761         char *kaddr;
762         ssize_t ret = -ENOMEM;
763
764         if (count >= PAGE_CACHE_SIZE)
765                 goto out_slow;
766
767         page = find_or_create_page(mapping, 0, GFP_KERNEL);
768         if (!page)
769                 goto out_slow;
770
771         kaddr = kmap(page);
772         ret = cache_do_downcall(kaddr, buf, count, cd);
773         kunmap(page);
774         unlock_page(page);
775         page_cache_release(page);
776         return ret;
777 out_slow:
778         return cache_slow_downcall(buf, count, cd);
779 }
780
781 static ssize_t cache_write(struct file *filp, const char __user *buf,
782                            size_t count, loff_t *ppos,
783                            struct cache_detail *cd)
784 {
785         struct address_space *mapping = filp->f_mapping;
786         struct inode *inode = filp->f_path.dentry->d_inode;
787         ssize_t ret = -EINVAL;
788
789         if (!cd->cache_parse)
790                 goto out;
791
792         mutex_lock(&inode->i_mutex);
793         ret = cache_downcall(mapping, buf, count, cd);
794         mutex_unlock(&inode->i_mutex);
795 out:
796         return ret;
797 }
798
799 static DECLARE_WAIT_QUEUE_HEAD(queue_wait);
800
801 static unsigned int cache_poll(struct file *filp, poll_table *wait,
802                                struct cache_detail *cd)
803 {
804         unsigned int mask;
805         struct cache_reader *rp = filp->private_data;
806         struct cache_queue *cq;
807
808         poll_wait(filp, &queue_wait, wait);
809
810         /* always allow write */
811         mask = POLLOUT | POLLWRNORM;
812
813         if (!rp)
814                 return mask;
815
816         spin_lock(&queue_lock);
817
818         for (cq= &rp->q; &cq->list != &cd->queue;
819              cq = list_entry(cq->list.next, struct cache_queue, list))
820                 if (!cq->reader) {
821                         mask |= POLLIN | POLLRDNORM;
822                         break;
823                 }
824         spin_unlock(&queue_lock);
825         return mask;
826 }
827
828 static int cache_ioctl(struct inode *ino, struct file *filp,
829                        unsigned int cmd, unsigned long arg,
830                        struct cache_detail *cd)
831 {
832         int len = 0;
833         struct cache_reader *rp = filp->private_data;
834         struct cache_queue *cq;
835
836         if (cmd != FIONREAD || !rp)
837                 return -EINVAL;
838
839         spin_lock(&queue_lock);
840
841         /* only find the length remaining in current request,
842          * or the length of the next request
843          */
844         for (cq= &rp->q; &cq->list != &cd->queue;
845              cq = list_entry(cq->list.next, struct cache_queue, list))
846                 if (!cq->reader) {
847                         struct cache_request *cr =
848                                 container_of(cq, struct cache_request, q);
849                         len = cr->len - rp->offset;
850                         break;
851                 }
852         spin_unlock(&queue_lock);
853
854         return put_user(len, (int __user *)arg);
855 }
856
857 static int cache_open(struct inode *inode, struct file *filp,
858                       struct cache_detail *cd)
859 {
860         struct cache_reader *rp = NULL;
861
862         if (!cd || !try_module_get(cd->owner))
863                 return -EACCES;
864         nonseekable_open(inode, filp);
865         if (filp->f_mode & FMODE_READ) {
866                 rp = kmalloc(sizeof(*rp), GFP_KERNEL);
867                 if (!rp) {
                            module_put(cd->owner); /* drop ref taken by try_module_get() */
868                         return -ENOMEM;
                    }
869                 rp->offset = 0;
870                 rp->q.reader = 1;
871                 atomic_inc(&cd->readers);
872                 spin_lock(&queue_lock);
873                 list_add(&rp->q.list, &cd->queue);
874                 spin_unlock(&queue_lock);
875         }
876         filp->private_data = rp;
877         return 0;
878 }
879
880 static int cache_release(struct inode *inode, struct file *filp,
881                          struct cache_detail *cd)
882 {
883         struct cache_reader *rp = filp->private_data;
884
885         if (rp) {
886                 spin_lock(&queue_lock);
887                 if (rp->offset) {
888                         struct cache_queue *cq;
889                         for (cq= &rp->q; &cq->list != &cd->queue;
890                              cq = list_entry(cq->list.next, struct cache_queue, list))
891                                 if (!cq->reader) {
892                                         container_of(cq, struct cache_request, q)
893                                                 ->readers--;
894                                         break;
895                                 }
896                         rp->offset = 0;
897                 }
898                 list_del(&rp->q.list);
899                 spin_unlock(&queue_lock);
900
901                 filp->private_data = NULL;
902                 kfree(rp);
903
904                 cd->last_close = get_seconds();
905                 atomic_dec(&cd->readers);
906         }
907         module_put(cd->owner);
908         return 0;
909 }
910
911
912
913 static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch)
914 {
915         struct cache_queue *cq;
916         spin_lock(&queue_lock);
917         list_for_each_entry(cq, &detail->queue, list)
918                 if (!cq->reader) {
919                         struct cache_request *cr = container_of(cq, struct cache_request, q);
920                         if (cr->item != ch)
921                                 continue;
922                         if (cr->readers != 0)
923                                 continue;
924                         list_del(&cr->q.list);
925                         spin_unlock(&queue_lock);
926                         cache_put(cr->item, detail);
927                         kfree(cr->buf);
928                         kfree(cr);
929                         return;
930                 }
931         spin_unlock(&queue_lock);
932 }
933
934 /*
935  * Support routines for text-based upcalls.
936  * Fields are separated by spaces.
937  * Fields are either mangled to quote space, tab, newline and slosh with slosh,
938  * or hexified with a leading \x
939  * Record is terminated with newline.
940  *
941  */
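
/*
 * For example (illustrative): qword_add() emits the string "net id" as
 * "net\040id " - the embedded space becomes the octal escape \040 and a
 * single space terminates the field - while qword_addhex() emits the two
 * bytes {0xde, 0xad} as "\xdead ".
 */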
942
943 void qword_add(char **bpp, int *lp, char *str)
944 {
945         char *bp = *bpp;
946         int len = *lp;
947         char c;
948
949         if (len < 0) return;
950
951         while ((c=*str++) && len)
952                 switch(c) {
953                 case ' ':
954                 case '\t':
955                 case '\n':
956                 case '\\':
957                         if (len >= 4) {
958                                 *bp++ = '\\';
959                                 *bp++ = '0' + ((c & 0300)>>6);
960                                 *bp++ = '0' + ((c & 0070)>>3);
961                                 *bp++ = '0' + ((c & 0007)>>0);
962                         }
963                         len -= 4;
964                         break;
965                 default:
966                         *bp++ = c;
967                         len--;
968                 }
969         if (c || len <1) len = -1;
970         else {
971                 *bp++ = ' ';
972                 len--;
973         }
974         *bpp = bp;
975         *lp = len;
976 }
977 EXPORT_SYMBOL_GPL(qword_add);
978
979 void qword_addhex(char **bpp, int *lp, char *buf, int blen)
980 {
981         char *bp = *bpp;
982         int len = *lp;
983
984         if (len < 0) return;
985
986         if (len > 2) {
987                 *bp++ = '\\';
988                 *bp++ = 'x';
989                 len -= 2;
990                 while (blen && len >= 2) {
991                         unsigned char c = *buf++;
992                         *bp++ = '0' + ((c&0xf0)>>4) + (c>=0xa0)*('a'-'9'-1);
993                         *bp++ = '0' + (c&0x0f) + ((c&0x0f)>=0x0a)*('a'-'9'-1);
994                         len -= 2;
995                         blen--;
996                 }
997         }
998         if (blen || len<1) len = -1;
999         else {
1000                 *bp++ = ' ';
1001                 len--;
1002         }
1003         *bpp = bp;
1004         *lp = len;
1005 }
1006 EXPORT_SYMBOL_GPL(qword_addhex);
1007
1008 static void warn_no_listener(struct cache_detail *detail)
1009 {
1010         if (detail->last_warn != detail->last_close) {
1011                 detail->last_warn = detail->last_close;
1012                 if (detail->warn_no_listener)
1013                         detail->warn_no_listener(detail, detail->last_close != 0);
1014         }
1015 }
1016
1017 /*
1018  * register an upcall request to user-space and queue it up for read() by the
1019  * upcall daemon.
1020  *
1021  * Each request is at most one page long.
1022  */
1023 int sunrpc_cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h,
1024                 void (*cache_request)(struct cache_detail *,
1025                                       struct cache_head *,
1026                                       char **,
1027                                       int *))
1028 {
1029
1030         char *buf;
1031         struct cache_request *crq;
1032         char *bp;
1033         int len;
1034
1035         if (atomic_read(&detail->readers) == 0 &&
1036             detail->last_close < get_seconds() - 30) {
1037                 warn_no_listener(detail);
1038                 return -EINVAL;
1039         }
1040
1041         buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
1042         if (!buf)
1043                 return -EAGAIN;
1044
1045         crq = kmalloc(sizeof (*crq), GFP_KERNEL);
1046         if (!crq) {
1047                 kfree(buf);
1048                 return -EAGAIN;
1049         }
1050
1051         bp = buf; len = PAGE_SIZE;
1052
1053         cache_request(detail, h, &bp, &len);
1054
1055         if (len < 0) {
1056                 kfree(buf);
1057                 kfree(crq);
1058                 return -EAGAIN;
1059         }
1060         crq->q.reader = 0;
1061         crq->item = cache_get(h);
1062         crq->buf = buf;
1063         crq->len = PAGE_SIZE - len;
1064         crq->readers = 0;
1065         spin_lock(&queue_lock);
1066         list_add_tail(&crq->q.list, &detail->queue);
1067         spin_unlock(&queue_lock);
1068         wake_up(&queue_wait);
1069         return 0;
1070 }
1071 EXPORT_SYMBOL_GPL(sunrpc_cache_pipe_upcall);
1072
1073 /*
1074  * parse a message from user-space and pass it
1075  * to an appropriate cache
1076  * Messages are, like requests, separated into fields by
1077  * spaces and dequotes as \xHEXSTRING or embedded \nnn octal
1078  *
1079  * Message is
1080  *   reply cachename expiry key ... content....
1081  *
1082  * key and content are both parsed by cache
1083  */
1084  */
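
/*
 * For example (illustrative): qword_get() turns the quoted field
 * "net\040id" back into the 6-byte string "net id", and "\x6e6574" back
 * into "net"; copying stops at the next unquoted space or newline.
 */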
1085 #define isodigit(c) (isdigit(c) && c <= '7')
1086 int qword_get(char **bpp, char *dest, int bufsize)
1087 {
1088         /* return bytes copied, or -1 on error */
1089         char *bp = *bpp;
1090         int len = 0;
1091
1092         while (*bp == ' ') bp++;
1093
1094         if (bp[0] == '\\' && bp[1] == 'x') {
1095                 /* HEX STRING */
1096                 bp += 2;
1097                 while (isxdigit(bp[0]) && isxdigit(bp[1]) && len < bufsize) {
1098                         int byte = isdigit(*bp) ? *bp-'0' : toupper(*bp)-'A'+10;
1099                         bp++;
1100                         byte <<= 4;
1101                         byte |= isdigit(*bp) ? *bp-'0' : toupper(*bp)-'A'+10;
1102                         *dest++ = byte;
1103                         bp++;
1104                         len++;
1105                 }
1106         } else {
1107                 /* text with \nnn octal quoting */
1108                 while (*bp != ' ' && *bp != '\n' && *bp && len < bufsize-1) {
1109                         if (*bp == '\\' &&
1110                             isodigit(bp[1]) && (bp[1] <= '3') &&
1111                             isodigit(bp[2]) &&
1112                             isodigit(bp[3])) {
1113                                 int byte = (*++bp -'0');
1114                                 bp++;
1115                                 byte = (byte << 3) | (*bp++ - '0');
1116                                 byte = (byte << 3) | (*bp++ - '0');
1117                                 *dest++ = byte;
1118                                 len++;
1119                         } else {
1120                                 *dest++ = *bp++;
1121                                 len++;
1122                         }
1123                 }
1124         }
1125
1126         if (*bp != ' ' && *bp != '\n' && *bp != '\0')
1127                 return -1;
1128         while (*bp == ' ') bp++;
1129         *bpp = bp;
1130         *dest = '\0';
1131         return len;
1132 }
1133 EXPORT_SYMBOL_GPL(qword_get);
1134
1135
1136 /*
1137  * support /proc/net/rpc/$CACHENAME/content
1138  * as a seqfile.
1139  * We call ->cache_show passing NULL for the item to
1140  * get a header, then pass each real item in the cache
1141  */
1142
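
/*
 * Illustrative output (assumed; the format is cache specific): reading
 * the content file prints the header produced by ->cache_show(m, cd, NULL)
 * followed by one line per entry, e.g. for auth.unix.ip roughly:
 *
 *	#class IP domain
 *	nfsd 192.0.2.1 example.com
 */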
1143 struct handle {
1144         struct cache_detail *cd;
1145 };
1146
1147 static void *c_start(struct seq_file *m, loff_t *pos)
1148         __acquires(cd->hash_lock)
1149 {
1150         loff_t n = *pos;
1151         unsigned hash, entry;
1152         struct cache_head *ch;
1153         struct cache_detail *cd = ((struct handle*)m->private)->cd;
1154
1155
1156         read_lock(&cd->hash_lock);
1157         if (!n--)
1158                 return SEQ_START_TOKEN;
1159         hash = n >> 32;
1160         entry = n & ((1LL<<32) - 1);
1161
1162         for (ch=cd->hash_table[hash]; ch; ch=ch->next)
1163                 if (!entry--)
1164                         return ch;
1165         n &= ~((1LL<<32) - 1);
1166         do {
1167                 hash++;
1168                 n += 1LL<<32;
1169         } while(hash < cd->hash_size &&
1170                 cd->hash_table[hash]==NULL);
1171         if (hash >= cd->hash_size)
1172                 return NULL;
1173         *pos = n+1;
1174         return cd->hash_table[hash];
1175 }
1176
1177 static void *c_next(struct seq_file *m, void *p, loff_t *pos)
1178 {
1179         struct cache_head *ch = p;
1180         int hash = (*pos >> 32);
1181         struct cache_detail *cd = ((struct handle*)m->private)->cd;
1182
1183         if (p == SEQ_START_TOKEN)
1184                 hash = 0;
1185         else if (ch->next == NULL) {
1186                 hash++;
1187                 *pos += 1LL<<32;
1188         } else {
1189                 ++*pos;
1190                 return ch->next;
1191         }
1192         *pos &= ~((1LL<<32) - 1);
1193         while (hash < cd->hash_size &&
1194                cd->hash_table[hash] == NULL) {
1195                 hash++;
1196                 *pos += 1LL<<32;
1197         }
1198         if (hash >= cd->hash_size)
1199                 return NULL;
1200         ++*pos;
1201         return cd->hash_table[hash];
1202 }
1203
1204 static void c_stop(struct seq_file *m, void *p)
1205         __releases(cd->hash_lock)
1206 {
1207         struct cache_detail *cd = ((struct handle*)m->private)->cd;
1208         read_unlock(&cd->hash_lock);
1209 }
1210
1211 static int c_show(struct seq_file *m, void *p)
1212 {
1213         struct cache_head *cp = p;
1214         struct cache_detail *cd = ((struct handle*)m->private)->cd;
1215
1216         if (p == SEQ_START_TOKEN)
1217                 return cd->cache_show(m, cd, NULL);
1218
1219         ifdebug(CACHE)
1220                 seq_printf(m, "# expiry=%ld refcnt=%d flags=%lx\n",
1221                            cp->expiry_time, atomic_read(&cp->ref.refcount), cp->flags);
1222         cache_get(cp);
1223         if (cache_check(cd, cp, NULL))
1224                 /* cache_check does a cache_put on failure */
1225                 seq_printf(m, "# ");
1226         else
1227                 cache_put(cp, cd);
1228
1229         return cd->cache_show(m, cd, cp);
1230 }
1231
1232 static const struct seq_operations cache_content_op = {
1233         .start  = c_start,
1234         .next   = c_next,
1235         .stop   = c_stop,
1236         .show   = c_show,
1237 };
1238
1239 static int content_open(struct inode *inode, struct file *file,
1240                         struct cache_detail *cd)
1241 {
1242         struct handle *han;
1243
1244         if (!cd || !try_module_get(cd->owner))
1245                 return -EACCES;
1246         han = __seq_open_private(file, &cache_content_op, sizeof(*han));
1247         if (han == NULL) {
1248                 module_put(cd->owner);
1249                 return -ENOMEM;
1250         }
1251
1252         han->cd = cd;
1253         return 0;
1254 }
1255
1256 static int content_release(struct inode *inode, struct file *file,
1257                 struct cache_detail *cd)
1258 {
1259         int ret = seq_release_private(inode, file);
1260         module_put(cd->owner);
1261         return ret;
1262 }
1263
1264 static int open_flush(struct inode *inode, struct file *file,
1265                         struct cache_detail *cd)
1266 {
1267         if (!cd || !try_module_get(cd->owner))
1268                 return -EACCES;
1269         return nonseekable_open(inode, file);
1270 }
1271
1272 static int release_flush(struct inode *inode, struct file *file,
1273                         struct cache_detail *cd)
1274 {
1275         module_put(cd->owner);
1276         return 0;
1277 }
1278
1279 static ssize_t read_flush(struct file *file, char __user *buf,
1280                           size_t count, loff_t *ppos,
1281                           struct cache_detail *cd)
1282 {
1283         char tbuf[20];
1284         unsigned long p = *ppos;
1285         size_t len;
1286
1287         sprintf(tbuf, "%lu\n", cd->flush_time);
1288         len = strlen(tbuf);
1289         if (p >= len)
1290                 return 0;
1291         len -= p;
1292         if (len > count)
1293                 len = count;
1294         if (copy_to_user(buf, (void*)(tbuf+p), len))
1295                 return -EFAULT;
1296         *ppos += len;
1297         return len;
1298 }
1299
1300 static ssize_t write_flush(struct file *file, const char __user *buf,
1301                            size_t count, loff_t *ppos,
1302                            struct cache_detail *cd)
1303 {
1304         char tbuf[20];
1305         char *ep;
1306         long flushtime;
1307         if (*ppos || count > sizeof(tbuf)-1)
1308                 return -EINVAL;
1309         if (copy_from_user(tbuf, buf, count))
1310                 return -EFAULT;
1311         tbuf[count] = 0;
1312         flushtime = simple_strtoul(tbuf, &ep, 0);
1313         if (*ep && *ep != '\n')
1314                 return -EINVAL;
1315
1316         cd->flush_time = flushtime;
1317         cd->nextcheck = get_seconds();
1318         cache_flush();
1319
1320         *ppos += count;
1321         return count;
1322 }
1323
1324 static ssize_t cache_read_procfs(struct file *filp, char __user *buf,
1325                                  size_t count, loff_t *ppos)
1326 {
1327         struct cache_detail *cd = PDE(filp->f_path.dentry->d_inode)->data;
1328
1329         return cache_read(filp, buf, count, ppos, cd);
1330 }
1331
1332 static ssize_t cache_write_procfs(struct file *filp, const char __user *buf,
1333                                   size_t count, loff_t *ppos)
1334 {
1335         struct cache_detail *cd = PDE(filp->f_path.dentry->d_inode)->data;
1336
1337         return cache_write(filp, buf, count, ppos, cd);
1338 }
1339
1340 static unsigned int cache_poll_procfs(struct file *filp, poll_table *wait)
1341 {
1342         struct cache_detail *cd = PDE(filp->f_path.dentry->d_inode)->data;
1343
1344         return cache_poll(filp, wait, cd);
1345 }
1346
1347 static int cache_ioctl_procfs(struct inode *inode, struct file *filp,
1348                               unsigned int cmd, unsigned long arg)
1349 {
1350         struct cache_detail *cd = PDE(inode)->data;
1351
1352         return cache_ioctl(inode, filp, cmd, arg, cd);
1353 }
1354
1355 static int cache_open_procfs(struct inode *inode, struct file *filp)
1356 {
1357         struct cache_detail *cd = PDE(inode)->data;
1358
1359         return cache_open(inode, filp, cd);
1360 }
1361
1362 static int cache_release_procfs(struct inode *inode, struct file *filp)
1363 {
1364         struct cache_detail *cd = PDE(inode)->data;
1365
1366         return cache_release(inode, filp, cd);
1367 }
1368
1369 static const struct file_operations cache_file_operations_procfs = {
1370         .owner          = THIS_MODULE,
1371         .llseek         = no_llseek,
1372         .read           = cache_read_procfs,
1373         .write          = cache_write_procfs,
1374         .poll           = cache_poll_procfs,
1375         .ioctl          = cache_ioctl_procfs, /* for FIONREAD */
1376         .open           = cache_open_procfs,
1377         .release        = cache_release_procfs,
1378 };
1379
1380 static int content_open_procfs(struct inode *inode, struct file *filp)
1381 {
1382         struct cache_detail *cd = PDE(inode)->data;
1383
1384         return content_open(inode, filp, cd);
1385 }
1386
1387 static int content_release_procfs(struct inode *inode, struct file *filp)
1388 {
1389         struct cache_detail *cd = PDE(inode)->data;
1390
1391         return content_release(inode, filp, cd);
1392 }
1393
1394 static const struct file_operations content_file_operations_procfs = {
1395         .open           = content_open_procfs,
1396         .read           = seq_read,
1397         .llseek         = seq_lseek,
1398         .release        = content_release_procfs,
1399 };
1400
1401 static int open_flush_procfs(struct inode *inode, struct file *filp)
1402 {
1403         struct cache_detail *cd = PDE(inode)->data;
1404
1405         return open_flush(inode, filp, cd);
1406 }
1407
1408 static int release_flush_procfs(struct inode *inode, struct file *filp)
1409 {
1410         struct cache_detail *cd = PDE(inode)->data;
1411
1412         return release_flush(inode, filp, cd);
1413 }
1414
1415 static ssize_t read_flush_procfs(struct file *filp, char __user *buf,
1416                             size_t count, loff_t *ppos)
1417 {
1418         struct cache_detail *cd = PDE(filp->f_path.dentry->d_inode)->data;
1419
1420         return read_flush(filp, buf, count, ppos, cd);
1421 }
1422
1423 static ssize_t write_flush_procfs(struct file *filp,
1424                                   const char __user *buf,
1425                                   size_t count, loff_t *ppos)
1426 {
1427         struct cache_detail *cd = PDE(filp->f_path.dentry->d_inode)->data;
1428
1429         return write_flush(filp, buf, count, ppos, cd);
1430 }
1431
1432 static const struct file_operations cache_flush_operations_procfs = {
1433         .open           = open_flush_procfs,
1434         .read           = read_flush_procfs,
1435         .write          = write_flush_procfs,
1436         .release        = release_flush_procfs,
1437 };
1438
1439 static void remove_cache_proc_entries(struct cache_detail *cd)
1440 {
1441         if (cd->u.procfs.proc_ent == NULL)
1442                 return;
1443         if (cd->u.procfs.flush_ent)
1444                 remove_proc_entry("flush", cd->u.procfs.proc_ent);
1445         if (cd->u.procfs.channel_ent)
1446                 remove_proc_entry("channel", cd->u.procfs.proc_ent);
1447         if (cd->u.procfs.content_ent)
1448                 remove_proc_entry("content", cd->u.procfs.proc_ent);
1449         cd->u.procfs.proc_ent = NULL;
1450         remove_proc_entry(cd->name, proc_net_rpc);
1451 }
1452
1453 #ifdef CONFIG_PROC_FS
1454 static int create_cache_proc_entries(struct cache_detail *cd)
1455 {
1456         struct proc_dir_entry *p;
1457
1458         cd->u.procfs.proc_ent = proc_mkdir(cd->name, proc_net_rpc);
1459         if (cd->u.procfs.proc_ent == NULL)
1460                 goto out_nomem;
1461         cd->u.procfs.channel_ent = NULL;
1462         cd->u.procfs.content_ent = NULL;
1463
1464         p = proc_create_data("flush", S_IFREG|S_IRUSR|S_IWUSR,
1465                              cd->u.procfs.proc_ent,
1466                              &cache_flush_operations_procfs, cd);
1467         cd->u.procfs.flush_ent = p;
1468         if (p == NULL)
1469                 goto out_nomem;
1470
1471         if (cd->cache_upcall || cd->cache_parse) {
1472                 p = proc_create_data("channel", S_IFREG|S_IRUSR|S_IWUSR,
1473                                      cd->u.procfs.proc_ent,
1474                                      &cache_file_operations_procfs, cd);
1475                 cd->u.procfs.channel_ent = p;
1476                 if (p == NULL)
1477                         goto out_nomem;
1478         }
1479         if (cd->cache_show) {
1480                 p = proc_create_data("content", S_IFREG|S_IRUSR|S_IWUSR,
1481                                 cd->u.procfs.proc_ent,
1482                                 &content_file_operations_procfs, cd);
1483                 cd->u.procfs.content_ent = p;
1484                 if (p == NULL)
1485                         goto out_nomem;
1486         }
1487         return 0;
1488 out_nomem:
1489         remove_cache_proc_entries(cd);
1490         return -ENOMEM;
1491 }
1492 #else /* CONFIG_PROC_FS */
1493 static int create_cache_proc_entries(struct cache_detail *cd)
1494 {
1495         return 0;
1496 }
1497 #endif
1498
1499 int cache_register(struct cache_detail *cd)
1500 {
1501         int ret;
1502
1503         sunrpc_init_cache_detail(cd);
1504         ret = create_cache_proc_entries(cd);
1505         if (ret)
1506                 sunrpc_destroy_cache_detail(cd);
1507         return ret;
1508 }
1509 EXPORT_SYMBOL_GPL(cache_register);
1510
1511 void cache_unregister(struct cache_detail *cd)
1512 {
1513         remove_cache_proc_entries(cd);
1514         sunrpc_destroy_cache_detail(cd);
1515 }
1516 EXPORT_SYMBOL_GPL(cache_unregister);
1517
1518 static ssize_t cache_read_pipefs(struct file *filp, char __user *buf,
1519                                  size_t count, loff_t *ppos)
1520 {
1521         struct cache_detail *cd = RPC_I(filp->f_path.dentry->d_inode)->private;
1522
1523         return cache_read(filp, buf, count, ppos, cd);
1524 }
1525
1526 static ssize_t cache_write_pipefs(struct file *filp, const char __user *buf,
1527                                   size_t count, loff_t *ppos)
1528 {
1529         struct cache_detail *cd = RPC_I(filp->f_path.dentry->d_inode)->private;
1530
1531         return cache_write(filp, buf, count, ppos, cd);
1532 }
1533
1534 static unsigned int cache_poll_pipefs(struct file *filp, poll_table *wait)
1535 {
1536         struct cache_detail *cd = RPC_I(filp->f_path.dentry->d_inode)->private;
1537
1538         return cache_poll(filp, wait, cd);
1539 }
1540
1541 static int cache_ioctl_pipefs(struct inode *inode, struct file *filp,
1542                               unsigned int cmd, unsigned long arg)
1543 {
1544         struct cache_detail *cd = RPC_I(inode)->private;
1545
1546         return cache_ioctl(inode, filp, cmd, arg, cd);
1547 }
1548
1549 static int cache_open_pipefs(struct inode *inode, struct file *filp)
1550 {
1551         struct cache_detail *cd = RPC_I(inode)->private;
1552
1553         return cache_open(inode, filp, cd);
1554 }
1555
1556 static int cache_release_pipefs(struct inode *inode, struct file *filp)
1557 {
1558         struct cache_detail *cd = RPC_I(inode)->private;
1559
1560         return cache_release(inode, filp, cd);
1561 }
1562
1563 const struct file_operations cache_file_operations_pipefs = {
1564         .owner          = THIS_MODULE,
1565         .llseek         = no_llseek,
1566         .read           = cache_read_pipefs,
1567         .write          = cache_write_pipefs,
1568         .poll           = cache_poll_pipefs,
1569         .ioctl          = cache_ioctl_pipefs, /* for FIONREAD */
1570         .open           = cache_open_pipefs,
1571         .release        = cache_release_pipefs,
1572 };
1573
1574 static int content_open_pipefs(struct inode *inode, struct file *filp)
1575 {
1576         struct cache_detail *cd = RPC_I(inode)->private;
1577
1578         return content_open(inode, filp, cd);
1579 }
1580
1581 static int content_release_pipefs(struct inode *inode, struct file *filp)
1582 {
1583         struct cache_detail *cd = RPC_I(inode)->private;
1584
1585         return content_release(inode, filp, cd);
1586 }
1587
1588 const struct file_operations content_file_operations_pipefs = {
1589         .open           = content_open_pipefs,
1590         .read           = seq_read,
1591         .llseek         = seq_lseek,
1592         .release        = content_release_pipefs,
1593 };
1594
1595 static int open_flush_pipefs(struct inode *inode, struct file *filp)
1596 {
1597         struct cache_detail *cd = RPC_I(inode)->private;
1598
1599         return open_flush(inode, filp, cd);
1600 }
1601
1602 static int release_flush_pipefs(struct inode *inode, struct file *filp)
1603 {
1604         struct cache_detail *cd = RPC_I(inode)->private;
1605
1606         return release_flush(inode, filp, cd);
1607 }
1608
1609 static ssize_t read_flush_pipefs(struct file *filp, char __user *buf,
1610                             size_t count, loff_t *ppos)
1611 {
1612         struct cache_detail *cd = RPC_I(filp->f_path.dentry->d_inode)->private;
1613
1614         return read_flush(filp, buf, count, ppos, cd);
1615 }
1616
1617 static ssize_t write_flush_pipefs(struct file *filp,
1618                                   const char __user *buf,
1619                                   size_t count, loff_t *ppos)
1620 {
1621         struct cache_detail *cd = RPC_I(filp->f_path.dentry->d_inode)->private;
1622
1623         return write_flush(filp, buf, count, ppos, cd);
1624 }
1625
1626 const struct file_operations cache_flush_operations_pipefs = {
1627         .open           = open_flush_pipefs,
1628         .read           = read_flush_pipefs,
1629         .write          = write_flush_pipefs,
1630         .release        = release_flush_pipefs,
1631 };
1632
1633 int sunrpc_cache_register_pipefs(struct dentry *parent,
1634                                  const char *name, mode_t umode,
1635                                  struct cache_detail *cd)
1636 {
1637         struct qstr q;
1638         struct dentry *dir;
1639         int ret = 0;
1640
1641         sunrpc_init_cache_detail(cd);
1642         q.name = name;
1643         q.len = strlen(name);
1644         q.hash = full_name_hash(q.name, q.len);
1645         dir = rpc_create_cache_dir(parent, &q, umode, cd);
1646         if (!IS_ERR(dir))
1647                 cd->u.pipefs.dir = dir;
1648         else {
1649                 sunrpc_destroy_cache_detail(cd);
1650                 ret = PTR_ERR(dir);
1651         }
1652         return ret;
1653 }
1654 EXPORT_SYMBOL_GPL(sunrpc_cache_register_pipefs);
1655
1656 void sunrpc_cache_unregister_pipefs(struct cache_detail *cd)
1657 {
1658         rpc_remove_cache_dir(cd->u.pipefs.dir);
1659         cd->u.pipefs.dir = NULL;
1660         sunrpc_destroy_cache_detail(cd);
1661 }
1662 EXPORT_SYMBOL_GPL(sunrpc_cache_unregister_pipefs);
1663