relay: use splice_to_pipe() instead of open-coding the pipe loop
[safe/jmp/linux-2.6] / kernel / relay.c
index 33345e7..dd3bc5b 100644 (file)
@@ -7,6 +7,8 @@
  * Copyright (C) 1999-2005 - Karim Yaghmour (karim@opersys.com)
  *
  * Moved to kernel/relay.c by Paul Mundt, 2006.
+ * November 2006 - CPU hotplug support by Mathieu Desnoyers
+ *     (mathieu.desnoyers@polymtl.ca)
  *
  * This file is released under the GPL.
  */
 #include <linux/relay.h>
 #include <linux/vmalloc.h>
 #include <linux/mm.h>
+#include <linux/cpu.h>
+#include <linux/splice.h>
+
+/* list of open channels, for cpu hotplug */
+static DEFINE_MUTEX(relay_channels_mutex);
+static LIST_HEAD(relay_channels);
 
 /*
  * close() vm_op implementation for relay file mapping.
@@ -95,7 +103,7 @@ int relay_mmap_buf(struct rchan_buf *buf, struct vm_area_struct *vma)
  *     @buf: the buffer struct
  *     @size: total size of the buffer
  *
- *     Returns a pointer to the resulting buffer, NULL if unsuccessful. The
+ *     Returns a pointer to the resulting buffer, %NULL if unsuccessful. The
  *     passed in size will get page aligned, if it isn't already.
  */
 static void *relay_alloc_buf(struct rchan_buf *buf, size_t *size)
@@ -114,6 +122,7 @@ static void *relay_alloc_buf(struct rchan_buf *buf, size_t *size)
                buf->page_array[i] = alloc_page(GFP_KERNEL);
                if (unlikely(!buf->page_array[i]))
                        goto depopulate;
+               set_page_private(buf->page_array[i], (unsigned long)buf);
        }
        mem = vmap(buf->page_array, n_pages, VM_MAP, PAGE_KERNEL);
        if (!mem)
@@ -132,14 +141,13 @@ depopulate:
 
 /**
  *     relay_create_buf - allocate and initialize a channel buffer
- *     @alloc_size: size of the buffer to allocate
- *     @n_subbufs: number of sub-buffers in the channel
+ *     @chan: the relay channel
  *
- *     Returns channel buffer if successful, NULL otherwise
+ *     Returns channel buffer if successful, %NULL otherwise.
  */
 struct rchan_buf *relay_create_buf(struct rchan *chan)
 {
-       struct rchan_buf *buf = kcalloc(1, sizeof(struct rchan_buf), GFP_KERNEL);
+       struct rchan_buf *buf = kzalloc(sizeof(struct rchan_buf), GFP_KERNEL);
        if (!buf)
                return NULL;
 
@@ -163,6 +171,7 @@ free_buf:
 
 /**
  *     relay_destroy_channel - free the channel struct
+ *     @kref: target kernel reference that contains the relay channel
  *
  *     Should only be called from kref_put().
  */
@@ -187,6 +196,7 @@ void relay_destroy_buf(struct rchan_buf *buf)
                        __free_page(buf->page_array[i]);
                kfree(buf->page_array);
        }
+       chan->buf[buf->cpu] = NULL;
        kfree(buf->padding);
        kfree(buf);
        kref_put(&chan->kref, relay_destroy_channel);
@@ -194,6 +204,7 @@ void relay_destroy_buf(struct rchan_buf *buf)
 
 /**
  *     relay_remove_buf - remove a channel buffer
+ *     @kref: target kernel reference that contains the relay buffer
  *
  *     Removes the file from the fileystem, which also frees the
  *     rchan_buf_struct and the channel buffer.  Should only be called from
@@ -301,15 +312,13 @@ static struct rchan_callbacks default_channel_callbacks = {
 
 /**
  *     wakeup_readers - wake up readers waiting on a channel
- *     @private: the channel buffer
+ *     @data: contains the channel buffer
  *
- *     This is the work function used to defer reader waking.  The
- *     reason waking is deferred is that calling directly from write
- *     causes problems if you're writing from say the scheduler.
+ *     This is the timer function used to defer reader waking.
  */
-static void wakeup_readers(void *private)
+static void wakeup_readers(unsigned long data)
 {
-       struct rchan_buf *buf = private;
+       struct rchan_buf *buf = (struct rchan_buf *)data;
        wake_up_interruptible(&buf->read_wait);
 }
 
@@ -318,20 +327,18 @@ static void wakeup_readers(void *private)
  *     @buf: the channel buffer
  *     @init: 1 if this is a first-time initialization
  *
- *     See relay_reset for description of effect.
+ *     See relay_reset() for description of effect.
  */
-static inline void __relay_reset(struct rchan_buf *buf, unsigned int init)
+static void __relay_reset(struct rchan_buf *buf, unsigned int init)
 {
        size_t i;
 
        if (init) {
                init_waitqueue_head(&buf->read_wait);
                kref_init(&buf->kref);
-               INIT_WORK(&buf->wake_readers, NULL, NULL);
-       } else {
-               cancel_delayed_work(&buf->wake_readers);
-               flush_scheduled_work();
-       }
+               setup_timer(&buf->timer, wakeup_readers, (unsigned long)buf);
+       } else
+               del_timer_sync(&buf->timer);
 
        buf->subbufs_produced = 0;
        buf->subbufs_consumed = 0;
@@ -354,57 +361,75 @@ static inline void __relay_reset(struct rchan_buf *buf, unsigned int init)
  *     and restarting the channel in its initial state.  The buffers
  *     are not freed, so any mappings are still in effect.
  *
- *     NOTE: Care should be taken that the channel isn't actually
+ *     NOTE. Care should be taken that the channel isn't actually
  *     being used by anything when this call is made.
  */
 void relay_reset(struct rchan *chan)
 {
        unsigned int i;
-       struct rchan_buf *prev = NULL;
 
        if (!chan)
                return;
 
-       for (i = 0; i < NR_CPUS; i++) {
-               if (!chan->buf[i] || chan->buf[i] == prev)
-                       break;
-               __relay_reset(chan->buf[i], 0);
-               prev = chan->buf[i];
+       if (chan->is_global && chan->buf[0]) {
+               __relay_reset(chan->buf[0], 0);
+               return;
        }
+
+       mutex_lock(&relay_channels_mutex);
+       for_each_online_cpu(i)
+               if (chan->buf[i])
+                       __relay_reset(chan->buf[i], 0);
+       mutex_unlock(&relay_channels_mutex);
 }
 EXPORT_SYMBOL_GPL(relay_reset);
 
-/**
+/*
  *     relay_open_buf - create a new relay channel buffer
  *
- *     Internal - used by relay_open().
+ *     used by relay_open() and CPU hotplug.
  */
-static struct rchan_buf *relay_open_buf(struct rchan *chan,
-                                       const char *filename,
-                                       struct dentry *parent,
-                                       int *is_global)
+static struct rchan_buf *relay_open_buf(struct rchan *chan, unsigned int cpu)
 {
-       struct rchan_buf *buf;
+       struct rchan_buf *buf = NULL;
        struct dentry *dentry;
+       char *tmpname;
 
-       if (*is_global)
+       if (chan->is_global)
                return chan->buf[0];
 
+       tmpname = kzalloc(NAME_MAX + 1, GFP_KERNEL);
+       if (!tmpname)
+               goto end;
+       snprintf(tmpname, NAME_MAX, "%s%d", chan->base_filename, cpu);
+
        buf = relay_create_buf(chan);
        if (!buf)
-               return NULL;
+               goto free_name;
+
+       buf->cpu = cpu;
+       __relay_reset(buf, 1);
 
        /* Create file in fs */
-       dentry = chan->cb->create_buf_file(filename, parent, S_IRUSR,
-                                          buf, is_global);
-       if (!dentry) {
-               relay_destroy_buf(buf);
-               return NULL;
-       }
+       dentry = chan->cb->create_buf_file(tmpname, chan->parent, S_IRUSR,
+                                          buf, &chan->is_global);
+       if (!dentry)
+               goto free_buf;
 
        buf->dentry = dentry;
-       __relay_reset(buf, 1);
 
+       if(chan->is_global) {
+               chan->buf[0] = buf;
+               buf->cpu = 0;
+       }
+
+       goto free_name;
+
+free_buf:
+       relay_destroy_buf(buf);
+free_name:
+       kfree(tmpname);
+end:
        return buf;
 }
 
@@ -416,15 +441,14 @@ static struct rchan_buf *relay_open_buf(struct rchan *chan,
  *     The channel buffer and channel buffer data structure are then freed
  *     automatically when the last reference is given up.
  */
-static inline void relay_close_buf(struct rchan_buf *buf)
+static void relay_close_buf(struct rchan_buf *buf)
 {
        buf->finalized = 1;
-       cancel_delayed_work(&buf->wake_readers);
-       flush_scheduled_work();
+       del_timer_sync(&buf->timer);
        kref_put(&buf->kref, relay_remove_buf);
 }
 
-static inline void setup_callbacks(struct rchan *chan,
+static void setup_callbacks(struct rchan *chan,
                                   struct rchan_callbacks *cb)
 {
        if (!cb) {
@@ -446,38 +470,79 @@ static inline void setup_callbacks(struct rchan *chan,
 }
 
 /**
+ *     relay_hotcpu_callback - CPU hotplug callback
+ *     @nb: notifier block
+ *     @action: hotplug action to take
+ *     @hcpu: CPU number
+ *
+ *     Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
+ */
+static int __cpuinit relay_hotcpu_callback(struct notifier_block *nb,
+                               unsigned long action,
+                               void *hcpu)
+{
+       unsigned int hotcpu = (unsigned long)hcpu;
+       struct rchan *chan;
+
+       switch(action) {
+       case CPU_UP_PREPARE:
+       case CPU_UP_PREPARE_FROZEN:
+               mutex_lock(&relay_channels_mutex);
+               list_for_each_entry(chan, &relay_channels, list) {
+                       if (chan->buf[hotcpu])
+                               continue;
+                       chan->buf[hotcpu] = relay_open_buf(chan, hotcpu);
+                       if(!chan->buf[hotcpu]) {
+                               printk(KERN_ERR
+                                       "relay_hotcpu_callback: cpu %d buffer "
+                                       "creation failed\n", hotcpu);
+                               mutex_unlock(&relay_channels_mutex);
+                               return NOTIFY_BAD;
+                       }
+               }
+               mutex_unlock(&relay_channels_mutex);
+               break;
+       case CPU_DEAD:
+       case CPU_DEAD_FROZEN:
+               /* No need to flush the cpu : will be flushed upon
+                * final relay_flush() call. */
+               break;
+       }
+       return NOTIFY_OK;
+}
+
+/**
  *     relay_open - create a new relay channel
  *     @base_filename: base name of files to create
- *     @parent: dentry of parent directory, NULL for root directory
+ *     @parent: dentry of parent directory, %NULL for root directory
  *     @subbuf_size: size of sub-buffers
  *     @n_subbufs: number of sub-buffers
  *     @cb: client callback functions
+ *     @private_data: user-defined data
  *
- *     Returns channel pointer if successful, NULL otherwise.
+ *     Returns channel pointer if successful, %NULL otherwise.
  *
  *     Creates a channel buffer for each cpu using the sizes and
  *     attributes specified.  The created channel buffer files
  *     will be named base_filename0...base_filenameN-1.  File
- *     permissions will be S_IRUSR.
+ *     permissions will be %S_IRUSR.
  */
 struct rchan *relay_open(const char *base_filename,
                         struct dentry *parent,
                         size_t subbuf_size,
                         size_t n_subbufs,
-                        struct rchan_callbacks *cb)
+                        struct rchan_callbacks *cb,
+                        void *private_data)
 {
        unsigned int i;
        struct rchan *chan;
-       char *tmpname;
-       int is_global = 0;
-
        if (!base_filename)
                return NULL;
 
        if (!(subbuf_size && n_subbufs))
                return NULL;
 
-       chan = kcalloc(1, sizeof(struct rchan), GFP_KERNEL);
+       chan = kzalloc(sizeof(struct rchan), GFP_KERNEL);
        if (!chan)
                return NULL;
 
@@ -485,38 +550,32 @@ struct rchan *relay_open(const char *base_filename,
        chan->n_subbufs = n_subbufs;
        chan->subbuf_size = subbuf_size;
        chan->alloc_size = FIX_SIZE(subbuf_size * n_subbufs);
+       chan->parent = parent;
+       chan->private_data = private_data;
+       strlcpy(chan->base_filename, base_filename, NAME_MAX);
        setup_callbacks(chan, cb);
        kref_init(&chan->kref);
 
-       tmpname = kmalloc(NAME_MAX + 1, GFP_KERNEL);
-       if (!tmpname)
-               goto free_chan;
-
+       mutex_lock(&relay_channels_mutex);
        for_each_online_cpu(i) {
-               sprintf(tmpname, "%s%d", base_filename, i);
-               chan->buf[i] = relay_open_buf(chan, tmpname, parent,
-                                             &is_global);
+               chan->buf[i] = relay_open_buf(chan, i);
                if (!chan->buf[i])
                        goto free_bufs;
-
-               chan->buf[i]->cpu = i;
        }
+       list_add(&chan->list, &relay_channels);
+       mutex_unlock(&relay_channels_mutex);
 
-       kfree(tmpname);
        return chan;
 
 free_bufs:
-       for (i = 0; i < NR_CPUS; i++) {
+       for_each_online_cpu(i) {
                if (!chan->buf[i])
                        break;
                relay_close_buf(chan->buf[i]);
-               if (is_global)
-                       break;
        }
-       kfree(tmpname);
 
-free_chan:
        kref_put(&chan->kref, relay_destroy_channel);
+       mutex_unlock(&relay_channels_mutex);
        return NULL;
 }
 EXPORT_SYMBOL_GPL(relay_open);
@@ -547,10 +606,14 @@ size_t relay_switch_subbuf(struct rchan_buf *buf, size_t length)
                buf->dentry->d_inode->i_size += buf->chan->subbuf_size -
                        buf->padding[old_subbuf];
                smp_mb();
-               if (waitqueue_active(&buf->read_wait)) {
-                       PREPARE_WORK(&buf->wake_readers, wakeup_readers, buf);
-                       schedule_delayed_work(&buf->wake_readers, 1);
-               }
+               if (waitqueue_active(&buf->read_wait))
+                       /*
+                        * Calling wake_up_interruptible() from here
+                        * will deadlock if we happen to be logging
+                        * from the scheduler (trying to re-grab
+                        * rq->lock), so defer it.
+                        */
+                       __mod_timer(&buf->timer, jiffies + 1);
        }
 
        old = buf->data;
@@ -585,7 +648,7 @@ EXPORT_SYMBOL_GPL(relay_switch_subbuf);
  *     subbufs_consumed should be the number of sub-buffers newly consumed,
  *     not the total consumed.
  *
- *     NOTE: kernel clients don't need to call this function if the channel
+ *     NOTE. Kernel clients don't need to call this function if the channel
  *     mode is 'overwrite'.
  */
 void relay_subbufs_consumed(struct rchan *chan,
@@ -616,24 +679,26 @@ EXPORT_SYMBOL_GPL(relay_subbufs_consumed);
 void relay_close(struct rchan *chan)
 {
        unsigned int i;
-       struct rchan_buf *prev = NULL;
 
        if (!chan)
                return;
 
-       for (i = 0; i < NR_CPUS; i++) {
-               if (!chan->buf[i] || chan->buf[i] == prev)
-                       break;
-               relay_close_buf(chan->buf[i]);
-               prev = chan->buf[i];
-       }
+       mutex_lock(&relay_channels_mutex);
+       if (chan->is_global && chan->buf[0])
+               relay_close_buf(chan->buf[0]);
+       else
+               for_each_possible_cpu(i)
+                       if (chan->buf[i])
+                               relay_close_buf(chan->buf[i]);
 
        if (chan->last_toobig)
                printk(KERN_WARNING "relay: one or more items not logged "
                       "[item size (%Zd) > sub-buffer size (%Zd)]\n",
                       chan->last_toobig, chan->subbuf_size);
 
+       list_del(&chan->list);
        kref_put(&chan->kref, relay_destroy_channel);
+       mutex_unlock(&relay_channels_mutex);
 }
 EXPORT_SYMBOL_GPL(relay_close);
 
@@ -641,22 +706,25 @@ EXPORT_SYMBOL_GPL(relay_close);
  *     relay_flush - close the channel
  *     @chan: the channel
  *
- *     Flushes all channel buffers i.e. forces buffer switch.
+ *     Flushes all channel buffers, i.e. forces buffer switch.
  */
 void relay_flush(struct rchan *chan)
 {
        unsigned int i;
-       struct rchan_buf *prev = NULL;
 
        if (!chan)
                return;
 
-       for (i = 0; i < NR_CPUS; i++) {
-               if (!chan->buf[i] || chan->buf[i] == prev)
-                       break;
-               relay_switch_subbuf(chan->buf[i], 0);
-               prev = chan->buf[i];
+       if (chan->is_global && chan->buf[0]) {
+               relay_switch_subbuf(chan->buf[0], 0);
+               return;
        }
+
+       mutex_lock(&relay_channels_mutex);
+       for_each_possible_cpu(i)
+               if (chan->buf[i])
+                       relay_switch_subbuf(chan->buf[i], 0);
+       mutex_unlock(&relay_channels_mutex);
 }
 EXPORT_SYMBOL_GPL(relay_flush);
 
@@ -669,7 +737,7 @@ EXPORT_SYMBOL_GPL(relay_flush);
  */
 static int relay_file_open(struct inode *inode, struct file *filp)
 {
-       struct rchan_buf *buf = inode->u.generic_ip;
+       struct rchan_buf *buf = inode->i_private;
        kref_get(&buf->kref);
        filp->private_data = buf;
 
@@ -681,7 +749,7 @@ static int relay_file_open(struct inode *inode, struct file *filp)
  *     @filp: the file
  *     @vma: the vma describing what to map
  *
- *     Calls upon relay_mmap_buf to map the file into user space.
+ *     Calls upon relay_mmap_buf() to map the file into user space.
  */
 static int relay_file_mmap(struct file *filp, struct vm_area_struct *vma)
 {
@@ -729,7 +797,7 @@ static int relay_file_release(struct inode *inode, struct file *filp)
        return 0;
 }
 
-/**
+/*
  *     relay_file_read_consume - update the consumed count for the buffer
  */
 static void relay_file_read_consume(struct rchan_buf *buf,
@@ -746,7 +814,10 @@ static void relay_file_read_consume(struct rchan_buf *buf,
        }
 
        buf->bytes_consumed += bytes_consumed;
-       read_subbuf = read_pos / buf->chan->subbuf_size;
+       if (!read_pos)
+               read_subbuf = buf->subbufs_consumed % n_subbufs;
+       else
+               read_subbuf = read_pos / buf->chan->subbuf_size;
        if (buf->bytes_consumed + buf->padding[read_subbuf] == subbuf_size) {
                if ((read_subbuf == buf->subbufs_produced % n_subbufs) &&
                    (buf->offset == subbuf_size))
@@ -756,7 +827,7 @@ static void relay_file_read_consume(struct rchan_buf *buf,
        }
 }
 
-/**
+/*
  *     relay_file_read_avail - boolean, are there unconsumed bytes available?
  */
 static int relay_file_read_avail(struct rchan_buf *buf, size_t read_pos)
@@ -775,8 +846,9 @@ static int relay_file_read_avail(struct rchan_buf *buf, size_t read_pos)
        }
 
        if (unlikely(produced - consumed >= n_subbufs)) {
-               consumed = (produced / n_subbufs) * n_subbufs;
+               consumed = produced - n_subbufs + 1;
                buf->subbufs_consumed = consumed;
+               buf->bytes_consumed = 0;
        }
        
        produced = (produced % n_subbufs) * subbuf_size + buf->offset;
@@ -793,6 +865,8 @@ static int relay_file_read_avail(struct rchan_buf *buf, size_t read_pos)
 
 /**
  *     relay_file_read_subbuf_avail - return bytes available in sub-buffer
+ *     @read_pos: file read position
+ *     @buf: relay channel buffer
  */
 static size_t relay_file_read_subbuf_avail(size_t read_pos,
                                           struct rchan_buf *buf)
@@ -818,8 +892,10 @@ static size_t relay_file_read_subbuf_avail(size_t read_pos,
 
 /**
  *     relay_file_read_start_pos - find the first available byte to read
+ *     @read_pos: file read position
+ *     @buf: relay channel buffer
  *
- *     If the read_pos is in the middle of padding, return the
+ *     If the @read_pos is in the middle of padding, return the
  *     position of the first actually available byte, otherwise
  *     return the original value.
  */
@@ -829,7 +905,10 @@ static size_t relay_file_read_start_pos(size_t read_pos,
        size_t read_subbuf, padding, padding_start, padding_end;
        size_t subbuf_size = buf->chan->subbuf_size;
        size_t n_subbufs = buf->chan->n_subbufs;
+       size_t consumed = buf->subbufs_consumed % n_subbufs;
 
+       if (!read_pos)
+               read_pos = consumed * subbuf_size + buf->bytes_consumed;
        read_subbuf = read_pos / subbuf_size;
        padding = buf->padding[read_subbuf];
        padding_start = (read_subbuf + 1) * subbuf_size - padding;
@@ -844,6 +923,9 @@ static size_t relay_file_read_start_pos(size_t read_pos,
 
 /**
  *     relay_file_read_end_pos - return the new read position
+ *     @read_pos: file read position
+ *     @buf: relay channel buffer
+ *     @count: number of bytes to be read
  */
 static size_t relay_file_read_end_pos(struct rchan_buf *buf,
                                      size_t read_pos,
@@ -865,7 +947,7 @@ static size_t relay_file_read_end_pos(struct rchan_buf *buf,
        return end_pos;
 }
 
-/**
+/*
  *     subbuf_read_actor - read up to one subbuf's worth of data
  */
 static int subbuf_read_actor(size_t read_start,
@@ -879,7 +961,7 @@ static int subbuf_read_actor(size_t read_start,
 
        from = buf->start + read_start;
        ret = avail;
-       if (copy_to_user(desc->arg.data, from, avail)) {
+       if (copy_to_user(desc->arg.buf, from, avail)) {
                desc->error = -EFAULT;
                ret = 0;
        }
@@ -890,73 +972,28 @@ static int subbuf_read_actor(size_t read_start,
        return ret;
 }
 
-/**
- *     subbuf_send_actor - send up to one subbuf's worth of data
- */
-static int subbuf_send_actor(size_t read_start,
-                            struct rchan_buf *buf,
-                            size_t avail,
-                            read_descriptor_t *desc,
-                            read_actor_t actor)
-{
-       unsigned long pidx, poff;
-       unsigned int subbuf_pages;
-       int ret = 0;
-
-       subbuf_pages = buf->chan->alloc_size >> PAGE_SHIFT;
-       pidx = (read_start / PAGE_SIZE) % subbuf_pages;
-       poff = read_start & ~PAGE_MASK;
-       while (avail) {
-               struct page *p = buf->page_array[pidx];
-               unsigned int len;
-
-               len = PAGE_SIZE - poff;
-               if (len > avail)
-                       len = avail;
-
-               len = actor(desc, p, poff, len);
-               if (desc->error)
-                       break;
-
-               avail -= len;
-               ret += len;
-               poff = 0;
-               pidx = (pidx + 1) % subbuf_pages;
-       }
-
-       return ret;
-}
-
 typedef int (*subbuf_actor_t) (size_t read_start,
                               struct rchan_buf *buf,
                               size_t avail,
                               read_descriptor_t *desc,
                               read_actor_t actor);
 
-/**
+/*
  *     relay_file_read_subbufs - read count bytes, bridging subbuf boundaries
  */
-static inline ssize_t relay_file_read_subbufs(struct file *filp,
-                                             loff_t *ppos,
-                                             size_t count,
-                                             subbuf_actor_t subbuf_actor,
-                                             read_actor_t actor,
-                                             void *target)
+static ssize_t relay_file_read_subbufs(struct file *filp, loff_t *ppos,
+                                       subbuf_actor_t subbuf_actor,
+                                       read_actor_t actor,
+                                       read_descriptor_t *desc)
 {
        struct rchan_buf *buf = filp->private_data;
        size_t read_start, avail;
-       read_descriptor_t desc;
        int ret;
 
-       if (!count)
+       if (!desc->count)
                return 0;
 
-       desc.written = 0;
-       desc.count = count;
-       desc.arg.data = target;
-       desc.error = 0;
-
-       mutex_lock(&filp->f_dentry->d_inode->i_mutex);
+       mutex_lock(&filp->f_path.dentry->d_inode->i_mutex);
        do {
                if (!relay_file_read_avail(buf, *ppos))
                        break;
@@ -966,19 +1003,19 @@ static inline ssize_t relay_file_read_subbufs(struct file *filp,
                if (!avail)
                        break;
 
-               avail = min(desc.count, avail);
-               ret = subbuf_actor(read_start, buf, avail, &desc, actor);
-               if (desc.error < 0)
+               avail = min(desc->count, avail);
+               ret = subbuf_actor(read_start, buf, avail, desc, actor);
+               if (desc->error < 0)
                        break;
 
                if (ret) {
                        relay_file_read_consume(buf, read_start, ret);
                        *ppos = relay_file_read_end_pos(buf, read_start, ret);
                }
-       } while (desc.count && ret);
-       mutex_unlock(&filp->f_dentry->d_inode->i_mutex);
+       } while (desc->count && ret);
+       mutex_unlock(&filp->f_path.dentry->d_inode->i_mutex);
 
-       return desc.written;
+       return desc->written;
 }
 
 static ssize_t relay_file_read(struct file *filp,
@@ -986,27 +1023,186 @@ static ssize_t relay_file_read(struct file *filp,
                               size_t count,
                               loff_t *ppos)
 {
-       return relay_file_read_subbufs(filp, ppos, count, subbuf_read_actor,
-                                      NULL, buffer);
+       read_descriptor_t desc;
+       desc.written = 0;
+       desc.count = count;
+       desc.arg.buf = buffer;
+       desc.error = 0;
+       return relay_file_read_subbufs(filp, ppos, subbuf_read_actor,
+                                      NULL, &desc);
+}
+
+static void relay_consume_bytes(struct rchan_buf *rbuf, int bytes_consumed)
+{
+       rbuf->bytes_consumed += bytes_consumed;
+
+       if (rbuf->bytes_consumed >= rbuf->chan->subbuf_size) {
+               relay_subbufs_consumed(rbuf->chan, rbuf->cpu, 1);
+               rbuf->bytes_consumed %= rbuf->chan->subbuf_size;
+       }
+}
+
+static void relay_pipe_buf_release(struct pipe_inode_info *pipe,
+                                  struct pipe_buffer *buf)
+{
+       struct rchan_buf *rbuf;
+
+       rbuf = (struct rchan_buf *)page_private(buf->page);
+       relay_consume_bytes(rbuf, buf->private);
+}
+
+static struct pipe_buf_operations relay_pipe_buf_ops = {
+       .can_merge = 0,
+       .map = generic_pipe_buf_map,
+       .unmap = generic_pipe_buf_unmap,
+       .pin = generic_pipe_buf_pin,
+       .release = relay_pipe_buf_release,
+       .steal = generic_pipe_buf_steal,
+       .get = generic_pipe_buf_get,
+};
+
+/**
+ *     subbuf_splice_actor - splice up to one subbuf's worth of data
+ */
+static int subbuf_splice_actor(struct file *in,
+                              loff_t *ppos,
+                              struct pipe_inode_info *pipe,
+                              size_t len,
+                              unsigned int flags,
+                              int *nonpad_ret)
+{
+       unsigned int pidx, poff, total_len, subbuf_pages, ret;
+       struct rchan_buf *rbuf = in->private_data;
+       unsigned int subbuf_size = rbuf->chan->subbuf_size;
+       size_t read_start = ((size_t)*ppos) % rbuf->chan->alloc_size;
+       size_t read_subbuf = read_start / subbuf_size;
+       size_t padding = rbuf->padding[read_subbuf];
+       size_t nonpad_end = read_subbuf * subbuf_size + subbuf_size - padding;
+       struct page *pages[PIPE_BUFFERS];
+       struct partial_page partial[PIPE_BUFFERS];
+       struct splice_pipe_desc spd = {
+               .pages = pages,
+               .nr_pages = 0,
+               .partial = partial,
+               .flags = flags,
+               .ops = &relay_pipe_buf_ops,
+       };
+
+       if (rbuf->subbufs_produced == rbuf->subbufs_consumed)
+               return 0;
+
+       /*
+        * Adjust read len, if longer than what is available
+        */
+       if (len > (subbuf_size - read_start % subbuf_size))
+               len = subbuf_size - read_start % subbuf_size;
+
+       subbuf_pages = rbuf->chan->alloc_size >> PAGE_SHIFT;
+       pidx = (read_start / PAGE_SIZE) % subbuf_pages;
+       poff = read_start & ~PAGE_MASK;
+
+       for (total_len = 0; spd.nr_pages < subbuf_pages; spd.nr_pages++) {
+               unsigned int this_len, this_end, private;
+               unsigned int cur_pos = read_start + total_len;
+
+               if (!len)
+                       break;
+
+               this_len = min_t(unsigned long, len, PAGE_SIZE - poff);
+               private = this_len;
+
+               spd.pages[spd.nr_pages] = rbuf->page_array[pidx];
+               spd.partial[spd.nr_pages].offset = poff;
+
+               this_end = cur_pos + this_len;
+               if (this_end >= nonpad_end) {
+                       this_len = nonpad_end - cur_pos;
+                       private = this_len + padding;
+               }
+               spd.partial[spd.nr_pages].len = this_len;
+               spd.partial[spd.nr_pages].private = private;
+
+               len -= this_len;
+               total_len += this_len;
+               poff = 0;
+               pidx = (pidx + 1) % subbuf_pages;
+
+               if (this_end >= nonpad_end) {
+                       spd.nr_pages++;
+                       break;
+               }
+       }
+
+       if (!spd.nr_pages)
+               return 0;
+
+       ret = *nonpad_ret = splice_to_pipe(pipe, &spd);
+       if (ret < 0 || ret < total_len)
+               return ret;
+
+        if (read_start + ret == nonpad_end)
+                ret += padding;
+
+        return ret;
 }
 
-static ssize_t relay_file_sendfile(struct file *filp,
-                                  loff_t *ppos,
-                                  size_t count,
-                                  read_actor_t actor,
-                                  void *target)
+static ssize_t relay_file_splice_read(struct file *in,
+                                     loff_t *ppos,
+                                     struct pipe_inode_info *pipe,
+                                     size_t len,
+                                     unsigned int flags)
 {
-       return relay_file_read_subbufs(filp, ppos, count, subbuf_send_actor,
-                                      actor, target);
+       ssize_t spliced;
+       int ret;
+       int nonpad_ret = 0;
+
+       ret = 0;
+       spliced = 0;
+
+       while (len) {
+               ret = subbuf_splice_actor(in, ppos, pipe, len, flags, &nonpad_ret);
+               if (ret < 0)
+                       break;
+               else if (!ret) {
+                       if (spliced)
+                               break;
+                       if (flags & SPLICE_F_NONBLOCK) {
+                               ret = -EAGAIN;
+                               break;
+                       }
+               }
+
+               *ppos += ret;
+               if (ret > len)
+                       len = 0;
+               else
+                       len -= ret;
+               spliced += nonpad_ret;
+               nonpad_ret = 0;
+       }
+
+       if (spliced)
+               return spliced;
+
+       return ret;
 }
 
-struct file_operations relay_file_operations = {
+const struct file_operations relay_file_operations = {
        .open           = relay_file_open,
        .poll           = relay_file_poll,
        .mmap           = relay_file_mmap,
        .read           = relay_file_read,
        .llseek         = no_llseek,
        .release        = relay_file_release,
-       .sendfile       = relay_file_sendfile,
+       .splice_read    = relay_file_splice_read,
 };
 EXPORT_SYMBOL_GPL(relay_file_operations);
+
+static __init int relay_init(void)
+{
+
+       hotcpu_notifier(relay_hotcpu_callback, 0);
+       return 0;
+}
+
+module_init(relay_init);