dm kcopyd: clean interface
[safe/jmp/linux-2.6] / drivers / md / dm-raid1.c
1 /*
2  * Copyright (C) 2003 Sistina Software Limited.
3  *
4  * This file is released under the GPL.
5  */
6
7 #include "dm.h"
8 #include "dm-bio-list.h"
9 #include "dm-bio-record.h"
10 #include "dm-io.h"
11 #include "dm-log.h"
12 #include "kcopyd.h"
13
14 #include <linux/ctype.h>
15 #include <linux/init.h>
16 #include <linux/mempool.h>
17 #include <linux/module.h>
18 #include <linux/pagemap.h>
19 #include <linux/slab.h>
20 #include <linux/time.h>
21 #include <linux/vmalloc.h>
22 #include <linux/workqueue.h>
23 #include <linux/log2.h>
24 #include <linux/hardirq.h>
25
26 #define DM_MSG_PREFIX "raid1"
27 #define DM_IO_PAGES 64
28
29 #define DM_RAID1_HANDLE_ERRORS 0x01
30 #define errors_handled(p)       ((p)->features & DM_RAID1_HANDLE_ERRORS)
31
32 static DECLARE_WAIT_QUEUE_HEAD(_kmirrord_recovery_stopped);
33
34 /*-----------------------------------------------------------------
35  * Region hash
36  *
37  * The mirror splits itself up into discrete regions.  Each
38  * region can be in one of three states: clean, dirty,
39  * nosync.  There is no need to put clean regions in the hash.
40  *
41  * In addition to being present in the hash table a region _may_
42  * be present on one of three lists.
43  *
44  *   clean_regions: Regions on this list have no io pending to
45  *   them, they are in sync, we are no longer interested in them,
46  *   they are dull.  rh_update_states() will remove them from the
47  *   hash table.
48  *
49  *   quiesced_regions: These regions have been spun down, ready
50  *   for recovery.  rh_recovery_start() will remove regions from
51  *   this list and hand them to kmirrord, which will schedule the
52  *   recovery io with kcopyd.
53  *
54  *   recovered_regions: Regions that kcopyd has successfully
55  *   recovered.  rh_update_states() will now schedule any delayed
56  *   io, up the recovery_count, and remove the region from the
57  *   hash.
58  *
59  * There are 2 locks:
60  *   A rw spin lock 'hash_lock' protects just the hash table,
61  *   this is never held in write mode from interrupt context,
62  *   which I believe means that we only have to disable irqs when
63  *   doing a write lock.
64  *
65  *   An ordinary spin lock 'region_lock' that protects the three
66  *   lists in the region_hash, with the 'state', 'list' and
67  *   'bhs_delayed' fields of the regions.  This is used from irq
68  *   context, so all other uses will have to suspend local irqs.
69  *---------------------------------------------------------------*/
70 struct mirror_set;
71 struct region_hash {
72         struct mirror_set *ms;
73         uint32_t region_size;
74         unsigned region_shift;
75
76         /* holds persistent region state */
77         struct dirty_log *log;
78
79         /* hash table */
80         rwlock_t hash_lock;
81         mempool_t *region_pool;
82         unsigned int mask;
83         unsigned int nr_buckets;
84         struct list_head *buckets;
85
86         spinlock_t region_lock;
87         atomic_t recovery_in_flight;
88         struct semaphore recovery_count;
89         struct list_head clean_regions;
90         struct list_head quiesced_regions;
91         struct list_head recovered_regions;
92         struct list_head failed_recovered_regions;
93 };
94
95 enum {
96         RH_CLEAN,
97         RH_DIRTY,
98         RH_NOSYNC,
99         RH_RECOVERING
100 };
101
102 struct region {
103         struct region_hash *rh; /* FIXME: can we get rid of this ? */
104         region_t key;
105         int state;
106
107         struct list_head hash_list;
108         struct list_head list;
109
110         atomic_t pending;
111         struct bio_list delayed_bios;
112 };
113
114
115 /*-----------------------------------------------------------------
116  * Mirror set structures.
117  *---------------------------------------------------------------*/
118 enum dm_raid1_error {
119         DM_RAID1_WRITE_ERROR,
120         DM_RAID1_SYNC_ERROR,
121         DM_RAID1_READ_ERROR
122 };
123
124 struct mirror {
125         struct mirror_set *ms;
126         atomic_t error_count;
127         unsigned long error_type;
128         struct dm_dev *dev;
129         sector_t offset;
130 };
131
132 struct mirror_set {
133         struct dm_target *ti;
134         struct list_head list;
135         struct region_hash rh;
136         struct dm_kcopyd_client *kcopyd_client;
137         uint64_t features;
138
139         spinlock_t lock;        /* protects the lists */
140         struct bio_list reads;
141         struct bio_list writes;
142         struct bio_list failures;
143
144         struct dm_io_client *io_client;
145         mempool_t *read_record_pool;
146
147         /* recovery */
148         region_t nr_regions;
149         int in_sync;
150         int log_failure;
151         atomic_t suspend;
152
153         atomic_t default_mirror;        /* Default mirror */
154
155         struct workqueue_struct *kmirrord_wq;
156         struct work_struct kmirrord_work;
157         struct work_struct trigger_event;
158
159         unsigned int nr_mirrors;
160         struct mirror mirror[0];
161 };
162
163 /*
164  * Conversion fns
165  */
166 static inline region_t bio_to_region(struct region_hash *rh, struct bio *bio)
167 {
168         return (bio->bi_sector - rh->ms->ti->begin) >> rh->region_shift;
169 }
170
171 static inline sector_t region_to_sector(struct region_hash *rh, region_t region)
172 {
173         return region << rh->region_shift;
174 }
175
176 static void wake(struct mirror_set *ms)
177 {
178         queue_work(ms->kmirrord_wq, &ms->kmirrord_work);
179 }
180
181 /* FIXME move this */
182 static void queue_bio(struct mirror_set *ms, struct bio *bio, int rw);
183
184 #define MIN_REGIONS 64
185 #define MAX_RECOVERY 1
186 static int rh_init(struct region_hash *rh, struct mirror_set *ms,
187                    struct dirty_log *log, uint32_t region_size,
188                    region_t nr_regions)
189 {
190         unsigned int nr_buckets, max_buckets;
191         size_t i;
192
193         /*
194          * Calculate a suitable number of buckets for our hash
195          * table.
196          */
197         max_buckets = nr_regions >> 6;
198         for (nr_buckets = 128u; nr_buckets < max_buckets; nr_buckets <<= 1)
199                 ;
200         nr_buckets >>= 1;
201
202         rh->ms = ms;
203         rh->log = log;
204         rh->region_size = region_size;
205         rh->region_shift = ffs(region_size) - 1;
206         rwlock_init(&rh->hash_lock);
207         rh->mask = nr_buckets - 1;
208         rh->nr_buckets = nr_buckets;
209
210         rh->buckets = vmalloc(nr_buckets * sizeof(*rh->buckets));
211         if (!rh->buckets) {
212                 DMERR("unable to allocate region hash memory");
213                 return -ENOMEM;
214         }
215
216         for (i = 0; i < nr_buckets; i++)
217                 INIT_LIST_HEAD(rh->buckets + i);
218
219         spin_lock_init(&rh->region_lock);
220         sema_init(&rh->recovery_count, 0);
221         atomic_set(&rh->recovery_in_flight, 0);
222         INIT_LIST_HEAD(&rh->clean_regions);
223         INIT_LIST_HEAD(&rh->quiesced_regions);
224         INIT_LIST_HEAD(&rh->recovered_regions);
225         INIT_LIST_HEAD(&rh->failed_recovered_regions);
226
227         rh->region_pool = mempool_create_kmalloc_pool(MIN_REGIONS,
228                                                       sizeof(struct region));
229         if (!rh->region_pool) {
230                 vfree(rh->buckets);
231                 rh->buckets = NULL;
232                 return -ENOMEM;
233         }
234
235         return 0;
236 }
237
238 static void rh_exit(struct region_hash *rh)
239 {
240         unsigned int h;
241         struct region *reg, *nreg;
242
243         BUG_ON(!list_empty(&rh->quiesced_regions));
244         for (h = 0; h < rh->nr_buckets; h++) {
245                 list_for_each_entry_safe(reg, nreg, rh->buckets + h, hash_list) {
246                         BUG_ON(atomic_read(&reg->pending));
247                         mempool_free(reg, rh->region_pool);
248                 }
249         }
250
251         if (rh->log)
252                 dm_destroy_dirty_log(rh->log);
253         if (rh->region_pool)
254                 mempool_destroy(rh->region_pool);
255         vfree(rh->buckets);
256 }
257
258 #define RH_HASH_MULT 2654435387U
259
260 static inline unsigned int rh_hash(struct region_hash *rh, region_t region)
261 {
262         return (unsigned int) ((region * RH_HASH_MULT) >> 12) & rh->mask;
263 }
264
265 static struct region *__rh_lookup(struct region_hash *rh, region_t region)
266 {
267         struct region *reg;
268
269         list_for_each_entry (reg, rh->buckets + rh_hash(rh, region), hash_list)
270                 if (reg->key == region)
271                         return reg;
272
273         return NULL;
274 }
275
276 static void __rh_insert(struct region_hash *rh, struct region *reg)
277 {
278         unsigned int h = rh_hash(rh, reg->key);
279         list_add(&reg->hash_list, rh->buckets + h);
280 }
281
282 static struct region *__rh_alloc(struct region_hash *rh, region_t region)
283 {
284         struct region *reg, *nreg;
285
286         read_unlock(&rh->hash_lock);
287         nreg = mempool_alloc(rh->region_pool, GFP_ATOMIC);
288         if (unlikely(!nreg))
289                 nreg = kmalloc(sizeof(struct region), GFP_NOIO);
290         nreg->state = rh->log->type->in_sync(rh->log, region, 1) ?
291                 RH_CLEAN : RH_NOSYNC;
292         nreg->rh = rh;
293         nreg->key = region;
294
295         INIT_LIST_HEAD(&nreg->list);
296
297         atomic_set(&nreg->pending, 0);
298         bio_list_init(&nreg->delayed_bios);
299         write_lock_irq(&rh->hash_lock);
300
301         reg = __rh_lookup(rh, region);
302         if (reg)
303                 /* we lost the race */
304                 mempool_free(nreg, rh->region_pool);
305
306         else {
307                 __rh_insert(rh, nreg);
308                 if (nreg->state == RH_CLEAN) {
309                         spin_lock(&rh->region_lock);
310                         list_add(&nreg->list, &rh->clean_regions);
311                         spin_unlock(&rh->region_lock);
312                 }
313                 reg = nreg;
314         }
315         write_unlock_irq(&rh->hash_lock);
316         read_lock(&rh->hash_lock);
317
318         return reg;
319 }
320
321 static inline struct region *__rh_find(struct region_hash *rh, region_t region)
322 {
323         struct region *reg;
324
325         reg = __rh_lookup(rh, region);
326         if (!reg)
327                 reg = __rh_alloc(rh, region);
328
329         return reg;
330 }
331
332 static int rh_state(struct region_hash *rh, region_t region, int may_block)
333 {
334         int r;
335         struct region *reg;
336
337         read_lock(&rh->hash_lock);
338         reg = __rh_lookup(rh, region);
339         read_unlock(&rh->hash_lock);
340
341         if (reg)
342                 return reg->state;
343
344         /*
345          * The region wasn't in the hash, so we fall back to the
346          * dirty log.
347          */
348         r = rh->log->type->in_sync(rh->log, region, may_block);
349
350         /*
351          * Any error from the dirty log (eg. -EWOULDBLOCK) gets
352          * taken as a RH_NOSYNC
353          */
354         return r == 1 ? RH_CLEAN : RH_NOSYNC;
355 }
356
357 static inline int rh_in_sync(struct region_hash *rh,
358                              region_t region, int may_block)
359 {
360         int state = rh_state(rh, region, may_block);
361         return state == RH_CLEAN || state == RH_DIRTY;
362 }
363
364 static void dispatch_bios(struct mirror_set *ms, struct bio_list *bio_list)
365 {
366         struct bio *bio;
367
368         while ((bio = bio_list_pop(bio_list))) {
369                 queue_bio(ms, bio, WRITE);
370         }
371 }
372
373 static void complete_resync_work(struct region *reg, int success)
374 {
375         struct region_hash *rh = reg->rh;
376
377         rh->log->type->set_region_sync(rh->log, reg->key, success);
378
379         /*
380          * Dispatch the bios before we call 'wake_up_all'.
381          * This is important because if we are suspending,
382          * we want to know that recovery is complete and
383          * the work queue is flushed.  If we wake_up_all
384          * before we dispatch_bios (queue bios and call wake()),
385          * then we risk suspending before the work queue
386          * has been properly flushed.
387          */
388         dispatch_bios(rh->ms, &reg->delayed_bios);
389         if (atomic_dec_and_test(&rh->recovery_in_flight))
390                 wake_up_all(&_kmirrord_recovery_stopped);
391         up(&rh->recovery_count);
392 }
393
394 static void rh_update_states(struct region_hash *rh)
395 {
396         struct region *reg, *next;
397
398         LIST_HEAD(clean);
399         LIST_HEAD(recovered);
400         LIST_HEAD(failed_recovered);
401
402         /*
403          * Quickly grab the lists.
404          */
405         write_lock_irq(&rh->hash_lock);
406         spin_lock(&rh->region_lock);
407         if (!list_empty(&rh->clean_regions)) {
408                 list_splice_init(&rh->clean_regions, &clean);
409
410                 list_for_each_entry(reg, &clean, list)
411                         list_del(&reg->hash_list);
412         }
413
414         if (!list_empty(&rh->recovered_regions)) {
415                 list_splice_init(&rh->recovered_regions, &recovered);
416
417                 list_for_each_entry (reg, &recovered, list)
418                         list_del(&reg->hash_list);
419         }
420
421         if (!list_empty(&rh->failed_recovered_regions)) {
422                 list_splice_init(&rh->failed_recovered_regions,
423                                  &failed_recovered);
424
425                 list_for_each_entry(reg, &failed_recovered, list)
426                         list_del(&reg->hash_list);
427         }
428
429         spin_unlock(&rh->region_lock);
430         write_unlock_irq(&rh->hash_lock);
431
432         /*
433          * All the regions on the recovered and clean lists have
434          * now been pulled out of the system, so no need to do
435          * any more locking.
436          */
437         list_for_each_entry_safe (reg, next, &recovered, list) {
438                 rh->log->type->clear_region(rh->log, reg->key);
439                 complete_resync_work(reg, 1);
440                 mempool_free(reg, rh->region_pool);
441         }
442
443         list_for_each_entry_safe(reg, next, &failed_recovered, list) {
444                 complete_resync_work(reg, errors_handled(rh->ms) ? 0 : 1);
445                 mempool_free(reg, rh->region_pool);
446         }
447
448         list_for_each_entry_safe(reg, next, &clean, list) {
449                 rh->log->type->clear_region(rh->log, reg->key);
450                 mempool_free(reg, rh->region_pool);
451         }
452
453         rh->log->type->flush(rh->log);
454 }
455
456 static void rh_inc(struct region_hash *rh, region_t region)
457 {
458         struct region *reg;
459
460         read_lock(&rh->hash_lock);
461         reg = __rh_find(rh, region);
462
463         spin_lock_irq(&rh->region_lock);
464         atomic_inc(&reg->pending);
465
466         if (reg->state == RH_CLEAN) {
467                 reg->state = RH_DIRTY;
468                 list_del_init(&reg->list);      /* take off the clean list */
469                 spin_unlock_irq(&rh->region_lock);
470
471                 rh->log->type->mark_region(rh->log, reg->key);
472         } else
473                 spin_unlock_irq(&rh->region_lock);
474
475
476         read_unlock(&rh->hash_lock);
477 }
478
479 static void rh_inc_pending(struct region_hash *rh, struct bio_list *bios)
480 {
481         struct bio *bio;
482
483         for (bio = bios->head; bio; bio = bio->bi_next)
484                 rh_inc(rh, bio_to_region(rh, bio));
485 }
486
487 static void rh_dec(struct region_hash *rh, region_t region)
488 {
489         unsigned long flags;
490         struct region *reg;
491         int should_wake = 0;
492
493         read_lock(&rh->hash_lock);
494         reg = __rh_lookup(rh, region);
495         read_unlock(&rh->hash_lock);
496
497         spin_lock_irqsave(&rh->region_lock, flags);
498         if (atomic_dec_and_test(&reg->pending)) {
499                 /*
500                  * There is no pending I/O for this region.
501                  * We can move the region to corresponding list for next action.
502                  * At this point, the region is not yet connected to any list.
503                  *
504                  * If the state is RH_NOSYNC, the region should be kept off
505                  * from clean list.
506                  * The hash entry for RH_NOSYNC will remain in memory
507                  * until the region is recovered or the map is reloaded.
508                  */
509
510                 /* do nothing for RH_NOSYNC */
511                 if (reg->state == RH_RECOVERING) {
512                         list_add_tail(&reg->list, &rh->quiesced_regions);
513                 } else if (reg->state == RH_DIRTY) {
514                         reg->state = RH_CLEAN;
515                         list_add(&reg->list, &rh->clean_regions);
516                 }
517                 should_wake = 1;
518         }
519         spin_unlock_irqrestore(&rh->region_lock, flags);
520
521         if (should_wake)
522                 wake(rh->ms);
523 }
524
525 /*
526  * Starts quiescing a region in preparation for recovery.
527  */
528 static int __rh_recovery_prepare(struct region_hash *rh)
529 {
530         int r;
531         struct region *reg;
532         region_t region;
533
534         /*
535          * Ask the dirty log what's next.
536          */
537         r = rh->log->type->get_resync_work(rh->log, &region);
538         if (r <= 0)
539                 return r;
540
541         /*
542          * Get this region, and start it quiescing by setting the
543          * recovering flag.
544          */
545         read_lock(&rh->hash_lock);
546         reg = __rh_find(rh, region);
547         read_unlock(&rh->hash_lock);
548
549         spin_lock_irq(&rh->region_lock);
550         reg->state = RH_RECOVERING;
551
552         /* Already quiesced ? */
553         if (atomic_read(&reg->pending))
554                 list_del_init(&reg->list);
555         else
556                 list_move(&reg->list, &rh->quiesced_regions);
557
558         spin_unlock_irq(&rh->region_lock);
559
560         return 1;
561 }
562
563 static void rh_recovery_prepare(struct region_hash *rh)
564 {
565         /* Extra reference to avoid race with rh_stop_recovery */
566         atomic_inc(&rh->recovery_in_flight);
567
568         while (!down_trylock(&rh->recovery_count)) {
569                 atomic_inc(&rh->recovery_in_flight);
570                 if (__rh_recovery_prepare(rh) <= 0) {
571                         atomic_dec(&rh->recovery_in_flight);
572                         up(&rh->recovery_count);
573                         break;
574                 }
575         }
576
577         /* Drop the extra reference */
578         if (atomic_dec_and_test(&rh->recovery_in_flight))
579                 wake_up_all(&_kmirrord_recovery_stopped);
580 }
581
582 /*
583  * Returns any quiesced regions.
584  */
585 static struct region *rh_recovery_start(struct region_hash *rh)
586 {
587         struct region *reg = NULL;
588
589         spin_lock_irq(&rh->region_lock);
590         if (!list_empty(&rh->quiesced_regions)) {
591                 reg = list_entry(rh->quiesced_regions.next,
592                                  struct region, list);
593                 list_del_init(&reg->list);      /* remove from the quiesced list */
594         }
595         spin_unlock_irq(&rh->region_lock);
596
597         return reg;
598 }
599
600 static void rh_recovery_end(struct region *reg, int success)
601 {
602         struct region_hash *rh = reg->rh;
603
604         spin_lock_irq(&rh->region_lock);
605         if (success)
606                 list_add(&reg->list, &reg->rh->recovered_regions);
607         else {
608                 reg->state = RH_NOSYNC;
609                 list_add(&reg->list, &reg->rh->failed_recovered_regions);
610         }
611         spin_unlock_irq(&rh->region_lock);
612
613         wake(rh->ms);
614 }
615
616 static int rh_flush(struct region_hash *rh)
617 {
618         return rh->log->type->flush(rh->log);
619 }
620
621 static void rh_delay(struct region_hash *rh, struct bio *bio)
622 {
623         struct region *reg;
624
625         read_lock(&rh->hash_lock);
626         reg = __rh_find(rh, bio_to_region(rh, bio));
627         bio_list_add(&reg->delayed_bios, bio);
628         read_unlock(&rh->hash_lock);
629 }
630
631 static void rh_stop_recovery(struct region_hash *rh)
632 {
633         int i;
634
635         /* wait for any recovering regions */
636         for (i = 0; i < MAX_RECOVERY; i++)
637                 down(&rh->recovery_count);
638 }
639
640 static void rh_start_recovery(struct region_hash *rh)
641 {
642         int i;
643
644         for (i = 0; i < MAX_RECOVERY; i++)
645                 up(&rh->recovery_count);
646
647         wake(rh->ms);
648 }
649
650 #define MIN_READ_RECORDS 20
651 struct dm_raid1_read_record {
652         struct mirror *m;
653         struct dm_bio_details details;
654 };
655
656 /*
657  * Every mirror should look like this one.
658  */
659 #define DEFAULT_MIRROR 0
660
661 /*
662  * This is yucky.  We squirrel the mirror struct away inside
663  * bi_next for read/write buffers.  This is safe since the bh
664  * doesn't get submitted to the lower levels of block layer.
665  */
666 static struct mirror *bio_get_m(struct bio *bio)
667 {
668         return (struct mirror *) bio->bi_next;
669 }
670
671 static void bio_set_m(struct bio *bio, struct mirror *m)
672 {
673         bio->bi_next = (struct bio *) m;
674 }
675
676 static struct mirror *get_default_mirror(struct mirror_set *ms)
677 {
678         return &ms->mirror[atomic_read(&ms->default_mirror)];
679 }
680
681 static void set_default_mirror(struct mirror *m)
682 {
683         struct mirror_set *ms = m->ms;
684         struct mirror *m0 = &(ms->mirror[0]);
685
686         atomic_set(&ms->default_mirror, m - m0);
687 }
688
689 /* fail_mirror
690  * @m: mirror device to fail
691  * @error_type: one of the enum's, DM_RAID1_*_ERROR
692  *
693  * If errors are being handled, record the type of
694  * error encountered for this device.  If this type
695  * of error has already been recorded, we can return;
696  * otherwise, we must signal userspace by triggering
697  * an event.  Additionally, if the device is the
698  * primary device, we must choose a new primary, but
699  * only if the mirror is in-sync.
700  *
701  * This function must not block.
702  */
703 static void fail_mirror(struct mirror *m, enum dm_raid1_error error_type)
704 {
705         struct mirror_set *ms = m->ms;
706         struct mirror *new;
707
708         if (!errors_handled(ms))
709                 return;
710
711         /*
712          * error_count is used for nothing more than a
713          * simple way to tell if a device has encountered
714          * errors.
715          */
716         atomic_inc(&m->error_count);
717
718         if (test_and_set_bit(error_type, &m->error_type))
719                 return;
720
721         if (m != get_default_mirror(ms))
722                 goto out;
723
724         if (!ms->in_sync) {
725                 /*
726                  * Better to issue requests to same failing device
727                  * than to risk returning corrupt data.
728                  */
729                 DMERR("Primary mirror (%s) failed while out-of-sync: "
730                       "Reads may fail.", m->dev->name);
731                 goto out;
732         }
733
734         for (new = ms->mirror; new < ms->mirror + ms->nr_mirrors; new++)
735                 if (!atomic_read(&new->error_count)) {
736                         set_default_mirror(new);
737                         break;
738                 }
739
740         if (unlikely(new == ms->mirror + ms->nr_mirrors))
741                 DMWARN("All sides of mirror have failed.");
742
743 out:
744         schedule_work(&ms->trigger_event);
745 }
746
747 /*-----------------------------------------------------------------
748  * Recovery.
749  *
750  * When a mirror is first activated we may find that some regions
751  * are in the no-sync state.  We have to recover these by
752  * recopying from the default mirror to all the others.
753  *---------------------------------------------------------------*/
754 static void recovery_complete(int read_err, unsigned long write_err,
755                               void *context)
756 {
757         struct region *reg = (struct region *)context;
758         struct mirror_set *ms = reg->rh->ms;
759         int m, bit = 0;
760
761         if (read_err) {
762                 /* Read error means the failure of default mirror. */
763                 DMERR_LIMIT("Unable to read primary mirror during recovery");
764                 fail_mirror(get_default_mirror(ms), DM_RAID1_SYNC_ERROR);
765         }
766
767         if (write_err) {
768                 DMERR_LIMIT("Write error during recovery (error = 0x%lx)",
769                             write_err);
770                 /*
771                  * Bits correspond to devices (excluding default mirror).
772                  * The default mirror cannot change during recovery.
773                  */
774                 for (m = 0; m < ms->nr_mirrors; m++) {
775                         if (&ms->mirror[m] == get_default_mirror(ms))
776                                 continue;
777                         if (test_bit(bit, &write_err))
778                                 fail_mirror(ms->mirror + m,
779                                             DM_RAID1_SYNC_ERROR);
780                         bit++;
781                 }
782         }
783
784         rh_recovery_end(reg, !(read_err || write_err));
785 }
786
787 static int recover(struct mirror_set *ms, struct region *reg)
788 {
789         int r;
790         unsigned int i;
791         struct dm_io_region from, to[DM_KCOPYD_MAX_REGIONS], *dest;
792         struct mirror *m;
793         unsigned long flags = 0;
794
795         /* fill in the source */
796         m = get_default_mirror(ms);
797         from.bdev = m->dev->bdev;
798         from.sector = m->offset + region_to_sector(reg->rh, reg->key);
799         if (reg->key == (ms->nr_regions - 1)) {
800                 /*
801                  * The final region may be smaller than
802                  * region_size.
803                  */
804                 from.count = ms->ti->len & (reg->rh->region_size - 1);
805                 if (!from.count)
806                         from.count = reg->rh->region_size;
807         } else
808                 from.count = reg->rh->region_size;
809
810         /* fill in the destinations */
811         for (i = 0, dest = to; i < ms->nr_mirrors; i++) {
812                 if (&ms->mirror[i] == get_default_mirror(ms))
813                         continue;
814
815                 m = ms->mirror + i;
816                 dest->bdev = m->dev->bdev;
817                 dest->sector = m->offset + region_to_sector(reg->rh, reg->key);
818                 dest->count = from.count;
819                 dest++;
820         }
821
822         /* hand to kcopyd */
823         set_bit(DM_KCOPYD_IGNORE_ERROR, &flags);
824         r = dm_kcopyd_copy(ms->kcopyd_client, &from, ms->nr_mirrors - 1, to,
825                            flags, recovery_complete, reg);
826
827         return r;
828 }
829
830 static void do_recovery(struct mirror_set *ms)
831 {
832         int r;
833         struct region *reg;
834         struct dirty_log *log = ms->rh.log;
835
836         /*
837          * Start quiescing some regions.
838          */
839         rh_recovery_prepare(&ms->rh);
840
841         /*
842          * Copy any already quiesced regions.
843          */
844         while ((reg = rh_recovery_start(&ms->rh))) {
845                 r = recover(ms, reg);
846                 if (r)
847                         rh_recovery_end(reg, 0);
848         }
849
850         /*
851          * Update the in sync flag.
852          */
853         if (!ms->in_sync &&
854             (log->type->get_sync_count(log) == ms->nr_regions)) {
855                 /* the sync is complete */
856                 dm_table_event(ms->ti->table);
857                 ms->in_sync = 1;
858         }
859 }
860
861 /*-----------------------------------------------------------------
862  * Reads
863  *---------------------------------------------------------------*/
864 static struct mirror *choose_mirror(struct mirror_set *ms, sector_t sector)
865 {
866         struct mirror *m = get_default_mirror(ms);
867
868         do {
869                 if (likely(!atomic_read(&m->error_count)))
870                         return m;
871
872                 if (m-- == ms->mirror)
873                         m += ms->nr_mirrors;
874         } while (m != get_default_mirror(ms));
875
876         return NULL;
877 }
878
879 static int default_ok(struct mirror *m)
880 {
881         struct mirror *default_mirror = get_default_mirror(m->ms);
882
883         return !atomic_read(&default_mirror->error_count);
884 }
885
886 static int mirror_available(struct mirror_set *ms, struct bio *bio)
887 {
888         region_t region = bio_to_region(&ms->rh, bio);
889
890         if (ms->rh.log->type->in_sync(ms->rh.log, region, 0))
891                 return choose_mirror(ms,  bio->bi_sector) ? 1 : 0;
892
893         return 0;
894 }
895
896 /*
897  * remap a buffer to a particular mirror.
898  */
899 static sector_t map_sector(struct mirror *m, struct bio *bio)
900 {
901         return m->offset + (bio->bi_sector - m->ms->ti->begin);
902 }
903
904 static void map_bio(struct mirror *m, struct bio *bio)
905 {
906         bio->bi_bdev = m->dev->bdev;
907         bio->bi_sector = map_sector(m, bio);
908 }
909
910 static void map_region(struct dm_io_region *io, struct mirror *m,
911                        struct bio *bio)
912 {
913         io->bdev = m->dev->bdev;
914         io->sector = map_sector(m, bio);
915         io->count = bio->bi_size >> 9;
916 }
917
918 /*-----------------------------------------------------------------
919  * Reads
920  *---------------------------------------------------------------*/
921 static void read_callback(unsigned long error, void *context)
922 {
923         struct bio *bio = context;
924         struct mirror *m;
925
926         m = bio_get_m(bio);
927         bio_set_m(bio, NULL);
928
929         if (likely(!error)) {
930                 bio_endio(bio, 0);
931                 return;
932         }
933
934         fail_mirror(m, DM_RAID1_READ_ERROR);
935
936         if (likely(default_ok(m)) || mirror_available(m->ms, bio)) {
937                 DMWARN_LIMIT("Read failure on mirror device %s.  "
938                              "Trying alternative device.",
939                              m->dev->name);
940                 queue_bio(m->ms, bio, bio_rw(bio));
941                 return;
942         }
943
944         DMERR_LIMIT("Read failure on mirror device %s.  Failing I/O.",
945                     m->dev->name);
946         bio_endio(bio, -EIO);
947 }
948
949 /* Asynchronous read. */
950 static void read_async_bio(struct mirror *m, struct bio *bio)
951 {
952         struct dm_io_region io;
953         struct dm_io_request io_req = {
954                 .bi_rw = READ,
955                 .mem.type = DM_IO_BVEC,
956                 .mem.ptr.bvec = bio->bi_io_vec + bio->bi_idx,
957                 .notify.fn = read_callback,
958                 .notify.context = bio,
959                 .client = m->ms->io_client,
960         };
961
962         map_region(&io, m, bio);
963         bio_set_m(bio, m);
964         (void) dm_io(&io_req, 1, &io, NULL);
965 }
966
967 static void do_reads(struct mirror_set *ms, struct bio_list *reads)
968 {
969         region_t region;
970         struct bio *bio;
971         struct mirror *m;
972
973         while ((bio = bio_list_pop(reads))) {
974                 region = bio_to_region(&ms->rh, bio);
975                 m = get_default_mirror(ms);
976
977                 /*
978                  * We can only read balance if the region is in sync.
979                  */
980                 if (likely(rh_in_sync(&ms->rh, region, 1)))
981                         m = choose_mirror(ms, bio->bi_sector);
982                 else if (m && atomic_read(&m->error_count))
983                         m = NULL;
984
985                 if (likely(m))
986                         read_async_bio(m, bio);
987                 else
988                         bio_endio(bio, -EIO);
989         }
990 }
991
992 /*-----------------------------------------------------------------
993  * Writes.
994  *
995  * We do different things with the write io depending on the
996  * state of the region that it's in:
997  *
998  * SYNC:        increment pending, use kcopyd to write to *all* mirrors
999  * RECOVERING:  delay the io until recovery completes
1000  * NOSYNC:      increment pending, just write to the default mirror
1001  *---------------------------------------------------------------*/
1002
1003 /* __bio_mark_nosync
1004  * @ms
1005  * @bio
1006  * @done
1007  * @error
1008  *
1009  * The bio was written on some mirror(s) but failed on other mirror(s).
1010  * We can successfully endio the bio but should avoid the region being
1011  * marked clean by setting the state RH_NOSYNC.
1012  *
1013  * This function is _not_ safe in interrupt context!
1014  */
1015 static void __bio_mark_nosync(struct mirror_set *ms,
1016                               struct bio *bio, unsigned done, int error)
1017 {
1018         unsigned long flags;
1019         struct region_hash *rh = &ms->rh;
1020         struct dirty_log *log = ms->rh.log;
1021         struct region *reg;
1022         region_t region = bio_to_region(rh, bio);
1023         int recovering = 0;
1024
1025         /* We must inform the log that the sync count has changed. */
1026         log->type->set_region_sync(log, region, 0);
1027         ms->in_sync = 0;
1028
1029         read_lock(&rh->hash_lock);
1030         reg = __rh_find(rh, region);
1031         read_unlock(&rh->hash_lock);
1032
1033         /* region hash entry should exist because write was in-flight */
1034         BUG_ON(!reg);
1035         BUG_ON(!list_empty(&reg->list));
1036
1037         spin_lock_irqsave(&rh->region_lock, flags);
1038         /*
1039          * Possible cases:
1040          *   1) RH_DIRTY
1041          *   2) RH_NOSYNC: was dirty, other preceeding writes failed
1042          *   3) RH_RECOVERING: flushing pending writes
1043          * Either case, the region should have not been connected to list.
1044          */
1045         recovering = (reg->state == RH_RECOVERING);
1046         reg->state = RH_NOSYNC;
1047         BUG_ON(!list_empty(&reg->list));
1048         spin_unlock_irqrestore(&rh->region_lock, flags);
1049
1050         bio_endio(bio, error);
1051         if (recovering)
1052                 complete_resync_work(reg, 0);
1053 }
1054
1055 static void write_callback(unsigned long error, void *context)
1056 {
1057         unsigned i, ret = 0;
1058         struct bio *bio = (struct bio *) context;
1059         struct mirror_set *ms;
1060         int uptodate = 0;
1061         int should_wake = 0;
1062         unsigned long flags;
1063
1064         ms = bio_get_m(bio)->ms;
1065         bio_set_m(bio, NULL);
1066
1067         /*
1068          * NOTE: We don't decrement the pending count here,
1069          * instead it is done by the targets endio function.
1070          * This way we handle both writes to SYNC and NOSYNC
1071          * regions with the same code.
1072          */
1073         if (likely(!error))
1074                 goto out;
1075
1076         for (i = 0; i < ms->nr_mirrors; i++)
1077                 if (test_bit(i, &error))
1078                         fail_mirror(ms->mirror + i, DM_RAID1_WRITE_ERROR);
1079                 else
1080                         uptodate = 1;
1081
1082         if (unlikely(!uptodate)) {
1083                 DMERR("All replicated volumes dead, failing I/O");
1084                 /* None of the writes succeeded, fail the I/O. */
1085                 ret = -EIO;
1086         } else if (errors_handled(ms)) {
1087                 /*
1088                  * Need to raise event.  Since raising
1089                  * events can block, we need to do it in
1090                  * the main thread.
1091                  */
1092                 spin_lock_irqsave(&ms->lock, flags);
1093                 if (!ms->failures.head)
1094                         should_wake = 1;
1095                 bio_list_add(&ms->failures, bio);
1096                 spin_unlock_irqrestore(&ms->lock, flags);
1097                 if (should_wake)
1098                         wake(ms);
1099                 return;
1100         }
1101 out:
1102         bio_endio(bio, ret);
1103 }
1104
1105 static void do_write(struct mirror_set *ms, struct bio *bio)
1106 {
1107         unsigned int i;
1108         struct dm_io_region io[ms->nr_mirrors], *dest = io;
1109         struct mirror *m;
1110         struct dm_io_request io_req = {
1111                 .bi_rw = WRITE,
1112                 .mem.type = DM_IO_BVEC,
1113                 .mem.ptr.bvec = bio->bi_io_vec + bio->bi_idx,
1114                 .notify.fn = write_callback,
1115                 .notify.context = bio,
1116                 .client = ms->io_client,
1117         };
1118
1119         for (i = 0, m = ms->mirror; i < ms->nr_mirrors; i++, m++)
1120                 map_region(dest++, m, bio);
1121
1122         /*
1123          * Use default mirror because we only need it to retrieve the reference
1124          * to the mirror set in write_callback().
1125          */
1126         bio_set_m(bio, get_default_mirror(ms));
1127
1128         (void) dm_io(&io_req, ms->nr_mirrors, io, NULL);
1129 }
1130
1131 static void do_writes(struct mirror_set *ms, struct bio_list *writes)
1132 {
1133         int state;
1134         struct bio *bio;
1135         struct bio_list sync, nosync, recover, *this_list = NULL;
1136
1137         if (!writes->head)
1138                 return;
1139
1140         /*
1141          * Classify each write.
1142          */
1143         bio_list_init(&sync);
1144         bio_list_init(&nosync);
1145         bio_list_init(&recover);
1146
1147         while ((bio = bio_list_pop(writes))) {
1148                 state = rh_state(&ms->rh, bio_to_region(&ms->rh, bio), 1);
1149                 switch (state) {
1150                 case RH_CLEAN:
1151                 case RH_DIRTY:
1152                         this_list = &sync;
1153                         break;
1154
1155                 case RH_NOSYNC:
1156                         this_list = &nosync;
1157                         break;
1158
1159                 case RH_RECOVERING:
1160                         this_list = &recover;
1161                         break;
1162                 }
1163
1164                 bio_list_add(this_list, bio);
1165         }
1166
1167         /*
1168          * Increment the pending counts for any regions that will
1169          * be written to (writes to recover regions are going to
1170          * be delayed).
1171          */
1172         rh_inc_pending(&ms->rh, &sync);
1173         rh_inc_pending(&ms->rh, &nosync);
1174         ms->log_failure = rh_flush(&ms->rh) ? 1 : 0;
1175
1176         /*
1177          * Dispatch io.
1178          */
1179         if (unlikely(ms->log_failure)) {
1180                 spin_lock_irq(&ms->lock);
1181                 bio_list_merge(&ms->failures, &sync);
1182                 spin_unlock_irq(&ms->lock);
1183         } else
1184                 while ((bio = bio_list_pop(&sync)))
1185                         do_write(ms, bio);
1186
1187         while ((bio = bio_list_pop(&recover)))
1188                 rh_delay(&ms->rh, bio);
1189
1190         while ((bio = bio_list_pop(&nosync))) {
1191                 map_bio(get_default_mirror(ms), bio);
1192                 generic_make_request(bio);
1193         }
1194 }
1195
1196 static void do_failures(struct mirror_set *ms, struct bio_list *failures)
1197 {
1198         struct bio *bio;
1199
1200         if (!failures->head)
1201                 return;
1202
1203         if (!ms->log_failure) {
1204                 while ((bio = bio_list_pop(failures)))
1205                         __bio_mark_nosync(ms, bio, bio->bi_size, 0);
1206                 return;
1207         }
1208
1209         /*
1210          * If the log has failed, unattempted writes are being
1211          * put on the failures list.  We can't issue those writes
1212          * until a log has been marked, so we must store them.
1213          *
1214          * If a 'noflush' suspend is in progress, we can requeue
1215          * the I/O's to the core.  This give userspace a chance
1216          * to reconfigure the mirror, at which point the core
1217          * will reissue the writes.  If the 'noflush' flag is
1218          * not set, we have no choice but to return errors.
1219          *
1220          * Some writes on the failures list may have been
1221          * submitted before the log failure and represent a
1222          * failure to write to one of the devices.  It is ok
1223          * for us to treat them the same and requeue them
1224          * as well.
1225          */
1226         if (dm_noflush_suspending(ms->ti)) {
1227                 while ((bio = bio_list_pop(failures)))
1228                         bio_endio(bio, DM_ENDIO_REQUEUE);
1229                 return;
1230         }
1231
1232         if (atomic_read(&ms->suspend)) {
1233                 while ((bio = bio_list_pop(failures)))
1234                         bio_endio(bio, -EIO);
1235                 return;
1236         }
1237
1238         spin_lock_irq(&ms->lock);
1239         bio_list_merge(&ms->failures, failures);
1240         spin_unlock_irq(&ms->lock);
1241
1242         wake(ms);
1243 }
1244
1245 static void trigger_event(struct work_struct *work)
1246 {
1247         struct mirror_set *ms =
1248                 container_of(work, struct mirror_set, trigger_event);
1249
1250         dm_table_event(ms->ti->table);
1251 }
1252
1253 /*-----------------------------------------------------------------
1254  * kmirrord
1255  *---------------------------------------------------------------*/
1256 static int _do_mirror(struct work_struct *work)
1257 {
1258         struct mirror_set *ms =container_of(work, struct mirror_set,
1259                                             kmirrord_work);
1260         struct bio_list reads, writes, failures;
1261         unsigned long flags;
1262
1263         spin_lock_irqsave(&ms->lock, flags);
1264         reads = ms->reads;
1265         writes = ms->writes;
1266         failures = ms->failures;
1267         bio_list_init(&ms->reads);
1268         bio_list_init(&ms->writes);
1269         bio_list_init(&ms->failures);
1270         spin_unlock_irqrestore(&ms->lock, flags);
1271
1272         rh_update_states(&ms->rh);
1273         do_recovery(ms);
1274         do_reads(ms, &reads);
1275         do_writes(ms, &writes);
1276         do_failures(ms, &failures);
1277
1278         return (ms->failures.head) ? 1 : 0;
1279 }
1280
1281 static void do_mirror(struct work_struct *work)
1282 {
1283         /*
1284          * If _do_mirror returns 1, we give it
1285          * another shot.  This helps for cases like
1286          * 'suspend' where we call flush_workqueue
1287          * and expect all work to be finished.  If
1288          * a failure happens during a suspend, we
1289          * couldn't issue a 'wake' because it would
1290          * not be honored.  Therefore, we return '1'
1291          * from _do_mirror, and retry here.
1292          */
1293         while (_do_mirror(work))
1294                 schedule();
1295 }
1296
1297
1298 /*-----------------------------------------------------------------
1299  * Target functions
1300  *---------------------------------------------------------------*/
1301 static struct mirror_set *alloc_context(unsigned int nr_mirrors,
1302                                         uint32_t region_size,
1303                                         struct dm_target *ti,
1304                                         struct dirty_log *dl)
1305 {
1306         size_t len;
1307         struct mirror_set *ms = NULL;
1308
1309         if (array_too_big(sizeof(*ms), sizeof(ms->mirror[0]), nr_mirrors))
1310                 return NULL;
1311
1312         len = sizeof(*ms) + (sizeof(ms->mirror[0]) * nr_mirrors);
1313
1314         ms = kzalloc(len, GFP_KERNEL);
1315         if (!ms) {
1316                 ti->error = "Cannot allocate mirror context";
1317                 return NULL;
1318         }
1319
1320         spin_lock_init(&ms->lock);
1321
1322         ms->ti = ti;
1323         ms->nr_mirrors = nr_mirrors;
1324         ms->nr_regions = dm_sector_div_up(ti->len, region_size);
1325         ms->in_sync = 0;
1326         ms->log_failure = 0;
1327         atomic_set(&ms->suspend, 0);
1328         atomic_set(&ms->default_mirror, DEFAULT_MIRROR);
1329
1330         len = sizeof(struct dm_raid1_read_record);
1331         ms->read_record_pool = mempool_create_kmalloc_pool(MIN_READ_RECORDS,
1332                                                            len);
1333         if (!ms->read_record_pool) {
1334                 ti->error = "Error creating mirror read_record_pool";
1335                 kfree(ms);
1336                 return NULL;
1337         }
1338
1339         ms->io_client = dm_io_client_create(DM_IO_PAGES);
1340         if (IS_ERR(ms->io_client)) {
1341                 ti->error = "Error creating dm_io client";
1342                 mempool_destroy(ms->read_record_pool);
1343                 kfree(ms);
1344                 return NULL;
1345         }
1346
1347         if (rh_init(&ms->rh, ms, dl, region_size, ms->nr_regions)) {
1348                 ti->error = "Error creating dirty region hash";
1349                 dm_io_client_destroy(ms->io_client);
1350                 mempool_destroy(ms->read_record_pool);
1351                 kfree(ms);
1352                 return NULL;
1353         }
1354
1355         return ms;
1356 }
1357
1358 static void free_context(struct mirror_set *ms, struct dm_target *ti,
1359                          unsigned int m)
1360 {
1361         while (m--)
1362                 dm_put_device(ti, ms->mirror[m].dev);
1363
1364         dm_io_client_destroy(ms->io_client);
1365         rh_exit(&ms->rh);
1366         mempool_destroy(ms->read_record_pool);
1367         kfree(ms);
1368 }
1369
1370 static inline int _check_region_size(struct dm_target *ti, uint32_t size)
1371 {
1372         return !(size % (PAGE_SIZE >> 9) || !is_power_of_2(size) ||
1373                  size > ti->len);
1374 }
1375
1376 static int get_mirror(struct mirror_set *ms, struct dm_target *ti,
1377                       unsigned int mirror, char **argv)
1378 {
1379         unsigned long long offset;
1380
1381         if (sscanf(argv[1], "%llu", &offset) != 1) {
1382                 ti->error = "Invalid offset";
1383                 return -EINVAL;
1384         }
1385
1386         if (dm_get_device(ti, argv[0], offset, ti->len,
1387                           dm_table_get_mode(ti->table),
1388                           &ms->mirror[mirror].dev)) {
1389                 ti->error = "Device lookup failure";
1390                 return -ENXIO;
1391         }
1392
1393         ms->mirror[mirror].ms = ms;
1394         atomic_set(&(ms->mirror[mirror].error_count), 0);
1395         ms->mirror[mirror].error_type = 0;
1396         ms->mirror[mirror].offset = offset;
1397
1398         return 0;
1399 }
1400
1401 /*
1402  * Create dirty log: log_type #log_params <log_params>
1403  */
1404 static struct dirty_log *create_dirty_log(struct dm_target *ti,
1405                                           unsigned int argc, char **argv,
1406                                           unsigned int *args_used)
1407 {
1408         unsigned int param_count;
1409         struct dirty_log *dl;
1410
1411         if (argc < 2) {
1412                 ti->error = "Insufficient mirror log arguments";
1413                 return NULL;
1414         }
1415
1416         if (sscanf(argv[1], "%u", &param_count) != 1) {
1417                 ti->error = "Invalid mirror log argument count";
1418                 return NULL;
1419         }
1420
1421         *args_used = 2 + param_count;
1422
1423         if (argc < *args_used) {
1424                 ti->error = "Insufficient mirror log arguments";
1425                 return NULL;
1426         }
1427
1428         dl = dm_create_dirty_log(argv[0], ti, param_count, argv + 2);
1429         if (!dl) {
1430                 ti->error = "Error creating mirror dirty log";
1431                 return NULL;
1432         }
1433
1434         if (!_check_region_size(ti, dl->type->get_region_size(dl))) {
1435                 ti->error = "Invalid region size";
1436                 dm_destroy_dirty_log(dl);
1437                 return NULL;
1438         }
1439
1440         return dl;
1441 }
1442
1443 static int parse_features(struct mirror_set *ms, unsigned argc, char **argv,
1444                           unsigned *args_used)
1445 {
1446         unsigned num_features;
1447         struct dm_target *ti = ms->ti;
1448
1449         *args_used = 0;
1450
1451         if (!argc)
1452                 return 0;
1453
1454         if (sscanf(argv[0], "%u", &num_features) != 1) {
1455                 ti->error = "Invalid number of features";
1456                 return -EINVAL;
1457         }
1458
1459         argc--;
1460         argv++;
1461         (*args_used)++;
1462
1463         if (num_features > argc) {
1464                 ti->error = "Not enough arguments to support feature count";
1465                 return -EINVAL;
1466         }
1467
1468         if (!strcmp("handle_errors", argv[0]))
1469                 ms->features |= DM_RAID1_HANDLE_ERRORS;
1470         else {
1471                 ti->error = "Unrecognised feature requested";
1472                 return -EINVAL;
1473         }
1474
1475         (*args_used)++;
1476
1477         return 0;
1478 }
1479
1480 /*
1481  * Construct a mirror mapping:
1482  *
1483  * log_type #log_params <log_params>
1484  * #mirrors [mirror_path offset]{2,}
1485  * [#features <features>]
1486  *
1487  * log_type is "core" or "disk"
1488  * #log_params is between 1 and 3
1489  *
1490  * If present, features must be "handle_errors".
1491  */
1492 static int mirror_ctr(struct dm_target *ti, unsigned int argc, char **argv)
1493 {
1494         int r;
1495         unsigned int nr_mirrors, m, args_used;
1496         struct mirror_set *ms;
1497         struct dirty_log *dl;
1498
1499         dl = create_dirty_log(ti, argc, argv, &args_used);
1500         if (!dl)
1501                 return -EINVAL;
1502
1503         argv += args_used;
1504         argc -= args_used;
1505
1506         if (!argc || sscanf(argv[0], "%u", &nr_mirrors) != 1 ||
1507             nr_mirrors < 2 || nr_mirrors > DM_KCOPYD_MAX_REGIONS + 1) {
1508                 ti->error = "Invalid number of mirrors";
1509                 dm_destroy_dirty_log(dl);
1510                 return -EINVAL;
1511         }
1512
1513         argv++, argc--;
1514
1515         if (argc < nr_mirrors * 2) {
1516                 ti->error = "Too few mirror arguments";
1517                 dm_destroy_dirty_log(dl);
1518                 return -EINVAL;
1519         }
1520
1521         ms = alloc_context(nr_mirrors, dl->type->get_region_size(dl), ti, dl);
1522         if (!ms) {
1523                 dm_destroy_dirty_log(dl);
1524                 return -ENOMEM;
1525         }
1526
1527         /* Get the mirror parameter sets */
1528         for (m = 0; m < nr_mirrors; m++) {
1529                 r = get_mirror(ms, ti, m, argv);
1530                 if (r) {
1531                         free_context(ms, ti, m);
1532                         return r;
1533                 }
1534                 argv += 2;
1535                 argc -= 2;
1536         }
1537
1538         ti->private = ms;
1539         ti->split_io = ms->rh.region_size;
1540
1541         ms->kmirrord_wq = create_singlethread_workqueue("kmirrord");
1542         if (!ms->kmirrord_wq) {
1543                 DMERR("couldn't start kmirrord");
1544                 r = -ENOMEM;
1545                 goto err_free_context;
1546         }
1547         INIT_WORK(&ms->kmirrord_work, do_mirror);
1548         INIT_WORK(&ms->trigger_event, trigger_event);
1549
1550         r = parse_features(ms, argc, argv, &args_used);
1551         if (r)
1552                 goto err_destroy_wq;
1553
1554         argv += args_used;
1555         argc -= args_used;
1556
1557         /*
1558          * Any read-balancing addition depends on the
1559          * DM_RAID1_HANDLE_ERRORS flag being present.
1560          * This is because the decision to balance depends
1561          * on the sync state of a region.  If the above
1562          * flag is not present, we ignore errors; and
1563          * the sync state may be inaccurate.
1564          */
1565
1566         if (argc) {
1567                 ti->error = "Too many mirror arguments";
1568                 r = -EINVAL;
1569                 goto err_destroy_wq;
1570         }
1571
1572         r = dm_kcopyd_client_create(DM_IO_PAGES, &ms->kcopyd_client);
1573         if (r)
1574                 goto err_destroy_wq;
1575
1576         wake(ms);
1577         return 0;
1578
1579 err_destroy_wq:
1580         destroy_workqueue(ms->kmirrord_wq);
1581 err_free_context:
1582         free_context(ms, ti, ms->nr_mirrors);
1583         return r;
1584 }
1585
1586 static void mirror_dtr(struct dm_target *ti)
1587 {
1588         struct mirror_set *ms = (struct mirror_set *) ti->private;
1589
1590         flush_workqueue(ms->kmirrord_wq);
1591         dm_kcopyd_client_destroy(ms->kcopyd_client);
1592         destroy_workqueue(ms->kmirrord_wq);
1593         free_context(ms, ti, ms->nr_mirrors);
1594 }
1595
1596 static void queue_bio(struct mirror_set *ms, struct bio *bio, int rw)
1597 {
1598         unsigned long flags;
1599         int should_wake = 0;
1600         struct bio_list *bl;
1601
1602         bl = (rw == WRITE) ? &ms->writes : &ms->reads;
1603         spin_lock_irqsave(&ms->lock, flags);
1604         should_wake = !(bl->head);
1605         bio_list_add(bl, bio);
1606         spin_unlock_irqrestore(&ms->lock, flags);
1607
1608         if (should_wake)
1609                 wake(ms);
1610 }
1611
1612 /*
1613  * Mirror mapping function
1614  */
1615 static int mirror_map(struct dm_target *ti, struct bio *bio,
1616                       union map_info *map_context)
1617 {
1618         int r, rw = bio_rw(bio);
1619         struct mirror *m;
1620         struct mirror_set *ms = ti->private;
1621         struct dm_raid1_read_record *read_record = NULL;
1622
1623         if (rw == WRITE) {
1624                 /* Save region for mirror_end_io() handler */
1625                 map_context->ll = bio_to_region(&ms->rh, bio);
1626                 queue_bio(ms, bio, rw);
1627                 return DM_MAPIO_SUBMITTED;
1628         }
1629
1630         r = ms->rh.log->type->in_sync(ms->rh.log,
1631                                       bio_to_region(&ms->rh, bio), 0);
1632         if (r < 0 && r != -EWOULDBLOCK)
1633                 return r;
1634
1635         /*
1636          * If region is not in-sync queue the bio.
1637          */
1638         if (!r || (r == -EWOULDBLOCK)) {
1639                 if (rw == READA)
1640                         return -EWOULDBLOCK;
1641
1642                 queue_bio(ms, bio, rw);
1643                 return DM_MAPIO_SUBMITTED;
1644         }
1645
1646         /*
1647          * The region is in-sync and we can perform reads directly.
1648          * Store enough information so we can retry if it fails.
1649          */
1650         m = choose_mirror(ms, bio->bi_sector);
1651         if (unlikely(!m))
1652                 return -EIO;
1653
1654         read_record = mempool_alloc(ms->read_record_pool, GFP_NOIO);
1655         if (likely(read_record)) {
1656                 dm_bio_record(&read_record->details, bio);
1657                 map_context->ptr = read_record;
1658                 read_record->m = m;
1659         }
1660
1661         map_bio(m, bio);
1662
1663         return DM_MAPIO_REMAPPED;
1664 }
1665
1666 static int mirror_end_io(struct dm_target *ti, struct bio *bio,
1667                          int error, union map_info *map_context)
1668 {
1669         int rw = bio_rw(bio);
1670         struct mirror_set *ms = (struct mirror_set *) ti->private;
1671         struct mirror *m = NULL;
1672         struct dm_bio_details *bd = NULL;
1673         struct dm_raid1_read_record *read_record = map_context->ptr;
1674
1675         /*
1676          * We need to dec pending if this was a write.
1677          */
1678         if (rw == WRITE) {
1679                 rh_dec(&ms->rh, map_context->ll);
1680                 return error;
1681         }
1682
1683         if (error == -EOPNOTSUPP)
1684                 goto out;
1685
1686         if ((error == -EWOULDBLOCK) && bio_rw_ahead(bio))
1687                 goto out;
1688
1689         if (unlikely(error)) {
1690                 if (!read_record) {
1691                         /*
1692                          * There wasn't enough memory to record necessary
1693                          * information for a retry or there was no other
1694                          * mirror in-sync.
1695                          */
1696                         DMERR_LIMIT("Mirror read failed.");
1697                         return -EIO;
1698                 }
1699
1700                 m = read_record->m;
1701
1702                 DMERR("Mirror read failed from %s. Trying alternative device.",
1703                       m->dev->name);
1704
1705                 fail_mirror(m, DM_RAID1_READ_ERROR);
1706
1707                 /*
1708                  * A failed read is requeued for another attempt using an intact
1709                  * mirror.
1710                  */
1711                 if (default_ok(m) || mirror_available(ms, bio)) {
1712                         bd = &read_record->details;
1713
1714                         dm_bio_restore(bd, bio);
1715                         mempool_free(read_record, ms->read_record_pool);
1716                         map_context->ptr = NULL;
1717                         queue_bio(ms, bio, rw);
1718                         return 1;
1719                 }
1720                 DMERR("All replicated volumes dead, failing I/O");
1721         }
1722
1723 out:
1724         if (read_record) {
1725                 mempool_free(read_record, ms->read_record_pool);
1726                 map_context->ptr = NULL;
1727         }
1728
1729         return error;
1730 }
1731
1732 static void mirror_presuspend(struct dm_target *ti)
1733 {
1734         struct mirror_set *ms = (struct mirror_set *) ti->private;
1735         struct dirty_log *log = ms->rh.log;
1736
1737         atomic_set(&ms->suspend, 1);
1738
1739         /*
1740          * We must finish up all the work that we've
1741          * generated (i.e. recovery work).
1742          */
1743         rh_stop_recovery(&ms->rh);
1744
1745         wait_event(_kmirrord_recovery_stopped,
1746                    !atomic_read(&ms->rh.recovery_in_flight));
1747
1748         if (log->type->presuspend && log->type->presuspend(log))
1749                 /* FIXME: need better error handling */
1750                 DMWARN("log presuspend failed");
1751
1752         /*
1753          * Now that recovery is complete/stopped and the
1754          * delayed bios are queued, we need to wait for
1755          * the worker thread to complete.  This way,
1756          * we know that all of our I/O has been pushed.
1757          */
1758         flush_workqueue(ms->kmirrord_wq);
1759 }
1760
1761 static void mirror_postsuspend(struct dm_target *ti)
1762 {
1763         struct mirror_set *ms = ti->private;
1764         struct dirty_log *log = ms->rh.log;
1765
1766         if (log->type->postsuspend && log->type->postsuspend(log))
1767                 /* FIXME: need better error handling */
1768                 DMWARN("log postsuspend failed");
1769 }
1770
1771 static void mirror_resume(struct dm_target *ti)
1772 {
1773         struct mirror_set *ms = ti->private;
1774         struct dirty_log *log = ms->rh.log;
1775
1776         atomic_set(&ms->suspend, 0);
1777         if (log->type->resume && log->type->resume(log))
1778                 /* FIXME: need better error handling */
1779                 DMWARN("log resume failed");
1780         rh_start_recovery(&ms->rh);
1781 }
1782
1783 /*
1784  * device_status_char
1785  * @m: mirror device/leg we want the status of
1786  *
1787  * We return one character representing the most severe error
1788  * we have encountered.
1789  *    A => Alive - No failures
1790  *    D => Dead - A write failure occurred leaving mirror out-of-sync
1791  *    S => Sync - A sychronization failure occurred, mirror out-of-sync
1792  *    R => Read - A read failure occurred, mirror data unaffected
1793  *
1794  * Returns: <char>
1795  */
1796 static char device_status_char(struct mirror *m)
1797 {
1798         if (!atomic_read(&(m->error_count)))
1799                 return 'A';
1800
1801         return (test_bit(DM_RAID1_WRITE_ERROR, &(m->error_type))) ? 'D' :
1802                 (test_bit(DM_RAID1_SYNC_ERROR, &(m->error_type))) ? 'S' :
1803                 (test_bit(DM_RAID1_READ_ERROR, &(m->error_type))) ? 'R' : 'U';
1804 }
1805
1806
1807 static int mirror_status(struct dm_target *ti, status_type_t type,
1808                          char *result, unsigned int maxlen)
1809 {
1810         unsigned int m, sz = 0;
1811         struct mirror_set *ms = (struct mirror_set *) ti->private;
1812         struct dirty_log *log = ms->rh.log;
1813         char buffer[ms->nr_mirrors + 1];
1814
1815         switch (type) {
1816         case STATUSTYPE_INFO:
1817                 DMEMIT("%d ", ms->nr_mirrors);
1818                 for (m = 0; m < ms->nr_mirrors; m++) {
1819                         DMEMIT("%s ", ms->mirror[m].dev->name);
1820                         buffer[m] = device_status_char(&(ms->mirror[m]));
1821                 }
1822                 buffer[m] = '\0';
1823
1824                 DMEMIT("%llu/%llu 1 %s ",
1825                       (unsigned long long)log->type->get_sync_count(ms->rh.log),
1826                       (unsigned long long)ms->nr_regions, buffer);
1827
1828                 sz += log->type->status(ms->rh.log, type, result+sz, maxlen-sz);
1829
1830                 break;
1831
1832         case STATUSTYPE_TABLE:
1833                 sz = log->type->status(ms->rh.log, type, result, maxlen);
1834
1835                 DMEMIT("%d", ms->nr_mirrors);
1836                 for (m = 0; m < ms->nr_mirrors; m++)
1837                         DMEMIT(" %s %llu", ms->mirror[m].dev->name,
1838                                (unsigned long long)ms->mirror[m].offset);
1839
1840                 if (ms->features & DM_RAID1_HANDLE_ERRORS)
1841                         DMEMIT(" 1 handle_errors");
1842         }
1843
1844         return 0;
1845 }
1846
1847 static struct target_type mirror_target = {
1848         .name    = "mirror",
1849         .version = {1, 0, 20},
1850         .module  = THIS_MODULE,
1851         .ctr     = mirror_ctr,
1852         .dtr     = mirror_dtr,
1853         .map     = mirror_map,
1854         .end_io  = mirror_end_io,
1855         .presuspend = mirror_presuspend,
1856         .postsuspend = mirror_postsuspend,
1857         .resume  = mirror_resume,
1858         .status  = mirror_status,
1859 };
1860
1861 static int __init dm_mirror_init(void)
1862 {
1863         int r;
1864
1865         r = dm_register_target(&mirror_target);
1866         if (r < 0)
1867                 DMERR("Failed to register mirror target");
1868
1869         return r;
1870 }
1871
1872 static void __exit dm_mirror_exit(void)
1873 {
1874         int r;
1875
1876         r = dm_unregister_target(&mirror_target);
1877         if (r < 0)
1878                 DMERR("unregister failed %d", r);
1879 }
1880
1881 /* Module hooks */
1882 module_init(dm_mirror_init);
1883 module_exit(dm_mirror_exit);
1884
1885 MODULE_DESCRIPTION(DM_NAME " mirror target");
1886 MODULE_AUTHOR("Joe Thornber");
1887 MODULE_LICENSE("GPL");