ring_buffer: adding EXPORT_SYMBOLs
[safe/jmp/linux-2.6] / kernel / trace / ring_buffer.c
1 /*
2  * Generic ring buffer
3  *
4  * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
5  */
6 #include <linux/ring_buffer.h>
7 #include <linux/spinlock.h>
8 #include <linux/debugfs.h>
9 #include <linux/uaccess.h>
10 #include <linux/module.h>
11 #include <linux/percpu.h>
12 #include <linux/mutex.h>
13 #include <linux/sched.h>        /* used for sched_clock() (for now) */
14 #include <linux/init.h>
15 #include <linux/hash.h>
16 #include <linux/list.h>
17 #include <linux/fs.h>
18
19 #include "trace.h"
20
21 /* Global flag to disable all recording to ring buffers */
22 static int ring_buffers_off __read_mostly;
23
24 /**
25  * tracing_on - enable all tracing buffers
26  *
27  * This function enables all tracing buffers that may have been
28  * disabled with tracing_off.
29  */
30 void tracing_on(void)
31 {
32         ring_buffers_off = 0;
33 }
34 EXPORT_SYMBOL_GPL(tracing_on);
35
36 /**
37  * tracing_off - turn off all tracing buffers
38  *
39  * This function stops all tracing buffers from recording data.
40  * It does not disable any overhead the tracers themselves may
41  * be causing. This function simply causes all recording to
42  * the ring buffers to fail.
43  */
44 void tracing_off(void)
45 {
46         ring_buffers_off = 1;
47 }
48 EXPORT_SYMBOL_GPL(tracing_off);
49
50 /* Up this if you want to test the TIME_EXTENTS and normalization */
51 #define DEBUG_SHIFT 0
52
53 /* FIXME!!! */
54 u64 ring_buffer_time_stamp(int cpu)
55 {
56         u64 time;
57
58         preempt_disable_notrace();
59         /* shift to debug/test normalization and TIME_EXTENTS */
60         time = sched_clock() << DEBUG_SHIFT;
61         preempt_enable_notrace();
62
63         return time;
64 }
65 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);
66
67 void ring_buffer_normalize_time_stamp(int cpu, u64 *ts)
68 {
69         /* Just stupid testing the normalize function and deltas */
70         *ts >>= DEBUG_SHIFT;
71 }
72 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
73
/* Size of the fixed event header that precedes every event's payload */
#define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event))
/* Small data lengths are stored in the len field in units of 1 << 2 bytes */
#define RB_ALIGNMENT_SHIFT      2
#define RB_ALIGNMENT            (1 << RB_ALIGNMENT_SHIFT)
/*
 * Largest payload whose length is encoded in the event's len field;
 * anything bigger stores its length in array[0] instead.
 */
#define RB_MAX_SMALL_DATA       28

/* Total lengths reported for the two time-keeping event types */
enum {
        RB_LEN_TIME_EXTEND = 8,
        RB_LEN_TIME_STAMP = 16,
};
83
84 /* inline for ring buffer fast paths */
85 static inline unsigned
86 rb_event_length(struct ring_buffer_event *event)
87 {
88         unsigned length;
89
90         switch (event->type) {
91         case RINGBUF_TYPE_PADDING:
92                 /* undefined */
93                 return -1;
94
95         case RINGBUF_TYPE_TIME_EXTEND:
96                 return RB_LEN_TIME_EXTEND;
97
98         case RINGBUF_TYPE_TIME_STAMP:
99                 return RB_LEN_TIME_STAMP;
100
101         case RINGBUF_TYPE_DATA:
102                 if (event->len)
103                         length = event->len << RB_ALIGNMENT_SHIFT;
104                 else
105                         length = event->array[0];
106                 return length + RB_EVNT_HDR_SIZE;
107         default:
108                 BUG();
109         }
110         /* not hit */
111         return 0;
112 }
113
/**
 * ring_buffer_event_length - report how long an event is
 * @event: the event whose length is wanted
 *
 * Exported wrapper around the inline rb_event_length() helper.
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
        unsigned length = rb_event_length(event);

        return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);
123
124 /* inline for ring buffer fast paths */
125 static inline void *
126 rb_event_data(struct ring_buffer_event *event)
127 {
128         BUG_ON(event->type != RINGBUF_TYPE_DATA);
129         /* If length is in len field, then array[0] has the data */
130         if (event->len)
131                 return (void *)&event->array[0];
132         /* Otherwise length is in array[0] and array[1] has the data */
133         return (void *)&event->array[1];
134 }
135
/**
 * ring_buffer_event_data - get at the payload of an event
 * @event: the event whose data is wanted
 *
 * Exported wrapper around the inline rb_event_data() helper.
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
        void *data = rb_event_data(event);

        return data;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);
145
/* Iterate @cpu over every CPU set in @buffer's cpumask */
#define for_each_buffer_cpu(buffer, cpu)                \
        for_each_cpu_mask(cpu, buffer->cpumask)

/* A time stamp delta must fit in the low TS_SHIFT (27) bits */
#define TS_SHIFT        27
#define TS_MASK         ((1ULL << TS_SHIFT) - 1)
/* Any bit set here means the delta does not fit in TS_SHIFT bits */
#define TS_DELTA_TEST   (~TS_MASK)
152
/*
 * This hack stolen from mm/slob.c.
 * We can store per page timing information in the page frame of the page.
 * Thanks to Peter Zijlstra for suggesting this idea.
 *
 * NOTE(review): in the code visible here the descriptor is allocated
 * on its own (kzalloc_node) and points at its data page through
 * ->page, so the "page frame" wording above looks stale -- confirm.
 */
struct buffer_page {
        u64              time_stamp;    /* page time stamp */
        local_t          write;         /* index for next write */
        local_t          commit;        /* write committed index */
        unsigned         read;          /* index for next read */
        struct list_head list;          /* links the page into a page list */
        void *page;                     /* Actual data page */
};
166
167 /*
168  * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
169  * this issue out.
170  */
171 static inline void free_buffer_page(struct buffer_page *bpage)
172 {
173         if (bpage->page)
174                 free_page((unsigned long)bpage->page);
175         kfree(bpage);
176 }
177
178 /*
179  * We need to fit the time_stamp delta into 27 bits.
180  */
181 static inline int test_time_stamp(u64 delta)
182 {
183         if (delta & TS_DELTA_TEST)
184                 return 1;
185         return 0;
186 }
187
/* Usable bytes in one buffer data page: currently the whole page */
#define BUF_PAGE_SIZE PAGE_SIZE
189
/*
 * Per-CPU part of a ring buffer.
 *
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
        int                             cpu;            /* CPU this buffer was allocated for */
        struct ring_buffer              *buffer;        /* owning ring buffer */
        spinlock_t                      lock;           /* taken when a write moves to a new page */
        struct lock_class_key           lock_key;       /* not referenced in the visible code */
        struct list_head                pages;          /* list of buffer_page descriptors */
        struct buffer_page              *head_page;     /* read from head */
        struct buffer_page              *tail_page;     /* write to tail */
        struct buffer_page              *commit_page;   /* committed pages */
        struct buffer_page              *reader_page;   /* extra page allocated outside ->pages */
        unsigned long                   overrun;        /* data events dropped by overwrite */
        unsigned long                   entries;        /* data events currently counted */
        u64                             write_stamp;    /* taken from commit_page->time_stamp */
        u64                             read_stamp;     /* taken from reader_page->time_stamp */
        atomic_t                        record_disabled; /* raised by RB_WARN_ON* and while resizing */
};
209
/* Top-level ring buffer: a set of per-CPU buffers plus shared settings */
struct ring_buffer {
        unsigned long                   size;           /* not written in the visible code */
        unsigned                        pages;          /* data pages per CPU buffer */
        unsigned                        flags;          /* e.g. RB_FL_OVERWRITE */
        int                             cpus;           /* set to nr_cpu_ids at allocation */
        cpumask_t                       cpumask;        /* CPUs that own a per-CPU buffer */
        atomic_t                        record_disabled;

        struct mutex                    mutex;          /* held across ring_buffer_resize() */

        struct ring_buffer_per_cpu      **buffers;      /* indexed by CPU number */
};
222
/* Read position of an iterator walking one per-CPU buffer */
struct ring_buffer_iter {
        struct ring_buffer_per_cpu      *cpu_buffer;    /* buffer being iterated */
        unsigned long                   head;           /* offset of the next event in head_page */
        struct buffer_page              *head_page;     /* page the iterator is reading */
        u64                             read_stamp;     /* reset to head_page->time_stamp on page change */
};
229
/*
 * RB_WARN_ON - complain and stop recording when @cond is true
 * @buffer: object with an atomic record_disabled counter
 * @cond: condition that signals something is wrong
 *
 * On a hit, record_disabled is incremented and a WARN_ON fires.
 * @buffer is parenthesized so that any pointer expression may be
 * passed safely.
 */
#define RB_WARN_ON(buffer, cond)                                \
        do {                                                    \
                if (unlikely(cond)) {                           \
                        atomic_inc(&(buffer)->record_disabled); \
                        WARN_ON(1);                             \
                }                                               \
        } while (0)
237
/*
 * RB_WARN_ON_RET - like RB_WARN_ON, but also leave the caller
 * @buffer: object with an atomic record_disabled counter
 * @cond: condition that signals something is wrong
 *
 * Beware: on a hit this macro executes "return -1" in the function
 * that uses it, so it may only be used where that return value
 * makes sense. @buffer is parenthesized so that any pointer
 * expression may be passed safely.
 */
#define RB_WARN_ON_RET(buffer, cond)                            \
        do {                                                    \
                if (unlikely(cond)) {                           \
                        atomic_inc(&(buffer)->record_disabled); \
                        WARN_ON(1);                             \
                        return -1;                              \
                }                                               \
        } while (0)
246
/*
 * RB_WARN_ON_ONCE - RB_WARN_ON that fires at most once per call site
 * @buffer: object with an atomic record_disabled counter
 * @cond: condition that signals something is wrong
 *
 * A static flag local to each expansion remembers that the warning
 * was already issued. @buffer is parenthesized so that any pointer
 * expression may be passed safely.
 */
#define RB_WARN_ON_ONCE(buffer, cond)                           \
        do {                                                    \
                static int once;                                \
                if (unlikely(cond) && !once) {                  \
                        once++;                                 \
                        atomic_inc(&(buffer)->record_disabled); \
                        WARN_ON(1);                             \
                }                                               \
        } while (0)
256
257 /**
258  * check_pages - integrity check of buffer pages
259  * @cpu_buffer: CPU buffer with pages to test
260  *
261  * As a safty measure we check to make sure the data pages have not
262  * been corrupted.
263  */
264 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
265 {
266         struct list_head *head = &cpu_buffer->pages;
267         struct buffer_page *page, *tmp;
268
269         RB_WARN_ON_RET(cpu_buffer, head->next->prev != head);
270         RB_WARN_ON_RET(cpu_buffer, head->prev->next != head);
271
272         list_for_each_entry_safe(page, tmp, head, list) {
273                 RB_WARN_ON_RET(cpu_buffer,
274                                page->list.next->prev != &page->list);
275                 RB_WARN_ON_RET(cpu_buffer,
276                                page->list.prev->next != &page->list);
277         }
278
279         return 0;
280 }
281
282 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
283                              unsigned nr_pages)
284 {
285         struct list_head *head = &cpu_buffer->pages;
286         struct buffer_page *page, *tmp;
287         unsigned long addr;
288         LIST_HEAD(pages);
289         unsigned i;
290
291         for (i = 0; i < nr_pages; i++) {
292                 page = kzalloc_node(ALIGN(sizeof(*page), cache_line_size()),
293                                     GFP_KERNEL, cpu_to_node(cpu_buffer->cpu));
294                 if (!page)
295                         goto free_pages;
296                 list_add(&page->list, &pages);
297
298                 addr = __get_free_page(GFP_KERNEL);
299                 if (!addr)
300                         goto free_pages;
301                 page->page = (void *)addr;
302         }
303
304         list_splice(&pages, head);
305
306         rb_check_pages(cpu_buffer);
307
308         return 0;
309
310  free_pages:
311         list_for_each_entry_safe(page, tmp, &pages, list) {
312                 list_del_init(&page->list);
313                 free_buffer_page(page);
314         }
315         return -ENOMEM;
316 }
317
318 static struct ring_buffer_per_cpu *
319 rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
320 {
321         struct ring_buffer_per_cpu *cpu_buffer;
322         struct buffer_page *page;
323         unsigned long addr;
324         int ret;
325
326         cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
327                                   GFP_KERNEL, cpu_to_node(cpu));
328         if (!cpu_buffer)
329                 return NULL;
330
331         cpu_buffer->cpu = cpu;
332         cpu_buffer->buffer = buffer;
333         spin_lock_init(&cpu_buffer->lock);
334         INIT_LIST_HEAD(&cpu_buffer->pages);
335
336         page = kzalloc_node(ALIGN(sizeof(*page), cache_line_size()),
337                             GFP_KERNEL, cpu_to_node(cpu));
338         if (!page)
339                 goto fail_free_buffer;
340
341         cpu_buffer->reader_page = page;
342         addr = __get_free_page(GFP_KERNEL);
343         if (!addr)
344                 goto fail_free_reader;
345         page->page = (void *)addr;
346
347         INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
348
349         ret = rb_allocate_pages(cpu_buffer, buffer->pages);
350         if (ret < 0)
351                 goto fail_free_reader;
352
353         cpu_buffer->head_page
354                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
355         cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
356
357         return cpu_buffer;
358
359  fail_free_reader:
360         free_buffer_page(cpu_buffer->reader_page);
361
362  fail_free_buffer:
363         kfree(cpu_buffer);
364         return NULL;
365 }
366
367 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
368 {
369         struct list_head *head = &cpu_buffer->pages;
370         struct buffer_page *page, *tmp;
371
372         list_del_init(&cpu_buffer->reader_page->list);
373         free_buffer_page(cpu_buffer->reader_page);
374
375         list_for_each_entry_safe(page, tmp, head, list) {
376                 list_del_init(&page->list);
377                 free_buffer_page(page);
378         }
379         kfree(cpu_buffer);
380 }
381
382 /*
383  * Causes compile errors if the struct buffer_page gets bigger
384  * than the struct page.
385  */
386 extern int ring_buffer_page_too_big(void);
387
388 /**
389  * ring_buffer_alloc - allocate a new ring_buffer
390  * @size: the size in bytes per cpu that is needed.
391  * @flags: attributes to set for the ring buffer.
392  *
393  * Currently the only flag that is available is the RB_FL_OVERWRITE
394  * flag. This flag means that the buffer will overwrite old data
395  * when the buffer wraps. If this flag is not set, the buffer will
396  * drop data when the tail hits the head.
397  */
398 struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
399 {
400         struct ring_buffer *buffer;
401         int bsize;
402         int cpu;
403
404         /* Paranoid! Optimizes out when all is well */
405         if (sizeof(struct buffer_page) > sizeof(struct page))
406                 ring_buffer_page_too_big();
407
408
409         /* keep it in its own cache line */
410         buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
411                          GFP_KERNEL);
412         if (!buffer)
413                 return NULL;
414
415         buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
416         buffer->flags = flags;
417
418         /* need at least two pages */
419         if (buffer->pages == 1)
420                 buffer->pages++;
421
422         buffer->cpumask = cpu_possible_map;
423         buffer->cpus = nr_cpu_ids;
424
425         bsize = sizeof(void *) * nr_cpu_ids;
426         buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
427                                   GFP_KERNEL);
428         if (!buffer->buffers)
429                 goto fail_free_buffer;
430
431         for_each_buffer_cpu(buffer, cpu) {
432                 buffer->buffers[cpu] =
433                         rb_allocate_cpu_buffer(buffer, cpu);
434                 if (!buffer->buffers[cpu])
435                         goto fail_free_buffers;
436         }
437
438         mutex_init(&buffer->mutex);
439
440         return buffer;
441
442  fail_free_buffers:
443         for_each_buffer_cpu(buffer, cpu) {
444                 if (buffer->buffers[cpu])
445                         rb_free_cpu_buffer(buffer->buffers[cpu]);
446         }
447         kfree(buffer->buffers);
448
449  fail_free_buffer:
450         kfree(buffer);
451         return NULL;
452 }
453 EXPORT_SYMBOL_GPL(ring_buffer_alloc);
454
455 /**
456  * ring_buffer_free - free a ring buffer.
457  * @buffer: the buffer to free.
458  */
459 void
460 ring_buffer_free(struct ring_buffer *buffer)
461 {
462         int cpu;
463
464         for_each_buffer_cpu(buffer, cpu)
465                 rb_free_cpu_buffer(buffer->buffers[cpu]);
466
467         kfree(buffer);
468 }
469 EXPORT_SYMBOL_GPL(ring_buffer_free);
470
471 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
472
473 static void
474 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
475 {
476         struct buffer_page *page;
477         struct list_head *p;
478         unsigned i;
479
480         atomic_inc(&cpu_buffer->record_disabled);
481         synchronize_sched();
482
483         for (i = 0; i < nr_pages; i++) {
484                 BUG_ON(list_empty(&cpu_buffer->pages));
485                 p = cpu_buffer->pages.next;
486                 page = list_entry(p, struct buffer_page, list);
487                 list_del_init(&page->list);
488                 free_buffer_page(page);
489         }
490         BUG_ON(list_empty(&cpu_buffer->pages));
491
492         rb_reset_cpu(cpu_buffer);
493
494         rb_check_pages(cpu_buffer);
495
496         atomic_dec(&cpu_buffer->record_disabled);
497
498 }
499
500 static void
501 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
502                 struct list_head *pages, unsigned nr_pages)
503 {
504         struct buffer_page *page;
505         struct list_head *p;
506         unsigned i;
507
508         atomic_inc(&cpu_buffer->record_disabled);
509         synchronize_sched();
510
511         for (i = 0; i < nr_pages; i++) {
512                 BUG_ON(list_empty(pages));
513                 p = pages->next;
514                 page = list_entry(p, struct buffer_page, list);
515                 list_del_init(&page->list);
516                 list_add_tail(&page->list, &cpu_buffer->pages);
517         }
518         rb_reset_cpu(cpu_buffer);
519
520         rb_check_pages(cpu_buffer);
521
522         atomic_dec(&cpu_buffer->record_disabled);
523 }
524
525 /**
526  * ring_buffer_resize - resize the ring buffer
527  * @buffer: the buffer to resize.
528  * @size: the new size.
529  *
530  * The tracer is responsible for making sure that the buffer is
531  * not being used while changing the size.
532  * Note: We may be able to change the above requirement by using
533  *  RCU synchronizations.
534  *
535  * Minimum size is 2 * BUF_PAGE_SIZE.
536  *
537  * Returns -1 on failure.
538  */
539 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
540 {
541         struct ring_buffer_per_cpu *cpu_buffer;
542         unsigned nr_pages, rm_pages, new_pages;
543         struct buffer_page *page, *tmp;
544         unsigned long buffer_size;
545         unsigned long addr;
546         LIST_HEAD(pages);
547         int i, cpu;
548
549         /*
550          * Always succeed at resizing a non-existent buffer:
551          */
552         if (!buffer)
553                 return size;
554
555         size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
556         size *= BUF_PAGE_SIZE;
557         buffer_size = buffer->pages * BUF_PAGE_SIZE;
558
559         /* we need a minimum of two pages */
560         if (size < BUF_PAGE_SIZE * 2)
561                 size = BUF_PAGE_SIZE * 2;
562
563         if (size == buffer_size)
564                 return size;
565
566         mutex_lock(&buffer->mutex);
567
568         nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
569
570         if (size < buffer_size) {
571
572                 /* easy case, just free pages */
573                 BUG_ON(nr_pages >= buffer->pages);
574
575                 rm_pages = buffer->pages - nr_pages;
576
577                 for_each_buffer_cpu(buffer, cpu) {
578                         cpu_buffer = buffer->buffers[cpu];
579                         rb_remove_pages(cpu_buffer, rm_pages);
580                 }
581                 goto out;
582         }
583
584         /*
585          * This is a bit more difficult. We only want to add pages
586          * when we can allocate enough for all CPUs. We do this
587          * by allocating all the pages and storing them on a local
588          * link list. If we succeed in our allocation, then we
589          * add these pages to the cpu_buffers. Otherwise we just free
590          * them all and return -ENOMEM;
591          */
592         BUG_ON(nr_pages <= buffer->pages);
593         new_pages = nr_pages - buffer->pages;
594
595         for_each_buffer_cpu(buffer, cpu) {
596                 for (i = 0; i < new_pages; i++) {
597                         page = kzalloc_node(ALIGN(sizeof(*page),
598                                                   cache_line_size()),
599                                             GFP_KERNEL, cpu_to_node(cpu));
600                         if (!page)
601                                 goto free_pages;
602                         list_add(&page->list, &pages);
603                         addr = __get_free_page(GFP_KERNEL);
604                         if (!addr)
605                                 goto free_pages;
606                         page->page = (void *)addr;
607                 }
608         }
609
610         for_each_buffer_cpu(buffer, cpu) {
611                 cpu_buffer = buffer->buffers[cpu];
612                 rb_insert_pages(cpu_buffer, &pages, new_pages);
613         }
614
615         BUG_ON(!list_empty(&pages));
616
617  out:
618         buffer->pages = nr_pages;
619         mutex_unlock(&buffer->mutex);
620
621         return size;
622
623  free_pages:
624         list_for_each_entry_safe(page, tmp, &pages, list) {
625                 list_del_init(&page->list);
626                 free_buffer_page(page);
627         }
628         mutex_unlock(&buffer->mutex);
629         return -ENOMEM;
630 }
631 EXPORT_SYMBOL_GPL(ring_buffer_resize);
632
633 static inline int rb_null_event(struct ring_buffer_event *event)
634 {
635         return event->type == RINGBUF_TYPE_PADDING;
636 }
637
638 static inline void *__rb_page_index(struct buffer_page *page, unsigned index)
639 {
640         return page->page + index;
641 }
642
643 static inline struct ring_buffer_event *
644 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
645 {
646         return __rb_page_index(cpu_buffer->reader_page,
647                                cpu_buffer->reader_page->read);
648 }
649
650 static inline struct ring_buffer_event *
651 rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
652 {
653         return __rb_page_index(cpu_buffer->head_page,
654                                cpu_buffer->head_page->read);
655 }
656
657 static inline struct ring_buffer_event *
658 rb_iter_head_event(struct ring_buffer_iter *iter)
659 {
660         return __rb_page_index(iter->head_page, iter->head);
661 }
662
663 static inline unsigned rb_page_write(struct buffer_page *bpage)
664 {
665         return local_read(&bpage->write);
666 }
667
668 static inline unsigned rb_page_commit(struct buffer_page *bpage)
669 {
670         return local_read(&bpage->commit);
671 }
672
/* The size of a page is whatever has been committed to it */
static inline unsigned rb_page_size(struct buffer_page *bpage)
{
        unsigned size = rb_page_commit(bpage);

        return size;
}
678
679 static inline unsigned
680 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
681 {
682         return rb_page_commit(cpu_buffer->commit_page);
683 }
684
685 static inline unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
686 {
687         return rb_page_commit(cpu_buffer->head_page);
688 }
689
690 /*
691  * When the tail hits the head and the buffer is in overwrite mode,
692  * the head jumps to the next page and all content on the previous
693  * page is discarded. But before doing so, we update the overrun
694  * variable of the buffer.
695  */
696 static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
697 {
698         struct ring_buffer_event *event;
699         unsigned long head;
700
701         for (head = 0; head < rb_head_size(cpu_buffer);
702              head += rb_event_length(event)) {
703
704                 event = __rb_page_index(cpu_buffer->head_page, head);
705                 BUG_ON(rb_null_event(event));
706                 /* Only count data entries */
707                 if (event->type != RINGBUF_TYPE_DATA)
708                         continue;
709                 cpu_buffer->overrun++;
710                 cpu_buffer->entries--;
711         }
712 }
713
714 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
715                                struct buffer_page **page)
716 {
717         struct list_head *p = (*page)->list.next;
718
719         if (p == &cpu_buffer->pages)
720                 p = p->next;
721
722         *page = list_entry(p, struct buffer_page, list);
723 }
724
725 static inline unsigned
726 rb_event_index(struct ring_buffer_event *event)
727 {
728         unsigned long addr = (unsigned long)event;
729
730         return (addr & ~PAGE_MASK) - (PAGE_SIZE - BUF_PAGE_SIZE);
731 }
732
733 static inline int
734 rb_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
735              struct ring_buffer_event *event)
736 {
737         unsigned long addr = (unsigned long)event;
738         unsigned long index;
739
740         index = rb_event_index(event);
741         addr &= PAGE_MASK;
742
743         return cpu_buffer->commit_page->page == (void *)addr &&
744                 rb_commit_index(cpu_buffer) == index;
745 }
746
/*
 * Move the commit position of @cpu_buffer to @event.
 *
 * The commit page is advanced page by page until it is the page that
 * contains @event; every page passed on the way has its commit index
 * set to its write index. The commit index of the final page is then
 * set to the offset of @event within that page.
 */
static inline void
rb_set_commit_event(struct ring_buffer_per_cpu *cpu_buffer,
                    struct ring_buffer_event *event)
{
        unsigned long addr = (unsigned long)event;
        unsigned long index;

        /* Split the event address into page base and in-page offset */
        index = rb_event_index(event);
        addr &= PAGE_MASK;

        while (cpu_buffer->commit_page->page != (void *)addr) {
                /* Still not on the event's page although we reached the tail */
                RB_WARN_ON(cpu_buffer,
                           cpu_buffer->commit_page == cpu_buffer->tail_page);
                /* Everything written to this page is now committed */
                cpu_buffer->commit_page->commit =
                        cpu_buffer->commit_page->write;
                rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
                /* The write stamp follows the new commit page */
                cpu_buffer->write_stamp = cpu_buffer->commit_page->time_stamp;
        }

        /* Now set the commit to the event's index */
        local_set(&cpu_buffer->commit_page->commit, index);
}
769
/*
 * Bring the commit position of @cpu_buffer all the way up to the
 * write position: first advance the commit page until it is the
 * tail page, then raise the commit index of that page until it
 * equals the page's write index.
 */
static inline void
rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
{
        /*
         * We only race with interrupts and NMIs on this CPU.
         * If we own the commit event, then we can commit
         * all others that interrupted us, since the interruptions
         * are in stack format (they finish before they come
         * back to us). This allows us to do a simple loop to
         * assign the commit to the tail.
         */
        while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
                /* Everything written to this page is now committed */
                cpu_buffer->commit_page->commit =
                        cpu_buffer->commit_page->write;
                rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
                /* The write stamp follows the new commit page */
                cpu_buffer->write_stamp = cpu_buffer->commit_page->time_stamp;
                /* add barrier to keep gcc from optimizing too much */
                barrier();
        }
        /* Re-check in a loop: the write index may move while we copy it */
        while (rb_commit_index(cpu_buffer) !=
               rb_page_write(cpu_buffer->commit_page)) {
                cpu_buffer->commit_page->commit =
                        cpu_buffer->commit_page->write;
                barrier();
        }
}
796
797 static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
798 {
799         cpu_buffer->read_stamp = cpu_buffer->reader_page->time_stamp;
800         cpu_buffer->reader_page->read = 0;
801 }
802
803 static inline void rb_inc_iter(struct ring_buffer_iter *iter)
804 {
805         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
806
807         /*
808          * The iterator could be on the reader page (it starts there).
809          * But the head could have moved, since the reader was
810          * found. Check for this case and assign the iterator
811          * to the head page instead of next.
812          */
813         if (iter->head_page == cpu_buffer->reader_page)
814                 iter->head_page = cpu_buffer->head_page;
815         else
816                 rb_inc_page(cpu_buffer, &iter->head_page);
817
818         iter->read_stamp = iter->head_page->time_stamp;
819         iter->head = 0;
820 }
821
822 /**
823  * ring_buffer_update_event - update event type and data
824  * @event: the even to update
825  * @type: the type of event
826  * @length: the size of the event field in the ring buffer
827  *
828  * Update the type and data fields of the event. The length
829  * is the actual size that is written to the ring buffer,
830  * and with this, we can determine what to place into the
831  * data field.
832  */
833 static inline void
834 rb_update_event(struct ring_buffer_event *event,
835                          unsigned type, unsigned length)
836 {
837         event->type = type;
838
839         switch (type) {
840
841         case RINGBUF_TYPE_PADDING:
842                 break;
843
844         case RINGBUF_TYPE_TIME_EXTEND:
845                 event->len =
846                         (RB_LEN_TIME_EXTEND + (RB_ALIGNMENT-1))
847                         >> RB_ALIGNMENT_SHIFT;
848                 break;
849
850         case RINGBUF_TYPE_TIME_STAMP:
851                 event->len =
852                         (RB_LEN_TIME_STAMP + (RB_ALIGNMENT-1))
853                         >> RB_ALIGNMENT_SHIFT;
854                 break;
855
856         case RINGBUF_TYPE_DATA:
857                 length -= RB_EVNT_HDR_SIZE;
858                 if (length > RB_MAX_SMALL_DATA) {
859                         event->len = 0;
860                         event->array[0] = length;
861                 } else
862                         event->len =
863                                 (length + (RB_ALIGNMENT-1))
864                                 >> RB_ALIGNMENT_SHIFT;
865                 break;
866         default:
867                 BUG();
868         }
869 }
870
871 static inline unsigned rb_calculate_event_length(unsigned length)
872 {
873         struct ring_buffer_event event; /* Used only for sizeof array */
874
875         /* zero length can cause confusions */
876         if (!length)
877                 length = 1;
878
879         if (length > RB_MAX_SMALL_DATA)
880                 length += sizeof(event.array[0]);
881
882         length += RB_EVNT_HDR_SIZE;
883         length = ALIGN(length, RB_ALIGNMENT);
884
885         return length;
886 }
887
/*
 * __rb_reserve_next - try to reserve @length bytes on the current tail page
 * @cpu_buffer: the per cpu buffer to reserve from
 * @type: event type handed to rb_update_event() on success
 * @length: full event length in bytes
 * @ts: time stamp for the event; overwritten with a fresh time stamp when
 *      this call is the one that moves the tail page forward
 *
 * The reservation is made by atomically adding @length to the tail page's
 * write index; the event starts at the previous index ("tail").
 *
 * Returns the initialised event when the reservation fits on the page.
 * Returns ERR_PTR(-EAGAIN) when the reservation ran off the end of the page;
 * the tail page handling below has been done and the caller should retry.
 * Returns NULL when the next page is the commit page, or when the next page
 * is the head page and the buffer is not in RB_FL_OVERWRITE mode.
 */
888 static struct ring_buffer_event *
889 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
890                   unsigned type, unsigned long length, u64 *ts)
891 {
892         struct buffer_page *tail_page, *head_page, *reader_page;
893         unsigned long tail, write;
894         struct ring_buffer *buffer = cpu_buffer->buffer;
895         struct ring_buffer_event *event;
896         unsigned long flags;
897
898         tail_page = cpu_buffer->tail_page;
        /* claim the space first; "tail" is where our event would begin */
899         write = local_add_return(length, &tail_page->write);
900         tail = write - length;
901
902         /* See if we shot past the end of this buffer page */
903         if (write > BUF_PAGE_SIZE) {
904                 struct buffer_page *next_page = tail_page;
905
906                 spin_lock_irqsave(&cpu_buffer->lock, flags);
907
908                 rb_inc_page(cpu_buffer, &next_page);
909
910                 head_page = cpu_buffer->head_page;
911                 reader_page = cpu_buffer->reader_page;
912
913                 /* we grabbed the lock before incrementing */
914                 RB_WARN_ON(cpu_buffer, next_page == reader_page);
915
916                 /*
917                  * If for some reason, we had an interrupt storm that made
918                  * it all the way around the buffer, bail, and warn
919                  * about it.
920                  */
921                 if (unlikely(next_page == cpu_buffer->commit_page)) {
922                         WARN_ON_ONCE(1);
923                         goto out_unlock;
924                 }
925
926                 if (next_page == head_page) {
                        /* buffer is full: without overwrite mode we give up */
927                         if (!(buffer->flags & RB_FL_OVERWRITE)) {
928                                 /* reset write */
929                                 if (tail <= BUF_PAGE_SIZE)
930                                         local_set(&tail_page->write, tail);
931                                 goto out_unlock;
932                         }
933
934                         /* tail_page has not moved yet? */
935                         if (tail_page == cpu_buffer->tail_page) {
936                                 /* count overflows */
937                                 rb_update_overflow(cpu_buffer);
938
                                /* push the head forward so its page can be reused */
939                                 rb_inc_page(cpu_buffer, &head_page);
940                                 cpu_buffer->head_page = head_page;
941                                 cpu_buffer->head_page->read = 0;
942                         }
943                 }
944
945                 /*
946                  * If the tail page is still the same as what we think
947                  * it is, then it is up to us to update the tail
948                  * pointer.
949                  */
950                 if (tail_page == cpu_buffer->tail_page) {
951                         local_set(&next_page->write, 0);
952                         local_set(&next_page->commit, 0);
953                         cpu_buffer->tail_page = next_page;
954
955                         /* reread the time stamp */
956                         *ts = ring_buffer_time_stamp(cpu_buffer->cpu);
957                         cpu_buffer->tail_page->time_stamp = *ts;
958                 }
959
960                 /*
961                  * The actual tail page has moved forward.
962                  */
963                 if (tail < BUF_PAGE_SIZE) {
964                         /* Mark the rest of the page with padding */
965                         event = __rb_page_index(tail_page, tail);
966                         event->type = RINGBUF_TYPE_PADDING;
967                 }
968
969                 if (tail <= BUF_PAGE_SIZE)
970                         /* Set the write back to the previous setting */
971                         local_set(&tail_page->write, tail);
972
973                 /*
974                  * If this was a commit entry that failed,
975                  * increment that too
976                  */
977                 if (tail_page == cpu_buffer->commit_page &&
978                     tail == rb_commit_index(cpu_buffer)) {
979                         rb_set_commit_to_write(cpu_buffer);
980                 }
981
982                 spin_unlock_irqrestore(&cpu_buffer->lock, flags);
983
984                 /* fail and let the caller try again */
985                 return ERR_PTR(-EAGAIN);
986         }
987
988         /* We reserved something on the buffer */
989
990         BUG_ON(write > BUF_PAGE_SIZE);
991
992         event = __rb_page_index(tail_page, tail);
993         rb_update_event(event, type, length);
994
995         /*
996          * If this is a commit and the tail is zero, then update
997          * this page's time stamp.
998          */
999         if (!tail && rb_is_commit(cpu_buffer, event))
1000                 cpu_buffer->commit_page->time_stamp = *ts;
1001
1002         return event;
1003
        /* hard failure inside the locked section: drop the lock, report NULL */
1004  out_unlock:
1005         spin_unlock_irqrestore(&cpu_buffer->lock, flags);
1006         return NULL;
1007 }
1008
1009 static int
1010 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1011                   u64 *ts, u64 *delta)
1012 {
1013         struct ring_buffer_event *event;
1014         static int once;
1015         int ret;
1016
1017         if (unlikely(*delta > (1ULL << 59) && !once++)) {
1018                 printk(KERN_WARNING "Delta way too big! %llu"
1019                        " ts=%llu write stamp = %llu\n",
1020                        (unsigned long long)*delta,
1021                        (unsigned long long)*ts,
1022                        (unsigned long long)cpu_buffer->write_stamp);
1023                 WARN_ON(1);
1024         }
1025
1026         /*
1027          * The delta is too big, we to add a
1028          * new timestamp.
1029          */
1030         event = __rb_reserve_next(cpu_buffer,
1031                                   RINGBUF_TYPE_TIME_EXTEND,
1032                                   RB_LEN_TIME_EXTEND,
1033                                   ts);
1034         if (!event)
1035                 return -EBUSY;
1036
1037         if (PTR_ERR(event) == -EAGAIN)
1038                 return -EAGAIN;
1039
1040         /* Only a commited time event can update the write stamp */
1041         if (rb_is_commit(cpu_buffer, event)) {
1042                 /*
1043                  * If this is the first on the page, then we need to
1044                  * update the page itself, and just put in a zero.
1045                  */
1046                 if (rb_event_index(event)) {
1047                         event->time_delta = *delta & TS_MASK;
1048                         event->array[0] = *delta >> TS_SHIFT;
1049                 } else {
1050                         cpu_buffer->commit_page->time_stamp = *ts;
1051                         event->time_delta = 0;
1052                         event->array[0] = 0;
1053                 }
1054                 cpu_buffer->write_stamp = *ts;
1055                 /* let the caller know this was the commit */
1056                 ret = 1;
1057         } else {
1058                 /* Darn, this is just wasted space */
1059                 event->time_delta = 0;
1060                 event->array[0] = 0;
1061                 ret = 0;
1062         }
1063
1064         *delta = 0;
1065
1066         return ret;
1067 }
1068
1069 static struct ring_buffer_event *
1070 rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
1071                       unsigned type, unsigned long length)
1072 {
1073         struct ring_buffer_event *event;
1074         u64 ts, delta;
1075         int commit = 0;
1076         int nr_loops = 0;
1077
1078  again:
1079         /*
1080          * We allow for interrupts to reenter here and do a trace.
1081          * If one does, it will cause this original code to loop
1082          * back here. Even with heavy interrupts happening, this
1083          * should only happen a few times in a row. If this happens
1084          * 1000 times in a row, there must be either an interrupt
1085          * storm or we have something buggy.
1086          * Bail!
1087          */
1088         if (unlikely(++nr_loops > 1000)) {
1089                 RB_WARN_ON(cpu_buffer, 1);
1090                 return NULL;
1091         }
1092
1093         ts = ring_buffer_time_stamp(cpu_buffer->cpu);
1094
1095         /*
1096          * Only the first commit can update the timestamp.
1097          * Yes there is a race here. If an interrupt comes in
1098          * just after the conditional and it traces too, then it
1099          * will also check the deltas. More than one timestamp may
1100          * also be made. But only the entry that did the actual
1101          * commit will be something other than zero.
1102          */
1103         if (cpu_buffer->tail_page == cpu_buffer->commit_page &&
1104             rb_page_write(cpu_buffer->tail_page) ==
1105             rb_commit_index(cpu_buffer)) {
1106
1107                 delta = ts - cpu_buffer->write_stamp;
1108
1109                 /* make sure this delta is calculated here */
1110                 barrier();
1111
1112                 /* Did the write stamp get updated already? */
1113                 if (unlikely(ts < cpu_buffer->write_stamp))
1114                         delta = 0;
1115
1116                 if (test_time_stamp(delta)) {
1117
1118                         commit = rb_add_time_stamp(cpu_buffer, &ts, &delta);
1119
1120                         if (commit == -EBUSY)
1121                                 return NULL;
1122
1123                         if (commit == -EAGAIN)
1124                                 goto again;
1125
1126                         RB_WARN_ON(cpu_buffer, commit < 0);
1127                 }
1128         } else
1129                 /* Non commits have zero deltas */
1130                 delta = 0;
1131
1132         event = __rb_reserve_next(cpu_buffer, type, length, &ts);
1133         if (PTR_ERR(event) == -EAGAIN)
1134                 goto again;
1135
1136         if (!event) {
1137                 if (unlikely(commit))
1138                         /*
1139                          * Ouch! We needed a timestamp and it was commited. But
1140                          * we didn't get our event reserved.
1141                          */
1142                         rb_set_commit_to_write(cpu_buffer);
1143                 return NULL;
1144         }
1145
1146         /*
1147          * If the timestamp was commited, make the commit our entry
1148          * now so that we will update it when needed.
1149          */
1150         if (commit)
1151                 rb_set_commit_event(cpu_buffer, event);
1152         else if (!rb_is_commit(cpu_buffer, event))
1153                 delta = 0;
1154
1155         event->time_delta = delta;
1156
1157         return event;
1158 }
1159
1160 static DEFINE_PER_CPU(int, rb_need_resched);
1161
1162 /**
1163  * ring_buffer_lock_reserve - reserve a part of the buffer
1164  * @buffer: the ring buffer to reserve from
1165  * @length: the length of the data to reserve (excluding event header)
1166  * @flags: a pointer to save the interrupt flags
1167  *
1168  * Returns a reseverd event on the ring buffer to copy directly to.
1169  * The user of this interface will need to get the body to write into
1170  * and can use the ring_buffer_event_data() interface.
1171  *
1172  * The length is the length of the data needed, not the event length
1173  * which also includes the event header.
1174  *
1175  * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
1176  * If NULL is returned, then nothing has been allocated or locked.
1177  */
1178 struct ring_buffer_event *
1179 ring_buffer_lock_reserve(struct ring_buffer *buffer,
1180                          unsigned long length,
1181                          unsigned long *flags)
1182 {
1183         struct ring_buffer_per_cpu *cpu_buffer;
1184         struct ring_buffer_event *event;
1185         int cpu, resched;
1186
1187         if (ring_buffers_off)
1188                 return NULL;
1189
1190         if (atomic_read(&buffer->record_disabled))
1191                 return NULL;
1192
1193         /* If we are tracing schedule, we don't want to recurse */
1194         resched = need_resched();
1195         preempt_disable_notrace();
1196
1197         cpu = raw_smp_processor_id();
1198
1199         if (!cpu_isset(cpu, buffer->cpumask))
1200                 goto out;
1201
1202         cpu_buffer = buffer->buffers[cpu];
1203
1204         if (atomic_read(&cpu_buffer->record_disabled))
1205                 goto out;
1206
1207         length = rb_calculate_event_length(length);
1208         if (length > BUF_PAGE_SIZE)
1209                 goto out;
1210
1211         event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length);
1212         if (!event)
1213                 goto out;
1214
1215         /*
1216          * Need to store resched state on this cpu.
1217          * Only the first needs to.
1218          */
1219
1220         if (preempt_count() == 1)
1221                 per_cpu(rb_need_resched, cpu) = resched;
1222
1223         return event;
1224
1225  out:
1226         if (resched)
1227                 preempt_enable_no_resched_notrace();
1228         else
1229                 preempt_enable_notrace();
1230         return NULL;
1231 }
1232 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
1233
1234 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
1235                       struct ring_buffer_event *event)
1236 {
1237         cpu_buffer->entries++;
1238
1239         /* Only process further if we own the commit */
1240         if (!rb_is_commit(cpu_buffer, event))
1241                 return;
1242
1243         cpu_buffer->write_stamp += event->time_delta;
1244
1245         rb_set_commit_to_write(cpu_buffer);
1246 }
1247
1248 /**
1249  * ring_buffer_unlock_commit - commit a reserved
1250  * @buffer: The buffer to commit to
1251  * @event: The event pointer to commit.
1252  * @flags: the interrupt flags received from ring_buffer_lock_reserve.
1253  *
1254  * This commits the data to the ring buffer, and releases any locks held.
1255  *
1256  * Must be paired with ring_buffer_lock_reserve.
1257  */
1258 int ring_buffer_unlock_commit(struct ring_buffer *buffer,
1259                               struct ring_buffer_event *event,
1260                               unsigned long flags)
1261 {
1262         struct ring_buffer_per_cpu *cpu_buffer;
1263         int cpu = raw_smp_processor_id();
1264
1265         cpu_buffer = buffer->buffers[cpu];
1266
1267         rb_commit(cpu_buffer, event);
1268
1269         /*
1270          * Only the last preempt count needs to restore preemption.
1271          */
1272         if (preempt_count() == 1) {
1273                 if (per_cpu(rb_need_resched, cpu))
1274                         preempt_enable_no_resched_notrace();
1275                 else
1276                         preempt_enable_notrace();
1277         } else
1278                 preempt_enable_no_resched_notrace();
1279
1280         return 0;
1281 }
1282 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
1283
1284 /**
1285  * ring_buffer_write - write data to the buffer without reserving
1286  * @buffer: The ring buffer to write to.
1287  * @length: The length of the data being written (excluding the event header)
1288  * @data: The data to write to the buffer.
1289  *
1290  * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
1291  * one function. If you already have the data to write to the buffer, it
1292  * may be easier to simply call this function.
1293  *
1294  * Note, like ring_buffer_lock_reserve, the length is the length of the data
1295  * and not the length of the event which would hold the header.
1296  */
1297 int ring_buffer_write(struct ring_buffer *buffer,
1298                         unsigned long length,
1299                         void *data)
1300 {
1301         struct ring_buffer_per_cpu *cpu_buffer;
1302         struct ring_buffer_event *event;
1303         unsigned long event_length;
1304         void *body;
1305         int ret = -EBUSY;
1306         int cpu, resched;
1307
1308         if (ring_buffers_off)
1309                 return -EBUSY;
1310
1311         if (atomic_read(&buffer->record_disabled))
1312                 return -EBUSY;
1313
1314         resched = need_resched();
1315         preempt_disable_notrace();
1316
1317         cpu = raw_smp_processor_id();
1318
1319         if (!cpu_isset(cpu, buffer->cpumask))
1320                 goto out;
1321
1322         cpu_buffer = buffer->buffers[cpu];
1323
1324         if (atomic_read(&cpu_buffer->record_disabled))
1325                 goto out;
1326
1327         event_length = rb_calculate_event_length(length);
1328         event = rb_reserve_next_event(cpu_buffer,
1329                                       RINGBUF_TYPE_DATA, event_length);
1330         if (!event)
1331                 goto out;
1332
1333         body = rb_event_data(event);
1334
1335         memcpy(body, data, length);
1336
1337         rb_commit(cpu_buffer, event);
1338
1339         ret = 0;
1340  out:
1341         if (resched)
1342                 preempt_enable_no_resched_notrace();
1343         else
1344                 preempt_enable_notrace();
1345
1346         return ret;
1347 }
1348 EXPORT_SYMBOL_GPL(ring_buffer_write);
1349
1350 static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
1351 {
1352         struct buffer_page *reader = cpu_buffer->reader_page;
1353         struct buffer_page *head = cpu_buffer->head_page;
1354         struct buffer_page *commit = cpu_buffer->commit_page;
1355
1356         return reader->read == rb_page_commit(reader) &&
1357                 (commit == reader ||
1358                  (commit == head &&
1359                   head->read == rb_page_commit(commit)));
1360 }
1361
1362 /**
1363  * ring_buffer_record_disable - stop all writes into the buffer
1364  * @buffer: The ring buffer to stop writes to.
1365  *
1366  * This prevents all writes to the buffer. Any attempt to write
1367  * to the buffer after this will fail and return NULL.
1368  *
1369  * The caller should call synchronize_sched() after this.
1370  */
1371 void ring_buffer_record_disable(struct ring_buffer *buffer)
1372 {
1373         atomic_inc(&buffer->record_disabled);
1374 }
1375 EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
1376
1377 /**
1378  * ring_buffer_record_enable - enable writes to the buffer
1379  * @buffer: The ring buffer to enable writes
1380  *
1381  * Note, multiple disables will need the same number of enables
1382  * to truely enable the writing (much like preempt_disable).
1383  */
1384 void ring_buffer_record_enable(struct ring_buffer *buffer)
1385 {
1386         atomic_dec(&buffer->record_disabled);
1387 }
1388 EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
1389
1390 /**
1391  * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
1392  * @buffer: The ring buffer to stop writes to.
1393  * @cpu: The CPU buffer to stop
1394  *
1395  * This prevents all writes to the buffer. Any attempt to write
1396  * to the buffer after this will fail and return NULL.
1397  *
1398  * The caller should call synchronize_sched() after this.
1399  */
1400 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
1401 {
1402         struct ring_buffer_per_cpu *cpu_buffer;
1403
1404         if (!cpu_isset(cpu, buffer->cpumask))
1405                 return;
1406
1407         cpu_buffer = buffer->buffers[cpu];
1408         atomic_inc(&cpu_buffer->record_disabled);
1409 }
1410 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
1411
1412 /**
1413  * ring_buffer_record_enable_cpu - enable writes to the buffer
1414  * @buffer: The ring buffer to enable writes
1415  * @cpu: The CPU to enable.
1416  *
1417  * Note, multiple disables will need the same number of enables
1418  * to truely enable the writing (much like preempt_disable).
1419  */
1420 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
1421 {
1422         struct ring_buffer_per_cpu *cpu_buffer;
1423
1424         if (!cpu_isset(cpu, buffer->cpumask))
1425                 return;
1426
1427         cpu_buffer = buffer->buffers[cpu];
1428         atomic_dec(&cpu_buffer->record_disabled);
1429 }
1430 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);
1431
1432 /**
1433  * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
1434  * @buffer: The ring buffer
1435  * @cpu: The per CPU buffer to get the entries from.
1436  */
1437 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
1438 {
1439         struct ring_buffer_per_cpu *cpu_buffer;
1440
1441         if (!cpu_isset(cpu, buffer->cpumask))
1442                 return 0;
1443
1444         cpu_buffer = buffer->buffers[cpu];
1445         return cpu_buffer->entries;
1446 }
1447 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);
1448
1449 /**
1450  * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
1451  * @buffer: The ring buffer
1452  * @cpu: The per CPU buffer to get the number of overruns from
1453  */
1454 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
1455 {
1456         struct ring_buffer_per_cpu *cpu_buffer;
1457
1458         if (!cpu_isset(cpu, buffer->cpumask))
1459                 return 0;
1460
1461         cpu_buffer = buffer->buffers[cpu];
1462         return cpu_buffer->overrun;
1463 }
1464 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);
1465
1466 /**
1467  * ring_buffer_entries - get the number of entries in a buffer
1468  * @buffer: The ring buffer
1469  *
1470  * Returns the total number of entries in the ring buffer
1471  * (all CPU entries)
1472  */
1473 unsigned long ring_buffer_entries(struct ring_buffer *buffer)
1474 {
1475         struct ring_buffer_per_cpu *cpu_buffer;
1476         unsigned long entries = 0;
1477         int cpu;
1478
1479         /* if you care about this being correct, lock the buffer */
1480         for_each_buffer_cpu(buffer, cpu) {
1481                 cpu_buffer = buffer->buffers[cpu];
1482                 entries += cpu_buffer->entries;
1483         }
1484
1485         return entries;
1486 }
1487 EXPORT_SYMBOL_GPL(ring_buffer_entries);
1488
1489 /**
1490  * ring_buffer_overrun_cpu - get the number of overruns in buffer
1491  * @buffer: The ring buffer
1492  *
1493  * Returns the total number of overruns in the ring buffer
1494  * (all CPU entries)
1495  */
1496 unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
1497 {
1498         struct ring_buffer_per_cpu *cpu_buffer;
1499         unsigned long overruns = 0;
1500         int cpu;
1501
1502         /* if you care about this being correct, lock the buffer */
1503         for_each_buffer_cpu(buffer, cpu) {
1504                 cpu_buffer = buffer->buffers[cpu];
1505                 overruns += cpu_buffer->overrun;
1506         }
1507
1508         return overruns;
1509 }
1510 EXPORT_SYMBOL_GPL(ring_buffer_overruns);
1511
1512 /**
1513  * ring_buffer_iter_reset - reset an iterator
1514  * @iter: The iterator to reset
1515  *
1516  * Resets the iterator, so that it will start from the beginning
1517  * again.
1518  */
1519 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
1520 {
1521         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1522
1523         /* Iterator usage is expected to have record disabled */
1524         if (list_empty(&cpu_buffer->reader_page->list)) {
1525                 iter->head_page = cpu_buffer->head_page;
1526                 iter->head = cpu_buffer->head_page->read;
1527         } else {
1528                 iter->head_page = cpu_buffer->reader_page;
1529                 iter->head = cpu_buffer->reader_page->read;
1530         }
1531         if (iter->head)
1532                 iter->read_stamp = cpu_buffer->read_stamp;
1533         else
1534                 iter->read_stamp = iter->head_page->time_stamp;
1535 }
1536 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);
1537
1538 /**
1539  * ring_buffer_iter_empty - check if an iterator has no more to read
1540  * @iter: The iterator to check
1541  */
1542 int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
1543 {
1544         struct ring_buffer_per_cpu *cpu_buffer;
1545
1546         cpu_buffer = iter->cpu_buffer;
1547
1548         return iter->head_page == cpu_buffer->commit_page &&
1549                 iter->head == rb_commit_index(cpu_buffer);
1550 }
1551 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);
1552
1553 static void
1554 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1555                      struct ring_buffer_event *event)
1556 {
1557         u64 delta;
1558
1559         switch (event->type) {
1560         case RINGBUF_TYPE_PADDING:
1561                 return;
1562
1563         case RINGBUF_TYPE_TIME_EXTEND:
1564                 delta = event->array[0];
1565                 delta <<= TS_SHIFT;
1566                 delta += event->time_delta;
1567                 cpu_buffer->read_stamp += delta;
1568                 return;
1569
1570         case RINGBUF_TYPE_TIME_STAMP:
1571                 /* FIXME: not implemented */
1572                 return;
1573
1574         case RINGBUF_TYPE_DATA:
1575                 cpu_buffer->read_stamp += event->time_delta;
1576                 return;
1577
1578         default:
1579                 BUG();
1580         }
1581         return;
1582 }
1583
1584 static void
1585 rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
1586                           struct ring_buffer_event *event)
1587 {
1588         u64 delta;
1589
1590         switch (event->type) {
1591         case RINGBUF_TYPE_PADDING:
1592                 return;
1593
1594         case RINGBUF_TYPE_TIME_EXTEND:
1595                 delta = event->array[0];
1596                 delta <<= TS_SHIFT;
1597                 delta += event->time_delta;
1598                 iter->read_stamp += delta;
1599                 return;
1600
1601         case RINGBUF_TYPE_TIME_STAMP:
1602                 /* FIXME: not implemented */
1603                 return;
1604
1605         case RINGBUF_TYPE_DATA:
1606                 iter->read_stamp += event->time_delta;
1607                 return;
1608
1609         default:
1610                 BUG();
1611         }
1612         return;
1613 }
1614
1615 static struct buffer_page *
1616 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
1617 {
1618         struct buffer_page *reader = NULL;
1619         unsigned long flags;
1620         int nr_loops = 0;
1621
1622         spin_lock_irqsave(&cpu_buffer->lock, flags);
1623
1624  again:
1625         /*
1626          * This should normally only loop twice. But because the
1627          * start of the reader inserts an empty page, it causes
1628          * a case where we will loop three times. There should be no
1629          * reason to loop four times (that I know of).
1630          */
1631         if (unlikely(++nr_loops > 3)) {
1632                 RB_WARN_ON(cpu_buffer, 1);
1633                 reader = NULL;
1634                 goto out;
1635         }
1636
1637         reader = cpu_buffer->reader_page;
1638
1639         /* If there's more to read, return this page */
1640         if (cpu_buffer->reader_page->read < rb_page_size(reader))
1641                 goto out;
1642
1643         /* Never should we have an index greater than the size */
1644         RB_WARN_ON(cpu_buffer,
1645                    cpu_buffer->reader_page->read > rb_page_size(reader));
1646
1647         /* check if we caught up to the tail */
1648         reader = NULL;
1649         if (cpu_buffer->commit_page == cpu_buffer->reader_page)
1650                 goto out;
1651
1652         /*
1653          * Splice the empty reader page into the list around the head.
1654          * Reset the reader page to size zero.
1655          */
1656
1657         reader = cpu_buffer->head_page;
1658         cpu_buffer->reader_page->list.next = reader->list.next;
1659         cpu_buffer->reader_page->list.prev = reader->list.prev;
1660
1661         local_set(&cpu_buffer->reader_page->write, 0);
1662         local_set(&cpu_buffer->reader_page->commit, 0);
1663
1664         /* Make the reader page now replace the head */
1665         reader->list.prev->next = &cpu_buffer->reader_page->list;
1666         reader->list.next->prev = &cpu_buffer->reader_page->list;
1667
1668         /*
1669          * If the tail is on the reader, then we must set the head
1670          * to the inserted page, otherwise we set it one before.
1671          */
1672         cpu_buffer->head_page = cpu_buffer->reader_page;
1673
1674         if (cpu_buffer->commit_page != reader)
1675                 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1676
1677         /* Finally update the reader page to the new head */
1678         cpu_buffer->reader_page = reader;
1679         rb_reset_reader_page(cpu_buffer);
1680
1681         goto again;
1682
1683  out:
1684         spin_unlock_irqrestore(&cpu_buffer->lock, flags);
1685
1686         return reader;
1687 }
1688
1689 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
1690 {
1691         struct ring_buffer_event *event;
1692         struct buffer_page *reader;
1693         unsigned length;
1694
1695         reader = rb_get_reader_page(cpu_buffer);
1696
1697         /* This function should not be called when buffer is empty */
1698         BUG_ON(!reader);
1699
1700         event = rb_reader_event(cpu_buffer);
1701
1702         if (event->type == RINGBUF_TYPE_DATA)
1703                 cpu_buffer->entries--;
1704
1705         rb_update_read_stamp(cpu_buffer, event);
1706
1707         length = rb_event_length(event);
1708         cpu_buffer->reader_page->read += length;
1709 }
1710
1711 static void rb_advance_iter(struct ring_buffer_iter *iter)
1712 {
1713         struct ring_buffer *buffer;
1714         struct ring_buffer_per_cpu *cpu_buffer;
1715         struct ring_buffer_event *event;
1716         unsigned length;
1717
1718         cpu_buffer = iter->cpu_buffer;
1719         buffer = cpu_buffer->buffer;
1720
1721         /*
1722          * Check if we are at the end of the buffer.
1723          */
1724         if (iter->head >= rb_page_size(iter->head_page)) {
1725                 BUG_ON(iter->head_page == cpu_buffer->commit_page);
1726                 rb_inc_iter(iter);
1727                 return;
1728         }
1729
1730         event = rb_iter_head_event(iter);
1731
1732         length = rb_event_length(event);
1733
1734         /*
1735          * This should not be called to advance the header if we are
1736          * at the tail of the buffer.
1737          */
1738         BUG_ON((iter->head_page == cpu_buffer->commit_page) &&
1739                (iter->head + length > rb_commit_index(cpu_buffer)));
1740
1741         rb_update_iter_read_stamp(iter, event);
1742
1743         iter->head += length;
1744
1745         /* check for end of page padding */
1746         if ((iter->head >= rb_page_size(iter->head_page)) &&
1747             (iter->head_page != cpu_buffer->commit_page))
1748                 rb_advance_iter(iter);
1749 }
1750
1751 /**
1752  * ring_buffer_peek - peek at the next event to be read
1753  * @buffer: The ring buffer to read
1754  * @cpu: The cpu to peak at
1755  * @ts: The timestamp counter of this event.
1756  *
1757  * This will return the event that will be read next, but does
1758  * not consume the data.
1759  */
1760 struct ring_buffer_event *
1761 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
1762 {
1763         struct ring_buffer_per_cpu *cpu_buffer;
1764         struct ring_buffer_event *event;
1765         struct buffer_page *reader;
1766         int nr_loops = 0;
1767
1768         if (!cpu_isset(cpu, buffer->cpumask))
1769                 return NULL;
1770
1771         cpu_buffer = buffer->buffers[cpu];
1772
1773  again:
1774         /*
1775          * We repeat when a timestamp is encountered. It is possible
1776          * to get multiple timestamps from an interrupt entering just
1777          * as one timestamp is about to be written. The max times
1778          * that this can happen is the number of nested interrupts we
1779          * can have.  Nesting 10 deep of interrupts is clearly
1780          * an anomaly.
1781          */
1782         if (unlikely(++nr_loops > 10)) {
1783                 RB_WARN_ON(cpu_buffer, 1);
1784                 return NULL;
1785         }
1786
1787         reader = rb_get_reader_page(cpu_buffer);
1788         if (!reader)
1789                 return NULL;
1790
1791         event = rb_reader_event(cpu_buffer);
1792
1793         switch (event->type) {
1794         case RINGBUF_TYPE_PADDING:
1795                 RB_WARN_ON(cpu_buffer, 1);
1796                 rb_advance_reader(cpu_buffer);
1797                 return NULL;
1798
1799         case RINGBUF_TYPE_TIME_EXTEND:
1800                 /* Internal data, OK to advance */
1801                 rb_advance_reader(cpu_buffer);
1802                 goto again;
1803
1804         case RINGBUF_TYPE_TIME_STAMP:
1805                 /* FIXME: not implemented */
1806                 rb_advance_reader(cpu_buffer);
1807                 goto again;
1808
1809         case RINGBUF_TYPE_DATA:
1810                 if (ts) {
1811                         *ts = cpu_buffer->read_stamp + event->time_delta;
1812                         ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1813                 }
1814                 return event;
1815
1816         default:
1817                 BUG();
1818         }
1819
1820         return NULL;
1821 }
1822 EXPORT_SYMBOL_GPL(ring_buffer_peek);
1823
1824 /**
1825  * ring_buffer_iter_peek - peek at the next event to be read
1826  * @iter: The ring buffer iterator
1827  * @ts: The timestamp counter of this event.
1828  *
1829  * This will return the event that will be read next, but does
1830  * not increment the iterator.
1831  */
1832 struct ring_buffer_event *
1833 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
1834 {
1835         struct ring_buffer *buffer;
1836         struct ring_buffer_per_cpu *cpu_buffer;
1837         struct ring_buffer_event *event;
1838         int nr_loops = 0;
1839
1840         if (ring_buffer_iter_empty(iter))
1841                 return NULL;
1842
1843         cpu_buffer = iter->cpu_buffer;
1844         buffer = cpu_buffer->buffer;
1845
1846  again:
1847         /*
1848          * We repeat when a timestamp is encountered. It is possible
1849          * to get multiple timestamps from an interrupt entering just
1850          * as one timestamp is about to be written. The max times
1851          * that this can happen is the number of nested interrupts we
1852          * can have. Nesting 10 deep of interrupts is clearly
1853          * an anomaly.
1854          */
1855         if (unlikely(++nr_loops > 10)) {
1856                 RB_WARN_ON(cpu_buffer, 1);
1857                 return NULL;
1858         }
1859
1860         if (rb_per_cpu_empty(cpu_buffer))
1861                 return NULL;
1862
1863         event = rb_iter_head_event(iter);
1864
1865         switch (event->type) {
1866         case RINGBUF_TYPE_PADDING:
1867                 rb_inc_iter(iter);
1868                 goto again;
1869
1870         case RINGBUF_TYPE_TIME_EXTEND:
1871                 /* Internal data, OK to advance */
1872                 rb_advance_iter(iter);
1873                 goto again;
1874
1875         case RINGBUF_TYPE_TIME_STAMP:
1876                 /* FIXME: not implemented */
1877                 rb_advance_iter(iter);
1878                 goto again;
1879
1880         case RINGBUF_TYPE_DATA:
1881                 if (ts) {
1882                         *ts = iter->read_stamp + event->time_delta;
1883                         ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1884                 }
1885                 return event;
1886
1887         default:
1888                 BUG();
1889         }
1890
1891         return NULL;
1892 }
1893 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
1894
1895 /**
1896  * ring_buffer_consume - return an event and consume it
1897  * @buffer: The ring buffer to get the next event from
1898  *
1899  * Returns the next event in the ring buffer, and that event is consumed.
1900  * Meaning, that sequential reads will keep returning a different event,
1901  * and eventually empty the ring buffer if the producer is slower.
1902  */
1903 struct ring_buffer_event *
1904 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
1905 {
1906         struct ring_buffer_per_cpu *cpu_buffer;
1907         struct ring_buffer_event *event;
1908
1909         if (!cpu_isset(cpu, buffer->cpumask))
1910                 return NULL;
1911
1912         event = ring_buffer_peek(buffer, cpu, ts);
1913         if (!event)
1914                 return NULL;
1915
1916         cpu_buffer = buffer->buffers[cpu];
1917         rb_advance_reader(cpu_buffer);
1918
1919         return event;
1920 }
1921 EXPORT_SYMBOL_GPL(ring_buffer_consume);
1922
1923 /**
1924  * ring_buffer_read_start - start a non consuming read of the buffer
1925  * @buffer: The ring buffer to read from
1926  * @cpu: The cpu buffer to iterate over
1927  *
1928  * This starts up an iteration through the buffer. It also disables
1929  * the recording to the buffer until the reading is finished.
1930  * This prevents the reading from being corrupted. This is not
1931  * a consuming read, so a producer is not expected.
1932  *
1933  * Must be paired with ring_buffer_finish.
1934  */
1935 struct ring_buffer_iter *
1936 ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
1937 {
1938         struct ring_buffer_per_cpu *cpu_buffer;
1939         struct ring_buffer_iter *iter;
1940         unsigned long flags;
1941
1942         if (!cpu_isset(cpu, buffer->cpumask))
1943                 return NULL;
1944
1945         iter = kmalloc(sizeof(*iter), GFP_KERNEL);
1946         if (!iter)
1947                 return NULL;
1948
1949         cpu_buffer = buffer->buffers[cpu];
1950
1951         iter->cpu_buffer = cpu_buffer;
1952
1953         atomic_inc(&cpu_buffer->record_disabled);
1954         synchronize_sched();
1955
1956         spin_lock_irqsave(&cpu_buffer->lock, flags);
1957         ring_buffer_iter_reset(iter);
1958         spin_unlock_irqrestore(&cpu_buffer->lock, flags);
1959
1960         return iter;
1961 }
1962 EXPORT_SYMBOL_GPL(ring_buffer_read_start);
1963
1964 /**
1965  * ring_buffer_finish - finish reading the iterator of the buffer
1966  * @iter: The iterator retrieved by ring_buffer_start
1967  *
1968  * This re-enables the recording to the buffer, and frees the
1969  * iterator.
1970  */
1971 void
1972 ring_buffer_read_finish(struct ring_buffer_iter *iter)
1973 {
1974         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1975
1976         atomic_dec(&cpu_buffer->record_disabled);
1977         kfree(iter);
1978 }
1979 EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
1980
1981 /**
1982  * ring_buffer_read - read the next item in the ring buffer by the iterator
1983  * @iter: The ring buffer iterator
1984  * @ts: The time stamp of the event read.
1985  *
1986  * This reads the next event in the ring buffer and increments the iterator.
1987  */
1988 struct ring_buffer_event *
1989 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
1990 {
1991         struct ring_buffer_event *event;
1992
1993         event = ring_buffer_iter_peek(iter, ts);
1994         if (!event)
1995                 return NULL;
1996
1997         rb_advance_iter(iter);
1998
1999         return event;
2000 }
2001 EXPORT_SYMBOL_GPL(ring_buffer_read);
2002
2003 /**
2004  * ring_buffer_size - return the size of the ring buffer (in bytes)
2005  * @buffer: The ring buffer.
2006  */
2007 unsigned long ring_buffer_size(struct ring_buffer *buffer)
2008 {
2009         return BUF_PAGE_SIZE * buffer->pages;
2010 }
2011 EXPORT_SYMBOL_GPL(ring_buffer_size);
2012
2013 static void
2014 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
2015 {
2016         cpu_buffer->head_page
2017                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
2018         local_set(&cpu_buffer->head_page->write, 0);
2019         local_set(&cpu_buffer->head_page->commit, 0);
2020
2021         cpu_buffer->head_page->read = 0;
2022
2023         cpu_buffer->tail_page = cpu_buffer->head_page;
2024         cpu_buffer->commit_page = cpu_buffer->head_page;
2025
2026         INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
2027         local_set(&cpu_buffer->reader_page->write, 0);
2028         local_set(&cpu_buffer->reader_page->commit, 0);
2029         cpu_buffer->reader_page->read = 0;
2030
2031         cpu_buffer->overrun = 0;
2032         cpu_buffer->entries = 0;
2033 }
2034
2035 /**
2036  * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
2037  * @buffer: The ring buffer to reset a per cpu buffer of
2038  * @cpu: The CPU buffer to be reset
2039  */
2040 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
2041 {
2042         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
2043         unsigned long flags;
2044
2045         if (!cpu_isset(cpu, buffer->cpumask))
2046                 return;
2047
2048         spin_lock_irqsave(&cpu_buffer->lock, flags);
2049
2050         rb_reset_cpu(cpu_buffer);
2051
2052         spin_unlock_irqrestore(&cpu_buffer->lock, flags);
2053 }
2054 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
2055
2056 /**
2057  * ring_buffer_reset - reset a ring buffer
2058  * @buffer: The ring buffer to reset all cpu buffers
2059  */
2060 void ring_buffer_reset(struct ring_buffer *buffer)
2061 {
2062         int cpu;
2063
2064         for_each_buffer_cpu(buffer, cpu)
2065                 ring_buffer_reset_cpu(buffer, cpu);
2066 }
2067 EXPORT_SYMBOL_GPL(ring_buffer_reset);
2068
2069 /**
2070  * rind_buffer_empty - is the ring buffer empty?
2071  * @buffer: The ring buffer to test
2072  */
2073 int ring_buffer_empty(struct ring_buffer *buffer)
2074 {
2075         struct ring_buffer_per_cpu *cpu_buffer;
2076         int cpu;
2077
2078         /* yes this is racy, but if you don't like the race, lock the buffer */
2079         for_each_buffer_cpu(buffer, cpu) {
2080                 cpu_buffer = buffer->buffers[cpu];
2081                 if (!rb_per_cpu_empty(cpu_buffer))
2082                         return 0;
2083         }
2084         return 1;
2085 }
2086 EXPORT_SYMBOL_GPL(ring_buffer_empty);
2087
2088 /**
2089  * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
2090  * @buffer: The ring buffer
2091  * @cpu: The CPU buffer to test
2092  */
2093 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
2094 {
2095         struct ring_buffer_per_cpu *cpu_buffer;
2096
2097         if (!cpu_isset(cpu, buffer->cpumask))
2098                 return 1;
2099
2100         cpu_buffer = buffer->buffers[cpu];
2101         return rb_per_cpu_empty(cpu_buffer);
2102 }
2103 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
2104
2105 /**
2106  * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
2107  * @buffer_a: One buffer to swap with
2108  * @buffer_b: The other buffer to swap with
2109  *
2110  * This function is useful for tracers that want to take a "snapshot"
2111  * of a CPU buffer and has another back up buffer lying around.
2112  * it is expected that the tracer handles the cpu buffer not being
2113  * used at the moment.
2114  */
2115 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
2116                          struct ring_buffer *buffer_b, int cpu)
2117 {
2118         struct ring_buffer_per_cpu *cpu_buffer_a;
2119         struct ring_buffer_per_cpu *cpu_buffer_b;
2120
2121         if (!cpu_isset(cpu, buffer_a->cpumask) ||
2122             !cpu_isset(cpu, buffer_b->cpumask))
2123                 return -EINVAL;
2124
2125         /* At least make sure the two buffers are somewhat the same */
2126         if (buffer_a->size != buffer_b->size ||
2127             buffer_a->pages != buffer_b->pages)
2128                 return -EINVAL;
2129
2130         cpu_buffer_a = buffer_a->buffers[cpu];
2131         cpu_buffer_b = buffer_b->buffers[cpu];
2132
2133         /*
2134          * We can't do a synchronize_sched here because this
2135          * function can be called in atomic context.
2136          * Normally this will be called from the same CPU as cpu.
2137          * If not it's up to the caller to protect this.
2138          */
2139         atomic_inc(&cpu_buffer_a->record_disabled);
2140         atomic_inc(&cpu_buffer_b->record_disabled);
2141
2142         buffer_a->buffers[cpu] = cpu_buffer_b;
2143         buffer_b->buffers[cpu] = cpu_buffer_a;
2144
2145         cpu_buffer_b->buffer = buffer_a;
2146         cpu_buffer_a->buffer = buffer_b;
2147
2148         atomic_dec(&cpu_buffer_a->record_disabled);
2149         atomic_dec(&cpu_buffer_b->record_disabled);
2150
2151         return 0;
2152 }
2153 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
2154
2155 static ssize_t
2156 rb_simple_read(struct file *filp, char __user *ubuf,
2157                size_t cnt, loff_t *ppos)
2158 {
2159         int *p = filp->private_data;
2160         char buf[64];
2161         int r;
2162
2163         /* !ring_buffers_off == tracing_on */
2164         r = sprintf(buf, "%d\n", !*p);
2165
2166         return simple_read_from_buffer(ubuf, cnt, ppos, buf, r);
2167 }
2168
2169 static ssize_t
2170 rb_simple_write(struct file *filp, const char __user *ubuf,
2171                 size_t cnt, loff_t *ppos)
2172 {
2173         int *p = filp->private_data;
2174         char buf[64];
2175         long val;
2176         int ret;
2177
2178         if (cnt >= sizeof(buf))
2179                 return -EINVAL;
2180
2181         if (copy_from_user(&buf, ubuf, cnt))
2182                 return -EFAULT;
2183
2184         buf[cnt] = 0;
2185
2186         ret = strict_strtoul(buf, 10, &val);
2187         if (ret < 0)
2188                 return ret;
2189
2190         /* !ring_buffers_off == tracing_on */
2191         *p = !val;
2192
2193         (*ppos)++;
2194
2195         return cnt;
2196 }
2197
2198 static struct file_operations rb_simple_fops = {
2199         .open           = tracing_open_generic,
2200         .read           = rb_simple_read,
2201         .write          = rb_simple_write,
2202 };
2203
2204
2205 static __init int rb_init_debugfs(void)
2206 {
2207         struct dentry *d_tracer;
2208         struct dentry *entry;
2209
2210         d_tracer = tracing_init_dentry();
2211
2212         entry = debugfs_create_file("tracing_on", 0644, d_tracer,
2213                                     &ring_buffers_off, &rb_simple_fops);
2214         if (!entry)
2215                 pr_warning("Could not create debugfs 'tracing_on' entry\n");
2216
2217         return 0;
2218 }
2219
2220 fs_initcall(rb_init_debugfs);