xfs: fix access to upper inodes without inode64
[safe/jmp/linux-2.6] / fs / ceph / inode.c
index af85f2d..85b4d2f 100644 (file)
@@ -10,6 +10,7 @@
 #include <linux/namei.h>
 #include <linux/writeback.h>
 #include <linux/vmalloc.h>
+#include <linux/pagevec.h>
 
 #include "super.h"
 #include "decode.h"
@@ -28,7 +29,9 @@
 
 static const struct inode_operations ceph_symlink_iops;
 
-static void ceph_inode_invalidate_pages(struct work_struct *work);
+static void ceph_invalidate_work(struct work_struct *work);
+static void ceph_writeback_work(struct work_struct *work);
+static void ceph_vmtruncate_work(struct work_struct *work);
 
 /*
  * find or create an inode, given the ceph ino number
@@ -357,8 +360,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
        INIT_LIST_HEAD(&ci->i_snap_realm_item);
        INIT_LIST_HEAD(&ci->i_snap_flush_item);
 
-       INIT_WORK(&ci->i_wb_work, ceph_inode_writeback);
-       INIT_WORK(&ci->i_pg_inv_work, ceph_inode_invalidate_pages);
+       INIT_WORK(&ci->i_wb_work, ceph_writeback_work);
+       INIT_WORK(&ci->i_pg_inv_work, ceph_invalidate_work);
 
        INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work);
 
@@ -375,6 +378,22 @@ void ceph_destroy_inode(struct inode *inode)
 
        ceph_queue_caps_release(inode);
 
+       /*
+        * we may still have a snap_realm reference if there are stray
+        * caps in i_cap_exporting_issued or i_snap_caps.
+        */
+       if (ci->i_snap_realm) {
+               struct ceph_mds_client *mdsc =
+                       &ceph_client(ci->vfs_inode.i_sb)->mdsc;
+               struct ceph_snap_realm *realm = ci->i_snap_realm;
+
+               dout(" dropping residual ref to snap realm %p\n", realm);
+               spin_lock(&realm->inodes_with_caps_lock);
+               list_del_init(&ci->i_snap_realm_item);
+               spin_unlock(&realm->inodes_with_caps_lock);
+               ceph_put_snap_realm(mdsc, realm);
+       }
+
        kfree(ci->i_symlink);
        while ((n = rb_first(&ci->i_fragtree)) != NULL) {
                frag = rb_entry(n, struct ceph_inode_frag, node);
@@ -675,9 +694,7 @@ no_change:
 
        /* queue truncate if we saw i_size decrease */
        if (queue_trunc)
-               if (queue_work(ceph_client(inode->i_sb)->trunc_wq,
-                              &ci->i_vmtruncate_work))
-                       igrab(inode);
+               ceph_queue_vmtruncate(inode);
 
        /* populate frag tree */
        /* FIXME: move me up, if/when version reflects fragtree changes */
@@ -716,6 +733,10 @@ no_change:
                                __ceph_get_fmode(ci, cap_fmode);
                        spin_unlock(&inode->i_lock);
                }
+       } else if (cap_fmode >= 0) {
+               pr_warning("mds issued no caps on %llx.%llx\n",
+                          ceph_vinop(inode));
+               __ceph_get_fmode(ci, cap_fmode);
        }
 
        /* update delegation info? */
@@ -869,6 +890,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
        struct inode *in = NULL;
        struct ceph_mds_reply_inode *ininfo;
        struct ceph_vino vino;
+       struct ceph_client *client = ceph_sb_to_client(sb);
        int i = 0;
        int err = 0;
 
@@ -932,7 +954,14 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                        return err;
        }
 
-       if (rinfo->head->is_dentry && !req->r_aborted) {
+       /*
+        * ignore null lease/binding on snapdir ENOENT, or else we
+        * will have trouble splicing in the virtual snapdir later
+        */
+       if (rinfo->head->is_dentry && !req->r_aborted &&
+           (rinfo->head->is_target || strncmp(req->r_dentry->d_name.name,
+                                              client->mount_args->snapdir_name,
+                                              req->r_dentry->d_name.len))) {
                /*
                 * lookup link rename   : null -> possibly existing inode
                 * mknod symlink mkdir  : null -> new inode
@@ -972,6 +1001,10 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                             dn, dn->d_name.len, dn->d_name.name);
                        dout("fill_trace doing d_move %p -> %p\n",
                             req->r_old_dentry, dn);
+
+                       /* d_move screws up d_subdirs order */
+                       ceph_i_clear(dir, CEPH_I_COMPLETE);
+
                        d_move(req->r_old_dentry, dn);
                        dout(" src %p '%.*s' dst %p '%.*s'\n",
                             req->r_old_dentry,
@@ -1243,7 +1276,18 @@ int ceph_inode_set_size(struct inode *inode, loff_t size)
  * Write back inode data in a worker thread.  (This can't be done
  * in the message handler context.)
  */
-void ceph_inode_writeback(struct work_struct *work)
+void ceph_queue_writeback(struct inode *inode)
+{
+       if (queue_work(ceph_inode_to_client(inode)->wb_wq,
+                      &ceph_inode(inode)->i_wb_work)) {
+               dout("ceph_queue_writeback %p\n", inode);
+               igrab(inode);
+       } else {
+               dout("ceph_queue_writeback %p failed\n", inode);
+       }
+}
+
+static void ceph_writeback_work(struct work_struct *work)
 {
        struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
                                                  i_wb_work);
@@ -1255,10 +1299,67 @@ void ceph_inode_writeback(struct work_struct *work)
 }
 
 /*
+ * queue an async invalidation
+ */
+void ceph_queue_invalidate(struct inode *inode)
+{
+       if (queue_work(ceph_inode_to_client(inode)->pg_inv_wq,
+                      &ceph_inode(inode)->i_pg_inv_work)) {
+               dout("ceph_queue_invalidate %p\n", inode);
+               igrab(inode);
+       } else {
+               dout("ceph_queue_invalidate %p failed\n", inode);
+       }
+}
+
+/*
+ * invalidate any pages that are not dirty or under writeback.  this
+ * includes pages that are clean and mapped.
+ */
+static void ceph_invalidate_nondirty_pages(struct address_space *mapping)
+{
+       struct pagevec pvec;
+       pgoff_t next = 0;
+       int i;
+
+       pagevec_init(&pvec, 0);
+       while (pagevec_lookup(&pvec, mapping, next, PAGEVEC_SIZE)) {
+               for (i = 0; i < pagevec_count(&pvec); i++) {
+                       struct page *page = pvec.pages[i];
+                       pgoff_t index;
+                       int skip_page =
+                               (PageDirty(page) || PageWriteback(page));
+
+                       if (!skip_page)
+                               skip_page = !trylock_page(page);
+
+                       /*
+                        * We really shouldn't be looking at the ->index of an
+                        * unlocked page.  But we're not allowed to lock these
+                        * pages.  So we rely upon nobody altering the ->index
+                        * of this (pinned-by-us) page.
+                        */
+                       index = page->index;
+                       if (index > next)
+                               next = index;
+                       next++;
+
+                       if (skip_page)
+                               continue;
+
+                       generic_error_remove_page(mapping, page);
+                       unlock_page(page);
+               }
+               pagevec_release(&pvec);
+               cond_resched();
+       }
+}
+
+/*
  * Invalidate inode pages in a worker thread.  (This can't be done
  * in the message handler context.)
  */
-static void ceph_inode_invalidate_pages(struct work_struct *work)
+static void ceph_invalidate_work(struct work_struct *work)
 {
        struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
                                                  i_pg_inv_work);
@@ -1280,7 +1381,7 @@ static void ceph_inode_invalidate_pages(struct work_struct *work)
        orig_gen = ci->i_rdcache_gen;
        spin_unlock(&inode->i_lock);
 
-       truncate_inode_pages(&inode->i_data, 0);
+       ceph_invalidate_nondirty_pages(inode->i_mapping);
 
        spin_lock(&inode->i_lock);
        if (orig_gen == ci->i_rdcache_gen) {
@@ -1307,7 +1408,7 @@ out:
  *
  * We also truncate in a separate thread as well.
  */
-void ceph_vmtruncate_work(struct work_struct *work)
+static void ceph_vmtruncate_work(struct work_struct *work)
 {
        struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
                                                  i_vmtruncate_work);
@@ -1321,6 +1422,24 @@ void ceph_vmtruncate_work(struct work_struct *work)
 }
 
 /*
+ * Queue an async vmtruncate.  If we fail to queue work, we will handle
+ * the truncation the next time we call __ceph_do_pending_vmtruncate.
+ */
+void ceph_queue_vmtruncate(struct inode *inode)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+
+       if (queue_work(ceph_client(inode->i_sb)->trunc_wq,
+                      &ci->i_vmtruncate_work)) {
+               dout("ceph_queue_vmtruncate %p\n", inode);
+               igrab(inode);
+       } else {
+               dout("ceph_queue_vmtruncate %p failed, pending=%d\n",
+                    inode, ci->i_truncate_pending);
+       }
+}
+
+/*
  * called with i_mutex held.
  *
  * Make sure any pending truncation is applied before doing anything