The DRBD driver
drivers/block/drbd/drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/autoconf.h>
27 #include <linux/module.h>
28 #include <linux/version.h>
29 #include <linux/drbd.h>
30 #include <linux/sched.h>
31 #include <linux/smp_lock.h>
32 #include <linux/wait.h>
33 #include <linux/mm.h>
34 #include <linux/memcontrol.h>
35 #include <linux/mm_inline.h>
36 #include <linux/slab.h>
37 #include <linux/random.h>
39 #include <linux/string.h>
40 #include <linux/scatterlist.h>
41
42 #include "drbd_int.h"
43 #include "drbd_req.h"
44 #include "drbd_tracing.h"
45
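/* Pacing interval for resync / online-verify request generation:
 * HZ/10 jiffies, i.e. 100 ms between batches of requests. */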
46 #define SLEEP_TIME (HZ/10)
47
48 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
49
50
51
52 /* defined here:
53    drbd_md_io_complete
54    drbd_endio_write_sec
55    drbd_endio_read_sec
56    drbd_endio_pri
57
58  * more endio handlers:
59    atodb_endio in drbd_actlog.c
60    drbd_bm_async_io_complete in drbd_bitmap.c
61
62  * For all these callbacks, note the following:
63  * The callbacks will be called in irq context by the IDE drivers,
64  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
65  * Try to get the locking right :)
66  *
67  */
68
69
70 /* About the global_state_lock
71    Each state transition on a device holds a read lock. In case we have
72    to evaluate the sync-after dependencies, we grab a write lock, because
73    we need stable states on all devices for that.  */
74 rwlock_t global_state_lock;
75
76 /* used for synchronous meta data and bitmap IO
77  * submitted by drbd_md_sync_page_io()
78  */
79 void drbd_md_io_complete(struct bio *bio, int error)
80 {
81         struct drbd_md_io *md_io;
82
83         md_io = (struct drbd_md_io *)bio->bi_private;
84         md_io->error = error;
85
86         trace_drbd_bio(md_io->mdev, "Md", bio, 1, NULL);
87
88         complete(&md_io->event);
89 }
90
91 /* reads on behalf of the partner,
92  * "submitted" by the receiver
93  */
94 void drbd_endio_read_sec(struct bio *bio, int error) __releases(local)
95 {
96         unsigned long flags = 0;
97         struct drbd_epoch_entry *e = NULL;
98         struct drbd_conf *mdev;
99         int uptodate = bio_flagged(bio, BIO_UPTODATE);
100
101         e = bio->bi_private;
102         mdev = e->mdev;
103
104         if (error)
105                 dev_warn(DEV, "read: error=%d s=%llus\n", error,
106                                 (unsigned long long)e->sector);
107         if (!error && !uptodate) {
108                 dev_warn(DEV, "read: setting error to -EIO s=%llus\n",
109                                 (unsigned long long)e->sector);
110                 /* strange behavior of some lower level drivers...
111                  * fail the request by clearing the uptodate flag,
112                  * but do not return any error?! */
113                 error = -EIO;
114         }
115
116         D_ASSERT(e->block_id != ID_VACANT);
117
118         trace_drbd_bio(mdev, "Sec", bio, 1, NULL);
119
120         spin_lock_irqsave(&mdev->req_lock, flags);
121         mdev->read_cnt += e->size >> 9;
122         list_del(&e->w.list);
123         if (list_empty(&mdev->read_ee))
124                 wake_up(&mdev->ee_wait);
125         spin_unlock_irqrestore(&mdev->req_lock, flags);
126
127         drbd_chk_io_error(mdev, error, FALSE);
128         drbd_queue_work(&mdev->data.work, &e->w);
129         put_ldev(mdev);
130
131         trace_drbd_ee(mdev, e, "read completed");
132 }
133
134 /* writes on behalf of the partner, or resync writes,
135  * "submitted" by the receiver.
136  */
137 void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
138 {
139         unsigned long flags = 0;
140         struct drbd_epoch_entry *e = NULL;
141         struct drbd_conf *mdev;
142         sector_t e_sector;
143         int do_wake;
144         int is_syncer_req;
145         int do_al_complete_io;
146         int uptodate = bio_flagged(bio, BIO_UPTODATE);
147         int is_barrier = bio_rw_flagged(bio, BIO_RW_BARRIER);
148
149         e = bio->bi_private;
150         mdev = e->mdev;
151
152         if (error)
153                 dev_warn(DEV, "write: error=%d s=%llus\n", error,
154                                 (unsigned long long)e->sector);
155         if (!error && !uptodate) {
156                 dev_warn(DEV, "write: setting error to -EIO s=%llus\n",
157                                 (unsigned long long)e->sector);
158                 /* strange behavior of some lower level drivers...
159                  * fail the request by clearing the uptodate flag,
160                  * but do not return any error?! */
161                 error = -EIO;
162         }
163
164         /* error == -ENOTSUPP would be a better test,
165          * alas it is not reliable */
166         if (error && is_barrier && e->flags & EE_IS_BARRIER) {
167                 drbd_bump_write_ordering(mdev, WO_bdev_flush);
168                 spin_lock_irqsave(&mdev->req_lock, flags);
169                 list_del(&e->w.list);
170                 e->w.cb = w_e_reissue;
171                 /* put_ldev actually happens below, once we come here again. */
172                 __release(local);
173                 spin_unlock_irqrestore(&mdev->req_lock, flags);
174                 drbd_queue_work(&mdev->data.work, &e->w);
175                 return;
176         }
177
178         D_ASSERT(e->block_id != ID_VACANT);
179
180         trace_drbd_bio(mdev, "Sec", bio, 1, NULL);
181
182         spin_lock_irqsave(&mdev->req_lock, flags);
183         mdev->writ_cnt += e->size >> 9;
184         is_syncer_req = is_syncer_block_id(e->block_id);
185
186         /* after we moved e to done_ee,
187          * we may no longer access it,
188          * it may be freed/reused already!
189          * (as soon as we release the req_lock) */
190         e_sector = e->sector;
191         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
192
193         list_del(&e->w.list); /* has been on active_ee or sync_ee */
194         list_add_tail(&e->w.list, &mdev->done_ee);
195
196         trace_drbd_ee(mdev, e, "write completed");
197
198         /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
199          * neither did we wake possibly waiting conflicting requests.
200          * done from "drbd_process_done_ee" within the appropriate w.cb
201          * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
202
203         do_wake = is_syncer_req
204                 ? list_empty(&mdev->sync_ee)
205                 : list_empty(&mdev->active_ee);
206
207         if (error)
208                 __drbd_chk_io_error(mdev, FALSE);
209         spin_unlock_irqrestore(&mdev->req_lock, flags);
210
211         if (is_syncer_req)
212                 drbd_rs_complete_io(mdev, e_sector);
213
214         if (do_wake)
215                 wake_up(&mdev->ee_wait);
216
217         if (do_al_complete_io)
218                 drbd_al_complete_io(mdev, e_sector);
219
220         wake_asender(mdev);
221         put_ldev(mdev);
222
223 }
224
225 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
226  */
227 void drbd_endio_pri(struct bio *bio, int error)
228 {
229         unsigned long flags;
230         struct drbd_request *req = bio->bi_private;
231         struct drbd_conf *mdev = req->mdev;
232         struct bio_and_error m;
233         enum drbd_req_event what;
234         int uptodate = bio_flagged(bio, BIO_UPTODATE);
235
236         if (error)
237                 dev_warn(DEV, "p %s: error=%d\n",
238                          bio_data_dir(bio) == WRITE ? "write" : "read", error);
239         if (!error && !uptodate) {
240                 dev_warn(DEV, "p %s: setting error to -EIO\n",
241                          bio_data_dir(bio) == WRITE ? "write" : "read");
242                 /* strange behavior of some lower level drivers...
243                  * fail the request by clearing the uptodate flag,
244                  * but do not return any error?! */
245                 error = -EIO;
246         }
247
248         trace_drbd_bio(mdev, "Pri", bio, 1, NULL);
249
250         /* to avoid recursion in __req_mod */
251         if (unlikely(error)) {
252                 what = (bio_data_dir(bio) == WRITE)
253                         ? write_completed_with_error
254                         : (bio_rw(bio) == READA)
255                           ? read_ahead_completed_with_error
256                           : read_completed_with_error;
257         } else
258                 what = completed_ok;
259
260         bio_put(req->private_bio);
261         req->private_bio = ERR_PTR(error);
262
263         spin_lock_irqsave(&mdev->req_lock, flags);
264         __req_mod(req, what, &m);
265         spin_unlock_irqrestore(&mdev->req_lock, flags);
266
267         if (m.bio)
268                 complete_master_bio(mdev, &m);
269 }
270
271 int w_io_error(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
272 {
273         struct drbd_request *req = container_of(w, struct drbd_request, w);
274
275         /* NOTE: mdev->ldev can be NULL by the time we get here! */
276         /* D_ASSERT(mdev->ldev->dc.on_io_error != EP_PASS_ON); */
277
278         /* the only way this callback is scheduled is from _req_may_be_done,
279          * when it is done and had a local write error, see comments there */
280         drbd_req_free(req);
281
282         return TRUE;
283 }
284
285 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
286 {
287         struct drbd_request *req = container_of(w, struct drbd_request, w);
288
289         /* We should not detach for read io-error,
290          * but try to WRITE the P_DATA_REPLY to the failed location,
291          * to give the disk the chance to relocate that block */
292
293         spin_lock_irq(&mdev->req_lock);
294         if (cancel ||
295             mdev->state.conn < C_CONNECTED ||
296             mdev->state.pdsk <= D_INCONSISTENT) {
297                 _req_mod(req, send_canceled);
298                 spin_unlock_irq(&mdev->req_lock);
299                 dev_alert(DEV, "WE ARE LOST. Local IO failure, no peer.\n");
300                 return 1;
301         }
302         spin_unlock_irq(&mdev->req_lock);
303
304         return w_send_read_req(mdev, w, 0);
305 }
306
307 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
308 {
309         ERR_IF(cancel) return 1;
310         dev_err(DEV, "resync inactive, but callback triggered??\n");
311         return 1; /* Simply ignore this! */
312 }
313
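/* Compute a message digest with @tfm over all data pages of @bio,
 * feeding the segments one by one through a single-entry scatterlist. */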
314 void drbd_csum(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
315 {
316         struct hash_desc desc;
317         struct scatterlist sg;
318         struct bio_vec *bvec;
319         int i;
320
321         desc.tfm = tfm;
322         desc.flags = 0;
323
324         sg_init_table(&sg, 1);
325         crypto_hash_init(&desc);
326
327         __bio_for_each_segment(bvec, bio, i, 0) {
328                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
329                 crypto_hash_update(&desc, &sg, sg.length);
330         }
331         crypto_hash_final(&desc, digest);
332 }
333
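/* Worker callback for checksum based resync: the local read submitted by
 * read_for_csum() has completed; hash the block and send the digest to
 * the peer as a P_CSUM_RS_REQUEST, then release the epoch entry. */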
334 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
335 {
336         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
337         int digest_size;
338         void *digest;
339         int ok;
340
341         D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
342
343         if (unlikely(cancel)) {
344                 drbd_free_ee(mdev, e);
345                 return 1;
346         }
347
348         if (likely(drbd_bio_uptodate(e->private_bio))) {
349                 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
350                 digest = kmalloc(digest_size, GFP_NOIO);
351                 if (digest) {
352                         drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
353
354                         inc_rs_pending(mdev);
355                         ok = drbd_send_drequest_csum(mdev,
356                                                      e->sector,
357                                                      e->size,
358                                                      digest,
359                                                      digest_size,
360                                                      P_CSUM_RS_REQUEST);
361                         kfree(digest);
362                 } else {
363                         dev_err(DEV, "kmalloc() of digest failed.\n");
364                         ok = 0;
365                 }
366         } else
367                 ok = 1;
368
369         drbd_free_ee(mdev, e);
370
371         if (unlikely(!ok))
372                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
373         return ok;
374 }
375
376 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
377
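/* Submit a local read of @size bytes at @sector for checksum based resync.
 * On completion drbd_endio_read_sec() queues w_e_send_csum().
 * Returns 0 if the local disk is gone, 2 if allocating the epoch entry
 * failed, and 1 if the read was submitted. */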
378 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
379 {
380         struct drbd_epoch_entry *e;
381
382         if (!get_ldev(mdev))
383                 return 0;
384
385         /* GFP_TRY, because if there is no memory available right now, this may
386          * be rescheduled for later. It is "only" background resync, after all. */
387         e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
388         if (!e) {
389                 put_ldev(mdev);
390                 return 2;
391         }
392
393         spin_lock_irq(&mdev->req_lock);
394         list_add(&e->w.list, &mdev->read_ee);
395         spin_unlock_irq(&mdev->req_lock);
396
397         e->private_bio->bi_end_io = drbd_endio_read_sec;
398         e->private_bio->bi_rw = READ;
399         e->w.cb = w_e_send_csum;
400
401         mdev->read_cnt += size >> 9;
402         drbd_generic_make_request(mdev, DRBD_FAULT_RS_RD, e->private_bio);
403
404         return 1;
405 }
406
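/* Resync pacing timer: unless STOP_SYNC_TIMER is set, (re)queue the resync
 * work with the callback matching the current connection state
 * (w_make_ov_request for online verify, w_make_resync_request otherwise). */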
407 void resync_timer_fn(unsigned long data)
408 {
409         unsigned long flags;
410         struct drbd_conf *mdev = (struct drbd_conf *) data;
411         int queue;
412
413         spin_lock_irqsave(&mdev->req_lock, flags);
414
415         if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
416                 queue = 1;
417                 if (mdev->state.conn == C_VERIFY_S)
418                         mdev->resync_work.cb = w_make_ov_request;
419                 else
420                         mdev->resync_work.cb = w_make_resync_request;
421         } else {
422                 queue = 0;
423                 mdev->resync_work.cb = w_resync_inactive;
424         }
425
426         spin_unlock_irqrestore(&mdev->req_lock, flags);
427
428         /* harmless race: list_empty outside data.work.q_lock */
429         if (list_empty(&mdev->resync_work.list) && queue)
430                 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
431 }
432
433 int w_make_resync_request(struct drbd_conf *mdev,
434                 struct drbd_work *w, int cancel)
435 {
436         unsigned long bit;
437         sector_t sector;
438         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
439         int max_segment_size = queue_max_segment_size(mdev->rq_queue);
440         int number, i, size, pe, mx;
441         int align, queued, sndbuf;
442
443         if (unlikely(cancel))
444                 return 1;
445
446         if (unlikely(mdev->state.conn < C_CONNECTED)) {
447                 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected\n");
448                 return 0;
449         }
450
451         if (mdev->state.conn != C_SYNC_TARGET)
452                 dev_err(DEV, "%s in w_make_resync_request\n",
453                         drbd_conn_str(mdev->state.conn));
454
455         if (!get_ldev(mdev)) {
456                 /* Since we only need to access mdev->rsync, a
457                    get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
458                    continuing resync with a broken disk makes no sense at
459                    all */
460                 dev_err(DEV, "Disk broke down during resync!\n");
461                 mdev->resync_work.cb = w_resync_inactive;
462                 return 1;
463         }
464
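        /* Number of BM_BLOCK_SIZE sized requests to issue within one
         * SLEEP_TIME interval so that sync_conf.rate (KiB/s) is met.
         * With the usual 4 KiB bitmap granularity and SLEEP_TIME = HZ/10
         * this is rate/40, e.g. a rate of 4000 KiB/s yields 100 requests
         * per 100 ms tick. */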
465         number = SLEEP_TIME * mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
466         pe = atomic_read(&mdev->rs_pending_cnt);
467
468         mutex_lock(&mdev->data.mutex);
469         if (mdev->data.socket)
470                 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
471         else
472                 mx = 1;
473         mutex_unlock(&mdev->data.mutex);
474
475         /* For resync rates >160MB/sec, allow more pending RS requests */
476         if (number > mx)
477                 mx = number;
478
479         /* Limit the number of pending RS requests to no more than the peer's receive buffer */
480         if ((pe + number) > mx) {
481                 number = mx - pe;
482         }
483
484         for (i = 0; i < number; i++) {
485                 /* Stop generating RS requests, when half of the send buffer is filled */
486                 mutex_lock(&mdev->data.mutex);
487                 if (mdev->data.socket) {
488                         queued = mdev->data.socket->sk->sk_wmem_queued;
489                         sndbuf = mdev->data.socket->sk->sk_sndbuf;
490                 } else {
491                         queued = 1;
492                         sndbuf = 0;
493                 }
494                 mutex_unlock(&mdev->data.mutex);
495                 if (queued > sndbuf / 2)
496                         goto requeue;
497
498 next_sector:
499                 size = BM_BLOCK_SIZE;
500                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
501
502                 if (bit == -1UL) {
503                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
504                         mdev->resync_work.cb = w_resync_inactive;
505                         put_ldev(mdev);
506                         return 1;
507                 }
508
509                 sector = BM_BIT_TO_SECT(bit);
510
511                 if (drbd_try_rs_begin_io(mdev, sector)) {
512                         mdev->bm_resync_fo = bit;
513                         goto requeue;
514                 }
515                 mdev->bm_resync_fo = bit + 1;
516
517                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
518                         drbd_rs_complete_io(mdev, sector);
519                         goto next_sector;
520                 }
521
522 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
523                 /* try to find some adjacent bits.
524                  * we stop if we have already the maximum req size.
525                  *
526                  * Additionally always align bigger requests, in order to
527                  * be prepared for all stripe sizes of software RAIDs.
528                  *
529                  * we _do_ care about the agreed-upon q->max_segment_size
530                  * here, as splitting up the requests on the other side is more
531                  * difficult.  the consequence is, that on lvm and md and other
532                  * "indirect" devices, this is dead code, since
533                  * q->max_segment_size will be PAGE_SIZE.
534                  */
535                 align = 1;
536                 for (;;) {
537                         if (size + BM_BLOCK_SIZE > max_segment_size)
538                                 break;
539
540                         /* Be always aligned */
541                         if (sector & ((1<<(align+3))-1))
542                                 break;
543
544                         /* do not cross extent boundaries */
545                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
546                                 break;
547                         /* now, is it actually dirty, after all?
548                          * caution, drbd_bm_test_bit is tri-state for some
549                          * obscure reason; ( b == 0 ) would get the out-of-band
550                          * only accidentally right because of the "oddly sized"
551                          * adjustment below */
552                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
553                                 break;
554                         bit++;
555                         size += BM_BLOCK_SIZE;
556                         if ((BM_BLOCK_SIZE << align) <= size)
557                                 align++;
558                         i++;
559                 }
560                 /* if we merged some,
561                  * reset the offset to start the next drbd_bm_find_next from */
562                 if (size > BM_BLOCK_SIZE)
563                         mdev->bm_resync_fo = bit + 1;
564 #endif
565
566                 /* adjust very last sectors, in case we are oddly sized */
567                 if (sector + (size>>9) > capacity)
568                         size = (capacity-sector)<<9;
569                 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
570                         switch (read_for_csum(mdev, sector, size)) {
571                         case 0: /* Disk failure*/
572                                 put_ldev(mdev);
573                                 return 0;
574                         case 2: /* Allocation failed */
575                                 drbd_rs_complete_io(mdev, sector);
576                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
577                                 goto requeue;
578                         /* case 1: everything ok */
579                         }
580                 } else {
581                         inc_rs_pending(mdev);
582                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
583                                                sector, size, ID_SYNCER)) {
584                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
585                                 dec_rs_pending(mdev);
586                                 put_ldev(mdev);
587                                 return 0;
588                         }
589                 }
590         }
591
592         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
593                 /* last syncer _request_ was sent,
594                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
595                  * next sync group will resume), as soon as we receive the last
596                  * resync data block, and the last bit is cleared.
597                  * until then resync "work" is "inactive" ...
598                  */
599                 mdev->resync_work.cb = w_resync_inactive;
600                 put_ldev(mdev);
601                 return 1;
602         }
603
604  requeue:
605         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
606         put_ldev(mdev);
607         return 1;
608 }
609
610 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
611 {
612         int number, i, size;
613         sector_t sector;
614         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
615
616         if (unlikely(cancel))
617                 return 1;
618
619         if (unlikely(mdev->state.conn < C_CONNECTED)) {
620                 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected\n");
621                 return 0;
622         }
623
624         number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
625         if (atomic_read(&mdev->rs_pending_cnt) > number)
626                 goto requeue;
627
628         number -= atomic_read(&mdev->rs_pending_cnt);
629
630         sector = mdev->ov_position;
631         for (i = 0; i < number; i++) {
632                 if (sector >= capacity) {
633                         mdev->resync_work.cb = w_resync_inactive;
634                         return 1;
635                 }
636
637                 size = BM_BLOCK_SIZE;
638
639                 if (drbd_try_rs_begin_io(mdev, sector)) {
640                         mdev->ov_position = sector;
641                         goto requeue;
642                 }
643
644                 if (sector + (size>>9) > capacity)
645                         size = (capacity-sector)<<9;
646
647                 inc_rs_pending(mdev);
648                 if (!drbd_send_ov_request(mdev, sector, size)) {
649                         dec_rs_pending(mdev);
650                         return 0;
651                 }
652                 sector += BM_SECT_PER_BIT;
653         }
654         mdev->ov_position = sector;
655
656  requeue:
657         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
658         return 1;
659 }
660
661
662 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
663 {
664         kfree(w);
665         ov_oos_print(mdev);
666         drbd_resync_finished(mdev);
667
668         return 1;
669 }
670
671 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
672 {
673         kfree(w);
674
675         drbd_resync_finished(mdev);
676
677         return 1;
678 }
679
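/* Called when a resync or online verify run ends: drains the resync LRU,
 * reports throughput, updates disk/peer-disk states and UUIDs, and invokes
 * the matching khelper ("out-of-sync" / "after-resync-target"). */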
680 int drbd_resync_finished(struct drbd_conf *mdev)
681 {
682         unsigned long db, dt, dbdt;
683         unsigned long n_oos;
684         union drbd_state os, ns;
685         struct drbd_work *w;
686         char *khelper_cmd = NULL;
687
688         /* Remove all elements from the resync LRU. Since future actions
689          * might set bits in the (main) bitmap, then the entries in the
690          * resync LRU would be wrong. */
691         if (drbd_rs_del_all(mdev)) {
692                 /* In case this is not possible now, most probably because
693                  * there are P_RS_DATA_REPLY packets lingering on the worker's
694                  * queue (or even the read operations for those packets
695                  * are not finished by now).   Retry in 100ms. */
696
697                 drbd_kick_lo(mdev);
698                 __set_current_state(TASK_INTERRUPTIBLE);
699                 schedule_timeout(HZ / 10);
700                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
701                 if (w) {
702                         w->cb = w_resync_finished;
703                         drbd_queue_work(&mdev->data.work, w);
704                         return 1;
705                 }
706                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
707         }
708
709         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
710         if (dt <= 0)
711                 dt = 1;
712         db = mdev->rs_total;
713         dbdt = Bit2KB(db/dt);
714         mdev->rs_paused /= HZ;
715
716         if (!get_ldev(mdev))
717                 goto out;
718
719         spin_lock_irq(&mdev->req_lock);
720         os = mdev->state;
721
722         /* This protects us against multiple calls (that can happen in the presence
723            of application IO), and against connectivity loss just before we arrive here. */
724         if (os.conn <= C_CONNECTED)
725                 goto out_unlock;
726
727         ns = os;
728         ns.conn = C_CONNECTED;
729
730         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
731              (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
732              "Online verify " : "Resync",
733              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
734
735         n_oos = drbd_bm_total_weight(mdev);
736
737         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
738                 if (n_oos) {
739                         dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
740                               n_oos, Bit2KB(1));
741                         khelper_cmd = "out-of-sync";
742                 }
743         } else {
744                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
745
746                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
747                         khelper_cmd = "after-resync-target";
748
749                 if (mdev->csums_tfm && mdev->rs_total) {
750                         const unsigned long s = mdev->rs_same_csum;
751                         const unsigned long t = mdev->rs_total;
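                        /* Percentage of resynced blocks that had equal
                         * checksums; for large totals divide t first so
                         * that s*100 cannot overflow an unsigned long. */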
752                         const int ratio =
753                                 (t == 0)     ? 0 :
754                                 (t < 100000) ? ((s*100)/t) : (s/(t/100));
755                         dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
756                              "transferred %luK total %luK\n",
757                              ratio,
758                              Bit2KB(mdev->rs_same_csum),
759                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
760                              Bit2KB(mdev->rs_total));
761                 }
762         }
763
764         if (mdev->rs_failed) {
765                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
766
767                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
768                         ns.disk = D_INCONSISTENT;
769                         ns.pdsk = D_UP_TO_DATE;
770                 } else {
771                         ns.disk = D_UP_TO_DATE;
772                         ns.pdsk = D_INCONSISTENT;
773                 }
774         } else {
775                 ns.disk = D_UP_TO_DATE;
776                 ns.pdsk = D_UP_TO_DATE;
777
778                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
779                         if (mdev->p_uuid) {
780                                 int i;
781                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
782                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
783                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
784                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
785                         } else {
786                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
787                         }
788                 }
789
790                 drbd_uuid_set_bm(mdev, 0UL);
791
792                 if (mdev->p_uuid) {
793                         /* Now the two UUID sets are equal, update what we
794                          * know of the peer. */
795                         int i;
796                         for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
797                                 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
798                 }
799         }
800
801         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
802 out_unlock:
803         spin_unlock_irq(&mdev->req_lock);
804         put_ldev(mdev);
805 out:
806         mdev->rs_total  = 0;
807         mdev->rs_failed = 0;
808         mdev->rs_paused = 0;
809         mdev->ov_start_sector = 0;
810
811         if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
812                 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
813                 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
814         }
815
816         if (khelper_cmd)
817                 drbd_khelper(mdev, khelper_cmd);
818
819         return 1;
820 }
821
822 /* helper */
823 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
824 {
825         if (drbd_bio_has_active_page(e->private_bio)) {
826                 /* This might happen if sendpage() has not finished */
827                 spin_lock_irq(&mdev->req_lock);
828                 list_add_tail(&e->w.list, &mdev->net_ee);
829                 spin_unlock_irq(&mdev->req_lock);
830         } else
831                 drbd_free_ee(mdev, e);
832 }
833
834 /**
835  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
836  * @mdev:       DRBD device.
837  * @w:          work object.
838  * @cancel:     The connection will be closed anyways
839  */
840 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
841 {
842         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
843         int ok;
844
845         if (unlikely(cancel)) {
846                 drbd_free_ee(mdev, e);
847                 dec_unacked(mdev);
848                 return 1;
849         }
850
851         if (likely(drbd_bio_uptodate(e->private_bio))) {
852                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
853         } else {
854                 if (__ratelimit(&drbd_ratelimit_state))
855                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
856                             (unsigned long long)e->sector);
857
858                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
859         }
860
861         dec_unacked(mdev);
862
863         move_to_net_ee_or_free(mdev, e);
864
865         if (unlikely(!ok))
866                 dev_err(DEV, "drbd_send_block() failed\n");
867         return ok;
868 }
869
870 /**
871  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
872  * @mdev:       DRBD device.
873  * @w:          work object.
874  * @cancel:     The connection will be closed anyways
875  */
876 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
877 {
878         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
879         int ok;
880
881         if (unlikely(cancel)) {
882                 drbd_free_ee(mdev, e);
883                 dec_unacked(mdev);
884                 return 1;
885         }
886
887         if (get_ldev_if_state(mdev, D_FAILED)) {
888                 drbd_rs_complete_io(mdev, e->sector);
889                 put_ldev(mdev);
890         }
891
892         if (likely(drbd_bio_uptodate(e->private_bio))) {
893                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
894                         inc_rs_pending(mdev);
895                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
896                 } else {
897                         if (__ratelimit(&drbd_ratelimit_state))
898                                 dev_err(DEV, "Not sending RSDataReply, "
899                                     "partner DISKLESS!\n");
900                         ok = 1;
901                 }
902         } else {
903                 if (__ratelimit(&drbd_ratelimit_state))
904                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
905                             (unsigned long long)e->sector);
906
907                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
908
909                 /* update resync data with failure */
910                 drbd_rs_failed_io(mdev, e->sector, e->size);
911         }
912
913         dec_unacked(mdev);
914
915         move_to_net_ee_or_free(mdev, e);
916
917         if (unlikely(!ok))
918                 dev_err(DEV, "drbd_send_block() failed\n");
919         return ok;
920 }
921
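/* Worker callback for a P_CSUM_RS_REQUEST from the peer: recompute the
 * digest over the locally read block, compare it with the digest the peer
 * sent, and either acknowledge the block as in sync (P_RS_IS_IN_SYNC) or
 * reply with the full data (P_RS_DATA_REPLY). */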
922 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
923 {
924         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
925         struct digest_info *di;
926         int digest_size;
927         void *digest = NULL;
928         int ok, eq = 0;
929
930         if (unlikely(cancel)) {
931                 drbd_free_ee(mdev, e);
932                 dec_unacked(mdev);
933                 return 1;
934         }
935
936         drbd_rs_complete_io(mdev, e->sector);
937
938         di = (struct digest_info *)(unsigned long)e->block_id;
939
940         if (likely(drbd_bio_uptodate(e->private_bio))) {
941                 /* quick hack to try to avoid a race against reconfiguration.
942                  * a real fix would be much more involved,
943                  * introducing more locking mechanisms */
944                 if (mdev->csums_tfm) {
945                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
946                         D_ASSERT(digest_size == di->digest_size);
947                         digest = kmalloc(digest_size, GFP_NOIO);
948                 }
949                 if (digest) {
950                         drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
951                         eq = !memcmp(digest, di->digest, digest_size);
952                         kfree(digest);
953                 }
954
955                 if (eq) {
956                         drbd_set_in_sync(mdev, e->sector, e->size);
957                         mdev->rs_same_csum++;
958                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
959                 } else {
960                         inc_rs_pending(mdev);
961                         e->block_id = ID_SYNCER;
962                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
963                 }
964         } else {
965                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
966                 if (__ratelimit(&drbd_ratelimit_state))
967                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
968         }
969
970         dec_unacked(mdev);
971
972         kfree(di);
973
974         move_to_net_ee_or_free(mdev, e);
975
976         if (unlikely(!ok))
977                 dev_err(DEV, "drbd_send_block/ack() failed\n");
978         return ok;
979 }
980
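/* Online verify, request side: hash the locally read block with the verify
 * transform and send the digest to the peer in a P_OV_REPLY packet. */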
981 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
982 {
983         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
984         int digest_size;
985         void *digest;
986         int ok = 1;
987
988         if (unlikely(cancel))
989                 goto out;
990
991         if (unlikely(!drbd_bio_uptodate(e->private_bio)))
992                 goto out;
993
994         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
995         /* FIXME if this allocation fails, online verify will not terminate! */
996         digest = kmalloc(digest_size, GFP_NOIO);
997         if (digest) {
998                 drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
999                 inc_rs_pending(mdev);
1000                 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1001                                              digest, digest_size, P_OV_REPLY);
1002                 if (!ok)
1003                         dec_rs_pending(mdev);
1004                 kfree(digest);
1005         }
1006
1007 out:
1008         drbd_free_ee(mdev, e);
1009
1010         dec_unacked(mdev);
1011
1012         return ok;
1013 }
1014
1015 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1016 {
1017         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1018                 mdev->ov_last_oos_size += size>>9;
1019         } else {
1020                 mdev->ov_last_oos_start = sector;
1021                 mdev->ov_last_oos_size = size>>9;
1022         }
1023         drbd_set_out_of_sync(mdev, sector, size);
1024         set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1025 }
1026
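/* Online verify, reply side: compare the peer's digest with a locally
 * computed one, mark the block out of sync on mismatch, report the result
 * via P_OV_RESULT, and finish the verify run once ov_left reaches zero. */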
1027 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1028 {
1029         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1030         struct digest_info *di;
1031         int digest_size;
1032         void *digest;
1033         int ok, eq = 0;
1034
1035         if (unlikely(cancel)) {
1036                 drbd_free_ee(mdev, e);
1037                 dec_unacked(mdev);
1038                 return 1;
1039         }
1040
1041         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1042          * the resync lru has been cleaned up already */
1043         drbd_rs_complete_io(mdev, e->sector);
1044
1045         di = (struct digest_info *)(unsigned long)e->block_id;
1046
1047         if (likely(drbd_bio_uptodate(e->private_bio))) {
1048                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1049                 digest = kmalloc(digest_size, GFP_NOIO);
1050                 if (digest) {
1051                         drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
1052
1053                         D_ASSERT(digest_size == di->digest_size);
1054                         eq = !memcmp(digest, di->digest, digest_size);
1055                         kfree(digest);
1056                 }
1057         } else {
1058                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1059                 if (__ratelimit(&drbd_ratelimit_state))
1060                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1061         }
1062
1063         dec_unacked(mdev);
1064
1065         kfree(di);
1066
1067         if (!eq)
1068                 drbd_ov_oos_found(mdev, e->sector, e->size);
1069         else
1070                 ov_oos_print(mdev);
1071
1072         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1073                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1074
1075         drbd_free_ee(mdev, e);
1076
1077         if (--mdev->ov_left == 0) {
1078                 ov_oos_print(mdev);
1079                 drbd_resync_finished(mdev);
1080         }
1081
1082         return ok;
1083 }
1084
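/* Barrier work item: completing b->done tells the waiter that all work
 * queued before this entry has been processed. */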
1085 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1086 {
1087         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1088         complete(&b->done);
1089         return 1;
1090 }
1091
1092 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1093 {
1094         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1095         struct p_barrier *p = &mdev->data.sbuf.barrier;
1096         int ok = 1;
1097
1098         /* really avoid racing with tl_clear.  w.cb may have been referenced
1099          * just before it was reassigned and re-queued, so double check that.
1100          * actually, this race was harmless, since we only try to send the
1101          * barrier packet here, and otherwise do nothing with the object.
1102          * but compare with the head of w_clear_epoch */
1103         spin_lock_irq(&mdev->req_lock);
1104         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1105                 cancel = 1;
1106         spin_unlock_irq(&mdev->req_lock);
1107         if (cancel)
1108                 return 1;
1109
1110         if (!drbd_get_data_sock(mdev))
1111                 return 0;
1112         p->barrier = b->br_number;
1113         /* inc_ap_pending was done where this was queued.
1114          * dec_ap_pending will be done in got_BarrierAck
1115          * or (on connection loss) in w_clear_epoch.  */
1116         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1117                                 (struct p_header *)p, sizeof(*p), 0);
1118         drbd_put_data_sock(mdev);
1119
1120         return ok;
1121 }
1122
1123 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1124 {
1125         if (cancel)
1126                 return 1;
1127         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1128 }
1129
1130 /**
1131  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1132  * @mdev:       DRBD device.
1133  * @w:          work object.
1134  * @cancel:     The connection will be closed anyways
1135  */
1136 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1137 {
1138         struct drbd_request *req = container_of(w, struct drbd_request, w);
1139         int ok;
1140
1141         if (unlikely(cancel)) {
1142                 req_mod(req, send_canceled);
1143                 return 1;
1144         }
1145
1146         ok = drbd_send_dblock(mdev, req);
1147         req_mod(req, ok ? handed_over_to_network : send_failed);
1148
1149         return ok;
1150 }
1151
1152 /**
1153  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1154  * @mdev:       DRBD device.
1155  * @w:          work object.
1156  * @cancel:     The connection will be closed anyways
1157  */
1158 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1159 {
1160         struct drbd_request *req = container_of(w, struct drbd_request, w);
1161         int ok;
1162
1163         if (unlikely(cancel)) {
1164                 req_mod(req, send_canceled);
1165                 return 1;
1166         }
1167
1168         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1169                                 (unsigned long)req);
1170
1171         if (!ok) {
1172                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1173                  * so this is probably redundant */
1174                 if (mdev->state.conn >= C_CONNECTED)
1175                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1176         }
1177         req_mod(req, ok ? handed_over_to_network : send_failed);
1178
1179         return ok;
1180 }
1181
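/* Walk the sync-after dependency chain of @mdev; return 0 if any device
 * further up the chain is currently resyncing, paused, or otherwise
 * suspended for sync, 1 if this device may sync now. */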
1182 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1183 {
1184         struct drbd_conf *odev = mdev;
1185
1186         while (1) {
1187                 if (odev->sync_conf.after == -1)
1188                         return 1;
1189                 odev = minor_to_mdev(odev->sync_conf.after);
1190                 ERR_IF(!odev) return 1;
1191                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1192                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1193                     odev->state.aftr_isp || odev->state.peer_isp ||
1194                     odev->state.user_isp)
1195                         return 0;
1196         }
1197 }
1198
1199 /**
1200  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1201  * @mdev:       DRBD device.
1202  *
1203  * Called from process context only (admin command and after_state_ch).
1204  */
1205 static int _drbd_pause_after(struct drbd_conf *mdev)
1206 {
1207         struct drbd_conf *odev;
1208         int i, rv = 0;
1209
1210         for (i = 0; i < minor_count; i++) {
1211                 odev = minor_to_mdev(i);
1212                 if (!odev)
1213                         continue;
1214                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1215                         continue;
1216                 if (!_drbd_may_sync_now(odev))
1217                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1218                                != SS_NOTHING_TO_DO);
1219         }
1220
1221         return rv;
1222 }
1223
1224 /**
1225  * _drbd_resume_next() - Resume resync on all devices that may resync now
1226  * @mdev:       DRBD device.
1227  *
1228  * Called from process context only (admin command and worker).
1229  */
1230 static int _drbd_resume_next(struct drbd_conf *mdev)
1231 {
1232         struct drbd_conf *odev;
1233         int i, rv = 0;
1234
1235         for (i = 0; i < minor_count; i++) {
1236                 odev = minor_to_mdev(i);
1237                 if (!odev)
1238                         continue;
1239                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1240                         continue;
1241                 if (odev->state.aftr_isp) {
1242                         if (_drbd_may_sync_now(odev))
1243                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1244                                                         CS_HARD, NULL)
1245                                        != SS_NOTHING_TO_DO) ;
1246                 }
1247         }
1248         return rv;
1249 }
1250
1251 void resume_next_sg(struct drbd_conf *mdev)
1252 {
1253         write_lock_irq(&global_state_lock);
1254         _drbd_resume_next(mdev);
1255         write_unlock_irq(&global_state_lock);
1256 }
1257
1258 void suspend_other_sg(struct drbd_conf *mdev)
1259 {
1260         write_lock_irq(&global_state_lock);
1261         _drbd_pause_after(mdev);
1262         write_unlock_irq(&global_state_lock);
1263 }
1264
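/* Validate a new sync-after minor: it must refer to an existing device and
 * must not introduce a cycle in the dependency chain. */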
1265 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1266 {
1267         struct drbd_conf *odev;
1268
1269         if (o_minor == -1)
1270                 return NO_ERROR;
1271         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1272                 return ERR_SYNC_AFTER;
1273
1274         /* check for loops */
1275         odev = minor_to_mdev(o_minor);
1276         while (1) {
1277                 if (odev == mdev)
1278                         return ERR_SYNC_AFTER_CYCLE;
1279
1280                 /* dependency chain ends here, no cycles. */
1281                 if (odev->sync_conf.after == -1)
1282                         return NO_ERROR;
1283
1284                 /* follow the dependency chain */
1285                 odev = minor_to_mdev(odev->sync_conf.after);
1286         }
1287 }
1288
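/* Change the sync-after dependency of @mdev to minor @na under the global
 * state lock, then iterate pause/resume until no more states change. */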
1289 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1290 {
1291         int changes;
1292         int retcode;
1293
1294         write_lock_irq(&global_state_lock);
1295         retcode = sync_after_error(mdev, na);
1296         if (retcode == NO_ERROR) {
1297                 mdev->sync_conf.after = na;
1298                 do {
1299                         changes  = _drbd_pause_after(mdev);
1300                         changes |= _drbd_resume_next(mdev);
1301                 } while (changes);
1302         }
1303         write_unlock_irq(&global_state_lock);
1304         return retcode;
1305 }
1306
1307 /**
1308  * drbd_start_resync() - Start the resync process
1309  * @mdev:       DRBD device.
1310  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1311  *
1312  * This function might bring you directly into one of the
1313  * C_PAUSED_SYNC_* states.
1314  */
1315 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1316 {
1317         union drbd_state ns;
1318         int r;
1319
1320         if (mdev->state.conn >= C_SYNC_SOURCE) {
1321                 dev_err(DEV, "Resync already running!\n");
1322                 return;
1323         }
1324
1325         trace_drbd_resync(mdev, TRACE_LVL_SUMMARY, "Resync starting: side=%s\n",
1326                           side == C_SYNC_TARGET ? "SyncTarget" : "SyncSource");
1327
1328         /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1329         drbd_rs_cancel_all(mdev);
1330
1331         if (side == C_SYNC_TARGET) {
1332                 /* Since application IO was locked out during C_WF_BITMAP_T and
1333                    C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1334                    check whether we are allowed to make the data inconsistent. */
1335                 r = drbd_khelper(mdev, "before-resync-target");
1336                 r = (r >> 8) & 0xff;
1337                 if (r > 0) {
1338                         dev_info(DEV, "before-resync-target handler returned %d, "
1339                              "dropping connection.\n", r);
1340                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1341                         return;
1342                 }
1343         }
1344
1345         drbd_state_lock(mdev);
1346
1347         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1348                 drbd_state_unlock(mdev);
1349                 return;
1350         }
1351
1352         if (side == C_SYNC_TARGET) {
1353                 mdev->bm_resync_fo = 0;
1354         } else /* side == C_SYNC_SOURCE */ {
1355                 u64 uuid;
1356
1357                 get_random_bytes(&uuid, sizeof(u64));
1358                 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1359                 drbd_send_sync_uuid(mdev, uuid);
1360
1361                 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1362         }
1363
1364         write_lock_irq(&global_state_lock);
1365         ns = mdev->state;
1366
1367         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1368
1369         ns.conn = side;
1370
1371         if (side == C_SYNC_TARGET)
1372                 ns.disk = D_INCONSISTENT;
1373         else /* side == C_SYNC_SOURCE */
1374                 ns.pdsk = D_INCONSISTENT;
1375
1376         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1377         ns = mdev->state;
1378
1379         if (ns.conn < C_CONNECTED)
1380                 r = SS_UNKNOWN_ERROR;
1381
1382         if (r == SS_SUCCESS) {
1383                 mdev->rs_total     =
1384                 mdev->rs_mark_left = drbd_bm_total_weight(mdev);
1385                 mdev->rs_failed    = 0;
1386                 mdev->rs_paused    = 0;
1387                 mdev->rs_start     =
1388                 mdev->rs_mark_time = jiffies;
1389                 mdev->rs_same_csum = 0;
1390                 _drbd_pause_after(mdev);
1391         }
1392         write_unlock_irq(&global_state_lock);
1393         drbd_state_unlock(mdev);
1394         put_ldev(mdev);
1395
1396         if (r == SS_SUCCESS) {
1397                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1398                      drbd_conn_str(ns.conn),
1399                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1400                      (unsigned long) mdev->rs_total);
1401
1402                 if (mdev->rs_total == 0) {
1403                         /* Peer still reachable? Beware of failing before-resync-target handlers! */
1404                         request_ping(mdev);
1405                         __set_current_state(TASK_INTERRUPTIBLE);
1406                         schedule_timeout(mdev->net_conf->ping_timeo*HZ/9); /* 9 instead 10 */
1407                         drbd_resync_finished(mdev);
1408                         return;
1409                 }
1410
1411                 /* ns.conn may already be != mdev->state.conn,
1412                  * we may have been paused in between, or become paused until
1413                  * the timer triggers.
1414                  * No matter, that is handled in resync_timer_fn() */
1415                 if (ns.conn == C_SYNC_TARGET)
1416                         mod_timer(&mdev->resync_timer, jiffies);
1417
1418                 drbd_md_sync(mdev);
1419         }
1420 }
1421
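/* Main loop of the per-device worker thread: wait on the work semaphore,
 * dequeue drbd_work items and run their callbacks; on exit cancel all
 * remaining work, stop the receiver and clean up the device. */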
1422 int drbd_worker(struct drbd_thread *thi)
1423 {
1424         struct drbd_conf *mdev = thi->mdev;
1425         struct drbd_work *w = NULL;
1426         LIST_HEAD(work_list);
1427         int intr = 0, i;
1428
1429         sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1430
1431         while (get_t_state(thi) == Running) {
1432                 drbd_thread_current_set_cpu(mdev);
1433
1434                 if (down_trylock(&mdev->data.work.s)) {
1435                         mutex_lock(&mdev->data.mutex);
1436                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1437                                 drbd_tcp_uncork(mdev->data.socket);
1438                         mutex_unlock(&mdev->data.mutex);
1439
1440                         intr = down_interruptible(&mdev->data.work.s);
1441
1442                         mutex_lock(&mdev->data.mutex);
1443                         if (mdev->data.socket  && !mdev->net_conf->no_cork)
1444                                 drbd_tcp_cork(mdev->data.socket);
1445                         mutex_unlock(&mdev->data.mutex);
1446                 }
1447
1448                 if (intr) {
1449                         D_ASSERT(intr == -EINTR);
1450                         flush_signals(current);
1451                         ERR_IF (get_t_state(thi) == Running)
1452                                 continue;
1453                         break;
1454                 }
1455
1456                 if (get_t_state(thi) != Running)
1457                         break;
1458                 /* With this break, we have done a down() but not consumed
1459                    the entry from the list. The cleanup code takes care of
1460                    this...   */
1461
1462                 w = NULL;
1463                 spin_lock_irq(&mdev->data.work.q_lock);
1464                 ERR_IF(list_empty(&mdev->data.work.q)) {
1465                         /* something terribly wrong in our logic.
1466                          * we were able to down() the semaphore,
1467                          * but the list is empty... doh.
1468                          *
1469                          * what is the best thing to do now?
1470                          * try again from scratch, restarting the receiver,
1471                          * asender, whatnot? could break even more ugly,
1472                          * e.g. when we are primary, but no good local data.
1473                          *
1474                          * I'll try to get away just starting over this loop.
1475                          */
1476                         spin_unlock_irq(&mdev->data.work.q_lock);
1477                         continue;
1478                 }
1479                 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1480                 list_del_init(&w->list);
1481                 spin_unlock_irq(&mdev->data.work.q_lock);
1482
1483                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1484                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1485                         if (mdev->state.conn >= C_CONNECTED)
1486                                 drbd_force_state(mdev,
1487                                                 NS(conn, C_NETWORK_FAILURE));
1488                 }
1489         }
1490         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1491         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1492
1493         spin_lock_irq(&mdev->data.work.q_lock);
1494         i = 0;
1495         while (!list_empty(&mdev->data.work.q)) {
1496                 list_splice_init(&mdev->data.work.q, &work_list);
1497                 spin_unlock_irq(&mdev->data.work.q_lock);
1498
1499                 while (!list_empty(&work_list)) {
1500                         w = list_entry(work_list.next, struct drbd_work, list);
1501                         list_del_init(&w->list);
1502                         w->cb(mdev, w, 1);
1503                         i++; /* dead debugging code */
1504                 }
1505
1506                 spin_lock_irq(&mdev->data.work.q_lock);
1507         }
1508         sema_init(&mdev->data.work.s, 0);
1509         /* DANGEROUS race: if someone did queue his work within the spinlock,
1510          * but up() ed outside the spinlock, we could get an up() on the
1511          * semaphore without corresponding list entry.
1512          * So don't do that.
1513          */
1514         spin_unlock_irq(&mdev->data.work.q_lock);
1515
1516         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1517         /* _drbd_set_state only uses stop_nowait.
1518          * wait here for the Exiting receiver. */
1519         drbd_thread_stop(&mdev->receiver);
1520         drbd_mdev_cleanup(mdev);
1521
1522         dev_info(DEV, "worker terminated\n");
1523
1524         clear_bit(DEVICE_DYING, &mdev->flags);
1525         clear_bit(CONFIG_PENDING, &mdev->flags);
1526         wake_up(&mdev->state_wait);
1527
1528         return 0;
1529 }