AFS: Don't put struct file on the stack
[safe/jmp/linux-2.6] / fs / ceph / snap.c
index 52f46a1..d5114db 100644 (file)
@@ -1,7 +1,7 @@
 #include "ceph_debug.h"
 
-#include <linux/radix-tree.h>
 #include <linux/sort.h>
+#include <linux/slab.h>
 
 #include "super.h"
 #include "decode.h"
@@ -77,6 +77,28 @@ void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
        atomic_inc(&realm->nref);
 }
 
+static void __insert_snap_realm(struct rb_root *root,
+                               struct ceph_snap_realm *new)
+{
+       struct rb_node **p = &root->rb_node;
+       struct rb_node *parent = NULL;
+       struct ceph_snap_realm *r = NULL;
+
+       while (*p) {
+               parent = *p;
+               r = rb_entry(parent, struct ceph_snap_realm, node);
+               if (new->ino < r->ino)
+                       p = &(*p)->rb_left;
+               else if (new->ino > r->ino)
+                       p = &(*p)->rb_right;
+               else
+                       BUG();
+       }
+
+       rb_link_node(&new->node, parent, p);
+       rb_insert_color(&new->node, root);
+}
+
 /*
  * create and get the realm rooted at @ino and bump its ref count.
  *
@@ -92,8 +114,6 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
        if (!realm)
                return ERR_PTR(-ENOMEM);
 
-       radix_tree_insert(&mdsc->snap_realms, ino, realm);
-
        atomic_set(&realm->nref, 0);    /* tree does not take a ref */
        realm->ino = ino;
        INIT_LIST_HEAD(&realm->children);
@@ -101,24 +121,34 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
        INIT_LIST_HEAD(&realm->empty_item);
        INIT_LIST_HEAD(&realm->inodes_with_caps);
        spin_lock_init(&realm->inodes_with_caps_lock);
+       __insert_snap_realm(&mdsc->snap_realms, realm);
        dout("create_snap_realm %llx %p\n", realm->ino, realm);
        return realm;
 }
 
 /*
- * find and get (if found) the realm rooted at @ino and bump its ref count.
+ * lookup the realm rooted at @ino.
  *
  * caller must hold snap_rwsem for write.
  */
 struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
                                               u64 ino)
 {
-       struct ceph_snap_realm *realm;
-
-       realm = radix_tree_lookup(&mdsc->snap_realms, ino);
-       if (realm)
-               dout("lookup_snap_realm %llx %p\n", realm->ino, realm);
-       return realm;
+       struct rb_node *n = mdsc->snap_realms.rb_node;
+       struct ceph_snap_realm *r;
+
+       while (n) {
+               r = rb_entry(n, struct ceph_snap_realm, node);
+               if (ino < r->ino)
+                       n = n->rb_left;
+               else if (ino > r->ino)
+                       n = n->rb_right;
+               else {
+                       dout("lookup_snap_realm %llx %p\n", r->ino, r);
+                       return r;
+               }
+       }
+       return NULL;
 }
 
 static void __put_snap_realm(struct ceph_mds_client *mdsc,
@@ -132,7 +162,7 @@ static void __destroy_snap_realm(struct ceph_mds_client *mdsc,
 {
        dout("__destroy_snap_realm %p %llx\n", realm, realm->ino);
 
-       radix_tree_delete(&mdsc->snap_realms, realm->ino);
+       rb_erase(&realm->node, &mdsc->snap_realms);
 
        if (realm->parent) {
                list_del_init(&realm->child_item);
@@ -285,9 +315,9 @@ static int build_snap_context(struct ceph_snap_realm *realm)
           because we rebuild_snap_realms() works _downward_ in
           hierarchy after each update.) */
        if (realm->cached_context &&
-           realm->cached_context->seq <= realm->seq &&
+           realm->cached_context->seq == realm->seq &&
            (!parent ||
-            realm->cached_context->seq <= parent->cached_context->seq)) {
+            realm->cached_context->seq >= parent->cached_context->seq)) {
                dout("build_snap_context %llx %p: %p seq %lld (%d snaps)"
                     " (unchanged)\n",
                     realm->ino, realm, realm->cached_context,
@@ -401,8 +431,7 @@ static int dup_array(u64 **dst, __le64 *src, int num)
  * Caller must hold snap_rwsem for read (i.e., the realm topology won't
  * change).
  */
-void ceph_queue_cap_snap(struct ceph_inode_info *ci,
-                        struct ceph_snap_context *snapc)
+void ceph_queue_cap_snap(struct ceph_inode_info *ci)
 {
        struct inode *inode = &ci->vfs_inode;
        struct ceph_cap_snap *capsnap;
@@ -421,10 +450,11 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci,
                   as no new writes are allowed to start when pending, so any
                   writes in progress now were started before the previous
                   cap_snap.  lucky us. */
-               dout("queue_cap_snap %p snapc %p seq %llu used %d"
-                    " already pending\n", inode, snapc, snapc->seq, used);
+               dout("queue_cap_snap %p already pending\n", inode);
                kfree(capsnap);
        } else if (ci->i_wrbuffer_ref_head || (used & CEPH_CAP_FILE_WR)) {
+               struct ceph_snap_context *snapc = ci->i_head_snapc;
+
                igrab(inode);
 
                atomic_set(&capsnap->nref, 1);
@@ -433,7 +463,6 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci,
                INIT_LIST_HEAD(&capsnap->flushing_item);
 
                capsnap->follows = snapc->seq - 1;
-               capsnap->context = ceph_get_snap_context(snapc);
                capsnap->issued = __ceph_caps_issued(ci, NULL);
                capsnap->dirty = __ceph_caps_dirty(ci);
 
@@ -450,7 +479,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci,
                   snapshot. */
                capsnap->dirty_pages = ci->i_wrbuffer_ref_head;
                ci->i_wrbuffer_ref_head = 0;
-               ceph_put_snap_context(ci->i_head_snapc);
+               capsnap->context = snapc;
                ci->i_head_snapc = NULL;
                list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
 
@@ -492,15 +521,17 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
        capsnap->ctime = inode->i_ctime;
        capsnap->time_warp_seq = ci->i_time_warp_seq;
        if (capsnap->dirty_pages) {
-               dout("finish_cap_snap %p cap_snap %p snapc %p %llu s=%llu "
+               dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu "
                     "still has %d dirty pages\n", inode, capsnap,
                     capsnap->context, capsnap->context->seq,
-                    capsnap->size, capsnap->dirty_pages);
+                    ceph_cap_string(capsnap->dirty), capsnap->size,
+                    capsnap->dirty_pages);
                return 0;
        }
-       dout("finish_cap_snap %p cap_snap %p snapc %p %llu s=%llu clean\n",
+       dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu\n",
             inode, capsnap, capsnap->context,
-            capsnap->context->seq, capsnap->size);
+            capsnap->context->seq, ceph_cap_string(capsnap->dirty),
+            capsnap->size);
 
        spin_lock(&mdsc->snap_flush_lock);
        list_add_tail(&ci->i_snap_flush_item, &mdsc->snap_flush_list);
@@ -572,7 +603,7 @@ more:
                                if (lastinode)
                                        iput(lastinode);
                                lastinode = inode;
-                               ceph_queue_cap_snap(ci, realm->cached_context);
+                               ceph_queue_cap_snap(ci);
                                spin_lock(&realm->inodes_with_caps_lock);
                        }
                        spin_unlock(&realm->inodes_with_caps_lock);
@@ -684,11 +715,11 @@ static void flush_snaps(struct ceph_mds_client *mdsc)
  * directory into another realm.
  */
 void ceph_handle_snap(struct ceph_mds_client *mdsc,
+                     struct ceph_mds_session *session,
                      struct ceph_msg *msg)
 {
        struct super_block *sb = mdsc->client->sb;
-       struct ceph_mds_session *session;
-       int mds;
+       int mds = session->s_mds;
        u64 split;
        int op;
        int trace_len;
@@ -701,10 +732,6 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
        int i;
        int locked_rwsem = 0;
 
-       if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
-               return;
-       mds = le64_to_cpu(msg->hdr.src.name.num);
-
        /* decode */
        if (msg->front.iov_len < sizeof(*h))
                goto bad;
@@ -720,15 +747,6 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
        dout("handle_snap from mds%d op %s split %llx tracelen %d\n", mds,
             ceph_snap_op_name(op), split, trace_len);
 
-       /* find session */
-       mutex_lock(&mdsc->mutex);
-       session = __ceph_lookup_mds_session(mdsc, mds);
-       mutex_unlock(&mdsc->mutex);
-       if (!session) {
-               dout("WTF, got snap but no session for mds%d\n", mds);
-               return;
-       }
-
        mutex_lock(&session->s_mutex);
        session->s_seq++;
        mutex_unlock(&session->s_mutex);
@@ -802,11 +820,12 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
                         * queued (again) by ceph_update_snap_trace()
                         * below.  Queue it _now_, under the old context.
                         */
+                       spin_lock(&realm->inodes_with_caps_lock);
                        list_del_init(&ci->i_snap_realm_item);
+                       spin_unlock(&realm->inodes_with_caps_lock);
                        spin_unlock(&inode->i_lock);
 
-                       ceph_queue_cap_snap(ci,
-                                           ci->i_snap_realm->cached_context);
+                       ceph_queue_cap_snap(ci);
 
                        iput(inode);
                        continue;
@@ -850,16 +869,20 @@ skip_inode:
                                continue;
                        ci = ceph_inode(inode);
                        spin_lock(&inode->i_lock);
-                       if (!ci->i_snap_realm)
-                               goto split_skip_inode;
-                       ceph_put_snap_realm(mdsc, ci->i_snap_realm);
-                       spin_lock(&realm->inodes_with_caps_lock);
-                       list_add(&ci->i_snap_realm_item,
-                                &realm->inodes_with_caps);
-                       ci->i_snap_realm = realm;
-                       spin_unlock(&realm->inodes_with_caps_lock);
-                       ceph_get_snap_realm(mdsc, realm);
-split_skip_inode:
+                       if (list_empty(&ci->i_snap_realm_item)) {
+                               struct ceph_snap_realm *oldrealm =
+                                       ci->i_snap_realm;
+
+                               dout(" moving %p to split realm %llx %p\n",
+                                    inode, realm->ino, realm);
+                               spin_lock(&realm->inodes_with_caps_lock);
+                               list_add(&ci->i_snap_realm_item,
+                                        &realm->inodes_with_caps);
+                               ci->i_snap_realm = realm;
+                               spin_unlock(&realm->inodes_with_caps_lock);
+                               ceph_get_snap_realm(mdsc, realm);
+                               ceph_put_snap_realm(mdsc, oldrealm);
+                       }
                        spin_unlock(&inode->i_lock);
                        iput(inode);
                }
@@ -877,6 +900,7 @@ split_skip_inode:
 
 bad:
        pr_err("corrupt snap message from mds%d\n", mds);
+       ceph_msg_dump(msg);
 out:
        if (locked_rwsem)
                up_write(&mdsc->snap_rwsem);