#include "decode.h"
#include "auth.h"
-#define OSD_REPLY_RESERVE_FRONT_LEN 512
+#define OSD_OP_FRONT_LEN 4096
+#define OSD_OPREPLY_FRONT_LEN 512
const static struct ceph_connection_operations osd_con_ops;
+static int __kick_requests(struct ceph_osd_client *osdc,
+ struct ceph_osd *kickosd);
static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages);
}
-static void remove_replies(struct ceph_osd_request *req)
-{
- int i;
- int max = ARRAY_SIZE(req->replies);
-
- for (i=0; i<max; i++) {
- if (req->replies[i])
- ceph_msg_put(req->replies[i]);
- }
-}
-
/*
* requests
*/
ceph_msg_put(req->r_request);
if (req->r_reply)
ceph_msg_put(req->r_reply);
- remove_replies(req);
if (req->r_con_filling_msg) {
dout("release_request revoking pages %p from con %p\n",
req->r_pages, req->r_con_filling_msg);
kfree(req);
}
-static int alloc_replies(struct ceph_osd_request *req, int num_reply)
-{
- int i;
- int max = ARRAY_SIZE(req->replies);
-
- BUG_ON(num_reply > max);
-
- for (i=0; i<num_reply; i++) {
- req->replies[i] = ceph_msg_new(0, OSD_REPLY_RESERVE_FRONT_LEN, 0, 0, NULL);
- if (IS_ERR(req->replies[i])) {
- int j;
- int err = PTR_ERR(req->replies[i]);
- for (j = 0; j<=i; j++) {
- ceph_msg_put(req->replies[j]);
- }
- return err;
- }
- }
-
- for (; i<max; i++) {
- req->replies[i] = NULL;
- }
-
- req->cur_reply = 0;
-
- return 0;
-}
-
-static struct ceph_msg *__get_next_reply(struct ceph_connection *con,
- struct ceph_osd_request *req,
- int front_len)
-{
- struct ceph_msg *reply;
- if (req->r_con_filling_msg) {
- dout("revoking reply msg %p from old con %p\n", req->r_reply,
- req->r_con_filling_msg);
- ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
- ceph_con_put(req->r_con_filling_msg);
- req->cur_reply = 0;
- }
- reply = req->replies[req->cur_reply];
- if (!reply || front_len > OSD_REPLY_RESERVE_FRONT_LEN) {
- /* maybe we can allocate it now? */
- reply = ceph_msg_new(0, front_len, 0, 0, NULL);
- if (!reply || IS_ERR(reply)) {
- pr_err(" reply alloc failed, front_len=%d\n", front_len);
- return ERR_PTR(-ENOMEM);
- }
- }
- req->r_con_filling_msg = ceph_con_get(con);
- req->r_reply = ceph_msg_get(reply); /* for duration of read over socket */
- return ceph_msg_get(reply);
-}
-
/*
* build new request AND message, calculate layout, and adjust file
* extent as needed.
void *p;
int num_op = 1 + do_sync;
size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
- int err, i;
+ int i;
if (use_mempool) {
req = mempool_alloc(osdc->req_mempool, GFP_NOFS);
if (req == NULL)
return ERR_PTR(-ENOMEM);
- err = alloc_replies(req, num_reply);
- if (err) {
- ceph_osdc_put_request(req);
- return ERR_PTR(-ENOMEM);
- }
- req->r_num_prealloc_reply = num_reply;
-
req->r_osdc = osdc;
req->r_mempool = use_mempool;
kref_init(&req->r_kref);
WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
- /* create message; allow space for oid */
+ /* create reply message */
+ if (use_mempool)
+ msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
+ else
+ msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
+ OSD_OPREPLY_FRONT_LEN, 0, 0, NULL);
+ if (IS_ERR(msg)) {
+ ceph_osdc_put_request(req);
+ return ERR_PTR(PTR_ERR(msg));
+ }
+ req->r_reply = msg;
+
+ /* create request message; allow space for oid */
msg_size += 40;
if (snapc)
msg_size += sizeof(u64) * snapc->num_snaps;
}
BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
+ msg_size = p - msg->front.iov_base;
+ msg->front.iov_len = msg_size;
+ msg->hdr.front_len = cpu_to_le32(msg_size);
return req;
}
osd->o_con.ops = &osd_con_ops;
osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
+ INIT_LIST_HEAD(&osd->o_keepalive_item);
return osd;
}
return NULL;
}
+static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
+{
+ schedule_delayed_work(&osdc->timeout_work,
+ osdc->client->mount_args->osd_keepalive_timeout * HZ);
+}
+
+static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
+{
+ cancel_delayed_work(&osdc->timeout_work);
+}
/*
* Register request, assign tid. If this is the first request, set up
mutex_lock(&osdc->request_mutex);
req->r_tid = ++osdc->last_tid;
req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
+ INIT_LIST_HEAD(&req->r_req_lru_item);
dout("register_request %p tid %lld\n", req, req->r_tid);
__insert_request(osdc, req);
ceph_osdc_get_request(req);
osdc->num_requests++;
- req->r_timeout_stamp =
- jiffies + osdc->client->mount_args->osd_timeout*HZ;
-
if (osdc->num_requests == 1) {
- osdc->timeout_tid = req->r_tid;
- dout(" timeout on tid %llu at %lu\n", req->r_tid,
- req->r_timeout_stamp);
- schedule_delayed_work(&osdc->timeout_work,
- round_jiffies_relative(req->r_timeout_stamp - jiffies));
+ dout(" first request, scheduling timeout\n");
+ __schedule_osd_timeout(osdc);
}
mutex_unlock(&osdc->request_mutex);
}
ceph_osdc_put_request(req);
- if (req->r_tid == osdc->timeout_tid) {
- if (osdc->num_requests == 0) {
- dout("no requests, canceling timeout\n");
- osdc->timeout_tid = 0;
- cancel_delayed_work(&osdc->timeout_work);
- } else {
- req = rb_entry(rb_first(&osdc->requests),
- struct ceph_osd_request, r_node);
- osdc->timeout_tid = req->r_tid;
- dout("rescheduled timeout on tid %llu at %lu\n",
- req->r_tid, req->r_timeout_stamp);
- schedule_delayed_work(&osdc->timeout_work,
- round_jiffies_relative(req->r_timeout_stamp -
- jiffies));
- }
+ list_del_init(&req->r_req_lru_item);
+ if (osdc->num_requests == 0) {
+ dout(" no requests, canceling timeout\n");
+ __cancel_osd_timeout(osdc);
}
}
ceph_con_revoke(&req->r_osd->o_con, req->r_request);
req->r_sent = 0;
}
+ list_del_init(&req->r_req_lru_item);
}
/*
reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */
reqhead->reassert_version = req->r_reassert_version;
- req->r_timeout_stamp = jiffies+osdc->client->mount_args->osd_timeout*HZ;
+ req->r_sent_stamp = jiffies;
+ list_move_tail(&osdc->req_lru, &req->r_req_lru_item);
ceph_msg_get(req->r_request); /* send consumes a ref */
ceph_con_send(&req->r_osd->o_con, req->r_request);
{
struct ceph_osd_client *osdc =
container_of(work, struct ceph_osd_client, timeout_work.work);
- struct ceph_osd_request *req;
+ struct ceph_osd_request *req, *last_req = NULL;
struct ceph_osd *osd;
unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ;
- unsigned long next_timeout = timeout + jiffies;
+ unsigned long keepalive =
+ osdc->client->mount_args->osd_keepalive_timeout * HZ;
+ unsigned long last_sent = 0;
struct rb_node *p;
+ struct list_head slow_osds;
dout("timeout\n");
down_read(&osdc->map_sem);
continue;
}
}
- for (p = rb_first(&osdc->osds); p; p = rb_next(p)) {
- osd = rb_entry(p, struct ceph_osd, o_node);
- if (list_empty(&osd->o_requests))
- continue;
- req = list_first_entry(&osd->o_requests,
- struct ceph_osd_request, r_osd_item);
- if (time_before(jiffies, req->r_timeout_stamp))
- continue;
- dout(" tid %llu (at least) timed out on osd%d\n",
+ /*
+ * reset osds that appear to be _really_ unresponsive. this
+ * is a failsafe measure.. we really shouldn't be getting to
+ * this point if the system is working properly. the monitors
+ * should mark the osd as failed and we should find out about
+ * it from an updated osd map.
+ */
+ while (!list_empty(&osdc->req_lru)) {
+ req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
+ r_req_lru_item);
+
+ if (time_before(jiffies, req->r_sent_stamp + timeout))
+ break;
+
+ BUG_ON(req == last_req && req->r_sent_stamp == last_sent);
+ last_req = req;
+ last_sent = req->r_sent_stamp;
+
+ osd = req->r_osd;
+ BUG_ON(!osd);
+ pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
+ req->r_tid, osd->o_osd);
+ __kick_requests(osdc, osd);
+ }
+
+ /*
+ * ping osds that are a bit slow. this ensures that if there
+ * is a break in the TCP connection we will notice, and reopen
+ * a connection with that osd (from the fault callback).
+ */
+ INIT_LIST_HEAD(&slow_osds);
+ list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
+ if (time_before(jiffies, req->r_sent_stamp + keepalive))
+ break;
+
+ osd = req->r_osd;
+ BUG_ON(!osd);
+ dout(" tid %llu is slow, will send keepalive on osd%d\n",
req->r_tid, osd->o_osd);
- req->r_timeout_stamp = next_timeout;
+ list_move_tail(&osd->o_keepalive_item, &slow_osds);
+ }
+ while (!list_empty(&slow_osds)) {
+ osd = list_entry(slow_osds.next, struct ceph_osd,
+ o_keepalive_item);
+ list_del_init(&osd->o_keepalive_item);
ceph_con_keepalive(&osd->o_con);
}
- if (osdc->timeout_tid)
- schedule_delayed_work(&osdc->timeout_work,
- round_jiffies_relative(timeout));
-
+ __schedule_osd_timeout(osdc);
mutex_unlock(&osdc->request_mutex);
up_read(&osdc->map_sem);
* avoid a (safe but slower) revoke later.
*/
if (req->r_con_filling_msg == con && req->r_reply == msg) {
- dout(" got pages, dropping con_filling_msg ref %p\n", con);
+ dout(" dropping con_filling_msg ref %p\n", con);
req->r_con_filling_msg = NULL;
ceph_con_put(con);
}
- if (req->r_reply) {
- /*
- * once we see the message has been received, we don't
- * need a ref (which is only needed for revoking
- * pages)
- */
- ceph_msg_put(req->r_reply);
- req->r_reply = NULL;
- }
-
if (!req->r_got_reply) {
unsigned bytes;
}
-/*
- * Resubmit osd requests whose osd or osd address has changed. Request
- * a new osd map if osds are down, or we are otherwise unable to determine
- * how to direct a request.
- *
- * Close connections to down osds.
- *
- * If @who is specified, resubmit requests for that specific osd.
- *
- * Caller should hold map_sem for read and request_mutex.
- */
-static void kick_requests(struct ceph_osd_client *osdc,
+static int __kick_requests(struct ceph_osd_client *osdc,
struct ceph_osd *kickosd)
{
struct ceph_osd_request *req;
int err;
dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
- mutex_lock(&osdc->request_mutex);
if (kickosd) {
__reset_osd(osdc, kickosd);
} else {
req->r_resend = true;
}
}
+
+ return needmap;
+}
+
+/*
+ * Resubmit osd requests whose osd or osd address has changed. Request
+ * a new osd map if osds are down, or we are otherwise unable to determine
+ * how to direct a request.
+ *
+ * Close connections to down osds.
+ *
+ * If @who is specified, resubmit requests for that specific osd.
+ *
+ * Caller should hold map_sem for read and request_mutex.
+ */
+static void kick_requests(struct ceph_osd_client *osdc,
+ struct ceph_osd *kickosd)
+{
+ int needmap;
+
+ mutex_lock(&osdc->request_mutex);
+ needmap = __kick_requests(osdc, kickosd);
mutex_unlock(&osdc->request_mutex);
if (needmap) {
dout("%d requests for down osds, need new map\n", needmap);
ceph_monc_request_next_osdmap(&osdc->client->monc);
}
-}
+}
/*
* Process updated osd map.
*
init_completion(&osdc->map_waiters);
osdc->last_requested_map = 0;
mutex_init(&osdc->request_mutex);
- osdc->timeout_tid = 0;
osdc->last_tid = 0;
osdc->osds = RB_ROOT;
INIT_LIST_HEAD(&osdc->osd_lru);
osdc->requests = RB_ROOT;
+ INIT_LIST_HEAD(&osdc->req_lru);
osdc->num_requests = 0;
INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
if (!osdc->req_mempool)
goto out;
- err = ceph_msgpool_init(&osdc->msgpool_op, 4096, 10, true);
+ err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true);
if (err < 0)
goto out_mempool;
+ err = ceph_msgpool_init(&osdc->msgpool_op_reply,
+ OSD_OPREPLY_FRONT_LEN, 10, true);
+ if (err < 0)
+ goto out_msgpool;
return 0;
+out_msgpool:
+ ceph_msgpool_destroy(&osdc->msgpool_op);
out_mempool:
mempool_destroy(osdc->req_mempool);
out:
remove_old_osds(osdc, 1);
mempool_destroy(osdc->req_mempool);
ceph_msgpool_destroy(&osdc->msgpool_op);
+ ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}
/*
if (!req) {
*skip = 1;
m = NULL;
- pr_info("alloc_msg unknown tid %llu from osd%d\n", tid,
+ pr_info("get_reply unknown tid %llu from osd%d\n", tid,
osd->o_osd);
goto out;
}
- m = __get_next_reply(con, req, front);
- if (!m || IS_ERR(m)) {
- *skip = 1;
- goto out;
+
+ if (req->r_con_filling_msg) {
+ dout("get_reply revoking msg %p from old con %p\n",
+ req->r_reply, req->r_con_filling_msg);
+ ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
+ ceph_con_put(req->r_con_filling_msg);
}
+ if (front > req->r_reply->front.iov_len) {
+ pr_warning("get_reply front %d > preallocated %d\n",
+ front, (int)req->r_reply->front.iov_len);
+ m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, 0, 0, NULL);
+ if (IS_ERR(m))
+ goto out;
+ ceph_msg_put(req->r_reply);
+ req->r_reply = m;
+ }
+ m = ceph_msg_get(req->r_reply);
+
if (data_len > 0) {
err = __prepare_pages(con, hdr, req, tid, m);
if (err < 0) {
}
}
*skip = 0;
+ req->r_con_filling_msg = ceph_con_get(con);
+ dout("get_reply tid %lld %p\n", tid, m);
out:
mutex_unlock(&osdc->request_mutex);