X-Git-Url: http://ftp.safe.ca/?a=blobdiff_plain;f=kernel%2Ftrace%2Fring_buffer.c;h=a2f0fe9518318fd7813f47e05dab8a5bacb2ed50;hb=318ae2edc3b29216abd8a2510f3f80b764f06858;hp=d07c2888396f8782d37cb00947ec8d3385b82f03;hpb=554f786e284a6ce859d51f62240d615603944c8e;p=safe%2Fjmp%2Flinux-2.6 diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c index d07c288..a2f0fe9 100644 --- a/kernel/trace/ring_buffer.c +++ b/kernel/trace/ring_buffer.c @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -19,9 +20,100 @@ #include #include +#include #include "trace.h" /* + * The ring buffer header is special. We must manually up keep it. + */ +int ring_buffer_print_entry_header(struct trace_seq *s) +{ + int ret; + + ret = trace_seq_printf(s, "# compressed entry header\n"); + ret = trace_seq_printf(s, "\ttype_len : 5 bits\n"); + ret = trace_seq_printf(s, "\ttime_delta : 27 bits\n"); + ret = trace_seq_printf(s, "\tarray : 32 bits\n"); + ret = trace_seq_printf(s, "\n"); + ret = trace_seq_printf(s, "\tpadding : type == %d\n", + RINGBUF_TYPE_PADDING); + ret = trace_seq_printf(s, "\ttime_extend : type == %d\n", + RINGBUF_TYPE_TIME_EXTEND); + ret = trace_seq_printf(s, "\tdata max type_len == %d\n", + RINGBUF_TYPE_DATA_TYPE_LEN_MAX); + + return ret; +} + +/* + * The ring buffer is made up of a list of pages. A separate list of pages is + * allocated for each CPU. A writer may only write to a buffer that is + * associated with the CPU it is currently executing on. A reader may read + * from any per cpu buffer. + * + * The reader is special. For each per cpu buffer, the reader has its own + * reader page. When a reader has read the entire reader page, this reader + * page is swapped with another page in the ring buffer. + * + * Now, as long as the writer is off the reader page, the reader can do what + * ever it wants with that page. The writer will never write to that page + * again (as long as it is out of the ring buffer). + * + * Here's some silly ASCII art. + * + * +------+ + * |reader| RING BUFFER + * |page | + * +------+ +---+ +---+ +---+ + * | |-->| |-->| | + * +---+ +---+ +---+ + * ^ | + * | | + * +---------------+ + * + * + * +------+ + * |reader| RING BUFFER + * |page |------------------v + * +------+ +---+ +---+ +---+ + * | |-->| |-->| | + * +---+ +---+ +---+ + * ^ | + * | | + * +---------------+ + * + * + * +------+ + * |reader| RING BUFFER + * |page |------------------v + * +------+ +---+ +---+ +---+ + * ^ | |-->| |-->| | + * | +---+ +---+ +---+ + * | | + * | | + * +------------------------------+ + * + * + * +------+ + * |buffer| RING BUFFER + * |page |------------------v + * +------+ +---+ +---+ +---+ + * ^ | | | |-->| | + * | New +---+ +---+ +---+ + * | Reader------^ | + * | page | + * +------------------------------+ + * + * + * After we make this swap, the reader can hand this page off to the splice + * code and be done with it. It can even allocate a new page if it needs to + * and swap that into the ring buffer. + * + * We will be using cmpxchg soon to make all this lockless. + * + */ + +/* * A fast way to enable or disable all ring buffers is to * call tracing_on or tracing_off. Turning off the ring buffers * prevents all ring buffers from being recorded to. 
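[Editorial sketch, not part of this patch.] The header comment added above describes the reader-page swap with ASCII art. The snippet below restates that swap in deliberately simplified C, using a hypothetical sketch_page type with no flag bits and no cmpxchg, and assuming the caller serializes against all writers; it is an illustration of the idea only, not the ring buffer's actual implementation.

/*
 * Editorial sketch only -- not part of this patch.  A simplified,
 * fully-serialized restatement of the reader-page swap drawn in the
 * ASCII art above, using a hypothetical sketch_page type.
 */
struct sketch_page {
	struct sketch_page *next;
	struct sketch_page *prev;
	/* page payload omitted */
};

/*
 * Splice @reader into the ring where @head currently sits and detach
 * @head.  The detached page is returned; it becomes the new reader
 * page that can be handed off to splice, as the art above shows.
 * Assumes the caller already excludes all writers.
 */
static struct sketch_page *
sketch_swap_reader_page(struct sketch_page *head, struct sketch_page *reader)
{
	/* reader takes head's links */
	reader->next = head->next;
	reader->prev = head->prev;

	/* neighbours now point at reader instead of head */
	head->prev->next = reader;
	head->next->prev = reader;

	/* head is now off the ring; writers will never reach it again */
	head->next = head;
	head->prev = head;

	return head;
}

The real writer side cannot take a lock, which is why this plain splice becomes, later in the file, a cmpxchg on the ->next pointer carrying the HEAD flag bits described in the "Making the ring buffer lockless" comment.
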
@@ -110,50 +202,53 @@ int tracing_is_on(void) } EXPORT_SYMBOL_GPL(tracing_is_on); -#include "trace.h" - -/* Up this if you want to test the TIME_EXTENTS and normalization */ -#define DEBUG_SHIFT 0 +#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array)) +#define RB_ALIGNMENT 4U +#define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX) +#define RB_EVNT_MIN_SIZE 8U /* two 32bit words */ -u64 ring_buffer_time_stamp(int cpu) -{ - u64 time; +/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */ +#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX - preempt_disable_notrace(); - /* shift to debug/test normalization and TIME_EXTENTS */ - time = trace_clock_local() << DEBUG_SHIFT; - preempt_enable_no_resched_notrace(); +enum { + RB_LEN_TIME_EXTEND = 8, + RB_LEN_TIME_STAMP = 16, +}; - return time; +static inline int rb_null_event(struct ring_buffer_event *event) +{ + return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta; } -EXPORT_SYMBOL_GPL(ring_buffer_time_stamp); -void ring_buffer_normalize_time_stamp(int cpu, u64 *ts) +static void rb_event_set_padding(struct ring_buffer_event *event) { - /* Just stupid testing the normalize function and deltas */ - *ts >>= DEBUG_SHIFT; + /* padding has a NULL time_delta */ + event->type_len = RINGBUF_TYPE_PADDING; + event->time_delta = 0; } -EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp); -#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array)) -#define RB_ALIGNMENT 4U -#define RB_MAX_SMALL_DATA 28 +static unsigned +rb_event_data_length(struct ring_buffer_event *event) +{ + unsigned length; -enum { - RB_LEN_TIME_EXTEND = 8, - RB_LEN_TIME_STAMP = 16, -}; + if (event->type_len) + length = event->type_len * RB_ALIGNMENT; + else + length = event->array[0]; + return length + RB_EVNT_HDR_SIZE; +} /* inline for ring buffer fast paths */ static unsigned rb_event_length(struct ring_buffer_event *event) { - unsigned length; - - switch (event->type) { + switch (event->type_len) { case RINGBUF_TYPE_PADDING: - /* undefined */ - return -1; + if (rb_null_event(event)) + /* undefined */ + return -1; + return event->array[0] + RB_EVNT_HDR_SIZE; case RINGBUF_TYPE_TIME_EXTEND: return RB_LEN_TIME_EXTEND; @@ -162,11 +257,7 @@ rb_event_length(struct ring_buffer_event *event) return RB_LEN_TIME_STAMP; case RINGBUF_TYPE_DATA: - if (event->len) - length = event->len * RB_ALIGNMENT; - else - length = event->array[0]; - return length + RB_EVNT_HDR_SIZE; + return rb_event_data_length(event); default: BUG(); } @@ -181,7 +272,7 @@ rb_event_length(struct ring_buffer_event *event) unsigned ring_buffer_event_length(struct ring_buffer_event *event) { unsigned length = rb_event_length(event); - if (event->type != RINGBUF_TYPE_DATA) + if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX) return length; length -= RB_EVNT_HDR_SIZE; if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0])) @@ -194,9 +285,9 @@ EXPORT_SYMBOL_GPL(ring_buffer_event_length); static void * rb_event_data(struct ring_buffer_event *event) { - BUG_ON(event->type != RINGBUF_TYPE_DATA); + BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX); /* If length is in len field, then array[0] has the data */ - if (event->len) + if (event->type_len) return (void *)&event->array[0]; /* Otherwise length is in array[0] and array[1] has the data */ return (void *)&event->array[1]; @@ -225,13 +316,37 @@ struct buffer_data_page { unsigned char data[]; /* data of buffer page */ }; +/* + * Note, the buffer_page list must be first. 
The buffer pages + * are allocated in cache lines, which means that each buffer + * page will be at the beginning of a cache line, and thus + * the least significant bits will be zero. We use this to + * add flags in the list struct pointers, to make the ring buffer + * lockless. + */ struct buffer_page { + struct list_head list; /* list of buffer pages */ local_t write; /* index for next write */ unsigned read; /* index for next read */ - struct list_head list; /* list of free pages */ + local_t entries; /* entries on this page */ struct buffer_data_page *page; /* Actual data page */ }; +/* + * The buffer page counters, write and entries, must be reset + * atomically when crossing page boundaries. To synchronize this + * update, two counters are inserted into the number. One is + * the actual counter for the write position or count on the page. + * + * The other is a counter of updaters. Before an update happens + * the update partition of the counter is incremented. This will + * allow the updater to update the counter atomically. + * + * The counter is 20 bits, and the state data is 12. + */ +#define RB_WRITE_MASK 0xfffff +#define RB_WRITE_INTCNT (1 << 20) + static void rb_init_page(struct buffer_data_page *bpage) { local_set(&bpage->commit, 0); @@ -271,22 +386,57 @@ static inline int test_time_stamp(u64 delta) #define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE) +/* Max payload is BUF_PAGE_SIZE - header (8bytes) */ +#define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2)) + +/* Max number of timestamps that can fit on a page */ +#define RB_TIMESTAMPS_PER_PAGE (BUF_PAGE_SIZE / RB_LEN_TIME_STAMP) + +int ring_buffer_print_page_header(struct trace_seq *s) +{ + struct buffer_data_page field; + int ret; + + ret = trace_seq_printf(s, "\tfield: u64 timestamp;\t" + "offset:0;\tsize:%u;\tsigned:%u;\n", + (unsigned int)sizeof(field.time_stamp), + (unsigned int)is_signed_type(u64)); + + ret = trace_seq_printf(s, "\tfield: local_t commit;\t" + "offset:%u;\tsize:%u;\tsigned:%u;\n", + (unsigned int)offsetof(typeof(field), commit), + (unsigned int)sizeof(field.commit), + (unsigned int)is_signed_type(long)); + + ret = trace_seq_printf(s, "\tfield: char data;\t" + "offset:%u;\tsize:%u;\tsigned:%u;\n", + (unsigned int)offsetof(typeof(field), data), + (unsigned int)BUF_PAGE_SIZE, + (unsigned int)is_signed_type(char)); + + return ret; +} + /* * head_page == tail_page && head == tail then buffer is empty. 
*/ struct ring_buffer_per_cpu { int cpu; struct ring_buffer *buffer; - spinlock_t reader_lock; /* serialize readers */ - raw_spinlock_t lock; + spinlock_t reader_lock; /* serialize readers */ + arch_spinlock_t lock; struct lock_class_key lock_key; - struct list_head pages; + struct list_head *pages; struct buffer_page *head_page; /* read from head */ struct buffer_page *tail_page; /* write to tail */ struct buffer_page *commit_page; /* committed pages */ struct buffer_page *reader_page; - unsigned long overrun; - unsigned long entries; + local_t commit_overrun; + local_t overrun; + local_t entries; + local_t committing; + local_t commits; + unsigned long read; u64 write_stamp; u64 read_stamp; atomic_t record_disabled; @@ -299,215 +449,668 @@ struct ring_buffer { atomic_t record_disabled; cpumask_var_t cpumask; + struct lock_class_key *reader_lock_key; + struct mutex mutex; struct ring_buffer_per_cpu **buffers; -#ifdef CONFIG_HOTPLUG +#ifdef CONFIG_HOTPLUG_CPU struct notifier_block cpu_notify; #endif + u64 (*clock)(void); }; struct ring_buffer_iter { struct ring_buffer_per_cpu *cpu_buffer; unsigned long head; struct buffer_page *head_page; + struct buffer_page *cache_reader_page; + unsigned long cache_read; u64 read_stamp; }; /* buffer may be either ring_buffer or ring_buffer_per_cpu */ -#define RB_WARN_ON(buffer, cond) \ - ({ \ - int _____ret = unlikely(cond); \ - if (_____ret) { \ - atomic_inc(&buffer->record_disabled); \ - WARN_ON(1); \ - } \ - _____ret; \ +#define RB_WARN_ON(b, cond) \ + ({ \ + int _____ret = unlikely(cond); \ + if (_____ret) { \ + if (__same_type(*(b), struct ring_buffer_per_cpu)) { \ + struct ring_buffer_per_cpu *__b = \ + (void *)b; \ + atomic_inc(&__b->buffer->record_disabled); \ + } else \ + atomic_inc(&b->record_disabled); \ + WARN_ON(1); \ + } \ + _____ret; \ }) -/** - * check_pages - integrity check of buffer pages - * @cpu_buffer: CPU buffer with pages to test - * - * As a safety measure we check to make sure the data pages have not - * been corrupted. 
- */ -static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer) +/* Up this if you want to test the TIME_EXTENTS and normalization */ +#define DEBUG_SHIFT 0 + +static inline u64 rb_time_stamp(struct ring_buffer *buffer) { - struct list_head *head = &cpu_buffer->pages; - struct buffer_page *bpage, *tmp; + /* shift to debug/test normalization and TIME_EXTENTS */ + return buffer->clock() << DEBUG_SHIFT; +} - if (RB_WARN_ON(cpu_buffer, head->next->prev != head)) - return -1; - if (RB_WARN_ON(cpu_buffer, head->prev->next != head)) - return -1; +u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu) +{ + u64 time; - list_for_each_entry_safe(bpage, tmp, head, list) { - if (RB_WARN_ON(cpu_buffer, - bpage->list.next->prev != &bpage->list)) - return -1; - if (RB_WARN_ON(cpu_buffer, - bpage->list.prev->next != &bpage->list)) - return -1; - } + preempt_disable_notrace(); + time = rb_time_stamp(buffer); + preempt_enable_no_resched_notrace(); - return 0; + return time; } +EXPORT_SYMBOL_GPL(ring_buffer_time_stamp); -static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, - unsigned nr_pages) +void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer, + int cpu, u64 *ts) { - struct list_head *head = &cpu_buffer->pages; - struct buffer_page *bpage, *tmp; - unsigned long addr; - LIST_HEAD(pages); - unsigned i; + /* Just stupid testing the normalize function and deltas */ + *ts >>= DEBUG_SHIFT; +} +EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp); - for (i = 0; i < nr_pages; i++) { - bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), - GFP_KERNEL, cpu_to_node(cpu_buffer->cpu)); - if (!bpage) - goto free_pages; - list_add(&bpage->list, &pages); +/* + * Making the ring buffer lockless makes things tricky. + * Although writes only happen on the CPU that they are on, + * and they only need to worry about interrupts. Reads can + * happen on any CPU. + * + * The reader page is always off the ring buffer, but when the + * reader finishes with a page, it needs to swap its page with + * a new one from the buffer. The reader needs to take from + * the head (writes go to the tail). But if a writer is in overwrite + * mode and wraps, it must push the head page forward. + * + * Here lies the problem. + * + * The reader must be careful to replace only the head page, and + * not another one. As described at the top of the file in the + * ASCII art, the reader sets its old page to point to the next + * page after head. It then sets the page after head to point to + * the old reader page. But if the writer moves the head page + * during this operation, the reader could end up with the tail. + * + * We use cmpxchg to help prevent this race. We also do something + * special with the page before head. We set the LSB to 1. + * + * When the writer must push the page forward, it will clear the + * bit that points to the head page, move the head, and then set + * the bit that points to the new head page. + * + * We also don't want an interrupt coming in and moving the head + * page on another writer. Thus we use the second LSB to catch + * that too. 
Thus: + * + * head->list->prev->next bit 1 bit 0 + * ------- ------- + * Normal page 0 0 + * Points to head page 0 1 + * New head page 1 0 + * + * Note we can not trust the prev pointer of the head page, because: + * + * +----+ +-----+ +-----+ + * | |------>| T |---X--->| N | + * | |<------| | | | + * +----+ +-----+ +-----+ + * ^ ^ | + * | +-----+ | | + * +----------| R |----------+ | + * | |<-----------+ + * +-----+ + * + * Key: ---X--> HEAD flag set in pointer + * T Tail page + * R Reader page + * N Next page + * + * (see __rb_reserve_next() to see where this happens) + * + * What the above shows is that the reader just swapped out + * the reader page with a page in the buffer, but before it + * could make the new header point back to the new page added + * it was preempted by a writer. The writer moved forward onto + * the new page added by the reader and is about to move forward + * again. + * + * You can see, it is legitimate for the previous pointer of + * the head (or any page) not to point back to itself. But only + * temporarially. + */ - addr = __get_free_page(GFP_KERNEL); - if (!addr) - goto free_pages; - bpage->page = (void *)addr; - rb_init_page(bpage->page); - } +#define RB_PAGE_NORMAL 0UL +#define RB_PAGE_HEAD 1UL +#define RB_PAGE_UPDATE 2UL - list_splice(&pages, head); - rb_check_pages(cpu_buffer); +#define RB_FLAG_MASK 3UL - return 0; +/* PAGE_MOVED is not part of the mask */ +#define RB_PAGE_MOVED 4UL - free_pages: - list_for_each_entry_safe(bpage, tmp, &pages, list) { - list_del_init(&bpage->list); - free_buffer_page(bpage); - } - return -ENOMEM; +/* + * rb_list_head - remove any bit + */ +static struct list_head *rb_list_head(struct list_head *list) +{ + unsigned long val = (unsigned long)list; + + return (struct list_head *)(val & ~RB_FLAG_MASK); } -static struct ring_buffer_per_cpu * -rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu) +/* + * rb_is_head_page - test if the given page is the head page + * + * Because the reader may move the head_page pointer, we can + * not trust what the head page is (it may be pointing to + * the reader page). But if the next page is a header page, + * its flags will be non zero. + */ +static int inline +rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer, + struct buffer_page *page, struct list_head *list) { - struct ring_buffer_per_cpu *cpu_buffer; - struct buffer_page *bpage; - unsigned long addr; - int ret; + unsigned long val; - cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()), - GFP_KERNEL, cpu_to_node(cpu)); - if (!cpu_buffer) - return NULL; + val = (unsigned long)list->next; - cpu_buffer->cpu = cpu; - cpu_buffer->buffer = buffer; - spin_lock_init(&cpu_buffer->reader_lock); - cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED; - INIT_LIST_HEAD(&cpu_buffer->pages); + if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list) + return RB_PAGE_MOVED; - bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), - GFP_KERNEL, cpu_to_node(cpu)); - if (!bpage) - goto fail_free_buffer; + return val & RB_FLAG_MASK; +} - cpu_buffer->reader_page = bpage; - addr = __get_free_page(GFP_KERNEL); - if (!addr) - goto fail_free_reader; - bpage->page = (void *)addr; - rb_init_page(bpage->page); +/* + * rb_is_reader_page + * + * The unique thing about the reader page, is that, if the + * writer is ever on it, the previous pointer never points + * back to the reader page. 
+ */ +static int rb_is_reader_page(struct buffer_page *page) +{ + struct list_head *list = page->list.prev; - INIT_LIST_HEAD(&cpu_buffer->reader_page->list); + return rb_list_head(list->next) != &page->list; +} - ret = rb_allocate_pages(cpu_buffer, buffer->pages); - if (ret < 0) - goto fail_free_reader; +/* + * rb_set_list_to_head - set a list_head to be pointing to head. + */ +static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer, + struct list_head *list) +{ + unsigned long *ptr; - cpu_buffer->head_page - = list_entry(cpu_buffer->pages.next, struct buffer_page, list); - cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; + ptr = (unsigned long *)&list->next; + *ptr |= RB_PAGE_HEAD; + *ptr &= ~RB_PAGE_UPDATE; +} - return cpu_buffer; +/* + * rb_head_page_activate - sets up head page + */ +static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer) +{ + struct buffer_page *head; - fail_free_reader: - free_buffer_page(cpu_buffer->reader_page); + head = cpu_buffer->head_page; + if (!head) + return; - fail_free_buffer: - kfree(cpu_buffer); - return NULL; + /* + * Set the previous list pointer to have the HEAD flag. + */ + rb_set_list_to_head(cpu_buffer, head->list.prev); } -static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) +static void rb_list_head_clear(struct list_head *list) { - struct list_head *head = &cpu_buffer->pages; - struct buffer_page *bpage, *tmp; - - list_del_init(&cpu_buffer->reader_page->list); - free_buffer_page(cpu_buffer->reader_page); + unsigned long *ptr = (unsigned long *)&list->next; - list_for_each_entry_safe(bpage, tmp, head, list) { - list_del_init(&bpage->list); - free_buffer_page(bpage); - } - kfree(cpu_buffer); + *ptr &= ~RB_FLAG_MASK; } /* - * Causes compile errors if the struct buffer_page gets bigger - * than the struct page. + * rb_head_page_dactivate - clears head page ptr (for free list) */ -extern int ring_buffer_page_too_big(void); +static void +rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer) +{ + struct list_head *hd; -#ifdef CONFIG_HOTPLUG -static int __cpuinit rb_cpu_notify(struct notifier_block *self, - unsigned long action, void *hcpu); -#endif + /* Go through the whole list and clear any pointers found. */ + rb_list_head_clear(cpu_buffer->pages); -/** - * ring_buffer_alloc - allocate a new ring_buffer - * @size: the size in bytes per cpu that is needed. - * @flags: attributes to set for the ring buffer. - * - * Currently the only flag that is available is the RB_FL_OVERWRITE - * flag. This flag means that the buffer will overwrite old data - * when the buffer wraps. If this flag is not set, the buffer will - * drop data when the tail hits the head. - */ -struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags) + list_for_each(hd, cpu_buffer->pages) + rb_list_head_clear(hd); +} + +static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer, + struct buffer_page *head, + struct buffer_page *prev, + int old_flag, int new_flag) { - struct ring_buffer *buffer; - int bsize; - int cpu; + struct list_head *list; + unsigned long val = (unsigned long)&head->list; + unsigned long ret; - /* Paranoid! 
Optimizes out when all is well */ - if (sizeof(struct buffer_page) > sizeof(struct page)) - ring_buffer_page_too_big(); + list = &prev->list; + val &= ~RB_FLAG_MASK; - /* keep it in its own cache line */ - buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), - GFP_KERNEL); - if (!buffer) - return NULL; + ret = cmpxchg((unsigned long *)&list->next, + val | old_flag, val | new_flag); - if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) - goto fail_free_buffer; + /* check if the reader took the page */ + if ((ret & ~RB_FLAG_MASK) != val) + return RB_PAGE_MOVED; - buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); - buffer->flags = flags; + return ret & RB_FLAG_MASK; +} + +static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer, + struct buffer_page *head, + struct buffer_page *prev, + int old_flag) +{ + return rb_head_page_set(cpu_buffer, head, prev, + old_flag, RB_PAGE_UPDATE); +} + +static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer, + struct buffer_page *head, + struct buffer_page *prev, + int old_flag) +{ + return rb_head_page_set(cpu_buffer, head, prev, + old_flag, RB_PAGE_HEAD); +} + +static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer, + struct buffer_page *head, + struct buffer_page *prev, + int old_flag) +{ + return rb_head_page_set(cpu_buffer, head, prev, + old_flag, RB_PAGE_NORMAL); +} + +static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer, + struct buffer_page **bpage) +{ + struct list_head *p = rb_list_head((*bpage)->list.next); + + *bpage = list_entry(p, struct buffer_page, list); +} + +static struct buffer_page * +rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer) +{ + struct buffer_page *head; + struct buffer_page *page; + struct list_head *list; + int i; + + if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page)) + return NULL; + + /* sanity check */ + list = cpu_buffer->pages; + if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list)) + return NULL; + + page = head = cpu_buffer->head_page; + /* + * It is possible that the writer moves the header behind + * where we started, and we miss in one loop. + * A second loop should grab the header, but we'll do + * three loops just because I'm paranoid. + */ + for (i = 0; i < 3; i++) { + do { + if (rb_is_head_page(cpu_buffer, page, page->list.prev)) { + cpu_buffer->head_page = page; + return page; + } + rb_inc_page(cpu_buffer, &page); + } while (page != head); + } + + RB_WARN_ON(cpu_buffer, 1); + + return NULL; +} + +static int rb_head_page_replace(struct buffer_page *old, + struct buffer_page *new) +{ + unsigned long *ptr = (unsigned long *)&old->list.prev->next; + unsigned long val; + unsigned long ret; + + val = *ptr & ~RB_FLAG_MASK; + val |= RB_PAGE_HEAD; + + ret = cmpxchg(ptr, val, (unsigned long)&new->list); + + return ret == val; +} + +/* + * rb_tail_page_update - move the tail page forward + * + * Returns 1 if moved tail page, 0 if someone else did. + */ +static int rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer, + struct buffer_page *tail_page, + struct buffer_page *next_page) +{ + struct buffer_page *old_tail; + unsigned long old_entries; + unsigned long old_write; + int ret = 0; + + /* + * The tail page now needs to be moved forward. + * + * We need to reset the tail page, but without messing + * with possible erasing of data brought in by interrupts + * that have moved the tail page and are currently on it. + * + * We add a counter to the write field to denote this. 
+ */ + old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write); + old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries); + + /* + * Just make sure we have seen our old_write and synchronize + * with any interrupts that come in. + */ + barrier(); + + /* + * If the tail page is still the same as what we think + * it is, then it is up to us to update the tail + * pointer. + */ + if (tail_page == cpu_buffer->tail_page) { + /* Zero the write counter */ + unsigned long val = old_write & ~RB_WRITE_MASK; + unsigned long eval = old_entries & ~RB_WRITE_MASK; + + /* + * This will only succeed if an interrupt did + * not come in and change it. In which case, we + * do not want to modify it. + * + * We add (void) to let the compiler know that we do not care + * about the return value of these functions. We use the + * cmpxchg to only update if an interrupt did not already + * do it for us. If the cmpxchg fails, we don't care. + */ + (void)local_cmpxchg(&next_page->write, old_write, val); + (void)local_cmpxchg(&next_page->entries, old_entries, eval); + + /* + * No need to worry about races with clearing out the commit. + * it only can increment when a commit takes place. But that + * only happens in the outer most nested commit. + */ + local_set(&next_page->page->commit, 0); + + old_tail = cmpxchg(&cpu_buffer->tail_page, + tail_page, next_page); + + if (old_tail == tail_page) + ret = 1; + } + + return ret; +} + +static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer, + struct buffer_page *bpage) +{ + unsigned long val = (unsigned long)bpage; + + if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK)) + return 1; + + return 0; +} + +/** + * rb_check_list - make sure a pointer to a list has the last bits zero + */ +static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer, + struct list_head *list) +{ + if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev)) + return 1; + if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next)) + return 1; + return 0; +} + +/** + * check_pages - integrity check of buffer pages + * @cpu_buffer: CPU buffer with pages to test + * + * As a safety measure we check to make sure the data pages have not + * been corrupted. 
+ */ +static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer) +{ + struct list_head *head = cpu_buffer->pages; + struct buffer_page *bpage, *tmp; + + rb_head_page_deactivate(cpu_buffer); + + if (RB_WARN_ON(cpu_buffer, head->next->prev != head)) + return -1; + if (RB_WARN_ON(cpu_buffer, head->prev->next != head)) + return -1; + + if (rb_check_list(cpu_buffer, head)) + return -1; + + list_for_each_entry_safe(bpage, tmp, head, list) { + if (RB_WARN_ON(cpu_buffer, + bpage->list.next->prev != &bpage->list)) + return -1; + if (RB_WARN_ON(cpu_buffer, + bpage->list.prev->next != &bpage->list)) + return -1; + if (rb_check_list(cpu_buffer, &bpage->list)) + return -1; + } + + rb_head_page_activate(cpu_buffer); + + return 0; +} + +static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, + unsigned nr_pages) +{ + struct buffer_page *bpage, *tmp; + unsigned long addr; + LIST_HEAD(pages); + unsigned i; + + WARN_ON(!nr_pages); + + for (i = 0; i < nr_pages; i++) { + bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), + GFP_KERNEL, cpu_to_node(cpu_buffer->cpu)); + if (!bpage) + goto free_pages; + + rb_check_bpage(cpu_buffer, bpage); + + list_add(&bpage->list, &pages); + + addr = __get_free_page(GFP_KERNEL); + if (!addr) + goto free_pages; + bpage->page = (void *)addr; + rb_init_page(bpage->page); + } + + /* + * The ring buffer page list is a circular list that does not + * start and end with a list head. All page list items point to + * other pages. + */ + cpu_buffer->pages = pages.next; + list_del(&pages); + + rb_check_pages(cpu_buffer); + + return 0; + + free_pages: + list_for_each_entry_safe(bpage, tmp, &pages, list) { + list_del_init(&bpage->list); + free_buffer_page(bpage); + } + return -ENOMEM; +} + +static struct ring_buffer_per_cpu * +rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu) +{ + struct ring_buffer_per_cpu *cpu_buffer; + struct buffer_page *bpage; + unsigned long addr; + int ret; + + cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()), + GFP_KERNEL, cpu_to_node(cpu)); + if (!cpu_buffer) + return NULL; + + cpu_buffer->cpu = cpu; + cpu_buffer->buffer = buffer; + spin_lock_init(&cpu_buffer->reader_lock); + lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); + cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; + + bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), + GFP_KERNEL, cpu_to_node(cpu)); + if (!bpage) + goto fail_free_buffer; + + rb_check_bpage(cpu_buffer, bpage); + + cpu_buffer->reader_page = bpage; + addr = __get_free_page(GFP_KERNEL); + if (!addr) + goto fail_free_reader; + bpage->page = (void *)addr; + rb_init_page(bpage->page); + + INIT_LIST_HEAD(&cpu_buffer->reader_page->list); + + ret = rb_allocate_pages(cpu_buffer, buffer->pages); + if (ret < 0) + goto fail_free_reader; + + cpu_buffer->head_page + = list_entry(cpu_buffer->pages, struct buffer_page, list); + cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; + + rb_head_page_activate(cpu_buffer); + + return cpu_buffer; + + fail_free_reader: + free_buffer_page(cpu_buffer->reader_page); + + fail_free_buffer: + kfree(cpu_buffer); + return NULL; +} + +static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) +{ + struct list_head *head = cpu_buffer->pages; + struct buffer_page *bpage, *tmp; + + free_buffer_page(cpu_buffer->reader_page); + + rb_head_page_deactivate(cpu_buffer); + + if (head) { + list_for_each_entry_safe(bpage, tmp, head, list) { + list_del_init(&bpage->list); + 
free_buffer_page(bpage); + } + bpage = list_entry(head, struct buffer_page, list); + free_buffer_page(bpage); + } + + kfree(cpu_buffer); +} + +#ifdef CONFIG_HOTPLUG_CPU +static int rb_cpu_notify(struct notifier_block *self, + unsigned long action, void *hcpu); +#endif + +/** + * ring_buffer_alloc - allocate a new ring_buffer + * @size: the size in bytes per cpu that is needed. + * @flags: attributes to set for the ring buffer. + * + * Currently the only flag that is available is the RB_FL_OVERWRITE + * flag. This flag means that the buffer will overwrite old data + * when the buffer wraps. If this flag is not set, the buffer will + * drop data when the tail hits the head. + */ +struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, + struct lock_class_key *key) +{ + struct ring_buffer *buffer; + int bsize; + int cpu; + + /* keep it in its own cache line */ + buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), + GFP_KERNEL); + if (!buffer) + return NULL; + + if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) + goto fail_free_buffer; + + buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); + buffer->flags = flags; + buffer->clock = trace_clock_local; + buffer->reader_lock_key = key; /* need at least two pages */ - if (buffer->pages == 1) - buffer->pages++; + if (buffer->pages < 2) + buffer->pages = 2; + /* + * In case of non-hotplug cpu, if the ring-buffer is allocated + * in early initcall, it will not be notified of secondary cpus. + * In that off case, we need to allocate for all possible cpus. + */ +#ifdef CONFIG_HOTPLUG_CPU get_online_cpus(); cpumask_copy(buffer->cpumask, cpu_online_mask); +#else + cpumask_copy(buffer->cpumask, cpu_possible_mask); +#endif buffer->cpus = nr_cpu_ids; bsize = sizeof(void *) * nr_cpu_ids; @@ -523,7 +1126,7 @@ struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags) goto fail_free_buffers; } -#ifdef CONFIG_HOTPLUG +#ifdef CONFIG_HOTPLUG_CPU buffer->cpu_notify.notifier_call = rb_cpu_notify; buffer->cpu_notify.priority = 0; register_cpu_notifier(&buffer->cpu_notify); @@ -549,7 +1152,7 @@ struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags) kfree(buffer); return NULL; } -EXPORT_SYMBOL_GPL(ring_buffer_alloc); +EXPORT_SYMBOL_GPL(__ring_buffer_alloc); /** * ring_buffer_free - free a ring buffer. 
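[Editorial sketch, not part of this patch.] The hunks above add __ring_buffer_alloc() and the per-cpu buffer setup. As a rough usage illustration only, the sketch below shows how a hypothetical caller might exercise the allocate / reserve / commit / free API whose signatures appear in this diff; ring_buffer_alloc() and ring_buffer_event_data() are assumed to be the wrappers declared in linux/ring_buffer.h rather than anything defined in this file.

#include <linux/ring_buffer.h>

/* Hypothetical caller, for illustration only. */
static int sketch_write_sample(void)
{
	struct ring_buffer *rb;
	struct ring_buffer_event *event;
	int *payload;

	/* size is bytes per CPU; at least two pages are always kept */
	rb = ring_buffer_alloc(PAGE_SIZE, RB_FL_OVERWRITE);
	if (!rb)
		return -ENOMEM;

	event = ring_buffer_lock_reserve(rb, sizeof(*payload));
	if (event) {
		payload = ring_buffer_event_data(event);
		*payload = 42;		/* fill in the reserved slot */
		ring_buffer_unlock_commit(rb, event);
	}

	ring_buffer_free(rb);
	return 0;
}

Between reserve and commit the ring buffer keeps preemption disabled (see the rb_need_resched handling later in this diff), so a caller must not sleep in that window.
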
@@ -562,7 +1165,7 @@ ring_buffer_free(struct ring_buffer *buffer) get_online_cpus(); -#ifdef CONFIG_HOTPLUG +#ifdef CONFIG_HOTPLUG_CPU unregister_cpu_notifier(&buffer->cpu_notify); #endif @@ -571,12 +1174,19 @@ ring_buffer_free(struct ring_buffer *buffer) put_online_cpus(); + kfree(buffer->buffers); free_cpumask_var(buffer->cpumask); kfree(buffer); } EXPORT_SYMBOL_GPL(ring_buffer_free); +void ring_buffer_set_clock(struct ring_buffer *buffer, + u64 (*clock)(void)) +{ + buffer->clock = clock; +} + static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer); static void @@ -586,26 +1196,24 @@ rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages) struct list_head *p; unsigned i; - atomic_inc(&cpu_buffer->record_disabled); - synchronize_sched(); + spin_lock_irq(&cpu_buffer->reader_lock); + rb_head_page_deactivate(cpu_buffer); for (i = 0; i < nr_pages; i++) { - if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages))) + if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages))) return; - p = cpu_buffer->pages.next; + p = cpu_buffer->pages->next; bpage = list_entry(p, struct buffer_page, list); list_del_init(&bpage->list); free_buffer_page(bpage); } - if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages))) + if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages))) return; rb_reset_cpu(cpu_buffer); - rb_check_pages(cpu_buffer); - atomic_dec(&cpu_buffer->record_disabled); - + spin_unlock_irq(&cpu_buffer->reader_lock); } static void @@ -616,8 +1224,8 @@ rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer, struct list_head *p; unsigned i; - atomic_inc(&cpu_buffer->record_disabled); - synchronize_sched(); + spin_lock_irq(&cpu_buffer->reader_lock); + rb_head_page_deactivate(cpu_buffer); for (i = 0; i < nr_pages; i++) { if (RB_WARN_ON(cpu_buffer, list_empty(pages))) @@ -625,13 +1233,12 @@ rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer, p = pages->next; bpage = list_entry(p, struct buffer_page, list); list_del_init(&bpage->list); - list_add_tail(&bpage->list, &cpu_buffer->pages); + list_add_tail(&bpage->list, cpu_buffer->pages); } rb_reset_cpu(cpu_buffer); - rb_check_pages(cpu_buffer); - atomic_dec(&cpu_buffer->record_disabled); + spin_unlock_irq(&cpu_buffer->reader_lock); } /** @@ -639,11 +1246,6 @@ rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer, * @buffer: the buffer to resize. * @size: the new size. * - * The tracer is responsible for making sure that the buffer is - * not being used while changing the size. - * Note: We may be able to change the above requirement by using - * RCU synchronizations. - * * Minimum size is 2 * BUF_PAGE_SIZE. * * Returns -1 on failure. @@ -675,6 +1277,11 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size) if (size == buffer_size) return size; + atomic_inc(&buffer->record_disabled); + + /* Make sure all writers are done with this buffer. 
*/ + synchronize_sched(); + mutex_lock(&buffer->mutex); get_online_cpus(); @@ -737,6 +1344,8 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size) put_online_cpus(); mutex_unlock(&buffer->mutex); + atomic_dec(&buffer->record_disabled); + return size; free_pages: @@ -746,6 +1355,7 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size) } put_online_cpus(); mutex_unlock(&buffer->mutex); + atomic_dec(&buffer->record_disabled); return -ENOMEM; /* @@ -755,15 +1365,11 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size) out_fail: put_online_cpus(); mutex_unlock(&buffer->mutex); + atomic_dec(&buffer->record_disabled); return -1; } EXPORT_SYMBOL_GPL(ring_buffer_resize); -static inline int rb_null_event(struct ring_buffer_event *event) -{ - return event->type == RINGBUF_TYPE_PADDING; -} - static inline void * __rb_data_page_index(struct buffer_data_page *bpage, unsigned index) { @@ -783,21 +1389,14 @@ rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) } static inline struct ring_buffer_event * -rb_head_event(struct ring_buffer_per_cpu *cpu_buffer) -{ - return __rb_page_index(cpu_buffer->head_page, - cpu_buffer->head_page->read); -} - -static inline struct ring_buffer_event * rb_iter_head_event(struct ring_buffer_iter *iter) { return __rb_page_index(iter->head_page, iter->head); } -static inline unsigned rb_page_write(struct buffer_page *bpage) +static inline unsigned long rb_page_write(struct buffer_page *bpage) { - return local_read(&bpage->write); + return local_read(&bpage->write) & RB_WRITE_MASK; } static inline unsigned rb_page_commit(struct buffer_page *bpage) @@ -805,6 +1404,11 @@ static inline unsigned rb_page_commit(struct buffer_page *bpage) return local_read(&bpage->page->commit); } +static inline unsigned long rb_page_entries(struct buffer_page *bpage) +{ + return local_read(&bpage->entries) & RB_WRITE_MASK; +} + /* Size is determined by what has been commited */ static inline unsigned rb_page_size(struct buffer_page *bpage) { @@ -817,58 +1421,17 @@ rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) return rb_page_commit(cpu_buffer->commit_page); } -static inline unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer) -{ - return rb_page_commit(cpu_buffer->head_page); -} - -/* - * When the tail hits the head and the buffer is in overwrite mode, - * the head jumps to the next page and all content on the previous - * page is discarded. But before doing so, we update the overrun - * variable of the buffer. 
- */ -static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer) -{ - struct ring_buffer_event *event; - unsigned long head; - - for (head = 0; head < rb_head_size(cpu_buffer); - head += rb_event_length(event)) { - - event = __rb_page_index(cpu_buffer->head_page, head); - if (RB_WARN_ON(cpu_buffer, rb_null_event(event))) - return; - /* Only count data entries */ - if (event->type != RINGBUF_TYPE_DATA) - continue; - cpu_buffer->overrun++; - cpu_buffer->entries--; - } -} - -static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer, - struct buffer_page **bpage) -{ - struct list_head *p = (*bpage)->list.next; - - if (p == &cpu_buffer->pages) - p = p->next; - - *bpage = list_entry(p, struct buffer_page, list); -} - static inline unsigned rb_event_index(struct ring_buffer_event *event) { unsigned long addr = (unsigned long)event; - return (addr & ~PAGE_MASK) - (PAGE_SIZE - BUF_PAGE_SIZE); + return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE; } -static int -rb_is_commit(struct ring_buffer_per_cpu *cpu_buffer, - struct ring_buffer_event *event) +static inline int +rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer, + struct ring_buffer_event *event) { unsigned long addr = (unsigned long)event; unsigned long index; @@ -881,33 +1444,10 @@ rb_is_commit(struct ring_buffer_per_cpu *cpu_buffer, } static void -rb_set_commit_event(struct ring_buffer_per_cpu *cpu_buffer, - struct ring_buffer_event *event) -{ - unsigned long addr = (unsigned long)event; - unsigned long index; - - index = rb_event_index(event); - addr &= PAGE_MASK; - - while (cpu_buffer->commit_page->page != (void *)addr) { - if (RB_WARN_ON(cpu_buffer, - cpu_buffer->commit_page == cpu_buffer->tail_page)) - return; - cpu_buffer->commit_page->page->commit = - cpu_buffer->commit_page->write; - rb_inc_page(cpu_buffer, &cpu_buffer->commit_page); - cpu_buffer->write_stamp = - cpu_buffer->commit_page->page->time_stamp; - } - - /* Now set the commit to the event's index */ - local_set(&cpu_buffer->commit_page->page->commit, index); -} - -static void rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) { + unsigned long max_count; + /* * We only race with interrupts and NMIs on this CPU. * If we own the commit event, then we can commit @@ -917,9 +1457,16 @@ rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) * assign the commit to the tail. */ again: + max_count = cpu_buffer->buffer->pages * 100; + while (cpu_buffer->commit_page != cpu_buffer->tail_page) { - cpu_buffer->commit_page->page->commit = - cpu_buffer->commit_page->write; + if (RB_WARN_ON(cpu_buffer, !(--max_count))) + return; + if (RB_WARN_ON(cpu_buffer, + rb_is_reader_page(cpu_buffer->tail_page))) + return; + local_set(&cpu_buffer->commit_page->page->commit, + rb_page_write(cpu_buffer->commit_page)); rb_inc_page(cpu_buffer, &cpu_buffer->commit_page); cpu_buffer->write_stamp = cpu_buffer->commit_page->page->time_stamp; @@ -928,8 +1475,12 @@ rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) } while (rb_commit_index(cpu_buffer) != rb_page_write(cpu_buffer->commit_page)) { - cpu_buffer->commit_page->page->commit = - cpu_buffer->commit_page->write; + + local_set(&cpu_buffer->commit_page->page->commit, + rb_page_write(cpu_buffer->commit_page)); + RB_WARN_ON(cpu_buffer, + local_read(&cpu_buffer->commit_page->page->commit) & + ~RB_WRITE_MASK); barrier(); } @@ -962,7 +1513,7 @@ static void rb_inc_iter(struct ring_buffer_iter *iter) * to the head page instead of next. 
*/ if (iter->head_page == cpu_buffer->reader_page) - iter->head_page = cpu_buffer->head_page; + iter->head_page = rb_set_head_page(cpu_buffer); else rb_inc_page(cpu_buffer, &iter->head_page); @@ -985,32 +1536,182 @@ static void rb_update_event(struct ring_buffer_event *event, unsigned type, unsigned length) { - event->type = type; + event->type_len = type; + + switch (type) { + + case RINGBUF_TYPE_PADDING: + case RINGBUF_TYPE_TIME_EXTEND: + case RINGBUF_TYPE_TIME_STAMP: + break; + + case 0: + length -= RB_EVNT_HDR_SIZE; + if (length > RB_MAX_SMALL_DATA) + event->array[0] = length; + else + event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); + break; + default: + BUG(); + } +} + +/* + * rb_handle_head_page - writer hit the head page + * + * Returns: +1 to retry page + * 0 to continue + * -1 on error + */ +static int +rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer, + struct buffer_page *tail_page, + struct buffer_page *next_page) +{ + struct buffer_page *new_head; + int entries; + int type; + int ret; + + entries = rb_page_entries(next_page); + + /* + * The hard part is here. We need to move the head + * forward, and protect against both readers on + * other CPUs and writers coming in via interrupts. + */ + type = rb_head_page_set_update(cpu_buffer, next_page, tail_page, + RB_PAGE_HEAD); + + /* + * type can be one of four: + * NORMAL - an interrupt already moved it for us + * HEAD - we are the first to get here. + * UPDATE - we are the interrupt interrupting + * a current move. + * MOVED - a reader on another CPU moved the next + * pointer to its reader page. Give up + * and try again. + */ + + switch (type) { + case RB_PAGE_HEAD: + /* + * We changed the head to UPDATE, thus + * it is our responsibility to update + * the counters. + */ + local_add(entries, &cpu_buffer->overrun); + + /* + * The entries will be zeroed out when we move the + * tail page. + */ + + /* still more to do */ + break; + + case RB_PAGE_UPDATE: + /* + * This is an interrupt that interrupt the + * previous update. Still more to do. + */ + break; + case RB_PAGE_NORMAL: + /* + * An interrupt came in before the update + * and processed this for us. + * Nothing left to do. + */ + return 1; + case RB_PAGE_MOVED: + /* + * The reader is on another CPU and just did + * a swap with our next_page. + * Try again. + */ + return 1; + default: + RB_WARN_ON(cpu_buffer, 1); /* WTF??? */ + return -1; + } - switch (type) { + /* + * Now that we are here, the old head pointer is + * set to UPDATE. This will keep the reader from + * swapping the head page with the reader page. + * The reader (on another CPU) will spin till + * we are finished. + * + * We just need to protect against interrupts + * doing the job. We will set the next pointer + * to HEAD. After that, we set the old pointer + * to NORMAL, but only if it was HEAD before. + * otherwise we are an interrupt, and only + * want the outer most commit to reset it. + */ + new_head = next_page; + rb_inc_page(cpu_buffer, &new_head); - case RINGBUF_TYPE_PADDING: - break; + ret = rb_head_page_set_head(cpu_buffer, new_head, next_page, + RB_PAGE_NORMAL); - case RINGBUF_TYPE_TIME_EXTEND: - event->len = DIV_ROUND_UP(RB_LEN_TIME_EXTEND, RB_ALIGNMENT); + /* + * Valid returns are: + * HEAD - an interrupt came in and already set it. + * NORMAL - One of two things: + * 1) We really set it. + * 2) A bunch of interrupts came in and moved + * the page forward again. 
+ */ + switch (ret) { + case RB_PAGE_HEAD: + case RB_PAGE_NORMAL: + /* OK */ break; + default: + RB_WARN_ON(cpu_buffer, 1); + return -1; + } - case RINGBUF_TYPE_TIME_STAMP: - event->len = DIV_ROUND_UP(RB_LEN_TIME_STAMP, RB_ALIGNMENT); - break; + /* + * It is possible that an interrupt came in, + * set the head up, then more interrupts came in + * and moved it again. When we get back here, + * the page would have been set to NORMAL but we + * just set it back to HEAD. + * + * How do you detect this? Well, if that happened + * the tail page would have moved. + */ + if (ret == RB_PAGE_NORMAL) { + /* + * If the tail had moved passed next, then we need + * to reset the pointer. + */ + if (cpu_buffer->tail_page != tail_page && + cpu_buffer->tail_page != next_page) + rb_head_page_set_normal(cpu_buffer, new_head, + next_page, + RB_PAGE_HEAD); + } - case RINGBUF_TYPE_DATA: - length -= RB_EVNT_HDR_SIZE; - if (length > RB_MAX_SMALL_DATA) { - event->len = 0; - event->array[0] = length; - } else - event->len = DIV_ROUND_UP(length, RB_ALIGNMENT); - break; - default: - BUG(); + /* + * If this was the outer most commit (the one that + * changed the original pointer from HEAD to UPDATE), + * then it is up to us to reset it to NORMAL. + */ + if (type == RB_PAGE_HEAD) { + ret = rb_head_page_set_normal(cpu_buffer, next_page, + tail_page, + RB_PAGE_UPDATE); + if (RB_WARN_ON(cpu_buffer, + ret != RB_PAGE_UPDATE)) + return -1; } + + return 0; } static unsigned rb_calculate_event_length(unsigned length) @@ -1030,158 +1731,236 @@ static unsigned rb_calculate_event_length(unsigned length) return length; } -static struct ring_buffer_event * -__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, - unsigned type, unsigned long length, u64 *ts) +static inline void +rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, + struct buffer_page *tail_page, + unsigned long tail, unsigned long length) { - struct buffer_page *tail_page, *head_page, *reader_page, *commit_page; - unsigned long tail, write; - struct ring_buffer *buffer = cpu_buffer->buffer; struct ring_buffer_event *event; - unsigned long flags; - bool lock_taken = false; - commit_page = cpu_buffer->commit_page; - /* we just need to protect against interrupts */ - barrier(); - tail_page = cpu_buffer->tail_page; - write = local_add_return(length, &tail_page->write); - tail = write - length; + /* + * Only the event that crossed the page boundary + * must fill the old tail_page with padding. + */ + if (tail >= BUF_PAGE_SIZE) { + local_sub(length, &tail_page->write); + return; + } - /* See if we shot pass the end of this buffer page */ - if (write > BUF_PAGE_SIZE) { - struct buffer_page *next_page = tail_page; + event = __rb_page_index(tail_page, tail); + kmemcheck_annotate_bitfield(event, bitfield); - local_irq_save(flags); - /* - * Since the write to the buffer is still not - * fully lockless, we must be careful with NMIs. - * The locks in the writers are taken when a write - * crosses to a new page. The locks protect against - * races with the readers (this will soon be fixed - * with a lockless solution). - * - * Because we can not protect against NMIs, and we - * want to keep traces reentrant, we need to manage - * what happens when we are in an NMI. - * - * NMIs can happen after we take the lock. - * If we are in an NMI, only take the lock - * if it is not already taken. Otherwise - * simply fail. 
- */ - if (unlikely(in_nmi())) { - if (!__raw_spin_trylock(&cpu_buffer->lock)) - goto out_reset; - } else - __raw_spin_lock(&cpu_buffer->lock); + /* + * If this event is bigger than the minimum size, then + * we need to be careful that we don't subtract the + * write counter enough to allow another writer to slip + * in on this page. + * We put in a discarded commit instead, to make sure + * that this space is not used again. + * + * If we are less than the minimum size, we don't need to + * worry about it. + */ + if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) { + /* No room for any events */ + + /* Mark the rest of the page with padding */ + rb_event_set_padding(event); + + /* Set the write back to the previous setting */ + local_sub(length, &tail_page->write); + return; + } + + /* Put in a discarded event */ + event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE; + event->type_len = RINGBUF_TYPE_PADDING; + /* time delta must be non zero */ + event->time_delta = 1; - lock_taken = true; + /* Set write to end of buffer */ + length = (tail + length) - BUF_PAGE_SIZE; + local_sub(length, &tail_page->write); +} + +static struct ring_buffer_event * +rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, + unsigned long length, unsigned long tail, + struct buffer_page *tail_page, u64 *ts) +{ + struct buffer_page *commit_page = cpu_buffer->commit_page; + struct ring_buffer *buffer = cpu_buffer->buffer; + struct buffer_page *next_page; + int ret; - rb_inc_page(cpu_buffer, &next_page); + next_page = tail_page; - head_page = cpu_buffer->head_page; - reader_page = cpu_buffer->reader_page; + rb_inc_page(cpu_buffer, &next_page); + + /* + * If for some reason, we had an interrupt storm that made + * it all the way around the buffer, bail, and warn + * about it. + */ + if (unlikely(next_page == commit_page)) { + local_inc(&cpu_buffer->commit_overrun); + goto out_reset; + } - /* we grabbed the lock before incrementing */ - if (RB_WARN_ON(cpu_buffer, next_page == reader_page)) - goto out_reset; + /* + * This is where the fun begins! + * + * We are fighting against races between a reader that + * could be on another CPU trying to swap its reader + * page with the buffer head. + * + * We are also fighting against interrupts coming in and + * moving the head or tail on us as well. + * + * If the next page is the head page then we have filled + * the buffer, unless the commit page is still on the + * reader page. + */ + if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) { /* - * If for some reason, we had an interrupt storm that made - * it all the way around the buffer, bail, and warn - * about it. + * If the commit is not on the reader page, then + * move the header page. */ - if (unlikely(next_page == commit_page)) { - WARN_ON_ONCE(1); - goto out_reset; - } - - if (next_page == head_page) { + if (!rb_is_reader_page(cpu_buffer->commit_page)) { + /* + * If we are not in overwrite mode, + * this is easy, just stop here. + */ if (!(buffer->flags & RB_FL_OVERWRITE)) goto out_reset; - /* tail_page has not moved yet? */ - if (tail_page == cpu_buffer->tail_page) { - /* count overflows */ - rb_update_overflow(cpu_buffer); - - rb_inc_page(cpu_buffer, &head_page); - cpu_buffer->head_page = head_page; - cpu_buffer->head_page->read = 0; + ret = rb_handle_head_page(cpu_buffer, + tail_page, + next_page); + if (ret < 0) + goto out_reset; + if (ret) + goto out_again; + } else { + /* + * We need to be careful here too. The + * commit page could still be on the reader + * page. 
We could have a small buffer, and + * have filled up the buffer with events + * from interrupts and such, and wrapped. + * + * Note, if the tail page is also the on the + * reader_page, we let it move out. + */ + if (unlikely((cpu_buffer->commit_page != + cpu_buffer->tail_page) && + (cpu_buffer->commit_page == + cpu_buffer->reader_page))) { + local_inc(&cpu_buffer->commit_overrun); + goto out_reset; } } + } + ret = rb_tail_page_update(cpu_buffer, tail_page, next_page); + if (ret) { /* - * If the tail page is still the same as what we think - * it is, then it is up to us to update the tail - * pointer. + * Nested commits always have zero deltas, so + * just reread the time stamp */ - if (tail_page == cpu_buffer->tail_page) { - local_set(&next_page->write, 0); - local_set(&next_page->page->commit, 0); - cpu_buffer->tail_page = next_page; - - /* reread the time stamp */ - *ts = ring_buffer_time_stamp(cpu_buffer->cpu); - cpu_buffer->tail_page->page->time_stamp = *ts; - } + *ts = rb_time_stamp(buffer); + next_page->page->time_stamp = *ts; + } - /* - * The actual tail page has moved forward. - */ - if (tail < BUF_PAGE_SIZE) { - /* Mark the rest of the page with padding */ - event = __rb_page_index(tail_page, tail); - event->type = RINGBUF_TYPE_PADDING; - } + out_again: - if (tail <= BUF_PAGE_SIZE) - /* Set the write back to the previous setting */ - local_set(&tail_page->write, tail); + rb_reset_tail(cpu_buffer, tail_page, tail, length); - /* - * If this was a commit entry that failed, - * increment that too - */ - if (tail_page == cpu_buffer->commit_page && - tail == rb_commit_index(cpu_buffer)) { - rb_set_commit_to_write(cpu_buffer); - } + /* fail and let the caller try again */ + return ERR_PTR(-EAGAIN); - __raw_spin_unlock(&cpu_buffer->lock); - local_irq_restore(flags); + out_reset: + /* reset write */ + rb_reset_tail(cpu_buffer, tail_page, tail, length); - /* fail and let the caller try again */ - return ERR_PTR(-EAGAIN); - } + return NULL; +} - /* We reserved something on the buffer */ +static struct ring_buffer_event * +__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, + unsigned type, unsigned long length, u64 *ts) +{ + struct buffer_page *tail_page; + struct ring_buffer_event *event; + unsigned long tail, write; - if (RB_WARN_ON(cpu_buffer, write > BUF_PAGE_SIZE)) - return NULL; + tail_page = cpu_buffer->tail_page; + write = local_add_return(length, &tail_page->write); + + /* set write to only the index of the write */ + write &= RB_WRITE_MASK; + tail = write - length; + + /* See if we shot pass the end of this buffer page */ + if (write > BUF_PAGE_SIZE) + return rb_move_tail(cpu_buffer, length, tail, + tail_page, ts); + + /* We reserved something on the buffer */ event = __rb_page_index(tail_page, tail); + kmemcheck_annotate_bitfield(event, bitfield); rb_update_event(event, type, length); + /* The passed in type is zero for DATA */ + if (likely(!type)) + local_inc(&tail_page->entries); + /* - * If this is a commit and the tail is zero, then update - * this page's time stamp. + * If this is the first commit on the page, then update + * its timestamp. 
*/ - if (!tail && rb_is_commit(cpu_buffer, event)) - cpu_buffer->commit_page->page->time_stamp = *ts; + if (!tail) + tail_page->page->time_stamp = *ts; return event; +} - out_reset: - /* reset write */ - if (tail <= BUF_PAGE_SIZE) - local_set(&tail_page->write, tail); +static inline int +rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, + struct ring_buffer_event *event) +{ + unsigned long new_index, old_index; + struct buffer_page *bpage; + unsigned long index; + unsigned long addr; - if (likely(lock_taken)) - __raw_spin_unlock(&cpu_buffer->lock); - local_irq_restore(flags); - return NULL; + new_index = rb_event_index(event); + old_index = new_index + rb_event_length(event); + addr = (unsigned long)event; + addr &= PAGE_MASK; + + bpage = cpu_buffer->tail_page; + + if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { + unsigned long write_mask = + local_read(&bpage->write) & ~RB_WRITE_MASK; + /* + * This is on the tail page. It is possible that + * a write could come in and move the tail page + * and write to the next page. That is fine + * because we just shorten what is on this page. + */ + old_index += write_mask; + new_index += write_mask; + index = local_cmpxchg(&bpage->write, old_index, new_index); + if (index == old_index) + return 1; + } + + /* could not discard */ + return 0; } static int @@ -1216,26 +1995,33 @@ rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, return -EAGAIN; /* Only a commited time event can update the write stamp */ - if (rb_is_commit(cpu_buffer, event)) { + if (rb_event_is_commit(cpu_buffer, event)) { /* - * If this is the first on the page, then we need to - * update the page itself, and just put in a zero. + * If this is the first on the page, then it was + * updated with the page itself. Try to discard it + * and if we can't just make it zero. */ if (rb_event_index(event)) { event->time_delta = *delta & TS_MASK; event->array[0] = *delta >> TS_SHIFT; } else { - cpu_buffer->commit_page->page->time_stamp = *ts; - event->time_delta = 0; - event->array[0] = 0; + /* try to discard, since we do not need this */ + if (!rb_try_to_discard(cpu_buffer, event)) { + /* nope, just zero it */ + event->time_delta = 0; + event->array[0] = 0; + } } cpu_buffer->write_stamp = *ts; /* let the caller know this was the commit */ ret = 1; } else { - /* Darn, this is just wasted space */ - event->time_delta = 0; - event->array[0] = 0; + /* Try to discard the event */ + if (!rb_try_to_discard(cpu_buffer, event)) { + /* Darn, this is just wasted space */ + event->time_delta = 0; + event->array[0] = 0; + } ret = 0; } @@ -1244,15 +2030,72 @@ rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, return ret; } +static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) +{ + local_inc(&cpu_buffer->committing); + local_inc(&cpu_buffer->commits); +} + +static void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) +{ + unsigned long commits; + + if (RB_WARN_ON(cpu_buffer, + !local_read(&cpu_buffer->committing))) + return; + + again: + commits = local_read(&cpu_buffer->commits); + /* synchronize with interrupts */ + barrier(); + if (local_read(&cpu_buffer->committing) == 1) + rb_set_commit_to_write(cpu_buffer); + + local_dec(&cpu_buffer->committing); + + /* synchronize with interrupts */ + barrier(); + + /* + * Need to account for interrupts coming in between the + * updating of the commit page and the clearing of the + * committing counter. 
+ */ + if (unlikely(local_read(&cpu_buffer->commits) != commits) && + !local_read(&cpu_buffer->committing)) { + local_inc(&cpu_buffer->committing); + goto again; + } +} + static struct ring_buffer_event * -rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer, - unsigned type, unsigned long length) +rb_reserve_next_event(struct ring_buffer *buffer, + struct ring_buffer_per_cpu *cpu_buffer, + unsigned long length) { struct ring_buffer_event *event; - u64 ts, delta; + u64 ts, delta = 0; int commit = 0; int nr_loops = 0; + rb_start_commit(cpu_buffer); + +#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP + /* + * Due to the ability to swap a cpu buffer from a buffer + * it is possible it was swapped before we committed. + * (committing stops a swap). We check for it here and + * if it happened, we have to fail the write. + */ + barrier(); + if (unlikely(ACCESS_ONCE(cpu_buffer->buffer) != buffer)) { + local_dec(&cpu_buffer->committing); + local_dec(&cpu_buffer->commits); + return NULL; + } +#endif + + length = rb_calculate_event_length(length); again: /* * We allow for interrupts to reenter here and do a trace. @@ -1264,9 +2107,9 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer, * Bail! */ if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) - return NULL; + goto out_fail; - ts = ring_buffer_time_stamp(cpu_buffer->cpu); + ts = rb_time_stamp(cpu_buffer->buffer); /* * Only the first commit can update the timestamp. @@ -1276,63 +2119,93 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer, * also be made. But only the entry that did the actual * commit will be something other than zero. */ - if (cpu_buffer->tail_page == cpu_buffer->commit_page && - rb_page_write(cpu_buffer->tail_page) == - rb_commit_index(cpu_buffer)) { + if (likely(cpu_buffer->tail_page == cpu_buffer->commit_page && + rb_page_write(cpu_buffer->tail_page) == + rb_commit_index(cpu_buffer))) { + u64 diff; - delta = ts - cpu_buffer->write_stamp; + diff = ts - cpu_buffer->write_stamp; - /* make sure this delta is calculated here */ + /* make sure this diff is calculated here */ barrier(); /* Did the write stamp get updated already? */ if (unlikely(ts < cpu_buffer->write_stamp)) - delta = 0; + goto get_event; - if (test_time_stamp(delta)) { + delta = diff; + if (unlikely(test_time_stamp(delta))) { commit = rb_add_time_stamp(cpu_buffer, &ts, &delta); - if (commit == -EBUSY) - return NULL; + goto out_fail; if (commit == -EAGAIN) goto again; RB_WARN_ON(cpu_buffer, commit < 0); } - } else - /* Non commits have zero deltas */ - delta = 0; + } - event = __rb_reserve_next(cpu_buffer, type, length, &ts); - if (PTR_ERR(event) == -EAGAIN) + get_event: + event = __rb_reserve_next(cpu_buffer, 0, length, &ts); + if (unlikely(PTR_ERR(event) == -EAGAIN)) goto again; - if (!event) { - if (unlikely(commit)) - /* - * Ouch! We needed a timestamp and it was commited. But - * we didn't get our event reserved. - */ - rb_set_commit_to_write(cpu_buffer); - return NULL; - } + if (!event) + goto out_fail; - /* - * If the timestamp was commited, make the commit our entry - * now so that we will update it when needed. 
- */ - if (commit) - rb_set_commit_event(cpu_buffer, event); - else if (!rb_is_commit(cpu_buffer, event)) + if (!rb_event_is_commit(cpu_buffer, event)) delta = 0; event->time_delta = delta; return event; + + out_fail: + rb_end_commit(cpu_buffer); + return NULL; +} + +#ifdef CONFIG_TRACING + +#define TRACE_RECURSIVE_DEPTH 16 + +static int trace_recursive_lock(void) +{ + current->trace_recursion++; + + if (likely(current->trace_recursion < TRACE_RECURSIVE_DEPTH)) + return 0; + + /* Disable all tracing before we do anything else */ + tracing_off_permanent(); + + printk_once(KERN_WARNING "Tracing recursion: depth[%ld]:" + "HC[%lu]:SC[%lu]:NMI[%lu]\n", + current->trace_recursion, + hardirq_count() >> HARDIRQ_SHIFT, + softirq_count() >> SOFTIRQ_SHIFT, + in_nmi()); + + WARN_ON_ONCE(1); + return -1; +} + +static void trace_recursive_unlock(void) +{ + WARN_ON_ONCE(!current->trace_recursion); + + current->trace_recursion--; } +#else + +#define trace_recursive_lock() (0) +#define trace_recursive_unlock() do { } while (0) + +#endif + static DEFINE_PER_CPU(int, rb_need_resched); /** @@ -1366,6 +2239,9 @@ ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length) /* If we are tracing schedule, we don't want to recurse */ resched = ftrace_preempt_disable(); + if (trace_recursive_lock()) + goto out_nocheck; + cpu = raw_smp_processor_id(); if (!cpumask_test_cpu(cpu, buffer->cpumask)) @@ -1376,11 +2252,10 @@ ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length) if (atomic_read(&cpu_buffer->record_disabled)) goto out; - length = rb_calculate_event_length(length); - if (length > BUF_PAGE_SIZE) + if (length > BUF_MAX_DATA_SIZE) goto out; - event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length); + event = rb_reserve_next_event(buffer, cpu_buffer, length); if (!event) goto out; @@ -1392,46 +2267,171 @@ ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length) if (preempt_count() == 1) per_cpu(rb_need_resched, cpu) = resched; - return event; + return event; + + out: + trace_recursive_unlock(); + + out_nocheck: + ftrace_preempt_enable(resched); + return NULL; +} +EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); + +static void +rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer, + struct ring_buffer_event *event) +{ + /* + * The event first in the commit queue updates the + * time stamp. + */ + if (rb_event_is_commit(cpu_buffer, event)) + cpu_buffer->write_stamp += event->time_delta; +} + +static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer, + struct ring_buffer_event *event) +{ + local_inc(&cpu_buffer->entries); + rb_update_write_stamp(cpu_buffer, event); + rb_end_commit(cpu_buffer); +} + +/** + * ring_buffer_unlock_commit - commit a reserved + * @buffer: The buffer to commit to + * @event: The event pointer to commit. + * + * This commits the data to the ring buffer, and releases any locks held. + * + * Must be paired with ring_buffer_lock_reserve. + */ +int ring_buffer_unlock_commit(struct ring_buffer *buffer, + struct ring_buffer_event *event) +{ + struct ring_buffer_per_cpu *cpu_buffer; + int cpu = raw_smp_processor_id(); + + cpu_buffer = buffer->buffers[cpu]; + + rb_commit(cpu_buffer, event); + + trace_recursive_unlock(); + + /* + * Only the last preempt count needs to restore preemption. 
+ */ + if (preempt_count() == 1) + ftrace_preempt_enable(per_cpu(rb_need_resched, cpu)); + else + preempt_enable_no_resched_notrace(); + + return 0; +} +EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); - out: - ftrace_preempt_enable(resched); - return NULL; +static inline void rb_event_discard(struct ring_buffer_event *event) +{ + /* array[0] holds the actual length for the discarded event */ + event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; + event->type_len = RINGBUF_TYPE_PADDING; + /* time delta must be non zero */ + if (!event->time_delta) + event->time_delta = 1; } -EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); -static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer, - struct ring_buffer_event *event) +/* + * Decrement the entries to the page that an event is on. + * The event does not even need to exist, only the pointer + * to the page it is on. This may only be called before the commit + * takes place. + */ +static inline void +rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, + struct ring_buffer_event *event) { - cpu_buffer->entries++; + unsigned long addr = (unsigned long)event; + struct buffer_page *bpage = cpu_buffer->commit_page; + struct buffer_page *start; + + addr &= PAGE_MASK; - /* Only process further if we own the commit */ - if (!rb_is_commit(cpu_buffer, event)) + /* Do the likely case first */ + if (likely(bpage->page == (void *)addr)) { + local_dec(&bpage->entries); return; + } - cpu_buffer->write_stamp += event->time_delta; + /* + * Because the commit page may be on the reader page we + * start with the next page and check the end loop there. + */ + rb_inc_page(cpu_buffer, &bpage); + start = bpage; + do { + if (bpage->page == (void *)addr) { + local_dec(&bpage->entries); + return; + } + rb_inc_page(cpu_buffer, &bpage); + } while (bpage != start); - rb_set_commit_to_write(cpu_buffer); + /* commit not part of this buffer?? */ + RB_WARN_ON(cpu_buffer, 1); } /** - * ring_buffer_unlock_commit - commit a reserved - * @buffer: The buffer to commit to - * @event: The event pointer to commit. + * ring_buffer_commit_discard - discard an event that has not been committed + * @buffer: the ring buffer + * @event: non committed event to discard * - * This commits the data to the ring buffer, and releases any locks held. + * Sometimes an event that is in the ring buffer needs to be ignored. + * This function lets the user discard an event in the ring buffer + * and then that event will not be read later. * - * Must be paired with ring_buffer_lock_reserve. + * This function only works if it is called before the the item has been + * committed. It will try to free the event from the ring buffer + * if another event has not been added behind it. + * + * If another event has been added behind it, it will set the event + * up as discarded, and perform the commit. + * + * If this function is called, do not call ring_buffer_unlock_commit on + * the event. */ -int ring_buffer_unlock_commit(struct ring_buffer *buffer, - struct ring_buffer_event *event) +void ring_buffer_discard_commit(struct ring_buffer *buffer, + struct ring_buffer_event *event) { struct ring_buffer_per_cpu *cpu_buffer; - int cpu = raw_smp_processor_id(); + int cpu; + /* The event is discarded regardless */ + rb_event_discard(event); + + cpu = smp_processor_id(); cpu_buffer = buffer->buffers[cpu]; - rb_commit(cpu_buffer, event); + /* + * This must only be called if the event has not been + * committed yet. Thus we can assume that preemption + * is still disabled. 
+ */ + RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); + + rb_decrement_entry(cpu_buffer, event); + if (rb_try_to_discard(cpu_buffer, event)) + goto out; + + /* + * The commit is still visible by the reader, so we + * must still update the timestamp. + */ + rb_update_write_stamp(cpu_buffer, event); + out: + rb_end_commit(cpu_buffer); + + trace_recursive_unlock(); /* * Only the last preempt count needs to restore preemption. @@ -1441,9 +2441,8 @@ int ring_buffer_unlock_commit(struct ring_buffer *buffer, else preempt_enable_no_resched_notrace(); - return 0; } -EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); +EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); /** * ring_buffer_write - write data to the buffer without reserving @@ -1464,7 +2463,6 @@ int ring_buffer_write(struct ring_buffer *buffer, { struct ring_buffer_per_cpu *cpu_buffer; struct ring_buffer_event *event; - unsigned long event_length; void *body; int ret = -EBUSY; int cpu, resched; @@ -1487,9 +2485,10 @@ int ring_buffer_write(struct ring_buffer *buffer, if (atomic_read(&cpu_buffer->record_disabled)) goto out; - event_length = rb_calculate_event_length(length); - event = rb_reserve_next_event(cpu_buffer, - RINGBUF_TYPE_DATA, event_length); + if (length > BUF_MAX_DATA_SIZE) + goto out; + + event = rb_reserve_next_event(buffer, cpu_buffer, length); if (!event) goto out; @@ -1510,9 +2509,13 @@ EXPORT_SYMBOL_GPL(ring_buffer_write); static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) { struct buffer_page *reader = cpu_buffer->reader_page; - struct buffer_page *head = cpu_buffer->head_page; + struct buffer_page *head = rb_set_head_page(cpu_buffer); struct buffer_page *commit = cpu_buffer->commit_page; + /* In case of error, head will be NULL */ + if (unlikely(!head)) + return 1; + return reader->read == rb_page_commit(reader) && (commit == reader || (commit == head && @@ -1539,7 +2542,7 @@ EXPORT_SYMBOL_GPL(ring_buffer_record_disable); * @buffer: The ring buffer to enable writes * * Note, multiple disables will need the same number of enables - * to truely enable the writing (much like preempt_disable). + * to truly enable the writing (much like preempt_disable). */ void ring_buffer_record_enable(struct ring_buffer *buffer) { @@ -1561,15 +2564,11 @@ void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu) { struct ring_buffer_per_cpu *cpu_buffer; - get_online_cpus(); - if (!cpumask_test_cpu(cpu, buffer->cpumask)) - goto out; + return; cpu_buffer = buffer->buffers[cpu]; atomic_inc(&cpu_buffer->record_disabled); - out: - put_online_cpus(); } EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); @@ -1579,21 +2578,17 @@ EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); * @cpu: The CPU to enable. * * Note, multiple disables will need the same number of enables - * to truely enable the writing (much like preempt_disable). + * to truly enable the writing (much like preempt_disable). 
*/ void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu) { struct ring_buffer_per_cpu *cpu_buffer; - get_online_cpus(); - if (!cpumask_test_cpu(cpu, buffer->cpumask)) - goto out; + return; cpu_buffer = buffer->buffers[cpu]; atomic_dec(&cpu_buffer->record_disabled); - out: - put_online_cpus(); } EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); @@ -1605,17 +2600,14 @@ EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu) { struct ring_buffer_per_cpu *cpu_buffer; - unsigned long ret = 0; - - get_online_cpus(); + unsigned long ret; if (!cpumask_test_cpu(cpu, buffer->cpumask)) - goto out; + return 0; cpu_buffer = buffer->buffers[cpu]; - ret = cpu_buffer->entries; - out: - put_online_cpus(); + ret = (local_read(&cpu_buffer->entries) - local_read(&cpu_buffer->overrun)) + - cpu_buffer->read; return ret; } @@ -1629,23 +2621,40 @@ EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu) { struct ring_buffer_per_cpu *cpu_buffer; - unsigned long ret = 0; - - get_online_cpus(); + unsigned long ret; if (!cpumask_test_cpu(cpu, buffer->cpumask)) - goto out; + return 0; cpu_buffer = buffer->buffers[cpu]; - ret = cpu_buffer->overrun; - out: - put_online_cpus(); + ret = local_read(&cpu_buffer->overrun); return ret; } EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); /** + * ring_buffer_commit_overrun_cpu - get the number of overruns caused by commits + * @buffer: The ring buffer + * @cpu: The per CPU buffer to get the number of overruns from + */ +unsigned long +ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu) +{ + struct ring_buffer_per_cpu *cpu_buffer; + unsigned long ret; + + if (!cpumask_test_cpu(cpu, buffer->cpumask)) + return 0; + + cpu_buffer = buffer->buffers[cpu]; + ret = local_read(&cpu_buffer->commit_overrun); + + return ret; +} +EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); + +/** * ring_buffer_entries - get the number of entries in a buffer * @buffer: The ring buffer * @@ -1658,22 +2667,19 @@ unsigned long ring_buffer_entries(struct ring_buffer *buffer) unsigned long entries = 0; int cpu; - get_online_cpus(); - /* if you care about this being correct, lock the buffer */ for_each_buffer_cpu(buffer, cpu) { cpu_buffer = buffer->buffers[cpu]; - entries += cpu_buffer->entries; + entries += (local_read(&cpu_buffer->entries) - + local_read(&cpu_buffer->overrun)) - cpu_buffer->read; } - put_online_cpus(); - return entries; } EXPORT_SYMBOL_GPL(ring_buffer_entries); /** - * ring_buffer_overrun_cpu - get the number of overruns in buffer + * ring_buffer_overruns - get the number of overruns in buffer * @buffer: The ring buffer * * Returns the total number of overruns in the ring buffer @@ -1685,16 +2691,12 @@ unsigned long ring_buffer_overruns(struct ring_buffer *buffer) unsigned long overruns = 0; int cpu; - get_online_cpus(); - /* if you care about this being correct, lock the buffer */ for_each_buffer_cpu(buffer, cpu) { cpu_buffer = buffer->buffers[cpu]; - overruns += cpu_buffer->overrun; + overruns += local_read(&cpu_buffer->overrun); } - put_online_cpus(); - return overruns; } EXPORT_SYMBOL_GPL(ring_buffer_overruns); @@ -1705,8 +2707,10 @@ static void rb_iter_reset(struct ring_buffer_iter *iter) /* Iterator usage is expected to have record disabled */ if (list_empty(&cpu_buffer->reader_page->list)) { - iter->head_page = cpu_buffer->head_page; - iter->head = cpu_buffer->head_page->read; + iter->head_page = rb_set_head_page(cpu_buffer); 
+ if (unlikely(!iter->head_page)) + return; + iter->head = iter->head_page->read; } else { iter->head_page = cpu_buffer->reader_page; iter->head = cpu_buffer->reader_page->read; @@ -1715,6 +2719,8 @@ static void rb_iter_reset(struct ring_buffer_iter *iter) iter->read_stamp = cpu_buffer->read_stamp; else iter->read_stamp = iter->head_page->page->time_stamp; + iter->cache_reader_page = cpu_buffer->reader_page; + iter->cache_read = cpu_buffer->read; } /** @@ -1761,7 +2767,7 @@ rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, { u64 delta; - switch (event->type) { + switch (event->type_len) { case RINGBUF_TYPE_PADDING: return; @@ -1792,7 +2798,7 @@ rb_update_iter_read_stamp(struct ring_buffer_iter *iter, { u64 delta; - switch (event->type) { + switch (event->type_len) { case RINGBUF_TYPE_PADDING: return; @@ -1823,9 +2829,10 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) struct buffer_page *reader = NULL; unsigned long flags; int nr_loops = 0; + int ret; local_irq_save(flags); - __raw_spin_lock(&cpu_buffer->lock); + arch_spin_lock(&cpu_buffer->lock); again: /* @@ -1856,29 +2863,56 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) goto out; /* - * Splice the empty reader page into the list around the head. * Reset the reader page to size zero. */ + local_set(&cpu_buffer->reader_page->write, 0); + local_set(&cpu_buffer->reader_page->entries, 0); + local_set(&cpu_buffer->reader_page->page->commit, 0); - reader = cpu_buffer->head_page; - cpu_buffer->reader_page->list.next = reader->list.next; + spin: + /* + * Splice the empty reader page into the list around the head. + */ + reader = rb_set_head_page(cpu_buffer); + cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); cpu_buffer->reader_page->list.prev = reader->list.prev; - local_set(&cpu_buffer->reader_page->write, 0); - local_set(&cpu_buffer->reader_page->page->commit, 0); + /* + * cpu_buffer->pages just needs to point to the buffer, it + * has no specific buffer page to point to. Lets move it out + * of our way so we don't accidently swap it. + */ + cpu_buffer->pages = reader->list.prev; + + /* The reader page will be pointing to the new head */ + rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list); + + /* + * Here's the tricky part. + * + * We need to move the pointer past the header page. + * But we can only do that if a writer is not currently + * moving it. The page before the header page has the + * flag bit '1' set if it is pointing to the page we want. + * but if the writer is in the process of moving it + * than it will be '2' or already moved '0'. + */ - /* Make the reader page now replace the head */ - reader->list.prev->next = &cpu_buffer->reader_page->list; - reader->list.next->prev = &cpu_buffer->reader_page->list; + ret = rb_head_page_replace(reader, cpu_buffer->reader_page); /* - * If the tail is on the reader, then we must set the head - * to the inserted page, otherwise we set it one before. + * If we did not convert it, then we must try again. */ - cpu_buffer->head_page = cpu_buffer->reader_page; + if (!ret) + goto spin; - if (cpu_buffer->commit_page != reader) - rb_inc_page(cpu_buffer, &cpu_buffer->head_page); + /* + * Yeah! We succeeded in replacing the page. + * + * Now make the new head point back to the reader page. 
+ */ + rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; + rb_inc_page(cpu_buffer, &cpu_buffer->head_page); /* Finally update the reader page to the new head */ cpu_buffer->reader_page = reader; @@ -1887,7 +2921,7 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) goto again; out: - __raw_spin_unlock(&cpu_buffer->lock); + arch_spin_unlock(&cpu_buffer->lock); local_irq_restore(flags); return reader; @@ -1907,8 +2941,8 @@ static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) event = rb_reader_event(cpu_buffer); - if (event->type == RINGBUF_TYPE_DATA) - cpu_buffer->entries--; + if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) + cpu_buffer->read++; rb_update_read_stamp(cpu_buffer, event); @@ -1930,8 +2964,8 @@ static void rb_advance_iter(struct ring_buffer_iter *iter) * Check if we are at the end of the buffer. */ if (iter->head >= rb_page_size(iter->head_page)) { - if (RB_WARN_ON(buffer, - iter->head_page == cpu_buffer->commit_page)) + /* discarded commits can make the page empty */ + if (iter->head_page == cpu_buffer->commit_page) return; rb_inc_iter(iter); return; @@ -1961,25 +2995,20 @@ static void rb_advance_iter(struct ring_buffer_iter *iter) } static struct ring_buffer_event * -rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts) +rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts) { - struct ring_buffer_per_cpu *cpu_buffer; struct ring_buffer_event *event; struct buffer_page *reader; int nr_loops = 0; - cpu_buffer = buffer->buffers[cpu]; - again: /* * We repeat when a timestamp is encountered. It is possible * to get multiple timestamps from an interrupt entering just - * as one timestamp is about to be written. The max times - * that this can happen is the number of nested interrupts we - * can have. Nesting 10 deep of interrupts is clearly - * an anomaly. + * as one timestamp is about to be written, or from discarded + * commits. The most that we can have is the number on a single page. */ - if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10)) + if (RB_WARN_ON(cpu_buffer, ++nr_loops > RB_TIMESTAMPS_PER_PAGE)) return NULL; reader = rb_get_reader_page(cpu_buffer); @@ -1988,11 +3017,19 @@ rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts) event = rb_reader_event(cpu_buffer); - switch (event->type) { + switch (event->type_len) { case RINGBUF_TYPE_PADDING: - RB_WARN_ON(cpu_buffer, 1); - rb_advance_reader(cpu_buffer); - return NULL; + if (rb_null_event(event)) + RB_WARN_ON(cpu_buffer, 1); + /* + * Because the writer could be discarding every + * event it creates (which would probably be bad) + * if we were to go back to "again" then we may never + * catch up, and will trigger the warn on, or lock + * the box. Return the padding, and we will release + * the current locks, and try again. + */ + return event; case RINGBUF_TYPE_TIME_EXTEND: /* Internal data, OK to advance */ @@ -2007,7 +3044,8 @@ rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts) case RINGBUF_TYPE_DATA: if (ts) { *ts = cpu_buffer->read_stamp + event->time_delta; - ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts); + ring_buffer_normalize_time_stamp(cpu_buffer->buffer, + cpu_buffer->cpu, ts); } return event; @@ -2027,33 +3065,51 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) struct ring_buffer_event *event; int nr_loops = 0; - if (ring_buffer_iter_empty(iter)) - return NULL; - cpu_buffer = iter->cpu_buffer; buffer = cpu_buffer->buffer; + /* + * Check if someone performed a consuming read to + * the buffer. 
A consuming read invalidates the iterator + * and we need to reset the iterator in this case. + */ + if (unlikely(iter->cache_read != cpu_buffer->read || + iter->cache_reader_page != cpu_buffer->reader_page)) + rb_iter_reset(iter); + again: + if (ring_buffer_iter_empty(iter)) + return NULL; + /* - * We repeat when a timestamp is encountered. It is possible - * to get multiple timestamps from an interrupt entering just - * as one timestamp is about to be written. The max times - * that this can happen is the number of nested interrupts we - * can have. Nesting 10 deep of interrupts is clearly - * an anomaly. + * We repeat when a timestamp is encountered. + * We can get multiple timestamps by nested interrupts or also + * if filtering is on (discarding commits). Since discarding + * commits can be frequent we can get a lot of timestamps. + * But we limit them by not adding timestamps if they begin + * at the start of a page. */ - if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10)) + if (RB_WARN_ON(cpu_buffer, ++nr_loops > RB_TIMESTAMPS_PER_PAGE)) return NULL; if (rb_per_cpu_empty(cpu_buffer)) return NULL; + if (iter->head >= local_read(&iter->head_page->page->commit)) { + rb_inc_iter(iter); + goto again; + } + event = rb_iter_head_event(iter); - switch (event->type) { + switch (event->type_len) { case RINGBUF_TYPE_PADDING: - rb_inc_iter(iter); - goto again; + if (rb_null_event(event)) { + rb_inc_iter(iter); + goto again; + } + rb_advance_iter(iter); + return event; case RINGBUF_TYPE_TIME_EXTEND: /* Internal data, OK to advance */ @@ -2068,7 +3124,8 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) case RINGBUF_TYPE_DATA: if (ts) { *ts = iter->read_stamp + event->time_delta; - ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts); + ring_buffer_normalize_time_stamp(buffer, + cpu_buffer->cpu, ts); } return event; @@ -2080,6 +3137,21 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) } EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); +static inline int rb_ok_to_lock(void) +{ + /* + * If an NMI die dumps out the content of the ring buffer + * do not grab locks. We also permanently disable the ring + * buffer too. A one time deal is all you get from reading + * the ring buffer from an NMI. 
+ */ + if (likely(!in_nmi())) + return 1; + + tracing_off_permanent(); + return 0; +} + /** * ring_buffer_peek - peek at the next event to be read * @buffer: The ring buffer to read @@ -2093,20 +3165,27 @@ struct ring_buffer_event * ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts) { struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; - struct ring_buffer_event *event = NULL; + struct ring_buffer_event *event; unsigned long flags; - - get_online_cpus(); + int dolock; if (!cpumask_test_cpu(cpu, buffer->cpumask)) - goto out; + return NULL; - spin_lock_irqsave(&cpu_buffer->reader_lock, flags); - event = rb_buffer_peek(buffer, cpu, ts); - spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); + dolock = rb_ok_to_lock(); + again: + local_irq_save(flags); + if (dolock) + spin_lock(&cpu_buffer->reader_lock); + event = rb_buffer_peek(cpu_buffer, ts); + if (event && event->type_len == RINGBUF_TYPE_PADDING) + rb_advance_reader(cpu_buffer); + if (dolock) + spin_unlock(&cpu_buffer->reader_lock); + local_irq_restore(flags); - out: - put_online_cpus(); + if (event && event->type_len == RINGBUF_TYPE_PADDING) + goto again; return event; } @@ -2126,10 +3205,14 @@ ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) struct ring_buffer_event *event; unsigned long flags; + again: spin_lock_irqsave(&cpu_buffer->reader_lock, flags); event = rb_iter_peek(iter, ts); spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); + if (event && event->type_len == RINGBUF_TYPE_PADDING) + goto again; + return event; } @@ -2147,7 +3230,11 @@ ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts) struct ring_buffer_per_cpu *cpu_buffer; struct ring_buffer_event *event = NULL; unsigned long flags; + int dolock; + dolock = rb_ok_to_lock(); + + again: /* might be called in atomic */ preempt_disable(); @@ -2155,20 +3242,24 @@ ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts) goto out; cpu_buffer = buffer->buffers[cpu]; - spin_lock_irqsave(&cpu_buffer->reader_lock, flags); - - event = rb_buffer_peek(buffer, cpu, ts); - if (!event) - goto out_unlock; + local_irq_save(flags); + if (dolock) + spin_lock(&cpu_buffer->reader_lock); - rb_advance_reader(cpu_buffer); + event = rb_buffer_peek(cpu_buffer, ts); + if (event) + rb_advance_reader(cpu_buffer); - out_unlock: - spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); + if (dolock) + spin_unlock(&cpu_buffer->reader_lock); + local_irq_restore(flags); out: preempt_enable(); + if (event && event->type_len == RINGBUF_TYPE_PADDING) + goto again; + return event; } EXPORT_SYMBOL_GPL(ring_buffer_consume); @@ -2189,17 +3280,15 @@ struct ring_buffer_iter * ring_buffer_read_start(struct ring_buffer *buffer, int cpu) { struct ring_buffer_per_cpu *cpu_buffer; - struct ring_buffer_iter *iter = NULL; + struct ring_buffer_iter *iter; unsigned long flags; - get_online_cpus(); - if (!cpumask_test_cpu(cpu, buffer->cpumask)) - goto out; + return NULL; iter = kmalloc(sizeof(*iter), GFP_KERNEL); if (!iter) - goto out; + return NULL; cpu_buffer = buffer->buffers[cpu]; @@ -2209,14 +3298,11 @@ ring_buffer_read_start(struct ring_buffer *buffer, int cpu) synchronize_sched(); spin_lock_irqsave(&cpu_buffer->reader_lock, flags); - __raw_spin_lock(&cpu_buffer->lock); + arch_spin_lock(&cpu_buffer->lock); rb_iter_reset(iter); - __raw_spin_unlock(&cpu_buffer->lock); + arch_spin_unlock(&cpu_buffer->lock); spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); - out: - put_online_cpus(); - return iter; } EXPORT_SYMBOL_GPL(ring_buffer_read_start); @@ 
-2253,10 +3339,14 @@ ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts) unsigned long flags; spin_lock_irqsave(&cpu_buffer->reader_lock, flags); + again: event = rb_iter_peek(iter, ts); if (!event) goto out; + if (event->type_len == RINGBUF_TYPE_PADDING) + goto again; + rb_advance_iter(iter); out: spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); @@ -2278,9 +3368,12 @@ EXPORT_SYMBOL_GPL(ring_buffer_size); static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) { + rb_head_page_deactivate(cpu_buffer); + cpu_buffer->head_page - = list_entry(cpu_buffer->pages.next, struct buffer_page, list); + = list_entry(cpu_buffer->pages, struct buffer_page, list); local_set(&cpu_buffer->head_page->write, 0); + local_set(&cpu_buffer->head_page->entries, 0); local_set(&cpu_buffer->head_page->page->commit, 0); cpu_buffer->head_page->read = 0; @@ -2290,14 +3383,21 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) INIT_LIST_HEAD(&cpu_buffer->reader_page->list); local_set(&cpu_buffer->reader_page->write, 0); + local_set(&cpu_buffer->reader_page->entries, 0); local_set(&cpu_buffer->reader_page->page->commit, 0); cpu_buffer->reader_page->read = 0; - cpu_buffer->overrun = 0; - cpu_buffer->entries = 0; + local_set(&cpu_buffer->commit_overrun, 0); + local_set(&cpu_buffer->overrun, 0); + local_set(&cpu_buffer->entries, 0); + local_set(&cpu_buffer->committing, 0); + local_set(&cpu_buffer->commits, 0); + cpu_buffer->read = 0; cpu_buffer->write_stamp = 0; cpu_buffer->read_stamp = 0; + + rb_head_page_activate(cpu_buffer); } /** @@ -2309,25 +3409,27 @@ void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu) { struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; unsigned long flags; - int resched; - - /* Can't use get_online_cpus because this can be in atomic */ - resched = ftrace_preempt_disable(); if (!cpumask_test_cpu(cpu, buffer->cpumask)) - goto out; + return; + + atomic_inc(&cpu_buffer->record_disabled); spin_lock_irqsave(&cpu_buffer->reader_lock, flags); - __raw_spin_lock(&cpu_buffer->lock); + if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) + goto out; + + arch_spin_lock(&cpu_buffer->lock); rb_reset_cpu(cpu_buffer); - __raw_spin_unlock(&cpu_buffer->lock); + arch_spin_unlock(&cpu_buffer->lock); - spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); out: - ftrace_preempt_enable(resched); + spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); + + atomic_dec(&cpu_buffer->record_disabled); } EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); @@ -2337,16 +3439,10 @@ EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); */ void ring_buffer_reset(struct ring_buffer *buffer) { - int resched; int cpu; - /* Can't use get_online_cpus because this can be in atomic */ - resched = ftrace_preempt_disable(); - for_each_buffer_cpu(buffer, cpu) ring_buffer_reset_cpu(buffer, cpu); - - ftrace_preempt_enable(resched); } EXPORT_SYMBOL_GPL(ring_buffer_reset); @@ -2357,19 +3453,28 @@ EXPORT_SYMBOL_GPL(ring_buffer_reset); int ring_buffer_empty(struct ring_buffer *buffer) { struct ring_buffer_per_cpu *cpu_buffer; + unsigned long flags; + int dolock; int cpu; + int ret; - get_online_cpus(); + dolock = rb_ok_to_lock(); /* yes this is racy, but if you don't like the race, lock the buffer */ for_each_buffer_cpu(buffer, cpu) { cpu_buffer = buffer->buffers[cpu]; - if (!rb_per_cpu_empty(cpu_buffer)) + local_irq_save(flags); + if (dolock) + spin_lock(&cpu_buffer->reader_lock); + ret = rb_per_cpu_empty(cpu_buffer); + if (dolock) + spin_unlock(&cpu_buffer->reader_lock); + local_irq_restore(flags); + + 
if (!ret) return 0; } - put_online_cpus(); - return 1; } EXPORT_SYMBOL_GPL(ring_buffer_empty); @@ -2382,23 +3487,29 @@ EXPORT_SYMBOL_GPL(ring_buffer_empty); int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu) { struct ring_buffer_per_cpu *cpu_buffer; - int ret = 1; - - get_online_cpus(); + unsigned long flags; + int dolock; + int ret; if (!cpumask_test_cpu(cpu, buffer->cpumask)) - goto out; + return 1; + + dolock = rb_ok_to_lock(); cpu_buffer = buffer->buffers[cpu]; + local_irq_save(flags); + if (dolock) + spin_lock(&cpu_buffer->reader_lock); ret = rb_per_cpu_empty(cpu_buffer); - - out: - put_online_cpus(); + if (dolock) + spin_unlock(&cpu_buffer->reader_lock); + local_irq_restore(flags); return ret; } EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); +#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP /** * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers * @buffer_a: One buffer to swap with @@ -2416,8 +3527,6 @@ int ring_buffer_swap_cpu(struct ring_buffer *buffer_a, struct ring_buffer_per_cpu *cpu_buffer_b; int ret = -EINVAL; - get_online_cpus(); - if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || !cpumask_test_cpu(cpu, buffer_b->cpumask)) goto out; @@ -2455,44 +3564,28 @@ int ring_buffer_swap_cpu(struct ring_buffer *buffer_a, atomic_inc(&cpu_buffer_a->record_disabled); atomic_inc(&cpu_buffer_b->record_disabled); + ret = -EBUSY; + if (local_read(&cpu_buffer_a->committing)) + goto out_dec; + if (local_read(&cpu_buffer_b->committing)) + goto out_dec; + buffer_a->buffers[cpu] = cpu_buffer_b; buffer_b->buffers[cpu] = cpu_buffer_a; cpu_buffer_b->buffer = buffer_a; cpu_buffer_a->buffer = buffer_b; + ret = 0; + +out_dec: atomic_dec(&cpu_buffer_a->record_disabled); atomic_dec(&cpu_buffer_b->record_disabled); - - ret = 0; out: - put_online_cpus(); - return ret; } EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); - -static void rb_remove_entries(struct ring_buffer_per_cpu *cpu_buffer, - struct buffer_data_page *bpage, - unsigned int offset) -{ - struct ring_buffer_event *event; - unsigned long head; - - __raw_spin_lock(&cpu_buffer->lock); - for (head = offset; head < local_read(&bpage->commit); - head += rb_event_length(event)) { - - event = __rb_data_page_index(bpage, head); - if (RB_WARN_ON(cpu_buffer, rb_null_event(event))) - return; - /* Only count data entries */ - if (event->type != RINGBUF_TYPE_DATA) - continue; - cpu_buffer->entries--; - } - __raw_spin_unlock(&cpu_buffer->lock); -} +#endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ /** * ring_buffer_alloc_read_page - allocate a page to read from buffer @@ -2524,6 +3617,7 @@ void *ring_buffer_alloc_read_page(struct ring_buffer *buffer) return bpage; } +EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); /** * ring_buffer_free_read_page - free an allocated read page @@ -2536,6 +3630,7 @@ void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data) { free_page((unsigned long)data); } +EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); /** * ring_buffer_read_page - extract a page from the ring buffer @@ -2583,8 +3678,6 @@ int ring_buffer_read_page(struct ring_buffer *buffer, u64 save_timestamp; int ret = -1; - get_online_cpus(); - if (!cpumask_test_cpu(cpu, buffer->cpumask)) goto out; @@ -2664,16 +3757,17 @@ int ring_buffer_read_page(struct ring_buffer *buffer, /* we copied everything to the beginning */ read = 0; } else { + /* update the entry counter */ + cpu_buffer->read += rb_page_entries(reader); + /* swap the pages */ rb_init_page(bpage); bpage = reader->page; reader->page = *data_page; local_set(&reader->write, 0); + local_set(&reader->entries, 
0); reader->read = 0; *data_page = bpage; - - /* update the entry counter */ - rb_remove_entries(cpu_buffer, bpage, read); } ret = read; @@ -2681,11 +3775,11 @@ int ring_buffer_read_page(struct ring_buffer *buffer, spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); out: - put_online_cpus(); - return ret; } +EXPORT_SYMBOL_GPL(ring_buffer_read_page); +#ifdef CONFIG_TRACING static ssize_t rb_simple_read(struct file *filp, char __user *ubuf, size_t cnt, loff_t *ppos) @@ -2743,23 +3837,21 @@ static const struct file_operations rb_simple_fops = { static __init int rb_init_debugfs(void) { struct dentry *d_tracer; - struct dentry *entry; d_tracer = tracing_init_dentry(); - entry = debugfs_create_file("tracing_on", 0644, d_tracer, - &ring_buffer_flags, &rb_simple_fops); - if (!entry) - pr_warning("Could not create debugfs 'tracing_on' entry\n"); + trace_create_file("tracing_on", 0644, d_tracer, + &ring_buffer_flags, &rb_simple_fops); return 0; } fs_initcall(rb_init_debugfs); +#endif -#ifdef CONFIG_HOTPLUG -static int __cpuinit rb_cpu_notify(struct notifier_block *self, - unsigned long action, void *hcpu) +#ifdef CONFIG_HOTPLUG_CPU +static int rb_cpu_notify(struct notifier_block *self, + unsigned long action, void *hcpu) { struct ring_buffer *buffer = container_of(self, struct ring_buffer, cpu_notify); @@ -2768,7 +3860,7 @@ static int __cpuinit rb_cpu_notify(struct notifier_block *self, switch (action) { case CPU_UP_PREPARE: case CPU_UP_PREPARE_FROZEN: - if (cpu_isset(cpu, *buffer->cpumask)) + if (cpumask_test_cpu(cpu, buffer->cpumask)) return NOTIFY_OK; buffer->buffers[cpu] = @@ -2779,7 +3871,7 @@ static int __cpuinit rb_cpu_notify(struct notifier_block *self, return NOTIFY_OK; } smp_wmb(); - cpu_set(cpu, *buffer->cpumask); + cpumask_set_cpu(cpu, buffer->cpumask); break; case CPU_DOWN_PREPARE: case CPU_DOWN_PREPARE_FROZEN: