4 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
6 #include <linux/ring_buffer.h>
7 #include <linux/spinlock.h>
8 #include <linux/debugfs.h>
9 #include <linux/uaccess.h>
10 #include <linux/module.h>
11 #include <linux/percpu.h>
12 #include <linux/mutex.h>
13 #include <linux/sched.h> /* used for sched_clock() (for now) */
14 #include <linux/init.h>
15 #include <linux/hash.h>
16 #include <linux/list.h>
19 /* Up this if you want to test the TIME_EXTENTS and normalization */
23 u64 ring_buffer_time_stamp(int cpu)
25 /* shift to debug/test normalization and TIME_EXTENTS */
26 return sched_clock() << DEBUG_SHIFT;
29 void ring_buffer_normalize_time_stamp(int cpu, u64 *ts)
31 /* Just stupid testing the normalize function and deltas */
35 #define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event))
36 #define RB_ALIGNMENT_SHIFT 2
37 #define RB_ALIGNMENT (1 << RB_ALIGNMENT_SHIFT)
38 #define RB_MAX_SMALL_DATA 28
41 RB_LEN_TIME_EXTEND = 8,
42 RB_LEN_TIME_STAMP = 16,
45 /* inline for ring buffer fast paths */
46 static inline unsigned
47 rb_event_length(struct ring_buffer_event *event)
51 switch (event->type) {
52 case RINGBUF_TYPE_PADDING:
56 case RINGBUF_TYPE_TIME_EXTEND:
57 return RB_LEN_TIME_EXTEND;
59 case RINGBUF_TYPE_TIME_STAMP:
60 return RB_LEN_TIME_STAMP;
62 case RINGBUF_TYPE_DATA:
64 length = event->len << RB_ALIGNMENT_SHIFT;
66 length = event->array[0];
67 return length + RB_EVNT_HDR_SIZE;
76 * ring_buffer_event_length - return the length of the event
77 * @event: the event to get the length of
79 unsigned ring_buffer_event_length(struct ring_buffer_event *event)
81 return rb_event_length(event);
84 /* inline for ring buffer fast paths */
86 rb_event_data(struct ring_buffer_event *event)
88 BUG_ON(event->type != RINGBUF_TYPE_DATA);
89 /* If length is in len field, then array[0] has the data */
91 return (void *)&event->array[0];
92 /* Otherwise length is in array[0] and array[1] has the data */
93 return (void *)&event->array[1];
97 * ring_buffer_event_data - return the data of the event
98 * @event: the event to get the data from
100 void *ring_buffer_event_data(struct ring_buffer_event *event)
102 return rb_event_data(event);
105 #define for_each_buffer_cpu(buffer, cpu) \
106 for_each_cpu_mask(cpu, buffer->cpumask)
109 #define TS_MASK ((1ULL << TS_SHIFT) - 1)
110 #define TS_DELTA_TEST (~TS_MASK)
113 * This hack stolen from mm/slob.c.
114 * We can store per page timing information in the page frame of the page.
115 * Thanks to Peter Zijlstra for suggesting this idea.
120 unsigned long flags; /* mandatory */
121 atomic_t _count; /* mandatory */
122 u64 time_stamp; /* page time stamp */
123 unsigned size; /* size of page data */
124 struct list_head list; /* list of free pages */
131 * We need to fit the time_stamp delta into 27 bits.
133 static inline int test_time_stamp(u64 delta)
135 if (delta & TS_DELTA_TEST)
140 #define BUF_PAGE_SIZE PAGE_SIZE
143 * head_page == tail_page && head == tail then buffer is empty.
145 struct ring_buffer_per_cpu {
147 struct ring_buffer *buffer;
149 struct lock_class_key lock_key;
150 struct list_head pages;
151 unsigned long head; /* read from head */
152 unsigned long tail; /* write to tail */
153 struct buffer_page *head_page;
154 struct buffer_page *tail_page;
155 unsigned long overrun;
156 unsigned long entries;
159 atomic_t record_disabled;
168 atomic_t record_disabled;
172 struct ring_buffer_per_cpu **buffers;
175 struct ring_buffer_iter {
176 struct ring_buffer_per_cpu *cpu_buffer;
178 struct buffer_page *head_page;
182 #define RB_WARN_ON(buffer, cond) \
183 if (unlikely(cond)) { \
184 atomic_inc(&buffer->record_disabled); \
190 * check_pages - integrity check of buffer pages
191 * @cpu_buffer: CPU buffer with pages to test
193 * As a safty measure we check to make sure the data pages have not
196 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
198 struct list_head *head = &cpu_buffer->pages;
199 struct buffer_page *page, *tmp;
201 RB_WARN_ON(cpu_buffer, head->next->prev != head);
202 RB_WARN_ON(cpu_buffer, head->prev->next != head);
204 list_for_each_entry_safe(page, tmp, head, list) {
205 RB_WARN_ON(cpu_buffer, page->list.next->prev != &page->list);
206 RB_WARN_ON(cpu_buffer, page->list.prev->next != &page->list);
212 static unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
214 return cpu_buffer->head_page->size;
217 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
220 struct list_head *head = &cpu_buffer->pages;
221 struct buffer_page *page, *tmp;
226 for (i = 0; i < nr_pages; i++) {
227 addr = __get_free_page(GFP_KERNEL);
230 page = (struct buffer_page *)virt_to_page(addr);
231 list_add(&page->list, &pages);
234 list_splice(&pages, head);
236 rb_check_pages(cpu_buffer);
241 list_for_each_entry_safe(page, tmp, &pages, list) {
242 list_del_init(&page->list);
243 __free_page(&page->page);
248 static struct ring_buffer_per_cpu *
249 rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
251 struct ring_buffer_per_cpu *cpu_buffer;
254 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
255 GFP_KERNEL, cpu_to_node(cpu));
259 cpu_buffer->cpu = cpu;
260 cpu_buffer->buffer = buffer;
261 spin_lock_init(&cpu_buffer->lock);
262 INIT_LIST_HEAD(&cpu_buffer->pages);
264 ret = rb_allocate_pages(cpu_buffer, buffer->pages);
266 goto fail_free_buffer;
268 cpu_buffer->head_page
269 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
270 cpu_buffer->tail_page
271 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
280 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
282 struct list_head *head = &cpu_buffer->pages;
283 struct buffer_page *page, *tmp;
285 list_for_each_entry_safe(page, tmp, head, list) {
286 list_del_init(&page->list);
287 __free_page(&page->page);
293 * Causes compile errors if the struct buffer_page gets bigger
294 * than the struct page.
296 extern int ring_buffer_page_too_big(void);
299 * ring_buffer_alloc - allocate a new ring_buffer
300 * @size: the size in bytes that is needed.
301 * @flags: attributes to set for the ring buffer.
303 * Currently the only flag that is available is the RB_FL_OVERWRITE
304 * flag. This flag means that the buffer will overwrite old data
305 * when the buffer wraps. If this flag is not set, the buffer will
306 * drop data when the tail hits the head.
308 struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
310 struct ring_buffer *buffer;
314 /* Paranoid! Optimizes out when all is well */
315 if (sizeof(struct buffer_page) > sizeof(struct page))
316 ring_buffer_page_too_big();
319 /* keep it in its own cache line */
320 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
325 buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
326 buffer->flags = flags;
328 /* need at least two pages */
329 if (buffer->pages == 1)
332 buffer->cpumask = cpu_possible_map;
333 buffer->cpus = nr_cpu_ids;
335 bsize = sizeof(void *) * nr_cpu_ids;
336 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
338 if (!buffer->buffers)
339 goto fail_free_buffer;
341 for_each_buffer_cpu(buffer, cpu) {
342 buffer->buffers[cpu] =
343 rb_allocate_cpu_buffer(buffer, cpu);
344 if (!buffer->buffers[cpu])
345 goto fail_free_buffers;
348 mutex_init(&buffer->mutex);
353 for_each_buffer_cpu(buffer, cpu) {
354 if (buffer->buffers[cpu])
355 rb_free_cpu_buffer(buffer->buffers[cpu]);
357 kfree(buffer->buffers);
365 * ring_buffer_free - free a ring buffer.
366 * @buffer: the buffer to free.
369 ring_buffer_free(struct ring_buffer *buffer)
373 for_each_buffer_cpu(buffer, cpu)
374 rb_free_cpu_buffer(buffer->buffers[cpu]);
379 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
382 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
384 struct buffer_page *page;
388 atomic_inc(&cpu_buffer->record_disabled);
391 for (i = 0; i < nr_pages; i++) {
392 BUG_ON(list_empty(&cpu_buffer->pages));
393 p = cpu_buffer->pages.next;
394 page = list_entry(p, struct buffer_page, list);
395 list_del_init(&page->list);
396 __free_page(&page->page);
398 BUG_ON(list_empty(&cpu_buffer->pages));
400 rb_reset_cpu(cpu_buffer);
402 rb_check_pages(cpu_buffer);
404 atomic_dec(&cpu_buffer->record_disabled);
409 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
410 struct list_head *pages, unsigned nr_pages)
412 struct buffer_page *page;
416 atomic_inc(&cpu_buffer->record_disabled);
419 for (i = 0; i < nr_pages; i++) {
420 BUG_ON(list_empty(pages));
422 page = list_entry(p, struct buffer_page, list);
423 list_del_init(&page->list);
424 list_add_tail(&page->list, &cpu_buffer->pages);
426 rb_reset_cpu(cpu_buffer);
428 rb_check_pages(cpu_buffer);
430 atomic_dec(&cpu_buffer->record_disabled);
434 * ring_buffer_resize - resize the ring buffer
435 * @buffer: the buffer to resize.
436 * @size: the new size.
438 * The tracer is responsible for making sure that the buffer is
439 * not being used while changing the size.
440 * Note: We may be able to change the above requirement by using
441 * RCU synchronizations.
443 * Minimum size is 2 * BUF_PAGE_SIZE.
445 * Returns -1 on failure.
447 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
449 struct ring_buffer_per_cpu *cpu_buffer;
450 unsigned nr_pages, rm_pages, new_pages;
451 struct buffer_page *page, *tmp;
452 unsigned long buffer_size;
457 size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
458 size *= BUF_PAGE_SIZE;
459 buffer_size = buffer->pages * BUF_PAGE_SIZE;
461 /* we need a minimum of two pages */
462 if (size < BUF_PAGE_SIZE * 2)
463 size = BUF_PAGE_SIZE * 2;
465 if (size == buffer_size)
468 mutex_lock(&buffer->mutex);
470 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
472 if (size < buffer_size) {
474 /* easy case, just free pages */
475 BUG_ON(nr_pages >= buffer->pages);
477 rm_pages = buffer->pages - nr_pages;
479 for_each_buffer_cpu(buffer, cpu) {
480 cpu_buffer = buffer->buffers[cpu];
481 rb_remove_pages(cpu_buffer, rm_pages);
487 * This is a bit more difficult. We only want to add pages
488 * when we can allocate enough for all CPUs. We do this
489 * by allocating all the pages and storing them on a local
490 * link list. If we succeed in our allocation, then we
491 * add these pages to the cpu_buffers. Otherwise we just free
492 * them all and return -ENOMEM;
494 BUG_ON(nr_pages <= buffer->pages);
495 new_pages = nr_pages - buffer->pages;
497 for_each_buffer_cpu(buffer, cpu) {
498 for (i = 0; i < new_pages; i++) {
499 addr = __get_free_page(GFP_KERNEL);
502 page = (struct buffer_page *)virt_to_page(addr);
503 list_add(&page->list, &pages);
507 for_each_buffer_cpu(buffer, cpu) {
508 cpu_buffer = buffer->buffers[cpu];
509 rb_insert_pages(cpu_buffer, &pages, new_pages);
512 BUG_ON(!list_empty(&pages));
515 buffer->pages = nr_pages;
516 mutex_unlock(&buffer->mutex);
521 list_for_each_entry_safe(page, tmp, &pages, list) {
522 list_del_init(&page->list);
523 __free_page(&page->page);
528 static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
530 return cpu_buffer->head_page == cpu_buffer->tail_page &&
531 cpu_buffer->head == cpu_buffer->tail;
534 static inline int rb_null_event(struct ring_buffer_event *event)
536 return event->type == RINGBUF_TYPE_PADDING;
539 static inline void *rb_page_index(struct buffer_page *page, unsigned index)
541 void *addr = page_address(&page->page);
546 static inline struct ring_buffer_event *
547 rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
549 return rb_page_index(cpu_buffer->head_page,
553 static inline struct ring_buffer_event *
554 rb_iter_head_event(struct ring_buffer_iter *iter)
556 return rb_page_index(iter->head_page,
561 * When the tail hits the head and the buffer is in overwrite mode,
562 * the head jumps to the next page and all content on the previous
563 * page is discarded. But before doing so, we update the overrun
564 * variable of the buffer.
566 static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
568 struct ring_buffer_event *event;
571 for (head = 0; head < rb_head_size(cpu_buffer);
572 head += rb_event_length(event)) {
574 event = rb_page_index(cpu_buffer->head_page, head);
575 BUG_ON(rb_null_event(event));
576 /* Only count data entries */
577 if (event->type != RINGBUF_TYPE_DATA)
579 cpu_buffer->overrun++;
580 cpu_buffer->entries--;
584 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
585 struct buffer_page **page)
587 struct list_head *p = (*page)->list.next;
589 if (p == &cpu_buffer->pages)
592 *page = list_entry(p, struct buffer_page, list);
596 rb_add_stamp(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts)
598 cpu_buffer->tail_page->time_stamp = *ts;
599 cpu_buffer->write_stamp = *ts;
602 static void rb_reset_read_page(struct ring_buffer_per_cpu *cpu_buffer)
604 cpu_buffer->read_stamp = cpu_buffer->head_page->time_stamp;
605 cpu_buffer->head = 0;
609 rb_reset_iter_read_page(struct ring_buffer_iter *iter)
611 iter->read_stamp = iter->head_page->time_stamp;
616 * ring_buffer_update_event - update event type and data
617 * @event: the even to update
618 * @type: the type of event
619 * @length: the size of the event field in the ring buffer
621 * Update the type and data fields of the event. The length
622 * is the actual size that is written to the ring buffer,
623 * and with this, we can determine what to place into the
627 rb_update_event(struct ring_buffer_event *event,
628 unsigned type, unsigned length)
634 case RINGBUF_TYPE_PADDING:
637 case RINGBUF_TYPE_TIME_EXTEND:
639 (RB_LEN_TIME_EXTEND + (RB_ALIGNMENT-1))
640 >> RB_ALIGNMENT_SHIFT;
643 case RINGBUF_TYPE_TIME_STAMP:
645 (RB_LEN_TIME_STAMP + (RB_ALIGNMENT-1))
646 >> RB_ALIGNMENT_SHIFT;
649 case RINGBUF_TYPE_DATA:
650 length -= RB_EVNT_HDR_SIZE;
651 if (length > RB_MAX_SMALL_DATA) {
653 event->array[0] = length;
656 (length + (RB_ALIGNMENT-1))
657 >> RB_ALIGNMENT_SHIFT;
664 static inline unsigned rb_calculate_event_length(unsigned length)
666 struct ring_buffer_event event; /* Used only for sizeof array */
668 /* zero length can cause confusions */
672 if (length > RB_MAX_SMALL_DATA)
673 length += sizeof(event.array[0]);
675 length += RB_EVNT_HDR_SIZE;
676 length = ALIGN(length, RB_ALIGNMENT);
681 static struct ring_buffer_event *
682 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
683 unsigned type, unsigned long length, u64 *ts)
685 struct buffer_page *head_page, *tail_page;
687 struct ring_buffer *buffer = cpu_buffer->buffer;
688 struct ring_buffer_event *event;
690 tail_page = cpu_buffer->tail_page;
691 head_page = cpu_buffer->head_page;
692 tail = cpu_buffer->tail;
694 if (tail + length > BUF_PAGE_SIZE) {
695 struct buffer_page *next_page = tail_page;
697 rb_inc_page(cpu_buffer, &next_page);
699 if (next_page == head_page) {
700 if (!(buffer->flags & RB_FL_OVERWRITE))
703 /* count overflows */
704 rb_update_overflow(cpu_buffer);
706 rb_inc_page(cpu_buffer, &head_page);
707 cpu_buffer->head_page = head_page;
708 rb_reset_read_page(cpu_buffer);
711 if (tail != BUF_PAGE_SIZE) {
712 event = rb_page_index(tail_page, tail);
714 event->type = RINGBUF_TYPE_PADDING;
717 tail_page->size = tail;
718 tail_page = next_page;
721 cpu_buffer->tail_page = tail_page;
722 cpu_buffer->tail = tail;
723 rb_add_stamp(cpu_buffer, ts);
726 BUG_ON(tail + length > BUF_PAGE_SIZE);
728 event = rb_page_index(tail_page, tail);
729 rb_update_event(event, type, length);
735 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
738 struct ring_buffer_event *event;
741 if (unlikely(*delta > (1ULL << 59) && !once++)) {
742 printk(KERN_WARNING "Delta way too big! %llu"
743 " ts=%llu write stamp = %llu\n",
744 *delta, *ts, cpu_buffer->write_stamp);
749 * The delta is too big, we to add a
752 event = __rb_reserve_next(cpu_buffer,
753 RINGBUF_TYPE_TIME_EXTEND,
759 /* check to see if we went to the next page */
760 if (cpu_buffer->tail) {
761 /* Still on same page, update timestamp */
762 event->time_delta = *delta & TS_MASK;
763 event->array[0] = *delta >> TS_SHIFT;
764 /* commit the time event */
766 rb_event_length(event);
767 cpu_buffer->write_stamp = *ts;
774 static struct ring_buffer_event *
775 rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
776 unsigned type, unsigned long length)
778 struct ring_buffer_event *event;
781 ts = ring_buffer_time_stamp(cpu_buffer->cpu);
783 if (cpu_buffer->tail) {
784 delta = ts - cpu_buffer->write_stamp;
786 if (test_time_stamp(delta)) {
789 ret = rb_add_time_stamp(cpu_buffer, &ts, &delta);
794 rb_add_stamp(cpu_buffer, &ts);
798 event = __rb_reserve_next(cpu_buffer, type, length, &ts);
802 /* If the reserve went to the next page, our delta is zero */
803 if (!cpu_buffer->tail)
806 event->time_delta = delta;
812 * ring_buffer_lock_reserve - reserve a part of the buffer
813 * @buffer: the ring buffer to reserve from
814 * @length: the length of the data to reserve (excluding event header)
815 * @flags: a pointer to save the interrupt flags
817 * Returns a reseverd event on the ring buffer to copy directly to.
818 * The user of this interface will need to get the body to write into
819 * and can use the ring_buffer_event_data() interface.
821 * The length is the length of the data needed, not the event length
822 * which also includes the event header.
824 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
825 * If NULL is returned, then nothing has been allocated or locked.
827 struct ring_buffer_event *
828 ring_buffer_lock_reserve(struct ring_buffer *buffer,
829 unsigned long length,
830 unsigned long *flags)
832 struct ring_buffer_per_cpu *cpu_buffer;
833 struct ring_buffer_event *event;
836 if (atomic_read(&buffer->record_disabled))
839 raw_local_irq_save(*flags);
840 cpu = raw_smp_processor_id();
842 if (!cpu_isset(cpu, buffer->cpumask))
845 cpu_buffer = buffer->buffers[cpu];
846 spin_lock(&cpu_buffer->lock);
848 if (atomic_read(&cpu_buffer->record_disabled))
851 length = rb_calculate_event_length(length);
852 if (length > BUF_PAGE_SIZE)
855 event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length);
862 spin_unlock(&cpu_buffer->lock);
864 local_irq_restore(*flags);
868 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
869 struct ring_buffer_event *event)
871 cpu_buffer->tail += rb_event_length(event);
872 cpu_buffer->tail_page->size = cpu_buffer->tail;
873 cpu_buffer->write_stamp += event->time_delta;
874 cpu_buffer->entries++;
878 * ring_buffer_unlock_commit - commit a reserved
879 * @buffer: The buffer to commit to
880 * @event: The event pointer to commit.
881 * @flags: the interrupt flags received from ring_buffer_lock_reserve.
883 * This commits the data to the ring buffer, and releases any locks held.
885 * Must be paired with ring_buffer_lock_reserve.
887 int ring_buffer_unlock_commit(struct ring_buffer *buffer,
888 struct ring_buffer_event *event,
891 struct ring_buffer_per_cpu *cpu_buffer;
892 int cpu = raw_smp_processor_id();
894 cpu_buffer = buffer->buffers[cpu];
896 assert_spin_locked(&cpu_buffer->lock);
898 rb_commit(cpu_buffer, event);
900 spin_unlock(&cpu_buffer->lock);
901 raw_local_irq_restore(flags);
907 * ring_buffer_write - write data to the buffer without reserving
908 * @buffer: The ring buffer to write to.
909 * @length: The length of the data being written (excluding the event header)
910 * @data: The data to write to the buffer.
912 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
913 * one function. If you already have the data to write to the buffer, it
914 * may be easier to simply call this function.
916 * Note, like ring_buffer_lock_reserve, the length is the length of the data
917 * and not the length of the event which would hold the header.
919 int ring_buffer_write(struct ring_buffer *buffer,
920 unsigned long length,
923 struct ring_buffer_per_cpu *cpu_buffer;
924 struct ring_buffer_event *event;
925 unsigned long event_length, flags;
930 if (atomic_read(&buffer->record_disabled))
933 local_irq_save(flags);
934 cpu = raw_smp_processor_id();
936 if (!cpu_isset(cpu, buffer->cpumask))
939 cpu_buffer = buffer->buffers[cpu];
940 spin_lock(&cpu_buffer->lock);
942 if (atomic_read(&cpu_buffer->record_disabled))
945 event_length = rb_calculate_event_length(length);
946 event = rb_reserve_next_event(cpu_buffer,
947 RINGBUF_TYPE_DATA, event_length);
951 body = rb_event_data(event);
953 memcpy(body, data, length);
955 rb_commit(cpu_buffer, event);
959 spin_unlock(&cpu_buffer->lock);
961 local_irq_restore(flags);
967 * ring_buffer_lock - lock the ring buffer
968 * @buffer: The ring buffer to lock
969 * @flags: The place to store the interrupt flags
971 * This locks all the per CPU buffers.
973 * Must be unlocked by ring_buffer_unlock.
975 void ring_buffer_lock(struct ring_buffer *buffer, unsigned long *flags)
977 struct ring_buffer_per_cpu *cpu_buffer;
980 local_irq_save(*flags);
982 for_each_buffer_cpu(buffer, cpu) {
983 cpu_buffer = buffer->buffers[cpu];
984 spin_lock(&cpu_buffer->lock);
989 * ring_buffer_unlock - unlock a locked buffer
990 * @buffer: The locked buffer to unlock
991 * @flags: The interrupt flags received by ring_buffer_lock
993 void ring_buffer_unlock(struct ring_buffer *buffer, unsigned long flags)
995 struct ring_buffer_per_cpu *cpu_buffer;
998 for (cpu = buffer->cpus - 1; cpu >= 0; cpu--) {
999 if (!cpu_isset(cpu, buffer->cpumask))
1001 cpu_buffer = buffer->buffers[cpu];
1002 spin_unlock(&cpu_buffer->lock);
1005 local_irq_restore(flags);
1009 * ring_buffer_record_disable - stop all writes into the buffer
1010 * @buffer: The ring buffer to stop writes to.
1012 * This prevents all writes to the buffer. Any attempt to write
1013 * to the buffer after this will fail and return NULL.
1015 * The caller should call synchronize_sched() after this.
1017 void ring_buffer_record_disable(struct ring_buffer *buffer)
1019 atomic_inc(&buffer->record_disabled);
1023 * ring_buffer_record_enable - enable writes to the buffer
1024 * @buffer: The ring buffer to enable writes
1026 * Note, multiple disables will need the same number of enables
1027 * to truely enable the writing (much like preempt_disable).
1029 void ring_buffer_record_enable(struct ring_buffer *buffer)
1031 atomic_dec(&buffer->record_disabled);
1035 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
1036 * @buffer: The ring buffer to stop writes to.
1037 * @cpu: The CPU buffer to stop
1039 * This prevents all writes to the buffer. Any attempt to write
1040 * to the buffer after this will fail and return NULL.
1042 * The caller should call synchronize_sched() after this.
1044 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
1046 struct ring_buffer_per_cpu *cpu_buffer;
1048 if (!cpu_isset(cpu, buffer->cpumask))
1051 cpu_buffer = buffer->buffers[cpu];
1052 atomic_inc(&cpu_buffer->record_disabled);
1056 * ring_buffer_record_enable_cpu - enable writes to the buffer
1057 * @buffer: The ring buffer to enable writes
1058 * @cpu: The CPU to enable.
1060 * Note, multiple disables will need the same number of enables
1061 * to truely enable the writing (much like preempt_disable).
1063 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
1065 struct ring_buffer_per_cpu *cpu_buffer;
1067 if (!cpu_isset(cpu, buffer->cpumask))
1070 cpu_buffer = buffer->buffers[cpu];
1071 atomic_dec(&cpu_buffer->record_disabled);
1075 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
1076 * @buffer: The ring buffer
1077 * @cpu: The per CPU buffer to get the entries from.
1079 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
1081 struct ring_buffer_per_cpu *cpu_buffer;
1083 if (!cpu_isset(cpu, buffer->cpumask))
1086 cpu_buffer = buffer->buffers[cpu];
1087 return cpu_buffer->entries;
1091 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
1092 * @buffer: The ring buffer
1093 * @cpu: The per CPU buffer to get the number of overruns from
1095 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
1097 struct ring_buffer_per_cpu *cpu_buffer;
1099 if (!cpu_isset(cpu, buffer->cpumask))
1102 cpu_buffer = buffer->buffers[cpu];
1103 return cpu_buffer->overrun;
1107 * ring_buffer_entries - get the number of entries in a buffer
1108 * @buffer: The ring buffer
1110 * Returns the total number of entries in the ring buffer
1113 unsigned long ring_buffer_entries(struct ring_buffer *buffer)
1115 struct ring_buffer_per_cpu *cpu_buffer;
1116 unsigned long entries = 0;
1119 /* if you care about this being correct, lock the buffer */
1120 for_each_buffer_cpu(buffer, cpu) {
1121 cpu_buffer = buffer->buffers[cpu];
1122 entries += cpu_buffer->entries;
1129 * ring_buffer_overrun_cpu - get the number of overruns in buffer
1130 * @buffer: The ring buffer
1132 * Returns the total number of overruns in the ring buffer
1135 unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
1137 struct ring_buffer_per_cpu *cpu_buffer;
1138 unsigned long overruns = 0;
1141 /* if you care about this being correct, lock the buffer */
1142 for_each_buffer_cpu(buffer, cpu) {
1143 cpu_buffer = buffer->buffers[cpu];
1144 overruns += cpu_buffer->overrun;
1151 * ring_buffer_iter_reset - reset an iterator
1152 * @iter: The iterator to reset
1154 * Resets the iterator, so that it will start from the beginning
1157 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
1159 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1161 iter->head_page = cpu_buffer->head_page;
1162 iter->head = cpu_buffer->head;
1163 rb_reset_iter_read_page(iter);
1167 * ring_buffer_iter_empty - check if an iterator has no more to read
1168 * @iter: The iterator to check
1170 int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
1172 struct ring_buffer_per_cpu *cpu_buffer;
1174 cpu_buffer = iter->cpu_buffer;
1176 return iter->head_page == cpu_buffer->tail_page &&
1177 iter->head == cpu_buffer->tail;
1181 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1182 struct ring_buffer_event *event)
1186 switch (event->type) {
1187 case RINGBUF_TYPE_PADDING:
1190 case RINGBUF_TYPE_TIME_EXTEND:
1191 delta = event->array[0];
1193 delta += event->time_delta;
1194 cpu_buffer->read_stamp += delta;
1197 case RINGBUF_TYPE_TIME_STAMP:
1198 /* FIXME: not implemented */
1201 case RINGBUF_TYPE_DATA:
1202 cpu_buffer->read_stamp += event->time_delta;
1212 rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
1213 struct ring_buffer_event *event)
1217 switch (event->type) {
1218 case RINGBUF_TYPE_PADDING:
1221 case RINGBUF_TYPE_TIME_EXTEND:
1222 delta = event->array[0];
1224 delta += event->time_delta;
1225 iter->read_stamp += delta;
1228 case RINGBUF_TYPE_TIME_STAMP:
1229 /* FIXME: not implemented */
1232 case RINGBUF_TYPE_DATA:
1233 iter->read_stamp += event->time_delta;
1242 static void rb_advance_head(struct ring_buffer_per_cpu *cpu_buffer)
1244 struct ring_buffer_event *event;
1248 * Check if we are at the end of the buffer.
1250 if (cpu_buffer->head >= cpu_buffer->head_page->size) {
1251 BUG_ON(cpu_buffer->head_page == cpu_buffer->tail_page);
1252 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1253 rb_reset_read_page(cpu_buffer);
1257 event = rb_head_event(cpu_buffer);
1259 if (event->type == RINGBUF_TYPE_DATA)
1260 cpu_buffer->entries--;
1262 length = rb_event_length(event);
1265 * This should not be called to advance the header if we are
1266 * at the tail of the buffer.
1268 BUG_ON((cpu_buffer->head_page == cpu_buffer->tail_page) &&
1269 (cpu_buffer->head + length > cpu_buffer->tail));
1271 rb_update_read_stamp(cpu_buffer, event);
1273 cpu_buffer->head += length;
1275 /* check for end of page */
1276 if ((cpu_buffer->head >= cpu_buffer->head_page->size) &&
1277 (cpu_buffer->head_page != cpu_buffer->tail_page))
1278 rb_advance_head(cpu_buffer);
1281 static void rb_advance_iter(struct ring_buffer_iter *iter)
1283 struct ring_buffer *buffer;
1284 struct ring_buffer_per_cpu *cpu_buffer;
1285 struct ring_buffer_event *event;
1288 cpu_buffer = iter->cpu_buffer;
1289 buffer = cpu_buffer->buffer;
1292 * Check if we are at the end of the buffer.
1294 if (iter->head >= iter->head_page->size) {
1295 BUG_ON(iter->head_page == cpu_buffer->tail_page);
1296 rb_inc_page(cpu_buffer, &iter->head_page);
1297 rb_reset_iter_read_page(iter);
1301 event = rb_iter_head_event(iter);
1303 length = rb_event_length(event);
1306 * This should not be called to advance the header if we are
1307 * at the tail of the buffer.
1309 BUG_ON((iter->head_page == cpu_buffer->tail_page) &&
1310 (iter->head + length > cpu_buffer->tail));
1312 rb_update_iter_read_stamp(iter, event);
1314 iter->head += length;
1316 /* check for end of page padding */
1317 if ((iter->head >= iter->head_page->size) &&
1318 (iter->head_page != cpu_buffer->tail_page))
1319 rb_advance_iter(iter);
1323 * ring_buffer_peek - peek at the next event to be read
1324 * @buffer: The ring buffer to read
1325 * @cpu: The cpu to peak at
1326 * @ts: The timestamp counter of this event.
1328 * This will return the event that will be read next, but does
1329 * not consume the data.
1331 struct ring_buffer_event *
1332 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
1334 struct ring_buffer_per_cpu *cpu_buffer;
1335 struct ring_buffer_event *event;
1337 if (!cpu_isset(cpu, buffer->cpumask))
1340 cpu_buffer = buffer->buffers[cpu];
1343 if (rb_per_cpu_empty(cpu_buffer))
1346 event = rb_head_event(cpu_buffer);
1348 switch (event->type) {
1349 case RINGBUF_TYPE_PADDING:
1350 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1351 rb_reset_read_page(cpu_buffer);
1354 case RINGBUF_TYPE_TIME_EXTEND:
1355 /* Internal data, OK to advance */
1356 rb_advance_head(cpu_buffer);
1359 case RINGBUF_TYPE_TIME_STAMP:
1360 /* FIXME: not implemented */
1361 rb_advance_head(cpu_buffer);
1364 case RINGBUF_TYPE_DATA:
1366 *ts = cpu_buffer->read_stamp + event->time_delta;
1367 ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1379 * ring_buffer_iter_peek - peek at the next event to be read
1380 * @iter: The ring buffer iterator
1381 * @ts: The timestamp counter of this event.
1383 * This will return the event that will be read next, but does
1384 * not increment the iterator.
1386 struct ring_buffer_event *
1387 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
1389 struct ring_buffer *buffer;
1390 struct ring_buffer_per_cpu *cpu_buffer;
1391 struct ring_buffer_event *event;
1393 if (ring_buffer_iter_empty(iter))
1396 cpu_buffer = iter->cpu_buffer;
1397 buffer = cpu_buffer->buffer;
1400 if (rb_per_cpu_empty(cpu_buffer))
1403 event = rb_iter_head_event(iter);
1405 switch (event->type) {
1406 case RINGBUF_TYPE_PADDING:
1407 rb_inc_page(cpu_buffer, &iter->head_page);
1408 rb_reset_iter_read_page(iter);
1411 case RINGBUF_TYPE_TIME_EXTEND:
1412 /* Internal data, OK to advance */
1413 rb_advance_iter(iter);
1416 case RINGBUF_TYPE_TIME_STAMP:
1417 /* FIXME: not implemented */
1418 rb_advance_iter(iter);
1421 case RINGBUF_TYPE_DATA:
1423 *ts = iter->read_stamp + event->time_delta;
1424 ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1436 * ring_buffer_consume - return an event and consume it
1437 * @buffer: The ring buffer to get the next event from
1439 * Returns the next event in the ring buffer, and that event is consumed.
1440 * Meaning, that sequential reads will keep returning a different event,
1441 * and eventually empty the ring buffer if the producer is slower.
1443 struct ring_buffer_event *
1444 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
1446 struct ring_buffer_per_cpu *cpu_buffer;
1447 struct ring_buffer_event *event;
1449 if (!cpu_isset(cpu, buffer->cpumask))
1452 event = ring_buffer_peek(buffer, cpu, ts);
1456 cpu_buffer = buffer->buffers[cpu];
1457 rb_advance_head(cpu_buffer);
1463 * ring_buffer_read_start - start a non consuming read of the buffer
1464 * @buffer: The ring buffer to read from
1465 * @cpu: The cpu buffer to iterate over
1467 * This starts up an iteration through the buffer. It also disables
1468 * the recording to the buffer until the reading is finished.
1469 * This prevents the reading from being corrupted. This is not
1470 * a consuming read, so a producer is not expected.
1472 * Must be paired with ring_buffer_finish.
1474 struct ring_buffer_iter *
1475 ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
1477 struct ring_buffer_per_cpu *cpu_buffer;
1478 struct ring_buffer_iter *iter;
1480 if (!cpu_isset(cpu, buffer->cpumask))
1483 iter = kmalloc(sizeof(*iter), GFP_KERNEL);
1487 cpu_buffer = buffer->buffers[cpu];
1489 iter->cpu_buffer = cpu_buffer;
1491 atomic_inc(&cpu_buffer->record_disabled);
1492 synchronize_sched();
1494 spin_lock(&cpu_buffer->lock);
1495 iter->head = cpu_buffer->head;
1496 iter->head_page = cpu_buffer->head_page;
1497 rb_reset_iter_read_page(iter);
1498 spin_unlock(&cpu_buffer->lock);
1504 * ring_buffer_finish - finish reading the iterator of the buffer
1505 * @iter: The iterator retrieved by ring_buffer_start
1507 * This re-enables the recording to the buffer, and frees the
1511 ring_buffer_read_finish(struct ring_buffer_iter *iter)
1513 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1515 atomic_dec(&cpu_buffer->record_disabled);
1520 * ring_buffer_read - read the next item in the ring buffer by the iterator
1521 * @iter: The ring buffer iterator
1522 * @ts: The time stamp of the event read.
1524 * This reads the next event in the ring buffer and increments the iterator.
1526 struct ring_buffer_event *
1527 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
1529 struct ring_buffer_event *event;
1531 event = ring_buffer_iter_peek(iter, ts);
1535 rb_advance_iter(iter);
1541 * ring_buffer_size - return the size of the ring buffer (in bytes)
1542 * @buffer: The ring buffer.
1544 unsigned long ring_buffer_size(struct ring_buffer *buffer)
1546 return BUF_PAGE_SIZE * buffer->pages;
1550 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
1552 cpu_buffer->head_page
1553 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
1554 cpu_buffer->tail_page
1555 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
1557 cpu_buffer->head = cpu_buffer->tail = 0;
1558 cpu_buffer->overrun = 0;
1559 cpu_buffer->entries = 0;
1563 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
1564 * @buffer: The ring buffer to reset a per cpu buffer of
1565 * @cpu: The CPU buffer to be reset
1567 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
1569 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
1570 unsigned long flags;
1572 if (!cpu_isset(cpu, buffer->cpumask))
1575 raw_local_irq_save(flags);
1576 spin_lock(&cpu_buffer->lock);
1578 rb_reset_cpu(cpu_buffer);
1580 spin_unlock(&cpu_buffer->lock);
1581 raw_local_irq_restore(flags);
1585 * ring_buffer_reset - reset a ring buffer
1586 * @buffer: The ring buffer to reset all cpu buffers
1588 void ring_buffer_reset(struct ring_buffer *buffer)
1590 unsigned long flags;
1593 ring_buffer_lock(buffer, &flags);
1595 for_each_buffer_cpu(buffer, cpu)
1596 rb_reset_cpu(buffer->buffers[cpu]);
1598 ring_buffer_unlock(buffer, flags);
1602 * rind_buffer_empty - is the ring buffer empty?
1603 * @buffer: The ring buffer to test
1605 int ring_buffer_empty(struct ring_buffer *buffer)
1607 struct ring_buffer_per_cpu *cpu_buffer;
1610 /* yes this is racy, but if you don't like the race, lock the buffer */
1611 for_each_buffer_cpu(buffer, cpu) {
1612 cpu_buffer = buffer->buffers[cpu];
1613 if (!rb_per_cpu_empty(cpu_buffer))
1620 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
1621 * @buffer: The ring buffer
1622 * @cpu: The CPU buffer to test
1624 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
1626 struct ring_buffer_per_cpu *cpu_buffer;
1628 if (!cpu_isset(cpu, buffer->cpumask))
1631 cpu_buffer = buffer->buffers[cpu];
1632 return rb_per_cpu_empty(cpu_buffer);
1636 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
1637 * @buffer_a: One buffer to swap with
1638 * @buffer_b: The other buffer to swap with
1640 * This function is useful for tracers that want to take a "snapshot"
1641 * of a CPU buffer and has another back up buffer lying around.
1642 * it is expected that the tracer handles the cpu buffer not being
1643 * used at the moment.
1645 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
1646 struct ring_buffer *buffer_b, int cpu)
1648 struct ring_buffer_per_cpu *cpu_buffer_a;
1649 struct ring_buffer_per_cpu *cpu_buffer_b;
1651 if (!cpu_isset(cpu, buffer_a->cpumask) ||
1652 !cpu_isset(cpu, buffer_b->cpumask))
1655 /* At least make sure the two buffers are somewhat the same */
1656 if (buffer_a->size != buffer_b->size ||
1657 buffer_a->pages != buffer_b->pages)
1660 cpu_buffer_a = buffer_a->buffers[cpu];
1661 cpu_buffer_b = buffer_b->buffers[cpu];
1664 * We can't do a synchronize_sched here because this
1665 * function can be called in atomic context.
1666 * Normally this will be called from the same CPU as cpu.
1667 * If not it's up to the caller to protect this.
1669 atomic_inc(&cpu_buffer_a->record_disabled);
1670 atomic_inc(&cpu_buffer_b->record_disabled);
1672 buffer_a->buffers[cpu] = cpu_buffer_b;
1673 buffer_b->buffers[cpu] = cpu_buffer_a;
1675 cpu_buffer_b->buffer = buffer_a;
1676 cpu_buffer_a->buffer = buffer_b;
1678 atomic_dec(&cpu_buffer_a->record_disabled);
1679 atomic_dec(&cpu_buffer_b->record_disabled);