#include "osd_client.h"
#include "messenger.h"
#include "decode.h"
+#include "auth.h"
+
+#define OSD_OP_FRONT_LEN 4096
+#define OSD_OPREPLY_FRONT_LEN 512
const static struct ceph_connection_operations osd_con_ops;
+static int __kick_requests(struct ceph_osd_client *osdc,
+ struct ceph_osd *kickosd);
static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages);
}
-
/*
* requests
*/
-void ceph_osdc_put_request(struct ceph_osd_request *req)
+void ceph_osdc_release_request(struct kref *kref)
{
- dout("osdc put_request %p %d -> %d\n", req, atomic_read(&req->r_ref),
- atomic_read(&req->r_ref)-1);
- BUG_ON(atomic_read(&req->r_ref) <= 0);
- if (atomic_dec_and_test(&req->r_ref)) {
- if (req->r_request)
- ceph_msg_put(req->r_request);
- if (req->r_reply)
- ceph_msg_put(req->r_reply);
- if (req->r_own_pages)
- ceph_release_page_vector(req->r_pages,
- req->r_num_pages);
- ceph_put_snap_context(req->r_snapc);
- if (req->r_mempool)
- mempool_free(req, req->r_osdc->req_mempool);
- else
- kfree(req);
+ struct ceph_osd_request *req = container_of(kref,
+ struct ceph_osd_request,
+ r_kref);
+
+ if (req->r_request)
+ ceph_msg_put(req->r_request);
+ if (req->r_reply)
+ ceph_msg_put(req->r_reply);
+ if (req->r_con_filling_msg) {
+ dout("release_request revoking pages %p from con %p\n",
+ req->r_pages, req->r_con_filling_msg);
+ ceph_con_revoke_message(req->r_con_filling_msg,
+ req->r_reply);
+ ceph_con_put(req->r_con_filling_msg);
}
+ if (req->r_own_pages)
+ ceph_release_page_vector(req->r_pages,
+ req->r_num_pages);
+ ceph_put_snap_context(req->r_snapc);
+ if (req->r_mempool)
+ mempool_free(req, req->r_osdc->req_mempool);
+ else
+ kfree(req);
}
/*
struct ceph_osd_request_head *head;
struct ceph_osd_op *op;
void *p;
- int do_trunc = truncate_seq && (off + *plen > truncate_size);
- int num_op = 1 + do_sync + do_trunc;
+ int num_op = 1 + do_sync;
size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
- int err, i;
- u64 prevofs;
+ int i;
if (use_mempool) {
req = mempool_alloc(osdc->req_mempool, GFP_NOFS);
if (req == NULL)
return ERR_PTR(-ENOMEM);
- err = ceph_msgpool_resv(&osdc->msgpool_op_reply, num_reply);
- if (err) {
- ceph_osdc_put_request(req);
- return ERR_PTR(-ENOMEM);
- }
-
req->r_osdc = osdc;
req->r_mempool = use_mempool;
- atomic_set(&req->r_ref, 1);
+ kref_init(&req->r_kref);
init_completion(&req->r_completion);
init_completion(&req->r_safe_completion);
INIT_LIST_HEAD(&req->r_unsafe_item);
WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
- /* create message; allow space for oid */
+ /* create reply message */
+ if (use_mempool)
+ msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
+ else
+ msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
+ OSD_OPREPLY_FRONT_LEN, 0, 0, NULL);
+ if (IS_ERR(msg)) {
+ ceph_osdc_put_request(req);
+ return ERR_PTR(PTR_ERR(msg));
+ }
+ req->r_reply = msg;
+
+ /* create request message; allow space for oid */
msg_size += 40;
if (snapc)
msg_size += sizeof(u64) * snapc->num_snaps;
if (use_mempool)
- msg = ceph_msgpool_get(&osdc->msgpool_op);
+ msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
else
msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL);
if (IS_ERR(msg)) {
- ceph_msgpool_resv(&osdc->msgpool_op_reply, num_reply);
ceph_osdc_put_request(req);
return ERR_PTR(PTR_ERR(msg));
}
req->r_request->hdr.data_len = cpu_to_le32(*plen);
op->payload_len = cpu_to_le32(*plen);
}
+ op->extent.truncate_size = cpu_to_le64(truncate_size);
+ op->extent.truncate_seq = cpu_to_le32(truncate_seq);
/* fill in oid */
head->object_len = cpu_to_le32(req->r_oid_len);
memcpy(p, req->r_oid, req->r_oid_len);
p += req->r_oid_len;
- /* additional ops */
- if (do_trunc) {
- op++;
- op->op = cpu_to_le16(opcode == CEPH_OSD_OP_READ ?
- CEPH_OSD_OP_MASKTRUNC : CEPH_OSD_OP_SETTRUNC);
- op->trunc.truncate_seq = cpu_to_le32(truncate_seq);
- prevofs = le64_to_cpu((op-1)->extent.offset);
- op->trunc.truncate_size = cpu_to_le64(truncate_size -
- (off-prevofs));
- }
if (do_sync) {
op++;
op->op = cpu_to_le16(CEPH_OSD_OP_STARTSYNC);
}
BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
+ msg_size = p - msg->front.iov_base;
+ msg->front.iov_len = msg_size;
+ msg->hdr.front_len = cpu_to_le32(msg_size);
return req;
}
/*
- * The messaging layer will reconnect to the osd as needed. If the
- * session has dropped, the OSD will have dropped the session state,
- * and we'll get notified by the messaging layer. If that happens, we
- * need to resubmit all requests for that osd.
+ * If the osd connection drops, we need to resubmit all requests.
*/
static void osd_reset(struct ceph_connection *con)
{
return;
dout("osd_reset osd%d\n", osd->o_osd);
osdc = osd->o_osdc;
- osd->o_incarnation++;
down_read(&osdc->map_sem);
kick_requests(osdc, osd);
up_read(&osdc->map_sem);
atomic_set(&osd->o_ref, 1);
osd->o_osdc = osdc;
INIT_LIST_HEAD(&osd->o_requests);
+ INIT_LIST_HEAD(&osd->o_osd_lru);
osd->o_incarnation = 1;
ceph_con_init(osdc->client->msgr, &osd->o_con);
osd->o_con.private = osd;
osd->o_con.ops = &osd_con_ops;
osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
+
+ INIT_LIST_HEAD(&osd->o_keepalive_item);
return osd;
}
{
dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
atomic_read(&osd->o_ref) - 1);
- if (atomic_dec_and_test(&osd->o_ref)) {
- ceph_con_shutdown(&osd->o_con);
+ if (atomic_dec_and_test(&osd->o_ref))
kfree(osd);
- }
}
/*
* remove an osd from our map
*/
-static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
- dout("remove_osd %p\n", osd);
+ dout("__remove_osd %p\n", osd);
BUG_ON(!list_empty(&osd->o_requests));
rb_erase(&osd->o_node, &osdc->osds);
+ list_del_init(&osd->o_osd_lru);
ceph_con_close(&osd->o_con);
put_osd(osd);
}
+static void __move_osd_to_lru(struct ceph_osd_client *osdc,
+ struct ceph_osd *osd)
+{
+ dout("__move_osd_to_lru %p\n", osd);
+ BUG_ON(!list_empty(&osd->o_osd_lru));
+ list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
+ osd->lru_ttl = jiffies + osdc->client->mount_args->osd_idle_ttl * HZ;
+}
+
+static void __remove_osd_from_lru(struct ceph_osd *osd)
+{
+ dout("__remove_osd_from_lru %p\n", osd);
+ if (!list_empty(&osd->o_osd_lru))
+ list_del_init(&osd->o_osd_lru);
+}
+
+static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
+{
+ struct ceph_osd *osd, *nosd;
+
+ dout("__remove_old_osds %p\n", osdc);
+ mutex_lock(&osdc->request_mutex);
+ list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
+ if (!remove_all && time_before(jiffies, osd->lru_ttl))
+ break;
+ __remove_osd(osdc, osd);
+ }
+ mutex_unlock(&osdc->request_mutex);
+}
+
/*
* reset osd connect
*/
-static int reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
int ret = 0;
- dout("reset_osd %p osd%d\n", osd, osd->o_osd);
+ dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
if (list_empty(&osd->o_requests)) {
- remove_osd(osdc, osd);
+ __remove_osd(osdc, osd);
} else {
ceph_con_close(&osd->o_con);
ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
return NULL;
}
+static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
+{
+ schedule_delayed_work(&osdc->timeout_work,
+ osdc->client->mount_args->osd_keepalive_timeout * HZ);
+}
+
+static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
+{
+ cancel_delayed_work(&osdc->timeout_work);
+}
/*
* Register request, assign tid. If this is the first request, set up
static void register_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
- struct ceph_osd_request_head *head = req->r_request->front.iov_base;
-
mutex_lock(&osdc->request_mutex);
req->r_tid = ++osdc->last_tid;
- head->tid = cpu_to_le64(req->r_tid);
+ req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
+ INIT_LIST_HEAD(&req->r_req_lru_item);
dout("register_request %p tid %lld\n", req, req->r_tid);
__insert_request(osdc, req);
ceph_osdc_get_request(req);
osdc->num_requests++;
- req->r_timeout_stamp =
- jiffies + osdc->client->mount_args.osd_timeout*HZ;
-
if (osdc->num_requests == 1) {
- osdc->timeout_tid = req->r_tid;
- dout(" timeout on tid %llu at %lu\n", req->r_tid,
- req->r_timeout_stamp);
- schedule_delayed_work(&osdc->timeout_work,
- round_jiffies_relative(req->r_timeout_stamp - jiffies));
+ dout(" first request, scheduling timeout\n");
+ __schedule_osd_timeout(osdc);
}
mutex_unlock(&osdc->request_mutex);
}
rb_erase(&req->r_node, &osdc->requests);
osdc->num_requests--;
- list_del_init(&req->r_osd_item);
- if (list_empty(&req->r_osd->o_requests))
- remove_osd(osdc, req->r_osd);
- req->r_osd = NULL;
+ if (req->r_osd) {
+ /* make sure the original request isn't in flight. */
+ ceph_con_revoke(&req->r_osd->o_con, req->r_request);
+
+ list_del_init(&req->r_osd_item);
+ if (list_empty(&req->r_osd->o_requests))
+ __move_osd_to_lru(osdc, req->r_osd);
+ req->r_osd = NULL;
+ }
ceph_osdc_put_request(req);
- if (req->r_tid == osdc->timeout_tid) {
- if (osdc->num_requests == 0) {
- dout("no requests, canceling timeout\n");
- osdc->timeout_tid = 0;
- cancel_delayed_work(&osdc->timeout_work);
- } else {
- req = rb_entry(rb_first(&osdc->requests),
- struct ceph_osd_request, r_node);
- osdc->timeout_tid = req->r_tid;
- dout("rescheduled timeout on tid %llu at %lu\n",
- req->r_tid, req->r_timeout_stamp);
- schedule_delayed_work(&osdc->timeout_work,
- round_jiffies_relative(req->r_timeout_stamp -
- jiffies));
- }
+ list_del_init(&req->r_req_lru_item);
+ if (osdc->num_requests == 0) {
+ dout(" no requests, canceling timeout\n");
+ __cancel_osd_timeout(osdc);
}
}
ceph_con_revoke(&req->r_osd->o_con, req->r_request);
req->r_sent = 0;
}
+ list_del_init(&req->r_req_lru_item);
}
/*
struct ceph_osd_request *req)
{
struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
- union ceph_pg pgid;
+ struct ceph_pg pgid;
int o = -1;
int err;
- struct ceph_osd *newosd = NULL;
dout("map_osds %p tid %lld\n", req, req->r_tid);
err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
&req->r_file_layout, osdc->osdmap);
if (err)
return err;
- pgid.pg64 = le64_to_cpu(reqhead->layout.ol_pgid);
+ pgid = reqhead->layout.ol_pgid;
+ req->r_pgid = pgid;
+
o = ceph_calc_pg_primary(osdc->osdmap, pgid);
if ((req->r_osd && req->r_osd->o_osd == o &&
(req->r_osd == NULL && o == -1))
return 0; /* no change */
- dout("map_osds tid %llu pgid %llx pool %d osd%d (was osd%d)\n",
- req->r_tid, pgid.pg64, pgid.pg.pool, o,
+ dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n",
+ req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
req->r_osd ? req->r_osd->o_osd : -1);
if (req->r_osd) {
__cancel_request(req);
list_del_init(&req->r_osd_item);
- if (list_empty(&req->r_osd->o_requests)) {
- /* try to re-use r_osd if possible */
- newosd = get_osd(req->r_osd);
- remove_osd(osdc, newosd);
- }
req->r_osd = NULL;
}
req->r_osd = __lookup_osd(osdc, o);
if (!req->r_osd && o >= 0) {
- if (newosd) {
- req->r_osd = newosd;
- newosd = NULL;
- } else {
- err = -ENOMEM;
- req->r_osd = create_osd(osdc);
- if (!req->r_osd)
- goto out;
- }
+ err = -ENOMEM;
+ req->r_osd = create_osd(osdc);
+ if (!req->r_osd)
+ goto out;
dout("map_osds osd %p is osd%d\n", req->r_osd, o);
req->r_osd->o_osd = o;
ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
}
- if (req->r_osd)
+ if (req->r_osd) {
+ __remove_osd_from_lru(req->r_osd);
list_add(&req->r_osd_item, &req->r_osd->o_requests);
+ }
err = 1; /* osd changed */
out:
- if (newosd)
- put_osd(newosd);
return err;
}
reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */
reqhead->reassert_version = req->r_reassert_version;
- req->r_timeout_stamp = jiffies+osdc->client->mount_args.osd_timeout*HZ;
+ req->r_sent_stamp = jiffies;
+ list_move_tail(&osdc->req_lru, &req->r_req_lru_item);
ceph_msg_get(req->r_request); /* send consumes a ref */
ceph_con_send(&req->r_osd->o_con, req->r_request);
{
struct ceph_osd_client *osdc =
container_of(work, struct ceph_osd_client, timeout_work.work);
- struct ceph_osd_request *req;
+ struct ceph_osd_request *req, *last_req = NULL;
struct ceph_osd *osd;
- unsigned long timeout = osdc->client->mount_args.osd_timeout * HZ;
- unsigned long next_timeout = timeout + jiffies;
+ unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ;
+ unsigned long keepalive =
+ osdc->client->mount_args->osd_keepalive_timeout * HZ;
+ unsigned long last_sent = 0;
struct rb_node *p;
+ struct list_head slow_osds;
dout("timeout\n");
down_read(&osdc->map_sem);
continue;
}
}
- for (p = rb_first(&osdc->osds); p; p = rb_next(p)) {
- osd = rb_entry(p, struct ceph_osd, o_node);
- if (list_empty(&osd->o_requests))
- continue;
- req = list_first_entry(&osd->o_requests,
- struct ceph_osd_request, r_osd_item);
- if (time_before(jiffies, req->r_timeout_stamp))
- continue;
- dout(" tid %llu (at least) timed out on osd%d\n",
+ /*
+ * reset osds that appear to be _really_ unresponsive. this
+ * is a failsafe measure.. we really shouldn't be getting to
+ * this point if the system is working properly. the monitors
+ * should mark the osd as failed and we should find out about
+ * it from an updated osd map.
+ */
+ while (!list_empty(&osdc->req_lru)) {
+ req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
+ r_req_lru_item);
+
+ if (time_before(jiffies, req->r_sent_stamp + timeout))
+ break;
+
+ BUG_ON(req == last_req && req->r_sent_stamp == last_sent);
+ last_req = req;
+ last_sent = req->r_sent_stamp;
+
+ osd = req->r_osd;
+ BUG_ON(!osd);
+ pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
+ req->r_tid, osd->o_osd);
+ __kick_requests(osdc, osd);
+ }
+
+ /*
+ * ping osds that are a bit slow. this ensures that if there
+ * is a break in the TCP connection we will notice, and reopen
+ * a connection with that osd (from the fault callback).
+ */
+ INIT_LIST_HEAD(&slow_osds);
+ list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
+ if (time_before(jiffies, req->r_sent_stamp + keepalive))
+ break;
+
+ osd = req->r_osd;
+ BUG_ON(!osd);
+ dout(" tid %llu is slow, will send keepalive on osd%d\n",
req->r_tid, osd->o_osd);
- req->r_timeout_stamp = next_timeout;
+ list_move_tail(&osd->o_keepalive_item, &slow_osds);
+ }
+ while (!list_empty(&slow_osds)) {
+ osd = list_entry(slow_osds.next, struct ceph_osd,
+ o_keepalive_item);
+ list_del_init(&osd->o_keepalive_item);
ceph_con_keepalive(&osd->o_con);
}
- if (osdc->timeout_tid)
- schedule_delayed_work(&osdc->timeout_work,
- round_jiffies_relative(timeout));
-
+ __schedule_osd_timeout(osdc);
mutex_unlock(&osdc->request_mutex);
up_read(&osdc->map_sem);
}
+static void handle_osds_timeout(struct work_struct *work)
+{
+ struct ceph_osd_client *osdc =
+ container_of(work, struct ceph_osd_client,
+ osds_timeout_work.work);
+ unsigned long delay =
+ osdc->client->mount_args->osd_idle_ttl * HZ >> 2;
+
+ dout("osds timeout\n");
+ down_read(&osdc->map_sem);
+ remove_old_osds(osdc, 0);
+ up_read(&osdc->map_sem);
+
+ schedule_delayed_work(&osdc->osds_timeout_work,
+ round_jiffies_relative(delay));
+}
+
/*
* handle osd op reply. either call the callback if it is specified,
* or do the completion to wake up the waiting thread.
*/
-static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
+ struct ceph_connection *con)
{
struct ceph_osd_reply_head *rhead = msg->front.iov_base;
struct ceph_osd_request *req;
u64 tid;
int numops, object_len, flags;
+ tid = le64_to_cpu(msg->hdr.tid);
if (msg->front.iov_len < sizeof(*rhead))
goto bad;
- tid = le64_to_cpu(rhead->tid);
numops = le32_to_cpu(rhead->num_ops);
object_len = le32_to_cpu(rhead->object_len);
if (msg->front.iov_len != sizeof(*rhead) + object_len +
ceph_osdc_get_request(req);
flags = le32_to_cpu(rhead->flags);
- if (req->r_reply) {
- /*
- * once we see the message has been received, we don't
- * need a ref (which is only needed for revoking
- * pages)
- */
- ceph_msg_put(req->r_reply);
- req->r_reply = NULL;
+ /*
+ * if this connection filled our message, drop our reference now, to
+ * avoid a (safe but slower) revoke later.
+ */
+ if (req->r_con_filling_msg == con && req->r_reply == msg) {
+ dout(" dropping con_filling_msg ref %p\n", con);
+ req->r_con_filling_msg = NULL;
+ ceph_con_put(con);
}
if (!req->r_got_reply) {
req->r_got_reply = 1;
} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
dout("handle_reply tid %llu dup ack\n", tid);
+ mutex_unlock(&osdc->request_mutex);
goto done;
}
pr_err("corrupt osd_op_reply got %d %d expected %d\n",
(int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
(int)sizeof(*rhead));
+ ceph_msg_dump(msg);
}
-/*
- * Resubmit osd requests whose osd or osd address has changed. Request
- * a new osd map if osds are down, or we are otherwise unable to determine
- * how to direct a request.
- *
- * Close connections to down osds.
- *
- * If @who is specified, resubmit requests for that specific osd.
- *
- * Caller should hold map_sem for read and request_mutex.
- */
-static void kick_requests(struct ceph_osd_client *osdc,
+static int __kick_requests(struct ceph_osd_client *osdc,
struct ceph_osd *kickosd)
{
struct ceph_osd_request *req;
int err;
dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
- mutex_lock(&osdc->request_mutex);
- if (!kickosd) {
+ if (kickosd) {
+ __reset_osd(osdc, kickosd);
+ } else {
for (p = rb_first(&osdc->osds); p; p = n) {
struct ceph_osd *osd =
rb_entry(p, struct ceph_osd, o_node);
n = rb_next(p);
if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
- !ceph_entity_addr_equal(&osd->o_con.peer_addr,
- ceph_osd_addr(osdc->osdmap,
- osd->o_osd)))
- reset_osd(osdc, osd);
+ memcmp(&osd->o_con.peer_addr,
+ ceph_osd_addr(osdc->osdmap,
+ osd->o_osd),
+ sizeof(struct ceph_entity_addr)) != 0)
+ __reset_osd(osdc, osd);
}
}
if (req->r_resend) {
dout(" r_resend set on tid %llu\n", req->r_tid);
+ __cancel_request(req);
goto kick;
}
- if (req->r_osd && kickosd == req->r_osd)
+ if (req->r_osd && kickosd == req->r_osd) {
+ __cancel_request(req);
goto kick;
+ }
err = __map_osds(osdc, req);
if (err == 0)
req->r_resend = true;
}
}
+
+ return needmap;
+}
+
+/*
+ * Resubmit osd requests whose osd or osd address has changed. Request
+ * a new osd map if osds are down, or we are otherwise unable to determine
+ * how to direct a request.
+ *
+ * Close connections to down osds.
+ *
+ * If @who is specified, resubmit requests for that specific osd.
+ *
+ * Caller should hold map_sem for read and request_mutex.
+ */
+static void kick_requests(struct ceph_osd_client *osdc,
+ struct ceph_osd *kickosd)
+{
+ int needmap;
+
+ mutex_lock(&osdc->request_mutex);
+ needmap = __kick_requests(osdc, kickosd);
mutex_unlock(&osdc->request_mutex);
if (needmap) {
dout("%d requests for down osds, need new map\n", needmap);
ceph_monc_request_next_osdmap(&osdc->client->monc);
}
-}
+}
/*
* Process updated osd map.
*
/* verify fsid */
ceph_decode_need(&p, end, sizeof(fsid), bad);
ceph_decode_copy(&p, &fsid, sizeof(fsid));
- if (ceph_fsid_compare(&fsid, &osdc->client->monc.monmap->fsid)) {
- pr_err("got osdmap with wrong fsid, ignoring\n");
+ if (ceph_check_fsid(osdc->client, &fsid) < 0)
return;
- }
down_write(&osdc->map_sem);
dout(" %d inc maps\n", nr_maps);
while (nr_maps > 0) {
ceph_decode_need(&p, end, 2*sizeof(u32), bad);
- ceph_decode_32(&p, epoch);
- ceph_decode_32(&p, maplen);
+ epoch = ceph_decode_32(&p);
+ maplen = ceph_decode_32(&p);
ceph_decode_need(&p, end, maplen, bad);
next = p + maplen;
if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
err = PTR_ERR(newmap);
goto bad;
}
+ BUG_ON(!newmap);
if (newmap != osdc->osdmap) {
ceph_osdmap_destroy(osdc->osdmap);
osdc->osdmap = newmap;
dout(" %d full maps\n", nr_maps);
while (nr_maps) {
ceph_decode_need(&p, end, 2*sizeof(u32), bad);
- ceph_decode_32(&p, epoch);
- ceph_decode_32(&p, maplen);
+ epoch = ceph_decode_32(&p);
+ maplen = ceph_decode_32(&p);
ceph_decode_need(&p, end, maplen, bad);
if (nr_maps > 1) {
dout("skipping non-latest full map %u len %d\n",
err = PTR_ERR(newmap);
goto bad;
}
+ BUG_ON(!newmap);
oldmap = osdc->osdmap;
osdc->osdmap = newmap;
if (oldmap)
bad:
pr_err("osdc handle_map corrupt msg\n");
+ ceph_msg_dump(msg);
up_write(&osdc->map_sem);
return;
}
* find those pages.
* 0 = success, -1 failure.
*/
-static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m,
- int want)
+static int __prepare_pages(struct ceph_connection *con,
+ struct ceph_msg_header *hdr,
+ struct ceph_osd_request *req,
+ u64 tid,
+ struct ceph_msg *m)
{
struct ceph_osd *osd = con->private;
struct ceph_osd_client *osdc;
- struct ceph_osd_reply_head *rhead = m->front.iov_base;
- struct ceph_osd_request *req;
- u64 tid;
int ret = -1;
- int type = le16_to_cpu(m->hdr.type);
+ int data_len = le32_to_cpu(hdr->data_len);
+ unsigned data_off = le16_to_cpu(hdr->data_off);
+
+ int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
if (!osd)
return -1;
- osdc = osd->o_osdc;
- dout("prepare_pages on msg %p want %d\n", m, want);
- if (unlikely(type != CEPH_MSG_OSD_OPREPLY))
- return -1; /* hmm! */
+ osdc = osd->o_osdc;
- tid = le64_to_cpu(rhead->tid);
- mutex_lock(&osdc->request_mutex);
- req = __lookup_request(osdc, tid);
- if (!req) {
- dout("prepare_pages unknown tid %llu\n", tid);
- goto out;
- }
- dout("prepare_pages tid %llu has %d pages, want %d\n",
+ dout("__prepare_pages on msg %p tid %llu, has %d pages, want %d\n", m,
tid, req->r_num_pages, want);
- if (likely(req->r_num_pages >= want && !req->r_prepared_pages)) {
- m->pages = req->r_pages;
- m->nr_pages = req->r_num_pages;
- req->r_reply = m; /* only for duration of read over socket */
- ceph_msg_get(m);
- req->r_prepared_pages = 1;
- ret = 0; /* success */
- }
+ if (unlikely(req->r_num_pages < want))
+ goto out;
+ m->pages = req->r_pages;
+ m->nr_pages = req->r_num_pages;
+ ret = 0; /* success */
out:
- mutex_unlock(&osdc->request_mutex);
+ BUG_ON(ret < 0 || m->nr_pages < want);
+
return ret;
}
if (rc < 0) {
mutex_lock(&osdc->request_mutex);
__cancel_request(req);
+ __unregister_request(osdc, req);
mutex_unlock(&osdc->request_mutex);
- dout("wait_request tid %llu timed out\n", req->r_tid);
+ dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
return rc;
}
init_completion(&osdc->map_waiters);
osdc->last_requested_map = 0;
mutex_init(&osdc->request_mutex);
- osdc->timeout_tid = 0;
osdc->last_tid = 0;
osdc->osds = RB_ROOT;
+ INIT_LIST_HEAD(&osdc->osd_lru);
osdc->requests = RB_ROOT;
+ INIT_LIST_HEAD(&osdc->req_lru);
osdc->num_requests = 0;
INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
+ INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
+ schedule_delayed_work(&osdc->osds_timeout_work,
+ round_jiffies_relative(osdc->client->mount_args->osd_idle_ttl * HZ));
+
+ err = -ENOMEM;
osdc->req_mempool = mempool_create_kmalloc_pool(10,
sizeof(struct ceph_osd_request));
if (!osdc->req_mempool)
- return -ENOMEM;
+ goto out;
- err = ceph_msgpool_init(&osdc->msgpool_op, 4096, 10, true);
+ err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true);
if (err < 0)
- return -ENOMEM;
- err = ceph_msgpool_init(&osdc->msgpool_op_reply, 512, 0, false);
+ goto out_mempool;
+ err = ceph_msgpool_init(&osdc->msgpool_op_reply,
+ OSD_OPREPLY_FRONT_LEN, 10, true);
if (err < 0)
- return -ENOMEM;
-
+ goto out_msgpool;
return 0;
+
+out_msgpool:
+ ceph_msgpool_destroy(&osdc->msgpool_op);
+out_mempool:
+ mempool_destroy(osdc->req_mempool);
+out:
+ return err;
}
void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
cancel_delayed_work_sync(&osdc->timeout_work);
+ cancel_delayed_work_sync(&osdc->osds_timeout_work);
if (osdc->osdmap) {
ceph_osdmap_destroy(osdc->osdmap);
osdc->osdmap = NULL;
}
+ remove_old_osds(osdc, 1);
mempool_destroy(osdc->req_mempool);
ceph_msgpool_destroy(&osdc->msgpool_op);
ceph_msgpool_destroy(&osdc->msgpool_op_reply);
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
struct ceph_osd *osd = con->private;
- struct ceph_osd_client *osdc = osd->o_osdc;
+ struct ceph_osd_client *osdc;
int type = le16_to_cpu(msg->hdr.type);
if (!osd)
return;
+ osdc = osd->o_osdc;
switch (type) {
case CEPH_MSG_OSD_MAP:
ceph_osdc_handle_map(osdc, msg);
break;
case CEPH_MSG_OSD_OPREPLY:
- handle_reply(osdc, msg);
+ handle_reply(osdc, msg, con);
break;
default:
ceph_msg_put(msg);
}
-static struct ceph_msg *alloc_msg(struct ceph_connection *con,
- struct ceph_msg_header *hdr)
+/*
+ * lookup and return message for incoming reply
+ */
+static struct ceph_msg *get_reply(struct ceph_connection *con,
+ struct ceph_msg_header *hdr,
+ int *skip)
{
struct ceph_osd *osd = con->private;
struct ceph_osd_client *osdc = osd->o_osdc;
+ struct ceph_msg *m;
+ struct ceph_osd_request *req;
+ int front = le32_to_cpu(hdr->front_len);
+ int data_len = le32_to_cpu(hdr->data_len);
+ u64 tid;
+ int err;
+
+ tid = le64_to_cpu(hdr->tid);
+ mutex_lock(&osdc->request_mutex);
+ req = __lookup_request(osdc, tid);
+ if (!req) {
+ *skip = 1;
+ m = NULL;
+ pr_info("get_reply unknown tid %llu from osd%d\n", tid,
+ osd->o_osd);
+ goto out;
+ }
+
+ if (req->r_con_filling_msg) {
+ dout("get_reply revoking msg %p from old con %p\n",
+ req->r_reply, req->r_con_filling_msg);
+ ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
+ ceph_con_put(req->r_con_filling_msg);
+ }
+
+ if (front > req->r_reply->front.iov_len) {
+ pr_warning("get_reply front %d > preallocated %d\n",
+ front, (int)req->r_reply->front.iov_len);
+ m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, 0, 0, NULL);
+ if (IS_ERR(m))
+ goto out;
+ ceph_msg_put(req->r_reply);
+ req->r_reply = m;
+ }
+ m = ceph_msg_get(req->r_reply);
+
+ if (data_len > 0) {
+ err = __prepare_pages(con, hdr, req, tid, m);
+ if (err < 0) {
+ *skip = 1;
+ ceph_msg_put(m);
+ m = ERR_PTR(err);
+ }
+ }
+ *skip = 0;
+ req->r_con_filling_msg = ceph_con_get(con);
+ dout("get_reply tid %lld %p\n", tid, m);
+
+out:
+ mutex_unlock(&osdc->request_mutex);
+ return m;
+
+}
+
+static struct ceph_msg *alloc_msg(struct ceph_connection *con,
+ struct ceph_msg_header *hdr,
+ int *skip)
+{
+ struct ceph_osd *osd = con->private;
int type = le16_to_cpu(hdr->type);
+ int front = le32_to_cpu(hdr->front_len);
switch (type) {
+ case CEPH_MSG_OSD_MAP:
+ return ceph_msg_new(type, front, 0, 0, NULL);
case CEPH_MSG_OSD_OPREPLY:
- return ceph_msgpool_get(&osdc->msgpool_op_reply);
+ return get_reply(con, hdr, skip);
+ default:
+ pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
+ osd->o_osd);
+ *skip = 1;
+ return NULL;
}
- return ceph_alloc_msg(con, hdr);
}
/*
put_osd(osd);
}
+/*
+ * authentication
+ */
+static int get_authorizer(struct ceph_connection *con,
+ void **buf, int *len, int *proto,
+ void **reply_buf, int *reply_len, int force_new)
+{
+ struct ceph_osd *o = con->private;
+ struct ceph_osd_client *osdc = o->o_osdc;
+ struct ceph_auth_client *ac = osdc->client->monc.auth;
+ int ret = 0;
+
+ if (force_new && o->o_authorizer) {
+ ac->ops->destroy_authorizer(ac, o->o_authorizer);
+ o->o_authorizer = NULL;
+ }
+ if (o->o_authorizer == NULL) {
+ ret = ac->ops->create_authorizer(
+ ac, CEPH_ENTITY_TYPE_OSD,
+ &o->o_authorizer,
+ &o->o_authorizer_buf,
+ &o->o_authorizer_buf_len,
+ &o->o_authorizer_reply_buf,
+ &o->o_authorizer_reply_buf_len);
+ if (ret)
+ return ret;
+ }
+
+ *proto = ac->protocol;
+ *buf = o->o_authorizer_buf;
+ *len = o->o_authorizer_buf_len;
+ *reply_buf = o->o_authorizer_reply_buf;
+ *reply_len = o->o_authorizer_reply_buf_len;
+ return 0;
+}
+
+
+static int verify_authorizer_reply(struct ceph_connection *con, int len)
+{
+ struct ceph_osd *o = con->private;
+ struct ceph_osd_client *osdc = o->o_osdc;
+ struct ceph_auth_client *ac = osdc->client->monc.auth;
+
+ return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
+}
+
+static int invalidate_authorizer(struct ceph_connection *con)
+{
+ struct ceph_osd *o = con->private;
+ struct ceph_osd_client *osdc = o->o_osdc;
+ struct ceph_auth_client *ac = osdc->client->monc.auth;
+
+ if (ac->ops->invalidate_authorizer)
+ ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
+
+ return ceph_monc_validate_auth(&osdc->client->monc);
+}
+
const static struct ceph_connection_operations osd_con_ops = {
.get = get_osd_con,
.put = put_osd_con,
.dispatch = dispatch,
+ .get_authorizer = get_authorizer,
+ .verify_authorizer_reply = verify_authorizer_reply,
+ .invalidate_authorizer = invalidate_authorizer,
.alloc_msg = alloc_msg,
- .peer_reset = osd_reset,
- .alloc_middle = ceph_alloc_middle,
- .prepare_pages = prepare_pages,
+ .fault = osd_reset,
};