#include "ceph_debug.h"
#include <linux/types.h>
+#include <linux/slab.h>
#include <linux/random.h>
#include <linux/sched.h>
#include "mon_client.h"
#include "super.h"
+#include "auth.h"
#include "decode.h"
/*
* resend any outstanding requests.
*/
-const static struct ceph_connection_operations mon_con_ops;
+static const struct ceph_connection_operations mon_con_ops;
+
+static int __validate_auth(struct ceph_mon_client *monc);
/*
* Decode a monmap blob (e.g., during mount).
struct ceph_fsid fsid;
u32 epoch, num_mon;
u16 version;
+ u32 len;
+
+ ceph_decode_32_safe(&p, end, len, bad);
+ ceph_decode_need(&p, end, len, bad);
dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));
m->epoch = epoch;
m->num_mon = num_mon;
ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
+ for (i = 0; i < num_mon; i++)
+ ceph_decode_addr(&m->mon_inst[i].addr);
dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
m->num_mon);
int i;
for (i = 0; i < m->num_mon; i++)
- if (ceph_entity_addr_equal(addr, &m->mon_inst[i].addr))
+ if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
return 1;
return 0;
}
/*
+ * Send an auth request.
+ */
+static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
+{
+ monc->pending_auth = 1;
+ monc->m_auth->front.iov_len = len;
+ monc->m_auth->hdr.front_len = cpu_to_le32(len);
+ ceph_con_revoke(monc->con, monc->m_auth);
+ ceph_msg_get(monc->m_auth); /* keep our ref */
+ ceph_con_send(monc->con, monc->m_auth);
+}
+
+/*
* Close monitor session, if any.
*/
static void __close_session(struct ceph_mon_client *monc)
{
if (monc->con) {
dout("__close_session closing mon%d\n", monc->cur_mon);
+ ceph_con_revoke(monc->con, monc->m_auth);
ceph_con_close(monc->con);
monc->cur_mon = -1;
+ monc->pending_auth = 0;
+ ceph_auth_reset(monc->auth);
}
}
static int __open_session(struct ceph_mon_client *monc)
{
char r;
+ int ret;
if (monc->cur_mon < 0) {
get_random_bytes(&r, 1);
monc->con->peer_name.num = cpu_to_le64(monc->cur_mon);
ceph_con_open(monc->con,
&monc->monmap->mon_inst[monc->cur_mon].addr);
+
+ /* initiatiate authentication handshake */
+ ret = ceph_auth_build_hello(monc->auth,
+ monc->m_auth->front.iov_base,
+ monc->m_auth->front_max);
+ __send_prepared_auth_request(monc, ret);
} else {
dout("open_session mon%d already open\n", monc->cur_mon);
}
{
unsigned delay;
- if (monc->cur_mon < 0 || monc->want_mount || __sub_expired(monc))
+ if (monc->cur_mon < 0 || __sub_expired(monc))
delay = 10 * HZ;
else
delay = 20 * HZ;
monc->want_next_osdmap);
if ((__sub_expired(monc) && !monc->sub_sent) ||
monc->want_next_osdmap == 1) {
- struct ceph_msg *msg;
+ struct ceph_msg *msg = monc->m_subscribe;
struct ceph_mon_subscribe_item *i;
void *p, *end;
- msg = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 64, 0, 0, NULL);
- if (!msg)
- return;
-
p = msg->front.iov_base;
- end = p + msg->front.iov_len;
+ end = p + msg->front_max;
dout("__send_subscribe to 'mdsmap' %u+\n",
(unsigned)monc->have_mdsmap);
if (monc->want_next_osdmap) {
dout("__send_subscribe to 'osdmap' %u\n",
(unsigned)monc->have_osdmap);
- ceph_encode_32(&p, 2);
+ ceph_encode_32(&p, 3);
ceph_encode_string(&p, end, "osdmap", 6);
i = p;
i->have = cpu_to_le64(monc->have_osdmap);
p += sizeof(*i);
monc->want_next_osdmap = 2; /* requested */
} else {
- ceph_encode_32(&p, 1);
+ ceph_encode_32(&p, 2);
}
ceph_encode_string(&p, end, "mdsmap", 6);
i = p;
i->have = cpu_to_le64(monc->have_mdsmap);
i->onetime = 0;
p += sizeof(*i);
+ ceph_encode_string(&p, end, "monmap", 6);
+ i = p;
+ i->have = 0;
+ i->onetime = 0;
+ p += sizeof(*i);
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
- ceph_con_send(monc->con, msg);
+ ceph_con_revoke(monc->con, msg);
+ ceph_con_send(monc->con, ceph_msg_get(msg));
monc->sub_sent = jiffies | 1; /* never 0 */
}
return;
bad:
pr_err("got corrupt subscribe-ack msg\n");
+ ceph_msg_dump(msg);
}
/*
mutex_unlock(&monc->mutex);
}
-
/*
- * mount
+ *
*/
-static void __request_mount(struct ceph_mon_client *monc)
-{
- struct ceph_msg *msg;
- struct ceph_client_mount *h;
- int err;
-
- dout("__request_mount\n");
- err = __open_session(monc);
- if (err)
- return;
- msg = ceph_msg_new(CEPH_MSG_CLIENT_MOUNT, sizeof(*h), 0, 0, NULL);
- if (IS_ERR(msg))
- return;
- h = msg->front.iov_base;
- h->monhdr.have_version = 0;
- h->monhdr.session_mon = cpu_to_le16(-1);
- h->monhdr.session_mon_tid = 0;
- ceph_con_send(monc->con, msg);
-}
-
-int ceph_monc_request_mount(struct ceph_mon_client *monc)
+int ceph_monc_open_session(struct ceph_mon_client *monc)
{
if (!monc->con) {
monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
}
mutex_lock(&monc->mutex);
- __request_mount(monc);
+ __open_session(monc);
__schedule_delayed(monc);
mutex_unlock(&monc->mutex);
return 0;
* The monitor responds with mount ack indicate mount success. The
* included client ticket allows the client to talk to MDSs and OSDs.
*/
-static void handle_mount_ack(struct ceph_mon_client *monc, struct ceph_msg *msg)
+static void ceph_monc_handle_map(struct ceph_mon_client *monc,
+ struct ceph_msg *msg)
{
struct ceph_client *client = monc->client;
struct ceph_monmap *monmap = NULL, *old = monc->monmap;
void *p, *end;
- s32 result;
- u32 len;
- s64 cnum;
- int err = -EINVAL;
-
- if (client->whoami >= 0) {
- dout("handle_mount_ack - already mounted\n");
- return;
- }
mutex_lock(&monc->mutex);
- dout("handle_mount_ack\n");
+ dout("handle_monmap\n");
p = msg->front.iov_base;
end = p + msg->front.iov_len;
- ceph_decode_64_safe(&p, end, cnum, bad);
- ceph_decode_32_safe(&p, end, result, bad);
- ceph_decode_32_safe(&p, end, len, bad);
- if (result) {
- pr_err("mount denied: %.*s (%d)\n", len, (char *)p,
- result);
- err = result;
- goto out;
- }
- p += len;
-
- ceph_decode_32_safe(&p, end, len, bad);
- ceph_decode_need(&p, end, len, bad);
- monmap = ceph_monmap_decode(p, p + len);
+ monmap = ceph_monmap_decode(p, end);
if (IS_ERR(monmap)) {
pr_err("problem decoding monmap, %d\n",
(int)PTR_ERR(monmap));
- err = -EINVAL;
goto out;
}
- p += len;
+
+ if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
+ kfree(monmap);
+ goto out;
+ }
client->monc.monmap = monmap;
kfree(old);
- client->signed_ticket = NULL;
- client->signed_ticket_len = 0;
+out:
+ mutex_unlock(&monc->mutex);
+ wake_up(&client->auth_wq);
+}
- monc->want_mount = false;
+/*
+ * statfs
+ */
+static struct ceph_mon_generic_request *__lookup_generic_req(
+ struct ceph_mon_client *monc, u64 tid)
+{
+ struct ceph_mon_generic_request *req;
+ struct rb_node *n = monc->generic_request_tree.rb_node;
+
+ while (n) {
+ req = rb_entry(n, struct ceph_mon_generic_request, node);
+ if (tid < req->tid)
+ n = n->rb_left;
+ else if (tid > req->tid)
+ n = n->rb_right;
+ else
+ return req;
+ }
+ return NULL;
+}
- client->whoami = cnum;
- client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
- client->msgr->inst.name.num = cpu_to_le64(cnum);
- pr_info("client%lld fsid " FSID_FORMAT "\n",
- client->whoami, PR_FSID(&client->monc.monmap->fsid));
+static void __insert_generic_request(struct ceph_mon_client *monc,
+ struct ceph_mon_generic_request *new)
+{
+ struct rb_node **p = &monc->generic_request_tree.rb_node;
+ struct rb_node *parent = NULL;
+ struct ceph_mon_generic_request *req = NULL;
+
+ while (*p) {
+ parent = *p;
+ req = rb_entry(parent, struct ceph_mon_generic_request, node);
+ if (new->tid < req->tid)
+ p = &(*p)->rb_left;
+ else if (new->tid > req->tid)
+ p = &(*p)->rb_right;
+ else
+ BUG();
+ }
- ceph_debugfs_client_init(client);
- __send_subscribe(monc);
+ rb_link_node(&new->node, parent, p);
+ rb_insert_color(&new->node, &monc->generic_request_tree);
+}
- err = 0;
- goto out;
+static void release_generic_request(struct kref *kref)
+{
+ struct ceph_mon_generic_request *req =
+ container_of(kref, struct ceph_mon_generic_request, kref);
-bad:
- pr_err("error decoding mount_ack message\n");
-out:
- client->mount_err = err;
- mutex_unlock(&monc->mutex);
- wake_up(&client->mount_wq);
+ if (req->reply)
+ ceph_msg_put(req->reply);
+ if (req->request)
+ ceph_msg_put(req->request);
}
+static void put_generic_request(struct ceph_mon_generic_request *req)
+{
+ kref_put(&req->kref, release_generic_request);
+}
+static void get_generic_request(struct ceph_mon_generic_request *req)
+{
+ kref_get(&req->kref);
+}
+static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
+ struct ceph_msg_header *hdr,
+ int *skip)
+{
+ struct ceph_mon_client *monc = con->private;
+ struct ceph_mon_generic_request *req;
+ u64 tid = le64_to_cpu(hdr->tid);
+ struct ceph_msg *m;
+
+ mutex_lock(&monc->mutex);
+ req = __lookup_generic_req(monc, tid);
+ if (!req) {
+ dout("get_generic_reply %lld dne\n", tid);
+ *skip = 1;
+ m = NULL;
+ } else {
+ dout("get_generic_reply %lld got %p\n", tid, req->reply);
+ m = ceph_msg_get(req->reply);
+ /*
+ * we don't need to track the connection reading into
+ * this reply because we only have one open connection
+ * at a time, ever.
+ */
+ }
+ mutex_unlock(&monc->mutex);
+ return m;
+}
-/*
- * statfs
- */
static void handle_statfs_reply(struct ceph_mon_client *monc,
struct ceph_msg *msg)
{
- struct ceph_mon_statfs_request *req;
+ struct ceph_mon_generic_request *req;
struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
- u64 tid;
+ u64 tid = le64_to_cpu(msg->hdr.tid);
if (msg->front.iov_len != sizeof(*reply))
goto bad;
- tid = le64_to_cpu(reply->tid);
dout("handle_statfs_reply %p tid %llu\n", msg, tid);
mutex_lock(&monc->mutex);
- req = radix_tree_lookup(&monc->statfs_request_tree, tid);
+ req = __lookup_generic_req(monc, tid);
if (req) {
- *req->buf = reply->st;
+ *(struct ceph_statfs *)req->buf = reply->st;
req->result = 0;
+ get_generic_request(req);
}
mutex_unlock(&monc->mutex);
- if (req)
+ if (req) {
complete(&req->completion);
+ put_generic_request(req);
+ }
return;
bad:
- pr_err("corrupt statfs reply, no tid\n");
+ pr_err("corrupt generic reply, no tid\n");
+ ceph_msg_dump(msg);
}
/*
- * (re)send a statfs request
+ * Do a synchronous statfs().
*/
-static int send_statfs(struct ceph_mon_client *monc,
- struct ceph_mon_statfs_request *req)
+int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
{
- struct ceph_msg *msg;
+ struct ceph_mon_generic_request *req;
struct ceph_mon_statfs *h;
int err;
- dout("send_statfs tid %llu\n", req->tid);
- err = __open_session(monc);
- if (err)
- return err;
- msg = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), 0, 0, NULL);
- if (IS_ERR(msg))
- return PTR_ERR(msg);
- req->request = msg;
- h = msg->front.iov_base;
+ req = kzalloc(sizeof(*req), GFP_NOFS);
+ if (!req)
+ return -ENOMEM;
+
+ kref_init(&req->kref);
+ req->buf = buf;
+ init_completion(&req->completion);
+
+ err = -ENOMEM;
+ req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS);
+ if (!req->request)
+ goto out;
+ req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS);
+ if (!req->reply)
+ goto out;
+
+ /* fill out request */
+ h = req->request->front.iov_base;
h->monhdr.have_version = 0;
h->monhdr.session_mon = cpu_to_le16(-1);
h->monhdr.session_mon_tid = 0;
h->fsid = monc->monmap->fsid;
- h->tid = cpu_to_le64(req->tid);
- ceph_con_send(monc->con, msg);
- return 0;
-}
-
-/*
- * Do a synchronous statfs().
- */
-int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
-{
- struct ceph_mon_statfs_request req;
- int err;
-
- req.buf = buf;
- init_completion(&req.completion);
-
- /* allocate memory for reply */
- err = ceph_msgpool_resv(&monc->msgpool_statfs_reply, 1);
- if (err)
- return err;
/* register request */
mutex_lock(&monc->mutex);
- req.tid = ++monc->last_tid;
- req.last_attempt = jiffies;
- req.delay = BASE_DELAY_INTERVAL;
- if (radix_tree_insert(&monc->statfs_request_tree, req.tid, &req) < 0) {
- mutex_unlock(&monc->mutex);
- pr_err("ENOMEM in do_statfs\n");
- return -ENOMEM;
- }
- monc->num_statfs_requests++;
+ req->tid = ++monc->last_tid;
+ req->request->hdr.tid = cpu_to_le64(req->tid);
+ __insert_generic_request(monc, req);
+ monc->num_generic_requests++;
mutex_unlock(&monc->mutex);
/* send request and wait */
- err = send_statfs(monc, &req);
- if (!err)
- err = wait_for_completion_interruptible(&req.completion);
+ ceph_con_send(monc->con, ceph_msg_get(req->request));
+ err = wait_for_completion_interruptible(&req->completion);
mutex_lock(&monc->mutex);
- radix_tree_delete(&monc->statfs_request_tree, req.tid);
- monc->num_statfs_requests--;
- ceph_msgpool_resv(&monc->msgpool_statfs_reply, -1);
+ rb_erase(&req->node, &monc->generic_request_tree);
+ monc->num_generic_requests--;
mutex_unlock(&monc->mutex);
if (!err)
- err = req.result;
+ err = req->result;
+
+out:
+ kref_put(&req->kref, release_generic_request);
return err;
}
/*
* Resend pending statfs requests.
*/
-static void __resend_statfs(struct ceph_mon_client *monc)
-{
- u64 next_tid = 0;
- int got;
- int did = 0;
- struct ceph_mon_statfs_request *req;
-
- while (1) {
- got = radix_tree_gang_lookup(&monc->statfs_request_tree,
- (void **)&req,
- next_tid, 1);
- if (got == 0)
- break;
- did++;
- next_tid = req->tid + 1;
-
- send_statfs(monc, req);
+static void __resend_generic_request(struct ceph_mon_client *monc)
+{
+ struct ceph_mon_generic_request *req;
+ struct rb_node *p;
+
+ for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
+ req = rb_entry(p, struct ceph_mon_generic_request, node);
+ ceph_con_revoke(monc->con, req->request);
+ ceph_con_send(monc->con, ceph_msg_get(req->request));
}
}
dout("monc delayed_work\n");
mutex_lock(&monc->mutex);
- if (monc->want_mount) {
- __request_mount(monc);
+ if (monc->hunting) {
+ __close_session(monc);
+ __open_session(monc); /* continue hunting */
} else {
- if (monc->hunting) {
- __close_session(monc);
- __open_session(monc); /* continue hunting */
- } else {
- ceph_con_keepalive(monc->con);
- }
+ ceph_con_keepalive(monc->con);
+
+ __validate_auth(monc);
+
+ if (monc->auth->ops->is_authenticated(monc->auth))
+ __send_subscribe(monc);
}
- __send_subscribe(monc);
__schedule_delayed(monc);
mutex_unlock(&monc->mutex);
}
+/*
+ * On startup, we build a temporary monmap populated with the IPs
+ * provided by mount(2).
+ */
+static int build_initial_monmap(struct ceph_mon_client *monc)
+{
+ struct ceph_mount_args *args = monc->client->mount_args;
+ struct ceph_entity_addr *mon_addr = args->mon_addr;
+ int num_mon = args->num_mon;
+ int i;
+
+ /* build initial monmap */
+ monc->monmap = kzalloc(sizeof(*monc->monmap) +
+ num_mon*sizeof(monc->monmap->mon_inst[0]),
+ GFP_KERNEL);
+ if (!monc->monmap)
+ return -ENOMEM;
+ for (i = 0; i < num_mon; i++) {
+ monc->monmap->mon_inst[i].addr = mon_addr[i];
+ monc->monmap->mon_inst[i].addr.nonce = 0;
+ monc->monmap->mon_inst[i].name.type =
+ CEPH_ENTITY_TYPE_MON;
+ monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
+ }
+ monc->monmap->num_mon = num_mon;
+ monc->have_fsid = false;
+
+ /* release addr memory */
+ kfree(args->mon_addr);
+ args->mon_addr = NULL;
+ args->num_mon = 0;
+ return 0;
+}
+
int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
int err = 0;
monc->monmap = NULL;
mutex_init(&monc->mutex);
+ err = build_initial_monmap(monc);
+ if (err)
+ goto out;
+
monc->con = NULL;
- /* msg pools */
- err = ceph_msgpool_init(&monc->msgpool_mount_ack, 4096, 1, false);
- if (err < 0)
- goto out;
- err = ceph_msgpool_init(&monc->msgpool_subscribe_ack,
- sizeof(struct ceph_mon_subscribe_ack), 1, false);
- if (err < 0)
- goto out;
- err = ceph_msgpool_init(&monc->msgpool_statfs_reply,
- sizeof(struct ceph_mon_statfs_reply), 0, false);
- if (err < 0)
- goto out;
+ /* authentication */
+ monc->auth = ceph_auth_init(cl->mount_args->name,
+ cl->mount_args->secret);
+ if (IS_ERR(monc->auth))
+ return PTR_ERR(monc->auth);
+ monc->auth->want_keys =
+ CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
+ CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
+
+ /* msgs */
+ err = -ENOMEM;
+ monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
+ sizeof(struct ceph_mon_subscribe_ack),
+ GFP_NOFS);
+ if (!monc->m_subscribe_ack)
+ goto out_monmap;
+
+ monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS);
+ if (!monc->m_subscribe)
+ goto out_subscribe_ack;
+
+ monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS);
+ if (!monc->m_auth_reply)
+ goto out_subscribe;
+
+ monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS);
+ monc->pending_auth = 0;
+ if (!monc->m_auth)
+ goto out_auth_reply;
monc->cur_mon = -1;
- monc->hunting = false; /* not really */
+ monc->hunting = true;
monc->sub_renew_after = jiffies;
monc->sub_sent = 0;
INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
- INIT_RADIX_TREE(&monc->statfs_request_tree, GFP_NOFS);
- monc->num_statfs_requests = 0;
+ monc->generic_request_tree = RB_ROOT;
+ monc->num_generic_requests = 0;
monc->last_tid = 0;
monc->have_mdsmap = 0;
monc->have_osdmap = 0;
monc->want_next_osdmap = 1;
- monc->want_mount = true;
+ return 0;
+
+out_auth_reply:
+ ceph_msg_put(monc->m_auth_reply);
+out_subscribe:
+ ceph_msg_put(monc->m_subscribe);
+out_subscribe_ack:
+ ceph_msg_put(monc->m_subscribe_ack);
+out_monmap:
+ kfree(monc->monmap);
out:
return err;
}
}
mutex_unlock(&monc->mutex);
- ceph_msgpool_destroy(&monc->msgpool_mount_ack);
- ceph_msgpool_destroy(&monc->msgpool_subscribe_ack);
- ceph_msgpool_destroy(&monc->msgpool_statfs_reply);
+ ceph_auth_destroy(monc->auth);
+
+ ceph_msg_put(monc->m_auth);
+ ceph_msg_put(monc->m_auth_reply);
+ ceph_msg_put(monc->m_subscribe);
+ ceph_msg_put(monc->m_subscribe_ack);
kfree(monc->monmap);
}
+static void handle_auth_reply(struct ceph_mon_client *monc,
+ struct ceph_msg *msg)
+{
+ int ret;
+
+ mutex_lock(&monc->mutex);
+ monc->pending_auth = 0;
+ ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
+ msg->front.iov_len,
+ monc->m_auth->front.iov_base,
+ monc->m_auth->front_max);
+ if (ret < 0) {
+ monc->client->auth_err = ret;
+ wake_up(&monc->client->auth_wq);
+ } else if (ret > 0) {
+ __send_prepared_auth_request(monc, ret);
+ } else if (monc->auth->ops->is_authenticated(monc->auth)) {
+ dout("authenticated, starting session\n");
+
+ monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
+ monc->client->msgr->inst.name.num = monc->auth->global_id;
+
+ __send_subscribe(monc);
+ __resend_generic_request(monc);
+ }
+ mutex_unlock(&monc->mutex);
+}
+
+static int __validate_auth(struct ceph_mon_client *monc)
+{
+ int ret;
+
+ if (monc->pending_auth)
+ return 0;
+
+ ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
+ monc->m_auth->front_max);
+ if (ret <= 0)
+ return ret; /* either an error, or no need to authenticate */
+ __send_prepared_auth_request(monc, ret);
+ return 0;
+}
+
+int ceph_monc_validate_auth(struct ceph_mon_client *monc)
+{
+ int ret;
+
+ mutex_lock(&monc->mutex);
+ ret = __validate_auth(monc);
+ mutex_unlock(&monc->mutex);
+ return ret;
+}
/*
* handle incoming message
return;
switch (type) {
- case CEPH_MSG_CLIENT_MOUNT_ACK:
- handle_mount_ack(monc, msg);
+ case CEPH_MSG_AUTH_REPLY:
+ handle_auth_reply(monc, msg);
break;
case CEPH_MSG_MON_SUBSCRIBE_ACK:
handle_statfs_reply(monc, msg);
break;
+ case CEPH_MSG_MON_MAP:
+ ceph_monc_handle_map(monc, msg);
+ break;
+
case CEPH_MSG_MDS_MAP:
ceph_mdsc_handle_map(&monc->client->mdsc, msg);
break;
* Allocate memory for incoming message
*/
static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
- struct ceph_msg_header *hdr)
+ struct ceph_msg_header *hdr,
+ int *skip)
{
struct ceph_mon_client *monc = con->private;
int type = le16_to_cpu(hdr->type);
- int front = le32_to_cpu(hdr->front_len);
+ int front_len = le32_to_cpu(hdr->front_len);
+ struct ceph_msg *m = NULL;
+
+ *skip = 0;
switch (type) {
- case CEPH_MSG_CLIENT_MOUNT_ACK:
- return ceph_msgpool_get(&monc->msgpool_mount_ack, front);
case CEPH_MSG_MON_SUBSCRIBE_ACK:
- return ceph_msgpool_get(&monc->msgpool_subscribe_ack, front);
+ m = ceph_msg_get(monc->m_subscribe_ack);
+ break;
case CEPH_MSG_STATFS_REPLY:
- return ceph_msgpool_get(&monc->msgpool_statfs_reply, front);
+ return get_generic_reply(con, hdr, skip);
+ case CEPH_MSG_AUTH_REPLY:
+ m = ceph_msg_get(monc->m_auth_reply);
+ break;
+ case CEPH_MSG_MON_MAP:
+ case CEPH_MSG_MDS_MAP:
+ case CEPH_MSG_OSD_MAP:
+ m = ceph_msg_new(type, front_len, GFP_NOFS);
+ break;
}
- return ceph_alloc_msg(con, hdr);
+
+ if (!m) {
+ pr_info("alloc_msg unknown type %d\n", type);
+ *skip = 1;
+ }
+ return m;
}
/*
if (!monc->hunting) {
/* start hunting */
monc->hunting = true;
- if (__open_session(monc) == 0) {
- __send_subscribe(monc);
- __resend_statfs(monc);
- }
+ __open_session(monc);
} else {
/* already hunting, let's wait a bit */
__schedule_delayed(monc);
mutex_unlock(&monc->mutex);
}
-const static struct ceph_connection_operations mon_con_ops = {
+static const struct ceph_connection_operations mon_con_ops = {
.get = ceph_con_get,
.put = ceph_con_put,
.dispatch = dispatch,
.fault = mon_fault,
.alloc_msg = mon_alloc_msg,
- .alloc_middle = ceph_alloc_middle,
};