#include <linux/aio_abi.h>
#include <linux/module.h>
#include <linux/syscalls.h>
+#include <linux/uio.h>
#define DEBUG 0
#include <linux/highmem.h>
#include <linux/workqueue.h>
#include <linux/security.h>
+#include <linux/eventfd.h>
#include <asm/kmap_types.h>
#include <asm/uaccess.h>
unsigned long aio_max_nr = 0x10000; /* system wide maximum number of aio requests */
/*----end sysctl variables---*/
-static kmem_cache_t *kiocb_cachep;
-static kmem_cache_t *kioctx_cachep;
+static struct kmem_cache *kiocb_cachep;
+static struct kmem_cache *kioctx_cachep;
static struct workqueue_struct *aio_wq;
/* Used for rare fput completion. */
-static void aio_fput_routine(void *);
-static DECLARE_WORK(fput_work, aio_fput_routine, NULL);
+static void aio_fput_routine(struct work_struct *);
+static DECLARE_WORK(fput_work, aio_fput_routine);
static DEFINE_SPINLOCK(fput_lock);
static LIST_HEAD(fput_head);
-static void aio_kick_handler(void *);
+static void aio_kick_handler(struct work_struct *);
static void aio_queue_work(struct kioctx *);
/* aio_setup
*/
static int __init aio_setup(void)
{
- kiocb_cachep = kmem_cache_create("kiocb", sizeof(struct kiocb),
- 0, SLAB_HWCACHE_ALIGN|SLAB_PANIC, NULL, NULL);
- kioctx_cachep = kmem_cache_create("kioctx", sizeof(struct kioctx),
- 0, SLAB_HWCACHE_ALIGN|SLAB_PANIC, NULL, NULL);
+ kiocb_cachep = KMEM_CACHE(kiocb, SLAB_HWCACHE_ALIGN|SLAB_PANIC);
+ kioctx_cachep = KMEM_CACHE(kioctx,SLAB_HWCACHE_ALIGN|SLAB_PANIC);
aio_wq = create_workqueue("aio");
info->nr = 0;
info->ring_pages = info->internal_pages;
if (nr_pages > AIO_RING_PAGES) {
- info->ring_pages = kmalloc(sizeof(struct page *) * nr_pages, GFP_KERNEL);
+ info->ring_pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
if (!info->ring_pages)
return -ENOMEM;
- memset(info->ring_pages, 0, sizeof(struct page *) * nr_pages);
}
info->mmap_size = nr_pages * PAGE_SIZE;
dprintk("attempting mmap of %lu bytes\n", info->mmap_size);
down_write(&ctx->mm->mmap_sem);
info->mmap_base = do_mmap(NULL, 0, info->mmap_size,
- PROT_READ|PROT_WRITE, MAP_ANON|MAP_PRIVATE,
+ PROT_READ|PROT_WRITE, MAP_ANONYMOUS|MAP_PRIVATE,
0);
if (IS_ERR((void *)info->mmap_base)) {
up_write(&ctx->mm->mmap_sem);
- printk("mmap err: %ld\n", -info->mmap_base);
info->mmap_size = 0;
aio_free_ring(ctx);
return -EAGAIN;
if ((unsigned long)nr_events > aio_max_nr)
return ERR_PTR(-EAGAIN);
- ctx = kmem_cache_alloc(kioctx_cachep, GFP_KERNEL);
+ ctx = kmem_cache_zalloc(kioctx_cachep, GFP_KERNEL);
if (!ctx)
return ERR_PTR(-ENOMEM);
- memset(ctx, 0, sizeof(*ctx));
ctx->max_reqs = nr_events;
mm = ctx->mm = current->mm;
atomic_inc(&mm->mm_count);
INIT_LIST_HEAD(&ctx->active_reqs);
INIT_LIST_HEAD(&ctx->run_list);
- INIT_WORK(&ctx->wq, aio_kick_handler, ctx);
+ INIT_DELAYED_WORK(&ctx->wq, aio_kick_handler);
if (aio_setup_ring(ctx) < 0)
goto out_freectx;
struct task_struct *tsk = current;
DECLARE_WAITQUEUE(wait, tsk);
+ spin_lock_irq(&ctx->ctx_lock);
if (!ctx->reqs_active)
- return;
+ goto out;
add_wait_queue(&ctx->wait, &wait);
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
while (ctx->reqs_active) {
- schedule();
+ spin_unlock_irq(&ctx->ctx_lock);
+ io_schedule();
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
+ spin_lock_irq(&ctx->ctx_lock);
}
__set_task_state(tsk, TASK_RUNNING);
remove_wait_queue(&ctx->wait, &wait);
+
+out:
+ spin_unlock_irq(&ctx->ctx_lock);
}
/* wait_on_sync_kiocb:
set_current_state(TASK_UNINTERRUPTIBLE);
if (!iocb->ki_users)
break;
- schedule();
+ io_schedule();
}
__set_current_state(TASK_RUNNING);
return iocb->ki_user_data;
wait_for_all_aios(ctx);
/*
- * this is an overkill, but ensures we don't leave
- * the ctx on the aio_wq
+ * Ensure we don't leave the ctx on the aio_wq
*/
- flush_workqueue(aio_wq);
+ cancel_work_sync(&ctx->wq.work);
if (1 != atomic_read(&ctx->users))
printk(KERN_DEBUG
{
unsigned nr_events = ctx->max_reqs;
- if (unlikely(ctx->reqs_active))
- BUG();
+ BUG_ON(ctx->reqs_active);
cancel_delayed_work(&ctx->wq);
- flush_workqueue(aio_wq);
+ cancel_work_sync(&ctx->wq.work);
aio_free_ring(ctx);
mmdrop(ctx->mm);
ctx->mm = NULL;
* This prevents races between the aio code path referencing the
* req (after submitting it) and aio_complete() freeing the req.
*/
-static struct kiocb *FASTCALL(__aio_get_req(struct kioctx *ctx));
+static struct kiocb *__aio_get_req(struct kioctx *ctx);
static struct kiocb fastcall *__aio_get_req(struct kioctx *ctx)
{
struct kiocb *req = NULL;
req->ki_retry = NULL;
req->ki_dtor = NULL;
req->private = NULL;
+ req->ki_iovec = NULL;
INIT_LIST_HEAD(&req->ki_run_list);
+ req->ki_eventfd = ERR_PTR(-EINVAL);
/* Check if the completion queue has enough free space to
* accept an event from this io.
ring = kmap_atomic(ctx->ring_info.ring_pages[0], KM_USER0);
if (ctx->reqs_active < aio_ring_avail(&ctx->ring_info, ring)) {
list_add(&req->ki_list, &ctx->active_reqs);
- get_ioctx(ctx);
ctx->reqs_active++;
okay = 1;
}
{
assert_spin_locked(&ctx->ctx_lock);
+ if (!IS_ERR(req->ki_eventfd))
+ fput(req->ki_eventfd);
if (req->ki_dtor)
req->ki_dtor(req);
+ if (req->ki_iovec != &req->ki_inline_vec)
+ kfree(req->ki_iovec);
kmem_cache_free(kiocb_cachep, req);
ctx->reqs_active--;
wake_up(&ctx->wait);
}
-static void aio_fput_routine(void *data)
+static void aio_fput_routine(struct work_struct *data)
{
spin_lock_irq(&fput_lock);
while (likely(!list_empty(&fput_head))) {
assert_spin_locked(&ctx->ctx_lock);
req->ki_users --;
- if (unlikely(req->ki_users < 0))
- BUG();
+ BUG_ON(req->ki_users < 0);
if (likely(req->ki_users))
return 0;
list_del(&req->ki_list); /* remove from active_reqs */
spin_lock_irq(&ctx->ctx_lock);
ret = __aio_put_req(ctx, req);
spin_unlock_irq(&ctx->ctx_lock);
- if (ret)
- put_ioctx(ctx);
return ret;
}
* Note that on UML this *requires* PF_BORROWED_MM to be set, otherwise
* it won't work. Update it accordingly if you change it here
*/
- activate_mm(active_mm, mm);
+ switch_mm(active_mm, mm, tsk);
task_unlock(tsk);
mmdrop(active_mm);
* by the calling kernel thread
* (Note: this routine is intended to be called only
* from a kernel thread context)
- *
- * Comments: Called with ctx->ctx_lock held. This nests
- * task_lock instead ctx_lock.
*/
static void unuse_mm(struct mm_struct *mm)
{
* invoked both for initial i/o submission and
* subsequent retries via the aio_kick_handler.
* Expects to be invoked with iocb->ki_ctx->lock
- * already held. The lock is released and reaquired
+ * already held. The lock is released and reacquired
* as needed during processing.
*
* Calls the iocb retry method (already setup for the
ssize_t (*retry)(struct kiocb *);
ssize_t ret;
- if (iocb->ki_retried++ > 1024*1024) {
- printk("Maximal retry count. Bytes done %Zd\n",
- iocb->ki_nbytes - iocb->ki_left);
- return -EAGAIN;
- }
-
- if (!(iocb->ki_retried & 0xff)) {
- pr_debug("%ld retry: %d of %d\n", iocb->ki_retried,
- iocb->ki_nbytes - iocb->ki_left, iocb->ki_nbytes);
- }
-
if (!(retry = iocb->ki_retry)) {
printk("aio_run_iocb: iocb->ki_retry = NULL\n");
return 0;
/*
* Now we are all set to call the retry method in async
- * context. By setting this thread's io_wait context
- * to point to the wait queue entry inside the currently
- * running iocb for the duration of the retry, we ensure
- * that async notification wakeups are queued by the
- * operation instead of blocking waits, and when notified,
- * cause the iocb to be kicked for continuation (through
- * the aio_wake_function callback).
+ * context.
*/
- BUG_ON(current->io_wait != NULL);
- current->io_wait = &iocb->ki_wait;
ret = retry(iocb);
- current->io_wait = NULL;
if (ret != -EIOCBRETRY && ret != -EIOCBQUEUED) {
BUG_ON(!list_empty(&iocb->ki_wait.task_list));
static int __aio_run_iocbs(struct kioctx *ctx)
{
struct kiocb *iocb;
- LIST_HEAD(run_list);
+ struct list_head run_list;
assert_spin_locked(&ctx->ctx_lock);
- list_splice_init(&ctx->run_list, &run_list);
+ list_replace_init(&ctx->run_list, &run_list);
while (!list_empty(&run_list)) {
iocb = list_entry(run_list.next, struct kiocb,
ki_run_list);
*/
iocb->ki_users++; /* grab extra reference */
aio_run_iocb(iocb);
- if (__aio_put_req(ctx, iocb)) /* drop extra ref */
- put_ioctx(ctx);
+ __aio_put_req(ctx, iocb);
}
if (!list_empty(&ctx->run_list))
return 1;
* space.
* Run on aiod's context.
*/
-static void aio_kick_handler(void *data)
+static void aio_kick_handler(struct work_struct *work)
{
- struct kioctx *ctx = data;
+ struct kioctx *ctx = container_of(work, struct kioctx, wq.work);
mm_segment_t oldfs = get_fs();
+ struct mm_struct *mm;
int requeue;
set_fs(USER_DS);
use_mm(ctx->mm);
spin_lock_irq(&ctx->ctx_lock);
requeue =__aio_run_iocbs(ctx);
- unuse_mm(ctx->mm);
+ mm = ctx->mm;
spin_unlock_irq(&ctx->ctx_lock);
+ unuse_mm(mm);
set_fs(oldfs);
/*
* we're in a worker thread already, don't use queue_delayed_work,
*/
if (requeue)
- queue_work(aio_wq, &ctx->wq);
+ queue_delayed_work(aio_wq, &ctx->wq, 0);
}
return 1;
}
+ /*
+ * Check if the user asked us to deliver the result through an
+ * eventfd. The eventfd_signal() function is safe to be called
+ * from IRQ context.
+ */
+ if (!IS_ERR(iocb->ki_eventfd))
+ eventfd_signal(iocb->ki_eventfd, 1);
+
info = &ctx->ring_info;
/* add a completion event to the ring buffer.
kunmap_atomic(ring, KM_IRQ1);
pr_debug("added to ring %p at [%lu]\n", iocb, tail);
-
- pr_debug("%ld retries: %d of %d\n", iocb->ki_retried,
- iocb->ki_nbytes - iocb->ki_left, iocb->ki_nbytes);
put_rq:
/* everything turned out well, dispose of the aiocb. */
ret = __aio_put_req(ctx, iocb);
- spin_unlock_irqrestore(&ctx->ctx_lock, flags);
-
if (waitqueue_active(&ctx->wait))
wake_up(&ctx->wait);
- if (ret)
- put_ioctx(ctx);
-
+ spin_unlock_irqrestore(&ctx->ctx_lock, flags);
return ret;
}
ret = 0;
if (to.timed_out) /* Only check after read evt */
break;
- schedule();
+ /* Try to only show up in io wait if there are ops
+ * in flight */
+ if (ctx->reqs_active)
+ io_schedule();
+ else
+ schedule();
if (signal_pending(tsk)) {
ret = -EINTR;
break;
return -EINVAL;
}
-/*
- * aio_p{read,write} are the default ki_retry methods for
- * IO_CMD_P{READ,WRITE}. They maintains kiocb retry state around potentially
- * multiple calls to f_op->aio_read(). They loop around partial progress
- * instead of returning -EIOCBRETRY because they don't have the means to call
- * kick_iocb().
- */
-static ssize_t aio_pread(struct kiocb *iocb)
+static void aio_advance_iovec(struct kiocb *iocb, ssize_t ret)
{
- struct file *file = iocb->ki_filp;
- struct address_space *mapping = file->f_mapping;
- struct inode *inode = mapping->host;
- ssize_t ret = 0;
-
- do {
- ret = file->f_op->aio_read(iocb, iocb->ki_buf,
- iocb->ki_left, iocb->ki_pos);
- /*
- * Can't just depend on iocb->ki_left to determine
- * whether we are done. This may have been a short read.
- */
- if (ret > 0) {
- iocb->ki_buf += ret;
- iocb->ki_left -= ret;
+ struct iovec *iov = &iocb->ki_iovec[iocb->ki_cur_seg];
+
+ BUG_ON(ret <= 0);
+
+ while (iocb->ki_cur_seg < iocb->ki_nr_segs && ret > 0) {
+ ssize_t this = min((ssize_t)iov->iov_len, ret);
+ iov->iov_base += this;
+ iov->iov_len -= this;
+ iocb->ki_left -= this;
+ ret -= this;
+ if (iov->iov_len == 0) {
+ iocb->ki_cur_seg++;
+ iov++;
}
+ }
- /*
- * For pipes and sockets we return once we have some data; for
- * regular files we retry till we complete the entire read or
- * find that we can't read any more data (e.g short reads).
- */
- } while (ret > 0 && iocb->ki_left > 0 &&
- !S_ISFIFO(inode->i_mode) && !S_ISSOCK(inode->i_mode));
-
- /* This means we must have transferred all that we could */
- /* No need to retry anymore */
- if ((ret == 0) || (iocb->ki_left == 0))
- ret = iocb->ki_nbytes - iocb->ki_left;
-
- return ret;
+ /* the caller should not have done more io than what fit in
+ * the remaining iovecs */
+ BUG_ON(ret > 0 && iocb->ki_left == 0);
}
-/* see aio_pread() */
-static ssize_t aio_pwrite(struct kiocb *iocb)
+static ssize_t aio_rw_vect_retry(struct kiocb *iocb)
{
struct file *file = iocb->ki_filp;
+ struct address_space *mapping = file->f_mapping;
+ struct inode *inode = mapping->host;
+ ssize_t (*rw_op)(struct kiocb *, const struct iovec *,
+ unsigned long, loff_t);
ssize_t ret = 0;
+ unsigned short opcode;
+
+ if ((iocb->ki_opcode == IOCB_CMD_PREADV) ||
+ (iocb->ki_opcode == IOCB_CMD_PREAD)) {
+ rw_op = file->f_op->aio_read;
+ opcode = IOCB_CMD_PREADV;
+ } else {
+ rw_op = file->f_op->aio_write;
+ opcode = IOCB_CMD_PWRITEV;
+ }
do {
- ret = file->f_op->aio_write(iocb, iocb->ki_buf,
- iocb->ki_left, iocb->ki_pos);
- if (ret > 0) {
- iocb->ki_buf += ret;
- iocb->ki_left -= ret;
- }
- } while (ret > 0 && iocb->ki_left > 0);
+ ret = rw_op(iocb, &iocb->ki_iovec[iocb->ki_cur_seg],
+ iocb->ki_nr_segs - iocb->ki_cur_seg,
+ iocb->ki_pos);
+ if (ret > 0)
+ aio_advance_iovec(iocb, ret);
+
+ /* retry all partial writes. retry partial reads as long as its a
+ * regular file. */
+ } while (ret > 0 && iocb->ki_left > 0 &&
+ (opcode == IOCB_CMD_PWRITEV ||
+ (!S_ISFIFO(inode->i_mode) && !S_ISSOCK(inode->i_mode))));
+ /* This means we must have transferred all that we could */
+ /* No need to retry anymore */
if ((ret == 0) || (iocb->ki_left == 0))
ret = iocb->ki_nbytes - iocb->ki_left;
return ret;
}
+static ssize_t aio_setup_vectored_rw(int type, struct kiocb *kiocb)
+{
+ ssize_t ret;
+
+ ret = rw_copy_check_uvector(type, (struct iovec __user *)kiocb->ki_buf,
+ kiocb->ki_nbytes, 1,
+ &kiocb->ki_inline_vec, &kiocb->ki_iovec);
+ if (ret < 0)
+ goto out;
+
+ kiocb->ki_nr_segs = kiocb->ki_nbytes;
+ kiocb->ki_cur_seg = 0;
+ /* ki_nbytes/left now reflect bytes instead of segs */
+ kiocb->ki_nbytes = ret;
+ kiocb->ki_left = ret;
+
+ ret = 0;
+out:
+ return ret;
+}
+
+static ssize_t aio_setup_single_vector(struct kiocb *kiocb)
+{
+ kiocb->ki_iovec = &kiocb->ki_inline_vec;
+ kiocb->ki_iovec->iov_base = kiocb->ki_buf;
+ kiocb->ki_iovec->iov_len = kiocb->ki_left;
+ kiocb->ki_nr_segs = 1;
+ kiocb->ki_cur_seg = 0;
+ return 0;
+}
+
/*
* aio_setup_iocb:
* Performs the initial checks and aio retry method
ret = security_file_permission(file, MAY_READ);
if (unlikely(ret))
break;
+ ret = aio_setup_single_vector(kiocb);
+ if (ret)
+ break;
ret = -EINVAL;
if (file->f_op->aio_read)
- kiocb->ki_retry = aio_pread;
+ kiocb->ki_retry = aio_rw_vect_retry;
break;
case IOCB_CMD_PWRITE:
ret = -EBADF;
ret = security_file_permission(file, MAY_WRITE);
if (unlikely(ret))
break;
+ ret = aio_setup_single_vector(kiocb);
+ if (ret)
+ break;
+ ret = -EINVAL;
+ if (file->f_op->aio_write)
+ kiocb->ki_retry = aio_rw_vect_retry;
+ break;
+ case IOCB_CMD_PREADV:
+ ret = -EBADF;
+ if (unlikely(!(file->f_mode & FMODE_READ)))
+ break;
+ ret = security_file_permission(file, MAY_READ);
+ if (unlikely(ret))
+ break;
+ ret = aio_setup_vectored_rw(READ, kiocb);
+ if (ret)
+ break;
+ ret = -EINVAL;
+ if (file->f_op->aio_read)
+ kiocb->ki_retry = aio_rw_vect_retry;
+ break;
+ case IOCB_CMD_PWRITEV:
+ ret = -EBADF;
+ if (unlikely(!(file->f_mode & FMODE_WRITE)))
+ break;
+ ret = security_file_permission(file, MAY_WRITE);
+ if (unlikely(ret))
+ break;
+ ret = aio_setup_vectored_rw(WRITE, kiocb);
+ if (ret)
+ break;
ret = -EINVAL;
if (file->f_op->aio_write)
- kiocb->ki_retry = aio_pwrite;
+ kiocb->ki_retry = aio_rw_vect_retry;
break;
case IOCB_CMD_FDSYNC:
ret = -EINVAL;
* Simply triggers a retry of the operation via kick_iocb.
*
* This callback is specified in the wait queue entry in
- * a kiocb (current->io_wait points to this wait queue
- * entry when an aio operation executes; it is used
- * instead of a synchronous wait when an i/o blocking
- * condition is encountered during aio).
+ * a kiocb.
*
* Note:
* This routine is executed with the wait queue lock held.
ssize_t ret;
/* enforce forwards compatibility on users */
- if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2 ||
- iocb->aio_reserved3)) {
+ if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2)) {
pr_debug("EINVAL: io_submit: reserve field set\n");
return -EINVAL;
}
fput(file);
return -EAGAIN;
}
-
req->ki_filp = file;
+ if (iocb->aio_flags & IOCB_FLAG_RESFD) {
+ /*
+ * If the IOCB_FLAG_RESFD flag of aio_flags is set, get an
+ * instance of the file* now. The file descriptor must be
+ * an eventfd() fd, and will be signaled for each completed
+ * event using the eventfd_signal() function.
+ */
+ req->ki_eventfd = eventfd_fget((int) iocb->aio_resfd);
+ if (unlikely(IS_ERR(req->ki_eventfd))) {
+ ret = PTR_ERR(req->ki_eventfd);
+ goto out_put_req;
+ }
+ }
+
ret = put_user(req->ki_key, &user_iocb->aio_key);
if (unlikely(ret)) {
dprintk("EFAULT: aio_key\n");
req->ki_opcode = iocb->aio_lio_opcode;
init_waitqueue_func_entry(&req->ki_wait, aio_wake_function);
INIT_LIST_HEAD(&req->ki_wait.task_list);
- req->ki_retried = 0;
ret = aio_setup_iocb(req);