1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/smp_lock.h>
30 #include <linux/wait.h>
31 #include <linux/mm.h>
32 #include <linux/memcontrol.h>
33 #include <linux/mm_inline.h>
34 #include <linux/slab.h>
35 #include <linux/random.h>
36 #include <linux/string.h>
37 #include <linux/scatterlist.h>
38
39 #include "drbd_int.h"
40 #include "drbd_req.h"
41
42 #define SLEEP_TIME (HZ/10)
43
44 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
45
46
47
48 /* defined here:
49    drbd_md_io_complete
50    drbd_endio_sec
51    drbd_endio_pri
52
53  * more endio handlers:
54    atodb_endio in drbd_actlog.c
55    drbd_bm_async_io_complete in drbd_bitmap.c
56
57  * For all these callbacks, note the following:
58  * The callbacks will be called in irq context by the IDE drivers,
59  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
60  * Try to get the locking right :)
61  *
62  */
63
64
65 /* About the global_state_lock
66    Each state transition on a device holds a read lock. In case we have
67    to evaluate the sync-after dependencies, we grab a write lock, because
68    we need stable states on all devices for that.  */
69 rwlock_t global_state_lock;
70
71 /* used for synchronous meta data and bitmap IO
72  * submitted by drbd_md_sync_page_io()
73  */
74 void drbd_md_io_complete(struct bio *bio, int error)
75 {
76         struct drbd_md_io *md_io;
77
78         md_io = (struct drbd_md_io *)bio->bi_private;
79         md_io->error = error;
80
81         complete(&md_io->event);
82 }
83
84 /* reads on behalf of the partner,
85  * "submitted" by the receiver
86  */
87 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
88 {
89         unsigned long flags = 0;
90         struct drbd_conf *mdev = e->mdev;
91
92         D_ASSERT(e->block_id != ID_VACANT);
93
94         spin_lock_irqsave(&mdev->req_lock, flags);
95         mdev->read_cnt += e->size >> 9;
96         list_del(&e->w.list);
97         if (list_empty(&mdev->read_ee))
98                 wake_up(&mdev->ee_wait);
99         if (test_bit(__EE_WAS_ERROR, &e->flags))
100                 __drbd_chk_io_error(mdev, FALSE);
101         spin_unlock_irqrestore(&mdev->req_lock, flags);
102
103         drbd_queue_work(&mdev->data.work, &e->w);
104         put_ldev(mdev);
105 }
106
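/* true iff this epoch entry was an explicit barrier request that saw an
 * error and has not been resubmitted yet */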
107 static int is_failed_barrier(int ee_flags)
108 {
109         return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
110                         == (EE_IS_BARRIER|EE_WAS_ERROR);
111 }
112
113 /* writes on behalf of the partner, or resync writes,
114  * "submitted" by the receiver, final stage.  */
115 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
116 {
117         unsigned long flags = 0;
118         struct drbd_conf *mdev = e->mdev;
119         sector_t e_sector;
120         int do_wake;
121         int is_syncer_req;
122         int do_al_complete_io;
123
124         /* if this is a failed barrier request, disable use of barriers,
125          * and schedule for resubmission */
126         if (is_failed_barrier(e->flags)) {
127                 drbd_bump_write_ordering(mdev, WO_bdev_flush);
128                 spin_lock_irqsave(&mdev->req_lock, flags);
129                 list_del(&e->w.list);
130                 e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
131                 e->w.cb = w_e_reissue;
132                 /* put_ldev actually happens below, once we come here again. */
133                 __release(local);
134                 spin_unlock_irqrestore(&mdev->req_lock, flags);
135                 drbd_queue_work(&mdev->data.work, &e->w);
136                 return;
137         }
138
139         D_ASSERT(e->block_id != ID_VACANT);
140
141         /* after we moved e to done_ee,
142          * we may no longer access it,
143          * it may be freed/reused already!
144          * (as soon as we release the req_lock) */
145         e_sector = e->sector;
146         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
147         is_syncer_req = is_syncer_block_id(e->block_id);
148
149         spin_lock_irqsave(&mdev->req_lock, flags);
150         mdev->writ_cnt += e->size >> 9;
151         list_del(&e->w.list); /* has been on active_ee or sync_ee */
152         list_add_tail(&e->w.list, &mdev->done_ee);
153
154         /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
155          * neither did we wake possibly waiting conflicting requests.
156          * done from "drbd_process_done_ee" within the appropriate w.cb
157          * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
158
159         do_wake = is_syncer_req
160                 ? list_empty(&mdev->sync_ee)
161                 : list_empty(&mdev->active_ee);
162
163         if (test_bit(__EE_WAS_ERROR, &e->flags))
164                 __drbd_chk_io_error(mdev, FALSE);
165         spin_unlock_irqrestore(&mdev->req_lock, flags);
166
167         if (is_syncer_req)
168                 drbd_rs_complete_io(mdev, e_sector);
169
170         if (do_wake)
171                 wake_up(&mdev->ee_wait);
172
173         if (do_al_complete_io)
174                 drbd_al_complete_io(mdev, e_sector);
175
176         wake_asender(mdev);
177         put_ldev(mdev);
178 }
179
180 /* reads and writes on behalf of the partner, or resync requests,
181  * "submitted" by the receiver; dispatches to the read or write final handler.
182  */
183 void drbd_endio_sec(struct bio *bio, int error)
184 {
185         struct drbd_epoch_entry *e = bio->bi_private;
186         struct drbd_conf *mdev = e->mdev;
187         int uptodate = bio_flagged(bio, BIO_UPTODATE);
188         int is_write = bio_data_dir(bio) == WRITE;
189
190         if (error)
191                 dev_warn(DEV, "%s: error=%d s=%llus\n",
192                                 is_write ? "write" : "read", error,
193                                 (unsigned long long)e->sector);
194         if (!error && !uptodate) {
195                 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
196                                 is_write ? "write" : "read",
197                                 (unsigned long long)e->sector);
198                 /* strange behavior of some lower level drivers...
199                  * fail the request by clearing the uptodate flag,
200                  * but do not return any error?! */
201                 error = -EIO;
202         }
203
204         if (error)
205                 set_bit(__EE_WAS_ERROR, &e->flags);
206
207         bio_put(bio); /* no need for the bio anymore */
208         if (atomic_dec_and_test(&e->pending_bios)) {
209                 if (is_write)
210                         drbd_endio_write_sec_final(e);
211                 else
212                         drbd_endio_read_sec_final(e);
213         }
214 }
215
216 /* read, read-ahead or write requests on R_PRIMARY coming from drbd_make_request
217  */
218 void drbd_endio_pri(struct bio *bio, int error)
219 {
220         unsigned long flags;
221         struct drbd_request *req = bio->bi_private;
222         struct drbd_conf *mdev = req->mdev;
223         struct bio_and_error m;
224         enum drbd_req_event what;
225         int uptodate = bio_flagged(bio, BIO_UPTODATE);
226
227         if (error)
228                 dev_warn(DEV, "p %s: error=%d\n",
229                          bio_data_dir(bio) == WRITE ? "write" : "read", error);
230         if (!error && !uptodate) {
231                 dev_warn(DEV, "p %s: setting error to -EIO\n",
232                          bio_data_dir(bio) == WRITE ? "write" : "read");
233                 /* strange behavior of some lower level drivers...
234                  * fail the request by clearing the uptodate flag,
235                  * but do not return any error?! */
236                 error = -EIO;
237         }
238
239         /* to avoid recursion in __req_mod */
240         if (unlikely(error)) {
241                 what = (bio_data_dir(bio) == WRITE)
242                         ? write_completed_with_error
243                         : (bio_rw(bio) == READ)
244                           ? read_completed_with_error
245                           : read_ahead_completed_with_error;
246         } else
247                 what = completed_ok;
248
249         bio_put(req->private_bio);
250         req->private_bio = ERR_PTR(error);
251
252         spin_lock_irqsave(&mdev->req_lock, flags);
253         __req_mod(req, what, &m);
254         spin_unlock_irqrestore(&mdev->req_lock, flags);
255
256         if (m.bio)
257                 complete_master_bio(mdev, &m);
258 }
259
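/* a local read failed on the primary; retry it from the peer via
 * P_DATA_REQUEST, unless the work was canceled or the peer's disk is
 * not D_UP_TO_DATE either */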
260 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
261 {
262         struct drbd_request *req = container_of(w, struct drbd_request, w);
263
264         /* We should not detach for read io-error,
265          * but try to WRITE the P_DATA_REPLY to the failed location,
266          * to give the disk the chance to relocate that block */
267
268         spin_lock_irq(&mdev->req_lock);
269         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
270                 _req_mod(req, read_retry_remote_canceled);
271                 spin_unlock_irq(&mdev->req_lock);
272                 dev_alert(DEV, "WE ARE LOST. Local IO failure, no peer.\n");
273                 return 1;
274         }
275         spin_unlock_irq(&mdev->req_lock);
276
277         return w_send_read_req(mdev, w, 0);
278 }
279
280 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
281 {
282         ERR_IF(cancel) return 1;
283         dev_err(DEV, "resync inactive, but callback triggered??\n");
284         return 1; /* Simply ignore this! */
285 }
286
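/* hash all pages of an epoch entry with the given transform (tfm), storing
 * the result in digest; used for checksum based resync and online verify */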
287 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
288 {
289         struct hash_desc desc;
290         struct scatterlist sg;
291         struct page *page = e->pages;
292         struct page *tmp;
293         unsigned len;
294
295         desc.tfm = tfm;
296         desc.flags = 0;
297
298         sg_init_table(&sg, 1);
299         crypto_hash_init(&desc);
300
301         while ((tmp = page_chain_next(page))) {
302                 /* all but the last page will be fully used */
303                 sg_set_page(&sg, page, PAGE_SIZE, 0);
304                 crypto_hash_update(&desc, &sg, sg.length);
305                 page = tmp;
306         }
307         /* and now the last, possibly only partially used page */
308         len = e->size & (PAGE_SIZE - 1);
309         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
310         crypto_hash_update(&desc, &sg, sg.length);
311         crypto_hash_final(&desc, digest);
312 }
313
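/* same as drbd_csum_ee(), but over the segments of a bio */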
314 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
315 {
316         struct hash_desc desc;
317         struct scatterlist sg;
318         struct bio_vec *bvec;
319         int i;
320
321         desc.tfm = tfm;
322         desc.flags = 0;
323
324         sg_init_table(&sg, 1);
325         crypto_hash_init(&desc);
326
327         __bio_for_each_segment(bvec, bio, i, 0) {
328                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
329                 crypto_hash_update(&desc, &sg, sg.length);
330         }
331         crypto_hash_final(&desc, digest);
332 }
333
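/* worker callback queued by read_for_csum(): hash the block that was just
 * read locally and ask the peer to compare via P_CSUM_RS_REQUEST */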
334 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
335 {
336         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
337         int digest_size;
338         void *digest;
339         int ok;
340
341         D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
342
343         if (unlikely(cancel)) {
344                 drbd_free_ee(mdev, e);
345                 return 1;
346         }
347
348         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
349                 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
350                 digest = kmalloc(digest_size, GFP_NOIO);
351                 if (digest) {
352                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
353
354                         inc_rs_pending(mdev);
355                         ok = drbd_send_drequest_csum(mdev,
356                                                      e->sector,
357                                                      e->size,
358                                                      digest,
359                                                      digest_size,
360                                                      P_CSUM_RS_REQUEST);
361                         kfree(digest);
362                 } else {
363                         dev_err(DEV, "kmalloc() of digest failed.\n");
364                         ok = 0;
365                 }
366         } else
367                 ok = 1;
368
369         drbd_free_ee(mdev, e);
370
371         if (unlikely(!ok))
372                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
373         return ok;
374 }
375
376 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
377
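/* submit a local read of (sector, size) so that only its checksum needs to
 * be sent to the peer (checksum based resync).
 * returns 0 on disk failure, 1 if the read was submitted, 2 if the
 * allocation or submission failed and the request should be retried */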
378 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
379 {
380         struct drbd_epoch_entry *e;
381
382         if (!get_ldev(mdev))
383                 return 0;
384
385         /* GFP_TRY, because if there is no memory available right now, this may
386          * be rescheduled for later. It is "only" background resync, after all. */
387         e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
388         if (!e)
389                 goto fail;
390
391         spin_lock_irq(&mdev->req_lock);
392         list_add(&e->w.list, &mdev->read_ee);
393         spin_unlock_irq(&mdev->req_lock);
394
395         e->w.cb = w_e_send_csum;
396         if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
397                 return 1;
398
399         drbd_free_ee(mdev, e);
400 fail:
401         put_ldev(mdev);
402         return 2;
403 }
404
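/* resync timer: unless STOP_SYNC_TIMER is set, (re)queue the resync work,
 * using w_make_ov_request during online verify and w_make_resync_request
 * otherwise */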
405 void resync_timer_fn(unsigned long data)
406 {
407         unsigned long flags;
408         struct drbd_conf *mdev = (struct drbd_conf *) data;
409         int queue;
410
411         spin_lock_irqsave(&mdev->req_lock, flags);
412
413         if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
414                 queue = 1;
415                 if (mdev->state.conn == C_VERIFY_S)
416                         mdev->resync_work.cb = w_make_ov_request;
417                 else
418                         mdev->resync_work.cb = w_make_resync_request;
419         } else {
420                 queue = 0;
421                 mdev->resync_work.cb = w_resync_inactive;
422         }
423
424         spin_unlock_irqrestore(&mdev->req_lock, flags);
425
426         /* harmless race: list_empty outside data.work.q_lock */
427         if (list_empty(&mdev->resync_work.list) && queue)
428                 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
429 }
430
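/* scale the configured resync rate by the currently measured data_delay:
 * full rate while the delay is below throttle_th, zero at or above
 * hold_off_th, and linear in between.  E.g. with rate=10000 KB/s,
 * throttle_th=1 (100ms) and hold_off_th=5 (500ms), a measured delay of
 * 300ms yields half the configured rate. */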
431 static int calc_resync_rate(struct drbd_conf *mdev)
432 {
433         int d = mdev->data_delay / 1000; /* us -> ms */
434         int td = mdev->sync_conf.throttle_th * 100;  /* 0.1s -> ms */
435         int hd = mdev->sync_conf.hold_off_th * 100;  /* 0.1s -> ms */
436         int cr = mdev->sync_conf.rate;
437
438         return d <= td ? cr :
439                 d >= hd ? 0 :
440                 cr + (cr * (td - d) / (hd - td));
441 }
442
443 int w_make_resync_request(struct drbd_conf *mdev,
444                 struct drbd_work *w, int cancel)
445 {
446         unsigned long bit;
447         sector_t sector;
448         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
449         int max_segment_size;
450         int number, i, size, pe, mx;
451         int align, queued, sndbuf;
452
453         if (unlikely(cancel))
454                 return 1;
455
456         if (unlikely(mdev->state.conn < C_CONNECTED)) {
457                 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected\n");
458                 return 0;
459         }
460
461         if (mdev->state.conn != C_SYNC_TARGET)
462                 dev_err(DEV, "%s in w_make_resync_request\n",
463                         drbd_conn_str(mdev->state.conn));
464
465         if (!get_ldev(mdev)) {
466                 /* Since we only need to access mdev->resync, a
467                    get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
468                    continuing resync with a broken disk makes no sense at
469                    all */
470                 dev_err(DEV, "Disk broke down during resync!\n");
471                 mdev->resync_work.cb = w_resync_inactive;
472                 return 1;
473         }
474
475         /* starting with drbd 8.3.8, we can handle multi-bio EEs,
476          * if it should be necessary */
477         max_segment_size = mdev->agreed_pro_version < 94 ?
478                 queue_max_segment_size(mdev->rq_queue) : DRBD_MAX_SEGMENT_SIZE;
479
480         mdev->c_sync_rate = calc_resync_rate(mdev);
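        /* number of BM_BLOCK_SIZE (4K) resync requests to generate during one
         * SLEEP_TIME (100ms) tick: with c_sync_rate in KB/s this works out to
         * c_sync_rate/40 blocks, i.e. c_sync_rate KB per second */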
481         number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
482         pe = atomic_read(&mdev->rs_pending_cnt);
483
484         mutex_lock(&mdev->data.mutex);
485         if (mdev->data.socket)
486                 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
487         else
488                 mx = 1;
489         mutex_unlock(&mdev->data.mutex);
490
491         /* For resync rates >160MB/sec, allow more pending RS requests */
492         if (number > mx)
493                 mx = number;
494
495         /* Limit the number of pending RS requests to no more than the peer's receive buffer */
496         if ((pe + number) > mx) {
497                 number = mx - pe;
498         }
499
500         for (i = 0; i < number; i++) {
501                 /* Stop generating RS requests, when half of the send buffer is filled */
502                 mutex_lock(&mdev->data.mutex);
503                 if (mdev->data.socket) {
504                         queued = mdev->data.socket->sk->sk_wmem_queued;
505                         sndbuf = mdev->data.socket->sk->sk_sndbuf;
506                 } else {
507                         queued = 1;
508                         sndbuf = 0;
509                 }
510                 mutex_unlock(&mdev->data.mutex);
511                 if (queued > sndbuf / 2)
512                         goto requeue;
513
514 next_sector:
515                 size = BM_BLOCK_SIZE;
516                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
517
518                 if (bit == -1UL) {
519                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
520                         mdev->resync_work.cb = w_resync_inactive;
521                         put_ldev(mdev);
522                         return 1;
523                 }
524
525                 sector = BM_BIT_TO_SECT(bit);
526
527                 if (drbd_try_rs_begin_io(mdev, sector)) {
528                         mdev->bm_resync_fo = bit;
529                         goto requeue;
530                 }
531                 mdev->bm_resync_fo = bit + 1;
532
533                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
534                         drbd_rs_complete_io(mdev, sector);
535                         goto next_sector;
536                 }
537
538 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
539                 /* try to find some adjacent bits.
540                  * we stop if we have already the maximum req size.
541                  *
542                  * Additionally always align bigger requests, in order to
543                  * be prepared for all stripe sizes of software RAIDs.
544                  */
545                 align = 1;
546                 for (;;) {
547                         if (size + BM_BLOCK_SIZE > max_segment_size)
548                                 break;
549
550                         /* Be always aligned */
551                         if (sector & ((1<<(align+3))-1))
552                                 break;
553
554                         /* do not cross extent boundaries */
555                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
556                                 break;
557                         /* now, is it actually dirty, after all?
558                          * caution, drbd_bm_test_bit is tri-state for some
559                          * obscure reason; ( b == 0 ) would get the out-of-band
560                          * only accidentally right because of the "oddly sized"
561                          * adjustment below */
562                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
563                                 break;
564                         bit++;
565                         size += BM_BLOCK_SIZE;
566                         if ((BM_BLOCK_SIZE << align) <= size)
567                                 align++;
568                         i++;
569                 }
570                 /* if we merged some,
571                  * reset the offset to start the next drbd_bm_find_next from */
572                 if (size > BM_BLOCK_SIZE)
573                         mdev->bm_resync_fo = bit + 1;
574 #endif
575
576                 /* adjust very last sectors, in case we are oddly sized */
577                 if (sector + (size>>9) > capacity)
578                         size = (capacity-sector)<<9;
579                 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
580                         switch (read_for_csum(mdev, sector, size)) {
581                         case 0: /* Disk failure*/
582                                 put_ldev(mdev);
583                                 return 0;
584                         case 2: /* Allocation failed */
585                                 drbd_rs_complete_io(mdev, sector);
586                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
587                                 goto requeue;
588                         /* case 1: everything ok */
589                         }
590                 } else {
591                         inc_rs_pending(mdev);
592                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
593                                                sector, size, ID_SYNCER)) {
594                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
595                                 dec_rs_pending(mdev);
596                                 put_ldev(mdev);
597                                 return 0;
598                         }
599                 }
600         }
601
602         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
603                 /* last syncer _request_ was sent,
604                  * but the P_RS_DATA_REPLY has not yet been received.  sync will end (and
605                  * next sync group will resume), as soon as we receive the last
606                  * resync data block, and the last bit is cleared.
607                  * until then resync "work" is "inactive" ...
608                  */
609                 mdev->resync_work.cb = w_resync_inactive;
610                 put_ldev(mdev);
611                 return 1;
612         }
613
614  requeue:
615         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
616         put_ldev(mdev);
617         return 1;
618 }
619
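/* like w_make_resync_request(), but for online verify: send P_OV_REQUEST
 * packets for consecutive BM_BLOCK_SIZE chunks starting at ov_position,
 * throttled by the configured sync rate and the number of pending requests */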
620 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
621 {
622         int number, i, size;
623         sector_t sector;
624         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
625
626         if (unlikely(cancel))
627                 return 1;
628
629         if (unlikely(mdev->state.conn < C_CONNECTED)) {
630                 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected\n");
631                 return 0;
632         }
633
634         number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
635         if (atomic_read(&mdev->rs_pending_cnt) > number)
636                 goto requeue;
637
638         number -= atomic_read(&mdev->rs_pending_cnt);
639
640         sector = mdev->ov_position;
641         for (i = 0; i < number; i++) {
642                 if (sector >= capacity) {
643                         mdev->resync_work.cb = w_resync_inactive;
644                         return 1;
645                 }
646
647                 size = BM_BLOCK_SIZE;
648
649                 if (drbd_try_rs_begin_io(mdev, sector)) {
650                         mdev->ov_position = sector;
651                         goto requeue;
652                 }
653
654                 if (sector + (size>>9) > capacity)
655                         size = (capacity-sector)<<9;
656
657                 inc_rs_pending(mdev);
658                 if (!drbd_send_ov_request(mdev, sector, size)) {
659                         dec_rs_pending(mdev);
660                         return 0;
661                 }
662                 sector += BM_SECT_PER_BIT;
663         }
664         mdev->ov_position = sector;
665
666  requeue:
667         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
668         return 1;
669 }
670
671
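/* queued when an online verify run has finished: print the summary of
 * out-of-sync blocks and wrap up the resync bookkeeping */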
672 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
673 {
674         kfree(w);
675         ov_oos_print(mdev);
676         drbd_resync_finished(mdev);
677
678         return 1;
679 }
680
681 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
682 {
683         kfree(w);
684
685         drbd_resync_finished(mdev);
686
687         return 1;
688 }
689
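/* common end-of-resync / end-of-online-verify path: print statistics,
 * update disk/pdsk state and the UUIDs, and run the configured handler
 * (out-of-sync / after-resync-target) if any */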
690 int drbd_resync_finished(struct drbd_conf *mdev)
691 {
692         unsigned long db, dt, dbdt;
693         unsigned long n_oos;
694         union drbd_state os, ns;
695         struct drbd_work *w;
696         char *khelper_cmd = NULL;
697
698         /* Remove all elements from the resync LRU. Since future actions
699          * might set bits in the (main) bitmap, the entries in the
700          * resync LRU would be wrong. */
701         if (drbd_rs_del_all(mdev)) {
702                 /* In case this is not possible now, most probably because
703                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
704                  * queue (or even the read operations for those packets
705                  * are not finished by now).   Retry in 100ms. */
706
707                 drbd_kick_lo(mdev);
708                 __set_current_state(TASK_INTERRUPTIBLE);
709                 schedule_timeout(HZ / 10);
710                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
711                 if (w) {
712                         w->cb = w_resync_finished;
713                         drbd_queue_work(&mdev->data.work, w);
714                         return 1;
715                 }
716                 dev_err(DEV, "Warning: drbd_rs_del_all() failed and kmalloc(w) failed too.\n");
717         }
718
719         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
720         if (dt <= 0)
721                 dt = 1;
722         db = mdev->rs_total;
723         dbdt = Bit2KB(db/dt);
724         mdev->rs_paused /= HZ;
725
726         if (!get_ldev(mdev))
727                 goto out;
728
729         spin_lock_irq(&mdev->req_lock);
730         os = mdev->state;
731
732         /* This protects us against multiple calls (that can happen in the presence
733            of application IO), and against connectivity loss just before we arrive here. */
734         if (os.conn <= C_CONNECTED)
735                 goto out_unlock;
736
737         ns = os;
738         ns.conn = C_CONNECTED;
739
740         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
741              (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
742              "Online verify " : "Resync",
743              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
744
745         n_oos = drbd_bm_total_weight(mdev);
746
747         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
748                 if (n_oos) {
749                         dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
750                               n_oos, Bit2KB(1));
751                         khelper_cmd = "out-of-sync";
752                 }
753         } else {
754                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
755
756                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
757                         khelper_cmd = "after-resync-target";
758
759                 if (mdev->csums_tfm && mdev->rs_total) {
760                         const unsigned long s = mdev->rs_same_csum;
761                         const unsigned long t = mdev->rs_total;
762                         const int ratio =
763                                 (t == 0)     ? 0 :
764                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
765                         dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
766                              "transferred %luK total %luK\n",
767                              ratio,
768                              Bit2KB(mdev->rs_same_csum),
769                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
770                              Bit2KB(mdev->rs_total));
771                 }
772         }
773
774         if (mdev->rs_failed) {
775                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
776
777                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
778                         ns.disk = D_INCONSISTENT;
779                         ns.pdsk = D_UP_TO_DATE;
780                 } else {
781                         ns.disk = D_UP_TO_DATE;
782                         ns.pdsk = D_INCONSISTENT;
783                 }
784         } else {
785                 ns.disk = D_UP_TO_DATE;
786                 ns.pdsk = D_UP_TO_DATE;
787
788                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
789                         if (mdev->p_uuid) {
790                                 int i;
791                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
792                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
793                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
794                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
795                         } else {
796                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
797                         }
798                 }
799
800                 drbd_uuid_set_bm(mdev, 0UL);
801
802                 if (mdev->p_uuid) {
803                         /* Now the two UUID sets are equal, update what we
804                          * know of the peer. */
805                         int i;
806                         for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
807                                 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
808                 }
809         }
810
811         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
812 out_unlock:
813         spin_unlock_irq(&mdev->req_lock);
814         put_ldev(mdev);
815 out:
816         mdev->rs_total  = 0;
817         mdev->rs_failed = 0;
818         mdev->rs_paused = 0;
819         mdev->ov_start_sector = 0;
820
821         if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
822                 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
823                 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
824         }
825
826         if (khelper_cmd)
827                 drbd_khelper(mdev, khelper_cmd);
828
829         return 1;
830 }
831
832 /* helper: keep the epoch entry on net_ee while sendpage() may still reference its pages, otherwise free it */
833 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
834 {
835         if (drbd_ee_has_active_page(e)) {
836                 /* This might happen if sendpage() has not finished */
837                 spin_lock_irq(&mdev->req_lock);
838                 list_add_tail(&e->w.list, &mdev->net_ee);
839                 spin_unlock_irq(&mdev->req_lock);
840         } else
841                 drbd_free_ee(mdev, e);
842 }
843
844 /**
845  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
846  * @mdev:       DRBD device.
847  * @w:          work object.
848  * @cancel:     The connection will be closed anyways
849  */
850 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
851 {
852         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
853         int ok;
854
855         if (unlikely(cancel)) {
856                 drbd_free_ee(mdev, e);
857                 dec_unacked(mdev);
858                 return 1;
859         }
860
861         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
862                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
863         } else {
864                 if (__ratelimit(&drbd_ratelimit_state))
865                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
866                             (unsigned long long)e->sector);
867
868                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
869         }
870
871         dec_unacked(mdev);
872
873         move_to_net_ee_or_free(mdev, e);
874
875         if (unlikely(!ok))
876                 dev_err(DEV, "drbd_send_block() failed\n");
877         return ok;
878 }
879
880 /**
881  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
882  * @mdev:       DRBD device.
883  * @w:          work object.
884  * @cancel:     The connection will be closed anyways
885  */
886 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
887 {
888         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
889         int ok;
890
891         if (unlikely(cancel)) {
892                 drbd_free_ee(mdev, e);
893                 dec_unacked(mdev);
894                 return 1;
895         }
896
897         if (get_ldev_if_state(mdev, D_FAILED)) {
898                 drbd_rs_complete_io(mdev, e->sector);
899                 put_ldev(mdev);
900         }
901
902         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
903                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
904                         inc_rs_pending(mdev);
905                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
906                 } else {
907                         if (__ratelimit(&drbd_ratelimit_state))
908                                 dev_err(DEV, "Not sending RSDataReply, "
909                                     "partner DISKLESS!\n");
910                         ok = 1;
911                 }
912         } else {
913                 if (__ratelimit(&drbd_ratelimit_state))
914                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
915                             (unsigned long long)e->sector);
916
917                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
918
919                 /* update resync data with failure */
920                 drbd_rs_failed_io(mdev, e->sector, e->size);
921         }
922
923         dec_unacked(mdev);
924
925         move_to_net_ee_or_free(mdev, e);
926
927         if (unlikely(!ok))
928                 dev_err(DEV, "drbd_send_block() failed\n");
929         return ok;
930 }
931
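/* worker callback for P_CSUM_RS_REQUEST: recompute the checksum over the
 * local block and either acknowledge it as in sync (P_RS_IS_IN_SYNC) or
 * send the full block as P_RS_DATA_REPLY */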
932 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
933 {
934         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
935         struct digest_info *di;
936         int digest_size;
937         void *digest = NULL;
938         int ok, eq = 0;
939
940         if (unlikely(cancel)) {
941                 drbd_free_ee(mdev, e);
942                 dec_unacked(mdev);
943                 return 1;
944         }
945
946         drbd_rs_complete_io(mdev, e->sector);
947
948         di = (struct digest_info *)(unsigned long)e->block_id;
949
950         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
951                 /* quick hack to try to avoid a race against reconfiguration.
952                  * a real fix would be much more involved,
953                  * introducing more locking mechanisms */
954                 if (mdev->csums_tfm) {
955                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
956                         D_ASSERT(digest_size == di->digest_size);
957                         digest = kmalloc(digest_size, GFP_NOIO);
958                 }
959                 if (digest) {
960                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
961                         eq = !memcmp(digest, di->digest, digest_size);
962                         kfree(digest);
963                 }
964
965                 if (eq) {
966                         drbd_set_in_sync(mdev, e->sector, e->size);
967                         /* rs_same_csums unit is BM_BLOCK_SIZE */
968                         mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
969                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
970                 } else {
971                         inc_rs_pending(mdev);
972                         e->block_id = ID_SYNCER;
973                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
974                 }
975         } else {
976                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
977                 if (__ratelimit(&drbd_ratelimit_state))
978                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
979         }
980
981         dec_unacked(mdev);
982
983         kfree(di);
984
985         move_to_net_ee_or_free(mdev, e);
986
987         if (unlikely(!ok))
988                 dev_err(DEV, "drbd_send_block/ack() failed\n");
989         return ok;
990 }
991
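/* worker callback for an online verify request (P_OV_REQUEST): compute the
 * digest of the local block and send it back in a P_OV_REPLY */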
992 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
993 {
994         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
995         int digest_size;
996         void *digest;
997         int ok = 1;
998
999         if (unlikely(cancel))
1000                 goto out;
1001
1002         if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1003                 goto out;
1004
1005         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1006         /* FIXME if this allocation fails, online verify will not terminate! */
1007         digest = kmalloc(digest_size, GFP_NOIO);
1008         if (digest) {
1009                 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1010                 inc_rs_pending(mdev);
1011                 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1012                                              digest, digest_size, P_OV_REPLY);
1013                 if (!ok)
1014                         dec_rs_pending(mdev);
1015                 kfree(digest);
1016         }
1017
1018 out:
1019         drbd_free_ee(mdev, e);
1020
1021         dec_unacked(mdev);
1022
1023         return ok;
1024 }
1025
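/* record an out-of-sync range found by online verify: coalesce it with the
 * previous range for the log output, mark it out of sync in the bitmap and
 * schedule a bitmap write once the run finishes */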
1026 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1027 {
1028         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1029                 mdev->ov_last_oos_size += size>>9;
1030         } else {
1031                 mdev->ov_last_oos_start = sector;
1032                 mdev->ov_last_oos_size = size>>9;
1033         }
1034         drbd_set_out_of_sync(mdev, sector, size);
1035         set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1036 }
1037
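/* worker callback for P_OV_REPLY: compare the peer's digest with the one
 * computed over the local block, report the result via P_OV_RESULT, and
 * finish the verify run once ov_left reaches zero */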
1038 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1039 {
1040         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1041         struct digest_info *di;
1042         int digest_size;
1043         void *digest;
1044         int ok, eq = 0;
1045
1046         if (unlikely(cancel)) {
1047                 drbd_free_ee(mdev, e);
1048                 dec_unacked(mdev);
1049                 return 1;
1050         }
1051
1052         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1053          * the resync lru has been cleaned up already */
1054         drbd_rs_complete_io(mdev, e->sector);
1055
1056         di = (struct digest_info *)(unsigned long)e->block_id;
1057
1058         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1059                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1060                 digest = kmalloc(digest_size, GFP_NOIO);
1061                 if (digest) {
1062                         drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1063
1064                         D_ASSERT(digest_size == di->digest_size);
1065                         eq = !memcmp(digest, di->digest, digest_size);
1066                         kfree(digest);
1067                 }
1068         } else {
1069                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1070                 if (__ratelimit(&drbd_ratelimit_state))
1071                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1072         }
1073
1074         dec_unacked(mdev);
1075
1076         kfree(di);
1077
1078         if (!eq)
1079                 drbd_ov_oos_found(mdev, e->sector, e->size);
1080         else
1081                 ov_oos_print(mdev);
1082
1083         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1084                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1085
1086         drbd_free_ee(mdev, e);
1087
1088         if (--mdev->ov_left == 0) {
1089                 ov_oos_print(mdev);
1090                 drbd_resync_finished(mdev);
1091         }
1092
1093         return ok;
1094 }
1095
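/* completes the barrier a waiter set up with drbd_wq_barrier, signalling
 * that all previously queued work has been processed */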
1096 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1097 {
1098         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1099         complete(&b->done);
1100         return 1;
1101 }
1102
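/* worker callback to send a P_BARRIER packet closing the current transfer
 * log epoch (b->br_number), unless the work got canceled or reassigned in
 * the meantime */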
1103 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1104 {
1105         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1106         struct p_barrier *p = &mdev->data.sbuf.barrier;
1107         int ok = 1;
1108
1109         /* really avoid racing with tl_clear.  w.cb may have been referenced
1110          * just before it was reassigned and re-queued, so double check that.
1111          * actually, this race was harmless, since we only try to send the
1112          * barrier packet here, and otherwise do nothing with the object.
1113          * but compare with the head of w_clear_epoch */
1114         spin_lock_irq(&mdev->req_lock);
1115         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1116                 cancel = 1;
1117         spin_unlock_irq(&mdev->req_lock);
1118         if (cancel)
1119                 return 1;
1120
1121         if (!drbd_get_data_sock(mdev))
1122                 return 0;
1123         p->barrier = b->br_number;
1124         /* inc_ap_pending was done where this was queued.
1125          * dec_ap_pending will be done in got_BarrierAck
1126          * or (on connection loss) in w_clear_epoch.  */
1127         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1128                                 (struct p_header *)p, sizeof(*p), 0);
1129         drbd_put_data_sock(mdev);
1130
1131         return ok;
1132 }
1133
1134 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1135 {
1136         if (cancel)
1137                 return 1;
1138         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1139 }
1140
1141 /**
1142  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1143  * @mdev:       DRBD device.
1144  * @w:          work object.
1145  * @cancel:     The connection will be closed anyways
1146  */
1147 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1148 {
1149         struct drbd_request *req = container_of(w, struct drbd_request, w);
1150         int ok;
1151
1152         if (unlikely(cancel)) {
1153                 req_mod(req, send_canceled);
1154                 return 1;
1155         }
1156
1157         ok = drbd_send_dblock(mdev, req);
1158         req_mod(req, ok ? handed_over_to_network : send_failed);
1159
1160         return ok;
1161 }
1162
1163 /**
1164  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1165  * @mdev:       DRBD device.
1166  * @w:          work object.
1167  * @cancel:     The connection will be closed anyways
1168  */
1169 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1170 {
1171         struct drbd_request *req = container_of(w, struct drbd_request, w);
1172         int ok;
1173
1174         if (unlikely(cancel)) {
1175                 req_mod(req, send_canceled);
1176                 return 1;
1177         }
1178
1179         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1180                                 (unsigned long)req);
1181
1182         if (!ok) {
1183                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1184                  * so this is probably redundant */
1185                 if (mdev->state.conn >= C_CONNECTED)
1186                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1187         }
1188         req_mod(req, ok ? handed_over_to_network : send_failed);
1189
1190         return ok;
1191 }
1192
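/* follow the sync-after dependency chain; return 0 if any device we depend
 * on is currently resyncing or has one of its sync-pause flags
 * (aftr_isp/peer_isp/user_isp) set, 1 if this device may resync now */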
1193 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1194 {
1195         struct drbd_conf *odev = mdev;
1196
1197         while (1) {
1198                 if (odev->sync_conf.after == -1)
1199                         return 1;
1200                 odev = minor_to_mdev(odev->sync_conf.after);
1201                 ERR_IF(!odev) return 1;
1202                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1203                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1204                     odev->state.aftr_isp || odev->state.peer_isp ||
1205                     odev->state.user_isp)
1206                         return 0;
1207         }
1208 }
1209
1210 /**
1211  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1212  * @mdev:       DRBD device.
1213  *
1214  * Called from process context only (admin command and after_state_ch).
1215  */
1216 static int _drbd_pause_after(struct drbd_conf *mdev)
1217 {
1218         struct drbd_conf *odev;
1219         int i, rv = 0;
1220
1221         for (i = 0; i < minor_count; i++) {
1222                 odev = minor_to_mdev(i);
1223                 if (!odev)
1224                         continue;
1225                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1226                         continue;
1227                 if (!_drbd_may_sync_now(odev))
1228                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1229                                != SS_NOTHING_TO_DO);
1230         }
1231
1232         return rv;
1233 }
1234
1235 /**
1236  * _drbd_resume_next() - Resume resync on all devices that may resync now
1237  * @mdev:       DRBD device.
1238  *
1239  * Called from process context only (admin command and worker).
1240  */
1241 static int _drbd_resume_next(struct drbd_conf *mdev)
1242 {
1243         struct drbd_conf *odev;
1244         int i, rv = 0;
1245
1246         for (i = 0; i < minor_count; i++) {
1247                 odev = minor_to_mdev(i);
1248                 if (!odev)
1249                         continue;
1250                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1251                         continue;
1252                 if (odev->state.aftr_isp) {
1253                         if (_drbd_may_sync_now(odev))
1254                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1255                                                         CS_HARD, NULL)
1256                                        != SS_NOTHING_TO_DO) ;
1257                 }
1258         }
1259         return rv;
1260 }
1261
1262 void resume_next_sg(struct drbd_conf *mdev)
1263 {
1264         write_lock_irq(&global_state_lock);
1265         _drbd_resume_next(mdev);
1266         write_unlock_irq(&global_state_lock);
1267 }
1268
1269 void suspend_other_sg(struct drbd_conf *mdev)
1270 {
1271         write_lock_irq(&global_state_lock);
1272         _drbd_pause_after(mdev);
1273         write_unlock_irq(&global_state_lock);
1274 }
1275
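/* validate a new sync-after minor: -1 means no dependency, unknown minors
 * are rejected, and following the chain must not lead back to mdev
 * (ERR_SYNC_AFTER_CYCLE) */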
1276 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1277 {
1278         struct drbd_conf *odev;
1279
1280         if (o_minor == -1)
1281                 return NO_ERROR;
1282         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1283                 return ERR_SYNC_AFTER;
1284
1285         /* check for loops */
1286         odev = minor_to_mdev(o_minor);
1287         while (1) {
1288                 if (odev == mdev)
1289                         return ERR_SYNC_AFTER_CYCLE;
1290
1291                 /* dependency chain ends here, no cycles. */
1292                 if (odev->sync_conf.after == -1)
1293                         return NO_ERROR;
1294
1295                 /* follow the dependency chain */
1296                 odev = minor_to_mdev(odev->sync_conf.after);
1297         }
1298 }
1299
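/* change the sync-after dependency of mdev to minor na, then pause and
 * resume resyncs across all devices until no further state changes result */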
1300 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1301 {
1302         int changes;
1303         int retcode;
1304
1305         write_lock_irq(&global_state_lock);
1306         retcode = sync_after_error(mdev, na);
1307         if (retcode == NO_ERROR) {
1308                 mdev->sync_conf.after = na;
1309                 do {
1310                         changes  = _drbd_pause_after(mdev);
1311                         changes |= _drbd_resume_next(mdev);
1312                 } while (changes);
1313         }
1314         write_unlock_irq(&global_state_lock);
1315         return retcode;
1316 }
1317
1318 static void ping_peer(struct drbd_conf *mdev)
1319 {
1320         clear_bit(GOT_PING_ACK, &mdev->flags);
1321         request_ping(mdev);
1322         wait_event(mdev->misc_wait,
1323                    test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
1324 }
1325
1326 /**
1327  * drbd_start_resync() - Start the resync process
1328  * @mdev:       DRBD device.
1329  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1330  *
1331  * This function might bring you directly into one of the
1332  * C_PAUSED_SYNC_* states.
1333  */
1334 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1335 {
1336         union drbd_state ns;
1337         int r;
1338
1339         if (mdev->state.conn >= C_SYNC_SOURCE) {
1340                 dev_err(DEV, "Resync already running!\n");
1341                 return;
1342         }
1343
1344         /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1345         drbd_rs_cancel_all(mdev);
1346
1347         if (side == C_SYNC_TARGET) {
1348                 /* Since application IO was locked out during C_WF_BITMAP_T and
1349                    C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1350                    we run the before-resync-target handler, since we are about to make the data inconsistent. */
1351                 r = drbd_khelper(mdev, "before-resync-target");
1352                 r = (r >> 8) & 0xff;
1353                 if (r > 0) {
1354                         dev_info(DEV, "before-resync-target handler returned %d, "
1355                              "dropping connection.\n", r);
1356                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1357                         return;
1358                 }
1359         }
1360
1361         drbd_state_lock(mdev);
1362
1363         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1364                 drbd_state_unlock(mdev);
1365                 return;
1366         }
1367
1368         if (side == C_SYNC_TARGET) {
1369                 mdev->bm_resync_fo = 0;
1370         } else /* side == C_SYNC_SOURCE */ {
1371                 u64 uuid;
1372
1373                 get_random_bytes(&uuid, sizeof(u64));
1374                 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1375                 drbd_send_sync_uuid(mdev, uuid);
1376
1377                 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1378         }
1379
1380         write_lock_irq(&global_state_lock);
1381         ns = mdev->state;
1382
1383         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1384
1385         ns.conn = side;
1386
1387         if (side == C_SYNC_TARGET)
1388                 ns.disk = D_INCONSISTENT;
1389         else /* side == C_SYNC_SOURCE */
1390                 ns.pdsk = D_INCONSISTENT;
1391
1392         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1393         ns = mdev->state;
1394
1395         if (ns.conn < C_CONNECTED)
1396                 r = SS_UNKNOWN_ERROR;
1397
1398         if (r == SS_SUCCESS) {
1399                 mdev->rs_total     =
1400                 mdev->rs_mark_left = drbd_bm_total_weight(mdev);
1401                 mdev->rs_failed    = 0;
1402                 mdev->rs_paused    = 0;
1403                 mdev->rs_start     =
1404                 mdev->rs_mark_time = jiffies;
1405                 mdev->rs_same_csum = 0;
1406                 _drbd_pause_after(mdev);
1407         }
1408         write_unlock_irq(&global_state_lock);
1409         put_ldev(mdev);
1410
1411         if (r == SS_SUCCESS) {
1412                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1413                      drbd_conn_str(ns.conn),
1414                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1415                      (unsigned long) mdev->rs_total);
1416
1417                 if (mdev->rs_total == 0) {
1418                         /* Peer still reachable? Beware of failing before-resync-target handlers! */
1419                         ping_peer(mdev);
1420                         drbd_resync_finished(mdev);
1421                 }
1422
1423                 /* ns.conn may already be != mdev->state.conn,
1424                  * we may have been paused in between, or become paused until
1425                  * the timer triggers.
1426                  * No matter, that is handled in resync_timer_fn() */
1427                 if (ns.conn == C_SYNC_TARGET)
1428                         mod_timer(&mdev->resync_timer, jiffies);
1429
1430                 drbd_md_sync(mdev);
1431         }
1432         drbd_state_unlock(mdev);
1433 }
1434
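/* main loop of the per-device worker thread: wait on the data.work
 * semaphore, dequeue one work item at a time and run its callback, forcing
 * C_NETWORK_FAILURE if a callback fails.  At thread exit, cancel all
 * remaining work items and clean up the device. */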
1435 int drbd_worker(struct drbd_thread *thi)
1436 {
1437         struct drbd_conf *mdev = thi->mdev;
1438         struct drbd_work *w = NULL;
1439         LIST_HEAD(work_list);
1440         int intr = 0, i;
1441
1442         sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1443
1444         while (get_t_state(thi) == Running) {
1445                 drbd_thread_current_set_cpu(mdev);
1446
1447                 if (down_trylock(&mdev->data.work.s)) {
1448                         mutex_lock(&mdev->data.mutex);
1449                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1450                                 drbd_tcp_uncork(mdev->data.socket);
1451                         mutex_unlock(&mdev->data.mutex);
1452
1453                         intr = down_interruptible(&mdev->data.work.s);
1454
1455                         mutex_lock(&mdev->data.mutex);
1456                         if (mdev->data.socket  && !mdev->net_conf->no_cork)
1457                                 drbd_tcp_cork(mdev->data.socket);
1458                         mutex_unlock(&mdev->data.mutex);
1459                 }
1460
1461                 if (intr) {
1462                         D_ASSERT(intr == -EINTR);
1463                         flush_signals(current);
1464                         ERR_IF (get_t_state(thi) == Running)
1465                                 continue;
1466                         break;
1467                 }
1468
1469                 if (get_t_state(thi) != Running)
1470                         break;
1471                 /* With this break, we have done a down() but not consumed
1472                    the entry from the list. The cleanup code takes care of
1473                    this...   */
1474
1475                 w = NULL;
1476                 spin_lock_irq(&mdev->data.work.q_lock);
1477                 ERR_IF(list_empty(&mdev->data.work.q)) {
1478                         /* something terribly wrong in our logic.
1479                          * we were able to down() the semaphore,
1480                          * but the list is empty... doh.
1481                          *
1482                          * what is the best thing to do now?
1483                          * try again from scratch, restarting the receiver,
1484                          * asender, whatnot? could break even more ugly,
1485                          * e.g. when we are primary, but no good local data.
1486                          *
1487                          * I'll try to get away just starting over this loop.
1488                          */
1489                         spin_unlock_irq(&mdev->data.work.q_lock);
1490                         continue;
1491                 }
1492                 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1493                 list_del_init(&w->list);
1494                 spin_unlock_irq(&mdev->data.work.q_lock);
1495
1496                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1497                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1498                         if (mdev->state.conn >= C_CONNECTED)
1499                                 drbd_force_state(mdev,
1500                                                 NS(conn, C_NETWORK_FAILURE));
1501                 }
1502         }
1503         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1504         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1505
1506         spin_lock_irq(&mdev->data.work.q_lock);
1507         i = 0;
1508         while (!list_empty(&mdev->data.work.q)) {
1509                 list_splice_init(&mdev->data.work.q, &work_list);
1510                 spin_unlock_irq(&mdev->data.work.q_lock);
1511
1512                 while (!list_empty(&work_list)) {
1513                         w = list_entry(work_list.next, struct drbd_work, list);
1514                         list_del_init(&w->list);
1515                         w->cb(mdev, w, 1);
1516                         i++; /* dead debugging code */
1517                 }
1518
1519                 spin_lock_irq(&mdev->data.work.q_lock);
1520         }
1521         sema_init(&mdev->data.work.s, 0);
1522         /* DANGEROUS race: if someone did queue his work within the spinlock,
1523          * but up() ed outside the spinlock, we could get an up() on the
1524          * semaphore without corresponding list entry.
1525          * So don't do that.
1526          */
1527         spin_unlock_irq(&mdev->data.work.q_lock);
1528
1529         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1530         /* _drbd_set_state only uses stop_nowait.
1531          * wait here for the Exiting receiver. */
1532         drbd_thread_stop(&mdev->receiver);
1533         drbd_mdev_cleanup(mdev);
1534
1535         dev_info(DEV, "worker terminated\n");
1536
1537         clear_bit(DEVICE_DYING, &mdev->flags);
1538         clear_bit(CONFIG_PENDING, &mdev->flags);
1539         wake_up(&mdev->state_wait);
1540
1541         return 0;
1542 }