Merge branch 'topic/core-cleanup' into for-linus
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 074ee42..85b4d2f 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -10,6 +10,7 @@
 #include <linux/namei.h>
 #include <linux/writeback.h>
 #include <linux/vmalloc.h>
+#include <linux/pagevec.h>
 
 #include "super.h"
 #include "decode.h"
@@ -28,7 +29,9 @@
 
 static const struct inode_operations ceph_symlink_iops;
 
-static void ceph_inode_invalidate_pages(struct work_struct *work);
+static void ceph_invalidate_work(struct work_struct *work);
+static void ceph_writeback_work(struct work_struct *work);
+static void ceph_vmtruncate_work(struct work_struct *work);
 
 /*
  * find or create an inode, given the ceph ino number
@@ -357,8 +360,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
        INIT_LIST_HEAD(&ci->i_snap_realm_item);
        INIT_LIST_HEAD(&ci->i_snap_flush_item);
 
-       INIT_WORK(&ci->i_wb_work, ceph_inode_writeback);
-       INIT_WORK(&ci->i_pg_inv_work, ceph_inode_invalidate_pages);
+       INIT_WORK(&ci->i_wb_work, ceph_writeback_work);
+       INIT_WORK(&ci->i_pg_inv_work, ceph_invalidate_work);
 
        INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work);
 
@@ -375,6 +378,22 @@ void ceph_destroy_inode(struct inode *inode)
 
        ceph_queue_caps_release(inode);
 
+       /*
+        * we may still have a snap_realm reference if there are stray
+        * caps in i_cap_exporting_issued or i_snap_caps.
+        */
+       if (ci->i_snap_realm) {
+               struct ceph_mds_client *mdsc =
+                       &ceph_client(ci->vfs_inode.i_sb)->mdsc;
+               struct ceph_snap_realm *realm = ci->i_snap_realm;
+
+               dout(" dropping residual ref to snap realm %p\n", realm);
+               spin_lock(&realm->inodes_with_caps_lock);
+               list_del_init(&ci->i_snap_realm_item);
+               spin_unlock(&realm->inodes_with_caps_lock);
+               ceph_put_snap_realm(mdsc, realm);
+       }
+
        kfree(ci->i_symlink);
        while ((n = rb_first(&ci->i_fragtree)) != NULL) {
                frag = rb_entry(n, struct ceph_inode_frag, node);
@@ -383,8 +402,10 @@ void ceph_destroy_inode(struct inode *inode)
        }
 
        __ceph_destroy_xattrs(ci);
-       ceph_buffer_put(ci->i_xattrs.blob);
-       ceph_buffer_put(ci->i_xattrs.prealloc_blob);
+       if (ci->i_xattrs.blob)
+               ceph_buffer_put(ci->i_xattrs.blob);
+       if (ci->i_xattrs.prealloc_blob)
+               ceph_buffer_put(ci->i_xattrs.prealloc_blob);
 
        kmem_cache_free(ceph_inode_cachep, ci);
 }
@@ -414,9 +435,17 @@ int ceph_fill_file_size(struct inode *inode, int issued,
                        dout("truncate_seq %u -> %u\n",
                             ci->i_truncate_seq, truncate_seq);
                        ci->i_truncate_seq = truncate_seq;
-                       if (issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_RD|
+                       /*
+                        * Queue the truncate if we hold any of the relevant
+                        * caps.  Even if we hold none of them (i.e., another
+                        * client is the one touching the file), we must still
+                        * queue it when the file is open or mmapped here, so
+                        * that our own cached pages and mappings get
+                        * invalidated.
+                        */
+                       if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_RD|
                                      CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER|
-                                     CEPH_CAP_FILE_EXCL)) {
+                                     CEPH_CAP_FILE_EXCL)) ||
+                           mapping_mapped(inode->i_mapping) ||
+                           __ceph_caps_file_wanted(ci)) {
                                ci->i_truncate_pending++;
                                queue_trunc = 1;
                        }
@@ -526,7 +555,7 @@ static int fill_inode(struct inode *inode,
         * bytes are the xattr count).
         */
        if (iinfo->xattr_len > 4) {
-               xattr_blob = ceph_buffer_new_alloc(iinfo->xattr_len, GFP_NOFS);
+               xattr_blob = ceph_buffer_new(iinfo->xattr_len, GFP_NOFS);
                if (!xattr_blob)
                        pr_err("fill_inode ENOMEM xattr blob %d bytes\n",
                               iinfo->xattr_len);
@@ -665,9 +694,7 @@ no_change:
 
        /* queue truncate if we saw i_size decrease */
        if (queue_trunc)
-               if (queue_work(ceph_client(inode->i_sb)->trunc_wq,
-                              &ci->i_vmtruncate_work))
-                       igrab(inode);
+               ceph_queue_vmtruncate(inode);
 
        /* populate frag tree */
        /* FIXME: move me up, if/when version reflects fragtree changes */
@@ -706,6 +733,10 @@ no_change:
                                __ceph_get_fmode(ci, cap_fmode);
                        spin_unlock(&inode->i_lock);
                }
+       } else if (cap_fmode >= 0) {
+               pr_warning("mds issued no caps on %llx.%llx\n",
+                          ceph_vinop(inode));
+               __ceph_get_fmode(ci, cap_fmode);
        }
 
        /* update delegation info? */
@@ -715,7 +746,8 @@ no_change:
        err = 0;
 
 out:
-       ceph_buffer_put(xattr_blob);
+       if (xattr_blob)
+               ceph_buffer_put(xattr_blob);
        return err;
 }
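
The two hunks above guard ceph_buffer_put() with a NULL check, which implies the helper is not NULL-safe in this tree.  An alternative design (not what this patch does) would be a kfree()-style NULL-tolerant wrapper; a minimal sketch, where ceph_buffer_put_safe() is a hypothetical name and ceph_buffer_put() is the existing helper:

	/* hedged sketch: NULL-tolerant put, so callers need no guard */
	static inline void ceph_buffer_put_safe(struct ceph_buffer *b)
	{
		if (b)
			ceph_buffer_put(b);
	}

Open-coding the check at each call site, as done here, keeps the put's reference semantics strict at the cost of a little noise.
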
 
@@ -814,6 +846,33 @@ out:
 }
 
 /*
+ * Set dentry's directory position based on the current dir's max, and
+ * order it in d_subdirs, so that dcache_readdir behaves.
+ */
+static void ceph_set_dentry_offset(struct dentry *dn)
+{
+       struct dentry *dir = dn->d_parent;
+       struct inode *inode = dir->d_inode;
+       struct ceph_dentry_info *di;
+
+       BUG_ON(!inode);
+
+       di = ceph_dentry(dn);
+
+       spin_lock(&inode->i_lock);
+       di->offset = ceph_inode(inode)->i_max_offset++;
+       spin_unlock(&inode->i_lock);
+
+       spin_lock(&dcache_lock);
+       spin_lock(&dn->d_lock);
+       list_move_tail(&dir->d_subdirs, &dn->d_u.d_child);
+       dout("set_dentry_offset %p %lld (%p %p)\n", dn, di->offset,
+            dn->d_u.d_child.prev, dn->d_u.d_child.next);
+       spin_unlock(&dn->d_lock);
+       spin_unlock(&dcache_lock);
+}
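
For context, the offset assigned above is what the dcache-backed readdir path orders children by.  A hedged sketch of how a cached readdir might consume di->offset (the real iteration in fs/ceph/dir.c differs; filp and emit() are hypothetical):

	struct dentry *child;

	/* skip children already returned on an earlier pass, then emit */
	list_for_each_entry(child, &dir->d_subdirs, d_u.d_child) {
		if (ceph_dentry(child)->offset < filp->f_pos)
			continue;
		emit(child);
		filp->f_pos = ceph_dentry(child)->offset + 1;
	}
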
+
+/*
  * Incorporate results into the local cache.  This is either just
  * one inode, or a directory, dentry, and possibly linked-to inode (e.g.,
  * after a lookup).
@@ -831,6 +890,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
        struct inode *in = NULL;
        struct ceph_mds_reply_inode *ininfo;
        struct ceph_vino vino;
+       struct ceph_client *client = ceph_sb_to_client(sb);
        int i = 0;
        int err = 0;
 
@@ -885,6 +945,23 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
        }
 
        if (rinfo->head->is_dentry) {
+               struct inode *dir = req->r_locked_dir;
+
+               err = fill_inode(dir, &rinfo->diri, rinfo->dirfrag,
+                                session, req->r_request_started, -1,
+                                &req->r_caps_reservation);
+               if (err < 0)
+                       return err;
+       }
+
+       /*
+        * ignore null lease/binding on snapdir ENOENT, or else we
+        * will have trouble splicing in the virtual snapdir later
+        */
+       if (rinfo->head->is_dentry && !req->r_aborted &&
+           (rinfo->head->is_target || strncmp(req->r_dentry->d_name.name,
+                                              client->mount_args->snapdir_name,
+                                              req->r_dentry->d_name.len))) {
                /*
                 * lookup link rename   : null -> possibly existing inode
                 * mknod symlink mkdir  : null -> new inode
@@ -902,12 +979,6 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                BUG_ON(ceph_snap(dir) !=
                       le64_to_cpu(rinfo->diri.in->snapid));
 
-               err = fill_inode(dir, &rinfo->diri, rinfo->dirfrag,
-                                session, req->r_request_started, -1,
-                                &req->r_caps_reservation);
-               if (err < 0)
-                       return err;
-
                /* do we have a lease on the whole dir? */
                have_dir_cap =
                        (le32_to_cpu(rinfo->diri.in->cap.caps) &
@@ -930,12 +1001,20 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                             dn, dn->d_name.len, dn->d_name.name);
                        dout("fill_trace doing d_move %p -> %p\n",
                             req->r_old_dentry, dn);
+
+                       /* d_move screws up d_subdirs order */
+                       ceph_i_clear(dir, CEPH_I_COMPLETE);
+
                        d_move(req->r_old_dentry, dn);
                        dout(" src %p '%.*s' dst %p '%.*s'\n",
                             req->r_old_dentry,
                             req->r_old_dentry->d_name.len,
                             req->r_old_dentry->d_name.name,
                             dn, dn->d_name.len, dn->d_name.name);
+                       /*
+                        * Ensure the target dentry is invalidated, despite the
+                        * rehashing bug in vfs_rename_dir().
+                        */
+                       dn->d_time = jiffies;
+                       ceph_dentry(dn)->lease_shared_gen = 0;
                        /* take overwritten dentry's readdir offset */
                        ceph_dentry(req->r_old_dentry)->offset =
                                ceph_dentry(dn)->offset;
@@ -980,6 +1059,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                                goto done;
                        }
                        req->r_dentry = dn;  /* may have spliced */
+                       ceph_set_dentry_offset(dn);
                        igrab(in);
                } else if (ceph_ino(in) == vino.ino &&
                           ceph_snap(in) == vino.snap) {
@@ -1022,6 +1102,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                        err = PTR_ERR(dn);
                        goto done;
                }
+               ceph_set_dentry_offset(dn);
                req->r_dentry = dn;  /* may have spliced */
                igrab(in);
                rinfo->head->is_dentry = 1;  /* fool notrace handlers */
@@ -1195,7 +1276,18 @@ int ceph_inode_set_size(struct inode *inode, loff_t size)
  * Write back inode data in a worker thread.  (This can't be done
  * in the message handler context.)
  */
-void ceph_inode_writeback(struct work_struct *work)
+void ceph_queue_writeback(struct inode *inode)
+{
+       if (queue_work(ceph_inode_to_client(inode)->wb_wq,
+                      &ceph_inode(inode)->i_wb_work)) {
+               dout("ceph_queue_writeback %p\n", inode);
+               igrab(inode);
+       } else {
+               dout("ceph_queue_writeback %p failed\n", inode);
+       }
+}
+
+static void ceph_writeback_work(struct work_struct *work)
 {
        struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
                                                  i_wb_work);
@@ -1207,10 +1299,67 @@ void ceph_inode_writeback(struct work_struct *work)
 }
 
 /*
+ * queue an async invalidation
+ */
+void ceph_queue_invalidate(struct inode *inode)
+{
+       if (queue_work(ceph_inode_to_client(inode)->pg_inv_wq,
+                      &ceph_inode(inode)->i_pg_inv_work)) {
+               dout("ceph_queue_invalidate %p\n", inode);
+               igrab(inode);
+       } else {
+               dout("ceph_queue_invalidate %p failed\n", inode);
+       }
+}
+
+/*
+ * invalidate any pages that are not dirty or under writeback.  this
+ * includes pages that are clean and mapped.
+ */
+static void ceph_invalidate_nondirty_pages(struct address_space *mapping)
+{
+       struct pagevec pvec;
+       pgoff_t next = 0;
+       int i;
+
+       pagevec_init(&pvec, 0);
+       while (pagevec_lookup(&pvec, mapping, next, PAGEVEC_SIZE)) {
+               for (i = 0; i < pagevec_count(&pvec); i++) {
+                       struct page *page = pvec.pages[i];
+                       pgoff_t index;
+                       int skip_page =
+                               (PageDirty(page) || PageWriteback(page));
+
+                       if (!skip_page)
+                               skip_page = !trylock_page(page);
+
+                       /*
+                        * We really shouldn't be looking at the ->index of an
+                        * unlocked page.  But we're not allowed to lock these
+                        * pages.  So we rely upon nobody altering the ->index
+                        * of this (pinned-by-us) page.
+                        */
+                       index = page->index;
+                       if (index > next)
+                               next = index;
+                       next++;
+
+                       if (skip_page)
+                               continue;
+
+                       generic_error_remove_page(mapping, page);
+                       unlock_page(page);
+               }
+               pagevec_release(&pvec);
+               cond_resched();
+       }
+}
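
This helper lets the invalidate worker below stop using truncate_inode_pages(), which throws away even dirty data.  The swap appears in the next hunk; a hedged summary of the difference:

	/* before: drops every page, including dirty data that has not
	 * yet been written back */
	truncate_inode_pages(&inode->i_data, 0);

	/* after: dirty and writeback pages survive; only pages whose
	 * contents can be refetched later are invalidated, including
	 * clean mapped pages */
	ceph_invalidate_nondirty_pages(inode->i_mapping);
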
+
+/*
  * Invalidate inode pages in a worker thread.  (This can't be done
  * in the message handler context.)
  */
-static void ceph_inode_invalidate_pages(struct work_struct *work)
+static void ceph_invalidate_work(struct work_struct *work)
 {
        struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
                                                  i_pg_inv_work);
@@ -1232,7 +1381,7 @@ static void ceph_inode_invalidate_pages(struct work_struct *work)
        orig_gen = ci->i_rdcache_gen;
        spin_unlock(&inode->i_lock);
 
-       truncate_inode_pages(&inode->i_data, 0);
+       ceph_invalidate_nondirty_pages(inode->i_mapping);
 
        spin_lock(&inode->i_lock);
        if (orig_gen == ci->i_rdcache_gen) {
@@ -1259,7 +1408,7 @@ out:
  *
  * We also truncate in a separate thread as well.
  */
-void ceph_vmtruncate_work(struct work_struct *work)
+static void ceph_vmtruncate_work(struct work_struct *work)
 {
        struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
                                                  i_vmtruncate_work);
@@ -1273,6 +1422,24 @@ void ceph_vmtruncate_work(struct work_struct *work)
 }
 
 /*
+ * Queue an async vmtruncate.  If we fail to queue work, we will handle
+ * the truncation the next time we call __ceph_do_pending_vmtruncate.
+ */
+void ceph_queue_vmtruncate(struct inode *inode)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+
+       if (queue_work(ceph_client(inode->i_sb)->trunc_wq,
+                      &ci->i_vmtruncate_work)) {
+               dout("ceph_queue_vmtruncate %p\n", inode);
+               igrab(inode);
+       } else {
+               dout("ceph_queue_vmtruncate %p failed, pending=%d\n",
+                    inode, ci->i_truncate_pending);
+       }
+}
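
All three ceph_queue_* helpers added by this patch share one ownership pattern: queue_work() returns nonzero only when the work was not already pending, so exactly one extra inode reference is taken per queued run, and the worker is expected to drop it when it finishes.  A minimal sketch of the pairing, assuming a worker that ends in iput(); my_wq, i_my_work, and my_worker() are hypothetical names:

	static void my_worker(struct work_struct *work)
	{
		struct ceph_inode_info *ci =
			container_of(work, struct ceph_inode_info, i_my_work);
		struct inode *inode = &ci->vfs_inode;

		/* ... do the deferred work ... */

		iput(inode);	/* drop the reference taken at queue time */
	}

	static void my_queue_work(struct inode *inode)
	{
		/* nonzero return means the work was newly queued */
		if (queue_work(my_wq, &ceph_inode(inode)->i_my_work))
			igrab(inode);	/* pin inode until my_worker runs */
	}
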
+
+/*
  * called with i_mutex held.
  *
  * Make sure any pending truncation is applied before doing anything
@@ -1356,7 +1523,6 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
        int release = 0, dirtied = 0;
        int mask = 0;
        int err = 0;
-       int queue_trunc = 0;
 
        if (ceph_snap(inode) != CEPH_NOSNAP)
                return -EROFS;
@@ -1470,11 +1636,6 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
                if ((issued & CEPH_CAP_FILE_EXCL) &&
                    attr->ia_size > inode->i_size) {
                        inode->i_size = attr->ia_size;
-                       if (attr->ia_size < inode->i_size) {
-                               ci->i_truncate_size = attr->ia_size;
-                               ci->i_truncate_pending++;
-                               queue_trunc = 1;
-                       }
                        inode->i_blocks =
                                (attr->ia_size + (1 << 9) - 1) >> 9;
                        inode->i_ctime = attr->ia_ctime;
@@ -1527,9 +1688,6 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
        release &= issued;
        spin_unlock(&inode->i_lock);
 
-       if (queue_trunc)
-               __ceph_do_pending_vmtruncate(inode);
-
        if (mask) {
                req->r_inode = igrab(inode);
                req->r_inode_drop = release;