*
* O_DIRECT
*
- * 04Jul2002 akpm@zip.com.au
+ * 04Jul2002 Andrew Morton
* Initial version
* 11Sep2002 janetinc@us.ibm.com
* added readv/writev support.
- * 29Oct2002 akpm@zip.com.au
+ * 29Oct2002 Andrew Morton
* rewrote bio_add_page() support.
* 30Oct2002 pbadari@us.ibm.com
* added support for non-aligned IO.
int page_errors; /* errno from get_user_pages() */
/* BIO completion state */
- atomic_t refcount; /* direct_io_worker() and bios */
spinlock_t bio_lock; /* protects BIO fields below */
+ unsigned long refcount; /* direct_io_worker() and bios */
struct bio *bio_list; /* singly linked via bi_private */
struct task_struct *waiter; /* waiting task (NULL if none) */
int nr_pages;
nr_pages = min(dio->total_pages - dio->curr_page, DIO_PAGES);
- down_read(&current->mm->mmap_sem);
- ret = get_user_pages(
- current, /* Task for fault accounting */
- current->mm, /* whose pages? */
+ ret = get_user_pages_fast(
dio->curr_user_address, /* Where from? */
nr_pages, /* How many pages? */
dio->rw == READ, /* Write to memory? */
- 0, /* force (?) */
- &dio->pages[0],
- NULL); /* vmas */
- up_read(&current->mm->mmap_sem);
+ &dio->pages[0]); /* Put results here */
if (ret < 0 && dio->blocks_available && (dio->rw & WRITE)) {
- struct page *page = ZERO_PAGE(dio->curr_user_address);
+ struct page *page = ZERO_PAGE(0);
/*
* A memory fault, but the filesystem has some outstanding
* mapped blocks. We need to use those blocks up to avoid
* leaking stale data in the file.
*/
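
The hunk above swaps the down_read/get_user_pages/up_read dance for get_user_pages_fast(), which handles mmap_sem itself (or avoids it entirely on architectures with a lockless fast path). A minimal sketch of the new calling convention follows; it is not part of the patch, and the helper name and reduced error handling are illustrative only:

#include <linux/mm.h>
#include <linux/pagemap.h>

/*
 * Pin 'nr' pages of the user buffer at 'uaddr' so a device can write
 * into them, then drop the pins.  A negative return means the first
 * page faulted; a short positive return means only that many pages
 * could be pinned.
 */
static int pin_user_buffer(unsigned long uaddr, int nr, struct page **pages)
{
	int i, got;

	got = get_user_pages_fast(uaddr, nr, 1 /* write to memory */, pages);
	if (got < 0)
		return got;			/* e.g. -EFAULT */

	/* ... hand the pinned pages to the I/O path here ... */

	for (i = 0; i < got; i++)
		page_cache_release(pages[i]);
	return got;
}
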
{
ssize_t transferred = 0;
+ /*
+ * AIO submission can race with bio completion to get here while
+ * expecting to have the last io completed by bio completion.
+ * In that case -EIOCBQUEUED is in fact not an error we want
+ * to preserve through this call.
+ */
+ if (ret == -EIOCBQUEUED)
+ ret = 0;
+
if (dio->result) {
transferred = dio->result;
return ret;
}
-/*
- * Called when a BIO has been processed. If the count goes to zero then IO is
- * complete and we can signal this to the AIO layer.
- */
-static void dio_complete_aio(struct dio *dio)
-{
- int ret;
-
- ret = dio_complete(dio, dio->iocb->ki_pos, 0);
-
- /* Complete AIO later if falling back to buffered i/o */
- if (dio->result == dio->size ||
- ((dio->rw == READ) && dio->result)) {
- aio_complete(dio->iocb, ret, 0);
- kfree(dio);
- }
-}
-
static int dio_bio_complete(struct dio *dio, struct bio *bio);
/*
* Asynchronous IO callback.
*/
-static int dio_bio_end_aio(struct bio *bio, unsigned int bytes_done, int error)
+static void dio_bio_end_aio(struct bio *bio, int error)
{
struct dio *dio = bio->bi_private;
- int waiter_holds_ref = 0;
- int remaining;
-
- if (bio->bi_size)
- return 1;
+ unsigned long remaining;
+ unsigned long flags;
/* cleanup the bio */
dio_bio_complete(dio, bio);
- waiter_holds_ref = !!dio->waiter;
- remaining = atomic_sub_return(1, (&dio->refcount));
- if (remaining == 1 && waiter_holds_ref)
+ spin_lock_irqsave(&dio->bio_lock, flags);
+ remaining = --dio->refcount;
+ if (remaining == 1 && dio->waiter)
wake_up_process(dio->waiter);
+ spin_unlock_irqrestore(&dio->bio_lock, flags);
- if (remaining == 0)
- dio_complete_aio(dio);
-
- return 0;
+ if (remaining == 0) {
+ int ret = dio_complete(dio, dio->iocb->ki_pos, 0);
+ aio_complete(dio->iocb, ret, 0);
+ kfree(dio);
+ }
}
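
Both endio handlers now follow the same shape: drop one reference under bio_lock and act on the count that remains, read under the same lock. A sketch of the shared pattern; dio_drop_ref() is a hypothetical name, the patch open-codes this in each handler:

static unsigned long dio_drop_ref(struct dio *dio)
{
	unsigned long remaining;
	unsigned long flags;

	spin_lock_irqsave(&dio->bio_lock, flags);
	remaining = --dio->refcount;
	/* only the submitter's own ref left: safe to wake it */
	if (remaining == 1 && dio->waiter)
		wake_up_process(dio->waiter);
	spin_unlock_irqrestore(&dio->bio_lock, flags);

	return remaining;
}

A remaining count of 0 can only happen on the AIO path, after the submitter has already dropped its reference and returned -EIOCBQUEUED; that caller then completes the iocb and frees the dio, as dio_bio_end_aio() does above.
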
/*
* During I/O bi_private points at the dio. After I/O, bi_private is used to
* implement a singly-linked list of completed BIOs, at dio->bio_list.
*/
-static int dio_bio_end_io(struct bio *bio, unsigned int bytes_done, int error)
+static void dio_bio_end_io(struct bio *bio, int error)
{
struct dio *dio = bio->bi_private;
unsigned long flags;
- if (bio->bi_size)
- return 1;
-
spin_lock_irqsave(&dio->bio_lock, flags);
bio->bi_private = dio->bio_list;
dio->bio_list = bio;
- if ((atomic_sub_return(1, &dio->refcount) == 1) && dio->waiter)
+ if (--dio->refcount == 1 && dio->waiter)
wake_up_process(dio->waiter);
spin_unlock_irqrestore(&dio->bio_lock, flags);
- return 0;
}
static void dio_bio_submit(struct dio *dio)
{
struct bio *bio = dio->bio;
+ unsigned long flags;
bio->bi_private = dio;
- atomic_inc(&dio->refcount);
+
+ spin_lock_irqsave(&dio->bio_lock, flags);
+ dio->refcount++;
+ spin_unlock_irqrestore(&dio->bio_lock, flags);
+
if (dio->is_async && dio->rw == READ)
bio_set_pages_dirty(bio);
+
submit_bio(dio->rw, bio);
dio->bio = NULL;
page_cache_release(dio_get_page(dio));
}
-static int wait_for_more_bios(struct dio *dio)
-{
- assert_spin_locked(&dio->bio_lock);
-
- return (atomic_read(&dio->refcount) > 1) && (dio->bio_list == NULL);
-}
-
/*
* Wait for the next BIO to complete. Remove it and return it. NULL is
* returned once all BIOs have been completed. This must only be called once
* all bios have been issued so that dio->refcount can only decrease. This
* requires that the caller hold a reference on the dio.
*/
static struct bio *dio_await_one(struct dio *dio)
{
unsigned long flags;
struct bio *bio = NULL;
spin_lock_irqsave(&dio->bio_lock, flags);
- while (wait_for_more_bios(dio)) {
- set_current_state(TASK_UNINTERRUPTIBLE);
- if (wait_for_more_bios(dio)) {
- dio->waiter = current;
- spin_unlock_irqrestore(&dio->bio_lock, flags);
- io_schedule();
- spin_lock_irqsave(&dio->bio_lock, flags);
- dio->waiter = NULL;
- }
- set_current_state(TASK_RUNNING);
+
+ /*
+ * Wait as long as the list is empty and there are bios in flight. bio
+ * completion drops the count, maybe adds to the list, and wakes while
+ * holding the bio_lock so we don't need set_current_state()'s barrier
+ * and can call it after testing our condition.
+ */
+ while (dio->refcount > 1 && dio->bio_list == NULL) {
+ __set_current_state(TASK_UNINTERRUPTIBLE);
+ dio->waiter = current;
+ spin_unlock_irqrestore(&dio->bio_lock, flags);
+ io_schedule();
+ /* wake up sets us TASK_RUNNING */
+ spin_lock_irqsave(&dio->bio_lock, flags);
+ dio->waiter = NULL;
}
if (dio->bio_list) {
bio = dio->bio_list;
* Wait on and process all in-flight BIOs. This must only be called once
* all bios have been issued so that the refcount can only decrease.
* This just waits for all bios to make it through dio_bio_complete. IO
- * errors are propogated through dio->io_error and should be propogated via
+ * errors are propagated through dio->io_error and should be propagated via
* dio_complete().
*/
static void dio_await_completion(struct dio *dio)
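
The body of dio_await_completion() is elided by this excerpt; after the series it is just a reap loop over dio_await_one(), reconstructed here from the fs/direct-io.c of that era for reference:

static void dio_await_completion(struct dio *dio)
{
	struct bio *bio;

	do {
		bio = dio_await_one(dio);
		if (bio)
			dio_bio_complete(dio, bio);
	} while (bio);
}
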
this_chunk_bytes = this_chunk_blocks << dio->blkbits;
- page = ZERO_PAGE(dio->curr_user_address);
+ page = ZERO_PAGE(0);
if (submit_page_section(dio, page, 0, this_chunk_bytes,
dio->next_block_for_io))
return;
do_holes:
/* Handle holes */
if (!buffer_mapped(map_bh)) {
- char *kaddr;
loff_t i_size_aligned;
/* AKPM: eargh, -ENOTBLK is a hack */
page_cache_release(page);
goto out;
}
- kaddr = kmap_atomic(page, KM_USER0);
- memset(kaddr + (block_in_page << blkbits),
- 0, 1 << blkbits);
- flush_dcache_page(page);
- kunmap_atomic(kaddr, KM_USER0);
+ zero_user(page, block_in_page << blkbits,
+ 1 << blkbits);
dio->block_in_file++;
block_in_page++;
goto next_block;
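
zero_user() packages exactly the sequence the hunk above deletes. Roughly what the include/linux/highmem.h helper of that era expands to, simplified into a sketch:

static inline void zero_user_sketch(struct page *page,
				    unsigned start, unsigned size)
{
	void *kaddr = kmap_atomic(page, KM_USER0);

	memset(kaddr + start, 0, size);
	flush_dcache_page(page);
	kunmap_atomic(kaddr, KM_USER0);
}
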
struct dio *dio)
{
unsigned long user_addr;
+ unsigned long flags;
int seg;
ssize_t ret = 0;
ssize_t ret2;
size_t bytes;
- dio->bio = NULL;
dio->inode = inode;
dio->rw = rw;
dio->blkbits = blkbits;
dio->blkfactor = inode->i_blkbits - blkbits;
- dio->start_zero_done = 0;
- dio->size = 0;
dio->block_in_file = offset >> blkbits;
- dio->blocks_available = 0;
- dio->cur_page = NULL;
- dio->boundary = 0;
- dio->reap_counter = 0;
dio->get_block = get_block;
dio->end_io = end_io;
- dio->map_bh.b_private = NULL;
dio->final_block_in_bio = -1;
dio->next_block_for_io = -1;
- dio->page_errors = 0;
- dio->io_error = 0;
- dio->result = 0;
dio->iocb = iocb;
dio->i_size = i_size_read(inode);
- atomic_set(&dio->refcount, 1);
spin_lock_init(&dio->bio_lock);
- dio->bio_list = NULL;
- dio->waiter = NULL;
+ dio->refcount = 1;
/*
* In case of non-aligned buffers, we may need 2 more
* pages since we need to zero out first and last block.
*/
if (unlikely(dio->blkfactor))
dio->pages_in_io = 2;
- else
- dio->pages_in_io = 0;
for (seg = 0; seg < nr_segs; seg++) {
user_addr = (unsigned long)iov[seg].iov_base;
mutex_unlock(&dio->inode->i_mutex);
/*
- * OK, all BIOs are submitted, so we can decrement bio_count to truly
- * reflect the number of to-be-processed BIOs.
+ * The only time we want to leave bios in flight is when a successful
+ * partial aio read or full aio write has been set up. In that case
+ * bio completion will call aio_complete. The only time it's safe to
+ * call aio_complete is when we return -EIOCBQUEUED, so we key on that.
+ * This had *better* be the only place that raises -EIOCBQUEUED.
*/
- if (dio->is_async) {
- int should_wait = 0;
+ BUG_ON(ret == -EIOCBQUEUED);
+ if (dio->is_async && ret == 0 && dio->result &&
+ ((rw & READ) || (dio->result == dio->size)))
+ ret = -EIOCBQUEUED;
- if (dio->result < dio->size && (rw & WRITE)) {
- dio->waiter = current;
- should_wait = 1;
- }
- if (ret == 0)
- ret = dio->result;
-
- if (should_wait)
- dio_await_completion(dio);
-
- /* this can free the dio */
- if (atomic_dec_and_test(&dio->refcount))
- dio_complete_aio(dio);
-
- if (should_wait)
- kfree(dio);
- } else {
+ if (ret != -EIOCBQUEUED)
dio_await_completion(dio);
- ret = dio_complete(dio, offset, ret);
+ /*
+ * The sync path always drops the final ref here and completes the
+ * operation. AIO can also do so, either in the broken-operation
+ * case described above or when all the bios race to complete
+ * before we get here. In
+ * that case dio_complete() translates the EIOCBQUEUED into the proper
+ * return code that the caller will hand to aio_complete().
+ *
+ * This is managed by the bio_lock instead of being an atomic_t so that
+ * completion paths can drop their ref and use the remaining count to
+ * decide to wake the submission path atomically.
+ */
+ spin_lock_irqsave(&dio->bio_lock, flags);
+ ret2 = --dio->refcount;
+ spin_unlock_irqrestore(&dio->bio_lock, flags);
- /* We could have also come here on an AIO file extend */
- if (!is_sync_kiocb(iocb) && (rw & WRITE) &&
- ret >= 0 && dio->result == dio->size)
- /*
- * For AIO writes where we have completed the
- * i/o, we have to mark the aio complete.
- */
- aio_complete(iocb, ret, 0);
+ if (ret2 == 0) {
+ ret = dio_complete(dio, offset, ret);
+ kfree(dio);
+ } else
+ BUG_ON(ret != -EIOCBQUEUED);
- if (atomic_dec_and_test(&dio->refcount))
- kfree(dio);
- else
- BUG();
- }
return ret;
}
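
For context on where the -EIOCBQUEUED contract lands: filesystems reach direct_io_worker() through blockdev_direct_IO() and hand its return value straight back to the VFS/AIO core. A typical caller of the era, close to fs/ext2/inode.c and shown from memory:

static ssize_t
ext2_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov,
	       loff_t offset, unsigned long nr_segs)
{
	struct file *file = iocb->ki_filp;
	struct inode *inode = file->f_mapping->host;

	return blockdev_direct_IO(rw, iocb, inode, inode->i_sb->s_bdev, iov,
				  offset, nr_segs, ext2_get_block, NULL);
}

An async submission thus propagates -EIOCBQUEUED up unchanged, and aio_complete() is only ever called by whichever path drops the last reference.
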
}
}
- dio = kmalloc(sizeof(*dio), GFP_KERNEL);
+ dio = kzalloc(sizeof(*dio), GFP_KERNEL);
retval = -ENOMEM;
if (!dio)
goto out;
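
Switching from kmalloc() to kzalloc() is what allows the explicit zeroing of individual dio fields earlier in the series to be deleted; kzalloc(size, flags) behaves like this sketch:

static inline void *kzalloc_sketch(size_t size, gfp_t flags)
{
	void *p = kmalloc(size, flags);

	if (p)
		memset(p, 0, size);
	return p;
}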