1 #include "ceph_debug.h"
4 #include <linux/highmem.h>
6 #include <linux/pagemap.h>
7 #include <linux/slab.h>
8 #include <linux/uaccess.h>
11 #include "osd_client.h"
12 #include "messenger.h"
16 #define OSD_OP_FRONT_LEN 4096
17 #define OSD_OPREPLY_FRONT_LEN 512
19 const static struct ceph_connection_operations osd_con_ops;
21 static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
24 * Implement client access to distributed object storage cluster.
26 * All data objects are stored within a cluster/cloud of OSDs, or
27 * "object storage devices." (Note that Ceph OSDs have _nothing_ to
28 * do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply
29 * remote daemons serving up and coordinating consistent and safe
32 * Cluster membership and the mapping of data objects onto storage devices
33 * are described by the osd map.
35 * We keep track of pending OSD requests (read, write), resubmit
36 * requests to different OSDs when the cluster topology/data layout
37 * change, or retry the affected requests when the communications
38 * channel with an OSD is reset.
42 * calculate the mapping of a file extent onto an object, and fill out the
43 * request accordingly. shorten extent as necessary if it crosses an
46 * fill osd op in request message.
/*
 * Map the file extent (off, *plen) of vino onto a single object:
 * compute the object number (bno) and per-object offset/length,
 * build the object name, and fill the extent op in the request front.
 * NOTE(review): extraction dropped interior source lines here (the
 * original numbering is non-contiguous); the visible statements are
 * not the complete body.
 */
48 static void calc_layout(struct ceph_osd_client *osdc,
49 struct ceph_vino vino, struct ceph_file_layout *layout,
51 struct ceph_osd_request *req)
53 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
54 struct ceph_osd_op *op = (void *)(reqhead + 1);
56 u64 objoff, objlen; /* extent in object */
59 reqhead->snapid = cpu_to_le64(vino.snap);
/* shorten *plen if the extent crosses an object boundary */
62 ceph_calc_file_object_mapping(layout, off, plen, &bno,
65 dout(" skipping last %llu, final file extent %llu~%llu\n",
66 orig_len - *plen, off, *plen);
/* object name is "<ino hex>.<bno hex>" */
68 sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
69 req->r_oid_len = strlen(req->r_oid);
71 op->extent.offset = cpu_to_le64(objoff);
72 op->extent.length = cpu_to_le64(objlen);
73 req->r_num_pages = calc_pages_for(off, *plen);
75 dout("calc_layout %s (%d) %llu~%llu (%d pages)\n",
76 req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages);
/*
 * kref release callback for an osd request: drop the request/reply
 * message refs, revoke any connection still filling our reply pages,
 * release the page vector and snap context, and free the request
 * (back to the mempool when it was mempool-allocated).
 */
82 void ceph_osdc_release_request(struct kref *kref)
84 struct ceph_osd_request *req = container_of(kref,
85 struct ceph_osd_request,
89 ceph_msg_put(req->r_request);
91 ceph_msg_put(req->r_reply);
92 if (req->r_con_filling_msg) {
93 dout("release_request revoking pages %p from con %p\n",
94 req->r_pages, req->r_con_filling_msg);
/* the con may still be reading data into r_pages; take it back */
95 ceph_con_revoke_message(req->r_con_filling_msg,
97 ceph_con_put(req->r_con_filling_msg);
100 ceph_release_page_vector(req->r_pages,
102 ceph_put_snap_context(req->r_snapc);
104 mempool_free(req, req->r_osdc->req_mempool);
110 * build new request AND message, calculate layout, and adjust file
113 * if the file was recently truncated, we include information about its
114 * old and new size so that the object can be updated appropriately. (we
115 * avoid synchronously deleting truncated objects because it's slow.)
117 * if @do_sync, include a 'startsync' command so that the osd will flush
/*
 * Allocate and build a new osd request (and its request/reply
 * messages) for the file extent (off, *plen) of vino.  Calculates the
 * object layout (possibly shortening *plen at an object boundary),
 * encodes the op(s), oid, and snap context into the message front,
 * and optionally appends a STARTSYNC op when do_sync is set.
 *
 * Returns the request, or an ERR_PTR on allocation failure.
 * NOTE(review): interior lines (error checks, some locals such as
 * 'p', 'i', 'do_sync') were dropped by extraction.
 */
120 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
121 struct ceph_file_layout *layout,
122 struct ceph_vino vino,
124 int opcode, int flags,
125 struct ceph_snap_context *snapc,
129 struct timespec *mtime,
130 bool use_mempool, int num_reply)
132 struct ceph_osd_request *req;
133 struct ceph_msg *msg;
134 struct ceph_osd_request_head *head;
135 struct ceph_osd_op *op;
/* one primary op, plus an optional trailing STARTSYNC */
137 int num_op = 1 + do_sync;
138 size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
/* mempool path: allocation cannot fail; otherwise kzalloc */
142 req = mempool_alloc(osdc->req_mempool, GFP_NOFS);
143 memset(req, 0, sizeof(*req));
145 req = kzalloc(sizeof(*req), GFP_NOFS);
148 return ERR_PTR(-ENOMEM);
151 req->r_mempool = use_mempool;
152 kref_init(&req->r_kref);
153 init_completion(&req->r_completion);
154 init_completion(&req->r_safe_completion);
155 INIT_LIST_HEAD(&req->r_unsafe_item);
156 req->r_flags = flags;
158 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
160 /* create reply message */
162 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
164 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
165 OSD_OPREPLY_FRONT_LEN, 0, 0, NULL);
167 ceph_osdc_put_request(req);
168 return ERR_PTR(PTR_ERR(msg));
172 /* create request message; allow space for oid */
175 msg_size += sizeof(u64) * snapc->num_snaps;
177 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
179 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL);
181 ceph_osdc_put_request(req);
182 return ERR_PTR(PTR_ERR(msg));
184 msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
185 memset(msg->front.iov_base, 0, msg->front.iov_len);
186 head = msg->front.iov_base;
187 op = (void *)(head + 1);
188 p = (void *)(op + num_op);
190 req->r_request = msg;
191 req->r_snapc = ceph_get_snap_context(snapc);
193 head->client_inc = cpu_to_le32(1); /* always, for now. */
194 head->flags = cpu_to_le32(flags);
195 if (flags & CEPH_OSD_FLAG_WRITE)
196 ceph_encode_timespec(&head->mtime, mtime);
197 head->num_ops = cpu_to_le16(num_op);
198 op->op = cpu_to_le16(opcode);
200 /* calculate max write size */
201 calc_layout(osdc, vino, layout, off, plen, req);
202 req->r_file_layout = *layout; /* keep a copy */
204 if (flags & CEPH_OSD_FLAG_WRITE) {
205 req->r_request->hdr.data_off = cpu_to_le16(off);
206 req->r_request->hdr.data_len = cpu_to_le32(*plen);
207 op->payload_len = cpu_to_le32(*plen);
209 op->extent.truncate_size = cpu_to_le64(truncate_size);
210 op->extent.truncate_seq = cpu_to_le32(truncate_seq);
/* oid follows the op array in the front */
213 head->object_len = cpu_to_le32(req->r_oid_len);
214 memcpy(p, req->r_oid, req->r_oid_len);
219 op->op = cpu_to_le16(CEPH_OSD_OP_STARTSYNC);
/* encode snap context: seq, count, then each snapid */
222 head->snap_seq = cpu_to_le64(snapc->seq);
223 head->num_snaps = cpu_to_le32(snapc->num_snaps);
224 for (i = 0; i < snapc->num_snaps; i++) {
225 put_unaligned_le64(snapc->snaps[i], p);
/* trim front length down to what was actually encoded */
230 BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
231 msg_size = p - msg->front.iov_base;
232 msg->front.iov_len = msg_size;
233 msg->hdr.front_len = cpu_to_le32(msg_size);
238 * We keep osd requests in an rbtree, sorted by ->r_tid.
/*
 * Insert a request into the osdc->requests rbtree, keyed by r_tid.
 * Caller must hold request_mutex.
 */
240 static void __insert_request(struct ceph_osd_client *osdc,
241 struct ceph_osd_request *new)
243 struct rb_node **p = &osdc->requests.rb_node;
244 struct rb_node *parent = NULL;
245 struct ceph_osd_request *req = NULL;
249 req = rb_entry(parent, struct ceph_osd_request, r_node);
250 if (new->r_tid < req->r_tid)
252 else if (new->r_tid > req->r_tid)
258 rb_link_node(&new->r_node, parent, p);
259 rb_insert_color(&new->r_node, &osdc->requests);
/*
 * Look up a pending request by tid in the requests rbtree.
 * Returns NULL if not found.  Caller must hold request_mutex.
 */
262 static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
265 struct ceph_osd_request *req;
266 struct rb_node *n = osdc->requests.rb_node;
269 req = rb_entry(n, struct ceph_osd_request, r_node);
270 if (tid < req->r_tid)
272 else if (tid > req->r_tid)
/*
 * Find the pending request with the smallest tid >= the given tid
 * (used by ceph_osdc_sync to iterate requests without holding the
 * lock across waits).  Caller must hold request_mutex.
 */
280 static struct ceph_osd_request *
281 __lookup_request_ge(struct ceph_osd_client *osdc,
284 struct ceph_osd_request *req;
285 struct rb_node *n = osdc->requests.rb_node;
288 req = rb_entry(n, struct ceph_osd_request, r_node);
289 if (tid < req->r_tid) {
293 } else if (tid > req->r_tid) {
304 * If the osd connection drops, we need to resubmit all requests.
/*
 * Connection-reset callback: the transport to an osd dropped, so
 * resubmit all requests assigned to that osd (under map_sem read).
 */
306 static void osd_reset(struct ceph_connection *con)
308 struct ceph_osd *osd = con->private;
309 struct ceph_osd_client *osdc;
313 dout("osd_reset osd%d\n", osd->o_osd);
315 down_read(&osdc->map_sem);
316 kick_requests(osdc, osd);
317 up_read(&osdc->map_sem);
321 * Track open sessions with osds.
/*
 * Allocate and initialize a ceph_osd session struct (refcount 1)
 * with an initialized-but-unopened connection.  Returns NULL on
 * allocation failure (visible alloc is kzalloc GFP_NOFS).
 */
323 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
325 struct ceph_osd *osd;
327 osd = kzalloc(sizeof(*osd), GFP_NOFS);
331 atomic_set(&osd->o_ref, 1);
333 INIT_LIST_HEAD(&osd->o_requests);
334 INIT_LIST_HEAD(&osd->o_osd_lru);
335 osd->o_incarnation = 1;
337 ceph_con_init(osdc->client->msgr, &osd->o_con);
338 osd->o_con.private = osd;
339 osd->o_con.ops = &osd_con_ops;
340 osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
/*
 * Take a reference on an osd session, but only if it is still live
 * (refcount non-zero); atomic_inc_not_zero avoids resurrecting an
 * osd that is concurrently being torn down.
 */
345 static struct ceph_osd *get_osd(struct ceph_osd *osd)
347 if (atomic_inc_not_zero(&osd->o_ref)) {
348 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
349 atomic_read(&osd->o_ref));
352 dout("get_osd %p FAIL\n", osd);
/*
 * Drop a reference on an osd session; last put frees it (the free
 * itself is on a line elided from this excerpt).
 */
357 static void put_osd(struct ceph_osd *osd)
359 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
360 atomic_read(&osd->o_ref) - 1);
361 if (atomic_dec_and_test(&osd->o_ref))
366 * remove an osd from our map
/*
 * Remove an idle osd from the osdc->osds rbtree and the LRU, and
 * close its connection.  Only legal when it has no pending requests
 * (BUG_ON enforces).  Caller must hold request_mutex.
 */
368 static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
370 dout("__remove_osd %p\n", osd);
371 BUG_ON(!list_empty(&osd->o_requests));
372 rb_erase(&osd->o_node, &osdc->osds);
373 list_del_init(&osd->o_osd_lru);
374 ceph_con_close(&osd->o_con);
/*
 * Put a now-idle osd on the LRU with an expiry stamp of
 * now + osd_idle_ttl; remove_old_osds reaps it after that.
 */
378 static void __move_osd_to_lru(struct ceph_osd_client *osdc,
379 struct ceph_osd *osd)
381 dout("__move_osd_to_lru %p\n", osd);
382 BUG_ON(!list_empty(&osd->o_osd_lru));
383 list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
384 osd->lru_ttl = jiffies + osdc->client->mount_args->osd_idle_ttl * HZ;
/* Take an osd off the idle LRU (no-op if it is not on it). */
387 static void __remove_osd_from_lru(struct ceph_osd *osd)
389 dout("__remove_osd_from_lru %p\n", osd);
390 if (!list_empty(&osd->o_osd_lru))
391 list_del_init(&osd->o_osd_lru);
/*
 * Reap idle osds from the LRU whose ttl has expired; with remove_all
 * set (shutdown path), reap them unconditionally.
 */
394 static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
396 struct ceph_osd *osd, *nosd;
398 dout("__remove_old_osds %p\n", osdc);
399 mutex_lock(&osdc->request_mutex);
400 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
401 if (!remove_all && time_before(jiffies, osd->lru_ttl))
403 __remove_osd(osdc, osd);
405 mutex_unlock(&osdc->request_mutex);
/*
 * Reset an osd's connection: if it has no pending requests just
 * remove it entirely; otherwise close and reopen the connection at
 * the address from the current osdmap and bump o_incarnation so
 * stale r_sent values force a resend.
 */
411 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
415 dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
416 if (list_empty(&osd->o_requests)) {
417 __remove_osd(osdc, osd);
419 ceph_con_close(&osd->o_con);
420 ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
421 osd->o_incarnation++;
/*
 * Insert an osd session into the osdc->osds rbtree, keyed by osd id.
 * Caller must hold request_mutex.
 */
426 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
428 struct rb_node **p = &osdc->osds.rb_node;
429 struct rb_node *parent = NULL;
430 struct ceph_osd *osd = NULL;
434 osd = rb_entry(parent, struct ceph_osd, o_node);
435 if (new->o_osd < osd->o_osd)
437 else if (new->o_osd > osd->o_osd)
443 rb_link_node(&new->o_node, parent, p);
444 rb_insert_color(&new->o_node, &osdc->osds);
/*
 * Look up an osd session by osd id; NULL if we have no open session.
 * Caller must hold request_mutex.
 */
447 static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
449 struct ceph_osd *osd;
450 struct rb_node *n = osdc->osds.rb_node;
453 osd = rb_entry(n, struct ceph_osd, o_node);
456 else if (o > osd->o_osd)
466 * Register request, assign tid. If this is the first request, set up
/*
 * Assign the request a fresh tid, insert it into the requests tree
 * (taking a ref), and, if it is the first in-flight request, arm the
 * timeout work keyed on its tid.
 */
469 static void register_request(struct ceph_osd_client *osdc,
470 struct ceph_osd_request *req)
472 mutex_lock(&osdc->request_mutex);
473 req->r_tid = ++osdc->last_tid;
474 req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
476 dout("register_request %p tid %lld\n", req, req->r_tid);
477 __insert_request(osdc, req);
478 ceph_osdc_get_request(req);
479 osdc->num_requests++;
481 req->r_timeout_stamp =
482 jiffies + osdc->client->mount_args->osd_timeout*HZ;
484 if (osdc->num_requests == 1) {
485 osdc->timeout_tid = req->r_tid;
486 dout(" timeout on tid %llu at %lu\n", req->r_tid,
487 req->r_timeout_stamp);
488 schedule_delayed_work(&osdc->timeout_work,
489 round_jiffies_relative(req->r_timeout_stamp - jiffies));
491 mutex_unlock(&osdc->request_mutex);
495 * called under osdc->request_mutex
/*
 * called under osdc->request_mutex
 *
 * Remove a request from the requests tree, detach it from its osd
 * (revoking the in-flight message and LRU-ing the osd if now idle),
 * drop the tree's ref, and fix up the timeout bookkeeping: cancel the
 * timer when no requests remain, otherwise re-key it on the oldest
 * remaining request.
 */
497 static void __unregister_request(struct ceph_osd_client *osdc,
498 struct ceph_osd_request *req)
500 dout("__unregister_request %p tid %lld\n", req, req->r_tid);
501 rb_erase(&req->r_node, &osdc->requests);
502 osdc->num_requests--;
505 /* make sure the original request isn't in flight. */
506 ceph_con_revoke(&req->r_osd->o_con, req->r_request);
508 list_del_init(&req->r_osd_item);
509 if (list_empty(&req->r_osd->o_requests))
510 __move_osd_to_lru(osdc, req->r_osd);
514 ceph_osdc_put_request(req);
516 if (req->r_tid == osdc->timeout_tid) {
517 if (osdc->num_requests == 0) {
518 dout("no requests, canceling timeout\n");
519 osdc->timeout_tid = 0;
520 cancel_delayed_work(&osdc->timeout_work);
/* re-key the timeout on the oldest remaining tid */
522 req = rb_entry(rb_first(&osdc->requests),
523 struct ceph_osd_request, r_node);
524 osdc->timeout_tid = req->r_tid;
525 dout("rescheduled timeout on tid %llu at %lu\n",
526 req->r_tid, req->r_timeout_stamp);
527 schedule_delayed_work(&osdc->timeout_work,
528 round_jiffies_relative(req->r_timeout_stamp -
535 * Cancel a previously queued request message
/*
 * Revoke a queued-but-possibly-unsent request message from its osd
 * connection so it is not transmitted (or is abandoned mid-send).
 */
537 static void __cancel_request(struct ceph_osd_request *req)
540 ceph_con_revoke(&req->r_osd->o_con, req->r_request);
546 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
547 * (as needed), and set the request r_osd appropriately. If there is
548 * no up osd, set r_osd to NULL.
550 * Return 0 if unchanged, 1 if changed, or negative on error.
552 * Caller should hold map_sem for read and request_mutex.
/*
 * Recompute which osd this request maps to under the current osdmap:
 * calculate the pg from the oid/layout, pick the pg's primary osd,
 * and (re)attach the request to that osd's session, creating and
 * opening a session if needed.
 *
 * Returns 0 if the mapping is unchanged, 1 if it changed, negative
 * on error.  Caller holds map_sem (read) and request_mutex.
 */
554 static int __map_osds(struct ceph_osd_client *osdc,
555 struct ceph_osd_request *req)
557 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
562 dout("map_osds %p tid %lld\n", req, req->r_tid);
563 err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
564 &req->r_file_layout, osdc->osdmap);
567 pgid = reqhead->layout.ol_pgid;
570 o = ceph_calc_pg_primary(osdc->osdmap, pgid);
/* same osd and already sent to this incarnation? nothing to do */
572 if ((req->r_osd && req->r_osd->o_osd == o &&
573 req->r_sent >= req->r_osd->o_incarnation) ||
574 (req->r_osd == NULL && o == -1))
575 return 0; /* no change */
577 dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n",
578 req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
579 req->r_osd ? req->r_osd->o_osd : -1);
/* detach from the old osd, if any */
582 __cancel_request(req);
583 list_del_init(&req->r_osd_item);
587 req->r_osd = __lookup_osd(osdc, o);
588 if (!req->r_osd && o >= 0) {
/* no session with this osd yet: create and open one */
590 req->r_osd = create_osd(osdc);
594 dout("map_osds osd %p is osd%d\n", req->r_osd, o);
595 req->r_osd->o_osd = o;
596 req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
597 __insert_osd(osdc, req->r_osd);
599 ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
603 __remove_osd_from_lru(req->r_osd);
604 list_add(&req->r_osd_item, &req->r_osd->o_requests);
606 err = 1; /* osd changed */
613 * caller should hold map_sem (for read) and request_mutex
/*
 * Map the request to an osd and transmit it.  If no osd is up for
 * the pg, ask the monitor for a newer osdmap instead.  Stamps the
 * request head with the current map epoch and flags, refreshes the
 * timeout stamp, and records the osd incarnation it was sent to.
 * Caller holds map_sem (read) and request_mutex.
 */
615 static int __send_request(struct ceph_osd_client *osdc,
616 struct ceph_osd_request *req)
618 struct ceph_osd_request_head *reqhead;
621 err = __map_osds(osdc, req);
624 if (req->r_osd == NULL) {
625 dout("send_request %p no up osds in pg\n", req);
626 ceph_monc_request_next_osdmap(&osdc->client->monc);
630 dout("send_request %p tid %llu to osd%d flags %d\n",
631 req, req->r_tid, req->r_osd->o_osd, req->r_flags);
633 reqhead = req->r_request->front.iov_base;
634 reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
635 reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */
636 reqhead->reassert_version = req->r_reassert_version;
638 req->r_timeout_stamp = jiffies+osdc->client->mount_args->osd_timeout*HZ;
640 ceph_msg_get(req->r_request); /* send consumes a ref */
641 ceph_con_send(&req->r_osd->o_con, req->r_request);
642 req->r_sent = req->r_osd->o_incarnation;
647 * Timeout callback, called every N seconds when 1 or more osd
648 * requests has been active for more than N seconds. When this
649 * happens, we ping all OSDs with requests who have timed out to
650 * ensure any communications channel reset is detected. Reset the
651 * request timeouts another N seconds in the future as we go.
652 * Reschedule the timeout event another N seconds in future (unless
653 * there are no open requests).
/*
 * Delayed-work timeout handler: retries requests whose earlier send
 * failed (r_resend), then sends a keepalive to every osd that has a
 * request older than the timeout so a dead connection is detected,
 * pushing those requests' timeout stamps forward.  Reschedules
 * itself while any request remains outstanding (timeout_tid != 0).
 */
655 static void handle_timeout(struct work_struct *work)
657 struct ceph_osd_client *osdc =
658 container_of(work, struct ceph_osd_client, timeout_work.work);
659 struct ceph_osd_request *req;
660 struct ceph_osd *osd;
661 unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ;
662 unsigned long next_timeout = timeout + jiffies;
666 down_read(&osdc->map_sem);
668 ceph_monc_request_next_osdmap(&osdc->client->monc);
670 mutex_lock(&osdc->request_mutex);
/* first pass: retry any requests marked for resend */
671 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
672 req = rb_entry(p, struct ceph_osd_request, r_node);
677 dout("osdc resending prev failed %lld\n", req->r_tid);
678 err = __send_request(osdc, req);
680 dout("osdc failed again on %lld\n", req->r_tid);
682 req->r_resend = false;
/* second pass: keepalive osds whose oldest request timed out */
686 for (p = rb_first(&osdc->osds); p; p = rb_next(p)) {
687 osd = rb_entry(p, struct ceph_osd, o_node);
688 if (list_empty(&osd->o_requests))
690 req = list_first_entry(&osd->o_requests,
691 struct ceph_osd_request, r_osd_item);
692 if (time_before(jiffies, req->r_timeout_stamp))
695 dout(" tid %llu (at least) timed out on osd%d\n",
696 req->r_tid, osd->o_osd);
697 req->r_timeout_stamp = next_timeout;
698 ceph_con_keepalive(&osd->o_con);
701 if (osdc->timeout_tid)
702 schedule_delayed_work(&osdc->timeout_work,
703 round_jiffies_relative(timeout));
705 mutex_unlock(&osdc->request_mutex);
707 up_read(&osdc->map_sem);
710 static void handle_osds_timeout(struct work_struct *work)
712 struct ceph_osd_client *osdc =
713 container_of(work, struct ceph_osd_client,
714 osds_timeout_work.work);
715 unsigned long delay =
716 osdc->client->mount_args->osd_idle_ttl * HZ >> 2;
718 dout("osds timeout\n");
719 down_read(&osdc->map_sem);
720 remove_old_osds(osdc, 0);
721 up_read(&osdc->map_sem);
723 schedule_delayed_work(&osdc->osds_timeout_work,
724 round_jiffies_relative(delay));
728 * handle osd op reply. either call the callback if it is specified,
729 * or do the completion to wake up the waiting thread.
/*
 * Handle an OSD_OPREPLY message: validate the front length against
 * the advertised op/oid counts, look up the request by tid, record
 * the result on first (ack) reply, unregister the request once the
 * final (ondisk, or read) reply arrives, and run the callback /
 * complete the waiters.  Duplicate acks are ignored.
 * NOTE(review): several elided lines (e.g. the 'bad:' label target
 * flow) are not visible in this excerpt.
 */
731 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
732 struct ceph_connection *con)
734 struct ceph_osd_reply_head *rhead = msg->front.iov_base;
735 struct ceph_osd_request *req;
737 int numops, object_len, flags;
739 tid = le64_to_cpu(msg->hdr.tid);
740 if (msg->front.iov_len < sizeof(*rhead))
742 numops = le32_to_cpu(rhead->num_ops);
743 object_len = le32_to_cpu(rhead->object_len);
744 if (msg->front.iov_len != sizeof(*rhead) + object_len +
745 numops * sizeof(struct ceph_osd_op))
747 dout("handle_reply %p tid %llu\n", msg, tid);
750 mutex_lock(&osdc->request_mutex);
751 req = __lookup_request(osdc, tid);
753 dout("handle_reply tid %llu dne\n", tid);
754 mutex_unlock(&osdc->request_mutex);
757 ceph_osdc_get_request(req);
758 flags = le32_to_cpu(rhead->flags);
761 * if this connection filled our message, drop our reference now, to
762 * avoid a (safe but slower) revoke later.
764 if (req->r_con_filling_msg == con && req->r_reply == msg) {
765 dout(" dropping con_filling_msg ref %p\n", con);
766 req->r_con_filling_msg = NULL;
/* first reply for this tid: record result and reassert version */
770 if (!req->r_got_reply) {
773 req->r_result = le32_to_cpu(rhead->result);
774 bytes = le32_to_cpu(msg->hdr.data_len);
775 dout("handle_reply result %d bytes %d\n", req->r_result,
777 if (req->r_result == 0)
778 req->r_result = bytes;
780 /* in case this is a write and we need to replay, */
781 req->r_reassert_version = rhead->reassert_version;
783 req->r_got_reply = 1;
784 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
785 dout("handle_reply tid %llu dup ack\n", tid);
786 mutex_unlock(&osdc->request_mutex);
790 dout("handle_reply tid %llu flags %d\n", tid, flags);
792 /* either this is a read, or we got the safe response */
793 if ((flags & CEPH_OSD_FLAG_ONDISK) ||
794 ((flags & CEPH_OSD_FLAG_WRITE) == 0))
795 __unregister_request(osdc, req);
797 mutex_unlock(&osdc->request_mutex);
800 req->r_callback(req, msg);
802 complete(&req->r_completion);
804 if (flags & CEPH_OSD_FLAG_ONDISK) {
805 if (req->r_safe_callback)
806 req->r_safe_callback(req, msg);
807 complete(&req->r_safe_completion); /* fsync waiter */
811 ceph_osdc_put_request(req);
815 pr_err("corrupt osd_op_reply got %d %d expected %d\n",
816 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
817 (int)sizeof(*rhead));
823 * Resubmit osd requests whose osd or osd address has changed. Request
824 * a new osd map if osds are down, or we are otherwise unable to determine
825 * how to direct a request.
827 * Close connections to down osds.
829 * If @who is specified, resubmit requests for that specific osd.
831 * Caller should hold map_sem for read and request_mutex.
/*
 * Resubmit requests after a map change or connection reset.  Resets
 * osds that are down or whose address changed, then walks all
 * requests: cancels/remaps those affected (or all on kickosd),
 * marks failures for resend, counts requests with no valid osd, and
 * requests a newer osdmap from the monitor when any are stranded.
 * Caller holds map_sem for read and request_mutex.
 */
833 static void kick_requests(struct ceph_osd_client *osdc,
834 struct ceph_osd *kickosd)
836 struct ceph_osd_request *req;
837 struct rb_node *p, *n;
841 dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
842 mutex_lock(&osdc->request_mutex);
844 __reset_osd(osdc, kickosd);
/* reset any osd that is down or whose address changed in the map */
846 for (p = rb_first(&osdc->osds); p; p = n) {
847 struct ceph_osd *osd =
848 rb_entry(p, struct ceph_osd, o_node);
851 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
852 memcmp(&osd->o_con.peer_addr,
853 ceph_osd_addr(osdc->osdmap,
855 sizeof(struct ceph_entity_addr)) != 0)
856 __reset_osd(osdc, osd);
860 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
861 req = rb_entry(p, struct ceph_osd_request, r_node);
864 dout(" r_resend set on tid %llu\n", req->r_tid);
865 __cancel_request(req);
868 if (req->r_osd && kickosd == req->r_osd) {
869 __cancel_request(req);
873 err = __map_osds(osdc, req);
875 continue; /* no change */
878 * FIXME: really, we should set the request
879 * error and fail if this isn't a 'nofail'
880 * request, but that's a fair bit more
881 * complicated to do. So retry!
883 dout(" setting r_resend on %llu\n", req->r_tid);
884 req->r_resend = true;
887 if (req->r_osd == NULL) {
888 dout("tid %llu maps to no valid osd\n", req->r_tid);
889 needmap++; /* request a newer map */
894 dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
896 req->r_flags |= CEPH_OSD_FLAG_RETRY;
897 err = __send_request(osdc, req);
899 dout(" setting r_resend on %llu\n", req->r_tid);
900 req->r_resend = true;
903 mutex_unlock(&osdc->request_mutex);
906 dout("%d requests for down osds, need new map\n", needmap);
907 ceph_monc_request_next_osdmap(&osdc->client->monc);
912 * Process updated osd map.
914 * The message contains any number of incremental and full maps, normally
915 * indicating some sort of topology change in the cluster. Kick requests
916 * off to different OSDs as needed.
/*
 * Decode an OSD_MAP message (fsid check, then any number of
 * incremental maps followed by full maps), install the newest map,
 * acknowledge the epoch to the monitor, and kick all requests so
 * they are re-mapped under the new topology.  Takes map_sem for
 * write while swapping maps, downgrading to read for the kick.
 */
918 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
920 void *p, *end, *next;
923 struct ceph_osdmap *newmap = NULL, *oldmap;
925 struct ceph_fsid fsid;
927 dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
928 p = msg->front.iov_base;
929 end = p + msg->front.iov_len;
/* verify fsid matches this cluster before trusting the map */
932 ceph_decode_need(&p, end, sizeof(fsid), bad);
933 ceph_decode_copy(&p, &fsid, sizeof(fsid));
934 if (ceph_check_fsid(osdc->client, &fsid) < 0)
937 down_write(&osdc->map_sem);
939 /* incremental maps */
940 ceph_decode_32_safe(&p, end, nr_maps, bad);
941 dout(" %d inc maps\n", nr_maps);
942 while (nr_maps > 0) {
943 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
944 epoch = ceph_decode_32(&p);
945 maplen = ceph_decode_32(&p);
946 ceph_decode_need(&p, end, maplen, bad);
/* only apply if it follows directly from our current epoch */
948 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
949 dout("applying incremental map %u len %d\n",
951 newmap = osdmap_apply_incremental(&p, next,
954 if (IS_ERR(newmap)) {
955 err = PTR_ERR(newmap);
959 if (newmap != osdc->osdmap) {
960 ceph_osdmap_destroy(osdc->osdmap);
961 osdc->osdmap = newmap;
964 dout("ignoring incremental map %u len %d\n",
/* full maps: take the newest that is newer than what we have */
974 ceph_decode_32_safe(&p, end, nr_maps, bad);
975 dout(" %d full maps\n", nr_maps);
977 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
978 epoch = ceph_decode_32(&p);
979 maplen = ceph_decode_32(&p);
980 ceph_decode_need(&p, end, maplen, bad);
982 dout("skipping non-latest full map %u len %d\n",
984 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
985 dout("skipping full map %u len %d, "
986 "older than our %u\n", epoch, maplen,
987 osdc->osdmap->epoch);
989 dout("taking full map %u len %d\n", epoch, maplen);
990 newmap = osdmap_decode(&p, p+maplen);
991 if (IS_ERR(newmap)) {
992 err = PTR_ERR(newmap);
996 oldmap = osdc->osdmap;
997 osdc->osdmap = newmap;
999 ceph_osdmap_destroy(oldmap);
/* keep readers out only while the map pointer changes */
1006 downgrade_write(&osdc->map_sem);
1007 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
1009 kick_requests(osdc, NULL);
1010 up_read(&osdc->map_sem);
1014 pr_err("osdc handle_map corrupt msg\n");
1016 up_write(&osdc->map_sem);
1022 * A read request prepares specific pages that data is to be read into.
1023 * When a message is being read off the wire, we call prepare_pages to
1025 * 0 = success, -1 failure.
/*
 * Point the incoming reply message at the request's pre-allocated
 * read pages so the messenger reads data directly into them.
 * Fails (non-zero) if the request doesn't have enough pages for the
 * advertised data_len.  Returns 0 on success.
 */
1027 static int __prepare_pages(struct ceph_connection *con,
1028 struct ceph_msg_header *hdr,
1029 struct ceph_osd_request *req,
1033 struct ceph_osd *osd = con->private;
1034 struct ceph_osd_client *osdc;
1036 int data_len = le32_to_cpu(hdr->data_len);
1037 unsigned data_off = le16_to_cpu(hdr->data_off);
1039 int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
1046 dout("__prepare_pages on msg %p tid %llu, has %d pages, want %d\n", m,
1047 tid, req->r_num_pages, want);
1048 if (unlikely(req->r_num_pages < want))
1050 m->pages = req->r_pages;
1051 m->nr_pages = req->r_num_pages;
1052 ret = 0; /* success */
1054 BUG_ON(ret < 0 || m->nr_pages < want);
1060 * Register request, send initial attempt.
/*
 * Register the request and make the initial send attempt.  A send
 * failure marks the request for resend (handled by handle_timeout /
 * kick_requests) rather than failing outright; nofail semantics are
 * handled on lines elided from this excerpt.
 */
1062 int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1063 struct ceph_osd_request *req,
1068 req->r_request->pages = req->r_pages;
1069 req->r_request->nr_pages = req->r_num_pages;
1071 register_request(osdc, req);
1073 down_read(&osdc->map_sem);
1074 mutex_lock(&osdc->request_mutex);
1076 * a racing kick_requests() may have sent the message for us
1077 * while we dropped request_mutex above, so only send now if
1078 * the request still han't been touched yet.
1080 if (req->r_sent == 0) {
1081 rc = __send_request(osdc, req);
1084 dout("osdc_start_request failed send, "
1085 " marking %lld\n", req->r_tid);
1086 req->r_resend = true;
1089 __unregister_request(osdc, req);
1093 mutex_unlock(&osdc->request_mutex);
1094 up_read(&osdc->map_sem);
1099 * wait for a request to complete
/*
 * Block until the request completes (interruptibly).  On signal,
 * cancel and unregister the request and propagate the error;
 * otherwise return the request's result.
 */
1101 int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
1102 struct ceph_osd_request *req)
1106 rc = wait_for_completion_interruptible(&req->r_completion);
1108 mutex_lock(&osdc->request_mutex);
1109 __cancel_request(req);
1110 __unregister_request(osdc, req);
1111 mutex_unlock(&osdc->request_mutex);
1112 dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
1116 dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
1117 return req->r_result;
1121 * sync - wait for all in-flight requests to flush. avoid starvation.
/*
 * Wait for all writes in flight at the time of the call to be safe
 * on disk.  Snapshots last_tid, then walks requests in tid order
 * (re-looking-up after each wait so newly registered requests past
 * last_tid are ignored — avoids starvation), waiting on each write's
 * r_safe_completion with the lock dropped.
 */
1123 void ceph_osdc_sync(struct ceph_osd_client *osdc)
1125 struct ceph_osd_request *req;
1126 u64 last_tid, next_tid = 0;
1128 mutex_lock(&osdc->request_mutex);
1129 last_tid = osdc->last_tid;
1131 req = __lookup_request_ge(osdc, next_tid);
1134 if (req->r_tid > last_tid)
1137 next_tid = req->r_tid + 1;
/* reads don't need flushing; only wait on writes */
1138 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
1141 ceph_osdc_get_request(req);
1142 mutex_unlock(&osdc->request_mutex);
1143 dout("sync waiting on tid %llu (last is %llu)\n",
1144 req->r_tid, last_tid);
1145 wait_for_completion(&req->r_safe_completion);
1146 mutex_lock(&osdc->request_mutex);
1147 ceph_osdc_put_request(req);
1149 mutex_unlock(&osdc->request_mutex);
1150 dout("sync done (thru tid %llu)\n", last_tid);
/*
 * Initialize the osd client: locks, trees, delayed work, the request
 * mempool, and the op / op-reply message pools.  Error paths unwind
 * in reverse order (goto-cleanup; some labels elided here).
 * Returns 0 on success or a negative errno.
 */
1156 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1161 osdc->client = client;
1162 osdc->osdmap = NULL;
1163 init_rwsem(&osdc->map_sem);
1164 init_completion(&osdc->map_waiters);
1165 osdc->last_requested_map = 0;
1166 mutex_init(&osdc->request_mutex);
1167 osdc->timeout_tid = 0;
1169 osdc->osds = RB_ROOT;
1170 INIT_LIST_HEAD(&osdc->osd_lru);
1171 osdc->requests = RB_ROOT;
1172 osdc->num_requests = 0;
1173 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
1174 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
1176 schedule_delayed_work(&osdc->osds_timeout_work,
1177 round_jiffies_relative(osdc->client->mount_args->osd_idle_ttl * HZ));
/* mempool guarantees forward progress for nofail (writeback) requests */
1180 osdc->req_mempool = mempool_create_kmalloc_pool(10,
1181 sizeof(struct ceph_osd_request));
1182 if (!osdc->req_mempool)
1185 err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true);
1188 err = ceph_msgpool_init(&osdc->msgpool_op_reply,
1189 OSD_OPREPLY_FRONT_LEN, 10, true);
1195 ceph_msgpool_destroy(&osdc->msgpool_op);
1197 mempool_destroy(osdc->req_mempool);
/*
 * Tear down the osd client: stop delayed work, free the osdmap,
 * reap all osd sessions, and destroy the pools.
 */
1202 void ceph_osdc_stop(struct ceph_osd_client *osdc)
1204 cancel_delayed_work_sync(&osdc->timeout_work);
1205 cancel_delayed_work_sync(&osdc->osds_timeout_work);
1207 ceph_osdmap_destroy(osdc->osdmap);
1208 osdc->osdmap = NULL;
1210 remove_old_osds(osdc, 1);
1211 mempool_destroy(osdc->req_mempool);
1212 ceph_msgpool_destroy(&osdc->msgpool_op);
1213 ceph_msgpool_destroy(&osdc->msgpool_op_reply);
1217 * Read some contiguous pages. If we cross a stripe boundary, shorten
1218 * *plen. Return number of bytes read, or error.
/*
 * Synchronous read of (off, *plen) from vino into the given pages.
 * *plen may be shortened at a stripe/object boundary.  Returns the
 * number of bytes read or a negative errno.
 */
1220 int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1221 struct ceph_vino vino, struct ceph_file_layout *layout,
1223 u32 truncate_seq, u64 truncate_size,
1224 struct page **pages, int num_pages)
1226 struct ceph_osd_request *req;
1229 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
1230 vino.snap, off, *plen);
1231 req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1232 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1233 NULL, 0, truncate_seq, truncate_size, NULL,
1236 return PTR_ERR(req);
1238 /* it may be a short read due to an object boundary */
1239 req->r_pages = pages;
1240 num_pages = calc_pages_for(off, *plen);
1241 req->r_num_pages = num_pages;
1243 dout("readpages final extent is %llu~%llu (%d pages)\n",
1244 off, *plen, req->r_num_pages);
1246 rc = ceph_osdc_start_request(osdc, req, false);
1248 rc = ceph_osdc_wait_request(osdc, req);
1250 ceph_osdc_put_request(req);
1251 dout("readpages result %d\n", rc);
1256 * do a synchronous write on N pages
/*
 * Synchronous write of num_pages pages at (off, len) to vino.  The
 * write may be shortened at an object boundary.  Snapshots are not
 * writable (BUG_ON CEPH_NOSNAP).  Returns bytes written or a
 * negative errno.
 */
1258 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1259 struct ceph_file_layout *layout,
1260 struct ceph_snap_context *snapc,
1262 u32 truncate_seq, u64 truncate_size,
1263 struct timespec *mtime,
1264 struct page **pages, int num_pages,
1265 int flags, int do_sync, bool nofail)
1267 struct ceph_osd_request *req;
1270 BUG_ON(vino.snap != CEPH_NOSNAP);
1271 req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
1273 flags | CEPH_OSD_FLAG_ONDISK |
1274 CEPH_OSD_FLAG_WRITE,
1276 truncate_seq, truncate_size, mtime,
1279 return PTR_ERR(req);
1281 /* it may be a short write due to an object boundary */
1282 req->r_pages = pages;
1283 req->r_num_pages = calc_pages_for(off, len);
1284 dout("writepages %llu~%llu (%d pages)\n", off, len,
1287 rc = ceph_osdc_start_request(osdc, req, nofail);
1289 rc = ceph_osdc_wait_request(osdc, req);
1291 ceph_osdc_put_request(req);
1294 dout("writepages result %d\n", rc);
1299 * handle incoming message
/*
 * Messenger dispatch callback: route incoming messages by type to
 * the map or op-reply handlers; log and drop anything unknown.
 */
1301 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1303 struct ceph_osd *osd = con->private;
1304 struct ceph_osd_client *osdc;
1305 int type = le16_to_cpu(msg->hdr.type);
1312 case CEPH_MSG_OSD_MAP:
1313 ceph_osdc_handle_map(osdc, msg);
1315 case CEPH_MSG_OSD_OPREPLY:
1316 handle_reply(osdc, msg, con);
1320 pr_err("received unknown message type %d %s\n", type,
1321 ceph_msg_type_name(type));
1327 * lookup and return message for incoming reply
/*
 * Provide the message buffer for an incoming OSD_OPREPLY: find the
 * request by tid and hand back its preallocated r_reply (revoking it
 * from any previous connection first).  If the advertised front is
 * larger than the preallocated one, allocate a bigger message.  Also
 * wires up the read pages via __prepare_pages and records which con
 * is filling the message.  Called under con; takes request_mutex.
 */
1329 static struct ceph_msg *get_reply(struct ceph_connection *con,
1330 struct ceph_msg_header *hdr,
1333 struct ceph_osd *osd = con->private;
1334 struct ceph_osd_client *osdc = osd->o_osdc;
1336 struct ceph_osd_request *req;
1337 int front = le32_to_cpu(hdr->front_len);
1338 int data_len = le32_to_cpu(hdr->data_len);
1342 tid = le64_to_cpu(hdr->tid);
1343 mutex_lock(&osdc->request_mutex);
1344 req = __lookup_request(osdc, tid);
1348 pr_info("get_reply unknown tid %llu from osd%d\n", tid,
/* a different con was filling this reply: take it back */
1353 if (req->r_con_filling_msg) {
1354 dout("get_reply revoking msg %p from old con %p\n",
1355 req->r_reply, req->r_con_filling_msg);
1356 ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
1357 ceph_con_put(req->r_con_filling_msg);
1360 if (front > req->r_reply->front.iov_len) {
1361 pr_warning("get_reply front %d > preallocated %d\n",
1362 front, (int)req->r_reply->front.iov_len);
1363 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, 0, 0, NULL);
1366 ceph_msg_put(req->r_reply);
1369 m = ceph_msg_get(req->r_reply);
1372 err = __prepare_pages(con, hdr, req, tid, m);
1380 req->r_con_filling_msg = ceph_con_get(con);
1381 dout("get_reply tid %lld %p\n", tid, m);
1384 mutex_unlock(&osdc->request_mutex);
/*
 * Messenger alloc_msg callback: allocate a fresh message for osdmap
 * messages, delegate op replies to get_reply, and log anything else.
 */
1389 static struct ceph_msg *alloc_msg(struct ceph_connection *con,
1390 struct ceph_msg_header *hdr,
1393 struct ceph_osd *osd = con->private;
1394 int type = le16_to_cpu(hdr->type);
1395 int front = le32_to_cpu(hdr->front_len);
1398 case CEPH_MSG_OSD_MAP:
1399 return ceph_msg_new(type, front, 0, 0, NULL);
1400 case CEPH_MSG_OSD_OPREPLY:
1401 return get_reply(con, hdr, skip);
1403 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
1411 * Wrappers to refcount containing ceph_osd struct
/* Messenger get callback: pin the owning ceph_osd via its refcount. */
1413 static struct ceph_connection *get_osd_con(struct ceph_connection *con)
1415 struct ceph_osd *osd = con->private;
/* Messenger put callback: drop the ref on the owning ceph_osd. */
1421 static void put_osd_con(struct ceph_connection *con)
1423 struct ceph_osd *osd = con->private;
/*
 * Messenger authentication callback: hand back (creating or, when
 * force_new, recreating) the cached authorizer for this osd session,
 * plus the reply buffer the auth layer will validate into.
 */
1430 static int get_authorizer(struct ceph_connection *con,
1431 void **buf, int *len, int *proto,
1432 void **reply_buf, int *reply_len, int force_new)
1434 struct ceph_osd *o = con->private;
1435 struct ceph_osd_client *osdc = o->o_osdc;
1436 struct ceph_auth_client *ac = osdc->client->monc.auth;
/* a rejected authorizer must be thrown away and rebuilt */
1439 if (force_new && o->o_authorizer) {
1440 ac->ops->destroy_authorizer(ac, o->o_authorizer);
1441 o->o_authorizer = NULL;
1443 if (o->o_authorizer == NULL) {
1444 ret = ac->ops->create_authorizer(
1445 ac, CEPH_ENTITY_TYPE_OSD,
1447 &o->o_authorizer_buf,
1448 &o->o_authorizer_buf_len,
1449 &o->o_authorizer_reply_buf,
1450 &o->o_authorizer_reply_buf_len);
1455 *proto = ac->protocol;
1456 *buf = o->o_authorizer_buf;
1457 *len = o->o_authorizer_buf_len;
1458 *reply_buf = o->o_authorizer_reply_buf;
1459 *reply_len = o->o_authorizer_reply_buf_len;
/*
 * Messenger callback: verify the osd's authorizer reply against the
 * cached authorizer via the auth client's ops.
 */
1464 static int verify_authorizer_reply(struct ceph_connection *con, int len)
1466 struct ceph_osd *o = con->private;
1467 struct ceph_osd_client *osdc = o->o_osdc;
1468 struct ceph_auth_client *ac = osdc->client->monc.auth;
1470 return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
/*
 * Messenger callback for a rejected authorizer: invalidate our OSD
 * auth state (if the auth backend supports it) and revalidate with
 * the monitor to obtain fresh credentials.
 */
1473 static int invalidate_authorizer(struct ceph_connection *con)
1475 struct ceph_osd *o = con->private;
1476 struct ceph_osd_client *osdc = o->o_osdc;
1477 struct ceph_auth_client *ac = osdc->client->monc.auth;
1479 if (ac->ops->invalidate_authorizer)
1480 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
1482 return ceph_monc_validate_auth(&osdc->client->monc);
/*
 * Connection operations vtable for osd sessions, referenced by the
 * forward declaration at the top of the file (get/put entries are on
 * lines elided from this excerpt).
 */
1485 const static struct ceph_connection_operations osd_con_ops = {
1488 .dispatch = dispatch,
1489 .get_authorizer = get_authorizer,
1490 .verify_authorizer_reply = verify_authorizer_reply,
1491 .invalidate_authorizer = invalidate_authorizer,
1492 .alloc_msg = alloc_msg,