SUNRPC: Remove the global temporary write buffer in net/sunrpc/cache.c
1 /*
2  * net/sunrpc/cache.c
3  *
4  * Generic code for various authentication-related caches
5  * used by sunrpc clients and servers.
6  *
7  * Copyright (C) 2002 Neil Brown <neilb@cse.unsw.edu.au>
8  *
9  * Released under terms in GPL version 2.  See COPYING.
10  *
11  */
12
13 #include <linux/types.h>
14 #include <linux/fs.h>
15 #include <linux/file.h>
16 #include <linux/slab.h>
17 #include <linux/signal.h>
18 #include <linux/sched.h>
19 #include <linux/kmod.h>
20 #include <linux/list.h>
21 #include <linux/module.h>
22 #include <linux/ctype.h>
23 #include <asm/uaccess.h>
24 #include <linux/poll.h>
25 #include <linux/seq_file.h>
26 #include <linux/proc_fs.h>
27 #include <linux/net.h>
28 #include <linux/workqueue.h>
29 #include <linux/mutex.h>
30 #include <linux/pagemap.h>
31 #include <asm/ioctls.h>
32 #include <linux/sunrpc/types.h>
33 #include <linux/sunrpc/cache.h>
34 #include <linux/sunrpc/stats.h>
35
36 #define  RPCDBG_FACILITY RPCDBG_CACHE
37
38 static int cache_defer_req(struct cache_req *req, struct cache_head *item);
39 static void cache_revisit_request(struct cache_head *item);
40
41 static void cache_init(struct cache_head *h)
42 {
43         time_t now = get_seconds();
44         h->next = NULL;
45         h->flags = 0;
46         kref_init(&h->ref);
47         h->expiry_time = now + CACHE_NEW_EXPIRY;
48         h->last_refresh = now;
49 }
50
51 struct cache_head *sunrpc_cache_lookup(struct cache_detail *detail,
52                                        struct cache_head *key, int hash)
53 {
54         struct cache_head **head,  **hp;
55         struct cache_head *new = NULL;
56
57         head = &detail->hash_table[hash];
58
59         read_lock(&detail->hash_lock);
60
61         for (hp=head; *hp != NULL ; hp = &(*hp)->next) {
62                 struct cache_head *tmp = *hp;
63                 if (detail->match(tmp, key)) {
64                         cache_get(tmp);
65                         read_unlock(&detail->hash_lock);
66                         return tmp;
67                 }
68         }
69         read_unlock(&detail->hash_lock);
70         /* Didn't find anything, insert an empty entry */
71
72         new = detail->alloc();
73         if (!new)
74                 return NULL;
75         /* must fully initialise 'new', else
76          * we might get into trouble if we need to
77          * cache_put it soon.
78          */
79         cache_init(new);
80         detail->init(new, key);
81
82         write_lock(&detail->hash_lock);
83
84         /* check if entry appeared while we slept */
85         for (hp=head; *hp != NULL ; hp = &(*hp)->next) {
86                 struct cache_head *tmp = *hp;
87                 if (detail->match(tmp, key)) {
88                         cache_get(tmp);
89                         write_unlock(&detail->hash_lock);
90                         cache_put(new, detail);
91                         return tmp;
92                 }
93         }
94         new->next = *head;
95         *head = new;
96         detail->entries++;
97         cache_get(new);
98         write_unlock(&detail->hash_lock);
99
100         return new;
101 }
102 EXPORT_SYMBOL_GPL(sunrpc_cache_lookup);
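/*
 * Illustrative usage sketch (names borrowed from the auth.unix.ip cache
 * in svcauth_unix.c, not defined here): a concrete cache embeds a
 * struct cache_head in its own entry type and wraps this helper:
 *
 *	struct ip_map {
 *		struct cache_head	h;
 *		...
 *	};
 *
 *	ch = sunrpc_cache_lookup(cd, &key.h, hash);
 *	if (ch)
 *		return container_of(ch, struct ip_map, h);
 *	return NULL;
 */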
103
104
105 static void queue_loose(struct cache_detail *detail, struct cache_head *ch);
106
107 static int cache_fresh_locked(struct cache_head *head, time_t expiry)
108 {
109         head->expiry_time = expiry;
110         head->last_refresh = get_seconds();
111         return !test_and_set_bit(CACHE_VALID, &head->flags);
112 }
113
114 static void cache_fresh_unlocked(struct cache_head *head,
115                         struct cache_detail *detail, int new)
116 {
117         if (new)
118                 cache_revisit_request(head);
119         if (test_and_clear_bit(CACHE_PENDING, &head->flags)) {
120                 cache_revisit_request(head);
121                 queue_loose(detail, head);
122         }
123 }
124
125 struct cache_head *sunrpc_cache_update(struct cache_detail *detail,
126                                        struct cache_head *new, struct cache_head *old, int hash)
127 {
128         /* The 'old' entry is to be replaced by 'new'.
129          * If 'old' is not VALID, we update it directly,
130          * otherwise we need to replace it
131          */
132         struct cache_head **head;
133         struct cache_head *tmp;
134         int is_new;
135
136         if (!test_bit(CACHE_VALID, &old->flags)) {
137                 write_lock(&detail->hash_lock);
138                 if (!test_bit(CACHE_VALID, &old->flags)) {
139                         if (test_bit(CACHE_NEGATIVE, &new->flags))
140                                 set_bit(CACHE_NEGATIVE, &old->flags);
141                         else
142                                 detail->update(old, new);
143                         is_new = cache_fresh_locked(old, new->expiry_time);
144                         write_unlock(&detail->hash_lock);
145                         cache_fresh_unlocked(old, detail, is_new);
146                         return old;
147                 }
148                 write_unlock(&detail->hash_lock);
149         }
150         /* We need to insert a new entry */
151         tmp = detail->alloc();
152         if (!tmp) {
153                 cache_put(old, detail);
154                 return NULL;
155         }
156         cache_init(tmp);
157         detail->init(tmp, old);
158         head = &detail->hash_table[hash];
159
160         write_lock(&detail->hash_lock);
161         if (test_bit(CACHE_NEGATIVE, &new->flags))
162                 set_bit(CACHE_NEGATIVE, &tmp->flags);
163         else
164                 detail->update(tmp, new);
165         tmp->next = *head;
166         *head = tmp;
167         detail->entries++;
168         cache_get(tmp);
169         is_new = cache_fresh_locked(tmp, new->expiry_time);
170         cache_fresh_locked(old, 0);
171         write_unlock(&detail->hash_lock);
172         cache_fresh_unlocked(tmp, detail, is_new);
173         cache_fresh_unlocked(old, detail, 0);
174         cache_put(old, detail);
175         return tmp;
176 }
177 EXPORT_SYMBOL_GPL(sunrpc_cache_update);
178
179 static int cache_make_upcall(struct cache_detail *detail, struct cache_head *h);
180 /*
181  * This is the generic cache management routine for all
182  * the authentication caches.
183  * It checks the currency of a cache item and will
184  * initiate an upcall to fill it if needed.
185  *
186  *
187  * Returns 0 if the cache_head can be used, or cache_puts it and returns
188  * -EAGAIN if upcall is pending,
189  * -ETIMEDOUT if upcall failed and should be retried,
190  * -ENOENT if cache entry was negative
191  */
192 int cache_check(struct cache_detail *detail,
193                     struct cache_head *h, struct cache_req *rqstp)
194 {
195         int rv;
196         long refresh_age, age;
197
198         /* First decide return status as best we can */
199         if (!test_bit(CACHE_VALID, &h->flags) ||
200             h->expiry_time < get_seconds())
201                 rv = -EAGAIN;
202         else if (detail->flush_time > h->last_refresh)
203                 rv = -EAGAIN;
204         else {
205                 /* entry is valid */
206                 if (test_bit(CACHE_NEGATIVE, &h->flags))
207                         rv = -ENOENT;
208                 else rv = 0;
209         }
210
211         /* now see if we want to start an upcall */
212         refresh_age = (h->expiry_time - h->last_refresh);
213         age = get_seconds() - h->last_refresh;
214
215         if (rqstp == NULL) {
216                 if (rv == -EAGAIN)
217                         rv = -ENOENT;
218         } else if (rv == -EAGAIN || age > refresh_age/2) {
219                 dprintk("RPC:       Want update, refage=%ld, age=%ld\n",
220                                 refresh_age, age);
221                 if (!test_and_set_bit(CACHE_PENDING, &h->flags)) {
222                         switch (cache_make_upcall(detail, h)) {
223                         case -EINVAL:
224                                 clear_bit(CACHE_PENDING, &h->flags);
225                                 if (rv == -EAGAIN) {
226                                         set_bit(CACHE_NEGATIVE, &h->flags);
227                                         cache_fresh_unlocked(h, detail,
228                                              cache_fresh_locked(h, get_seconds()+CACHE_NEW_EXPIRY));
229                                         rv = -ENOENT;
230                                 }
231                                 break;
232
233                         case -EAGAIN:
234                                 clear_bit(CACHE_PENDING, &h->flags);
235                                 cache_revisit_request(h);
236                                 break;
237                         }
238                 }
239         }
240
241         if (rv == -EAGAIN)
242                 if (cache_defer_req(rqstp, h) != 0)
243                         rv = -ETIMEDOUT;
244
245         if (rv)
246                 cache_put(h, detail);
247         return rv;
248 }
249 EXPORT_SYMBOL_GPL(cache_check);
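/*
 * Illustrative caller pattern for the contract documented above
 * (rq_chandle is the cache_req embedded in a svc_rqst; the entry type
 * is hypothetical):
 *
 *	switch (cache_check(cd, &item->h, &rqstp->rq_chandle)) {
 *	case 0:          use the entry; cache_put() it when finished
 *	case -EAGAIN:    the request was deferred; drop it for now
 *	case -ENOENT:    negative entry; treat as "no such item"
 *	case -ETIMEDOUT: deferral failed; return a temporary error
 *	}
 */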
250
251 /*
252  * caches need to be periodically cleaned.
253  * For this we maintain a list of cache_detail and
254  * a current pointer into that list and into the table
255  * for that entry.
256  *
257  * Each time cache_clean is called it finds the next non-empty entry
258  * in the current table and walks the list in that entry
259  * looking for entries that can be removed.
260  *
261  * An entry gets removed if:
262  * - The expiry is before current time
263  * - The last_refresh time is before the flush_time for that cache
264  *
265  * later we might drop old entries with non-NEVER expiry if that table
266  * is getting 'full' for some definition of 'full'
267  *
268  * The question of "how often to scan a table" is an interesting one
269  * and is answered in part by the use of the "nextcheck" field in the
270  * cache_detail.
271  * When a scan of a table begins, the nextcheck field is set to a time
272  * that is well into the future.
273  * While scanning, if an expiry time is found that is earlier than the
274  * current nextcheck time, nextcheck is set to that expiry time.
275  * If the flush_time is ever set to a time earlier than the nextcheck
276  * time, the nextcheck time is then set to that flush_time.
277  *
278  * A table is then only scanned if the current time is at least
279  * the nextcheck time.
280  *
281  */
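/*
 * Worked example: a scan that starts at time T first pushes nextcheck
 * out to T + 30 minutes; if the earliest expiry seen while walking the
 * table is E, nextcheck is pulled back to E + 1, so the table is not
 * rescanned before then unless something (such as write_flush() below)
 * resets nextcheck to the current time.
 */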
282
283 static LIST_HEAD(cache_list);
284 static DEFINE_SPINLOCK(cache_list_lock);
285 static struct cache_detail *current_detail;
286 static int current_index;
287
288 static const struct file_operations cache_file_operations;
289 static const struct file_operations content_file_operations;
290 static const struct file_operations cache_flush_operations;
291
292 static void do_cache_clean(struct work_struct *work);
293 static DECLARE_DELAYED_WORK(cache_cleaner, do_cache_clean);
294
295 static void remove_cache_proc_entries(struct cache_detail *cd)
296 {
297         if (cd->proc_ent == NULL)
298                 return;
299         if (cd->flush_ent)
300                 remove_proc_entry("flush", cd->proc_ent);
301         if (cd->channel_ent)
302                 remove_proc_entry("channel", cd->proc_ent);
303         if (cd->content_ent)
304                 remove_proc_entry("content", cd->proc_ent);
305         cd->proc_ent = NULL;
306         remove_proc_entry(cd->name, proc_net_rpc);
307 }
308
309 #ifdef CONFIG_PROC_FS
310 static int create_cache_proc_entries(struct cache_detail *cd)
311 {
312         struct proc_dir_entry *p;
313
314         cd->proc_ent = proc_mkdir(cd->name, proc_net_rpc);
315         if (cd->proc_ent == NULL)
316                 goto out_nomem;
317         cd->channel_ent = cd->content_ent = NULL;
318
319         p = proc_create_data("flush", S_IFREG|S_IRUSR|S_IWUSR,
320                              cd->proc_ent, &cache_flush_operations, cd);
321         cd->flush_ent = p;
322         if (p == NULL)
323                 goto out_nomem;
324
325         if (cd->cache_request || cd->cache_parse) {
326                 p = proc_create_data("channel", S_IFREG|S_IRUSR|S_IWUSR,
327                                      cd->proc_ent, &cache_file_operations, cd);
328                 cd->channel_ent = p;
329                 if (p == NULL)
330                         goto out_nomem;
331         }
332         if (cd->cache_show) {
333                 p = proc_create_data("content", S_IFREG|S_IRUSR|S_IWUSR,
334                                 cd->proc_ent, &content_file_operations, cd);
335                 cd->content_ent = p;
336                 if (p == NULL)
337                         goto out_nomem;
338         }
339         return 0;
340 out_nomem:
341         remove_cache_proc_entries(cd);
342         return -ENOMEM;
343 }
344 #else /* CONFIG_PROC_FS */
345 static int create_cache_proc_entries(struct cache_detail *cd)
346 {
347         return 0;
348 }
349 #endif
350
351 static void sunrpc_init_cache_detail(struct cache_detail *cd)
352 {
353         rwlock_init(&cd->hash_lock);
354         INIT_LIST_HEAD(&cd->queue);
355         spin_lock(&cache_list_lock);
356         cd->nextcheck = 0;
357         cd->entries = 0;
358         atomic_set(&cd->readers, 0);
359         cd->last_close = 0;
360         cd->last_warn = -1;
361         list_add(&cd->others, &cache_list);
362         spin_unlock(&cache_list_lock);
363
364         /* start the cleaning process */
365         schedule_delayed_work(&cache_cleaner, 0);
366 }
367
368 static void sunrpc_destroy_cache_detail(struct cache_detail *cd)
369 {
370         cache_purge(cd);
371         spin_lock(&cache_list_lock);
372         write_lock(&cd->hash_lock);
373         if (cd->entries || atomic_read(&cd->inuse)) {
374                 write_unlock(&cd->hash_lock);
375                 spin_unlock(&cache_list_lock);
376                 goto out;
377         }
378         if (current_detail == cd)
379                 current_detail = NULL;
380         list_del_init(&cd->others);
381         write_unlock(&cd->hash_lock);
382         spin_unlock(&cache_list_lock);
383         if (list_empty(&cache_list)) {
384                 /* module must be being unloaded so it's safe to kill the worker */
385                 cancel_delayed_work_sync(&cache_cleaner);
386         }
387         return;
388 out:
389         printk(KERN_ERR "nfsd: failed to unregister %s cache\n", cd->name);
390 }
391
392 int cache_register(struct cache_detail *cd)
393 {
394         int ret;
395
396         sunrpc_init_cache_detail(cd);
397         ret = create_cache_proc_entries(cd);
398         if (ret)
399                 sunrpc_destroy_cache_detail(cd);
400         return ret;
401 }
402 EXPORT_SYMBOL_GPL(cache_register);
403
404 void cache_unregister(struct cache_detail *cd)
405 {
406         remove_cache_proc_entries(cd);
407         sunrpc_destroy_cache_detail(cd);
408 }
409 EXPORT_SYMBOL_GPL(cache_unregister);
410
411 /* clean cache tries to find something to clean
412  * and cleans it.
413  * It returns 1 if it cleaned something,
414  *            0 if it didn't find anything this time
415  *           -1 if it fell off the end of the list.
416  */
417 static int cache_clean(void)
418 {
419         int rv = 0;
420         struct list_head *next;
421
422         spin_lock(&cache_list_lock);
423
424         /* find a suitable table if we don't already have one */
425         while (current_detail == NULL ||
426             current_index >= current_detail->hash_size) {
427                 if (current_detail)
428                         next = current_detail->others.next;
429                 else
430                         next = cache_list.next;
431                 if (next == &cache_list) {
432                         current_detail = NULL;
433                         spin_unlock(&cache_list_lock);
434                         return -1;
435                 }
436                 current_detail = list_entry(next, struct cache_detail, others);
437                 if (current_detail->nextcheck > get_seconds())
438                         current_index = current_detail->hash_size;
439                 else {
440                         current_index = 0;
441                         current_detail->nextcheck = get_seconds()+30*60;
442                 }
443         }
444
445         /* find a non-empty bucket in the table */
446         while (current_detail &&
447                current_index < current_detail->hash_size &&
448                current_detail->hash_table[current_index] == NULL)
449                 current_index++;
450
451         /* find a cleanable entry in the bucket and clean it, or advance to the next bucket */
452
453         if (current_detail && current_index < current_detail->hash_size) {
454                 struct cache_head *ch, **cp;
455                 struct cache_detail *d;
456
457                 write_lock(&current_detail->hash_lock);
458
459                 /* Ok, now to clean this strand */
460
461                 cp = & current_detail->hash_table[current_index];
462                 ch = *cp;
463                 for (; ch; cp= & ch->next, ch= *cp) {
464                         if (current_detail->nextcheck > ch->expiry_time)
465                                 current_detail->nextcheck = ch->expiry_time+1;
466                         if (ch->expiry_time >= get_seconds()
467                             && ch->last_refresh >= current_detail->flush_time
468                                 )
469                                 continue;
470                         if (test_and_clear_bit(CACHE_PENDING, &ch->flags))
471                                 queue_loose(current_detail, ch);
472
473                         if (atomic_read(&ch->ref.refcount) == 1)
474                                 break;
475                 }
476                 if (ch) {
477                         *cp = ch->next;
478                         ch->next = NULL;
479                         current_detail->entries--;
480                         rv = 1;
481                 }
482                 write_unlock(&current_detail->hash_lock);
483                 d = current_detail;
484                 if (!ch)
485                         current_index ++;
486                 spin_unlock(&cache_list_lock);
487                 if (ch)
488                         cache_put(ch, d);
489         } else
490                 spin_unlock(&cache_list_lock);
491
492         return rv;
493 }
494
495 /*
496  * We want to regularly clean the cache, so we need to schedule some work ...
497  */
498 static void do_cache_clean(struct work_struct *work)
499 {
500         int delay = 5;
501         if (cache_clean() == -1)
502                 delay = round_jiffies_relative(30*HZ);
503
504         if (list_empty(&cache_list))
505                 delay = 0;
506
507         if (delay)
508                 schedule_delayed_work(&cache_cleaner, delay);
509 }
510
511
512 /*
513  * Clean all caches promptly.  This just calls cache_clean
514  * repeatedly until we are sure that every cache has had a chance to
515  * be fully cleaned
516  */
517 void cache_flush(void)
518 {
519         while (cache_clean() != -1)
520                 cond_resched();
521         while (cache_clean() != -1)
522                 cond_resched();
523 }
524 EXPORT_SYMBOL_GPL(cache_flush);
525
526 void cache_purge(struct cache_detail *detail)
527 {
528         detail->flush_time = LONG_MAX;
529         detail->nextcheck = get_seconds();
530         cache_flush();
531         detail->flush_time = 1;
532 }
533 EXPORT_SYMBOL_GPL(cache_purge);
534
535
536 /*
537  * Deferral and Revisiting of Requests.
538  *
539  * If a cache lookup finds a pending entry, we
540  * need to defer the request and revisit it later.
541  * All deferred requests are stored in a hash table,
542  * indexed by "struct cache_head *".
543  * As it may be wasteful to store a whole request
544  * structure, we allow the request to provide a
545  * deferred form, which must contain a
546  * 'struct cache_deferred_req'
547  * This cache_deferred_req contains a method to allow
548  * it to be revisited when cache info is available
549  */
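/*
 * The deferred form comes from the request's ->defer() method and is
 * resumed through the resulting ->revisit() method; for server-side
 * RPC requests these are provided by svc_defer()/svc_revisit() in
 * svc_xprt.c.
 */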
550
551 #define DFR_HASHSIZE    (PAGE_SIZE/sizeof(struct list_head))
552 #define DFR_HASH(item)  ((((long)item)>>4 ^ (((long)item)>>13)) % DFR_HASHSIZE)
553
554 #define DFR_MAX 300     /* ??? */
555
556 static DEFINE_SPINLOCK(cache_defer_lock);
557 static LIST_HEAD(cache_defer_list);
558 static struct list_head cache_defer_hash[DFR_HASHSIZE];
559 static int cache_defer_cnt;
560
561 static int cache_defer_req(struct cache_req *req, struct cache_head *item)
562 {
563         struct cache_deferred_req *dreq;
564         int hash = DFR_HASH(item);
565
566         if (cache_defer_cnt >= DFR_MAX) {
567                 /* too many deferred requests; randomly drop this one,
568                  * or continue and drop the oldest below
569                  */
570                 if (net_random()&1)
571                         return -ETIMEDOUT;
572         }
573         dreq = req->defer(req);
574         if (dreq == NULL)
575                 return -ETIMEDOUT;
576
577         dreq->item = item;
578
579         spin_lock(&cache_defer_lock);
580
581         list_add(&dreq->recent, &cache_defer_list);
582
583         if (cache_defer_hash[hash].next == NULL)
584                 INIT_LIST_HEAD(&cache_defer_hash[hash]);
585         list_add(&dreq->hash, &cache_defer_hash[hash]);
586
587         /* it is in, now maybe clean up */
588         dreq = NULL;
589         if (++cache_defer_cnt > DFR_MAX) {
590                 dreq = list_entry(cache_defer_list.prev,
591                                   struct cache_deferred_req, recent);
592                 list_del(&dreq->recent);
593                 list_del(&dreq->hash);
594                 cache_defer_cnt--;
595         }
596         spin_unlock(&cache_defer_lock);
597
598         if (dreq) {
599                 /* there was one too many */
600                 dreq->revisit(dreq, 1);
601         }
602         if (!test_bit(CACHE_PENDING, &item->flags)) {
603                 /* must have just been validated... */
604                 cache_revisit_request(item);
605         }
606         return 0;
607 }
608
609 static void cache_revisit_request(struct cache_head *item)
610 {
611         struct cache_deferred_req *dreq;
612         struct list_head pending;
613
614         struct list_head *lp;
615         int hash = DFR_HASH(item);
616
617         INIT_LIST_HEAD(&pending);
618         spin_lock(&cache_defer_lock);
619
620         lp = cache_defer_hash[hash].next;
621         if (lp) {
622                 while (lp != &cache_defer_hash[hash]) {
623                         dreq = list_entry(lp, struct cache_deferred_req, hash);
624                         lp = lp->next;
625                         if (dreq->item == item) {
626                                 list_del(&dreq->hash);
627                                 list_move(&dreq->recent, &pending);
628                                 cache_defer_cnt--;
629                         }
630                 }
631         }
632         spin_unlock(&cache_defer_lock);
633
634         while (!list_empty(&pending)) {
635                 dreq = list_entry(pending.next, struct cache_deferred_req, recent);
636                 list_del_init(&dreq->recent);
637                 dreq->revisit(dreq, 0);
638         }
639 }
640
641 void cache_clean_deferred(void *owner)
642 {
643         struct cache_deferred_req *dreq, *tmp;
644         struct list_head pending;
645
646
647         INIT_LIST_HEAD(&pending);
648         spin_lock(&cache_defer_lock);
649
650         list_for_each_entry_safe(dreq, tmp, &cache_defer_list, recent) {
651                 if (dreq->owner == owner) {
652                         list_del(&dreq->hash);
653                         list_move(&dreq->recent, &pending);
654                         cache_defer_cnt--;
655                 }
656         }
657         spin_unlock(&cache_defer_lock);
658
659         while (!list_empty(&pending)) {
660                 dreq = list_entry(pending.next, struct cache_deferred_req, recent);
661                 list_del_init(&dreq->recent);
662                 dreq->revisit(dreq, 1);
663         }
664 }
665
666 /*
667  * communicate with user-space
668  *
669  * We have a magic /proc file - /proc/net/rpc/<cachename>/channel.
670  * On read, you get a full request, or block.
671  * On write, an update request is processed.
672  * Poll works if anything to read, and always allows write.
673  *
674  * Implemented by a linked list of requests.  Each open file has
675  * a ->private_data that also exists in this list.  New requests are added
676  * to the end and may wake up any preceding readers.
677  * New readers are added to the head.  If, on read, an item is found with
678  * CACHE_PENDING clear, we free it from the list.
679  *
680  */
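/*
 * Illustrative exchange on a channel file (the record format is owned
 * by the individual cache; this example follows the auth.unix.ip
 * cache, with a made-up address):
 *
 *   read from channel:  "nfsd 192.0.2.1\n"
 *   write to channel:   "nfsd 192.0.2.1 2147483647 clientdomain\n"
 *
 * i.e. a daemon such as rpc.mountd reads an upcall request, resolves
 * it, and writes back the key followed by an expiry time (seconds
 * since epoch) and the cache content.
 */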
681
682 static DEFINE_SPINLOCK(queue_lock);
683 static DEFINE_MUTEX(queue_io_mutex);
684
685 struct cache_queue {
686         struct list_head        list;
687         int                     reader; /* if 0, then request */
688 };
689 struct cache_request {
690         struct cache_queue      q;
691         struct cache_head       *item;
692         char                    * buf;
693         int                     len;
694         int                     readers;
695 };
696 struct cache_reader {
697         struct cache_queue      q;
698         int                     offset; /* if non-0, we have a refcnt on next request */
699 };
700
701 static ssize_t
702 cache_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
703 {
704         struct cache_reader *rp = filp->private_data;
705         struct cache_request *rq;
706         struct inode *inode = filp->f_path.dentry->d_inode;
707         struct cache_detail *cd = PDE(inode)->data;
708         int err;
709
710         if (count == 0)
711                 return 0;
712
713         mutex_lock(&inode->i_mutex); /* protect against multiple concurrent
714                               * readers on this file */
715  again:
716         spin_lock(&queue_lock);
717         /* need to find next request */
718         while (rp->q.list.next != &cd->queue &&
719                list_entry(rp->q.list.next, struct cache_queue, list)
720                ->reader) {
721                 struct list_head *next = rp->q.list.next;
722                 list_move(&rp->q.list, next);
723         }
724         if (rp->q.list.next == &cd->queue) {
725                 spin_unlock(&queue_lock);
726                 mutex_unlock(&inode->i_mutex);
727                 BUG_ON(rp->offset);
728                 return 0;
729         }
730         rq = container_of(rp->q.list.next, struct cache_request, q.list);
731         BUG_ON(rq->q.reader);
732         if (rp->offset == 0)
733                 rq->readers++;
734         spin_unlock(&queue_lock);
735
736         if (rp->offset == 0 && !test_bit(CACHE_PENDING, &rq->item->flags)) {
737                 err = -EAGAIN;
738                 spin_lock(&queue_lock);
739                 list_move(&rp->q.list, &rq->q.list);
740                 spin_unlock(&queue_lock);
741         } else {
742                 if (rp->offset + count > rq->len)
743                         count = rq->len - rp->offset;
744                 err = -EFAULT;
745                 if (copy_to_user(buf, rq->buf + rp->offset, count))
746                         goto out;
747                 rp->offset += count;
748                 if (rp->offset >= rq->len) {
749                         rp->offset = 0;
750                         spin_lock(&queue_lock);
751                         list_move(&rp->q.list, &rq->q.list);
752                         spin_unlock(&queue_lock);
753                 }
754                 err = 0;
755         }
756  out:
757         if (rp->offset == 0) {
758                 /* need to release rq */
759                 spin_lock(&queue_lock);
760                 rq->readers--;
761                 if (rq->readers == 0 &&
762                     !test_bit(CACHE_PENDING, &rq->item->flags)) {
763                         list_del(&rq->q.list);
764                         spin_unlock(&queue_lock);
765                         cache_put(rq->item, cd);
766                         kfree(rq->buf);
767                         kfree(rq);
768                 } else
769                         spin_unlock(&queue_lock);
770         }
771         if (err == -EAGAIN)
772                 goto again;
773         mutex_unlock(&inode->i_mutex);
774         return err ? err :  count;
775 }
776
777 static ssize_t cache_do_downcall(char *kaddr, const char __user *buf,
778                                  size_t count, struct cache_detail *cd)
779 {
780         ssize_t ret;
781
782         if (copy_from_user(kaddr, buf, count))
783                 return -EFAULT;
784         kaddr[count] = '\0';
785         ret = cd->cache_parse(cd, kaddr, count);
786         if (!ret)
787                 ret = count;
788         return ret;
789 }
790
791 static ssize_t cache_slow_downcall(const char __user *buf,
792                                    size_t count, struct cache_detail *cd)
793 {
794         static char write_buf[8192]; /* protected by queue_io_mutex */
795         ssize_t ret = -EINVAL;
796
797         if (count >= sizeof(write_buf))
798                 goto out;
799         mutex_lock(&queue_io_mutex);
800         ret = cache_do_downcall(write_buf, buf, count, cd);
801         mutex_unlock(&queue_io_mutex);
802 out:
803         return ret;
804 }
805
806 static ssize_t cache_downcall(struct address_space *mapping,
807                               const char __user *buf,
808                               size_t count, struct cache_detail *cd)
809 {
810         struct page *page;
811         char *kaddr;
812         ssize_t ret = -ENOMEM;
813
814         if (count >= PAGE_CACHE_SIZE)
815                 goto out_slow;
816
817         page = find_or_create_page(mapping, 0, GFP_KERNEL);
818         if (!page)
819                 goto out_slow;
820
821         kaddr = kmap(page);
822         ret = cache_do_downcall(kaddr, buf, count, cd);
823         kunmap(page);
824         unlock_page(page);
825         page_cache_release(page);
826         return ret;
827 out_slow:
828         return cache_slow_downcall(buf, count, cd);
829 }
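/*
 * Note on the two downcall paths above: cache_downcall() stages the
 * write in a page taken from the channel file's page cache, so writers
 * to different caches no longer serialise on a single global buffer.
 * cache_slow_downcall() keeps the old static 8KB buffer, protected by
 * queue_io_mutex, purely as a fallback for writes that do not fit in
 * one page (or when no page can be allocated).
 */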
830
831 static ssize_t
832 cache_write(struct file *filp, const char __user *buf, size_t count,
833             loff_t *ppos)
834 {
835         struct address_space *mapping = filp->f_mapping;
836         struct inode *inode = filp->f_path.dentry->d_inode;
837         struct cache_detail *cd = PDE(inode)->data;
838         ssize_t ret = -EINVAL;
839
840         if (!cd->cache_parse)
841                 goto out;
842
843         mutex_lock(&inode->i_mutex);
844         ret = cache_downcall(mapping, buf, count, cd);
845         mutex_unlock(&inode->i_mutex);
846 out:
847         return ret;
848 }
849
850 static DECLARE_WAIT_QUEUE_HEAD(queue_wait);
851
852 static unsigned int
853 cache_poll(struct file *filp, poll_table *wait)
854 {
855         unsigned int mask;
856         struct cache_reader *rp = filp->private_data;
857         struct cache_queue *cq;
858         struct cache_detail *cd = PDE(filp->f_path.dentry->d_inode)->data;
859
860         poll_wait(filp, &queue_wait, wait);
861
862         /* always allow write */
863         mask = POLLOUT | POLLWRNORM;
864
865         if (!rp)
866                 return mask;
867
868         spin_lock(&queue_lock);
869
870         for (cq= &rp->q; &cq->list != &cd->queue;
871              cq = list_entry(cq->list.next, struct cache_queue, list))
872                 if (!cq->reader) {
873                         mask |= POLLIN | POLLRDNORM;
874                         break;
875                 }
876         spin_unlock(&queue_lock);
877         return mask;
878 }
879
880 static int
881 cache_ioctl(struct inode *ino, struct file *filp,
882             unsigned int cmd, unsigned long arg)
883 {
884         int len = 0;
885         struct cache_reader *rp = filp->private_data;
886         struct cache_queue *cq;
887         struct cache_detail *cd = PDE(ino)->data;
888
889         if (cmd != FIONREAD || !rp)
890                 return -EINVAL;
891
892         spin_lock(&queue_lock);
893
894         /* only find the length remaining in current request,
895          * or the length of the next request
896          */
897         for (cq= &rp->q; &cq->list != &cd->queue;
898              cq = list_entry(cq->list.next, struct cache_queue, list))
899                 if (!cq->reader) {
900                         struct cache_request *cr =
901                                 container_of(cq, struct cache_request, q);
902                         len = cr->len - rp->offset;
903                         break;
904                 }
905         spin_unlock(&queue_lock);
906
907         return put_user(len, (int __user *)arg);
908 }
909
910 static int
911 cache_open(struct inode *inode, struct file *filp)
912 {
913         struct cache_reader *rp = NULL;
914
915         nonseekable_open(inode, filp);
916         if (filp->f_mode & FMODE_READ) {
917                 struct cache_detail *cd = PDE(inode)->data;
918
919                 rp = kmalloc(sizeof(*rp), GFP_KERNEL);
920                 if (!rp)
921                         return -ENOMEM;
922                 rp->offset = 0;
923                 rp->q.reader = 1;
924                 atomic_inc(&cd->readers);
925                 spin_lock(&queue_lock);
926                 list_add(&rp->q.list, &cd->queue);
927                 spin_unlock(&queue_lock);
928         }
929         filp->private_data = rp;
930         return 0;
931 }
932
933 static int
934 cache_release(struct inode *inode, struct file *filp)
935 {
936         struct cache_reader *rp = filp->private_data;
937         struct cache_detail *cd = PDE(inode)->data;
938
939         if (rp) {
940                 spin_lock(&queue_lock);
941                 if (rp->offset) {
942                         struct cache_queue *cq;
943                         for (cq= &rp->q; &cq->list != &cd->queue;
944                              cq = list_entry(cq->list.next, struct cache_queue, list))
945                                 if (!cq->reader) {
946                                         container_of(cq, struct cache_request, q)
947                                                 ->readers--;
948                                         break;
949                                 }
950                         rp->offset = 0;
951                 }
952                 list_del(&rp->q.list);
953                 spin_unlock(&queue_lock);
954
955                 filp->private_data = NULL;
956                 kfree(rp);
957
958                 cd->last_close = get_seconds();
959                 atomic_dec(&cd->readers);
960         }
961         return 0;
962 }
963
964
965
966 static const struct file_operations cache_file_operations = {
967         .owner          = THIS_MODULE,
968         .llseek         = no_llseek,
969         .read           = cache_read,
970         .write          = cache_write,
971         .poll           = cache_poll,
972         .ioctl          = cache_ioctl, /* for FIONREAD */
973         .open           = cache_open,
974         .release        = cache_release,
975 };
976
977
978 static void queue_loose(struct cache_detail *detail, struct cache_head *ch)
979 {
980         struct cache_queue *cq;
981         spin_lock(&queue_lock);
982         list_for_each_entry(cq, &detail->queue, list)
983                 if (!cq->reader) {
984                         struct cache_request *cr = container_of(cq, struct cache_request, q);
985                         if (cr->item != ch)
986                                 continue;
987                         if (cr->readers != 0)
988                                 continue;
989                         list_del(&cr->q.list);
990                         spin_unlock(&queue_lock);
991                         cache_put(cr->item, detail);
992                         kfree(cr->buf);
993                         kfree(cr);
994                         return;
995                 }
996         spin_unlock(&queue_lock);
997 }
998
999 /*
1000  * Support routines for text-based upcalls.
1001  * Fields are separated by spaces.
1002  * Fields are either mangled to quote space tab newline slosh with slosh
1003  * or hexified with a leading \x
1004  * Record is terminated with newline.
1005  *
1006  */
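/*
 * Example encodings produced by the helpers below:
 *
 *   qword_add(&bp, &len, "a b")             emits "a\040b "
 *   qword_addhex(&bp, &len, "\x01\x02", 2)  emits "\x0102 "
 *
 * i.e. the quoted characters become \ooo octal escapes, binary data is
 * written as \x followed by lowercase hex, and every field is
 * terminated by a single space.
 */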
1007
1008 void qword_add(char **bpp, int *lp, char *str)
1009 {
1010         char *bp = *bpp;
1011         int len = *lp;
1012         char c;
1013
1014         if (len < 0) return;
1015
1016         while ((c=*str++) && len)
1017                 switch(c) {
1018                 case ' ':
1019                 case '\t':
1020                 case '\n':
1021                 case '\\':
1022                         if (len >= 4) {
1023                                 *bp++ = '\\';
1024                                 *bp++ = '0' + ((c & 0300)>>6);
1025                                 *bp++ = '0' + ((c & 0070)>>3);
1026                                 *bp++ = '0' + ((c & 0007)>>0);
1027                         }
1028                         len -= 4;
1029                         break;
1030                 default:
1031                         *bp++ = c;
1032                         len--;
1033                 }
1034         if (c || len <1) len = -1;
1035         else {
1036                 *bp++ = ' ';
1037                 len--;
1038         }
1039         *bpp = bp;
1040         *lp = len;
1041 }
1042 EXPORT_SYMBOL_GPL(qword_add);
1043
1044 void qword_addhex(char **bpp, int *lp, char *buf, int blen)
1045 {
1046         char *bp = *bpp;
1047         int len = *lp;
1048
1049         if (len < 0) return;
1050
1051         if (len > 2) {
1052                 *bp++ = '\\';
1053                 *bp++ = 'x';
1054                 len -= 2;
1055                 while (blen && len >= 2) {
1056                         unsigned char c = *buf++;
1057                         *bp++ = '0' + ((c&0xf0)>>4) + (c>=0xa0)*('a'-'9'-1);
1058                         *bp++ = '0' + (c&0x0f) + ((c&0x0f)>=0x0a)*('a'-'9'-1);
1059                         len -= 2;
1060                         blen--;
1061                 }
1062         }
1063         if (blen || len<1) len = -1;
1064         else {
1065                 *bp++ = ' ';
1066                 len--;
1067         }
1068         *bpp = bp;
1069         *lp = len;
1070 }
1071 EXPORT_SYMBOL_GPL(qword_addhex);
1072
1073 static void warn_no_listener(struct cache_detail *detail)
1074 {
1075         if (detail->last_warn != detail->last_close) {
1076                 detail->last_warn = detail->last_close;
1077                 if (detail->warn_no_listener)
1078                         detail->warn_no_listener(detail, detail->last_close != 0);
1079         }
1080 }
1081
1082 /*
1083  * register an upcall request to user-space.
1084  * Each request is at most one page long.
1085  */
1086 static int cache_make_upcall(struct cache_detail *detail, struct cache_head *h)
1087 {
1088
1089         char *buf;
1090         struct cache_request *crq;
1091         char *bp;
1092         int len;
1093
1094         if (detail->cache_request == NULL)
1095                 return -EINVAL;
1096
1097         if (atomic_read(&detail->readers) == 0 &&
1098             detail->last_close < get_seconds() - 30) {
1099                 warn_no_listener(detail);
1100                 return -EINVAL;
1101         }
1102
1103         buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
1104         if (!buf)
1105                 return -EAGAIN;
1106
1107         crq = kmalloc(sizeof (*crq), GFP_KERNEL);
1108         if (!crq) {
1109                 kfree(buf);
1110                 return -EAGAIN;
1111         }
1112
1113         bp = buf; len = PAGE_SIZE;
1114
1115         detail->cache_request(detail, h, &bp, &len);
1116
1117         if (len < 0) {
1118                 kfree(buf);
1119                 kfree(crq);
1120                 return -EAGAIN;
1121         }
1122         crq->q.reader = 0;
1123         crq->item = cache_get(h);
1124         crq->buf = buf;
1125         crq->len = PAGE_SIZE - len;
1126         crq->readers = 0;
1127         spin_lock(&queue_lock);
1128         list_add_tail(&crq->q.list, &detail->queue);
1129         spin_unlock(&queue_lock);
1130         wake_up(&queue_wait);
1131         return 0;
1132 }
1133
1134 /*
1135  * parse a message from user-space and pass it
1136  * to an appropriate cache
1137  * Messages are, like requests, separated into fields by
1138  * spaces and dequoted as \xHEXSTRING or embedded \nnn octal
1139  *
1140  * Message is
1141  *   reply cachename expiry key ... content....
1142  *
1143  * key and content are both parsed by cache
1144  */
1145
1146 #define isodigit(c) (isdigit(c) && c <= '7')
1147 int qword_get(char **bpp, char *dest, int bufsize)
1148 {
1149         /* return bytes copied, or -1 on error */
1150         char *bp = *bpp;
1151         int len = 0;
1152
1153         while (*bp == ' ') bp++;
1154
1155         if (bp[0] == '\\' && bp[1] == 'x') {
1156                 /* HEX STRING */
1157                 bp += 2;
1158                 while (isxdigit(bp[0]) && isxdigit(bp[1]) && len < bufsize) {
1159                         int byte = isdigit(*bp) ? *bp-'0' : toupper(*bp)-'A'+10;
1160                         bp++;
1161                         byte <<= 4;
1162                         byte |= isdigit(*bp) ? *bp-'0' : toupper(*bp)-'A'+10;
1163                         *dest++ = byte;
1164                         bp++;
1165                         len++;
1166                 }
1167         } else {
1168                 /* text with \nnn octal quoting */
1169                 while (*bp != ' ' && *bp != '\n' && *bp && len < bufsize-1) {
1170                         if (*bp == '\\' &&
1171                             isodigit(bp[1]) && (bp[1] <= '3') &&
1172                             isodigit(bp[2]) &&
1173                             isodigit(bp[3])) {
1174                                 int byte = (*++bp -'0');
1175                                 bp++;
1176                                 byte = (byte << 3) | (*bp++ - '0');
1177                                 byte = (byte << 3) | (*bp++ - '0');
1178                                 *dest++ = byte;
1179                                 len++;
1180                         } else {
1181                                 *dest++ = *bp++;
1182                                 len++;
1183                         }
1184                 }
1185         }
1186
1187         if (*bp != ' ' && *bp != '\n' && *bp != '\0')
1188                 return -1;
1189         while (*bp == ' ') bp++;
1190         *bpp = bp;
1191         *dest = '\0';
1192         return len;
1193 }
1194 EXPORT_SYMBOL_GPL(qword_get);
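/*
 * Example: with *bpp pointing at "\x68656c6c6f next", qword_get()
 * copies "hello" into dest, returns 5, and advances *bpp to "next".
 */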
1195
1196
1197 /*
1198  * support /proc/sunrpc/cache/$CACHENAME/content
1199  * as a seqfile.
1200  * We call ->cache_show passing NULL for the item to
1201  * get a header, then pass each real item in the cache
1202  */
1203
1204 struct handle {
1205         struct cache_detail *cd;
1206 };
1207
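/*
 * The seq_file position is encoded so that *pos == 0 yields the header
 * (SEQ_START_TOKEN); after that, the upper 32 bits of (*pos - 1) hold
 * the hash bucket and the lower 32 bits hold the index within that
 * bucket's chain.
 */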
1208 static void *c_start(struct seq_file *m, loff_t *pos)
1209         __acquires(cd->hash_lock)
1210 {
1211         loff_t n = *pos;
1212         unsigned hash, entry;
1213         struct cache_head *ch;
1214         struct cache_detail *cd = ((struct handle*)m->private)->cd;
1215
1216
1217         read_lock(&cd->hash_lock);
1218         if (!n--)
1219                 return SEQ_START_TOKEN;
1220         hash = n >> 32;
1221         entry = n & ((1LL<<32) - 1);
1222
1223         for (ch=cd->hash_table[hash]; ch; ch=ch->next)
1224                 if (!entry--)
1225                         return ch;
1226         n &= ~((1LL<<32) - 1);
1227         do {
1228                 hash++;
1229                 n += 1LL<<32;
1230         } while(hash < cd->hash_size &&
1231                 cd->hash_table[hash]==NULL);
1232         if (hash >= cd->hash_size)
1233                 return NULL;
1234         *pos = n+1;
1235         return cd->hash_table[hash];
1236 }
1237
1238 static void *c_next(struct seq_file *m, void *p, loff_t *pos)
1239 {
1240         struct cache_head *ch = p;
1241         int hash = (*pos >> 32);
1242         struct cache_detail *cd = ((struct handle*)m->private)->cd;
1243
1244         if (p == SEQ_START_TOKEN)
1245                 hash = 0;
1246         else if (ch->next == NULL) {
1247                 hash++;
1248                 *pos += 1LL<<32;
1249         } else {
1250                 ++*pos;
1251                 return ch->next;
1252         }
1253         *pos &= ~((1LL<<32) - 1);
1254         while (hash < cd->hash_size &&
1255                cd->hash_table[hash] == NULL) {
1256                 hash++;
1257                 *pos += 1LL<<32;
1258         }
1259         if (hash >= cd->hash_size)
1260                 return NULL;
1261         ++*pos;
1262         return cd->hash_table[hash];
1263 }
1264
1265 static void c_stop(struct seq_file *m, void *p)
1266         __releases(cd->hash_lock)
1267 {
1268         struct cache_detail *cd = ((struct handle*)m->private)->cd;
1269         read_unlock(&cd->hash_lock);
1270 }
1271
1272 static int c_show(struct seq_file *m, void *p)
1273 {
1274         struct cache_head *cp = p;
1275         struct cache_detail *cd = ((struct handle*)m->private)->cd;
1276
1277         if (p == SEQ_START_TOKEN)
1278                 return cd->cache_show(m, cd, NULL);
1279
1280         ifdebug(CACHE)
1281                 seq_printf(m, "# expiry=%ld refcnt=%d flags=%lx\n",
1282                            cp->expiry_time, atomic_read(&cp->ref.refcount), cp->flags);
1283         cache_get(cp);
1284         if (cache_check(cd, cp, NULL))
1285                 /* cache_check does a cache_put on failure */
1286                 seq_printf(m, "# ");
1287         else
1288                 cache_put(cp, cd);
1289
1290         return cd->cache_show(m, cd, cp);
1291 }
1292
1293 static const struct seq_operations cache_content_op = {
1294         .start  = c_start,
1295         .next   = c_next,
1296         .stop   = c_stop,
1297         .show   = c_show,
1298 };
1299
1300 static int content_open(struct inode *inode, struct file *file)
1301 {
1302         struct handle *han;
1303         struct cache_detail *cd = PDE(inode)->data;
1304
1305         han = __seq_open_private(file, &cache_content_op, sizeof(*han));
1306         if (han == NULL)
1307                 return -ENOMEM;
1308
1309         han->cd = cd;
1310         return 0;
1311 }
1312
1313 static const struct file_operations content_file_operations = {
1314         .open           = content_open,
1315         .read           = seq_read,
1316         .llseek         = seq_lseek,
1317         .release        = seq_release_private,
1318 };
1319
1320 static ssize_t read_flush(struct file *file, char __user *buf,
1321                             size_t count, loff_t *ppos)
1322 {
1323         struct cache_detail *cd = PDE(file->f_path.dentry->d_inode)->data;
1324         char tbuf[20];
1325         unsigned long p = *ppos;
1326         size_t len;
1327
1328         sprintf(tbuf, "%lu\n", cd->flush_time);
1329         len = strlen(tbuf);
1330         if (p >= len)
1331                 return 0;
1332         len -= p;
1333         if (len > count)
1334                 len = count;
1335         if (copy_to_user(buf, (void*)(tbuf+p), len))
1336                 return -EFAULT;
1337         *ppos += len;
1338         return len;
1339 }
1340
1341 static ssize_t write_flush(struct file * file, const char __user * buf,
1342                              size_t count, loff_t *ppos)
1343 {
1344         struct cache_detail *cd = PDE(file->f_path.dentry->d_inode)->data;
1345         char tbuf[20];
1346         char *ep;
1347         long flushtime;
1348         if (*ppos || count > sizeof(tbuf)-1)
1349                 return -EINVAL;
1350         if (copy_from_user(tbuf, buf, count))
1351                 return -EFAULT;
1352         tbuf[count] = 0;
1353         flushtime = simple_strtoul(tbuf, &ep, 0);
1354         if (*ep && *ep != '\n')
1355                 return -EINVAL;
1356
1357         cd->flush_time = flushtime;
1358         cd->nextcheck = get_seconds();
1359         cache_flush();
1360
1361         *ppos += count;
1362         return count;
1363 }
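/*
 * Usage note: writing a time in seconds since the epoch to a cache's
 * "flush" file sets flush_time, so every entry whose last_refresh is
 * earlier than that time is discarded on the next clean pass, e.g.
 *
 *	echo $(date +%s) > /proc/net/rpc/auth.unix.ip/flush
 *
 * (shown for the auth.unix.ip cache; each cache has its own directory
 * under /proc/net/rpc/).
 */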
1364
1365 static const struct file_operations cache_flush_operations = {
1366         .open           = nonseekable_open,
1367         .read           = read_flush,
1368         .write          = write_flush,
1369 };