Merge commit 'v2.6.34-rc6'
[safe/jmp/linux-2.6] / net / sunrpc / cache.c
index dcaa0c4..a3f340c 100644 (file)
 #include <linux/proc_fs.h>
 #include <linux/net.h>
 #include <linux/workqueue.h>
+#include <linux/mutex.h>
+#include <linux/pagemap.h>
 #include <asm/ioctls.h>
 #include <linux/sunrpc/types.h>
 #include <linux/sunrpc/cache.h>
 #include <linux/sunrpc/stats.h>
+#include <linux/sunrpc/rpc_pipe_fs.h>
 
 #define         RPCDBG_FACILITY RPCDBG_CACHE
 
-static void cache_defer_req(struct cache_req *req, struct cache_head *item);
+static int cache_defer_req(struct cache_req *req, struct cache_head *item);
 static void cache_revisit_request(struct cache_head *item);
 
-void cache_init(struct cache_head *h)
+static void cache_init(struct cache_head *h)
 {
        time_t now = get_seconds();
        h->next = NULL;
        h->flags = 0;
-       atomic_set(&h->refcnt, 1);
+       kref_init(&h->ref);
        h->expiry_time = now + CACHE_NEW_EXPIRY;
        h->last_refresh = now;
 }
 
+static inline int cache_is_expired(struct cache_detail *detail, struct cache_head *h)
+{
+       return  (h->expiry_time < get_seconds()) ||
+               (detail->flush_time > h->last_refresh);
+}
+
+struct cache_head *sunrpc_cache_lookup(struct cache_detail *detail,
+                                      struct cache_head *key, int hash)
+{
+       struct cache_head **head,  **hp;
+       struct cache_head *new = NULL, *freeme = NULL;
+
+       head = &detail->hash_table[hash];
+
+       read_lock(&detail->hash_lock);
+
+       for (hp=head; *hp != NULL ; hp = &(*hp)->next) {
+               struct cache_head *tmp = *hp;
+               if (detail->match(tmp, key)) {
+                       if (cache_is_expired(detail, tmp))
+                               /* This entry is expired, we will discard it. */
+                               break;
+                       cache_get(tmp);
+                       read_unlock(&detail->hash_lock);
+                       return tmp;
+               }
+       }
+       read_unlock(&detail->hash_lock);
+       /* Didn't find anything, insert an empty entry */
+
+       new = detail->alloc();
+       if (!new)
+               return NULL;
+       /* must fully initialise 'new', else
+        * we might get lose if we need to
+        * cache_put it soon.
+        */
+       cache_init(new);
+       detail->init(new, key);
+
+       write_lock(&detail->hash_lock);
+
+       /* check if entry appeared while we slept */
+       for (hp=head; *hp != NULL ; hp = &(*hp)->next) {
+               struct cache_head *tmp = *hp;
+               if (detail->match(tmp, key)) {
+                       if (cache_is_expired(detail, tmp)) {
+                               *hp = tmp->next;
+                               tmp->next = NULL;
+                               detail->entries --;
+                               freeme = tmp;
+                               break;
+                       }
+                       cache_get(tmp);
+                       write_unlock(&detail->hash_lock);
+                       cache_put(new, detail);
+                       return tmp;
+               }
+       }
+       new->next = *head;
+       *head = new;
+       detail->entries++;
+       cache_get(new);
+       write_unlock(&detail->hash_lock);
+
+       if (freeme)
+               cache_put(freeme, detail);
+       return new;
+}
+EXPORT_SYMBOL_GPL(sunrpc_cache_lookup);
+
+
+static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch);
+
+static void cache_fresh_locked(struct cache_head *head, time_t expiry)
+{
+       head->expiry_time = expiry;
+       head->last_refresh = get_seconds();
+       set_bit(CACHE_VALID, &head->flags);
+}
+
+static void cache_fresh_unlocked(struct cache_head *head,
+                                struct cache_detail *detail)
+{
+       if (test_and_clear_bit(CACHE_PENDING, &head->flags)) {
+               cache_revisit_request(head);
+               cache_dequeue(detail, head);
+       }
+}
+
+struct cache_head *sunrpc_cache_update(struct cache_detail *detail,
+                                      struct cache_head *new, struct cache_head *old, int hash)
+{
+       /* The 'old' entry is to be replaced by 'new'.
+        * If 'old' is not VALID, we update it directly,
+        * otherwise we need to replace it
+        */
+       struct cache_head **head;
+       struct cache_head *tmp;
+
+       if (!test_bit(CACHE_VALID, &old->flags)) {
+               write_lock(&detail->hash_lock);
+               if (!test_bit(CACHE_VALID, &old->flags)) {
+                       if (test_bit(CACHE_NEGATIVE, &new->flags))
+                               set_bit(CACHE_NEGATIVE, &old->flags);
+                       else
+                               detail->update(old, new);
+                       cache_fresh_locked(old, new->expiry_time);
+                       write_unlock(&detail->hash_lock);
+                       cache_fresh_unlocked(old, detail);
+                       return old;
+               }
+               write_unlock(&detail->hash_lock);
+       }
+       /* We need to insert a new entry */
+       tmp = detail->alloc();
+       if (!tmp) {
+               cache_put(old, detail);
+               return NULL;
+       }
+       cache_init(tmp);
+       detail->init(tmp, old);
+       head = &detail->hash_table[hash];
+
+       write_lock(&detail->hash_lock);
+       if (test_bit(CACHE_NEGATIVE, &new->flags))
+               set_bit(CACHE_NEGATIVE, &tmp->flags);
+       else
+               detail->update(tmp, new);
+       tmp->next = *head;
+       *head = tmp;
+       detail->entries++;
+       cache_get(tmp);
+       cache_fresh_locked(tmp, new->expiry_time);
+       cache_fresh_locked(old, 0);
+       write_unlock(&detail->hash_lock);
+       cache_fresh_unlocked(tmp, detail);
+       cache_fresh_unlocked(old, detail);
+       cache_put(old, detail);
+       return tmp;
+}
+EXPORT_SYMBOL_GPL(sunrpc_cache_update);
+
+static int cache_make_upcall(struct cache_detail *cd, struct cache_head *h)
+{
+       if (!cd->cache_upcall)
+               return -EINVAL;
+       return cd->cache_upcall(cd, h);
+}
+
+static inline int cache_is_valid(struct cache_detail *detail, struct cache_head *h)
+{
+       if (!test_bit(CACHE_VALID, &h->flags))
+               return -EAGAIN;
+       else {
+               /* entry is valid */
+               if (test_bit(CACHE_NEGATIVE, &h->flags))
+                       return -ENOENT;
+               else
+                       return 0;
+       }
+}
 
-static int cache_make_upcall(struct cache_detail *detail, struct cache_head *h);
 /*
  * This is the generic cache management routine for all
  * the authentication caches.
@@ -56,7 +220,10 @@ static int cache_make_upcall(struct cache_detail *detail, struct cache_head *h);
  *
  *
  * Returns 0 if the cache_head can be used, or cache_puts it and returns
- * -EAGAIN if upcall is pending,
+ * -EAGAIN if upcall is pending and request has been queued
+ * -ETIMEDOUT if upcall failed or request could not be queue or
+ *           upcall completed but item is still invalid (implying that
+ *           the cache item has been replaced with a newer one).
  * -ENOENT if cache entry was negative
  */
 int cache_check(struct cache_detail *detail,
@@ -66,17 +233,7 @@ int cache_check(struct cache_detail *detail,
        long refresh_age, age;
 
        /* First decide return status as best we can */
-       if (!test_bit(CACHE_VALID, &h->flags) ||
-           h->expiry_time < get_seconds())
-               rv = -EAGAIN;
-       else if (detail->flush_time > h->last_refresh)
-               rv = -EAGAIN;
-       else {
-               /* entry is valid */
-               if (test_bit(CACHE_NEGATIVE, &h->flags))
-                       rv = -ENOENT;
-               else rv = 0;
-       }
+       rv = cache_is_valid(detail, h);
 
        /* now see if we want to start an upcall */
        refresh_age = (h->expiry_time - h->last_refresh);
@@ -86,14 +243,17 @@ int cache_check(struct cache_detail *detail,
                if (rv == -EAGAIN)
                        rv = -ENOENT;
        } else if (rv == -EAGAIN || age > refresh_age/2) {
-               dprintk("Want update, refage=%ld, age=%ld\n", refresh_age, age);
+               dprintk("RPC:       Want update, refage=%ld, age=%ld\n",
+                               refresh_age, age);
                if (!test_and_set_bit(CACHE_PENDING, &h->flags)) {
                        switch (cache_make_upcall(detail, h)) {
                        case -EINVAL:
                                clear_bit(CACHE_PENDING, &h->flags);
+                               cache_revisit_request(h);
                                if (rv == -EAGAIN) {
                                        set_bit(CACHE_NEGATIVE, &h->flags);
-                                       cache_fresh(detail, h, get_seconds()+CACHE_NEW_EXPIRY);
+                                       cache_fresh_locked(h, get_seconds()+CACHE_NEW_EXPIRY);
+                                       cache_fresh_unlocked(h, detail);
                                        rv = -ENOENT;
                                }
                                break;
@@ -106,27 +266,19 @@ int cache_check(struct cache_detail *detail,
                }
        }
 
-       if (rv == -EAGAIN)
-               cache_defer_req(rqstp, h);
-
-       if (rv && h)
-               detail->cache_put(h, detail);
+       if (rv == -EAGAIN) {
+               if (cache_defer_req(rqstp, h) < 0) {
+                       /* Request is not deferred */
+                       rv = cache_is_valid(detail, h);
+                       if (rv == -EAGAIN)
+                               rv = -ETIMEDOUT;
+               }
+       }
+       if (rv)
+               cache_put(h, detail);
        return rv;
 }
-
-static void queue_loose(struct cache_detail *detail, struct cache_head *ch);
-
-void cache_fresh(struct cache_detail *detail,
-                struct cache_head *head, time_t expiry)
-{
-
-       head->expiry_time = expiry;
-       head->last_refresh = get_seconds();
-       if (!test_and_set_bit(CACHE_VALID, &head->flags))
-               cache_revisit_request(head);
-       if (test_and_clear_bit(CACHE_PENDING, &head->flags))
-               queue_loose(detail, head);
-}
+EXPORT_SYMBOL_GPL(cache_check);
 
 /*
  * caches need to be periodically cleaned.
@@ -157,7 +309,7 @@ void cache_fresh(struct cache_detail *detail,
  *
  * A table is then only scanned if the current time is at least
  * the nextcheck time.
- * 
+ *
  */
 
 static LIST_HEAD(cache_list);
@@ -165,51 +317,11 @@ static DEFINE_SPINLOCK(cache_list_lock);
 static struct cache_detail *current_detail;
 static int current_index;
 
-static struct file_operations cache_file_operations;
-static struct file_operations content_file_operations;
-static struct file_operations cache_flush_operations;
-
-static void do_cache_clean(void *data);
-static DECLARE_WORK(cache_cleaner, do_cache_clean, NULL);
-
-void cache_register(struct cache_detail *cd)
-{
-       cd->proc_ent = proc_mkdir(cd->name, proc_net_rpc);
-       if (cd->proc_ent) {
-               struct proc_dir_entry *p;
-               cd->proc_ent->owner = cd->owner;
-               cd->channel_ent = cd->content_ent = NULL;
-               
-               p = create_proc_entry("flush", S_IFREG|S_IRUSR|S_IWUSR,
-                                     cd->proc_ent);
-               cd->flush_ent =  p;
-               if (p) {
-                       p->proc_fops = &cache_flush_operations;
-                       p->owner = cd->owner;
-                       p->data = cd;
-               }
-               if (cd->cache_request || cd->cache_parse) {
-                       p = create_proc_entry("channel", S_IFREG|S_IRUSR|S_IWUSR,
-                                             cd->proc_ent);
-                       cd->channel_ent = p;
-                       if (p) {
-                               p->proc_fops = &cache_file_operations;
-                               p->owner = cd->owner;
-                               p->data = cd;
-                       }
-               }
-               if (cd->cache_show) {
-                       p = create_proc_entry("content", S_IFREG|S_IRUSR|S_IWUSR,
-                                             cd->proc_ent);
-                       cd->content_ent = p;
-                       if (p) {
-                               p->proc_fops = &content_file_operations;
-                               p->owner = cd->owner;
-                               p->data = cd;
-                       }
-               }
-       }
+static void do_cache_clean(struct work_struct *work);
+static DECLARE_DELAYED_WORK(cache_cleaner, do_cache_clean);
+
+static void sunrpc_init_cache_detail(struct cache_detail *cd)
+{
        rwlock_init(&cd->hash_lock);
        INIT_LIST_HEAD(&cd->queue);
        spin_lock(&cache_list_lock);
@@ -222,10 +334,10 @@ void cache_register(struct cache_detail *cd)
        spin_unlock(&cache_list_lock);
 
        /* start the cleaning process */
-       schedule_work(&cache_cleaner);
+       schedule_delayed_work(&cache_cleaner, 0);
 }
 
-int cache_unregister(struct cache_detail *cd)
+static void sunrpc_destroy_cache_detail(struct cache_detail *cd)
 {
        cache_purge(cd);
        spin_lock(&cache_list_lock);
@@ -233,30 +345,20 @@ int cache_unregister(struct cache_detail *cd)
        if (cd->entries || atomic_read(&cd->inuse)) {
                write_unlock(&cd->hash_lock);
                spin_unlock(&cache_list_lock);
-               return -EBUSY;
+               goto out;
        }
        if (current_detail == cd)
                current_detail = NULL;
        list_del_init(&cd->others);
        write_unlock(&cd->hash_lock);
        spin_unlock(&cache_list_lock);
-       if (cd->proc_ent) {
-               if (cd->flush_ent)
-                       remove_proc_entry("flush", cd->proc_ent);
-               if (cd->channel_ent)
-                       remove_proc_entry("channel", cd->proc_ent);
-               if (cd->content_ent)
-                       remove_proc_entry("content", cd->proc_ent);
-
-               cd->proc_ent = NULL;
-               remove_proc_entry(cd->name, proc_net_rpc);
-       }
        if (list_empty(&cache_list)) {
                /* module must be being unloaded so its safe to kill the worker */
-               cancel_delayed_work(&cache_cleaner);
-               flush_scheduled_work();
+               cancel_delayed_work_sync(&cache_cleaner);
        }
-       return 0;
+       return;
+out:
+       printk(KERN_ERR "nfsd: failed to unregister %s cache\n", cd->name);
 }
 
 /* clean cache tries to find something to clean
@@ -300,43 +402,40 @@ static int cache_clean(void)
                current_index++;
 
        /* find a cleanable entry in the bucket and clean it, or set to next bucket */
-       
+
        if (current_detail && current_index < current_detail->hash_size) {
                struct cache_head *ch, **cp;
                struct cache_detail *d;
-               
+
                write_lock(&current_detail->hash_lock);
 
                /* Ok, now to clean this strand */
-                       
+
                cp = & current_detail->hash_table[current_index];
-               ch = *cp;
-               for (; ch; cp= & ch->next, ch= *cp) {
+               for (ch = *cp ; ch ; cp = & ch->next, ch = *cp) {
                        if (current_detail->nextcheck > ch->expiry_time)
                                current_detail->nextcheck = ch->expiry_time+1;
-                       if (ch->expiry_time >= get_seconds()
-                           && ch->last_refresh >= current_detail->flush_time
-                               )
+                       if (!cache_is_expired(current_detail, ch))
                                continue;
-                       if (test_and_clear_bit(CACHE_PENDING, &ch->flags))
-                               queue_loose(current_detail, ch);
 
-                       if (atomic_read(&ch->refcnt) == 1)
-                               break;
-               }
-               if (ch) {
                        *cp = ch->next;
                        ch->next = NULL;
                        current_detail->entries--;
                        rv = 1;
+                       break;
                }
+
                write_unlock(&current_detail->hash_lock);
                d = current_detail;
                if (!ch)
                        current_index ++;
                spin_unlock(&cache_list_lock);
-               if (ch)
-                       d->cache_put(ch, d);
+               if (ch) {
+                       if (test_and_clear_bit(CACHE_PENDING, &ch->flags))
+                               cache_dequeue(current_detail, ch);
+                       cache_revisit_request(ch);
+                       cache_put(ch, d);
+               }
        } else
                spin_unlock(&cache_list_lock);
 
@@ -346,11 +445,11 @@ static int cache_clean(void)
 /*
  * We want to regularly clean the cache, so we need to schedule some work ...
  */
-static void do_cache_clean(void *data)
+static void do_cache_clean(struct work_struct *work)
 {
        int delay = 5;
        if (cache_clean() == -1)
-               delay = 30*HZ;
+               delay = round_jiffies_relative(30*HZ);
 
        if (list_empty(&cache_list))
                delay = 0;
@@ -360,9 +459,9 @@ static void do_cache_clean(void *data)
 }
 
 
-/* 
+/*
  * Clean all caches promptly.  This just calls cache_clean
- * repeatedly until we are sure that every cache has had a chance to 
+ * repeatedly until we are sure that every cache has had a chance to
  * be fully cleaned
  */
 void cache_flush(void)
@@ -372,6 +471,7 @@ void cache_flush(void)
        while (cache_clean() != -1)
                cond_resched();
 }
+EXPORT_SYMBOL_GPL(cache_flush);
 
 void cache_purge(struct cache_detail *detail)
 {
@@ -380,7 +480,7 @@ void cache_purge(struct cache_detail *detail)
        cache_flush();
        detail->flush_time = 1;
 }
-
+EXPORT_SYMBOL_GPL(cache_purge);
 
 
 /*
@@ -391,7 +491,7 @@ void cache_purge(struct cache_detail *detail)
  * All deferred requests are stored in a hash table,
  * indexed by "struct cache_head *".
  * As it may be wasteful to store a whole request
- * structure, we allow the request to provide a 
+ * structure, we allow the request to provide a
  * deferred form, which must contain a
  * 'struct cache_deferred_req'
  * This cache_deferred_req contains a method to allow
@@ -408,17 +508,23 @@ static LIST_HEAD(cache_defer_list);
 static struct list_head cache_defer_hash[DFR_HASHSIZE];
 static int cache_defer_cnt;
 
-static void cache_defer_req(struct cache_req *req, struct cache_head *item)
+static int cache_defer_req(struct cache_req *req, struct cache_head *item)
 {
-       struct cache_deferred_req *dreq;
+       struct cache_deferred_req *dreq, *discard;
        int hash = DFR_HASH(item);
 
+       if (cache_defer_cnt >= DFR_MAX) {
+               /* too much in the cache, randomly drop this one,
+                * or continue and drop the oldest below
+                */
+               if (net_random()&1)
+                       return -ENOMEM;
+       }
        dreq = req->defer(req);
        if (dreq == NULL)
-               return;
+               return -ENOMEM;
 
        dreq->item = item;
-       dreq->recv_time = get_seconds();
 
        spin_lock(&cache_defer_lock);
 
@@ -429,33 +535,26 @@ static void cache_defer_req(struct cache_req *req, struct cache_head *item)
        list_add(&dreq->hash, &cache_defer_hash[hash]);
 
        /* it is in, now maybe clean up */
-       dreq = NULL;
+       discard = NULL;
        if (++cache_defer_cnt > DFR_MAX) {
-               /* too much in the cache, randomly drop
-                * first or last
-                */
-               if (net_random()&1) 
-                       dreq = list_entry(cache_defer_list.next,
-                                         struct cache_deferred_req,
-                                         recent);
-               else
-                       dreq = list_entry(cache_defer_list.prev,
-                                         struct cache_deferred_req,
-                                         recent);
-               list_del(&dreq->recent);
-               list_del(&dreq->hash);
+               discard = list_entry(cache_defer_list.prev,
+                                    struct cache_deferred_req, recent);
+               list_del_init(&discard->recent);
+               list_del_init(&discard->hash);
                cache_defer_cnt--;
        }
        spin_unlock(&cache_defer_lock);
 
-       if (dreq) {
+       if (discard)
                /* there was one too many */
-               dreq->revisit(dreq, 1);
-       }
-       if (test_bit(CACHE_VALID, &item->flags)) {
+               discard->revisit(discard, 1);
+
+       if (!test_bit(CACHE_PENDING, &item->flags)) {
                /* must have just been validated... */
                cache_revisit_request(item);
+               return -EAGAIN;
        }
+       return 0;
 }
 
 static void cache_revisit_request(struct cache_head *item)
@@ -468,14 +567,14 @@ static void cache_revisit_request(struct cache_head *item)
 
        INIT_LIST_HEAD(&pending);
        spin_lock(&cache_defer_lock);
-       
+
        lp = cache_defer_hash[hash].next;
        if (lp) {
                while (lp != &cache_defer_hash[hash]) {
                        dreq = list_entry(lp, struct cache_deferred_req, hash);
                        lp = lp->next;
                        if (dreq->item == item) {
-                               list_del(&dreq->hash);
+                               list_del_init(&dreq->hash);
                                list_move(&dreq->recent, &pending);
                                cache_defer_cnt--;
                        }
@@ -498,10 +597,10 @@ void cache_clean_deferred(void *owner)
 
        INIT_LIST_HEAD(&pending);
        spin_lock(&cache_defer_lock);
-       
+
        list_for_each_entry_safe(dreq, tmp, &cache_defer_list, recent) {
                if (dreq->owner == owner) {
-                       list_del(&dreq->hash);
+                       list_del_init(&dreq->hash);
                        list_move(&dreq->recent, &pending);
                        cache_defer_cnt--;
                }
@@ -518,13 +617,13 @@ void cache_clean_deferred(void *owner)
 /*
  * communicate with user-space
  *
- * We have a magic /proc file - /proc/sunrpc/cache
- * On read, you get a full request, or block
- * On write, an update request is processed
- * Poll works if anything to read, and always allows write
+ * We have a magic /proc file - /proc/sunrpc/<cachename>/channel.
+ * On read, you get a full request, or block.
+ * On write, an update request is processed.
+ * Poll works if anything to read, and always allows write.
  *
- * Implemented by linked list of requests.  Each open file has 
- * a ->private that also exists in this list.  New request are added
+ * Implemented by linked list of requests.  Each open file has
+ * a ->private that also exists in this list.  New requests are added
  * to the end and may wakeup and preceding readers.
  * New readers are added to the head.  If, on read, an item is found with
  * CACHE_UPCALLING clear, we free it from the list.
@@ -532,7 +631,7 @@ void cache_clean_deferred(void *owner)
  */
 
 static DEFINE_SPINLOCK(queue_lock);
-static DECLARE_MUTEX(queue_io_sem);
+static DEFINE_MUTEX(queue_io_mutex);
 
 struct cache_queue {
        struct list_head        list;
@@ -550,18 +649,18 @@ struct cache_reader {
        int                     offset; /* if non-0, we have a refcnt on next request */
 };
 
-static ssize_t
-cache_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
+static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
+                         loff_t *ppos, struct cache_detail *cd)
 {
        struct cache_reader *rp = filp->private_data;
        struct cache_request *rq;
-       struct cache_detail *cd = PDE(filp->f_dentry->d_inode)->data;
+       struct inode *inode = filp->f_path.dentry->d_inode;
        int err;
 
        if (count == 0)
                return 0;
 
-       down(&queue_io_sem); /* protect against multiple concurrent
+       mutex_lock(&inode->i_mutex); /* protect against multiple concurrent
                              * readers on this file */
  again:
        spin_lock(&queue_lock);
@@ -574,7 +673,7 @@ cache_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
        }
        if (rp->q.list.next == &cd->queue) {
                spin_unlock(&queue_lock);
-               up(&queue_io_sem);
+               mutex_unlock(&inode->i_mutex);
                BUG_ON(rp->offset);
                return 0;
        }
@@ -613,7 +712,7 @@ cache_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
                    !test_bit(CACHE_PENDING, &rq->item->flags)) {
                        list_del(&rq->q.list);
                        spin_unlock(&queue_lock);
-                       cd->cache_put(rq->item, cd);
+                       cache_put(rq->item, cd);
                        kfree(rq->buf);
                        kfree(rq);
                } else
@@ -621,49 +720,90 @@ cache_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
        }
        if (err == -EAGAIN)
                goto again;
-       up(&queue_io_sem);
+       mutex_unlock(&inode->i_mutex);
        return err ? err :  count;
 }
 
-static char write_buf[8192]; /* protected by queue_io_sem */
+static ssize_t cache_do_downcall(char *kaddr, const char __user *buf,
+                                size_t count, struct cache_detail *cd)
+{
+       ssize_t ret;
+
+       if (copy_from_user(kaddr, buf, count))
+               return -EFAULT;
+       kaddr[count] = '\0';
+       ret = cd->cache_parse(cd, kaddr, count);
+       if (!ret)
+               ret = count;
+       return ret;
+}
 
-static ssize_t
-cache_write(struct file *filp, const char __user *buf, size_t count,
-           loff_t *ppos)
+static ssize_t cache_slow_downcall(const char __user *buf,
+                                  size_t count, struct cache_detail *cd)
 {
-       int err;
-       struct cache_detail *cd = PDE(filp->f_dentry->d_inode)->data;
+       static char write_buf[8192]; /* protected by queue_io_mutex */
+       ssize_t ret = -EINVAL;
 
-       if (count == 0)
-               return 0;
        if (count >= sizeof(write_buf))
-               return -EINVAL;
-
-       down(&queue_io_sem);
+               goto out;
+       mutex_lock(&queue_io_mutex);
+       ret = cache_do_downcall(write_buf, buf, count, cd);
+       mutex_unlock(&queue_io_mutex);
+out:
+       return ret;
+}
 
-       if (copy_from_user(write_buf, buf, count)) {
-               up(&queue_io_sem);
-               return -EFAULT;
-       }
-       write_buf[count] = '\0';
-       if (cd->cache_parse)
-               err = cd->cache_parse(cd, write_buf, count);
-       else
-               err = -EINVAL;
+static ssize_t cache_downcall(struct address_space *mapping,
+                             const char __user *buf,
+                             size_t count, struct cache_detail *cd)
+{
+       struct page *page;
+       char *kaddr;
+       ssize_t ret = -ENOMEM;
+
+       if (count >= PAGE_CACHE_SIZE)
+               goto out_slow;
+
+       page = find_or_create_page(mapping, 0, GFP_KERNEL);
+       if (!page)
+               goto out_slow;
+
+       kaddr = kmap(page);
+       ret = cache_do_downcall(kaddr, buf, count, cd);
+       kunmap(page);
+       unlock_page(page);
+       page_cache_release(page);
+       return ret;
+out_slow:
+       return cache_slow_downcall(buf, count, cd);
+}
 
-       up(&queue_io_sem);
-       return err ? err : count;
+static ssize_t cache_write(struct file *filp, const char __user *buf,
+                          size_t count, loff_t *ppos,
+                          struct cache_detail *cd)
+{
+       struct address_space *mapping = filp->f_mapping;
+       struct inode *inode = filp->f_path.dentry->d_inode;
+       ssize_t ret = -EINVAL;
+
+       if (!cd->cache_parse)
+               goto out;
+
+       mutex_lock(&inode->i_mutex);
+       ret = cache_downcall(mapping, buf, count, cd);
+       mutex_unlock(&inode->i_mutex);
+out:
+       return ret;
 }
 
 static DECLARE_WAIT_QUEUE_HEAD(queue_wait);
 
-static unsigned int
-cache_poll(struct file *filp, poll_table *wait)
+static unsigned int cache_poll(struct file *filp, poll_table *wait,
+                              struct cache_detail *cd)
 {
        unsigned int mask;
        struct cache_reader *rp = filp->private_data;
        struct cache_queue *cq;
-       struct cache_detail *cd = PDE(filp->f_dentry->d_inode)->data;
 
        poll_wait(filp, &queue_wait, wait);
 
@@ -685,14 +825,13 @@ cache_poll(struct file *filp, poll_table *wait)
        return mask;
 }
 
-static int
-cache_ioctl(struct inode *ino, struct file *filp,
-           unsigned int cmd, unsigned long arg)
+static int cache_ioctl(struct inode *ino, struct file *filp,
+                      unsigned int cmd, unsigned long arg,
+                      struct cache_detail *cd)
 {
        int len = 0;
        struct cache_reader *rp = filp->private_data;
        struct cache_queue *cq;
-       struct cache_detail *cd = PDE(ino)->data;
 
        if (cmd != FIONREAD || !rp)
                return -EINVAL;
@@ -715,15 +854,15 @@ cache_ioctl(struct inode *ino, struct file *filp,
        return put_user(len, (int __user *)arg);
 }
 
-static int
-cache_open(struct inode *inode, struct file *filp)
+static int cache_open(struct inode *inode, struct file *filp,
+                     struct cache_detail *cd)
 {
        struct cache_reader *rp = NULL;
 
+       if (!cd || !try_module_get(cd->owner))
+               return -EACCES;
        nonseekable_open(inode, filp);
        if (filp->f_mode & FMODE_READ) {
-               struct cache_detail *cd = PDE(inode)->data;
-
                rp = kmalloc(sizeof(*rp), GFP_KERNEL);
                if (!rp)
                        return -ENOMEM;
@@ -738,11 +877,10 @@ cache_open(struct inode *inode, struct file *filp)
        return 0;
 }
 
-static int
-cache_release(struct inode *inode, struct file *filp)
+static int cache_release(struct inode *inode, struct file *filp,
+                        struct cache_detail *cd)
 {
        struct cache_reader *rp = filp->private_data;
-       struct cache_detail *cd = PDE(inode)->data;
 
        if (rp) {
                spin_lock(&queue_lock);
@@ -766,24 +904,13 @@ cache_release(struct inode *inode, struct file *filp)
                cd->last_close = get_seconds();
                atomic_dec(&cd->readers);
        }
+       module_put(cd->owner);
        return 0;
 }
 
 
 
-static struct file_operations cache_file_operations = {
-       .owner          = THIS_MODULE,
-       .llseek         = no_llseek,
-       .read           = cache_read,
-       .write          = cache_write,
-       .poll           = cache_poll,
-       .ioctl          = cache_ioctl, /* for FIONREAD */
-       .open           = cache_open,
-       .release        = cache_release,
-};
-
-
-static void queue_loose(struct cache_detail *detail, struct cache_head *ch)
+static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch)
 {
        struct cache_queue *cq;
        spin_lock(&queue_lock);
@@ -793,10 +920,10 @@ static void queue_loose(struct cache_detail *detail, struct cache_head *ch)
                        if (cr->item != ch)
                                continue;
                        if (cr->readers != 0)
-                               break;
+                               continue;
                        list_del(&cr->q.list);
                        spin_unlock(&queue_lock);
-                       detail->cache_put(cr->item, detail);
+                       cache_put(cr->item, detail);
                        kfree(cr->buf);
                        kfree(cr);
                        return;
@@ -847,6 +974,7 @@ void qword_add(char **bpp, int *lp, char *str)
        *bpp = bp;
        *lp = len;
 }
+EXPORT_SYMBOL_GPL(qword_add);
 
 void qword_addhex(char **bpp, int *lp, char *buf, int blen)
 {
@@ -875,21 +1003,28 @@ void qword_addhex(char **bpp, int *lp, char *buf, int blen)
        *bpp = bp;
        *lp = len;
 }
+EXPORT_SYMBOL_GPL(qword_addhex);
 
 static void warn_no_listener(struct cache_detail *detail)
 {
        if (detail->last_warn != detail->last_close) {
                detail->last_warn = detail->last_close;
                if (detail->warn_no_listener)
-                       detail->warn_no_listener(detail);
+                       detail->warn_no_listener(detail, detail->last_close != 0);
        }
 }
 
 /*
- * register an upcall request to user-space.
+ * register an upcall request to user-space and queue it up for read() by the
+ * upcall daemon.
+ *
  * Each request is at most one page long.
  */
-static int cache_make_upcall(struct cache_detail *detail, struct cache_head *h)
+int sunrpc_cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h,
+               void (*cache_request)(struct cache_detail *,
+                                     struct cache_head *,
+                                     char **,
+                                     int *))
 {
 
        char *buf;
@@ -897,9 +1032,6 @@ static int cache_make_upcall(struct cache_detail *detail, struct cache_head *h)
        char *bp;
        int len;
 
-       if (detail->cache_request == NULL)
-               return -EINVAL;
-
        if (atomic_read(&detail->readers) == 0 &&
            detail->last_close < get_seconds() - 30) {
                        warn_no_listener(detail);
@@ -918,7 +1050,7 @@ static int cache_make_upcall(struct cache_detail *detail, struct cache_head *h)
 
        bp = buf; len = PAGE_SIZE;
 
-       detail->cache_request(detail, h, &bp, &len);
+       cache_request(detail, h, &bp, &len);
 
        if (len < 0) {
                kfree(buf);
@@ -936,6 +1068,7 @@ static int cache_make_upcall(struct cache_detail *detail, struct cache_head *h)
        wake_up(&queue_wait);
        return 0;
 }
+EXPORT_SYMBOL_GPL(sunrpc_cache_pipe_upcall);
 
 /*
  * parse a message from user-space and pass it
@@ -943,10 +1076,10 @@ static int cache_make_upcall(struct cache_detail *detail, struct cache_head *h)
  * Messages are, like requests, separated into fields by
  * spaces and dequotes as \xHEXSTRING or embedded \nnn octal
  *
- * Message is 
+ * Message is
  *   reply cachename expiry key ... content....
  *
- * key and content are both parsed by cache 
+ * key and content are both parsed by cache
  */
 
 #define isodigit(c) (isdigit(c) && c <= '7')
@@ -997,6 +1130,7 @@ int qword_get(char **bpp, char *dest, int bufsize)
        *dest = '\0';
        return len;
 }
+EXPORT_SYMBOL_GPL(qword_get);
 
 
 /*
@@ -1011,12 +1145,13 @@ struct handle {
 };
 
 static void *c_start(struct seq_file *m, loff_t *pos)
+       __acquires(cd->hash_lock)
 {
        loff_t n = *pos;
        unsigned hash, entry;
        struct cache_head *ch;
        struct cache_detail *cd = ((struct handle*)m->private)->cd;
-       
+
 
        read_lock(&cd->hash_lock);
        if (!n--)
@@ -1031,7 +1166,7 @@ static void *c_start(struct seq_file *m, loff_t *pos)
        do {
                hash++;
                n += 1LL<<32;
-       } while(hash < cd->hash_size && 
+       } while(hash < cd->hash_size &&
                cd->hash_table[hash]==NULL);
        if (hash >= cd->hash_size)
                return NULL;
@@ -1067,6 +1202,7 @@ static void *c_next(struct seq_file *m, void *p, loff_t *pos)
 }
 
 static void c_stop(struct seq_file *m, void *p)
+       __releases(cd->hash_lock)
 {
        struct cache_detail *cd = ((struct handle*)m->private)->cd;
        read_unlock(&cd->hash_lock);
@@ -1081,8 +1217,8 @@ static int c_show(struct seq_file *m, void *p)
                return cd->cache_show(m, cd, NULL);
 
        ifdebug(CACHE)
-               seq_printf(m, "# expiry=%ld refcnt=%d\n",
-                          cp->expiry_time, atomic_read(&cp->refcnt));
+               seq_printf(m, "# expiry=%ld refcnt=%d flags=%lx\n",
+                          cp->expiry_time, atomic_read(&cp->ref.refcount), cp->flags);
        cache_get(cp);
        if (cache_check(cd, cp, NULL))
                /* cache_check does a cache_put on failure */
@@ -1093,74 +1229,78 @@ static int c_show(struct seq_file *m, void *p)
        return cd->cache_show(m, cd, cp);
 }
 
-static struct seq_operations cache_content_op = {
+static const struct seq_operations cache_content_op = {
        .start  = c_start,
        .next   = c_next,
        .stop   = c_stop,
        .show   = c_show,
 };
 
-static int content_open(struct inode *inode, struct file *file)
+static int content_open(struct inode *inode, struct file *file,
+                       struct cache_detail *cd)
 {
-       int res;
        struct handle *han;
-       struct cache_detail *cd = PDE(inode)->data;
 
-       han = kmalloc(sizeof(*han), GFP_KERNEL);
-       if (han == NULL)
+       if (!cd || !try_module_get(cd->owner))
+               return -EACCES;
+       han = __seq_open_private(file, &cache_content_op, sizeof(*han));
+       if (han == NULL) {
+               module_put(cd->owner);
                return -ENOMEM;
+       }
 
        han->cd = cd;
+       return 0;
+}
 
-       res = seq_open(file, &cache_content_op);
-       if (res)
-               kfree(han);
-       else
-               ((struct seq_file *)file->private_data)->private = han;
-
-       return res;
+static int content_release(struct inode *inode, struct file *file,
+               struct cache_detail *cd)
+{
+       int ret = seq_release_private(inode, file);
+       module_put(cd->owner);
+       return ret;
 }
-static int content_release(struct inode *inode, struct file *file)
+
+static int open_flush(struct inode *inode, struct file *file,
+                       struct cache_detail *cd)
 {
-       struct seq_file *m = (struct seq_file *)file->private_data;
-       struct handle *han = m->private;
-       kfree(han);
-       m->private = NULL;
-       return seq_release(inode, file);
+       if (!cd || !try_module_get(cd->owner))
+               return -EACCES;
+       return nonseekable_open(inode, file);
 }
 
-static struct file_operations content_file_operations = {
-       .open           = content_open,
-       .read           = seq_read,
-       .llseek         = seq_lseek,
-       .release        = content_release,
-};
+static int release_flush(struct inode *inode, struct file *file,
+                       struct cache_detail *cd)
+{
+       module_put(cd->owner);
+       return 0;
+}
 
 static ssize_t read_flush(struct file *file, char __user *buf,
-                           size_t count, loff_t *ppos)
+                         size_t count, loff_t *ppos,
+                         struct cache_detail *cd)
 {
-       struct cache_detail *cd = PDE(file->f_dentry->d_inode)->data;
        char tbuf[20];
        unsigned long p = *ppos;
-       int len;
+       size_t len;
 
        sprintf(tbuf, "%lu\n", cd->flush_time);
        len = strlen(tbuf);
        if (p >= len)
                return 0;
        len -= p;
-       if (len > count) len = count;
+       if (len > count)
+               len = count;
        if (copy_to_user(buf, (void*)(tbuf+p), len))
-               len = -EFAULT;
-       else
-               *ppos += len;
+               return -EFAULT;
+       *ppos += len;
        return len;
 }
 
-static ssize_t write_flush(struct file * file, const char __user * buf,
-                            size_t count, loff_t *ppos)
+static ssize_t write_flush(struct file *file, const char __user *buf,
+                          size_t count, loff_t *ppos,
+                          struct cache_detail *cd)
 {
-       struct cache_detail *cd = PDE(file->f_dentry->d_inode)->data;
        char tbuf[20];
        char *ep;
        long flushtime;
@@ -1181,8 +1321,343 @@ static ssize_t write_flush(struct file * file, const char __user * buf,
        return count;
 }
 
-static struct file_operations cache_flush_operations = {
-       .open           = nonseekable_open,
-       .read           = read_flush,
-       .write          = write_flush,
+static ssize_t cache_read_procfs(struct file *filp, char __user *buf,
+                                size_t count, loff_t *ppos)
+{
+       struct cache_detail *cd = PDE(filp->f_path.dentry->d_inode)->data;
+
+       return cache_read(filp, buf, count, ppos, cd);
+}
+
+static ssize_t cache_write_procfs(struct file *filp, const char __user *buf,
+                                 size_t count, loff_t *ppos)
+{
+       struct cache_detail *cd = PDE(filp->f_path.dentry->d_inode)->data;
+
+       return cache_write(filp, buf, count, ppos, cd);
+}
+
+static unsigned int cache_poll_procfs(struct file *filp, poll_table *wait)
+{
+       struct cache_detail *cd = PDE(filp->f_path.dentry->d_inode)->data;
+
+       return cache_poll(filp, wait, cd);
+}
+
+static int cache_ioctl_procfs(struct inode *inode, struct file *filp,
+                             unsigned int cmd, unsigned long arg)
+{
+       struct cache_detail *cd = PDE(inode)->data;
+
+       return cache_ioctl(inode, filp, cmd, arg, cd);
+}
+
+static int cache_open_procfs(struct inode *inode, struct file *filp)
+{
+       struct cache_detail *cd = PDE(inode)->data;
+
+       return cache_open(inode, filp, cd);
+}
+
+static int cache_release_procfs(struct inode *inode, struct file *filp)
+{
+       struct cache_detail *cd = PDE(inode)->data;
+
+       return cache_release(inode, filp, cd);
+}
+
+static const struct file_operations cache_file_operations_procfs = {
+       .owner          = THIS_MODULE,
+       .llseek         = no_llseek,
+       .read           = cache_read_procfs,
+       .write          = cache_write_procfs,
+       .poll           = cache_poll_procfs,
+       .ioctl          = cache_ioctl_procfs, /* for FIONREAD */
+       .open           = cache_open_procfs,
+       .release        = cache_release_procfs,
 };
+
+static int content_open_procfs(struct inode *inode, struct file *filp)
+{
+       struct cache_detail *cd = PDE(inode)->data;
+
+       return content_open(inode, filp, cd);
+}
+
+static int content_release_procfs(struct inode *inode, struct file *filp)
+{
+       struct cache_detail *cd = PDE(inode)->data;
+
+       return content_release(inode, filp, cd);
+}
+
+static const struct file_operations content_file_operations_procfs = {
+       .open           = content_open_procfs,
+       .read           = seq_read,
+       .llseek         = seq_lseek,
+       .release        = content_release_procfs,
+};
+
+static int open_flush_procfs(struct inode *inode, struct file *filp)
+{
+       struct cache_detail *cd = PDE(inode)->data;
+
+       return open_flush(inode, filp, cd);
+}
+
+static int release_flush_procfs(struct inode *inode, struct file *filp)
+{
+       struct cache_detail *cd = PDE(inode)->data;
+
+       return release_flush(inode, filp, cd);
+}
+
+static ssize_t read_flush_procfs(struct file *filp, char __user *buf,
+                           size_t count, loff_t *ppos)
+{
+       struct cache_detail *cd = PDE(filp->f_path.dentry->d_inode)->data;
+
+       return read_flush(filp, buf, count, ppos, cd);
+}
+
+static ssize_t write_flush_procfs(struct file *filp,
+                                 const char __user *buf,
+                                 size_t count, loff_t *ppos)
+{
+       struct cache_detail *cd = PDE(filp->f_path.dentry->d_inode)->data;
+
+       return write_flush(filp, buf, count, ppos, cd);
+}
+
+static const struct file_operations cache_flush_operations_procfs = {
+       .open           = open_flush_procfs,
+       .read           = read_flush_procfs,
+       .write          = write_flush_procfs,
+       .release        = release_flush_procfs,
+};
+
+static void remove_cache_proc_entries(struct cache_detail *cd)
+{
+       if (cd->u.procfs.proc_ent == NULL)
+               return;
+       if (cd->u.procfs.flush_ent)
+               remove_proc_entry("flush", cd->u.procfs.proc_ent);
+       if (cd->u.procfs.channel_ent)
+               remove_proc_entry("channel", cd->u.procfs.proc_ent);
+       if (cd->u.procfs.content_ent)
+               remove_proc_entry("content", cd->u.procfs.proc_ent);
+       cd->u.procfs.proc_ent = NULL;
+       remove_proc_entry(cd->name, proc_net_rpc);
+}
+
+#ifdef CONFIG_PROC_FS
+static int create_cache_proc_entries(struct cache_detail *cd)
+{
+       struct proc_dir_entry *p;
+
+       cd->u.procfs.proc_ent = proc_mkdir(cd->name, proc_net_rpc);
+       if (cd->u.procfs.proc_ent == NULL)
+               goto out_nomem;
+       cd->u.procfs.channel_ent = NULL;
+       cd->u.procfs.content_ent = NULL;
+
+       p = proc_create_data("flush", S_IFREG|S_IRUSR|S_IWUSR,
+                            cd->u.procfs.proc_ent,
+                            &cache_flush_operations_procfs, cd);
+       cd->u.procfs.flush_ent = p;
+       if (p == NULL)
+               goto out_nomem;
+
+       if (cd->cache_upcall || cd->cache_parse) {
+               p = proc_create_data("channel", S_IFREG|S_IRUSR|S_IWUSR,
+                                    cd->u.procfs.proc_ent,
+                                    &cache_file_operations_procfs, cd);
+               cd->u.procfs.channel_ent = p;
+               if (p == NULL)
+                       goto out_nomem;
+       }
+       if (cd->cache_show) {
+               p = proc_create_data("content", S_IFREG|S_IRUSR|S_IWUSR,
+                               cd->u.procfs.proc_ent,
+                               &content_file_operations_procfs, cd);
+               cd->u.procfs.content_ent = p;
+               if (p == NULL)
+                       goto out_nomem;
+       }
+       return 0;
+out_nomem:
+       remove_cache_proc_entries(cd);
+       return -ENOMEM;
+}
+#else /* CONFIG_PROC_FS */
+static int create_cache_proc_entries(struct cache_detail *cd)
+{
+       return 0;
+}
+#endif
+
+int cache_register(struct cache_detail *cd)
+{
+       int ret;
+
+       sunrpc_init_cache_detail(cd);
+       ret = create_cache_proc_entries(cd);
+       if (ret)
+               sunrpc_destroy_cache_detail(cd);
+       return ret;
+}
+EXPORT_SYMBOL_GPL(cache_register);
+
+void cache_unregister(struct cache_detail *cd)
+{
+       remove_cache_proc_entries(cd);
+       sunrpc_destroy_cache_detail(cd);
+}
+EXPORT_SYMBOL_GPL(cache_unregister);
+
+static ssize_t cache_read_pipefs(struct file *filp, char __user *buf,
+                                size_t count, loff_t *ppos)
+{
+       struct cache_detail *cd = RPC_I(filp->f_path.dentry->d_inode)->private;
+
+       return cache_read(filp, buf, count, ppos, cd);
+}
+
+static ssize_t cache_write_pipefs(struct file *filp, const char __user *buf,
+                                 size_t count, loff_t *ppos)
+{
+       struct cache_detail *cd = RPC_I(filp->f_path.dentry->d_inode)->private;
+
+       return cache_write(filp, buf, count, ppos, cd);
+}
+
+static unsigned int cache_poll_pipefs(struct file *filp, poll_table *wait)
+{
+       struct cache_detail *cd = RPC_I(filp->f_path.dentry->d_inode)->private;
+
+       return cache_poll(filp, wait, cd);
+}
+
+static int cache_ioctl_pipefs(struct inode *inode, struct file *filp,
+                             unsigned int cmd, unsigned long arg)
+{
+       struct cache_detail *cd = RPC_I(inode)->private;
+
+       return cache_ioctl(inode, filp, cmd, arg, cd);
+}
+
+static int cache_open_pipefs(struct inode *inode, struct file *filp)
+{
+       struct cache_detail *cd = RPC_I(inode)->private;
+
+       return cache_open(inode, filp, cd);
+}
+
+static int cache_release_pipefs(struct inode *inode, struct file *filp)
+{
+       struct cache_detail *cd = RPC_I(inode)->private;
+
+       return cache_release(inode, filp, cd);
+}
+
+const struct file_operations cache_file_operations_pipefs = {
+       .owner          = THIS_MODULE,
+       .llseek         = no_llseek,
+       .read           = cache_read_pipefs,
+       .write          = cache_write_pipefs,
+       .poll           = cache_poll_pipefs,
+       .ioctl          = cache_ioctl_pipefs, /* for FIONREAD */
+       .open           = cache_open_pipefs,
+       .release        = cache_release_pipefs,
+};
+
+static int content_open_pipefs(struct inode *inode, struct file *filp)
+{
+       struct cache_detail *cd = RPC_I(inode)->private;
+
+       return content_open(inode, filp, cd);
+}
+
+static int content_release_pipefs(struct inode *inode, struct file *filp)
+{
+       struct cache_detail *cd = RPC_I(inode)->private;
+
+       return content_release(inode, filp, cd);
+}
+
+const struct file_operations content_file_operations_pipefs = {
+       .open           = content_open_pipefs,
+       .read           = seq_read,
+       .llseek         = seq_lseek,
+       .release        = content_release_pipefs,
+};
+
+static int open_flush_pipefs(struct inode *inode, struct file *filp)
+{
+       struct cache_detail *cd = RPC_I(inode)->private;
+
+       return open_flush(inode, filp, cd);
+}
+
+static int release_flush_pipefs(struct inode *inode, struct file *filp)
+{
+       struct cache_detail *cd = RPC_I(inode)->private;
+
+       return release_flush(inode, filp, cd);
+}
+
+static ssize_t read_flush_pipefs(struct file *filp, char __user *buf,
+                           size_t count, loff_t *ppos)
+{
+       struct cache_detail *cd = RPC_I(filp->f_path.dentry->d_inode)->private;
+
+       return read_flush(filp, buf, count, ppos, cd);
+}
+
+static ssize_t write_flush_pipefs(struct file *filp,
+                                 const char __user *buf,
+                                 size_t count, loff_t *ppos)
+{
+       struct cache_detail *cd = RPC_I(filp->f_path.dentry->d_inode)->private;
+
+       return write_flush(filp, buf, count, ppos, cd);
+}
+
+const struct file_operations cache_flush_operations_pipefs = {
+       .open           = open_flush_pipefs,
+       .read           = read_flush_pipefs,
+       .write          = write_flush_pipefs,
+       .release        = release_flush_pipefs,
+};
+
+int sunrpc_cache_register_pipefs(struct dentry *parent,
+                                const char *name, mode_t umode,
+                                struct cache_detail *cd)
+{
+       struct qstr q;
+       struct dentry *dir;
+       int ret = 0;
+
+       sunrpc_init_cache_detail(cd);
+       q.name = name;
+       q.len = strlen(name);
+       q.hash = full_name_hash(q.name, q.len);
+       dir = rpc_create_cache_dir(parent, &q, umode, cd);
+       if (!IS_ERR(dir))
+               cd->u.pipefs.dir = dir;
+       else {
+               sunrpc_destroy_cache_detail(cd);
+               ret = PTR_ERR(dir);
+       }
+       return ret;
+}
+EXPORT_SYMBOL_GPL(sunrpc_cache_register_pipefs);
+
+void sunrpc_cache_unregister_pipefs(struct cache_detail *cd)
+{
+       rpc_remove_cache_dir(cd->u.pipefs.dir);
+       cd->u.pipefs.dir = NULL;
+       sunrpc_destroy_cache_detail(cd);
+}
+EXPORT_SYMBOL_GPL(sunrpc_cache_unregister_pipefs);
+