splice: relay support
authorTom Zanussi <zanussi@us.ibm.com>
Mon, 4 Jun 2007 07:12:05 +0000 (09:12 +0200)
committerJens Axboe <jens.axboe@oracle.com>
Tue, 10 Jul 2007 06:04:14 +0000 (08:04 +0200)
Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
kernel/relay.c

index 95db8c7..d1d1920 100644 (file)
@@ -21,6 +21,7 @@
 #include <linux/vmalloc.h>
 #include <linux/mm.h>
 #include <linux/cpu.h>
+#include <linux/pipe_fs_i.h>
 
 /* list of open channels, for cpu hotplug */
 static DEFINE_MUTEX(relay_channels_mutex);
@@ -121,6 +122,7 @@ static void *relay_alloc_buf(struct rchan_buf *buf, size_t *size)
                buf->page_array[i] = alloc_page(GFP_KERNEL);
                if (unlikely(!buf->page_array[i]))
                        goto depopulate;
+               set_page_private(buf->page_array[i], (unsigned long)buf);
        }
        mem = vmap(buf->page_array, n_pages, VM_MAP, PAGE_KERNEL);
        if (!mem)
@@ -970,43 +972,6 @@ static int subbuf_read_actor(size_t read_start,
        return ret;
 }
 
-/*
- *     subbuf_send_actor - send up to one subbuf's worth of data
- */
-static int subbuf_send_actor(size_t read_start,
-                            struct rchan_buf *buf,
-                            size_t avail,
-                            read_descriptor_t *desc,
-                            read_actor_t actor)
-{
-       unsigned long pidx, poff;
-       unsigned int subbuf_pages;
-       int ret = 0;
-
-       subbuf_pages = buf->chan->alloc_size >> PAGE_SHIFT;
-       pidx = (read_start / PAGE_SIZE) % subbuf_pages;
-       poff = read_start & ~PAGE_MASK;
-       while (avail) {
-               struct page *p = buf->page_array[pidx];
-               unsigned int len;
-
-               len = PAGE_SIZE - poff;
-               if (len > avail)
-                       len = avail;
-
-               len = actor(desc, p, poff, len);
-               if (desc->error)
-                       break;
-
-               avail -= len;
-               ret += len;
-               poff = 0;
-               pidx = (pidx + 1) % subbuf_pages;
-       }
-
-       return ret;
-}
-
 typedef int (*subbuf_actor_t) (size_t read_start,
                               struct rchan_buf *buf,
                               size_t avail,
@@ -1067,19 +1032,195 @@ static ssize_t relay_file_read(struct file *filp,
                                       NULL, &desc);
 }
 
-static ssize_t relay_file_sendfile(struct file *filp,
-                                  loff_t *ppos,
-                                  size_t count,
-                                  read_actor_t actor,
-                                  void *target)
+static void relay_pipe_buf_release(struct pipe_inode_info *pipe,
+                                  struct pipe_buffer *buf)
 {
-       read_descriptor_t desc;
-       desc.written = 0;
-       desc.count = count;
-       desc.arg.data = target;
-       desc.error = 0;
-       return relay_file_read_subbufs(filp, ppos, subbuf_send_actor,
-                                      actor, &desc);
+       struct rchan_buf *rbuf;
+
+       rbuf = (struct rchan_buf *)page_private(buf->page);
+
+       rbuf->bytes_consumed += PAGE_SIZE;
+
+       if (rbuf->bytes_consumed == rbuf->chan->subbuf_size) {
+               relay_subbufs_consumed(rbuf->chan, rbuf->cpu, 1);
+               rbuf->bytes_consumed = 0;
+       }
+}
+
+static struct pipe_buf_operations relay_pipe_buf_ops = {
+       .can_merge = 0,
+       .map = generic_pipe_buf_map,
+       .unmap = generic_pipe_buf_unmap,
+       .pin = generic_pipe_buf_pin,
+       .release = relay_pipe_buf_release,
+       .steal = generic_pipe_buf_steal,
+       .get = generic_pipe_buf_get,
+};
+
+/**
+ *     subbuf_splice_actor - splice up to one subbuf's worth of data
+ */
+static int subbuf_splice_actor(struct file *in,
+                              loff_t *ppos,
+                              struct pipe_inode_info *pipe,
+                              size_t len,
+                              unsigned int flags,
+                              int *nonpad_ret)
+{
+       unsigned int pidx, poff;
+       unsigned int subbuf_pages;
+       int ret = 0;
+       int do_wakeup = 0;
+       struct rchan_buf *rbuf = in->private_data;
+       unsigned int subbuf_size = rbuf->chan->subbuf_size;
+       size_t read_start = ((size_t)*ppos) % rbuf->chan->alloc_size;
+       size_t avail = subbuf_size - read_start % subbuf_size;
+       size_t read_subbuf = read_start / subbuf_size;
+       size_t padding = rbuf->padding[read_subbuf];
+       size_t nonpad_end = read_subbuf * subbuf_size + subbuf_size - padding;
+
+       if (rbuf->subbufs_produced == rbuf->subbufs_consumed)
+               return 0;
+
+       if (len > avail)
+               len = avail;
+
+       if (pipe->inode)
+               mutex_lock(&pipe->inode->i_mutex);
+
+       subbuf_pages = rbuf->chan->alloc_size >> PAGE_SHIFT;
+       pidx = (read_start / PAGE_SIZE) % subbuf_pages;
+       poff = read_start & ~PAGE_MASK;
+
+       for (;;) {
+               unsigned int this_len;
+               unsigned int this_end;
+               int newbuf = (pipe->curbuf + pipe->nrbufs) & (PIPE_BUFFERS - 1);
+               struct pipe_buffer *buf = pipe->bufs + newbuf;
+
+               if (!pipe->readers) {
+                       send_sig(SIGPIPE, current, 0);
+                       if (!ret)
+                               ret = -EPIPE;
+                       break;
+               }
+
+               if (pipe->nrbufs < PIPE_BUFFERS) {
+                       this_len = PAGE_SIZE - poff;
+                       if (this_len > avail)
+                               this_len = avail;
+
+                       buf->page = rbuf->page_array[pidx];
+                       buf->offset = poff;
+                       this_end = read_start + ret + this_len;
+                       if (this_end > nonpad_end) {
+                               if (read_start + ret >= nonpad_end)
+                                       buf->len = 0;
+                               else
+                                       buf->len = nonpad_end - (read_start + ret);
+                       } else
+                               buf->len = this_len;
+
+                       *nonpad_ret += buf->len;
+
+                       buf->ops = &relay_pipe_buf_ops;
+                       pipe->nrbufs++;
+
+                       avail -= this_len;
+                       ret += this_len;
+                       poff = 0;
+                       pidx = (pidx + 1) % subbuf_pages;
+
+                       if (pipe->inode)
+                               do_wakeup = 1;
+
+                       if (!avail)
+                               break;
+
+                       if (pipe->nrbufs < PIPE_BUFFERS)
+                               continue;
+
+                       break;
+               }
+
+               if (flags & SPLICE_F_NONBLOCK) {
+                       if (!ret)
+                               ret = -EAGAIN;
+                       break;
+               }
+
+               if (signal_pending(current)) {
+                       if (!ret)
+                               ret = -ERESTARTSYS;
+                       break;
+               }
+
+               if (do_wakeup) {
+                       smp_mb();
+                       if (waitqueue_active(&pipe->wait))
+                               wake_up_interruptible_sync(&pipe->wait);
+                       kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
+                       do_wakeup = 0;
+               }
+
+               pipe->waiting_writers++;
+               pipe_wait(pipe);
+               pipe->waiting_writers--;
+       }
+
+       if (pipe->inode)
+               mutex_unlock(&pipe->inode->i_mutex);
+
+       if (do_wakeup) {
+               smp_mb();
+               if (waitqueue_active(&pipe->wait))
+                       wake_up_interruptible(&pipe->wait);
+               kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
+       }
+
+       return ret;
+}
+
+static ssize_t relay_file_splice_read(struct file *in,
+                                     loff_t *ppos,
+                                     struct pipe_inode_info *pipe,
+                                     size_t len,
+                                     unsigned int flags)
+{
+       ssize_t spliced;
+       int ret;
+       int nonpad_ret = 0;
+
+       ret = 0;
+       spliced = 0;
+
+       while (len) {
+               ret = subbuf_splice_actor(in, ppos, pipe, len, flags, &nonpad_ret);
+               if (ret < 0)
+                       break;
+               else if (!ret) {
+                       break;
+                       if (spliced)
+                               break;
+                       if (flags & SPLICE_F_NONBLOCK) {
+                               ret = -EAGAIN;
+                               break;
+                       }
+               }
+
+               *ppos += ret;
+               if (ret > len)
+                       len = 0;
+               else
+                       len -= ret;
+               spliced += nonpad_ret;
+               nonpad_ret = 0;
+       }
+
+       if (spliced)
+               return spliced;
+
+       return ret;
 }
 
 const struct file_operations relay_file_operations = {
@@ -1089,7 +1230,7 @@ const struct file_operations relay_file_operations = {
        .read           = relay_file_read,
        .llseek         = no_llseek,
        .release        = relay_file_release,
-       .sendfile       = relay_file_sendfile,
+       .splice_read    = relay_file_splice_read,
 };
 EXPORT_SYMBOL_GPL(relay_file_operations);