dropping unneeded include autoconf.h
[safe/jmp/linux-2.6] drivers/block/drbd/drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/version.h>
28 #include <linux/drbd.h>
29 #include <linux/sched.h>
30 #include <linux/smp_lock.h>
31 #include <linux/wait.h>
32 #include <linux/mm.h>
33 #include <linux/memcontrol.h>
34 #include <linux/mm_inline.h>
35 #include <linux/slab.h>
36 #include <linux/random.h>
38 #include <linux/string.h>
39 #include <linux/scatterlist.h>
40
41 #include "drbd_int.h"
42 #include "drbd_req.h"
43 #include "drbd_tracing.h"
44
45 #define SLEEP_TIME (HZ/10)
46
47 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
48
49
50
51 /* defined here:
52    drbd_md_io_complete
53    drbd_endio_write_sec
54    drbd_endio_read_sec
55    drbd_endio_pri
56
57  * more endio handlers:
58    atodb_endio in drbd_actlog.c
59    drbd_bm_async_io_complete in drbd_bitmap.c
60
61  * For all these callbacks, note the following:
62  * The callbacks will be called in irq context by the IDE drivers,
63  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
64  * Try to get the locking right :)
65  *
66  */
67
68
69 /* About the global_state_lock
70    Each state transition on a device holds a read lock. In case we have
71    to evaluate the sync-after dependencies, we grab a write lock, because
72    we need stable states on all devices for that.  */
73 rwlock_t global_state_lock;
74
75 /* used for synchronous meta data and bitmap IO
76  * submitted by drbd_md_sync_page_io()
77  */
78 void drbd_md_io_complete(struct bio *bio, int error)
79 {
80         struct drbd_md_io *md_io;
81
82         md_io = (struct drbd_md_io *)bio->bi_private;
83         md_io->error = error;
84
85         trace_drbd_bio(md_io->mdev, "Md", bio, 1, NULL);
86
87         complete(&md_io->event);
88 }
89
90 /* reads on behalf of the partner,
91  * "submitted" by the receiver
92  */
93 void drbd_endio_read_sec(struct bio *bio, int error) __releases(local)
94 {
95         unsigned long flags = 0;
96         struct drbd_epoch_entry *e = NULL;
97         struct drbd_conf *mdev;
98         int uptodate = bio_flagged(bio, BIO_UPTODATE);
99
100         e = bio->bi_private;
101         mdev = e->mdev;
102
103         if (error)
104                 dev_warn(DEV, "read: error=%d s=%llus\n", error,
105                                 (unsigned long long)e->sector);
106         if (!error && !uptodate) {
107                 dev_warn(DEV, "read: setting error to -EIO s=%llus\n",
108                                 (unsigned long long)e->sector);
109                 /* strange behavior of some lower level drivers...
110                  * fail the request by clearing the uptodate flag,
111                  * but do not return any error?! */
112                 error = -EIO;
113         }
114
115         D_ASSERT(e->block_id != ID_VACANT);
116
117         trace_drbd_bio(mdev, "Sec", bio, 1, NULL);
118
119         spin_lock_irqsave(&mdev->req_lock, flags);
120         mdev->read_cnt += e->size >> 9;
121         list_del(&e->w.list);
122         if (list_empty(&mdev->read_ee))
123                 wake_up(&mdev->ee_wait);
124         spin_unlock_irqrestore(&mdev->req_lock, flags);
125
126         drbd_chk_io_error(mdev, error, FALSE);
127         drbd_queue_work(&mdev->data.work, &e->w);
128         put_ldev(mdev);
129
130         trace_drbd_ee(mdev, e, "read completed");
131 }
132
133 /* writes on behalf of the partner, or resync writes,
134  * "submitted" by the receiver.
135  */
136 void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
137 {
138         unsigned long flags = 0;
139         struct drbd_epoch_entry *e = NULL;
140         struct drbd_conf *mdev;
141         sector_t e_sector;
142         int do_wake;
143         int is_syncer_req;
144         int do_al_complete_io;
145         int uptodate = bio_flagged(bio, BIO_UPTODATE);
146         int is_barrier = bio_rw_flagged(bio, BIO_RW_BARRIER);
147
148         e = bio->bi_private;
149         mdev = e->mdev;
150
151         if (error)
152                 dev_warn(DEV, "write: error=%d s=%llus\n", error,
153                                 (unsigned long long)e->sector);
154         if (!error && !uptodate) {
155                 dev_warn(DEV, "write: setting error to -EIO s=%llus\n",
156                                 (unsigned long long)e->sector);
157                 /* strange behavior of some lower level drivers...
158                  * fail the request by clearing the uptodate flag,
159                  * but do not return any error?! */
160                 error = -EIO;
161         }
162
163         /* error == -ENOTSUPP would be a better test,
164          * alas it is not reliable */
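        /* A failed barrier write most likely means the backing device does not
         * support barriers: degrade the write-ordering policy to plain flushes
         * and re-queue this request (w_e_reissue), which presumably resubmits
         * it without the barrier flag. */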
165         if (error && is_barrier && e->flags & EE_IS_BARRIER) {
166                 drbd_bump_write_ordering(mdev, WO_bdev_flush);
167                 spin_lock_irqsave(&mdev->req_lock, flags);
168                 list_del(&e->w.list);
169                 e->w.cb = w_e_reissue;
170                 /* put_ldev actually happens below, once we come here again. */
171                 __release(local);
172                 spin_unlock_irqrestore(&mdev->req_lock, flags);
173                 drbd_queue_work(&mdev->data.work, &e->w);
174                 return;
175         }
176
177         D_ASSERT(e->block_id != ID_VACANT);
178
179         trace_drbd_bio(mdev, "Sec", bio, 1, NULL);
180
181         spin_lock_irqsave(&mdev->req_lock, flags);
182         mdev->writ_cnt += e->size >> 9;
183         is_syncer_req = is_syncer_block_id(e->block_id);
184
185         /* after we moved e to done_ee,
186          * we may no longer access it,
187          * it may be freed/reused already!
188          * (as soon as we release the req_lock) */
189         e_sector = e->sector;
190         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
191
192         list_del(&e->w.list); /* has been on active_ee or sync_ee */
193         list_add_tail(&e->w.list, &mdev->done_ee);
194
195         trace_drbd_ee(mdev, e, "write completed");
196
197         /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
198          * neither did we wake possibly waiting conflicting requests.
199          * done from "drbd_process_done_ee" within the appropriate w.cb
200          * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
201
202         do_wake = is_syncer_req
203                 ? list_empty(&mdev->sync_ee)
204                 : list_empty(&mdev->active_ee);
205
206         if (error)
207                 __drbd_chk_io_error(mdev, FALSE);
208         spin_unlock_irqrestore(&mdev->req_lock, flags);
209
210         if (is_syncer_req)
211                 drbd_rs_complete_io(mdev, e_sector);
212
213         if (do_wake)
214                 wake_up(&mdev->ee_wait);
215
216         if (do_al_complete_io)
217                 drbd_al_complete_io(mdev, e_sector);
218
219         wake_asender(mdev);
220         put_ldev(mdev);
221
222 }
223
224 /* read, read-ahead (READA) or write requests on R_PRIMARY coming from drbd_make_request
225  */
226 void drbd_endio_pri(struct bio *bio, int error)
227 {
228         unsigned long flags;
229         struct drbd_request *req = bio->bi_private;
230         struct drbd_conf *mdev = req->mdev;
231         struct bio_and_error m;
232         enum drbd_req_event what;
233         int uptodate = bio_flagged(bio, BIO_UPTODATE);
234
235         if (error)
236                 dev_warn(DEV, "p %s: error=%d\n",
237                          bio_data_dir(bio) == WRITE ? "write" : "read", error);
238         if (!error && !uptodate) {
239                 dev_warn(DEV, "p %s: setting error to -EIO\n",
240                          bio_data_dir(bio) == WRITE ? "write" : "read");
241                 /* strange behavior of some lower level drivers...
242                  * fail the request by clearing the uptodate flag,
243                  * but do not return any error?! */
244                 error = -EIO;
245         }
246
247         trace_drbd_bio(mdev, "Pri", bio, 1, NULL);
248
249         /* to avoid recursion in __req_mod */
250         if (unlikely(error)) {
251                 what = (bio_data_dir(bio) == WRITE)
252                         ? write_completed_with_error
253                         : (bio_rw(bio) == READA)
254                           ? read_ahead_completed_with_error
255                           : read_completed_with_error;
256         } else
257                 what = completed_ok;
258
259         bio_put(req->private_bio);
260         req->private_bio = ERR_PTR(error);
261
262         spin_lock_irqsave(&mdev->req_lock, flags);
263         __req_mod(req, what, &m);
264         spin_unlock_irqrestore(&mdev->req_lock, flags);
265
266         if (m.bio)
267                 complete_master_bio(mdev, &m);
268 }
269
270 int w_io_error(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
271 {
272         struct drbd_request *req = container_of(w, struct drbd_request, w);
273
274         /* NOTE: mdev->ldev can be NULL by the time we get here! */
275         /* D_ASSERT(mdev->ldev->dc.on_io_error != EP_PASS_ON); */
276
277         /* the only way this callback is scheduled is from _req_may_be_done,
278          * when it is done and had a local write error, see comments there */
279         drbd_req_free(req);
280
281         return TRUE;
282 }
283
284 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
285 {
286         struct drbd_request *req = container_of(w, struct drbd_request, w);
287
288         /* We should not detach for read io-error,
289          * but try to WRITE the P_DATA_REPLY to the failed location,
290          * to give the disk the chance to relocate that block */
291
292         spin_lock_irq(&mdev->req_lock);
293         if (cancel ||
294             mdev->state.conn < C_CONNECTED ||
295             mdev->state.pdsk <= D_INCONSISTENT) {
296                 _req_mod(req, send_canceled);
297                 spin_unlock_irq(&mdev->req_lock);
298                 dev_alert(DEV, "WE ARE LOST. Local IO failure, no peer.\n");
299                 return 1;
300         }
301         spin_unlock_irq(&mdev->req_lock);
302
303         return w_send_read_req(mdev, w, 0);
304 }
305
306 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
307 {
308         ERR_IF(cancel) return 1;
309         dev_err(DEV, "resync inactive, but callback triggered??\n");
310         return 1; /* Simply ignore this! */
311 }
312
313 void drbd_csum(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
314 {
315         struct hash_desc desc;
316         struct scatterlist sg;
317         struct bio_vec *bvec;
318         int i;
319
320         desc.tfm = tfm;
321         desc.flags = 0;
322
323         sg_init_table(&sg, 1);
324         crypto_hash_init(&desc);
325
326         __bio_for_each_segment(bvec, bio, i, 0) {
327                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
328                 crypto_hash_update(&desc, &sg, sg.length);
329         }
330         crypto_hash_final(&desc, digest);
331 }
332
333 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
334 {
335         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
336         int digest_size;
337         void *digest;
338         int ok;
339
340         D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
341
342         if (unlikely(cancel)) {
343                 drbd_free_ee(mdev, e);
344                 return 1;
345         }
346
347         if (likely(drbd_bio_uptodate(e->private_bio))) {
348                 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
349                 digest = kmalloc(digest_size, GFP_NOIO);
350                 if (digest) {
351                         drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
352
353                         inc_rs_pending(mdev);
354                         ok = drbd_send_drequest_csum(mdev,
355                                                      e->sector,
356                                                      e->size,
357                                                      digest,
358                                                      digest_size,
359                                                      P_CSUM_RS_REQUEST);
360                         kfree(digest);
361                 } else {
362                         dev_err(DEV, "kmalloc() of digest failed.\n");
363                         ok = 0;
364                 }
365         } else
366                 ok = 1;
367
368         drbd_free_ee(mdev, e);
369
370         if (unlikely(!ok))
371                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
372         return ok;
373 }
374
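/* Best-effort allocation flags: highmem pages are fine, failures are silent,
 * and with no reclaim-related flags set the allocation fails fast instead of
 * blocking the worker. */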
375 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
376
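/* Checksum-based resync: read the block locally, hash it in w_e_send_csum(),
 * and send only the digest (P_CSUM_RS_REQUEST) to the peer, which replies
 * either "in sync" (P_RS_IS_IN_SYNC) or with the full data (P_RS_DATA_REPLY),
 * see w_e_end_csum_rs_req() below. */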
377 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
378 {
379         struct drbd_epoch_entry *e;
380
381         if (!get_ldev(mdev))
382                 return 0;
383
384         /* GFP_TRY, because if there is no memory available right now, this may
385          * be rescheduled for later. It is "only" background resync, after all. */
386         e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
387         if (!e) {
388                 put_ldev(mdev);
389                 return 2;
390         }
391
392         spin_lock_irq(&mdev->req_lock);
393         list_add(&e->w.list, &mdev->read_ee);
394         spin_unlock_irq(&mdev->req_lock);
395
396         e->private_bio->bi_end_io = drbd_endio_read_sec;
397         e->private_bio->bi_rw = READ;
398         e->w.cb = w_e_send_csum;
399
400         mdev->read_cnt += size >> 9;
401         drbd_generic_make_request(mdev, DRBD_FAULT_RS_RD, e->private_bio);
402
403         return 1;
404 }
405
406 void resync_timer_fn(unsigned long data)
407 {
408         unsigned long flags;
409         struct drbd_conf *mdev = (struct drbd_conf *) data;
410         int queue;
411
412         spin_lock_irqsave(&mdev->req_lock, flags);
413
414         if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
415                 queue = 1;
416                 if (mdev->state.conn == C_VERIFY_S)
417                         mdev->resync_work.cb = w_make_ov_request;
418                 else
419                         mdev->resync_work.cb = w_make_resync_request;
420         } else {
421                 queue = 0;
422                 mdev->resync_work.cb = w_resync_inactive;
423         }
424
425         spin_unlock_irqrestore(&mdev->req_lock, flags);
426
427         /* harmless race: list_empty outside data.work.q_lock */
428         if (list_empty(&mdev->resync_work.list) && queue)
429                 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
430 }
431
432 int w_make_resync_request(struct drbd_conf *mdev,
433                 struct drbd_work *w, int cancel)
434 {
435         unsigned long bit;
436         sector_t sector;
437         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
438         int max_segment_size = queue_max_segment_size(mdev->rq_queue);
439         int number, i, size, pe, mx;
440         int align, queued, sndbuf;
441
442         if (unlikely(cancel))
443                 return 1;
444
445         if (unlikely(mdev->state.conn < C_CONNECTED)) {
446                 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected\n");
447                 return 0;
448         }
449
450         if (mdev->state.conn != C_SYNC_TARGET)
451                 dev_err(DEV, "%s in w_make_resync_request\n",
452                         drbd_conn_str(mdev->state.conn));
453
454         if (!get_ldev(mdev)) {
455                 /* Since we only need to access mdev->resync, a
456                    get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
457                    continuing resync with a broken disk makes no sense at
458                    all */
459                 dev_err(DEV, "Disk broke down during resync!\n");
460                 mdev->resync_work.cb = w_resync_inactive;
461                 return 1;
462         }
463
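        /* sync_conf.rate is in KiB/s and each bitmap bit covers BM_BLOCK_SIZE
         * bytes (BM_BLOCK_SIZE/1024 KiB), so this works out to the number of
         * bitmap bits' worth of resync to request per SLEEP_TIME interval. */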
464         number = SLEEP_TIME * mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
465         pe = atomic_read(&mdev->rs_pending_cnt);
466
467         mutex_lock(&mdev->data.mutex);
468         if (mdev->data.socket)
469                 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
470         else
471                 mx = 1;
472         mutex_unlock(&mdev->data.mutex);
473
474         /* For resync rates >160MB/sec, allow more pending RS requests */
475         if (number > mx)
476                 mx = number;
477
478         /* Limit the number of pending RS requests to no more than the peer's receive buffer */
479         if ((pe + number) > mx) {
480                 number = mx - pe;
481         }
482
483         for (i = 0; i < number; i++) {
484                 /* Stop generating RS requests, when half of the send buffer is filled */
485                 mutex_lock(&mdev->data.mutex);
486                 if (mdev->data.socket) {
487                         queued = mdev->data.socket->sk->sk_wmem_queued;
488                         sndbuf = mdev->data.socket->sk->sk_sndbuf;
489                 } else {
490                         queued = 1;
491                         sndbuf = 0;
492                 }
493                 mutex_unlock(&mdev->data.mutex);
494                 if (queued > sndbuf / 2)
495                         goto requeue;
496
497 next_sector:
498                 size = BM_BLOCK_SIZE;
499                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
500
501                 if (bit == -1UL) {
502                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
503                         mdev->resync_work.cb = w_resync_inactive;
504                         put_ldev(mdev);
505                         return 1;
506                 }
507
508                 sector = BM_BIT_TO_SECT(bit);
509
510                 if (drbd_try_rs_begin_io(mdev, sector)) {
511                         mdev->bm_resync_fo = bit;
512                         goto requeue;
513                 }
514                 mdev->bm_resync_fo = bit + 1;
515
516                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
517                         drbd_rs_complete_io(mdev, sector);
518                         goto next_sector;
519                 }
520
521 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
522                 /* try to find some adjacent bits.
523                  * we stop if we already have the maximum req size.
524                  *
525                  * Additionally always align bigger requests, in order to
526                  * be prepared for all stripe sizes of software RAIDs.
527                  *
528                  * we _do_ care about the agreed-upon q->max_segment_size
529                  * here, as splitting up the requests on the other side is more
530                  * difficult.  the consequence is that on lvm and md and other
531                  * "indirect" devices, this is dead code, since
532                  * q->max_segment_size will be PAGE_SIZE.
533                  */
534                 align = 1;
535                 for (;;) {
536                         if (size + BM_BLOCK_SIZE > max_segment_size)
537                                 break;
538
539                         /* only grow the request while the start sector stays aligned to (BM_BLOCK_SIZE << align) bytes, i.e. 1 << (align+3) sectors */
540                         if (sector & ((1<<(align+3))-1))
541                                 break;
542
543                         /* do not cross extent boundaries */
544                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
545                                 break;
546                         /* now, is it actually dirty, after all?
547                          * caution, drbd_bm_test_bit is tri-state for some
548                          * obscure reason; ( b == 0 ) would get the out-of-band
549                          * only accidentally right because of the "oddly sized"
550                          * adjustment below */
551                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
552                                 break;
553                         bit++;
554                         size += BM_BLOCK_SIZE;
555                         if ((BM_BLOCK_SIZE << align) <= size)
556                                 align++;
557                         i++;
558                 }
559                 /* if we merged some,
560                  * reset the offset to start the next drbd_bm_find_next from */
561                 if (size > BM_BLOCK_SIZE)
562                         mdev->bm_resync_fo = bit + 1;
563 #endif
564
565                 /* adjust very last sectors, in case we are oddly sized */
566                 if (sector + (size>>9) > capacity)
567                         size = (capacity-sector)<<9;
568                 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
569                         switch (read_for_csum(mdev, sector, size)) {
570                         case 0: /* Disk failure*/
571                                 put_ldev(mdev);
572                                 return 0;
573                         case 2: /* Allocation failed */
574                                 drbd_rs_complete_io(mdev, sector);
575                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
576                                 goto requeue;
577                         /* case 1: everything ok */
578                         }
579                 } else {
580                         inc_rs_pending(mdev);
581                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
582                                                sector, size, ID_SYNCER)) {
583                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
584                                 dec_rs_pending(mdev);
585                                 put_ldev(mdev);
586                                 return 0;
587                         }
588                 }
589         }
590
591         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
592                 /* last syncer _request_ was sent,
593                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
594                  * next sync group will resume), as soon as we receive the last
595                  * resync data block, and the last bit is cleared.
596                  * until then resync "work" is "inactive" ...
597                  */
598                 mdev->resync_work.cb = w_resync_inactive;
599                 put_ldev(mdev);
600                 return 1;
601         }
602
603  requeue:
604         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
605         put_ldev(mdev);
606         return 1;
607 }
608
609 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
610 {
611         int number, i, size;
612         sector_t sector;
613         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
614
615         if (unlikely(cancel))
616                 return 1;
617
618         if (unlikely(mdev->state.conn < C_CONNECTED)) {
619                 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected\n");
620                 return 0;
621         }
622
623         number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
624         if (atomic_read(&mdev->rs_pending_cnt) > number)
625                 goto requeue;
626
627         number -= atomic_read(&mdev->rs_pending_cnt);
628
629         sector = mdev->ov_position;
630         for (i = 0; i < number; i++) {
631                 if (sector >= capacity) {
632                         mdev->resync_work.cb = w_resync_inactive;
633                         return 1;
634                 }
635
636                 size = BM_BLOCK_SIZE;
637
638                 if (drbd_try_rs_begin_io(mdev, sector)) {
639                         mdev->ov_position = sector;
640                         goto requeue;
641                 }
642
643                 if (sector + (size>>9) > capacity)
644                         size = (capacity-sector)<<9;
645
646                 inc_rs_pending(mdev);
647                 if (!drbd_send_ov_request(mdev, sector, size)) {
648                         dec_rs_pending(mdev);
649                         return 0;
650                 }
651                 sector += BM_SECT_PER_BIT;
652         }
653         mdev->ov_position = sector;
654
655  requeue:
656         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
657         return 1;
658 }
659
660
661 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
662 {
663         kfree(w);
664         ov_oos_print(mdev);
665         drbd_resync_finished(mdev);
666
667         return 1;
668 }
669
670 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
671 {
672         kfree(w);
673
674         drbd_resync_finished(mdev);
675
676         return 1;
677 }
678
679 int drbd_resync_finished(struct drbd_conf *mdev)
680 {
681         unsigned long db, dt, dbdt;
682         unsigned long n_oos;
683         union drbd_state os, ns;
684         struct drbd_work *w;
685         char *khelper_cmd = NULL;
686
687         /* Remove all elements from the resync LRU. Since future actions
688          * might set bits in the (main) bitmap, then the entries in the
689          * resync LRU would be wrong. */
690         if (drbd_rs_del_all(mdev)) {
691                 /* In case this is not possible now, most probably because
692                  * there are P_RS_DATA_REPLY packets lingering on the worker's
693                  * queue (or the read operations for those packets
694                  * are not finished yet).  Retry in 100ms. */
695
696                 drbd_kick_lo(mdev);
697                 __set_current_state(TASK_INTERRUPTIBLE);
698                 schedule_timeout(HZ / 10);
699                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
700                 if (w) {
701                         w->cb = w_resync_finished;
702                         drbd_queue_work(&mdev->data.work, w);
703                         return 1;
704                 }
705                 dev_err(DEV, "Warning: drbd_rs_del_all() failed and kmalloc(w) failed.\n");
706         }
707
708         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
709         if (dt <= 0)
710                 dt = 1;
711         db = mdev->rs_total;
712         dbdt = Bit2KB(db/dt);
713         mdev->rs_paused /= HZ;
714
715         if (!get_ldev(mdev))
716                 goto out;
717
718         spin_lock_irq(&mdev->req_lock);
719         os = mdev->state;
720
721         /* This protects us against multiple calls (that can happen in the presence
722            of application IO), and against connectivity loss just before we arrive here. */
723         if (os.conn <= C_CONNECTED)
724                 goto out_unlock;
725
726         ns = os;
727         ns.conn = C_CONNECTED;
728
729         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
730              (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
731              "Online verify " : "Resync",
732              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
733
734         n_oos = drbd_bm_total_weight(mdev);
735
736         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
737                 if (n_oos) {
738                         dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
739                               n_oos, Bit2KB(1));
740                         khelper_cmd = "out-of-sync";
741                 }
742         } else {
743                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
744
745                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
746                         khelper_cmd = "after-resync-target";
747
748                 if (mdev->csums_tfm && mdev->rs_total) {
749                         const unsigned long s = mdev->rs_same_csum;
750                         const unsigned long t = mdev->rs_total;
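                        /* integer percentage of blocks that had equal
                         * checksums; the second form presumably avoids
                         * overflowing s*100 for very large bitmaps */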
751                         const int ratio =
752                                 (t == 0)     ? 0 :
753                                 (t < 100000) ? ((s*100)/t) : (s/(t/100));
754                         dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
755                              "transferred %luK total %luK\n",
756                              ratio,
757                              Bit2KB(mdev->rs_same_csum),
758                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
759                              Bit2KB(mdev->rs_total));
760                 }
761         }
762
763         if (mdev->rs_failed) {
764                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
765
766                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
767                         ns.disk = D_INCONSISTENT;
768                         ns.pdsk = D_UP_TO_DATE;
769                 } else {
770                         ns.disk = D_UP_TO_DATE;
771                         ns.pdsk = D_INCONSISTENT;
772                 }
773         } else {
774                 ns.disk = D_UP_TO_DATE;
775                 ns.pdsk = D_UP_TO_DATE;
776
777                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
778                         if (mdev->p_uuid) {
779                                 int i;
780                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
781                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
782                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
783                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
784                         } else {
785                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
786                         }
787                 }
788
789                 drbd_uuid_set_bm(mdev, 0UL);
790
791                 if (mdev->p_uuid) {
792                         /* Now the two UUID sets are equal, update what we
793                          * know of the peer. */
794                         int i;
795                         for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
796                                 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
797                 }
798         }
799
800         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
801 out_unlock:
802         spin_unlock_irq(&mdev->req_lock);
803         put_ldev(mdev);
804 out:
805         mdev->rs_total  = 0;
806         mdev->rs_failed = 0;
807         mdev->rs_paused = 0;
808         mdev->ov_start_sector = 0;
809
810         if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
811                 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
812                 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
813         }
814
815         if (khelper_cmd)
816                 drbd_khelper(mdev, khelper_cmd);
817
818         return 1;
819 }
820
821 /* helper */
822 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
823 {
824         if (drbd_bio_has_active_page(e->private_bio)) {
825                 /* This might happen if sendpage() has not finished */
826                 spin_lock_irq(&mdev->req_lock);
827                 list_add_tail(&e->w.list, &mdev->net_ee);
828                 spin_unlock_irq(&mdev->req_lock);
829         } else
830                 drbd_free_ee(mdev, e);
831 }
832
833 /**
834  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
835  * @mdev:       DRBD device.
836  * @w:          work object.
837  * @cancel:     The connection will be closed anyway
838  */
839 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
840 {
841         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
842         int ok;
843
844         if (unlikely(cancel)) {
845                 drbd_free_ee(mdev, e);
846                 dec_unacked(mdev);
847                 return 1;
848         }
849
850         if (likely(drbd_bio_uptodate(e->private_bio))) {
851                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
852         } else {
853                 if (__ratelimit(&drbd_ratelimit_state))
854                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
855                             (unsigned long long)e->sector);
856
857                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
858         }
859
860         dec_unacked(mdev);
861
862         move_to_net_ee_or_free(mdev, e);
863
864         if (unlikely(!ok))
865                 dev_err(DEV, "drbd_send_block() failed\n");
866         return ok;
867 }
868
869 /**
870  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
871  * @mdev:       DRBD device.
872  * @w:          work object.
873  * @cancel:     The connection will be closed anyway
874  */
875 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
876 {
877         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
878         int ok;
879
880         if (unlikely(cancel)) {
881                 drbd_free_ee(mdev, e);
882                 dec_unacked(mdev);
883                 return 1;
884         }
885
886         if (get_ldev_if_state(mdev, D_FAILED)) {
887                 drbd_rs_complete_io(mdev, e->sector);
888                 put_ldev(mdev);
889         }
890
891         if (likely(drbd_bio_uptodate(e->private_bio))) {
892                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
893                         inc_rs_pending(mdev);
894                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
895                 } else {
896                         if (__ratelimit(&drbd_ratelimit_state))
897                                 dev_err(DEV, "Not sending RSDataReply, "
898                                     "partner DISKLESS!\n");
899                         ok = 1;
900                 }
901         } else {
902                 if (__ratelimit(&drbd_ratelimit_state))
903                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
904                             (unsigned long long)e->sector);
905
906                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
907
908                 /* update resync data with failure */
909                 drbd_rs_failed_io(mdev, e->sector, e->size);
910         }
911
912         dec_unacked(mdev);
913
914         move_to_net_ee_or_free(mdev, e);
915
916         if (unlikely(!ok))
917                 dev_err(DEV, "drbd_send_block() failed\n");
918         return ok;
919 }
920
921 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
922 {
923         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
924         struct digest_info *di;
925         int digest_size;
926         void *digest = NULL;
927         int ok, eq = 0;
928
929         if (unlikely(cancel)) {
930                 drbd_free_ee(mdev, e);
931                 dec_unacked(mdev);
932                 return 1;
933         }
934
935         drbd_rs_complete_io(mdev, e->sector);
936
937         di = (struct digest_info *)(unsigned long)e->block_id;
938
939         if (likely(drbd_bio_uptodate(e->private_bio))) {
940                 /* quick hack to try to avoid a race against reconfiguration.
941                  * a real fix would be much more involved,
942                  * introducing more locking mechanisms */
943                 if (mdev->csums_tfm) {
944                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
945                         D_ASSERT(digest_size == di->digest_size);
946                         digest = kmalloc(digest_size, GFP_NOIO);
947                 }
948                 if (digest) {
949                         drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
950                         eq = !memcmp(digest, di->digest, digest_size);
951                         kfree(digest);
952                 }
953
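                /* equal checksum: the peer's copy matches, so just mark the
                 * block in sync and acknowledge with P_RS_IS_IN_SYNC instead
                 * of shipping the data; otherwise reply with the full block. */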
954                 if (eq) {
955                         drbd_set_in_sync(mdev, e->sector, e->size);
956                         mdev->rs_same_csum++;
957                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
958                 } else {
959                         inc_rs_pending(mdev);
960                         e->block_id = ID_SYNCER;
961                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
962                 }
963         } else {
964                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
965                 if (__ratelimit(&drbd_ratelimit_state))
966                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
967         }
968
969         dec_unacked(mdev);
970
971         kfree(di);
972
973         move_to_net_ee_or_free(mdev, e);
974
975         if (unlikely(!ok))
976                 dev_err(DEV, "drbd_send_block/ack() failed\n");
977         return ok;
978 }
979
980 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
981 {
982         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
983         int digest_size;
984         void *digest;
985         int ok = 1;
986
987         if (unlikely(cancel))
988                 goto out;
989
990         if (unlikely(!drbd_bio_uptodate(e->private_bio)))
991                 goto out;
992
993         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
994         /* FIXME if this allocation fails, online verify will not terminate! */
995         digest = kmalloc(digest_size, GFP_NOIO);
996         if (digest) {
997                 drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
998                 inc_rs_pending(mdev);
999                 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1000                                              digest, digest_size, P_OV_REPLY);
1001                 if (!ok)
1002                         dec_rs_pending(mdev);
1003                 kfree(digest);
1004         }
1005
1006 out:
1007         drbd_free_ee(mdev, e);
1008
1009         dec_unacked(mdev);
1010
1011         return ok;
1012 }
1013
1014 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1015 {
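        /* coalesce contiguous out-of-sync sectors into the last recorded
         * range, which is apparently what ov_oos_print() reports */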
1016         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1017                 mdev->ov_last_oos_size += size>>9;
1018         } else {
1019                 mdev->ov_last_oos_start = sector;
1020                 mdev->ov_last_oos_size = size>>9;
1021         }
1022         drbd_set_out_of_sync(mdev, sector, size);
1023         set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1024 }
1025
1026 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1027 {
1028         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1029         struct digest_info *di;
1030         int digest_size;
1031         void *digest;
1032         int ok, eq = 0;
1033
1034         if (unlikely(cancel)) {
1035                 drbd_free_ee(mdev, e);
1036                 dec_unacked(mdev);
1037                 return 1;
1038         }
1039
1040         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1041          * the resync lru has been cleaned up already */
1042         drbd_rs_complete_io(mdev, e->sector);
1043
1044         di = (struct digest_info *)(unsigned long)e->block_id;
1045
1046         if (likely(drbd_bio_uptodate(e->private_bio))) {
1047                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1048                 digest = kmalloc(digest_size, GFP_NOIO);
1049                 if (digest) {
1050                         drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
1051
1052                         D_ASSERT(digest_size == di->digest_size);
1053                         eq = !memcmp(digest, di->digest, digest_size);
1054                         kfree(digest);
1055                 }
1056         } else {
1057                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1058                 if (__ratelimit(&drbd_ratelimit_state))
1059                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1060         }
1061
1062         dec_unacked(mdev);
1063
1064         kfree(di);
1065
1066         if (!eq)
1067                 drbd_ov_oos_found(mdev, e->sector, e->size);
1068         else
1069                 ov_oos_print(mdev);
1070
1071         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1072                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1073
1074         drbd_free_ee(mdev, e);
1075
1076         if (--mdev->ov_left == 0) {
1077                 ov_oos_print(mdev);
1078                 drbd_resync_finished(mdev);
1079         }
1080
1081         return ok;
1082 }
1083
1084 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1085 {
1086         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1087         complete(&b->done);
1088         return 1;
1089 }
1090
1091 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1092 {
1093         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1094         struct p_barrier *p = &mdev->data.sbuf.barrier;
1095         int ok = 1;
1096
1097         /* really avoid racing with tl_clear.  w.cb may have been referenced
1098          * just before it was reassigned and re-queued, so double check that.
1099          * actually, this race was harmless, since we only try to send the
1100          * barrier packet here, and otherwise do nothing with the object.
1101          * but compare with the head of w_clear_epoch */
1102         spin_lock_irq(&mdev->req_lock);
1103         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1104                 cancel = 1;
1105         spin_unlock_irq(&mdev->req_lock);
1106         if (cancel)
1107                 return 1;
1108
1109         if (!drbd_get_data_sock(mdev))
1110                 return 0;
1111         p->barrier = b->br_number;
1112         /* inc_ap_pending was done where this was queued.
1113          * dec_ap_pending will be done in got_BarrierAck
1114          * or (on connection loss) in w_clear_epoch.  */
1115         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1116                                 (struct p_header *)p, sizeof(*p), 0);
1117         drbd_put_data_sock(mdev);
1118
1119         return ok;
1120 }
1121
1122 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1123 {
1124         if (cancel)
1125                 return 1;
1126         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1127 }
1128
1129 /**
1130  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1131  * @mdev:       DRBD device.
1132  * @w:          work object.
1133  * @cancel:     The connection will be closed anyway
1134  */
1135 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1136 {
1137         struct drbd_request *req = container_of(w, struct drbd_request, w);
1138         int ok;
1139
1140         if (unlikely(cancel)) {
1141                 req_mod(req, send_canceled);
1142                 return 1;
1143         }
1144
1145         ok = drbd_send_dblock(mdev, req);
1146         req_mod(req, ok ? handed_over_to_network : send_failed);
1147
1148         return ok;
1149 }
1150
1151 /**
1152  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1153  * @mdev:       DRBD device.
1154  * @w:          work object.
1155  * @cancel:     The connection will be closed anyway
1156  */
1157 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1158 {
1159         struct drbd_request *req = container_of(w, struct drbd_request, w);
1160         int ok;
1161
1162         if (unlikely(cancel)) {
1163                 req_mod(req, send_canceled);
1164                 return 1;
1165         }
1166
1167         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1168                                 (unsigned long)req);
1169
1170         if (!ok) {
1171                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1172                  * so this is probably redundant */
1173                 if (mdev->state.conn >= C_CONNECTED)
1174                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1175         }
1176         req_mod(req, ok ? handed_over_to_network : send_failed);
1177
1178         return ok;
1179 }
1180
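/* Walk the sync-after dependency chain: return 0 if any device this one
 * depends on is itself resyncing or in a paused-sync state (or has one of the
 * aftr_isp/peer_isp/user_isp flags set), 1 if we may sync now. */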
1181 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1182 {
1183         struct drbd_conf *odev = mdev;
1184
1185         while (1) {
1186                 if (odev->sync_conf.after == -1)
1187                         return 1;
1188                 odev = minor_to_mdev(odev->sync_conf.after);
1189                 ERR_IF(!odev) return 1;
1190                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1191                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1192                     odev->state.aftr_isp || odev->state.peer_isp ||
1193                     odev->state.user_isp)
1194                         return 0;
1195         }
1196 }
1197
1198 /**
1199  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1200  * @mdev:       DRBD device.
1201  *
1202  * Called from process context only (admin command and after_state_ch).
1203  */
1204 static int _drbd_pause_after(struct drbd_conf *mdev)
1205 {
1206         struct drbd_conf *odev;
1207         int i, rv = 0;
1208
1209         for (i = 0; i < minor_count; i++) {
1210                 odev = minor_to_mdev(i);
1211                 if (!odev)
1212                         continue;
1213                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1214                         continue;
1215                 if (!_drbd_may_sync_now(odev))
1216                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1217                                != SS_NOTHING_TO_DO);
1218         }
1219
1220         return rv;
1221 }
1222
1223 /**
1224  * _drbd_resume_next() - Resume resync on all devices that may resync now
1225  * @mdev:       DRBD device.
1226  *
1227  * Called from process context only (admin command and worker).
1228  */
1229 static int _drbd_resume_next(struct drbd_conf *mdev)
1230 {
1231         struct drbd_conf *odev;
1232         int i, rv = 0;
1233
1234         for (i = 0; i < minor_count; i++) {
1235                 odev = minor_to_mdev(i);
1236                 if (!odev)
1237                         continue;
1238                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1239                         continue;
1240                 if (odev->state.aftr_isp) {
1241                         if (_drbd_may_sync_now(odev))
1242                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1243                                                         CS_HARD, NULL)
1244                                        != SS_NOTHING_TO_DO);
1245                 }
1246         }
1247         return rv;
1248 }
1249
1250 void resume_next_sg(struct drbd_conf *mdev)
1251 {
1252         write_lock_irq(&global_state_lock);
1253         _drbd_resume_next(mdev);
1254         write_unlock_irq(&global_state_lock);
1255 }
1256
1257 void suspend_other_sg(struct drbd_conf *mdev)
1258 {
1259         write_lock_irq(&global_state_lock);
1260         _drbd_pause_after(mdev);
1261         write_unlock_irq(&global_state_lock);
1262 }
1263
1264 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1265 {
1266         struct drbd_conf *odev;
1267
1268         if (o_minor == -1)
1269                 return NO_ERROR;
1270         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1271                 return ERR_SYNC_AFTER;
1272
1273         /* check for loops */
1274         odev = minor_to_mdev(o_minor);
1275         while (1) {
1276                 if (odev == mdev)
1277                         return ERR_SYNC_AFTER_CYCLE;
1278
1279                 /* dependency chain ends here, no cycles. */
1280                 if (odev->sync_conf.after == -1)
1281                         return NO_ERROR;
1282
1283                 /* follow the dependency chain */
1284                 odev = minor_to_mdev(odev->sync_conf.after);
1285         }
1286 }
1287
1288 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1289 {
1290         int changes;
1291         int retcode;
1292
1293         write_lock_irq(&global_state_lock);
1294         retcode = sync_after_error(mdev, na);
1295         if (retcode == NO_ERROR) {
1296                 mdev->sync_conf.after = na;
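                /* re-evaluate pause/resume across all devices until the
                 * pause-after / resume-next passes reach a fixed point */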
1297                 do {
1298                         changes  = _drbd_pause_after(mdev);
1299                         changes |= _drbd_resume_next(mdev);
1300                 } while (changes);
1301         }
1302         write_unlock_irq(&global_state_lock);
1303         return retcode;
1304 }
1305
1306 /**
1307  * drbd_start_resync() - Start the resync process
1308  * @mdev:       DRBD device.
1309  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1310  *
1311  * This function might bring you directly into one of the
1312  * C_PAUSED_SYNC_* states.
1313  */
1314 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1315 {
1316         union drbd_state ns;
1317         int r;
1318
1319         if (mdev->state.conn >= C_SYNC_SOURCE) {
1320                 dev_err(DEV, "Resync already running!\n");
1321                 return;
1322         }
1323
1324         trace_drbd_resync(mdev, TRACE_LVL_SUMMARY, "Resync starting: side=%s\n",
1325                           side == C_SYNC_TARGET ? "SyncTarget" : "SyncSource");
1326
1327         /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1328         drbd_rs_cancel_all(mdev);
1329
1330         if (side == C_SYNC_TARGET) {
1331                 /* Since application IO was locked out during C_WF_BITMAP_T and
1332                    C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1333                    we ask the before-resync-target handler whether we may make the data inconsistent. */
1334                 r = drbd_khelper(mdev, "before-resync-target");
1335                 r = (r >> 8) & 0xff;
1336                 if (r > 0) {
1337                         dev_info(DEV, "before-resync-target handler returned %d, "
1338                              "dropping connection.\n", r);
1339                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1340                         return;
1341                 }
1342         }
1343
1344         drbd_state_lock(mdev);
1345
1346         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1347                 drbd_state_unlock(mdev);
1348                 return;
1349         }
1350
1351         if (side == C_SYNC_TARGET) {
1352                 mdev->bm_resync_fo = 0;
1353         } else /* side == C_SYNC_SOURCE */ {
1354                 u64 uuid;
1355
1356                 get_random_bytes(&uuid, sizeof(u64));
1357                 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1358                 drbd_send_sync_uuid(mdev, uuid);
1359
1360                 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1361         }
1362
1363         write_lock_irq(&global_state_lock);
1364         ns = mdev->state;
1365
1366         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1367
1368         ns.conn = side;
1369
1370         if (side == C_SYNC_TARGET)
1371                 ns.disk = D_INCONSISTENT;
1372         else /* side == C_SYNC_SOURCE */
1373                 ns.pdsk = D_INCONSISTENT;
1374
1375         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1376         ns = mdev->state;
1377
1378         if (ns.conn < C_CONNECTED)
1379                 r = SS_UNKNOWN_ERROR;
1380
1381         if (r == SS_SUCCESS) {
1382                 mdev->rs_total     =
1383                 mdev->rs_mark_left = drbd_bm_total_weight(mdev);
1384                 mdev->rs_failed    = 0;
1385                 mdev->rs_paused    = 0;
1386                 mdev->rs_start     =
1387                 mdev->rs_mark_time = jiffies;
1388                 mdev->rs_same_csum = 0;
1389                 _drbd_pause_after(mdev);
1390         }
1391         write_unlock_irq(&global_state_lock);
1392         drbd_state_unlock(mdev);
1393         put_ldev(mdev);
1394
1395         if (r == SS_SUCCESS) {
1396                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1397                      drbd_conn_str(ns.conn),
1398                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1399                      (unsigned long) mdev->rs_total);
1400
1401                 if (mdev->rs_total == 0) {
1402                         /* Peer still reachable? Beware of failing before-resync-target handlers! */
1403                         request_ping(mdev);
1404                         __set_current_state(TASK_INTERRUPTIBLE);
1405                         schedule_timeout(mdev->net_conf->ping_timeo*HZ/9); /* 9 instead of 10 */
1406                         drbd_resync_finished(mdev);
1407                         return;
1408                 }
1409
1410                 /* ns.conn may already be != mdev->state.conn,
1411                  * we may have been paused in between, or become paused until
1412                  * the timer triggers.
1413                  * No matter, that is handled in resync_timer_fn() */
1414                 if (ns.conn == C_SYNC_TARGET)
1415                         mod_timer(&mdev->resync_timer, jiffies);
1416
1417                 drbd_md_sync(mdev);
1418         }
1419 }
1420
1421 int drbd_worker(struct drbd_thread *thi)
1422 {
1423         struct drbd_conf *mdev = thi->mdev;
1424         struct drbd_work *w = NULL;
1425         LIST_HEAD(work_list);
1426         int intr = 0, i;
1427
1428         sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1429
1430         while (get_t_state(thi) == Running) {
1431                 drbd_thread_current_set_cpu(mdev);
1432
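                /* if no work is immediately available, uncork the data socket
                 * so already queued packets go out to the peer, block until
                 * new work arrives, then cork again to keep batching small
                 * sends (unless no_cork is configured). */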
1433                 if (down_trylock(&mdev->data.work.s)) {
1434                         mutex_lock(&mdev->data.mutex);
1435                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1436                                 drbd_tcp_uncork(mdev->data.socket);
1437                         mutex_unlock(&mdev->data.mutex);
1438
1439                         intr = down_interruptible(&mdev->data.work.s);
1440
1441                         mutex_lock(&mdev->data.mutex);
1442                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1443                                 drbd_tcp_cork(mdev->data.socket);
1444                         mutex_unlock(&mdev->data.mutex);
1445                 }
1446
1447                 if (intr) {
1448                         D_ASSERT(intr == -EINTR);
1449                         flush_signals(current);
1450                         ERR_IF (get_t_state(thi) == Running)
1451                                 continue;
1452                         break;
1453                 }
1454
1455                 if (get_t_state(thi) != Running)
1456                         break;
1457                 /* With this break, we have done a down() but not consumed
1458                    the entry from the list. The cleanup code takes care of
1459                    this...   */
1460
1461                 w = NULL;
1462                 spin_lock_irq(&mdev->data.work.q_lock);
1463                 ERR_IF(list_empty(&mdev->data.work.q)) {
1464                         /* something terribly wrong in our logic.
1465                          * we were able to down() the semaphore,
1466                          * but the list is empty... doh.
1467                          *
1468                          * what is the best thing to do now?
1469                          * try again from scratch, restarting the receiver,
1470                          * asender, whatnot? could break even more ugly,
1471                          * e.g. when we are primary, but no good local data.
1472                          *
1473                          * I'll try to get away just starting over this loop.
1474                          */
1475                         spin_unlock_irq(&mdev->data.work.q_lock);
1476                         continue;
1477                 }
1478                 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1479                 list_del_init(&w->list);
1480                 spin_unlock_irq(&mdev->data.work.q_lock);
1481
1482                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1483                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1484                         if (mdev->state.conn >= C_CONNECTED)
1485                                 drbd_force_state(mdev,
1486                                                 NS(conn, C_NETWORK_FAILURE));
1487                 }
1488         }
1489         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1490         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1491
1492         spin_lock_irq(&mdev->data.work.q_lock);
1493         i = 0;
1494         while (!list_empty(&mdev->data.work.q)) {
1495                 list_splice_init(&mdev->data.work.q, &work_list);
1496                 spin_unlock_irq(&mdev->data.work.q_lock);
1497
1498                 while (!list_empty(&work_list)) {
1499                         w = list_entry(work_list.next, struct drbd_work, list);
1500                         list_del_init(&w->list);
1501                         w->cb(mdev, w, 1);
1502                         i++; /* dead debugging code */
1503                 }
1504
1505                 spin_lock_irq(&mdev->data.work.q_lock);
1506         }
1507         sema_init(&mdev->data.work.s, 0);
1508         /* DANGEROUS race: if someone did queue his work within the spinlock,
1509          * but up() ed outside the spinlock, we could get an up() on the
1510          * semaphore without corresponding list entry.
1511          * So don't do that.
1512          */
1513         spin_unlock_irq(&mdev->data.work.q_lock);
1514
1515         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1516         /* _drbd_set_state only uses stop_nowait.
1517          * wait here for the Exiting receiver. */
1518         drbd_thread_stop(&mdev->receiver);
1519         drbd_mdev_cleanup(mdev);
1520
1521         dev_info(DEV, "worker terminated\n");
1522
1523         clear_bit(DEVICE_DYING, &mdev->flags);
1524         clear_bit(CONFIG_PENDING, &mdev->flags);
1525         wake_up(&mdev->state_wait);
1526
1527         return 0;
1528 }