Btrfs: shift all end_io work to thread pools
authorChris Mason <chris.mason@oracle.com>
Wed, 17 Dec 2008 19:51:42 +0000 (14:51 -0500)
committerChris Mason <chris.mason@oracle.com>
Wed, 17 Dec 2008 19:51:42 +0000 (14:51 -0500)
bio_end_io for reads without checksumming on and btree writes were
happening without using async thread pools.  This means the extent_io.c
code had to use spin_lock_irq and friends on the rb tree locks for
extent state.

There were some irq safe vs unsafe lock inversions between the delallock
lock and the extent state locks.  This patch gets rid of them by moving
all end_io code into the thread pools.

To avoid contention and deadlocks between the data end_io processing and the
metadata end_io processing yet another thread pool is added to finish
off metadata writes.

Signed-off-by: Chris Mason <chris.mason@oracle.com>
fs/btrfs/ctree.h
fs/btrfs/disk-io.c
fs/btrfs/extent_io.c
fs/btrfs/inode.c

index b89999d..ccea064 100644 (file)
@@ -758,6 +758,7 @@ struct btrfs_fs_info {
        struct btrfs_workers delalloc_workers;
        struct btrfs_workers endio_workers;
        struct btrfs_workers endio_meta_workers;
+       struct btrfs_workers endio_meta_write_workers;
        struct btrfs_workers endio_write_workers;
        struct btrfs_workers submit_workers;
        /*
index 541a827..04f8d70 100644 (file)
@@ -447,8 +447,12 @@ static void end_workqueue_bio(struct bio *bio, int err)
        end_io_wq->work.flags = 0;
 
        if (bio->bi_rw & (1 << BIO_RW)) {
-               btrfs_queue_worker(&fs_info->endio_write_workers,
-                                  &end_io_wq->work);
+               if (end_io_wq->metadata)
+                       btrfs_queue_worker(&fs_info->endio_meta_write_workers,
+                                          &end_io_wq->work);
+               else
+                       btrfs_queue_worker(&fs_info->endio_write_workers,
+                                          &end_io_wq->work);
        } else {
                if (end_io_wq->metadata)
                        btrfs_queue_worker(&fs_info->endio_meta_workers,
@@ -624,23 +628,24 @@ static int __btree_submit_bio_done(struct inode *inode, int rw, struct bio *bio,
 static int btree_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
                                 int mirror_num, unsigned long bio_flags)
 {
-       /*
-        * kthread helpers are used to submit writes so that checksumming
-        * can happen in parallel across all CPUs
-        */
+       int ret;
+
+       ret = btrfs_bio_wq_end_io(BTRFS_I(inode)->root->fs_info,
+                                         bio, 1);
+       BUG_ON(ret);
+
        if (!(rw & (1 << BIO_RW))) {
-               int ret;
                /*
                 * called for a read, do the setup so that checksum validation
                 * can happen in the async kernel threads
                 */
-               ret = btrfs_bio_wq_end_io(BTRFS_I(inode)->root->fs_info,
-                                         bio, 1);
-               BUG_ON(ret);
-
                return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio,
                                     mirror_num, 0);
        }
+       /*
+        * kthread helpers are used to submit writes so that checksumming
+        * can happen in parallel across all CPUs
+        */
        return btrfs_wq_submit_bio(BTRFS_I(inode)->root->fs_info,
                                   inode, rw, bio, mirror_num, 0,
                                   __btree_submit_bio_start,
@@ -1350,12 +1355,13 @@ static void end_workqueue_fn(struct btrfs_work *work)
        bio = end_io_wq->bio;
        fs_info = end_io_wq->info;
 
-       /* metadata bios are special because the whole tree block must
+       /* metadata bio reads are special because the whole tree block must
         * be checksummed at once.  This makes sure the entire block is in
         * ram and up to date before trying to verify things.  For
         * blocksize <= pagesize, it is basically a noop
         */
-       if (end_io_wq->metadata && !bio_ready_for_csum(bio)) {
+       if (!(bio->bi_rw & (1 << BIO_RW)) && end_io_wq->metadata &&
+           !bio_ready_for_csum(bio)) {
                btrfs_queue_worker(&fs_info->endio_meta_workers,
                                   &end_io_wq->work);
                return;
@@ -1668,6 +1674,8 @@ struct btrfs_root *open_ctree(struct super_block *sb,
                           fs_info->thread_pool_size);
        btrfs_init_workers(&fs_info->endio_meta_workers, "endio-meta",
                           fs_info->thread_pool_size);
+       btrfs_init_workers(&fs_info->endio_meta_write_workers,
+                          "endio-meta-write", fs_info->thread_pool_size);
        btrfs_init_workers(&fs_info->endio_write_workers, "endio-write",
                           fs_info->thread_pool_size);
 
@@ -1677,6 +1685,7 @@ struct btrfs_root *open_ctree(struct super_block *sb,
         */
        fs_info->endio_workers.idle_thresh = 4;
        fs_info->endio_write_workers.idle_thresh = 64;
+       fs_info->endio_meta_write_workers.idle_thresh = 64;
 
        btrfs_start_workers(&fs_info->workers, 1);
        btrfs_start_workers(&fs_info->submit_workers, 1);
@@ -1685,6 +1694,8 @@ struct btrfs_root *open_ctree(struct super_block *sb,
        btrfs_start_workers(&fs_info->endio_workers, fs_info->thread_pool_size);
        btrfs_start_workers(&fs_info->endio_meta_workers,
                            fs_info->thread_pool_size);
+       btrfs_start_workers(&fs_info->endio_meta_write_workers,
+                           fs_info->thread_pool_size);
        btrfs_start_workers(&fs_info->endio_write_workers,
                            fs_info->thread_pool_size);
 
@@ -1866,6 +1877,7 @@ fail_sb_buffer:
        btrfs_stop_workers(&fs_info->workers);
        btrfs_stop_workers(&fs_info->endio_workers);
        btrfs_stop_workers(&fs_info->endio_meta_workers);
+       btrfs_stop_workers(&fs_info->endio_meta_write_workers);
        btrfs_stop_workers(&fs_info->endio_write_workers);
        btrfs_stop_workers(&fs_info->submit_workers);
 fail_iput:
@@ -2253,6 +2265,7 @@ int close_ctree(struct btrfs_root *root)
        btrfs_stop_workers(&fs_info->workers);
        btrfs_stop_workers(&fs_info->endio_workers);
        btrfs_stop_workers(&fs_info->endio_meta_workers);
+       btrfs_stop_workers(&fs_info->endio_meta_write_workers);
        btrfs_stop_workers(&fs_info->endio_write_workers);
        btrfs_stop_workers(&fs_info->submit_workers);
 
index 607f5ff..25ce2d1 100644 (file)
@@ -477,7 +477,6 @@ int clear_extent_bit(struct extent_io_tree *tree, u64 start, u64 end,
        struct extent_state *state;
        struct extent_state *prealloc = NULL;
        struct rb_node *node;
-       unsigned long flags;
        int err;
        int set = 0;
 
@@ -488,7 +487,7 @@ again:
                        return -ENOMEM;
        }
 
-       spin_lock_irqsave(&tree->lock, flags);
+       spin_lock(&tree->lock);
        /*
         * this search will find the extents that end after
         * our range starts
@@ -559,7 +558,7 @@ again:
        goto search_again;
 
 out:
-       spin_unlock_irqrestore(&tree->lock, flags);
+       spin_unlock(&tree->lock);
        if (prealloc)
                free_extent_state(prealloc);
 
@@ -568,7 +567,7 @@ out:
 search_again:
        if (start > end)
                goto out;
-       spin_unlock_irqrestore(&tree->lock, flags);
+       spin_unlock(&tree->lock);
        if (mask & __GFP_WAIT)
                cond_resched();
        goto again;
@@ -582,9 +581,9 @@ static int wait_on_state(struct extent_io_tree *tree,
 {
        DEFINE_WAIT(wait);
        prepare_to_wait(&state->wq, &wait, TASK_UNINTERRUPTIBLE);
-       spin_unlock_irq(&tree->lock);
+       spin_unlock(&tree->lock);
        schedule();
-       spin_lock_irq(&tree->lock);
+       spin_lock(&tree->lock);
        finish_wait(&state->wq, &wait);
        return 0;
 }
@@ -599,7 +598,7 @@ int wait_extent_bit(struct extent_io_tree *tree, u64 start, u64 end, int bits)
        struct extent_state *state;
        struct rb_node *node;
 
-       spin_lock_irq(&tree->lock);
+       spin_lock(&tree->lock);
 again:
        while (1) {
                /*
@@ -628,13 +627,13 @@ again:
                        break;
 
                if (need_resched()) {
-                       spin_unlock_irq(&tree->lock);
+                       spin_unlock(&tree->lock);
                        cond_resched();
-                       spin_lock_irq(&tree->lock);
+                       spin_lock(&tree->lock);
                }
        }
 out:
-       spin_unlock_irq(&tree->lock);
+       spin_unlock(&tree->lock);
        return 0;
 }
 EXPORT_SYMBOL(wait_extent_bit);
@@ -668,7 +667,6 @@ static int set_extent_bit(struct extent_io_tree *tree, u64 start, u64 end, int b
        struct extent_state *state;
        struct extent_state *prealloc = NULL;
        struct rb_node *node;
-       unsigned long flags;
        int err = 0;
        int set;
        u64 last_start;
@@ -680,7 +678,7 @@ again:
                        return -ENOMEM;
        }
 
-       spin_lock_irqsave(&tree->lock, flags);
+       spin_lock(&tree->lock);
        /*
         * this search will find all the extents that end after
         * our range starts.
@@ -800,7 +798,7 @@ again:
        goto search_again;
 
 out:
-       spin_unlock_irqrestore(&tree->lock, flags);
+       spin_unlock(&tree->lock);
        if (prealloc)
                free_extent_state(prealloc);
 
@@ -809,7 +807,7 @@ out:
 search_again:
        if (start > end)
                goto out;
-       spin_unlock_irqrestore(&tree->lock, flags);
+       spin_unlock(&tree->lock);
        if (mask & __GFP_WAIT)
                cond_resched();
        goto again;
@@ -1021,7 +1019,7 @@ int find_first_extent_bit(struct extent_io_tree *tree, u64 start,
        struct extent_state *state;
        int ret = 1;
 
-       spin_lock_irq(&tree->lock);
+       spin_lock(&tree->lock);
        /*
         * this search will find all the extents that end after
         * our range starts.
@@ -1044,7 +1042,7 @@ int find_first_extent_bit(struct extent_io_tree *tree, u64 start,
                        break;
        }
 out:
-       spin_unlock_irq(&tree->lock);
+       spin_unlock(&tree->lock);
        return ret;
 }
 EXPORT_SYMBOL(find_first_extent_bit);
@@ -1097,7 +1095,7 @@ static noinline u64 find_delalloc_range(struct extent_io_tree *tree,
        u64 found = 0;
        u64 total_bytes = 0;
 
-       spin_lock_irq(&tree->lock);
+       spin_lock(&tree->lock);
 
        /*
         * this search will find all the extents that end after
@@ -1134,7 +1132,7 @@ static noinline u64 find_delalloc_range(struct extent_io_tree *tree,
                        break;
        }
 out:
-       spin_unlock_irq(&tree->lock);
+       spin_unlock(&tree->lock);
        return found;
 }
 
@@ -1391,7 +1389,7 @@ u64 count_range_bits(struct extent_io_tree *tree,
                return 0;
        }
 
-       spin_lock_irq(&tree->lock);
+       spin_lock(&tree->lock);
        if (cur_start == 0 && bits == EXTENT_DIRTY) {
                total_bytes = tree->dirty_bytes;
                goto out;
@@ -1424,7 +1422,7 @@ u64 count_range_bits(struct extent_io_tree *tree,
                        break;
        }
 out:
-       spin_unlock_irq(&tree->lock);
+       spin_unlock(&tree->lock);
        return total_bytes;
 }
 
@@ -1501,7 +1499,7 @@ int set_state_private(struct extent_io_tree *tree, u64 start, u64 private)
        struct extent_state *state;
        int ret = 0;
 
-       spin_lock_irq(&tree->lock);
+       spin_lock(&tree->lock);
        /*
         * this search will find all the extents that end after
         * our range starts.
@@ -1518,7 +1516,7 @@ int set_state_private(struct extent_io_tree *tree, u64 start, u64 private)
        }
        state->private = private;
 out:
-       spin_unlock_irq(&tree->lock);
+       spin_unlock(&tree->lock);
        return ret;
 }
 
@@ -1528,7 +1526,7 @@ int get_state_private(struct extent_io_tree *tree, u64 start, u64 *private)
        struct extent_state *state;
        int ret = 0;
 
-       spin_lock_irq(&tree->lock);
+       spin_lock(&tree->lock);
        /*
         * this search will find all the extents that end after
         * our range starts.
@@ -1545,7 +1543,7 @@ int get_state_private(struct extent_io_tree *tree, u64 start, u64 *private)
        }
        *private = state->private;
 out:
-       spin_unlock_irq(&tree->lock);
+       spin_unlock(&tree->lock);
        return ret;
 }
 
@@ -1561,9 +1559,8 @@ int test_range_bit(struct extent_io_tree *tree, u64 start, u64 end,
        struct extent_state *state = NULL;
        struct rb_node *node;
        int bitset = 0;
-       unsigned long flags;
 
-       spin_lock_irqsave(&tree->lock, flags);
+       spin_lock(&tree->lock);
        node = tree_search(tree, start);
        while (node && start <= end) {
                state = rb_entry(node, struct extent_state, rb_node);
@@ -1594,7 +1591,7 @@ int test_range_bit(struct extent_io_tree *tree, u64 start, u64 end,
                        break;
                }
        }
-       spin_unlock_irqrestore(&tree->lock, flags);
+       spin_unlock(&tree->lock);
        return bitset;
 }
 EXPORT_SYMBOL(test_range_bit);
index 0577e77..068bad4 100644 (file)
@@ -1282,8 +1282,8 @@ static int __btrfs_submit_bio_done(struct inode *inode, int rw, struct bio *bio,
 }
 
 /*
- * extent_io.c submission hook. This does the right thing for csum calculation on write,
- * or reading the csums from the tree before a read
+ * extent_io.c submission hook. This does the right thing for csum calculation
+ * on write, or reading the csums from the tree before a read
  */
 static int btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
                          int mirror_num, unsigned long bio_flags)
@@ -1292,11 +1292,11 @@ static int btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
        int ret = 0;
        int skip_sum;
 
+       skip_sum = btrfs_test_flag(inode, NODATASUM);
+
        ret = btrfs_bio_wq_end_io(root->fs_info, bio, 0);
        BUG_ON(ret);
 
-       skip_sum = btrfs_test_flag(inode, NODATASUM);
-
        if (!(rw & (1 << BIO_RW))) {
                if (bio_flags & EXTENT_BIO_COMPRESSED) {
                        return btrfs_submit_compressed_read(inode, bio,
@@ -1648,13 +1648,13 @@ static int btrfs_io_failed_hook(struct bio *failed_bio,
                              failrec->logical, failrec->len);
        failrec->last_mirror++;
        if (!state) {
-               spin_lock_irq(&BTRFS_I(inode)->io_tree.lock);
+               spin_lock(&BTRFS_I(inode)->io_tree.lock);
                state = find_first_extent_bit_state(&BTRFS_I(inode)->io_tree,
                                                    failrec->start,
                                                    EXTENT_LOCKED);
                if (state && state->start != failrec->start)
                        state = NULL;
-               spin_unlock_irq(&BTRFS_I(inode)->io_tree.lock);
+               spin_unlock(&BTRFS_I(inode)->io_tree.lock);
        }
        if (!state || failrec->last_mirror > num_copies) {
                set_state_private(failure_tree, failrec->start, 0);