nfsd4: shutdown callbacks on expiry
[safe/jmp/linux-2.6] / drivers / md / dm-raid1.c
index 4d6bc10..ddda531 100644 (file)
@@ -5,7 +5,6 @@
  * This file is released under the GPL.
  */
 
-#include "dm-bio-list.h"
 #include "dm-bio-record.h"
 
 #include <linux/init.h>
@@ -36,6 +35,7 @@ static DECLARE_WAIT_QUEUE_HEAD(_kmirrord_recovery_stopped);
  *---------------------------------------------------------------*/
 enum dm_raid1_error {
        DM_RAID1_WRITE_ERROR,
+       DM_RAID1_FLUSH_ERROR,
        DM_RAID1_SYNC_ERROR,
        DM_RAID1_READ_ERROR
 };
@@ -58,6 +58,7 @@ struct mirror_set {
        struct bio_list reads;
        struct bio_list writes;
        struct bio_list failures;
+       struct bio_list holds;  /* bios are waiting until suspend */
 
        struct dm_region_hash *rh;
        struct dm_kcopyd_client *kcopyd_client;
@@ -68,6 +69,7 @@ struct mirror_set {
        region_t nr_regions;
        int in_sync;
        int log_failure;
+       int leg_failure;
        atomic_t suspend;
 
        atomic_t default_mirror;        /* Default mirror */
@@ -145,6 +147,8 @@ struct dm_raid1_read_record {
        struct dm_bio_details details;
 };
 
+static struct kmem_cache *_dm_raid1_read_record_cache;
+
 /*
  * Every mirror should look like this one.
  */
@@ -178,6 +182,17 @@ static void set_default_mirror(struct mirror *m)
        atomic_set(&ms->default_mirror, m - m0);
 }
 
+static struct mirror *get_valid_mirror(struct mirror_set *ms)
+{
+       struct mirror *m;
+
+       for (m = ms->mirror; m < ms->mirror + ms->nr_mirrors; m++)
+               if (!atomic_read(&m->error_count))
+                       return m;
+
+       return NULL;
+}
+
 /* fail_mirror
  * @m: mirror device to fail
  * @error_type: one of the enum's, DM_RAID1_*_ERROR
@@ -197,6 +212,8 @@ static void fail_mirror(struct mirror *m, enum dm_raid1_error error_type)
        struct mirror_set *ms = m->ms;
        struct mirror *new;
 
+       ms->leg_failure = 1;
+
        /*
         * error_count is used for nothing more than a
         * simple way to tell if a device has encountered
@@ -223,19 +240,50 @@ static void fail_mirror(struct mirror *m, enum dm_raid1_error error_type)
                goto out;
        }
 
-       for (new = ms->mirror; new < ms->mirror + ms->nr_mirrors; new++)
-               if (!atomic_read(&new->error_count)) {
-                       set_default_mirror(new);
-                       break;
-               }
-
-       if (unlikely(new == ms->mirror + ms->nr_mirrors))
+       new = get_valid_mirror(ms);
+       if (new)
+               set_default_mirror(new);
+       else
                DMWARN("All sides of mirror have failed.");
 
 out:
        schedule_work(&ms->trigger_event);
 }
 
+static int mirror_flush(struct dm_target *ti)
+{
+       struct mirror_set *ms = ti->private;
+       unsigned long error_bits;
+
+       unsigned int i;
+       struct dm_io_region io[ms->nr_mirrors];
+       struct mirror *m;
+       struct dm_io_request io_req = {
+               .bi_rw = WRITE_BARRIER,
+               .mem.type = DM_IO_KMEM,
+               .mem.ptr.bvec = NULL,
+               .client = ms->io_client,
+       };
+
+       for (i = 0, m = ms->mirror; i < ms->nr_mirrors; i++, m++) {
+               io[i].bdev = m->dev->bdev;
+               io[i].sector = 0;
+               io[i].count = 0;
+       }
+
+       error_bits = -1;
+       dm_io(&io_req, ms->nr_mirrors, io, &error_bits);
+       if (unlikely(error_bits != 0)) {
+               for (i = 0; i < ms->nr_mirrors; i++)
+                       if (test_bit(i, &error_bits))
+                               fail_mirror(ms->mirror + i,
+                                           DM_RAID1_FLUSH_ERROR);
+               return -EIO;
+       }
+
+       return 0;
+}
+
 /*-----------------------------------------------------------------
  * Recovery.
  *
@@ -395,6 +443,8 @@ static int mirror_available(struct mirror_set *ms, struct bio *bio)
  */
 static sector_t map_sector(struct mirror *m, struct bio *bio)
 {
+       if (unlikely(!bio->bi_size))
+               return 0;
        return m->offset + (bio->bi_sector - m->ms->ti->begin);
 }
 
@@ -412,6 +462,34 @@ static void map_region(struct dm_io_region *io, struct mirror *m,
        io->count = bio->bi_size >> 9;
 }
 
+static void hold_bio(struct mirror_set *ms, struct bio *bio)
+{
+       /*
+        * Lock is required to avoid race condition during suspend
+        * process.
+        */
+       spin_lock_irq(&ms->lock);
+
+       if (atomic_read(&ms->suspend)) {
+               spin_unlock_irq(&ms->lock);
+
+               /*
+                * If device is suspended, complete the bio.
+                */
+               if (dm_noflush_suspending(ms->ti))
+                       bio_endio(bio, DM_ENDIO_REQUEUE);
+               else
+                       bio_endio(bio, -EIO);
+               return;
+       }
+
+       /*
+        * Hold bio until the suspend is complete.
+        */
+       bio_list_add(&ms->holds, bio);
+       spin_unlock_irq(&ms->lock);
+}
+
 /*-----------------------------------------------------------------
  * Reads
  *---------------------------------------------------------------*/
@@ -510,7 +588,6 @@ static void write_callback(unsigned long error, void *context)
        unsigned i, ret = 0;
        struct bio *bio = (struct bio *) context;
        struct mirror_set *ms;
-       int uptodate = 0;
        int should_wake = 0;
        unsigned long flags;
 
@@ -523,36 +600,27 @@ static void write_callback(unsigned long error, void *context)
         * This way we handle both writes to SYNC and NOSYNC
         * regions with the same code.
         */
-       if (likely(!error))
-               goto out;
+       if (likely(!error)) {
+               bio_endio(bio, ret);
+               return;
+       }
 
        for (i = 0; i < ms->nr_mirrors; i++)
                if (test_bit(i, &error))
                        fail_mirror(ms->mirror + i, DM_RAID1_WRITE_ERROR);
-               else
-                       uptodate = 1;
 
-       if (unlikely(!uptodate)) {
-               DMERR("All replicated volumes dead, failing I/O");
-               /* None of the writes succeeded, fail the I/O. */
-               ret = -EIO;
-       } else if (errors_handled(ms)) {
-               /*
-                * Need to raise event.  Since raising
-                * events can block, we need to do it in
-                * the main thread.
-                */
-               spin_lock_irqsave(&ms->lock, flags);
-               if (!ms->failures.head)
-                       should_wake = 1;
-               bio_list_add(&ms->failures, bio);
-               spin_unlock_irqrestore(&ms->lock, flags);
-               if (should_wake)
-                       wakeup_mirrord(ms);
-               return;
-       }
-out:
-       bio_endio(bio, ret);
+       /*
+        * Need to raise event.  Since raising
+        * events can block, we need to do it in
+        * the main thread.
+        */
+       spin_lock_irqsave(&ms->lock, flags);
+       if (!ms->failures.head)
+               should_wake = 1;
+       bio_list_add(&ms->failures, bio);
+       spin_unlock_irqrestore(&ms->lock, flags);
+       if (should_wake)
+               wakeup_mirrord(ms);
 }
 
 static void do_write(struct mirror_set *ms, struct bio *bio)
@@ -561,7 +629,7 @@ static void do_write(struct mirror_set *ms, struct bio *bio)
        struct dm_io_region io[ms->nr_mirrors], *dest = io;
        struct mirror *m;
        struct dm_io_request io_req = {
-               .bi_rw = WRITE,
+               .bi_rw = WRITE | (bio->bi_rw & WRITE_BARRIER),
                .mem.type = DM_IO_BVEC,
                .mem.ptr.bvec = bio->bi_io_vec + bio->bi_idx,
                .notify.fn = write_callback,
@@ -586,6 +654,9 @@ static void do_writes(struct mirror_set *ms, struct bio_list *writes)
        int state;
        struct bio *bio;
        struct bio_list sync, nosync, recover, *this_list = NULL;
+       struct bio_list requeue;
+       struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
+       region_t region;
 
        if (!writes->head)
                return;
@@ -596,10 +667,23 @@ static void do_writes(struct mirror_set *ms, struct bio_list *writes)
        bio_list_init(&sync);
        bio_list_init(&nosync);
        bio_list_init(&recover);
+       bio_list_init(&requeue);
 
        while ((bio = bio_list_pop(writes))) {
-               state = dm_rh_get_state(ms->rh,
-                                       dm_rh_bio_to_region(ms->rh, bio), 1);
+               if (unlikely(bio_empty_barrier(bio))) {
+                       bio_list_add(&sync, bio);
+                       continue;
+               }
+
+               region = dm_rh_bio_to_region(ms->rh, bio);
+
+               if (log->type->is_remote_recovering &&
+                   log->type->is_remote_recovering(log, region)) {
+                       bio_list_add(&requeue, bio);
+                       continue;
+               }
+
+               state = dm_rh_get_state(ms->rh, region, 1);
                switch (state) {
                case DM_RH_CLEAN:
                case DM_RH_DIRTY:
@@ -619,18 +703,35 @@ static void do_writes(struct mirror_set *ms, struct bio_list *writes)
        }
 
        /*
+        * Add bios that are delayed due to remote recovery
+        * back on to the write queue
+        */
+       if (unlikely(requeue.head)) {
+               spin_lock_irq(&ms->lock);
+               bio_list_merge(&ms->writes, &requeue);
+               spin_unlock_irq(&ms->lock);
+               delayed_wake(ms);
+       }
+
+       /*
         * Increment the pending counts for any regions that will
         * be written to (writes to recover regions are going to
         * be delayed).
         */
        dm_rh_inc_pending(ms->rh, &sync);
        dm_rh_inc_pending(ms->rh, &nosync);
-       ms->log_failure = dm_rh_flush(ms->rh) ? 1 : 0;
+
+       /*
+        * If the flush fails on a previous call and succeeds here,
+        * we must not reset the log_failure variable.  We need
+        * userspace interaction to do that.
+        */
+       ms->log_failure = dm_rh_flush(ms->rh) ? 1 : ms->log_failure;
 
        /*
         * Dispatch io.
         */
-       if (unlikely(ms->log_failure)) {
+       if (unlikely(ms->log_failure) && errors_handled(ms)) {
                spin_lock_irq(&ms->lock);
                bio_list_merge(&ms->failures, &sync);
                spin_unlock_irq(&ms->lock);
@@ -643,8 +744,15 @@ static void do_writes(struct mirror_set *ms, struct bio_list *writes)
                dm_rh_delay(ms->rh, bio);
 
        while ((bio = bio_list_pop(&nosync))) {
-               map_bio(get_default_mirror(ms), bio);
-               generic_make_request(bio);
+               if (unlikely(ms->leg_failure) && errors_handled(ms)) {
+                       spin_lock_irq(&ms->lock);
+                       bio_list_add(&ms->failures, bio);
+                       spin_unlock_irq(&ms->lock);
+                       wakeup_mirrord(ms);
+               } else {
+                       map_bio(get_default_mirror(ms), bio);
+                       generic_make_request(bio);
+               }
        }
 }
 
@@ -652,20 +760,12 @@ static void do_failures(struct mirror_set *ms, struct bio_list *failures)
 {
        struct bio *bio;
 
-       if (!failures->head)
+       if (likely(!failures->head))
                return;
 
-       if (!ms->log_failure) {
-               while ((bio = bio_list_pop(failures))) {
-                       ms->in_sync = 0;
-                       dm_rh_mark_nosync(ms->rh, bio, bio->bi_size, 0);
-               }
-               return;
-       }
-
        /*
         * If the log has failed, unattempted writes are being
-        * put on the failures list.  We can't issue those writes
+        * put on the holds list.  We can't issue those writes
         * until a log has been marked, so we must store them.
         *
         * If a 'noflush' suspend is in progress, we can requeue
@@ -680,23 +780,27 @@ static void do_failures(struct mirror_set *ms, struct bio_list *failures)
         * for us to treat them the same and requeue them
         * as well.
         */
-       if (dm_noflush_suspending(ms->ti)) {
-               while ((bio = bio_list_pop(failures)))
-                       bio_endio(bio, DM_ENDIO_REQUEUE);
-               return;
-       }
+       while ((bio = bio_list_pop(failures))) {
+               if (!ms->log_failure) {
+                       ms->in_sync = 0;
+                       dm_rh_mark_nosync(ms->rh, bio);
+               }
 
-       if (atomic_read(&ms->suspend)) {
-               while ((bio = bio_list_pop(failures)))
+               /*
+                * If all the legs are dead, fail the I/O.
+                * If we have been told to handle errors, hold the bio
+                * and wait for userspace to deal with the problem.
+                * Otherwise pretend that the I/O succeeded. (This would
+                * be wrong if the failed leg returned after reboot and
+                * got replicated back to the good legs.)
+                */
+               if (!get_valid_mirror(ms))
                        bio_endio(bio, -EIO);
-               return;
+               else if (errors_handled(ms))
+                       hold_bio(ms, bio);
+               else
+                       bio_endio(bio, 0);
        }
-
-       spin_lock_irq(&ms->lock);
-       bio_list_merge(&ms->failures, failures);
-       spin_unlock_irq(&ms->lock);
-
-       delayed_wake(ms);
 }
 
 static void trigger_event(struct work_struct *work)
@@ -755,18 +859,23 @@ static struct mirror_set *alloc_context(unsigned int nr_mirrors,
        }
 
        spin_lock_init(&ms->lock);
+       bio_list_init(&ms->reads);
+       bio_list_init(&ms->writes);
+       bio_list_init(&ms->failures);
+       bio_list_init(&ms->holds);
 
        ms->ti = ti;
        ms->nr_mirrors = nr_mirrors;
        ms->nr_regions = dm_sector_div_up(ti->len, region_size);
        ms->in_sync = 0;
        ms->log_failure = 0;
+       ms->leg_failure = 0;
        atomic_set(&ms->suspend, 0);
        atomic_set(&ms->default_mirror, DEFAULT_MIRROR);
 
-       len = sizeof(struct dm_raid1_read_record);
-       ms->read_record_pool = mempool_create_kmalloc_pool(MIN_READ_RECORDS,
-                                                          len);
+       ms->read_record_pool = mempool_create_slab_pool(MIN_READ_RECORDS,
+                                               _dm_raid1_read_record_cache);
+
        if (!ms->read_record_pool) {
                ti->error = "Error creating mirror read_record_pool";
                kfree(ms);
@@ -818,8 +927,7 @@ static int get_mirror(struct mirror_set *ms, struct dm_target *ti,
                return -EINVAL;
        }
 
-       if (dm_get_device(ti, argv[0], offset, ti->len,
-                         dm_table_get_mode(ti->table),
+       if (dm_get_device(ti, argv[0], dm_table_get_mode(ti->table),
                          &ms->mirror[mirror].dev)) {
                ti->error = "Device lookup failure";
                return -ENXIO;
@@ -860,7 +968,8 @@ static struct dm_dirty_log *create_dirty_log(struct dm_target *ti,
                return NULL;
        }
 
-       dl = dm_dirty_log_create(argv[0], ti, param_count, argv + 2);
+       dl = dm_dirty_log_create(argv[0], ti, mirror_flush, param_count,
+                                argv + 2);
        if (!dl) {
                ti->error = "Error creating mirror dirty log";
                return NULL;
@@ -966,6 +1075,7 @@ static int mirror_ctr(struct dm_target *ti, unsigned int argc, char **argv)
 
        ti->private = ms;
        ti->split_io = dm_rh_get_region_size(ms->rh);
+       ti->num_flush_requests = 1;
 
        ms->kmirrord_wq = create_singlethread_workqueue("kmirrord");
        if (!ms->kmirrord_wq) {
@@ -1093,14 +1203,15 @@ static int mirror_end_io(struct dm_target *ti, struct bio *bio,
         * We need to dec pending if this was a write.
         */
        if (rw == WRITE) {
-               dm_rh_dec(ms->rh, map_context->ll);
+               if (likely(!bio_empty_barrier(bio)))
+                       dm_rh_dec(ms->rh, map_context->ll);
                return error;
        }
 
        if (error == -EOPNOTSUPP)
                goto out;
 
-       if ((error == -EWOULDBLOCK) && bio_rw_ahead(bio))
+       if ((error == -EWOULDBLOCK) && bio_rw_flagged(bio, BIO_RW_AHEAD))
                goto out;
 
        if (unlikely(error)) {
@@ -1151,9 +1262,26 @@ static void mirror_presuspend(struct dm_target *ti)
        struct mirror_set *ms = (struct mirror_set *) ti->private;
        struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
 
+       struct bio_list holds;
+       struct bio *bio;
+
        atomic_set(&ms->suspend, 1);
 
        /*
+        * Process bios in the hold list to start recovery waiting
+        * for bios in the hold list. After the process, no bio has
+        * a chance to be added in the hold list because ms->suspend
+        * is set.
+        */
+       spin_lock_irq(&ms->lock);
+       holds = ms->holds;
+       bio_list_init(&ms->holds);
+       spin_unlock_irq(&ms->lock);
+
+       while ((bio = bio_list_pop(&holds)))
+               hold_bio(ms, bio);
+
+       /*
         * We must finish up all the work that we've
         * generated (i.e. recovery work).
         */
@@ -1215,7 +1343,8 @@ static char device_status_char(struct mirror *m)
        if (!atomic_read(&(m->error_count)))
                return 'A';
 
-       return (test_bit(DM_RAID1_WRITE_ERROR, &(m->error_type))) ? 'D' :
+       return (test_bit(DM_RAID1_FLUSH_ERROR, &(m->error_type))) ? 'F' :
+               (test_bit(DM_RAID1_WRITE_ERROR, &(m->error_type))) ? 'D' :
                (test_bit(DM_RAID1_SYNC_ERROR, &(m->error_type))) ? 'S' :
                (test_bit(DM_RAID1_READ_ERROR, &(m->error_type))) ? 'R' : 'U';
 }
@@ -1261,9 +1390,23 @@ static int mirror_status(struct dm_target *ti, status_type_t type,
        return 0;
 }
 
+static int mirror_iterate_devices(struct dm_target *ti,
+                                 iterate_devices_callout_fn fn, void *data)
+{
+       struct mirror_set *ms = ti->private;
+       int ret = 0;
+       unsigned i;
+
+       for (i = 0; !ret && i < ms->nr_mirrors; i++)
+               ret = fn(ti, ms->mirror[i].dev,
+                        ms->mirror[i].offset, ti->len, data);
+
+       return ret;
+}
+
 static struct target_type mirror_target = {
        .name    = "mirror",
-       .version = {1, 0, 20},
+       .version = {1, 12, 0},
        .module  = THIS_MODULE,
        .ctr     = mirror_ctr,
        .dtr     = mirror_dtr,
@@ -1273,22 +1416,38 @@ static struct target_type mirror_target = {
        .postsuspend = mirror_postsuspend,
        .resume  = mirror_resume,
        .status  = mirror_status,
+       .iterate_devices = mirror_iterate_devices,
 };
 
 static int __init dm_mirror_init(void)
 {
        int r;
 
+       _dm_raid1_read_record_cache = KMEM_CACHE(dm_raid1_read_record, 0);
+       if (!_dm_raid1_read_record_cache) {
+               DMERR("Can't allocate dm_raid1_read_record cache");
+               r = -ENOMEM;
+               goto bad_cache;
+       }
+
        r = dm_register_target(&mirror_target);
-       if (r < 0)
+       if (r < 0) {
                DMERR("Failed to register mirror target");
+               goto bad_target;
+       }
+
+       return 0;
 
+bad_target:
+       kmem_cache_destroy(_dm_raid1_read_record_cache);
+bad_cache:
        return r;
 }
 
 static void __exit dm_mirror_exit(void)
 {
        dm_unregister_target(&mirror_target);
+       kmem_cache_destroy(_dm_raid1_read_record_cache);
 }
 
 /* Module hooks */