splice: implement pipe to pipe splicing
authorMiklos Szeredi <miklos@szeredi.hu>
Thu, 7 May 2009 13:37:35 +0000 (15:37 +0200)
committerJens Axboe <jens.axboe@oracle.com>
Mon, 11 May 2009 12:13:09 +0000 (14:13 +0200)
Allow splice(2) to work when both the input and the output is a pipe.

Based on the impementation of the tee(2) syscall, but instead of
duplicating the buffer references move the buffers from the input pipe
to the output pipe.

Moving the whole buffer only succeeds if the full length of the buffer
is spliced.  Otherwise duplicate the buffer, just like tee(2), set the
length of the output buffer and advance the offset on the input
buffer.

Since splice is operating on two pipes, special care needs to be taken
with locking to prevent AN ABBA deadlock.  Again this is done
similarly to the tee(2) syscall, first preparing the input and output
pipes so there's data to consume and space for that data, and then
doing the move operation while holding both locks.

If other processes are doing I/O on the same pipes parallel to the
splice, then by the time both inodes are locked there might be no
buffers left to move, or no space to move them to.  In this case retry
the whole operation, including the preparation phase.  This could lead
to starvation, but I'm not sure if that's serious enough to worry
about.

Signed-off-by: Miklos Szeredi <mszeredi@suse.cz>
Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
fs/splice.c

index 666953d..e405cf5 100644 (file)
@@ -1112,6 +1112,9 @@ long do_splice_direct(struct file *in, loff_t *ppos, struct file *out,
        return ret;
 }
 
+static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
+                              struct pipe_inode_info *opipe,
+                              size_t len, unsigned int flags);
 /*
  * After the inode slimming patch, i_pipe/i_bdev/i_cdev share the same
  * location, so checking ->i_pipe is not enough to verify that this is a
@@ -1132,12 +1135,32 @@ static long do_splice(struct file *in, loff_t __user *off_in,
                      struct file *out, loff_t __user *off_out,
                      size_t len, unsigned int flags)
 {
-       struct pipe_inode_info *pipe;
+       struct pipe_inode_info *ipipe;
+       struct pipe_inode_info *opipe;
        loff_t offset, *off;
        long ret;
 
-       pipe = pipe_info(in->f_path.dentry->d_inode);
-       if (pipe) {
+       ipipe = pipe_info(in->f_path.dentry->d_inode);
+       opipe = pipe_info(out->f_path.dentry->d_inode);
+
+       if (ipipe && opipe) {
+               if (off_in || off_out)
+                       return -ESPIPE;
+
+               if (!(in->f_mode & FMODE_READ))
+                       return -EBADF;
+
+               if (!(out->f_mode & FMODE_WRITE))
+                       return -EBADF;
+
+               /* Splicing to self would be fun, but... */
+               if (ipipe == opipe)
+                       return -EINVAL;
+
+               return splice_pipe_to_pipe(ipipe, opipe, len, flags);
+       }
+
+       if (ipipe) {
                if (off_in)
                        return -ESPIPE;
                if (off_out) {
@@ -1149,7 +1172,7 @@ static long do_splice(struct file *in, loff_t __user *off_in,
                } else
                        off = &out->f_pos;
 
-               ret = do_splice_from(pipe, out, off, len, flags);
+               ret = do_splice_from(ipipe, out, off, len, flags);
 
                if (off_out && copy_to_user(off_out, off, sizeof(loff_t)))
                        ret = -EFAULT;
@@ -1157,8 +1180,7 @@ static long do_splice(struct file *in, loff_t __user *off_in,
                return ret;
        }
 
-       pipe = pipe_info(out->f_path.dentry->d_inode);
-       if (pipe) {
+       if (opipe) {
                if (off_out)
                        return -ESPIPE;
                if (off_in) {
@@ -1170,7 +1192,7 @@ static long do_splice(struct file *in, loff_t __user *off_in,
                } else
                        off = &in->f_pos;
 
-               ret = do_splice_to(in, off, pipe, len, flags);
+               ret = do_splice_to(in, off, opipe, len, flags);
 
                if (off_in && copy_to_user(off_in, off, sizeof(loff_t)))
                        ret = -EFAULT;
@@ -1511,7 +1533,7 @@ SYSCALL_DEFINE6(splice, int, fd_in, loff_t __user *, off_in,
  * Make sure there's data to read. Wait for input if we can, otherwise
  * return an appropriate error.
  */
-static int link_ipipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
+static int ipipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
 {
        int ret;
 
@@ -1549,7 +1571,7 @@ static int link_ipipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
  * Make sure there's writeable room. Wait for room if we can, otherwise
  * return an appropriate error.
  */
-static int link_opipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
+static int opipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
 {
        int ret;
 
@@ -1587,6 +1609,124 @@ static int link_opipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
 }
 
 /*
+ * Splice contents of ipipe to opipe.
+ */
+static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
+                              struct pipe_inode_info *opipe,
+                              size_t len, unsigned int flags)
+{
+       struct pipe_buffer *ibuf, *obuf;
+       int ret = 0, nbuf;
+       bool input_wakeup = false;
+
+
+retry:
+       ret = ipipe_prep(ipipe, flags);
+       if (ret)
+               return ret;
+
+       ret = opipe_prep(opipe, flags);
+       if (ret)
+               return ret;
+
+       /*
+        * Potential ABBA deadlock, work around it by ordering lock
+        * grabbing by pipe info address. Otherwise two different processes
+        * could deadlock (one doing tee from A -> B, the other from B -> A).
+        */
+       pipe_double_lock(ipipe, opipe);
+
+       do {
+               if (!opipe->readers) {
+                       send_sig(SIGPIPE, current, 0);
+                       if (!ret)
+                               ret = -EPIPE;
+                       break;
+               }
+
+               if (!ipipe->nrbufs && !ipipe->writers)
+                       break;
+
+               /*
+                * Cannot make any progress, because either the input
+                * pipe is empty or the output pipe is full.
+                */
+               if (!ipipe->nrbufs || opipe->nrbufs >= PIPE_BUFFERS) {
+                       /* Already processed some buffers, break */
+                       if (ret)
+                               break;
+
+                       if (flags & SPLICE_F_NONBLOCK) {
+                               ret = -EAGAIN;
+                               break;
+                       }
+
+                       /*
+                        * We raced with another reader/writer and haven't
+                        * managed to process any buffers.  A zero return
+                        * value means EOF, so retry instead.
+                        */
+                       pipe_unlock(ipipe);
+                       pipe_unlock(opipe);
+                       goto retry;
+               }
+
+               ibuf = ipipe->bufs + ipipe->curbuf;
+               nbuf = (opipe->curbuf + opipe->nrbufs) % PIPE_BUFFERS;
+               obuf = opipe->bufs + nbuf;
+
+               if (len >= ibuf->len) {
+                       /*
+                        * Simply move the whole buffer from ipipe to opipe
+                        */
+                       *obuf = *ibuf;
+                       ibuf->ops = NULL;
+                       opipe->nrbufs++;
+                       ipipe->curbuf = (ipipe->curbuf + 1) % PIPE_BUFFERS;
+                       ipipe->nrbufs--;
+                       input_wakeup = true;
+               } else {
+                       /*
+                        * Get a reference to this pipe buffer,
+                        * so we can copy the contents over.
+                        */
+                       ibuf->ops->get(ipipe, ibuf);
+                       *obuf = *ibuf;
+
+                       /*
+                        * Don't inherit the gift flag, we need to
+                        * prevent multiple steals of this page.
+                        */
+                       obuf->flags &= ~PIPE_BUF_FLAG_GIFT;
+
+                       obuf->len = len;
+                       opipe->nrbufs++;
+                       ibuf->offset += obuf->len;
+                       ibuf->len -= obuf->len;
+               }
+               ret += obuf->len;
+               len -= obuf->len;
+       } while (len);
+
+       pipe_unlock(ipipe);
+       pipe_unlock(opipe);
+
+       /*
+        * If we put data in the output pipe, wakeup any potential readers.
+        */
+       if (ret > 0) {
+               smp_mb();
+               if (waitqueue_active(&opipe->wait))
+                       wake_up_interruptible(&opipe->wait);
+               kill_fasync(&opipe->fasync_readers, SIGIO, POLL_IN);
+       }
+       if (input_wakeup)
+               wakeup_pipe_writers(ipipe);
+
+       return ret;
+}
+
+/*
  * Link contents of ipipe to opipe.
  */
 static int link_pipe(struct pipe_inode_info *ipipe,
@@ -1690,9 +1830,9 @@ static long do_tee(struct file *in, struct file *out, size_t len,
                 * Keep going, unless we encounter an error. The ipipe/opipe
                 * ordering doesn't really matter.
                 */
-               ret = link_ipipe_prep(ipipe, flags);
+               ret = ipipe_prep(ipipe, flags);
                if (!ret) {
-                       ret = link_opipe_prep(opipe, flags);
+                       ret = opipe_prep(opipe, flags);
                        if (!ret)
                                ret = link_pipe(ipipe, opipe, len, flags);
                }