4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/smp_lock.h>
30 #include <linux/wait.h>
32 #include <linux/memcontrol.h>
33 #include <linux/mm_inline.h>
34 #include <linux/slab.h>
35 #include <linux/random.h>
36 #include <linux/string.h>
37 #include <linux/scatterlist.h>
42 #define SLEEP_TIME (HZ/10)
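/* SLEEP_TIME is the period of the resync timer: resync and online-verify work
 * is re-armed every HZ/10 jiffies, i.e. every 100 ms (see the
 * mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME) calls below). */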
44 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
53 * more endio handlers:
54 atodb_endio in drbd_actlog.c
55 drbd_bm_async_io_complete in drbd_bitmap.c
57 * For all these callbacks, note the following:
58 * The callbacks will be called in irq context by the IDE drivers,
59 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
60 * Try to get the locking right :)
65 /* About the global_state_lock
66 Each state transition on a device holds a read lock. In case we have
67 to evaluate the sync-after dependencies, we grab a write lock, because
68 we need stable states on all devices for that. */
69 rwlock_t global_state_lock;
71 /* used for synchronous meta data and bitmap IO
72 * submitted by drbd_md_sync_page_io()
74 void drbd_md_io_complete(struct bio *bio, int error)
76 struct drbd_md_io *md_io;
78 md_io = (struct drbd_md_io *)bio->bi_private;
81 complete(&md_io->event);
84 /* reads on behalf of the partner,
85 * "submitted" by the receiver
87 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
89 unsigned long flags = 0;
90 struct drbd_conf *mdev = e->mdev;
92 D_ASSERT(e->block_id != ID_VACANT);
94 spin_lock_irqsave(&mdev->req_lock, flags);
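/* e->size is in bytes; shifting right by 9 converts it to 512-byte sectors,
 * the unit of the read/write statistics counters. */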
95 mdev->read_cnt += e->size >> 9;
97 if (list_empty(&mdev->read_ee))
98 wake_up(&mdev->ee_wait);
99 if (test_bit(__EE_WAS_ERROR, &e->flags))
100 __drbd_chk_io_error(mdev, FALSE);
101 spin_unlock_irqrestore(&mdev->req_lock, flags);
103 drbd_queue_work(&mdev->data.work, &e->w);
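/* A "failed barrier" is a barrier write that completed with an error and has
 * not been resubmitted yet: EE_IS_BARRIER and EE_WAS_ERROR are set, but
 * EE_RESUBMITTED is not. Such a request gets the write ordering downgraded to
 * WO_bdev_flush and is scheduled for resubmission via w_e_reissue below. */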
107 static int is_failed_barrier(int ee_flags)
109 return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
110 == (EE_IS_BARRIER|EE_WAS_ERROR);
113 /* writes on behalf of the partner, or resync writes,
114 * "submitted" by the receiver, final stage. */
115 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
117 unsigned long flags = 0;
118 struct drbd_conf *mdev = e->mdev;
122 int do_al_complete_io;
124 /* if this is a failed barrier request, disable use of barriers,
125 * and schedule for resubmission */
126 if (is_failed_barrier(e->flags)) {
127 drbd_bump_write_ordering(mdev, WO_bdev_flush);
128 spin_lock_irqsave(&mdev->req_lock, flags);
129 list_del(&e->w.list);
130 e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
131 e->w.cb = w_e_reissue;
132 /* put_ldev actually happens below, once we come here again. */
134 spin_unlock_irqrestore(&mdev->req_lock, flags);
135 drbd_queue_work(&mdev->data.work, &e->w);
139 D_ASSERT(e->block_id != ID_VACANT);
141 /* after we moved e to done_ee,
142 * we may no longer access it,
143 * it may be freed/reused already!
144 * (as soon as we release the req_lock) */
145 e_sector = e->sector;
146 do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
147 is_syncer_req = is_syncer_block_id(e->block_id);
149 spin_lock_irqsave(&mdev->req_lock, flags);
150 mdev->writ_cnt += e->size >> 9;
151 list_del(&e->w.list); /* has been on active_ee or sync_ee */
152 list_add_tail(&e->w.list, &mdev->done_ee);
154 /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
155 * neither did we wake possibly waiting conflicting requests.
156 * done from "drbd_process_done_ee" within the appropriate w.cb
157 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
159 do_wake = is_syncer_req
160 ? list_empty(&mdev->sync_ee)
161 : list_empty(&mdev->active_ee);
163 if (test_bit(__EE_WAS_ERROR, &e->flags))
164 __drbd_chk_io_error(mdev, FALSE);
165 spin_unlock_irqrestore(&mdev->req_lock, flags);
168 drbd_rs_complete_io(mdev, e_sector);
171 wake_up(&mdev->ee_wait);
173 if (do_al_complete_io)
174 drbd_al_complete_io(mdev, e_sector);
180 /* writes on behalf of the partner, or resync writes,
181 * "submitted" by the receiver.
183 void drbd_endio_sec(struct bio *bio, int error)
185 struct drbd_epoch_entry *e = bio->bi_private;
186 struct drbd_conf *mdev = e->mdev;
187 int uptodate = bio_flagged(bio, BIO_UPTODATE);
188 int is_write = bio_data_dir(bio) == WRITE;
191 dev_warn(DEV, "%s: error=%d s=%llus\n",
192 is_write ? "write" : "read", error,
193 (unsigned long long)e->sector);
194 if (!error && !uptodate) {
195 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
196 is_write ? "write" : "read",
197 (unsigned long long)e->sector);
198 /* strange behavior of some lower level drivers...
199 * fail the request by clearing the uptodate flag,
200 * but do not return any error?! */
205 set_bit(__EE_WAS_ERROR, &e->flags);
207 bio_put(bio); /* no need for the bio anymore */
208 if (atomic_dec_and_test(&e->pending_bios)) {
210 drbd_endio_write_sec_final(e);
212 drbd_endio_read_sec_final(e);
216 /* read, read-ahead or write requests on R_PRIMARY coming from drbd_make_request
218 void drbd_endio_pri(struct bio *bio, int error)
221 struct drbd_request *req = bio->bi_private;
222 struct drbd_conf *mdev = req->mdev;
223 struct bio_and_error m;
224 enum drbd_req_event what;
225 int uptodate = bio_flagged(bio, BIO_UPTODATE);
228 dev_warn(DEV, "p %s: error=%d\n",
229 bio_data_dir(bio) == WRITE ? "write" : "read", error);
230 if (!error && !uptodate) {
231 dev_warn(DEV, "p %s: setting error to -EIO\n",
232 bio_data_dir(bio) == WRITE ? "write" : "read");
233 /* strange behavior of some lower level drivers...
234 * fail the request by clearing the uptodate flag,
235 * but do not return any error?! */
239 /* to avoid recursion in __req_mod */
240 if (unlikely(error)) {
241 what = (bio_data_dir(bio) == WRITE)
242 ? write_completed_with_error
243 : (bio_rw(bio) == READ)
244 ? read_completed_with_error
245 : read_ahead_completed_with_error;
249 bio_put(req->private_bio);
250 req->private_bio = ERR_PTR(error);
252 spin_lock_irqsave(&mdev->req_lock, flags);
253 __req_mod(req, what, &m);
254 spin_unlock_irqrestore(&mdev->req_lock, flags);
257 complete_master_bio(mdev, &m);
260 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
262 struct drbd_request *req = container_of(w, struct drbd_request, w);
264 /* We should not detach for read io-error,
265 * but try to WRITE the P_DATA_REPLY to the failed location,
266 * to give the disk the chance to relocate that block */
268 spin_lock_irq(&mdev->req_lock);
269 if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
270 _req_mod(req, read_retry_remote_canceled);
271 spin_unlock_irq(&mdev->req_lock);
272 dev_alert(DEV, "WE ARE LOST. Local IO failure, no peer.\n");
275 spin_unlock_irq(&mdev->req_lock);
277 return w_send_read_req(mdev, w, 0);
280 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
282 ERR_IF(cancel) return 1;
283 dev_err(DEV, "resync inactive, but callback triggered??\n");
284 return 1; /* Simply ignore this! */
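/* drbd_csum_ee() computes a digest over all pages of an epoch entry using the
 * given hash transform (csums_tfm for checksum-based resync, verify_tfm for
 * online verify); only the last page may be partially used, as determined by
 * e->size. */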
287 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
289 struct hash_desc desc;
290 struct scatterlist sg;
291 struct page *page = e->pages;
298 sg_init_table(&sg, 1);
299 crypto_hash_init(&desc);
301 while ((tmp = page_chain_next(page))) {
302 /* all but the last page will be fully used */
303 sg_set_page(&sg, page, PAGE_SIZE, 0);
304 crypto_hash_update(&desc, &sg, sg.length);
307 /* and now the last, possibly only partially used page */
308 len = e->size & (PAGE_SIZE - 1);
309 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
310 crypto_hash_update(&desc, &sg, sg.length);
311 crypto_hash_final(&desc, digest);
314 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
316 struct hash_desc desc;
317 struct scatterlist sg;
318 struct bio_vec *bvec;
324 sg_init_table(&sg, 1);
325 crypto_hash_init(&desc);
327 __bio_for_each_segment(bvec, bio, i, 0) {
328 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
329 crypto_hash_update(&desc, &sg, sg.length);
331 crypto_hash_final(&desc, digest);
334 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
336 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
341 D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
343 if (unlikely(cancel)) {
344 drbd_free_ee(mdev, e);
348 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
349 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
350 digest = kmalloc(digest_size, GFP_NOIO);
352 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
354 inc_rs_pending(mdev);
355 ok = drbd_send_drequest_csum(mdev,
363 dev_err(DEV, "kmalloc() of digest failed.\n");
369 drbd_free_ee(mdev, e);
372 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
376 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
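/* read_for_csum() allocates an epoch entry for the given sector/size, queues
 * it on read_ee and submits a local read; once the read completes,
 * w_e_send_csum() hashes the data and sends the checksum to the peer via
 * drbd_send_drequest_csum(). */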
378 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
380 struct drbd_epoch_entry *e;
385 /* GFP_TRY, because if there is no memory available right now, this may
386 * be rescheduled for later. It is "only" background resync, after all. */
387 e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
391 spin_lock_irq(&mdev->req_lock);
392 list_add(&e->w.list, &mdev->read_ee);
393 spin_unlock_irq(&mdev->req_lock);
395 e->w.cb = w_e_send_csum;
396 if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
399 drbd_free_ee(mdev, e);
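/* Timer callback driving resync and online verify: unless STOP_SYNC_TIMER is
 * set, it selects w_make_ov_request (verify) or w_make_resync_request as the
 * work callback and queues mdev->resync_work if it is not queued already. */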
405 void resync_timer_fn(unsigned long data)
408 struct drbd_conf *mdev = (struct drbd_conf *) data;
411 spin_lock_irqsave(&mdev->req_lock, flags);
413 if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
415 if (mdev->state.conn == C_VERIFY_S)
416 mdev->resync_work.cb = w_make_ov_request;
418 mdev->resync_work.cb = w_make_resync_request;
421 mdev->resync_work.cb = w_resync_inactive;
424 spin_unlock_irqrestore(&mdev->req_lock, flags);
426 /* harmless race: list_empty outside data.work.q_lock */
427 if (list_empty(&mdev->resync_work.list) && queue)
428 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
431 static int calc_resync_rate(struct drbd_conf *mdev)
433 int d = mdev->data_delay / 1000; /* us -> ms */
434 int td = mdev->sync_conf.throttle_th * 100; /* 0.1s -> ms */
435 int hd = mdev->sync_conf.hold_off_th * 100; /* 0.1s -> ms */
436 int cr = mdev->sync_conf.rate;
438 return d <= td ? cr :
440 cr + (cr * (td - d) / (hd - td));
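/* Illustration with assumed values: throttle_th = 1 (100 ms), hold_off_th = 5
 * (500 ms), rate = 10000 KB/s. A measured data_delay of 300 ms then yields
 * 10000 + 10000 * (100 - 300) / (500 - 100) = 5000 KB/s, i.e. the configured
 * rate is scaled down linearly as the delay moves from the throttle threshold
 * towards the hold-off threshold. */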
443 int w_make_resync_request(struct drbd_conf *mdev,
444 struct drbd_work *w, int cancel)
448 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
449 int max_segment_size;
450 int number, i, size, pe, mx;
451 int align, queued, sndbuf;
453 if (unlikely(cancel))
456 if (unlikely(mdev->state.conn < C_CONNECTED)) {
457 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
461 if (mdev->state.conn != C_SYNC_TARGET)
462 dev_err(DEV, "%s in w_make_resync_request\n",
463 drbd_conn_str(mdev->state.conn));
465 if (!get_ldev(mdev)) {
466 /* Since we only need to access mdev->resync, a
467 get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
468 continuing resync with a broken disk makes no sense at
470 dev_err(DEV, "Disk broke down during resync!\n");
471 mdev->resync_work.cb = w_resync_inactive;
475 /* starting with drbd 8.3.8, we can handle multi-bio EEs,
476 * if it should be necessary */
477 max_segment_size = mdev->agreed_pro_version < 94 ?
478 queue_max_segment_size(mdev->rq_queue) : DRBD_MAX_SEGMENT_SIZE;
480 mdev->c_sync_rate = calc_resync_rate(mdev);
481 number = SLEEP_TIME * mdev->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
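/* Example, assuming the usual 4 KiB BM_BLOCK_SIZE: at c_sync_rate = 10000 KB/s
 * this evaluates to (HZ/10) * 10000 / (4 * HZ) = 250 requests per 100 ms tick,
 * i.e. 250 * 4 KiB = 1000 KiB per tick, or 10 MB/s. */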
482 pe = atomic_read(&mdev->rs_pending_cnt);
484 mutex_lock(&mdev->data.mutex);
485 if (mdev->data.socket)
486 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
489 mutex_unlock(&mdev->data.mutex);
491 /* For resync rates >160MB/sec, allow more pending RS requests */
495 /* Limit the number of pending RS requests to no more than the peer's receive buffer */
496 if ((pe + number) > mx) {
500 for (i = 0; i < number; i++) {
501 /* Stop generating RS requests when half of the send buffer is filled */
502 mutex_lock(&mdev->data.mutex);
503 if (mdev->data.socket) {
504 queued = mdev->data.socket->sk->sk_wmem_queued;
505 sndbuf = mdev->data.socket->sk->sk_sndbuf;
510 mutex_unlock(&mdev->data.mutex);
511 if (queued > sndbuf / 2)
515 size = BM_BLOCK_SIZE;
516 bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
519 mdev->bm_resync_fo = drbd_bm_bits(mdev);
520 mdev->resync_work.cb = w_resync_inactive;
525 sector = BM_BIT_TO_SECT(bit);
527 if (drbd_try_rs_begin_io(mdev, sector)) {
528 mdev->bm_resync_fo = bit;
531 mdev->bm_resync_fo = bit + 1;
533 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
534 drbd_rs_complete_io(mdev, sector);
538 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
539 /* try to find some adjacent bits.
540 * we stop once we already have the maximum request size.
542 * Additionally always align bigger requests, in order to
543 * be prepared for all stripe sizes of software RAIDs.
547 if (size + BM_BLOCK_SIZE > max_segment_size)
550 /* Always stay aligned */
551 if (sector & ((1<<(align+3))-1))
554 /* do not cross extent boundaries */
555 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
557 /* now, is it actually dirty, after all?
558 * caution, drbd_bm_test_bit is tri-state for some
559 * obscure reason; checking for (b == 0) would handle the out-of-range
560 * return value correctly only by accident, because of the "oddly sized"
561 * adjustment below */
562 if (drbd_bm_test_bit(mdev, bit+1) != 1)
565 size += BM_BLOCK_SIZE;
566 if ((BM_BLOCK_SIZE << align) <= size)
570 /* if we merged some,
571 * reset the offset to start the next drbd_bm_find_next from */
572 if (size > BM_BLOCK_SIZE)
573 mdev->bm_resync_fo = bit + 1;
576 /* adjust very last sectors, in case we are oddly sized */
577 if (sector + (size>>9) > capacity)
578 size = (capacity-sector)<<9;
579 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
580 switch (read_for_csum(mdev, sector, size)) {
581 case 0: /* Disk failure*/
584 case 2: /* Allocation failed */
585 drbd_rs_complete_io(mdev, sector);
586 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
588 /* case 1: everything ok */
591 inc_rs_pending(mdev);
592 if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
593 sector, size, ID_SYNCER)) {
594 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
595 dec_rs_pending(mdev);
602 if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
603 /* last syncer _request_ was sent,
604 * but the P_RS_DATA_REPLY not yet received. sync will end (and
605 * next sync group will resume), as soon as we receive the last
606 * resync data block, and the last bit is cleared.
607 * until then resync "work" is "inactive" ...
609 mdev->resync_work.cb = w_resync_inactive;
615 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
620 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
624 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
626 if (unlikely(cancel))
629 if (unlikely(mdev->state.conn < C_CONNECTED)) {
630 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
634 number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
635 if (atomic_read(&mdev->rs_pending_cnt) > number)
638 number -= atomic_read(&mdev->rs_pending_cnt);
640 sector = mdev->ov_position;
641 for (i = 0; i < number; i++) {
642 if (sector >= capacity) {
643 mdev->resync_work.cb = w_resync_inactive;
647 size = BM_BLOCK_SIZE;
649 if (drbd_try_rs_begin_io(mdev, sector)) {
650 mdev->ov_position = sector;
654 if (sector + (size>>9) > capacity)
655 size = (capacity-sector)<<9;
657 inc_rs_pending(mdev);
658 if (!drbd_send_ov_request(mdev, sector, size)) {
659 dec_rs_pending(mdev);
662 sector += BM_SECT_PER_BIT;
664 mdev->ov_position = sector;
667 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
672 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
676 drbd_resync_finished(mdev);
681 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
685 drbd_resync_finished(mdev);
690 int drbd_resync_finished(struct drbd_conf *mdev)
692 unsigned long db, dt, dbdt;
694 union drbd_state os, ns;
696 char *khelper_cmd = NULL;
698 /* Remove all elements from the resync LRU. Since future actions
699 * might set bits in the (main) bitmap, the entries in the
700 * resync LRU would otherwise be wrong. */
701 if (drbd_rs_del_all(mdev)) {
702 /* This is not possible right now, most probably because
703 * there are still P_RS_DATA_REPLY packets lingering on the worker's
704 * queue (or the read operations for those packets
705 * have not finished yet). Retry in 100ms. */
708 __set_current_state(TASK_INTERRUPTIBLE);
709 schedule_timeout(HZ / 10);
710 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
712 w->cb = w_resync_finished;
713 drbd_queue_work(&mdev->data.work, w);
716 dev_err(DEV, "Warning: both drbd_rs_del_all() and kmalloc(w) failed.\n");
719 dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
723 dbdt = Bit2KB(db/dt);
724 mdev->rs_paused /= HZ;
729 spin_lock_irq(&mdev->req_lock);
732 /* This protects us against multiple calls (that can happen in the presence
733 of application IO), and against connectivity loss just before we arrive here. */
734 if (os.conn <= C_CONNECTED)
738 ns.conn = C_CONNECTED;
740 dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
741 (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
742 "Online verify " : "Resync",
743 dt + mdev->rs_paused, mdev->rs_paused, dbdt);
745 n_oos = drbd_bm_total_weight(mdev);
747 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
749 dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
751 khelper_cmd = "out-of-sync";
754 D_ASSERT((n_oos - mdev->rs_failed) == 0);
756 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
757 khelper_cmd = "after-resync-target";
759 if (mdev->csums_tfm && mdev->rs_total) {
760 const unsigned long s = mdev->rs_same_csum;
761 const unsigned long t = mdev->rs_total;
764 (t < 100000) ? ((s*100)/t) : (s/(t/100));
765 dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
766 "transferred %luK total %luK\n",
768 Bit2KB(mdev->rs_same_csum),
769 Bit2KB(mdev->rs_total - mdev->rs_same_csum),
770 Bit2KB(mdev->rs_total));
774 if (mdev->rs_failed) {
775 dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed);
777 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
778 ns.disk = D_INCONSISTENT;
779 ns.pdsk = D_UP_TO_DATE;
781 ns.disk = D_UP_TO_DATE;
782 ns.pdsk = D_INCONSISTENT;
785 ns.disk = D_UP_TO_DATE;
786 ns.pdsk = D_UP_TO_DATE;
788 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
791 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
792 _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
793 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
794 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
796 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
800 drbd_uuid_set_bm(mdev, 0UL);
803 /* Now the two UUID sets are equal, update what we
804 * know of the peer. */
806 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
807 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
811 _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
813 spin_unlock_irq(&mdev->req_lock);
819 mdev->ov_start_sector = 0;
821 if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
822 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
823 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
827 drbd_khelper(mdev, khelper_cmd);
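/* Keep the epoch entry on net_ee as long as its pages may still be in flight
 * via tcp sendpage(); otherwise it can be freed right away. */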
833 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
835 if (drbd_ee_has_active_page(e)) {
836 /* This might happen if sendpage() has not finished */
837 spin_lock_irq(&mdev->req_lock);
838 list_add_tail(&e->w.list, &mdev->net_ee);
839 spin_unlock_irq(&mdev->req_lock);
841 drbd_free_ee(mdev, e);
845 * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
846 * @mdev: DRBD device.
848 * @cancel: The connection will be closed anyway
850 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
852 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
855 if (unlikely(cancel)) {
856 drbd_free_ee(mdev, e);
861 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
862 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
864 if (__ratelimit(&drbd_ratelimit_state))
865 dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
866 (unsigned long long)e->sector);
868 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
873 move_to_net_ee_or_free(mdev, e);
876 dev_err(DEV, "drbd_send_block() failed\n");
881 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
882 * @mdev: DRBD device.
884 * @cancel: The connection will be closed anyway
886 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
888 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
891 if (unlikely(cancel)) {
892 drbd_free_ee(mdev, e);
897 if (get_ldev_if_state(mdev, D_FAILED)) {
898 drbd_rs_complete_io(mdev, e->sector);
902 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
903 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
904 inc_rs_pending(mdev);
905 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
907 if (__ratelimit(&drbd_ratelimit_state))
908 dev_err(DEV, "Not sending RSDataReply, "
909 "partner DISKLESS!\n");
913 if (__ratelimit(&drbd_ratelimit_state))
914 dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
915 (unsigned long long)e->sector);
917 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
919 /* update resync data with failure */
920 drbd_rs_failed_io(mdev, e->sector, e->size);
925 move_to_net_ee_or_free(mdev, e);
928 dev_err(DEV, "drbd_send_block() failed\n");
932 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
934 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
935 struct digest_info *di;
940 if (unlikely(cancel)) {
941 drbd_free_ee(mdev, e);
946 drbd_rs_complete_io(mdev, e->sector);
948 di = (struct digest_info *)(unsigned long)e->block_id;
950 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
951 /* quick hack to try to avoid a race against reconfiguration.
952 * a real fix would be much more involved,
953 * introducing more locking mechanisms */
954 if (mdev->csums_tfm) {
955 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
956 D_ASSERT(digest_size == di->digest_size);
957 digest = kmalloc(digest_size, GFP_NOIO);
960 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
961 eq = !memcmp(digest, di->digest, digest_size);
966 drbd_set_in_sync(mdev, e->sector, e->size);
967 /* rs_same_csums unit is BM_BLOCK_SIZE */
968 mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
969 ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
971 inc_rs_pending(mdev);
972 e->block_id = ID_SYNCER;
973 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
976 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
977 if (__ratelimit(&drbd_ratelimit_state))
978 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
985 move_to_net_ee_or_free(mdev, e);
988 dev_err(DEV, "drbd_send_block/ack() failed\n");
992 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
994 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
999 if (unlikely(cancel))
1002 if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1005 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1006 /* FIXME if this allocation fails, online verify will not terminate! */
1007 digest = kmalloc(digest_size, GFP_NOIO);
1009 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1010 inc_rs_pending(mdev);
1011 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1012 digest, digest_size, P_OV_REPLY);
1014 dec_rs_pending(mdev);
1019 drbd_free_ee(mdev, e);
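/* Record a block that online verify found to be out of sync: extend the last
 * recorded range if this block is contiguous with it, otherwise start a new
 * range; mark it out of sync in the bitmap and remember to write the bitmap
 * out once the verify run finishes (WRITE_BM_AFTER_RESYNC). */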
1026 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1028 if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1029 mdev->ov_last_oos_size += size>>9;
1031 mdev->ov_last_oos_start = sector;
1032 mdev->ov_last_oos_size = size>>9;
1034 drbd_set_out_of_sync(mdev, sector, size);
1035 set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1038 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1040 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1041 struct digest_info *di;
1046 if (unlikely(cancel)) {
1047 drbd_free_ee(mdev, e);
1052 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1053 * the resync lru has been cleaned up already */
1054 drbd_rs_complete_io(mdev, e->sector);
1056 di = (struct digest_info *)(unsigned long)e->block_id;
1058 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1059 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1060 digest = kmalloc(digest_size, GFP_NOIO);
1062 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1064 D_ASSERT(digest_size == di->digest_size);
1065 eq = !memcmp(digest, di->digest, digest_size);
1069 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1070 if (__ratelimit(&drbd_ratelimit_state))
1071 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1079 drbd_ov_oos_found(mdev, e->sector, e->size);
1083 ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1084 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1086 drbd_free_ee(mdev, e);
1088 if (--mdev->ov_left == 0) {
1090 drbd_resync_finished(mdev);
1096 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1098 struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1103 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1105 struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1106 struct p_barrier *p = &mdev->data.sbuf.barrier;
1109 /* really avoid racing with tl_clear. w.cb may have been referenced
1110 * just before it was reassigned and re-queued, so double check that.
1111 * actually, this race was harmless, since we only try to send the
1112 * barrier packet here, and otherwise do nothing with the object.
1113 * but compare with the head of w_clear_epoch */
1114 spin_lock_irq(&mdev->req_lock);
1115 if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1117 spin_unlock_irq(&mdev->req_lock);
1121 if (!drbd_get_data_sock(mdev))
1123 p->barrier = b->br_number;
1124 /* inc_ap_pending was done where this was queued.
1125 * dec_ap_pending will be done in got_BarrierAck
1126 * or (on connection loss) in w_clear_epoch. */
1127 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1128 (struct p_header *)p, sizeof(*p), 0);
1129 drbd_put_data_sock(mdev);
1134 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1138 return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1142 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1143 * @mdev: DRBD device.
1145 * @cancel: The connection will be closed anyway
1147 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1149 struct drbd_request *req = container_of(w, struct drbd_request, w);
1152 if (unlikely(cancel)) {
1153 req_mod(req, send_canceled);
1157 ok = drbd_send_dblock(mdev, req);
1158 req_mod(req, ok ? handed_over_to_network : send_failed);
1164 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1165 * @mdev: DRBD device.
1167 * @cancel: The connection will be closed anyway
1169 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1171 struct drbd_request *req = container_of(w, struct drbd_request, w);
1174 if (unlikely(cancel)) {
1175 req_mod(req, send_canceled);
1179 ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1180 (unsigned long)req);
1183 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1184 * so this is probably redundant */
1185 if (mdev->state.conn >= C_CONNECTED)
1186 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1188 req_mod(req, ok ? handed_over_to_network : send_failed);
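/* Walk the sync-after dependency chain: this device may resync only if none of
 * the devices it is configured to sync "after" is itself currently resyncing
 * or has one of the pause flags (aftr_isp, peer_isp, user_isp) set. */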
1193 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1195 struct drbd_conf *odev = mdev;
1198 if (odev->sync_conf.after == -1)
1200 odev = minor_to_mdev(odev->sync_conf.after);
1201 ERR_IF(!odev) return 1;
1202 if ((odev->state.conn >= C_SYNC_SOURCE &&
1203 odev->state.conn <= C_PAUSED_SYNC_T) ||
1204 odev->state.aftr_isp || odev->state.peer_isp ||
1205 odev->state.user_isp)
1211 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1212 * @mdev: DRBD device.
1214 * Called from process context only (admin command and after_state_ch).
1216 static int _drbd_pause_after(struct drbd_conf *mdev)
1218 struct drbd_conf *odev;
1221 for (i = 0; i < minor_count; i++) {
1222 odev = minor_to_mdev(i);
1225 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1227 if (!_drbd_may_sync_now(odev))
1228 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1229 != SS_NOTHING_TO_DO);
1236 * _drbd_resume_next() - Resume resync on all devices that may resync now
1237 * @mdev: DRBD device.
1239 * Called from process context only (admin command and worker).
1241 static int _drbd_resume_next(struct drbd_conf *mdev)
1243 struct drbd_conf *odev;
1246 for (i = 0; i < minor_count; i++) {
1247 odev = minor_to_mdev(i);
1250 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1252 if (odev->state.aftr_isp) {
1253 if (_drbd_may_sync_now(odev))
1254 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1256 != SS_NOTHING_TO_DO) ;
1262 void resume_next_sg(struct drbd_conf *mdev)
1264 write_lock_irq(&global_state_lock);
1265 _drbd_resume_next(mdev);
1266 write_unlock_irq(&global_state_lock);
1269 void suspend_other_sg(struct drbd_conf *mdev)
1271 write_lock_irq(&global_state_lock);
1272 _drbd_pause_after(mdev);
1273 write_unlock_irq(&global_state_lock);
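/* Validate a new sync-after setting: the referenced minor must exist (-1 means
 * "no dependency"), and following the chain of "after" settings from it must
 * not lead back to this device, i.e. no dependency cycles are allowed. */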
1276 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1278 struct drbd_conf *odev;
1282 if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1283 return ERR_SYNC_AFTER;
1285 /* check for loops */
1286 odev = minor_to_mdev(o_minor);
1289 return ERR_SYNC_AFTER_CYCLE;
1291 /* dependency chain ends here, no cycles. */
1292 if (odev->sync_conf.after == -1)
1295 /* follow the dependency chain */
1296 odev = minor_to_mdev(odev->sync_conf.after);
1300 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1305 write_lock_irq(&global_state_lock);
1306 retcode = sync_after_error(mdev, na);
1307 if (retcode == NO_ERROR) {
1308 mdev->sync_conf.after = na;
1310 changes = _drbd_pause_after(mdev);
1311 changes |= _drbd_resume_next(mdev);
1314 write_unlock_irq(&global_state_lock);
1318 static void ping_peer(struct drbd_conf *mdev)
1320 clear_bit(GOT_PING_ACK, &mdev->flags);
1322 wait_event(mdev->misc_wait,
1323 test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
1327 * drbd_start_resync() - Start the resync process
1328 * @mdev: DRBD device.
1329 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1331 * This function might bring you directly into one of the
1332 * C_PAUSED_SYNC_* states.
1334 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1336 union drbd_state ns;
1339 if (mdev->state.conn >= C_SYNC_SOURCE) {
1340 dev_err(DEV, "Resync already running!\n");
1344 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1345 drbd_rs_cancel_all(mdev);
1347 if (side == C_SYNC_TARGET) {
1348 /* Since application IO was locked out during C_WF_BITMAP_T and
1349 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1350 we ask the before-resync-target handler whether we may make the data inconsistent. */
1351 r = drbd_khelper(mdev, "before-resync-target");
1352 r = (r >> 8) & 0xff;
1354 dev_info(DEV, "before-resync-target handler returned %d, "
1355 "dropping connection.\n", r);
1356 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1361 drbd_state_lock(mdev);
1363 if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1364 drbd_state_unlock(mdev);
1368 if (side == C_SYNC_TARGET) {
1369 mdev->bm_resync_fo = 0;
1370 } else /* side == C_SYNC_SOURCE */ {
1373 get_random_bytes(&uuid, sizeof(u64));
1374 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1375 drbd_send_sync_uuid(mdev, uuid);
1377 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1380 write_lock_irq(&global_state_lock);
1383 ns.aftr_isp = !_drbd_may_sync_now(mdev);
1387 if (side == C_SYNC_TARGET)
1388 ns.disk = D_INCONSISTENT;
1389 else /* side == C_SYNC_SOURCE */
1390 ns.pdsk = D_INCONSISTENT;
1392 r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1395 if (ns.conn < C_CONNECTED)
1396 r = SS_UNKNOWN_ERROR;
1398 if (r == SS_SUCCESS) {
1400 mdev->rs_mark_left = drbd_bm_total_weight(mdev);
1401 mdev->rs_failed = 0;
1402 mdev->rs_paused = 0;
1404 mdev->rs_mark_time = jiffies;
1405 mdev->rs_same_csum = 0;
1406 _drbd_pause_after(mdev);
1408 write_unlock_irq(&global_state_lock);
1411 if (r == SS_SUCCESS) {
1412 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1413 drbd_conn_str(ns.conn),
1414 (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1415 (unsigned long) mdev->rs_total);
1417 if (mdev->rs_total == 0) {
1418 /* Peer still reachable? Beware of failing before-resync-target handlers! */
1420 drbd_resync_finished(mdev);
1423 /* ns.conn may already be != mdev->state.conn,
1424 * we may have been paused in between, or become paused until
1425 * the timer triggers.
1426 * No matter, that is handled in resync_timer_fn() */
1427 if (ns.conn == C_SYNC_TARGET)
1428 mod_timer(&mdev->resync_timer, jiffies);
1432 drbd_state_unlock(mdev);
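/* Main loop of the per-device worker thread: while idle it uncorks the data
 * socket, sleeps on the work queue semaphore and corks the socket again; then
 * it dequeues one work entry at a time and runs its callback. A callback
 * failing while we are connected forces the connection into C_NETWORK_FAILURE.
 * On exit the remaining queue entries are drained, the receiver is stopped and
 * drbd_mdev_cleanup() is called. */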
1435 int drbd_worker(struct drbd_thread *thi)
1437 struct drbd_conf *mdev = thi->mdev;
1438 struct drbd_work *w = NULL;
1439 LIST_HEAD(work_list);
1442 sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1444 while (get_t_state(thi) == Running) {
1445 drbd_thread_current_set_cpu(mdev);
1447 if (down_trylock(&mdev->data.work.s)) {
1448 mutex_lock(&mdev->data.mutex);
1449 if (mdev->data.socket && !mdev->net_conf->no_cork)
1450 drbd_tcp_uncork(mdev->data.socket);
1451 mutex_unlock(&mdev->data.mutex);
1453 intr = down_interruptible(&mdev->data.work.s);
1455 mutex_lock(&mdev->data.mutex);
1456 if (mdev->data.socket && !mdev->net_conf->no_cork)
1457 drbd_tcp_cork(mdev->data.socket);
1458 mutex_unlock(&mdev->data.mutex);
1462 D_ASSERT(intr == -EINTR);
1463 flush_signals(current);
1464 ERR_IF (get_t_state(thi) == Running)
1469 if (get_t_state(thi) != Running)
1471 /* With this break, we have done a down() but not consumed
1472 the entry from the list. The cleanup code takes care of
1476 spin_lock_irq(&mdev->data.work.q_lock);
1477 ERR_IF(list_empty(&mdev->data.work.q)) {
1478 /* something terribly wrong in our logic.
1479 * we were able to down() the semaphore,
1480 * but the list is empty... doh.
1482 * what is the best thing to do now?
1483 * try again from scratch, restarting the receiver,
1484 * asender, whatnot? that could break things even more badly,
1485 * e.g. when we are primary, but have no good local data.
1487 * I'll try to get away with just starting this loop over.
1489 spin_unlock_irq(&mdev->data.work.q_lock);
1492 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1493 list_del_init(&w->list);
1494 spin_unlock_irq(&mdev->data.work.q_lock);
1496 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1497 /* dev_warn(DEV, "worker: a callback failed! \n"); */
1498 if (mdev->state.conn >= C_CONNECTED)
1499 drbd_force_state(mdev,
1500 NS(conn, C_NETWORK_FAILURE));
1503 D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1504 D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1506 spin_lock_irq(&mdev->data.work.q_lock);
1508 while (!list_empty(&mdev->data.work.q)) {
1509 list_splice_init(&mdev->data.work.q, &work_list);
1510 spin_unlock_irq(&mdev->data.work.q_lock);
1512 while (!list_empty(&work_list)) {
1513 w = list_entry(work_list.next, struct drbd_work, list);
1514 list_del_init(&w->list);
1516 i++; /* dead debugging code */
1519 spin_lock_irq(&mdev->data.work.q_lock);
1521 sema_init(&mdev->data.work.s, 0);
1522 /* DANGEROUS race: if someone did queue his work within the spinlock,
1523 * but up() ed outside the spinlock, we could get an up() on the
1524 * semaphore without corresponding list entry.
1527 spin_unlock_irq(&mdev->data.work.q_lock);
1529 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1530 /* _drbd_set_state only uses stop_nowait.
1531 * wait here for the Exiting receiver. */
1532 drbd_thread_stop(&mdev->receiver);
1533 drbd_mdev_cleanup(mdev);
1535 dev_info(DEV, "worker terminated\n");
1537 clear_bit(DEVICE_DYING, &mdev->flags);
1538 clear_bit(CONFIG_PENDING, &mdev->flags);
1539 wake_up(&mdev->state_wait);