diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 969f7cb..1da7b6e 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
 #include <linux/debugfs.h>
 #include <linux/uaccess.h>
 #include <linux/hardirq.h>
+#include <linux/kmemcheck.h>
 #include <linux/module.h>
 #include <linux/percpu.h>
 #include <linux/mutex.h>
+#include <linux/slab.h>
 #include <linux/init.h>
 #include <linux/hash.h>
 #include <linux/list.h>
 #include <linux/cpu.h>
 #include <linux/fs.h>
 
+#include <asm/local.h>
 #include "trace.h"
 
 /*
@@ -200,13 +203,19 @@ int tracing_is_on(void)
 }
 EXPORT_SYMBOL_GPL(tracing_is_on);
 
-#include "trace.h"
-
 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
 #define RB_ALIGNMENT           4U
 #define RB_MAX_SMALL_DATA      (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
 #define RB_EVNT_MIN_SIZE       8U      /* two 32bit words */
 
+#if !defined(CONFIG_64BIT) || defined(CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS)
+# define RB_FORCE_8BYTE_ALIGNMENT      0
+# define RB_ARCH_ALIGNMENT             RB_ALIGNMENT
+#else
+# define RB_FORCE_8BYTE_ALIGNMENT      1
+# define RB_ARCH_ALIGNMENT             8U
+#endif
+
 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
 #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX
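
Editor's note: the RB_FORCE_8BYTE_ALIGNMENT / RB_ARCH_ALIGNMENT pair added above is consumed by rb_calculate_event_length() later in this diff. On 64-bit machines without efficient unaligned access, every event carries an explicit length word and is rounded up to an 8-byte boundary. A minimal user-space sketch of that rounding follows; the 4-byte RB_EVNT_HDR_SIZE and the 28-slot type_len limit are assumptions based on the surrounding definitions, not values shown in this hunk.

#include <stdio.h>

/* constants mirrored from the hunk above; (assumed) values are not shown in it */
#define EX_RB_ALIGNMENT        4U
#define EX_RB_ARCH_ALIGNMENT   8U                       /* forced 8-byte alignment case */
#define EX_RB_EVNT_HDR_SIZE    4U                       /* (assumed) offsetof(event, array) */
#define EX_RB_MAX_SMALL_DATA   (EX_RB_ALIGNMENT * 28U)  /* (assumed) type_len maximum of 28 */
#define EX_ALIGN(x, a)         (((x) + (a) - 1) & ~((a) - 1))

/* same shape as rb_calculate_event_length(), parameterized on the forced alignment */
static unsigned ex_event_length(unsigned length, int force_8byte)
{
	if (!length)
		length = 1;
	if (length > EX_RB_MAX_SMALL_DATA || force_8byte)
		length += sizeof(unsigned int);	/* array[0] then holds the length explicitly */
	length += EX_RB_EVNT_HDR_SIZE;
	return EX_ALIGN(length, force_8byte ? EX_RB_ARCH_ALIGNMENT : EX_RB_ALIGNMENT);
}

int main(void)
{
	/* a 10-byte payload: 16 bytes reserved normally, 24 with forced 8-byte alignment */
	printf("%u %u\n", ex_event_length(10, 0), ex_event_length(10, 1));
	return 0;
}
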
 
@@ -217,17 +226,12 @@ enum {
 
 static inline int rb_null_event(struct ring_buffer_event *event)
 {
-       return event->type_len == RINGBUF_TYPE_PADDING
-                       && event->time_delta == 0;
-}
-
-static inline int rb_discarded_event(struct ring_buffer_event *event)
-{
-       return event->type_len == RINGBUF_TYPE_PADDING && event->time_delta;
+       return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
 }
 
 static void rb_event_set_padding(struct ring_buffer_event *event)
 {
+       /* padding has a NULL time_delta */
        event->type_len = RINGBUF_TYPE_PADDING;
        event->time_delta = 0;
 }
@@ -315,20 +319,49 @@ EXPORT_SYMBOL_GPL(ring_buffer_event_data);
 #define TS_MASK                ((1ULL << TS_SHIFT) - 1)
 #define TS_DELTA_TEST  (~TS_MASK)
 
+/* Flag when events were overwritten */
+#define RB_MISSED_EVENTS       (1 << 31)
+/* Missed count stored at end */
+#define RB_MISSED_STORED       (1 << 30)
+
 struct buffer_data_page {
        u64              time_stamp;    /* page time stamp */
        local_t          commit;        /* write committed index */
        unsigned char    data[];        /* data of buffer page */
 };
 
+/*
+ * Note, the buffer_page list must be first. The buffer pages
+ * are allocated in cache lines, which means that each buffer
+ * page will be at the beginning of a cache line, and thus
+ * the least significant bits will be zero. We use this to
+ * add flags in the list struct pointers, to make the ring buffer
+ * lockless.
+ */
 struct buffer_page {
        struct list_head list;          /* list of buffer pages */
        local_t          write;         /* index for next write */
        unsigned         read;          /* index for next read */
        local_t          entries;       /* entries on this page */
+       unsigned long    real_end;      /* real end of data */
        struct buffer_data_page *page;  /* Actual data page */
 };
 
+/*
+ * The buffer page counters, write and entries, must be reset
+ * atomically when crossing page boundaries. To synchronize this
+ * update, two counters are packed into a single word. One is
+ * the actual counter for the write position or count on the page.
+ *
+ * The other is a counter of updaters. Before an update happens
+ * the updater part of the counter is incremented. This will
+ * allow the updater to update the counter atomically.
+ *
+ * The counter is 20 bits, and the state data is 12.
+ */
+#define RB_WRITE_MASK          0xfffff
+#define RB_WRITE_INTCNT                (1 << 20)
+
 static void rb_init_page(struct buffer_data_page *bpage)
 {
        local_set(&bpage->commit, 0);
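
Editor's note: the comment and masks above describe a split counter. Below is a compressed sketch of the idea, using GCC atomic builtins in place of the kernel's local_t operations (the ex_* names are invented for the example). The low 20 bits of the write field are the byte index on the page; everything above them counts updaters that raced in, which is what lets rb_tail_page_update() further down zero the index with a cmpxchg that fails harmlessly if a nested writer got there first.

#include <stdio.h>

#define EX_RB_WRITE_MASK	0xfffffUL	/* low 20 bits: write index on the page */
#define EX_RB_WRITE_INTCNT	(1UL << 20)	/* bits above: count of in-flight updaters */

/* sketch of what rb_tail_page_update() does to next_page->write */
static void ex_reset_write(unsigned long *write)
{
	/* tag the field so nested writers are visible (local_add_return equivalent) */
	unsigned long old = __sync_add_and_fetch(write, EX_RB_WRITE_INTCNT);
	/* keep the updater count, zero the index */
	unsigned long val = old & ~EX_RB_WRITE_MASK;

	/* fails harmlessly if an interrupting writer bumped the field again */
	(void)__sync_val_compare_and_swap(write, old, val);
}

int main(void)
{
	unsigned long write = 123;	/* 123 bytes already written on the page */

	ex_reset_write(&write);
	printf("index=%lu updaters=%lu\n",
	       write & EX_RB_WRITE_MASK, write >> 20);	/* index=0 updaters=1 */
	return 0;
}
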
@@ -380,18 +413,27 @@ int ring_buffer_print_page_header(struct trace_seq *s)
        int ret;
 
        ret = trace_seq_printf(s, "\tfield: u64 timestamp;\t"
-                              "offset:0;\tsize:%u;\n",
-                              (unsigned int)sizeof(field.time_stamp));
+                              "offset:0;\tsize:%u;\tsigned:%u;\n",
+                              (unsigned int)sizeof(field.time_stamp),
+                              (unsigned int)is_signed_type(u64));
 
        ret = trace_seq_printf(s, "\tfield: local_t commit;\t"
-                              "offset:%u;\tsize:%u;\n",
+                              "offset:%u;\tsize:%u;\tsigned:%u;\n",
                               (unsigned int)offsetof(typeof(field), commit),
-                              (unsigned int)sizeof(field.commit));
+                              (unsigned int)sizeof(field.commit),
+                              (unsigned int)is_signed_type(long));
+
+       ret = trace_seq_printf(s, "\tfield: int overwrite;\t"
+                              "offset:%u;\tsize:%u;\tsigned:%u;\n",
+                              (unsigned int)offsetof(typeof(field), commit),
+                              1,
+                              (unsigned int)is_signed_type(long));
 
        ret = trace_seq_printf(s, "\tfield: char data;\t"
-                              "offset:%u;\tsize:%u;\n",
+                              "offset:%u;\tsize:%u;\tsigned:%u;\n",
                               (unsigned int)offsetof(typeof(field), data),
-                              (unsigned int)BUF_PAGE_SIZE);
+                              (unsigned int)BUF_PAGE_SIZE,
+                              (unsigned int)is_signed_type(char));
 
        return ret;
 }
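
Editor's note: for reference, on a typical 64-bit x86 build with 4 KB pages (so an 8-byte local_t, a signed char, and roughly 4080 data bytes per page; these are assumptions, not values taken from this hunk), the header emitted by this function would read approximately:

	field: u64 timestamp;	offset:0;	size:8;	signed:0;
	field: local_t commit;	offset:8;	size:8;	signed:1;
	field: int overwrite;	offset:8;	size:1;	signed:1;
	field: char data;	offset:16;	size:4080;	signed:1;

The overwrite flag shares the commit field's offset because missed-event state is folded into that word (see the RB_MISSED_EVENTS/RB_MISSED_STORED flags added earlier in this diff); the signed: annotations are what this hunk adds for user-space parsers.
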
@@ -402,21 +444,22 @@ int ring_buffer_print_page_header(struct trace_seq *s)
 struct ring_buffer_per_cpu {
        int                             cpu;
        struct ring_buffer              *buffer;
-       spinlock_t                      reader_lock; /* serialize readers */
-       raw_spinlock_t                  lock;
+       spinlock_t                      reader_lock;    /* serialize readers */
+       arch_spinlock_t                 lock;
        struct lock_class_key           lock_key;
-       struct list_head                pages;
+       struct list_head                *pages;
        struct buffer_page              *head_page;     /* read from head */
        struct buffer_page              *tail_page;     /* write to tail */
        struct buffer_page              *commit_page;   /* committed pages */
        struct buffer_page              *reader_page;
-       unsigned long                   nmi_dropped;
-       unsigned long                   commit_overrun;
-       unsigned long                   overrun;
-       unsigned long                   read;
+       unsigned long                   lost_events;
+       unsigned long                   last_overrun;
+       local_t                         commit_overrun;
+       local_t                         overrun;
        local_t                         entries;
        local_t                         committing;
        local_t                         commits;
+       unsigned long                   read;
        u64                             write_stamp;
        u64                             read_stamp;
        atomic_t                        record_disabled;
@@ -445,24 +488,31 @@ struct ring_buffer_iter {
        struct ring_buffer_per_cpu      *cpu_buffer;
        unsigned long                   head;
        struct buffer_page              *head_page;
+       struct buffer_page              *cache_reader_page;
+       unsigned long                   cache_read;
        u64                             read_stamp;
 };
 
 /* buffer may be either ring_buffer or ring_buffer_per_cpu */
-#define RB_WARN_ON(buffer, cond)                               \
-       ({                                                      \
-               int _____ret = unlikely(cond);                  \
-               if (_____ret) {                                 \
-                       atomic_inc(&buffer->record_disabled);   \
-                       WARN_ON(1);                             \
-               }                                               \
-               _____ret;                                       \
+#define RB_WARN_ON(b, cond)                                            \
+       ({                                                              \
+               int _____ret = unlikely(cond);                          \
+               if (_____ret) {                                         \
+                       if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
+                               struct ring_buffer_per_cpu *__b =       \
+                                       (void *)b;                      \
+                               atomic_inc(&__b->buffer->record_disabled); \
+                       } else                                          \
+                               atomic_inc(&b->record_disabled);        \
+                       WARN_ON(1);                                     \
+               }                                                       \
+               _____ret;                                               \
        })
 
 /* Up this if you want to test the TIME_EXTENTS and normalization */
 #define DEBUG_SHIFT 0
 
-static inline u64 rb_time_stamp(struct ring_buffer *buffer, int cpu)
+static inline u64 rb_time_stamp(struct ring_buffer *buffer)
 {
        /* shift to debug/test normalization and TIME_EXTENTS */
        return buffer->clock() << DEBUG_SHIFT;
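
Editor's note: the reworked RB_WARN_ON() above takes either a struct ring_buffer or a struct ring_buffer_per_cpu pointer and locates the right record_disabled counter at compile time via __same_type(). Below is a stand-alone sketch of that dispatch pattern with toy types (illustration only; __same_type() in the kernel wraps the builtin used here, and the ex_* structures are invented for the example).

#include <stdio.h>

struct ex_ring		{ int record_disabled; };
struct ex_per_cpu_ring	{ struct ex_ring *buffer; };

/*
 * Pick the right counter based on the pointer's static type, the same
 * trick the macro above uses. Both branches must compile for both
 * types, which is why the casts go through (void *); only the branch
 * selected by the constant condition is ever executed.
 */
#define ex_record_disabled_of(b)					\
	(__builtin_types_compatible_p(typeof(*(b)), struct ex_per_cpu_ring) ?	\
		&((struct ex_per_cpu_ring *)(void *)(b))->buffer->record_disabled : \
		&((struct ex_ring *)(void *)(b))->record_disabled)

int main(void)
{
	struct ex_ring r = { 0 };
	struct ex_per_cpu_ring c = { &r };

	++*ex_record_disabled_of(&r);	/* hits the counter directly */
	++*ex_record_disabled_of(&c);	/* reaches it through ->buffer */
	printf("%d\n", r.record_disabled);	/* prints 2 */
	return 0;
}
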
@@ -473,7 +523,7 @@ u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu)
        u64 time;
 
        preempt_disable_notrace();
-       time = rb_time_stamp(buffer, cpu);
+       time = rb_time_stamp(buffer);
        preempt_enable_no_resched_notrace();
 
        return time;
@@ -488,6 +538,390 @@ void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer,
 }
 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
 
+/*
+ * Making the ring buffer lockless makes things tricky.
+ * Writes only happen on the CPU that they are on, and they
+ * only need to worry about interrupts. Reads, however, can
+ * happen on any CPU.
+ *
+ * The reader page is always off the ring buffer, but when the
+ * reader finishes with a page, it needs to swap its page with
+ * a new one from the buffer. The reader needs to take from
+ * the head (writes go to the tail). But if a writer is in overwrite
+ * mode and wraps, it must push the head page forward.
+ *
+ * Here lies the problem.
+ *
+ * The reader must be careful to replace only the head page, and
+ * not another one. As described at the top of the file in the
+ * ASCII art, the reader sets its old page to point to the next
+ * page after head. It then sets the page after head to point to
+ * the old reader page. But if the writer moves the head page
+ * during this operation, the reader could end up with the tail.
+ *
+ * We use cmpxchg to help prevent this race. We also do something
+ * special with the page before head. We set the LSB to 1.
+ *
+ * When the writer must push the page forward, it will clear the
+ * bit that points to the head page, move the head, and then set
+ * the bit that points to the new head page.
+ *
+ * We also don't want an interrupt coming in and moving the head
+ * page on another writer. Thus we use the second LSB to catch
+ * that too. Thus:
+ *
+ * head->list->prev->next        bit 1          bit 0
+ *                              -------        -------
+ * Normal page                     0              0
+ * Points to head page             0              1
+ * New head page                   1              0
+ *
+ * Note we can not trust the prev pointer of the head page, because:
+ *
+ * +----+       +-----+        +-----+
+ * |    |------>|  T  |---X--->|  N  |
+ * |    |<------|     |        |     |
+ * +----+       +-----+        +-----+
+ *   ^                           ^ |
+ *   |          +-----+          | |
+ *   +----------|  R  |----------+ |
+ *              |     |<-----------+
+ *              +-----+
+ *
+ * Key:  ---X-->  HEAD flag set in pointer
+ *         T      Tail page
+ *         R      Reader page
+ *         N      Next page
+ *
+ * (see __rb_reserve_next() to see where this happens)
+ *
+ *  What the above shows is that the reader just swapped out
+ *  the reader page with a page in the buffer, but before it
+ *  could make the new header point back to the new page added
+ *  it was preempted by a writer. The writer moved forward onto
+ *  the new page added by the reader and is about to move forward
+ *  again.
+ *
+ *  You can see, it is legitimate for the previous pointer of
+ *  the head (or any page) not to point back to itself. But only
+ *  temporarily.
+ */
+
+#define RB_PAGE_NORMAL         0UL
+#define RB_PAGE_HEAD           1UL
+#define RB_PAGE_UPDATE         2UL
+
+
+#define RB_FLAG_MASK           3UL
+
+/* PAGE_MOVED is not part of the mask */
+#define RB_PAGE_MOVED          4UL
+
+/*
+ * rb_list_head - remove any bit
+ */
+static struct list_head *rb_list_head(struct list_head *list)
+{
+       unsigned long val = (unsigned long)list;
+
+       return (struct list_head *)(val & ~RB_FLAG_MASK);
+}
+
+/*
+ * rb_is_head_page - test if the given page is the head page
+ *
+ * Because the reader may move the head_page pointer, we can
+ * not trust what the head page is (it may be pointing to
+ * the reader page). But if the next page is a header page,
+ * its flags will be non zero.
+ */
+static inline int
+rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer,
+               struct buffer_page *page, struct list_head *list)
+{
+       unsigned long val;
+
+       val = (unsigned long)list->next;
+
+       if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
+               return RB_PAGE_MOVED;
+
+       return val & RB_FLAG_MASK;
+}
+
+/*
+ * rb_is_reader_page
+ *
+ * The unique thing about the reader page is that, if the
+ * writer is ever on it, the previous pointer never points
+ * back to the reader page.
+ */
+static int rb_is_reader_page(struct buffer_page *page)
+{
+       struct list_head *list = page->list.prev;
+
+       return rb_list_head(list->next) != &page->list;
+}
+
+/*
+ * rb_set_list_to_head - set a list_head to be pointing to head.
+ */
+static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer,
+                               struct list_head *list)
+{
+       unsigned long *ptr;
+
+       ptr = (unsigned long *)&list->next;
+       *ptr |= RB_PAGE_HEAD;
+       *ptr &= ~RB_PAGE_UPDATE;
+}
+
+/*
+ * rb_head_page_activate - sets up head page
+ */
+static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
+{
+       struct buffer_page *head;
+
+       head = cpu_buffer->head_page;
+       if (!head)
+               return;
+
+       /*
+        * Set the previous list pointer to have the HEAD flag.
+        */
+       rb_set_list_to_head(cpu_buffer, head->list.prev);
+}
+
+static void rb_list_head_clear(struct list_head *list)
+{
+       unsigned long *ptr = (unsigned long *)&list->next;
+
+       *ptr &= ~RB_FLAG_MASK;
+}
+
+/*
+ * rb_head_page_deactivate - clears head page ptr (for free list)
+ */
+static void
+rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
+{
+       struct list_head *hd;
+
+       /* Go through the whole list and clear any pointers found. */
+       rb_list_head_clear(cpu_buffer->pages);
+
+       list_for_each(hd, cpu_buffer->pages)
+               rb_list_head_clear(hd);
+}
+
+static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
+                           struct buffer_page *head,
+                           struct buffer_page *prev,
+                           int old_flag, int new_flag)
+{
+       struct list_head *list;
+       unsigned long val = (unsigned long)&head->list;
+       unsigned long ret;
+
+       list = &prev->list;
+
+       val &= ~RB_FLAG_MASK;
+
+       ret = cmpxchg((unsigned long *)&list->next,
+                     val | old_flag, val | new_flag);
+
+       /* check if the reader took the page */
+       if ((ret & ~RB_FLAG_MASK) != val)
+               return RB_PAGE_MOVED;
+
+       return ret & RB_FLAG_MASK;
+}
+
+static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
+                                  struct buffer_page *head,
+                                  struct buffer_page *prev,
+                                  int old_flag)
+{
+       return rb_head_page_set(cpu_buffer, head, prev,
+                               old_flag, RB_PAGE_UPDATE);
+}
+
+static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
+                                struct buffer_page *head,
+                                struct buffer_page *prev,
+                                int old_flag)
+{
+       return rb_head_page_set(cpu_buffer, head, prev,
+                               old_flag, RB_PAGE_HEAD);
+}
+
+static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
+                                  struct buffer_page *head,
+                                  struct buffer_page *prev,
+                                  int old_flag)
+{
+       return rb_head_page_set(cpu_buffer, head, prev,
+                               old_flag, RB_PAGE_NORMAL);
+}
+
+static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
+                              struct buffer_page **bpage)
+{
+       struct list_head *p = rb_list_head((*bpage)->list.next);
+
+       *bpage = list_entry(p, struct buffer_page, list);
+}
+
+static struct buffer_page *
+rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+       struct buffer_page *head;
+       struct buffer_page *page;
+       struct list_head *list;
+       int i;
+
+       if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
+               return NULL;
+
+       /* sanity check */
+       list = cpu_buffer->pages;
+       if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
+               return NULL;
+
+       page = head = cpu_buffer->head_page;
+       /*
+        * It is possible that the writer moves the head page behind
+        * where we started, and we miss it in one loop.
+        * A second loop should grab the head page, but we'll do
+        * three loops just because I'm paranoid.
+        */
+       for (i = 0; i < 3; i++) {
+               do {
+                       if (rb_is_head_page(cpu_buffer, page, page->list.prev)) {
+                               cpu_buffer->head_page = page;
+                               return page;
+                       }
+                       rb_inc_page(cpu_buffer, &page);
+               } while (page != head);
+       }
+
+       RB_WARN_ON(cpu_buffer, 1);
+
+       return NULL;
+}
+
+static int rb_head_page_replace(struct buffer_page *old,
+                               struct buffer_page *new)
+{
+       unsigned long *ptr = (unsigned long *)&old->list.prev->next;
+       unsigned long val;
+       unsigned long ret;
+
+       val = *ptr & ~RB_FLAG_MASK;
+       val |= RB_PAGE_HEAD;
+
+       ret = cmpxchg(ptr, val, (unsigned long)&new->list);
+
+       return ret == val;
+}
+
+/*
+ * rb_tail_page_update - move the tail page forward
+ *
+ * Returns 1 if moved tail page, 0 if someone else did.
+ */
+static int rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
+                              struct buffer_page *tail_page,
+                              struct buffer_page *next_page)
+{
+       struct buffer_page *old_tail;
+       unsigned long old_entries;
+       unsigned long old_write;
+       int ret = 0;
+
+       /*
+        * The tail page now needs to be moved forward.
+        *
+        * We need to reset the tail page, but without messing
+        * with possible erasing of data brought in by interrupts
+        * that have moved the tail page and are currently on it.
+        *
+        * We add a counter to the write field to denote this.
+        */
+       old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
+       old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);
+
+       /*
+        * Just make sure we have seen our old_write and synchronize
+        * with any interrupts that come in.
+        */
+       barrier();
+
+       /*
+        * If the tail page is still the same as what we think
+        * it is, then it is up to us to update the tail
+        * pointer.
+        */
+       if (tail_page == cpu_buffer->tail_page) {
+               /* Zero the write counter */
+               unsigned long val = old_write & ~RB_WRITE_MASK;
+               unsigned long eval = old_entries & ~RB_WRITE_MASK;
+
+               /*
+                * This will only succeed if an interrupt did
+                * not come in and change it. In which case, we
+                * do not want to modify it.
+                *
+                * We add (void) to let the compiler know that we do not care
+                * about the return value of these functions. We use the
+                * cmpxchg to only update if an interrupt did not already
+                * do it for us. If the cmpxchg fails, we don't care.
+                */
+               (void)local_cmpxchg(&next_page->write, old_write, val);
+               (void)local_cmpxchg(&next_page->entries, old_entries, eval);
+
+               /*
+                * No need to worry about races with clearing out the commit:
+                * it can only increment when a commit takes place, and that
+                * only happens in the outermost nested commit.
+                */
+               local_set(&next_page->page->commit, 0);
+
+               old_tail = cmpxchg(&cpu_buffer->tail_page,
+                                  tail_page, next_page);
+
+               if (old_tail == tail_page)
+                       ret = 1;
+       }
+
+       return ret;
+}
+
+static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
+                         struct buffer_page *bpage)
+{
+       unsigned long val = (unsigned long)bpage;
+
+       if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
+               return 1;
+
+       return 0;
+}
+
+/**
+ * rb_check_list - make sure a pointer to a list has the low bits zero
+ */
+static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer,
+                        struct list_head *list)
+{
+       if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev))
+               return 1;
+       if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next))
+               return 1;
+       return 0;
+}
+
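
Editor's note: all of the helpers above manipulate flag bits stashed in the low bits of the ->next pointer of the page in front of the head, which works because buffer pages are allocated cache-line aligned (see the struct buffer_page comment earlier in this diff). Below is a small user-space sketch of the same tagging idea, with a GCC builtin standing in for the kernel's cmpxchg() and ex_* names invented for the example.

#include <stdio.h>
#include <stdlib.h>

#define EX_PAGE_NORMAL	0UL
#define EX_PAGE_HEAD	1UL
#define EX_FLAG_MASK	3UL

struct ex_page { struct ex_page *next; };	/* malloc alignment keeps the low bits free */

static struct ex_page *ex_untag(struct ex_page *p)
{
	return (struct ex_page *)((unsigned long)p & ~EX_FLAG_MASK);
}

/* flip prev->next from "points at head" to "normal", as rb_head_page_set() does */
static int ex_clear_head(struct ex_page *prev, struct ex_page *head)
{
	unsigned long old = (unsigned long)head | EX_PAGE_HEAD;
	unsigned long new = (unsigned long)head | EX_PAGE_NORMAL;

	/* fails if a reader swapped the page out from under us */
	return __sync_bool_compare_and_swap((unsigned long *)&prev->next, old, new);
}

int main(void)
{
	struct ex_page *head = malloc(sizeof(*head));
	struct ex_page *prev = malloc(sizeof(*prev));

	prev->next = (struct ex_page *)((unsigned long)head | EX_PAGE_HEAD);
	printf("cleared=%d untagged=%p head=%p\n",
	       ex_clear_head(prev, head), (void *)ex_untag(prev->next), (void *)head);
	free(head);
	free(prev);
	return 0;
}
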
 /**
  * check_pages - integrity check of buffer pages
  * @cpu_buffer: CPU buffer with pages to test
@@ -497,14 +931,19 @@ EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
  */
 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
 {
-       struct list_head *head = &cpu_buffer->pages;
+       struct list_head *head = cpu_buffer->pages;
        struct buffer_page *bpage, *tmp;
 
+       rb_head_page_deactivate(cpu_buffer);
+
        if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
                return -1;
        if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
                return -1;
 
+       if (rb_check_list(cpu_buffer, head))
+               return -1;
+
        list_for_each_entry_safe(bpage, tmp, head, list) {
                if (RB_WARN_ON(cpu_buffer,
                               bpage->list.next->prev != &bpage->list))
@@ -512,25 +951,33 @@ static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
                if (RB_WARN_ON(cpu_buffer,
                               bpage->list.prev->next != &bpage->list))
                        return -1;
+               if (rb_check_list(cpu_buffer, &bpage->list))
+                       return -1;
        }
 
+       rb_head_page_activate(cpu_buffer);
+
        return 0;
 }
 
 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
                             unsigned nr_pages)
 {
-       struct list_head *head = &cpu_buffer->pages;
        struct buffer_page *bpage, *tmp;
        unsigned long addr;
        LIST_HEAD(pages);
        unsigned i;
 
+       WARN_ON(!nr_pages);
+
        for (i = 0; i < nr_pages; i++) {
                bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
                                    GFP_KERNEL, cpu_to_node(cpu_buffer->cpu));
                if (!bpage)
                        goto free_pages;
+
+               rb_check_bpage(cpu_buffer, bpage);
+
                list_add(&bpage->list, &pages);
 
                addr = __get_free_page(GFP_KERNEL);
@@ -540,7 +987,13 @@ static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
                rb_init_page(bpage->page);
        }
 
-       list_splice(&pages, head);
+       /*
+        * The ring buffer page list is a circular list that does not
+        * start and end with a list head. All page list items point to
+        * other pages.
+        */
+       cpu_buffer->pages = pages.next;
+       list_del(&pages);
 
        rb_check_pages(cpu_buffer);
 
@@ -571,14 +1024,15 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
        cpu_buffer->buffer = buffer;
        spin_lock_init(&cpu_buffer->reader_lock);
        lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
-       cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED;
-       INIT_LIST_HEAD(&cpu_buffer->pages);
+       cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
 
        bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
                            GFP_KERNEL, cpu_to_node(cpu));
        if (!bpage)
                goto fail_free_buffer;
 
+       rb_check_bpage(cpu_buffer, bpage);
+
        cpu_buffer->reader_page = bpage;
        addr = __get_free_page(GFP_KERNEL);
        if (!addr)
@@ -593,9 +1047,11 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
                goto fail_free_reader;
 
        cpu_buffer->head_page
-               = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
+               = list_entry(cpu_buffer->pages, struct buffer_page, list);
        cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
 
+       rb_head_page_activate(cpu_buffer);
+
        return cpu_buffer;
 
  fail_free_reader:
@@ -608,15 +1064,22 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
 
 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
 {
-       struct list_head *head = &cpu_buffer->pages;
+       struct list_head *head = cpu_buffer->pages;
        struct buffer_page *bpage, *tmp;
 
        free_buffer_page(cpu_buffer->reader_page);
 
-       list_for_each_entry_safe(bpage, tmp, head, list) {
-               list_del_init(&bpage->list);
+       rb_head_page_deactivate(cpu_buffer);
+
+       if (head) {
+               list_for_each_entry_safe(bpage, tmp, head, list) {
+                       list_del_init(&bpage->list);
+                       free_buffer_page(bpage);
+               }
+               bpage = list_entry(head, struct buffer_page, list);
                free_buffer_page(bpage);
        }
+
        kfree(cpu_buffer);
 }
 
@@ -734,6 +1197,7 @@ ring_buffer_free(struct ring_buffer *buffer)
 
        put_online_cpus();
 
+       kfree(buffer->buffers);
        free_cpumask_var(buffer->cpumask);
 
        kfree(buffer);
@@ -755,26 +1219,25 @@ rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
        struct list_head *p;
        unsigned i;
 
-       atomic_inc(&cpu_buffer->record_disabled);
-       synchronize_sched();
+       spin_lock_irq(&cpu_buffer->reader_lock);
+       rb_head_page_deactivate(cpu_buffer);
 
        for (i = 0; i < nr_pages; i++) {
-               if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
-                       return;
-               p = cpu_buffer->pages.next;
+               if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
+                       goto out;
+               p = cpu_buffer->pages->next;
                bpage = list_entry(p, struct buffer_page, list);
                list_del_init(&bpage->list);
                free_buffer_page(bpage);
        }
-       if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
-               return;
+       if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
+               goto out;
 
        rb_reset_cpu(cpu_buffer);
-
        rb_check_pages(cpu_buffer);
 
-       atomic_dec(&cpu_buffer->record_disabled);
-
+out:
+       spin_unlock_irq(&cpu_buffer->reader_lock);
 }
 
 static void
@@ -785,22 +1248,22 @@ rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
        struct list_head *p;
        unsigned i;
 
-       atomic_inc(&cpu_buffer->record_disabled);
-       synchronize_sched();
+       spin_lock_irq(&cpu_buffer->reader_lock);
+       rb_head_page_deactivate(cpu_buffer);
 
        for (i = 0; i < nr_pages; i++) {
                if (RB_WARN_ON(cpu_buffer, list_empty(pages)))
-                       return;
+                       goto out;
                p = pages->next;
                bpage = list_entry(p, struct buffer_page, list);
                list_del_init(&bpage->list);
-               list_add_tail(&bpage->list, &cpu_buffer->pages);
+               list_add_tail(&bpage->list, cpu_buffer->pages);
        }
        rb_reset_cpu(cpu_buffer);
-
        rb_check_pages(cpu_buffer);
 
-       atomic_dec(&cpu_buffer->record_disabled);
+out:
+       spin_unlock_irq(&cpu_buffer->reader_lock);
 }
 
 /**
@@ -808,11 +1271,6 @@ rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
  * @buffer: the buffer to resize.
  * @size: the new size.
  *
- * The tracer is responsible for making sure that the buffer is
- * not being used while changing the size.
- * Note: We may be able to change the above requirement by using
- *  RCU synchronizations.
- *
  * Minimum size is 2 * BUF_PAGE_SIZE.
  *
  * Returns -1 on failure.
@@ -844,6 +1302,11 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
        if (size == buffer_size)
                return size;
 
+       atomic_inc(&buffer->record_disabled);
+
+       /* Make sure all writers are done with this buffer. */
+       synchronize_sched();
+
        mutex_lock(&buffer->mutex);
        get_online_cpus();
 
@@ -906,6 +1369,8 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
        put_online_cpus();
        mutex_unlock(&buffer->mutex);
 
+       atomic_dec(&buffer->record_disabled);
+
        return size;
 
  free_pages:
@@ -915,6 +1380,7 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
        }
        put_online_cpus();
        mutex_unlock(&buffer->mutex);
+       atomic_dec(&buffer->record_disabled);
        return -ENOMEM;
 
        /*
@@ -924,6 +1390,7 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
  out_fail:
        put_online_cpus();
        mutex_unlock(&buffer->mutex);
+       atomic_dec(&buffer->record_disabled);
        return -1;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_resize);
@@ -947,21 +1414,14 @@ rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
 }
 
 static inline struct ring_buffer_event *
-rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
-{
-       return __rb_page_index(cpu_buffer->head_page,
-                              cpu_buffer->head_page->read);
-}
-
-static inline struct ring_buffer_event *
 rb_iter_head_event(struct ring_buffer_iter *iter)
 {
        return __rb_page_index(iter->head_page, iter->head);
 }
 
-static inline unsigned rb_page_write(struct buffer_page *bpage)
+static inline unsigned long rb_page_write(struct buffer_page *bpage)
 {
-       return local_read(&bpage->write);
+       return local_read(&bpage->write) & RB_WRITE_MASK;
 }
 
 static inline unsigned rb_page_commit(struct buffer_page *bpage)
@@ -969,6 +1429,11 @@ static inline unsigned rb_page_commit(struct buffer_page *bpage)
        return local_read(&bpage->page->commit);
 }
 
+static inline unsigned long rb_page_entries(struct buffer_page *bpage)
+{
+       return local_read(&bpage->entries) & RB_WRITE_MASK;
+}
+
 /* Size is determined by what has been committed */
 static inline unsigned rb_page_size(struct buffer_page *bpage)
 {
@@ -981,22 +1446,6 @@ rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
        return rb_page_commit(cpu_buffer->commit_page);
 }
 
-static inline unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
-{
-       return rb_page_commit(cpu_buffer->head_page);
-}
-
-static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
-                              struct buffer_page **bpage)
-{
-       struct list_head *p = (*bpage)->list.next;
-
-       if (p == &cpu_buffer->pages)
-               p = p->next;
-
-       *bpage = list_entry(p, struct buffer_page, list);
-}
-
 static inline unsigned
 rb_event_index(struct ring_buffer_event *event)
 {
@@ -1022,6 +1471,8 @@ rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
 static void
 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
 {
+       unsigned long max_count;
+
        /*
         * We only race with interrupts and NMIs on this CPU.
         * If we own the commit event, then we can commit
@@ -1031,9 +1482,16 @@ rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
         * assign the commit to the tail.
         */
  again:
+       max_count = cpu_buffer->buffer->pages * 100;
+
        while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
-               cpu_buffer->commit_page->page->commit =
-                       cpu_buffer->commit_page->write;
+               if (RB_WARN_ON(cpu_buffer, !(--max_count)))
+                       return;
+               if (RB_WARN_ON(cpu_buffer,
+                              rb_is_reader_page(cpu_buffer->tail_page)))
+                       return;
+               local_set(&cpu_buffer->commit_page->page->commit,
+                         rb_page_write(cpu_buffer->commit_page));
                rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
                cpu_buffer->write_stamp =
                        cpu_buffer->commit_page->page->time_stamp;
@@ -1042,8 +1500,12 @@ rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
        }
        while (rb_commit_index(cpu_buffer) !=
               rb_page_write(cpu_buffer->commit_page)) {
-               cpu_buffer->commit_page->page->commit =
-                       cpu_buffer->commit_page->write;
+
+               local_set(&cpu_buffer->commit_page->page->commit,
+                         rb_page_write(cpu_buffer->commit_page));
+               RB_WARN_ON(cpu_buffer,
+                          local_read(&cpu_buffer->commit_page->page->commit) &
+                          ~RB_WRITE_MASK);
                barrier();
        }
 
@@ -1076,7 +1538,7 @@ static void rb_inc_iter(struct ring_buffer_iter *iter)
         * to the head page instead of next.
         */
        if (iter->head_page == cpu_buffer->reader_page)
-               iter->head_page = cpu_buffer->head_page;
+               iter->head_page = rb_set_head_page(cpu_buffer);
        else
                rb_inc_page(cpu_buffer, &iter->head_page);
 
@@ -1110,7 +1572,7 @@ rb_update_event(struct ring_buffer_event *event,
 
        case 0:
                length -= RB_EVNT_HDR_SIZE;
-               if (length > RB_MAX_SMALL_DATA)
+               if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
                        event->array[0] = length;
                else
                        event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
@@ -1120,6 +1582,163 @@ rb_update_event(struct ring_buffer_event *event,
        }
 }
 
+/*
+ * rb_handle_head_page - writer hit the head page
+ *
+ * Returns: +1 to retry page
+ *           0 to continue
+ *          -1 on error
+ */
+static int
+rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
+                   struct buffer_page *tail_page,
+                   struct buffer_page *next_page)
+{
+       struct buffer_page *new_head;
+       int entries;
+       int type;
+       int ret;
+
+       entries = rb_page_entries(next_page);
+
+       /*
+        * The hard part is here. We need to move the head
+        * forward, and protect against both readers on
+        * other CPUs and writers coming in via interrupts.
+        */
+       type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
+                                      RB_PAGE_HEAD);
+
+       /*
+        * type can be one of four:
+        *  NORMAL - an interrupt already moved it for us
+        *  HEAD   - we are the first to get here.
+        *  UPDATE - we are the interrupt interrupting
+        *           a current move.
+        *  MOVED  - a reader on another CPU moved the next
+        *           pointer to its reader page. Give up
+        *           and try again.
+        */
+
+       switch (type) {
+       case RB_PAGE_HEAD:
+               /*
+                * We changed the head to UPDATE, thus
+                * it is our responsibility to update
+                * the counters.
+                */
+               local_add(entries, &cpu_buffer->overrun);
+
+               /*
+                * The entries will be zeroed out when we move the
+                * tail page.
+                */
+
+               /* still more to do */
+               break;
+
+       case RB_PAGE_UPDATE:
+               /*
+                * This is an interrupt that interrupted the
+                * previous update. Still more to do.
+                */
+               break;
+       case RB_PAGE_NORMAL:
+               /*
+                * An interrupt came in before the update
+                * and processed this for us.
+                * Nothing left to do.
+                */
+               return 1;
+       case RB_PAGE_MOVED:
+               /*
+                * The reader is on another CPU and just did
+                * a swap with our next_page.
+                * Try again.
+                */
+               return 1;
+       default:
+               RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
+               return -1;
+       }
+
+       /*
+        * Now that we are here, the old head pointer is
+        * set to UPDATE. This will keep the reader from
+        * swapping the head page with the reader page.
+        * The reader (on another CPU) will spin till
+        * we are finished.
+        *
+        * We just need to protect against interrupts
+        * doing the job. We will set the next pointer
+        * to HEAD. After that, we set the old pointer
+        * to NORMAL, but only if it was HEAD before.
+        * otherwise we are an interrupt, and only
+        * want the outer most commit to reset it.
+        */
+       new_head = next_page;
+       rb_inc_page(cpu_buffer, &new_head);
+
+       ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
+                                   RB_PAGE_NORMAL);
+
+       /*
+        * Valid returns are:
+        *  HEAD   - an interrupt came in and already set it.
+        *  NORMAL - One of two things:
+        *            1) We really set it.
+        *            2) A bunch of interrupts came in and moved
+        *               the page forward again.
+        */
+       switch (ret) {
+       case RB_PAGE_HEAD:
+       case RB_PAGE_NORMAL:
+               /* OK */
+               break;
+       default:
+               RB_WARN_ON(cpu_buffer, 1);
+               return -1;
+       }
+
+       /*
+        * It is possible that an interrupt came in,
+        * set the head up, then more interrupts came in
+        * and moved it again. When we get back here,
+        * the page would have been set to NORMAL but we
+        * just set it back to HEAD.
+        *
+        * How do you detect this? Well, if that happened
+        * the tail page would have moved.
+        */
+       if (ret == RB_PAGE_NORMAL) {
+               /*
+                * If the tail had moved past next, then we need
+                * to reset the pointer.
+                */
+               if (cpu_buffer->tail_page != tail_page &&
+                   cpu_buffer->tail_page != next_page)
+                       rb_head_page_set_normal(cpu_buffer, new_head,
+                                               next_page,
+                                               RB_PAGE_HEAD);
+       }
+
+       /*
+        * If this was the outer most commit (the one that
+        * changed the original pointer from HEAD to UPDATE),
+        * then it is up to us to reset it to NORMAL.
+        */
+       if (type == RB_PAGE_HEAD) {
+               ret = rb_head_page_set_normal(cpu_buffer, next_page,
+                                             tail_page,
+                                             RB_PAGE_UPDATE);
+               if (RB_WARN_ON(cpu_buffer,
+                              ret != RB_PAGE_UPDATE))
+                       return -1;
+       }
+
+       return 0;
+}
+
 static unsigned rb_calculate_event_length(unsigned length)
 {
        struct ring_buffer_event event; /* Used only for sizeof array */
@@ -1128,11 +1747,11 @@ static unsigned rb_calculate_event_length(unsigned length)
        if (!length)
                length = 1;
 
-       if (length > RB_MAX_SMALL_DATA)
+       if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
                length += sizeof(event.array[0]);
 
        length += RB_EVNT_HDR_SIZE;
-       length = ALIGN(length, RB_ALIGNMENT);
+       length = ALIGN(length, RB_ARCH_ALIGNMENT);
 
        return length;
 }
@@ -1149,11 +1768,27 @@ rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
         * must fill the old tail_page with padding.
         */
        if (tail >= BUF_PAGE_SIZE) {
+               /*
+                * If the page was filled, then we still need
+                * to update the real_end. Reset it to zero
+                * and the reader will ignore it.
+                */
+               if (tail == BUF_PAGE_SIZE)
+                       tail_page->real_end = 0;
+
                local_sub(length, &tail_page->write);
                return;
        }
 
        event = __rb_page_index(tail_page, tail);
+       kmemcheck_annotate_bitfield(event, bitfield);
+
+       /*
+        * Save the original length to the meta data.
+        * This will be used by the reader to add the lost event
+        * counter.
+        */
+       tail_page->real_end = tail;
 
        /*
         * If this event is bigger than the minimum size, then
@@ -1182,9 +1817,6 @@ rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
        event->type_len = RINGBUF_TYPE_PADDING;
        /* time delta must be non zero */
        event->time_delta = 1;
-       /* Account for this as an entry */
-       local_inc(&tail_page->entries);
-       local_inc(&cpu_buffer->entries);
 
        /* Set write to end of buffer */
        length = (tail + length) - BUF_PAGE_SIZE;
@@ -1194,52 +1826,16 @@ rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
 static struct ring_buffer_event *
 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
             unsigned long length, unsigned long tail,
-            struct buffer_page *commit_page,
             struct buffer_page *tail_page, u64 *ts)
 {
-       struct buffer_page *next_page, *head_page, *reader_page;
+       struct buffer_page *commit_page = cpu_buffer->commit_page;
        struct ring_buffer *buffer = cpu_buffer->buffer;
-       bool lock_taken = false;
-       unsigned long flags;
+       struct buffer_page *next_page;
+       int ret;
 
        next_page = tail_page;
 
-       local_irq_save(flags);
-       /*
-        * Since the write to the buffer is still not
-        * fully lockless, we must be careful with NMIs.
-        * The locks in the writers are taken when a write
-        * crosses to a new page. The locks protect against
-        * races with the readers (this will soon be fixed
-        * with a lockless solution).
-        *
-        * Because we can not protect against NMIs, and we
-        * want to keep traces reentrant, we need to manage
-        * what happens when we are in an NMI.
-        *
-        * NMIs can happen after we take the lock.
-        * If we are in an NMI, only take the lock
-        * if it is not already taken. Otherwise
-        * simply fail.
-        */
-       if (unlikely(in_nmi())) {
-               if (!__raw_spin_trylock(&cpu_buffer->lock)) {
-                       cpu_buffer->nmi_dropped++;
-                       goto out_reset;
-               }
-       } else
-               __raw_spin_lock(&cpu_buffer->lock);
-
-       lock_taken = true;
-
-       rb_inc_page(cpu_buffer, &next_page);
-
-       head_page = cpu_buffer->head_page;
-       reader_page = cpu_buffer->reader_page;
-
-       /* we grabbed the lock before incrementing */
-       if (RB_WARN_ON(cpu_buffer, next_page == reader_page))
-               goto out_reset;
+       rb_inc_page(cpu_buffer, &next_page);
 
        /*
         * If for some reason, we had an interrupt storm that made
@@ -1247,46 +1843,79 @@ rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
         * about it.
         */
        if (unlikely(next_page == commit_page)) {
-               cpu_buffer->commit_overrun++;
+               local_inc(&cpu_buffer->commit_overrun);
                goto out_reset;
        }
 
-       if (next_page == head_page) {
-               if (!(buffer->flags & RB_FL_OVERWRITE))
-                       goto out_reset;
-
-               /* tail_page has not moved yet? */
-               if (tail_page == cpu_buffer->tail_page) {
-                       /* count overflows */
-                       cpu_buffer->overrun +=
-                               local_read(&head_page->entries);
+       /*
+        * This is where the fun begins!
+        *
+        * We are fighting against races between a reader that
+        * could be on another CPU trying to swap its reader
+        * page with the buffer head.
+        *
+        * We are also fighting against interrupts coming in and
+        * moving the head or tail on us.
+        *
+        * If the next page is the head page then we have filled
+        * the buffer, unless the commit page is still on the
+        * reader page.
+        */
+       if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) {
 
-                       rb_inc_page(cpu_buffer, &head_page);
-                       cpu_buffer->head_page = head_page;
-                       cpu_buffer->head_page->read = 0;
+               /*
+                * If the commit is not on the reader page, then
+                * move the header page.
+                */
+               if (!rb_is_reader_page(cpu_buffer->commit_page)) {
+                       /*
+                        * If we are not in overwrite mode,
+                        * this is easy, just stop here.
+                        */
+                       if (!(buffer->flags & RB_FL_OVERWRITE))
+                               goto out_reset;
+
+                       ret = rb_handle_head_page(cpu_buffer,
+                                                 tail_page,
+                                                 next_page);
+                       if (ret < 0)
+                               goto out_reset;
+                       if (ret)
+                               goto out_again;
+               } else {
+                       /*
+                        * We need to be careful here too. The
+                        * commit page could still be on the reader
+                        * page. We could have a small buffer, and
+                        * have filled up the buffer with events
+                        * from interrupts and such, and wrapped.
+                        *
+                        * Note, if the tail page is also on the
+                        * reader_page, we let it move out.
+                        */
+                       if (unlikely((cpu_buffer->commit_page !=
+                                     cpu_buffer->tail_page) &&
+                                    (cpu_buffer->commit_page ==
+                                     cpu_buffer->reader_page))) {
+                               local_inc(&cpu_buffer->commit_overrun);
+                               goto out_reset;
+                       }
                }
        }
 
-       /*
-        * If the tail page is still the same as what we think
-        * it is, then it is up to us to update the tail
-        * pointer.
-        */
-       if (tail_page == cpu_buffer->tail_page) {
-               local_set(&next_page->write, 0);
-               local_set(&next_page->entries, 0);
-               local_set(&next_page->page->commit, 0);
-               cpu_buffer->tail_page = next_page;
-
-               /* reread the time stamp */
-               *ts = rb_time_stamp(buffer, cpu_buffer->cpu);
-               cpu_buffer->tail_page->page->time_stamp = *ts;
+       ret = rb_tail_page_update(cpu_buffer, tail_page, next_page);
+       if (ret) {
+               /*
+                * Nested commits always have zero deltas, so
+                * just reread the time stamp
+                */
+               *ts = rb_time_stamp(buffer);
+               next_page->page->time_stamp = *ts;
        }
 
-       rb_reset_tail(cpu_buffer, tail_page, tail, length);
+ out_again:
 
-       __raw_spin_unlock(&cpu_buffer->lock);
-       local_irq_restore(flags);
+       rb_reset_tail(cpu_buffer, tail_page, tail, length);
 
        /* fail and let the caller try again */
        return ERR_PTR(-EAGAIN);
@@ -1295,9 +1924,6 @@ rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
        /* reset write */
        rb_reset_tail(cpu_buffer, tail_page, tail, length);
 
-       if (likely(lock_taken))
-               __raw_spin_unlock(&cpu_buffer->lock);
-       local_irq_restore(flags);
        return NULL;
 }
 
@@ -1305,25 +1931,26 @@ static struct ring_buffer_event *
 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
                  unsigned type, unsigned long length, u64 *ts)
 {
-       struct buffer_page *tail_page, *commit_page;
+       struct buffer_page *tail_page;
        struct ring_buffer_event *event;
        unsigned long tail, write;
 
-       commit_page = cpu_buffer->commit_page;
-       /* we just need to protect against interrupts */
-       barrier();
        tail_page = cpu_buffer->tail_page;
        write = local_add_return(length, &tail_page->write);
+
+       /* set write to only the index of the write */
+       write &= RB_WRITE_MASK;
        tail = write - length;
 
        /* See if we shot pass the end of this buffer page */
        if (write > BUF_PAGE_SIZE)
                return rb_move_tail(cpu_buffer, length, tail,
-                                   commit_page, tail_page, ts);
+                                   tail_page, ts);
 
        /* We reserved something on the buffer */
 
        event = __rb_page_index(tail_page, tail);
+       kmemcheck_annotate_bitfield(event, bitfield);
        rb_update_event(event, type, length);
 
        /* The passed in type is zero for DATA */
@@ -1357,12 +1984,16 @@ rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
        bpage = cpu_buffer->tail_page;
 
        if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
+               unsigned long write_mask =
+                       local_read(&bpage->write) & ~RB_WRITE_MASK;
                /*
                 * This is on the tail page. It is possible that
                 * a write could come in and move the tail page
                 * and write to the next page. That is fine
                 * because we just shorten what is on this page.
                 */
+               old_index += write_mask;
+               new_index += write_mask;
                index = local_cmpxchg(&bpage->write, old_index, new_index);
                if (index == old_index)
                        return 1;
@@ -1377,17 +2008,13 @@ rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
                  u64 *ts, u64 *delta)
 {
        struct ring_buffer_event *event;
-       static int once;
        int ret;
 
-       if (unlikely(*delta > (1ULL << 59) && !once++)) {
-               printk(KERN_WARNING "Delta way too big! %llu"
-                      " ts=%llu write stamp = %llu\n",
-                      (unsigned long long)*delta,
-                      (unsigned long long)*ts,
-                      (unsigned long long)cpu_buffer->write_stamp);
-               WARN_ON(1);
-       }
+       WARN_ONCE(*delta > (1ULL << 59),
+                 KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n",
+                 (unsigned long long)*delta,
+                 (unsigned long long)*ts,
+                 (unsigned long long)cpu_buffer->write_stamp);
 
        /*
        * The delta is too big, we need to add a
@@ -1478,7 +2105,8 @@ static void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
 }
 
 static struct ring_buffer_event *
-rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
+rb_reserve_next_event(struct ring_buffer *buffer,
+                     struct ring_buffer_per_cpu *cpu_buffer,
                      unsigned long length)
 {
        struct ring_buffer_event *event;
@@ -1488,6 +2116,21 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
 
        rb_start_commit(cpu_buffer);
 
+#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
+       /*
+        * Due to the ability to swap a cpu buffer from a buffer
+        * it is possible it was swapped before we committed.
+        * (committing stops a swap). We check for it here and
+        * if it happened, we have to fail the write.
+        */
+       barrier();
+       if (unlikely(ACCESS_ONCE(cpu_buffer->buffer) != buffer)) {
+               local_dec(&cpu_buffer->committing);
+               local_dec(&cpu_buffer->commits);
+               return NULL;
+       }
+#endif
+
        length = rb_calculate_event_length(length);
  again:
        /*
@@ -1502,7 +2145,7 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
        if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
                goto out_fail;
 
-       ts = rb_time_stamp(cpu_buffer->buffer, cpu_buffer->cpu);
+       ts = rb_time_stamp(cpu_buffer->buffer);
 
        /*
         * Only the first commit can update the timestamp.
@@ -1560,6 +2203,8 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
        return NULL;
 }
 
+#ifdef CONFIG_TRACING
+
 #define TRACE_RECURSIVE_DEPTH 16
 
 static int trace_recursive_lock(void)
@@ -1590,6 +2235,13 @@ static void trace_recursive_unlock(void)
        current->trace_recursion--;
 }
 
+#else
+
+#define trace_recursive_lock()         (0)
+#define trace_recursive_unlock()       do { } while (0)
+
+#endif
+
 static DEFINE_PER_CPU(int, rb_need_resched);
 
 /**
@@ -1617,12 +2269,12 @@ ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
        if (ring_buffer_flags != RB_BUFFERS_ON)
                return NULL;
 
-       if (atomic_read(&buffer->record_disabled))
-               return NULL;
-
        /* If we are tracing schedule, we don't want to recurse */
        resched = ftrace_preempt_disable();
 
+       if (atomic_read(&buffer->record_disabled))
+               goto out_nocheck;
+
        if (trace_recursive_lock())
                goto out_nocheck;
 
@@ -1639,7 +2291,7 @@ ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
        if (length > BUF_MAX_DATA_SIZE)
                goto out;
 
-       event = rb_reserve_next_event(cpu_buffer, length);
+       event = rb_reserve_next_event(buffer, cpu_buffer, length);
        if (!event)
                goto out;
 
@@ -1662,18 +2314,23 @@ ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
 }
 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
 
-static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
+static void
+rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer,
                      struct ring_buffer_event *event)
 {
-       local_inc(&cpu_buffer->entries);
-
        /*
         * The event first in the commit queue updates the
         * time stamp.
         */
        if (rb_event_is_commit(cpu_buffer, event))
                cpu_buffer->write_stamp += event->time_delta;
+}
 
+static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
+                     struct ring_buffer_event *event)
+{
+       local_inc(&cpu_buffer->entries);
+       rb_update_write_stamp(cpu_buffer, event);
        rb_end_commit(cpu_buffer);
 }
 
@@ -1720,32 +2377,57 @@ static inline void rb_event_discard(struct ring_buffer_event *event)
                event->time_delta = 1;
 }
 
-/**
- * ring_buffer_event_discard - discard any event in the ring buffer
- * @event: the event to discard
- *
- * Sometimes a event that is in the ring buffer needs to be ignored.
- * This function lets the user discard an event in the ring buffer
- * and then that event will not be read later.
- *
- * Note, it is up to the user to be careful with this, and protect
- * against races. If the user discards an event that has been consumed
- * it is possible that it could corrupt the ring buffer.
+/*
+ * Decrement the entries to the page that an event is on.
+ * The event does not even need to exist, only the pointer
+ * to the page it is on. This may only be called before the commit
+ * takes place.
  */
-void ring_buffer_event_discard(struct ring_buffer_event *event)
+static inline void
+rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer,
+                  struct ring_buffer_event *event)
 {
-       rb_event_discard(event);
+       unsigned long addr = (unsigned long)event;
+       struct buffer_page *bpage = cpu_buffer->commit_page;
+       struct buffer_page *start;
+
+       addr &= PAGE_MASK;
+
+       /* Do the likely case first */
+       if (likely(bpage->page == (void *)addr)) {
+               local_dec(&bpage->entries);
+               return;
+       }
+
+       /*
+        * Because the commit page may be on the reader page we
+        * start with the next page and check the end loop there.
+        */
+       rb_inc_page(cpu_buffer, &bpage);
+       start = bpage;
+       do {
+               if (bpage->page == (void *)addr) {
+                       local_dec(&bpage->entries);
+                       return;
+               }
+               rb_inc_page(cpu_buffer, &bpage);
+       } while (bpage != start);
+
+       /* commit not part of this buffer?? */
+       RB_WARN_ON(cpu_buffer, 1);
 }
-EXPORT_SYMBOL_GPL(ring_buffer_event_discard);
 
 /**
  * ring_buffer_commit_discard - discard an event that has not been committed
  * @buffer: the ring buffer
  * @event: non committed event to discard
  *
- * This is similar to ring_buffer_event_discard but must only be
- * performed on an event that has not been committed yet. The difference
- * is that this will also try to free the event from the ring buffer
+ * Sometimes an event that is in the ring buffer needs to be ignored.
+ * This function lets the user discard an event in the ring buffer
+ * and then that event will not be read later.
+ *
+ * This function only works if it is called before the item has been
+ * committed. It will try to free the event from the ring buffer
  * if another event has not been added behind it.
  *
  * If another event has been added behind it, it will set the event
@@ -1773,14 +2455,15 @@ void ring_buffer_discard_commit(struct ring_buffer *buffer,
         */
        RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
 
-       if (!rb_try_to_discard(cpu_buffer, event))
+       rb_decrement_entry(cpu_buffer, event);
+       if (rb_try_to_discard(cpu_buffer, event))
                goto out;
 
        /*
         * The commit is still visible by the reader, so we
-        * must increment entries.
+        * must still update the timestamp.
         */
-       local_inc(&cpu_buffer->entries);
+       rb_update_write_stamp(cpu_buffer, event);
  out:
        rb_end_commit(cpu_buffer);
 
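A hedged sketch of how a caller might use this discard path (fill_record is a hypothetical filter; the other calls are the ring buffer's exported API): reserve, fill, and drop the event before committing if it turns out to be unwanted.

	event = ring_buffer_lock_reserve(buffer, sizeof(*rec));
	if (!event)
		return;
	rec = ring_buffer_event_data(event);
	if (!fill_record(rec)) {
		/* never committed: the entry count on its page is
		 * decremented and the space may be reclaimed */
		ring_buffer_discard_commit(buffer, event);
		return;
	}
	ring_buffer_unlock_commit(buffer, event);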
@@ -1823,11 +2506,11 @@ int ring_buffer_write(struct ring_buffer *buffer,
        if (ring_buffer_flags != RB_BUFFERS_ON)
                return -EBUSY;
 
-       if (atomic_read(&buffer->record_disabled))
-               return -EBUSY;
-
        resched = ftrace_preempt_disable();
 
+       if (atomic_read(&buffer->record_disabled))
+               goto out;
+
        cpu = raw_smp_processor_id();
 
        if (!cpumask_test_cpu(cpu, buffer->cpumask))
@@ -1841,7 +2524,7 @@ int ring_buffer_write(struct ring_buffer *buffer,
        if (length > BUF_MAX_DATA_SIZE)
                goto out;
 
-       event = rb_reserve_next_event(cpu_buffer, length);
+       event = rb_reserve_next_event(buffer, cpu_buffer, length);
        if (!event)
                goto out;
 
@@ -1862,9 +2545,13 @@ EXPORT_SYMBOL_GPL(ring_buffer_write);
 static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
 {
        struct buffer_page *reader = cpu_buffer->reader_page;
-       struct buffer_page *head = cpu_buffer->head_page;
+       struct buffer_page *head = rb_set_head_page(cpu_buffer);
        struct buffer_page *commit = cpu_buffer->commit_page;
 
+       /* In case of error, head will be NULL */
+       if (unlikely(!head))
+               return 1;
+
        return reader->read == rb_page_commit(reader) &&
                (commit == reader ||
                 (commit == head &&
@@ -1891,7 +2578,7 @@ EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
  * @buffer: The ring buffer to enable writes
  *
  * Note, multiple disables will need the same number of enables
- * to truely enable the writing (much like preempt_disable).
+ * to truly enable the writing (much like preempt_disable).
  */
 void ring_buffer_record_enable(struct ring_buffer *buffer)
 {
@@ -1927,7 +2614,7 @@ EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
  * @cpu: The CPU to enable.
  *
  * Note, multiple disables will need the same number of enables
- * to truely enable the writing (much like preempt_disable).
+ * to truly enable the writing (much like preempt_disable).
  */
 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
 {
@@ -1955,7 +2642,7 @@ unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
                return 0;
 
        cpu_buffer = buffer->buffers[cpu];
-       ret = (local_read(&cpu_buffer->entries) - cpu_buffer->overrun)
+       ret = (local_read(&cpu_buffer->entries) - local_read(&cpu_buffer->overrun))
                - cpu_buffer->read;
 
        return ret;
@@ -1976,33 +2663,13 @@ unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
                return 0;
 
        cpu_buffer = buffer->buffers[cpu];
-       ret = cpu_buffer->overrun;
+       ret = local_read(&cpu_buffer->overrun);
 
        return ret;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);
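With overrun and commit_overrun now backed by local_t counters, a per-cpu statistics dump might look like the following sketch (my_dump_stats is hypothetical; the accessors are the exported ones in this file):

static void my_dump_stats(struct ring_buffer *buffer, int cpu)
{
	pr_info("cpu%d: entries=%lu overruns=%lu commit overruns=%lu\n",
		cpu,
		ring_buffer_entries_cpu(buffer, cpu),
		ring_buffer_overrun_cpu(buffer, cpu),
		ring_buffer_commit_overrun_cpu(buffer, cpu));
}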
 
 /**
- * ring_buffer_nmi_dropped_cpu - get the number of nmis that were dropped
- * @buffer: The ring buffer
- * @cpu: The per CPU buffer to get the number of overruns from
- */
-unsigned long ring_buffer_nmi_dropped_cpu(struct ring_buffer *buffer, int cpu)
-{
-       struct ring_buffer_per_cpu *cpu_buffer;
-       unsigned long ret;
-
-       if (!cpumask_test_cpu(cpu, buffer->cpumask))
-               return 0;
-
-       cpu_buffer = buffer->buffers[cpu];
-       ret = cpu_buffer->nmi_dropped;
-
-       return ret;
-}
-EXPORT_SYMBOL_GPL(ring_buffer_nmi_dropped_cpu);
-
-/**
  * ring_buffer_commit_overrun_cpu - get the number of overruns caused by commits
  * @buffer: The ring buffer
  * @cpu: The per CPU buffer to get the number of overruns from
@@ -2017,7 +2684,7 @@ ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu)
                return 0;
 
        cpu_buffer = buffer->buffers[cpu];
-       ret = cpu_buffer->commit_overrun;
+       ret = local_read(&cpu_buffer->commit_overrun);
 
        return ret;
 }
@@ -2040,7 +2707,7 @@ unsigned long ring_buffer_entries(struct ring_buffer *buffer)
        for_each_buffer_cpu(buffer, cpu) {
                cpu_buffer = buffer->buffers[cpu];
                entries += (local_read(&cpu_buffer->entries) -
-                           cpu_buffer->overrun) - cpu_buffer->read;
+                           local_read(&cpu_buffer->overrun)) - cpu_buffer->read;
        }
 
        return entries;
@@ -2048,7 +2715,7 @@ unsigned long ring_buffer_entries(struct ring_buffer *buffer)
 EXPORT_SYMBOL_GPL(ring_buffer_entries);
 
 /**
- * ring_buffer_overrun_cpu - get the number of overruns in buffer
+ * ring_buffer_overruns - get the number of overruns in buffer
  * @buffer: The ring buffer
  *
  * Returns the total number of overruns in the ring buffer
@@ -2063,7 +2730,7 @@ unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
        /* if you care about this being correct, lock the buffer */
        for_each_buffer_cpu(buffer, cpu) {
                cpu_buffer = buffer->buffers[cpu];
-               overruns += cpu_buffer->overrun;
+               overruns += local_read(&cpu_buffer->overrun);
        }
 
        return overruns;
@@ -2076,8 +2743,10 @@ static void rb_iter_reset(struct ring_buffer_iter *iter)
 
        /* Iterator usage is expected to have record disabled */
        if (list_empty(&cpu_buffer->reader_page->list)) {
-               iter->head_page = cpu_buffer->head_page;
-               iter->head = cpu_buffer->head_page->read;
+               iter->head_page = rb_set_head_page(cpu_buffer);
+               if (unlikely(!iter->head_page))
+                       return;
+               iter->head = iter->head_page->read;
        } else {
                iter->head_page = cpu_buffer->reader_page;
                iter->head = cpu_buffer->reader_page->read;
@@ -2086,6 +2755,8 @@ static void rb_iter_reset(struct ring_buffer_iter *iter)
                iter->read_stamp = cpu_buffer->read_stamp;
        else
                iter->read_stamp = iter->head_page->page->time_stamp;
+       iter->cache_reader_page = cpu_buffer->reader_page;
+       iter->cache_read = cpu_buffer->read;
 }
 
 /**
@@ -2192,11 +2863,13 @@ static struct buffer_page *
 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
 {
        struct buffer_page *reader = NULL;
+       unsigned long overwrite;
        unsigned long flags;
        int nr_loops = 0;
+       int ret;
 
        local_irq_save(flags);
-       __raw_spin_lock(&cpu_buffer->lock);
+       arch_spin_lock(&cpu_buffer->lock);
 
  again:
        /*
@@ -2227,39 +2900,83 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
                goto out;
 
        /*
-        * Splice the empty reader page into the list around the head.
         * Reset the reader page to size zero.
         */
-
-       reader = cpu_buffer->head_page;
-       cpu_buffer->reader_page->list.next = reader->list.next;
-       cpu_buffer->reader_page->list.prev = reader->list.prev;
-
        local_set(&cpu_buffer->reader_page->write, 0);
        local_set(&cpu_buffer->reader_page->entries, 0);
        local_set(&cpu_buffer->reader_page->page->commit, 0);
+       cpu_buffer->reader_page->real_end = 0;
+
+ spin:
+       /*
+        * Splice the empty reader page into the list around the head.
+        */
+       reader = rb_set_head_page(cpu_buffer);
+       cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);
+       cpu_buffer->reader_page->list.prev = reader->list.prev;
+
+       /*
+        * cpu_buffer->pages just needs to point to the buffer, it
+        *  has no specific buffer page to point to. Let's move it out
+        *  of our way so we don't accidentally swap it.
+        */
+       cpu_buffer->pages = reader->list.prev;
+
+       /* The reader page will be pointing to the new head */
+       rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list);
 
-       /* Make the reader page now replace the head */
-       reader->list.prev->next = &cpu_buffer->reader_page->list;
-       reader->list.next->prev = &cpu_buffer->reader_page->list;
+       /*
+        * We want to make sure we read the overruns after we set up our
+        * pointers to the next object. The writer side does a
+        * cmpxchg to cross pages which acts as the mb on the writer
+        * side. Note, the reader will constantly fail the swap
+        * while the writer is updating the pointers, so this
+        * guarantees that the overwrite recorded here is the one we
+        * want to compare with the last_overrun.
+        */
+       smp_mb();
+       overwrite = local_read(&(cpu_buffer->overrun));
+
+       /*
+        * Here's the tricky part.
+        *
+        * We need to move the pointer past the header page.
+        * But we can only do that if a writer is not currently
+        * moving it. The page before the header page has the
+        * flag bit '1' set if it is pointing to the page we want,
+        * but if the writer is in the process of moving it
+        * then it will be '2', or '0' if it has already moved on.
+        */
+
+       ret = rb_head_page_replace(reader, cpu_buffer->reader_page);
 
        /*
-        * If the tail is on the reader, then we must set the head
-        * to the inserted page, otherwise we set it one before.
+        * If we did not convert it, then we must try again.
         */
-       cpu_buffer->head_page = cpu_buffer->reader_page;
+       if (!ret)
+               goto spin;
 
-       if (cpu_buffer->commit_page != reader)
-               rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
+       /*
+        * Yeah! We succeeded in replacing the page.
+        *
+        * Now make the new head point back to the reader page.
+        */
+       rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
+       rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
 
        /* Finally update the reader page to the new head */
        cpu_buffer->reader_page = reader;
        rb_reset_reader_page(cpu_buffer);
 
+       if (overwrite != cpu_buffer->last_overrun) {
+               cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun;
+               cpu_buffer->last_overrun = overwrite;
+       }
+
        goto again;
 
  out:
-       __raw_spin_unlock(&cpu_buffer->lock);
+       arch_spin_unlock(&cpu_buffer->lock);
        local_irq_restore(flags);
 
        return reader;
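The '0'/'1'/'2' states referenced above are carried in the low bits of the list pointers on the page before the head page. A simplified sketch of stripping them, assuming a 2-bit flag mask like the one introduced earlier in this patch (MY_FLAG_MASK and my_list_head are illustrative names):

#define MY_FLAG_MASK	3UL	/* low bits: 1 = head, 2 = update in progress */

static struct list_head *my_list_head(struct list_head *list)
{
	unsigned long val = (unsigned long)list;

	/* strip the head/update flag bits to get the real pointer */
	return (struct list_head *)(val & ~MY_FLAG_MASK);
}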
@@ -2279,8 +2996,7 @@ static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
 
        event = rb_reader_event(cpu_buffer);
 
-       if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX
-                       || rb_discarded_event(event))
+       if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
                cpu_buffer->read++;
 
        rb_update_read_stamp(cpu_buffer, event);
@@ -2333,16 +3049,19 @@ static void rb_advance_iter(struct ring_buffer_iter *iter)
                rb_advance_iter(iter);
 }
 
+static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
+{
+       return cpu_buffer->lost_events;
+}
+
 static struct ring_buffer_event *
-rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
+rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts,
+              unsigned long *lost_events)
 {
-       struct ring_buffer_per_cpu *cpu_buffer;
        struct ring_buffer_event *event;
        struct buffer_page *reader;
        int nr_loops = 0;
 
-       cpu_buffer = buffer->buffers[cpu];
-
  again:
        /*
         * We repeat when a timestamp is encountered. It is possible
@@ -2371,7 +3090,6 @@ rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
                 * the box. Return the padding, and we will release
                 * the current locks, and try again.
                 */
-               rb_advance_reader(cpu_buffer);
                return event;
 
        case RINGBUF_TYPE_TIME_EXTEND:
@@ -2387,9 +3105,11 @@ rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
        case RINGBUF_TYPE_DATA:
                if (ts) {
                        *ts = cpu_buffer->read_stamp + event->time_delta;
-                       ring_buffer_normalize_time_stamp(buffer,
+                       ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
                                                         cpu_buffer->cpu, ts);
                }
+               if (lost_events)
+                       *lost_events = rb_lost_events(cpu_buffer);
                return event;
 
        default:
@@ -2408,13 +3128,22 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
        struct ring_buffer_event *event;
        int nr_loops = 0;
 
-       if (ring_buffer_iter_empty(iter))
-               return NULL;
-
        cpu_buffer = iter->cpu_buffer;
        buffer = cpu_buffer->buffer;
 
+       /*
+        * Check if someone performed a consuming read on
+        * the buffer. A consuming read invalidates the iterator
+        * and we need to reset the iterator in this case.
+        */
+       if (unlikely(iter->cache_read != cpu_buffer->read ||
+                    iter->cache_reader_page != cpu_buffer->reader_page))
+               rb_iter_reset(iter);
+
  again:
+       if (ring_buffer_iter_empty(iter))
+               return NULL;
+
        /*
         * We repeat when a timestamp is encountered.
         * We can get multiple timestamps by nested interrupts or also
@@ -2429,6 +3158,11 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
        if (rb_per_cpu_empty(cpu_buffer))
                return NULL;
 
+       if (iter->head >= local_read(&iter->head_page->page->commit)) {
+               rb_inc_iter(iter);
+               goto again;
+       }
+
        event = rb_iter_head_event(iter);
 
        switch (event->type_len) {
@@ -2466,34 +3200,57 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
 }
 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
 
+static inline int rb_ok_to_lock(void)
+{
+       /*
+        * If an NMI die dumps out the content of the ring buffer
+        * do not grab locks. We also permanently disable the ring
+        * buffer. A one-time deal is all you get from reading
+        * the ring buffer from an NMI.
+        */
+       if (likely(!in_nmi()))
+               return 1;
+
+       tracing_off_permanent();
+       return 0;
+}
+
 /**
  * ring_buffer_peek - peek at the next event to be read
  * @buffer: The ring buffer to read
  * @cpu: The cpu to peek at
  * @ts: The timestamp counter of this event.
+ * @lost_events: a variable to store if events were lost (may be NULL)
  *
  * This will return the event that will be read next, but does
  * not consume the data.
  */
 struct ring_buffer_event *
-ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
+ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts,
+                unsigned long *lost_events)
 {
        struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
        struct ring_buffer_event *event;
        unsigned long flags;
+       int dolock;
 
        if (!cpumask_test_cpu(cpu, buffer->cpumask))
                return NULL;
 
+       dolock = rb_ok_to_lock();
  again:
-       spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
-       event = rb_buffer_peek(buffer, cpu, ts);
-       spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+       local_irq_save(flags);
+       if (dolock)
+               spin_lock(&cpu_buffer->reader_lock);
+       event = rb_buffer_peek(cpu_buffer, ts, lost_events);
+       if (event && event->type_len == RINGBUF_TYPE_PADDING)
+               rb_advance_reader(cpu_buffer);
+       if (dolock)
+               spin_unlock(&cpu_buffer->reader_lock);
+       local_irq_restore(flags);
 
-       if (event && event->type_len == RINGBUF_TYPE_PADDING) {
-               cpu_relax();
+       if (event && event->type_len == RINGBUF_TYPE_PADDING)
                goto again;
-       }
 
        return event;
 }
@@ -2518,10 +3275,8 @@ ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
        event = rb_iter_peek(iter, ts);
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 
-       if (event && event->type_len == RINGBUF_TYPE_PADDING) {
-               cpu_relax();
+       if (event && event->type_len == RINGBUF_TYPE_PADDING)
                goto again;
-       }
 
        return event;
 }
@@ -2529,17 +3284,24 @@ ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
 /**
  * ring_buffer_consume - return an event and consume it
  * @buffer: The ring buffer to get the next event from
+ * @cpu: the cpu to read the buffer from
+ * @ts: a variable to store the timestamp (may be NULL)
+ * @lost_events: a variable to store if events were lost (may be NULL)
  *
  * Returns the next event in the ring buffer, and that event is consumed.
  * Meaning, that sequential reads will keep returning a different event,
  * and eventually empty the ring buffer if the producer is slower.
  */
 struct ring_buffer_event *
-ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
+ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts,
+                   unsigned long *lost_events)
 {
        struct ring_buffer_per_cpu *cpu_buffer;
        struct ring_buffer_event *event = NULL;
        unsigned long flags;
+       int dolock;
+
+       dolock = rb_ok_to_lock();
 
  again:
        /* might be called in atomic */
@@ -2549,47 +3311,55 @@ ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
                goto out;
 
        cpu_buffer = buffer->buffers[cpu];
-       spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
-
-       event = rb_buffer_peek(buffer, cpu, ts);
-       if (!event)
-               goto out_unlock;
+       local_irq_save(flags);
+       if (dolock)
+               spin_lock(&cpu_buffer->reader_lock);
 
-       rb_advance_reader(cpu_buffer);
+       event = rb_buffer_peek(cpu_buffer, ts, lost_events);
+       if (event) {
+               cpu_buffer->lost_events = 0;
+               rb_advance_reader(cpu_buffer);
+       }
 
- out_unlock:
-       spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+       if (dolock)
+               spin_unlock(&cpu_buffer->reader_lock);
+       local_irq_restore(flags);
 
  out:
        preempt_enable();
 
-       if (event && event->type_len == RINGBUF_TYPE_PADDING) {
-               cpu_relax();
+       if (event && event->type_len == RINGBUF_TYPE_PADDING)
                goto again;
-       }
 
        return event;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_consume);
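A consumer-side sketch of the new lost_events reporting (my_drain and process_event are hypothetical; the consume call is the exported API above):

static void my_drain(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_event *event;
	unsigned long lost = 0;
	u64 ts;

	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost))) {
		if (lost)
			pr_info("cpu%d: %lu events lost\n", cpu, lost);
		process_event(ring_buffer_event_data(event), ts);
	}
}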
 
 /**
- * ring_buffer_read_start - start a non consuming read of the buffer
+ * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer
  * @buffer: The ring buffer to read from
  * @cpu: The cpu buffer to iterate over
  *
- * This starts up an iteration through the buffer. It also disables
- * the recording to the buffer until the reading is finished.
- * This prevents the reading from being corrupted. This is not
- * a consuming read, so a producer is not expected.
+ * This performs the initial preparations necessary to iterate
+ * through the buffer.  Memory is allocated, buffer recording
+ * is disabled, and the iterator pointer is returned to the caller.
  *
- * Must be paired with ring_buffer_finish.
+ * Disabling buffer recording prevents the reading from being
+ * corrupted. This is not a consuming read, so a producer is not
+ * expected.
+ *
+ * After a sequence of ring_buffer_read_prepare calls, the user is
+ * expected to make at least one call to ring_buffer_read_prepare_sync.
+ * Afterwards, ring_buffer_read_start is invoked to get things going
+ * for real.
+ *
+ * This overall must be paired with ring_buffer_finish.
  */
 struct ring_buffer_iter *
-ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
+ring_buffer_read_prepare(struct ring_buffer *buffer, int cpu)
 {
        struct ring_buffer_per_cpu *cpu_buffer;
        struct ring_buffer_iter *iter;
-       unsigned long flags;
 
        if (!cpumask_test_cpu(cpu, buffer->cpumask))
                return NULL;
@@ -2603,15 +3373,52 @@ ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
        iter->cpu_buffer = cpu_buffer;
 
        atomic_inc(&cpu_buffer->record_disabled);
+
+       return iter;
+}
+EXPORT_SYMBOL_GPL(ring_buffer_read_prepare);
+
+/**
+ * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls
+ *
+ * All previously invoked ring_buffer_read_prepare calls to prepare
+ * iterators will be synchronized.  Afterwards, ring_buffer_read_start
+ * calls on those iterators are allowed.
+ */
+void
+ring_buffer_read_prepare_sync(void)
+{
        synchronize_sched();
+}
+EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync);
+
+/**
+ * ring_buffer_read_start - start a non consuming read of the buffer
+ * @iter: The iterator returned by ring_buffer_read_prepare
+ *
+ * This finalizes the startup of an iteration through the buffer.
+ * The iterator comes from a call to ring_buffer_read_prepare and
+ * an intervening ring_buffer_read_prepare_sync must have been
+ * performed.
+ *
+ * Must be paired with ring_buffer_finish.
+ */
+void
+ring_buffer_read_start(struct ring_buffer_iter *iter)
+{
+       struct ring_buffer_per_cpu *cpu_buffer;
+       unsigned long flags;
+
+       if (!iter)
+               return;
+
+       cpu_buffer = iter->cpu_buffer;
 
        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
-       __raw_spin_lock(&cpu_buffer->lock);
+       arch_spin_lock(&cpu_buffer->lock);
        rb_iter_reset(iter);
-       __raw_spin_unlock(&cpu_buffer->lock);
+       arch_spin_unlock(&cpu_buffer->lock);
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
-
-       return iter;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_read_start);
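A sketch of the calling sequence described above: prepare every iterator first, pay for a single synchronize_sched(), then start them all, after which ring_buffer_read() can be used on each iterator followed by the matching finish call. The iters array and lack of error handling are illustrative only.

	struct ring_buffer_iter *iters[NR_CPUS];
	int cpu;

	for_each_online_cpu(cpu)
		iters[cpu] = ring_buffer_read_prepare(buffer, cpu);

	/* one synchronize_sched() covers all prepared iterators */
	ring_buffer_read_prepare_sync();

	for_each_online_cpu(cpu)
		ring_buffer_read_start(iters[cpu]);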
 
@@ -2646,21 +3453,19 @@ ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
        struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
        unsigned long flags;
 
- again:
        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+ again:
        event = rb_iter_peek(iter, ts);
        if (!event)
                goto out;
 
+       if (event->type_len == RINGBUF_TYPE_PADDING)
+               goto again;
+
        rb_advance_iter(iter);
  out:
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 
-       if (event && event->type_len == RINGBUF_TYPE_PADDING) {
-               cpu_relax();
-               goto again;
-       }
-
        return event;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_read);
@@ -2678,8 +3483,10 @@ EXPORT_SYMBOL_GPL(ring_buffer_size);
 static void
 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
 {
+       rb_head_page_deactivate(cpu_buffer);
+
        cpu_buffer->head_page
-               = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
+               = list_entry(cpu_buffer->pages, struct buffer_page, list);
        local_set(&cpu_buffer->head_page->write, 0);
        local_set(&cpu_buffer->head_page->entries, 0);
        local_set(&cpu_buffer->head_page->page->commit, 0);
@@ -2695,16 +3502,20 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
        local_set(&cpu_buffer->reader_page->page->commit, 0);
        cpu_buffer->reader_page->read = 0;
 
-       cpu_buffer->nmi_dropped = 0;
-       cpu_buffer->commit_overrun = 0;
-       cpu_buffer->overrun = 0;
-       cpu_buffer->read = 0;
+       local_set(&cpu_buffer->commit_overrun, 0);
+       local_set(&cpu_buffer->overrun, 0);
        local_set(&cpu_buffer->entries, 0);
        local_set(&cpu_buffer->committing, 0);
        local_set(&cpu_buffer->commits, 0);
+       cpu_buffer->read = 0;
 
        cpu_buffer->write_stamp = 0;
        cpu_buffer->read_stamp = 0;
+
+       cpu_buffer->lost_events = 0;
+       cpu_buffer->last_overrun = 0;
+
+       rb_head_page_activate(cpu_buffer);
 }
 
 /**
@@ -2724,12 +3535,16 @@ void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
 
        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
 
-       __raw_spin_lock(&cpu_buffer->lock);
+       if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
+               goto out;
+
+       arch_spin_lock(&cpu_buffer->lock);
 
        rb_reset_cpu(cpu_buffer);
 
-       __raw_spin_unlock(&cpu_buffer->lock);
+       arch_spin_unlock(&cpu_buffer->lock);
 
+ out:
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 
        atomic_dec(&cpu_buffer->record_disabled);
@@ -2757,15 +3572,23 @@ int ring_buffer_empty(struct ring_buffer *buffer)
 {
        struct ring_buffer_per_cpu *cpu_buffer;
        unsigned long flags;
+       int dolock;
        int cpu;
        int ret;
 
+       dolock = rb_ok_to_lock();
+
        /* yes this is racy, but if you don't like the race, lock the buffer */
        for_each_buffer_cpu(buffer, cpu) {
                cpu_buffer = buffer->buffers[cpu];
-               spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+               local_irq_save(flags);
+               if (dolock)
+                       spin_lock(&cpu_buffer->reader_lock);
                ret = rb_per_cpu_empty(cpu_buffer);
-               spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+               if (dolock)
+                       spin_unlock(&cpu_buffer->reader_lock);
+               local_irq_restore(flags);
+
                if (!ret)
                        return 0;
        }
@@ -2783,20 +3606,28 @@ int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
 {
        struct ring_buffer_per_cpu *cpu_buffer;
        unsigned long flags;
+       int dolock;
        int ret;
 
        if (!cpumask_test_cpu(cpu, buffer->cpumask))
                return 1;
 
+       dolock = rb_ok_to_lock();
+
        cpu_buffer = buffer->buffers[cpu];
-       spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+       local_irq_save(flags);
+       if (dolock)
+               spin_lock(&cpu_buffer->reader_lock);
        ret = rb_per_cpu_empty(cpu_buffer);
-       spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+       if (dolock)
+               spin_unlock(&cpu_buffer->reader_lock);
+       local_irq_restore(flags);
 
        return ret;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
 
+#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
 /**
  * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
  * @buffer_a: One buffer to swap with
@@ -2851,20 +3682,28 @@ int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
        atomic_inc(&cpu_buffer_a->record_disabled);
        atomic_inc(&cpu_buffer_b->record_disabled);
 
+       ret = -EBUSY;
+       if (local_read(&cpu_buffer_a->committing))
+               goto out_dec;
+       if (local_read(&cpu_buffer_b->committing))
+               goto out_dec;
+
        buffer_a->buffers[cpu] = cpu_buffer_b;
        buffer_b->buffers[cpu] = cpu_buffer_a;
 
        cpu_buffer_b->buffer = buffer_a;
        cpu_buffer_a->buffer = buffer_b;
 
+       ret = 0;
+
+out_dec:
        atomic_dec(&cpu_buffer_a->record_disabled);
        atomic_dec(&cpu_buffer_b->record_disabled);
-
-       ret = 0;
 out:
        return ret;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
+#endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */
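With the committing check added above, a snapshot-style caller has to be prepared for -EBUSY. A hedged sketch (my_snapshot is hypothetical):

static int my_snapshot(struct ring_buffer *live, struct ring_buffer *snap, int cpu)
{
	int ret;

	ret = ring_buffer_swap_cpu(live, snap, cpu);

	/* -EBUSY: a commit was in flight on that cpu; try again later */
	if (ret == -EBUSY)
		pr_debug("cpu%d busy, snapshot skipped\n", cpu);

	return ret;
}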
 
 /**
  * ring_buffer_alloc_read_page - allocate a page to read from buffer
@@ -2951,6 +3790,7 @@ int ring_buffer_read_page(struct ring_buffer *buffer,
        struct ring_buffer_event *event;
        struct buffer_data_page *bpage;
        struct buffer_page *reader;
+       unsigned long missed_events;
        unsigned long flags;
        unsigned int commit;
        unsigned int read;
@@ -2987,6 +3827,9 @@ int ring_buffer_read_page(struct ring_buffer *buffer,
        read = reader->read;
        commit = rb_page_commit(reader);
 
+       /* Check if any events were dropped */
+       missed_events = cpu_buffer->lost_events;
+
        /*
         * If this page has been partially read or
         * if len is not big enough to read the rest of the page or
@@ -3037,7 +3880,7 @@ int ring_buffer_read_page(struct ring_buffer *buffer,
                read = 0;
        } else {
                /* update the entry counter */
-               cpu_buffer->read += local_read(&reader->entries);
+               cpu_buffer->read += rb_page_entries(reader);
 
                /* swap the pages */
                rb_init_page(bpage);
@@ -3047,9 +3890,42 @@ int ring_buffer_read_page(struct ring_buffer *buffer,
                local_set(&reader->entries, 0);
                reader->read = 0;
                *data_page = bpage;
+
+               /*
+                * Use the real_end for the data size.
+                * This gives us a chance to store the lost events
+                * on the page.
+                */
+               if (reader->real_end)
+                       local_set(&bpage->commit, reader->real_end);
        }
        ret = read;
 
+       cpu_buffer->lost_events = 0;
+
+       commit = local_read(&bpage->commit);
+       /*
+        * Set a flag in the commit field if we lost events
+        */
+       if (missed_events) {
+               /* If there is room at the end of the page to save the
+                * missed events, then record it there.
+                */
+               if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) {
+                       memcpy(&bpage->data[commit], &missed_events,
+                              sizeof(missed_events));
+                       local_add(RB_MISSED_STORED, &bpage->commit);
+                       commit += sizeof(missed_events);
+               }
+               local_add(RB_MISSED_EVENTS, &bpage->commit);
+       }
+
+       /*
+        * This page may be off to user land. Zero it out here.
+        */
+       if (commit < BUF_PAGE_SIZE)
+               memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit);
+
  out_unlock:
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 
@@ -3058,6 +3934,7 @@ int ring_buffer_read_page(struct ring_buffer *buffer,
 }
 EXPORT_SYMBOL_GPL(ring_buffer_read_page);
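On the consuming side, the commit word written above can be decoded roughly like this (a sketch only; RB_MISSED_EVENTS and RB_MISSED_STORED are the flags defined near the top of this file, my_page_lost_events is a hypothetical helper):

static unsigned long my_page_lost_events(struct buffer_data_page *bpage)
{
	unsigned long commit = local_read(&bpage->commit);
	unsigned long missed = 0;

	if (!(commit & RB_MISSED_EVENTS))
		return 0;

	if (commit & RB_MISSED_STORED) {
		/* the count sits right after the data payload */
		unsigned long len = commit & ~(RB_MISSED_EVENTS | RB_MISSED_STORED);

		memcpy(&missed, &bpage->data[len], sizeof(missed));
	}

	/* flag set but no room to store the count: at least one lost */
	return missed ? missed : 1;
}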
 
+#ifdef CONFIG_TRACING
 static ssize_t
 rb_simple_read(struct file *filp, char __user *ubuf,
               size_t cnt, loff_t *ppos)
@@ -3125,6 +4002,7 @@ static __init int rb_init_debugfs(void)
 }
 
 fs_initcall(rb_init_debugfs);
+#endif
 
 #ifdef CONFIG_HOTPLUG_CPU
 static int rb_cpu_notify(struct notifier_block *self,