diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c
index 775a9c0..f6510a4 100644
--- a/fs/ceph/mon_client.c
+++ b/fs/ceph/mon_client.c
@@ -1,6 +1,7 @@
 #include "ceph_debug.h"
 
 #include <linux/types.h>
+#include <linux/slab.h>
 #include <linux/random.h>
 #include <linux/sched.h>
 
@@ -27,7 +28,9 @@
  * resend any outstanding requests.
  */
 
-const static struct ceph_connection_operations mon_con_ops;
+static const struct ceph_connection_operations mon_con_ops;
+
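+/* called from delayed_work() before its definition below */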
+static int __validate_auth(struct ceph_mon_client *monc);
 
 /*
  * Decode a monmap blob (e.g., during mount).
@@ -88,12 +91,25 @@ int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
        int i;
 
        for (i = 0; i < m->num_mon; i++)
-               if (ceph_entity_addr_equal(addr, &m->mon_inst[i].addr))
+               if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
                        return 1;
        return 0;
 }
 
 /*
+ * Send (or resend) a prepared auth request.  The message is reused
+ * across attempts, so revoke any queued copy and take an extra ref
+ * before handing it to the connection.
+ */
+static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
+{
+       monc->pending_auth = 1;
+       monc->m_auth->front.iov_len = len;
+       monc->m_auth->hdr.front_len = cpu_to_le32(len);
+       ceph_con_revoke(monc->con, monc->m_auth);
+       ceph_msg_get(monc->m_auth);  /* keep our ref */
+       ceph_con_send(monc->con, monc->m_auth);
+}
+
+/*
  * Close monitor session, if any.
  */
 static void __close_session(struct ceph_mon_client *monc)
@@ -103,6 +119,7 @@ static void __close_session(struct ceph_mon_client *monc)
                ceph_con_revoke(monc->con, monc->m_auth);
                ceph_con_close(monc->con);
                monc->cur_mon = -1;
+               monc->pending_auth = 0;
                ceph_auth_reset(monc->auth);
        }
 }
@@ -134,10 +151,7 @@ static int __open_session(struct ceph_mon_client *monc)
                ret = ceph_auth_build_hello(monc->auth,
                                            monc->m_auth->front.iov_base,
                                            monc->m_auth->front_max);
-               monc->m_auth->front.iov_len = ret;
-               monc->m_auth->hdr.front_len = cpu_to_le32(ret);
-               ceph_msg_get(monc->m_auth);  /* keep our ref */
-               ceph_con_send(monc->con, monc->m_auth);
+               __send_prepared_auth_request(monc, ret);
        } else {
                dout("open_session mon%d already open\n", monc->cur_mon);
        }
@@ -174,16 +188,12 @@ static void __send_subscribe(struct ceph_mon_client *monc)
             monc->want_next_osdmap);
        if ((__sub_expired(monc) && !monc->sub_sent) ||
            monc->want_next_osdmap == 1) {
-               struct ceph_msg *msg;
+               struct ceph_msg *msg = monc->m_subscribe;
                struct ceph_mon_subscribe_item *i;
                void *p, *end;
 
-               msg = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, 0, 0, NULL);
-               if (!msg)
-                       return;
-
                p = msg->front.iov_base;
-               end = p + msg->front.iov_len;
+               end = p + msg->front_max;
 
                dout("__send_subscribe to 'mdsmap' %u+\n",
                     (unsigned)monc->have_mdsmap);
@@ -213,7 +223,8 @@ static void __send_subscribe(struct ceph_mon_client *monc)
 
                msg->front.iov_len = p - msg->front.iov_base;
                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
-               ceph_con_send(monc->con, msg);
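+               /*
+                * m_subscribe is preallocated and reused; revoke any copy
+                * still queued from a previous send before requeueing it.
+                */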
+               ceph_con_revoke(monc->con, msg);
+               ceph_con_send(monc->con, ceph_msg_get(msg));
 
                monc->sub_sent = jiffies | 1;  /* never 0 */
        }
@@ -334,129 +345,200 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
 
 out:
        mutex_unlock(&monc->mutex);
-       wake_up(&client->mount_wq);
+       wake_up(&client->auth_wq);
 }
 
 /*
- * statfs
+ * generic requests (currently statfs)
  */
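+/* Find an outstanding generic request by tid.  Caller holds monc->mutex. */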
+static struct ceph_mon_generic_request *__lookup_generic_req(
+       struct ceph_mon_client *monc, u64 tid)
+{
+       struct ceph_mon_generic_request *req;
+       struct rb_node *n = monc->generic_request_tree.rb_node;
+
+       while (n) {
+               req = rb_entry(n, struct ceph_mon_generic_request, node);
+               if (tid < req->tid)
+                       n = n->rb_left;
+               else if (tid > req->tid)
+                       n = n->rb_right;
+               else
+                       return req;
+       }
+       return NULL;
+}
+
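+/* Insert a request into the tid-ordered rbtree.  Caller holds monc->mutex. */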
+static void __insert_generic_request(struct ceph_mon_client *monc,
+                           struct ceph_mon_generic_request *new)
+{
+       struct rb_node **p = &monc->generic_request_tree.rb_node;
+       struct rb_node *parent = NULL;
+       struct ceph_mon_generic_request *req = NULL;
+
+       while (*p) {
+               parent = *p;
+               req = rb_entry(parent, struct ceph_mon_generic_request, node);
+               if (new->tid < req->tid)
+                       p = &(*p)->rb_left;
+               else if (new->tid > req->tid)
+                       p = &(*p)->rb_right;
+               else
+                       BUG();
+       }
+
+       rb_link_node(&new->node, parent, p);
+       rb_insert_color(&new->node, &monc->generic_request_tree);
+}
+
+/*
+ * Drop the last reference: put the request/reply messages and free
+ * the request itself (it was kzalloc'ed by the submitter).
+ */
+static void release_generic_request(struct kref *kref)
+{
+       struct ceph_mon_generic_request *req =
+               container_of(kref, struct ceph_mon_generic_request, kref);
+
+       if (req->reply)
+               ceph_msg_put(req->reply);
+       if (req->request)
+               ceph_msg_put(req->request);
+
+       kfree(req);
+}
+
+static void put_generic_request(struct ceph_mon_generic_request *req)
+{
+       kref_put(&req->kref, release_generic_request);
+}
+
+static void get_generic_request(struct ceph_mon_generic_request *req)
+{
+       kref_get(&req->kref);
+}
+
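+/*
+ * Hand the messenger the preallocated reply for an outstanding generic
+ * request, or set *skip to drop the message if the tid is unknown.
+ */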
+static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
+                                        struct ceph_msg_header *hdr,
+                                        int *skip)
+{
+       struct ceph_mon_client *monc = con->private;
+       struct ceph_mon_generic_request *req;
+       u64 tid = le64_to_cpu(hdr->tid);
+       struct ceph_msg *m;
+
+       mutex_lock(&monc->mutex);
+       req = __lookup_generic_req(monc, tid);
+       if (!req) {
+               dout("get_generic_reply %lld dne\n", tid);
+               *skip = 1;
+               m = NULL;
+       } else {
+               dout("get_generic_reply %lld got %p\n", tid, req->reply);
+               m = ceph_msg_get(req->reply);
+               /*
+                * we don't need to track the connection reading into
+                * this reply because we only have one open connection
+                * at a time, ever.
+                */
+       }
+       mutex_unlock(&monc->mutex);
+       return m;
+}
+
 static void handle_statfs_reply(struct ceph_mon_client *monc,
                                struct ceph_msg *msg)
 {
-       struct ceph_mon_statfs_request *req;
+       struct ceph_mon_generic_request *req;
        struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
-       u64 tid;
+       u64 tid = le64_to_cpu(msg->hdr.tid);
 
        if (msg->front.iov_len != sizeof(*reply))
                goto bad;
-       tid = le64_to_cpu(reply->tid);
        dout("handle_statfs_reply %p tid %llu\n", msg, tid);
 
        mutex_lock(&monc->mutex);
-       req = radix_tree_lookup(&monc->statfs_request_tree, tid);
+       req = __lookup_generic_req(monc, tid);
        if (req) {
-               *req->buf = reply->st;
+               *(struct ceph_statfs *)req->buf = reply->st;
                req->result = 0;
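+               /*
+                * Take an extra ref so a signal-interrupted waiter in
+                * ceph_monc_do_statfs() cannot free req between our
+                * unlock and the complete() below.
+                */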
+               get_generic_request(req);
        }
        mutex_unlock(&monc->mutex);
-       if (req)
+       if (req) {
                complete(&req->completion);
+               put_generic_request(req);
+       }
        return;
 
 bad:
-       pr_err("corrupt statfs reply, no tid\n");
+       pr_err("corrupt generic reply, no tid\n");
        ceph_msg_dump(msg);
 }
 
 /*
- * (re)send a statfs request
+ * Do a synchronous statfs().
  */
-static int send_statfs(struct ceph_mon_client *monc,
-                      struct ceph_mon_statfs_request *req)
+int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
 {
-       struct ceph_msg *msg;
+       struct ceph_mon_generic_request *req;
        struct ceph_mon_statfs *h;
+       int err;
+
+       req = kzalloc(sizeof(*req), GFP_NOFS);
+       if (!req)
+               return -ENOMEM;
 
-       dout("send_statfs tid %llu\n", req->tid);
-       msg = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), 0, 0, NULL);
-       if (IS_ERR(msg))
-               return PTR_ERR(msg);
-       req->request = msg;
-       h = msg->front.iov_base;
+       kref_init(&req->kref);
+       req->buf = buf;
+       init_completion(&req->completion);
+
+       err = -ENOMEM;
+       req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS);
+       if (!req->request)
+               goto out;
+       req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS);
+       if (!req->reply)
+               goto out;
+
+       /* fill out request */
+       h = req->request->front.iov_base;
        h->monhdr.have_version = 0;
        h->monhdr.session_mon = cpu_to_le16(-1);
        h->monhdr.session_mon_tid = 0;
        h->fsid = monc->monmap->fsid;
-       h->tid = cpu_to_le64(req->tid);
-       ceph_con_send(monc->con, msg);
-       return 0;
-}
-
-/*
- * Do a synchronous statfs().
- */
-int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
-{
-       struct ceph_mon_statfs_request req;
-       int err;
-
-       req.buf = buf;
-       init_completion(&req.completion);
-
-       /* allocate memory for reply */
-       err = ceph_msgpool_resv(&monc->msgpool_statfs_reply, 1);
-       if (err)
-               return err;
 
        /* register request */
        mutex_lock(&monc->mutex);
-       req.tid = ++monc->last_tid;
-       req.last_attempt = jiffies;
-       req.delay = BASE_DELAY_INTERVAL;
-       if (radix_tree_insert(&monc->statfs_request_tree, req.tid, &req) < 0) {
-               mutex_unlock(&monc->mutex);
-               pr_err("ENOMEM in do_statfs\n");
-               return -ENOMEM;
-       }
-       monc->num_statfs_requests++;
+       req->tid = ++monc->last_tid;
+       req->request->hdr.tid = cpu_to_le64(req->tid);
+       __insert_generic_request(monc, req);
+       monc->num_generic_requests++;
        mutex_unlock(&monc->mutex);
 
        /* send request and wait */
-       err = send_statfs(monc, &req);
-       if (!err)
-               err = wait_for_completion_interruptible(&req.completion);
+       ceph_con_send(monc->con, ceph_msg_get(req->request));
+       err = wait_for_completion_interruptible(&req->completion);
 
        mutex_lock(&monc->mutex);
-       radix_tree_delete(&monc->statfs_request_tree, req.tid);
-       monc->num_statfs_requests--;
-       ceph_msgpool_resv(&monc->msgpool_statfs_reply, -1);
+       rb_erase(&req->node, &monc->generic_request_tree);
+       monc->num_generic_requests--;
        mutex_unlock(&monc->mutex);
 
        if (!err)
-               err = req.result;
+               err = req->result;
+
+out:
+       kref_put(&req->kref, release_generic_request);
        return err;
 }
 
 /*
- * Resend pending statfs requests.
+ * Resend pending generic requests.
  */
-static void __resend_statfs(struct ceph_mon_client *monc)
-{
-       u64 next_tid = 0;
-       int got;
-       int did = 0;
-       struct ceph_mon_statfs_request *req;
-
-       while (1) {
-               got = radix_tree_gang_lookup(&monc->statfs_request_tree,
-                                            (void **)&req,
-                                            next_tid, 1);
-               if (got == 0)
-                       break;
-               did++;
-               next_tid = req->tid + 1;
-
-               send_statfs(monc, req);
+static void __resend_generic_request(struct ceph_mon_client *monc)
+{
+       struct ceph_mon_generic_request *req;
+       struct rb_node *p;
+
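+       /*
+        * Walk the tree in tid order; revoke before resending in case
+        * a copy of the message is still in the connection's out queue.
+        */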
+       for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
+               req = rb_entry(p, struct ceph_mon_generic_request, node);
+               ceph_con_revoke(monc->con, req->request);
+               ceph_con_send(monc->con, ceph_msg_get(req->request));
        }
 }
 
@@ -477,6 +559,9 @@ static void delayed_work(struct work_struct *work)
                __open_session(monc);  /* continue hunting */
        } else {
                ceph_con_keepalive(monc->con);
+
+               __validate_auth(monc);
+
                if (monc->auth->ops->is_authenticated(monc->auth))
                        __send_subscribe(monc);
        }
@@ -503,7 +588,6 @@ static int build_initial_monmap(struct ceph_mon_client *monc)
                return -ENOMEM;
        for (i = 0; i < num_mon; i++) {
                monc->monmap->mon_inst[i].addr = mon_addr[i];
-               monc->monmap->mon_inst[i].addr.erank = 0;
                monc->monmap->mon_inst[i].addr.nonce = 0;
                monc->monmap->mon_inst[i].name.type =
                        CEPH_ENTITY_TYPE_MON;
@@ -544,25 +628,26 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
                CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
                CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
 
-       /* msg pools */
-       err = ceph_msgpool_init(&monc->msgpool_subscribe_ack,
-                              sizeof(struct ceph_mon_subscribe_ack), 1, false);
-       if (err < 0)
+       /* msgs */
+       err = -ENOMEM;
+       monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
+                                    sizeof(struct ceph_mon_subscribe_ack),
+                                    GFP_NOFS);
+       if (!monc->m_subscribe_ack)
                goto out_monmap;
-       err = ceph_msgpool_init(&monc->msgpool_statfs_reply,
-                               sizeof(struct ceph_mon_statfs_reply), 0, false);
-       if (err < 0)
-               goto out_pool1;
-       err = ceph_msgpool_init(&monc->msgpool_auth_reply, 4096, 1, false);
-       if (err < 0)
-               goto out_pool2;
-
-       monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, 0, 0, NULL);
-       if (IS_ERR(monc->m_auth)) {
-               err = PTR_ERR(monc->m_auth);
-               monc->m_auth = NULL;
-               goto out_pool3;
-       }
+
+       monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS);
+       if (!monc->m_subscribe)
+               goto out_subscribe_ack;
+
+       monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS);
+       if (!monc->m_auth_reply)
+               goto out_subscribe;
+
+       monc->pending_auth = 0;
+       monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS);
+       if (!monc->m_auth)
+               goto out_auth_reply;
 
        monc->cur_mon = -1;
        monc->hunting = true;
@@ -570,8 +655,8 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
        monc->sub_sent = 0;
 
        INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
-       INIT_RADIX_TREE(&monc->statfs_request_tree, GFP_NOFS);
-       monc->num_statfs_requests = 0;
+       monc->generic_request_tree = RB_ROOT;
+       monc->num_generic_requests = 0;
        monc->last_tid = 0;
 
        monc->have_mdsmap = 0;
@@ -579,12 +664,12 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
        monc->want_next_osdmap = 1;
        return 0;
 
-out_pool3:
-       ceph_msgpool_destroy(&monc->msgpool_auth_reply);
-out_pool2:
-       ceph_msgpool_destroy(&monc->msgpool_subscribe_ack);
-out_pool1:
-       ceph_msgpool_destroy(&monc->msgpool_statfs_reply);
+out_auth_reply:
+       ceph_msg_put(monc->m_auth_reply);
+out_subscribe:
+       ceph_msg_put(monc->m_subscribe);
+out_subscribe_ack:
+       ceph_msg_put(monc->m_subscribe_ack);
 out_monmap:
        kfree(monc->monmap);
 out:
@@ -608,32 +693,29 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
        ceph_auth_destroy(monc->auth);
 
        ceph_msg_put(monc->m_auth);
-       ceph_msgpool_destroy(&monc->msgpool_subscribe_ack);
-       ceph_msgpool_destroy(&monc->msgpool_statfs_reply);
-       ceph_msgpool_destroy(&monc->msgpool_auth_reply);
+       ceph_msg_put(monc->m_auth_reply);
+       ceph_msg_put(monc->m_subscribe);
+       ceph_msg_put(monc->m_subscribe_ack);
 
        kfree(monc->monmap);
 }
 
-
 static void handle_auth_reply(struct ceph_mon_client *monc,
                              struct ceph_msg *msg)
 {
        int ret;
 
        mutex_lock(&monc->mutex);
+       monc->pending_auth = 0;
        ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
                                     msg->front.iov_len,
                                     monc->m_auth->front.iov_base,
                                     monc->m_auth->front_max);
        if (ret < 0) {
-               monc->client->mount_err = ret;
-               wake_up(&monc->client->mount_wq);
+               monc->client->auth_err = ret;
+               wake_up(&monc->client->auth_wq);
        } else if (ret > 0) {
-               monc->m_auth->front.iov_len = ret;
-               monc->m_auth->hdr.front_len = cpu_to_le32(ret);
-               ceph_msg_get(monc->m_auth);  /* keep our ref */
-               ceph_con_send(monc->con, monc->m_auth);
+               __send_prepared_auth_request(monc, ret);
        } else if (monc->auth->ops->is_authenticated(monc->auth)) {
                dout("authenticated, starting session\n");
 
@@ -641,11 +723,36 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
                monc->client->msgr->inst.name.num = monc->auth->global_id;
 
                __send_subscribe(monc);
-               __resend_statfs(monc);
+               __resend_generic_request(monc);
        }
        mutex_unlock(&monc->mutex);
 }
 
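+/*
+ * Build and send a new auth request if one is needed (e.g., to renew
+ * a ticket).  Caller must hold monc->mutex.
+ */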
+static int __validate_auth(struct ceph_mon_client *monc)
+{
+       int ret;
+
+       if (monc->pending_auth)
+               return 0;
+
+       ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
+                             monc->m_auth->front_max);
+       if (ret <= 0)
+               return ret; /* either an error, or no need to authenticate */
+       __send_prepared_auth_request(monc, ret);
+       return 0;
+}
+
+int ceph_monc_validate_auth(struct ceph_mon_client *monc)
+{
+       int ret;
+
+       mutex_lock(&monc->mutex);
+       ret = __validate_auth(monc);
+       mutex_unlock(&monc->mutex);
+       return ret;
+}
+
 /*
  * handle incoming message
  */
@@ -693,21 +800,37 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
  * Allocate memory for incoming message
  */
 static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
-                                     struct ceph_msg_header *hdr)
+                                     struct ceph_msg_header *hdr,
+                                     int *skip)
 {
        struct ceph_mon_client *monc = con->private;
        int type = le16_to_cpu(hdr->type);
-       int front = le32_to_cpu(hdr->front_len);
+       int front_len = le32_to_cpu(hdr->front_len);
+       struct ceph_msg *m = NULL;
+
+       *skip = 0;
 
        switch (type) {
        case CEPH_MSG_MON_SUBSCRIBE_ACK:
-               return ceph_msgpool_get(&monc->msgpool_subscribe_ack, front);
+               m = ceph_msg_get(monc->m_subscribe_ack);
+               break;
        case CEPH_MSG_STATFS_REPLY:
-               return ceph_msgpool_get(&monc->msgpool_statfs_reply, front);
+               return get_generic_reply(con, hdr, skip);
        case CEPH_MSG_AUTH_REPLY:
-               return ceph_msgpool_get(&monc->msgpool_auth_reply, front);
+               m = ceph_msg_get(monc->m_auth_reply);
+               break;
+       case CEPH_MSG_MON_MAP:
+       case CEPH_MSG_MDS_MAP:
+       case CEPH_MSG_OSD_MAP:
+               m = ceph_msg_new(type, front_len, GFP_NOFS);
+               break;
        }
-       return ceph_alloc_msg(con, hdr);
+
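+       /* unknown message type: ask the messenger to drop it */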
+       if (!m) {
+               pr_info("alloc_msg unknown type %d\n", type);
+               *skip = 1;
+       }
+       return m;
 }
 
 /*
@@ -744,11 +867,10 @@ out:
        mutex_unlock(&monc->mutex);
 }
 
-const static struct ceph_connection_operations mon_con_ops = {
+static const struct ceph_connection_operations mon_con_ops = {
        .get = ceph_con_get,
        .put = ceph_con_put,
        .dispatch = dispatch,
        .fault = mon_fault,
        .alloc_msg = mon_alloc_msg,
-       .alloc_middle = ceph_alloc_middle,
 };