ring-buffer: fix ring_buffer_read_page
kernel/trace/ring_buffer.c
1 /*
2  * Generic ring buffer
3  *
4  * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
5  */
6 #include <linux/ring_buffer.h>
7 #include <linux/trace_clock.h>
8 #include <linux/ftrace_irq.h>
9 #include <linux/spinlock.h>
10 #include <linux/debugfs.h>
11 #include <linux/uaccess.h>
12 #include <linux/hardirq.h>
13 #include <linux/module.h>
14 #include <linux/percpu.h>
15 #include <linux/mutex.h>
16 #include <linux/init.h>
17 #include <linux/hash.h>
18 #include <linux/list.h>
19 #include <linux/fs.h>
20
21 #include "trace.h"
22
23 /*
24  * A fast way to enable or disable all ring buffers is to
25  * call tracing_on or tracing_off. Turning off the ring buffers
26  * prevents all ring buffers from being recorded to.
27  * Turning this switch on makes it OK to write to the
28  * ring buffer, if the ring buffer is enabled itself.
29  *
30  * There are three layers that must be on in order to write
31  * to the ring buffer.
32  *
33  * 1) This global flag must be set.
34  * 2) The ring buffer must be enabled for recording.
35  * 3) The per cpu buffer must be enabled for recording.
36  *
37  * In case of an anomaly, this global flag has a bit set that
38  * will permanently disable all ring buffers.
39  */
40
41 /*
42  * Global flag to disable all recording to ring buffers
43  *  This has two bits: ON, DISABLED
44  *
45  *  ON   DISABLED
46  * ---- ----------
47  *   0      0        : ring buffers are off
48  *   1      0        : ring buffers are on
49  *   X      1        : ring buffers are permanently disabled
50  */
51
52 enum {
53         RB_BUFFERS_ON_BIT       = 0,
54         RB_BUFFERS_DISABLED_BIT = 1,
55 };
56
57 enum {
58         RB_BUFFERS_ON           = 1 << RB_BUFFERS_ON_BIT,
59         RB_BUFFERS_DISABLED     = 1 << RB_BUFFERS_DISABLED_BIT,
60 };
61
62 static unsigned long ring_buffer_flags __read_mostly = RB_BUFFERS_ON;
63
64 /**
65  * tracing_on - enable all tracing buffers
66  *
67  * This function enables all tracing buffers that may have been
68  * disabled with tracing_off.
69  */
70 void tracing_on(void)
71 {
72         set_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
73 }
74 EXPORT_SYMBOL_GPL(tracing_on);
75
76 /**
77  * tracing_off - turn off all tracing buffers
78  *
79  * This function stops all tracing buffers from recording data.
80  * It does not disable any overhead the tracers themselves may
81  * be causing. This function simply causes all recording to
82  * the ring buffers to fail.
83  */
84 void tracing_off(void)
85 {
86         clear_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
87 }
88 EXPORT_SYMBOL_GPL(tracing_off);
89
90 /**
91  * tracing_off_permanent - permanently disable ring buffers
92  *
93  * This function, once called, will disable all ring buffers
94  * permanently.
95  */
96 void tracing_off_permanent(void)
97 {
98         set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags);
99 }
100
101 /**
102  * tracing_is_on - show state of ring buffers enabled
103  */
104 int tracing_is_on(void)
105 {
106         return ring_buffer_flags == RB_BUFFERS_ON;
107 }
108 EXPORT_SYMBOL_GPL(tracing_is_on);
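
/*
 * Illustrative sketch (hypothetical helper, not taken from this file):
 * pausing recording around an inspection step using only the global
 * layer described above. The ring buffer and per-cpu enable states are
 * left untouched.
 */
static void example_pause_global_tracing(void)
{
	if (tracing_is_on()) {
		tracing_off();	/* layer 1 off: all writes now fail */
		/* ... inspect buffers without new events racing in ... */
		tracing_on();	/* layer 1 back on */
	}
}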
109
110 #include "trace.h"
111
112 /* Up this if you want to test the TIME_EXTENTS and normalization */
113 #define DEBUG_SHIFT 0
114
115 u64 ring_buffer_time_stamp(int cpu)
116 {
117         u64 time;
118
119         preempt_disable_notrace();
120         /* shift to debug/test normalization and TIME_EXTENTS */
121         time = trace_clock_local() << DEBUG_SHIFT;
122         preempt_enable_no_resched_notrace();
123
124         return time;
125 }
126 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);
127
128 void ring_buffer_normalize_time_stamp(int cpu, u64 *ts)
129 {
130         /* Just for testing the normalize function and deltas */
131         *ts >>= DEBUG_SHIFT;
132 }
133 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
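
/*
 * Illustrative sketch (hypothetical helper): the stamp above is shifted
 * by DEBUG_SHIFT, so a consumer is expected to run it back through
 * ring_buffer_normalize_time_stamp() before interpreting it.
 */
static u64 example_read_normalized_stamp(int cpu)
{
	u64 ts = ring_buffer_time_stamp(cpu);

	ring_buffer_normalize_time_stamp(cpu, &ts);
	return ts;
}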
134
135 #define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event))
136 #define RB_ALIGNMENT            4U
137 #define RB_MAX_SMALL_DATA       28
138
139 enum {
140         RB_LEN_TIME_EXTEND = 8,
141         RB_LEN_TIME_STAMP = 16,
142 };
143
144 /* inline for ring buffer fast paths */
145 static unsigned
146 rb_event_length(struct ring_buffer_event *event)
147 {
148         unsigned length;
149
150         switch (event->type) {
151         case RINGBUF_TYPE_PADDING:
152                 /* undefined */
153                 return -1;
154
155         case RINGBUF_TYPE_TIME_EXTEND:
156                 return RB_LEN_TIME_EXTEND;
157
158         case RINGBUF_TYPE_TIME_STAMP:
159                 return RB_LEN_TIME_STAMP;
160
161         case RINGBUF_TYPE_DATA:
162                 if (event->len)
163                         length = event->len * RB_ALIGNMENT;
164                 else
165                         length = event->array[0];
166                 return length + RB_EVNT_HDR_SIZE;
167         default:
168                 BUG();
169         }
170         /* not hit */
171         return 0;
172 }
173
174 /**
175  * ring_buffer_event_length - return the length of the event
176  * @event: the event to get the length of
177  */
178 unsigned ring_buffer_event_length(struct ring_buffer_event *event)
179 {
180         unsigned length = rb_event_length(event);
181         if (event->type != RINGBUF_TYPE_DATA)
182                 return length;
183         length -= RB_EVNT_HDR_SIZE;
184         if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
185                 length -= sizeof(event->array[0]);
186         return length;
187 }
188 EXPORT_SYMBOL_GPL(ring_buffer_event_length);
189
190 /* inline for ring buffer fast paths */
191 static void *
192 rb_event_data(struct ring_buffer_event *event)
193 {
194         BUG_ON(event->type != RINGBUF_TYPE_DATA);
195         /* If length is in len field, then array[0] has the data */
196         if (event->len)
197                 return (void *)&event->array[0];
198         /* Otherwise length is in array[0] and array[1] has the data */
199         return (void *)&event->array[1];
200 }
201
202 /**
203  * ring_buffer_event_data - return the data of the event
204  * @event: the event to get the data from
205  */
206 void *ring_buffer_event_data(struct ring_buffer_event *event)
207 {
208         return rb_event_data(event);
209 }
210 EXPORT_SYMBOL_GPL(ring_buffer_event_data);
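
/*
 * Illustrative sketch (hypothetical helper): dumping the payload of a
 * DATA event obtained from the buffer. ring_buffer_event_length()
 * reports only the payload size; the header and any array[0] length
 * word are already subtracted.
 */
static void example_dump_event(struct ring_buffer_event *event)
{
	unsigned len = ring_buffer_event_length(event);
	void *body = ring_buffer_event_data(event);

	print_hex_dump_bytes("rb event: ", DUMP_PREFIX_OFFSET, body, len);
}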
211
212 #define for_each_buffer_cpu(buffer, cpu)                \
213         for_each_cpu(cpu, buffer->cpumask)
214
215 #define TS_SHIFT        27
216 #define TS_MASK         ((1ULL << TS_SHIFT) - 1)
217 #define TS_DELTA_TEST   (~TS_MASK)
218
219 struct buffer_data_page {
220         u64              time_stamp;    /* page time stamp */
221         local_t          commit;        /* write committed index */
222         unsigned char    data[];        /* data of buffer page */
223 };
224
225 struct buffer_page {
226         local_t          write;         /* index for next write */
227         unsigned         read;          /* index for next read */
228         struct list_head list;          /* list of free pages */
229         struct buffer_data_page *page;  /* Actual data page */
230 };
231
232 static void rb_init_page(struct buffer_data_page *bpage)
233 {
234         local_set(&bpage->commit, 0);
235 }
236
237 size_t ring_buffer_page_len(void *page)
238 {
239         return local_read(&((struct buffer_data_page *)page)->commit);
240 }
241
242 /*
243  * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
244  * this issue out.
245  */
246 static void free_buffer_page(struct buffer_page *bpage)
247 {
248         free_page((unsigned long)bpage->page);
249         kfree(bpage);
250 }
251
252 /*
253  * We need to fit the time_stamp delta into 27 bits.
254  */
255 static inline int test_time_stamp(u64 delta)
256 {
257         if (delta & TS_DELTA_TEST)
258                 return 1;
259         return 0;
260 }
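
/*
 * Worked example (illustrative only): when a delta does not fit in
 * TS_SHIFT bits, it is carried by a TIME_EXTEND event with the low 27
 * bits in time_delta and the upper bits in array[0], as done further
 * down in this file. A reader reassembles it like this:
 */
static inline u64 example_reassemble_delta(u32 array0, u32 time_delta)
{
	return ((u64)array0 << TS_SHIFT) + time_delta;
}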
261
262 #define BUF_PAGE_SIZE (PAGE_SIZE - offsetof(struct buffer_data_page, data))
263
264 /*
265  * head_page == tail_page && head == tail then buffer is empty.
266  */
267 struct ring_buffer_per_cpu {
268         int                             cpu;
269         struct ring_buffer              *buffer;
270         spinlock_t                      reader_lock; /* serialize readers */
271         raw_spinlock_t                  lock;
272         struct lock_class_key           lock_key;
273         struct list_head                pages;
274         struct buffer_page              *head_page;     /* read from head */
275         struct buffer_page              *tail_page;     /* write to tail */
276         struct buffer_page              *commit_page;   /* committed pages */
277         struct buffer_page              *reader_page;
278         unsigned long                   overrun;
279         unsigned long                   entries;
280         u64                             write_stamp;
281         u64                             read_stamp;
282         atomic_t                        record_disabled;
283 };
284
285 struct ring_buffer {
286         unsigned                        pages;
287         unsigned                        flags;
288         int                             cpus;
289         atomic_t                        record_disabled;
290         cpumask_var_t                   cpumask;
291
292         struct mutex                    mutex;
293
294         struct ring_buffer_per_cpu      **buffers;
295 };
296
297 struct ring_buffer_iter {
298         struct ring_buffer_per_cpu      *cpu_buffer;
299         unsigned long                   head;
300         struct buffer_page              *head_page;
301         u64                             read_stamp;
302 };
303
304 /* buffer may be either ring_buffer or ring_buffer_per_cpu */
305 #define RB_WARN_ON(buffer, cond)                                \
306         ({                                                      \
307                 int _____ret = unlikely(cond);                  \
308                 if (_____ret) {                                 \
309                         atomic_inc(&buffer->record_disabled);   \
310                         WARN_ON(1);                             \
311                 }                                               \
312                 _____ret;                                       \
313         })
314
315 /**
316  * rb_check_pages - integrity check of buffer pages
317  * @cpu_buffer: CPU buffer with pages to test
318  *
319  * As a safety measure we check to make sure the data pages have not
320  * been corrupted.
321  */
322 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
323 {
324         struct list_head *head = &cpu_buffer->pages;
325         struct buffer_page *bpage, *tmp;
326
327         if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
328                 return -1;
329         if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
330                 return -1;
331
332         list_for_each_entry_safe(bpage, tmp, head, list) {
333                 if (RB_WARN_ON(cpu_buffer,
334                                bpage->list.next->prev != &bpage->list))
335                         return -1;
336                 if (RB_WARN_ON(cpu_buffer,
337                                bpage->list.prev->next != &bpage->list))
338                         return -1;
339         }
340
341         return 0;
342 }
343
344 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
345                              unsigned nr_pages)
346 {
347         struct list_head *head = &cpu_buffer->pages;
348         struct buffer_page *bpage, *tmp;
349         unsigned long addr;
350         LIST_HEAD(pages);
351         unsigned i;
352
353         for (i = 0; i < nr_pages; i++) {
354                 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
355                                     GFP_KERNEL, cpu_to_node(cpu_buffer->cpu));
356                 if (!bpage)
357                         goto free_pages;
358                 list_add(&bpage->list, &pages);
359
360                 addr = __get_free_page(GFP_KERNEL);
361                 if (!addr)
362                         goto free_pages;
363                 bpage->page = (void *)addr;
364                 rb_init_page(bpage->page);
365         }
366
367         list_splice(&pages, head);
368
369         rb_check_pages(cpu_buffer);
370
371         return 0;
372
373  free_pages:
374         list_for_each_entry_safe(bpage, tmp, &pages, list) {
375                 list_del_init(&bpage->list);
376                 free_buffer_page(bpage);
377         }
378         return -ENOMEM;
379 }
380
381 static struct ring_buffer_per_cpu *
382 rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
383 {
384         struct ring_buffer_per_cpu *cpu_buffer;
385         struct buffer_page *bpage;
386         unsigned long addr;
387         int ret;
388
389         cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
390                                   GFP_KERNEL, cpu_to_node(cpu));
391         if (!cpu_buffer)
392                 return NULL;
393
394         cpu_buffer->cpu = cpu;
395         cpu_buffer->buffer = buffer;
396         spin_lock_init(&cpu_buffer->reader_lock);
397         cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED;
398         INIT_LIST_HEAD(&cpu_buffer->pages);
399
400         bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
401                             GFP_KERNEL, cpu_to_node(cpu));
402         if (!bpage)
403                 goto fail_free_buffer;
404
405         cpu_buffer->reader_page = bpage;
406         addr = __get_free_page(GFP_KERNEL);
407         if (!addr)
408                 goto fail_free_reader;
409         bpage->page = (void *)addr;
410         rb_init_page(bpage->page);
411
412         INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
413
414         ret = rb_allocate_pages(cpu_buffer, buffer->pages);
415         if (ret < 0)
416                 goto fail_free_reader;
417
418         cpu_buffer->head_page
419                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
420         cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
421
422         return cpu_buffer;
423
424  fail_free_reader:
425         free_buffer_page(cpu_buffer->reader_page);
426
427  fail_free_buffer:
428         kfree(cpu_buffer);
429         return NULL;
430 }
431
432 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
433 {
434         struct list_head *head = &cpu_buffer->pages;
435         struct buffer_page *bpage, *tmp;
436
437         list_del_init(&cpu_buffer->reader_page->list);
438         free_buffer_page(cpu_buffer->reader_page);
439
440         list_for_each_entry_safe(bpage, tmp, head, list) {
441                 list_del_init(&bpage->list);
442                 free_buffer_page(bpage);
443         }
444         kfree(cpu_buffer);
445 }
446
447 /*
448  * Causes compile errors if the struct buffer_page gets bigger
449  * than the struct page.
450  */
451 extern int ring_buffer_page_too_big(void);
452
453 /**
454  * ring_buffer_alloc - allocate a new ring_buffer
455  * @size: the size in bytes per cpu that is needed.
456  * @flags: attributes to set for the ring buffer.
457  *
458  * Currently the only flag that is available is the RB_FL_OVERWRITE
459  * flag. This flag means that the buffer will overwrite old data
460  * when the buffer wraps. If this flag is not set, the buffer will
461  * drop data when the tail hits the head.
462  */
463 struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
464 {
465         struct ring_buffer *buffer;
466         int bsize;
467         int cpu;
468
469         /* Paranoid! Optimizes out when all is well */
470         if (sizeof(struct buffer_page) > sizeof(struct page))
471                 ring_buffer_page_too_big();
472
473
474         /* keep it in its own cache line */
475         buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
476                          GFP_KERNEL);
477         if (!buffer)
478                 return NULL;
479
480         if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
481                 goto fail_free_buffer;
482
483         buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
484         buffer->flags = flags;
485
486         /* need at least two pages */
487         if (buffer->pages == 1)
488                 buffer->pages++;
489
490         cpumask_copy(buffer->cpumask, cpu_possible_mask);
491         buffer->cpus = nr_cpu_ids;
492
493         bsize = sizeof(void *) * nr_cpu_ids;
494         buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
495                                   GFP_KERNEL);
496         if (!buffer->buffers)
497                 goto fail_free_cpumask;
498
499         for_each_buffer_cpu(buffer, cpu) {
500                 buffer->buffers[cpu] =
501                         rb_allocate_cpu_buffer(buffer, cpu);
502                 if (!buffer->buffers[cpu])
503                         goto fail_free_buffers;
504         }
505
506         mutex_init(&buffer->mutex);
507
508         return buffer;
509
510  fail_free_buffers:
511         for_each_buffer_cpu(buffer, cpu) {
512                 if (buffer->buffers[cpu])
513                         rb_free_cpu_buffer(buffer->buffers[cpu]);
514         }
515         kfree(buffer->buffers);
516
517  fail_free_cpumask:
518         free_cpumask_var(buffer->cpumask);
519
520  fail_free_buffer:
521         kfree(buffer);
522         return NULL;
523 }
524 EXPORT_SYMBOL_GPL(ring_buffer_alloc);
525
526 /**
527  * ring_buffer_free - free a ring buffer.
528  * @buffer: the buffer to free.
529  */
530 void
531 ring_buffer_free(struct ring_buffer *buffer)
532 {
533         int cpu;
534
535         for_each_buffer_cpu(buffer, cpu)
536                 rb_free_cpu_buffer(buffer->buffers[cpu]);
537
538         free_cpumask_var(buffer->cpumask);
539
540         kfree(buffer);
541 }
542 EXPORT_SYMBOL_GPL(ring_buffer_free);
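
/*
 * Illustrative sketch (hypothetical helper): allocating roughly 64KB per
 * CPU in overwrite mode and pairing it with ring_buffer_free(). The size
 * is rounded up to whole pages, with a two-page minimum.
 */
static int example_alloc_and_free(void)
{
	struct ring_buffer *buffer;

	buffer = ring_buffer_alloc(64 * 1024, RB_FL_OVERWRITE);
	if (!buffer)
		return -ENOMEM;

	/* ... hand the buffer to writers and readers ... */

	ring_buffer_free(buffer);
	return 0;
}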
543
544 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
545
546 static void
547 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
548 {
549         struct buffer_page *bpage;
550         struct list_head *p;
551         unsigned i;
552
553         atomic_inc(&cpu_buffer->record_disabled);
554         synchronize_sched();
555
556         for (i = 0; i < nr_pages; i++) {
557                 if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
558                         return;
559                 p = cpu_buffer->pages.next;
560                 bpage = list_entry(p, struct buffer_page, list);
561                 list_del_init(&bpage->list);
562                 free_buffer_page(bpage);
563         }
564         if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
565                 return;
566
567         rb_reset_cpu(cpu_buffer);
568
569         rb_check_pages(cpu_buffer);
570
571         atomic_dec(&cpu_buffer->record_disabled);
572
573 }
574
575 static void
576 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
577                 struct list_head *pages, unsigned nr_pages)
578 {
579         struct buffer_page *bpage;
580         struct list_head *p;
581         unsigned i;
582
583         atomic_inc(&cpu_buffer->record_disabled);
584         synchronize_sched();
585
586         for (i = 0; i < nr_pages; i++) {
587                 if (RB_WARN_ON(cpu_buffer, list_empty(pages)))
588                         return;
589                 p = pages->next;
590                 bpage = list_entry(p, struct buffer_page, list);
591                 list_del_init(&bpage->list);
592                 list_add_tail(&bpage->list, &cpu_buffer->pages);
593         }
594         rb_reset_cpu(cpu_buffer);
595
596         rb_check_pages(cpu_buffer);
597
598         atomic_dec(&cpu_buffer->record_disabled);
599 }
600
601 /**
602  * ring_buffer_resize - resize the ring buffer
603  * @buffer: the buffer to resize.
604  * @size: the new size.
605  *
606  * The tracer is responsible for making sure that the buffer is
607  * not being used while changing the size.
608  * Note: We may be able to change the above requirement by using
609  *  RCU synchronizations.
610  *
611  * Minimum size is 2 * BUF_PAGE_SIZE.
612  *
613  * Returns -1 on failure.
614  */
615 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
616 {
617         struct ring_buffer_per_cpu *cpu_buffer;
618         unsigned nr_pages, rm_pages, new_pages;
619         struct buffer_page *bpage, *tmp;
620         unsigned long buffer_size;
621         unsigned long addr;
622         LIST_HEAD(pages);
623         int i, cpu;
624
625         /*
626          * Always succeed at resizing a non-existent buffer:
627          */
628         if (!buffer)
629                 return size;
630
631         size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
632         size *= BUF_PAGE_SIZE;
633         buffer_size = buffer->pages * BUF_PAGE_SIZE;
634
635         /* we need a minimum of two pages */
636         if (size < BUF_PAGE_SIZE * 2)
637                 size = BUF_PAGE_SIZE * 2;
638
639         if (size == buffer_size)
640                 return size;
641
642         mutex_lock(&buffer->mutex);
643
644         nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
645
646         if (size < buffer_size) {
647
648                 /* easy case, just free pages */
649                 if (RB_WARN_ON(buffer, nr_pages >= buffer->pages)) {
650                         mutex_unlock(&buffer->mutex);
651                         return -1;
652                 }
653
654                 rm_pages = buffer->pages - nr_pages;
655
656                 for_each_buffer_cpu(buffer, cpu) {
657                         cpu_buffer = buffer->buffers[cpu];
658                         rb_remove_pages(cpu_buffer, rm_pages);
659                 }
660                 goto out;
661         }
662
663         /*
664          * This is a bit more difficult. We only want to add pages
665          * when we can allocate enough for all CPUs. We do this
666          * by allocating all the pages and storing them on a local
667          * link list. If we succeed in our allocation, then we
668          * add these pages to the cpu_buffers. Otherwise we just free
669          * them all and return -ENOMEM;
670          */
671         if (RB_WARN_ON(buffer, nr_pages <= buffer->pages)) {
672                 mutex_unlock(&buffer->mutex);
673                 return -1;
674         }
675
676         new_pages = nr_pages - buffer->pages;
677
678         for_each_buffer_cpu(buffer, cpu) {
679                 for (i = 0; i < new_pages; i++) {
680                         bpage = kzalloc_node(ALIGN(sizeof(*bpage),
681                                                   cache_line_size()),
682                                             GFP_KERNEL, cpu_to_node(cpu));
683                         if (!bpage)
684                                 goto free_pages;
685                         list_add(&bpage->list, &pages);
686                         addr = __get_free_page(GFP_KERNEL);
687                         if (!addr)
688                                 goto free_pages;
689                         bpage->page = (void *)addr;
690                         rb_init_page(bpage->page);
691                 }
692         }
693
694         for_each_buffer_cpu(buffer, cpu) {
695                 cpu_buffer = buffer->buffers[cpu];
696                 rb_insert_pages(cpu_buffer, &pages, new_pages);
697         }
698
699         if (RB_WARN_ON(buffer, !list_empty(&pages))) {
700                 mutex_unlock(&buffer->mutex);
701                 return -1;
702         }
703
704  out:
705         buffer->pages = nr_pages;
706         mutex_unlock(&buffer->mutex);
707
708         return size;
709
710  free_pages:
711         list_for_each_entry_safe(bpage, tmp, &pages, list) {
712                 list_del_init(&bpage->list);
713                 free_buffer_page(bpage);
714         }
715         mutex_unlock(&buffer->mutex);
716         return -ENOMEM;
717 }
718 EXPORT_SYMBOL_GPL(ring_buffer_resize);
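
/*
 * Illustrative sketch (hypothetical helper): growing a buffer to a new
 * per-cpu size in bytes. As the kerneldoc above says, the caller must
 * make sure nothing is using the buffer while the resize runs.
 */
static int example_grow_buffer(struct ring_buffer *buffer, unsigned long bytes)
{
	int ret = ring_buffer_resize(buffer, bytes);

	/* on success ret is the page-rounded size actually set */
	return ret < 0 ? -ENOMEM : 0;
}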
719
720 static inline int rb_null_event(struct ring_buffer_event *event)
721 {
722         return event->type == RINGBUF_TYPE_PADDING;
723 }
724
725 static inline void *
726 __rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
727 {
728         return bpage->data + index;
729 }
730
731 static inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
732 {
733         return bpage->page->data + index;
734 }
735
736 static inline struct ring_buffer_event *
737 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
738 {
739         return __rb_page_index(cpu_buffer->reader_page,
740                                cpu_buffer->reader_page->read);
741 }
742
743 static inline struct ring_buffer_event *
744 rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
745 {
746         return __rb_page_index(cpu_buffer->head_page,
747                                cpu_buffer->head_page->read);
748 }
749
750 static inline struct ring_buffer_event *
751 rb_iter_head_event(struct ring_buffer_iter *iter)
752 {
753         return __rb_page_index(iter->head_page, iter->head);
754 }
755
756 static inline unsigned rb_page_write(struct buffer_page *bpage)
757 {
758         return local_read(&bpage->write);
759 }
760
761 static inline unsigned rb_page_commit(struct buffer_page *bpage)
762 {
763         return local_read(&bpage->page->commit);
764 }
765
766 /* Size is determined by what has been committed */
767 static inline unsigned rb_page_size(struct buffer_page *bpage)
768 {
769         return rb_page_commit(bpage);
770 }
771
772 static inline unsigned
773 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
774 {
775         return rb_page_commit(cpu_buffer->commit_page);
776 }
777
778 static inline unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
779 {
780         return rb_page_commit(cpu_buffer->head_page);
781 }
782
783 /*
784  * When the tail hits the head and the buffer is in overwrite mode,
785  * the head jumps to the next page and all content on the previous
786  * page is discarded. But before doing so, we update the overrun
787  * variable of the buffer.
788  */
789 static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
790 {
791         struct ring_buffer_event *event;
792         unsigned long head;
793
794         for (head = 0; head < rb_head_size(cpu_buffer);
795              head += rb_event_length(event)) {
796
797                 event = __rb_page_index(cpu_buffer->head_page, head);
798                 if (RB_WARN_ON(cpu_buffer, rb_null_event(event)))
799                         return;
800                 /* Only count data entries */
801                 if (event->type != RINGBUF_TYPE_DATA)
802                         continue;
803                 cpu_buffer->overrun++;
804                 cpu_buffer->entries--;
805         }
806 }
807
808 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
809                                struct buffer_page **bpage)
810 {
811         struct list_head *p = (*bpage)->list.next;
812
813         if (p == &cpu_buffer->pages)
814                 p = p->next;
815
816         *bpage = list_entry(p, struct buffer_page, list);
817 }
818
819 static inline unsigned
820 rb_event_index(struct ring_buffer_event *event)
821 {
822         unsigned long addr = (unsigned long)event;
823
824         return (addr & ~PAGE_MASK) - (PAGE_SIZE - BUF_PAGE_SIZE);
825 }
826
827 static int
828 rb_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
829              struct ring_buffer_event *event)
830 {
831         unsigned long addr = (unsigned long)event;
832         unsigned long index;
833
834         index = rb_event_index(event);
835         addr &= PAGE_MASK;
836
837         return cpu_buffer->commit_page->page == (void *)addr &&
838                 rb_commit_index(cpu_buffer) == index;
839 }
840
841 static void
842 rb_set_commit_event(struct ring_buffer_per_cpu *cpu_buffer,
843                     struct ring_buffer_event *event)
844 {
845         unsigned long addr = (unsigned long)event;
846         unsigned long index;
847
848         index = rb_event_index(event);
849         addr &= PAGE_MASK;
850
851         while (cpu_buffer->commit_page->page != (void *)addr) {
852                 if (RB_WARN_ON(cpu_buffer,
853                           cpu_buffer->commit_page == cpu_buffer->tail_page))
854                         return;
855                 cpu_buffer->commit_page->page->commit =
856                         cpu_buffer->commit_page->write;
857                 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
858                 cpu_buffer->write_stamp =
859                         cpu_buffer->commit_page->page->time_stamp;
860         }
861
862         /* Now set the commit to the event's index */
863         local_set(&cpu_buffer->commit_page->page->commit, index);
864 }
865
866 static void
867 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
868 {
869         /*
870          * We only race with interrupts and NMIs on this CPU.
871          * If we own the commit event, then we can commit
872          * all others that interrupted us, since the interruptions
873          * are in stack format (they finish before they come
874          * back to us). This allows us to do a simple loop to
875          * assign the commit to the tail.
876          */
877  again:
878         while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
879                 cpu_buffer->commit_page->page->commit =
880                         cpu_buffer->commit_page->write;
881                 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
882                 cpu_buffer->write_stamp =
883                         cpu_buffer->commit_page->page->time_stamp;
884                 /* add barrier to keep gcc from optimizing too much */
885                 barrier();
886         }
887         while (rb_commit_index(cpu_buffer) !=
888                rb_page_write(cpu_buffer->commit_page)) {
889                 cpu_buffer->commit_page->page->commit =
890                         cpu_buffer->commit_page->write;
891                 barrier();
892         }
893
894         /* again, keep gcc from optimizing */
895         barrier();
896
897         /*
898          * If an interrupt came in just after the first while loop
899          * and pushed the tail page forward, we will be left with
900          * a dangling commit that will never go forward.
901          */
902         if (unlikely(cpu_buffer->commit_page != cpu_buffer->tail_page))
903                 goto again;
904 }
905
906 static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
907 {
908         cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp;
909         cpu_buffer->reader_page->read = 0;
910 }
911
912 static void rb_inc_iter(struct ring_buffer_iter *iter)
913 {
914         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
915
916         /*
917          * The iterator could be on the reader page (it starts there).
918          * But the head could have moved, since the reader was
919          * found. Check for this case and assign the iterator
920          * to the head page instead of next.
921          */
922         if (iter->head_page == cpu_buffer->reader_page)
923                 iter->head_page = cpu_buffer->head_page;
924         else
925                 rb_inc_page(cpu_buffer, &iter->head_page);
926
927         iter->read_stamp = iter->head_page->page->time_stamp;
928         iter->head = 0;
929 }
930
931 /**
932  * ring_buffer_update_event - update event type and data
933  * @event: the event to update
934  * @type: the type of event
935  * @length: the size of the event field in the ring buffer
936  *
937  * Update the type and data fields of the event. The length
938  * is the actual size that is written to the ring buffer,
939  * and with this, we can determine what to place into the
940  * data field.
941  */
942 static void
943 rb_update_event(struct ring_buffer_event *event,
944                          unsigned type, unsigned length)
945 {
946         event->type = type;
947
948         switch (type) {
949
950         case RINGBUF_TYPE_PADDING:
951                 break;
952
953         case RINGBUF_TYPE_TIME_EXTEND:
954                 event->len = DIV_ROUND_UP(RB_LEN_TIME_EXTEND, RB_ALIGNMENT);
955                 break;
956
957         case RINGBUF_TYPE_TIME_STAMP:
958                 event->len = DIV_ROUND_UP(RB_LEN_TIME_STAMP, RB_ALIGNMENT);
959                 break;
960
961         case RINGBUF_TYPE_DATA:
962                 length -= RB_EVNT_HDR_SIZE;
963                 if (length > RB_MAX_SMALL_DATA) {
964                         event->len = 0;
965                         event->array[0] = length;
966                 } else
967                         event->len = DIV_ROUND_UP(length, RB_ALIGNMENT);
968                 break;
969         default:
970                 BUG();
971         }
972 }
973
974 static unsigned rb_calculate_event_length(unsigned length)
975 {
976         struct ring_buffer_event event; /* Used only for sizeof array */
977
978         /* zero length can cause confusion */
979         if (!length)
980                 length = 1;
981
982         if (length > RB_MAX_SMALL_DATA)
983                 length += sizeof(event.array[0]);
984
985         length += RB_EVNT_HDR_SIZE;
986         length = ALIGN(length, RB_ALIGNMENT);
987
988         return length;
989 }
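
/*
 * Worked example (illustrative only): a 10-byte payload becomes
 * ALIGN(10 + RB_EVNT_HDR_SIZE, RB_ALIGNMENT) = 16 reserved bytes, stored
 * with len = 3, so ring_buffer_event_length() later reports 12 bytes.
 * A 40-byte payload exceeds RB_MAX_SMALL_DATA, so the length moves into
 * array[0] (len = 0, array[0] = 44) for a 48-byte reservation.
 */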
990
991 static struct ring_buffer_event *
992 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
993                   unsigned type, unsigned long length, u64 *ts)
994 {
995         struct buffer_page *tail_page, *head_page, *reader_page, *commit_page;
996         unsigned long tail, write;
997         struct ring_buffer *buffer = cpu_buffer->buffer;
998         struct ring_buffer_event *event;
999         unsigned long flags;
1000         bool lock_taken = false;
1001
1002         commit_page = cpu_buffer->commit_page;
1003         /* we just need to protect against interrupts */
1004         barrier();
1005         tail_page = cpu_buffer->tail_page;
1006         write = local_add_return(length, &tail_page->write);
1007         tail = write - length;
1008
1009         /* See if we shot past the end of this buffer page */
1010         if (write > BUF_PAGE_SIZE) {
1011                 struct buffer_page *next_page = tail_page;
1012
1013                 local_irq_save(flags);
1014                 /*
1015                  * Since the write to the buffer is still not
1016                  * fully lockless, we must be careful with NMIs.
1017                  * The locks in the writers are taken when a write
1018                  * crosses to a new page. The locks protect against
1019                  * races with the readers (this will soon be fixed
1020                  * with a lockless solution).
1021                  *
1022                  * Because we can not protect against NMIs, and we
1023                  * want to keep traces reentrant, we need to manage
1024                  * what happens when we are in an NMI.
1025                  *
1026                  * NMIs can happen after we take the lock.
1027                  * If we are in an NMI, only take the lock
1028                  * if it is not already taken. Otherwise
1029                  * simply fail.
1030                  */
1031                 if (unlikely(in_nmi())) {
1032                         if (!__raw_spin_trylock(&cpu_buffer->lock))
1033                                 goto out_reset;
1034                 } else
1035                         __raw_spin_lock(&cpu_buffer->lock);
1036
1037                 lock_taken = true;
1038
1039                 rb_inc_page(cpu_buffer, &next_page);
1040
1041                 head_page = cpu_buffer->head_page;
1042                 reader_page = cpu_buffer->reader_page;
1043
1044                 /* we grabbed the lock before incrementing */
1045                 if (RB_WARN_ON(cpu_buffer, next_page == reader_page))
1046                         goto out_reset;
1047
1048                 /*
1049                  * If for some reason, we had an interrupt storm that made
1050                  * it all the way around the buffer, bail, and warn
1051                  * about it.
1052                  */
1053                 if (unlikely(next_page == commit_page)) {
1054                         WARN_ON_ONCE(1);
1055                         goto out_reset;
1056                 }
1057
1058                 if (next_page == head_page) {
1059                         if (!(buffer->flags & RB_FL_OVERWRITE))
1060                                 goto out_reset;
1061
1062                         /* tail_page has not moved yet? */
1063                         if (tail_page == cpu_buffer->tail_page) {
1064                                 /* count overflows */
1065                                 rb_update_overflow(cpu_buffer);
1066
1067                                 rb_inc_page(cpu_buffer, &head_page);
1068                                 cpu_buffer->head_page = head_page;
1069                                 cpu_buffer->head_page->read = 0;
1070                         }
1071                 }
1072
1073                 /*
1074                  * If the tail page is still the same as what we think
1075                  * it is, then it is up to us to update the tail
1076                  * pointer.
1077                  */
1078                 if (tail_page == cpu_buffer->tail_page) {
1079                         local_set(&next_page->write, 0);
1080                         local_set(&next_page->page->commit, 0);
1081                         cpu_buffer->tail_page = next_page;
1082
1083                         /* reread the time stamp */
1084                         *ts = ring_buffer_time_stamp(cpu_buffer->cpu);
1085                         cpu_buffer->tail_page->page->time_stamp = *ts;
1086                 }
1087
1088                 /*
1089                  * The actual tail page has moved forward.
1090                  */
1091                 if (tail < BUF_PAGE_SIZE) {
1092                         /* Mark the rest of the page with padding */
1093                         event = __rb_page_index(tail_page, tail);
1094                         event->type = RINGBUF_TYPE_PADDING;
1095                 }
1096
1097                 if (tail <= BUF_PAGE_SIZE)
1098                         /* Set the write back to the previous setting */
1099                         local_set(&tail_page->write, tail);
1100
1101                 /*
1102                  * If this was a commit entry that failed,
1103                  * increment that too
1104                  */
1105                 if (tail_page == cpu_buffer->commit_page &&
1106                     tail == rb_commit_index(cpu_buffer)) {
1107                         rb_set_commit_to_write(cpu_buffer);
1108                 }
1109
1110                 __raw_spin_unlock(&cpu_buffer->lock);
1111                 local_irq_restore(flags);
1112
1113                 /* fail and let the caller try again */
1114                 return ERR_PTR(-EAGAIN);
1115         }
1116
1117         /* We reserved something on the buffer */
1118
1119         if (RB_WARN_ON(cpu_buffer, write > BUF_PAGE_SIZE))
1120                 return NULL;
1121
1122         event = __rb_page_index(tail_page, tail);
1123         rb_update_event(event, type, length);
1124
1125         /*
1126          * If this is a commit and the tail is zero, then update
1127          * this page's time stamp.
1128          */
1129         if (!tail && rb_is_commit(cpu_buffer, event))
1130                 cpu_buffer->commit_page->page->time_stamp = *ts;
1131
1132         return event;
1133
1134  out_reset:
1135         /* reset write */
1136         if (tail <= BUF_PAGE_SIZE)
1137                 local_set(&tail_page->write, tail);
1138
1139         if (likely(lock_taken))
1140                 __raw_spin_unlock(&cpu_buffer->lock);
1141         local_irq_restore(flags);
1142         return NULL;
1143 }
1144
1145 static int
1146 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1147                   u64 *ts, u64 *delta)
1148 {
1149         struct ring_buffer_event *event;
1150         static int once;
1151         int ret;
1152
1153         if (unlikely(*delta > (1ULL << 59) && !once++)) {
1154                 printk(KERN_WARNING "Delta way too big! %llu"
1155                        " ts=%llu write stamp = %llu\n",
1156                        (unsigned long long)*delta,
1157                        (unsigned long long)*ts,
1158                        (unsigned long long)cpu_buffer->write_stamp);
1159                 WARN_ON(1);
1160         }
1161
1162         /*
1163          * The delta is too big; we need to add a
1164          * new timestamp.
1165          */
1166         event = __rb_reserve_next(cpu_buffer,
1167                                   RINGBUF_TYPE_TIME_EXTEND,
1168                                   RB_LEN_TIME_EXTEND,
1169                                   ts);
1170         if (!event)
1171                 return -EBUSY;
1172
1173         if (PTR_ERR(event) == -EAGAIN)
1174                 return -EAGAIN;
1175
1176         /* Only a committed time event can update the write stamp */
1177         if (rb_is_commit(cpu_buffer, event)) {
1178                 /*
1179                  * If this is the first on the page, then we need to
1180                  * update the page itself, and just put in a zero.
1181                  */
1182                 if (rb_event_index(event)) {
1183                         event->time_delta = *delta & TS_MASK;
1184                         event->array[0] = *delta >> TS_SHIFT;
1185                 } else {
1186                         cpu_buffer->commit_page->page->time_stamp = *ts;
1187                         event->time_delta = 0;
1188                         event->array[0] = 0;
1189                 }
1190                 cpu_buffer->write_stamp = *ts;
1191                 /* let the caller know this was the commit */
1192                 ret = 1;
1193         } else {
1194                 /* Darn, this is just wasted space */
1195                 event->time_delta = 0;
1196                 event->array[0] = 0;
1197                 ret = 0;
1198         }
1199
1200         *delta = 0;
1201
1202         return ret;
1203 }
1204
1205 static struct ring_buffer_event *
1206 rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
1207                       unsigned type, unsigned long length)
1208 {
1209         struct ring_buffer_event *event;
1210         u64 ts, delta;
1211         int commit = 0;
1212         int nr_loops = 0;
1213
1214  again:
1215         /*
1216          * We allow for interrupts to reenter here and do a trace.
1217          * If one does, it will cause this original code to loop
1218          * back here. Even with heavy interrupts happening, this
1219          * should only happen a few times in a row. If this happens
1220          * 1000 times in a row, there must be either an interrupt
1221          * storm or we have something buggy.
1222          * Bail!
1223          */
1224         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
1225                 return NULL;
1226
1227         ts = ring_buffer_time_stamp(cpu_buffer->cpu);
1228
1229         /*
1230          * Only the first commit can update the timestamp.
1231          * Yes there is a race here. If an interrupt comes in
1232          * just after the conditional and it traces too, then it
1233          * will also check the deltas. More than one timestamp may
1234          * also be made. But only the entry that did the actual
1235          * commit will be something other than zero.
1236          */
1237         if (cpu_buffer->tail_page == cpu_buffer->commit_page &&
1238             rb_page_write(cpu_buffer->tail_page) ==
1239             rb_commit_index(cpu_buffer)) {
1240
1241                 delta = ts - cpu_buffer->write_stamp;
1242
1243                 /* make sure this delta is calculated here */
1244                 barrier();
1245
1246                 /* Did the write stamp get updated already? */
1247                 if (unlikely(ts < cpu_buffer->write_stamp))
1248                         delta = 0;
1249
1250                 if (test_time_stamp(delta)) {
1251
1252                         commit = rb_add_time_stamp(cpu_buffer, &ts, &delta);
1253
1254                         if (commit == -EBUSY)
1255                                 return NULL;
1256
1257                         if (commit == -EAGAIN)
1258                                 goto again;
1259
1260                         RB_WARN_ON(cpu_buffer, commit < 0);
1261                 }
1262         } else
1263                 /* Non commits have zero deltas */
1264                 delta = 0;
1265
1266         event = __rb_reserve_next(cpu_buffer, type, length, &ts);
1267         if (PTR_ERR(event) == -EAGAIN)
1268                 goto again;
1269
1270         if (!event) {
1271                 if (unlikely(commit))
1272                         /*
1273                          * Ouch! We needed a timestamp and it was committed. But
1274                          * we didn't get our event reserved.
1275                          */
1276                         rb_set_commit_to_write(cpu_buffer);
1277                 return NULL;
1278         }
1279
1280         /*
1281          * If the timestamp was committed, make the commit our entry
1282          * now so that we will update it when needed.
1283          */
1284         if (commit)
1285                 rb_set_commit_event(cpu_buffer, event);
1286         else if (!rb_is_commit(cpu_buffer, event))
1287                 delta = 0;
1288
1289         event->time_delta = delta;
1290
1291         return event;
1292 }
1293
1294 static DEFINE_PER_CPU(int, rb_need_resched);
1295
1296 /**
1297  * ring_buffer_lock_reserve - reserve a part of the buffer
1298  * @buffer: the ring buffer to reserve from
1299  * @length: the length of the data to reserve (excluding event header)
1300  *
1301  * Returns a reserved event on the ring buffer to copy directly to.
1302  * The user of this interface will need to get the body to write into
1303  * and can use the ring_buffer_event_data() interface.
1304  *
1305  * The length is the length of the data needed, not the event length
1306  * which also includes the event header.
1307  *
1308  * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
1309  * If NULL is returned, then nothing has been allocated or locked.
1310  */
1311 struct ring_buffer_event *
1312 ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
1313 {
1314         struct ring_buffer_per_cpu *cpu_buffer;
1315         struct ring_buffer_event *event;
1316         int cpu, resched;
1317
1318         if (ring_buffer_flags != RB_BUFFERS_ON)
1319                 return NULL;
1320
1321         if (atomic_read(&buffer->record_disabled))
1322                 return NULL;
1323
1324         /* If we are tracing schedule, we don't want to recurse */
1325         resched = ftrace_preempt_disable();
1326
1327         cpu = raw_smp_processor_id();
1328
1329         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1330                 goto out;
1331
1332         cpu_buffer = buffer->buffers[cpu];
1333
1334         if (atomic_read(&cpu_buffer->record_disabled))
1335                 goto out;
1336
1337         length = rb_calculate_event_length(length);
1338         if (length > BUF_PAGE_SIZE)
1339                 goto out;
1340
1341         event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length);
1342         if (!event)
1343                 goto out;
1344
1345         /*
1346          * Need to store resched state on this cpu.
1347          * Only the first needs to.
1348          */
1349
1350         if (preempt_count() == 1)
1351                 per_cpu(rb_need_resched, cpu) = resched;
1352
1353         return event;
1354
1355  out:
1356         ftrace_preempt_enable(resched);
1357         return NULL;
1358 }
1359 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
1360
1361 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
1362                       struct ring_buffer_event *event)
1363 {
1364         cpu_buffer->entries++;
1365
1366         /* Only process further if we own the commit */
1367         if (!rb_is_commit(cpu_buffer, event))
1368                 return;
1369
1370         cpu_buffer->write_stamp += event->time_delta;
1371
1372         rb_set_commit_to_write(cpu_buffer);
1373 }
1374
1375 /**
1376  * ring_buffer_unlock_commit - commit a reserved event
1377  * @buffer: The buffer to commit to
1378  * @event: The event pointer to commit.
1379  *
1380  * This commits the data to the ring buffer, and releases any locks held.
1381  *
1382  * Must be paired with ring_buffer_lock_reserve.
1383  */
1384 int ring_buffer_unlock_commit(struct ring_buffer *buffer,
1385                               struct ring_buffer_event *event)
1386 {
1387         struct ring_buffer_per_cpu *cpu_buffer;
1388         int cpu = raw_smp_processor_id();
1389
1390         cpu_buffer = buffer->buffers[cpu];
1391
1392         rb_commit(cpu_buffer, event);
1393
1394         /*
1395          * Only the last preempt count needs to restore preemption.
1396          */
1397         if (preempt_count() == 1)
1398                 ftrace_preempt_enable(per_cpu(rb_need_resched, cpu));
1399         else
1400                 preempt_enable_no_resched_notrace();
1401
1402         return 0;
1403 }
1404 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
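
/*
 * Illustrative sketch (hypothetical helper and record layout, based only
 * on the reserve/commit signatures above): reserve space, fill the body
 * in place, then commit.
 */
struct example_record {			/* hypothetical payload */
	unsigned long	ip;
	unsigned long	value;
};

static int example_trace_value(struct ring_buffer *buffer,
			       unsigned long ip, unsigned long value)
{
	struct ring_buffer_event *event;
	struct example_record *rec;

	event = ring_buffer_lock_reserve(buffer, sizeof(*rec));
	if (!event)
		return -EBUSY;	/* recording disabled or no room */

	rec = ring_buffer_event_data(event);
	rec->ip = ip;
	rec->value = value;

	return ring_buffer_unlock_commit(buffer, event);
}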
1405
1406 /**
1407  * ring_buffer_write - write data to the buffer without reserving
1408  * @buffer: The ring buffer to write to.
1409  * @length: The length of the data being written (excluding the event header)
1410  * @data: The data to write to the buffer.
1411  *
1412  * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
1413  * one function. If you already have the data to write to the buffer, it
1414  * may be easier to simply call this function.
1415  *
1416  * Note, like ring_buffer_lock_reserve, the length is the length of the data
1417  * and not the length of the event which would hold the header.
1418  */
1419 int ring_buffer_write(struct ring_buffer *buffer,
1420                         unsigned long length,
1421                         void *data)
1422 {
1423         struct ring_buffer_per_cpu *cpu_buffer;
1424         struct ring_buffer_event *event;
1425         unsigned long event_length;
1426         void *body;
1427         int ret = -EBUSY;
1428         int cpu, resched;
1429
1430         if (ring_buffer_flags != RB_BUFFERS_ON)
1431                 return -EBUSY;
1432
1433         if (atomic_read(&buffer->record_disabled))
1434                 return -EBUSY;
1435
1436         resched = ftrace_preempt_disable();
1437
1438         cpu = raw_smp_processor_id();
1439
1440         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1441                 goto out;
1442
1443         cpu_buffer = buffer->buffers[cpu];
1444
1445         if (atomic_read(&cpu_buffer->record_disabled))
1446                 goto out;
1447
1448         event_length = rb_calculate_event_length(length);
1449         event = rb_reserve_next_event(cpu_buffer,
1450                                       RINGBUF_TYPE_DATA, event_length);
1451         if (!event)
1452                 goto out;
1453
1454         body = rb_event_data(event);
1455
1456         memcpy(body, data, length);
1457
1458         rb_commit(cpu_buffer, event);
1459
1460         ret = 0;
1461  out:
1462         ftrace_preempt_enable(resched);
1463
1464         return ret;
1465 }
1466 EXPORT_SYMBOL_GPL(ring_buffer_write);
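
/*
 * Illustrative sketch (hypothetical helper): the one-call variant of the
 * reserve/commit pair above, for callers that already have the data.
 * The length is the payload length, not the event length.
 */
static int example_log_sample(struct ring_buffer *buffer, u64 sample)
{
	return ring_buffer_write(buffer, sizeof(sample), &sample);
}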
1467
1468 static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
1469 {
1470         struct buffer_page *reader = cpu_buffer->reader_page;
1471         struct buffer_page *head = cpu_buffer->head_page;
1472         struct buffer_page *commit = cpu_buffer->commit_page;
1473
1474         return reader->read == rb_page_commit(reader) &&
1475                 (commit == reader ||
1476                  (commit == head &&
1477                   head->read == rb_page_commit(commit)));
1478 }
1479
1480 /**
1481  * ring_buffer_record_disable - stop all writes into the buffer
1482  * @buffer: The ring buffer to stop writes to.
1483  *
1484  * This prevents all writes to the buffer. Any attempt to write
1485  * to the buffer after this will fail and return NULL.
1486  *
1487  * The caller should call synchronize_sched() after this.
1488  */
1489 void ring_buffer_record_disable(struct ring_buffer *buffer)
1490 {
1491         atomic_inc(&buffer->record_disabled);
1492 }
1493 EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
1494
1495 /**
1496  * ring_buffer_record_enable - enable writes to the buffer
1497  * @buffer: The ring buffer to enable writes
1498  *
1499  * Note, multiple disables will need the same number of enables
1500  * to truly enable the writing (much like preempt_disable).
1501  */
1502 void ring_buffer_record_enable(struct ring_buffer *buffer)
1503 {
1504         atomic_dec(&buffer->record_disabled);
1505 }
1506 EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
1507
1508 /**
1509  * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
1510  * @buffer: The ring buffer to stop writes to.
1511  * @cpu: The CPU buffer to stop
1512  *
1513  * This prevents all writes to the buffer. Any attempt to write
1514  * to the buffer after this will fail and return NULL.
1515  *
1516  * The caller should call synchronize_sched() after this.
1517  */
1518 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
1519 {
1520         struct ring_buffer_per_cpu *cpu_buffer;
1521
1522         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1523                 return;
1524
1525         cpu_buffer = buffer->buffers[cpu];
1526         atomic_inc(&cpu_buffer->record_disabled);
1527 }
1528 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
1529
1530 /**
1531  * ring_buffer_record_enable_cpu - enable writes to the buffer
1532  * @buffer: The ring buffer to enable writes
1533  * @cpu: The CPU to enable.
1534  *
1535  * Note, multiple disables will need the same number of enables
1536  * to truly enable the writing (much like preempt_disable).
1537  */
1538 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
1539 {
1540         struct ring_buffer_per_cpu *cpu_buffer;
1541
1542         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1543                 return;
1544
1545         cpu_buffer = buffer->buffers[cpu];
1546         atomic_dec(&cpu_buffer->record_disabled);
1547 }
1548 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);
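
/*
 * Illustrative sketch (hypothetical helper): quiescing one CPU's buffer
 * before inspecting it. Writers may still be inside the buffer right
 * after the disable, so wait for them with synchronize_sched(), as the
 * kerneldoc above recommends.
 */
static void example_quiesce_cpu(struct ring_buffer *buffer, int cpu)
{
	ring_buffer_record_disable_cpu(buffer, cpu);
	synchronize_sched();

	/* ... read or reset the per-cpu buffer here ... */

	ring_buffer_record_enable_cpu(buffer, cpu);
}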
1549
1550 /**
1551  * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
1552  * @buffer: The ring buffer
1553  * @cpu: The per CPU buffer to get the entries from.
1554  */
1555 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
1556 {
1557         struct ring_buffer_per_cpu *cpu_buffer;
1558
1559         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1560                 return 0;
1561
1562         cpu_buffer = buffer->buffers[cpu];
1563         return cpu_buffer->entries;
1564 }
1565 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);
1566
1567 /**
1568  * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
1569  * @buffer: The ring buffer
1570  * @cpu: The per CPU buffer to get the number of overruns from
1571  */
1572 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
1573 {
1574         struct ring_buffer_per_cpu *cpu_buffer;
1575
1576         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1577                 return 0;
1578
1579         cpu_buffer = buffer->buffers[cpu];
1580         return cpu_buffer->overrun;
1581 }
1582 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);
1583
1584 /**
1585  * ring_buffer_entries - get the number of entries in a buffer
1586  * @buffer: The ring buffer
1587  *
1588  * Returns the total number of entries in the ring buffer
1589  * (all CPU entries)
1590  */
1591 unsigned long ring_buffer_entries(struct ring_buffer *buffer)
1592 {
1593         struct ring_buffer_per_cpu *cpu_buffer;
1594         unsigned long entries = 0;
1595         int cpu;
1596
1597         /* if you care about this being correct, lock the buffer */
1598         for_each_buffer_cpu(buffer, cpu) {
1599                 cpu_buffer = buffer->buffers[cpu];
1600                 entries += cpu_buffer->entries;
1601         }
1602
1603         return entries;
1604 }
1605 EXPORT_SYMBOL_GPL(ring_buffer_entries);
1606
1607 /**
1608  * ring_buffer_overruns - get the number of overruns in the buffer
1609  * @buffer: The ring buffer
1610  *
1611  * Returns the total number of overruns in the ring buffer
1612  * (all CPU entries)
1613  */
1614 unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
1615 {
1616         struct ring_buffer_per_cpu *cpu_buffer;
1617         unsigned long overruns = 0;
1618         int cpu;
1619
1620         /* if you care about this being correct, lock the buffer */
1621         for_each_buffer_cpu(buffer, cpu) {
1622                 cpu_buffer = buffer->buffers[cpu];
1623                 overruns += cpu_buffer->overrun;
1624         }
1625
1626         return overruns;
1627 }
1628 EXPORT_SYMBOL_GPL(ring_buffer_overruns);
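
/*
 * Illustrative sketch (not in the original source): sampling the global
 * entry and overrun counters. As the comments above note, these walks are
 * racy unless the caller locks the buffer, so the numbers are only
 * approximate while writers are active. The function name is made up for
 * the example.
 */
static void example_report_stats(struct ring_buffer *buffer)
{
        unsigned long entries = ring_buffer_entries(buffer);
        unsigned long overruns = ring_buffer_overruns(buffer);

        pr_info("ring buffer: %lu entries, %lu overruns\n",
                entries, overruns);
}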
1629
1630 static void rb_iter_reset(struct ring_buffer_iter *iter)
1631 {
1632         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1633
1634         /* Iterator usage is expected to have record disabled */
1635         if (list_empty(&cpu_buffer->reader_page->list)) {
1636                 iter->head_page = cpu_buffer->head_page;
1637                 iter->head = cpu_buffer->head_page->read;
1638         } else {
1639                 iter->head_page = cpu_buffer->reader_page;
1640                 iter->head = cpu_buffer->reader_page->read;
1641         }
1642         if (iter->head)
1643                 iter->read_stamp = cpu_buffer->read_stamp;
1644         else
1645                 iter->read_stamp = iter->head_page->page->time_stamp;
1646 }
1647
1648 /**
1649  * ring_buffer_iter_reset - reset an iterator
1650  * @iter: The iterator to reset
1651  *
1652  * Resets the iterator, so that it will start from the beginning
1653  * again.
1654  */
1655 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
1656 {
1657         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1658         unsigned long flags;
1659
1660         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
1661         rb_iter_reset(iter);
1662         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
1663 }
1664 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);
1665
1666 /**
1667  * ring_buffer_iter_empty - check if an iterator has no more to read
1668  * @iter: The iterator to check
1669  */
1670 int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
1671 {
1672         struct ring_buffer_per_cpu *cpu_buffer;
1673
1674         cpu_buffer = iter->cpu_buffer;
1675
1676         return iter->head_page == cpu_buffer->commit_page &&
1677                 iter->head == rb_commit_index(cpu_buffer);
1678 }
1679 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);
1680
1681 static void
1682 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1683                      struct ring_buffer_event *event)
1684 {
1685         u64 delta;
1686
1687         switch (event->type) {
1688         case RINGBUF_TYPE_PADDING:
1689                 return;
1690
1691         case RINGBUF_TYPE_TIME_EXTEND:
1692                 delta = event->array[0];
1693                 delta <<= TS_SHIFT;
1694                 delta += event->time_delta;
1695                 cpu_buffer->read_stamp += delta;
1696                 return;
1697
1698         case RINGBUF_TYPE_TIME_STAMP:
1699                 /* FIXME: not implemented */
1700                 return;
1701
1702         case RINGBUF_TYPE_DATA:
1703                 cpu_buffer->read_stamp += event->time_delta;
1704                 return;
1705
1706         default:
1707                 BUG();
1708         }
1709         return;
1710 }
1711
1712 static void
1713 rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
1714                           struct ring_buffer_event *event)
1715 {
1716         u64 delta;
1717
1718         switch (event->type) {
1719         case RINGBUF_TYPE_PADDING:
1720                 return;
1721
1722         case RINGBUF_TYPE_TIME_EXTEND:
1723                 delta = event->array[0];
1724                 delta <<= TS_SHIFT;
1725                 delta += event->time_delta;
1726                 iter->read_stamp += delta;
1727                 return;
1728
1729         case RINGBUF_TYPE_TIME_STAMP:
1730                 /* FIXME: not implemented */
1731                 return;
1732
1733         case RINGBUF_TYPE_DATA:
1734                 iter->read_stamp += event->time_delta;
1735                 return;
1736
1737         default:
1738                 BUG();
1739         }
1740         return;
1741 }
1742
1743 static struct buffer_page *
1744 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
1745 {
1746         struct buffer_page *reader = NULL;
1747         unsigned long flags;
1748         int nr_loops = 0;
1749
1750         local_irq_save(flags);
1751         __raw_spin_lock(&cpu_buffer->lock);
1752
1753  again:
1754         /*
1755          * This should normally only loop twice. But because the
1756          * start of the reader inserts an empty page, it causes
1757          * a case where we will loop three times. There should be no
1758          * reason to loop four times (that I know of).
1759          */
1760         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
1761                 reader = NULL;
1762                 goto out;
1763         }
1764
1765         reader = cpu_buffer->reader_page;
1766
1767         /* If there's more to read, return this page */
1768         if (cpu_buffer->reader_page->read < rb_page_size(reader))
1769                 goto out;
1770
1771         /* Never should we have an index greater than the size */
1772         if (RB_WARN_ON(cpu_buffer,
1773                        cpu_buffer->reader_page->read > rb_page_size(reader)))
1774                 goto out;
1775
1776         /* check if we caught up to the tail */
1777         reader = NULL;
1778         if (cpu_buffer->commit_page == cpu_buffer->reader_page)
1779                 goto out;
1780
1781         /*
1782          * Splice the empty reader page into the list around the head.
1783          * Reset the reader page to size zero.
1784          */
1785
1786         reader = cpu_buffer->head_page;
1787         cpu_buffer->reader_page->list.next = reader->list.next;
1788         cpu_buffer->reader_page->list.prev = reader->list.prev;
1789
1790         local_set(&cpu_buffer->reader_page->write, 0);
1791         local_set(&cpu_buffer->reader_page->page->commit, 0);
1792
1793         /* Make the reader page now replace the head */
1794         reader->list.prev->next = &cpu_buffer->reader_page->list;
1795         reader->list.next->prev = &cpu_buffer->reader_page->list;
1796
1797         /*
1798          * If the tail is on the reader, then we must set the head
1799          * to the inserted page, otherwise we set it one before.
1800          */
1801         cpu_buffer->head_page = cpu_buffer->reader_page;
1802
1803         if (cpu_buffer->commit_page != reader)
1804                 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1805
1806         /* Finally update the reader page to the new head */
1807         cpu_buffer->reader_page = reader;
1808         rb_reset_reader_page(cpu_buffer);
1809
1810         goto again;
1811
1812  out:
1813         __raw_spin_unlock(&cpu_buffer->lock);
1814         local_irq_restore(flags);
1815
1816         return reader;
1817 }
1818
1819 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
1820 {
1821         struct ring_buffer_event *event;
1822         struct buffer_page *reader;
1823         unsigned length;
1824
1825         reader = rb_get_reader_page(cpu_buffer);
1826
1827         /* This function should not be called when buffer is empty */
1828         if (RB_WARN_ON(cpu_buffer, !reader))
1829                 return;
1830
1831         event = rb_reader_event(cpu_buffer);
1832
1833         if (event->type == RINGBUF_TYPE_DATA)
1834                 cpu_buffer->entries--;
1835
1836         rb_update_read_stamp(cpu_buffer, event);
1837
1838         length = rb_event_length(event);
1839         cpu_buffer->reader_page->read += length;
1840 }
1841
1842 static void rb_advance_iter(struct ring_buffer_iter *iter)
1843 {
1844         struct ring_buffer *buffer;
1845         struct ring_buffer_per_cpu *cpu_buffer;
1846         struct ring_buffer_event *event;
1847         unsigned length;
1848
1849         cpu_buffer = iter->cpu_buffer;
1850         buffer = cpu_buffer->buffer;
1851
1852         /*
1853          * Check if we are at the end of the buffer.
1854          */
1855         if (iter->head >= rb_page_size(iter->head_page)) {
1856                 if (RB_WARN_ON(buffer,
1857                                iter->head_page == cpu_buffer->commit_page))
1858                         return;
1859                 rb_inc_iter(iter);
1860                 return;
1861         }
1862
1863         event = rb_iter_head_event(iter);
1864
1865         length = rb_event_length(event);
1866
1867         /*
1868          * This should not be called to advance the iterator if we are
1869          * at the tail of the buffer.
1870          */
1871         if (RB_WARN_ON(cpu_buffer,
1872                        (iter->head_page == cpu_buffer->commit_page) &&
1873                        (iter->head + length > rb_commit_index(cpu_buffer))))
1874                 return;
1875
1876         rb_update_iter_read_stamp(iter, event);
1877
1878         iter->head += length;
1879
1880         /* check for end of page padding */
1881         if ((iter->head >= rb_page_size(iter->head_page)) &&
1882             (iter->head_page != cpu_buffer->commit_page))
1883                 rb_advance_iter(iter);
1884 }
1885
1886 static struct ring_buffer_event *
1887 rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
1888 {
1889         struct ring_buffer_per_cpu *cpu_buffer;
1890         struct ring_buffer_event *event;
1891         struct buffer_page *reader;
1892         int nr_loops = 0;
1893
1894         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1895                 return NULL;
1896
1897         cpu_buffer = buffer->buffers[cpu];
1898
1899  again:
1900         /*
1901          * We repeat when a timestamp is encountered. It is possible
1902          * to get multiple timestamps from an interrupt entering just
1903          * as one timestamp is about to be written. The max times
1904          * that this can happen is the number of nested interrupts we
1905          * can have.  Nesting 10 deep of interrupts is clearly
1906          * an anomaly.
1907          */
1908         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
1909                 return NULL;
1910
1911         reader = rb_get_reader_page(cpu_buffer);
1912         if (!reader)
1913                 return NULL;
1914
1915         event = rb_reader_event(cpu_buffer);
1916
1917         switch (event->type) {
1918         case RINGBUF_TYPE_PADDING:
1919                 RB_WARN_ON(cpu_buffer, 1);
1920                 rb_advance_reader(cpu_buffer);
1921                 return NULL;
1922
1923         case RINGBUF_TYPE_TIME_EXTEND:
1924                 /* Internal data, OK to advance */
1925                 rb_advance_reader(cpu_buffer);
1926                 goto again;
1927
1928         case RINGBUF_TYPE_TIME_STAMP:
1929                 /* FIXME: not implemented */
1930                 rb_advance_reader(cpu_buffer);
1931                 goto again;
1932
1933         case RINGBUF_TYPE_DATA:
1934                 if (ts) {
1935                         *ts = cpu_buffer->read_stamp + event->time_delta;
1936                         ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1937                 }
1938                 return event;
1939
1940         default:
1941                 BUG();
1942         }
1943
1944         return NULL;
1945 }
1946 EXPORT_SYMBOL_GPL(ring_buffer_peek);
1947
1948 static struct ring_buffer_event *
1949 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
1950 {
1951         struct ring_buffer *buffer;
1952         struct ring_buffer_per_cpu *cpu_buffer;
1953         struct ring_buffer_event *event;
1954         int nr_loops = 0;
1955
1956         if (ring_buffer_iter_empty(iter))
1957                 return NULL;
1958
1959         cpu_buffer = iter->cpu_buffer;
1960         buffer = cpu_buffer->buffer;
1961
1962  again:
1963         /*
1964          * We repeat when a timestamp is encountered. It is possible
1965          * to get multiple timestamps from an interrupt entering just
1966          * as one timestamp is about to be written. The max times
1967          * that this can happen is the number of nested interrupts we
1968          * can have. Nesting 10 deep of interrupts is clearly
1969          * an anomaly.
1970          */
1971         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
1972                 return NULL;
1973
1974         if (rb_per_cpu_empty(cpu_buffer))
1975                 return NULL;
1976
1977         event = rb_iter_head_event(iter);
1978
1979         switch (event->type) {
1980         case RINGBUF_TYPE_PADDING:
1981                 rb_inc_iter(iter);
1982                 goto again;
1983
1984         case RINGBUF_TYPE_TIME_EXTEND:
1985                 /* Internal data, OK to advance */
1986                 rb_advance_iter(iter);
1987                 goto again;
1988
1989         case RINGBUF_TYPE_TIME_STAMP:
1990                 /* FIXME: not implemented */
1991                 rb_advance_iter(iter);
1992                 goto again;
1993
1994         case RINGBUF_TYPE_DATA:
1995                 if (ts) {
1996                         *ts = iter->read_stamp + event->time_delta;
1997                         ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1998                 }
1999                 return event;
2000
2001         default:
2002                 BUG();
2003         }
2004
2005         return NULL;
2006 }
2007 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
2008
2009 /**
2010  * ring_buffer_peek - peek at the next event to be read
2011  * @buffer: The ring buffer to read
2012  * @cpu: The cpu to peek at
2013  * @ts: The timestamp counter of this event.
2014  *
2015  * This will return the event that will be read next, but does
2016  * not consume the data.
2017  */
2018 struct ring_buffer_event *
2019 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
2020 {
2021         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
2022         struct ring_buffer_event *event;
2023         unsigned long flags;
2024
2025         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2026         event = rb_buffer_peek(buffer, cpu, ts);
2027         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2028
2029         return event;
2030 }
2031
2032 /**
2033  * ring_buffer_iter_peek - peek at the next event to be read
2034  * @iter: The ring buffer iterator
2035  * @ts: The timestamp counter of this event.
2036  *
2037  * This will return the event that will be read next, but does
2038  * not increment the iterator.
2039  */
2040 struct ring_buffer_event *
2041 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
2042 {
2043         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2044         struct ring_buffer_event *event;
2045         unsigned long flags;
2046
2047         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2048         event = rb_iter_peek(iter, ts);
2049         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2050
2051         return event;
2052 }
2053
2054 /**
2055  * ring_buffer_consume - return an event and consume it
2056  * @buffer: The ring buffer to get the next event from
2057  *
2058  * Returns the next event in the ring buffer, and that event is consumed.
2059  * Meaning, that sequential reads will keep returning a different event,
2060  * and eventually empty the ring buffer if the producer is slower.
2061  */
2062 struct ring_buffer_event *
2063 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
2064 {
2065         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
2066         struct ring_buffer_event *event;
2067         unsigned long flags;
2068
2069         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2070                 return NULL;
2071
2072         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2073
2074         event = rb_buffer_peek(buffer, cpu, ts);
2075         if (!event)
2076                 goto out;
2077
2078         rb_advance_reader(cpu_buffer);
2079
2080  out:
2081         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2082
2083         return event;
2084 }
2085 EXPORT_SYMBOL_GPL(ring_buffer_consume);
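
/*
 * Illustrative sketch (not in the original source): draining one CPU's
 * buffer with the consuming reader. Each successful call removes the event
 * it returns, so the loop ends when the CPU buffer is empty. The function
 * name and the pr_info() reporting are assumptions for the example only.
 */
static void example_drain_cpu(struct ring_buffer *buffer, int cpu)
{
        struct ring_buffer_event *event;
        u64 ts;

        while ((event = ring_buffer_consume(buffer, cpu, &ts))) {
                /* Payload and length accessors come from ring_buffer.h. */
                void *data = ring_buffer_event_data(event);
                unsigned len = ring_buffer_event_length(event);

                pr_info("cpu %d: ts=%llu len=%u data=%p\n",
                        cpu, (unsigned long long)ts, len, data);
        }
}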
2086
2087 /**
2088  * ring_buffer_read_start - start a non consuming read of the buffer
2089  * @buffer: The ring buffer to read from
2090  * @cpu: The cpu buffer to iterate over
2091  *
2092  * This starts up an iteration through the buffer. It also disables
2093  * the recording to the buffer until the reading is finished.
2094  * This prevents the reading from being corrupted. This is not
2095  * a consuming read, so a producer is not expected.
2096  *
2097  * Must be paired with ring_buffer_read_finish.
2098  */
2099 struct ring_buffer_iter *
2100 ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
2101 {
2102         struct ring_buffer_per_cpu *cpu_buffer;
2103         struct ring_buffer_iter *iter;
2104         unsigned long flags;
2105
2106         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2107                 return NULL;
2108
2109         iter = kmalloc(sizeof(*iter), GFP_KERNEL);
2110         if (!iter)
2111                 return NULL;
2112
2113         cpu_buffer = buffer->buffers[cpu];
2114
2115         iter->cpu_buffer = cpu_buffer;
2116
2117         atomic_inc(&cpu_buffer->record_disabled);
2118         synchronize_sched();
2119
2120         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2121         __raw_spin_lock(&cpu_buffer->lock);
2122         rb_iter_reset(iter);
2123         __raw_spin_unlock(&cpu_buffer->lock);
2124         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2125
2126         return iter;
2127 }
2128 EXPORT_SYMBOL_GPL(ring_buffer_read_start);
2129
2130 /**
2131  * ring_buffer_read_finish - finish reading the iterator of the buffer
2132  * @iter: The iterator retrieved by ring_buffer_read_start
2133  *
2134  * This re-enables the recording to the buffer, and frees the
2135  * iterator.
2136  */
2137 void
2138 ring_buffer_read_finish(struct ring_buffer_iter *iter)
2139 {
2140         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2141
2142         atomic_dec(&cpu_buffer->record_disabled);
2143         kfree(iter);
2144 }
2145 EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
2146
2147 /**
2148  * ring_buffer_read - read the next item in the ring buffer by the iterator
2149  * @iter: The ring buffer iterator
2150  * @ts: The time stamp of the event read.
2151  *
2152  * This reads the next event in the ring buffer and increments the iterator.
2153  */
2154 struct ring_buffer_event *
2155 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
2156 {
2157         struct ring_buffer_event *event;
2158         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2159         unsigned long flags;
2160
2161         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2162         event = rb_iter_peek(iter, ts);
2163         if (!event)
2164                 goto out;
2165
2166         rb_advance_iter(iter);
2167  out:
2168         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2169
2170         return event;
2171 }
2172 EXPORT_SYMBOL_GPL(ring_buffer_read);
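
/*
 * Illustrative sketch (not in the original source): a non consuming read
 * of one CPU buffer with the iterator interface. Recording to that CPU
 * buffer stays disabled between ring_buffer_read_start() and
 * ring_buffer_read_finish(). The function name is an assumption for the
 * example.
 */
static int example_walk_cpu(struct ring_buffer *buffer, int cpu)
{
        struct ring_buffer_iter *iter;
        struct ring_buffer_event *event;
        u64 ts;
        int count = 0;

        iter = ring_buffer_read_start(buffer, cpu);
        if (!iter)
                return -ENOMEM;

        /* ring_buffer_read() returns the event and advances the iterator. */
        while ((event = ring_buffer_read(iter, &ts)))
                count++;

        /* Re-enables recording and frees the iterator. */
        ring_buffer_read_finish(iter);

        return count;
}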
2173
2174 /**
2175  * ring_buffer_size - return the size of the ring buffer (in bytes)
2176  * @buffer: The ring buffer.
2177  */
2178 unsigned long ring_buffer_size(struct ring_buffer *buffer)
2179 {
2180         return BUF_PAGE_SIZE * buffer->pages;
2181 }
2182 EXPORT_SYMBOL_GPL(ring_buffer_size);
2183
2184 static void
2185 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
2186 {
2187         cpu_buffer->head_page
2188                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
2189         local_set(&cpu_buffer->head_page->write, 0);
2190         local_set(&cpu_buffer->head_page->page->commit, 0);
2191
2192         cpu_buffer->head_page->read = 0;
2193
2194         cpu_buffer->tail_page = cpu_buffer->head_page;
2195         cpu_buffer->commit_page = cpu_buffer->head_page;
2196
2197         INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
2198         local_set(&cpu_buffer->reader_page->write, 0);
2199         local_set(&cpu_buffer->reader_page->page->commit, 0);
2200         cpu_buffer->reader_page->read = 0;
2201
2202         cpu_buffer->overrun = 0;
2203         cpu_buffer->entries = 0;
2204
2205         cpu_buffer->write_stamp = 0;
2206         cpu_buffer->read_stamp = 0;
2207 }
2208
2209 /**
2210  * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
2211  * @buffer: The ring buffer to reset a per cpu buffer of
2212  * @cpu: The CPU buffer to be reset
2213  */
2214 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
2215 {
2216         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
2217         unsigned long flags;
2218
2219         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2220                 return;
2221
2222         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2223
2224         __raw_spin_lock(&cpu_buffer->lock);
2225
2226         rb_reset_cpu(cpu_buffer);
2227
2228         __raw_spin_unlock(&cpu_buffer->lock);
2229
2230         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2231 }
2232 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
2233
2234 /**
2235  * ring_buffer_reset - reset a ring buffer
2236  * @buffer: The ring buffer to reset all cpu buffers
2237  */
2238 void ring_buffer_reset(struct ring_buffer *buffer)
2239 {
2240         int cpu;
2241
2242         for_each_buffer_cpu(buffer, cpu)
2243                 ring_buffer_reset_cpu(buffer, cpu);
2244 }
2245 EXPORT_SYMBOL_GPL(ring_buffer_reset);
2246
2247 /**
2248  * ring_buffer_empty - is the ring buffer empty?
2249  * @buffer: The ring buffer to test
2250  */
2251 int ring_buffer_empty(struct ring_buffer *buffer)
2252 {
2253         struct ring_buffer_per_cpu *cpu_buffer;
2254         int cpu;
2255
2256         /* yes this is racy, but if you don't like the race, lock the buffer */
2257         for_each_buffer_cpu(buffer, cpu) {
2258                 cpu_buffer = buffer->buffers[cpu];
2259                 if (!rb_per_cpu_empty(cpu_buffer))
2260                         return 0;
2261         }
2262         return 1;
2263 }
2264 EXPORT_SYMBOL_GPL(ring_buffer_empty);
2265
2266 /**
2267  * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
2268  * @buffer: The ring buffer
2269  * @cpu: The CPU buffer to test
2270  */
2271 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
2272 {
2273         struct ring_buffer_per_cpu *cpu_buffer;
2274
2275         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2276                 return 1;
2277
2278         cpu_buffer = buffer->buffers[cpu];
2279         return rb_per_cpu_empty(cpu_buffer);
2280 }
2281 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
2282
2283 /**
2284  * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
2285  * @buffer_a: One buffer to swap with
2286  * @buffer_b: The other buffer to swap with
2287  *
2288  * This function is useful for tracers that want to take a "snapshot"
2289  * of a CPU buffer and have another backup buffer lying around.
2290  * It is expected that the tracer handles the cpu buffer not being
2291  * used at the moment.
2292  */
2293 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
2294                          struct ring_buffer *buffer_b, int cpu)
2295 {
2296         struct ring_buffer_per_cpu *cpu_buffer_a;
2297         struct ring_buffer_per_cpu *cpu_buffer_b;
2298
2299         if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
2300             !cpumask_test_cpu(cpu, buffer_b->cpumask))
2301                 return -EINVAL;
2302
2303         /* At least make sure the two buffers are somewhat the same */
2304         if (buffer_a->pages != buffer_b->pages)
2305                 return -EINVAL;
2306
2307         if (ring_buffer_flags != RB_BUFFERS_ON)
2308                 return -EAGAIN;
2309
2310         if (atomic_read(&buffer_a->record_disabled))
2311                 return -EAGAIN;
2312
2313         if (atomic_read(&buffer_b->record_disabled))
2314                 return -EAGAIN;
2315
2316         cpu_buffer_a = buffer_a->buffers[cpu];
2317         cpu_buffer_b = buffer_b->buffers[cpu];
2318
2319         if (atomic_read(&cpu_buffer_a->record_disabled))
2320                 return -EAGAIN;
2321
2322         if (atomic_read(&cpu_buffer_b->record_disabled))
2323                 return -EAGAIN;
2324
2325         /*
2326          * We can't do a synchronize_sched here because this
2327          * function can be called in atomic context.
2328          * Normally this will be called from the same CPU as cpu.
2329          * If not it's up to the caller to protect this.
2330          */
2331         atomic_inc(&cpu_buffer_a->record_disabled);
2332         atomic_inc(&cpu_buffer_b->record_disabled);
2333
2334         buffer_a->buffers[cpu] = cpu_buffer_b;
2335         buffer_b->buffers[cpu] = cpu_buffer_a;
2336
2337         cpu_buffer_b->buffer = buffer_a;
2338         cpu_buffer_a->buffer = buffer_b;
2339
2340         atomic_dec(&cpu_buffer_a->record_disabled);
2341         atomic_dec(&cpu_buffer_b->record_disabled);
2342
2343         return 0;
2344 }
2345 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
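
/*
 * Illustrative sketch (not in the original source): taking a "snapshot"
 * of one CPU's live buffer by swapping it with a spare buffer of the same
 * size, as a tracer might do. The function name and the existence of an
 * idle @spare buffer are assumptions of the example.
 */
static int example_snapshot_cpu(struct ring_buffer *live,
                                struct ring_buffer *spare, int cpu)
{
        int ret;

        /* Fails with -EINVAL or -EAGAIN if sizes differ or recording is off. */
        ret = ring_buffer_swap_cpu(live, spare, cpu);
        if (ret)
                return ret;

        /* The events recorded so far on @cpu can now be read from @spare. */
        return 0;
}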
2346
2347 static void rb_remove_entries(struct ring_buffer_per_cpu *cpu_buffer,
2348                               struct buffer_data_page *bpage,
2349                               unsigned int offset)
2350 {
2351         struct ring_buffer_event *event;
2352         unsigned long head;
2353
2354         __raw_spin_lock(&cpu_buffer->lock);
2355         for (head = offset; head < local_read(&bpage->commit);
2356              head += rb_event_length(event)) {
2357
2358                 event = __rb_data_page_index(bpage, head);
2359                 if (RB_WARN_ON(cpu_buffer, rb_null_event(event)))
2360                         return;
2361                 /* Only count data entries */
2362                 if (event->type != RINGBUF_TYPE_DATA)
2363                         continue;
2364                 cpu_buffer->entries--;
2365         }
2366         __raw_spin_unlock(&cpu_buffer->lock);
2367 }
2368
2369 /**
2370  * ring_buffer_alloc_read_page - allocate a page to read from buffer
2371  * @buffer: the buffer to allocate for.
2372  *
2373  * This function is used in conjunction with ring_buffer_read_page.
2374  * When reading a full page from the ring buffer, these functions
2375  * can be used to speed up the process. The calling function should
2376  * allocate a few pages first with this function. Then when it
2377  * needs to get pages from the ring buffer, it passes the result
2378  * of this function into ring_buffer_read_page, which will swap
2379  * the page that was allocated, with the read page of the buffer.
2380  *
2381  * Returns:
2382  *  The page allocated, or NULL on error.
2383  */
2384 void *ring_buffer_alloc_read_page(struct ring_buffer *buffer)
2385 {
2386         struct buffer_data_page *bpage;
2387         unsigned long addr;
2388
2389         addr = __get_free_page(GFP_KERNEL);
2390         if (!addr)
2391                 return NULL;
2392
2393         bpage = (void *)addr;
2394
2395         rb_init_page(bpage);
2396
2397         return bpage;
2398 }
2399
2400 /**
2401  * ring_buffer_free_read_page - free an allocated read page
2402  * @buffer: the buffer the page was allocated for
2403  * @data: the page to free
2404  *
2405  * Free a page allocated from ring_buffer_alloc_read_page.
2406  */
2407 void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data)
2408 {
2409         free_page((unsigned long)data);
2410 }
2411
2412 /**
2413  * ring_buffer_read_page - extract a page from the ring buffer
2414  * @buffer: buffer to extract from
2415  * @data_page: the page to use allocated from ring_buffer_alloc_read_page
2416  * @len: amount to extract
2417  * @cpu: the cpu of the buffer to extract
2418  * @full: should the extraction only happen when the page is full.
2419  *
2420  * This function will pull out a page from the ring buffer and consume it.
2421  * @data_page must be the address of the variable that was returned
2422  * from ring_buffer_alloc_read_page. This is because the page might be used
2423  * to swap with a page in the ring buffer.
2424  *
2425  * for example:
2426  *      rpage = ring_buffer_alloc_read_page(buffer);
2427  *      if (!rpage)
2428  *              return error;
2429  *      ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0);
2430  *      if (ret >= 0)
2431  *              process_page(rpage, ret);
2432  *
2433  * When @full is set, the function will not return data unless
2434  * the writer is off the reader page.
2435  *
2436  * Note: it is up to the calling functions to handle sleeps and wakeups.
2437  *  The ring buffer can be used anywhere in the kernel and can not
2438  *  blindly call wake_up. The layer that uses the ring buffer must be
2439  *  responsible for that.
2440  *
2441  * Returns:
2442  *  >=0 if data has been transferred, returns the offset of consumed data.
2443  *  <0 if no data has been transferred.
2444  */
2445 int ring_buffer_read_page(struct ring_buffer *buffer,
2446                           void **data_page, size_t len, int cpu, int full)
2447 {
2448         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
2449         struct ring_buffer_event *event;
2450         struct buffer_data_page *bpage;
2451         struct buffer_page *reader;
2452         unsigned long flags;
2453         unsigned int commit;
2454         unsigned int read;
2455         int ret = -1;
2456
2457         if (!data_page)
2458                 return -1;
2459
2460         bpage = *data_page;
2461         if (!bpage)
2462                 return -1;
2463
2464         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2465
2466         reader = rb_get_reader_page(cpu_buffer);
2467         if (!reader)
2468                 goto out;
2469
2470         event = rb_reader_event(cpu_buffer);
2471
2472         read = reader->read;
2473         commit = rb_page_commit(reader);
2474
2475         /*
2476          * If len > what's left on the page, and the writer is also off of
2477          * the read page, then simply switch the read page with the given
2478          * page. Otherwise we need to copy the data from the reader page
2479          * into the page that was passed in.
2480          */
2481         if ((len < (commit - read)) ||
2482             cpu_buffer->reader_page == cpu_buffer->commit_page) {
2483                 struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
2484                 unsigned int pos = read;
2485                 unsigned int size;
2486
2487                 if (full)
2488                         goto out;
2489
2490                 if (len > (commit - read))
2491                         len = (commit - read);
2492
2493                 size = rb_event_length(event);
2494
2495                 if (len < size)
2496                         goto out;
2497
2498                 /* Need to copy one event at a time */
2499                 do {
2500                         memcpy(bpage->data + pos, rpage->data + pos, size);
2501
2502                         len -= size;
2503
2504                         rb_advance_reader(cpu_buffer);
2505                         pos = reader->read;
2506
2507                         event = rb_reader_event(cpu_buffer);
2508                         size = rb_event_length(event);
2509                 } while (len > size);
2510
2511                 /* update bpage */
2512                 local_set(&bpage->commit, pos);
2513                 bpage->time_stamp = rpage->time_stamp;
2514
2515         } else {
2516                 /* swap the pages */
2517                 rb_init_page(bpage);
2518                 bpage = reader->page;
2519                 reader->page = *data_page;
2520                 local_set(&reader->write, 0);
2521                 reader->read = 0;
2522                 *data_page = bpage;
2523
2524                 /* update the entry counter */
2525                 rb_remove_entries(cpu_buffer, bpage, read);
2526         }
2527         ret = read;
2528
2529  out:
2530         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2531
2532         return ret;
2533 }
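
/*
 * Illustrative sketch (not in the original source): pulling a page of
 * events out of one CPU buffer with the page interface documented above.
 * Only the alloc, read and free calls are real API; the function name and
 * the pr_info() reporting are assumptions for the example.
 */
static void example_read_one_page(struct ring_buffer *buffer, int cpu)
{
        void *page;
        int ret;

        page = ring_buffer_alloc_read_page(buffer);
        if (!page)
                return;

        /* Pass the address of the pointer; the page may be swapped out. */
        ret = ring_buffer_read_page(buffer, &page, PAGE_SIZE, cpu, 0);
        if (ret >= 0)
                pr_info("read page from cpu %d, data starts at offset %d\n",
                        cpu, ret);

        ring_buffer_free_read_page(buffer, page);
}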
2534
2535 static ssize_t
2536 rb_simple_read(struct file *filp, char __user *ubuf,
2537                size_t cnt, loff_t *ppos)
2538 {
2539         unsigned long *p = filp->private_data;
2540         char buf[64];
2541         int r;
2542
2543         if (test_bit(RB_BUFFERS_DISABLED_BIT, p))
2544                 r = sprintf(buf, "permanently disabled\n");
2545         else
2546                 r = sprintf(buf, "%d\n", test_bit(RB_BUFFERS_ON_BIT, p));
2547
2548         return simple_read_from_buffer(ubuf, cnt, ppos, buf, r);
2549 }
2550
2551 static ssize_t
2552 rb_simple_write(struct file *filp, const char __user *ubuf,
2553                 size_t cnt, loff_t *ppos)
2554 {
2555         unsigned long *p = filp->private_data;
2556         char buf[64];
2557         unsigned long val;
2558         int ret;
2559
2560         if (cnt >= sizeof(buf))
2561                 return -EINVAL;
2562
2563         if (copy_from_user(&buf, ubuf, cnt))
2564                 return -EFAULT;
2565
2566         buf[cnt] = 0;
2567
2568         ret = strict_strtoul(buf, 10, &val);
2569         if (ret < 0)
2570                 return ret;
2571
2572         if (val)
2573                 set_bit(RB_BUFFERS_ON_BIT, p);
2574         else
2575                 clear_bit(RB_BUFFERS_ON_BIT, p);
2576
2577         (*ppos)++;
2578
2579         return cnt;
2580 }
2581
2582 static struct file_operations rb_simple_fops = {
2583         .open           = tracing_open_generic,
2584         .read           = rb_simple_read,
2585         .write          = rb_simple_write,
2586 };
2587
2588
2589 static __init int rb_init_debugfs(void)
2590 {
2591         struct dentry *d_tracer;
2592         struct dentry *entry;
2593
2594         d_tracer = tracing_init_dentry();
2595
2596         entry = debugfs_create_file("tracing_on", 0644, d_tracer,
2597                                     &ring_buffer_flags, &rb_simple_fops);
2598         if (!entry)
2599                 pr_warning("Could not create debugfs 'tracing_on' entry\n");
2600
2601         return 0;
2602 }
2603
2604 fs_initcall(rb_init_debugfs);