fuse: support splice() reading from fuse device
authorMiklos Szeredi <mszeredi@suse.cz>
Tue, 25 May 2010 13:06:07 +0000 (15:06 +0200)
committerMiklos Szeredi <mszeredi@suse.cz>
Tue, 25 May 2010 13:06:07 +0000 (15:06 +0200)
Allow userspace filesystem implementation to use splice() to read from
the fuse device.

The userspace filesystem can now transfer data coming from a WRITE
request to an arbitrary file descriptor (regular file, block device or
socket) without having to go through a userspace buffer.

The semantics of using splice() to read messages are:

 1)  with a single splice() call move the whole message from the fuse
     device to a temporary pipe
 2)  read the header from the pipe and determine the message type
 3a) if message is a WRITE then splice data from pipe to destination
 3b) else read rest of message to userspace buffer

Signed-off-by: Miklos Szeredi <mszeredi@suse.cz>
fs/fuse/dev.c

index b070d3a..4413f5e 100644 (file)
@@ -515,13 +515,12 @@ struct fuse_copy_state {
 };
 
 static void fuse_copy_init(struct fuse_copy_state *cs, struct fuse_conn *fc,
-                          int write, struct fuse_req *req,
+                          int write,
                           const struct iovec *iov, unsigned long nr_segs)
 {
        memset(cs, 0, sizeof(*cs));
        cs->fc = fc;
        cs->write = write;
-       cs->req = req;
        cs->iov = iov;
        cs->nr_segs = nr_segs;
 }
@@ -532,8 +531,12 @@ static void fuse_copy_finish(struct fuse_copy_state *cs)
        if (cs->currbuf) {
                struct pipe_buffer *buf = cs->currbuf;
 
-               buf->ops->unmap(cs->pipe, buf, cs->mapaddr);
-
+               if (!cs->write) {
+                       buf->ops->unmap(cs->pipe, buf, cs->mapaddr);
+               } else {
+                       kunmap_atomic(cs->mapaddr, KM_USER0);
+                       buf->len = PAGE_SIZE - cs->len;
+               }
                cs->currbuf = NULL;
                cs->mapaddr = NULL;
        } else if (cs->mapaddr) {
@@ -561,17 +564,39 @@ static int fuse_copy_fill(struct fuse_copy_state *cs)
        if (cs->pipebufs) {
                struct pipe_buffer *buf = cs->pipebufs;
 
-               err = buf->ops->confirm(cs->pipe, buf);
-               if (err)
-                       return err;
+               if (!cs->write) {
+                       err = buf->ops->confirm(cs->pipe, buf);
+                       if (err)
+                               return err;
+
+                       BUG_ON(!cs->nr_segs);
+                       cs->currbuf = buf;
+                       cs->mapaddr = buf->ops->map(cs->pipe, buf, 1);
+                       cs->len = buf->len;
+                       cs->buf = cs->mapaddr + buf->offset;
+                       cs->pipebufs++;
+                       cs->nr_segs--;
+               } else {
+                       struct page *page;
 
-               BUG_ON(!cs->nr_segs);
-               cs->currbuf = buf;
-               cs->mapaddr = buf->ops->map(cs->pipe, buf, 1);
-               cs->len = buf->len;
-               cs->buf = cs->mapaddr + buf->offset;
-               cs->pipebufs++;
-               cs->nr_segs--;
+                       if (cs->nr_segs == cs->pipe->buffers)
+                               return -EIO;
+
+                       page = alloc_page(GFP_HIGHUSER);
+                       if (!page)
+                               return -ENOMEM;
+
+                       buf->page = page;
+                       buf->offset = 0;
+                       buf->len = 0;
+
+                       cs->currbuf = buf;
+                       cs->mapaddr = kmap_atomic(page, KM_USER0);
+                       cs->buf = cs->mapaddr;
+                       cs->len = PAGE_SIZE;
+                       cs->pipebufs++;
+                       cs->nr_segs++;
+               }
        } else {
                if (!cs->seglen) {
                        BUG_ON(!cs->nr_segs);
@@ -731,6 +756,30 @@ out_fallback:
        return 1;
 }
 
+static int fuse_ref_page(struct fuse_copy_state *cs, struct page *page,
+                        unsigned offset, unsigned count)
+{
+       struct pipe_buffer *buf;
+
+       if (cs->nr_segs == cs->pipe->buffers)
+               return -EIO;
+
+       unlock_request(cs->fc, cs->req);
+       fuse_copy_finish(cs);
+
+       buf = cs->pipebufs;
+       page_cache_get(page);
+       buf->page = page;
+       buf->offset = offset;
+       buf->len = count;
+
+       cs->pipebufs++;
+       cs->nr_segs++;
+       cs->len = 0;
+
+       return 0;
+}
+
 /*
  * Copy a page in the request to/from the userspace buffer.  Must be
  * done atomically
@@ -747,7 +796,9 @@ static int fuse_copy_page(struct fuse_copy_state *cs, struct page **pagep,
                kunmap_atomic(mapaddr, KM_USER1);
        }
        while (count) {
-               if (!cs->len) {
+               if (cs->write && cs->pipebufs && page) {
+                       return fuse_ref_page(cs, page, offset, count);
+               } else if (!cs->len) {
                        if (cs->move_pages && page &&
                            offset == 0 && count == PAGE_SIZE) {
                                err = fuse_try_move_page(cs, pagep);
@@ -862,11 +913,10 @@ __acquires(&fc->lock)
  *
  * Called with fc->lock held, releases it
  */
-static int fuse_read_interrupt(struct fuse_conn *fc, struct fuse_req *req,
-                              const struct iovec *iov, unsigned long nr_segs)
+static int fuse_read_interrupt(struct fuse_conn *fc, struct fuse_copy_state *cs,
+                              size_t nbytes, struct fuse_req *req)
 __releases(&fc->lock)
 {
-       struct fuse_copy_state cs;
        struct fuse_in_header ih;
        struct fuse_interrupt_in arg;
        unsigned reqsize = sizeof(ih) + sizeof(arg);
@@ -882,14 +932,13 @@ __releases(&fc->lock)
        arg.unique = req->in.h.unique;
 
        spin_unlock(&fc->lock);
-       if (iov_length(iov, nr_segs) < reqsize)
+       if (nbytes < reqsize)
                return -EINVAL;
 
-       fuse_copy_init(&cs, fc, 1, NULL, iov, nr_segs);
-       err = fuse_copy_one(&cs, &ih, sizeof(ih));
+       err = fuse_copy_one(cs, &ih, sizeof(ih));
        if (!err)
-               err = fuse_copy_one(&cs, &arg, sizeof(arg));
-       fuse_copy_finish(&cs);
+               err = fuse_copy_one(cs, &arg, sizeof(arg));
+       fuse_copy_finish(cs);
 
        return err ? err : reqsize;
 }
@@ -903,18 +952,13 @@ __releases(&fc->lock)
  * request_end().  Otherwise add it to the processing list, and set
  * the 'sent' flag.
  */
-static ssize_t fuse_dev_read(struct kiocb *iocb, const struct iovec *iov,
-                             unsigned long nr_segs, loff_t pos)
+static ssize_t fuse_dev_do_read(struct fuse_conn *fc, struct file *file,
+                               struct fuse_copy_state *cs, size_t nbytes)
 {
        int err;
        struct fuse_req *req;
        struct fuse_in *in;
-       struct fuse_copy_state cs;
        unsigned reqsize;
-       struct file *file = iocb->ki_filp;
-       struct fuse_conn *fc = fuse_get_conn(file);
-       if (!fc)
-               return -EPERM;
 
  restart:
        spin_lock(&fc->lock);
@@ -934,7 +978,7 @@ static ssize_t fuse_dev_read(struct kiocb *iocb, const struct iovec *iov,
        if (!list_empty(&fc->interrupts)) {
                req = list_entry(fc->interrupts.next, struct fuse_req,
                                 intr_entry);
-               return fuse_read_interrupt(fc, req, iov, nr_segs);
+               return fuse_read_interrupt(fc, cs, nbytes, req);
        }
 
        req = list_entry(fc->pending.next, struct fuse_req, list);
@@ -944,7 +988,7 @@ static ssize_t fuse_dev_read(struct kiocb *iocb, const struct iovec *iov,
        in = &req->in;
        reqsize = in->h.len;
        /* If request is too large, reply with an error and restart the read */
-       if (iov_length(iov, nr_segs) < reqsize) {
+       if (nbytes < reqsize) {
                req->out.h.error = -EIO;
                /* SETXATTR is special, since it may contain too large data */
                if (in->h.opcode == FUSE_SETXATTR)
@@ -953,12 +997,12 @@ static ssize_t fuse_dev_read(struct kiocb *iocb, const struct iovec *iov,
                goto restart;
        }
        spin_unlock(&fc->lock);
-       fuse_copy_init(&cs, fc, 1, req, iov, nr_segs);
-       err = fuse_copy_one(&cs, &in->h, sizeof(in->h));
+       cs->req = req;
+       err = fuse_copy_one(cs, &in->h, sizeof(in->h));
        if (!err)
-               err = fuse_copy_args(&cs, in->numargs, in->argpages,
+               err = fuse_copy_args(cs, in->numargs, in->argpages,
                                     (struct fuse_arg *) in->args, 0);
-       fuse_copy_finish(&cs);
+       fuse_copy_finish(cs);
        spin_lock(&fc->lock);
        req->locked = 0;
        if (req->aborted) {
@@ -986,6 +1030,110 @@ static ssize_t fuse_dev_read(struct kiocb *iocb, const struct iovec *iov,
        return err;
 }
 
+static ssize_t fuse_dev_read(struct kiocb *iocb, const struct iovec *iov,
+                             unsigned long nr_segs, loff_t pos)
+{
+       struct fuse_copy_state cs;
+       struct file *file = iocb->ki_filp;
+       struct fuse_conn *fc = fuse_get_conn(file);
+       if (!fc)
+               return -EPERM;
+
+       fuse_copy_init(&cs, fc, 1, iov, nr_segs);
+
+       return fuse_dev_do_read(fc, file, &cs, iov_length(iov, nr_segs));
+}
+
+static int fuse_dev_pipe_buf_steal(struct pipe_inode_info *pipe,
+                                  struct pipe_buffer *buf)
+{
+       return 1;
+}
+
+static const struct pipe_buf_operations fuse_dev_pipe_buf_ops = {
+       .can_merge = 0,
+       .map = generic_pipe_buf_map,
+       .unmap = generic_pipe_buf_unmap,
+       .confirm = generic_pipe_buf_confirm,
+       .release = generic_pipe_buf_release,
+       .steal = fuse_dev_pipe_buf_steal,
+       .get = generic_pipe_buf_get,
+};
+
+static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
+                                   struct pipe_inode_info *pipe,
+                                   size_t len, unsigned int flags)
+{
+       int ret;
+       int page_nr = 0;
+       int do_wakeup = 0;
+       struct pipe_buffer *bufs;
+       struct fuse_copy_state cs;
+       struct fuse_conn *fc = fuse_get_conn(in);
+       if (!fc)
+               return -EPERM;
+
+       bufs = kmalloc(pipe->buffers * sizeof (struct pipe_buffer), GFP_KERNEL);
+       if (!bufs)
+               return -ENOMEM;
+
+       fuse_copy_init(&cs, fc, 1, NULL, 0);
+       cs.pipebufs = bufs;
+       cs.pipe = pipe;
+       ret = fuse_dev_do_read(fc, in, &cs, len);
+       if (ret < 0)
+               goto out;
+
+       ret = 0;
+       pipe_lock(pipe);
+
+       if (!pipe->readers) {
+               send_sig(SIGPIPE, current, 0);
+               if (!ret)
+                       ret = -EPIPE;
+               goto out_unlock;
+       }
+
+       if (pipe->nrbufs + cs.nr_segs > pipe->buffers) {
+               ret = -EIO;
+               goto out_unlock;
+       }
+
+       while (page_nr < cs.nr_segs) {
+               int newbuf = (pipe->curbuf + pipe->nrbufs) & (pipe->buffers - 1);
+               struct pipe_buffer *buf = pipe->bufs + newbuf;
+
+               buf->page = bufs[page_nr].page;
+               buf->offset = bufs[page_nr].offset;
+               buf->len = bufs[page_nr].len;
+               buf->ops = &fuse_dev_pipe_buf_ops;
+
+               pipe->nrbufs++;
+               page_nr++;
+               ret += buf->len;
+
+               if (pipe->inode)
+                       do_wakeup = 1;
+       }
+
+out_unlock:
+       pipe_unlock(pipe);
+
+       if (do_wakeup) {
+               smp_mb();
+               if (waitqueue_active(&pipe->wait))
+                       wake_up_interruptible(&pipe->wait);
+               kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
+       }
+
+out:
+       for (; page_nr < cs.nr_segs; page_nr++)
+               page_cache_release(bufs[page_nr].page);
+
+       kfree(bufs);
+       return ret;
+}
+
 static int fuse_notify_poll(struct fuse_conn *fc, unsigned int size,
                            struct fuse_copy_state *cs)
 {
@@ -1246,7 +1394,7 @@ static ssize_t fuse_dev_write(struct kiocb *iocb, const struct iovec *iov,
        if (!fc)
                return -EPERM;
 
-       fuse_copy_init(&cs, fc, 0, NULL, iov, nr_segs);
+       fuse_copy_init(&cs, fc, 0, iov, nr_segs);
 
        return fuse_dev_do_write(fc, &cs, iov_length(iov, nr_segs));
 }
@@ -1311,11 +1459,8 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
        }
        pipe_unlock(pipe);
 
-       memset(&cs, 0, sizeof(struct fuse_copy_state));
-       cs.fc = fc;
-       cs.write = 0;
+       fuse_copy_init(&cs, fc, 0, NULL, nbuf);
        cs.pipebufs = bufs;
-       cs.nr_segs = nbuf;
        cs.pipe = pipe;
 
        if (flags & SPLICE_F_MOVE)
@@ -1473,6 +1618,7 @@ const struct file_operations fuse_dev_operations = {
        .llseek         = no_llseek,
        .read           = do_sync_read,
        .aio_read       = fuse_dev_read,
+       .splice_read    = fuse_dev_splice_read,
        .write          = do_sync_write,
        .aio_write      = fuse_dev_write,
        .splice_write   = fuse_dev_splice_write,