diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index da2c59d..41ca394 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
 #include <linux/module.h>
 #include <linux/percpu.h>
 #include <linux/mutex.h>
+#include <linux/slab.h>
 #include <linux/init.h>
 #include <linux/hash.h>
 #include <linux/list.h>
 #include <linux/cpu.h>
 #include <linux/fs.h>
 
+#include <asm/local.h>
 #include "trace.h"
 
 /*
@@ -201,13 +203,19 @@ int tracing_is_on(void)
 }
 EXPORT_SYMBOL_GPL(tracing_is_on);
 
-#include "trace.h"
-
 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
 #define RB_ALIGNMENT           4U
 #define RB_MAX_SMALL_DATA      (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
 #define RB_EVNT_MIN_SIZE       8U      /* two 32bit words */
 
+#if !defined(CONFIG_64BIT) || defined(CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS)
+# define RB_FORCE_8BYTE_ALIGNMENT      0
+# define RB_ARCH_ALIGNMENT             RB_ALIGNMENT
+#else
+# define RB_FORCE_8BYTE_ALIGNMENT      1
+# define RB_ARCH_ALIGNMENT             8U
+#endif
+
 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
 #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX
 
@@ -218,17 +226,12 @@ enum {
 
 static inline int rb_null_event(struct ring_buffer_event *event)
 {
-       return event->type_len == RINGBUF_TYPE_PADDING
-                       && event->time_delta == 0;
-}
-
-static inline int rb_discarded_event(struct ring_buffer_event *event)
-{
-       return event->type_len == RINGBUF_TYPE_PADDING && event->time_delta;
+       return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
 }
 
 static void rb_event_set_padding(struct ring_buffer_event *event)
 {
+       /* padding has a NULL time_delta */
        event->type_len = RINGBUF_TYPE_PADDING;
        event->time_delta = 0;
 }
@@ -404,18 +407,21 @@ int ring_buffer_print_page_header(struct trace_seq *s)
        int ret;
 
        ret = trace_seq_printf(s, "\tfield: u64 timestamp;\t"
-                              "offset:0;\tsize:%u;\n",
-                              (unsigned int)sizeof(field.time_stamp));
+                              "offset:0;\tsize:%u;\tsigned:%u;\n",
+                              (unsigned int)sizeof(field.time_stamp),
+                              (unsigned int)is_signed_type(u64));
 
        ret = trace_seq_printf(s, "\tfield: local_t commit;\t"
-                              "offset:%u;\tsize:%u;\n",
+                              "offset:%u;\tsize:%u;\tsigned:%u;\n",
                               (unsigned int)offsetof(typeof(field), commit),
-                              (unsigned int)sizeof(field.commit));
+                              (unsigned int)sizeof(field.commit),
+                              (unsigned int)is_signed_type(long));
 
        ret = trace_seq_printf(s, "\tfield: char data;\t"
-                              "offset:%u;\tsize:%u;\n",
+                              "offset:%u;\tsize:%u;\tsigned:%u;\n",
                               (unsigned int)offsetof(typeof(field), data),
-                              (unsigned int)BUF_PAGE_SIZE);
+                              (unsigned int)BUF_PAGE_SIZE,
+                              (unsigned int)is_signed_type(char));
 
        return ret;
 }
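
The new signed:%u output relies on the is_signed_type() helper from the tracing headers of this era; a minimal sketch of how such a macro can be written is shown below (the exact kernel spelling and header location are assumptions, not quoted from this tree):

/*
 * Sketch: a type is signed iff -1 cast to it compares below 1.
 * For plain char the answer follows the ABI, since char's
 * signedness is implementation-defined.
 */
#define is_signed_type(type)	(((type)(-1)) < (type)1)
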
@@ -427,7 +433,7 @@ struct ring_buffer_per_cpu {
        int                             cpu;
        struct ring_buffer              *buffer;
        spinlock_t                      reader_lock;    /* serialize readers */
-       raw_spinlock_t                  lock;
+       arch_spinlock_t                 lock;
        struct lock_class_key           lock_key;
        struct list_head                *pages;
        struct buffer_page              *head_page;     /* read from head */
@@ -468,24 +474,31 @@ struct ring_buffer_iter {
        struct ring_buffer_per_cpu      *cpu_buffer;
        unsigned long                   head;
        struct buffer_page              *head_page;
+       struct buffer_page              *cache_reader_page;
+       unsigned long                   cache_read;
        u64                             read_stamp;
 };
 
 /* buffer may be either ring_buffer or ring_buffer_per_cpu */
-#define RB_WARN_ON(buffer, cond)                               \
-       ({                                                      \
-               int _____ret = unlikely(cond);                  \
-               if (_____ret) {                                 \
-                       atomic_inc(&buffer->record_disabled);   \
-                       WARN_ON(1);                             \
-               }                                               \
-               _____ret;                                       \
+#define RB_WARN_ON(b, cond)                                            \
+       ({                                                              \
+               int _____ret = unlikely(cond);                          \
+               if (_____ret) {                                         \
+                       if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
+                               struct ring_buffer_per_cpu *__b =       \
+                                       (void *)b;                      \
+                               atomic_inc(&__b->buffer->record_disabled); \
+                       } else                                          \
+                               atomic_inc(&b->record_disabled);        \
+                       WARN_ON(1);                                     \
+               }                                                       \
+               _____ret;                                               \
        })
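
The reworked RB_WARN_ON() above accepts either a struct ring_buffer or a struct ring_buffer_per_cpu. It dispatches on __same_type(), sketched below from memory (its real home is the compiler headers); the cast through (void *) exists so the branch that is not taken still type-checks:

/* Compile-time type equality; the check is resolved at build time, so
 * only one atomic_inc() branch survives, but both must still parse,
 * which is why the per-cpu branch casts b through (void *). */
#define __same_type(a, b)	__builtin_types_compatible_p(typeof(a), typeof(b))
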
 
 /* Up this if you want to test the TIME_EXTENTS and normalization */
 #define DEBUG_SHIFT 0
 
-static inline u64 rb_time_stamp(struct ring_buffer *buffer, int cpu)
+static inline u64 rb_time_stamp(struct ring_buffer *buffer)
 {
        /* shift to debug/test normalization and TIME_EXTENTS */
        return buffer->clock() << DEBUG_SHIFT;
@@ -496,7 +509,7 @@ u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu)
        u64 time;
 
        preempt_disable_notrace();
-       time = rb_time_stamp(buffer, cpu);
+       time = rb_time_stamp(buffer);
        preempt_enable_no_resched_notrace();
 
        return time;
@@ -601,7 +614,7 @@ static struct list_head *rb_list_head(struct list_head *list)
 }
 
 /*
- * rb_is_head_page - test if the give page is the head page
+ * rb_is_head_page - test if the given page is the head page
  *
  * Because the reader may move the head_page pointer, we can
  * not trust what the head page is (it may be pointing to
@@ -701,8 +714,8 @@ static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
 
        val &= ~RB_FLAG_MASK;
 
-       ret = (unsigned long)cmpxchg(&list->next,
-                                    val | old_flag, val | new_flag);
+       ret = cmpxchg((unsigned long *)&list->next,
+                     val | old_flag, val | new_flag);
 
        /* check if the reader took the page */
        if ((ret & ~RB_FLAG_MASK) != val)
@@ -794,7 +807,7 @@ static int rb_head_page_replace(struct buffer_page *old,
        val = *ptr & ~RB_FLAG_MASK;
        val |= RB_PAGE_HEAD;
 
-       ret = cmpxchg(ptr, val, &new->list);
+       ret = cmpxchg(ptr, val, (unsigned long)&new->list);
 
        return ret == val;
 }
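
The two cmpxchg() fixes above operate on list pointers whose low bits double as head/update flags. A standalone sketch of that tagging technique follows (illustrative names; a GCC builtin stands in for the kernel's cmpxchg()):

#include <stdint.h>

#define FLAG_MASK	3UL	/* list pointers are at least 4-byte aligned */
#define FLAG_HEAD	1UL	/* hypothetical "this is the head page" tag   */

/* Atomically swap old_flag for new_flag while keeping the pointer bits;
 * returns non-zero if the pointer itself was left untouched meanwhile. */
static int switch_flag(unsigned long *slot, unsigned long old_flag,
		       unsigned long new_flag)
{
	unsigned long val = *slot & ~FLAG_MASK;
	unsigned long ret;

	ret = __sync_val_compare_and_swap(slot, val | old_flag,
					  val | new_flag);

	return (ret & ~FLAG_MASK) == val;
}
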
@@ -997,7 +1010,7 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
        cpu_buffer->buffer = buffer;
        spin_lock_init(&cpu_buffer->reader_lock);
        lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
-       cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED;
+       cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
 
        bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
                            GFP_KERNEL, cpu_to_node(cpu));
@@ -1192,28 +1205,25 @@ rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
        struct list_head *p;
        unsigned i;
 
-       atomic_inc(&cpu_buffer->record_disabled);
-       synchronize_sched();
-
+       spin_lock_irq(&cpu_buffer->reader_lock);
        rb_head_page_deactivate(cpu_buffer);
 
        for (i = 0; i < nr_pages; i++) {
                if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
-                       return;
+                       goto out;
                p = cpu_buffer->pages->next;
                bpage = list_entry(p, struct buffer_page, list);
                list_del_init(&bpage->list);
                free_buffer_page(bpage);
        }
        if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
-               return;
+               goto out;
 
        rb_reset_cpu(cpu_buffer);
-
        rb_check_pages(cpu_buffer);
 
-       atomic_dec(&cpu_buffer->record_disabled);
-
+out:
+       spin_unlock_irq(&cpu_buffer->reader_lock);
 }
 
 static void
@@ -1224,26 +1234,22 @@ rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
        struct list_head *p;
        unsigned i;
 
-       atomic_inc(&cpu_buffer->record_disabled);
-       synchronize_sched();
-
        spin_lock_irq(&cpu_buffer->reader_lock);
        rb_head_page_deactivate(cpu_buffer);
 
        for (i = 0; i < nr_pages; i++) {
                if (RB_WARN_ON(cpu_buffer, list_empty(pages)))
-                       return;
+                       goto out;
                p = pages->next;
                bpage = list_entry(p, struct buffer_page, list);
                list_del_init(&bpage->list);
                list_add_tail(&bpage->list, cpu_buffer->pages);
        }
        rb_reset_cpu(cpu_buffer);
-       spin_unlock_irq(&cpu_buffer->reader_lock);
-
        rb_check_pages(cpu_buffer);
 
-       atomic_dec(&cpu_buffer->record_disabled);
+out:
+       spin_unlock_irq(&cpu_buffer->reader_lock);
 }
 
 /**
@@ -1251,11 +1257,6 @@ rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
  * @buffer: the buffer to resize.
  * @size: the new size.
  *
- * The tracer is responsible for making sure that the buffer is
- * not being used while changing the size.
- * Note: We may be able to change the above requirement by using
- *  RCU synchronizations.
- *
  * Minimum size is 2 * BUF_PAGE_SIZE.
  *
  * Returns -1 on failure.
@@ -1287,6 +1288,11 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
        if (size == buffer_size)
                return size;
 
+       atomic_inc(&buffer->record_disabled);
+
+       /* Make sure all writers are done with this buffer. */
+       synchronize_sched();
+
        mutex_lock(&buffer->mutex);
        get_online_cpus();
 
@@ -1349,6 +1355,8 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
        put_online_cpus();
        mutex_unlock(&buffer->mutex);
 
+       atomic_dec(&buffer->record_disabled);
+
        return size;
 
  free_pages:
@@ -1358,6 +1366,7 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
        }
        put_online_cpus();
        mutex_unlock(&buffer->mutex);
+       atomic_dec(&buffer->record_disabled);
        return -ENOMEM;
 
        /*
@@ -1367,6 +1376,7 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
  out_fail:
        put_online_cpus();
        mutex_unlock(&buffer->mutex);
+       atomic_dec(&buffer->record_disabled);
        return -1;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_resize);
@@ -1548,7 +1558,7 @@ rb_update_event(struct ring_buffer_event *event,
 
        case 0:
                length -= RB_EVNT_HDR_SIZE;
-               if (length > RB_MAX_SMALL_DATA)
+               if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
                        event->array[0] = length;
                else
                        event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
@@ -1723,11 +1733,11 @@ static unsigned rb_calculate_event_length(unsigned length)
        if (!length)
                length = 1;
 
-       if (length > RB_MAX_SMALL_DATA)
+       if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
                length += sizeof(event.array[0]);
 
        length += RB_EVNT_HDR_SIZE;
-       length = ALIGN(length, RB_ALIGNMENT);
+       length = ALIGN(length, RB_ARCH_ALIGNMENT);
 
        return length;
 }
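
As a worked example of the new arch-dependent sizing (assuming RB_EVNT_HDR_SIZE is 4 bytes and RB_MAX_SMALL_DATA is 112, per the constants at the top of the file):

/*
 * Reserving a 3-byte payload with rb_calculate_event_length():
 *
 *   RB_FORCE_8BYTE_ALIGNMENT == 0 (32-bit, or efficient unaligned access):
 *       small-data encoding, no array[0] word
 *       ALIGN(3 + 4, 4)      -> 8 bytes reserved
 *
 *   RB_FORCE_8BYTE_ALIGNMENT == 1 (64-bit, no efficient unaligned access):
 *       length always stored in array[0], 8-byte alignment
 *       ALIGN(3 + 4 + 4, 8)  -> 16 bytes reserved
 */
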
@@ -1778,9 +1788,6 @@ rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
        event->type_len = RINGBUF_TYPE_PADDING;
        /* time delta must be non zero */
        event->time_delta = 1;
-       /* Account for this as an entry */
-       local_inc(&tail_page->entries);
-       local_inc(&cpu_buffer->entries);
 
        /* Set write to end of buffer */
        length = (tail + length) - BUF_PAGE_SIZE;
@@ -1790,9 +1797,9 @@ rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
 static struct ring_buffer_event *
 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
             unsigned long length, unsigned long tail,
-            struct buffer_page *commit_page,
             struct buffer_page *tail_page, u64 *ts)
 {
+       struct buffer_page *commit_page = cpu_buffer->commit_page;
        struct ring_buffer *buffer = cpu_buffer->buffer;
        struct buffer_page *next_page;
        int ret;
@@ -1873,7 +1880,7 @@ rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
                 * Nested commits always have zero deltas, so
                 * just reread the time stamp
                 */
-               *ts = rb_time_stamp(buffer, cpu_buffer->cpu);
+               *ts = rb_time_stamp(buffer);
                next_page->page->time_stamp = *ts;
        }
 
@@ -1895,13 +1902,10 @@ static struct ring_buffer_event *
 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
                  unsigned type, unsigned long length, u64 *ts)
 {
-       struct buffer_page *tail_page, *commit_page;
+       struct buffer_page *tail_page;
        struct ring_buffer_event *event;
        unsigned long tail, write;
 
-       commit_page = cpu_buffer->commit_page;
-       /* we just need to protect against interrupts */
-       barrier();
        tail_page = cpu_buffer->tail_page;
        write = local_add_return(length, &tail_page->write);
 
@@ -1912,7 +1916,7 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
        /* See if we shot pass the end of this buffer page */
        if (write > BUF_PAGE_SIZE)
                return rb_move_tail(cpu_buffer, length, tail,
-                                   commit_page, tail_page, ts);
+                                   tail_page, ts);
 
        /* We reserved something on the buffer */
 
@@ -2076,7 +2080,8 @@ static void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
 }
 
 static struct ring_buffer_event *
-rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
+rb_reserve_next_event(struct ring_buffer *buffer,
+                     struct ring_buffer_per_cpu *cpu_buffer,
                      unsigned long length)
 {
        struct ring_buffer_event *event;
@@ -2086,6 +2091,21 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
 
        rb_start_commit(cpu_buffer);
 
+#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
+       /*
+        * Due to the ability to swap a cpu buffer from a buffer
+        * it is possible it was swapped before we committed.
+        * (committing stops a swap). We check for it here and
+        * if it happened, we have to fail the write.
+        */
+       barrier();
+       if (unlikely(ACCESS_ONCE(cpu_buffer->buffer) != buffer)) {
+               local_dec(&cpu_buffer->committing);
+               local_dec(&cpu_buffer->commits);
+               return NULL;
+       }
+#endif
+
        length = rb_calculate_event_length(length);
  again:
        /*
@@ -2100,7 +2120,7 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
        if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
                goto out_fail;
 
-       ts = rb_time_stamp(cpu_buffer->buffer, cpu_buffer->cpu);
+       ts = rb_time_stamp(cpu_buffer->buffer);
 
        /*
         * Only the first commit can update the timestamp.
@@ -2224,12 +2244,12 @@ ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
        if (ring_buffer_flags != RB_BUFFERS_ON)
                return NULL;
 
-       if (atomic_read(&buffer->record_disabled))
-               return NULL;
-
        /* If we are tracing schedule, we don't want to recurse */
        resched = ftrace_preempt_disable();
 
+       if (atomic_read(&buffer->record_disabled))
+               goto out_nocheck;
+
        if (trace_recursive_lock())
                goto out_nocheck;
 
@@ -2246,7 +2266,7 @@ ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
        if (length > BUF_MAX_DATA_SIZE)
                goto out;
 
-       event = rb_reserve_next_event(cpu_buffer, length);
+       event = rb_reserve_next_event(buffer, cpu_buffer, length);
        if (!event)
                goto out;
 
@@ -2269,18 +2289,23 @@ ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
 }
 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
 
-static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
+static void
+rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer,
                      struct ring_buffer_event *event)
 {
-       local_inc(&cpu_buffer->entries);
-
        /*
         * The event first in the commit queue updates the
         * time stamp.
         */
        if (rb_event_is_commit(cpu_buffer, event))
                cpu_buffer->write_stamp += event->time_delta;
+}
 
+static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
+                     struct ring_buffer_event *event)
+{
+       local_inc(&cpu_buffer->entries);
+       rb_update_write_stamp(cpu_buffer, event);
        rb_end_commit(cpu_buffer);
 }
 
@@ -2327,32 +2352,57 @@ static inline void rb_event_discard(struct ring_buffer_event *event)
                event->time_delta = 1;
 }
 
-/**
- * ring_buffer_event_discard - discard any event in the ring buffer
- * @event: the event to discard
- *
- * Sometimes a event that is in the ring buffer needs to be ignored.
- * This function lets the user discard an event in the ring buffer
- * and then that event will not be read later.
- *
- * Note, it is up to the user to be careful with this, and protect
- * against races. If the user discards an event that has been consumed
- * it is possible that it could corrupt the ring buffer.
+/*
+ * Decrement the entries to the page that an event is on.
+ * The event does not even need to exist, only the pointer
+ * to the page it is on. This may only be called before the commit
+ * takes place.
  */
-void ring_buffer_event_discard(struct ring_buffer_event *event)
+static inline void
+rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer,
+                  struct ring_buffer_event *event)
 {
-       rb_event_discard(event);
+       unsigned long addr = (unsigned long)event;
+       struct buffer_page *bpage = cpu_buffer->commit_page;
+       struct buffer_page *start;
+
+       addr &= PAGE_MASK;
+
+       /* Do the likely case first */
+       if (likely(bpage->page == (void *)addr)) {
+               local_dec(&bpage->entries);
+               return;
+       }
+
+       /*
+        * Because the commit page may be on the reader page we
+        * start with the next page and check the end loop there.
+        */
+       rb_inc_page(cpu_buffer, &bpage);
+       start = bpage;
+       do {
+               if (bpage->page == (void *)addr) {
+                       local_dec(&bpage->entries);
+                       return;
+               }
+               rb_inc_page(cpu_buffer, &bpage);
+       } while (bpage != start);
+
+       /* commit not part of this buffer?? */
+       RB_WARN_ON(cpu_buffer, 1);
 }
-EXPORT_SYMBOL_GPL(ring_buffer_event_discard);
 
 /**
  * ring_buffer_commit_discard - discard an event that has not been committed
  * @buffer: the ring buffer
  * @event: non committed event to discard
  *
- * This is similar to ring_buffer_event_discard but must only be
- * performed on an event that has not been committed yet. The difference
- * is that this will also try to free the event from the ring buffer
+ * Sometimes an event that is in the ring buffer needs to be ignored.
+ * This function lets the user discard an event in the ring buffer
+ * and then that event will not be read later.
+ *
+ * This function only works if it is called before the item has been
+ * committed. It will try to free the event from the ring buffer
  * if another event has not been added behind it.
  *
  * If another event has been added behind it, it will set the event
@@ -2380,14 +2430,15 @@ void ring_buffer_discard_commit(struct ring_buffer *buffer,
         */
        RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
 
+       rb_decrement_entry(cpu_buffer, event);
        if (rb_try_to_discard(cpu_buffer, event))
                goto out;
 
        /*
         * The commit is still visible by the reader, so we
-        * must increment entries.
+        * must still update the timestamp.
         */
-       local_inc(&cpu_buffer->entries);
+       rb_update_write_stamp(cpu_buffer, event);
  out:
        rb_end_commit(cpu_buffer);
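
A usage sketch of the discard path this hunk reworks: an event is reserved, filled in, and then either committed or handed back before commit (the ring_buffer_* calls are the existing API; the payload and discard policy are illustrative):

static void record_maybe_discard(struct ring_buffer *buffer, int value)
{
	struct ring_buffer_event *event;
	int *payload;

	event = ring_buffer_lock_reserve(buffer, sizeof(*payload));
	if (!event)
		return;

	payload = ring_buffer_event_data(event);
	*payload = value;

	if (value < 0)
		/* never committed: rb_decrement_entry() keeps the counts sane */
		ring_buffer_discard_commit(buffer, event);
	else
		ring_buffer_unlock_commit(buffer, event);
}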
 
@@ -2430,11 +2481,11 @@ int ring_buffer_write(struct ring_buffer *buffer,
        if (ring_buffer_flags != RB_BUFFERS_ON)
                return -EBUSY;
 
-       if (atomic_read(&buffer->record_disabled))
-               return -EBUSY;
-
        resched = ftrace_preempt_disable();
 
+       if (atomic_read(&buffer->record_disabled))
+               goto out;
+
        cpu = raw_smp_processor_id();
 
        if (!cpumask_test_cpu(cpu, buffer->cpumask))
@@ -2448,7 +2499,7 @@ int ring_buffer_write(struct ring_buffer *buffer,
        if (length > BUF_MAX_DATA_SIZE)
                goto out;
 
-       event = rb_reserve_next_event(cpu_buffer, length);
+       event = rb_reserve_next_event(buffer, cpu_buffer, length);
        if (!event)
                goto out;
 
@@ -2502,7 +2553,7 @@ EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
  * @buffer: The ring buffer to enable writes
  *
  * Note, multiple disables will need the same number of enables
- * to truely enable the writing (much like preempt_disable).
+ * to truly enable the writing (much like preempt_disable).
  */
 void ring_buffer_record_enable(struct ring_buffer *buffer)
 {
@@ -2538,7 +2589,7 @@ EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
  * @cpu: The CPU to enable.
  *
  * Note, multiple disables will need the same number of enables
- * to truely enable the writing (much like preempt_disable).
+ * to truly enable the writing (much like preempt_disable).
  */
 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
 {
@@ -2639,7 +2690,7 @@ unsigned long ring_buffer_entries(struct ring_buffer *buffer)
 EXPORT_SYMBOL_GPL(ring_buffer_entries);
 
 /**
- * ring_buffer_overrun_cpu - get the number of overruns in buffer
+ * ring_buffer_overruns - get the number of overruns in buffer
  * @buffer: The ring buffer
  *
  * Returns the total number of overruns in the ring buffer
@@ -2679,6 +2730,8 @@ static void rb_iter_reset(struct ring_buffer_iter *iter)
                iter->read_stamp = cpu_buffer->read_stamp;
        else
                iter->read_stamp = iter->head_page->page->time_stamp;
+       iter->cache_reader_page = cpu_buffer->reader_page;
+       iter->cache_read = cpu_buffer->read;
 }
 
 /**
@@ -2790,7 +2843,7 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
        int ret;
 
        local_irq_save(flags);
-       __raw_spin_lock(&cpu_buffer->lock);
+       arch_spin_lock(&cpu_buffer->lock);
 
  again:
        /*
@@ -2832,7 +2885,7 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
         * Splice the empty reader page into the list around the head.
         */
        reader = rb_set_head_page(cpu_buffer);
-       cpu_buffer->reader_page->list.next = reader->list.next;
+       cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);
        cpu_buffer->reader_page->list.prev = reader->list.prev;
 
        /*
@@ -2869,7 +2922,7 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
         *
         * Now make the new head point back to the reader page.
         */
-       reader->list.next->prev = &cpu_buffer->reader_page->list;
+       rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
        rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
 
        /* Finally update the reader page to the new head */
@@ -2879,7 +2932,7 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
        goto again;
 
  out:
-       __raw_spin_unlock(&cpu_buffer->lock);
+       arch_spin_unlock(&cpu_buffer->lock);
        local_irq_restore(flags);
 
        return reader;
@@ -2899,8 +2952,7 @@ static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
 
        event = rb_reader_event(cpu_buffer);
 
-       if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX
-                       || rb_discarded_event(event))
+       if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
                cpu_buffer->read++;
 
        rb_update_read_stamp(cpu_buffer, event);
@@ -2954,15 +3006,12 @@ static void rb_advance_iter(struct ring_buffer_iter *iter)
 }
 
 static struct ring_buffer_event *
-rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
+rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts)
 {
-       struct ring_buffer_per_cpu *cpu_buffer;
        struct ring_buffer_event *event;
        struct buffer_page *reader;
        int nr_loops = 0;
 
-       cpu_buffer = buffer->buffers[cpu];
-
  again:
        /*
         * We repeat when a timestamp is encountered. It is possible
@@ -3006,7 +3055,7 @@ rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
        case RINGBUF_TYPE_DATA:
                if (ts) {
                        *ts = cpu_buffer->read_stamp + event->time_delta;
-                       ring_buffer_normalize_time_stamp(buffer,
+                       ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
                                                         cpu_buffer->cpu, ts);
                }
                return event;
@@ -3027,13 +3076,22 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
        struct ring_buffer_event *event;
        int nr_loops = 0;
 
-       if (ring_buffer_iter_empty(iter))
-               return NULL;
-
        cpu_buffer = iter->cpu_buffer;
        buffer = cpu_buffer->buffer;
 
+       /*
+        * Check if someone performed a consuming read to
+        * the buffer. A consuming read invalidates the iterator
+        * and we need to reset the iterator in this case.
+        */
+       if (unlikely(iter->cache_read != cpu_buffer->read ||
+                    iter->cache_reader_page != cpu_buffer->reader_page))
+               rb_iter_reset(iter);
+
  again:
+       if (ring_buffer_iter_empty(iter))
+               return NULL;
+
        /*
         * We repeat when a timestamp is encountered.
         * We can get multiple timestamps by nested interrupts or also
@@ -3048,6 +3106,11 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
        if (rb_per_cpu_empty(cpu_buffer))
                return NULL;
 
+       if (iter->head >= local_read(&iter->head_page->page->commit)) {
+               rb_inc_iter(iter);
+               goto again;
+       }
+
        event = rb_iter_head_event(iter);
 
        switch (event->type_len) {
@@ -3125,17 +3188,15 @@ ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
        local_irq_save(flags);
        if (dolock)
                spin_lock(&cpu_buffer->reader_lock);
-       event = rb_buffer_peek(buffer, cpu, ts);
+       event = rb_buffer_peek(cpu_buffer, ts);
        if (event && event->type_len == RINGBUF_TYPE_PADDING)
                rb_advance_reader(cpu_buffer);
        if (dolock)
                spin_unlock(&cpu_buffer->reader_lock);
        local_irq_restore(flags);
 
-       if (event && event->type_len == RINGBUF_TYPE_PADDING) {
-               cpu_relax();
+       if (event && event->type_len == RINGBUF_TYPE_PADDING)
                goto again;
-       }
 
        return event;
 }
@@ -3160,10 +3221,8 @@ ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
        event = rb_iter_peek(iter, ts);
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 
-       if (event && event->type_len == RINGBUF_TYPE_PADDING) {
-               cpu_relax();
+       if (event && event->type_len == RINGBUF_TYPE_PADDING)
                goto again;
-       }
 
        return event;
 }
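
For context, a minimal sketch of the non-consuming iteration that the new cache_read/cache_reader_page check protects; a concurrent consuming read now simply forces rb_iter_reset() instead of letting the iterator walk stale pages (the caller below is illustrative):

static void walk_cpu_buffer(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_iter *iter;
	struct ring_buffer_event *event;
	u64 ts;

	iter = ring_buffer_read_start(buffer, cpu);
	if (!iter)
		return;

	/* ring_buffer_read() peeks and advances without consuming */
	while ((event = ring_buffer_read(iter, &ts)) != NULL)
		;	/* inspect the event here */

	ring_buffer_read_finish(iter);
}
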
@@ -3198,7 +3257,7 @@ ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
        if (dolock)
                spin_lock(&cpu_buffer->reader_lock);
 
-       event = rb_buffer_peek(buffer, cpu, ts);
+       event = rb_buffer_peek(cpu_buffer, ts);
        if (event)
                rb_advance_reader(cpu_buffer);
 
@@ -3209,10 +3268,8 @@ ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
  out:
        preempt_enable();
 
-       if (event && event->type_len == RINGBUF_TYPE_PADDING) {
-               cpu_relax();
+       if (event && event->type_len == RINGBUF_TYPE_PADDING)
                goto again;
-       }
 
        return event;
 }
@@ -3252,9 +3309,9 @@ ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
        synchronize_sched();
 
        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
-       __raw_spin_lock(&cpu_buffer->lock);
+       arch_spin_lock(&cpu_buffer->lock);
        rb_iter_reset(iter);
-       __raw_spin_unlock(&cpu_buffer->lock);
+       arch_spin_unlock(&cpu_buffer->lock);
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 
        return iter;
@@ -3292,21 +3349,19 @@ ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
        struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
        unsigned long flags;
 
- again:
        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+ again:
        event = rb_iter_peek(iter, ts);
        if (!event)
                goto out;
 
+       if (event->type_len == RINGBUF_TYPE_PADDING)
+               goto again;
+
        rb_advance_iter(iter);
  out:
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 
-       if (event && event->type_len == RINGBUF_TYPE_PADDING) {
-               cpu_relax();
-               goto again;
-       }
-
        return event;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_read);
@@ -3373,12 +3428,16 @@ void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
 
        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
 
-       __raw_spin_lock(&cpu_buffer->lock);
+       if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
+               goto out;
+
+       arch_spin_lock(&cpu_buffer->lock);
 
        rb_reset_cpu(cpu_buffer);
 
-       __raw_spin_unlock(&cpu_buffer->lock);
+       arch_spin_unlock(&cpu_buffer->lock);
 
+ out:
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 
        atomic_dec(&cpu_buffer->record_disabled);
@@ -3461,6 +3520,7 @@ int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
 }
 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
 
+#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
 /**
  * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
  * @buffer_a: One buffer to swap with
@@ -3515,20 +3575,28 @@ int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
        atomic_inc(&cpu_buffer_a->record_disabled);
        atomic_inc(&cpu_buffer_b->record_disabled);
 
+       ret = -EBUSY;
+       if (local_read(&cpu_buffer_a->committing))
+               goto out_dec;
+       if (local_read(&cpu_buffer_b->committing))
+               goto out_dec;
+
        buffer_a->buffers[cpu] = cpu_buffer_b;
        buffer_b->buffers[cpu] = cpu_buffer_a;
 
        cpu_buffer_b->buffer = buffer_a;
        cpu_buffer_a->buffer = buffer_b;
 
+       ret = 0;
+
+out_dec:
        atomic_dec(&cpu_buffer_a->record_disabled);
        atomic_dec(&cpu_buffer_b->record_disabled);
-
-       ret = 0;
 out:
        return ret;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
+#endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */
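
A short sketch of a caller handling the new -EBUSY result from ring_buffer_swap_cpu() (the snapshot wrapper is hypothetical):

#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
/* Hypothetical caller: swap one CPU of a live buffer into a spare one. */
static int snapshot_cpu_buffer(struct ring_buffer *live,
			       struct ring_buffer *spare, int cpu)
{
	int ret;

	ret = ring_buffer_swap_cpu(spare, live, cpu);
	if (ret == -EBUSY)
		pr_debug("cpu %d was mid-commit, snapshot skipped\n", cpu);
	return ret;
}
#endif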
 
 /**
  * ring_buffer_alloc_read_page - allocate a page to read from buffer