ceph: set osd request message front length correctly
[safe/jmp/linux-2.6] / fs / ceph / osd_client.c
1 #include "ceph_debug.h"
2
3 #include <linux/err.h>
4 #include <linux/highmem.h>
5 #include <linux/mm.h>
6 #include <linux/pagemap.h>
7 #include <linux/slab.h>
8 #include <linux/uaccess.h>
9
10 #include "super.h"
11 #include "osd_client.h"
12 #include "messenger.h"
13 #include "decode.h"
14 #include "auth.h"
15
16 #define OSD_OP_FRONT_LEN        4096
17 #define OSD_OPREPLY_FRONT_LEN   512
18
19 const static struct ceph_connection_operations osd_con_ops;
20
21 static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
22
23 /*
24  * Implement client access to distributed object storage cluster.
25  *
26  * All data objects are stored within a cluster/cloud of OSDs, or
27  * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
28  * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
29  * remote daemons serving up and coordinating consistent and safe
30  * access to storage.
31  *
32  * Cluster membership and the mapping of data objects onto storage devices
33  * are described by the osd map.
34  *
35  * We keep track of pending OSD requests (read, write), resubmit
36  * requests to different OSDs when the cluster topology/data layout
37  * change, or retry the affected requests when the communications
38  * channel with an OSD is reset.
39  */
40
41 /*
42  * calculate the mapping of a file extent onto an object, and fill out the
43  * request accordingly.  shorten extent as necessary if it crosses an
44  * object boundary.
45  *
46  * fill osd op in request message.
47  */
48 static void calc_layout(struct ceph_osd_client *osdc,
49                         struct ceph_vino vino, struct ceph_file_layout *layout,
50                         u64 off, u64 *plen,
51                         struct ceph_osd_request *req)
52 {
53         struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
54         struct ceph_osd_op *op = (void *)(reqhead + 1);
55         u64 orig_len = *plen;
56         u64 objoff, objlen;    /* extent in object */
57         u64 bno;
58
59         reqhead->snapid = cpu_to_le64(vino.snap);
60
61         /* object extent? */
62         ceph_calc_file_object_mapping(layout, off, plen, &bno,
63                                       &objoff, &objlen);
64         if (*plen < orig_len)
65                 dout(" skipping last %llu, final file extent %llu~%llu\n",
66                      orig_len - *plen, off, *plen);
67
68         sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
69         req->r_oid_len = strlen(req->r_oid);
70
71         op->extent.offset = cpu_to_le64(objoff);
72         op->extent.length = cpu_to_le64(objlen);
73         req->r_num_pages = calc_pages_for(off, *plen);
74
75         dout("calc_layout %s (%d) %llu~%llu (%d pages)\n",
76              req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages);
77 }
78
79 /*
80  * requests
81  */
82 void ceph_osdc_release_request(struct kref *kref)
83 {
84         struct ceph_osd_request *req = container_of(kref,
85                                                     struct ceph_osd_request,
86                                                     r_kref);
87
88         if (req->r_request)
89                 ceph_msg_put(req->r_request);
90         if (req->r_reply)
91                 ceph_msg_put(req->r_reply);
92         if (req->r_con_filling_msg) {
93                 dout("release_request revoking pages %p from con %p\n",
94                      req->r_pages, req->r_con_filling_msg);
95                 ceph_con_revoke_message(req->r_con_filling_msg,
96                                       req->r_reply);
97                 ceph_con_put(req->r_con_filling_msg);
98         }
99         if (req->r_own_pages)
100                 ceph_release_page_vector(req->r_pages,
101                                          req->r_num_pages);
102         ceph_put_snap_context(req->r_snapc);
103         if (req->r_mempool)
104                 mempool_free(req, req->r_osdc->req_mempool);
105         else
106                 kfree(req);
107 }
108
109 /*
110  * build new request AND message, calculate layout, and adjust file
111  * extent as needed.
112  *
113  * if the file was recently truncated, we include information about its
114  * old and new size so that the object can be updated appropriately.  (we
115  * avoid synchronously deleting truncated objects because it's slow.)
116  *
117  * if @do_sync, include a 'startsync' command so that the osd will flush
118  * data quickly.
119  */
120 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
121                                                struct ceph_file_layout *layout,
122                                                struct ceph_vino vino,
123                                                u64 off, u64 *plen,
124                                                int opcode, int flags,
125                                                struct ceph_snap_context *snapc,
126                                                int do_sync,
127                                                u32 truncate_seq,
128                                                u64 truncate_size,
129                                                struct timespec *mtime,
130                                                bool use_mempool, int num_reply)
131 {
132         struct ceph_osd_request *req;
133         struct ceph_msg *msg;
134         struct ceph_osd_request_head *head;
135         struct ceph_osd_op *op;
136         void *p;
137         int num_op = 1 + do_sync;
138         size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
139         int i;
140
141         if (use_mempool) {
142                 req = mempool_alloc(osdc->req_mempool, GFP_NOFS);
143                 memset(req, 0, sizeof(*req));
144         } else {
145                 req = kzalloc(sizeof(*req), GFP_NOFS);
146         }
147         if (req == NULL)
148                 return ERR_PTR(-ENOMEM);
149
150         req->r_osdc = osdc;
151         req->r_mempool = use_mempool;
152         kref_init(&req->r_kref);
153         init_completion(&req->r_completion);
154         init_completion(&req->r_safe_completion);
155         INIT_LIST_HEAD(&req->r_unsafe_item);
156         req->r_flags = flags;
157
158         WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
159
160         /* create reply message */
161         if (use_mempool)
162                 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
163         else
164                 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
165                                    OSD_OPREPLY_FRONT_LEN, 0, 0, NULL);
166         if (IS_ERR(msg)) {
167                 ceph_osdc_put_request(req);
168                 return ERR_PTR(PTR_ERR(msg));
169         }
170         req->r_reply = msg;
171
172         /* create request message; allow space for oid */
173         msg_size += 40;
174         if (snapc)
175                 msg_size += sizeof(u64) * snapc->num_snaps;
176         if (use_mempool)
177                 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
178         else
179                 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL);
180         if (IS_ERR(msg)) {
181                 ceph_osdc_put_request(req);
182                 return ERR_PTR(PTR_ERR(msg));
183         }
184         msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
185         memset(msg->front.iov_base, 0, msg->front.iov_len);
186         head = msg->front.iov_base;
187         op = (void *)(head + 1);
188         p = (void *)(op + num_op);
189
190         req->r_request = msg;
191         req->r_snapc = ceph_get_snap_context(snapc);
192
193         head->client_inc = cpu_to_le32(1); /* always, for now. */
194         head->flags = cpu_to_le32(flags);
195         if (flags & CEPH_OSD_FLAG_WRITE)
196                 ceph_encode_timespec(&head->mtime, mtime);
197         head->num_ops = cpu_to_le16(num_op);
198         op->op = cpu_to_le16(opcode);
199
200         /* calculate max write size */
201         calc_layout(osdc, vino, layout, off, plen, req);
202         req->r_file_layout = *layout;  /* keep a copy */
203
204         if (flags & CEPH_OSD_FLAG_WRITE) {
205                 req->r_request->hdr.data_off = cpu_to_le16(off);
206                 req->r_request->hdr.data_len = cpu_to_le32(*plen);
207                 op->payload_len = cpu_to_le32(*plen);
208         }
209         op->extent.truncate_size = cpu_to_le64(truncate_size);
210         op->extent.truncate_seq = cpu_to_le32(truncate_seq);
211
212         /* fill in oid */
213         head->object_len = cpu_to_le32(req->r_oid_len);
214         memcpy(p, req->r_oid, req->r_oid_len);
215         p += req->r_oid_len;
216
217         if (do_sync) {
218                 op++;
219                 op->op = cpu_to_le16(CEPH_OSD_OP_STARTSYNC);
220         }
221         if (snapc) {
222                 head->snap_seq = cpu_to_le64(snapc->seq);
223                 head->num_snaps = cpu_to_le32(snapc->num_snaps);
224                 for (i = 0; i < snapc->num_snaps; i++) {
225                         put_unaligned_le64(snapc->snaps[i], p);
226                         p += sizeof(u64);
227                 }
228         }
229
230         BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
231         msg_size = p - msg->front.iov_base;
232         msg->front.iov_len = msg_size;
233         msg->hdr.front_len = cpu_to_le32(msg_size);
234         return req;
235 }
236
237 /*
238  * We keep osd requests in an rbtree, sorted by ->r_tid.
239  */
240 static void __insert_request(struct ceph_osd_client *osdc,
241                              struct ceph_osd_request *new)
242 {
243         struct rb_node **p = &osdc->requests.rb_node;
244         struct rb_node *parent = NULL;
245         struct ceph_osd_request *req = NULL;
246
247         while (*p) {
248                 parent = *p;
249                 req = rb_entry(parent, struct ceph_osd_request, r_node);
250                 if (new->r_tid < req->r_tid)
251                         p = &(*p)->rb_left;
252                 else if (new->r_tid > req->r_tid)
253                         p = &(*p)->rb_right;
254                 else
255                         BUG();
256         }
257
258         rb_link_node(&new->r_node, parent, p);
259         rb_insert_color(&new->r_node, &osdc->requests);
260 }
261
262 static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
263                                                  u64 tid)
264 {
265         struct ceph_osd_request *req;
266         struct rb_node *n = osdc->requests.rb_node;
267
268         while (n) {
269                 req = rb_entry(n, struct ceph_osd_request, r_node);
270                 if (tid < req->r_tid)
271                         n = n->rb_left;
272                 else if (tid > req->r_tid)
273                         n = n->rb_right;
274                 else
275                         return req;
276         }
277         return NULL;
278 }
279
280 static struct ceph_osd_request *
281 __lookup_request_ge(struct ceph_osd_client *osdc,
282                     u64 tid)
283 {
284         struct ceph_osd_request *req;
285         struct rb_node *n = osdc->requests.rb_node;
286
287         while (n) {
288                 req = rb_entry(n, struct ceph_osd_request, r_node);
289                 if (tid < req->r_tid) {
290                         if (!n->rb_left)
291                                 return req;
292                         n = n->rb_left;
293                 } else if (tid > req->r_tid) {
294                         n = n->rb_right;
295                 } else {
296                         return req;
297                 }
298         }
299         return NULL;
300 }
301
302
303 /*
304  * If the osd connection drops, we need to resubmit all requests.
305  */
306 static void osd_reset(struct ceph_connection *con)
307 {
308         struct ceph_osd *osd = con->private;
309         struct ceph_osd_client *osdc;
310
311         if (!osd)
312                 return;
313         dout("osd_reset osd%d\n", osd->o_osd);
314         osdc = osd->o_osdc;
315         down_read(&osdc->map_sem);
316         kick_requests(osdc, osd);
317         up_read(&osdc->map_sem);
318 }
319
320 /*
321  * Track open sessions with osds.
322  */
323 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
324 {
325         struct ceph_osd *osd;
326
327         osd = kzalloc(sizeof(*osd), GFP_NOFS);
328         if (!osd)
329                 return NULL;
330
331         atomic_set(&osd->o_ref, 1);
332         osd->o_osdc = osdc;
333         INIT_LIST_HEAD(&osd->o_requests);
334         INIT_LIST_HEAD(&osd->o_osd_lru);
335         osd->o_incarnation = 1;
336
337         ceph_con_init(osdc->client->msgr, &osd->o_con);
338         osd->o_con.private = osd;
339         osd->o_con.ops = &osd_con_ops;
340         osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
341
342         return osd;
343 }
344
345 static struct ceph_osd *get_osd(struct ceph_osd *osd)
346 {
347         if (atomic_inc_not_zero(&osd->o_ref)) {
348                 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
349                      atomic_read(&osd->o_ref));
350                 return osd;
351         } else {
352                 dout("get_osd %p FAIL\n", osd);
353                 return NULL;
354         }
355 }
356
357 static void put_osd(struct ceph_osd *osd)
358 {
359         dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
360              atomic_read(&osd->o_ref) - 1);
361         if (atomic_dec_and_test(&osd->o_ref))
362                 kfree(osd);
363 }
364
365 /*
366  * remove an osd from our map
367  */
368 static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
369 {
370         dout("__remove_osd %p\n", osd);
371         BUG_ON(!list_empty(&osd->o_requests));
372         rb_erase(&osd->o_node, &osdc->osds);
373         list_del_init(&osd->o_osd_lru);
374         ceph_con_close(&osd->o_con);
375         put_osd(osd);
376 }
377
378 static void __move_osd_to_lru(struct ceph_osd_client *osdc,
379                               struct ceph_osd *osd)
380 {
381         dout("__move_osd_to_lru %p\n", osd);
382         BUG_ON(!list_empty(&osd->o_osd_lru));
383         list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
384         osd->lru_ttl = jiffies + osdc->client->mount_args->osd_idle_ttl * HZ;
385 }
386
387 static void __remove_osd_from_lru(struct ceph_osd *osd)
388 {
389         dout("__remove_osd_from_lru %p\n", osd);
390         if (!list_empty(&osd->o_osd_lru))
391                 list_del_init(&osd->o_osd_lru);
392 }
393
394 static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
395 {
396         struct ceph_osd *osd, *nosd;
397
398         dout("__remove_old_osds %p\n", osdc);
399         mutex_lock(&osdc->request_mutex);
400         list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
401                 if (!remove_all && time_before(jiffies, osd->lru_ttl))
402                         break;
403                 __remove_osd(osdc, osd);
404         }
405         mutex_unlock(&osdc->request_mutex);
406 }
407
408 /*
409  * reset osd connect
410  */
411 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
412 {
413         int ret = 0;
414
415         dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
416         if (list_empty(&osd->o_requests)) {
417                 __remove_osd(osdc, osd);
418         } else {
419                 ceph_con_close(&osd->o_con);
420                 ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
421                 osd->o_incarnation++;
422         }
423         return ret;
424 }
425
426 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
427 {
428         struct rb_node **p = &osdc->osds.rb_node;
429         struct rb_node *parent = NULL;
430         struct ceph_osd *osd = NULL;
431
432         while (*p) {
433                 parent = *p;
434                 osd = rb_entry(parent, struct ceph_osd, o_node);
435                 if (new->o_osd < osd->o_osd)
436                         p = &(*p)->rb_left;
437                 else if (new->o_osd > osd->o_osd)
438                         p = &(*p)->rb_right;
439                 else
440                         BUG();
441         }
442
443         rb_link_node(&new->o_node, parent, p);
444         rb_insert_color(&new->o_node, &osdc->osds);
445 }
446
447 static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
448 {
449         struct ceph_osd *osd;
450         struct rb_node *n = osdc->osds.rb_node;
451
452         while (n) {
453                 osd = rb_entry(n, struct ceph_osd, o_node);
454                 if (o < osd->o_osd)
455                         n = n->rb_left;
456                 else if (o > osd->o_osd)
457                         n = n->rb_right;
458                 else
459                         return osd;
460         }
461         return NULL;
462 }
463
464
465 /*
466  * Register request, assign tid.  If this is the first request, set up
467  * the timeout event.
468  */
469 static void register_request(struct ceph_osd_client *osdc,
470                              struct ceph_osd_request *req)
471 {
472         mutex_lock(&osdc->request_mutex);
473         req->r_tid = ++osdc->last_tid;
474         req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
475
476         dout("register_request %p tid %lld\n", req, req->r_tid);
477         __insert_request(osdc, req);
478         ceph_osdc_get_request(req);
479         osdc->num_requests++;
480
481         req->r_timeout_stamp =
482                 jiffies + osdc->client->mount_args->osd_timeout*HZ;
483
484         if (osdc->num_requests == 1) {
485                 osdc->timeout_tid = req->r_tid;
486                 dout("  timeout on tid %llu at %lu\n", req->r_tid,
487                      req->r_timeout_stamp);
488                 schedule_delayed_work(&osdc->timeout_work,
489                       round_jiffies_relative(req->r_timeout_stamp - jiffies));
490         }
491         mutex_unlock(&osdc->request_mutex);
492 }
493
494 /*
495  * called under osdc->request_mutex
496  */
497 static void __unregister_request(struct ceph_osd_client *osdc,
498                                  struct ceph_osd_request *req)
499 {
500         dout("__unregister_request %p tid %lld\n", req, req->r_tid);
501         rb_erase(&req->r_node, &osdc->requests);
502         osdc->num_requests--;
503
504         if (req->r_osd) {
505                 /* make sure the original request isn't in flight. */
506                 ceph_con_revoke(&req->r_osd->o_con, req->r_request);
507
508                 list_del_init(&req->r_osd_item);
509                 if (list_empty(&req->r_osd->o_requests))
510                         __move_osd_to_lru(osdc, req->r_osd);
511                 req->r_osd = NULL;
512         }
513
514         ceph_osdc_put_request(req);
515
516         if (req->r_tid == osdc->timeout_tid) {
517                 if (osdc->num_requests == 0) {
518                         dout("no requests, canceling timeout\n");
519                         osdc->timeout_tid = 0;
520                         cancel_delayed_work(&osdc->timeout_work);
521                 } else {
522                         req = rb_entry(rb_first(&osdc->requests),
523                                        struct ceph_osd_request, r_node);
524                         osdc->timeout_tid = req->r_tid;
525                         dout("rescheduled timeout on tid %llu at %lu\n",
526                              req->r_tid, req->r_timeout_stamp);
527                         schedule_delayed_work(&osdc->timeout_work,
528                               round_jiffies_relative(req->r_timeout_stamp -
529                                                      jiffies));
530                 }
531         }
532 }
533
534 /*
535  * Cancel a previously queued request message
536  */
537 static void __cancel_request(struct ceph_osd_request *req)
538 {
539         if (req->r_sent) {
540                 ceph_con_revoke(&req->r_osd->o_con, req->r_request);
541                 req->r_sent = 0;
542         }
543 }
544
545 /*
546  * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
547  * (as needed), and set the request r_osd appropriately.  If there is
548  * no up osd, set r_osd to NULL.
549  *
550  * Return 0 if unchanged, 1 if changed, or negative on error.
551  *
552  * Caller should hold map_sem for read and request_mutex.
553  */
554 static int __map_osds(struct ceph_osd_client *osdc,
555                       struct ceph_osd_request *req)
556 {
557         struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
558         struct ceph_pg pgid;
559         int o = -1;
560         int err;
561
562         dout("map_osds %p tid %lld\n", req, req->r_tid);
563         err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
564                                       &req->r_file_layout, osdc->osdmap);
565         if (err)
566                 return err;
567         pgid = reqhead->layout.ol_pgid;
568         req->r_pgid = pgid;
569
570         o = ceph_calc_pg_primary(osdc->osdmap, pgid);
571
572         if ((req->r_osd && req->r_osd->o_osd == o &&
573              req->r_sent >= req->r_osd->o_incarnation) ||
574             (req->r_osd == NULL && o == -1))
575                 return 0;  /* no change */
576
577         dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n",
578              req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
579              req->r_osd ? req->r_osd->o_osd : -1);
580
581         if (req->r_osd) {
582                 __cancel_request(req);
583                 list_del_init(&req->r_osd_item);
584                 req->r_osd = NULL;
585         }
586
587         req->r_osd = __lookup_osd(osdc, o);
588         if (!req->r_osd && o >= 0) {
589                 err = -ENOMEM;
590                 req->r_osd = create_osd(osdc);
591                 if (!req->r_osd)
592                         goto out;
593
594                 dout("map_osds osd %p is osd%d\n", req->r_osd, o);
595                 req->r_osd->o_osd = o;
596                 req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
597                 __insert_osd(osdc, req->r_osd);
598
599                 ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
600         }
601
602         if (req->r_osd) {
603                 __remove_osd_from_lru(req->r_osd);
604                 list_add(&req->r_osd_item, &req->r_osd->o_requests);
605         }
606         err = 1;   /* osd changed */
607
608 out:
609         return err;
610 }
611
612 /*
613  * caller should hold map_sem (for read) and request_mutex
614  */
615 static int __send_request(struct ceph_osd_client *osdc,
616                           struct ceph_osd_request *req)
617 {
618         struct ceph_osd_request_head *reqhead;
619         int err;
620
621         err = __map_osds(osdc, req);
622         if (err < 0)
623                 return err;
624         if (req->r_osd == NULL) {
625                 dout("send_request %p no up osds in pg\n", req);
626                 ceph_monc_request_next_osdmap(&osdc->client->monc);
627                 return 0;
628         }
629
630         dout("send_request %p tid %llu to osd%d flags %d\n",
631              req, req->r_tid, req->r_osd->o_osd, req->r_flags);
632
633         reqhead = req->r_request->front.iov_base;
634         reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
635         reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
636         reqhead->reassert_version = req->r_reassert_version;
637
638         req->r_timeout_stamp = jiffies+osdc->client->mount_args->osd_timeout*HZ;
639
640         ceph_msg_get(req->r_request); /* send consumes a ref */
641         ceph_con_send(&req->r_osd->o_con, req->r_request);
642         req->r_sent = req->r_osd->o_incarnation;
643         return 0;
644 }
645
646 /*
647  * Timeout callback, called every N seconds when 1 or more osd
648  * requests has been active for more than N seconds.  When this
649  * happens, we ping all OSDs with requests who have timed out to
650  * ensure any communications channel reset is detected.  Reset the
651  * request timeouts another N seconds in the future as we go.
652  * Reschedule the timeout event another N seconds in future (unless
653  * there are no open requests).
654  */
655 static void handle_timeout(struct work_struct *work)
656 {
657         struct ceph_osd_client *osdc =
658                 container_of(work, struct ceph_osd_client, timeout_work.work);
659         struct ceph_osd_request *req;
660         struct ceph_osd *osd;
661         unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ;
662         unsigned long next_timeout = timeout + jiffies;
663         struct rb_node *p;
664
665         dout("timeout\n");
666         down_read(&osdc->map_sem);
667
668         ceph_monc_request_next_osdmap(&osdc->client->monc);
669
670         mutex_lock(&osdc->request_mutex);
671         for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
672                 req = rb_entry(p, struct ceph_osd_request, r_node);
673
674                 if (req->r_resend) {
675                         int err;
676
677                         dout("osdc resending prev failed %lld\n", req->r_tid);
678                         err = __send_request(osdc, req);
679                         if (err)
680                                 dout("osdc failed again on %lld\n", req->r_tid);
681                         else
682                                 req->r_resend = false;
683                         continue;
684                 }
685         }
686         for (p = rb_first(&osdc->osds); p; p = rb_next(p)) {
687                 osd = rb_entry(p, struct ceph_osd, o_node);
688                 if (list_empty(&osd->o_requests))
689                         continue;
690                 req = list_first_entry(&osd->o_requests,
691                                        struct ceph_osd_request, r_osd_item);
692                 if (time_before(jiffies, req->r_timeout_stamp))
693                         continue;
694
695                 dout(" tid %llu (at least) timed out on osd%d\n",
696                      req->r_tid, osd->o_osd);
697                 req->r_timeout_stamp = next_timeout;
698                 ceph_con_keepalive(&osd->o_con);
699         }
700
701         if (osdc->timeout_tid)
702                 schedule_delayed_work(&osdc->timeout_work,
703                                       round_jiffies_relative(timeout));
704
705         mutex_unlock(&osdc->request_mutex);
706
707         up_read(&osdc->map_sem);
708 }
709
710 static void handle_osds_timeout(struct work_struct *work)
711 {
712         struct ceph_osd_client *osdc =
713                 container_of(work, struct ceph_osd_client,
714                              osds_timeout_work.work);
715         unsigned long delay =
716                 osdc->client->mount_args->osd_idle_ttl * HZ >> 2;
717
718         dout("osds timeout\n");
719         down_read(&osdc->map_sem);
720         remove_old_osds(osdc, 0);
721         up_read(&osdc->map_sem);
722
723         schedule_delayed_work(&osdc->osds_timeout_work,
724                               round_jiffies_relative(delay));
725 }
726
727 /*
728  * handle osd op reply.  either call the callback if it is specified,
729  * or do the completion to wake up the waiting thread.
730  */
731 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
732                          struct ceph_connection *con)
733 {
734         struct ceph_osd_reply_head *rhead = msg->front.iov_base;
735         struct ceph_osd_request *req;
736         u64 tid;
737         int numops, object_len, flags;
738
739         tid = le64_to_cpu(msg->hdr.tid);
740         if (msg->front.iov_len < sizeof(*rhead))
741                 goto bad;
742         numops = le32_to_cpu(rhead->num_ops);
743         object_len = le32_to_cpu(rhead->object_len);
744         if (msg->front.iov_len != sizeof(*rhead) + object_len +
745             numops * sizeof(struct ceph_osd_op))
746                 goto bad;
747         dout("handle_reply %p tid %llu\n", msg, tid);
748
749         /* lookup */
750         mutex_lock(&osdc->request_mutex);
751         req = __lookup_request(osdc, tid);
752         if (req == NULL) {
753                 dout("handle_reply tid %llu dne\n", tid);
754                 mutex_unlock(&osdc->request_mutex);
755                 return;
756         }
757         ceph_osdc_get_request(req);
758         flags = le32_to_cpu(rhead->flags);
759
760         /*
761          * if this connection filled our message, drop our reference now, to
762          * avoid a (safe but slower) revoke later.
763          */
764         if (req->r_con_filling_msg == con && req->r_reply == msg) {
765                 dout(" dropping con_filling_msg ref %p\n", con);
766                 req->r_con_filling_msg = NULL;
767                 ceph_con_put(con);
768         }
769
770         if (!req->r_got_reply) {
771                 unsigned bytes;
772
773                 req->r_result = le32_to_cpu(rhead->result);
774                 bytes = le32_to_cpu(msg->hdr.data_len);
775                 dout("handle_reply result %d bytes %d\n", req->r_result,
776                      bytes);
777                 if (req->r_result == 0)
778                         req->r_result = bytes;
779
780                 /* in case this is a write and we need to replay, */
781                 req->r_reassert_version = rhead->reassert_version;
782
783                 req->r_got_reply = 1;
784         } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
785                 dout("handle_reply tid %llu dup ack\n", tid);
786                 mutex_unlock(&osdc->request_mutex);
787                 goto done;
788         }
789
790         dout("handle_reply tid %llu flags %d\n", tid, flags);
791
792         /* either this is a read, or we got the safe response */
793         if ((flags & CEPH_OSD_FLAG_ONDISK) ||
794             ((flags & CEPH_OSD_FLAG_WRITE) == 0))
795                 __unregister_request(osdc, req);
796
797         mutex_unlock(&osdc->request_mutex);
798
799         if (req->r_callback)
800                 req->r_callback(req, msg);
801         else
802                 complete(&req->r_completion);
803
804         if (flags & CEPH_OSD_FLAG_ONDISK) {
805                 if (req->r_safe_callback)
806                         req->r_safe_callback(req, msg);
807                 complete(&req->r_safe_completion);  /* fsync waiter */
808         }
809
810 done:
811         ceph_osdc_put_request(req);
812         return;
813
814 bad:
815         pr_err("corrupt osd_op_reply got %d %d expected %d\n",
816                (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
817                (int)sizeof(*rhead));
818         ceph_msg_dump(msg);
819 }
820
821
822 /*
823  * Resubmit osd requests whose osd or osd address has changed.  Request
824  * a new osd map if osds are down, or we are otherwise unable to determine
825  * how to direct a request.
826  *
827  * Close connections to down osds.
828  *
829  * If @who is specified, resubmit requests for that specific osd.
830  *
831  * Caller should hold map_sem for read and request_mutex.
832  */
833 static void kick_requests(struct ceph_osd_client *osdc,
834                           struct ceph_osd *kickosd)
835 {
836         struct ceph_osd_request *req;
837         struct rb_node *p, *n;
838         int needmap = 0;
839         int err;
840
841         dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
842         mutex_lock(&osdc->request_mutex);
843         if (kickosd) {
844                 __reset_osd(osdc, kickosd);
845         } else {
846                 for (p = rb_first(&osdc->osds); p; p = n) {
847                         struct ceph_osd *osd =
848                                 rb_entry(p, struct ceph_osd, o_node);
849
850                         n = rb_next(p);
851                         if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
852                             memcmp(&osd->o_con.peer_addr,
853                                    ceph_osd_addr(osdc->osdmap,
854                                                  osd->o_osd),
855                                    sizeof(struct ceph_entity_addr)) != 0)
856                                 __reset_osd(osdc, osd);
857                 }
858         }
859
860         for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
861                 req = rb_entry(p, struct ceph_osd_request, r_node);
862
863                 if (req->r_resend) {
864                         dout(" r_resend set on tid %llu\n", req->r_tid);
865                         __cancel_request(req);
866                         goto kick;
867                 }
868                 if (req->r_osd && kickosd == req->r_osd) {
869                         __cancel_request(req);
870                         goto kick;
871                 }
872
873                 err = __map_osds(osdc, req);
874                 if (err == 0)
875                         continue;  /* no change */
876                 if (err < 0) {
877                         /*
878                          * FIXME: really, we should set the request
879                          * error and fail if this isn't a 'nofail'
880                          * request, but that's a fair bit more
881                          * complicated to do.  So retry!
882                          */
883                         dout(" setting r_resend on %llu\n", req->r_tid);
884                         req->r_resend = true;
885                         continue;
886                 }
887                 if (req->r_osd == NULL) {
888                         dout("tid %llu maps to no valid osd\n", req->r_tid);
889                         needmap++;  /* request a newer map */
890                         continue;
891                 }
892
893 kick:
894                 dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
895                      req->r_osd->o_osd);
896                 req->r_flags |= CEPH_OSD_FLAG_RETRY;
897                 err = __send_request(osdc, req);
898                 if (err) {
899                         dout(" setting r_resend on %llu\n", req->r_tid);
900                         req->r_resend = true;
901                 }
902         }
903         mutex_unlock(&osdc->request_mutex);
904
905         if (needmap) {
906                 dout("%d requests for down osds, need new map\n", needmap);
907                 ceph_monc_request_next_osdmap(&osdc->client->monc);
908         }
909 }
910
911 /*
912  * Process updated osd map.
913  *
914  * The message contains any number of incremental and full maps, normally
915  * indicating some sort of topology change in the cluster.  Kick requests
916  * off to different OSDs as needed.
917  */
918 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
919 {
920         void *p, *end, *next;
921         u32 nr_maps, maplen;
922         u32 epoch;
923         struct ceph_osdmap *newmap = NULL, *oldmap;
924         int err;
925         struct ceph_fsid fsid;
926
927         dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
928         p = msg->front.iov_base;
929         end = p + msg->front.iov_len;
930
931         /* verify fsid */
932         ceph_decode_need(&p, end, sizeof(fsid), bad);
933         ceph_decode_copy(&p, &fsid, sizeof(fsid));
934         if (ceph_check_fsid(osdc->client, &fsid) < 0)
935                 return;
936
937         down_write(&osdc->map_sem);
938
939         /* incremental maps */
940         ceph_decode_32_safe(&p, end, nr_maps, bad);
941         dout(" %d inc maps\n", nr_maps);
942         while (nr_maps > 0) {
943                 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
944                 epoch = ceph_decode_32(&p);
945                 maplen = ceph_decode_32(&p);
946                 ceph_decode_need(&p, end, maplen, bad);
947                 next = p + maplen;
948                 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
949                         dout("applying incremental map %u len %d\n",
950                              epoch, maplen);
951                         newmap = osdmap_apply_incremental(&p, next,
952                                                           osdc->osdmap,
953                                                           osdc->client->msgr);
954                         if (IS_ERR(newmap)) {
955                                 err = PTR_ERR(newmap);
956                                 goto bad;
957                         }
958                         BUG_ON(!newmap);
959                         if (newmap != osdc->osdmap) {
960                                 ceph_osdmap_destroy(osdc->osdmap);
961                                 osdc->osdmap = newmap;
962                         }
963                 } else {
964                         dout("ignoring incremental map %u len %d\n",
965                              epoch, maplen);
966                 }
967                 p = next;
968                 nr_maps--;
969         }
970         if (newmap)
971                 goto done;
972
973         /* full maps */
974         ceph_decode_32_safe(&p, end, nr_maps, bad);
975         dout(" %d full maps\n", nr_maps);
976         while (nr_maps) {
977                 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
978                 epoch = ceph_decode_32(&p);
979                 maplen = ceph_decode_32(&p);
980                 ceph_decode_need(&p, end, maplen, bad);
981                 if (nr_maps > 1) {
982                         dout("skipping non-latest full map %u len %d\n",
983                              epoch, maplen);
984                 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
985                         dout("skipping full map %u len %d, "
986                              "older than our %u\n", epoch, maplen,
987                              osdc->osdmap->epoch);
988                 } else {
989                         dout("taking full map %u len %d\n", epoch, maplen);
990                         newmap = osdmap_decode(&p, p+maplen);
991                         if (IS_ERR(newmap)) {
992                                 err = PTR_ERR(newmap);
993                                 goto bad;
994                         }
995                         BUG_ON(!newmap);
996                         oldmap = osdc->osdmap;
997                         osdc->osdmap = newmap;
998                         if (oldmap)
999                                 ceph_osdmap_destroy(oldmap);
1000                 }
1001                 p += maplen;
1002                 nr_maps--;
1003         }
1004
1005 done:
1006         downgrade_write(&osdc->map_sem);
1007         ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
1008         if (newmap)
1009                 kick_requests(osdc, NULL);
1010         up_read(&osdc->map_sem);
1011         return;
1012
1013 bad:
1014         pr_err("osdc handle_map corrupt msg\n");
1015         ceph_msg_dump(msg);
1016         up_write(&osdc->map_sem);
1017         return;
1018 }
1019
1020
1021 /*
1022  * A read request prepares specific pages that data is to be read into.
1023  * When a message is being read off the wire, we call prepare_pages to
1024  * find those pages.
1025  *  0 = success, -1 failure.
1026  */
1027 static int __prepare_pages(struct ceph_connection *con,
1028                          struct ceph_msg_header *hdr,
1029                          struct ceph_osd_request *req,
1030                          u64 tid,
1031                          struct ceph_msg *m)
1032 {
1033         struct ceph_osd *osd = con->private;
1034         struct ceph_osd_client *osdc;
1035         int ret = -1;
1036         int data_len = le32_to_cpu(hdr->data_len);
1037         unsigned data_off = le16_to_cpu(hdr->data_off);
1038
1039         int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
1040
1041         if (!osd)
1042                 return -1;
1043
1044         osdc = osd->o_osdc;
1045
1046         dout("__prepare_pages on msg %p tid %llu, has %d pages, want %d\n", m,
1047              tid, req->r_num_pages, want);
1048         if (unlikely(req->r_num_pages < want))
1049                 goto out;
1050         m->pages = req->r_pages;
1051         m->nr_pages = req->r_num_pages;
1052         ret = 0; /* success */
1053 out:
1054         BUG_ON(ret < 0 || m->nr_pages < want);
1055
1056         return ret;
1057 }
1058
1059 /*
1060  * Register request, send initial attempt.
1061  */
1062 int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1063                             struct ceph_osd_request *req,
1064                             bool nofail)
1065 {
1066         int rc = 0;
1067
1068         req->r_request->pages = req->r_pages;
1069         req->r_request->nr_pages = req->r_num_pages;
1070
1071         register_request(osdc, req);
1072
1073         down_read(&osdc->map_sem);
1074         mutex_lock(&osdc->request_mutex);
1075         /*
1076          * a racing kick_requests() may have sent the message for us
1077          * while we dropped request_mutex above, so only send now if
1078          * the request still han't been touched yet.
1079          */
1080         if (req->r_sent == 0) {
1081                 rc = __send_request(osdc, req);
1082                 if (rc) {
1083                         if (nofail) {
1084                                 dout("osdc_start_request failed send, "
1085                                      " marking %lld\n", req->r_tid);
1086                                 req->r_resend = true;
1087                                 rc = 0;
1088                         } else {
1089                                 __unregister_request(osdc, req);
1090                         }
1091                 }
1092         }
1093         mutex_unlock(&osdc->request_mutex);
1094         up_read(&osdc->map_sem);
1095         return rc;
1096 }
1097
1098 /*
1099  * wait for a request to complete
1100  */
1101 int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
1102                            struct ceph_osd_request *req)
1103 {
1104         int rc;
1105
1106         rc = wait_for_completion_interruptible(&req->r_completion);
1107         if (rc < 0) {
1108                 mutex_lock(&osdc->request_mutex);
1109                 __cancel_request(req);
1110                 __unregister_request(osdc, req);
1111                 mutex_unlock(&osdc->request_mutex);
1112                 dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
1113                 return rc;
1114         }
1115
1116         dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
1117         return req->r_result;
1118 }
1119
1120 /*
1121  * sync - wait for all in-flight requests to flush.  avoid starvation.
1122  */
1123 void ceph_osdc_sync(struct ceph_osd_client *osdc)
1124 {
1125         struct ceph_osd_request *req;
1126         u64 last_tid, next_tid = 0;
1127
1128         mutex_lock(&osdc->request_mutex);
1129         last_tid = osdc->last_tid;
1130         while (1) {
1131                 req = __lookup_request_ge(osdc, next_tid);
1132                 if (!req)
1133                         break;
1134                 if (req->r_tid > last_tid)
1135                         break;
1136
1137                 next_tid = req->r_tid + 1;
1138                 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
1139                         continue;
1140
1141                 ceph_osdc_get_request(req);
1142                 mutex_unlock(&osdc->request_mutex);
1143                 dout("sync waiting on tid %llu (last is %llu)\n",
1144                      req->r_tid, last_tid);
1145                 wait_for_completion(&req->r_safe_completion);
1146                 mutex_lock(&osdc->request_mutex);
1147                 ceph_osdc_put_request(req);
1148         }
1149         mutex_unlock(&osdc->request_mutex);
1150         dout("sync done (thru tid %llu)\n", last_tid);
1151 }
1152
1153 /*
1154  * init, shutdown
1155  */
1156 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1157 {
1158         int err;
1159
1160         dout("init\n");
1161         osdc->client = client;
1162         osdc->osdmap = NULL;
1163         init_rwsem(&osdc->map_sem);
1164         init_completion(&osdc->map_waiters);
1165         osdc->last_requested_map = 0;
1166         mutex_init(&osdc->request_mutex);
1167         osdc->timeout_tid = 0;
1168         osdc->last_tid = 0;
1169         osdc->osds = RB_ROOT;
1170         INIT_LIST_HEAD(&osdc->osd_lru);
1171         osdc->requests = RB_ROOT;
1172         osdc->num_requests = 0;
1173         INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
1174         INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
1175
1176         schedule_delayed_work(&osdc->osds_timeout_work,
1177            round_jiffies_relative(osdc->client->mount_args->osd_idle_ttl * HZ));
1178
1179         err = -ENOMEM;
1180         osdc->req_mempool = mempool_create_kmalloc_pool(10,
1181                                         sizeof(struct ceph_osd_request));
1182         if (!osdc->req_mempool)
1183                 goto out;
1184
1185         err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true);
1186         if (err < 0)
1187                 goto out_mempool;
1188         err = ceph_msgpool_init(&osdc->msgpool_op_reply,
1189                                 OSD_OPREPLY_FRONT_LEN, 10, true);
1190         if (err < 0)
1191                 goto out_msgpool;
1192         return 0;
1193
1194 out_msgpool:
1195         ceph_msgpool_destroy(&osdc->msgpool_op);
1196 out_mempool:
1197         mempool_destroy(osdc->req_mempool);
1198 out:
1199         return err;
1200 }
1201
1202 void ceph_osdc_stop(struct ceph_osd_client *osdc)
1203 {
1204         cancel_delayed_work_sync(&osdc->timeout_work);
1205         cancel_delayed_work_sync(&osdc->osds_timeout_work);
1206         if (osdc->osdmap) {
1207                 ceph_osdmap_destroy(osdc->osdmap);
1208                 osdc->osdmap = NULL;
1209         }
1210         remove_old_osds(osdc, 1);
1211         mempool_destroy(osdc->req_mempool);
1212         ceph_msgpool_destroy(&osdc->msgpool_op);
1213         ceph_msgpool_destroy(&osdc->msgpool_op_reply);
1214 }
1215
1216 /*
1217  * Read some contiguous pages.  If we cross a stripe boundary, shorten
1218  * *plen.  Return number of bytes read, or error.
1219  */
1220 int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1221                         struct ceph_vino vino, struct ceph_file_layout *layout,
1222                         u64 off, u64 *plen,
1223                         u32 truncate_seq, u64 truncate_size,
1224                         struct page **pages, int num_pages)
1225 {
1226         struct ceph_osd_request *req;
1227         int rc = 0;
1228
1229         dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
1230              vino.snap, off, *plen);
1231         req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1232                                     CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1233                                     NULL, 0, truncate_seq, truncate_size, NULL,
1234                                     false, 1);
1235         if (IS_ERR(req))
1236                 return PTR_ERR(req);
1237
1238         /* it may be a short read due to an object boundary */
1239         req->r_pages = pages;
1240         num_pages = calc_pages_for(off, *plen);
1241         req->r_num_pages = num_pages;
1242
1243         dout("readpages  final extent is %llu~%llu (%d pages)\n",
1244              off, *plen, req->r_num_pages);
1245
1246         rc = ceph_osdc_start_request(osdc, req, false);
1247         if (!rc)
1248                 rc = ceph_osdc_wait_request(osdc, req);
1249
1250         ceph_osdc_put_request(req);
1251         dout("readpages result %d\n", rc);
1252         return rc;
1253 }
1254
1255 /*
1256  * do a synchronous write on N pages
1257  */
1258 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1259                          struct ceph_file_layout *layout,
1260                          struct ceph_snap_context *snapc,
1261                          u64 off, u64 len,
1262                          u32 truncate_seq, u64 truncate_size,
1263                          struct timespec *mtime,
1264                          struct page **pages, int num_pages,
1265                          int flags, int do_sync, bool nofail)
1266 {
1267         struct ceph_osd_request *req;
1268         int rc = 0;
1269
1270         BUG_ON(vino.snap != CEPH_NOSNAP);
1271         req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
1272                                     CEPH_OSD_OP_WRITE,
1273                                     flags | CEPH_OSD_FLAG_ONDISK |
1274                                             CEPH_OSD_FLAG_WRITE,
1275                                     snapc, do_sync,
1276                                     truncate_seq, truncate_size, mtime,
1277                                     nofail, 1);
1278         if (IS_ERR(req))
1279                 return PTR_ERR(req);
1280
1281         /* it may be a short write due to an object boundary */
1282         req->r_pages = pages;
1283         req->r_num_pages = calc_pages_for(off, len);
1284         dout("writepages %llu~%llu (%d pages)\n", off, len,
1285              req->r_num_pages);
1286
1287         rc = ceph_osdc_start_request(osdc, req, nofail);
1288         if (!rc)
1289                 rc = ceph_osdc_wait_request(osdc, req);
1290
1291         ceph_osdc_put_request(req);
1292         if (rc == 0)
1293                 rc = len;
1294         dout("writepages result %d\n", rc);
1295         return rc;
1296 }
1297
1298 /*
1299  * handle incoming message
1300  */
1301 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1302 {
1303         struct ceph_osd *osd = con->private;
1304         struct ceph_osd_client *osdc;
1305         int type = le16_to_cpu(msg->hdr.type);
1306
1307         if (!osd)
1308                 return;
1309         osdc = osd->o_osdc;
1310
1311         switch (type) {
1312         case CEPH_MSG_OSD_MAP:
1313                 ceph_osdc_handle_map(osdc, msg);
1314                 break;
1315         case CEPH_MSG_OSD_OPREPLY:
1316                 handle_reply(osdc, msg, con);
1317                 break;
1318
1319         default:
1320                 pr_err("received unknown message type %d %s\n", type,
1321                        ceph_msg_type_name(type));
1322         }
1323         ceph_msg_put(msg);
1324 }
1325
1326 /*
1327  * lookup and return message for incoming reply
1328  */
1329 static struct ceph_msg *get_reply(struct ceph_connection *con,
1330                                   struct ceph_msg_header *hdr,
1331                                   int *skip)
1332 {
1333         struct ceph_osd *osd = con->private;
1334         struct ceph_osd_client *osdc = osd->o_osdc;
1335         struct ceph_msg *m;
1336         struct ceph_osd_request *req;
1337         int front = le32_to_cpu(hdr->front_len);
1338         int data_len = le32_to_cpu(hdr->data_len);
1339         u64 tid;
1340         int err;
1341
1342         tid = le64_to_cpu(hdr->tid);
1343         mutex_lock(&osdc->request_mutex);
1344         req = __lookup_request(osdc, tid);
1345         if (!req) {
1346                 *skip = 1;
1347                 m = NULL;
1348                 pr_info("get_reply unknown tid %llu from osd%d\n", tid,
1349                         osd->o_osd);
1350                 goto out;
1351         }
1352
1353         if (req->r_con_filling_msg) {
1354                 dout("get_reply revoking msg %p from old con %p\n",
1355                      req->r_reply, req->r_con_filling_msg);
1356                 ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
1357                 ceph_con_put(req->r_con_filling_msg);
1358         }
1359
1360         if (front > req->r_reply->front.iov_len) {
1361                 pr_warning("get_reply front %d > preallocated %d\n",
1362                            front, (int)req->r_reply->front.iov_len);
1363                 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, 0, 0, NULL);
1364                 if (IS_ERR(m))
1365                         goto out;
1366                 ceph_msg_put(req->r_reply);
1367                 req->r_reply = m;
1368         }
1369         m = ceph_msg_get(req->r_reply);
1370
1371         if (data_len > 0) {
1372                 err = __prepare_pages(con, hdr, req, tid, m);
1373                 if (err < 0) {
1374                         *skip = 1;
1375                         ceph_msg_put(m);
1376                         m = ERR_PTR(err);
1377                 }
1378         }
1379         *skip = 0;
1380         req->r_con_filling_msg = ceph_con_get(con);
1381         dout("get_reply tid %lld %p\n", tid, m);
1382
1383 out:
1384         mutex_unlock(&osdc->request_mutex);
1385         return m;
1386
1387 }
1388
1389 static struct ceph_msg *alloc_msg(struct ceph_connection *con,
1390                                   struct ceph_msg_header *hdr,
1391                                   int *skip)
1392 {
1393         struct ceph_osd *osd = con->private;
1394         int type = le16_to_cpu(hdr->type);
1395         int front = le32_to_cpu(hdr->front_len);
1396
1397         switch (type) {
1398         case CEPH_MSG_OSD_MAP:
1399                 return ceph_msg_new(type, front, 0, 0, NULL);
1400         case CEPH_MSG_OSD_OPREPLY:
1401                 return get_reply(con, hdr, skip);
1402         default:
1403                 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
1404                         osd->o_osd);
1405                 *skip = 1;
1406                 return NULL;
1407         }
1408 }
1409
1410 /*
1411  * Wrappers to refcount containing ceph_osd struct
1412  */
1413 static struct ceph_connection *get_osd_con(struct ceph_connection *con)
1414 {
1415         struct ceph_osd *osd = con->private;
1416         if (get_osd(osd))
1417                 return con;
1418         return NULL;
1419 }
1420
1421 static void put_osd_con(struct ceph_connection *con)
1422 {
1423         struct ceph_osd *osd = con->private;
1424         put_osd(osd);
1425 }
1426
1427 /*
1428  * authentication
1429  */
1430 static int get_authorizer(struct ceph_connection *con,
1431                           void **buf, int *len, int *proto,
1432                           void **reply_buf, int *reply_len, int force_new)
1433 {
1434         struct ceph_osd *o = con->private;
1435         struct ceph_osd_client *osdc = o->o_osdc;
1436         struct ceph_auth_client *ac = osdc->client->monc.auth;
1437         int ret = 0;
1438
1439         if (force_new && o->o_authorizer) {
1440                 ac->ops->destroy_authorizer(ac, o->o_authorizer);
1441                 o->o_authorizer = NULL;
1442         }
1443         if (o->o_authorizer == NULL) {
1444                 ret = ac->ops->create_authorizer(
1445                         ac, CEPH_ENTITY_TYPE_OSD,
1446                         &o->o_authorizer,
1447                         &o->o_authorizer_buf,
1448                         &o->o_authorizer_buf_len,
1449                         &o->o_authorizer_reply_buf,
1450                         &o->o_authorizer_reply_buf_len);
1451                 if (ret)
1452                 return ret;
1453         }
1454
1455         *proto = ac->protocol;
1456         *buf = o->o_authorizer_buf;
1457         *len = o->o_authorizer_buf_len;
1458         *reply_buf = o->o_authorizer_reply_buf;
1459         *reply_len = o->o_authorizer_reply_buf_len;
1460         return 0;
1461 }
1462
1463
1464 static int verify_authorizer_reply(struct ceph_connection *con, int len)
1465 {
1466         struct ceph_osd *o = con->private;
1467         struct ceph_osd_client *osdc = o->o_osdc;
1468         struct ceph_auth_client *ac = osdc->client->monc.auth;
1469
1470         return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
1471 }
1472
1473 static int invalidate_authorizer(struct ceph_connection *con)
1474 {
1475         struct ceph_osd *o = con->private;
1476         struct ceph_osd_client *osdc = o->o_osdc;
1477         struct ceph_auth_client *ac = osdc->client->monc.auth;
1478
1479         if (ac->ops->invalidate_authorizer)
1480                 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
1481
1482         return ceph_monc_validate_auth(&osdc->client->monc);
1483 }
1484
1485 const static struct ceph_connection_operations osd_con_ops = {
1486         .get = get_osd_con,
1487         .put = put_osd_con,
1488         .dispatch = dispatch,
1489         .get_authorizer = get_authorizer,
1490         .verify_authorizer_reply = verify_authorizer_reply,
1491         .invalidate_authorizer = invalidate_authorizer,
1492         .alloc_msg = alloc_msg,
1493         .fault = osd_reset,
1494 };