tracing: fix typos in comments
[safe/jmp/linux-2.6] / kernel / trace / ring_buffer.c
1 /*
2  * Generic ring buffer
3  *
4  * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
5  */
6 #include <linux/ring_buffer.h>
7 #include <linux/ftrace_irq.h>
8 #include <linux/spinlock.h>
9 #include <linux/debugfs.h>
10 #include <linux/uaccess.h>
11 #include <linux/hardirq.h>
12 #include <linux/module.h>
13 #include <linux/percpu.h>
14 #include <linux/mutex.h>
15 #include <linux/sched.h>        /* used for sched_clock() (for now) */
16 #include <linux/init.h>
17 #include <linux/hash.h>
18 #include <linux/list.h>
19 #include <linux/fs.h>
20
21 #include "trace.h"
22
23 /*
24  * A fast way to enable or disable all ring buffers is to
25  * call tracing_on or tracing_off. Turning off the ring buffers
26  * prevents all ring buffers from being recorded to.
27  * Turning this switch on, makes it OK to write to the
28  * ring buffer, if the ring buffer is enabled itself.
29  *
30  * There are three layers that must be on in order to write
31  * to the ring buffer.
32  *
33  * 1) This global flag must be set.
34  * 2) The ring buffer must be enabled for recording.
35  * 3) The per cpu buffer must be enabled for recording.
36  *
37  * In case of an anomaly, this global flag has a bit set that
38  * will permanently disable all ring buffers.
39  */
40
41 /*
42  * Global flag to disable all recording to ring buffers
43  *  This has two bits: ON, DISABLED
44  *
45  *  ON   DISABLED
46  * ---- ----------
47  *   0      0        : ring buffers are off
48  *   1      0        : ring buffers are on
49  *   X      1        : ring buffers are permanently disabled
50  */
51
52 enum {
53         RB_BUFFERS_ON_BIT       = 0,
54         RB_BUFFERS_DISABLED_BIT = 1,
55 };
56
57 enum {
58         RB_BUFFERS_ON           = 1 << RB_BUFFERS_ON_BIT,
59         RB_BUFFERS_DISABLED     = 1 << RB_BUFFERS_DISABLED_BIT,
60 };
61
62 static long ring_buffer_flags __read_mostly = RB_BUFFERS_ON;
63
/**
 * tracing_on - enable all tracing buffers
 *
 * Sets the ON bit in the global ring_buffer_flags word, which undoes a
 * previous tracing_off(). Only the ON bit is touched here; the DISABLED
 * bit set by tracing_off_permanent() is left as it is.
 */
void tracing_on(void)
{
	set_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_on);
75
/**
 * tracing_off - turn off all tracing buffers
 *
 * Clears the ON bit in the global ring_buffer_flags word so that
 * recording to the ring buffers fails. It does not disable any
 * overhead the tracers themselves may be causing. The effect can be
 * reverted with tracing_on().
 */
void tracing_off(void)
{
	clear_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_off);
89
/**
 * tracing_off_permanent - permanently disable ring buffers
 *
 * Sets the DISABLED bit in the global ring_buffer_flags word. Nothing
 * in this part of the file clears that bit again, so once called the
 * ring buffers stay disabled.
 */
void tracing_off_permanent(void)
{
	set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags);
}
100
101 #include "trace.h"
102
103 /* Up this if you want to test the TIME_EXTENTS and normalization */
104 #define DEBUG_SHIFT 0
105
106 /* FIXME!!! */
107 u64 ring_buffer_time_stamp(int cpu)
108 {
109         u64 time;
110
111         preempt_disable_notrace();
112         /* shift to debug/test normalization and TIME_EXTENTS */
113         time = sched_clock() << DEBUG_SHIFT;
114         preempt_enable_no_resched_notrace();
115
116         return time;
117 }
118 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);
119
120 void ring_buffer_normalize_time_stamp(int cpu, u64 *ts)
121 {
122         /* Just stupid testing the normalize function and deltas */
123         *ts >>= DEBUG_SHIFT;
124 }
125 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
126
/* Size of struct ring_buffer_event; added to a payload to get an event's length */
#define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event))
/* Event lengths are rounded up to a multiple of this many bytes */
#define RB_ALIGNMENT		4U
/*
 * Largest data length that is still encoded in the event's len field;
 * anything bigger has its length stored in array[0] instead.
 */
#define RB_MAX_SMALL_DATA	28

/* Fixed lengths, in bytes, reported for the two time-keeping event types */
enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 16,
};
135
136 /* inline for ring buffer fast paths */
137 static unsigned
138 rb_event_length(struct ring_buffer_event *event)
139 {
140         unsigned length;
141
142         switch (event->type) {
143         case RINGBUF_TYPE_PADDING:
144                 /* undefined */
145                 return -1;
146
147         case RINGBUF_TYPE_TIME_EXTEND:
148                 return RB_LEN_TIME_EXTEND;
149
150         case RINGBUF_TYPE_TIME_STAMP:
151                 return RB_LEN_TIME_STAMP;
152
153         case RINGBUF_TYPE_DATA:
154                 if (event->len)
155                         length = event->len * RB_ALIGNMENT;
156                 else
157                         length = event->array[0];
158                 return length + RB_EVNT_HDR_SIZE;
159         default:
160                 BUG();
161         }
162         /* not hit */
163         return 0;
164 }
165
166 /**
167  * ring_buffer_event_length - return the length of the event
168  * @event: the event to get the length of
169  */
170 unsigned ring_buffer_event_length(struct ring_buffer_event *event)
171 {
172         unsigned length = rb_event_length(event);
173         if (event->type != RINGBUF_TYPE_DATA)
174                 return length;
175         length -= RB_EVNT_HDR_SIZE;
176         if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
177                 length -= sizeof(event->array[0]);
178         return length;
179 }
180 EXPORT_SYMBOL_GPL(ring_buffer_event_length);
181
182 /* inline for ring buffer fast paths */
183 static void *
184 rb_event_data(struct ring_buffer_event *event)
185 {
186         BUG_ON(event->type != RINGBUF_TYPE_DATA);
187         /* If length is in len field, then array[0] has the data */
188         if (event->len)
189                 return (void *)&event->array[0];
190         /* Otherwise length is in array[0] and array[1] has the data */
191         return (void *)&event->array[1];
192 }
193
/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 *
 * Exported wrapper around rb_event_data(); @event must be a data event,
 * since rb_event_data() BUG()s on any other type.
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);
203
/*
 * Iterate @cpu over every CPU set in @buffer's cpumask. The @buffer
 * argument is parenthesised so that any pointer expression may be passed.
 */
#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu(cpu, (buffer)->cpumask)

/* Time stamp deltas must fit in TS_SHIFT bits (see test_time_stamp()) */
#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)
210
/*
 * Layout of one data page. The header fields sit at the start of the
 * page and the event data fills the rest (see BUF_PAGE_SIZE).
 */
struct buffer_data_page {
	u64		 time_stamp;	/* page time stamp */
	local_t		 commit;	/* write committed index */
	unsigned char	 data[];	/* data of buffer page */
};
216
/*
 * Descriptor for one page of a per-cpu buffer. It is allocated
 * separately from the data page it points to and is linked into the
 * per-cpu page list through @list.
 */
struct buffer_page {
	local_t		 write;		/* index for next write */
	unsigned	 read;		/* index for next read */
	struct list_head list;		/* list of free pages */
	struct buffer_data_page *page;	/* Actual data page */
};
223
/* Prepare a freshly allocated data page: nothing has been committed yet. */
static void rb_init_page(struct buffer_data_page *bpage)
{
	local_set(&bpage->commit, 0);
}
228
/*
 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
 * this issue out.
 *
 * Release a buffer page: first the data page it points to, then the
 * descriptor itself.
 * NOTE(review): the mm/slob.c remark above does not obviously relate to
 * the code below - confirm whether it is still relevant.
 */
static void free_buffer_page(struct buffer_page *bpage)
{
	free_page((unsigned long)bpage->page);
	kfree(bpage);
}
238
239 /*
240  * We need to fit the time_stamp delta into 27 bits.
241  */
242 static inline int test_time_stamp(u64 delta)
243 {
244         if (delta & TS_DELTA_TEST)
245                 return 1;
246         return 0;
247 }
248
/* Bytes of event data that fit in one page after the page header */
#define BUF_PAGE_SIZE (PAGE_SIZE - offsetof(struct buffer_data_page, data))

/*
 * Per-CPU part of a ring buffer.
 *
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int				cpu;		/* CPU this buffer belongs to */
	struct ring_buffer		*buffer;	/* owning ring buffer */
	spinlock_t			reader_lock; /* serialize readers */
	raw_spinlock_t			lock;
	struct lock_class_key		lock_key;
	struct list_head		pages;		/* list of buffer_page */
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
	struct buffer_page		*reader_page;	/* kept off the pages list */
	unsigned long			overrun;	/* data events dropped by overwrite */
	unsigned long			entries;	/* data events currently counted */
	u64				write_stamp;
	u64				read_stamp;
	atomic_t			record_disabled; /* bumped by RB_WARN_ON and resize */
};
271
/* Top-level ring buffer: one ring_buffer_per_cpu per CPU in @cpumask. */
struct ring_buffer {
	unsigned			pages;		/* data pages per cpu buffer */
	unsigned			flags;		/* as passed to ring_buffer_alloc() */
	int				cpus;		/* set to nr_cpu_ids at allocation */
	cpumask_var_t			cpumask;	/* CPUs that have a cpu buffer */
	atomic_t			record_disabled; /* bumped by RB_WARN_ON */

	struct mutex			mutex;		/* held by ring_buffer_resize() */

	struct ring_buffer_per_cpu	**buffers;	/* indexed by CPU number */
};
283
/* Position of an iterator walking one per-cpu buffer. */
struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;	/* buffer being iterated */
	unsigned long			head;		/* offset within head_page */
	struct buffer_page		*head_page;	/* page currently being read */
	u64				read_stamp;	/* reset from the page time stamp */
};
290
/*
 * Warn when @cond is true and stop further recording on @buffer by
 * bumping its record_disabled count. Evaluates to the truth value of
 * @cond so it can be used inside an if ().
 *
 * buffer may be either ring_buffer or ring_buffer_per_cpu; it is
 * parenthesised so that any pointer expression can be passed safely.
 */
#define RB_WARN_ON(buffer, cond)				\
	({							\
		int _____ret = unlikely(cond);			\
		if (_____ret) {					\
			atomic_inc(&(buffer)->record_disabled);	\
			WARN_ON(1);				\
		}						\
		_____ret;					\
	})
301
302 /**
303  * check_pages - integrity check of buffer pages
304  * @cpu_buffer: CPU buffer with pages to test
305  *
306  * As a safety measure we check to make sure the data pages have not
307  * been corrupted.
308  */
309 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
310 {
311         struct list_head *head = &cpu_buffer->pages;
312         struct buffer_page *bpage, *tmp;
313
314         if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
315                 return -1;
316         if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
317                 return -1;
318
319         list_for_each_entry_safe(bpage, tmp, head, list) {
320                 if (RB_WARN_ON(cpu_buffer,
321                                bpage->list.next->prev != &bpage->list))
322                         return -1;
323                 if (RB_WARN_ON(cpu_buffer,
324                                bpage->list.prev->next != &bpage->list))
325                         return -1;
326         }
327
328         return 0;
329 }
330
331 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
332                              unsigned nr_pages)
333 {
334         struct list_head *head = &cpu_buffer->pages;
335         struct buffer_page *bpage, *tmp;
336         unsigned long addr;
337         LIST_HEAD(pages);
338         unsigned i;
339
340         for (i = 0; i < nr_pages; i++) {
341                 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
342                                     GFP_KERNEL, cpu_to_node(cpu_buffer->cpu));
343                 if (!bpage)
344                         goto free_pages;
345                 list_add(&bpage->list, &pages);
346
347                 addr = __get_free_page(GFP_KERNEL);
348                 if (!addr)
349                         goto free_pages;
350                 bpage->page = (void *)addr;
351                 rb_init_page(bpage->page);
352         }
353
354         list_splice(&pages, head);
355
356         rb_check_pages(cpu_buffer);
357
358         return 0;
359
360  free_pages:
361         list_for_each_entry_safe(bpage, tmp, &pages, list) {
362                 list_del_init(&bpage->list);
363                 free_buffer_page(bpage);
364         }
365         return -ENOMEM;
366 }
367
368 static struct ring_buffer_per_cpu *
369 rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
370 {
371         struct ring_buffer_per_cpu *cpu_buffer;
372         struct buffer_page *bpage;
373         unsigned long addr;
374         int ret;
375
376         cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
377                                   GFP_KERNEL, cpu_to_node(cpu));
378         if (!cpu_buffer)
379                 return NULL;
380
381         cpu_buffer->cpu = cpu;
382         cpu_buffer->buffer = buffer;
383         spin_lock_init(&cpu_buffer->reader_lock);
384         cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED;
385         INIT_LIST_HEAD(&cpu_buffer->pages);
386
387         bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
388                             GFP_KERNEL, cpu_to_node(cpu));
389         if (!bpage)
390                 goto fail_free_buffer;
391
392         cpu_buffer->reader_page = bpage;
393         addr = __get_free_page(GFP_KERNEL);
394         if (!addr)
395                 goto fail_free_reader;
396         bpage->page = (void *)addr;
397         rb_init_page(bpage->page);
398
399         INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
400
401         ret = rb_allocate_pages(cpu_buffer, buffer->pages);
402         if (ret < 0)
403                 goto fail_free_reader;
404
405         cpu_buffer->head_page
406                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
407         cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
408
409         return cpu_buffer;
410
411  fail_free_reader:
412         free_buffer_page(cpu_buffer->reader_page);
413
414  fail_free_buffer:
415         kfree(cpu_buffer);
416         return NULL;
417 }
418
419 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
420 {
421         struct list_head *head = &cpu_buffer->pages;
422         struct buffer_page *bpage, *tmp;
423
424         list_del_init(&cpu_buffer->reader_page->list);
425         free_buffer_page(cpu_buffer->reader_page);
426
427         list_for_each_entry_safe(bpage, tmp, head, list) {
428                 list_del_init(&bpage->list);
429                 free_buffer_page(bpage);
430         }
431         kfree(cpu_buffer);
432 }
433
/*
 * Causes build errors if the struct buffer_page gets bigger
 * than the struct page.
 *
 * ring_buffer_alloc() only calls this when that size check fails.
 * NOTE(review): presumably the function is deliberately left undefined
 * so that a surviving call fails the link - confirm.
 */
extern int ring_buffer_page_too_big(void);
439
440 /**
441  * ring_buffer_alloc - allocate a new ring_buffer
442  * @size: the size in bytes per cpu that is needed.
443  * @flags: attributes to set for the ring buffer.
444  *
445  * Currently the only flag that is available is the RB_FL_OVERWRITE
446  * flag. This flag means that the buffer will overwrite old data
447  * when the buffer wraps. If this flag is not set, the buffer will
448  * drop data when the tail hits the head.
449  */
450 struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
451 {
452         struct ring_buffer *buffer;
453         int bsize;
454         int cpu;
455
456         /* Paranoid! Optimizes out when all is well */
457         if (sizeof(struct buffer_page) > sizeof(struct page))
458                 ring_buffer_page_too_big();
459
460
461         /* keep it in its own cache line */
462         buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
463                          GFP_KERNEL);
464         if (!buffer)
465                 return NULL;
466
467         if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
468                 goto fail_free_buffer;
469
470         buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
471         buffer->flags = flags;
472
473         /* need at least two pages */
474         if (buffer->pages == 1)
475                 buffer->pages++;
476
477         cpumask_copy(buffer->cpumask, cpu_possible_mask);
478         buffer->cpus = nr_cpu_ids;
479
480         bsize = sizeof(void *) * nr_cpu_ids;
481         buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
482                                   GFP_KERNEL);
483         if (!buffer->buffers)
484                 goto fail_free_cpumask;
485
486         for_each_buffer_cpu(buffer, cpu) {
487                 buffer->buffers[cpu] =
488                         rb_allocate_cpu_buffer(buffer, cpu);
489                 if (!buffer->buffers[cpu])
490                         goto fail_free_buffers;
491         }
492
493         mutex_init(&buffer->mutex);
494
495         return buffer;
496
497  fail_free_buffers:
498         for_each_buffer_cpu(buffer, cpu) {
499                 if (buffer->buffers[cpu])
500                         rb_free_cpu_buffer(buffer->buffers[cpu]);
501         }
502         kfree(buffer->buffers);
503
504  fail_free_cpumask:
505         free_cpumask_var(buffer->cpumask);
506
507  fail_free_buffer:
508         kfree(buffer);
509         return NULL;
510 }
511 EXPORT_SYMBOL_GPL(ring_buffer_alloc);
512
513 /**
514  * ring_buffer_free - free a ring buffer.
515  * @buffer: the buffer to free.
516  */
517 void
518 ring_buffer_free(struct ring_buffer *buffer)
519 {
520         int cpu;
521
522         for_each_buffer_cpu(buffer, cpu)
523                 rb_free_cpu_buffer(buffer->buffers[cpu]);
524
525         free_cpumask_var(buffer->cpumask);
526
527         kfree(buffer);
528 }
529 EXPORT_SYMBOL_GPL(ring_buffer_free);
530
531 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
532
533 static void
534 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
535 {
536         struct buffer_page *bpage;
537         struct list_head *p;
538         unsigned i;
539
540         atomic_inc(&cpu_buffer->record_disabled);
541         synchronize_sched();
542
543         for (i = 0; i < nr_pages; i++) {
544                 if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
545                         return;
546                 p = cpu_buffer->pages.next;
547                 bpage = list_entry(p, struct buffer_page, list);
548                 list_del_init(&bpage->list);
549                 free_buffer_page(bpage);
550         }
551         if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
552                 return;
553
554         rb_reset_cpu(cpu_buffer);
555
556         rb_check_pages(cpu_buffer);
557
558         atomic_dec(&cpu_buffer->record_disabled);
559
560 }
561
562 static void
563 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
564                 struct list_head *pages, unsigned nr_pages)
565 {
566         struct buffer_page *bpage;
567         struct list_head *p;
568         unsigned i;
569
570         atomic_inc(&cpu_buffer->record_disabled);
571         synchronize_sched();
572
573         for (i = 0; i < nr_pages; i++) {
574                 if (RB_WARN_ON(cpu_buffer, list_empty(pages)))
575                         return;
576                 p = pages->next;
577                 bpage = list_entry(p, struct buffer_page, list);
578                 list_del_init(&bpage->list);
579                 list_add_tail(&bpage->list, &cpu_buffer->pages);
580         }
581         rb_reset_cpu(cpu_buffer);
582
583         rb_check_pages(cpu_buffer);
584
585         atomic_dec(&cpu_buffer->record_disabled);
586 }
587
588 /**
589  * ring_buffer_resize - resize the ring buffer
590  * @buffer: the buffer to resize.
591  * @size: the new size.
592  *
593  * The tracer is responsible for making sure that the buffer is
594  * not being used while changing the size.
595  * Note: We may be able to change the above requirement by using
596  *  RCU synchronizations.
597  *
598  * Minimum size is 2 * BUF_PAGE_SIZE.
599  *
600  * Returns -1 on failure.
601  */
602 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
603 {
604         struct ring_buffer_per_cpu *cpu_buffer;
605         unsigned nr_pages, rm_pages, new_pages;
606         struct buffer_page *bpage, *tmp;
607         unsigned long buffer_size;
608         unsigned long addr;
609         LIST_HEAD(pages);
610         int i, cpu;
611
612         /*
613          * Always succeed at resizing a non-existent buffer:
614          */
615         if (!buffer)
616                 return size;
617
618         size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
619         size *= BUF_PAGE_SIZE;
620         buffer_size = buffer->pages * BUF_PAGE_SIZE;
621
622         /* we need a minimum of two pages */
623         if (size < BUF_PAGE_SIZE * 2)
624                 size = BUF_PAGE_SIZE * 2;
625
626         if (size == buffer_size)
627                 return size;
628
629         mutex_lock(&buffer->mutex);
630
631         nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
632
633         if (size < buffer_size) {
634
635                 /* easy case, just free pages */
636                 if (RB_WARN_ON(buffer, nr_pages >= buffer->pages)) {
637                         mutex_unlock(&buffer->mutex);
638                         return -1;
639                 }
640
641                 rm_pages = buffer->pages - nr_pages;
642
643                 for_each_buffer_cpu(buffer, cpu) {
644                         cpu_buffer = buffer->buffers[cpu];
645                         rb_remove_pages(cpu_buffer, rm_pages);
646                 }
647                 goto out;
648         }
649
650         /*
651          * This is a bit more difficult. We only want to add pages
652          * when we can allocate enough for all CPUs. We do this
653          * by allocating all the pages and storing them on a local
654          * link list. If we succeed in our allocation, then we
655          * add these pages to the cpu_buffers. Otherwise we just free
656          * them all and return -ENOMEM;
657          */
658         if (RB_WARN_ON(buffer, nr_pages <= buffer->pages)) {
659                 mutex_unlock(&buffer->mutex);
660                 return -1;
661         }
662
663         new_pages = nr_pages - buffer->pages;
664
665         for_each_buffer_cpu(buffer, cpu) {
666                 for (i = 0; i < new_pages; i++) {
667                         bpage = kzalloc_node(ALIGN(sizeof(*bpage),
668                                                   cache_line_size()),
669                                             GFP_KERNEL, cpu_to_node(cpu));
670                         if (!bpage)
671                                 goto free_pages;
672                         list_add(&bpage->list, &pages);
673                         addr = __get_free_page(GFP_KERNEL);
674                         if (!addr)
675                                 goto free_pages;
676                         bpage->page = (void *)addr;
677                         rb_init_page(bpage->page);
678                 }
679         }
680
681         for_each_buffer_cpu(buffer, cpu) {
682                 cpu_buffer = buffer->buffers[cpu];
683                 rb_insert_pages(cpu_buffer, &pages, new_pages);
684         }
685
686         if (RB_WARN_ON(buffer, !list_empty(&pages))) {
687                 mutex_unlock(&buffer->mutex);
688                 return -1;
689         }
690
691  out:
692         buffer->pages = nr_pages;
693         mutex_unlock(&buffer->mutex);
694
695         return size;
696
697  free_pages:
698         list_for_each_entry_safe(bpage, tmp, &pages, list) {
699                 list_del_init(&bpage->list);
700                 free_buffer_page(bpage);
701         }
702         mutex_unlock(&buffer->mutex);
703         return -ENOMEM;
704 }
705 EXPORT_SYMBOL_GPL(ring_buffer_resize);
706
/* Returns non-zero when @event is a padding event. */
static inline int rb_null_event(struct ring_buffer_event *event)
{
	return event->type == RINGBUF_TYPE_PADDING;
}
711
712 static inline void *
713 __rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
714 {
715         return bpage->data + index;
716 }
717
718 static inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
719 {
720         return bpage->page->data + index;
721 }
722
723 static inline struct ring_buffer_event *
724 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
725 {
726         return __rb_page_index(cpu_buffer->reader_page,
727                                cpu_buffer->reader_page->read);
728 }
729
730 static inline struct ring_buffer_event *
731 rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
732 {
733         return __rb_page_index(cpu_buffer->head_page,
734                                cpu_buffer->head_page->read);
735 }
736
737 static inline struct ring_buffer_event *
738 rb_iter_head_event(struct ring_buffer_iter *iter)
739 {
740         return __rb_page_index(iter->head_page, iter->head);
741 }
742
/* Current write index of @bpage (where the next write goes). */
static inline unsigned rb_page_write(struct buffer_page *bpage)
{
	return local_read(&bpage->write);
}
747
/* Committed index stored in the data page that @bpage points to. */
static inline unsigned rb_page_commit(struct buffer_page *bpage)
{
	return local_read(&bpage->page->commit);
}
752
/* Size is determined by what has been committed */
static inline unsigned rb_page_size(struct buffer_page *bpage)
{
	return rb_page_commit(bpage);
}
758
/* Committed index of the cpu buffer's current commit page. */
static inline unsigned
rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
{
	return rb_page_commit(cpu_buffer->commit_page);
}
764
/* Committed index of the head page, i.e. how much data it holds. */
static inline unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
{
	return rb_page_commit(cpu_buffer->head_page);
}
769
770 /*
771  * When the tail hits the head and the buffer is in overwrite mode,
772  * the head jumps to the next page and all content on the previous
773  * page is discarded. But before doing so, we update the overrun
774  * variable of the buffer.
775  */
776 static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
777 {
778         struct ring_buffer_event *event;
779         unsigned long head;
780
781         for (head = 0; head < rb_head_size(cpu_buffer);
782              head += rb_event_length(event)) {
783
784                 event = __rb_page_index(cpu_buffer->head_page, head);
785                 if (RB_WARN_ON(cpu_buffer, rb_null_event(event)))
786                         return;
787                 /* Only count data entries */
788                 if (event->type != RINGBUF_TYPE_DATA)
789                         continue;
790                 cpu_buffer->overrun++;
791                 cpu_buffer->entries--;
792         }
793 }
794
795 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
796                                struct buffer_page **bpage)
797 {
798         struct list_head *p = (*bpage)->list.next;
799
800         if (p == &cpu_buffer->pages)
801                 p = p->next;
802
803         *bpage = list_entry(p, struct buffer_page, list);
804 }
805
806 static inline unsigned
807 rb_event_index(struct ring_buffer_event *event)
808 {
809         unsigned long addr = (unsigned long)event;
810
811         return (addr & ~PAGE_MASK) - (PAGE_SIZE - BUF_PAGE_SIZE);
812 }
813
814 static int
815 rb_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
816              struct ring_buffer_event *event)
817 {
818         unsigned long addr = (unsigned long)event;
819         unsigned long index;
820
821         index = rb_event_index(event);
822         addr &= PAGE_MASK;
823
824         return cpu_buffer->commit_page->page == (void *)addr &&
825                 rb_commit_index(cpu_buffer) == index;
826 }
827
/*
 * Move the commit position to @event.
 *
 * The commit page is advanced until it is the page that contains
 * @event. Every page passed on the way gets its commit index set to
 * its write index, and write_stamp is reloaded from the time stamp of
 * the new commit page. Reaching the tail page without finding @event's
 * page triggers RB_WARN_ON() and aborts.
 */
static void
rb_set_commit_event(struct ring_buffer_per_cpu *cpu_buffer,
		    struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;
	unsigned long index;

	index = rb_event_index(event);
	/* start address of the page that holds @event */
	addr &= PAGE_MASK;

	while (cpu_buffer->commit_page->page != (void *)addr) {
		if (RB_WARN_ON(cpu_buffer,
			  cpu_buffer->commit_page == cpu_buffer->tail_page))
			return;
		/* everything written to the page being left is committed */
		cpu_buffer->commit_page->page->commit =
			cpu_buffer->commit_page->write;
		rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
		cpu_buffer->write_stamp =
			cpu_buffer->commit_page->page->time_stamp;
	}

	/* Now set the commit to the event's index */
	local_set(&cpu_buffer->commit_page->page->commit, index);
}
852
/*
 * Bring the commit position all the way up to the current write
 * position: walk the commit page forward to the tail page, then raise
 * the commit index of that page to its write index.
 */
static void
rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
{
	/*
	 * We only race with interrupts and NMIs on this CPU.
	 * If we own the commit event, then we can commit
	 * all others that interrupted us, since the interruptions
	 * are in stack format (they finish before they come
	 * back to us). This allows us to do a simple loop to
	 * assign the commit to the tail.
	 */
 again:
	/* first catch the commit page up with the tail page */
	while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
		cpu_buffer->commit_page->page->commit =
			cpu_buffer->commit_page->write;
		rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
		cpu_buffer->write_stamp =
			cpu_buffer->commit_page->page->time_stamp;
		/* add barrier to keep gcc from optimizing too much */
		barrier();
	}
	/* then catch the commit index up with the write index */
	while (rb_commit_index(cpu_buffer) !=
	       rb_page_write(cpu_buffer->commit_page)) {
		cpu_buffer->commit_page->page->commit =
			cpu_buffer->commit_page->write;
		barrier();
	}

	/* again, keep gcc from optimizing */
	barrier();

	/*
	 * If an interrupt came in just after the first while loop
	 * and pushed the tail page forward, we will be left with
	 * a dangling commit that will never go forward.
	 */
	if (unlikely(cpu_buffer->commit_page != cpu_buffer->tail_page))
		goto again;
}
892
893 static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
894 {
895         cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp;
896         cpu_buffer->reader_page->read = 0;
897 }
898
899 static void rb_inc_iter(struct ring_buffer_iter *iter)
900 {
901         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
902
903         /*
904          * The iterator could be on the reader page (it starts there).
905          * But the head could have moved, since the reader was
906          * found. Check for this case and assign the iterator
907          * to the head page instead of next.
908          */
909         if (iter->head_page == cpu_buffer->reader_page)
910                 iter->head_page = cpu_buffer->head_page;
911         else
912                 rb_inc_page(cpu_buffer, &iter->head_page);
913
914         iter->read_stamp = iter->head_page->page->time_stamp;
915         iter->head = 0;
916 }
917
918 /**
919  * ring_buffer_update_event - update event type and data
920  * @event: the even to update
921  * @type: the type of event
922  * @length: the size of the event field in the ring buffer
923  *
924  * Update the type and data fields of the event. The length
925  * is the actual size that is written to the ring buffer,
926  * and with this, we can determine what to place into the
927  * data field.
928  */
929 static void
930 rb_update_event(struct ring_buffer_event *event,
931                          unsigned type, unsigned length)
932 {
933         event->type = type;
934
935         switch (type) {
936
937         case RINGBUF_TYPE_PADDING:
938                 break;
939
940         case RINGBUF_TYPE_TIME_EXTEND:
941                 event->len = DIV_ROUND_UP(RB_LEN_TIME_EXTEND, RB_ALIGNMENT);
942                 break;
943
944         case RINGBUF_TYPE_TIME_STAMP:
945                 event->len = DIV_ROUND_UP(RB_LEN_TIME_STAMP, RB_ALIGNMENT);
946                 break;
947
948         case RINGBUF_TYPE_DATA:
949                 length -= RB_EVNT_HDR_SIZE;
950                 if (length > RB_MAX_SMALL_DATA) {
951                         event->len = 0;
952                         event->array[0] = length;
953                 } else
954                         event->len = DIV_ROUND_UP(length, RB_ALIGNMENT);
955                 break;
956         default:
957                 BUG();
958         }
959 }
960
961 static unsigned rb_calculate_event_length(unsigned length)
962 {
963         struct ring_buffer_event event; /* Used only for sizeof array */
964
965         /* zero length can cause confusions */
966         if (!length)
967                 length = 1;
968
969         if (length > RB_MAX_SMALL_DATA)
970                 length += sizeof(event.array[0]);
971
972         length += RB_EVNT_HDR_SIZE;
973         length = ALIGN(length, RB_ALIGNMENT);
974
975         return length;
976 }
977
/*
 * __rb_reserve_next - claim @length bytes on the current tail page
 * @cpu_buffer: the per cpu buffer to reserve from
 * @type: event type handed on to rb_update_event()
 * @length: total number of bytes to claim
 * @ts: time stamp; rewritten here when this call moves the tail page
 *
 * The space is claimed by adding @length to the tail page's write
 * counter with local_add_return(). If the result still fits within
 * BUF_PAGE_SIZE, the event at the old write offset is initialised and
 * returned.
 *
 * If the claim runs off the end of the page, interrupts are disabled
 * and cpu_buffer->lock is taken (only tried when in NMI), the tail
 * (and, in overwrite mode, the head) is moved to the next page, the
 * unused remainder of the old page is marked as padding, and
 * ERR_PTR(-EAGAIN) is returned so that the caller retries.
 *
 * Returns NULL when nothing could be reserved (see out_unlock).
 */
978 static struct ring_buffer_event *
979 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
980                   unsigned type, unsigned long length, u64 *ts)
981 {
982         struct buffer_page *tail_page, *head_page, *reader_page, *commit_page;
983         unsigned long tail, write;
984         struct ring_buffer *buffer = cpu_buffer->buffer;
985         struct ring_buffer_event *event;
986         unsigned long flags;
987         bool lock_taken = false;
988
989         commit_page = cpu_buffer->commit_page;
990         /* we just need to protect against interrupts */
991         barrier();
992         tail_page = cpu_buffer->tail_page;
993         write = local_add_return(length, &tail_page->write);
994         tail = write - length;
995
996         /* See if we shot past the end of this buffer page */
997         if (write > BUF_PAGE_SIZE) {
998                 struct buffer_page *next_page = tail_page;
999
1000                 local_irq_save(flags);
1001                 /*
1002                  * Since the write to the buffer is still not
1003                  * fully lockless, we must be careful with NMIs.
1004                  * The locks in the writers are taken when a write
1005                  * crosses to a new page. The locks protect against
1006                  * races with the readers (this will soon be fixed
1007                  * with a lockless solution).
1008                  *
1009                  * Because we can not protect against NMIs, and we
1010                  * want to keep traces reentrant, we need to manage
1011                  * what happens when we are in an NMI.
1012                  *
1013                  * NMIs can happen after we take the lock.
1014                  * If we are in an NMI, only take the lock
1015                  * if it is not already taken. Otherwise
1016                  * simply fail.
1017                  */
1018                 if (unlikely(in_nmi())) {
1019                         if (!__raw_spin_trylock(&cpu_buffer->lock))
1020                                 goto out_unlock;
1021                 } else
1022                         __raw_spin_lock(&cpu_buffer->lock);
1023
1024                 lock_taken = true;
1025
1026                 rb_inc_page(cpu_buffer, &next_page);
1027
1028                 head_page = cpu_buffer->head_page;
1029                 reader_page = cpu_buffer->reader_page;
1030
1031                 /* we grabbed the lock before incrementing */
1032                 if (RB_WARN_ON(cpu_buffer, next_page == reader_page))
1033                         goto out_unlock;
1034
1035                 /*
1036                  * If for some reason, we had an interrupt storm that made
1037                  * it all the way around the buffer, bail, and warn
1038                  * about it.
1039                  */
1040                 if (unlikely(next_page == commit_page)) {
1041                         WARN_ON_ONCE(1);
1042                         goto out_unlock;
1043                 }
1044
1045                 if (next_page == head_page) {
1046                         /* buffer is full: only overwrite mode may push the head */
1047                         if (!(buffer->flags & RB_FL_OVERWRITE))
1048                                 goto out_unlock;
1049
1050                         /* tail_page has not moved yet? */
1051                         if (tail_page == cpu_buffer->tail_page) {
1052                                 /* count overflows */
1053                                 rb_update_overflow(cpu_buffer);
1054
1055                                 rb_inc_page(cpu_buffer, &head_page);
1056                                 cpu_buffer->head_page = head_page;
1057                                 cpu_buffer->head_page->read = 0;
1058                         }
1059                 }
1060
1061                 /*
1062                  * If the tail page is still the same as what we think
1063                  * it is, then it is up to us to update the tail
1064                  * pointer.
1065                  */
1066                 if (tail_page == cpu_buffer->tail_page) {
1067                         local_set(&next_page->write, 0);
1068                         local_set(&next_page->page->commit, 0);
1069                         cpu_buffer->tail_page = next_page;
1070
1071                         /* reread the time stamp */
1072                         *ts = ring_buffer_time_stamp(cpu_buffer->cpu);
1073                         cpu_buffer->tail_page->page->time_stamp = *ts;
1074                 }
1075
1076                 /*
1077                  * The actual tail page has moved forward. If our claim
1078                  * started inside the old page, flag what is left of it.
1079                  */
1080                 if (tail < BUF_PAGE_SIZE) {
1081                         /* Mark the rest of the page with padding */
1082                         event = __rb_page_index(tail_page, tail);
1083                         event->type = RINGBUF_TYPE_PADDING;
1084                 }
1085
1086                 if (tail <= BUF_PAGE_SIZE)
1087                         /* Set the write back to the previous setting */
1088                         local_set(&tail_page->write, tail);
1089
1090                 /*
1091                  * If this was a commit entry that failed,
1092                  * increment that too
1093                  */
1094                 if (tail_page == cpu_buffer->commit_page &&
1095                     tail == rb_commit_index(cpu_buffer)) {
1096                         rb_set_commit_to_write(cpu_buffer);
1097                 }
1098
1099                 __raw_spin_unlock(&cpu_buffer->lock);
1100                 local_irq_restore(flags);
1101
1102                 /* fail and let the caller try again */
1103                 return ERR_PTR(-EAGAIN);
1104         }
1105
1106         /* We reserved something on the buffer */
1107
1108         /* sanity check: every path through the block above has returned */
1109         if (RB_WARN_ON(cpu_buffer, write > BUF_PAGE_SIZE))
1110                 return NULL;
1111
1112         event = __rb_page_index(tail_page, tail);
1113         rb_update_event(event, type, length);
1114
1115         /*
1116          * If this is a commit and the tail is zero, then update
1117          * this page's time stamp.
1118          */
1119         if (!tail && rb_is_commit(cpu_buffer, event))
1120                 cpu_buffer->commit_page->page->time_stamp = *ts;
1121
1122         return event;
1123
1124  out_unlock:
1125         /* reset write: give back the bytes claimed above */
1126         if (tail <= BUF_PAGE_SIZE)
1127                 local_set(&tail_page->write, tail);
1128
1129         /* the lock is not held only when the NMI trylock failed */
1130         if (likely(lock_taken))
1131                 __raw_spin_unlock(&cpu_buffer->lock);
1132         local_irq_restore(flags);
1133         return NULL;
1134 }
1131
1132 static int
1133 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1134                   u64 *ts, u64 *delta)
1135 {
1136         struct ring_buffer_event *event;
1137         static int once;
1138         int ret;
1139
1140         if (unlikely(*delta > (1ULL << 59) && !once++)) {
1141                 printk(KERN_WARNING "Delta way too big! %llu"
1142                        " ts=%llu write stamp = %llu\n",
1143                        (unsigned long long)*delta,
1144                        (unsigned long long)*ts,
1145                        (unsigned long long)cpu_buffer->write_stamp);
1146                 WARN_ON(1);
1147         }
1148
1149         /*
1150          * The delta is too big, we to add a
1151          * new timestamp.
1152          */
1153         event = __rb_reserve_next(cpu_buffer,
1154                                   RINGBUF_TYPE_TIME_EXTEND,
1155                                   RB_LEN_TIME_EXTEND,
1156                                   ts);
1157         if (!event)
1158                 return -EBUSY;
1159
1160         if (PTR_ERR(event) == -EAGAIN)
1161                 return -EAGAIN;
1162
1163         /* Only a commited time event can update the write stamp */
1164         if (rb_is_commit(cpu_buffer, event)) {
1165                 /*
1166                  * If this is the first on the page, then we need to
1167                  * update the page itself, and just put in a zero.
1168                  */
1169                 if (rb_event_index(event)) {
1170                         event->time_delta = *delta & TS_MASK;
1171                         event->array[0] = *delta >> TS_SHIFT;
1172                 } else {
1173                         cpu_buffer->commit_page->page->time_stamp = *ts;
1174                         event->time_delta = 0;
1175                         event->array[0] = 0;
1176                 }
1177                 cpu_buffer->write_stamp = *ts;
1178                 /* let the caller know this was the commit */
1179                 ret = 1;
1180         } else {
1181                 /* Darn, this is just wasted space */
1182                 event->time_delta = 0;
1183                 event->array[0] = 0;
1184                 ret = 0;
1185         }
1186
1187         *delta = 0;
1188
1189         return ret;
1190 }
1191
/*
 * rb_reserve_next_event - reserve an event and work out its time delta
 * @cpu_buffer: the per cpu buffer to reserve from
 * @type: event type passed to __rb_reserve_next()
 * @length: total event length passed to __rb_reserve_next()
 *
 * Retries for as long as the lower level reservation asks for it
 * (-EAGAIN), giving up with a warning after 1000 attempts. When the
 * delta from the last write stamp fails test_time_stamp(), a time
 * extend event is inserted first via rb_add_time_stamp().
 *
 * Returns the reserved event with time_delta filled in, or NULL.
 */
1192 static struct ring_buffer_event *
1193 rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
1194                       unsigned type, unsigned long length)
1195 {
1196         struct ring_buffer_event *event;
1197         u64 ts, delta;
1198         int commit = 0;
1199         int nr_loops = 0;
1200
1201  again:
1202         /*
1203          * We allow for interrupts to reenter here and do a trace.
1204          * If one does, it will cause this original code to loop
1205          * back here. Even with heavy interrupts happening, this
1206          * should only happen a few times in a row. If this happens
1207          * 1000 times in a row, there must be either an interrupt
1208          * storm or we have something buggy.
1209          * Bail!
1210          */
1211         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
1212                 return NULL;
1213
1214         ts = ring_buffer_time_stamp(cpu_buffer->cpu);
1215
1216         /*
1217          * Only the first commit can update the timestamp.
1218          * Yes there is a race here. If an interrupt comes in
1219          * just after the conditional and it traces too, then it
1220          * will also check the deltas. More than one timestamp may
1221          * also be made. But only the entry that did the actual
1222          * commit will be something other than zero.
1223          */
1224         if (cpu_buffer->tail_page == cpu_buffer->commit_page &&
1225             rb_page_write(cpu_buffer->tail_page) ==
1226             rb_commit_index(cpu_buffer)) {
1227
1228                 delta = ts - cpu_buffer->write_stamp;
1229
1230                 /* make sure this delta is calculated here */
1231                 barrier();
1232
1233                 /* Did the write stamp get updated already? */
1234                 if (unlikely(ts < cpu_buffer->write_stamp))
1235                         delta = 0;
1236
1237                 if (test_time_stamp(delta)) {
1238
1239                         commit = rb_add_time_stamp(cpu_buffer, &ts, &delta);
1240
1241                         if (commit == -EBUSY)
1242                                 return NULL;
1243
1244                         if (commit == -EAGAIN)
1245                                 goto again;
1246
1247                         RB_WARN_ON(cpu_buffer, commit < 0);
1248                 }
1249         } else
1250                 /* Non commits have zero deltas */
1251                 delta = 0;
1252
1253         event = __rb_reserve_next(cpu_buffer, type, length, &ts);
1254         if (PTR_ERR(event) == -EAGAIN)
1255                 goto again;
1256
1257         if (!event) {
1258                 if (unlikely(commit))
1259                         /*
1260                          * Ouch! We needed a timestamp and it was committed. But
1261                          * we didn't get our event reserved.
1262                          */
1263                         rb_set_commit_to_write(cpu_buffer);
1264                 return NULL;
1265         }
1266
1267         /*
1268          * If the timestamp was committed, make the commit our entry
1269          * now so that we will update it when needed.
1270          */
1271         if (commit)
1272                 rb_set_commit_event(cpu_buffer, event);
1273         else if (!rb_is_commit(cpu_buffer, event))
1274                 delta = 0;
1275
1276         event->time_delta = delta;
1277
1278         return event;
1279 }
1280
/*
 * Per cpu copy of the value returned by ftrace_preempt_disable() in
 * ring_buffer_lock_reserve(). It is stored there when preempt_count()
 * is 1 and handed back to ftrace_preempt_enable() by
 * ring_buffer_unlock_commit() under the same condition.
 */
1281 static DEFINE_PER_CPU(int, rb_need_resched);
1282
1283 /**
1284  * ring_buffer_lock_reserve - reserve a part of the buffer
1285  * @buffer: the ring buffer to reserve from
1286  * @length: the length of the data to reserve (excluding event header)
1287  *
1288  * Returns a reseverd event on the ring buffer to copy directly to.
1289  * The user of this interface will need to get the body to write into
1290  * and can use the ring_buffer_event_data() interface.
1291  *
1292  * The length is the length of the data needed, not the event length
1293  * which also includes the event header.
1294  *
1295  * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
1296  * If NULL is returned, then nothing has been allocated or locked.
1297  */
1298 struct ring_buffer_event *
1299 ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
1300 {
1301         struct ring_buffer_per_cpu *cpu_buffer;
1302         struct ring_buffer_event *event;
1303         int cpu, resched;
1304
1305         if (ring_buffer_flags != RB_BUFFERS_ON)
1306                 return NULL;
1307
1308         if (atomic_read(&buffer->record_disabled))
1309                 return NULL;
1310
1311         /* If we are tracing schedule, we don't want to recurse */
1312         resched = ftrace_preempt_disable();
1313
1314         cpu = raw_smp_processor_id();
1315
1316         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1317                 goto out;
1318
1319         cpu_buffer = buffer->buffers[cpu];
1320
1321         if (atomic_read(&cpu_buffer->record_disabled))
1322                 goto out;
1323
1324         length = rb_calculate_event_length(length);
1325         if (length > BUF_PAGE_SIZE)
1326                 goto out;
1327
1328         event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length);
1329         if (!event)
1330                 goto out;
1331
1332         /*
1333          * Need to store resched state on this cpu.
1334          * Only the first needs to.
1335          */
1336
1337         if (preempt_count() == 1)
1338                 per_cpu(rb_need_resched, cpu) = resched;
1339
1340         return event;
1341
1342  out:
1343         ftrace_preempt_enable(resched);
1344         return NULL;
1345 }
1346 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
1347
1348 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
1349                       struct ring_buffer_event *event)
1350 {
1351         cpu_buffer->entries++;
1352
1353         /* Only process further if we own the commit */
1354         if (!rb_is_commit(cpu_buffer, event))
1355                 return;
1356
1357         cpu_buffer->write_stamp += event->time_delta;
1358
1359         rb_set_commit_to_write(cpu_buffer);
1360 }
1361
1362 /**
1363  * ring_buffer_unlock_commit - commit a reserved
1364  * @buffer: The buffer to commit to
1365  * @event: The event pointer to commit.
1366  *
1367  * This commits the data to the ring buffer, and releases any locks held.
1368  *
1369  * Must be paired with ring_buffer_lock_reserve.
1370  */
1371 int ring_buffer_unlock_commit(struct ring_buffer *buffer,
1372                               struct ring_buffer_event *event)
1373 {
1374         struct ring_buffer_per_cpu *cpu_buffer;
1375         int cpu = raw_smp_processor_id();
1376
1377         cpu_buffer = buffer->buffers[cpu];
1378
1379         rb_commit(cpu_buffer, event);
1380
1381         /*
1382          * Only the last preempt count needs to restore preemption.
1383          */
1384         if (preempt_count() == 1)
1385                 ftrace_preempt_enable(per_cpu(rb_need_resched, cpu));
1386         else
1387                 preempt_enable_no_resched_notrace();
1388
1389         return 0;
1390 }
1391 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
1392
1393 /**
1394  * ring_buffer_write - write data to the buffer without reserving
1395  * @buffer: The ring buffer to write to.
1396  * @length: The length of the data being written (excluding the event header)
1397  * @data: The data to write to the buffer.
1398  *
1399  * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
1400  * one function. If you already have the data to write to the buffer, it
1401  * may be easier to simply call this function.
1402  *
1403  * Note, like ring_buffer_lock_reserve, the length is the length of the data
1404  * and not the length of the event which would hold the header.
1405  */
1406 int ring_buffer_write(struct ring_buffer *buffer,
1407                         unsigned long length,
1408                         void *data)
1409 {
1410         struct ring_buffer_per_cpu *cpu_buffer;
1411         struct ring_buffer_event *event;
1412         unsigned long event_length;
1413         void *body;
1414         int ret = -EBUSY;
1415         int cpu, resched;
1416
1417         if (ring_buffer_flags != RB_BUFFERS_ON)
1418                 return -EBUSY;
1419
1420         if (atomic_read(&buffer->record_disabled))
1421                 return -EBUSY;
1422
1423         resched = ftrace_preempt_disable();
1424
1425         cpu = raw_smp_processor_id();
1426
1427         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1428                 goto out;
1429
1430         cpu_buffer = buffer->buffers[cpu];
1431
1432         if (atomic_read(&cpu_buffer->record_disabled))
1433                 goto out;
1434
1435         event_length = rb_calculate_event_length(length);
1436         event = rb_reserve_next_event(cpu_buffer,
1437                                       RINGBUF_TYPE_DATA, event_length);
1438         if (!event)
1439                 goto out;
1440
1441         body = rb_event_data(event);
1442
1443         memcpy(body, data, length);
1444
1445         rb_commit(cpu_buffer, event);
1446
1447         ret = 0;
1448  out:
1449         ftrace_preempt_enable(resched);
1450
1451         return ret;
1452 }
1453 EXPORT_SYMBOL_GPL(ring_buffer_write);
1454
1455 static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
1456 {
1457         struct buffer_page *reader = cpu_buffer->reader_page;
1458         struct buffer_page *head = cpu_buffer->head_page;
1459         struct buffer_page *commit = cpu_buffer->commit_page;
1460
1461         return reader->read == rb_page_commit(reader) &&
1462                 (commit == reader ||
1463                  (commit == head &&
1464                   head->read == rb_page_commit(commit)));
1465 }
1466
1467 /**
1468  * ring_buffer_record_disable - stop all writes into the buffer
1469  * @buffer: The ring buffer to stop writes to.
1470  *
1471  * This prevents all writes to the buffer. Any attempt to write
1472  * to the buffer after this will fail and return NULL.
1473  *
1474  * The caller should call synchronize_sched() after this.
1475  */
1476 void ring_buffer_record_disable(struct ring_buffer *buffer)
1477 {
1478         atomic_inc(&buffer->record_disabled);
1479 }
1480 EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
1481
1482 /**
1483  * ring_buffer_record_enable - enable writes to the buffer
1484  * @buffer: The ring buffer to enable writes
1485  *
1486  * Note, multiple disables will need the same number of enables
1487  * to truely enable the writing (much like preempt_disable).
1488  */
1489 void ring_buffer_record_enable(struct ring_buffer *buffer)
1490 {
1491         atomic_dec(&buffer->record_disabled);
1492 }
1493 EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
1494
1495 /**
1496  * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
1497  * @buffer: The ring buffer to stop writes to.
1498  * @cpu: The CPU buffer to stop
1499  *
1500  * This prevents all writes to the buffer. Any attempt to write
1501  * to the buffer after this will fail and return NULL.
1502  *
1503  * The caller should call synchronize_sched() after this.
1504  */
1505 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
1506 {
1507         struct ring_buffer_per_cpu *cpu_buffer;
1508
1509         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1510                 return;
1511
1512         cpu_buffer = buffer->buffers[cpu];
1513         atomic_inc(&cpu_buffer->record_disabled);
1514 }
1515 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
1516
1517 /**
1518  * ring_buffer_record_enable_cpu - enable writes to the buffer
1519  * @buffer: The ring buffer to enable writes
1520  * @cpu: The CPU to enable.
1521  *
1522  * Note, multiple disables will need the same number of enables
1523  * to truely enable the writing (much like preempt_disable).
1524  */
1525 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
1526 {
1527         struct ring_buffer_per_cpu *cpu_buffer;
1528
1529         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1530                 return;
1531
1532         cpu_buffer = buffer->buffers[cpu];
1533         atomic_dec(&cpu_buffer->record_disabled);
1534 }
1535 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);
1536
1537 /**
1538  * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
1539  * @buffer: The ring buffer
1540  * @cpu: The per CPU buffer to get the entries from.
1541  */
1542 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
1543 {
1544         struct ring_buffer_per_cpu *cpu_buffer;
1545
1546         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1547                 return 0;
1548
1549         cpu_buffer = buffer->buffers[cpu];
1550         return cpu_buffer->entries;
1551 }
1552 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);
1553
1554 /**
1555  * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
1556  * @buffer: The ring buffer
1557  * @cpu: The per CPU buffer to get the number of overruns from
1558  */
1559 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
1560 {
1561         struct ring_buffer_per_cpu *cpu_buffer;
1562
1563         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1564                 return 0;
1565
1566         cpu_buffer = buffer->buffers[cpu];
1567         return cpu_buffer->overrun;
1568 }
1569 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);
1570
1571 /**
1572  * ring_buffer_entries - get the number of entries in a buffer
1573  * @buffer: The ring buffer
1574  *
1575  * Returns the total number of entries in the ring buffer
1576  * (all CPU entries)
1577  */
1578 unsigned long ring_buffer_entries(struct ring_buffer *buffer)
1579 {
1580         struct ring_buffer_per_cpu *cpu_buffer;
1581         unsigned long entries = 0;
1582         int cpu;
1583
1584         /* if you care about this being correct, lock the buffer */
1585         for_each_buffer_cpu(buffer, cpu) {
1586                 cpu_buffer = buffer->buffers[cpu];
1587                 entries += cpu_buffer->entries;
1588         }
1589
1590         return entries;
1591 }
1592 EXPORT_SYMBOL_GPL(ring_buffer_entries);
1593
1594 /**
1595  * ring_buffer_overrun_cpu - get the number of overruns in buffer
1596  * @buffer: The ring buffer
1597  *
1598  * Returns the total number of overruns in the ring buffer
1599  * (all CPU entries)
1600  */
1601 unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
1602 {
1603         struct ring_buffer_per_cpu *cpu_buffer;
1604         unsigned long overruns = 0;
1605         int cpu;
1606
1607         /* if you care about this being correct, lock the buffer */
1608         for_each_buffer_cpu(buffer, cpu) {
1609                 cpu_buffer = buffer->buffers[cpu];
1610                 overruns += cpu_buffer->overrun;
1611         }
1612
1613         return overruns;
1614 }
1615 EXPORT_SYMBOL_GPL(ring_buffer_overruns);
1616
1617 static void rb_iter_reset(struct ring_buffer_iter *iter)
1618 {
1619         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1620
1621         /* Iterator usage is expected to have record disabled */
1622         if (list_empty(&cpu_buffer->reader_page->list)) {
1623                 iter->head_page = cpu_buffer->head_page;
1624                 iter->head = cpu_buffer->head_page->read;
1625         } else {
1626                 iter->head_page = cpu_buffer->reader_page;
1627                 iter->head = cpu_buffer->reader_page->read;
1628         }
1629         if (iter->head)
1630                 iter->read_stamp = cpu_buffer->read_stamp;
1631         else
1632                 iter->read_stamp = iter->head_page->page->time_stamp;
1633 }
1634
1635 /**
1636  * ring_buffer_iter_reset - reset an iterator
1637  * @iter: The iterator to reset
1638  *
1639  * Resets the iterator, so that it will start from the beginning
1640  * again.
1641  */
1642 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
1643 {
1644         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1645         unsigned long flags;
1646
1647         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
1648         rb_iter_reset(iter);
1649         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
1650 }
1651 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);
1652
1653 /**
1654  * ring_buffer_iter_empty - check if an iterator has no more to read
1655  * @iter: The iterator to check
1656  */
1657 int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
1658 {
1659         struct ring_buffer_per_cpu *cpu_buffer;
1660
1661         cpu_buffer = iter->cpu_buffer;
1662
1663         return iter->head_page == cpu_buffer->commit_page &&
1664                 iter->head == rb_commit_index(cpu_buffer);
1665 }
1666 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);
1667
1668 static void
1669 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1670                      struct ring_buffer_event *event)
1671 {
1672         u64 delta;
1673
1674         switch (event->type) {
1675         case RINGBUF_TYPE_PADDING:
1676                 return;
1677
1678         case RINGBUF_TYPE_TIME_EXTEND:
1679                 delta = event->array[0];
1680                 delta <<= TS_SHIFT;
1681                 delta += event->time_delta;
1682                 cpu_buffer->read_stamp += delta;
1683                 return;
1684
1685         case RINGBUF_TYPE_TIME_STAMP:
1686                 /* FIXME: not implemented */
1687                 return;
1688
1689         case RINGBUF_TYPE_DATA:
1690                 cpu_buffer->read_stamp += event->time_delta;
1691                 return;
1692
1693         default:
1694                 BUG();
1695         }
1696         return;
1697 }
1698
1699 static void
1700 rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
1701                           struct ring_buffer_event *event)
1702 {
1703         u64 delta;
1704
1705         switch (event->type) {
1706         case RINGBUF_TYPE_PADDING:
1707                 return;
1708
1709         case RINGBUF_TYPE_TIME_EXTEND:
1710                 delta = event->array[0];
1711                 delta <<= TS_SHIFT;
1712                 delta += event->time_delta;
1713                 iter->read_stamp += delta;
1714                 return;
1715
1716         case RINGBUF_TYPE_TIME_STAMP:
1717                 /* FIXME: not implemented */
1718                 return;
1719
1720         case RINGBUF_TYPE_DATA:
1721                 iter->read_stamp += event->time_delta;
1722                 return;
1723
1724         default:
1725                 BUG();
1726         }
1727         return;
1728 }
1729
/*
 * rb_get_reader_page - find the page the reader should consume from
 * @cpu_buffer: the per cpu buffer being read
 *
 * Runs with interrupts disabled and cpu_buffer->lock held. If the
 * current reader page still has unread data it is returned as is.
 * Otherwise the used up reader page is spliced into the ring in place
 * of the head page, the old head page becomes the new reader page,
 * and the check is repeated.
 *
 * Returns the reader page, or NULL when the reader has caught up with
 * the commit page or the loop sanity check fires.
 */
1730 static struct buffer_page *
1731 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
1732 {
1733         struct buffer_page *reader = NULL;
1734         unsigned long flags;
1735         int nr_loops = 0;
1736
1737         local_irq_save(flags);
1738         __raw_spin_lock(&cpu_buffer->lock);
1739
1740  again:
1741         /*
1742          * This should normally only loop twice. But because the
1743          * start of the reader inserts an empty page, it causes
1744          * a case where we will loop three times. There should be no
1745          * reason to loop four times (that I know of).
1746          */
1747         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
1748                 reader = NULL;
1749                 goto out;
1750         }
1751
1752         reader = cpu_buffer->reader_page;
1753
1754         /* If there's more to read, return this page */
1755         if (cpu_buffer->reader_page->read < rb_page_size(reader))
1756                 goto out;
1757
1758         /* Never should we have an index greater than the size */
1759         if (RB_WARN_ON(cpu_buffer,
1760                        cpu_buffer->reader_page->read > rb_page_size(reader)))
1761                 goto out;
1762
1763         /* check if we caught up to the tail */
1764         reader = NULL;
1765         if (cpu_buffer->commit_page == cpu_buffer->reader_page)
1766                 goto out;
1767
1768         /*
1769          * Splice the empty reader page into the list around the head.
1770          * Reset the reader page to size zero.
1771          */
1772
1773         reader = cpu_buffer->head_page;
1774         cpu_buffer->reader_page->list.next = reader->list.next;
1775         cpu_buffer->reader_page->list.prev = reader->list.prev;
1776
1777         local_set(&cpu_buffer->reader_page->write, 0);
1778         local_set(&cpu_buffer->reader_page->page->commit, 0);
1779
1780         /* Make the reader page now replace the head */
1781         reader->list.prev->next = &cpu_buffer->reader_page->list;
1782         reader->list.next->prev = &cpu_buffer->reader_page->list;
1783
1784         /*
1785          * If the commit page is the page we are taking, the head
1786          * becomes the inserted page; otherwise the head is the page
1787          * that follows the inserted page.
1788          */
1789         cpu_buffer->head_page = cpu_buffer->reader_page;
1790
1791         if (cpu_buffer->commit_page != reader)
1792                 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1793
1794         /* Finally make the old head page the new reader page */
1795         cpu_buffer->reader_page = reader;
1796         rb_reset_reader_page(cpu_buffer);
1797
1798         goto again;
1799
1800  out:
1801         __raw_spin_unlock(&cpu_buffer->lock);
1802         local_irq_restore(flags);
1803
1804         return reader;
1805 }
1805
     /*
      * rb_advance_reader - step the reader past the event it currently points at
      * @cpu_buffer: the per cpu buffer being read
      *
      * Looks up the reader page with rb_get_reader_page(); triggers
      * RB_WARN_ON() and returns if there is none. Otherwise the event at the
      * reader position is accounted for (entry count, read stamp) and the
      * read offset of the reader page is moved forward by the event length.
      */
1806 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
1807 {
1808         struct ring_buffer_event *event;
1809         struct buffer_page *reader;
1810         unsigned length;
1811
             /* Only the NULL-ness of the returned page is used below */
1812         reader = rb_get_reader_page(cpu_buffer);
1813
1814         /* This function should not be called when the buffer is empty */
1815         if (RB_WARN_ON(cpu_buffer, !reader))
1816                 return;
1817
1818         event = rb_reader_event(cpu_buffer);
1819
             /* Only data events are counted in cpu_buffer->entries */
1820         if (event->type == RINGBUF_TYPE_DATA)
1821                 cpu_buffer->entries--;
1822
1823         rb_update_read_stamp(cpu_buffer, event);
1824
             /* Move the read offset of the reader page beyond this event */
1825         length = rb_event_length(event);
1826         cpu_buffer->reader_page->read += length;
1827 }
1828
     /*
      * rb_advance_iter - move an iterator past the event at its head
      * @iter: the iterator to advance
      *
      * If the iterator offset is already at or beyond the size of its page,
      * the iterator is moved on with rb_inc_iter(), unless that page is the
      * commit page, in which case RB_WARN_ON() fires and nothing is changed.
      * Otherwise the offset grows by the length of the event at the head and
      * the read stamp of the iterator is updated. If the new offset reaches
      * the end of a page that is not the commit page, the function calls
      * itself again, which then takes the rb_inc_iter() path described above.
      */
1829 static void rb_advance_iter(struct ring_buffer_iter *iter)
1830 {
1831         struct ring_buffer *buffer;
1832         struct ring_buffer_per_cpu *cpu_buffer;
1833         struct ring_buffer_event *event;
1834         unsigned length;
1835
1836         cpu_buffer = iter->cpu_buffer;
1837         buffer = cpu_buffer->buffer;
1838
1839         /*
1840          * Check if we are at the end of the current page.
1841          */
1842         if (iter->head >= rb_page_size(iter->head_page)) {
                     /* Being at the end of the commit page is unexpected */
1843                 if (RB_WARN_ON(buffer,
1844                                iter->head_page == cpu_buffer->commit_page))
1845                         return;
1846                 rb_inc_iter(iter);
1847                 return;
1848         }
1849
1850         event = rb_iter_head_event(iter);
1851
1852         length = rb_event_length(event);
1853
1854         /*
1855          * This should not be called to advance the head if the event
1856          * would reach beyond what has been committed on the commit page.
1857          */
1858         if (RB_WARN_ON(cpu_buffer,
1859                        (iter->head_page == cpu_buffer->commit_page) &&
1860                        (iter->head + length > rb_commit_index(cpu_buffer))))
1861                 return;
1862
1863         rb_update_iter_read_stamp(iter, event);
1864
1865         iter->head += length;
1866
1867         /* check for end of page padding */
1868         if ((iter->head >= rb_page_size(iter->head_page)) &&
1869             (iter->head_page != cpu_buffer->commit_page))
1870                 rb_advance_iter(iter);
1871 }
1872
     /*
      * rb_buffer_peek - find the next data event of a cpu buffer for a consumer
      * @buffer: the ring buffer to look in
      * @cpu: the cpu whose buffer is looked at
      * @ts: if not NULL, receives the normalized time stamp of the event
      *
      * Returns the data event at the reader position without consuming it.
      * Time extend and time stamp events found in front of it are consumed
      * on the way. Returns NULL when @cpu is not in the cpumask of @buffer,
      * when rb_get_reader_page() yields no page, when a padding event is
      * found (it is consumed and warned about), or when the retry limit is
      * exceeded.
      *
      * The callers visible in this file hold cpu_buffer->reader_lock around
      * this call.
      */
1873 static struct ring_buffer_event *
1874 rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
1875 {
1876         struct ring_buffer_per_cpu *cpu_buffer;
1877         struct ring_buffer_event *event;
1878         struct buffer_page *reader;
1879         int nr_loops = 0;
1880
1881         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1882                 return NULL;
1883
1884         cpu_buffer = buffer->buffers[cpu];
1885
1886  again:
1887         /*
1888          * We repeat when a timestamp is encountered. It is possible
1889          * to get multiple timestamps from an interrupt entering just
1890          * as one timestamp is about to be written. The max times
1891          * that this can happen is the number of nested interrupts we
1892          * can have.  Nesting 10 deep of interrupts is clearly
1893          * an anomaly.
1894          */
1895         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
1896                 return NULL;
1897
             /* No reader page means there is nothing to hand out */
1898         reader = rb_get_reader_page(cpu_buffer);
1899         if (!reader)
1900                 return NULL;
1901
1902         event = rb_reader_event(cpu_buffer);
1903
1904         switch (event->type) {
1905         case RINGBUF_TYPE_PADDING:
                     /* Not expected here: warn, skip it and report no event */
1906                 RB_WARN_ON(cpu_buffer, 1);
1907                 rb_advance_reader(cpu_buffer);
1908                 return NULL;
1909
1910         case RINGBUF_TYPE_TIME_EXTEND:
1911                 /* Internal data, OK to advance */
1912                 rb_advance_reader(cpu_buffer);
1913                 goto again;
1914
1915         case RINGBUF_TYPE_TIME_STAMP:
1916                 /* FIXME: not implemented */
1917                 rb_advance_reader(cpu_buffer);
1918                 goto again;
1919
1920         case RINGBUF_TYPE_DATA:
                     /* Event time is the read stamp plus the event's delta */
1921                 if (ts) {
1922                         *ts = cpu_buffer->read_stamp + event->time_delta;
1923                         ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1924                 }
1925                 return event;
1926
1927         default:
1928                 BUG();
1929         }
1930
1931         return NULL;
1932 }
     /* The exported ring_buffer_peek() is defined further down in this file */
1933 EXPORT_SYMBOL_GPL(ring_buffer_peek);
1934
     /*
      * rb_iter_peek - find the next data event an iterator will return
      * @iter: the iterator to look through
      * @ts: if not NULL, receives the normalized time stamp of the event
      *
      * Returns the data event at the iterator position without stepping over
      * it. Padding moves the iterator on with rb_inc_iter(); time extend and
      * time stamp events are stepped over with rb_advance_iter(). Returns
      * NULL when the iterator or the cpu buffer is empty, or when the retry
      * limit is exceeded.
      *
      * The callers visible in this file hold cpu_buffer->reader_lock around
      * this call.
      */
1935 static struct ring_buffer_event *
1936 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
1937 {
1938         struct ring_buffer *buffer;
1939         struct ring_buffer_per_cpu *cpu_buffer;
1940         struct ring_buffer_event *event;
1941         int nr_loops = 0;
1942
1943         if (ring_buffer_iter_empty(iter))
1944                 return NULL;
1945
1946         cpu_buffer = iter->cpu_buffer;
             /* NOTE(review): 'buffer' is assigned but not read in this function */
1947         buffer = cpu_buffer->buffer;
1948
1949  again:
1950         /*
1951          * We repeat when a timestamp is encountered. It is possible
1952          * to get multiple timestamps from an interrupt entering just
1953          * as one timestamp is about to be written. The max times
1954          * that this can happen is the number of nested interrupts we
1955          * can have. Nesting 10 deep of interrupts is clearly
1956          * an anomaly.
1957          */
1958         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
1959                 return NULL;
1960
1961         if (rb_per_cpu_empty(cpu_buffer))
1962                 return NULL;
1963
1964         event = rb_iter_head_event(iter);
1965
1966         switch (event->type) {
1967         case RINGBUF_TYPE_PADDING:
                     /* Nothing more on this page, continue on the next one */
1968                 rb_inc_iter(iter);
1969                 goto again;
1970
1971         case RINGBUF_TYPE_TIME_EXTEND:
1972                 /* Internal data, OK to advance */
1973                 rb_advance_iter(iter);
1974                 goto again;
1975
1976         case RINGBUF_TYPE_TIME_STAMP:
1977                 /* FIXME: not implemented */
1978                 rb_advance_iter(iter);
1979                 goto again;
1980
1981         case RINGBUF_TYPE_DATA:
                     /* Event time is the iterator read stamp plus the delta */
1982                 if (ts) {
1983                         *ts = iter->read_stamp + event->time_delta;
1984                         ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1985                 }
1986                 return event;
1987
1988         default:
1989                 BUG();
1990         }
1991
1992         return NULL;
1993 }
     /* The exported ring_buffer_iter_peek() is defined further down in this file */
1994 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
1995
1996 /**
1997  * ring_buffer_peek - peek at the next event to be read
1998  * @buffer: The ring buffer to read
1999  * @cpu: The cpu to peek at
2000  * @ts: The timestamp counter of this event.
2001  *
2002  * This will return the event that will be read next, but does
2003  * not consume the data.
      *
      * Returns NULL if @cpu is not in the cpumask of @buffer or if
      * rb_buffer_peek() finds no event.
2004  */
2005 struct ring_buffer_event *
2006 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
2007 {
2008         struct ring_buffer_per_cpu *cpu_buffer;
2009         struct ring_buffer_event *event;
2010         unsigned long flags;
2011
             /*
              * Validate @cpu before the per cpu buffer is dereferenced to
              * take its lock; rb_buffer_peek() only checks after the fact.
              */
             if (!cpumask_test_cpu(cpu, buffer->cpumask))
                     return NULL;

             cpu_buffer = buffer->buffers[cpu];

2012         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2013         event = rb_buffer_peek(buffer, cpu, ts);
2014         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2015
2016         return event;
2017 }
2018
2019 /**
2020  * ring_buffer_iter_peek - look at the event an iterator will read next
2021  * @iter: The iterator to look through
2022  * @ts: Receives the time stamp of that event, if not NULL.
2023  *
2024  * Hands back the upcoming event of the iteration without stepping
2025  * over that event. The lookup runs under the reader lock.
2026  */
2027 struct ring_buffer_event *
2028 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
2029 {
2030         struct ring_buffer_per_cpu *pcb = iter->cpu_buffer;
2031         unsigned long irq_state;
2032         struct ring_buffer_event *upcoming;
2033
2034         spin_lock_irqsave(&pcb->reader_lock, irq_state);
2035         upcoming = rb_iter_peek(iter, ts);
2036         spin_unlock_irqrestore(&pcb->reader_lock, irq_state);
2037
2038         return upcoming;
2039 }
2040
2041 /**
2042  * ring_buffer_consume - hand out the next event and consume it
2043  * @buffer: The ring buffer to take the event from
      * @cpu: The cpu whose buffer is read
      * @ts: Receives the time stamp of the event, if not NULL.
2044  *
2045  * The returned event is stepped over, so repeated calls keep handing
2046  * out different events and will eventually drain the buffer if the
2047  * producer is slower. Returns NULL for a cpu outside the buffer's
      * cpumask or when no event is found.
2048  */
2049 struct ring_buffer_event *
2050 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
2051 {
2052         struct ring_buffer_per_cpu *pcb = buffer->buffers[cpu];
2053         unsigned long irq_state;
2054         struct ring_buffer_event *found;
2055
2056         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2057                 return NULL;
2058
2059         spin_lock_irqsave(&pcb->reader_lock, irq_state);
2060
2061         found = rb_buffer_peek(buffer, cpu, ts);
2062         if (found) {
2063                 /* Step over the event so that it is not returned again */
2064                 rb_advance_reader(pcb);
2065         }
2066
2067         /* Peek and advance both happened under the reader lock */
2068         spin_unlock_irqrestore(&pcb->reader_lock, irq_state);
2069
2070         return found;
2071 }
2072 EXPORT_SYMBOL_GPL(ring_buffer_consume);
2073
2074 /**
2075  * ring_buffer_read_start - start a non consuming read of the buffer
2076  * @buffer: The ring buffer to read from
2077  * @cpu: The cpu buffer to iterate over
2078  *
2079  * This starts up an iteration through the buffer. It also disables
2080  * the recording to the buffer until the reading is finished.
2081  * This prevents the reading from being corrupted. This is not
2082  * a consuming read, so a producer is not expected.
2083  *
2084  * Must be paired with ring_buffer_read_finish.
      *
      * Returns the new iterator, or NULL if @cpu is not in the cpumask of
      * @buffer or if the iterator could not be allocated.
2085  */
2086 struct ring_buffer_iter *
2087 ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
2088 {
2089         struct ring_buffer_per_cpu *cpu_buffer;
2090         struct ring_buffer_iter *iter;
2091         unsigned long flags;
2092
2093         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2094                 return NULL;
2095
             /* Freed again by ring_buffer_read_finish() */
2096         iter = kmalloc(sizeof(*iter), GFP_KERNEL);
2097         if (!iter)
2098                 return NULL;
2099
2100         cpu_buffer = buffer->buffers[cpu];
2101
2102         iter->cpu_buffer = cpu_buffer;
2103
             /* Dropped again by the atomic_dec() in ring_buffer_read_finish() */
2104         atomic_inc(&cpu_buffer->record_disabled);
             /* NOTE(review): presumably waits out writers already recording - confirm */
2105         synchronize_sched();
2106
             /* Reset the iterator with the reader lock and the buffer lock held */
2107         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2108         __raw_spin_lock(&cpu_buffer->lock);
2109         rb_iter_reset(iter);
2110         __raw_spin_unlock(&cpu_buffer->lock);
2111         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2112
2113         return iter;
2114 }
2115 EXPORT_SYMBOL_GPL(ring_buffer_read_start);
2116
2117 /**
2118  * ring_buffer_read_finish - end a read started by ring_buffer_read_start
2119  * @iter: The iterator returned by ring_buffer_read_start
2120  *
2121  * Drops the record_disabled count taken on the iterator's cpu buffer
2122  * and frees the iterator; @iter must not be used afterwards.
2123  */
2124 void
2125 ring_buffer_read_finish(struct ring_buffer_iter *iter)
2126 {
2127         struct ring_buffer_per_cpu *owner = iter->cpu_buffer;
2128
2129         atomic_dec(&owner->record_disabled);
2130         kfree(iter);
2131 }
2132 EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
2133
2134 /**
2135  * ring_buffer_read - fetch the iterator's next event and step past it
2136  * @iter: The ring buffer iterator
2137  * @ts: Receives the time stamp of the returned event, if not NULL.
2138  *
2139  * Returns the next event of the iteration and advances the iterator,
      * or NULL when rb_iter_peek() finds no event.
2140  */
2141 struct ring_buffer_event *
2142 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
2143 {
2144         struct ring_buffer_per_cpu *pcb = iter->cpu_buffer;
2145         struct ring_buffer_event *found;
2146         unsigned long irq_state;
2147
2148         spin_lock_irqsave(&pcb->reader_lock, irq_state);
2149         found = rb_iter_peek(iter, ts);
2150         if (found) {
2151                 /* Step over the event that is being handed out */
2152                 rb_advance_iter(iter);
2153         }
2154
2155         spin_unlock_irqrestore(&pcb->reader_lock, irq_state);
2156
2157         return found;
2158 }
2159 EXPORT_SYMBOL_GPL(ring_buffer_read);
2160
2161 /**
2162  * ring_buffer_size - report how many bytes a ring buffer can hold
2163  * @buffer: The ring buffer.
      *
      * Returns BUF_PAGE_SIZE times the number of pages of @buffer.
2164  */
2165 unsigned long ring_buffer_size(struct ring_buffer *buffer)
2166 {
             unsigned long total_bytes;

             total_bytes = BUF_PAGE_SIZE * buffer->pages;
2167         return total_bytes;
2168 }
2169 EXPORT_SYMBOL_GPL(ring_buffer_size);
2170
     /*
      * rb_reset_cpu - put a per cpu buffer back into its empty state
      * @cpu_buffer: the per cpu buffer to reset
      *
      * Makes the first page on cpu_buffer->pages the head, tail and commit
      * page, zeroes the write, commit and read counters of that page and of
      * the reader page, and clears the overrun count, the entry count and
      * both time stamps. The caller visible in this file,
      * ring_buffer_reset_cpu(), holds reader_lock and lock around the call.
      */
2171 static void
2172 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
2173 {
             /* The head restarts at the first page of the page list */
2174         cpu_buffer->head_page
2175                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
2176         local_set(&cpu_buffer->head_page->write, 0);
2177         local_set(&cpu_buffer->head_page->page->commit, 0);
2178
2179         cpu_buffer->head_page->read = 0;
2180
             /* Writer and commit positions start over at the head page */
2181         cpu_buffer->tail_page = cpu_buffer->head_page;
2182         cpu_buffer->commit_page = cpu_buffer->head_page;
2183
             /* The reader page becomes an empty list of its own */
2184         INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
2185         local_set(&cpu_buffer->reader_page->write, 0);
2186         local_set(&cpu_buffer->reader_page->page->commit, 0);
2187         cpu_buffer->reader_page->read = 0;
2188
2189         cpu_buffer->overrun = 0;
2190         cpu_buffer->entries = 0;
2191
2192         cpu_buffer->write_stamp = 0;
2193         cpu_buffer->read_stamp = 0;
2194 }
2195
2196 /**
2197  * ring_buffer_reset_cpu - empty the buffer of a single cpu
2198  * @buffer: The ring buffer that owns the per cpu buffer
2199  * @cpu: The cpu whose buffer is reset; ignored if not in the cpumask
2200  */
2201 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
2202 {
2203         struct ring_buffer_per_cpu *pcb = buffer->buffers[cpu];
2204         unsigned long irq_state;
2205
2206         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2207                 return;
2208
2209         /* Reader lock on the outside, buffer lock on the inside */
2210         spin_lock_irqsave(&pcb->reader_lock, irq_state);
2211         __raw_spin_lock(&pcb->lock);
2212
2213         rb_reset_cpu(pcb);
2214
2215         __raw_spin_unlock(&pcb->lock);
2216         spin_unlock_irqrestore(&pcb->reader_lock, irq_state);
2218 }
2219 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
2220
2221 /**
2222  * ring_buffer_reset - empty every per cpu buffer of a ring buffer
2223  * @buffer: The ring buffer whose cpu buffers are all reset
2224  */
2225 void ring_buffer_reset(struct ring_buffer *buffer)
2226 {
2227         int each_cpu;
2228
2229         /* One ring_buffer_reset_cpu() call per cpu of the buffer */
2230         for_each_buffer_cpu(buffer, each_cpu)
2231                 ring_buffer_reset_cpu(buffer, each_cpu);
2232 }
2233 EXPORT_SYMBOL_GPL(ring_buffer_reset);
2233
2234 /**
2235  * ring_buffer_empty - is the ring buffer empty?
2236  * @buffer: The ring buffer to test
      *
      * Returns 1 if every per cpu buffer is empty, 0 otherwise.
2237  */
2238 int ring_buffer_empty(struct ring_buffer *buffer)
2239 {
2240         struct ring_buffer_per_cpu *pcb;
2241         int each_cpu;
2242
2243         /* Unlocked, hence racy: callers needing more must lock the buffer */
2244         for_each_buffer_cpu(buffer, each_cpu) {
2245                 pcb = buffer->buffers[each_cpu];
2246                 if (rb_per_cpu_empty(pcb) == 0)
2247                         return 0;
2248         }
2249         return 1;
2250 }
2251 EXPORT_SYMBOL_GPL(ring_buffer_empty);
2252
2253 /**
2254  * ring_buffer_empty_cpu - is the buffer of one cpu empty?
2255  * @buffer: The ring buffer
2256  * @cpu: The cpu whose buffer is tested
      *
      * Returns 1 for a cpu outside the buffer's cpumask, otherwise the
      * result of rb_per_cpu_empty() for that cpu's buffer.
2257  */
2258 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
2259 {
             /* A cpu this buffer does not cover is reported as empty */
2262         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2263                 return 1;
2264
2266         return rb_per_cpu_empty(buffer->buffers[cpu]);
2267 }
2268 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
2269
2270 /**
2271  * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
2272  * @buffer_a: One buffer to swap with
2273  * @buffer_b: The other buffer to swap with
      * @cpu: The cpu whose per cpu buffers are exchanged
2274  *
2275  * This function is useful for tracers that want to take a "snapshot"
2276  * of a CPU buffer and have another backup buffer lying around.
2277  * It is expected that the tracer handles the cpu buffer not being
2278  * used at the moment.
      *
      * Returns 0 on success, -EINVAL if @cpu is missing from either cpumask
      * or the two buffers differ in their number of pages, and -EAGAIN if
      * the global flags are not exactly RB_BUFFERS_ON or recording is
      * disabled on any of the buffers or cpu buffers involved.
2279  */
2280 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
2281                          struct ring_buffer *buffer_b, int cpu)
2282 {
2283         struct ring_buffer_per_cpu *cpu_buffer_a;
2284         struct ring_buffer_per_cpu *cpu_buffer_b;
2285
2286         if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
2287             !cpumask_test_cpu(cpu, buffer_b->cpumask))
2288                 return -EINVAL;
2289
2290         /* At least make sure the two buffers are somewhat the same */
2291         if (buffer_a->pages != buffer_b->pages)
2292                 return -EINVAL;
2293
             /* Refuse unless the global switch is on and not disabled */
2294         if (ring_buffer_flags != RB_BUFFERS_ON)
2295                 return -EAGAIN;
2296
2297         if (atomic_read(&buffer_a->record_disabled))
2298                 return -EAGAIN;
2299
2300         if (atomic_read(&buffer_b->record_disabled))
2301                 return -EAGAIN;
2302
2303         cpu_buffer_a = buffer_a->buffers[cpu];
2304         cpu_buffer_b = buffer_b->buffers[cpu];
2305
2306         if (atomic_read(&cpu_buffer_a->record_disabled))
2307                 return -EAGAIN;
2308
2309         if (atomic_read(&cpu_buffer_b->record_disabled))
2310                 return -EAGAIN;
2311
2312         /*
2313          * We can't do a synchronize_sched here because this
2314          * function can be called in atomic context.
2315          * Normally this will be called from the same CPU as cpu.
2316          * If not it's up to the caller to protect this.
2317          */
2318         atomic_inc(&cpu_buffer_a->record_disabled);
2319         atomic_inc(&cpu_buffer_b->record_disabled);
2320
             /* Exchange the per cpu buffers between the two ring buffers ... */
2321         buffer_a->buffers[cpu] = cpu_buffer_b;
2322         buffer_b->buffers[cpu] = cpu_buffer_a;
2323
             /* ... and point each of them back at its new owner */
2324         cpu_buffer_b->buffer = buffer_a;
2325         cpu_buffer_a->buffer = buffer_b;
2326
2327         atomic_dec(&cpu_buffer_a->record_disabled);
2328         atomic_dec(&cpu_buffer_b->record_disabled);
2329
2330         return 0;
2331 }
2332 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
2333
     /*
      * rb_remove_entries - uncount the data events held on a page
      * @cpu_buffer: the per cpu buffer whose entry counter is adjusted
      * @bpage: the data page to walk
      * @offset: offset into @bpage at which the walk starts
      *
      * Walks the events of @bpage from @offset up to the page's commit value
      * and decrements cpu_buffer->entries once per RINGBUF_TYPE_DATA event.
      * The walk runs under cpu_buffer->lock and stops early, after a
      * RB_WARN_ON(), if a null event is found.
      */
2334 static void rb_remove_entries(struct ring_buffer_per_cpu *cpu_buffer,
2335                               struct buffer_data_page *bpage,
2336                               unsigned int offset)
2337 {
2338         struct ring_buffer_event *event;
2339         unsigned long head;
2340
2341         __raw_spin_lock(&cpu_buffer->lock);
2342         for (head = offset; head < local_read(&bpage->commit);
2343              head += rb_event_length(event)) {
2344
2345                 event = __rb_data_page_index(bpage, head);
                     /*
                      * Leave through the unlock below: returning from here
                      * would exit with cpu_buffer->lock still held.
                      */
2346                 if (RB_WARN_ON(cpu_buffer, rb_null_event(event)))
2347                         break;
2348                 /* Only count data entries */
2349                 if (event->type != RINGBUF_TYPE_DATA)
2350                         continue;
2351                 cpu_buffer->entries--;
2352         }
2353         __raw_spin_unlock(&cpu_buffer->lock);
2354 }
2355
2356 /**
2357  * ring_buffer_alloc_read_page - get a spare page for ring_buffer_read_page
2358  * @buffer: the buffer the page is meant for (not used by this function)
2359  *
2360  * Companion of ring_buffer_read_page. A caller that wants to pull
2361  * whole pages out of the ring buffer first obtains one or more pages
2362  * here and later passes them to ring_buffer_read_page, which may swap
2363  * such a page with the read page of the buffer instead of copying.
2364  * Release the page with ring_buffer_free_read_page.
2365  *
2366  *
2367  *
2368  * Returns:
2369  *  The page allocated, or NULL on error.
2370  */
2371 void *ring_buffer_alloc_read_page(struct ring_buffer *buffer)
2372 {
2373         struct buffer_data_page *page;
2374         unsigned long raw;
2375
2376         raw = __get_free_page(GFP_KERNEL);
2377         if (raw == 0)
2378                 return NULL;
2379
2380         page = (void *)raw;
2381
2382         return page;
2383 }
2384
2385 /**
2386  * ring_buffer_free_read_page - free an allocated read page
2387  * @buffer: the buffer the page was allocated for (not used here)
2388  * @data: the page to free
2389  *
2390  * Free a page allocated from ring_buffer_alloc_read_page.
2391  */
2392 void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data)
2393 {
2394         free_page((unsigned long)data);
2395 }
2396
2397 /**
2398  * ring_buffer_read_page - extract a page from the ring buffer
2399  * @buffer: buffer to extract from
2400  * @data_page: the page to use allocated from ring_buffer_alloc_read_page
2401  * @cpu: the cpu of the buffer to extract
2402  * @full: should the extraction only happen when the page is full.
2403  *
2404  * This function will pull out a page from the ring buffer and consume it.
2405  * @data_page must be the address of the variable that was returned
2406  * from ring_buffer_alloc_read_page. This is because the page might be used
2407  * to swap with a page in the ring buffer.
2408  *
2409  * for example:
2410  *      rpage = ring_buffer_alloc_read_page(buffer);
2411  *      if (!rpage)
2412  *              return error;
2413  *      ret = ring_buffer_read_page(buffer, &rpage, cpu, 0);
2414  *      if (ret >= 0)
2415  *              process_page(rpage, ret);
2416  *
2417  * When @full is set, the function will not transfer any data unless
2418  * the writer is off the reader page.
2419  *
2420  * Note: it is up to the calling functions to handle sleeps and wakeups.
2421  *  The ring buffer can be used anywhere in the kernel and can not
2422  *  blindly call wake_up. The layer that uses the ring buffer must be
2423  *  responsible for that.
2424  *
2425  * Returns:
2426  *  >=0 if data has been transferred, returns the offset of consumed data.
2427  *  <0 if no data has been transferred. This includes a @cpu that is not
      *     in the cpumask of @buffer and a NULL @data_page or NULL page.
2428  */
2429 int ring_buffer_read_page(struct ring_buffer *buffer,
2430                             void **data_page, int cpu, int full)
2431 {
2432         struct ring_buffer_per_cpu *cpu_buffer;
2433         struct ring_buffer_event *event;
2434         struct buffer_data_page *bpage;
2435         unsigned long flags;
2436         unsigned int read;
2437         int ret = -1;
2438
             /* Validate @cpu before the per cpu buffer is locked */
             if (!cpumask_test_cpu(cpu, buffer->cpumask))
                     return ret;

             /* Without a page to fill nothing is transferred: report failure */
2439         if (!data_page)
2440                 return ret;
2441
2442         bpage = *data_page;
2443         if (!bpage)
2444                 return ret;
2445
             cpu_buffer = buffer->buffers[cpu];

2446         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2447
2448         /*
2449          * rb_buffer_peek will get the next reader page if
2450          * the current reader page is empty.
2451          */
2452         event = rb_buffer_peek(buffer, cpu, NULL);
2453         if (!event)
2454                 goto out;
2455
2456         /* check for data */
2457         if (!local_read(&cpu_buffer->reader_page->page->commit))
2458                 goto out;
2459
2460         read = cpu_buffer->reader_page->read;
2461         /*
2462          * If the writer is already off of the read page, then simply
2463          * switch the read page with the given page. Otherwise
2464          * we need to copy the data from the reader page to the given page.
2465          */
2466         if (cpu_buffer->reader_page == cpu_buffer->commit_page) {
2467                 unsigned int commit = rb_page_commit(cpu_buffer->reader_page);
2468                 struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
2469
2470                 if (full)
2471                         goto out;
2472                 /* The writer is still on the reader page, we must copy */
2473                 memcpy(bpage->data + read, rpage->data + read, commit - read);
2474
2475                 /* consume what was read */
2476                 cpu_buffer->reader_page->read = commit;
2477
2478                 /* update bpage */
2479                 local_set(&bpage->commit, commit);
                     /* The page time stamp is only carried over for a read from offset 0 */
2480                 if (!read)
2481                         bpage->time_stamp = rpage->time_stamp;
2482         } else {
2483                 /* swap the pages */
2484                 rb_init_page(bpage);
2485                 bpage = cpu_buffer->reader_page->page;
2486                 cpu_buffer->reader_page->page = *data_page;
2487                 cpu_buffer->reader_page->read = 0;
2488                 *data_page = bpage;
2489         }
2490         ret = read;
2491
2492         /* update the entry counter */
2493         rb_remove_entries(cpu_buffer, bpage, read);
2494  out:
2495         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2496
2497         return ret;
2498 }
2499
     /*
      * rb_simple_read - read handler of the file served by rb_simple_fops
      *
      * filp->private_data points at the flags word. The text handed to user
      * space is "permanently disabled\n" when RB_BUFFERS_DISABLED_BIT is set
      * in it, otherwise the state of RB_BUFFERS_ON_BIT followed by a newline.
      * Returns the result of simple_read_from_buffer().
      */
2500 static ssize_t
2501 rb_simple_read(struct file *filp, char __user *ubuf,
2502                size_t cnt, loff_t *ppos)
2503 {
             /* The bit operations used below work on unsigned long words */
2504         unsigned long *p = filp->private_data;
2505         char buf[64];
2506         int r;
2507
2508         if (test_bit(RB_BUFFERS_DISABLED_BIT, p))
2509                 r = sprintf(buf, "permanently disabled\n");
2510         else
2511                 r = sprintf(buf, "%d\n", test_bit(RB_BUFFERS_ON_BIT, p));
2512
2513         return simple_read_from_buffer(ubuf, cnt, ppos, buf, r);
2514 }
2515
     /*
      * rb_simple_write - write handler of the file served by rb_simple_fops
      *
      * Parses the user text as a decimal number and sets RB_BUFFERS_ON_BIT in
      * the flags word behind filp->private_data for a non-zero value, or
      * clears it for zero. RB_BUFFERS_DISABLED_BIT is never touched here.
      *
      * Returns @cnt on success, -EINVAL if the input does not fit the local
      * buffer, -EFAULT if it cannot be copied from user space, or the error
      * returned by strict_strtoul().
      */
2516 static ssize_t
2517 rb_simple_write(struct file *filp, const char __user *ubuf,
2518                 size_t cnt, loff_t *ppos)
2519 {
             /* set_bit()/clear_bit() work on unsigned long words */
2520         unsigned long *p = filp->private_data;
2521         char buf[64];
             /* strict_strtoul() stores its result through an unsigned long * */
2522         unsigned long val;
2523         int ret;
2524
             /* Leave room for the terminating NUL written below */
2525         if (cnt >= sizeof(buf))
2526                 return -EINVAL;
2527
2528         if (copy_from_user(buf, ubuf, cnt))
2529                 return -EFAULT;
2530
2531         buf[cnt] = 0;
2532
2533         ret = strict_strtoul(buf, 10, &val);
2534         if (ret < 0)
2535                 return ret;
2536
2537         if (val)
2538                 set_bit(RB_BUFFERS_ON_BIT, p);
2539         else
2540                 clear_bit(RB_BUFFERS_ON_BIT, p);
2541
2542         (*ppos)++;
2543
2544         return cnt;
2545 }
2546
     /*
      * File operations of the "tracing_on" debugfs file. The table is only
      * ever read, so it is declared const.
      */
2547 static const struct file_operations rb_simple_fops = {
2548         .open           = tracing_open_generic,
2549         .read           = rb_simple_read,
2550         .write          = rb_simple_write,
2551 };
2552
2553
     /*
      * Create the "tracing_on" debugfs file, backed by ring_buffer_flags and
      * served by rb_simple_fops, in the directory from tracing_init_dentry().
      * A failure to create it is only logged; the initcall always returns 0.
      */
2554 static __init int rb_init_debugfs(void)
2555 {
2556         struct dentry *tracing_dir;
2557         struct dentry *on_file;
2558
2559         tracing_dir = tracing_init_dentry();
2560
2561         on_file = debugfs_create_file("tracing_on", 0644, tracing_dir,
2562                                       &ring_buffer_flags, &rb_simple_fops);
2563         if (on_file == NULL)
2564                 pr_warning("Could not create debugfs 'tracing_on' entry\n");
2565
2566         return 0;
2567 }
2568
2569 fs_initcall(rb_init_debugfs);