ceph: release old ticket_blob buffer
[safe/jmp/linux-2.6] / fs / ceph / osd_client.c
index fa0f737..dbe63db 100644 (file)
 #include "decode.h"
 #include "auth.h"
 
-#define OSD_REPLY_RESERVE_FRONT_LEN    512
+#define OSD_OP_FRONT_LEN       4096
+#define OSD_OPREPLY_FRONT_LEN  512
 
 const static struct ceph_connection_operations osd_con_ops;
+static int __kick_requests(struct ceph_osd_client *osdc,
+                         struct ceph_osd *kickosd);
 
 static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
 
@@ -75,17 +78,6 @@ static void calc_layout(struct ceph_osd_client *osdc,
             req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages);
 }
 
-static void remove_replies(struct ceph_osd_request *req)
-{
-       int i;
-       int max = ARRAY_SIZE(req->replies);
-
-       for (i=0; i<max; i++) {
-               if (req->replies[i])
-                       ceph_msg_put(req->replies[i]);
-       }
-}
-
 /*
  * requests
  */
@@ -99,7 +91,6 @@ void ceph_osdc_release_request(struct kref *kref)
                ceph_msg_put(req->r_request);
        if (req->r_reply)
                ceph_msg_put(req->r_reply);
-       remove_replies(req);
        if (req->r_con_filling_msg) {
                dout("release_request revoking pages %p from con %p\n",
                     req->r_pages, req->r_con_filling_msg);
@@ -117,60 +108,6 @@ void ceph_osdc_release_request(struct kref *kref)
                kfree(req);
 }
 
-static int alloc_replies(struct ceph_osd_request *req, int num_reply)
-{
-       int i;
-       int max = ARRAY_SIZE(req->replies);
-
-       BUG_ON(num_reply > max);
-
-       for (i=0; i<num_reply; i++) {
-               req->replies[i] = ceph_msg_new(0, OSD_REPLY_RESERVE_FRONT_LEN, 0, 0, NULL);
-               if (IS_ERR(req->replies[i])) {
-                       int j;
-                       int err = PTR_ERR(req->replies[i]);
-                       for (j = 0; j<=i; j++) {
-                               ceph_msg_put(req->replies[j]);
-                       }
-                       return err;
-               }
-       }
-
-       for (; i<max; i++) {
-               req->replies[i] = NULL;
-       }
-
-       req->cur_reply = 0;
-
-       return 0;
-}
-
-static struct ceph_msg *__get_next_reply(struct ceph_connection *con,
-                                      struct ceph_osd_request *req,
-                                      int front_len)
-{
-       struct ceph_msg *reply;
-       if (req->r_con_filling_msg) {
-               dout("revoking reply msg %p from old con %p\n", req->r_reply,
-                    req->r_con_filling_msg);
-               ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
-               ceph_con_put(req->r_con_filling_msg);
-               req->cur_reply = 0;
-       }
-       reply = req->replies[req->cur_reply];
-       if (!reply || front_len > OSD_REPLY_RESERVE_FRONT_LEN) {
-               /* maybe we can allocate it now? */
-               reply = ceph_msg_new(0, front_len, 0, 0, NULL);
-               if (!reply || IS_ERR(reply)) {
-                       pr_err(" reply alloc failed, front_len=%d\n", front_len);
-                       return ERR_PTR(-ENOMEM);
-               }
-       }
-       req->r_con_filling_msg = ceph_con_get(con);
-       req->r_reply = ceph_msg_get(reply); /* for duration of read over socket */
-       return ceph_msg_get(reply);
-}
-
 /*
  * build new request AND message, calculate layout, and adjust file
  * extent as needed.
@@ -201,7 +138,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        void *p;
        int num_op = 1 + do_sync;
        size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
-       int err, i;
+       int i;
 
        if (use_mempool) {
                req = mempool_alloc(osdc->req_mempool, GFP_NOFS);
@@ -212,13 +149,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        if (req == NULL)
                return ERR_PTR(-ENOMEM);
 
-       err = alloc_replies(req, num_reply);
-       if (err) {
-               ceph_osdc_put_request(req);
-               return ERR_PTR(-ENOMEM);
-       }
-       req->r_num_prealloc_reply = num_reply;
-
        req->r_osdc = osdc;
        req->r_mempool = use_mempool;
        kref_init(&req->r_kref);
@@ -229,7 +159,19 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 
        WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
 
-       /* create message; allow space for oid */
+       /* create reply message */
+       if (use_mempool)
+               msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
+       else
+               msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
+                                  OSD_OPREPLY_FRONT_LEN, 0, 0, NULL);
+       if (IS_ERR(msg)) {
+               ceph_osdc_put_request(req);
+               return ERR_PTR(PTR_ERR(msg));
+       }
+       req->r_reply = msg;
+
+       /* create request message; allow space for oid */
        msg_size += 40;
        if (snapc)
                msg_size += sizeof(u64) * snapc->num_snaps;
@@ -288,6 +230,9 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        }
 
        BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
+       msg_size = p - msg->front.iov_base;
+       msg->front.iov_len = msg_size;
+       msg->hdr.front_len = cpu_to_le32(msg_size);
        return req;
 }
 
@@ -396,6 +341,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
        osd->o_con.ops = &osd_con_ops;
        osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
 
+       INIT_LIST_HEAD(&osd->o_keepalive_item);
        return osd;
 }
 
@@ -518,6 +464,16 @@ static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
        return NULL;
 }
 
+static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
+{
+       schedule_delayed_work(&osdc->timeout_work,
+                       osdc->client->mount_args->osd_keepalive_timeout * HZ);
+}
+
+static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
+{
+       cancel_delayed_work(&osdc->timeout_work);
+}
 
 /*
  * Register request, assign tid.  If this is the first request, set up
@@ -529,21 +485,16 @@ static void register_request(struct ceph_osd_client *osdc,
        mutex_lock(&osdc->request_mutex);
        req->r_tid = ++osdc->last_tid;
        req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
+       INIT_LIST_HEAD(&req->r_req_lru_item);
 
        dout("register_request %p tid %lld\n", req, req->r_tid);
        __insert_request(osdc, req);
        ceph_osdc_get_request(req);
        osdc->num_requests++;
 
-       req->r_timeout_stamp =
-               jiffies + osdc->client->mount_args->osd_timeout*HZ;
-
        if (osdc->num_requests == 1) {
-               osdc->timeout_tid = req->r_tid;
-               dout("  timeout on tid %llu at %lu\n", req->r_tid,
-                    req->r_timeout_stamp);
-               schedule_delayed_work(&osdc->timeout_work,
-                     round_jiffies_relative(req->r_timeout_stamp - jiffies));
+               dout(" first request, scheduling timeout\n");
+               __schedule_osd_timeout(osdc);
        }
        mutex_unlock(&osdc->request_mutex);
 }
@@ -570,21 +521,10 @@ static void __unregister_request(struct ceph_osd_client *osdc,
 
        ceph_osdc_put_request(req);
 
-       if (req->r_tid == osdc->timeout_tid) {
-               if (osdc->num_requests == 0) {
-                       dout("no requests, canceling timeout\n");
-                       osdc->timeout_tid = 0;
-                       cancel_delayed_work(&osdc->timeout_work);
-               } else {
-                       req = rb_entry(rb_first(&osdc->requests),
-                                      struct ceph_osd_request, r_node);
-                       osdc->timeout_tid = req->r_tid;
-                       dout("rescheduled timeout on tid %llu at %lu\n",
-                            req->r_tid, req->r_timeout_stamp);
-                       schedule_delayed_work(&osdc->timeout_work,
-                             round_jiffies_relative(req->r_timeout_stamp -
-                                                    jiffies));
-               }
+       list_del_init(&req->r_req_lru_item);
+       if (osdc->num_requests == 0) {
+               dout(" no requests, canceling timeout\n");
+               __cancel_osd_timeout(osdc);
        }
 }
 
@@ -597,6 +537,7 @@ static void __cancel_request(struct ceph_osd_request *req)
                ceph_con_revoke(&req->r_osd->o_con, req->r_request);
                req->r_sent = 0;
        }
+       list_del_init(&req->r_req_lru_item);
 }
 
 /*
@@ -615,7 +556,6 @@ static int __map_osds(struct ceph_osd_client *osdc,
        struct ceph_pg pgid;
        int o = -1;
        int err;
-       struct ceph_osd *newosd = NULL;
 
        dout("map_osds %p tid %lld\n", req, req->r_tid);
        err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
@@ -639,25 +579,15 @@ static int __map_osds(struct ceph_osd_client *osdc,
        if (req->r_osd) {
                __cancel_request(req);
                list_del_init(&req->r_osd_item);
-               if (list_empty(&req->r_osd->o_requests)) {
-                       /* try to re-use r_osd if possible */
-                       newosd = get_osd(req->r_osd);
-                       __remove_osd(osdc, newosd);
-               }
                req->r_osd = NULL;
        }
 
        req->r_osd = __lookup_osd(osdc, o);
        if (!req->r_osd && o >= 0) {
-               if (newosd) {
-                       req->r_osd = newosd;
-                       newosd = NULL;
-               } else {
-                       err = -ENOMEM;
-                       req->r_osd = create_osd(osdc);
-                       if (!req->r_osd)
-                               goto out;
-               }
+               err = -ENOMEM;
+               req->r_osd = create_osd(osdc);
+               if (!req->r_osd)
+                       goto out;
 
                dout("map_osds osd %p is osd%d\n", req->r_osd, o);
                req->r_osd->o_osd = o;
@@ -674,8 +604,6 @@ static int __map_osds(struct ceph_osd_client *osdc,
        err = 1;   /* osd changed */
 
 out:
-       if (newosd)
-               put_osd(newosd);
        return err;
 }
 
@@ -705,7 +633,8 @@ static int __send_request(struct ceph_osd_client *osdc,
        reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
        reqhead->reassert_version = req->r_reassert_version;
 
-       req->r_timeout_stamp = jiffies+osdc->client->mount_args->osd_timeout*HZ;
+       req->r_sent_stamp = jiffies;
+       list_move_tail(&osdc->req_lru, &req->r_req_lru_item);
 
        ceph_msg_get(req->r_request); /* send consumes a ref */
        ceph_con_send(&req->r_osd->o_con, req->r_request);
@@ -726,11 +655,14 @@ static void handle_timeout(struct work_struct *work)
 {
        struct ceph_osd_client *osdc =
                container_of(work, struct ceph_osd_client, timeout_work.work);
-       struct ceph_osd_request *req;
+       struct ceph_osd_request *req, *last_req = NULL;
        struct ceph_osd *osd;
        unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ;
-       unsigned long next_timeout = timeout + jiffies;
+       unsigned long keepalive =
+               osdc->client->mount_args->osd_keepalive_timeout * HZ;
+       unsigned long last_sent = 0;
        struct rb_node *p;
+       struct list_head slow_osds;
 
        dout("timeout\n");
        down_read(&osdc->map_sem);
@@ -753,25 +685,56 @@ static void handle_timeout(struct work_struct *work)
                        continue;
                }
        }
-       for (p = rb_first(&osdc->osds); p; p = rb_next(p)) {
-               osd = rb_entry(p, struct ceph_osd, o_node);
-               if (list_empty(&osd->o_requests))
-                       continue;
-               req = list_first_entry(&osd->o_requests,
-                                      struct ceph_osd_request, r_osd_item);
-               if (time_before(jiffies, req->r_timeout_stamp))
-                       continue;
 
-               dout(" tid %llu (at least) timed out on osd%d\n",
+       /*
+        * reset osds that appear to be _really_ unresponsive.  this
+        * is a failsafe measure.. we really shouldn't be getting to
+        * this point if the system is working properly.  the monitors
+        * should mark the osd as failed and we should find out about
+        * it from an updated osd map.
+        */
+       while (!list_empty(&osdc->req_lru)) {
+               req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
+                                r_req_lru_item);
+
+               if (time_before(jiffies, req->r_sent_stamp + timeout))
+                       break;
+
+               BUG_ON(req == last_req && req->r_sent_stamp == last_sent);
+               last_req = req;
+               last_sent = req->r_sent_stamp;
+
+               osd = req->r_osd;
+               BUG_ON(!osd);
+               pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
+                          req->r_tid, osd->o_osd);
+               __kick_requests(osdc, osd);
+       }
+
+       /*
+        * ping osds that are a bit slow.  this ensures that if there
+        * is a break in the TCP connection we will notice, and reopen
+        * a connection with that osd (from the fault callback).
+        */
+       INIT_LIST_HEAD(&slow_osds);
+       list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
+               if (time_before(jiffies, req->r_sent_stamp + keepalive))
+                       break;
+
+               osd = req->r_osd;
+               BUG_ON(!osd);
+               dout(" tid %llu is slow, will send keepalive on osd%d\n",
                     req->r_tid, osd->o_osd);
-               req->r_timeout_stamp = next_timeout;
+               list_move_tail(&osd->o_keepalive_item, &slow_osds);
+       }
+       while (!list_empty(&slow_osds)) {
+               osd = list_entry(slow_osds.next, struct ceph_osd,
+                                o_keepalive_item);
+               list_del_init(&osd->o_keepalive_item);
                ceph_con_keepalive(&osd->o_con);
        }
 
-       if (osdc->timeout_tid)
-               schedule_delayed_work(&osdc->timeout_work,
-                                     round_jiffies_relative(timeout));
-
+       __schedule_osd_timeout(osdc);
        mutex_unlock(&osdc->request_mutex);
 
        up_read(&osdc->map_sem);
@@ -832,21 +795,11 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
         * avoid a (safe but slower) revoke later.
         */
        if (req->r_con_filling_msg == con && req->r_reply == msg) {
-               dout(" got pages, dropping con_filling_msg ref %p\n", con);
+               dout(" dropping con_filling_msg ref %p\n", con);
                req->r_con_filling_msg = NULL;
                ceph_con_put(con);
        }
 
-       if (req->r_reply) {
-               /*
-                * once we see the message has been received, we don't
-                * need a ref (which is only needed for revoking
-                * pages)
-                */
-               ceph_msg_put(req->r_reply);
-               req->r_reply = NULL;
-       }
-
        if (!req->r_got_reply) {
                unsigned bytes;
 
@@ -899,18 +852,7 @@ bad:
 }
 
 
-/*
- * Resubmit osd requests whose osd or osd address has changed.  Request
- * a new osd map if osds are down, or we are otherwise unable to determine
- * how to direct a request.
- *
- * Close connections to down osds.
- *
- * If @who is specified, resubmit requests for that specific osd.
- *
- * Caller should hold map_sem for read and request_mutex.
- */
-static void kick_requests(struct ceph_osd_client *osdc,
+static int __kick_requests(struct ceph_osd_client *osdc,
                          struct ceph_osd *kickosd)
 {
        struct ceph_osd_request *req;
@@ -919,7 +861,6 @@ static void kick_requests(struct ceph_osd_client *osdc,
        int err;
 
        dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
-       mutex_lock(&osdc->request_mutex);
        if (kickosd) {
                __reset_osd(osdc, kickosd);
        } else {
@@ -980,14 +921,36 @@ kick:
                        req->r_resend = true;
                }
        }
+
+       return needmap;
+}
+
+/*
+ * Resubmit osd requests whose osd or osd address has changed.  Request
+ * a new osd map if osds are down, or we are otherwise unable to determine
+ * how to direct a request.
+ *
+ * Close connections to down osds.
+ *
+ * If @who is specified, resubmit requests for that specific osd.
+ *
+ * Caller should hold map_sem for read and request_mutex.
+ */
+static void kick_requests(struct ceph_osd_client *osdc,
+                         struct ceph_osd *kickosd)
+{
+       int needmap;
+
+       mutex_lock(&osdc->request_mutex);
+       needmap = __kick_requests(osdc, kickosd);
        mutex_unlock(&osdc->request_mutex);
 
        if (needmap) {
                dout("%d requests for down osds, need new map\n", needmap);
                ceph_monc_request_next_osdmap(&osdc->client->monc);
        }
-}
 
+}
 /*
  * Process updated osd map.
  *
@@ -1244,11 +1207,11 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        init_completion(&osdc->map_waiters);
        osdc->last_requested_map = 0;
        mutex_init(&osdc->request_mutex);
-       osdc->timeout_tid = 0;
        osdc->last_tid = 0;
        osdc->osds = RB_ROOT;
        INIT_LIST_HEAD(&osdc->osd_lru);
        osdc->requests = RB_ROOT;
+       INIT_LIST_HEAD(&osdc->req_lru);
        osdc->num_requests = 0;
        INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
        INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
@@ -1262,11 +1225,17 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        if (!osdc->req_mempool)
                goto out;
 
-       err = ceph_msgpool_init(&osdc->msgpool_op, 4096, 10, true);
+       err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true);
        if (err < 0)
                goto out_mempool;
+       err = ceph_msgpool_init(&osdc->msgpool_op_reply,
+                               OSD_OPREPLY_FRONT_LEN, 10, true);
+       if (err < 0)
+               goto out_msgpool;
        return 0;
 
+out_msgpool:
+       ceph_msgpool_destroy(&osdc->msgpool_op);
 out_mempool:
        mempool_destroy(osdc->req_mempool);
 out:
@@ -1284,6 +1253,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
        remove_old_osds(osdc, 1);
        mempool_destroy(osdc->req_mempool);
        ceph_msgpool_destroy(&osdc->msgpool_op);
+       ceph_msgpool_destroy(&osdc->msgpool_op_reply);
 }
 
 /*
@@ -1396,39 +1366,51 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
        ceph_msg_put(msg);
 }
 
-static struct ceph_msg *alloc_msg(struct ceph_connection *con,
+/*
+ * lookup and return message for incoming reply
+ */
+static struct ceph_msg *get_reply(struct ceph_connection *con,
                                  struct ceph_msg_header *hdr,
                                  int *skip)
 {
        struct ceph_osd *osd = con->private;
        struct ceph_osd_client *osdc = osd->o_osdc;
-       int type = le16_to_cpu(hdr->type);
-       int front = le32_to_cpu(hdr->front_len);
-       int data_len = le32_to_cpu(hdr->data_len);
        struct ceph_msg *m;
        struct ceph_osd_request *req;
+       int front = le32_to_cpu(hdr->front_len);
+       int data_len = le32_to_cpu(hdr->data_len);
        u64 tid;
        int err;
 
-       *skip = 0;
-       if (type != CEPH_MSG_OSD_OPREPLY)
-               return NULL;
-
        tid = le64_to_cpu(hdr->tid);
        mutex_lock(&osdc->request_mutex);
        req = __lookup_request(osdc, tid);
        if (!req) {
                *skip = 1;
                m = NULL;
-               dout("alloc_msg unknown tid %llu\n", tid);
+               pr_info("get_reply unknown tid %llu from osd%d\n", tid,
+                       osd->o_osd);
                goto out;
        }
-       m = __get_next_reply(con, req, front);
-       if (!m || IS_ERR(m)) {
-               *skip = 1;
-               goto out;
+
+       if (req->r_con_filling_msg) {
+               dout("get_reply revoking msg %p from old con %p\n",
+                    req->r_reply, req->r_con_filling_msg);
+               ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
+               ceph_con_put(req->r_con_filling_msg);
        }
 
+       if (front > req->r_reply->front.iov_len) {
+               pr_warning("get_reply front %d > preallocated %d\n",
+                          front, (int)req->r_reply->front.iov_len);
+               m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, 0, 0, NULL);
+               if (IS_ERR(m))
+                       goto out;
+               ceph_msg_put(req->r_reply);
+               req->r_reply = m;
+       }
+       m = ceph_msg_get(req->r_reply);
+
        if (data_len > 0) {
                err = __prepare_pages(con, hdr, req, tid, m);
                if (err < 0) {
@@ -1437,11 +1419,35 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
                        m = ERR_PTR(err);
                }
        }
+       *skip = 0;
+       req->r_con_filling_msg = ceph_con_get(con);
+       dout("get_reply tid %lld %p\n", tid, m);
 
 out:
        mutex_unlock(&osdc->request_mutex);
-
        return m;
+
+}
+
+static struct ceph_msg *alloc_msg(struct ceph_connection *con,
+                                 struct ceph_msg_header *hdr,
+                                 int *skip)
+{
+       struct ceph_osd *osd = con->private;
+       int type = le16_to_cpu(hdr->type);
+       int front = le32_to_cpu(hdr->front_len);
+
+       switch (type) {
+       case CEPH_MSG_OSD_MAP:
+               return ceph_msg_new(type, front, 0, 0, NULL);
+       case CEPH_MSG_OSD_OPREPLY:
+               return get_reply(con, hdr, skip);
+       default:
+               pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
+                       osd->o_osd);
+               *skip = 1;
+               return NULL;
+       }
 }
 
 /*