/*
   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */
#include <linux/module.h>
#include <linux/version.h>
#include <linux/drbd.h>
#include <linux/sched.h>
#include <linux/smp_lock.h>
#include <linux/wait.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>

#include "drbd_tracing.h"

#define SLEEP_TIME (HZ/10)
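/* With HZ timer ticks per second, SLEEP_TIME is roughly 100ms. It is used
 * below both as the resync timer interval and as the pacing window in
 * w_make_resync_request()/w_make_ov_request(). */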
static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
/* more endio handlers:
 *	atodb_endio in drbd_actlog.c
 *	drbd_bm_async_io_complete in drbd_bitmap.c
 *
 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 */
/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the sync-after dependencies, we grab a write lock, because
   we need stable states on all devices for that. */
rwlock_t global_state_lock;
/* used for synchronous meta data and bitmap IO
 * submitted by drbd_md_sync_page_io()
 */
void drbd_md_io_complete(struct bio *bio, int error)
	struct drbd_md_io *md_io;

	md_io = (struct drbd_md_io *)bio->bi_private;

	trace_drbd_bio(md_io->mdev, "Md", bio, 1, NULL);

	complete(&md_io->event);
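/* A minimal sketch of the synchronous pattern this callback completes,
 * assuming the submitter (drbd_md_sync_page_io(), per the comment above)
 * does roughly:
 *
 *	struct drbd_md_io md_io;
 *	init_completion(&md_io.event);
 *	bio->bi_private = &md_io;
 *	bio->bi_end_io = drbd_md_io_complete;
 *	submit_bio(rw, bio);
 *	wait_for_completion(&md_io.event);
 */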
/* reads on behalf of the partner,
 * "submitted" by the receiver
 */
void drbd_endio_read_sec(struct bio *bio, int error) __releases(local)
	unsigned long flags = 0;
	struct drbd_epoch_entry *e = NULL;
	struct drbd_conf *mdev;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);

		dev_warn(DEV, "read: error=%d s=%llus\n", error,
			(unsigned long long)e->sector);
	if (!error && !uptodate) {
		dev_warn(DEV, "read: setting error to -EIO s=%llus\n",
			(unsigned long long)e->sector);
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */

	D_ASSERT(e->block_id != ID_VACANT);

	trace_drbd_bio(mdev, "Sec", bio, 1, NULL);

	spin_lock_irqsave(&mdev->req_lock, flags);
	mdev->read_cnt += e->size >> 9;
	list_del(&e->w.list);
	if (list_empty(&mdev->read_ee))
		wake_up(&mdev->ee_wait);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	drbd_chk_io_error(mdev, error, FALSE);
	drbd_queue_work(&mdev->data.work, &e->w);

	trace_drbd_ee(mdev, e, "read completed");
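/* Note: the endio handler itself stays irq-safe; the actual reply to the
 * peer is sent later from worker context via the e->w work queued above. */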
/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver.
 */
void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
	unsigned long flags = 0;
	struct drbd_epoch_entry *e = NULL;
	struct drbd_conf *mdev;
	int do_al_complete_io;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);
	int is_barrier = bio_rw_flagged(bio, BIO_RW_BARRIER);

		dev_warn(DEV, "write: error=%d s=%llus\n", error,
			(unsigned long long)e->sector);
	if (!error && !uptodate) {
		dev_warn(DEV, "write: setting error to -EIO s=%llus\n",
			(unsigned long long)e->sector);
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */

	/* error == -ENOTSUPP would be a better test,
	 * alas it is not reliable */
	if (error && is_barrier && e->flags & EE_IS_BARRIER) {
		drbd_bump_write_ordering(mdev, WO_bdev_flush);
		spin_lock_irqsave(&mdev->req_lock, flags);
		list_del(&e->w.list);
		e->w.cb = w_e_reissue;
		/* put_ldev actually happens below, once we come here again. */
		spin_unlock_irqrestore(&mdev->req_lock, flags);
		drbd_queue_work(&mdev->data.work, &e->w);

	D_ASSERT(e->block_id != ID_VACANT);

	trace_drbd_bio(mdev, "Sec", bio, 1, NULL);

	spin_lock_irqsave(&mdev->req_lock, flags);
	mdev->writ_cnt += e->size >> 9;
	is_syncer_req = is_syncer_block_id(e->block_id);

	/* after we moved e to done_ee,
	 * we may no longer access it,
	 * it may be freed/reused already!
	 * (as soon as we release the req_lock) */
	e_sector = e->sector;
	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;

	list_del(&e->w.list); /* has been on active_ee or sync_ee */
	list_add_tail(&e->w.list, &mdev->done_ee);

	trace_drbd_ee(mdev, e, "write completed");

	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
	 * neither did we wake possibly waiting conflicting requests.
	 * done from "drbd_process_done_ee" within the appropriate w.cb
	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */

	do_wake = is_syncer_req
		? list_empty(&mdev->sync_ee)
		: list_empty(&mdev->active_ee);

		__drbd_chk_io_error(mdev, FALSE);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

		drbd_rs_complete_io(mdev, e_sector);

		wake_up(&mdev->ee_wait);

	if (do_al_complete_io)
		drbd_al_complete_io(mdev, e_sector);
/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 */
void drbd_endio_pri(struct bio *bio, int error)
	struct drbd_request *req = bio->bi_private;
	struct drbd_conf *mdev = req->mdev;
	struct bio_and_error m;
	enum drbd_req_event what;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);

		dev_warn(DEV, "p %s: error=%d\n",
			bio_data_dir(bio) == WRITE ? "write" : "read", error);
	if (!error && !uptodate) {
		dev_warn(DEV, "p %s: setting error to -EIO\n",
			bio_data_dir(bio) == WRITE ? "write" : "read");
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */

	trace_drbd_bio(mdev, "Pri", bio, 1, NULL);

	/* to avoid recursion in __req_mod */
	if (unlikely(error)) {
		what = (bio_data_dir(bio) == WRITE)
			? write_completed_with_error
			: (bio_rw(bio) == READA)
			? read_ahead_completed_with_error
			: read_completed_with_error;
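/* (bio_rw() == READA identifies read-ahead bios; their failure maps to
 * read_ahead_completed_with_error, which the request state machine can
 * presumably complete without the remote-retry path taken for failed
 * normal reads — see __req_mod().) */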
	bio_put(req->private_bio);
	req->private_bio = ERR_PTR(error);

	spin_lock_irqsave(&mdev->req_lock, flags);
	__req_mod(req, what, &m);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

		complete_master_bio(mdev, &m);

int w_io_error(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	/* NOTE: mdev->ldev can be NULL by the time we get here! */
	/* D_ASSERT(mdev->ldev->dc.on_io_error != EP_PASS_ON); */

	/* the only way this callback is scheduled is from _req_may_be_done,
	 * when it is done and had a local write error, see comments there */

int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	/* We should not detach for read io-error,
	 * but try to WRITE the P_DATA_REPLY to the failed location,
	 * to give the disk the chance to relocate that block */

	spin_lock_irq(&mdev->req_lock);
	    mdev->state.conn < C_CONNECTED ||
	    mdev->state.pdsk <= D_INCONSISTENT) {
		_req_mod(req, send_canceled);
		spin_unlock_irq(&mdev->req_lock);
		dev_alert(DEV, "WE ARE LOST. Local IO failure, no peer.\n");
	spin_unlock_irq(&mdev->req_lock);

	return w_send_read_req(mdev, w, 0);

int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
	ERR_IF(cancel) return 1;
	dev_err(DEV, "resync inactive, but callback triggered??\n");
	return 1; /* Simply ignore this! */

void drbd_csum(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
	struct hash_desc desc;
	struct scatterlist sg;
	struct bio_vec *bvec;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	__bio_for_each_segment(bvec, bio, i, 0) {
		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
		crypto_hash_update(&desc, &sg, sg.length);
	crypto_hash_final(&desc, digest);
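/* Typical use, as in w_e_send_csum()/w_e_end_ov_req() below: the caller
 * supplies a digest buffer of at least crypto_hash_digestsize(tfm) bytes,
 * e.g.
 *
 *	digest = kmalloc(crypto_hash_digestsize(tfm), GFP_NOIO);
 *	if (digest)
 *		drbd_csum(mdev, tfm, bio, digest);
 */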
static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);

	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);

	if (likely(drbd_bio_uptodate(e->private_bio))) {
		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
			drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);

			inc_rs_pending(mdev);
			ok = drbd_send_drequest_csum(mdev,
			dev_err(DEV, "kmalloc() of digest failed.\n");

	drbd_free_ee(mdev, e);

		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
	struct drbd_epoch_entry *e;

	/* GFP_TRY, because if there is no memory available right now, this may
	 * be rescheduled for later. It is "only" background resync, after all. */
	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);

	spin_lock_irq(&mdev->req_lock);
	list_add(&e->w.list, &mdev->read_ee);
	spin_unlock_irq(&mdev->req_lock);

	e->private_bio->bi_end_io = drbd_endio_read_sec;
	e->private_bio->bi_rw = READ;
	e->w.cb = w_e_send_csum;

	mdev->read_cnt += size >> 9;
	drbd_generic_make_request(mdev, DRBD_FAULT_RS_RD, e->private_bio);
void resync_timer_fn(unsigned long data)
	struct drbd_conf *mdev = (struct drbd_conf *) data;

	spin_lock_irqsave(&mdev->req_lock, flags);

	if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
		if (mdev->state.conn == C_VERIFY_S)
			mdev->resync_work.cb = w_make_ov_request;
			mdev->resync_work.cb = w_make_resync_request;
		mdev->resync_work.cb = w_resync_inactive;

	spin_unlock_irqrestore(&mdev->req_lock, flags);

	/* harmless race: list_empty outside data.work.q_lock */
	if (list_empty(&mdev->resync_work.list) && queue)
		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
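/* The timer callback only selects and queues the work item; building and
 * sending the actual resync/verify requests happens in worker context,
 * since timer (softirq) context must not do blocking socket IO. */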
int w_make_resync_request(struct drbd_conf *mdev,
		struct drbd_work *w, int cancel)
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	int max_segment_size = queue_max_segment_size(mdev->rq_queue);
	int number, i, size, pe, mx;
	int align, queued, sndbuf;

	if (unlikely(cancel))

	if (unlikely(mdev->state.conn < C_CONNECTED)) {
		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");

	if (mdev->state.conn != C_SYNC_TARGET)
		dev_err(DEV, "%s in w_make_resync_request\n",
			drbd_conn_str(mdev->state.conn));

	if (!get_ldev(mdev)) {
		/* Since we only need to access mdev->resync, a
		   get_ldev_if_state(mdev, D_FAILED) would be sufficient; but
		   continuing resync with a broken disk makes no sense at
		   all. */
		dev_err(DEV, "Disk broke down during resync!\n");
		mdev->resync_work.cb = w_resync_inactive;

	number = SLEEP_TIME * mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
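/* sync_conf.rate is configured in KiB/s, and each bitmap bit covers one
 * BM_BLOCK_SIZE (4 KiB) block, so this works out to requests per window:
 * e.g. rate = 4000 KiB/s gives (HZ/10)*4000/(4*HZ) = 100 requests per
 * 100ms SLEEP_TIME window. */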
	pe = atomic_read(&mdev->rs_pending_cnt);

	mutex_lock(&mdev->data.mutex);
	if (mdev->data.socket)
		mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
	mutex_unlock(&mdev->data.mutex);

	/* For resync rates >160MB/sec, allow more pending RS requests */

	/* Limit the number of pending RS requests to no more than the peer's receive buffer */
	if ((pe + number) > mx) {

	for (i = 0; i < number; i++) {
		/* Stop generating RS requests when half of the send buffer is filled */
		mutex_lock(&mdev->data.mutex);
		if (mdev->data.socket) {
			queued = mdev->data.socket->sk->sk_wmem_queued;
			sndbuf = mdev->data.socket->sk->sk_sndbuf;
		mutex_unlock(&mdev->data.mutex);
		if (queued > sndbuf / 2)
		size = BM_BLOCK_SIZE;
		bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo);

			mdev->bm_resync_fo = drbd_bm_bits(mdev);
			mdev->resync_work.cb = w_resync_inactive;

		sector = BM_BIT_TO_SECT(bit);

		if (drbd_try_rs_begin_io(mdev, sector)) {
			mdev->bm_resync_fo = bit;
		mdev->bm_resync_fo = bit + 1;

		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
			drbd_rs_complete_io(mdev, sector);

#if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
		/* try to find some adjacent bits.
		 * we stop if we have already the maximum req size.
		 *
		 * Additionally always align bigger requests, in order to
		 * be prepared for all stripe sizes of software RAIDs.
		 *
		 * we _do_ care about the agreed-upon q->max_segment_size
		 * here, as splitting up the requests on the other side is more
		 * difficult. the consequence is, that on lvm and md and other
		 * "indirect" devices, this is dead code, since
		 * q->max_segment_size will be PAGE_SIZE.
		 */
			if (size + BM_BLOCK_SIZE > max_segment_size)

			/* always stay aligned */
			if (sector & ((1<<(align+3))-1))

			/* do not cross extent boundaries */
			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)

			/* now, is it actually dirty, after all?
			 * caution, drbd_bm_test_bit is tri-state for some
			 * obscure reason; ( b == 0 ) would get the out-of-band
			 * only accidentally right because of the "oddly sized"
			 * adjustment below */
			if (drbd_bm_test_bit(mdev, bit+1) != 1)

			size += BM_BLOCK_SIZE;
			if ((BM_BLOCK_SIZE << align) <= size)

		/* if we merged some,
		 * reset the offset to start the next drbd_bm_find_next from */
		if (size > BM_BLOCK_SIZE)
			mdev->bm_resync_fo = bit + 1;

		/* adjust very last sectors, in case we are oddly sized */
		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;
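/* (sector_t counts 512-byte sectors; the <<9/>>9 shifts throughout this
 * file convert between byte sizes and sector counts, e.g. one 4096-byte
 * BM_BLOCK is 4096>>9 = 8 sectors.) */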
		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
			switch (read_for_csum(mdev, sector, size)) {
			case 0: /* Disk failure */
			case 2: /* Allocation failed */
				drbd_rs_complete_io(mdev, sector);
				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
			/* case 1: everything ok */
			inc_rs_pending(mdev);
			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
						sector, size, ID_SYNCER)) {
				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
				dec_rs_pending(mdev);

	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
		/* last syncer _request_ was sent,
		 * but the P_RS_DATA_REPLY not yet received. sync will end (and
		 * next sync group will resume), as soon as we receive the last
		 * resync data block, and the last bit is cleared.
		 * until then resync "work" is "inactive" ...
		 */
		mdev->resync_work.cb = w_resync_inactive;

	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
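/* Re-arming the timer closes the loop: resync_timer_fn() will queue this
 * callback again, so at most "number" requests go out per SLEEP_TIME
 * window, which is what enforces the configured resync rate. */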
static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);

	if (unlikely(cancel))

	if (unlikely(mdev->state.conn < C_CONNECTED)) {
		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");

	number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
	if (atomic_read(&mdev->rs_pending_cnt) > number)

	number -= atomic_read(&mdev->rs_pending_cnt);

	sector = mdev->ov_position;
	for (i = 0; i < number; i++) {
		if (sector >= capacity) {
			mdev->resync_work.cb = w_resync_inactive;

		size = BM_BLOCK_SIZE;

		if (drbd_try_rs_begin_io(mdev, sector)) {
			mdev->ov_position = sector;

		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;

		inc_rs_pending(mdev);
		if (!drbd_send_ov_request(mdev, sector, size)) {
			dec_rs_pending(mdev);
		sector += BM_SECT_PER_BIT;
	mdev->ov_position = sector;

	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
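/* Progress is checkpointed in mdev->ov_position (both when
 * drbd_try_rs_begin_io() defers a sector and at the end of each batch),
 * so the next invocation resumes the verify run where this one stopped. */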
int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
	drbd_resync_finished(mdev);

static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
	drbd_resync_finished(mdev);

int drbd_resync_finished(struct drbd_conf *mdev)
	unsigned long db, dt, dbdt;
	union drbd_state os, ns;
	char *khelper_cmd = NULL;

	/* Remove all elements from the resync LRU. Since future actions
	 * might set bits in the (main) bitmap, then the entries in the
	 * resync LRU would be wrong. */
	if (drbd_rs_del_all(mdev)) {
		/* In case this is not possible now, most probably because
		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
		 * queue (or even the read operations for those packets
		 * is not finished by now). Retry in 100ms. */

		__set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ / 10);
		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
			w->cb = w_resync_finished;
			drbd_queue_work(&mdev->data.work, w);

		dev_err(DEV, "drbd_rs_del_all() failed, and kmalloc(w) for the retry failed too.\n");

	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
	dbdt = Bit2KB(db/dt);
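/* dt is the elapsed resync time in seconds (paused time subtracted), db
 * the number of bitmap bits processed; Bit2KB(db/dt) thus yields the
 * average rate in KiB/s for the summary line printed below. */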
	mdev->rs_paused /= HZ;

	spin_lock_irq(&mdev->req_lock);

	/* This protects us against multiple calls (that can happen in the presence
	   of application IO), and against connectivity loss just before we arrive here. */
	if (os.conn <= C_CONNECTED)

	ns.conn = C_CONNECTED;

	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
		 (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
		 "Online verify " : "Resync",
		 dt + mdev->rs_paused, mdev->rs_paused, dbdt);

	n_oos = drbd_bm_total_weight(mdev);

	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
			dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
			khelper_cmd = "out-of-sync";

		D_ASSERT((n_oos - mdev->rs_failed) == 0);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
			khelper_cmd = "after-resync-target";

	if (mdev->csums_tfm && mdev->rs_total) {
		const unsigned long s = mdev->rs_same_csum;
		const unsigned long t = mdev->rs_total;
			(t < 100000) ? ((s*100)/t) : (s/(t/100));
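/* (For large totals, s*100 could overflow an unsigned long on 32-bit
 * machines, so once t is large enough the percentage is computed as
 * s/(t/100) instead.) */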
		dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
			 "transferred %luK total %luK\n",
			 Bit2KB(mdev->rs_same_csum),
			 Bit2KB(mdev->rs_total - mdev->rs_same_csum),
			 Bit2KB(mdev->rs_total));

	if (mdev->rs_failed) {
		dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			ns.disk = D_INCONSISTENT;
			ns.pdsk = D_UP_TO_DATE;
			ns.disk = D_UP_TO_DATE;
			ns.pdsk = D_INCONSISTENT;
		ns.disk = D_UP_TO_DATE;
		ns.pdsk = D_UP_TO_DATE;

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");

			drbd_uuid_set_bm(mdev, 0UL);

			/* Now the two UUID sets are equal, update what we
			 * know of the peer. */
			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];

	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);

	spin_unlock_irq(&mdev->req_lock);

	mdev->ov_start_sector = 0;

	if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
		dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
		drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");

		drbd_khelper(mdev, khelper_cmd);
static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
	if (drbd_bio_has_active_page(e->private_bio)) {
		/* This might happen if sendpage() has not finished */
		spin_lock_irq(&mdev->req_lock);
		list_add_tail(&e->w.list, &mdev->net_ee);
		spin_unlock_irq(&mdev->req_lock);
		drbd_free_ee(mdev, e);
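/* Entries whose pages may still be referenced by the network stack (a
 * sendpage() that has not completed) are parked on net_ee instead of
 * being freed right away; they are reaped later, once it is safe. */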
/**
 * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyway.
 */
int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);

	if (likely(drbd_bio_uptodate(e->private_bio))) {
		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
				(unsigned long long)e->sector);

		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);

	move_to_net_ee_or_free(mdev, e);

		dev_err(DEV, "drbd_send_block() failed\n");
/**
 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyway.
 */
int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);

	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, e->sector);

	if (likely(drbd_bio_uptodate(e->private_bio))) {
		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
			inc_rs_pending(mdev);
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
			if (__ratelimit(&drbd_ratelimit_state))
				dev_err(DEV, "Not sending RSDataReply, "
					"partner DISKLESS!\n");
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
				(unsigned long long)e->sector);

		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);

		/* update resync data with failure */
		drbd_rs_failed_io(mdev, e->sector, e->size);

	move_to_net_ee_or_free(mdev, e);

		dev_err(DEV, "drbd_send_block() failed\n");
int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	struct digest_info *di;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);

	drbd_rs_complete_io(mdev, e->sector);

	di = (struct digest_info *)(unsigned long)e->block_id;

	if (likely(drbd_bio_uptodate(e->private_bio))) {
		/* quick hack to try to avoid a race against reconfiguration.
		 * a real fix would be much more involved,
		 * introducing more locking mechanisms */
		if (mdev->csums_tfm) {
			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
			D_ASSERT(digest_size == di->digest_size);
			digest = kmalloc(digest_size, GFP_NOIO);
				drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
				eq = !memcmp(digest, di->digest, digest_size);

			drbd_set_in_sync(mdev, e->sector, e->size);
			mdev->rs_same_csum++;
			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
			inc_rs_pending(mdev);
			e->block_id = ID_SYNCER;
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");

	move_to_net_ee_or_free(mdev, e);

		dev_err(DEV, "drbd_send_block/ack() failed\n");
int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);

	if (unlikely(cancel))

	if (unlikely(!drbd_bio_uptodate(e->private_bio)))

	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
	/* FIXME if this allocation fails, online verify will not terminate! */
	digest = kmalloc(digest_size, GFP_NOIO);
		drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
		inc_rs_pending(mdev);
		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
					     digest, digest_size, P_OV_REPLY);
			dec_rs_pending(mdev);

	drbd_free_ee(mdev, e);
void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
		mdev->ov_last_oos_size += size>>9;
		mdev->ov_last_oos_start = sector;
		mdev->ov_last_oos_size = size>>9;
	drbd_set_out_of_sync(mdev, sector, size);
	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
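/* Contiguous out-of-sync blocks are merged into ov_last_oos_start/_size so
 * one range is reported instead of one line per 4K block: e.g. adjacent 4K
 * blocks at sectors 8 and 16 become a single 16-sector range starting at
 * sector 8. */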
int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	struct digest_info *di;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);

	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
	 * the resync lru has been cleaned up already */
	drbd_rs_complete_io(mdev, e->sector);

	di = (struct digest_info *)(unsigned long)e->block_id;

	if (likely(drbd_bio_uptodate(e->private_bio))) {
		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
			drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);

			D_ASSERT(digest_size == di->digest_size);
			eq = !memcmp(digest, di->digest, digest_size);

		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");

		drbd_ov_oos_found(mdev, e->sector, e->size);

	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);

	drbd_free_ee(mdev, e);

	if (--mdev->ov_left == 0) {
		drbd_resync_finished(mdev);
int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);

int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
	struct p_barrier *p = &mdev->data.sbuf.barrier;

	/* really avoid racing with tl_clear. w.cb may have been referenced
	 * just before it was reassigned and re-queued, so double check that.
	 * actually, this race was harmless, since we only try to send the
	 * barrier packet here, and otherwise do nothing with the object.
	 * but compare with the head of w_clear_epoch */
	spin_lock_irq(&mdev->req_lock);
	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
	spin_unlock_irq(&mdev->req_lock);

	if (!drbd_get_data_sock(mdev))
	p->barrier = b->br_number;
	/* inc_ap_pending was done where this was queued.
	 * dec_ap_pending will be done in got_BarrierAck
	 * or (on connection loss) in w_clear_epoch. */
	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
			(struct p_header *)p, sizeof(*p), 0);
	drbd_put_data_sock(mdev);

int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
/**
 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyway.
 */
int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);

	ok = drbd_send_dblock(mdev, req);
	req_mod(req, ok ? handed_over_to_network : send_failed);
/**
 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyway.
 */
int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);

	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
				(unsigned long)req);

		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
		 * so this is probably redundant */
		if (mdev->state.conn >= C_CONNECTED)
			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));

	req_mod(req, ok ? handed_over_to_network : send_failed);
static int _drbd_may_sync_now(struct drbd_conf *mdev)
	struct drbd_conf *odev = mdev;

		if (odev->sync_conf.after == -1)
		odev = minor_to_mdev(odev->sync_conf.after);
		ERR_IF(!odev) return 1;
		if ((odev->state.conn >= C_SYNC_SOURCE &&
		     odev->state.conn <= C_PAUSED_SYNC_T) ||
		    odev->state.aftr_isp || odev->state.peer_isp ||
		    odev->state.user_isp)
/**
 * _drbd_pause_after() - Pause resync on all devices that may not resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and after_state_ch).
 */
static int _drbd_pause_after(struct drbd_conf *mdev)
	struct drbd_conf *odev;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
		if (!_drbd_may_sync_now(odev))
			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
			       != SS_NOTHING_TO_DO);
/**
 * _drbd_resume_next() - Resume resync on all devices that may resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and worker).
 */
static int _drbd_resume_next(struct drbd_conf *mdev)
	struct drbd_conf *odev;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
		if (odev->state.aftr_isp) {
			if (_drbd_may_sync_now(odev))
				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
							CS_HARD, NULL)
				       != SS_NOTHING_TO_DO);
void resume_next_sg(struct drbd_conf *mdev)
	write_lock_irq(&global_state_lock);
	_drbd_resume_next(mdev);
	write_unlock_irq(&global_state_lock);

void suspend_other_sg(struct drbd_conf *mdev)
	write_lock_irq(&global_state_lock);
	_drbd_pause_after(mdev);
	write_unlock_irq(&global_state_lock);
static int sync_after_error(struct drbd_conf *mdev, int o_minor)
	struct drbd_conf *odev;

	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
		return ERR_SYNC_AFTER;

	/* check for loops */
	odev = minor_to_mdev(o_minor);
			return ERR_SYNC_AFTER_CYCLE;

		/* dependency chain ends here, no cycles. */
		if (odev->sync_conf.after == -1)

		/* follow the dependency chain */
		odev = minor_to_mdev(odev->sync_conf.after);
int drbd_alter_sa(struct drbd_conf *mdev, int na)
	write_lock_irq(&global_state_lock);
	retcode = sync_after_error(mdev, na);
	if (retcode == NO_ERROR) {
		mdev->sync_conf.after = na;

		changes  = _drbd_pause_after(mdev);
		changes |= _drbd_resume_next(mdev);

	write_unlock_irq(&global_state_lock);
/**
 * drbd_start_resync() - Start the resync process
 * @mdev:	DRBD device.
 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
 *
 * This function might bring you directly into one of the
 * C_PAUSED_SYNC_* states.
 */
void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
	union drbd_state ns;

	if (mdev->state.conn >= C_SYNC_SOURCE) {
		dev_err(DEV, "Resync already running!\n");

	trace_drbd_resync(mdev, TRACE_LVL_SUMMARY, "Resync starting: side=%s\n",
			  side == C_SYNC_TARGET ? "SyncTarget" : "SyncSource");
	/* In case a previous resync run was aborted by an IO error/detach on the peer. */
	drbd_rs_cancel_all(mdev);

	if (side == C_SYNC_TARGET) {
		/* Since application IO was locked out during C_WF_BITMAP_T and
		   C_WF_SYNC_UUID we are still unmodified. Before going to
		   C_SYNC_TARGET we check whether we may make the data inconsistent. */
		r = drbd_khelper(mdev, "before-resync-target");
		r = (r >> 8) & 0xff;
			dev_info(DEV, "before-resync-target handler returned %d, "
				 "dropping connection.\n", r);
			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
	drbd_state_lock(mdev);

	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
		drbd_state_unlock(mdev);

	if (side == C_SYNC_TARGET) {
		mdev->bm_resync_fo = 0;
	} else /* side == C_SYNC_SOURCE */ {
		get_random_bytes(&uuid, sizeof(u64));
		drbd_uuid_set(mdev, UI_BITMAP, uuid);
		drbd_send_sync_uuid(mdev, uuid);

		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);

	write_lock_irq(&global_state_lock);

	ns.aftr_isp = !_drbd_may_sync_now(mdev);

	if (side == C_SYNC_TARGET)
		ns.disk = D_INCONSISTENT;
	else /* side == C_SYNC_SOURCE */
		ns.pdsk = D_INCONSISTENT;

	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);

	if (ns.conn < C_CONNECTED)
		r = SS_UNKNOWN_ERROR;

	if (r == SS_SUCCESS) {
		mdev->rs_mark_left = drbd_bm_total_weight(mdev);
		mdev->rs_failed    = 0;
		mdev->rs_paused    = 0;
		mdev->rs_mark_time = jiffies;
		mdev->rs_same_csum = 0;
		_drbd_pause_after(mdev);

	write_unlock_irq(&global_state_lock);
	drbd_state_unlock(mdev);

	if (r == SS_SUCCESS) {
		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
			 drbd_conn_str(ns.conn),
			 (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
			 (unsigned long) mdev->rs_total);

		if (mdev->rs_total == 0) {
			/* Peer still reachable? Beware of failing before-resync-target handlers! */
			__set_current_state(TASK_INTERRUPTIBLE);
			schedule_timeout(mdev->net_conf->ping_timeo*HZ/9); /* 9 instead of 10 */
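/* (net_conf->ping_timeo is configured in tenths of a second; dividing by
 * 9 rather than 10 waits slightly longer than one full ping timeout, so
 * an unreachable peer would have been detected by now.) */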
			drbd_resync_finished(mdev);

		/* ns.conn may already be != mdev->state.conn,
		 * we may have been paused in between, or become paused until
		 * the timer triggers.
		 * No matter, that is handled in resync_timer_fn() */
		if (ns.conn == C_SYNC_TARGET)
			mod_timer(&mdev->resync_timer, jiffies);
int drbd_worker(struct drbd_thread *thi)
	struct drbd_conf *mdev = thi->mdev;
	struct drbd_work *w = NULL;
	LIST_HEAD(work_list);

	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));

	while (get_t_state(thi) == Running) {
		drbd_thread_current_set_cpu(mdev);

		if (down_trylock(&mdev->data.work.s)) {
			mutex_lock(&mdev->data.mutex);
			if (mdev->data.socket && !mdev->net_conf->no_cork)
				drbd_tcp_uncork(mdev->data.socket);
			mutex_unlock(&mdev->data.mutex);

			intr = down_interruptible(&mdev->data.work.s);

			mutex_lock(&mdev->data.mutex);
			if (mdev->data.socket && !mdev->net_conf->no_cork)
				drbd_tcp_cork(mdev->data.socket);
			mutex_unlock(&mdev->data.mutex);

			D_ASSERT(intr == -EINTR);
			flush_signals(current);
			ERR_IF (get_t_state(thi) == Running)

		if (get_t_state(thi) != Running)
		/* With this break, we have done a down() but not consumed
		   the entry from the list. The cleanup code takes care of
		   this. */
		spin_lock_irq(&mdev->data.work.q_lock);
		ERR_IF(list_empty(&mdev->data.work.q)) {
			/* something terribly wrong in our logic.
			 * we were able to down() the semaphore,
			 * but the list is empty... doh.
			 *
			 * what is the best thing to do now?
			 * try again from scratch, restarting the receiver,
			 * asender, whatnot? could break even more ugly,
			 * e.g. when we are primary, but have no good local data.
			 *
			 * I'll try to get away just starting over this loop.
			 */
			spin_unlock_irq(&mdev->data.work.q_lock);

		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
		list_del_init(&w->list);
		spin_unlock_irq(&mdev->data.work.q_lock);

		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
			/* dev_warn(DEV, "worker: a callback failed! \n"); */
			if (mdev->state.conn >= C_CONNECTED)
				drbd_force_state(mdev,
						NS(conn, C_NETWORK_FAILURE));
	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));

	spin_lock_irq(&mdev->data.work.q_lock);

	while (!list_empty(&mdev->data.work.q)) {
		list_splice_init(&mdev->data.work.q, &work_list);
		spin_unlock_irq(&mdev->data.work.q_lock);

		while (!list_empty(&work_list)) {
			w = list_entry(work_list.next, struct drbd_work, list);
			list_del_init(&w->list);
			i++; /* dead debugging code */

		spin_lock_irq(&mdev->data.work.q_lock);

	sema_init(&mdev->data.work.s, 0);
	/* DANGEROUS race: if someone did queue his work within the spinlock,
	 * but up()ed outside the spinlock, we could get an up() on the
	 * semaphore without corresponding list entry.
	 */
	spin_unlock_irq(&mdev->data.work.q_lock);
	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
	/* _drbd_set_state only uses stop_nowait.
	 * wait here for the Exiting receiver. */
	drbd_thread_stop(&mdev->receiver);
	drbd_mdev_cleanup(mdev);

	dev_info(DEV, "worker terminated\n");

	clear_bit(DEVICE_DYING, &mdev->flags);
	clear_bit(CONFIG_PENDING, &mdev->flags);
	wake_up(&mdev->state_wait);