#include <linux/aio_abi.h>
#include <linux/module.h>
#include <linux/syscalls.h>
+#include <linux/uio.h>
#define DEBUG 0
#include <linux/highmem.h>
#include <linux/workqueue.h>
#include <linux/security.h>
+#include <linux/eventfd.h>
#include <asm/kmap_types.h>
#include <asm/uaccess.h>
#define dprintk(x...) do { ; } while (0)
#endif
-static long aio_run = 0; /* for testing only */
-static long aio_wakeups = 0; /* for testing only */
-
/*------ sysctl variables----*/
-atomic_t aio_nr = ATOMIC_INIT(0); /* current system wide number of aio requests */
-unsigned aio_max_nr = 0x10000; /* system wide maximum number of aio requests */
+static DEFINE_SPINLOCK(aio_nr_lock);
+unsigned long aio_nr; /* current system wide number of aio requests */
+unsigned long aio_max_nr = 0x10000; /* system wide maximum number of aio requests */
/*----end sysctl variables---*/
-static kmem_cache_t *kiocb_cachep;
-static kmem_cache_t *kioctx_cachep;
+static struct kmem_cache *kiocb_cachep;
+static struct kmem_cache *kioctx_cachep;
static struct workqueue_struct *aio_wq;
/* Used for rare fput completion. */
-static void aio_fput_routine(void *);
-static DECLARE_WORK(fput_work, aio_fput_routine, NULL);
+static void aio_fput_routine(struct work_struct *);
+static DECLARE_WORK(fput_work, aio_fput_routine);
static DEFINE_SPINLOCK(fput_lock);
static LIST_HEAD(fput_head);
-static void aio_kick_handler(void *);
+static void aio_kick_handler(struct work_struct *);
+static void aio_queue_work(struct kioctx *);
/* aio_setup
* Creates the slab caches used by the aio routines, panic on
*/
static int __init aio_setup(void)
{
- kiocb_cachep = kmem_cache_create("kiocb", sizeof(struct kiocb),
- 0, SLAB_HWCACHE_ALIGN|SLAB_PANIC, NULL, NULL);
- kioctx_cachep = kmem_cache_create("kioctx", sizeof(struct kioctx),
- 0, SLAB_HWCACHE_ALIGN|SLAB_PANIC, NULL, NULL);
+ kiocb_cachep = KMEM_CACHE(kiocb, SLAB_HWCACHE_ALIGN|SLAB_PANIC);
+ kioctx_cachep = KMEM_CACHE(kioctx,SLAB_HWCACHE_ALIGN|SLAB_PANIC);
aio_wq = create_workqueue("aio");
info->nr = 0;
info->ring_pages = info->internal_pages;
if (nr_pages > AIO_RING_PAGES) {
- info->ring_pages = kmalloc(sizeof(struct page *) * nr_pages, GFP_KERNEL);
+ info->ring_pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
if (!info->ring_pages)
return -ENOMEM;
- memset(info->ring_pages, 0, sizeof(struct page *) * nr_pages);
}
info->mmap_size = nr_pages * PAGE_SIZE;
dprintk("attempting mmap of %lu bytes\n", info->mmap_size);
down_write(&ctx->mm->mmap_sem);
info->mmap_base = do_mmap(NULL, 0, info->mmap_size,
- PROT_READ|PROT_WRITE, MAP_ANON|MAP_PRIVATE,
+ PROT_READ|PROT_WRITE, MAP_ANONYMOUS|MAP_PRIVATE,
0);
if (IS_ERR((void *)info->mmap_base)) {
up_write(&ctx->mm->mmap_sem);
- printk("mmap err: %ld\n", -info->mmap_base);
info->mmap_size = 0;
aio_free_ring(ctx);
return -EAGAIN;
return ERR_PTR(-EINVAL);
}
- if (nr_events > aio_max_nr)
+ if ((unsigned long)nr_events > aio_max_nr)
return ERR_PTR(-EAGAIN);
- ctx = kmem_cache_alloc(kioctx_cachep, GFP_KERNEL);
+ ctx = kmem_cache_zalloc(kioctx_cachep, GFP_KERNEL);
if (!ctx)
return ERR_PTR(-ENOMEM);
- memset(ctx, 0, sizeof(*ctx));
ctx->max_reqs = nr_events;
mm = ctx->mm = current->mm;
atomic_inc(&mm->mm_count);
INIT_LIST_HEAD(&ctx->active_reqs);
INIT_LIST_HEAD(&ctx->run_list);
- INIT_WORK(&ctx->wq, aio_kick_handler, ctx);
+ INIT_DELAYED_WORK(&ctx->wq, aio_kick_handler);
if (aio_setup_ring(ctx) < 0)
goto out_freectx;
/* limit the number of system wide aios */
- atomic_add(ctx->max_reqs, &aio_nr); /* undone by __put_ioctx */
- if (unlikely(atomic_read(&aio_nr) > aio_max_nr))
+ spin_lock(&aio_nr_lock);
+ if (aio_nr + ctx->max_reqs > aio_max_nr ||
+ aio_nr + ctx->max_reqs < aio_nr)
+ ctx->max_reqs = 0;
+ else
+ aio_nr += ctx->max_reqs;
+ spin_unlock(&aio_nr_lock);
+ if (ctx->max_reqs == 0)
goto out_cleanup;
/* now link into global list. kludge. FIXME */
return ctx;
out_cleanup:
- atomic_sub(ctx->max_reqs, &aio_nr);
- ctx->max_reqs = 0; /* prevent __put_ioctx from sub'ing aio_nr */
__put_ioctx(ctx);
return ERR_PTR(-EAGAIN);
struct task_struct *tsk = current;
DECLARE_WAITQUEUE(wait, tsk);
+ spin_lock_irq(&ctx->ctx_lock);
if (!ctx->reqs_active)
- return;
+ goto out;
add_wait_queue(&ctx->wait, &wait);
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
while (ctx->reqs_active) {
- schedule();
+ spin_unlock_irq(&ctx->ctx_lock);
+ io_schedule();
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
+ spin_lock_irq(&ctx->ctx_lock);
}
__set_task_state(tsk, TASK_RUNNING);
remove_wait_queue(&ctx->wait, &wait);
+
+out:
+ spin_unlock_irq(&ctx->ctx_lock);
}
/* wait_on_sync_kiocb:
set_current_state(TASK_UNINTERRUPTIBLE);
if (!iocb->ki_users)
break;
- schedule();
+ io_schedule();
}
__set_current_state(TASK_RUNNING);
return iocb->ki_user_data;
wait_for_all_aios(ctx);
/*
- * this is an overkill, but ensures we don't leave
- * the ctx on the aio_wq
+ * Ensure we don't leave the ctx on the aio_wq
*/
- flush_workqueue(aio_wq);
+ cancel_work_sync(&ctx->wq.work);
if (1 != atomic_read(&ctx->users))
printk(KERN_DEBUG
{
unsigned nr_events = ctx->max_reqs;
- if (unlikely(ctx->reqs_active))
- BUG();
+ BUG_ON(ctx->reqs_active);
cancel_delayed_work(&ctx->wq);
- flush_workqueue(aio_wq);
+ cancel_work_sync(&ctx->wq.work);
aio_free_ring(ctx);
mmdrop(ctx->mm);
ctx->mm = NULL;
pr_debug("__put_ioctx: freeing %p\n", ctx);
kmem_cache_free(kioctx_cachep, ctx);
- atomic_sub(nr_events, &aio_nr);
+ if (nr_events) {
+ spin_lock(&aio_nr_lock);
+ BUG_ON(aio_nr - nr_events > aio_nr);
+ aio_nr -= nr_events;
+ spin_unlock(&aio_nr_lock);
+ }
}
/* aio_get_req
* This prevents races between the aio code path referencing the
* req (after submitting it) and aio_complete() freeing the req.
*/
-static struct kiocb *FASTCALL(__aio_get_req(struct kioctx *ctx));
+static struct kiocb *__aio_get_req(struct kioctx *ctx);
static struct kiocb fastcall *__aio_get_req(struct kioctx *ctx)
{
struct kiocb *req = NULL;
if (unlikely(!req))
return NULL;
- req->ki_flags = 1 << KIF_LOCKED;
+ req->ki_flags = 0;
req->ki_users = 2;
req->ki_key = 0;
req->ki_ctx = ctx;
req->ki_retry = NULL;
req->ki_dtor = NULL;
req->private = NULL;
+ req->ki_iovec = NULL;
INIT_LIST_HEAD(&req->ki_run_list);
+ req->ki_eventfd = ERR_PTR(-EINVAL);
/* Check if the completion queue has enough free space to
* accept an event from this io.
ring = kmap_atomic(ctx->ring_info.ring_pages[0], KM_USER0);
if (ctx->reqs_active < aio_ring_avail(&ctx->ring_info, ring)) {
list_add(&req->ki_list, &ctx->active_reqs);
- get_ioctx(ctx);
ctx->reqs_active++;
okay = 1;
}
static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
{
+ assert_spin_locked(&ctx->ctx_lock);
+
+ if (!IS_ERR(req->ki_eventfd))
+ fput(req->ki_eventfd);
if (req->ki_dtor)
req->ki_dtor(req);
+ if (req->ki_iovec != &req->ki_inline_vec)
+ kfree(req->ki_iovec);
kmem_cache_free(kiocb_cachep, req);
ctx->reqs_active--;
wake_up(&ctx->wait);
}
-static void aio_fput_routine(void *data)
+static void aio_fput_routine(struct work_struct *data)
{
spin_lock_irq(&fput_lock);
while (likely(!list_empty(&fput_head))) {
dprintk(KERN_DEBUG "aio_put(%p): f_count=%d\n",
req, atomic_read(&req->ki_filp->f_count));
+ assert_spin_locked(&ctx->ctx_lock);
+
req->ki_users --;
- if (unlikely(req->ki_users < 0))
- BUG();
+ BUG_ON(req->ki_users < 0);
if (likely(req->ki_users))
return 0;
list_del(&req->ki_list); /* remove from active_reqs */
spin_lock_irq(&ctx->ctx_lock);
ret = __aio_put_req(ctx, req);
spin_unlock_irq(&ctx->ctx_lock);
- if (ret)
- put_ioctx(ctx);
return ret;
}
atomic_inc(&mm->mm_count);
tsk->mm = mm;
tsk->active_mm = mm;
- activate_mm(active_mm, mm);
+ /*
+ * Note that on UML this *requires* PF_BORROWED_MM to be set, otherwise
+ * it won't work. Update it accordingly if you change it here
+ */
+ switch_mm(active_mm, mm, tsk);
task_unlock(tsk);
mmdrop(active_mm);
* by the calling kernel thread
* (Note: this routine is intended to be called only
* from a kernel thread context)
- *
- * Comments: Called with ctx->ctx_lock held. This nests
- * task_lock instead ctx_lock.
*/
static void unuse_mm(struct mm_struct *mm)
{
* the kiocb (to tell the caller to activate the work
* queue to process it), or 0, if it found that it was
* already queued.
- *
- * Should be called with the spin lock iocb->ki_ctx->ctx_lock
- * held
*/
static inline int __queue_kicked_iocb(struct kiocb *iocb)
{
struct kioctx *ctx = iocb->ki_ctx;
+ assert_spin_locked(&ctx->ctx_lock);
+
if (list_empty(&iocb->ki_run_list)) {
list_add_tail(&iocb->ki_run_list,
&ctx->run_list);
- iocb->ki_queued++;
return 1;
}
return 0;
* invoked both for initial i/o submission and
* subsequent retries via the aio_kick_handler.
* Expects to be invoked with iocb->ki_ctx->lock
- * already held. The lock is released and reaquired
+ * already held. The lock is released and reacquired
* as needed during processing.
*
* Calls the iocb retry method (already setup for the
ssize_t (*retry)(struct kiocb *);
ssize_t ret;
- if (iocb->ki_retried++ > 1024*1024) {
- printk("Maximal retry count. Bytes done %Zd\n",
- iocb->ki_nbytes - iocb->ki_left);
- return -EAGAIN;
- }
-
- if (!(iocb->ki_retried & 0xff)) {
- pr_debug("%ld retry: %d of %d (kick %ld, Q %ld run %ld, wake %ld)\n",
- iocb->ki_retried,
- iocb->ki_nbytes - iocb->ki_left, iocb->ki_nbytes,
- iocb->ki_kicked, iocb->ki_queued, aio_run, aio_wakeups);
- }
-
if (!(retry = iocb->ki_retry)) {
printk("aio_run_iocb: iocb->ki_retry = NULL\n");
return 0;
/*
* Now we are all set to call the retry method in async
- * context. By setting this thread's io_wait context
- * to point to the wait queue entry inside the currently
- * running iocb for the duration of the retry, we ensure
- * that async notification wakeups are queued by the
- * operation instead of blocking waits, and when notified,
- * cause the iocb to be kicked for continuation (through
- * the aio_wake_function callback).
+ * context.
*/
- BUG_ON(current->io_wait != NULL);
- current->io_wait = &iocb->ki_wait;
ret = retry(iocb);
- current->io_wait = NULL;
- if (-EIOCBRETRY != ret) {
- if (-EIOCBQUEUED != ret) {
- BUG_ON(!list_empty(&iocb->ki_wait.task_list));
- aio_complete(iocb, ret, 0);
- /* must not access the iocb after this */
- }
- } else {
- /*
- * Issue an additional retry to avoid waiting forever if
- * no waits were queued (e.g. in case of a short read).
- */
- if (list_empty(&iocb->ki_wait.task_list))
- kiocbSetKicked(iocb);
+ if (ret != -EIOCBRETRY && ret != -EIOCBQUEUED) {
+ BUG_ON(!list_empty(&iocb->ki_wait.task_list));
+ aio_complete(iocb, ret, 0);
}
out:
spin_lock_irq(&ctx->ctx_lock);
* has already been kicked */
if (kiocbIsKicked(iocb)) {
__queue_kicked_iocb(iocb);
+
+ /*
+ * __queue_kicked_iocb will always return 1 here, because
+ * iocb->ki_run_list is empty at this point so it should
+ * be safe to unconditionally queue the context into the
+ * work queue.
+ */
+ aio_queue_work(ctx);
}
}
return ret;
* Process all pending retries queued on the ioctx
* run list.
* Assumes it is operating within the aio issuer's mm
- * context. Expects to be called with ctx->ctx_lock held
+ * context.
*/
static int __aio_run_iocbs(struct kioctx *ctx)
{
struct kiocb *iocb;
- int count = 0;
- LIST_HEAD(run_list);
+ struct list_head run_list;
+
+ assert_spin_locked(&ctx->ctx_lock);
- list_splice_init(&ctx->run_list, &run_list);
+ list_replace_init(&ctx->run_list, &run_list);
while (!list_empty(&run_list)) {
iocb = list_entry(run_list.next, struct kiocb,
ki_run_list);
*/
iocb->ki_users++; /* grab extra reference */
aio_run_iocb(iocb);
- if (__aio_put_req(ctx, iocb)) /* drop extra ref */
- put_ioctx(ctx);
- count++;
+ __aio_put_req(ctx, iocb);
}
- aio_run++;
if (!list_empty(&ctx->run_list))
return 1;
return 0;
* space.
* Run on aiod's context.
*/
-static void aio_kick_handler(void *data)
+static void aio_kick_handler(struct work_struct *work)
{
- struct kioctx *ctx = data;
+ struct kioctx *ctx = container_of(work, struct kioctx, wq.work);
mm_segment_t oldfs = get_fs();
+ struct mm_struct *mm;
int requeue;
set_fs(USER_DS);
use_mm(ctx->mm);
spin_lock_irq(&ctx->ctx_lock);
requeue =__aio_run_iocbs(ctx);
- unuse_mm(ctx->mm);
+ mm = ctx->mm;
spin_unlock_irq(&ctx->ctx_lock);
+ unuse_mm(mm);
set_fs(oldfs);
/*
* we're in a worker thread already, don't use queue_delayed_work,
*/
if (requeue)
- queue_work(aio_wq, &ctx->wq);
+ queue_delayed_work(aio_wq, &ctx->wq, 0);
}
* and if required activate the aio work queue to process
* it
*/
-static void queue_kicked_iocb(struct kiocb *iocb)
+static void try_queue_kicked_iocb(struct kiocb *iocb)
{
struct kioctx *ctx = iocb->ki_ctx;
unsigned long flags;
int run = 0;
- WARN_ON((!list_empty(&iocb->ki_wait.task_list)));
+ /* We're supposed to be the only path putting the iocb back on the run
+ * list. If we find that the iocb is *back* on a wait queue already
+ * than retry has happened before we could queue the iocb. This also
+ * means that the retry could have completed and freed our iocb, no
+ * good. */
+ BUG_ON((!list_empty(&iocb->ki_wait.task_list)));
spin_lock_irqsave(&ctx->ctx_lock, flags);
- run = __queue_kicked_iocb(iocb);
+ /* set this inside the lock so that we can't race with aio_run_iocb()
+ * testing it and putting the iocb on the run list under the lock */
+ if (!kiocbTryKick(iocb))
+ run = __queue_kicked_iocb(iocb);
spin_unlock_irqrestore(&ctx->ctx_lock, flags);
- if (run) {
+ if (run)
aio_queue_work(ctx);
- aio_wakeups++;
- }
}
/*
return;
}
- iocb->ki_kicked++;
- /* If its already kicked we shouldn't queue it again */
- if (!kiocbTryKick(iocb)) {
- queue_kicked_iocb(iocb);
- }
+ try_queue_kicked_iocb(iocb);
}
EXPORT_SYMBOL(kick_iocb);
unsigned long tail;
int ret;
- /* Special case handling for sync iocbs: events go directly
- * into the iocb for fast handling. Note that this will not
- * work if we allow sync kiocbs to be cancelled. in which
- * case the usage count checks will have to move under ctx_lock
- * for all cases.
+ /*
+ * Special case handling for sync iocbs:
+ * - events go directly into the iocb for fast handling
+ * - the sync task with the iocb in its stack holds the single iocb
+ * ref, no other paths have a way to get another ref
+ * - the sync task helpfully left a reference to itself in the iocb
*/
if (is_sync_kiocb(iocb)) {
- int ret;
-
+ BUG_ON(iocb->ki_users != 1);
iocb->ki_user_data = res;
- if (iocb->ki_users == 1) {
- iocb->ki_users = 0;
- ret = 1;
- } else {
- spin_lock_irq(&ctx->ctx_lock);
- iocb->ki_users--;
- ret = (0 == iocb->ki_users);
- spin_unlock_irq(&ctx->ctx_lock);
- }
- /* sync iocbs put the task here for us */
+ iocb->ki_users = 0;
wake_up_process(iocb->ki_obj.tsk);
- return ret;
+ return 1;
}
+ /*
+ * Check if the user asked us to deliver the result through an
+ * eventfd. The eventfd_signal() function is safe to be called
+ * from IRQ context.
+ */
+ if (!IS_ERR(iocb->ki_eventfd))
+ eventfd_signal(iocb->ki_eventfd, 1);
+
info = &ctx->ring_info;
/* add a completion event to the ring buffer.
kunmap_atomic(ring, KM_IRQ1);
pr_debug("added to ring %p at [%lu]\n", iocb, tail);
-
- pr_debug("%ld retries: %d of %d (kicked %ld, Q %ld run %ld wake %ld)\n",
- iocb->ki_retried,
- iocb->ki_nbytes - iocb->ki_left, iocb->ki_nbytes,
- iocb->ki_kicked, iocb->ki_queued, aio_run, aio_wakeups);
put_rq:
/* everything turned out well, dispose of the aiocb. */
ret = __aio_put_req(ctx, iocb);
- spin_unlock_irqrestore(&ctx->ctx_lock, flags);
-
if (waitqueue_active(&ctx->wait))
wake_up(&ctx->wait);
- if (ret)
- put_ioctx(ctx);
-
+ spin_unlock_irqrestore(&ctx->ctx_lock, flags);
return ret;
}
int i = 0;
struct io_event ent;
struct aio_timeout to;
- int event_loop = 0; /* testing only */
int retry = 0;
/* needed to zero any padding within an entry (there shouldn't be
ret = 0;
if (to.timed_out) /* Only check after read evt */
break;
- schedule();
- event_loop++;
+ /* Try to only show up in io wait if there are ops
+ * in flight */
+ if (ctx->reqs_active)
+ io_schedule();
+ else
+ schedule();
if (signal_pending(tsk)) {
ret = -EINTR;
break;
if (timeout)
clear_timeout(&to);
out:
- pr_debug("event loop executed %d times\n", event_loop);
- pr_debug("aio_run %ld\n", aio_run);
- pr_debug("aio_wakeups %ld\n", aio_wakeups);
return i ? i : ret;
}
goto out;
ret = -EINVAL;
- if (unlikely(ctx || (int)nr_events <= 0)) {
- pr_debug("EINVAL: io_setup: ctx or nr_events > max\n");
+ if (unlikely(ctx || nr_events == 0)) {
+ pr_debug("EINVAL: io_setup: ctx %lu nr_events %u\n",
+ ctx, nr_events);
goto out;
}
return -EINVAL;
}
-/*
- * Default retry method for aio_read (also used for first time submit)
- * Responsible for updating iocb state as retries progress
- */
-static ssize_t aio_pread(struct kiocb *iocb)
+static void aio_advance_iovec(struct kiocb *iocb, ssize_t ret)
{
- struct file *file = iocb->ki_filp;
- struct address_space *mapping = file->f_mapping;
- struct inode *inode = mapping->host;
- ssize_t ret = 0;
-
- ret = file->f_op->aio_read(iocb, iocb->ki_buf,
- iocb->ki_left, iocb->ki_pos);
-
- /*
- * Can't just depend on iocb->ki_left to determine
- * whether we are done. This may have been a short read.
- */
- if (ret > 0) {
- iocb->ki_buf += ret;
- iocb->ki_left -= ret;
- /*
- * For pipes and sockets we return once we have
- * some data; for regular files we retry till we
- * complete the entire read or find that we can't
- * read any more data (e.g short reads).
- */
- if (!S_ISFIFO(inode->i_mode) && !S_ISSOCK(inode->i_mode))
- ret = -EIOCBRETRY;
+ struct iovec *iov = &iocb->ki_iovec[iocb->ki_cur_seg];
+
+ BUG_ON(ret <= 0);
+
+ while (iocb->ki_cur_seg < iocb->ki_nr_segs && ret > 0) {
+ ssize_t this = min((ssize_t)iov->iov_len, ret);
+ iov->iov_base += this;
+ iov->iov_len -= this;
+ iocb->ki_left -= this;
+ ret -= this;
+ if (iov->iov_len == 0) {
+ iocb->ki_cur_seg++;
+ iov++;
+ }
}
- /* This means we must have transferred all that we could */
- /* No need to retry anymore */
- if ((ret == 0) || (iocb->ki_left == 0))
- ret = iocb->ki_nbytes - iocb->ki_left;
-
- return ret;
+ /* the caller should not have done more io than what fit in
+ * the remaining iovecs */
+ BUG_ON(ret > 0 && iocb->ki_left == 0);
}
-/*
- * Default retry method for aio_write (also used for first time submit)
- * Responsible for updating iocb state as retries progress
- */
-static ssize_t aio_pwrite(struct kiocb *iocb)
+static ssize_t aio_rw_vect_retry(struct kiocb *iocb)
{
struct file *file = iocb->ki_filp;
+ struct address_space *mapping = file->f_mapping;
+ struct inode *inode = mapping->host;
+ ssize_t (*rw_op)(struct kiocb *, const struct iovec *,
+ unsigned long, loff_t);
ssize_t ret = 0;
+ unsigned short opcode;
- ret = file->f_op->aio_write(iocb, iocb->ki_buf,
- iocb->ki_left, iocb->ki_pos);
+ if ((iocb->ki_opcode == IOCB_CMD_PREADV) ||
+ (iocb->ki_opcode == IOCB_CMD_PREAD)) {
+ rw_op = file->f_op->aio_read;
+ opcode = IOCB_CMD_PREADV;
+ } else {
+ rw_op = file->f_op->aio_write;
+ opcode = IOCB_CMD_PWRITEV;
+ }
- if (ret > 0) {
- iocb->ki_buf += ret;
- iocb->ki_left -= ret;
+ do {
+ ret = rw_op(iocb, &iocb->ki_iovec[iocb->ki_cur_seg],
+ iocb->ki_nr_segs - iocb->ki_cur_seg,
+ iocb->ki_pos);
+ if (ret > 0)
+ aio_advance_iovec(iocb, ret);
- ret = -EIOCBRETRY;
- }
+ /* retry all partial writes. retry partial reads as long as its a
+ * regular file. */
+ } while (ret > 0 && iocb->ki_left > 0 &&
+ (opcode == IOCB_CMD_PWRITEV ||
+ (!S_ISFIFO(inode->i_mode) && !S_ISSOCK(inode->i_mode))));
/* This means we must have transferred all that we could */
/* No need to retry anymore */
return ret;
}
+static ssize_t aio_setup_vectored_rw(int type, struct kiocb *kiocb)
+{
+ ssize_t ret;
+
+ ret = rw_copy_check_uvector(type, (struct iovec __user *)kiocb->ki_buf,
+ kiocb->ki_nbytes, 1,
+ &kiocb->ki_inline_vec, &kiocb->ki_iovec);
+ if (ret < 0)
+ goto out;
+
+ kiocb->ki_nr_segs = kiocb->ki_nbytes;
+ kiocb->ki_cur_seg = 0;
+ /* ki_nbytes/left now reflect bytes instead of segs */
+ kiocb->ki_nbytes = ret;
+ kiocb->ki_left = ret;
+
+ ret = 0;
+out:
+ return ret;
+}
+
+static ssize_t aio_setup_single_vector(struct kiocb *kiocb)
+{
+ kiocb->ki_iovec = &kiocb->ki_inline_vec;
+ kiocb->ki_iovec->iov_base = kiocb->ki_buf;
+ kiocb->ki_iovec->iov_len = kiocb->ki_left;
+ kiocb->ki_nr_segs = 1;
+ kiocb->ki_cur_seg = 0;
+ return 0;
+}
+
/*
* aio_setup_iocb:
* Performs the initial checks and aio retry method
if (unlikely(!access_ok(VERIFY_WRITE, kiocb->ki_buf,
kiocb->ki_left)))
break;
+ ret = security_file_permission(file, MAY_READ);
+ if (unlikely(ret))
+ break;
+ ret = aio_setup_single_vector(kiocb);
+ if (ret)
+ break;
ret = -EINVAL;
if (file->f_op->aio_read)
- kiocb->ki_retry = aio_pread;
+ kiocb->ki_retry = aio_rw_vect_retry;
break;
case IOCB_CMD_PWRITE:
ret = -EBADF;
if (unlikely(!access_ok(VERIFY_READ, kiocb->ki_buf,
kiocb->ki_left)))
break;
+ ret = security_file_permission(file, MAY_WRITE);
+ if (unlikely(ret))
+ break;
+ ret = aio_setup_single_vector(kiocb);
+ if (ret)
+ break;
ret = -EINVAL;
if (file->f_op->aio_write)
- kiocb->ki_retry = aio_pwrite;
+ kiocb->ki_retry = aio_rw_vect_retry;
+ break;
+ case IOCB_CMD_PREADV:
+ ret = -EBADF;
+ if (unlikely(!(file->f_mode & FMODE_READ)))
+ break;
+ ret = security_file_permission(file, MAY_READ);
+ if (unlikely(ret))
+ break;
+ ret = aio_setup_vectored_rw(READ, kiocb);
+ if (ret)
+ break;
+ ret = -EINVAL;
+ if (file->f_op->aio_read)
+ kiocb->ki_retry = aio_rw_vect_retry;
+ break;
+ case IOCB_CMD_PWRITEV:
+ ret = -EBADF;
+ if (unlikely(!(file->f_mode & FMODE_WRITE)))
+ break;
+ ret = security_file_permission(file, MAY_WRITE);
+ if (unlikely(ret))
+ break;
+ ret = aio_setup_vectored_rw(WRITE, kiocb);
+ if (ret)
+ break;
+ ret = -EINVAL;
+ if (file->f_op->aio_write)
+ kiocb->ki_retry = aio_rw_vect_retry;
break;
case IOCB_CMD_FDSYNC:
ret = -EINVAL;
* Simply triggers a retry of the operation via kick_iocb.
*
* This callback is specified in the wait queue entry in
- * a kiocb (current->io_wait points to this wait queue
- * entry when an aio operation executes; it is used
- * instead of a synchronous wait when an i/o blocking
- * condition is encountered during aio).
+ * a kiocb.
*
* Note:
* This routine is executed with the wait queue lock held.
ssize_t ret;
/* enforce forwards compatibility on users */
- if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2 ||
- iocb->aio_reserved3)) {
+ if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2)) {
pr_debug("EINVAL: io_submit: reserve field set\n");
return -EINVAL;
}
fput(file);
return -EAGAIN;
}
-
req->ki_filp = file;
+ if (iocb->aio_flags & IOCB_FLAG_RESFD) {
+ /*
+ * If the IOCB_FLAG_RESFD flag of aio_flags is set, get an
+ * instance of the file* now. The file descriptor must be
+ * an eventfd() fd, and will be signaled for each completed
+ * event using the eventfd_signal() function.
+ */
+ req->ki_eventfd = eventfd_fget((int) iocb->aio_resfd);
+ if (unlikely(IS_ERR(req->ki_eventfd))) {
+ ret = PTR_ERR(req->ki_eventfd);
+ goto out_put_req;
+ }
+ }
+
ret = put_user(req->ki_key, &user_iocb->aio_key);
if (unlikely(ret)) {
dprintk("EFAULT: aio_key\n");
req->ki_opcode = iocb->aio_lio_opcode;
init_waitqueue_func_entry(&req->ki_wait, aio_wake_function);
INIT_LIST_HEAD(&req->ki_wait.task_list);
- req->ki_retried = 0;
- req->ki_kicked = 0;
- req->ki_queued = 0;
- aio_run = 0;
- aio_wakeups = 0;
ret = aio_setup_iocb(req);
goto out_put_req;
spin_lock_irq(&ctx->ctx_lock);
- list_add_tail(&req->ki_run_list, &ctx->run_list);
- /* drain the run list */
- while (__aio_run_iocbs(ctx))
- ;
+ aio_run_iocb(req);
+ if (!list_empty(&ctx->run_list)) {
+ /* drain the run list */
+ while (__aio_run_iocbs(ctx))
+ ;
+ }
spin_unlock_irq(&ctx->ctx_lock);
aio_put_req(req); /* drop extra ref to req */
return 0;
/* lookup_kiocb
* Finds a given iocb for cancellation.
- * MUST be called with ctx->ctx_lock held.
*/
static struct kiocb *lookup_kiocb(struct kioctx *ctx, struct iocb __user *iocb,
u32 key)
{
struct list_head *pos;
+
+ assert_spin_locked(&ctx->ctx_lock);
+
/* TODO: use a hash or array, this sucks. */
list_for_each(pos, &ctx->active_reqs) {
struct kiocb *kiocb = list_kiocb(pos);
ret = -EFAULT;
}
} else
- printk(KERN_DEBUG "iocb has no cancel operation\n");
+ ret = -EINVAL;
put_ioctx(ctx);