sunrpc/cache: change cache_defer_req to return -ve error, not boolean.
1 /*
2  * net/sunrpc/cache.c
3  *
4  * Generic code for various authentication-related caches
5  * used by sunrpc clients and servers.
6  *
7  * Copyright (C) 2002 Neil Brown <neilb@cse.unsw.edu.au>
8  *
9  * Released under terms in GPL version 2.  See COPYING.
10  *
11  */
12
13 #include <linux/types.h>
14 #include <linux/fs.h>
15 #include <linux/file.h>
16 #include <linux/slab.h>
17 #include <linux/signal.h>
18 #include <linux/sched.h>
19 #include <linux/kmod.h>
20 #include <linux/list.h>
21 #include <linux/module.h>
22 #include <linux/ctype.h>
23 #include <asm/uaccess.h>
24 #include <linux/poll.h>
25 #include <linux/seq_file.h>
26 #include <linux/proc_fs.h>
27 #include <linux/net.h>
28 #include <linux/workqueue.h>
29 #include <linux/mutex.h>
30 #include <linux/pagemap.h>
31 #include <asm/ioctls.h>
32 #include <linux/sunrpc/types.h>
33 #include <linux/sunrpc/cache.h>
34 #include <linux/sunrpc/stats.h>
35 #include <linux/sunrpc/rpc_pipe_fs.h>
36
37 #define  RPCDBG_FACILITY RPCDBG_CACHE
38
39 static int cache_defer_req(struct cache_req *req, struct cache_head *item);
40 static void cache_revisit_request(struct cache_head *item);
41
42 static void cache_init(struct cache_head *h)
43 {
44         time_t now = get_seconds();
45         h->next = NULL;
46         h->flags = 0;
47         kref_init(&h->ref);
48         h->expiry_time = now + CACHE_NEW_EXPIRY;
49         h->last_refresh = now;
50 }
51
52 struct cache_head *sunrpc_cache_lookup(struct cache_detail *detail,
53                                        struct cache_head *key, int hash)
54 {
55         struct cache_head **head,  **hp;
56         struct cache_head *new = NULL;
57
58         head = &detail->hash_table[hash];
59
60         read_lock(&detail->hash_lock);
61
62         for (hp=head; *hp != NULL ; hp = &(*hp)->next) {
63                 struct cache_head *tmp = *hp;
64                 if (detail->match(tmp, key)) {
65                         cache_get(tmp);
66                         read_unlock(&detail->hash_lock);
67                         return tmp;
68                 }
69         }
70         read_unlock(&detail->hash_lock);
71         /* Didn't find anything, insert an empty entry */
72
73         new = detail->alloc();
74         if (!new)
75                 return NULL;
76         /* must fully initialise 'new', else
77          * we might get into trouble if we need to
78          * cache_put it soon.
79          */
80         cache_init(new);
81         detail->init(new, key);
82
83         write_lock(&detail->hash_lock);
84
85         /* check if entry appeared while we slept */
86         for (hp=head; *hp != NULL ; hp = &(*hp)->next) {
87                 struct cache_head *tmp = *hp;
88                 if (detail->match(tmp, key)) {
89                         cache_get(tmp);
90                         write_unlock(&detail->hash_lock);
91                         cache_put(new, detail);
92                         return tmp;
93                 }
94         }
95         new->next = *head;
96         *head = new;
97         detail->entries++;
98         cache_get(new);
99         write_unlock(&detail->hash_lock);
100
101         return new;
102 }
103 EXPORT_SYMBOL_GPL(sunrpc_cache_lookup);
104
105
106 static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch);
107
108 static int cache_fresh_locked(struct cache_head *head, time_t expiry)
109 {
110         head->expiry_time = expiry;
111         head->last_refresh = get_seconds();
112         return !test_and_set_bit(CACHE_VALID, &head->flags);
113 }
114
115 static void cache_fresh_unlocked(struct cache_head *head,
116                         struct cache_detail *detail, int new)
117 {
118         if (new)
119                 cache_revisit_request(head);
120         if (test_and_clear_bit(CACHE_PENDING, &head->flags)) {
121                 cache_revisit_request(head);
122                 cache_dequeue(detail, head);
123         }
124 }
125
126 struct cache_head *sunrpc_cache_update(struct cache_detail *detail,
127                                        struct cache_head *new, struct cache_head *old, int hash)
128 {
129         /* The 'old' entry is to be replaced by 'new'.
130          * If 'old' is not VALID, we update it directly,
131          * otherwise we need to replace it
132          */
133         struct cache_head **head;
134         struct cache_head *tmp;
135         int is_new;
136
137         if (!test_bit(CACHE_VALID, &old->flags)) {
138                 write_lock(&detail->hash_lock);
139                 if (!test_bit(CACHE_VALID, &old->flags)) {
140                         if (test_bit(CACHE_NEGATIVE, &new->flags))
141                                 set_bit(CACHE_NEGATIVE, &old->flags);
142                         else
143                                 detail->update(old, new);
144                         is_new = cache_fresh_locked(old, new->expiry_time);
145                         write_unlock(&detail->hash_lock);
146                         cache_fresh_unlocked(old, detail, is_new);
147                         return old;
148                 }
149                 write_unlock(&detail->hash_lock);
150         }
151         /* We need to insert a new entry */
152         tmp = detail->alloc();
153         if (!tmp) {
154                 cache_put(old, detail);
155                 return NULL;
156         }
157         cache_init(tmp);
158         detail->init(tmp, old);
159         head = &detail->hash_table[hash];
160
161         write_lock(&detail->hash_lock);
162         if (test_bit(CACHE_NEGATIVE, &new->flags))
163                 set_bit(CACHE_NEGATIVE, &tmp->flags);
164         else
165                 detail->update(tmp, new);
166         tmp->next = *head;
167         *head = tmp;
168         detail->entries++;
169         cache_get(tmp);
170         is_new = cache_fresh_locked(tmp, new->expiry_time);
171         cache_fresh_locked(old, 0);
172         write_unlock(&detail->hash_lock);
173         cache_fresh_unlocked(tmp, detail, is_new);
174         cache_fresh_unlocked(old, detail, 0);
175         cache_put(old, detail);
176         return tmp;
177 }
178 EXPORT_SYMBOL_GPL(sunrpc_cache_update);
179
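/*
 * Illustrative sketch (not part of this file): callers normally wrap the
 * two routines above in type-safe helpers for their own entry type.  The
 * struct demo_ent and all demo_* names below are hypothetical; the shape
 * follows the way svcauth-style caches embed a struct cache_head
 * (conventionally as the first member) in their entries.
 *
 *	struct demo_ent {
 *		struct cache_head	h;	// embedded cache_head
 *		char			*name;
 *		int			value;
 *	};
 *
 *	static struct demo_ent *demo_lookup(struct cache_detail *cd,
 *					    struct demo_ent *key, int hash)
 *	{
 *		struct cache_head *ch = sunrpc_cache_lookup(cd, &key->h, hash);
 *		return ch ? container_of(ch, struct demo_ent, h) : NULL;
 *	}
 *
 *	static struct demo_ent *demo_update(struct cache_detail *cd,
 *					    struct demo_ent *new,
 *					    struct demo_ent *old, int hash)
 *	{
 *		struct cache_head *ch = sunrpc_cache_update(cd, &new->h,
 *							    &old->h, hash);
 *		return ch ? container_of(ch, struct demo_ent, h) : NULL;
 *	}
 */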
180 static int cache_make_upcall(struct cache_detail *cd, struct cache_head *h)
181 {
182         if (!cd->cache_upcall)
183                 return -EINVAL;
184         return cd->cache_upcall(cd, h);
185 }
186
187 static inline int cache_is_valid(struct cache_detail *detail, struct cache_head *h)
188 {
189         if (!test_bit(CACHE_VALID, &h->flags) ||
190             h->expiry_time < get_seconds())
191                 return -EAGAIN;
192         else if (detail->flush_time > h->last_refresh)
193                 return -EAGAIN;
194         else {
195                 /* entry is valid */
196                 if (test_bit(CACHE_NEGATIVE, &h->flags))
197                         return -ENOENT;
198                 else
199                         return 0;
200         }
201 }
202
203 /*
204  * This is the generic cache management routine for all
205  * the authentication caches.
206  * It checks the currency of a cache item and will (later)
207  * initiate an upcall to fill it if needed.
208  *
209  *
210  * Returns 0 if the cache_head can be used, or cache_puts it and returns
211  * -EAGAIN if upcall is pending and request has been queued
212  * -ETIMEDOUT if upcall failed or request could not be queued, or
213  *           upcall completed but item is still invalid (implying that
214  *           the cache item has been replaced with a newer one).
215  * -ENOENT if cache entry was negative
216  */
217 int cache_check(struct cache_detail *detail,
218                     struct cache_head *h, struct cache_req *rqstp)
219 {
220         int rv;
221         long refresh_age, age;
222
223         /* First decide return status as best we can */
224         rv = cache_is_valid(detail, h);
225
226         /* now see if we want to start an upcall */
227         refresh_age = (h->expiry_time - h->last_refresh);
228         age = get_seconds() - h->last_refresh;
229
230         if (rqstp == NULL) {
231                 if (rv == -EAGAIN)
232                         rv = -ENOENT;
233         } else if (rv == -EAGAIN || age > refresh_age/2) {
234                 dprintk("RPC:       Want update, refage=%ld, age=%ld\n",
235                                 refresh_age, age);
236                 if (!test_and_set_bit(CACHE_PENDING, &h->flags)) {
237                         switch (cache_make_upcall(detail, h)) {
238                         case -EINVAL:
239                                 clear_bit(CACHE_PENDING, &h->flags);
240                                 cache_revisit_request(h);
241                                 if (rv == -EAGAIN) {
242                                         set_bit(CACHE_NEGATIVE, &h->flags);
243                                         cache_fresh_unlocked(h, detail,
244                                              cache_fresh_locked(h, get_seconds()+CACHE_NEW_EXPIRY));
245                                         rv = -ENOENT;
246                                 }
247                                 break;
248
249                         case -EAGAIN:
250                                 clear_bit(CACHE_PENDING, &h->flags);
251                                 cache_revisit_request(h);
252                                 break;
253                         }
254                 }
255         }
256
257         if (rv == -EAGAIN) {
258                 if (cache_defer_req(rqstp, h) < 0) {
259                         /* Request is not deferred */
260                         rv = cache_is_valid(detail, h);
261                         if (rv == -EAGAIN)
262                                 rv = -ETIMEDOUT;
263                 }
264         }
265         if (rv)
266                 cache_put(h, detail);
267         return rv;
268 }
269 EXPORT_SYMBOL_GPL(cache_check);
270
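/*
 * Illustrative caller sketch, reusing the hypothetical demo_* names from
 * the sketch after sunrpc_cache_update() above: server code typically
 * looks an entry up, then calls cache_check() with the request's deferral
 * handle (for svc requests that is &rqstp->rq_chandle).  On any non-zero
 * return, cache_check() has already dropped the reference the lookup took.
 *
 *	ent = demo_lookup(&demo_cache, &key, hash);
 *	if (ent == NULL)
 *		return -ENOMEM;
 *	switch (cache_check(&demo_cache, &ent->h, &rqstp->rq_chandle)) {
 *	case 0:				// valid entry, we still hold a ref
 *		use_entry(ent);		// hypothetical
 *		cache_put(&ent->h, &demo_cache);
 *		break;
 *	case -EAGAIN:			// upcall pending, request was deferred
 *	case -ETIMEDOUT:		// could not defer, or upcall failed
 *	case -ENOENT:			// negative entry
 *		break;			// ref already dropped by cache_check()
 *	}
 */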
271 /*
272  * caches need to be periodically cleaned.
273  * For this we maintain a list of cache_detail and
274  * a current pointer into that list and into the table
275  * for that entry.
276  *
277  * Each time clean_cache is called it finds the next non-empty entry
278  * in the current table and walks the list in that entry
279  * looking for entries that can be removed.
280  *
281  * An entry gets removed if:
282  * - The expiry is before current time
283  * - The last_refresh time is before the flush_time for that cache
284  *
285  * later we might drop old entries with non-NEVER expiry if that table
286  * is getting 'full' for some definition of 'full'
287  *
288  * The question of "how often to scan a table" is an interesting one
289  * and is answered in part by the use of the "nextcheck" field in the
290  * cache_detail.
291  * When a scan of a table begins, the nextcheck field is set to a time
292  * that is well into the future.
293  * While scanning, if an expiry time is found that is earlier than the
294  * current nextcheck time, nextcheck is set to that expiry time.
295  * If the flush_time is ever set to a time earlier than the nextcheck
296  * time, the nextcheck time is then set to that flush_time.
297  *
298  * A table is then only scanned if the current time is at least
299  * the nextcheck time.
300  *
301  */
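/*
 * Worked example of the nextcheck rule above: a table first scanned at
 * t=1000 gets nextcheck = 1000 + 30*60.  If the scan sees an entry with
 * expiry_time = 1200, nextcheck is pulled down to 1201, so the table
 * becomes eligible for rescanning shortly after that entry expires.
 * Writing to the 'flush' file sets flush_time and resets nextcheck to
 * "now", forcing an immediate rescan (see write_flush() below).
 */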
302
303 static LIST_HEAD(cache_list);
304 static DEFINE_SPINLOCK(cache_list_lock);
305 static struct cache_detail *current_detail;
306 static int current_index;
307
308 static void do_cache_clean(struct work_struct *work);
309 static DECLARE_DELAYED_WORK(cache_cleaner, do_cache_clean);
310
311 static void sunrpc_init_cache_detail(struct cache_detail *cd)
312 {
313         rwlock_init(&cd->hash_lock);
314         INIT_LIST_HEAD(&cd->queue);
315         spin_lock(&cache_list_lock);
316         cd->nextcheck = 0;
317         cd->entries = 0;
318         atomic_set(&cd->readers, 0);
319         cd->last_close = 0;
320         cd->last_warn = -1;
321         list_add(&cd->others, &cache_list);
322         spin_unlock(&cache_list_lock);
323
324         /* start the cleaning process */
325         schedule_delayed_work(&cache_cleaner, 0);
326 }
327
328 static void sunrpc_destroy_cache_detail(struct cache_detail *cd)
329 {
330         cache_purge(cd);
331         spin_lock(&cache_list_lock);
332         write_lock(&cd->hash_lock);
333         if (cd->entries || atomic_read(&cd->inuse)) {
334                 write_unlock(&cd->hash_lock);
335                 spin_unlock(&cache_list_lock);
336                 goto out;
337         }
338         if (current_detail == cd)
339                 current_detail = NULL;
340         list_del_init(&cd->others);
341         write_unlock(&cd->hash_lock);
342         spin_unlock(&cache_list_lock);
343         if (list_empty(&cache_list)) {
344                 /* module must be being unloaded so it's safe to kill the worker */
345                 cancel_delayed_work_sync(&cache_cleaner);
346         }
347         return;
348 out:
349         printk(KERN_ERR "nfsd: failed to unregister %s cache\n", cd->name);
350 }
351
352 /* clean cache tries to find something to clean
353  * and cleans it.
354  * It returns 1 if it cleaned something,
355  *            0 if it didn't find anything this time
356  *           -1 if it fell off the end of the list.
357  */
358 static int cache_clean(void)
359 {
360         int rv = 0;
361         struct list_head *next;
362
363         spin_lock(&cache_list_lock);
364
365         /* find a suitable table if we don't already have one */
366         while (current_detail == NULL ||
367             current_index >= current_detail->hash_size) {
368                 if (current_detail)
369                         next = current_detail->others.next;
370                 else
371                         next = cache_list.next;
372                 if (next == &cache_list) {
373                         current_detail = NULL;
374                         spin_unlock(&cache_list_lock);
375                         return -1;
376                 }
377                 current_detail = list_entry(next, struct cache_detail, others);
378                 if (current_detail->nextcheck > get_seconds())
379                         current_index = current_detail->hash_size;
380                 else {
381                         current_index = 0;
382                         current_detail->nextcheck = get_seconds()+30*60;
383                 }
384         }
385
386         /* find a non-empty bucket in the table */
387         while (current_detail &&
388                current_index < current_detail->hash_size &&
389                current_detail->hash_table[current_index] == NULL)
390                 current_index++;
391
392         /* find a cleanable entry in the bucket and clean it, or set to next bucket */
393
394         if (current_detail && current_index < current_detail->hash_size) {
395                 struct cache_head *ch, **cp;
396                 struct cache_detail *d;
397
398                 write_lock(&current_detail->hash_lock);
399
400                 /* Ok, now to clean this strand */
401
402                 cp = & current_detail->hash_table[current_index];
403                 ch = *cp;
404                 for (; ch; cp= & ch->next, ch= *cp) {
405                         if (current_detail->nextcheck > ch->expiry_time)
406                                 current_detail->nextcheck = ch->expiry_time+1;
407                         if (ch->expiry_time >= get_seconds()
408                             && ch->last_refresh >= current_detail->flush_time
409                                 )
410                                 continue;
411                         if (test_and_clear_bit(CACHE_PENDING, &ch->flags))
412                                 cache_dequeue(current_detail, ch);
413
414                         if (atomic_read(&ch->ref.refcount) == 1)
415                                 break;
416                 }
417                 if (ch) {
418                         *cp = ch->next;
419                         ch->next = NULL;
420                         current_detail->entries--;
421                         rv = 1;
422                 }
423                 write_unlock(&current_detail->hash_lock);
424                 d = current_detail;
425                 if (!ch)
426                         current_index ++;
427                 spin_unlock(&cache_list_lock);
428                 if (ch) {
429                         cache_revisit_request(ch);
430                         cache_put(ch, d);
431                 }
432         } else
433                 spin_unlock(&cache_list_lock);
434
435         return rv;
436 }
437
438 /*
439  * We want to regularly clean the cache, so we need to schedule some work ...
440  */
441 static void do_cache_clean(struct work_struct *work)
442 {
443         int delay = 5;
444         if (cache_clean() == -1)
445                 delay = round_jiffies_relative(30*HZ);
446
447         if (list_empty(&cache_list))
448                 delay = 0;
449
450         if (delay)
451                 schedule_delayed_work(&cache_cleaner, delay);
452 }
453
454
455 /*
456  * Clean all caches promptly.  This just calls cache_clean
457  * repeatedly until we are sure that every cache has had a chance to
458  * be fully cleaned
459  */
460 void cache_flush(void)
461 {
462         while (cache_clean() != -1)
463                 cond_resched();
464         while (cache_clean() != -1)
465                 cond_resched();
466 }
467 EXPORT_SYMBOL_GPL(cache_flush);
468
469 void cache_purge(struct cache_detail *detail)
470 {
471         detail->flush_time = LONG_MAX;
472         detail->nextcheck = get_seconds();
473         cache_flush();
474         detail->flush_time = 1;
475 }
476 EXPORT_SYMBOL_GPL(cache_purge);
477
478
479 /*
480  * Deferral and Revisiting of Requests.
481  *
482  * If a cache lookup finds a pending entry, we
483  * need to defer the request and revisit it later.
484  * All deferred requests are stored in a hash table,
485  * indexed by "struct cache_head *".
486  * As it may be wasteful to store a whole request
487  * structure, we allow the request to provide a
488  * deferred form, which must contain a
489  * 'struct cache_deferred_req'
490  * This cache_deferred_req contains a method to allow
491  * it to be revisited when cache info is available
492  */
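/*
 * Illustrative sketch: the transport supplies req->defer(), which packages
 * the request into something embedding a struct cache_deferred_req, plus a
 * revisit() method that requeues or drops it when called back.  All
 * demo_* names are hypothetical.
 *
 *	struct demo_deferred {
 *		struct cache_deferred_req	handle;
 *		// ... saved request state ...
 *	};
 *
 *	static void demo_revisit(struct cache_deferred_req *dreq, int too_many)
 *	{
 *		struct demo_deferred *dd =
 *			container_of(dreq, struct demo_deferred, handle);
 *		if (too_many)
 *			demo_drop(dd);		// hypothetical: give up
 *		else
 *			demo_requeue(dd);	// hypothetical: retry the request
 *	}
 *
 *	static struct cache_deferred_req *demo_defer(struct cache_req *req)
 *	{
 *		struct demo_deferred *dd = demo_save_request(req); // hypothetical
 *		if (dd == NULL)
 *			return NULL;	// cache_defer_req() then fails with -ENOMEM
 *		dd->handle.owner = demo_owner(req);	// matched by cache_clean_deferred()
 *		dd->handle.revisit = demo_revisit;
 *		return &dd->handle;
 *	}
 */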
493
494 #define DFR_HASHSIZE    (PAGE_SIZE/sizeof(struct list_head))
495 #define DFR_HASH(item)  ((((long)item)>>4 ^ (((long)item)>>13)) % DFR_HASHSIZE)
496
497 #define DFR_MAX 300     /* ??? */
498
499 static DEFINE_SPINLOCK(cache_defer_lock);
500 static LIST_HEAD(cache_defer_list);
501 static struct list_head cache_defer_hash[DFR_HASHSIZE];
502 static int cache_defer_cnt;
503
504 static int cache_defer_req(struct cache_req *req, struct cache_head *item)
505 {
506         struct cache_deferred_req *dreq;
507         int hash = DFR_HASH(item);
508
509         if (cache_defer_cnt >= DFR_MAX) {
510                 /* too much in the cache, randomly drop this one,
511                  * or continue and drop the oldest below
512                  */
513                 if (net_random()&1)
514                         return -ENOMEM;
515         }
516         dreq = req->defer(req);
517         if (dreq == NULL)
518                 return -ENOMEM;
519
520         dreq->item = item;
521
522         spin_lock(&cache_defer_lock);
523
524         list_add(&dreq->recent, &cache_defer_list);
525
526         if (cache_defer_hash[hash].next == NULL)
527                 INIT_LIST_HEAD(&cache_defer_hash[hash]);
528         list_add(&dreq->hash, &cache_defer_hash[hash]);
529
530         /* it is in, now maybe clean up */
531         dreq = NULL;
532         if (++cache_defer_cnt > DFR_MAX) {
533                 dreq = list_entry(cache_defer_list.prev,
534                                   struct cache_deferred_req, recent);
535                 list_del(&dreq->recent);
536                 list_del(&dreq->hash);
537                 cache_defer_cnt--;
538         }
539         spin_unlock(&cache_defer_lock);
540
541         if (dreq) {
542                 /* there was one too many */
543                 dreq->revisit(dreq, 1);
544         }
545         if (!test_bit(CACHE_PENDING, &item->flags)) {
546                 /* must have just been validated... */
547                 cache_revisit_request(item);
548                 return -EAGAIN;
549         }
550         return 0;
551 }
552
553 static void cache_revisit_request(struct cache_head *item)
554 {
555         struct cache_deferred_req *dreq;
556         struct list_head pending;
557
558         struct list_head *lp;
559         int hash = DFR_HASH(item);
560
561         INIT_LIST_HEAD(&pending);
562         spin_lock(&cache_defer_lock);
563
564         lp = cache_defer_hash[hash].next;
565         if (lp) {
566                 while (lp != &cache_defer_hash[hash]) {
567                         dreq = list_entry(lp, struct cache_deferred_req, hash);
568                         lp = lp->next;
569                         if (dreq->item == item) {
570                                 list_del(&dreq->hash);
571                                 list_move(&dreq->recent, &pending);
572                                 cache_defer_cnt--;
573                         }
574                 }
575         }
576         spin_unlock(&cache_defer_lock);
577
578         while (!list_empty(&pending)) {
579                 dreq = list_entry(pending.next, struct cache_deferred_req, recent);
580                 list_del_init(&dreq->recent);
581                 dreq->revisit(dreq, 0);
582         }
583 }
584
585 void cache_clean_deferred(void *owner)
586 {
587         struct cache_deferred_req *dreq, *tmp;
588         struct list_head pending;
589
590
591         INIT_LIST_HEAD(&pending);
592         spin_lock(&cache_defer_lock);
593
594         list_for_each_entry_safe(dreq, tmp, &cache_defer_list, recent) {
595                 if (dreq->owner == owner) {
596                         list_del(&dreq->hash);
597                         list_move(&dreq->recent, &pending);
598                         cache_defer_cnt--;
599                 }
600         }
601         spin_unlock(&cache_defer_lock);
602
603         while (!list_empty(&pending)) {
604                 dreq = list_entry(pending.next, struct cache_deferred_req, recent);
605                 list_del_init(&dreq->recent);
606                 dreq->revisit(dreq, 1);
607         }
608 }
609
610 /*
611  * communicate with user-space
612  *
613  * We have a magic /proc file - /proc/net/rpc/<cachename>/channel.
614  * On read, you get a full request, or block.
615  * On write, an update request is processed.
616  * Poll works if anything to read, and always allows write.
617  *
618  * Implemented by linked list of requests.  Each open file has
619  * a ->private that also exists in this list.  New requests are added
620  * to the end and may wakeup and preceding readers.
621  * to the end and may wake up any preceding readers.
622  * CACHE_UPCALLING clear, we free it from the list.
623  *
624  */
625
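/*
 * Illustrative user-space sketch (not kernel code): a daemon such as
 * rpc.mountd opens the channel file, reads one request per read() call,
 * resolves it, and writes back one complete reply line.  The path and the
 * resolve_request() helper below are hypothetical.
 *
 *	int fd = open("/proc/net/rpc/demo.cache/channel", O_RDWR);
 *	char req[8192], reply[8192];
 *	ssize_t n;
 *
 *	while ((n = read(fd, req, sizeof(req))) > 0) {
 *		int rlen = resolve_request(req, n, reply, sizeof(reply));
 *		if (rlen > 0)
 *			write(fd, reply, rlen);	// one line, handled by cache_parse
 *	}
 */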
626 static DEFINE_SPINLOCK(queue_lock);
627 static DEFINE_MUTEX(queue_io_mutex);
628
629 struct cache_queue {
630         struct list_head        list;
631         int                     reader; /* if 0, then request */
632 };
633 struct cache_request {
634         struct cache_queue      q;
635         struct cache_head       *item;
636         char                    * buf;
637         int                     len;
638         int                     readers;
639 };
640 struct cache_reader {
641         struct cache_queue      q;
642         int                     offset; /* if non-0, we have a refcnt on next request */
643 };
644
645 static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
646                           loff_t *ppos, struct cache_detail *cd)
647 {
648         struct cache_reader *rp = filp->private_data;
649         struct cache_request *rq;
650         struct inode *inode = filp->f_path.dentry->d_inode;
651         int err;
652
653         if (count == 0)
654                 return 0;
655
656         mutex_lock(&inode->i_mutex); /* protect against multiple concurrent
657                               * readers on this file */
658  again:
659         spin_lock(&queue_lock);
660         /* need to find next request */
661         while (rp->q.list.next != &cd->queue &&
662                list_entry(rp->q.list.next, struct cache_queue, list)
663                ->reader) {
664                 struct list_head *next = rp->q.list.next;
665                 list_move(&rp->q.list, next);
666         }
667         if (rp->q.list.next == &cd->queue) {
668                 spin_unlock(&queue_lock);
669                 mutex_unlock(&inode->i_mutex);
670                 BUG_ON(rp->offset);
671                 return 0;
672         }
673         rq = container_of(rp->q.list.next, struct cache_request, q.list);
674         BUG_ON(rq->q.reader);
675         if (rp->offset == 0)
676                 rq->readers++;
677         spin_unlock(&queue_lock);
678
679         if (rp->offset == 0 && !test_bit(CACHE_PENDING, &rq->item->flags)) {
680                 err = -EAGAIN;
681                 spin_lock(&queue_lock);
682                 list_move(&rp->q.list, &rq->q.list);
683                 spin_unlock(&queue_lock);
684         } else {
685                 if (rp->offset + count > rq->len)
686                         count = rq->len - rp->offset;
687                 err = -EFAULT;
688                 if (copy_to_user(buf, rq->buf + rp->offset, count))
689                         goto out;
690                 rp->offset += count;
691                 if (rp->offset >= rq->len) {
692                         rp->offset = 0;
693                         spin_lock(&queue_lock);
694                         list_move(&rp->q.list, &rq->q.list);
695                         spin_unlock(&queue_lock);
696                 }
697                 err = 0;
698         }
699  out:
700         if (rp->offset == 0) {
701                 /* need to release rq */
702                 spin_lock(&queue_lock);
703                 rq->readers--;
704                 if (rq->readers == 0 &&
705                     !test_bit(CACHE_PENDING, &rq->item->flags)) {
706                         list_del(&rq->q.list);
707                         spin_unlock(&queue_lock);
708                         cache_put(rq->item, cd);
709                         kfree(rq->buf);
710                         kfree(rq);
711                 } else
712                         spin_unlock(&queue_lock);
713         }
714         if (err == -EAGAIN)
715                 goto again;
716         mutex_unlock(&inode->i_mutex);
717         return err ? err :  count;
718 }
719
720 static ssize_t cache_do_downcall(char *kaddr, const char __user *buf,
721                                  size_t count, struct cache_detail *cd)
722 {
723         ssize_t ret;
724
725         if (copy_from_user(kaddr, buf, count))
726                 return -EFAULT;
727         kaddr[count] = '\0';
728         ret = cd->cache_parse(cd, kaddr, count);
729         if (!ret)
730                 ret = count;
731         return ret;
732 }
733
734 static ssize_t cache_slow_downcall(const char __user *buf,
735                                    size_t count, struct cache_detail *cd)
736 {
737         static char write_buf[8192]; /* protected by queue_io_mutex */
738         ssize_t ret = -EINVAL;
739
740         if (count >= sizeof(write_buf))
741                 goto out;
742         mutex_lock(&queue_io_mutex);
743         ret = cache_do_downcall(write_buf, buf, count, cd);
744         mutex_unlock(&queue_io_mutex);
745 out:
746         return ret;
747 }
748
749 static ssize_t cache_downcall(struct address_space *mapping,
750                               const char __user *buf,
751                               size_t count, struct cache_detail *cd)
752 {
753         struct page *page;
754         char *kaddr;
755         ssize_t ret = -ENOMEM;
756
757         if (count >= PAGE_CACHE_SIZE)
758                 goto out_slow;
759
760         page = find_or_create_page(mapping, 0, GFP_KERNEL);
761         if (!page)
762                 goto out_slow;
763
764         kaddr = kmap(page);
765         ret = cache_do_downcall(kaddr, buf, count, cd);
766         kunmap(page);
767         unlock_page(page);
768         page_cache_release(page);
769         return ret;
770 out_slow:
771         return cache_slow_downcall(buf, count, cd);
772 }
773
774 static ssize_t cache_write(struct file *filp, const char __user *buf,
775                            size_t count, loff_t *ppos,
776                            struct cache_detail *cd)
777 {
778         struct address_space *mapping = filp->f_mapping;
779         struct inode *inode = filp->f_path.dentry->d_inode;
780         ssize_t ret = -EINVAL;
781
782         if (!cd->cache_parse)
783                 goto out;
784
785         mutex_lock(&inode->i_mutex);
786         ret = cache_downcall(mapping, buf, count, cd);
787         mutex_unlock(&inode->i_mutex);
788 out:
789         return ret;
790 }
791
792 static DECLARE_WAIT_QUEUE_HEAD(queue_wait);
793
794 static unsigned int cache_poll(struct file *filp, poll_table *wait,
795                                struct cache_detail *cd)
796 {
797         unsigned int mask;
798         struct cache_reader *rp = filp->private_data;
799         struct cache_queue *cq;
800
801         poll_wait(filp, &queue_wait, wait);
802
803         /* always allow write */
804         mask = POLLOUT | POLLWRNORM;
805
806         if (!rp)
807                 return mask;
808
809         spin_lock(&queue_lock);
810
811         for (cq= &rp->q; &cq->list != &cd->queue;
812              cq = list_entry(cq->list.next, struct cache_queue, list))
813                 if (!cq->reader) {
814                         mask |= POLLIN | POLLRDNORM;
815                         break;
816                 }
817         spin_unlock(&queue_lock);
818         return mask;
819 }
820
821 static int cache_ioctl(struct inode *ino, struct file *filp,
822                        unsigned int cmd, unsigned long arg,
823                        struct cache_detail *cd)
824 {
825         int len = 0;
826         struct cache_reader *rp = filp->private_data;
827         struct cache_queue *cq;
828
829         if (cmd != FIONREAD || !rp)
830                 return -EINVAL;
831
832         spin_lock(&queue_lock);
833
834         /* only find the length remaining in current request,
835          * or the length of the next request
836          */
837         for (cq= &rp->q; &cq->list != &cd->queue;
838              cq = list_entry(cq->list.next, struct cache_queue, list))
839                 if (!cq->reader) {
840                         struct cache_request *cr =
841                                 container_of(cq, struct cache_request, q);
842                         len = cr->len - rp->offset;
843                         break;
844                 }
845         spin_unlock(&queue_lock);
846
847         return put_user(len, (int __user *)arg);
848 }
849
850 static int cache_open(struct inode *inode, struct file *filp,
851                       struct cache_detail *cd)
852 {
853         struct cache_reader *rp = NULL;
854
855         if (!cd || !try_module_get(cd->owner))
856                 return -EACCES;
857         nonseekable_open(inode, filp);
858         if (filp->f_mode & FMODE_READ) {
859                 rp = kmalloc(sizeof(*rp), GFP_KERNEL);
860                 if (!rp)
861                         return -ENOMEM;
862                 rp->offset = 0;
863                 rp->q.reader = 1;
864                 atomic_inc(&cd->readers);
865                 spin_lock(&queue_lock);
866                 list_add(&rp->q.list, &cd->queue);
867                 spin_unlock(&queue_lock);
868         }
869         filp->private_data = rp;
870         return 0;
871 }
872
873 static int cache_release(struct inode *inode, struct file *filp,
874                          struct cache_detail *cd)
875 {
876         struct cache_reader *rp = filp->private_data;
877
878         if (rp) {
879                 spin_lock(&queue_lock);
880                 if (rp->offset) {
881                         struct cache_queue *cq;
882                         for (cq= &rp->q; &cq->list != &cd->queue;
883                              cq = list_entry(cq->list.next, struct cache_queue, list))
884                                 if (!cq->reader) {
885                                         container_of(cq, struct cache_request, q)
886                                                 ->readers--;
887                                         break;
888                                 }
889                         rp->offset = 0;
890                 }
891                 list_del(&rp->q.list);
892                 spin_unlock(&queue_lock);
893
894                 filp->private_data = NULL;
895                 kfree(rp);
896
897                 cd->last_close = get_seconds();
898                 atomic_dec(&cd->readers);
899         }
900         module_put(cd->owner);
901         return 0;
902 }
903
904
905
906 static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch)
907 {
908         struct cache_queue *cq;
909         spin_lock(&queue_lock);
910         list_for_each_entry(cq, &detail->queue, list)
911                 if (!cq->reader) {
912                         struct cache_request *cr = container_of(cq, struct cache_request, q);
913                         if (cr->item != ch)
914                                 continue;
915                         if (cr->readers != 0)
916                                 continue;
917                         list_del(&cr->q.list);
918                         spin_unlock(&queue_lock);
919                         cache_put(cr->item, detail);
920                         kfree(cr->buf);
921                         kfree(cr);
922                         return;
923                 }
924         spin_unlock(&queue_lock);
925 }
926
927 /*
928  * Support routines for text-based upcalls.
929  * Fields are separated by spaces.
930  * Fields are either mangled to quote space tab newline slosh with slosh
931  * or hexified with a leading \x
932  * Record is terminated with newline.
933  *
934  */
935
936 void qword_add(char **bpp, int *lp, char *str)
937 {
938         char *bp = *bpp;
939         int len = *lp;
940         char c;
941
942         if (len < 0) return;
943
944         while ((c=*str++) && len)
945                 switch(c) {
946                 case ' ':
947                 case '\t':
948                 case '\n':
949                 case '\\':
950                         if (len >= 4) {
951                                 *bp++ = '\\';
952                                 *bp++ = '0' + ((c & 0300)>>6);
953                                 *bp++ = '0' + ((c & 0070)>>3);
954                                 *bp++ = '0' + ((c & 0007)>>0);
955                         }
956                         len -= 4;
957                         break;
958                 default:
959                         *bp++ = c;
960                         len--;
961                 }
962         if (c || len <1) len = -1;
963         else {
964                 *bp++ = ' ';
965                 len--;
966         }
967         *bpp = bp;
968         *lp = len;
969 }
970 EXPORT_SYMBOL_GPL(qword_add);
971
972 void qword_addhex(char **bpp, int *lp, char *buf, int blen)
973 {
974         char *bp = *bpp;
975         int len = *lp;
976
977         if (len < 0) return;
978
979         if (len > 2) {
980                 *bp++ = '\\';
981                 *bp++ = 'x';
982                 len -= 2;
983                 while (blen && len >= 2) {
984                         unsigned char c = *buf++;
985                         *bp++ = '0' + ((c&0xf0)>>4) + (c>=0xa0)*('a'-'9'-1);
986                         *bp++ = '0' + (c&0x0f) + ((c&0x0f)>=0x0a)*('a'-'9'-1);
987                         len -= 2;
988                         blen--;
989                 }
990         }
991         if (blen || len<1) len = -1;
992         else {
993                 *bp++ = ' ';
994                 len--;
995         }
996         *bpp = bp;
997         *lp = len;
998 }
999 EXPORT_SYMBOL_GPL(qword_addhex);
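/*
 * Example output (derived from the two routines above): qword_add()
 * octal-escapes space, tab, newline and backslash and appends a trailing
 * space, so the string "host name" is emitted as "host\040name ".
 * qword_addhex() emits the whole buffer as lower-case hex after a "\x"
 * prefix, so the two bytes {0x01, 0x02} become "\x0102 ".  Both set *lp
 * to -1 when the output buffer is too small, which
 * sunrpc_cache_pipe_upcall() below treats as a failed upcall.
 */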
1000
1001 static void warn_no_listener(struct cache_detail *detail)
1002 {
1003         if (detail->last_warn != detail->last_close) {
1004                 detail->last_warn = detail->last_close;
1005                 if (detail->warn_no_listener)
1006                         detail->warn_no_listener(detail, detail->last_close != 0);
1007         }
1008 }
1009
1010 /*
1011  * register an upcall request to user-space and queue it up for read() by the
1012  * upcall daemon.
1013  *
1014  * Each request is at most one page long.
1015  */
1016 int sunrpc_cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h,
1017                 void (*cache_request)(struct cache_detail *,
1018                                       struct cache_head *,
1019                                       char **,
1020                                       int *))
1021 {
1022
1023         char *buf;
1024         struct cache_request *crq;
1025         char *bp;
1026         int len;
1027
1028         if (atomic_read(&detail->readers) == 0 &&
1029             detail->last_close < get_seconds() - 30) {
1030                 warn_no_listener(detail);
1031                 return -EINVAL;
1032         }
1033
1034         buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
1035         if (!buf)
1036                 return -EAGAIN;
1037
1038         crq = kmalloc(sizeof (*crq), GFP_KERNEL);
1039         if (!crq) {
1040                 kfree(buf);
1041                 return -EAGAIN;
1042         }
1043
1044         bp = buf; len = PAGE_SIZE;
1045
1046         cache_request(detail, h, &bp, &len);
1047
1048         if (len < 0) {
1049                 kfree(buf);
1050                 kfree(crq);
1051                 return -EAGAIN;
1052         }
1053         crq->q.reader = 0;
1054         crq->item = cache_get(h);
1055         crq->buf = buf;
1056         crq->len = PAGE_SIZE - len;
1057         crq->readers = 0;
1058         spin_lock(&queue_lock);
1059         list_add_tail(&crq->q.list, &detail->queue);
1060         spin_unlock(&queue_lock);
1061         wake_up(&queue_wait);
1062         return 0;
1063 }
1064 EXPORT_SYMBOL_GPL(sunrpc_cache_pipe_upcall);
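/*
 * Illustrative sketch: a cache's ->cache_upcall usually just forwards to
 * sunrpc_cache_pipe_upcall() with a formatter that writes the entry's key
 * as one text line.  struct demo_ent and the demo_* names are hypothetical
 * (see the sketch after sunrpc_cache_update() above).
 *
 *	static void demo_request(struct cache_detail *cd, struct cache_head *h,
 *				 char **bpp, int *blen)
 *	{
 *		struct demo_ent *ent = container_of(h, struct demo_ent, h);
 *
 *		qword_add(bpp, blen, ent->name);
 *		(*bpp)[-1] = '\n';		// turn the trailing space into EOL
 *	}
 *
 *	static int demo_upcall(struct cache_detail *cd, struct cache_head *h)
 *	{
 *		return sunrpc_cache_pipe_upcall(cd, h, demo_request);
 *	}
 */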
1065
1066 /*
1067  * parse a message from user-space and pass it
1068  * to an appropriate cache
1069  * Messages are, like requests, separated into fields by
1070  * spaces and dequoted as \xHEXSTRING or embedded \nnn octal
1071  *
1072  * Message is
1073  *   reply cachename expiry key ... content....
1074  *
1075  * key and content are both parsed by cache
1076  */
1077
1078 #define isodigit(c) (isdigit(c) && c <= '7')
1079 int qword_get(char **bpp, char *dest, int bufsize)
1080 {
1081         /* return bytes copied, or -1 on error */
1082         char *bp = *bpp;
1083         int len = 0;
1084
1085         while (*bp == ' ') bp++;
1086
1087         if (bp[0] == '\\' && bp[1] == 'x') {
1088                 /* HEX STRING */
1089                 bp += 2;
1090                 while (isxdigit(bp[0]) && isxdigit(bp[1]) && len < bufsize) {
1091                         int byte = isdigit(*bp) ? *bp-'0' : toupper(*bp)-'A'+10;
1092                         bp++;
1093                         byte <<= 4;
1094                         byte |= isdigit(*bp) ? *bp-'0' : toupper(*bp)-'A'+10;
1095                         *dest++ = byte;
1096                         bp++;
1097                         len++;
1098                 }
1099         } else {
1100                 /* text with \nnn octal quoting */
1101                 while (*bp != ' ' && *bp != '\n' && *bp && len < bufsize-1) {
1102                         if (*bp == '\\' &&
1103                             isodigit(bp[1]) && (bp[1] <= '3') &&
1104                             isodigit(bp[2]) &&
1105                             isodigit(bp[3])) {
1106                                 int byte = (*++bp -'0');
1107                                 bp++;
1108                                 byte = (byte << 3) | (*bp++ - '0');
1109                                 byte = (byte << 3) | (*bp++ - '0');
1110                                 *dest++ = byte;
1111                                 len++;
1112                         } else {
1113                                 *dest++ = *bp++;
1114                                 len++;
1115                         }
1116                 }
1117         }
1118
1119         if (*bp != ' ' && *bp != '\n' && *bp != '\0')
1120                 return -1;
1121         while (*bp == ' ') bp++;
1122         *bpp = bp;
1123         *dest = '\0';
1124         return len;
1125 }
1126 EXPORT_SYMBOL_GPL(qword_get);
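/*
 * Illustrative sketch of the downcall side: a ->cache_parse method walks
 * the reply with qword_get(), looks the entry up and completes it through
 * sunrpc_cache_update().  All demo_* names are hypothetical and error
 * handling is trimmed; get_expiry() is the helper from sunrpc/cache.h.
 *
 *	static int demo_parse(struct cache_detail *cd, char *mesg, int mlen)
 *	{
 *		char name[64];
 *		struct demo_ent key, *ent, *res;
 *		time_t expiry;
 *
 *		if (qword_get(&mesg, name, sizeof(name)) <= 0)
 *			return -EINVAL;
 *		expiry = get_expiry(&mesg);
 *		if (expiry == 0)
 *			return -EINVAL;
 *
 *		demo_init_key(&key, name);			// hypothetical
 *		ent = demo_lookup(cd, &key, demo_hash(name));	// see sketch above
 *		if (ent == NULL)
 *			return -ENOMEM;
 *		key.h.expiry_time = expiry;
 *		// for a negative reply: set_bit(CACHE_NEGATIVE, &key.h.flags);
 *		res = demo_update(cd, &key, ent, demo_hash(name));
 *		if (res == NULL)
 *			return -ENOMEM;	// demo_update already dropped ent's ref
 *		cache_put(&res->h, cd);
 *		return 0;
 *	}
 */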
1127
1128
1129 /*
1130  * support /proc/sunrpc/cache/$CACHENAME/content
1131  * as a seqfile.
1132  * We call ->cache_show passing NULL for the item to
1133  * get a header, then pass each real item in the cache
1134  */
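/*
 * Illustrative sketch of a ->cache_show method (demo_* names hypothetical):
 * it is called with a NULL item for the header line and should also cope
 * with negative entries.
 *
 *	static int demo_show(struct seq_file *m, struct cache_detail *cd,
 *			     struct cache_head *h)
 *	{
 *		struct demo_ent *ent;
 *
 *		if (h == NULL) {
 *			seq_puts(m, "#name value\n");
 *			return 0;
 *		}
 *		ent = container_of(h, struct demo_ent, h);
 *		if (test_bit(CACHE_NEGATIVE, &h->flags))
 *			seq_printf(m, "%s NEGATIVE\n", ent->name);
 *		else
 *			seq_printf(m, "%s %d\n", ent->name, ent->value);
 *		return 0;
 *	}
 */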
1135
1136 struct handle {
1137         struct cache_detail *cd;
1138 };
1139
1140 static void *c_start(struct seq_file *m, loff_t *pos)
1141         __acquires(cd->hash_lock)
1142 {
1143         loff_t n = *pos;
1144         unsigned hash, entry;
1145         struct cache_head *ch;
1146         struct cache_detail *cd = ((struct handle*)m->private)->cd;
1147
1148
1149         read_lock(&cd->hash_lock);
1150         if (!n--)
1151                 return SEQ_START_TOKEN;
1152         hash = n >> 32;
1153         entry = n & ((1LL<<32) - 1);
1154
1155         for (ch=cd->hash_table[hash]; ch; ch=ch->next)
1156                 if (!entry--)
1157                         return ch;
1158         n &= ~((1LL<<32) - 1);
1159         do {
1160                 hash++;
1161                 n += 1LL<<32;
1162         } while(hash < cd->hash_size &&
1163                 cd->hash_table[hash]==NULL);
1164         if (hash >= cd->hash_size)
1165                 return NULL;
1166         *pos = n+1;
1167         return cd->hash_table[hash];
1168 }
1169
1170 static void *c_next(struct seq_file *m, void *p, loff_t *pos)
1171 {
1172         struct cache_head *ch = p;
1173         int hash = (*pos >> 32);
1174         struct cache_detail *cd = ((struct handle*)m->private)->cd;
1175
1176         if (p == SEQ_START_TOKEN)
1177                 hash = 0;
1178         else if (ch->next == NULL) {
1179                 hash++;
1180                 *pos += 1LL<<32;
1181         } else {
1182                 ++*pos;
1183                 return ch->next;
1184         }
1185         *pos &= ~((1LL<<32) - 1);
1186         while (hash < cd->hash_size &&
1187                cd->hash_table[hash] == NULL) {
1188                 hash++;
1189                 *pos += 1LL<<32;
1190         }
1191         if (hash >= cd->hash_size)
1192                 return NULL;
1193         ++*pos;
1194         return cd->hash_table[hash];
1195 }
1196
1197 static void c_stop(struct seq_file *m, void *p)
1198         __releases(cd->hash_lock)
1199 {
1200         struct cache_detail *cd = ((struct handle*)m->private)->cd;
1201         read_unlock(&cd->hash_lock);
1202 }
1203
1204 static int c_show(struct seq_file *m, void *p)
1205 {
1206         struct cache_head *cp = p;
1207         struct cache_detail *cd = ((struct handle*)m->private)->cd;
1208
1209         if (p == SEQ_START_TOKEN)
1210                 return cd->cache_show(m, cd, NULL);
1211
1212         ifdebug(CACHE)
1213                 seq_printf(m, "# expiry=%ld refcnt=%d flags=%lx\n",
1214                            cp->expiry_time, atomic_read(&cp->ref.refcount), cp->flags);
1215         cache_get(cp);
1216         if (cache_check(cd, cp, NULL))
1217                 /* cache_check does a cache_put on failure */
1218                 seq_printf(m, "# ");
1219         else
1220                 cache_put(cp, cd);
1221
1222         return cd->cache_show(m, cd, cp);
1223 }
1224
1225 static const struct seq_operations cache_content_op = {
1226         .start  = c_start,
1227         .next   = c_next,
1228         .stop   = c_stop,
1229         .show   = c_show,
1230 };
1231
1232 static int content_open(struct inode *inode, struct file *file,
1233                         struct cache_detail *cd)
1234 {
1235         struct handle *han;
1236
1237         if (!cd || !try_module_get(cd->owner))
1238                 return -EACCES;
1239         han = __seq_open_private(file, &cache_content_op, sizeof(*han));
1240         if (han == NULL)
1241                 return -ENOMEM;
1242
1243         han->cd = cd;
1244         return 0;
1245 }
1246
1247 static int content_release(struct inode *inode, struct file *file,
1248                 struct cache_detail *cd)
1249 {
1250         int ret = seq_release_private(inode, file);
1251         module_put(cd->owner);
1252         return ret;
1253 }
1254
1255 static int open_flush(struct inode *inode, struct file *file,
1256                         struct cache_detail *cd)
1257 {
1258         if (!cd || !try_module_get(cd->owner))
1259                 return -EACCES;
1260         return nonseekable_open(inode, file);
1261 }
1262
1263 static int release_flush(struct inode *inode, struct file *file,
1264                         struct cache_detail *cd)
1265 {
1266         module_put(cd->owner);
1267         return 0;
1268 }
1269
1270 static ssize_t read_flush(struct file *file, char __user *buf,
1271                           size_t count, loff_t *ppos,
1272                           struct cache_detail *cd)
1273 {
1274         char tbuf[20];
1275         unsigned long p = *ppos;
1276         size_t len;
1277
1278         sprintf(tbuf, "%lu\n", cd->flush_time);
1279         len = strlen(tbuf);
1280         if (p >= len)
1281                 return 0;
1282         len -= p;
1283         if (len > count)
1284                 len = count;
1285         if (copy_to_user(buf, (void*)(tbuf+p), len))
1286                 return -EFAULT;
1287         *ppos += len;
1288         return len;
1289 }
1290
1291 static ssize_t write_flush(struct file *file, const char __user *buf,
1292                            size_t count, loff_t *ppos,
1293                            struct cache_detail *cd)
1294 {
1295         char tbuf[20];
1296         char *ep;
1297         long flushtime;
1298         if (*ppos || count > sizeof(tbuf)-1)
1299                 return -EINVAL;
1300         if (copy_from_user(tbuf, buf, count))
1301                 return -EFAULT;
1302         tbuf[count] = 0;
1303         flushtime = simple_strtoul(tbuf, &ep, 0);
1304         if (*ep && *ep != '\n')
1305                 return -EINVAL;
1306
1307         cd->flush_time = flushtime;
1308         cd->nextcheck = get_seconds();
1309         cache_flush();
1310
1311         *ppos += count;
1312         return count;
1313 }
1314
1315 static ssize_t cache_read_procfs(struct file *filp, char __user *buf,
1316                                  size_t count, loff_t *ppos)
1317 {
1318         struct cache_detail *cd = PDE(filp->f_path.dentry->d_inode)->data;
1319
1320         return cache_read(filp, buf, count, ppos, cd);
1321 }
1322
1323 static ssize_t cache_write_procfs(struct file *filp, const char __user *buf,
1324                                   size_t count, loff_t *ppos)
1325 {
1326         struct cache_detail *cd = PDE(filp->f_path.dentry->d_inode)->data;
1327
1328         return cache_write(filp, buf, count, ppos, cd);
1329 }
1330
1331 static unsigned int cache_poll_procfs(struct file *filp, poll_table *wait)
1332 {
1333         struct cache_detail *cd = PDE(filp->f_path.dentry->d_inode)->data;
1334
1335         return cache_poll(filp, wait, cd);
1336 }
1337
1338 static int cache_ioctl_procfs(struct inode *inode, struct file *filp,
1339                               unsigned int cmd, unsigned long arg)
1340 {
1341         struct cache_detail *cd = PDE(inode)->data;
1342
1343         return cache_ioctl(inode, filp, cmd, arg, cd);
1344 }
1345
1346 static int cache_open_procfs(struct inode *inode, struct file *filp)
1347 {
1348         struct cache_detail *cd = PDE(inode)->data;
1349
1350         return cache_open(inode, filp, cd);
1351 }
1352
1353 static int cache_release_procfs(struct inode *inode, struct file *filp)
1354 {
1355         struct cache_detail *cd = PDE(inode)->data;
1356
1357         return cache_release(inode, filp, cd);
1358 }
1359
1360 static const struct file_operations cache_file_operations_procfs = {
1361         .owner          = THIS_MODULE,
1362         .llseek         = no_llseek,
1363         .read           = cache_read_procfs,
1364         .write          = cache_write_procfs,
1365         .poll           = cache_poll_procfs,
1366         .ioctl          = cache_ioctl_procfs, /* for FIONREAD */
1367         .open           = cache_open_procfs,
1368         .release        = cache_release_procfs,
1369 };
1370
1371 static int content_open_procfs(struct inode *inode, struct file *filp)
1372 {
1373         struct cache_detail *cd = PDE(inode)->data;
1374
1375         return content_open(inode, filp, cd);
1376 }
1377
1378 static int content_release_procfs(struct inode *inode, struct file *filp)
1379 {
1380         struct cache_detail *cd = PDE(inode)->data;
1381
1382         return content_release(inode, filp, cd);
1383 }
1384
1385 static const struct file_operations content_file_operations_procfs = {
1386         .open           = content_open_procfs,
1387         .read           = seq_read,
1388         .llseek         = seq_lseek,
1389         .release        = content_release_procfs,
1390 };
1391
1392 static int open_flush_procfs(struct inode *inode, struct file *filp)
1393 {
1394         struct cache_detail *cd = PDE(inode)->data;
1395
1396         return open_flush(inode, filp, cd);
1397 }
1398
1399 static int release_flush_procfs(struct inode *inode, struct file *filp)
1400 {
1401         struct cache_detail *cd = PDE(inode)->data;
1402
1403         return release_flush(inode, filp, cd);
1404 }
1405
1406 static ssize_t read_flush_procfs(struct file *filp, char __user *buf,
1407                             size_t count, loff_t *ppos)
1408 {
1409         struct cache_detail *cd = PDE(filp->f_path.dentry->d_inode)->data;
1410
1411         return read_flush(filp, buf, count, ppos, cd);
1412 }
1413
1414 static ssize_t write_flush_procfs(struct file *filp,
1415                                   const char __user *buf,
1416                                   size_t count, loff_t *ppos)
1417 {
1418         struct cache_detail *cd = PDE(filp->f_path.dentry->d_inode)->data;
1419
1420         return write_flush(filp, buf, count, ppos, cd);
1421 }
1422
1423 static const struct file_operations cache_flush_operations_procfs = {
1424         .open           = open_flush_procfs,
1425         .read           = read_flush_procfs,
1426         .write          = write_flush_procfs,
1427         .release        = release_flush_procfs,
1428 };
1429
1430 static void remove_cache_proc_entries(struct cache_detail *cd)
1431 {
1432         if (cd->u.procfs.proc_ent == NULL)
1433                 return;
1434         if (cd->u.procfs.flush_ent)
1435                 remove_proc_entry("flush", cd->u.procfs.proc_ent);
1436         if (cd->u.procfs.channel_ent)
1437                 remove_proc_entry("channel", cd->u.procfs.proc_ent);
1438         if (cd->u.procfs.content_ent)
1439                 remove_proc_entry("content", cd->u.procfs.proc_ent);
1440         cd->u.procfs.proc_ent = NULL;
1441         remove_proc_entry(cd->name, proc_net_rpc);
1442 }
1443
1444 #ifdef CONFIG_PROC_FS
1445 static int create_cache_proc_entries(struct cache_detail *cd)
1446 {
1447         struct proc_dir_entry *p;
1448
1449         cd->u.procfs.proc_ent = proc_mkdir(cd->name, proc_net_rpc);
1450         if (cd->u.procfs.proc_ent == NULL)
1451                 goto out_nomem;
1452         cd->u.procfs.channel_ent = NULL;
1453         cd->u.procfs.content_ent = NULL;
1454
1455         p = proc_create_data("flush", S_IFREG|S_IRUSR|S_IWUSR,
1456                              cd->u.procfs.proc_ent,
1457                              &cache_flush_operations_procfs, cd);
1458         cd->u.procfs.flush_ent = p;
1459         if (p == NULL)
1460                 goto out_nomem;
1461
1462         if (cd->cache_upcall || cd->cache_parse) {
1463                 p = proc_create_data("channel", S_IFREG|S_IRUSR|S_IWUSR,
1464                                      cd->u.procfs.proc_ent,
1465                                      &cache_file_operations_procfs, cd);
1466                 cd->u.procfs.channel_ent = p;
1467                 if (p == NULL)
1468                         goto out_nomem;
1469         }
1470         if (cd->cache_show) {
1471                 p = proc_create_data("content", S_IFREG|S_IRUSR|S_IWUSR,
1472                                 cd->u.procfs.proc_ent,
1473                                 &content_file_operations_procfs, cd);
1474                 cd->u.procfs.content_ent = p;
1475                 if (p == NULL)
1476                         goto out_nomem;
1477         }
1478         return 0;
1479 out_nomem:
1480         remove_cache_proc_entries(cd);
1481         return -ENOMEM;
1482 }
1483 #else /* CONFIG_PROC_FS */
1484 static int create_cache_proc_entries(struct cache_detail *cd)
1485 {
1486         return 0;
1487 }
1488 #endif
1489
1490 int cache_register(struct cache_detail *cd)
1491 {
1492         int ret;
1493
1494         sunrpc_init_cache_detail(cd);
1495         ret = create_cache_proc_entries(cd);
1496         if (ret)
1497                 sunrpc_destroy_cache_detail(cd);
1498         return ret;
1499 }
1500 EXPORT_SYMBOL_GPL(cache_register);
1501
1502 void cache_unregister(struct cache_detail *cd)
1503 {
1504         remove_cache_proc_entries(cd);
1505         sunrpc_destroy_cache_detail(cd);
1506 }
1507 EXPORT_SYMBOL_GPL(cache_unregister);
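/*
 * Illustrative sketch of tying the pieces together: a user of this API
 * fills in a struct cache_detail with its hash table and the methods used
 * throughout this file, then registers it.  All demo_* names and the
 * table size are hypothetical.
 *
 *	#define DEMO_HASHSIZE 64
 *	static struct cache_head *demo_table[DEMO_HASHSIZE];
 *
 *	static struct cache_detail demo_cache = {
 *		.owner		= THIS_MODULE,
 *		.hash_size	= DEMO_HASHSIZE,
 *		.hash_table	= demo_table,
 *		.name		= "demo.cache",
 *		.cache_put	= demo_put,		// kref release function
 *		.cache_upcall	= demo_upcall,		// see sketches above
 *		.cache_parse	= demo_parse,
 *		.cache_show	= demo_show,
 *		.match		= demo_match,
 *		.init		= demo_init,
 *		.update		= demo_update_item,
 *		.alloc		= demo_alloc,
 *	};
 *
 *	// module init:  err = cache_register(&demo_cache);
 *	// module exit:  cache_unregister(&demo_cache);
 */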
1508
1509 static ssize_t cache_read_pipefs(struct file *filp, char __user *buf,
1510                                  size_t count, loff_t *ppos)
1511 {
1512         struct cache_detail *cd = RPC_I(filp->f_path.dentry->d_inode)->private;
1513
1514         return cache_read(filp, buf, count, ppos, cd);
1515 }
1516
1517 static ssize_t cache_write_pipefs(struct file *filp, const char __user *buf,
1518                                   size_t count, loff_t *ppos)
1519 {
1520         struct cache_detail *cd = RPC_I(filp->f_path.dentry->d_inode)->private;
1521
1522         return cache_write(filp, buf, count, ppos, cd);
1523 }
1524
1525 static unsigned int cache_poll_pipefs(struct file *filp, poll_table *wait)
1526 {
1527         struct cache_detail *cd = RPC_I(filp->f_path.dentry->d_inode)->private;
1528
1529         return cache_poll(filp, wait, cd);
1530 }
1531
1532 static int cache_ioctl_pipefs(struct inode *inode, struct file *filp,
1533                               unsigned int cmd, unsigned long arg)
1534 {
1535         struct cache_detail *cd = RPC_I(inode)->private;
1536
1537         return cache_ioctl(inode, filp, cmd, arg, cd);
1538 }
1539
1540 static int cache_open_pipefs(struct inode *inode, struct file *filp)
1541 {
1542         struct cache_detail *cd = RPC_I(inode)->private;
1543
1544         return cache_open(inode, filp, cd);
1545 }
1546
1547 static int cache_release_pipefs(struct inode *inode, struct file *filp)
1548 {
1549         struct cache_detail *cd = RPC_I(inode)->private;
1550
1551         return cache_release(inode, filp, cd);
1552 }
1553
1554 const struct file_operations cache_file_operations_pipefs = {
1555         .owner          = THIS_MODULE,
1556         .llseek         = no_llseek,
1557         .read           = cache_read_pipefs,
1558         .write          = cache_write_pipefs,
1559         .poll           = cache_poll_pipefs,
1560         .ioctl          = cache_ioctl_pipefs, /* for FIONREAD */
1561         .open           = cache_open_pipefs,
1562         .release        = cache_release_pipefs,
1563 };
1564
1565 static int content_open_pipefs(struct inode *inode, struct file *filp)
1566 {
1567         struct cache_detail *cd = RPC_I(inode)->private;
1568
1569         return content_open(inode, filp, cd);
1570 }
1571
1572 static int content_release_pipefs(struct inode *inode, struct file *filp)
1573 {
1574         struct cache_detail *cd = RPC_I(inode)->private;
1575
1576         return content_release(inode, filp, cd);
1577 }
1578
1579 const struct file_operations content_file_operations_pipefs = {
1580         .open           = content_open_pipefs,
1581         .read           = seq_read,
1582         .llseek         = seq_lseek,
1583         .release        = content_release_pipefs,
1584 };
1585
1586 static int open_flush_pipefs(struct inode *inode, struct file *filp)
1587 {
1588         struct cache_detail *cd = RPC_I(inode)->private;
1589
1590         return open_flush(inode, filp, cd);
1591 }
1592
1593 static int release_flush_pipefs(struct inode *inode, struct file *filp)
1594 {
1595         struct cache_detail *cd = RPC_I(inode)->private;
1596
1597         return release_flush(inode, filp, cd);
1598 }
1599
1600 static ssize_t read_flush_pipefs(struct file *filp, char __user *buf,
1601                             size_t count, loff_t *ppos)
1602 {
1603         struct cache_detail *cd = RPC_I(filp->f_path.dentry->d_inode)->private;
1604
1605         return read_flush(filp, buf, count, ppos, cd);
1606 }
1607
1608 static ssize_t write_flush_pipefs(struct file *filp,
1609                                   const char __user *buf,
1610                                   size_t count, loff_t *ppos)
1611 {
1612         struct cache_detail *cd = RPC_I(filp->f_path.dentry->d_inode)->private;
1613
1614         return write_flush(filp, buf, count, ppos, cd);
1615 }
1616
1617 const struct file_operations cache_flush_operations_pipefs = {
1618         .open           = open_flush_pipefs,
1619         .read           = read_flush_pipefs,
1620         .write          = write_flush_pipefs,
1621         .release        = release_flush_pipefs,
1622 };
1623
1624 int sunrpc_cache_register_pipefs(struct dentry *parent,
1625                                  const char *name, mode_t umode,
1626                                  struct cache_detail *cd)
1627 {
1628         struct qstr q;
1629         struct dentry *dir;
1630         int ret = 0;
1631
1632         sunrpc_init_cache_detail(cd);
1633         q.name = name;
1634         q.len = strlen(name);
1635         q.hash = full_name_hash(q.name, q.len);
1636         dir = rpc_create_cache_dir(parent, &q, umode, cd);
1637         if (!IS_ERR(dir))
1638                 cd->u.pipefs.dir = dir;
1639         else {
1640                 sunrpc_destroy_cache_detail(cd);
1641                 ret = PTR_ERR(dir);
1642         }
1643         return ret;
1644 }
1645 EXPORT_SYMBOL_GPL(sunrpc_cache_register_pipefs);
1646
1647 void sunrpc_cache_unregister_pipefs(struct cache_detail *cd)
1648 {
1649         rpc_remove_cache_dir(cd->u.pipefs.dir);
1650         cd->u.pipefs.dir = NULL;
1651         sunrpc_destroy_cache_detail(cd);
1652 }
1653 EXPORT_SYMBOL_GPL(sunrpc_cache_unregister_pipefs);
1654