*
* O_DIRECT
*
- * 04Jul2002 akpm@zip.com.au
+ * 04Jul2002 Andrew Morton
* Initial version
* 11Sep2002 janetinc@us.ibm.com
* added readv/writev support.
- * 29Oct2002 akpm@zip.com.au
+ * 29Oct2002 Andrew Morton
* rewrote bio_add_page() support.
* 30Oct2002 pbadari@us.ibm.com
* added support for non-aligned IO.
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/pagemap.h>
+#include <linux/task_io_accounting_ops.h>
#include <linux/bio.h>
#include <linux/wait.h>
#include <linux/err.h>
*
* If blkfactor is zero then the user's request was aligned to the filesystem's
* blocksize.
- *
- * lock_type is DIO_LOCKING for regular files on direct-IO-naive filesystems.
- * This determines whether we need to do the fancy locking which prevents
- * direct-IO from being able to read uninitialised disk blocks. If its zero
- * (blockdev) this locking is not done, and if it is DIO_OWN_LOCKING i_mutex is
- * not held for the entire direct write (taken briefly, initially, during a
- * direct read though, but its never held for the duration of a direct-IO).
*/
struct dio {
struct inode *inode;
int rw;
loff_t i_size; /* i_size when submitted */
- int lock_type; /* doesn't change */
+ int flags; /* doesn't change */
unsigned blkbits; /* doesn't change */
unsigned blkfactor; /* When we're using an alignment which
is finer than the filesystem's soft
unsigned cur_page_len; /* Nr of bytes at cur_page_offset */
sector_t cur_page_block; /* Where it starts */
+ /* BIO completion state */
+ spinlock_t bio_lock; /* protects BIO fields below */
+ unsigned long refcount; /* direct_io_worker() and bios */
+ struct bio *bio_list; /* singly linked via bi_private */
+ struct task_struct *waiter; /* waiting task (NULL if none) */
+
+ /* AIO related stuff */
+ struct kiocb *iocb; /* kiocb */
+ int is_async; /* is IO async ? */
+ int io_error; /* IO error in completion path */
+ ssize_t result; /* IO result */
+
/*
* Page fetching state. These variables belong to dio_refill_pages().
*/
* Page queue. These variables belong to dio_refill_pages() and
* dio_get_page().
*/
- struct page *pages[DIO_PAGES]; /* page buffer */
unsigned head; /* next page to process */
unsigned tail; /* last valid page + 1 */
int page_errors; /* errno from get_user_pages() */
- /* BIO completion state */
- spinlock_t bio_lock; /* protects BIO fields below */
- int bio_count; /* nr bios to be completed */
- int bios_in_flight; /* nr bios in flight */
- struct bio *bio_list; /* singly linked via bi_private */
- struct task_struct *waiter; /* waiting task (NULL if none) */
-
- /* AIO related stuff */
- struct kiocb *iocb; /* kiocb */
- int is_async; /* is IO async ? */
- int io_error; /* IO error in completion path */
- ssize_t result; /* IO result */
+ /*
+ * pages[] (and any fields placed after it) are not zeroed out at
+ * allocation time. Don't add new fields after pages[] unless you
+ * wish that they not be zeroed.
+ */
+ struct page *pages[DIO_PAGES]; /* page buffer */
};
/*
int nr_pages;
nr_pages = min(dio->total_pages - dio->curr_page, DIO_PAGES);
- down_read(¤t->mm->mmap_sem);
- ret = get_user_pages(
- current, /* Task for fault acounting */
- current->mm, /* whose pages? */
+ ret = get_user_pages_fast(
dio->curr_user_address, /* Where from? */
nr_pages, /* How many pages? */
dio->rw == READ, /* Write to memory? */
- 0, /* force (?) */
- &dio->pages[0],
- NULL); /* vmas */
- up_read(¤t->mm->mmap_sem);
+ &dio->pages[0]); /* Put results here */
- if (ret < 0 && dio->blocks_available && (dio->rw == WRITE)) {
- struct page *page = ZERO_PAGE(dio->curr_user_address);
+ if (ret < 0 && dio->blocks_available && (dio->rw & WRITE)) {
+ struct page *page = ZERO_PAGE(0);
/*
* A memory fault, but the filesystem has some outstanding
* mapped blocks. We need to use those blocks up to avoid
return dio->pages[dio->head++];
}
-/*
- * Called when all DIO BIO I/O has been completed - let the filesystem
- * know, if it registered an interest earlier via get_block. Pass the
- * private field of the map buffer_head so that filesystems can use it
- * to hold additional state between get_block calls and dio_complete.
- */
-static void dio_complete(struct dio *dio, loff_t offset, ssize_t bytes)
-{
- if (dio->end_io && dio->result)
- dio->end_io(dio->iocb, offset, bytes, dio->map_bh.b_private);
- if (dio->lock_type == DIO_LOCKING)
- up_read(&dio->inode->i_alloc_sem);
-}
-
-/*
- * Called when a BIO has been processed. If the count goes to zero then IO is
- * complete and we can signal this to the AIO layer.
+/**
+ * dio_complete() - called when all DIO BIO I/O has been completed
+ * @offset: the byte offset in the file of the completed operation
+ *
+ * This releases locks as dictated by the locking type, lets interested parties
+ * know that a DIO operation has completed, and calculates the resulting return
+ * code for the operation.
+ *
+ * It lets the filesystem know if it registered an interest earlier via
+ * get_block. Pass the private field of the map buffer_head so that
+ * filesystems can use it to hold additional state between get_block calls and
+ * dio_complete.
*/
-static void finished_one_bio(struct dio *dio)
+static int dio_complete(struct dio *dio, loff_t offset, int ret)
{
- unsigned long flags;
+ ssize_t transferred = 0;
- spin_lock_irqsave(&dio->bio_lock, flags);
- if (dio->bio_count == 1) {
- if (dio->is_async) {
- ssize_t transferred;
- loff_t offset;
+ /*
+ * AIO submission can race with bio completion to get here while
+ * expecting to have the last io completed by bio completion.
+ * In that case -EIOCBQUEUED is in fact not an error we want
+ * to preserve through this call.
+ */
+ if (ret == -EIOCBQUEUED)
+ ret = 0;
- /*
- * Last reference to the dio is going away.
- * Drop spinlock and complete the DIO.
- */
- spin_unlock_irqrestore(&dio->bio_lock, flags);
+ if (dio->result) {
+ transferred = dio->result;
- /* Check for short read case */
- transferred = dio->result;
- offset = dio->iocb->ki_pos;
+ /* Check for short read case */
+ if ((dio->rw == READ) && ((offset + transferred) > dio->i_size))
+ transferred = dio->i_size - offset;
+ }
- if ((dio->rw == READ) &&
- ((offset + transferred) > dio->i_size))
- transferred = dio->i_size - offset;
+ if (dio->end_io && dio->result)
+ dio->end_io(dio->iocb, offset, transferred,
+ dio->map_bh.b_private);
- /* check for error in completion path */
- if (dio->io_error)
- transferred = dio->io_error;
+ if (dio->flags & DIO_LOCKING)
+ /* lockdep: non-owner release */
+ up_read_non_owner(&dio->inode->i_alloc_sem);
- dio_complete(dio, offset, transferred);
+ if (ret == 0)
+ ret = dio->page_errors;
+ if (ret == 0)
+ ret = dio->io_error;
+ if (ret == 0)
+ ret = transferred;
- /* Complete AIO later if falling back to buffered i/o */
- if (dio->result == dio->size ||
- ((dio->rw == READ) && dio->result)) {
- aio_complete(dio->iocb, transferred, 0);
- kfree(dio);
- return;
- } else {
- /*
- * Falling back to buffered
- */
- spin_lock_irqsave(&dio->bio_lock, flags);
- dio->bio_count--;
- if (dio->waiter)
- wake_up_process(dio->waiter);
- spin_unlock_irqrestore(&dio->bio_lock, flags);
- return;
- }
- }
- }
- dio->bio_count--;
- spin_unlock_irqrestore(&dio->bio_lock, flags);
+ return ret;
}
static int dio_bio_complete(struct dio *dio, struct bio *bio);
/*
* Asynchronous IO callback.
*/
-static int dio_bio_end_aio(struct bio *bio, unsigned int bytes_done, int error)
+static void dio_bio_end_aio(struct bio *bio, int error)
{
struct dio *dio = bio->bi_private;
-
- if (bio->bi_size)
- return 1;
+ unsigned long remaining;
+ unsigned long flags;
/* cleanup the bio */
dio_bio_complete(dio, bio);
- return 0;
+
+ spin_lock_irqsave(&dio->bio_lock, flags);
+ remaining = --dio->refcount;
+ if (remaining == 1 && dio->waiter)
+ wake_up_process(dio->waiter);
+ spin_unlock_irqrestore(&dio->bio_lock, flags);
+
+ if (remaining == 0) {
+ int ret = dio_complete(dio, dio->iocb->ki_pos, 0);
+ aio_complete(dio->iocb, ret, 0);
+ kfree(dio);
+ }
}
/*
* During I/O bi_private points at the dio. After I/O, bi_private is used to
* implement a singly-linked list of completed BIOs, at dio->bio_list.
*/
-static int dio_bio_end_io(struct bio *bio, unsigned int bytes_done, int error)
+static void dio_bio_end_io(struct bio *bio, int error)
{
struct dio *dio = bio->bi_private;
unsigned long flags;
- if (bio->bi_size)
- return 1;
-
spin_lock_irqsave(&dio->bio_lock, flags);
bio->bi_private = dio->bio_list;
dio->bio_list = bio;
- dio->bios_in_flight--;
- if (dio->waiter && dio->bios_in_flight == 0)
+ if (--dio->refcount == 1 && dio->waiter)
wake_up_process(dio->waiter);
spin_unlock_irqrestore(&dio->bio_lock, flags);
- return 0;
}
static int
struct bio *bio;
bio = bio_alloc(GFP_KERNEL, nr_vecs);
- if (bio == NULL)
- return -ENOMEM;
bio->bi_bdev = bdev;
bio->bi_sector = first_sector;
* In the AIO read case we speculatively dirty the pages before starting IO.
* During IO completion, any of these pages which happen to have been written
* back will be redirtied by bio_check_pages_dirty().
+ *
+ * bios hold a dio reference between submit_bio and ->end_io.
*/
static void dio_bio_submit(struct dio *dio)
{
unsigned long flags;
bio->bi_private = dio;
+
spin_lock_irqsave(&dio->bio_lock, flags);
- dio->bio_count++;
- dio->bios_in_flight++;
+ dio->refcount++;
spin_unlock_irqrestore(&dio->bio_lock, flags);
+
if (dio->is_async && dio->rw == READ)
bio_set_pages_dirty(bio);
+
submit_bio(dio->rw, bio);
dio->bio = NULL;
}
/*
- * Wait for the next BIO to complete. Remove it and return it.
+ * Wait for the next BIO to complete. Remove it and return it. NULL is
+ * returned once all BIOs have been completed. This must only be called once
+ * all bios have been issued so that dio->refcount can only decrease. This
+ * requires that that the caller hold a reference on the dio.
*/
static struct bio *dio_await_one(struct dio *dio)
{
unsigned long flags;
- struct bio *bio;
+ struct bio *bio = NULL;
spin_lock_irqsave(&dio->bio_lock, flags);
- while (dio->bio_list == NULL) {
- set_current_state(TASK_UNINTERRUPTIBLE);
- if (dio->bio_list == NULL) {
- dio->waiter = current;
- spin_unlock_irqrestore(&dio->bio_lock, flags);
- blk_run_address_space(dio->inode->i_mapping);
- io_schedule();
- spin_lock_irqsave(&dio->bio_lock, flags);
- dio->waiter = NULL;
- }
- set_current_state(TASK_RUNNING);
+
+ /*
+ * Wait as long as the list is empty and there are bios in flight. bio
+ * completion drops the count, maybe adds to the list, and wakes while
+ * holding the bio_lock so we don't need set_current_state()'s barrier
+ * and can call it after testing our condition.
+ */
+ while (dio->refcount > 1 && dio->bio_list == NULL) {
+ __set_current_state(TASK_UNINTERRUPTIBLE);
+ dio->waiter = current;
+ spin_unlock_irqrestore(&dio->bio_lock, flags);
+ io_schedule();
+ /* wake up sets us TASK_RUNNING */
+ spin_lock_irqsave(&dio->bio_lock, flags);
+ dio->waiter = NULL;
+ }
+ if (dio->bio_list) {
+ bio = dio->bio_list;
+ dio->bio_list = bio->bi_private;
}
- bio = dio->bio_list;
- dio->bio_list = bio->bi_private;
spin_unlock_irqrestore(&dio->bio_lock, flags);
return bio;
}
}
bio_put(bio);
}
- finished_one_bio(dio);
return uptodate ? 0 : -EIO;
}
/*
- * Wait on and process all in-flight BIOs.
+ * Wait on and process all in-flight BIOs. This must only be called once
+ * all bios have been issued so that the refcount can only decrease.
+ * This just waits for all bios to make it through dio_bio_complete. IO
+ * errors are propagated through dio->io_error and should be propagated via
+ * dio_complete().
*/
-static int dio_await_completion(struct dio *dio)
+static void dio_await_completion(struct dio *dio)
{
- int ret = 0;
-
- if (dio->bio)
- dio_bio_submit(dio);
-
- /*
- * The bio_lock is not held for the read of bio_count.
- * This is ok since it is the dio_bio_complete() that changes
- * bio_count.
- */
- while (dio->bio_count) {
- struct bio *bio = dio_await_one(dio);
- int ret2;
-
- ret2 = dio_bio_complete(dio, bio);
- if (ret == 0)
- ret = ret2;
- }
- return ret;
+ struct bio *bio;
+ do {
+ bio = dio_await_one(dio);
+ if (bio)
+ dio_bio_complete(dio, bio);
+ } while (bio);
}
/*
map_bh->b_state = 0;
map_bh->b_size = fs_count << dio->inode->i_blkbits;
- create = dio->rw == WRITE;
- if (dio->lock_type == DIO_LOCKING) {
+ /*
+ * For writes inside i_size on a DIO_SKIP_HOLES filesystem we
+ * forbid block creations: only overwrites are permitted.
+ * We will return early to the caller once we see an
+ * unmapped buffer head returned, and the caller will fall
+ * back to buffered I/O.
+ *
+ * Otherwise the decision is left to the get_blocks method,
+ * which may decide to handle it or also return an unmapped
+ * buffer head.
+ */
+ create = dio->rw & WRITE;
+ if (dio->flags & DIO_SKIP_HOLES) {
if (dio->block_in_file < (i_size_read(dio->inode) >>
dio->blkbits))
create = 0;
- } else if (dio->lock_type == DIO_NO_LOCKING) {
- create = 0;
}
- /*
- * For writes inside i_size we forbid block creations: only
- * overwrites are permitted. We fall back to buffered writes
- * at a higher level for inside-i_size block-instantiating
- * writes.
- */
ret = (*dio->get_block)(dio->inode, fs_startblk,
map_bh, create);
}
{
int ret = 0;
+ if (dio->rw & WRITE) {
+ /*
+ * Read accounting is performed in submit_bio()
+ */
+ task_io_account_write(len);
+ }
+
/*
* Can we just grow the current page's presence in the dio?
*/
this_chunk_bytes = this_chunk_blocks << dio->blkbits;
- page = ZERO_PAGE(dio->curr_user_address);
+ page = ZERO_PAGE(0);
if (submit_page_section(dio, page, 0, this_chunk_bytes,
dio->next_block_for_io))
return;
do_holes:
/* Handle holes */
if (!buffer_mapped(map_bh)) {
- char *kaddr;
loff_t i_size_aligned;
/* AKPM: eargh, -ENOTBLK is a hack */
- if (dio->rw == WRITE) {
+ if (dio->rw & WRITE) {
page_cache_release(page);
return -ENOTBLK;
}
page_cache_release(page);
goto out;
}
- kaddr = kmap_atomic(page, KM_USER0);
- memset(kaddr + (block_in_page << blkbits),
- 0, 1 << blkbits);
- flush_dcache_page(page);
- kunmap_atomic(kaddr, KM_USER0);
+ zero_user(page, block_in_page << blkbits,
+ 1 << blkbits);
dio->block_in_file++;
block_in_page++;
goto next_block;
struct dio *dio)
{
unsigned long user_addr;
+ unsigned long flags;
int seg;
ssize_t ret = 0;
ssize_t ret2;
size_t bytes;
- dio->bio = NULL;
dio->inode = inode;
dio->rw = rw;
dio->blkbits = blkbits;
dio->blkfactor = inode->i_blkbits - blkbits;
- dio->start_zero_done = 0;
- dio->size = 0;
dio->block_in_file = offset >> blkbits;
- dio->blocks_available = 0;
- dio->cur_page = NULL;
- dio->boundary = 0;
- dio->reap_counter = 0;
dio->get_block = get_block;
dio->end_io = end_io;
- dio->map_bh.b_private = NULL;
dio->final_block_in_bio = -1;
dio->next_block_for_io = -1;
- dio->page_errors = 0;
- dio->io_error = 0;
- dio->result = 0;
dio->iocb = iocb;
dio->i_size = i_size_read(inode);
- /*
- * BIO completion state.
- *
- * ->bio_count starts out at one, and we decrement it to zero after all
- * BIOs are submitted. This to avoid the situation where a really fast
- * (or synchronous) device could take the count to zero while we're
- * still submitting BIOs.
- */
- dio->bio_count = 1;
- dio->bios_in_flight = 0;
spin_lock_init(&dio->bio_lock);
- dio->bio_list = NULL;
- dio->waiter = NULL;
+ dio->refcount = 1;
/*
* In case of non-aligned buffers, we may need 2 more
*/
if (unlikely(dio->blkfactor))
dio->pages_in_io = 2;
- else
- dio->pages_in_io = 0;
for (seg = 0; seg < nr_segs; seg++) {
user_addr = (unsigned long)iov[seg].iov_base;
}
} /* end iovec loop */
- if (ret == -ENOTBLK && rw == WRITE) {
+ if (ret == -ENOTBLK && (rw & WRITE)) {
/*
* The remaining part of the request will be
* be handled by buffered I/O when we return
* we can let i_mutex go now that its achieved its purpose
* of protecting us from looking up uninitialized blocks.
*/
- if ((rw == READ) && (dio->lock_type == DIO_LOCKING))
+ if (rw == READ && (dio->flags & DIO_LOCKING))
mutex_unlock(&dio->inode->i_mutex);
/*
- * OK, all BIOs are submitted, so we can decrement bio_count to truly
- * reflect the number of to-be-processed BIOs.
+ * The only time we want to leave bios in flight is when a successful
+ * partial aio read or full aio write have been setup. In that case
+ * bio completion will call aio_complete. The only time it's safe to
+ * call aio_complete is when we return -EIOCBQUEUED, so we key on that.
+ * This had *better* be the only place that raises -EIOCBQUEUED.
*/
- if (dio->is_async) {
- int should_wait = 0;
+ BUG_ON(ret == -EIOCBQUEUED);
+ if (dio->is_async && ret == 0 && dio->result &&
+ ((rw & READ) || (dio->result == dio->size)))
+ ret = -EIOCBQUEUED;
- if (dio->result < dio->size && rw == WRITE) {
- dio->waiter = current;
- should_wait = 1;
- }
- if (ret == 0)
- ret = dio->result;
- finished_one_bio(dio); /* This can free the dio */
+ if (ret != -EIOCBQUEUED) {
+ /* All IO is now issued, send it on its way */
blk_run_address_space(inode->i_mapping);
- if (should_wait) {
- unsigned long flags;
- /*
- * Wait for already issued I/O to drain out and
- * release its references to user-space pages
- * before returning to fallback on buffered I/O
- */
-
- spin_lock_irqsave(&dio->bio_lock, flags);
- set_current_state(TASK_UNINTERRUPTIBLE);
- while (dio->bio_count) {
- spin_unlock_irqrestore(&dio->bio_lock, flags);
- io_schedule();
- spin_lock_irqsave(&dio->bio_lock, flags);
- set_current_state(TASK_UNINTERRUPTIBLE);
- }
- spin_unlock_irqrestore(&dio->bio_lock, flags);
- set_current_state(TASK_RUNNING);
- kfree(dio);
- }
- } else {
- ssize_t transferred = 0;
-
- finished_one_bio(dio);
- ret2 = dio_await_completion(dio);
- if (ret == 0)
- ret = ret2;
- if (ret == 0)
- ret = dio->page_errors;
- if (dio->result) {
- loff_t i_size = i_size_read(inode);
+ dio_await_completion(dio);
+ }
- transferred = dio->result;
- /*
- * Adjust the return value if the read crossed a
- * non-block-aligned EOF.
- */
- if (rw == READ && (offset + transferred > i_size))
- transferred = i_size - offset;
- }
- dio_complete(dio, offset, transferred);
- if (ret == 0)
- ret = transferred;
+ /*
+ * Sync will always be dropping the final ref and completing the
+ * operation. AIO can if it was a broken operation described above or
+ * in fact if all the bios race to complete before we get here. In
+ * that case dio_complete() translates the EIOCBQUEUED into the proper
+ * return code that the caller will hand to aio_complete().
+ *
+ * This is managed by the bio_lock instead of being an atomic_t so that
+ * completion paths can drop their ref and use the remaining count to
+ * decide to wake the submission path atomically.
+ */
+ spin_lock_irqsave(&dio->bio_lock, flags);
+ ret2 = --dio->refcount;
+ spin_unlock_irqrestore(&dio->bio_lock, flags);
- /* We could have also come here on an AIO file extend */
- if (!is_sync_kiocb(iocb) && rw == WRITE &&
- ret >= 0 && dio->result == dio->size)
- /*
- * For AIO writes where we have completed the
- * i/o, we have to mark the the aio complete.
- */
- aio_complete(iocb, ret, 0);
+ if (ret2 == 0) {
+ ret = dio_complete(dio, offset, ret);
kfree(dio);
- }
+ } else
+ BUG_ON(ret != -EIOCBQUEUED);
+
return ret;
}
/*
* This is a library function for use by filesystem drivers.
- * The locking rules are governed by the dio_lock_type parameter.
- *
- * DIO_NO_LOCKING (no locking, for raw block device access)
- * For writes, i_mutex is not held on entry; it is never taken.
- *
- * DIO_LOCKING (simple locking for regular files)
- * For writes we are called under i_mutex and return with i_mutex held, even
- * though it is internally dropped.
- * For reads, i_mutex is not held on entry, but it is taken and dropped before
- * returning.
*
- * DIO_OWN_LOCKING (filesystem provides synchronisation and handling of
- * uninitialised data, allowing parallel direct readers and writers)
- * For writes we are called without i_mutex, return without it, never touch it.
- * For reads we are called under i_mutex and return with i_mutex held, even
- * though it may be internally dropped.
+ * The locking rules are governed by the flags parameter:
+ * - if the flags value contains DIO_LOCKING we use a fancy locking
+ * scheme for dumb filesystems.
+ * For writes this function is called under i_mutex and returns with
+ * i_mutex held, for reads, i_mutex is not held on entry, but it is
+ * taken and dropped again before returning.
+ * For reads and writes i_alloc_sem is taken in shared mode and released
+ * on I/O completion (which may happen asynchronously after returning to
+ * the caller).
*
- * Additional i_alloc_sem locking requirements described inline below.
+ * - if the flags value does NOT contain DIO_LOCKING we don't use any
+ * internal locking but rather rely on the filesystem to synchronize
+ * direct I/O reads/writes versus each other and truncate.
+ * For reads and writes both i_mutex and i_alloc_sem are not held on
+ * entry and are never taken.
*/
ssize_t
__blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
struct block_device *bdev, const struct iovec *iov, loff_t offset,
unsigned long nr_segs, get_block_t get_block, dio_iodone_t end_io,
- int dio_lock_type)
+ int flags)
{
int seg;
size_t size;
ssize_t retval = -EINVAL;
loff_t end = offset;
struct dio *dio;
- int release_i_mutex = 0;
- int acquire_i_mutex = 0;
if (rw & WRITE)
- current->flags |= PF_SYNCWRITE;
+ rw = WRITE_ODIRECT_PLUG;
if (bdev)
- bdev_blkbits = blksize_bits(bdev_hardsect_size(bdev));
+ bdev_blkbits = blksize_bits(bdev_logical_block_size(bdev));
if (offset & blocksize_mask) {
if (bdev)
retval = -ENOMEM;
if (!dio)
goto out;
-
/*
- * For block device access DIO_NO_LOCKING is used,
- * neither readers nor writers do any locking at all
- * For regular files using DIO_LOCKING,
- * readers need to grab i_mutex and i_alloc_sem
- * writers need to grab i_alloc_sem only (i_mutex is already held)
- * For regular files using DIO_OWN_LOCKING,
- * neither readers nor writers take any locks here
+ * Believe it or not, zeroing out the page array caused a .5%
+ * performance regression in a database benchmark. So, we take
+ * care to only zero out what's needed.
*/
- dio->lock_type = dio_lock_type;
- if (dio_lock_type != DIO_NO_LOCKING) {
+ memset(dio, 0, offsetof(struct dio, pages));
+
+ dio->flags = flags;
+ if (dio->flags & DIO_LOCKING) {
/* watch out for a 0 len io from a tricksy fs */
if (rw == READ && end > offset) {
- struct address_space *mapping;
+ struct address_space *mapping =
+ iocb->ki_filp->f_mapping;
- mapping = iocb->ki_filp->f_mapping;
- if (dio_lock_type != DIO_OWN_LOCKING) {
- mutex_lock(&inode->i_mutex);
- release_i_mutex = 1;
- }
+ /* will be released by direct_io_worker */
+ mutex_lock(&inode->i_mutex);
retval = filemap_write_and_wait_range(mapping, offset,
end - 1);
if (retval) {
+ mutex_unlock(&inode->i_mutex);
kfree(dio);
goto out;
}
-
- if (dio_lock_type == DIO_OWN_LOCKING) {
- mutex_unlock(&inode->i_mutex);
- acquire_i_mutex = 1;
- }
}
- if (dio_lock_type == DIO_LOCKING)
- down_read(&inode->i_alloc_sem);
+ /*
+ * Will be released at I/O completion, possibly in a
+ * different thread.
+ */
+ down_read_non_owner(&inode->i_alloc_sem);
}
/*
* even for AIO, we need to wait for i/o to complete before
* returning in this case.
*/
- dio->is_async = !is_sync_kiocb(iocb) && !((rw == WRITE) &&
+ dio->is_async = !is_sync_kiocb(iocb) && !((rw & WRITE) &&
(end > i_size_read(inode)));
retval = direct_io_worker(rw, iocb, inode, iov, offset,
nr_segs, blkbits, get_block, end_io, dio);
- if (rw == READ && dio_lock_type == DIO_LOCKING)
- release_i_mutex = 0;
+ /*
+ * In case of error extending write may have instantiated a few
+ * blocks outside i_size. Trim these off again for DIO_LOCKING.
+ *
+ * NOTE: filesystems with their own locking have to handle this
+ * on their own.
+ */
+ if (flags & DIO_LOCKING) {
+ if (unlikely((rw & WRITE) && retval < 0)) {
+ loff_t isize = i_size_read(inode);
+ if (end > isize)
+ vmtruncate(inode, isize);
+ }
+ }
out:
- if (release_i_mutex)
- mutex_unlock(&inode->i_mutex);
- else if (acquire_i_mutex)
- mutex_lock(&inode->i_mutex);
- if (rw & WRITE)
- current->flags &= ~PF_SYNCWRITE;
return retval;
}
EXPORT_SYMBOL(__blockdev_direct_IO);