Merge branch 'linus' into tracing/core
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 41ca394..5885cdf 100644
@@ -319,6 +319,11 @@ EXPORT_SYMBOL_GPL(ring_buffer_event_data);
 #define TS_MASK                ((1ULL << TS_SHIFT) - 1)
 #define TS_DELTA_TEST  (~TS_MASK)
 
+/* Flag when events were overwritten */
+#define RB_MISSED_EVENTS       (1 << 31)
+/* Missed count stored at the end of the page */
+#define RB_MISSED_STORED       (1 << 30)
+
 struct buffer_data_page {
        u64              time_stamp;    /* page time stamp */
        local_t          commit;        /* write committed index */
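
The two flag bits occupy the top of the low 32 bits of the commit word; with 4K pages the data index never comes near bit 30, so the flags and the length can share the word. A minimal sketch of how a reader might pull them apart (the helper names and the unsigned mask are hypothetical, not part of this patch):

	/* Both flag bits as one unsigned mask, mirroring the defines above. */
	#define RB_MISSED_FLAGS		((1UL << 31) | (1UL << 30))

	/* Length of valid data on the page, flag bits masked off. */
	static unsigned long rb_page_data_len(unsigned long commit)
	{
		return commit & ~RB_MISSED_FLAGS;
	}

	/* Nonzero when events were overwritten before the reader saw them. */
	static int rb_page_missed(unsigned long commit)
	{
		return !!(commit & (1UL << 31));	/* RB_MISSED_EVENTS */
	}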
@@ -338,6 +343,7 @@ struct buffer_page {
        local_t          write;         /* index for next write */
        unsigned         read;          /* index for next read */
        local_t          entries;       /* entries on this page */
+       unsigned long    real_end;      /* real end of data */
        struct buffer_data_page *page;  /* Actual data page */
 };
 
@@ -417,6 +423,12 @@ int ring_buffer_print_page_header(struct trace_seq *s)
                               (unsigned int)sizeof(field.commit),
                               (unsigned int)is_signed_type(long));
 
+       ret = trace_seq_printf(s, "\tfield: int overwrite;\t"
+                              "offset:%u;\tsize:%u;\tsigned:%u;\n",
+                              (unsigned int)offsetof(typeof(field), commit),
+                              1,
+                              (unsigned int)is_signed_type(long));
+
        ret = trace_seq_printf(s, "\tfield: char data;\t"
                               "offset:%u;\tsize:%u;\tsigned:%u;\n",
                               (unsigned int)offsetof(typeof(field), data),
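
For reference, assuming a 64-bit build where time_stamp sits at offset 0 and commit at offset 8, the added trace_seq_printf() call would emit a header line along these lines (the offsets here are illustrative, not from the patch):

	field: int overwrite;	offset:8;	size:1;	signed:1;

The "overwrite" pseudo-field deliberately shares commit's offset, since the flag bits live inside the commit word itself.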
@@ -440,6 +452,8 @@ struct ring_buffer_per_cpu {
        struct buffer_page              *tail_page;     /* write to tail */
        struct buffer_page              *commit_page;   /* committed pages */
        struct buffer_page              *reader_page;
+       unsigned long                   lost_events;
+       unsigned long                   last_overrun;
        local_t                         commit_overrun;
        local_t                         overrun;
        local_t                         entries;
@@ -1762,6 +1776,13 @@ rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
        kmemcheck_annotate_bitfield(event, bitfield);
 
        /*
+        * Save the original length to the meta data.
+        * This will be used by the reader to add the lost
+        * event count.
+        */
+       tail_page->real_end = tail;
+
+       /*
         * If this event is bigger than the minimum size, then
         * we need to be careful that we don't subtract the
         * write counter enough to allow another writer to slip
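
This hunk runs when a writer wraps off the end of a page and the remainder is turned into padding; the padding can leave commit pointing at the very end of the page. real_end records where valid data actually stopped, so the ring_buffer_read_page() hunk further down can point commit back at it, hiding the padding from the reader and leaving room after the data for a missed-event count.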
@@ -2838,6 +2859,7 @@ static struct buffer_page *
 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
 {
        struct buffer_page *reader = NULL;
+       unsigned long overwrite;
        unsigned long flags;
        int nr_loops = 0;
        int ret;
@@ -2879,6 +2901,7 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
        local_set(&cpu_buffer->reader_page->write, 0);
        local_set(&cpu_buffer->reader_page->entries, 0);
        local_set(&cpu_buffer->reader_page->page->commit, 0);
+       cpu_buffer->reader_page->real_end = 0;
 
  spin:
        /*
@@ -2899,6 +2922,18 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
        rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list);
 
        /*
+        * We want to make sure we read the overruns after we set up our
+        * pointers to the next object. The writer side does a
+        * cmpxchg to cross pages which acts as the mb on the writer
+        * side. Note, the reader will constantly fail the swap
+        * while the writer is updating the pointers, so this
+        * guarantees that the overwrite recorded here is the one we
+        * want to compare with last_overrun.
+        */
+       smp_mb();
+       overwrite = local_read(&(cpu_buffer->overrun));
+
+       /*
         * Here's the tricky part.
         *
         * We need to move the pointer past the header page.
@@ -2929,6 +2964,11 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
        cpu_buffer->reader_page = reader;
        rb_reset_reader_page(cpu_buffer);
 
+       if (overwrite != cpu_buffer->last_overrun) {
+               cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun;
+               cpu_buffer->last_overrun = overwrite;
+       }
+
        goto again;
 
  out:
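
Concretely: if overrun stood at 100 at the previous page swap (last_overrun == 100) and reads 130 now, this swap records lost_events = 30 and remembers 130 for the next swap. When the two counts match, lost_events is deliberately left alone; it is cleared only once data is actually handed out, in ring_buffer_consume() and ring_buffer_read_page() below.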
@@ -3005,8 +3045,14 @@ static void rb_advance_iter(struct ring_buffer_iter *iter)
                rb_advance_iter(iter);
 }
 
+static unsigned long rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
+{
+       return cpu_buffer->lost_events;
+}
+
 static struct ring_buffer_event *
-rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts)
+rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts,
+              unsigned long *lost_events)
 {
        struct ring_buffer_event *event;
        struct buffer_page *reader;
@@ -3058,6 +3104,8 @@ rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts)
                        ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
                                                         cpu_buffer->cpu, ts);
                }
+               if (lost_events)
+                       *lost_events = rb_lost_events(cpu_buffer);
                return event;
 
        default:
@@ -3168,12 +3216,14 @@ static inline int rb_ok_to_lock(void)
  * @buffer: The ring buffer to read
  * @cpu: The cpu to peek at
  * @ts: The timestamp counter of this event.
+ * @lost_events: a variable to store the count of lost events (may be NULL)
  *
  * This will return the event that will be read next, but does
  * not consume the data.
  */
 struct ring_buffer_event *
-ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
+ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts,
+                unsigned long *lost_events)
 {
        struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
        struct ring_buffer_event *event;
@@ -3188,7 +3238,7 @@ ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
        local_irq_save(flags);
        if (dolock)
                spin_lock(&cpu_buffer->reader_lock);
-       event = rb_buffer_peek(cpu_buffer, ts);
+       event = rb_buffer_peek(cpu_buffer, ts, lost_events);
        if (event && event->type_len == RINGBUF_TYPE_PADDING)
                rb_advance_reader(cpu_buffer);
        if (dolock)
@@ -3230,13 +3280,17 @@ ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
 /**
  * ring_buffer_consume - return an event and consume it
  * @buffer: The ring buffer to get the next event from
+ * @cpu: the cpu to read the buffer from
+ * @ts: a variable to store the timestamp (may be NULL)
+ * @lost_events: a variable to store the count of lost events (may be NULL)
  *
  * Returns the next event in the ring buffer, and that event is consumed.
  * Meaning, that sequential reads will keep returning a different event,
  * and eventually empty the ring buffer if the producer is slower.
  */
 struct ring_buffer_event *
-ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
+ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts,
+                   unsigned long *lost_events)
 {
        struct ring_buffer_per_cpu *cpu_buffer;
        struct ring_buffer_event *event = NULL;
@@ -3257,9 +3311,11 @@ ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
        if (dolock)
                spin_lock(&cpu_buffer->reader_lock);
 
-       event = rb_buffer_peek(cpu_buffer, ts);
-       if (event)
+       event = rb_buffer_peek(cpu_buffer, ts, lost_events);
+       if (event) {
+               cpu_buffer->lost_events = 0;
                rb_advance_reader(cpu_buffer);
+       }
 
        if (dolock)
                spin_unlock(&cpu_buffer->reader_lock);
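
A sketch of a caller using the widened signature; the loop itself is hypothetical, only ring_buffer_consume() and its parameters come from this patch:

	u64 ts;
	unsigned long lost = 0;
	struct ring_buffer_event *event;

	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost))) {
		if (lost)
			pr_info("%lu events lost before this one\n", lost);
		/* ... process the event ... */
	}

Callers that do not care about drops can keep passing NULL for lost_events; both the peek and consume paths accept it.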
@@ -3408,6 +3464,9 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
        cpu_buffer->write_stamp = 0;
        cpu_buffer->read_stamp = 0;
 
+       cpu_buffer->lost_events = 0;
+       cpu_buffer->last_overrun = 0;
+
        rb_head_page_activate(cpu_buffer);
 }
 
@@ -3683,6 +3742,7 @@ int ring_buffer_read_page(struct ring_buffer *buffer,
        struct ring_buffer_event *event;
        struct buffer_data_page *bpage;
        struct buffer_page *reader;
+       unsigned long missed_events;
        unsigned long flags;
        unsigned int commit;
        unsigned int read;
@@ -3719,6 +3779,9 @@ int ring_buffer_read_page(struct ring_buffer *buffer,
        read = reader->read;
        commit = rb_page_commit(reader);
 
+       /* Check if any events were dropped */
+       missed_events = cpu_buffer->lost_events;
+
        /*
         * If this page has been partially read or
         * if len is not big enough to read the rest of the page or
@@ -3779,9 +3842,35 @@ int ring_buffer_read_page(struct ring_buffer *buffer,
                local_set(&reader->entries, 0);
                reader->read = 0;
                *data_page = bpage;
+
+               /*
+                * Use the real_end for the data size.
+                * This gives us a chance to store the lost events
+                * on the page.
+                */
+               if (reader->real_end)
+                       local_set(&bpage->commit, reader->real_end);
        }
        ret = read;
 
+       cpu_buffer->lost_events = 0;
+       /*
+        * Set a flag in the commit field if we lost events
+        */
+       if (missed_events) {
+               commit = local_read(&bpage->commit);
+
+               /*
+                * If there is room at the end of the page to save the
+                * missed events, then record them there.
+                */
+               if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) {
+                       memcpy(&bpage->data[commit], &missed_events,
+                              sizeof(missed_events));
+                       local_add(RB_MISSED_STORED, &bpage->commit);
+               }
+               local_add(RB_MISSED_EVENTS, &bpage->commit);
+       }
+
  out_unlock:
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
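
On the consuming side, code that receives the page from ring_buffer_read_page() (over splice, for instance) could undo the encoding above. This is a sketch built on the flag bits defined at the top of the patch, not code from it:

	struct buffer_data_page *bpage = *data_page;
	unsigned long commit = local_read(&bpage->commit);
	unsigned long flags = (1UL << 31) | (1UL << 30);	/* EVENTS | STORED */
	unsigned long len = commit & ~flags;
	unsigned long missed = 0;

	if (commit & (1UL << 31)) {		/* RB_MISSED_EVENTS */
		if (commit & (1UL << 30))	/* RB_MISSED_STORED */
			memcpy(&missed, &bpage->data[len], sizeof(missed));
		else
			missed = 1;	/* at least one drop; exact count unknown */
	}
	/* len bytes of event data start at bpage->data; missed preceded them */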