drop unused dentry argument to ->fsync
[safe/jmp/linux-2.6] / fs / ceph / caps.c
index d8132b6..ae3e3a3 100644 (file)
@@ -3,8 +3,10 @@
 #include <linux/fs.h>
 #include <linux/kernel.h>
 #include <linux/sched.h>
+#include <linux/slab.h>
 #include <linux/vmalloc.h>
 #include <linux/wait.h>
+#include <linux/writeback.h>
 
 #include "super.h"
 #include "decode.h"
@@ -128,6 +130,7 @@ static int caps_total_count;        /* total caps allocated */
 static int caps_use_count;          /* in use */
 static int caps_reserve_count;      /* unused, reserved */
 static int caps_avail_count;        /* unused, unreserved */
+static int caps_min_count;          /* keep at least this many (unreserved) */
 
 void __init ceph_caps_init(void)
 {
@@ -149,6 +152,15 @@ void ceph_caps_finalize(void)
        caps_avail_count = 0;
        caps_use_count = 0;
        caps_reserve_count = 0;
+       caps_min_count = 0;
+       spin_unlock(&caps_list_lock);
+}
+
+void ceph_adjust_min_caps(int delta)
+{
+       spin_lock(&caps_list_lock);     /* caps_min_count is only changed under caps_list_lock */
+       caps_min_count += delta;        /* delta is signed: raises or lowers the minimum */
+       BUG_ON(caps_min_count < 0);     /* a negative minimum is treated as a fatal bug */
        spin_unlock(&caps_list_lock);
 }
 
@@ -256,30 +268,22 @@ static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx)
        return cap;
 }
 
-static void put_cap(struct ceph_cap *cap,
-                   struct ceph_cap_reservation *ctx)
+void ceph_put_cap(struct ceph_cap *cap)
 {
        spin_lock(&caps_list_lock);
-       dout("put_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
-            ctx, ctx ? ctx->count : 0, caps_total_count, caps_use_count,
+       dout("put_cap %p %d = %d used + %d resv + %d avail\n",
+            cap, caps_total_count, caps_use_count,
             caps_reserve_count, caps_avail_count);
        caps_use_count--;
        /*
-        * Keep some preallocated caps around, at least enough to do a
-        * readdir (which needs to preallocate lots of them), to avoid
-        * lots of free/alloc churn.
+        * Keep some preallocated caps around (caps_min_count), to
+        * avoid lots of free/alloc churn.
         */
-       if (caps_avail_count >= caps_reserve_count +
-           ceph_client(cap->ci->vfs_inode.i_sb)->mount_args->max_readdir) {
+       if (caps_avail_count >= caps_reserve_count + caps_min_count) {
                caps_total_count--;
                kmem_cache_free(ceph_cap_cachep, cap);
        } else {
-               if (ctx) {
-                       ctx->count++;
-                       caps_reserve_count++;
-               } else {
-                       caps_avail_count++;
-               }
+               caps_avail_count++;
                list_add(&cap->caps_item, &caps_list);
        }
 
@@ -289,7 +293,8 @@ static void put_cap(struct ceph_cap *cap,
 }
 
 void ceph_reservation_status(struct ceph_client *client,
-                            int *total, int *avail, int *used, int *reserved)
+                            int *total, int *avail, int *used, int *reserved,
+                            int *min)
 {
        if (total)
                *total = caps_total_count;
@@ -299,6 +304,8 @@ void ceph_reservation_status(struct ceph_client *client,
                *used = caps_use_count;
        if (reserved)
                *reserved = caps_reserve_count;
+       if (min)
+               *min = caps_min_count;
 }
 
 /*
@@ -650,7 +657,7 @@ static int __cap_is_valid(struct ceph_cap *cap)
  */
 int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
 {
-       int have = ci->i_snap_caps;
+       int have = ci->i_snap_caps | ci->i_cap_exporting_issued;
        struct ceph_cap *cap;
        struct rb_node *p;
 
@@ -697,10 +704,15 @@ static void __touch_cap(struct ceph_cap *cap)
 {
        struct ceph_mds_session *s = cap->session;
 
-       dout("__touch_cap %p cap %p mds%d\n", &cap->ci->vfs_inode, cap,
-            s->s_mds);
        spin_lock(&s->s_cap_lock);
-       list_move_tail(&cap->session_caps, &s->s_caps);
+       if (s->s_cap_iterator == NULL) {
+               dout("__touch_cap %p cap %p mds%d\n", &cap->ci->vfs_inode, cap,
+                    s->s_mds);
+               list_move_tail(&cap->session_caps, &s->s_caps);
+       } else {
+               dout("__touch_cap %p cap %p mds%d NOP, iterating over caps\n",
+                    &cap->ci->vfs_inode, cap, s->s_mds);
+       }
        spin_unlock(&s->s_cap_lock);
 }
 
@@ -846,31 +858,44 @@ static int __ceph_is_any_caps(struct ceph_inode_info *ci)
 }
 
 /*
- * caller should hold i_lock, and session s_mutex.
- * returns true if this is the last cap.  if so, caller should iput.
+ * Remove a cap.  Take steps to deal with a racing iterate_session_caps.
+ *
+ * caller should hold i_lock.
+ * caller will not hold session s_mutex if called from destroy_inode.
  */
-void __ceph_remove_cap(struct ceph_cap *cap,
-                      struct ceph_cap_reservation *ctx)
+void __ceph_remove_cap(struct ceph_cap *cap)
 {
        struct ceph_mds_session *session = cap->session;
        struct ceph_inode_info *ci = cap->ci;
-       struct ceph_mds_client *mdsc = &ceph_client(ci->vfs_inode.i_sb)->mdsc;
+       struct ceph_mds_client *mdsc =
+               &ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
+       int removed = 0;
 
        dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);
 
        /* remove from session list */
        spin_lock(&session->s_cap_lock);
-       list_del_init(&cap->session_caps);
-       session->s_nr_caps--;
+       if (session->s_cap_iterator == cap) {
+               /* not yet, we are iterating over this very cap */
+               dout("__ceph_remove_cap  delaying %p removal from session %p\n",
+                    cap, cap->session);
+       } else {
+               list_del_init(&cap->session_caps);
+               session->s_nr_caps--;
+               cap->session = NULL;
+               removed = 1;
+       }
+       /* protect backpointer with s_cap_lock: see iterate_session_caps */
+       cap->ci = NULL;
        spin_unlock(&session->s_cap_lock);
 
        /* remove from inode list */
        rb_erase(&cap->ci_node, &ci->i_caps);
-       cap->session = NULL;
        if (ci->i_auth_cap == cap)
                ci->i_auth_cap = NULL;
 
-       put_cap(cap, ctx);
+       if (removed)
+               ceph_put_cap(cap);
 
        if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
                struct ceph_snap_realm *realm = ci->i_snap_realm;
@@ -913,18 +938,18 @@ static int send_cap_msg(struct ceph_mds_session *session,
             seq, issue_seq, mseq, follows, size, max_size,
             xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
 
-       msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), 0, 0, NULL);
-       if (IS_ERR(msg))
-               return PTR_ERR(msg);
+       msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS);
+       if (!msg)
+               return -ENOMEM;
 
-       fc = msg->front.iov_base;
+       msg->hdr.tid = cpu_to_le64(flush_tid);
 
+       fc = msg->front.iov_base;
        memset(fc, 0, sizeof(*fc));
 
        fc->cap_id = cpu_to_le64(cid);
        fc->op = cpu_to_le32(op);
        fc->seq = cpu_to_le32(seq);
-       fc->client_tid = cpu_to_le64(flush_tid);
        fc->issue_seq = cpu_to_le32(issue_seq);
        fc->migrate_seq = cpu_to_le32(mseq);
        fc->caps = cpu_to_le32(caps);
@@ -957,15 +982,14 @@ static int send_cap_msg(struct ceph_mds_session *session,
 }
 
 /*
- * Queue cap releases when an inode is dropped from our
- * cache.
+ * Queue cap releases when an inode is dropped from our cache.  Since
+ * inode is about to be destroyed, there is no need for i_lock.
  */
 void ceph_queue_caps_release(struct inode *inode)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct rb_node *p;
 
-       spin_lock(&inode->i_lock);
        p = rb_first(&ci->i_caps);
        while (p) {
                struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
@@ -1006,10 +1030,8 @@ void ceph_queue_caps_release(struct inode *inode)
                }
                spin_unlock(&session->s_cap_lock);
                p = rb_next(p);
-               __ceph_remove_cap(cap, NULL);
-
+               __ceph_remove_cap(cap);
        }
-       spin_unlock(&inode->i_lock);
 }
 
 /*
@@ -1037,10 +1059,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
        struct ceph_inode_info *ci = cap->ci;
        struct inode *inode = &ci->vfs_inode;
        u64 cap_id = cap->cap_id;
-       int held = cap->issued | cap->implemented;
-       int revoking = cap->implemented & ~cap->issued;
-       int dropping = cap->issued & ~retain;
-       int keep;
+       int held, revoking, dropping, keep;
        u64 seq, issue_seq, mseq, time_warp_seq, follows;
        u64 size, max_size;
        struct timespec mtime, atime;
@@ -1055,6 +1074,11 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
        int i;
        int ret;
 
+       held = cap->issued | cap->implemented;
+       revoking = cap->implemented & ~cap->issued;
+       retain &= ~revoking;
+       dropping = cap->issued & ~retain;
+
        dout("__send_cap %p cap %p session %p %s -> %s (revoking %s)\n",
             inode, cap, cap->session,
             ceph_cap_string(held), ceph_cap_string(held & retain),
@@ -1130,12 +1154,6 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
 
        spin_unlock(&inode->i_lock);
 
-       if (dropping & CEPH_CAP_FILE_CACHE) {
-               /* invalidate what we can */
-               dout("invalidating pages on %p\n", inode);
-               invalidate_mapping_pages(&inode->i_data, 0, -1);
-       }
-
        ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
                op, keep, want, flushing, seq, flush_tid, issue_seq, mseq,
                size, max_size, &mtime, &atime, time_warp_seq,
@@ -1193,6 +1211,12 @@ retry:
                if (capsnap->dirty_pages || capsnap->writing)
                        continue;
 
+               /*
+                * if cap writeback already occurred, we should have dropped
+                * the capsnap in ceph_put_wrbuffer_cap_refs.
+                */
+               BUG_ON(capsnap->dirty == 0);
+
                /* pick mds, take s_mutex */
                mds = __ceph_get_cap_mds(ci, &mseq);
                if (session && session->s_mds != mds) {
@@ -1275,7 +1299,8 @@ static void ceph_flush_snaps(struct ceph_inode_info *ci)
  */
 void __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
 {
-       struct ceph_mds_client *mdsc = &ceph_client(ci->vfs_inode.i_sb)->mdsc;
+       struct ceph_mds_client *mdsc =
+               &ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
        struct inode *inode = &ci->vfs_inode;
        int was = ci->i_dirty_caps;
        int dirty = 0;
@@ -1313,10 +1338,10 @@ void __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
 static int __mark_caps_flushing(struct inode *inode,
                                 struct ceph_mds_session *session)
 {
-       struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc;
+       struct ceph_mds_client *mdsc = &ceph_sb_to_client(inode->i_sb)->mdsc;
        struct ceph_inode_info *ci = ceph_inode(inode);
        int flushing;
-       
+
        BUG_ON(ci->i_dirty_caps == 0);
        BUG_ON(list_empty(&ci->i_dirty_item));
 
@@ -1349,6 +1374,41 @@ static int __mark_caps_flushing(struct inode *inode,
 }
 
 /*
+ * try to invalidate mapping pages without blocking.
+ */
+static int mapping_is_empty(struct address_space *mapping)
+{
+       struct page *page = find_get_page(mapping, 0);  /* only index 0 is probed */
+
+       if (!page)
+               return 1;       /* no page found at index 0: report empty */
+
+       put_page(page);         /* drop the reference taken by find_get_page() */
+       return 0;               /* a page is still cached */
+}
+
+static int try_nonblocking_invalidate(struct inode *inode)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       u32 invalidating_gen = ci->i_rdcache_gen;       /* snapshot, compared after relocking */
+
+       spin_unlock(&inode->i_lock);    /* i_lock is dropped around the invalidate call */
+       invalidate_mapping_pages(&inode->i_data, 0, -1);        /* range: start 0, end -1 */
+       spin_lock(&inode->i_lock);      /* retaken; i_lock is held again on return */
+
+       if (mapping_is_empty(&inode->i_data) &&
+           invalidating_gen == ci->i_rdcache_gen) {
+               /* success. */
+               dout("try_nonblocking_invalidate %p success\n", inode);
+               ci->i_rdcache_gen = 0;          /* both rdcache counters are reset on success */
+               ci->i_rdcache_revoking = 0;
+               return 0;
+       }
+       dout("try_nonblocking_invalidate %p failed\n", inode);
+       return -1;      /* page still present at index 0, or i_rdcache_gen changed while unlocked */
+}
+
+/*
  * Swiss army knife function to examine currently used and wanted
  * versus held caps.  Release, flush, ack revoked caps to mds as
  * appropriate.
@@ -1361,6 +1421,7 @@ static int __mark_caps_flushing(struct inode *inode,
  */
 void ceph_check_caps(struct ceph_inode_info *ci, int flags,
                     struct ceph_mds_session *session)
+       __releases(session->s_mutex)
 {
        struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode);
        struct ceph_mds_client *mdsc = &client->mdsc;
@@ -1368,13 +1429,13 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
        struct ceph_cap *cap;
        int file_wanted, used;
        int took_snap_rwsem = 0;             /* true if mdsc->snap_rwsem held */
-       int drop_session_lock = session ? 0 : 1;
-       int want, retain, revoking, flushing = 0;
+       int issued, implemented, want, retain, revoking, flushing = 0;
        int mds = -1;   /* keep track of how far we've gone through i_caps list
                           to avoid an infinite loop on retry */
        struct rb_node *p;
        int tried_invalidate = 0;
        int delayed = 0, sent = 0, force_requeue = 0, num;
+       int queue_invalidate = 0;
        int is_delayed = flags & CHECK_CAPS_NODELAY;
 
        /* if we are unmounting, flush any unused caps immediately. */
@@ -1396,6 +1457,8 @@ retry_locked:
        file_wanted = __ceph_caps_file_wanted(ci);
        used = __ceph_caps_used(ci);
        want = file_wanted | used;
+       issued = __ceph_caps_issued(ci, &implemented);
+       revoking = implemented & ~issued;
 
        retain = want | CEPH_CAP_PIN;
        if (!mdsc->stopping && inode->i_nlink > 0) {
@@ -1414,11 +1477,11 @@ retry_locked:
        }
 
        dout("check_caps %p file_want %s used %s dirty %s flushing %s"
-            " issued %s retain %s %s%s%s\n", inode,
+            " issued %s revoking %s retain %s %s%s%s\n", inode,
             ceph_cap_string(file_wanted),
             ceph_cap_string(used), ceph_cap_string(ci->i_dirty_caps),
             ceph_cap_string(ci->i_flushing_caps),
-            ceph_cap_string(__ceph_caps_issued(ci, NULL)),
+            ceph_cap_string(issued), ceph_cap_string(revoking),
             ceph_cap_string(retain),
             (flags & CHECK_CAPS_AUTHONLY) ? " AUTHONLY" : "",
             (flags & CHECK_CAPS_NODELAY) ? " NODELAY" : "",
@@ -1432,26 +1495,22 @@ retry_locked:
        if ((!is_delayed || mdsc->stopping) &&
            ci->i_wrbuffer_ref == 0 &&               /* no dirty pages... */
            ci->i_rdcache_gen &&                     /* may have cached pages */
-           file_wanted == 0 &&                      /* no open files */
-           !ci->i_truncate_pending &&
+           (file_wanted == 0 ||                     /* no open files */
+            (revoking & CEPH_CAP_FILE_CACHE)) &&     /*  or revoking cache */
            !tried_invalidate) {
-               u32 invalidating_gen = ci->i_rdcache_gen;
-               int ret;
-
                dout("check_caps trying to invalidate on %p\n", inode);
-               spin_unlock(&inode->i_lock);
-               ret = invalidate_inode_pages2(&inode->i_data);
-               spin_lock(&inode->i_lock);
-               if (ret == 0 && invalidating_gen == ci->i_rdcache_gen) {
-                       /* success. */
-                       ci->i_rdcache_gen = 0;
-                       ci->i_rdcache_revoking = 0;
-               } else {
-                       dout("check_caps failed to invalidate pages\n");
-                       /* we failed to invalidate pages.  check these
-                          caps again later. */
-                       force_requeue = 1;
-                       __cap_set_timeouts(mdsc, ci);
+               if (try_nonblocking_invalidate(inode) < 0) {
+                       if (revoking & CEPH_CAP_FILE_CACHE) {
+                               dout("check_caps queuing invalidate\n");
+                               queue_invalidate = 1;
+                               ci->i_rdcache_revoking = ci->i_rdcache_gen;
+                       } else {
+                               dout("check_caps failed to invalidate pages\n");
+                               /* we failed to invalidate pages.  check these
+                                  caps again later. */
+                               force_requeue = 1;
+                               __cap_set_timeouts(mdsc, ci);
+                       }
                }
                tried_invalidate = 1;
                goto retry_locked;
@@ -1471,7 +1530,7 @@ retry_locked:
 
                revoking = cap->implemented & ~cap->issued;
                if (revoking)
-                       dout("mds%d revoking %s\n", cap->mds,
+                       dout(" mds%d revoking %s\n", cap->mds,
                             ceph_cap_string(revoking));
 
                if (cap == ci->i_auth_cap &&
@@ -1529,6 +1588,11 @@ retry_locked:
                }
 
 ack:
+               if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
+                       dout(" skipping %p I_NOFLUSH set\n", inode);
+                       continue;
+               }
+
                if (session && session != cap->session) {
                        dout("oops, wrong session %p mutex\n", session);
                        mutex_unlock(&session->s_mutex);
@@ -1586,7 +1650,10 @@ ack:
 
        spin_unlock(&inode->i_lock);
 
-       if (session && drop_session_lock)
+       if (queue_invalidate)
+               ceph_queue_invalidate(inode);
+
+       if (session)
                mutex_unlock(&session->s_mutex);
        if (took_snap_rwsem)
                up_read(&mdsc->snap_rwsem);
@@ -1598,13 +1665,17 @@ ack:
 static int try_flush_caps(struct inode *inode, struct ceph_mds_session *session,
                          unsigned *flush_tid)
 {
-       struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc;
+       struct ceph_mds_client *mdsc = &ceph_sb_to_client(inode->i_sb)->mdsc;
        struct ceph_inode_info *ci = ceph_inode(inode);
        int unlock_session = session ? 0 : 1;
        int flushing = 0;
 
 retry:
        spin_lock(&inode->i_lock);
+       if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
+               dout("try_flush_caps skipping %p I_NOFLUSH set\n", inode);
+               goto out;
+       }
        if (ci->i_dirty_caps && ci->i_auth_cap) {
                struct ceph_cap *cap = ci->i_auth_cap;
                int used = __ceph_caps_used(ci);
@@ -1647,10 +1718,9 @@ out_unlocked:
 static int caps_are_flushed(struct inode *inode, unsigned tid)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
-       int dirty, i, ret = 1;
+       int i, ret = 1;
 
        spin_lock(&inode->i_lock);
-       dirty = __ceph_caps_dirty(ci);
        for (i = 0; i < CEPH_CAP_BITS; i++)
                if ((ci->i_flushing_caps & (1 << i)) &&
                    ci->i_cap_flush_tid[i] <= tid) {
@@ -1706,9 +1776,9 @@ out:
        spin_unlock(&ci->i_unsafe_lock);
 }
 
-int ceph_fsync(struct file *file, struct dentry *dentry, int datasync)
+int ceph_fsync(struct file *file, int datasync)
 {
-       struct inode *inode = dentry->d_inode;
+       struct inode *inode = file->f_mapping->host;
        struct ceph_inode_info *ci = ceph_inode(inode);
        unsigned flush_tid;
        int ret;
@@ -1745,12 +1815,13 @@ int ceph_fsync(struct file *file, struct dentry *dentry, int datasync)
  * get by with fewer MDS messages if we wait for data writeback to
  * complete first.
  */
-int ceph_write_inode(struct inode *inode, int wait)
+int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        unsigned flush_tid;
        int err = 0;
        int dirty;
+       int wait = wbc->sync_mode == WB_SYNC_ALL;
 
        dout("write_inode %p wait=%d\n", inode, wait);
        if (wait) {
@@ -1759,7 +1830,8 @@ int ceph_write_inode(struct inode *inode, int wait)
                        err = wait_event_interruptible(ci->i_cap_wq,
                                       caps_are_flushed(inode, flush_tid));
        } else {
-               struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc;
+               struct ceph_mds_client *mdsc =
+                       &ceph_sb_to_client(inode->i_sb)->mdsc;
 
                spin_lock(&inode->i_lock);
                if (__ceph_caps_dirty(ci))
@@ -1796,8 +1868,8 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc,
                } else {
                        pr_err("%p auth cap %p not mds%d ???\n", inode,
                               cap, session->s_mds);
-                       spin_unlock(&inode->i_lock);
                }
+               spin_unlock(&inode->i_lock);
        }
 }
 
@@ -1876,14 +1948,17 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
        struct inode *inode = &ci->vfs_inode;
        int ret = 0;
        int have, implemented;
+       int file_wanted;
 
        dout("get_cap_refs %p need %s want %s\n", inode,
             ceph_cap_string(need), ceph_cap_string(want));
        spin_lock(&inode->i_lock);
 
-       /* make sure we _have_ some caps! */
-       if (!__ceph_is_any_caps(ci)) {
-               dout("get_cap_refs %p no real caps\n", inode);
+       /* make sure file is actually open */
+       file_wanted = __ceph_caps_file_wanted(ci);
+       if ((file_wanted & need) == 0) {
+               dout("try_get_cap_refs need %s file_wanted %s, EBADF\n",
+                    ceph_cap_string(need), ceph_cap_string(file_wanted));
                *err = -EBADF;
                ret = 1;
                goto out;
@@ -2056,8 +2131,8 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
                }
        spin_unlock(&inode->i_lock);
 
-       dout("put_cap_refs %p had %s %s\n", inode, ceph_cap_string(had),
-            last ? "last" : "");
+       dout("put_cap_refs %p had %s%s%s\n", inode, ceph_cap_string(had),
+            last ? " last" : "", put ? " put" : "");
 
        if (last && !flushsnaps)
                ceph_check_caps(ci, 0, NULL);
@@ -2081,7 +2156,8 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
 {
        struct inode *inode = &ci->vfs_inode;
        int last = 0;
-       int last_snap = 0;
+       int complete_capsnap = 0;
+       int drop_capsnap = 0;
        int found = 0;
        struct ceph_cap_snap *capsnap = NULL;
 
@@ -2104,19 +2180,32 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
                list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
                        if (capsnap->context == snapc) {
                                found = 1;
-                               capsnap->dirty_pages -= nr;
-                               last_snap = !capsnap->dirty_pages;
                                break;
                        }
                }
                BUG_ON(!found);
+               capsnap->dirty_pages -= nr;
+               if (capsnap->dirty_pages == 0) {
+                       complete_capsnap = 1;
+                       if (capsnap->dirty == 0)
+                               /* cap writeback completed before we created
+                                * the cap_snap; no FLUSHSNAP is needed */
+                               drop_capsnap = 1;
+               }
                dout("put_wrbuffer_cap_refs on %p cap_snap %p "
-                    " snap %lld %d/%d -> %d/%d %s%s\n",
+                    " snap %lld %d/%d -> %d/%d %s%s%s\n",
                     inode, capsnap, capsnap->context->seq,
                     ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr,
                     ci->i_wrbuffer_ref, capsnap->dirty_pages,
                     last ? " (wrbuffer last)" : "",
-                    last_snap ? " (capsnap last)" : "");
+                    complete_capsnap ? " (complete capsnap)" : "",
+                    drop_capsnap ? " (drop capsnap)" : "");
+               if (drop_capsnap) {
+                       ceph_put_snap_context(capsnap->context);
+                       list_del(&capsnap->ci_item);
+                       list_del(&capsnap->flushing_item);
+                       ceph_put_cap_snap(capsnap);
+               }
        }
 
        spin_unlock(&inode->i_lock);
@@ -2124,28 +2213,31 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
        if (last) {
                ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
                iput(inode);
-       } else if (last_snap) {
+       } else if (complete_capsnap) {
                ceph_flush_snaps(ci);
                wake_up(&ci->i_cap_wq);
        }
+       if (drop_capsnap)
+               iput(inode);
 }
 
 /*
  * Handle a cap GRANT message from the MDS.  (Note that a GRANT may
  * actually be a revocation if it specifies a smaller cap set.)
  *
- * caller holds s_mutex.
+ * caller holds s_mutex and i_lock, we drop both.
+ *
  * return value:
  *  0 - ok
  *  1 - check_caps on auth cap only (writeback)
  *  2 - check_caps (ack revoke)
  */
-static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
-                           struct ceph_mds_session *session,
-                           struct ceph_cap *cap,
-                           struct ceph_buffer *xattr_buf)
+static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
+                            struct ceph_mds_session *session,
+                            struct ceph_cap *cap,
+                            struct ceph_buffer *xattr_buf)
        __releases(inode->i_lock)
-
+       __releases(session->s_mutex)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        int mds = session->s_mds;
@@ -2155,13 +2247,11 @@ static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
        u64 size = le64_to_cpu(grant->size);
        u64 max_size = le64_to_cpu(grant->max_size);
        struct timespec mtime, atime, ctime;
-       int reply = 0;
+       int check_caps = 0;
        int wake = 0;
        int writeback = 0;
        int revoked_rdcache = 0;
-       int invalidate_async = 0;
-       int tried_invalidate = 0;
-       int ret;
+       int queue_invalidate = 0;
 
        dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
             inode, cap, mds, seq, ceph_cap_string(newcaps));
@@ -2173,29 +2263,18 @@ static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
         * try to invalidate (once).  (If there are dirty buffers, we
         * will invalidate _after_ writeback.)
         */
-restart:
        if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
-           !ci->i_wrbuffer_ref && !tried_invalidate) {
-               dout("CACHE invalidation\n");
-               spin_unlock(&inode->i_lock);
-               tried_invalidate = 1;
-
-               ret = invalidate_inode_pages2(&inode->i_data);
-               spin_lock(&inode->i_lock);
-               if (ret < 0) {
+           !ci->i_wrbuffer_ref) {
+               if (try_nonblocking_invalidate(inode) == 0) {
+                       revoked_rdcache = 1;
+               } else {
                        /* there were locked pages.. invalidate later
                           in a separate thread. */
                        if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
-                               invalidate_async = 1;
+                               queue_invalidate = 1;
                                ci->i_rdcache_revoking = ci->i_rdcache_gen;
                        }
-               } else {
-                       /* we successfully invalidated those pages */
-                       revoked_rdcache = 1;
-                       ci->i_rdcache_gen = 0;
-                       ci->i_rdcache_revoking = 0;
                }
-               goto restart;
        }
 
        /* side effects now are allowed */
@@ -2281,11 +2360,12 @@ restart:
                if ((used & ~newcaps) & CEPH_CAP_FILE_BUFFER)
                        writeback = 1; /* will delay ack */
                else if (dirty & ~newcaps)
-                       reply = 1;     /* initiate writeback in check_caps */
+                       check_caps = 1;  /* initiate writeback in check_caps */
                else if (((used & ~newcaps) & CEPH_CAP_FILE_CACHE) == 0 ||
                           revoked_rdcache)
-                       reply = 2;     /* send revoke ack in check_caps */
+                       check_caps = 2;     /* send revoke ack in check_caps */
                cap->issued = newcaps;
+               cap->implemented |= newcaps;
        } else if (cap->issued == newcaps) {
                dout("caps unchanged: %s -> %s\n",
                     ceph_cap_string(cap->issued), ceph_cap_string(newcaps));
@@ -2298,44 +2378,45 @@ restart:
                                              * pending revocation */
                wake = 1;
        }
+       BUG_ON(cap->issued & ~cap->implemented);
 
        spin_unlock(&inode->i_lock);
-       if (writeback) {
+       if (writeback)
                /*
                 * queue inode for writeback: we can't actually call
                 * filemap_write_and_wait, etc. from message handler
                 * context.
                 */
-               dout("queueing %p for writeback\n", inode);
-               if (ceph_queue_writeback(inode))
-                       igrab(inode);
-       }
-       if (invalidate_async) {
-               dout("queueing %p for page invalidation\n", inode);
-               if (ceph_queue_page_invalidation(inode))
-                       igrab(inode);
-       }
+               ceph_queue_writeback(inode);
+       if (queue_invalidate)
+               ceph_queue_invalidate(inode);
        if (wake)
                wake_up(&ci->i_cap_wq);
-       return reply;
+
+       if (check_caps == 1)
+               ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY,
+                               session);
+       else if (check_caps == 2)
+               ceph_check_caps(ci, CHECK_CAPS_NODELAY, session);
+       else
+               mutex_unlock(&session->s_mutex);
 }
 
 /*
  * Handle FLUSH_ACK from MDS, indicating that metadata we sent to the
  * MDS has been safely committed.
  */
-static void handle_cap_flush_ack(struct inode *inode,
+static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
                                 struct ceph_mds_caps *m,
                                 struct ceph_mds_session *session,
                                 struct ceph_cap *cap)
        __releases(inode->i_lock)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc;
+       struct ceph_mds_client *mdsc = &ceph_sb_to_client(inode->i_sb)->mdsc;
        unsigned seq = le32_to_cpu(m->seq);
        int dirty = le32_to_cpu(m->dirty);
        int cleaned = 0;
-       u64 flush_tid = le64_to_cpu(m->client_tid);
        int drop = 0;
        int i;
 
@@ -2391,13 +2472,12 @@ out:
  *
  * Caller hold s_mutex.
  */
-static void handle_cap_flushsnap_ack(struct inode *inode,
+static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
                                     struct ceph_mds_caps *m,
                                     struct ceph_mds_session *session)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        u64 follows = le64_to_cpu(m->snap_follows);
-       u64 flush_tid = le64_to_cpu(m->client_tid);
        struct ceph_cap_snap *capsnap;
        int drop = 0;
 
@@ -2414,8 +2494,8 @@ static void handle_cap_flushsnap_ack(struct inode *inode,
                                break;
                        }
                        WARN_ON(capsnap->dirty_pages || capsnap->writing);
-                       dout(" removing cap_snap %p follows %lld\n",
-                            capsnap, follows);
+                       dout(" removing %p cap_snap %p follows %lld\n",
+                            inode, capsnap, follows);
                        ceph_put_snap_context(capsnap->context);
                        list_del(&capsnap->ci_item);
                        list_del(&capsnap->flushing_item);
@@ -2462,9 +2542,7 @@ static void handle_cap_trunc(struct inode *inode,
        spin_unlock(&inode->i_lock);
 
        if (queue_trunc)
-               if (queue_work(ceph_client(inode->i_sb)->trunc_wq,
-                              &ci->i_vmtruncate_work))
-                       igrab(inode);
+               ceph_queue_vmtruncate(inode);
 }
 
 /*
@@ -2509,10 +2587,9 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
                        ci->i_cap_exporting_mseq = mseq;
                        ci->i_cap_exporting_issued = cap->issued;
                }
-               __ceph_remove_cap(cap, NULL);
-       } else {
-               WARN_ON(!cap);
+               __ceph_remove_cap(cap);
        }
+       /* else, we already released it */
 
        spin_unlock(&inode->i_lock);
 }
@@ -2576,21 +2653,23 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        struct inode *inode;
        struct ceph_cap *cap;
        struct ceph_mds_caps *h;
-       int mds = le64_to_cpu(msg->hdr.src.name.num);
+       int mds = session->s_mds;
        int op;
        u32 seq;
        struct ceph_vino vino;
        u64 cap_id;
        u64 size, max_size;
-       int check_caps = 0;
-       int r;
+       u64 tid;
+       void *snaptrace;
 
        dout("handle_caps from mds%d\n", mds);
 
        /* decode */
+       tid = le64_to_cpu(msg->hdr.tid);
        if (msg->front.iov_len < sizeof(*h))
                goto bad;
        h = msg->front.iov_base;
+       snaptrace = h + 1;
        op = le32_to_cpu(h->op);
        vino.ino = le64_to_cpu(h->ino);
        vino.snap = CEPH_NOSNAP;
@@ -2616,7 +2695,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        /* these will work even if we don't have a cap yet */
        switch (op) {
        case CEPH_CAP_OP_FLUSHSNAP_ACK:
-               handle_cap_flushsnap_ack(inode, h, session);
+               handle_cap_flushsnap_ack(inode, tid, h, session);
                goto done;
 
        case CEPH_CAP_OP_EXPORT:
@@ -2625,10 +2704,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 
        case CEPH_CAP_OP_IMPORT:
                handle_cap_import(mdsc, inode, h, session,
-                                 msg->middle,
-                                 le32_to_cpu(h->snap_trace_len));
-               check_caps = 1; /* we may have sent a RELEASE to the old auth */
-               goto done;
+                                 snaptrace, le32_to_cpu(h->snap_trace_len));
+               ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY,
+                               session);
+               goto done_unlocked;
        }
 
        /* the rest require a cap */
@@ -2645,19 +2724,11 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        switch (op) {
        case CEPH_CAP_OP_REVOKE:
        case CEPH_CAP_OP_GRANT:
-               r = handle_cap_grant(inode, h, session, cap, msg->middle);
-               if (r == 1)
-                       ceph_check_caps(ceph_inode(inode),
-                                       CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY,
-                                       session);
-               else if (r == 2)
-                       ceph_check_caps(ceph_inode(inode),
-                                       CHECK_CAPS_NODELAY,
-                                       session);
-               break;
+               handle_cap_grant(inode, h, session, cap, msg->middle);
+               goto done_unlocked;
 
        case CEPH_CAP_OP_FLUSH_ACK:
-               handle_cap_flush_ack(inode, h, session, cap);
+               handle_cap_flush_ack(inode, tid, h, session, cap);
                break;
 
        case CEPH_CAP_OP_TRUNC:
@@ -2672,15 +2743,14 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 
 done:
        mutex_unlock(&session->s_mutex);
-
-       if (check_caps)
-               ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY, NULL);
+done_unlocked:
        if (inode)
                iput(inode);
        return;
 
 bad:
        pr_err("ceph_handle_caps: corrupt message\n");
+       ceph_msg_dump(msg);
        return;
 }
 
@@ -2716,16 +2786,38 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
  */
 void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
 {
-       struct ceph_inode_info *ci;
-       struct inode *inode;
+       struct ceph_inode_info *ci, *nci = NULL;
+       struct inode *inode, *ninode = NULL;
+       struct list_head *p, *n;
 
        dout("flush_dirty_caps\n");
        spin_lock(&mdsc->cap_dirty_lock);
-       while (!list_empty(&mdsc->cap_dirty)) {
-               ci = list_first_entry(&mdsc->cap_dirty,
-                                     struct ceph_inode_info,
-                                     i_dirty_item);
-               inode = igrab(&ci->vfs_inode);
+       list_for_each_safe(p, n, &mdsc->cap_dirty) {
+               if (nci) {
+                       ci = nci;
+                       inode = ninode;
+                       ci->i_ceph_flags &= ~CEPH_I_NOFLUSH;
+                       dout("flush_dirty_caps inode %p (was next inode)\n",
+                            inode);
+               } else {
+                       ci = list_entry(p, struct ceph_inode_info,
+                                       i_dirty_item);
+                       inode = igrab(&ci->vfs_inode);
+                       BUG_ON(!inode);
+                       dout("flush_dirty_caps inode %p\n", inode);
+               }
+               if (n != &mdsc->cap_dirty) {
+                       nci = list_entry(n, struct ceph_inode_info,
+                                        i_dirty_item);
+                       ninode = igrab(&nci->vfs_inode);
+                       BUG_ON(!ninode);
+                       nci->i_ceph_flags |= CEPH_I_NOFLUSH;
+                       dout("flush_dirty_caps next inode %p, noflush\n",
+                            ninode);
+               } else {
+                       nci = NULL;
+                       ninode = NULL;
+               }
                spin_unlock(&mdsc->cap_dirty_lock);
                if (inode) {
                        ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_FLUSH,
@@ -2774,11 +2866,18 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
        struct ceph_cap *cap;
        struct ceph_mds_request_release *rel = *p;
        int ret = 0;
-
-       dout("encode_inode_release %p mds%d drop %s unless %s\n", inode,
-            mds, ceph_cap_string(drop), ceph_cap_string(unless));
+       int used = 0;
 
        spin_lock(&inode->i_lock);
+       used = __ceph_caps_used(ci);
+
+       dout("encode_inode_release %p mds%d used %s drop %s unless %s\n", inode,
+            mds, ceph_cap_string(used), ceph_cap_string(drop),
+            ceph_cap_string(unless));
+
+       /* only drop unused caps */
+       drop &= ~used;
+
        cap = __get_cap_for_mds(ci, mds);
        if (cap && __cap_is_valid(cap)) {
                if (force ||