Merge branch 'bkl/ioctl' of git://git.kernel.org/pub/scm/linux/kernel/git/frederic...
[safe/jmp/linux-2.6] / fs / ceph / super.c
index 7f7d475..7c663d9 100644 (file)
@@ -8,17 +8,16 @@
 #include <linux/module.h>
 #include <linux/mount.h>
 #include <linux/parser.h>
-#include <linux/rwsem.h>
 #include <linux/sched.h>
 #include <linux/seq_file.h>
+#include <linux/slab.h>
 #include <linux/statfs.h>
 #include <linux/string.h>
-#include <linux/version.h>
-#include <linux/vmalloc.h>
 
 #include "decode.h"
 #include "super.h"
 #include "mon_client.h"
+#include "auth.h"
 
 /*
  * Ceph superblock operations
@@ -45,10 +44,20 @@ const char *ceph_file_part(const char *s, int len)
  */
 static void ceph_put_super(struct super_block *s)
 {
-       struct ceph_client *cl = ceph_client(s);
+       struct ceph_client *client = ceph_sb_to_client(s);
 
        dout("put_super\n");
-       ceph_mdsc_close_sessions(&cl->mdsc);
+       ceph_mdsc_close_sessions(&client->mdsc);
+
+       /*
+        * ensure we release the bdi before put_anon_super releases
+        * the device name.
+        */
+       if (s->s_bdi == &client->backing_dev_info) {
+               bdi_unregister(&client->backing_dev_info);
+               s->s_bdi = NULL;
+       }
+
        return;
 }
 
@@ -95,12 +104,40 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
 static int ceph_syncfs(struct super_block *sb, int wait)
 {
        dout("sync_fs %d\n", wait);
-       ceph_osdc_sync(&ceph_client(sb)->osdc);
-       ceph_mdsc_sync(&ceph_client(sb)->mdsc);
+       ceph_osdc_sync(&ceph_sb_to_client(sb)->osdc);
+       ceph_mdsc_sync(&ceph_sb_to_client(sb)->mdsc);
        dout("sync_fs %d done\n", wait);
        return 0;
 }
 
+static int default_congestion_kb(void)
+{
+       int congestion_kb;
+
+       /*
+        * Copied from NFS
+        *
+        * congestion size, scale with available memory.
+        *
+        *  64MB:    8192k
+        * 128MB:   11585k
+        * 256MB:   16384k
+        * 512MB:   23170k
+        *   1GB:   32768k
+        *   2GB:   46340k
+        *   4GB:   65536k
+        *   8GB:   92681k
+        *  16GB:  131072k
+        *
+        * This allows larger machines to have larger/more transfers.
+        * Limit the default to 256M
+        */
+       congestion_kb = (16*int_sqrt(totalram_pages)) << (PAGE_SHIFT-10);
+       if (congestion_kb > 256*1024)
+               congestion_kb = 256*1024;
+
+       return congestion_kb;
+}
 
 /**
  * ceph_show_options - Show mount options in /proc/mounts
@@ -110,7 +147,7 @@ static int ceph_syncfs(struct super_block *sb, int wait)
 static int ceph_show_options(struct seq_file *m, struct vfsmount *mnt)
 {
        struct ceph_client *client = ceph_sb_to_client(mnt->mnt_sb);
-       struct ceph_mount_args *args = &client->mount_args;
+       struct ceph_mount_args *args = client->mount_args;
 
        if (args->flags & CEPH_OPT_FSID)
                seq_printf(m, ",fsidmajor=%llu,fsidminor%llu",
@@ -126,8 +163,39 @@ static int ceph_show_options(struct seq_file *m, struct vfsmount *mnt)
                seq_puts(m, ",nocrc");
        if (args->flags & CEPH_OPT_NOASYNCREADDIR)
                seq_puts(m, ",noasyncreaddir");
+
+       if (args->mount_timeout != CEPH_MOUNT_TIMEOUT_DEFAULT)
+               seq_printf(m, ",mount_timeout=%d", args->mount_timeout);
+       if (args->osd_idle_ttl != CEPH_OSD_IDLE_TTL_DEFAULT)
+               seq_printf(m, ",osd_idle_ttl=%d", args->osd_idle_ttl);
+       if (args->osd_timeout != CEPH_OSD_TIMEOUT_DEFAULT)
+               seq_printf(m, ",osdtimeout=%d", args->osd_timeout);
+       if (args->osd_keepalive_timeout != CEPH_OSD_KEEPALIVE_DEFAULT)
+               seq_printf(m, ",osdkeepalivetimeout=%d",
+                        args->osd_keepalive_timeout);
+       if (args->wsize)
+               seq_printf(m, ",wsize=%d", args->wsize);
+       if (args->rsize != CEPH_MOUNT_RSIZE_DEFAULT)
+               seq_printf(m, ",rsize=%d", args->rsize);
+       if (args->congestion_kb != default_congestion_kb())
+               seq_printf(m, ",write_congestion_kb=%d", args->congestion_kb);
+       if (args->caps_wanted_delay_min != CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT)
+               seq_printf(m, ",caps_wanted_delay_min=%d",
+                        args->caps_wanted_delay_min);
+       if (args->caps_wanted_delay_max != CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT)
+               seq_printf(m, ",caps_wanted_delay_max=%d",
+                          args->caps_wanted_delay_max);
+       if (args->cap_release_safety != CEPH_CAP_RELEASE_SAFETY_DEFAULT)
+               seq_printf(m, ",cap_release_safety=%d",
+                          args->cap_release_safety);
+       if (args->max_readdir != CEPH_MAX_READDIR_DEFAULT)
+               seq_printf(m, ",readdir_max_entries=%d", args->max_readdir);
+       if (args->max_readdir_bytes != CEPH_MAX_READDIR_BYTES_DEFAULT)
+               seq_printf(m, ",readdir_max_bytes=%d", args->max_readdir_bytes);
        if (strcmp(args->snapdir_name, CEPH_SNAPDIRNAME_DEFAULT))
                seq_printf(m, ",snapdirname=%s", args->snapdir_name);
+       if (args->name)
+               seq_printf(m, ",name=%s", args->name);
        if (args->secret)
                seq_puts(m, ",secret=<hidden>");
        return 0;
@@ -224,12 +292,12 @@ const char *ceph_msg_type_name(int type)
        switch (type) {
        case CEPH_MSG_SHUTDOWN: return "shutdown";
        case CEPH_MSG_PING: return "ping";
+       case CEPH_MSG_AUTH: return "auth";
+       case CEPH_MSG_AUTH_REPLY: return "auth_reply";
        case CEPH_MSG_MON_MAP: return "mon_map";
        case CEPH_MSG_MON_GET_MAP: return "mon_get_map";
        case CEPH_MSG_MON_SUBSCRIBE: return "mon_subscribe";
        case CEPH_MSG_MON_SUBSCRIBE_ACK: return "mon_subscribe_ack";
-       case CEPH_MSG_CLIENT_MOUNT: return "client_mount";
-       case CEPH_MSG_CLIENT_MOUNT_ACK: return "client_mount_ack";
        case CEPH_MSG_STATFS: return "statfs";
        case CEPH_MSG_STATFS_REPLY: return "statfs_reply";
        case CEPH_MSG_MDS_MAP: return "mds_map";
@@ -260,13 +328,21 @@ enum {
        Opt_wsize,
        Opt_rsize,
        Opt_osdtimeout,
+       Opt_osdkeepalivetimeout,
        Opt_mount_timeout,
+       Opt_osd_idle_ttl,
        Opt_caps_wanted_delay_min,
        Opt_caps_wanted_delay_max,
+       Opt_cap_release_safety,
        Opt_readdir_max_entries,
+       Opt_readdir_max_bytes,
+       Opt_congestion_kb,
+       Opt_last_int,
        /* int args above */
        Opt_snapdirname,
+       Opt_name,
        Opt_secret,
+       Opt_last_string,
        /* string args above */
        Opt_ip,
        Opt_noshare,
@@ -285,12 +361,18 @@ static match_table_t arg_tokens = {
        {Opt_wsize, "wsize=%d"},
        {Opt_rsize, "rsize=%d"},
        {Opt_osdtimeout, "osdtimeout=%d"},
+       {Opt_osdkeepalivetimeout, "osdkeepalive=%d"},
        {Opt_mount_timeout, "mount_timeout=%d"},
+       {Opt_osd_idle_ttl, "osd_idle_ttl=%d"},
        {Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
        {Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"},
+       {Opt_cap_release_safety, "cap_release_safety=%d"},
        {Opt_readdir_max_entries, "readdir_max_entries=%d"},
+       {Opt_readdir_max_bytes, "readdir_max_bytes=%d"},
+       {Opt_congestion_kb, "write_congestion_kb=%d"},
        /* int args above */
        {Opt_snapdirname, "snapdirname=%s"},
+       {Opt_name, "name=%s"},
        {Opt_secret, "secret=%s"},
        /* string args above */
        {Opt_ip, "ip=%s"},
@@ -305,64 +387,57 @@ static match_table_t arg_tokens = {
 };
 
 
-static int parse_mount_args(struct ceph_client *client,
-                           int flags, char *options, const char *dev_name,
-                           const char **path)
+static struct ceph_mount_args *parse_mount_args(int flags, char *options,
+                                               const char *dev_name,
+                                               const char **path)
 {
-       struct ceph_mount_args *args = &client->mount_args;
+       struct ceph_mount_args *args;
        const char *c;
-       int err;
+       int err = -ENOMEM;
        substring_t argstr[MAX_OPT_ARGS];
-       int num_mon;
-       struct ceph_entity_addr mon_addr[CEPH_MAX_MON_MOUNT_ADDR];
-       int i;
 
-       dout("parse_mount_args dev_name '%s'\n", dev_name);
-       memset(args, 0, sizeof(*args));
+       args = kzalloc(sizeof(*args), GFP_KERNEL);
+       if (!args)
+               return ERR_PTR(-ENOMEM);
+       args->mon_addr = kcalloc(CEPH_MAX_MON, sizeof(*args->mon_addr),
+                                GFP_KERNEL);
+       if (!args->mon_addr)
+               goto out;
+
+       dout("parse_mount_args %p, dev_name '%s'\n", args, dev_name);
 
        /* start with defaults */
        args->sb_flags = flags;
        args->flags = CEPH_OPT_DEFAULT;
-       args->osd_timeout = 5;    /* seconds */
+       args->osd_timeout = CEPH_OSD_TIMEOUT_DEFAULT;
+       args->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
        args->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */
+       args->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT;   /* seconds */
        args->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT;
        args->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT;
+       args->rsize = CEPH_MOUNT_RSIZE_DEFAULT;
        args->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL);
-       args->cap_release_safety = CEPH_CAPS_PER_RELEASE * 4;
-       args->max_readdir = 1024;
+       args->cap_release_safety = CEPH_CAP_RELEASE_SAFETY_DEFAULT;
+       args->max_readdir = CEPH_MAX_READDIR_DEFAULT;
+       args->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
+       args->congestion_kb = default_congestion_kb();
 
        /* ip1[:port1][,ip2[:port2]...]:/subdir/in/fs */
+       err = -EINVAL;
        if (!dev_name)
-               return -EINVAL;
+               goto out;
        *path = strstr(dev_name, ":/");
        if (*path == NULL) {
                pr_err("device name is missing path (no :/ in %s)\n",
                       dev_name);
-               return -EINVAL;
+               goto out;
        }
 
        /* get mon ip(s) */
-       err = ceph_parse_ips(dev_name, *path, mon_addr,
-                            CEPH_MAX_MON_MOUNT_ADDR, &num_mon);
+       err = ceph_parse_ips(dev_name, *path, args->mon_addr,
+                            CEPH_MAX_MON, &args->num_mon);
        if (err < 0)
-               return err;
-
-       /* build initial monmap */
-       client->monc.monmap = kzalloc(sizeof(*client->monc.monmap) +
-                              num_mon*sizeof(client->monc.monmap->mon_inst[0]),
-                              GFP_KERNEL);
-       if (!client->monc.monmap)
-               return -ENOMEM;
-       for (i = 0; i < num_mon; i++) {
-               client->monc.monmap->mon_inst[i].addr = mon_addr[i];
-               client->monc.monmap->mon_inst[i].addr.erank = 0;
-               client->monc.monmap->mon_inst[i].addr.nonce = 0;
-               client->monc.monmap->mon_inst[i].name.type =
-                       CEPH_ENTITY_TYPE_MON;
-               client->monc.monmap->mon_inst[i].name.num = cpu_to_le64(i);
-       }
-       client->monc.monmap->num_mon = num_mon;
-       memset(&args->my_addr.in_addr, 0, sizeof(args->my_addr.in_addr));
+               goto out;
 
        /* path on server */
        *path += 2;
@@ -373,20 +448,25 @@ static int parse_mount_args(struct ceph_client *client,
                int token, intval, ret;
                if (!*c)
                        continue;
+               err = -EINVAL;
                token = match_token((char *)c, arg_tokens, argstr);
                if (token < 0) {
                        pr_err("bad mount option at '%s'\n", c);
-                       return -EINVAL;
-
+                       goto out;
                }
-               if (token < Opt_ip) {
+               if (token < Opt_last_int) {
                        ret = match_int(&argstr[0], &intval);
                        if (ret < 0) {
                                pr_err("bad mount option arg (not int) "
                                       "at '%s'\n", c);
                                continue;
                        }
-                       dout("got token %d intval %d\n", token, intval);
+                       dout("got int token %d val %d\n", token, intval);
+               } else if (token > Opt_last_int && token < Opt_last_string) {
+                       dout("got string token %d val %s\n", token,
+                            argstr[0].from);
+               } else {
+                       dout("got token %d\n", token);
                }
                switch (token) {
                case Opt_fsidmajor:
@@ -401,7 +481,7 @@ static int parse_mount_args(struct ceph_client *client,
                                             &args->my_addr,
                                             1, NULL);
                        if (err < 0)
-                               return err;
+                               goto out;
                        args->flags |= CEPH_OPT_MYIP;
                        break;
 
@@ -411,6 +491,11 @@ static int parse_mount_args(struct ceph_client *client,
                                              argstr[0].to-argstr[0].from,
                                              GFP_KERNEL);
                        break;
+               case Opt_name:
+                       args->name = kstrndup(argstr[0].from,
+                                             argstr[0].to-argstr[0].from,
+                                             GFP_KERNEL);
+                       break;
                case Opt_secret:
                        args->secret = kstrndup(argstr[0].from,
                                                argstr[0].to-argstr[0].from,
@@ -427,6 +512,9 @@ static int parse_mount_args(struct ceph_client *client,
                case Opt_osdtimeout:
                        args->osd_timeout = intval;
                        break;
+               case Opt_osdkeepalivetimeout:
+                       args->osd_keepalive_timeout = intval;
+                       break;
                case Opt_mount_timeout:
                        args->mount_timeout = intval;
                        break;
@@ -439,6 +527,12 @@ static int parse_mount_args(struct ceph_client *client,
                case Opt_readdir_max_entries:
                        args->max_readdir = intval;
                        break;
+               case Opt_readdir_max_bytes:
+                       args->max_readdir_bytes = intval;
+                       break;
+               case Opt_congestion_kb:
+                       args->congestion_kb = intval;
+                       break;
 
                case Opt_noshare:
                        args->flags |= CEPH_OPT_NOSHARE;
@@ -467,22 +561,30 @@ static int parse_mount_args(struct ceph_client *client,
                        BUG_ON(token);
                }
        }
+       return args;
 
-       return 0;
+out:
+       kfree(args->mon_addr);
+       kfree(args);
+       return ERR_PTR(err);
 }
 
-static void release_mount_args(struct ceph_mount_args *args)
+static void destroy_mount_args(struct ceph_mount_args *args)
 {
+       dout("destroy_mount_args %p\n", args);
        kfree(args->snapdir_name);
        args->snapdir_name = NULL;
+       kfree(args->name);
+       args->name = NULL;
        kfree(args->secret);
        args->secret = NULL;
+       kfree(args);
 }
 
 /*
  * create a fresh client instance
  */
-static struct ceph_client *ceph_create_client(void)
+static struct ceph_client *ceph_create_client(struct ceph_mount_args *args)
 {
        struct ceph_client *client;
        int err = -ENOMEM;
@@ -493,22 +595,25 @@ static struct ceph_client *ceph_create_client(void)
 
        mutex_init(&client->mount_mutex);
 
-       init_waitqueue_head(&client->mount_wq);
+       init_waitqueue_head(&client->auth_wq);
 
        client->sb = NULL;
        client->mount_state = CEPH_MOUNT_MOUNTING;
-       client->whoami = -1;
+       client->mount_args = args;
 
        client->msgr = NULL;
 
-       client->mount_err = 0;
-       client->signed_ticket = NULL;
-       client->signed_ticket_len = 0;
+       client->auth_err = 0;
+       atomic_long_set(&client->writeback_count, 0);
+
+       err = bdi_init(&client->backing_dev_info);
+       if (err < 0)
+               goto fail;
 
        err = -ENOMEM;
        client->wb_wq = create_workqueue("ceph-writeback");
        if (client->wb_wq == NULL)
-               goto fail;
+               goto fail_bdi;
        client->pg_inv_wq = create_singlethread_workqueue("ceph-pg-invalid");
        if (client->pg_inv_wq == NULL)
                goto fail_wb_wq;
@@ -516,24 +621,43 @@ static struct ceph_client *ceph_create_client(void)
        if (client->trunc_wq == NULL)
                goto fail_pg_inv_wq;
 
+       /* set up mempools */
+       err = -ENOMEM;
+       client->wb_pagevec_pool = mempool_create_kmalloc_pool(10,
+                             client->mount_args->wsize >> PAGE_CACHE_SHIFT);
+       if (!client->wb_pagevec_pool)
+               goto fail_trunc_wq;
+
+       /* caps */
+       client->min_caps = args->max_readdir;
+       ceph_adjust_min_caps(client->min_caps);
+
        /* subsystems */
        err = ceph_monc_init(&client->monc, client);
        if (err < 0)
-               goto fail_trunc_wq;
+               goto fail_mempool;
        err = ceph_osdc_init(&client->osdc, client);
        if (err < 0)
                goto fail_monc;
-       ceph_mdsc_init(&client->mdsc, client);
+       err = ceph_mdsc_init(&client->mdsc, client);
+       if (err < 0)
+               goto fail_osdc;
        return client;
 
+fail_osdc:
+       ceph_osdc_stop(&client->osdc);
 fail_monc:
        ceph_monc_stop(&client->monc);
+fail_mempool:
+       mempool_destroy(client->wb_pagevec_pool);
 fail_trunc_wq:
        destroy_workqueue(client->trunc_wq);
 fail_pg_inv_wq:
        destroy_workqueue(client->pg_inv_wq);
 fail_wb_wq:
        destroy_workqueue(client->wb_wq);
+fail_bdi:
+       bdi_destroy(&client->backing_dev_info);
 fail:
        kfree(client);
        return ERR_PTR(err);
@@ -548,30 +672,53 @@ static void ceph_destroy_client(struct ceph_client *client)
        ceph_monc_stop(&client->monc);
        ceph_osdc_stop(&client->osdc);
 
-       kfree(client->signed_ticket);
+       ceph_adjust_min_caps(-client->min_caps);
 
        ceph_debugfs_client_cleanup(client);
        destroy_workqueue(client->wb_wq);
        destroy_workqueue(client->pg_inv_wq);
        destroy_workqueue(client->trunc_wq);
 
+       bdi_destroy(&client->backing_dev_info);
+
        if (client->msgr)
                ceph_messenger_destroy(client->msgr);
-       if (client->wb_pagevec_pool)
-               mempool_destroy(client->wb_pagevec_pool);
+       mempool_destroy(client->wb_pagevec_pool);
 
-       release_mount_args(&client->mount_args);
+       destroy_mount_args(client->mount_args);
 
        kfree(client);
        dout("destroy_client %p done\n", client);
 }
 
 /*
+ * Initially learn our fsid, or verify an fsid matches.
+ */
+int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid)
+{
+       if (client->have_fsid) {
+               if (ceph_fsid_compare(&client->fsid, fsid)) {
+                       pr_err("bad fsid, had " FSID_FORMAT " got " FSID_FORMAT,
+                              PR_FSID(&client->fsid), PR_FSID(fsid));
+                       return -1;
+               }
+       } else {
+               pr_info("client%lld fsid " FSID_FORMAT "\n",
+                       client->monc.auth->global_id, PR_FSID(fsid));
+               memcpy(&client->fsid, fsid, sizeof(*fsid));
+               ceph_debugfs_client_init(client);
+               client->have_fsid = true;
+       }
+       return 0;
+}
+
+/*
  * true if we have the mon map (and have thus joined the cluster)
  */
-static int have_mon_map(struct ceph_client *client)
+static int have_mon_and_osd_map(struct ceph_client *client)
 {
-       return client->monc.monmap && client->monc.monmap->epoch;
+       return client->monc.monmap && client->monc.monmap->epoch &&
+              client->osdc.osdmap && client->osdc.osdmap->epoch;
 }
 
 /*
@@ -596,7 +743,7 @@ static struct dentry *open_root_dentry(struct ceph_client *client,
        req->r_ino1.ino = CEPH_INO_ROOT;
        req->r_ino1.snap = CEPH_NOSNAP;
        req->r_started = started;
-       req->r_timeout = client->mount_args.mount_timeout * HZ;
+       req->r_timeout = client->mount_args->mount_timeout * HZ;
        req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INODE);
        req->r_num_caps = 2;
        err = ceph_mdsc_do_request(mdsc, NULL, req);
@@ -624,7 +771,7 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
 {
        struct ceph_entity_addr *myaddr = NULL;
        int err;
-       unsigned long timeout = client->mount_args.mount_timeout * HZ;
+       unsigned long timeout = client->mount_args->mount_timeout * HZ;
        unsigned long started = jiffies;  /* note the start time */
        struct dentry *root;
 
@@ -634,7 +781,7 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
        /* initialize the messenger */
        if (client->msgr == NULL) {
                if (ceph_test_opt(client, MYIP))
-                       myaddr = &client->mount_args.my_addr;
+                       myaddr = &client->mount_args->my_addr;
                client->msgr = ceph_messenger_create(myaddr);
                if (IS_ERR(client->msgr)) {
                        err = PTR_ERR(client->msgr);
@@ -644,25 +791,25 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
                client->msgr->nocrc = ceph_test_opt(client, NOCRC);
        }
 
-       /* send mount request, and wait for mon, mds, and osd maps */
-       err = ceph_monc_request_mount(&client->monc);
+       /* open session, and wait for mon, mds, and osd maps */
+       err = ceph_monc_open_session(&client->monc);
        if (err < 0)
                goto out;
 
-       while (!have_mon_map(client) && !client->mount_err) {
+       while (!have_mon_and_osd_map(client)) {
                err = -EIO;
                if (timeout && time_after_eq(jiffies, started + timeout))
                        goto out;
 
                /* wait */
-               dout("mount waiting for mount\n");
-               err = wait_event_interruptible_timeout(client->mount_wq,
-                              client->mount_err || have_mon_map(client),
-                              timeout);
+               dout("mount waiting for mon_map\n");
+               err = wait_event_interruptible_timeout(client->auth_wq,
+                      have_mon_and_osd_map(client) || (client->auth_err < 0),
+                      timeout);
                if (err == -EINTR || err == -ERESTARTSYS)
                        goto out;
-               if (client->mount_err) {
-                       err = client->mount_err;
+               if (client->auth_err < 0) {
+                       err = client->auth_err;
                        goto out;
                }
        }
@@ -710,7 +857,7 @@ static int ceph_set_super(struct super_block *s, void *data)
 
        dout("set_super %p data %p\n", s, data);
 
-       s->s_flags = client->mount_args.sb_flags;
+       s->s_flags = client->mount_args->sb_flags;
        s->s_maxbytes = 1ULL << 40;  /* temp value until we get mdsmap */
 
        s->s_fs_info = client;
@@ -739,7 +886,7 @@ fail:
 static int ceph_compare_super(struct super_block *sb, void *data)
 {
        struct ceph_client *new = data;
-       struct ceph_mount_args *args = &new->mount_args;
+       struct ceph_mount_args *args = new->mount_args;
        struct ceph_client *other = ceph_sb_to_client(sb);
        int i;
 
@@ -761,7 +908,7 @@ static int ceph_compare_super(struct super_block *sb, void *data)
                }
                dout("mon ip matches existing sb %p\n", sb);
        }
-       if (args->sb_flags != other->mount_args.sb_flags) {
+       if (args->sb_flags != other->mount_args->sb_flags) {
                dout("flags differ\n");
                return 0;
        }
@@ -771,22 +918,21 @@ static int ceph_compare_super(struct super_block *sb, void *data)
 /*
  * construct our own bdi so we can control readahead, etc.
  */
-static int ceph_init_bdi(struct super_block *sb, struct ceph_client *client)
+static atomic_long_t bdi_seq = ATOMIC_INIT(0);
+
+static int ceph_register_bdi(struct super_block *sb, struct ceph_client *client)
 {
        int err;
 
-       err = bdi_init(&client->backing_dev_info);
-       if (err < 0)
-               return err;
-       sb->s_bdi = &client->backing_dev_info;
-
        /* set ra_pages based on rsize mount option? */
-       if (client->mount_args.rsize >= PAGE_CACHE_SIZE)
+       if (client->mount_args->rsize >= PAGE_CACHE_SIZE)
                client->backing_dev_info.ra_pages =
-                       (client->mount_args.rsize + PAGE_CACHE_SIZE - 1)
+                       (client->mount_args->rsize + PAGE_CACHE_SIZE - 1)
                        >> PAGE_SHIFT;
-
-       err = bdi_register_dev(&client->backing_dev_info, sb->s_dev);
+       err = bdi_register(&client->backing_dev_info, NULL, "ceph-%d",
+                          atomic_long_inc_return(&bdi_seq));
+       if (!err)
+               sb->s_bdi = &client->backing_dev_info;
        return err;
 }
 
@@ -798,20 +944,24 @@ static int ceph_get_sb(struct file_system_type *fs_type,
        struct ceph_client *client;
        int err;
        int (*compare_super)(struct super_block *, void *) = ceph_compare_super;
-       const char *path;
+       const char *path = NULL;
+       struct ceph_mount_args *args;
 
        dout("ceph_get_sb\n");
+       args = parse_mount_args(flags, data, dev_name, &path);
+       if (IS_ERR(args)) {
+               err = PTR_ERR(args);
+               goto out_final;
+       }
 
        /* create client (which we may/may not use) */
-       client = ceph_create_client();
-       if (IS_ERR(client))
-               return PTR_ERR(client);
-
-       err = parse_mount_args(client, flags, data, dev_name, &path);
-       if (err < 0)
-               goto out;
+       client = ceph_create_client(args);
+       if (IS_ERR(client)) {
+               err = PTR_ERR(client);
+               goto out_final;
+       }
 
-       if (client->mount_args.flags & CEPH_OPT_NOSHARE)
+       if (client->mount_args->flags & CEPH_OPT_NOSHARE)
                compare_super = NULL;
        sb = sget(fs_type, compare_super, ceph_set_super, client);
        if (IS_ERR(sb)) {
@@ -819,21 +969,13 @@ static int ceph_get_sb(struct file_system_type *fs_type,
                goto out;
        }
 
-       if (ceph_client(sb) != client) {
+       if (ceph_sb_to_client(sb) != client) {
                ceph_destroy_client(client);
-               client = ceph_client(sb);
+               client = ceph_sb_to_client(sb);
                dout("get_sb got existing client %p\n", client);
        } else {
                dout("get_sb using new client %p\n", client);
-
-               /* set up mempools */
-               err = -ENOMEM;
-               client->wb_pagevec_pool = mempool_create_kmalloc_pool(10,
-                             client->mount_args.wsize >> PAGE_CACHE_SHIFT);
-               if (!client->wb_pagevec_pool)
-                       goto out_splat;
-
-               err = ceph_init_bdi(sb, client);
+               err = ceph_register_bdi(sb, client);
                if (err < 0)
                        goto out_splat;
        }
@@ -847,8 +989,7 @@ static int ceph_get_sb(struct file_system_type *fs_type,
 
 out_splat:
        ceph_mdsc_close_sessions(&client->mdsc);
-       up_write(&sb->s_umount);
-       deactivate_super(sb);
+       deactivate_locked_super(sb);
        goto out_final;
 
 out:
@@ -864,8 +1005,6 @@ static void ceph_kill_sb(struct super_block *s)
        dout("kill_sb %p\n", s);
        ceph_mdsc_pre_umount(&client->mdsc);
        kill_anon_super(s);    /* will call put_super after sb is r/o */
-       bdi_unregister(&client->backing_dev_info);
-       bdi_destroy(&client->backing_dev_info);
        ceph_destroy_client(client);
 }
 
@@ -902,9 +1041,10 @@ static int __init init_ceph(void)
        if (ret)
                goto out_icache;
 
-       pr_info("loaded %d.%d.%d (mon/mds/osd proto %d/%d/%d)\n",
-               CEPH_VERSION_MAJOR, CEPH_VERSION_MINOR, CEPH_VERSION_PATCH,
-               CEPH_MONC_PROTOCOL, CEPH_MDSC_PROTOCOL, CEPH_OSDC_PROTOCOL);
+       pr_info("loaded (mon/mds/osd proto %d/%d/%d, osdmap %d/%d %d/%d)\n",
+               CEPH_MONC_PROTOCOL, CEPH_MDSC_PROTOCOL, CEPH_OSDC_PROTOCOL,
+               CEPH_OSDMAP_VERSION, CEPH_OSDMAP_VERSION_EXT,
+               CEPH_OSDMAP_INC_VERSION, CEPH_OSDMAP_INC_VERSION_EXT);
        return 0;
 
 out_icache: