4 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
6 #include <linux/ring_buffer.h>
7 #include <linux/spinlock.h>
8 #include <linux/debugfs.h>
9 #include <linux/uaccess.h>
10 #include <linux/module.h>
11 #include <linux/percpu.h>
12 #include <linux/mutex.h>
13 #include <linux/sched.h> /* used for sched_clock() (for now) */
14 #include <linux/init.h>
15 #include <linux/hash.h>
16 #include <linux/list.h>
19 /* Up this if you want to test the TIME_EXTENTS and normalization */
23 u64 ring_buffer_time_stamp(int cpu)
25 /* shift to debug/test normalization and TIME_EXTENTS */
26 return sched_clock() << DEBUG_SHIFT;
29 void ring_buffer_normalize_time_stamp(int cpu, u64 *ts)
31 /* Just stupid testing the normalize function and deltas */
35 #define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event))
36 #define RB_ALIGNMENT_SHIFT 2
37 #define RB_ALIGNMENT (1 << RB_ALIGNMENT_SHIFT)
38 #define RB_MAX_SMALL_DATA 28
41 RB_LEN_TIME_EXTEND = 8,
42 RB_LEN_TIME_STAMP = 16,
45 /* inline for ring buffer fast paths */
46 static inline unsigned
47 rb_event_length(struct ring_buffer_event *event)
51 switch (event->type) {
52 case RINGBUF_TYPE_PADDING:
56 case RINGBUF_TYPE_TIME_EXTEND:
57 return RB_LEN_TIME_EXTEND;
59 case RINGBUF_TYPE_TIME_STAMP:
60 return RB_LEN_TIME_STAMP;
62 case RINGBUF_TYPE_DATA:
64 length = event->len << RB_ALIGNMENT_SHIFT;
66 length = event->array[0];
67 return length + RB_EVNT_HDR_SIZE;
76 * ring_buffer_event_length - return the length of the event
77 * @event: the event to get the length of
79 unsigned ring_buffer_event_length(struct ring_buffer_event *event)
81 return rb_event_length(event);
84 /* inline for ring buffer fast paths */
86 rb_event_data(struct ring_buffer_event *event)
88 BUG_ON(event->type != RINGBUF_TYPE_DATA);
89 /* If length is in len field, then array[0] has the data */
91 return (void *)&event->array[0];
92 /* Otherwise length is in array[0] and array[1] has the data */
93 return (void *)&event->array[1];
97 * ring_buffer_event_data - return the data of the event
98 * @event: the event to get the data from
100 void *ring_buffer_event_data(struct ring_buffer_event *event)
102 return rb_event_data(event);
105 #define for_each_buffer_cpu(buffer, cpu) \
106 for_each_cpu_mask(cpu, buffer->cpumask)
109 #define TS_MASK ((1ULL << TS_SHIFT) - 1)
110 #define TS_DELTA_TEST (~TS_MASK)
113 * This hack is stolen from mm/slob.c.
114 * We can store per page timing information in the page frame of the page.
115 * Thanks to Peter Zijlstra for suggesting this idea.
120 unsigned long flags; /* mandatory */
121 atomic_t _count; /* mandatory */
122 u64 time_stamp; /* page time stamp */
123 unsigned size; /* size of page data */
124 struct list_head list; /* list of free pages */
131 * We need to fit the time_stamp delta into 27 bits.
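 * With a nanosecond resolution sched_clock() (assumed here, this excerpt
 * does not show the clock itself), 27 bits give roughly 2^27 ns, i.e.
 * about 134 ms, between events before a time extend event has to be
 * inserted.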
133 static inline int test_time_stamp(u64 delta)
135 if (delta & TS_DELTA_TEST)
140 #define BUF_PAGE_SIZE PAGE_SIZE
143 * head_page == tail_page && head == tail then buffer is empty.
145 struct ring_buffer_per_cpu {
147 struct ring_buffer *buffer;
149 struct lock_class_key lock_key;
150 struct list_head pages;
151 unsigned long head; /* read from head */
152 unsigned long tail; /* write to tail */
153 struct buffer_page *head_page;
154 struct buffer_page *tail_page;
155 unsigned long overrun;
156 unsigned long entries;
159 atomic_t record_disabled;
168 atomic_t record_disabled;
172 struct ring_buffer_per_cpu **buffers;
175 struct ring_buffer_iter {
176 struct ring_buffer_per_cpu *cpu_buffer;
178 struct buffer_page *head_page;
182 #define RB_WARN_ON(buffer, cond) \
183 if (unlikely(cond)) { \
184 atomic_inc(&buffer->record_disabled); \
190 * rb_check_pages - integrity check of buffer pages
191 * @cpu_buffer: CPU buffer with pages to test
193 * As a safety measure we check to make sure the data pages have not
196 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
198 struct list_head *head = &cpu_buffer->pages;
199 struct buffer_page *page, *tmp;
201 RB_WARN_ON(cpu_buffer, head->next->prev != head);
202 RB_WARN_ON(cpu_buffer, head->prev->next != head);
204 list_for_each_entry_safe(page, tmp, head, list) {
205 RB_WARN_ON(cpu_buffer, page->list.next->prev != &page->list);
206 RB_WARN_ON(cpu_buffer, page->list.prev->next != &page->list);
212 static unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
214 return cpu_buffer->head_page->size;
217 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
220 struct list_head *head = &cpu_buffer->pages;
221 struct buffer_page *page, *tmp;
226 for (i = 0; i < nr_pages; i++) {
227 addr = __get_free_page(GFP_KERNEL);
230 page = (struct buffer_page *)virt_to_page(addr);
231 list_add(&page->list, &pages);
234 list_splice(&pages, head);
236 rb_check_pages(cpu_buffer);
241 list_for_each_entry_safe(page, tmp, &pages, list) {
242 list_del_init(&page->list);
243 __free_page(&page->page);
248 static struct ring_buffer_per_cpu *
249 rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
251 struct ring_buffer_per_cpu *cpu_buffer;
254 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
255 GFP_KERNEL, cpu_to_node(cpu));
259 cpu_buffer->cpu = cpu;
260 cpu_buffer->buffer = buffer;
261 spin_lock_init(&cpu_buffer->lock);
262 INIT_LIST_HEAD(&cpu_buffer->pages);
264 ret = rb_allocate_pages(cpu_buffer, buffer->pages);
266 goto fail_free_buffer;
268 cpu_buffer->head_page
269 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
270 cpu_buffer->tail_page
271 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
280 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
282 struct list_head *head = &cpu_buffer->pages;
283 struct buffer_page *page, *tmp;
285 list_for_each_entry_safe(page, tmp, head, list) {
286 list_del_init(&page->list);
287 __free_page(&page->page);
293 * ring_buffer_alloc - allocate a new ring_buffer
294 * @size: the size in bytes that is needed.
295 * @flags: attributes to set for the ring buffer.
297 * Currently the only flag that is available is the RB_FL_OVERWRITE
298 * flag. This flag means that the buffer will overwrite old data
299 * when the buffer wraps. If this flag is not set, the buffer will
300 * drop data when the tail hits the head.
302 struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
304 struct ring_buffer *buffer;
308 /* keep it in its own cache line */
309 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
314 buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
315 buffer->flags = flags;
317 /* need at least two pages */
318 if (buffer->pages == 1)
321 buffer->cpumask = cpu_possible_map;
322 buffer->cpus = nr_cpu_ids;
324 bsize = sizeof(void *) * nr_cpu_ids;
325 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
327 if (!buffer->buffers)
328 goto fail_free_buffer;
330 for_each_buffer_cpu(buffer, cpu) {
331 buffer->buffers[cpu] =
332 rb_allocate_cpu_buffer(buffer, cpu);
333 if (!buffer->buffers[cpu])
334 goto fail_free_buffers;
337 mutex_init(&buffer->mutex);
342 for_each_buffer_cpu(buffer, cpu) {
343 if (buffer->buffers[cpu])
344 rb_free_cpu_buffer(buffer->buffers[cpu]);
346 kfree(buffer->buffers);
354 * ring_buffer_free - free a ring buffer.
355 * @buffer: the buffer to free.
358 ring_buffer_free(struct ring_buffer *buffer)
362 for_each_buffer_cpu(buffer, cpu)
363 rb_free_cpu_buffer(buffer->buffers[cpu]);
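/*
 * Example (illustrative sketch, not part of this file): allocate a one
 * megabyte buffer that overwrites old data when it wraps, and free it
 * again.  The variable name "rb" is made up for the example.
 *
 *	struct ring_buffer *rb;
 *
 *	rb = ring_buffer_alloc(1024 * 1024, RB_FL_OVERWRITE);
 *	if (!rb)
 *		return -ENOMEM;
 *	...
 *	ring_buffer_free(rb);
 */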
368 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
371 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
373 struct buffer_page *page;
377 atomic_inc(&cpu_buffer->record_disabled);
380 for (i = 0; i < nr_pages; i++) {
381 BUG_ON(list_empty(&cpu_buffer->pages));
382 p = cpu_buffer->pages.next;
383 page = list_entry(p, struct buffer_page, list);
384 list_del_init(&page->list);
385 __free_page(&page->page);
387 BUG_ON(list_empty(&cpu_buffer->pages));
389 rb_reset_cpu(cpu_buffer);
391 rb_check_pages(cpu_buffer);
393 atomic_dec(&cpu_buffer->record_disabled);
398 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
399 struct list_head *pages, unsigned nr_pages)
401 struct buffer_page *page;
405 atomic_inc(&cpu_buffer->record_disabled);
408 for (i = 0; i < nr_pages; i++) {
409 BUG_ON(list_empty(pages));
411 page = list_entry(p, struct buffer_page, list);
412 list_del_init(&page->list);
413 list_add_tail(&page->list, &cpu_buffer->pages);
415 rb_reset_cpu(cpu_buffer);
417 rb_check_pages(cpu_buffer);
419 atomic_dec(&cpu_buffer->record_disabled);
423 * ring_buffer_resize - resize the ring buffer
424 * @buffer: the buffer to resize.
425 * @size: the new size.
427 * The tracer is responsible for making sure that the buffer is
428 * not being used while changing the size.
429 * Note: We may be able to change the above requirement by using
430 * RCU synchronizations.
432 * Minimum size is 2 * BUF_PAGE_SIZE.
434 * Returns -1 on failure.
436 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
438 struct ring_buffer_per_cpu *cpu_buffer;
439 unsigned nr_pages, rm_pages, new_pages;
440 struct buffer_page *page, *tmp;
441 unsigned long buffer_size;
446 size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
447 size *= BUF_PAGE_SIZE;
448 buffer_size = buffer->pages * BUF_PAGE_SIZE;
450 /* we need a minimum of two pages */
451 if (size < BUF_PAGE_SIZE * 2)
452 size = BUF_PAGE_SIZE * 2;
454 if (size == buffer_size)
457 mutex_lock(&buffer->mutex);
459 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
461 if (size < buffer_size) {
463 /* easy case, just free pages */
464 BUG_ON(nr_pages >= buffer->pages);
466 rm_pages = buffer->pages - nr_pages;
468 for_each_buffer_cpu(buffer, cpu) {
469 cpu_buffer = buffer->buffers[cpu];
470 rb_remove_pages(cpu_buffer, rm_pages);
476 * This is a bit more difficult. We only want to add pages
477 * when we can allocate enough for all CPUs. We do this
478 * by allocating all the pages and storing them on a local
479 * linked list. If we succeed in our allocation, then we
480 * add these pages to the cpu_buffers. Otherwise we just free
481 * them all and return -ENOMEM.
483 BUG_ON(nr_pages <= buffer->pages);
484 new_pages = nr_pages - buffer->pages;
486 for_each_buffer_cpu(buffer, cpu) {
487 for (i = 0; i < new_pages; i++) {
488 addr = __get_free_page(GFP_KERNEL);
491 page = (struct buffer_page *)virt_to_page(addr);
492 list_add(&page->list, &pages);
496 for_each_buffer_cpu(buffer, cpu) {
497 cpu_buffer = buffer->buffers[cpu];
498 rb_insert_pages(cpu_buffer, &pages, new_pages);
501 BUG_ON(!list_empty(&pages));
504 buffer->pages = nr_pages;
505 mutex_unlock(&buffer->mutex);
510 list_for_each_entry_safe(page, tmp, &pages, list) {
511 list_del_init(&page->list);
512 __free_page(&page->page);
517 static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
519 return cpu_buffer->head_page == cpu_buffer->tail_page &&
520 cpu_buffer->head == cpu_buffer->tail;
523 static inline int rb_null_event(struct ring_buffer_event *event)
525 return event->type == RINGBUF_TYPE_PADDING;
528 static inline void *rb_page_index(struct buffer_page *page, unsigned index)
530 void *addr = page_address(&page->page);
535 static inline struct ring_buffer_event *
536 rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
538 return rb_page_index(cpu_buffer->head_page,
542 static inline struct ring_buffer_event *
543 rb_iter_head_event(struct ring_buffer_iter *iter)
545 return rb_page_index(iter->head_page,
550 * When the tail hits the head and the buffer is in overwrite mode,
551 * the head jumps to the next page and all content on the previous
552 * page is discarded. But before doing so, we update the overrun
553 * variable of the buffer.
555 static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
557 struct ring_buffer_event *event;
560 for (head = 0; head < rb_head_size(cpu_buffer);
561 head += rb_event_length(event)) {
563 event = rb_page_index(cpu_buffer->head_page, head);
564 BUG_ON(rb_null_event(event));
565 /* Only count data entries */
566 if (event->type != RINGBUF_TYPE_DATA)
568 cpu_buffer->overrun++;
569 cpu_buffer->entries--;
573 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
574 struct buffer_page **page)
576 struct list_head *p = (*page)->list.next;
578 if (p == &cpu_buffer->pages)
581 *page = list_entry(p, struct buffer_page, list);
585 rb_add_stamp(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts)
587 cpu_buffer->tail_page->time_stamp = *ts;
588 cpu_buffer->write_stamp = *ts;
591 static void rb_reset_read_page(struct ring_buffer_per_cpu *cpu_buffer)
593 cpu_buffer->read_stamp = cpu_buffer->head_page->time_stamp;
594 cpu_buffer->head = 0;
598 rb_reset_iter_read_page(struct ring_buffer_iter *iter)
600 iter->read_stamp = iter->head_page->time_stamp;
605 * rb_update_event - update event type and data
606 * @event: the event to update
607 * @type: the type of event
608 * @length: the size of the event field in the ring buffer
610 * Update the type and data fields of the event. The length
611 * is the actual size that is written to the ring buffer,
612 * and with this, we can determine what to place into the
616 rb_update_event(struct ring_buffer_event *event,
617 unsigned type, unsigned length)
623 case RINGBUF_TYPE_PADDING:
626 case RINGBUF_TYPE_TIME_EXTEND:
628 (RB_LEN_TIME_EXTEND + (RB_ALIGNMENT-1))
629 >> RB_ALIGNMENT_SHIFT;
632 case RINGBUF_TYPE_TIME_STAMP:
634 (RB_LEN_TIME_STAMP + (RB_ALIGNMENT-1))
635 >> RB_ALIGNMENT_SHIFT;
638 case RINGBUF_TYPE_DATA:
639 length -= RB_EVNT_HDR_SIZE;
640 if (length > RB_MAX_SMALL_DATA) {
642 event->array[0] = length;
645 (length + (RB_ALIGNMENT-1))
646 >> RB_ALIGNMENT_SHIFT;
653 static inline unsigned rb_calculate_event_length(unsigned length)
655 struct ring_buffer_event event; /* Used only for sizeof array */
657 /* zero length can cause confusion */
661 if (length > RB_MAX_SMALL_DATA)
662 length += sizeof(event.array[0]);
664 length += RB_EVNT_HDR_SIZE;
665 length = ALIGN(length, RB_ALIGNMENT);
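/*
 * Worked example (assuming a 4 byte event header and 4 byte array
 * entries, which this excerpt does not show explicitly): a 10 byte
 * payload fits in the len field, so the reserved size is 10 + 4 = 14,
 * rounded up to 16.  A 30 byte payload exceeds RB_MAX_SMALL_DATA, so
 * array[0] carries the length and the reserved size is 30 + 4 + 4 = 38,
 * rounded up to 40.
 */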
670 static struct ring_buffer_event *
671 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
672 unsigned type, unsigned long length, u64 *ts)
674 struct buffer_page *head_page, *tail_page;
676 struct ring_buffer *buffer = cpu_buffer->buffer;
677 struct ring_buffer_event *event;
679 tail_page = cpu_buffer->tail_page;
680 head_page = cpu_buffer->head_page;
681 tail = cpu_buffer->tail;
683 if (tail + length > BUF_PAGE_SIZE) {
684 struct buffer_page *next_page = tail_page;
686 rb_inc_page(cpu_buffer, &next_page);
688 if (next_page == head_page) {
689 if (!(buffer->flags & RB_FL_OVERWRITE))
692 /* count overflows */
693 rb_update_overflow(cpu_buffer);
695 rb_inc_page(cpu_buffer, &head_page);
696 cpu_buffer->head_page = head_page;
697 rb_reset_read_page(cpu_buffer);
700 if (tail != BUF_PAGE_SIZE) {
701 event = rb_page_index(tail_page, tail);
703 event->type = RINGBUF_TYPE_PADDING;
706 tail_page->size = tail;
707 tail_page = next_page;
710 cpu_buffer->tail_page = tail_page;
711 cpu_buffer->tail = tail;
712 rb_add_stamp(cpu_buffer, ts);
715 BUG_ON(tail + length > BUF_PAGE_SIZE);
717 event = rb_page_index(tail_page, tail);
718 rb_update_event(event, type, length);
724 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
727 struct ring_buffer_event *event;
730 if (unlikely(*delta > (1ULL << 59) && !once++)) {
731 printk(KERN_WARNING "Delta way too big! %llu"
732 " ts=%llu write stamp = %llu\n",
733 *delta, *ts, cpu_buffer->write_stamp);
738 * The delta is too big, we need to add a
741 event = __rb_reserve_next(cpu_buffer,
742 RINGBUF_TYPE_TIME_EXTEND,
748 /* check to see if we went to the next page */
749 if (cpu_buffer->tail) {
750 /* Still on same page, update timestamp */
751 event->time_delta = *delta & TS_MASK;
752 event->array[0] = *delta >> TS_SHIFT;
753 /* commit the time event */
755 rb_event_length(event);
756 cpu_buffer->write_stamp = *ts;
763 static struct ring_buffer_event *
764 rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
765 unsigned type, unsigned long length)
767 struct ring_buffer_event *event;
770 ts = ring_buffer_time_stamp(cpu_buffer->cpu);
772 if (cpu_buffer->tail) {
773 delta = ts - cpu_buffer->write_stamp;
775 if (test_time_stamp(delta)) {
778 ret = rb_add_time_stamp(cpu_buffer, &ts, &delta);
783 rb_add_stamp(cpu_buffer, &ts);
787 event = __rb_reserve_next(cpu_buffer, type, length, &ts);
791 /* If the reserve went to the next page, our delta is zero */
792 if (!cpu_buffer->tail)
795 event->time_delta = delta;
801 * ring_buffer_lock_reserve - reserve a part of the buffer
802 * @buffer: the ring buffer to reserve from
803 * @length: the length of the data to reserve (excluding event header)
804 * @flags: a pointer to save the interrupt flags
806 * Returns a reserved event on the ring buffer to copy directly to.
807 * The user of this interface will need to get the body to write into
808 * and can use the ring_buffer_event_data() interface.
810 * The length is the length of the data needed, not the event length
811 * which also includes the event header.
813 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
814 * If NULL is returned, then nothing has been allocated or locked.
816 struct ring_buffer_event *
817 ring_buffer_lock_reserve(struct ring_buffer *buffer,
818 unsigned long length,
819 unsigned long *flags)
821 struct ring_buffer_per_cpu *cpu_buffer;
822 struct ring_buffer_event *event;
825 if (atomic_read(&buffer->record_disabled))
828 raw_local_irq_save(*flags);
829 cpu = raw_smp_processor_id();
831 if (!cpu_isset(cpu, buffer->cpumask))
834 cpu_buffer = buffer->buffers[cpu];
835 spin_lock(&cpu_buffer->lock);
837 if (atomic_read(&cpu_buffer->record_disabled))
840 length = rb_calculate_event_length(length);
841 if (length > BUF_PAGE_SIZE)
844 event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length);
851 spin_unlock(&cpu_buffer->lock);
853 local_irq_restore(*flags);
857 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
858 struct ring_buffer_event *event)
860 cpu_buffer->tail += rb_event_length(event);
861 cpu_buffer->tail_page->size = cpu_buffer->tail;
862 cpu_buffer->write_stamp += event->time_delta;
863 cpu_buffer->entries++;
867 * ring_buffer_unlock_commit - commit a reserved event
868 * @buffer: The buffer to commit to
869 * @event: The event pointer to commit.
870 * @flags: the interrupt flags received from ring_buffer_lock_reserve.
872 * This commits the data to the ring buffer, and releases any locks held.
874 * Must be paired with ring_buffer_lock_reserve.
876 int ring_buffer_unlock_commit(struct ring_buffer *buffer,
877 struct ring_buffer_event *event,
880 struct ring_buffer_per_cpu *cpu_buffer;
881 int cpu = raw_smp_processor_id();
883 cpu_buffer = buffer->buffers[cpu];
885 assert_spin_locked(&cpu_buffer->lock);
887 rb_commit(cpu_buffer, event);
889 spin_unlock(&cpu_buffer->lock);
890 raw_local_irq_restore(flags);
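/*
 * Example (illustrative sketch, not part of this file): a producer that
 * reserves room for a small record, fills it in through
 * ring_buffer_event_data() and commits it.  struct my_entry and the
 * example_produce() name are made up for the example.
 *
 *	struct my_entry {
 *		int		pid;
 *		unsigned long	value;
 *	};
 *
 *	static int example_produce(struct ring_buffer *buffer,
 *				   int pid, unsigned long value)
 *	{
 *		struct ring_buffer_event *event;
 *		struct my_entry *entry;
 *		unsigned long flags;
 *
 *		event = ring_buffer_lock_reserve(buffer, sizeof(*entry),
 *						 &flags);
 *		if (!event)
 *			return -EBUSY;
 *
 *		entry = ring_buffer_event_data(event);
 *		entry->pid = pid;
 *		entry->value = value;
 *
 *		return ring_buffer_unlock_commit(buffer, event, flags);
 *	}
 */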
896 * ring_buffer_write - write data to the buffer without reserving
897 * @buffer: The ring buffer to write to.
898 * @length: The length of the data being written (excluding the event header)
899 * @data: The data to write to the buffer.
901 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
902 * one function. If you already have the data to write to the buffer, it
903 * may be easier to simply call this function.
905 * Note, like ring_buffer_lock_reserve, the length is the length of the data
906 * and not the length of the event which would hold the header.
908 int ring_buffer_write(struct ring_buffer *buffer,
909 unsigned long length,
912 struct ring_buffer_per_cpu *cpu_buffer;
913 struct ring_buffer_event *event;
914 unsigned long event_length, flags;
919 if (atomic_read(&buffer->record_disabled))
922 local_irq_save(flags);
923 cpu = raw_smp_processor_id();
925 if (!cpu_isset(cpu, buffer->cpumask))
928 cpu_buffer = buffer->buffers[cpu];
929 spin_lock(&cpu_buffer->lock);
931 if (atomic_read(&cpu_buffer->record_disabled))
934 event_length = rb_calculate_event_length(length);
935 event = rb_reserve_next_event(cpu_buffer,
936 RINGBUF_TYPE_DATA, event_length);
940 body = rb_event_data(event);
942 memcpy(body, data, length);
944 rb_commit(cpu_buffer, event);
948 spin_unlock(&cpu_buffer->lock);
950 local_irq_restore(flags);
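/*
 * Example (illustrative only): the same record written in one call with
 * ring_buffer_write().  A non-zero return is assumed here to mean the
 * record was not written (buffer full or recording disabled).
 *
 *	struct my_entry entry = { .pid = 1, .value = 42 };
 *
 *	if (ring_buffer_write(buffer, sizeof(entry), &entry))
 *		printk(KERN_DEBUG "entry dropped\n");
 */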
956 * ring_buffer_lock - lock the ring buffer
957 * @buffer: The ring buffer to lock
958 * @flags: The place to store the interrupt flags
960 * This locks all the per CPU buffers.
962 * Must be unlocked by ring_buffer_unlock.
964 void ring_buffer_lock(struct ring_buffer *buffer, unsigned long *flags)
966 struct ring_buffer_per_cpu *cpu_buffer;
969 local_irq_save(*flags);
971 for_each_buffer_cpu(buffer, cpu) {
972 cpu_buffer = buffer->buffers[cpu];
973 spin_lock(&cpu_buffer->lock);
978 * ring_buffer_unlock - unlock a locked buffer
979 * @buffer: The locked buffer to unlock
980 * @flags: The interrupt flags received by ring_buffer_lock
982 void ring_buffer_unlock(struct ring_buffer *buffer, unsigned long flags)
984 struct ring_buffer_per_cpu *cpu_buffer;
987 for (cpu = buffer->cpus - 1; cpu >= 0; cpu--) {
988 if (!cpu_isset(cpu, buffer->cpumask))
990 cpu_buffer = buffer->buffers[cpu];
991 spin_unlock(&cpu_buffer->lock);
994 local_irq_restore(flags);
998 * ring_buffer_record_disable - stop all writes into the buffer
999 * @buffer: The ring buffer to stop writes to.
1001 * This prevents all writes to the buffer. Any attempt to write
1002 * to the buffer after this will fail and return NULL.
1004 * The caller should call synchronize_sched() after this.
1006 void ring_buffer_record_disable(struct ring_buffer *buffer)
1008 atomic_inc(&buffer->record_disabled);
1012 * ring_buffer_record_enable - enable writes to the buffer
1013 * @buffer: The ring buffer to enable writes
1015 * Note, multiple disables will need the same number of enables
1016 * to truly enable the writing (much like preempt_disable).
1018 void ring_buffer_record_enable(struct ring_buffer *buffer)
1020 atomic_dec(&buffer->record_disabled);
1024 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
1025 * @buffer: The ring buffer to stop writes to.
1026 * @cpu: The CPU buffer to stop
1028 * This prevents all writes to the buffer. Any attempt to write
1029 * to the buffer after this will fail and return NULL.
1031 * The caller should call synchronize_sched() after this.
1033 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
1035 struct ring_buffer_per_cpu *cpu_buffer;
1037 if (!cpu_isset(cpu, buffer->cpumask))
1040 cpu_buffer = buffer->buffers[cpu];
1041 atomic_inc(&cpu_buffer->record_disabled);
1045 * ring_buffer_record_enable_cpu - enable writes to the buffer
1046 * @buffer: The ring buffer to enable writes
1047 * @cpu: The CPU to enable.
1049 * Note, multiple disables will need the same number of enables
1050 * to truly enable the writing (much like preempt_disable).
1052 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
1054 struct ring_buffer_per_cpu *cpu_buffer;
1056 if (!cpu_isset(cpu, buffer->cpumask))
1059 cpu_buffer = buffer->buffers[cpu];
1060 atomic_dec(&cpu_buffer->record_disabled);
1064 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
1065 * @buffer: The ring buffer
1066 * @cpu: The per CPU buffer to get the entries from.
1068 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
1070 struct ring_buffer_per_cpu *cpu_buffer;
1072 if (!cpu_isset(cpu, buffer->cpumask))
1075 cpu_buffer = buffer->buffers[cpu];
1076 return cpu_buffer->entries;
1080 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
1081 * @buffer: The ring buffer
1082 * @cpu: The per CPU buffer to get the number of overruns from
1084 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
1086 struct ring_buffer_per_cpu *cpu_buffer;
1088 if (!cpu_isset(cpu, buffer->cpumask))
1091 cpu_buffer = buffer->buffers[cpu];
1092 return cpu_buffer->overrun;
1096 * ring_buffer_entries - get the number of entries in a buffer
1097 * @buffer: The ring buffer
1099 * Returns the total number of entries in the ring buffer
1102 unsigned long ring_buffer_entries(struct ring_buffer *buffer)
1104 struct ring_buffer_per_cpu *cpu_buffer;
1105 unsigned long entries = 0;
1108 /* if you care about this being correct, lock the buffer */
1109 for_each_buffer_cpu(buffer, cpu) {
1110 cpu_buffer = buffer->buffers[cpu];
1111 entries += cpu_buffer->entries;
1118 * ring_buffer_overruns - get the number of overruns in the buffer
1119 * @buffer: The ring buffer
1121 * Returns the total number of overruns in the ring buffer
1124 unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
1126 struct ring_buffer_per_cpu *cpu_buffer;
1127 unsigned long overruns = 0;
1130 /* if you care about this being correct, lock the buffer */
1131 for_each_buffer_cpu(buffer, cpu) {
1132 cpu_buffer = buffer->buffers[cpu];
1133 overruns += cpu_buffer->overrun;
1140 * ring_buffer_iter_reset - reset an iterator
1141 * @iter: The iterator to reset
1143 * Resets the iterator, so that it will start from the beginning
1146 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
1148 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1150 iter->head_page = cpu_buffer->head_page;
1151 iter->head = cpu_buffer->head;
1152 rb_reset_iter_read_page(iter);
1156 * ring_buffer_iter_empty - check if an iterator has no more to read
1157 * @iter: The iterator to check
1159 int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
1161 struct ring_buffer_per_cpu *cpu_buffer;
1163 cpu_buffer = iter->cpu_buffer;
1165 return iter->head_page == cpu_buffer->tail_page &&
1166 iter->head == cpu_buffer->tail;
1170 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1171 struct ring_buffer_event *event)
1175 switch (event->type) {
1176 case RINGBUF_TYPE_PADDING:
1179 case RINGBUF_TYPE_TIME_EXTEND:
1180 delta = event->array[0];
1182 delta += event->time_delta;
1183 cpu_buffer->read_stamp += delta;
1186 case RINGBUF_TYPE_TIME_STAMP:
1187 /* FIXME: not implemented */
1190 case RINGBUF_TYPE_DATA:
1191 cpu_buffer->read_stamp += event->time_delta;
1201 rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
1202 struct ring_buffer_event *event)
1206 switch (event->type) {
1207 case RINGBUF_TYPE_PADDING:
1210 case RINGBUF_TYPE_TIME_EXTEND:
1211 delta = event->array[0];
1213 delta += event->time_delta;
1214 iter->read_stamp += delta;
1217 case RINGBUF_TYPE_TIME_STAMP:
1218 /* FIXME: not implemented */
1221 case RINGBUF_TYPE_DATA:
1222 iter->read_stamp += event->time_delta;
1231 static void rb_advance_head(struct ring_buffer_per_cpu *cpu_buffer)
1233 struct ring_buffer_event *event;
1237 * Check if we are at the end of the buffer.
1239 if (cpu_buffer->head >= cpu_buffer->head_page->size) {
1240 BUG_ON(cpu_buffer->head_page == cpu_buffer->tail_page);
1241 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1242 rb_reset_read_page(cpu_buffer);
1246 event = rb_head_event(cpu_buffer);
1248 if (event->type == RINGBUF_TYPE_DATA)
1249 cpu_buffer->entries--;
1251 length = rb_event_length(event);
1254 * This should not be called to advance the head if we are
1255 * at the tail of the buffer.
1257 BUG_ON((cpu_buffer->head_page == cpu_buffer->tail_page) &&
1258 (cpu_buffer->head + length > cpu_buffer->tail));
1260 rb_update_read_stamp(cpu_buffer, event);
1262 cpu_buffer->head += length;
1264 /* check for end of page */
1265 if ((cpu_buffer->head >= cpu_buffer->head_page->size) &&
1266 (cpu_buffer->head_page != cpu_buffer->tail_page))
1267 rb_advance_head(cpu_buffer);
1270 static void rb_advance_iter(struct ring_buffer_iter *iter)
1272 struct ring_buffer *buffer;
1273 struct ring_buffer_per_cpu *cpu_buffer;
1274 struct ring_buffer_event *event;
1277 cpu_buffer = iter->cpu_buffer;
1278 buffer = cpu_buffer->buffer;
1281 * Check if we are at the end of the buffer.
1283 if (iter->head >= iter->head_page->size) {
1284 BUG_ON(iter->head_page == cpu_buffer->tail_page);
1285 rb_inc_page(cpu_buffer, &iter->head_page);
1286 rb_reset_iter_read_page(iter);
1290 event = rb_iter_head_event(iter);
1292 length = rb_event_length(event);
1295 * This should not be called to advance the head if we are
1296 * at the tail of the buffer.
1298 BUG_ON((iter->head_page == cpu_buffer->tail_page) &&
1299 (iter->head + length > cpu_buffer->tail));
1301 rb_update_iter_read_stamp(iter, event);
1303 iter->head += length;
1305 /* check for end of page padding */
1306 if ((iter->head >= iter->head_page->size) &&
1307 (iter->head_page != cpu_buffer->tail_page))
1308 rb_advance_iter(iter);
1312 * ring_buffer_peek - peek at the next event to be read
1313 * @buffer: The ring buffer to read
1314 * @cpu: The cpu to peek at
1315 * @ts: The timestamp counter of this event.
1317 * This will return the event that will be read next, but does
1318 * not consume the data.
1320 struct ring_buffer_event *
1321 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
1323 struct ring_buffer_per_cpu *cpu_buffer;
1324 struct ring_buffer_event *event;
1326 if (!cpu_isset(cpu, buffer->cpumask))
1329 cpu_buffer = buffer->buffers[cpu];
1332 if (rb_per_cpu_empty(cpu_buffer))
1335 event = rb_head_event(cpu_buffer);
1337 switch (event->type) {
1338 case RINGBUF_TYPE_PADDING:
1339 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1340 rb_reset_read_page(cpu_buffer);
1343 case RINGBUF_TYPE_TIME_EXTEND:
1344 /* Internal data, OK to advance */
1345 rb_advance_head(cpu_buffer);
1348 case RINGBUF_TYPE_TIME_STAMP:
1349 /* FIXME: not implemented */
1350 rb_advance_head(cpu_buffer);
1353 case RINGBUF_TYPE_DATA:
1355 *ts = cpu_buffer->read_stamp + event->time_delta;
1356 ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1368 * ring_buffer_iter_peek - peek at the next event to be read
1369 * @iter: The ring buffer iterator
1370 * @ts: The timestamp counter of this event.
1372 * This will return the event that will be read next, but does
1373 * not increment the iterator.
1375 struct ring_buffer_event *
1376 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
1378 struct ring_buffer *buffer;
1379 struct ring_buffer_per_cpu *cpu_buffer;
1380 struct ring_buffer_event *event;
1382 if (ring_buffer_iter_empty(iter))
1385 cpu_buffer = iter->cpu_buffer;
1386 buffer = cpu_buffer->buffer;
1389 if (rb_per_cpu_empty(cpu_buffer))
1392 event = rb_iter_head_event(iter);
1394 switch (event->type) {
1395 case RINGBUF_TYPE_PADDING:
1396 rb_inc_page(cpu_buffer, &iter->head_page);
1397 rb_reset_iter_read_page(iter);
1400 case RINGBUF_TYPE_TIME_EXTEND:
1401 /* Internal data, OK to advance */
1402 rb_advance_iter(iter);
1405 case RINGBUF_TYPE_TIME_STAMP:
1406 /* FIXME: not implemented */
1407 rb_advance_iter(iter);
1410 case RINGBUF_TYPE_DATA:
1412 *ts = iter->read_stamp + event->time_delta;
1413 ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1425 * ring_buffer_consume - return an event and consume it
1426 * @buffer: The ring buffer to get the next event from
1428 * Returns the next event in the ring buffer, and that event is consumed.
1429 * Meaning that sequential reads will keep returning a different event,
1430 * and eventually empty the ring buffer if the producer is slower.
1432 struct ring_buffer_event *
1433 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
1435 struct ring_buffer_per_cpu *cpu_buffer;
1436 struct ring_buffer_event *event;
1438 if (!cpu_isset(cpu, buffer->cpumask))
1441 event = ring_buffer_peek(buffer, cpu, ts);
1445 cpu_buffer = buffer->buffers[cpu];
1446 rb_advance_head(cpu_buffer);
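/*
 * Example (illustrative only): drain whatever is currently in one CPU's
 * buffer.  process_entry() and struct my_entry are made up for the
 * example; the loop stops when ring_buffer_consume() returns NULL.
 *
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts))) {
 *		struct my_entry *entry = ring_buffer_event_data(event);
 *
 *		process_entry(entry, ts);
 *	}
 */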
1452 * ring_buffer_read_start - start a non consuming read of the buffer
1453 * @buffer: The ring buffer to read from
1454 * @cpu: The cpu buffer to iterate over
1456 * This starts up an iteration through the buffer. It also disables
1457 * the recording to the buffer until the reading is finished.
1458 * This prevents the reading from being corrupted. This is not
1459 * a consuming read, so a producer is not expected.
1461 * Must be paired with ring_buffer_read_finish.
1463 struct ring_buffer_iter *
1464 ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
1466 struct ring_buffer_per_cpu *cpu_buffer;
1467 struct ring_buffer_iter *iter;
1469 if (!cpu_isset(cpu, buffer->cpumask))
1472 iter = kmalloc(sizeof(*iter), GFP_KERNEL);
1476 cpu_buffer = buffer->buffers[cpu];
1478 iter->cpu_buffer = cpu_buffer;
1480 atomic_inc(&cpu_buffer->record_disabled);
1481 synchronize_sched();
1483 spin_lock(&cpu_buffer->lock);
1484 iter->head = cpu_buffer->head;
1485 iter->head_page = cpu_buffer->head_page;
1486 rb_reset_iter_read_page(iter);
1487 spin_unlock(&cpu_buffer->lock);
1493 * ring_buffer_read_finish - finish reading the iterator of the buffer
1494 * @iter: The iterator retrieved by ring_buffer_read_start
1496 * This re-enables the recording to the buffer, and frees the
1500 ring_buffer_read_finish(struct ring_buffer_iter *iter)
1502 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1504 atomic_dec(&cpu_buffer->record_disabled);
1509 * ring_buffer_read - read the next item in the ring buffer by the iterator
1510 * @iter: The ring buffer iterator
1511 * @ts: The time stamp of the event read.
1513 * This reads the next event in the ring buffer and increments the iterator.
1515 struct ring_buffer_event *
1516 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
1518 struct ring_buffer_event *event;
1520 event = ring_buffer_iter_peek(iter, ts);
1524 rb_advance_iter(iter);
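/*
 * Example (illustrative only): a non consuming dump of one CPU's buffer
 * using the iterator interface.  print_entry() is a made up helper; the
 * loop is assumed to end when ring_buffer_read() returns NULL.
 *
 *	struct ring_buffer_iter *iter;
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	iter = ring_buffer_read_start(buffer, cpu);
 *	if (!iter)
 *		return;
 *
 *	while ((event = ring_buffer_read(iter, &ts)))
 *		print_entry(ring_buffer_event_data(event), ts);
 *
 *	ring_buffer_read_finish(iter);
 */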
1530 * ring_buffer_size - return the size of the ring buffer (in bytes)
1531 * @buffer: The ring buffer.
1533 unsigned long ring_buffer_size(struct ring_buffer *buffer)
1535 return BUF_PAGE_SIZE * buffer->pages;
1539 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
1541 cpu_buffer->head_page
1542 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
1543 cpu_buffer->tail_page
1544 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
1546 cpu_buffer->head = cpu_buffer->tail = 0;
1547 cpu_buffer->overrun = 0;
1548 cpu_buffer->entries = 0;
1552 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
1553 * @buffer: The ring buffer to reset a per cpu buffer of
1554 * @cpu: The CPU buffer to be reset
1556 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
1558 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
1559 unsigned long flags;
1561 if (!cpu_isset(cpu, buffer->cpumask))
1564 raw_local_irq_save(flags);
1565 spin_lock(&cpu_buffer->lock);
1567 rb_reset_cpu(cpu_buffer);
1569 spin_unlock(&cpu_buffer->lock);
1570 raw_local_irq_restore(flags);
1574 * ring_buffer_reset - reset a ring buffer
1575 * @buffer: The ring buffer to reset all cpu buffers
1577 void ring_buffer_reset(struct ring_buffer *buffer)
1579 unsigned long flags;
1582 ring_buffer_lock(buffer, &flags);
1584 for_each_buffer_cpu(buffer, cpu)
1585 rb_reset_cpu(buffer->buffers[cpu]);
1587 ring_buffer_unlock(buffer, flags);
1591 * ring_buffer_empty - is the ring buffer empty?
1592 * @buffer: The ring buffer to test
1594 int ring_buffer_empty(struct ring_buffer *buffer)
1596 struct ring_buffer_per_cpu *cpu_buffer;
1599 /* yes this is racy, but if you don't like the race, lock the buffer */
1600 for_each_buffer_cpu(buffer, cpu) {
1601 cpu_buffer = buffer->buffers[cpu];
1602 if (!rb_per_cpu_empty(cpu_buffer))
1609 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
1610 * @buffer: The ring buffer
1611 * @cpu: The CPU buffer to test
1613 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
1615 struct ring_buffer_per_cpu *cpu_buffer;
1617 if (!cpu_isset(cpu, buffer->cpumask))
1620 cpu_buffer = buffer->buffers[cpu];
1621 return rb_per_cpu_empty(cpu_buffer);
1625 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
1626 * @buffer_a: One buffer to swap with
1627 * @buffer_b: The other buffer to swap with
1629 * This function is useful for tracers that want to take a "snapshot"
1630 * of a CPU buffer and have another backup buffer lying around.
1631 * It is expected that the tracer handles the cpu buffer not being
1632 * used at the moment.
1634 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
1635 struct ring_buffer *buffer_b, int cpu)
1637 struct ring_buffer_per_cpu *cpu_buffer_a;
1638 struct ring_buffer_per_cpu *cpu_buffer_b;
1640 if (!cpu_isset(cpu, buffer_a->cpumask) ||
1641 !cpu_isset(cpu, buffer_b->cpumask))
1644 /* At least make sure the two buffers are somewhat the same */
1645 if (buffer_a->size != buffer_b->size ||
1646 buffer_a->pages != buffer_b->pages)
1649 cpu_buffer_a = buffer_a->buffers[cpu];
1650 cpu_buffer_b = buffer_b->buffers[cpu];
1653 * We can't do a synchronize_sched here because this
1654 * function can be called in atomic context.
1655 * Normally this will be called from the same CPU as cpu.
1656 * If not it's up to the caller to protect this.
1658 atomic_inc(&cpu_buffer_a->record_disabled);
1659 atomic_inc(&cpu_buffer_b->record_disabled);
1661 buffer_a->buffers[cpu] = cpu_buffer_b;
1662 buffer_b->buffers[cpu] = cpu_buffer_a;
1664 cpu_buffer_b->buffer = buffer_a;
1665 cpu_buffer_a->buffer = buffer_b;
1667 atomic_dec(&cpu_buffer_a->record_disabled);
1668 atomic_dec(&cpu_buffer_b->record_disabled);
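/*
 * Example (illustrative only): a tracer taking a per CPU "snapshot" by
 * swapping in a spare buffer of the same size.  main_buffer, spare_buffer
 * and read_snapshot() are made up names, and a zero return is assumed
 * here to mean the swap succeeded.
 *
 *	if (ring_buffer_swap_cpu(main_buffer, spare_buffer, cpu) == 0)
 *		read_snapshot(spare_buffer, cpu);
 */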