ring-buffer: use commit counters for commit pointer accounting
kernel/trace/ring_buffer.c
1 /*
2  * Generic ring buffer
3  *
4  * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
5  */
6 #include <linux/ring_buffer.h>
7 #include <linux/trace_clock.h>
8 #include <linux/ftrace_irq.h>
9 #include <linux/spinlock.h>
10 #include <linux/debugfs.h>
11 #include <linux/uaccess.h>
12 #include <linux/hardirq.h>
13 #include <linux/module.h>
14 #include <linux/percpu.h>
15 #include <linux/mutex.h>
16 #include <linux/init.h>
17 #include <linux/hash.h>
18 #include <linux/list.h>
19 #include <linux/cpu.h>
20 #include <linux/fs.h>
21
22 #include "trace.h"
23
24 /*
25  * The ring buffer header is special. We must manually keep it up to date.
26  */
27 int ring_buffer_print_entry_header(struct trace_seq *s)
28 {
29         int ret;
30
31         ret = trace_seq_printf(s, "# compressed entry header\n");
32         ret = trace_seq_printf(s, "\ttype_len    :    5 bits\n");
33         ret = trace_seq_printf(s, "\ttime_delta  :   27 bits\n");
34         ret = trace_seq_printf(s, "\tarray       :   32 bits\n");
35         ret = trace_seq_printf(s, "\n");
36         ret = trace_seq_printf(s, "\tpadding     : type == %d\n",
37                                RINGBUF_TYPE_PADDING);
38         ret = trace_seq_printf(s, "\ttime_extend : type == %d\n",
39                                RINGBUF_TYPE_TIME_EXTEND);
40         ret = trace_seq_printf(s, "\tdata max type_len  == %d\n",
41                                RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
42
43         return ret;
44 }
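/*
 * For reference, the compressed entry header printed above corresponds to a
 * declaration roughly like the one in <linux/ring_buffer.h> (sketch, not part
 * of this file):
 *
 *	struct ring_buffer_event {
 *		u32	type_len:5, time_delta:27;
 *		u32	array[];
 *	};
 *
 * A non-zero type_len up to RINGBUF_TYPE_DATA_TYPE_LEN_MAX encodes the data
 * length in RB_ALIGNMENT (4 byte) units; larger events store their length in
 * array[0] instead.
 */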
45
46 /*
47  * The ring buffer is made up of a list of pages. A separate list of pages is
48  * allocated for each CPU. A writer may only write to a buffer that is
49  * associated with the CPU it is currently executing on.  A reader may read
50  * from any per cpu buffer.
51  *
52  * The reader is special. For each per cpu buffer, the reader has its own
53  * reader page. When a reader has read the entire reader page, this reader
54  * page is swapped with another page in the ring buffer.
55  *
56  * Now, as long as the writer is off the reader page, the reader can do
57  * whatever it wants with that page. The writer will never write to that page
58  * again (as long as it is out of the ring buffer).
59  *
60  * Here's some silly ASCII art.
61  *
62  *   +------+
63  *   |reader|          RING BUFFER
64  *   |page  |
65  *   +------+        +---+   +---+   +---+
66  *                   |   |-->|   |-->|   |
67  *                   +---+   +---+   +---+
68  *                     ^               |
69  *                     |               |
70  *                     +---------------+
71  *
72  *
73  *   +------+
74  *   |reader|          RING BUFFER
75  *   |page  |------------------v
76  *   +------+        +---+   +---+   +---+
77  *                   |   |-->|   |-->|   |
78  *                   +---+   +---+   +---+
79  *                     ^               |
80  *                     |               |
81  *                     +---------------+
82  *
83  *
84  *   +------+
85  *   |reader|          RING BUFFER
86  *   |page  |------------------v
87  *   +------+        +---+   +---+   +---+
88  *      ^            |   |-->|   |-->|   |
89  *      |            +---+   +---+   +---+
90  *      |                              |
91  *      |                              |
92  *      +------------------------------+
93  *
94  *
95  *   +------+
96  *   |buffer|          RING BUFFER
97  *   |page  |------------------v
98  *   +------+        +---+   +---+   +---+
99  *      ^            |   |   |   |-->|   |
100  *      |   New      +---+   +---+   +---+
101  *      |  Reader------^               |
102  *      |   page                       |
103  *      +------------------------------+
104  *
105  *
106  * After we make this swap, the reader can hand this page off to the splice
107  * code and be done with it. It can even allocate a new page if it needs to
108  * and swap that into the ring buffer.
109  *
110  * We will be using cmpxchg soon to make all this lockless.
111  *
112  */
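/*
 * A rough sketch of the swap pictured above (the real work is done by the
 * reader-page code later in this file; this is only illustrative):
 *
 *	1. The reader page is linked in front of the current head page.
 *	2. The old head page becomes the new reader page and is unlinked
 *	   from the ring.
 *	3. cpu_buffer->head_page is advanced to the next page in the ring.
 *
 * Once detached, the old head page is never written to again, so the reader
 * (or the splice code) can consume it without racing against writers.
 */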
113
114 /*
115  * A fast way to enable or disable all ring buffers is to
116  * call tracing_on or tracing_off. Turning off the ring buffers
117  * prevents all ring buffers from being recorded to.
118  * Turning this switch on makes it OK to write to the
119  * ring buffer, if the ring buffer is enabled itself.
120  *
121  * There are three layers that must be on in order to write
122  * to the ring buffer.
123  *
124  * 1) This global flag must be set.
125  * 2) The ring buffer must be enabled for recording.
126  * 3) The per cpu buffer must be enabled for recording.
127  *
128  * In case of an anomaly, this global flag has a bit set that
129  * will permanently disable all ring buffers.
130  */
131
132 /*
133  * Global flag to disable all recording to ring buffers
134  *  This has two bits: ON, DISABLED
135  *
136  *  ON   DISABLED
137  * ---- ----------
138  *   0      0        : ring buffers are off
139  *   1      0        : ring buffers are on
140  *   X      1        : ring buffers are permanently disabled
141  */
142
143 enum {
144         RB_BUFFERS_ON_BIT       = 0,
145         RB_BUFFERS_DISABLED_BIT = 1,
146 };
147
148 enum {
149         RB_BUFFERS_ON           = 1 << RB_BUFFERS_ON_BIT,
150         RB_BUFFERS_DISABLED     = 1 << RB_BUFFERS_DISABLED_BIT,
151 };
152
153 static unsigned long ring_buffer_flags __read_mostly = RB_BUFFERS_ON;
154
155 #define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)
156
157 /**
158  * tracing_on - enable all tracing buffers
159  *
160  * This function enables all tracing buffers that may have been
161  * disabled with tracing_off.
162  */
163 void tracing_on(void)
164 {
165         set_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
166 }
167 EXPORT_SYMBOL_GPL(tracing_on);
168
169 /**
170  * tracing_off - turn off all tracing buffers
171  *
172  * This function stops all tracing buffers from recording data.
173  * It does not disable any overhead the tracers themselves may
174  * be causing. This function simply causes all recording to
175  * the ring buffers to fail.
176  */
177 void tracing_off(void)
178 {
179         clear_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
180 }
181 EXPORT_SYMBOL_GPL(tracing_off);
182
183 /**
184  * tracing_off_permanent - permanently disable ring buffers
185  *
186  * This function, once called, will disable all ring buffers
187  * permanently.
188  */
189 void tracing_off_permanent(void)
190 {
191         set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags);
192 }
193
194 /**
195  * tracing_is_on - show state of ring buffers enabled
196  */
197 int tracing_is_on(void)
198 {
199         return ring_buffer_flags == RB_BUFFERS_ON;
200 }
201 EXPORT_SYMBOL_GPL(tracing_is_on);
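/*
 * Together with the buffer and per-cpu record_disabled counters defined
 * below, this global flag forms the three layers described above.  The write
 * path checks them in order, roughly as ring_buffer_lock_reserve() does later
 * in this file:
 *
 *	if (ring_buffer_flags != RB_BUFFERS_ON)
 *		return NULL;
 *	if (atomic_read(&buffer->record_disabled))
 *		return NULL;
 *	if (atomic_read(&cpu_buffer->record_disabled))
 *		return NULL;
 */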
202
203 #include "trace.h"
204
205 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
206 #define RB_ALIGNMENT            4U
207 #define RB_MAX_SMALL_DATA       (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
208 #define RB_EVNT_MIN_SIZE        8U      /* two 32bit words */
209
210 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
211 #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX
212
213 enum {
214         RB_LEN_TIME_EXTEND = 8,
215         RB_LEN_TIME_STAMP = 16,
216 };
217
218 static inline int rb_null_event(struct ring_buffer_event *event)
219 {
220         return event->type_len == RINGBUF_TYPE_PADDING
221                         && event->time_delta == 0;
222 }
223
224 static inline int rb_discarded_event(struct ring_buffer_event *event)
225 {
226         return event->type_len == RINGBUF_TYPE_PADDING && event->time_delta;
227 }
228
229 static void rb_event_set_padding(struct ring_buffer_event *event)
230 {
231         event->type_len = RINGBUF_TYPE_PADDING;
232         event->time_delta = 0;
233 }
234
235 static unsigned
236 rb_event_data_length(struct ring_buffer_event *event)
237 {
238         unsigned length;
239
240         if (event->type_len)
241                 length = event->type_len * RB_ALIGNMENT;
242         else
243                 length = event->array[0];
244         return length + RB_EVNT_HDR_SIZE;
245 }
246
247 /* inline for ring buffer fast paths */
248 static unsigned
249 rb_event_length(struct ring_buffer_event *event)
250 {
251         switch (event->type_len) {
252         case RINGBUF_TYPE_PADDING:
253                 if (rb_null_event(event))
254                         /* undefined */
255                         return -1;
256                 return  event->array[0] + RB_EVNT_HDR_SIZE;
257
258         case RINGBUF_TYPE_TIME_EXTEND:
259                 return RB_LEN_TIME_EXTEND;
260
261         case RINGBUF_TYPE_TIME_STAMP:
262                 return RB_LEN_TIME_STAMP;
263
264         case RINGBUF_TYPE_DATA:
265                 return rb_event_data_length(event);
266         default:
267                 BUG();
268         }
269         /* not hit */
270         return 0;
271 }
272
273 /**
274  * ring_buffer_event_length - return the length of the event
275  * @event: the event to get the length of
276  */
277 unsigned ring_buffer_event_length(struct ring_buffer_event *event)
278 {
279         unsigned length = rb_event_length(event);
280         if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
281                 return length;
282         length -= RB_EVNT_HDR_SIZE;
283         if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
284                 length -= sizeof(event->array[0]);
285         return length;
286 }
287 EXPORT_SYMBOL_GPL(ring_buffer_event_length);
288
289 /* inline for ring buffer fast paths */
290 static void *
291 rb_event_data(struct ring_buffer_event *event)
292 {
293         BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
294         /* If length is in len field, then array[0] has the data */
295         if (event->type_len)
296                 return (void *)&event->array[0];
297         /* Otherwise length is in array[0] and array[1] has the data */
298         return (void *)&event->array[1];
299 }
300
301 /**
302  * ring_buffer_event_data - return the data of the event
303  * @event: the event to get the data from
304  */
305 void *ring_buffer_event_data(struct ring_buffer_event *event)
306 {
307         return rb_event_data(event);
308 }
309 EXPORT_SYMBOL_GPL(ring_buffer_event_data);
310
311 #define for_each_buffer_cpu(buffer, cpu)                \
312         for_each_cpu(cpu, buffer->cpumask)
313
314 #define TS_SHIFT        27
315 #define TS_MASK         ((1ULL << TS_SHIFT) - 1)
316 #define TS_DELTA_TEST   (~TS_MASK)
317
318 struct buffer_data_page {
319         u64              time_stamp;    /* page time stamp */
320         local_t          commit;        /* write committed index */
321         unsigned char    data[];        /* data of buffer page */
322 };
323
324 struct buffer_page {
325         struct list_head list;          /* list of buffer pages */
326         local_t          write;         /* index for next write */
327         unsigned         read;          /* index for next read */
328         local_t          entries;       /* entries on this page */
329         struct buffer_data_page *page;  /* Actual data page */
330 };
331
332 static void rb_init_page(struct buffer_data_page *bpage)
333 {
334         local_set(&bpage->commit, 0);
335 }
336
337 /**
338  * ring_buffer_page_len - the size of data on the page.
339  * @page: The page to read
340  *
341  * Returns the amount of data on the page, including buffer page header.
342  */
343 size_t ring_buffer_page_len(void *page)
344 {
345         return local_read(&((struct buffer_data_page *)page)->commit)
346                 + BUF_PAGE_HDR_SIZE;
347 }
348
349 /*
350  * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
351  * this issue out.
352  */
353 static void free_buffer_page(struct buffer_page *bpage)
354 {
355         free_page((unsigned long)bpage->page);
356         kfree(bpage);
357 }
358
359 /*
360  * We need to fit the time_stamp delta into 27 bits.
361  */
362 static inline int test_time_stamp(u64 delta)
363 {
364         if (delta & TS_DELTA_TEST)
365                 return 1;
366         return 0;
367 }
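/*
 * Worked example: with the default nanosecond-resolution clock, TS_MASK
 * allows a delta of up to 2^27 - 1 ns (about 134 ms) between events.  A
 * larger delta fails test_time_stamp() and is emitted as a TIME_EXTEND event
 * instead, split as in rb_add_time_stamp() below:
 *
 *	event->time_delta = delta & TS_MASK;	(low 27 bits)
 *	event->array[0]   = delta >> TS_SHIFT;	(high bits)
 */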
368
369 #define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)
370
371 /* Max payload is BUF_PAGE_SIZE - header (8 bytes) */
372 #define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))
373
374 /* Max number of timestamps that can fit on a page */
375 #define RB_TIMESTAMPS_PER_PAGE  (BUF_PAGE_SIZE / RB_LEN_TIME_STAMP)
376
377 int ring_buffer_print_page_header(struct trace_seq *s)
378 {
379         struct buffer_data_page field;
380         int ret;
381
382         ret = trace_seq_printf(s, "\tfield: u64 timestamp;\t"
383                                "offset:0;\tsize:%u;\n",
384                                (unsigned int)sizeof(field.time_stamp));
385
386         ret = trace_seq_printf(s, "\tfield: local_t commit;\t"
387                                "offset:%u;\tsize:%u;\n",
388                                (unsigned int)offsetof(typeof(field), commit),
389                                (unsigned int)sizeof(field.commit));
390
391         ret = trace_seq_printf(s, "\tfield: char data;\t"
392                                "offset:%u;\tsize:%u;\n",
393                                (unsigned int)offsetof(typeof(field), data),
394                                (unsigned int)BUF_PAGE_SIZE);
395
396         return ret;
397 }
398
399 /*
400  * head_page == tail_page && head == tail then buffer is empty.
401  */
402 struct ring_buffer_per_cpu {
403         int                             cpu;
404         struct ring_buffer              *buffer;
405         spinlock_t                      reader_lock; /* serialize readers */
406         raw_spinlock_t                  lock;
407         struct lock_class_key           lock_key;
408         struct list_head                pages;
409         struct buffer_page              *head_page;     /* read from head */
410         struct buffer_page              *tail_page;     /* write to tail */
411         struct buffer_page              *commit_page;   /* committed pages */
412         struct buffer_page              *reader_page;
413         unsigned long                   nmi_dropped;
414         unsigned long                   commit_overrun;
415         unsigned long                   overrun;
416         unsigned long                   read;
417         local_t                         entries;
418         local_t                         committing;
419         local_t                         commits;
420         u64                             write_stamp;
421         u64                             read_stamp;
422         atomic_t                        record_disabled;
423 };
424
425 struct ring_buffer {
426         unsigned                        pages;
427         unsigned                        flags;
428         int                             cpus;
429         atomic_t                        record_disabled;
430         cpumask_var_t                   cpumask;
431
432         struct lock_class_key           *reader_lock_key;
433
434         struct mutex                    mutex;
435
436         struct ring_buffer_per_cpu      **buffers;
437
438 #ifdef CONFIG_HOTPLUG_CPU
439         struct notifier_block           cpu_notify;
440 #endif
441         u64                             (*clock)(void);
442 };
443
444 struct ring_buffer_iter {
445         struct ring_buffer_per_cpu      *cpu_buffer;
446         unsigned long                   head;
447         struct buffer_page              *head_page;
448         u64                             read_stamp;
449 };
450
451 /* buffer may be either ring_buffer or ring_buffer_per_cpu */
452 #define RB_WARN_ON(buffer, cond)                                \
453         ({                                                      \
454                 int _____ret = unlikely(cond);                  \
455                 if (_____ret) {                                 \
456                         atomic_inc(&buffer->record_disabled);   \
457                         WARN_ON(1);                             \
458                 }                                               \
459                 _____ret;                                       \
460         })
461
462 /* Up this if you want to test the TIME_EXTENTS and normalization */
463 #define DEBUG_SHIFT 0
464
465 static inline u64 rb_time_stamp(struct ring_buffer *buffer, int cpu)
466 {
467         /* shift to debug/test normalization and TIME_EXTENTS */
468         return buffer->clock() << DEBUG_SHIFT;
469 }
470
471 u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu)
472 {
473         u64 time;
474
475         preempt_disable_notrace();
476         time = rb_time_stamp(buffer, cpu);
477         preempt_enable_no_resched_notrace();
478
479         return time;
480 }
481 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);
482
483 void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer,
484                                       int cpu, u64 *ts)
485 {
486         /* Just stupid testing the normalize function and deltas */
487         *ts >>= DEBUG_SHIFT;
488 }
489 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
490
491 /**
492  * check_pages - integrity check of buffer pages
493  * @cpu_buffer: CPU buffer with pages to test
494  *
495  * As a safety measure we check to make sure the data pages have not
496  * been corrupted.
497  */
498 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
499 {
500         struct list_head *head = &cpu_buffer->pages;
501         struct buffer_page *bpage, *tmp;
502
503         if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
504                 return -1;
505         if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
506                 return -1;
507
508         list_for_each_entry_safe(bpage, tmp, head, list) {
509                 if (RB_WARN_ON(cpu_buffer,
510                                bpage->list.next->prev != &bpage->list))
511                         return -1;
512                 if (RB_WARN_ON(cpu_buffer,
513                                bpage->list.prev->next != &bpage->list))
514                         return -1;
515         }
516
517         return 0;
518 }
519
520 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
521                              unsigned nr_pages)
522 {
523         struct list_head *head = &cpu_buffer->pages;
524         struct buffer_page *bpage, *tmp;
525         unsigned long addr;
526         LIST_HEAD(pages);
527         unsigned i;
528
529         for (i = 0; i < nr_pages; i++) {
530                 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
531                                     GFP_KERNEL, cpu_to_node(cpu_buffer->cpu));
532                 if (!bpage)
533                         goto free_pages;
534                 list_add(&bpage->list, &pages);
535
536                 addr = __get_free_page(GFP_KERNEL);
537                 if (!addr)
538                         goto free_pages;
539                 bpage->page = (void *)addr;
540                 rb_init_page(bpage->page);
541         }
542
543         list_splice(&pages, head);
544
545         rb_check_pages(cpu_buffer);
546
547         return 0;
548
549  free_pages:
550         list_for_each_entry_safe(bpage, tmp, &pages, list) {
551                 list_del_init(&bpage->list);
552                 free_buffer_page(bpage);
553         }
554         return -ENOMEM;
555 }
556
557 static struct ring_buffer_per_cpu *
558 rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
559 {
560         struct ring_buffer_per_cpu *cpu_buffer;
561         struct buffer_page *bpage;
562         unsigned long addr;
563         int ret;
564
565         cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
566                                   GFP_KERNEL, cpu_to_node(cpu));
567         if (!cpu_buffer)
568                 return NULL;
569
570         cpu_buffer->cpu = cpu;
571         cpu_buffer->buffer = buffer;
572         spin_lock_init(&cpu_buffer->reader_lock);
573         lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
574         cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED;
575         INIT_LIST_HEAD(&cpu_buffer->pages);
576
577         bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
578                             GFP_KERNEL, cpu_to_node(cpu));
579         if (!bpage)
580                 goto fail_free_buffer;
581
582         cpu_buffer->reader_page = bpage;
583         addr = __get_free_page(GFP_KERNEL);
584         if (!addr)
585                 goto fail_free_reader;
586         bpage->page = (void *)addr;
587         rb_init_page(bpage->page);
588
589         INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
590
591         ret = rb_allocate_pages(cpu_buffer, buffer->pages);
592         if (ret < 0)
593                 goto fail_free_reader;
594
595         cpu_buffer->head_page
596                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
597         cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
598
599         return cpu_buffer;
600
601  fail_free_reader:
602         free_buffer_page(cpu_buffer->reader_page);
603
604  fail_free_buffer:
605         kfree(cpu_buffer);
606         return NULL;
607 }
608
609 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
610 {
611         struct list_head *head = &cpu_buffer->pages;
612         struct buffer_page *bpage, *tmp;
613
614         free_buffer_page(cpu_buffer->reader_page);
615
616         list_for_each_entry_safe(bpage, tmp, head, list) {
617                 list_del_init(&bpage->list);
618                 free_buffer_page(bpage);
619         }
620         kfree(cpu_buffer);
621 }
622
623 /*
624  * Causes compile errors if the struct buffer_page gets bigger
625  * than the struct page.
626  */
627 extern int ring_buffer_page_too_big(void);
628
629 #ifdef CONFIG_HOTPLUG_CPU
630 static int rb_cpu_notify(struct notifier_block *self,
631                          unsigned long action, void *hcpu);
632 #endif
633
634 /**
635  * ring_buffer_alloc - allocate a new ring_buffer
636  * @size: the size in bytes per cpu that is needed.
637  * @flags: attributes to set for the ring buffer.
638  *
639  * Currently the only flag that is available is the RB_FL_OVERWRITE
640  * flag. This flag means that the buffer will overwrite old data
641  * when the buffer wraps. If this flag is not set, the buffer will
642  * drop data when the tail hits the head.
643  */
644 struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
645                                         struct lock_class_key *key)
646 {
647         struct ring_buffer *buffer;
648         int bsize;
649         int cpu;
650
651         /* Paranoid! Optimizes out when all is well */
652         if (sizeof(struct buffer_page) > sizeof(struct page))
653                 ring_buffer_page_too_big();
654
655
656         /* keep it in its own cache line */
657         buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
658                          GFP_KERNEL);
659         if (!buffer)
660                 return NULL;
661
662         if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
663                 goto fail_free_buffer;
664
665         buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
666         buffer->flags = flags;
667         buffer->clock = trace_clock_local;
668         buffer->reader_lock_key = key;
669
670         /* need at least two pages */
671         if (buffer->pages == 1)
672                 buffer->pages++;
673
674         /*
675          * If CPU hotplug is not configured and the ring buffer is allocated
676          * in an early initcall, it will not be notified of secondary cpus.
677          * In that case, we need to allocate for all possible cpus.
678          */
679 #ifdef CONFIG_HOTPLUG_CPU
680         get_online_cpus();
681         cpumask_copy(buffer->cpumask, cpu_online_mask);
682 #else
683         cpumask_copy(buffer->cpumask, cpu_possible_mask);
684 #endif
685         buffer->cpus = nr_cpu_ids;
686
687         bsize = sizeof(void *) * nr_cpu_ids;
688         buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
689                                   GFP_KERNEL);
690         if (!buffer->buffers)
691                 goto fail_free_cpumask;
692
693         for_each_buffer_cpu(buffer, cpu) {
694                 buffer->buffers[cpu] =
695                         rb_allocate_cpu_buffer(buffer, cpu);
696                 if (!buffer->buffers[cpu])
697                         goto fail_free_buffers;
698         }
699
700 #ifdef CONFIG_HOTPLUG_CPU
701         buffer->cpu_notify.notifier_call = rb_cpu_notify;
702         buffer->cpu_notify.priority = 0;
703         register_cpu_notifier(&buffer->cpu_notify);
704 #endif
705
706         put_online_cpus();
707         mutex_init(&buffer->mutex);
708
709         return buffer;
710
711  fail_free_buffers:
712         for_each_buffer_cpu(buffer, cpu) {
713                 if (buffer->buffers[cpu])
714                         rb_free_cpu_buffer(buffer->buffers[cpu]);
715         }
716         kfree(buffer->buffers);
717
718  fail_free_cpumask:
719         free_cpumask_var(buffer->cpumask);
720         put_online_cpus();
721
722  fail_free_buffer:
723         kfree(buffer);
724         return NULL;
725 }
726 EXPORT_SYMBOL_GPL(__ring_buffer_alloc);
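/*
 * Minimal usage sketch (hypothetical caller, error handling trimmed):
 *
 *	struct ring_buffer *rb;
 *
 *	rb = ring_buffer_alloc(1 << 20, RB_FL_OVERWRITE);
 *	if (!rb)
 *		return -ENOMEM;
 *	...
 *	ring_buffer_free(rb);
 *
 * ring_buffer_alloc() is the convenience wrapper in <linux/ring_buffer.h>
 * that supplies the lock_class_key argument to __ring_buffer_alloc().
 */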
727
728 /**
729  * ring_buffer_free - free a ring buffer.
730  * @buffer: the buffer to free.
731  */
732 void
733 ring_buffer_free(struct ring_buffer *buffer)
734 {
735         int cpu;
736
737         get_online_cpus();
738
739 #ifdef CONFIG_HOTPLUG_CPU
740         unregister_cpu_notifier(&buffer->cpu_notify);
741 #endif
742
743         for_each_buffer_cpu(buffer, cpu)
744                 rb_free_cpu_buffer(buffer->buffers[cpu]);
745
746         put_online_cpus();
747
748         free_cpumask_var(buffer->cpumask);
749
750         kfree(buffer);
751 }
752 EXPORT_SYMBOL_GPL(ring_buffer_free);
753
754 void ring_buffer_set_clock(struct ring_buffer *buffer,
755                            u64 (*clock)(void))
756 {
757         buffer->clock = clock;
758 }
759
760 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
761
762 static void
763 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
764 {
765         struct buffer_page *bpage;
766         struct list_head *p;
767         unsigned i;
768
769         atomic_inc(&cpu_buffer->record_disabled);
770         synchronize_sched();
771
772         for (i = 0; i < nr_pages; i++) {
773                 if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
774                         return;
775                 p = cpu_buffer->pages.next;
776                 bpage = list_entry(p, struct buffer_page, list);
777                 list_del_init(&bpage->list);
778                 free_buffer_page(bpage);
779         }
780         if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
781                 return;
782
783         rb_reset_cpu(cpu_buffer);
784
785         rb_check_pages(cpu_buffer);
786
787         atomic_dec(&cpu_buffer->record_disabled);
788
789 }
790
791 static void
792 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
793                 struct list_head *pages, unsigned nr_pages)
794 {
795         struct buffer_page *bpage;
796         struct list_head *p;
797         unsigned i;
798
799         atomic_inc(&cpu_buffer->record_disabled);
800         synchronize_sched();
801
802         for (i = 0; i < nr_pages; i++) {
803                 if (RB_WARN_ON(cpu_buffer, list_empty(pages)))
804                         return;
805                 p = pages->next;
806                 bpage = list_entry(p, struct buffer_page, list);
807                 list_del_init(&bpage->list);
808                 list_add_tail(&bpage->list, &cpu_buffer->pages);
809         }
810         rb_reset_cpu(cpu_buffer);
811
812         rb_check_pages(cpu_buffer);
813
814         atomic_dec(&cpu_buffer->record_disabled);
815 }
816
817 /**
818  * ring_buffer_resize - resize the ring buffer
819  * @buffer: the buffer to resize.
820  * @size: the new size.
821  *
822  * The tracer is responsible for making sure that the buffer is
823  * not being used while changing the size.
824  * Note: We may be able to change the above requirement by using
825  *  RCU synchronizations.
826  *
827  * Minimum size is 2 * BUF_PAGE_SIZE.
828  *
829  * Returns -1 on failure.
830  */
831 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
832 {
833         struct ring_buffer_per_cpu *cpu_buffer;
834         unsigned nr_pages, rm_pages, new_pages;
835         struct buffer_page *bpage, *tmp;
836         unsigned long buffer_size;
837         unsigned long addr;
838         LIST_HEAD(pages);
839         int i, cpu;
840
841         /*
842          * Always succeed at resizing a non-existent buffer:
843          */
844         if (!buffer)
845                 return size;
846
847         size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
848         size *= BUF_PAGE_SIZE;
849         buffer_size = buffer->pages * BUF_PAGE_SIZE;
850
851         /* we need a minimum of two pages */
852         if (size < BUF_PAGE_SIZE * 2)
853                 size = BUF_PAGE_SIZE * 2;
854
855         if (size == buffer_size)
856                 return size;
857
858         mutex_lock(&buffer->mutex);
859         get_online_cpus();
860
861         nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
862
863         if (size < buffer_size) {
864
865                 /* easy case, just free pages */
866                 if (RB_WARN_ON(buffer, nr_pages >= buffer->pages))
867                         goto out_fail;
868
869                 rm_pages = buffer->pages - nr_pages;
870
871                 for_each_buffer_cpu(buffer, cpu) {
872                         cpu_buffer = buffer->buffers[cpu];
873                         rb_remove_pages(cpu_buffer, rm_pages);
874                 }
875                 goto out;
876         }
877
878         /*
879          * This is a bit more difficult. We only want to add pages
880          * when we can allocate enough for all CPUs. We do this
881          * by allocating all the pages and storing them on a local
882          * link list. If we succeed in our allocation, then we
883          * add these pages to the cpu_buffers. Otherwise we just free
884          * them all and return -ENOMEM;
885          */
886         if (RB_WARN_ON(buffer, nr_pages <= buffer->pages))
887                 goto out_fail;
888
889         new_pages = nr_pages - buffer->pages;
890
891         for_each_buffer_cpu(buffer, cpu) {
892                 for (i = 0; i < new_pages; i++) {
893                         bpage = kzalloc_node(ALIGN(sizeof(*bpage),
894                                                   cache_line_size()),
895                                             GFP_KERNEL, cpu_to_node(cpu));
896                         if (!bpage)
897                                 goto free_pages;
898                         list_add(&bpage->list, &pages);
899                         addr = __get_free_page(GFP_KERNEL);
900                         if (!addr)
901                                 goto free_pages;
902                         bpage->page = (void *)addr;
903                         rb_init_page(bpage->page);
904                 }
905         }
906
907         for_each_buffer_cpu(buffer, cpu) {
908                 cpu_buffer = buffer->buffers[cpu];
909                 rb_insert_pages(cpu_buffer, &pages, new_pages);
910         }
911
912         if (RB_WARN_ON(buffer, !list_empty(&pages)))
913                 goto out_fail;
914
915  out:
916         buffer->pages = nr_pages;
917         put_online_cpus();
918         mutex_unlock(&buffer->mutex);
919
920         return size;
921
922  free_pages:
923         list_for_each_entry_safe(bpage, tmp, &pages, list) {
924                 list_del_init(&bpage->list);
925                 free_buffer_page(bpage);
926         }
927         put_online_cpus();
928         mutex_unlock(&buffer->mutex);
929         return -ENOMEM;
930
931         /*
932          * Something went totally wrong, and we are too paranoid
933          * to even clean up the mess.
934          */
935  out_fail:
936         put_online_cpus();
937         mutex_unlock(&buffer->mutex);
938         return -1;
939 }
940 EXPORT_SYMBOL_GPL(ring_buffer_resize);
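/*
 * Usage sketch (hypothetical caller): the requested size is in bytes and is
 * rounded up to whole buffer pages, with a two page minimum:
 *
 *	ret = ring_buffer_resize(buffer, new_size_in_bytes);
 *	if (ret < 0)
 *		return ret;
 *
 * On success the rounded byte size is returned; -ENOMEM means the extra pages
 * could not be allocated, and -1 signals an internal inconsistency.
 */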
941
942 static inline void *
943 __rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
944 {
945         return bpage->data + index;
946 }
947
948 static inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
949 {
950         return bpage->page->data + index;
951 }
952
953 static inline struct ring_buffer_event *
954 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
955 {
956         return __rb_page_index(cpu_buffer->reader_page,
957                                cpu_buffer->reader_page->read);
958 }
959
960 static inline struct ring_buffer_event *
961 rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
962 {
963         return __rb_page_index(cpu_buffer->head_page,
964                                cpu_buffer->head_page->read);
965 }
966
967 static inline struct ring_buffer_event *
968 rb_iter_head_event(struct ring_buffer_iter *iter)
969 {
970         return __rb_page_index(iter->head_page, iter->head);
971 }
972
973 static inline unsigned rb_page_write(struct buffer_page *bpage)
974 {
975         return local_read(&bpage->write);
976 }
977
978 static inline unsigned rb_page_commit(struct buffer_page *bpage)
979 {
980         return local_read(&bpage->page->commit);
981 }
982
983 /* Size is determined by what has been committed */
984 static inline unsigned rb_page_size(struct buffer_page *bpage)
985 {
986         return rb_page_commit(bpage);
987 }
988
989 static inline unsigned
990 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
991 {
992         return rb_page_commit(cpu_buffer->commit_page);
993 }
994
995 static inline unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
996 {
997         return rb_page_commit(cpu_buffer->head_page);
998 }
999
1000 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
1001                                struct buffer_page **bpage)
1002 {
1003         struct list_head *p = (*bpage)->list.next;
1004
1005         if (p == &cpu_buffer->pages)
1006                 p = p->next;
1007
1008         *bpage = list_entry(p, struct buffer_page, list);
1009 }
1010
1011 static inline unsigned
1012 rb_event_index(struct ring_buffer_event *event)
1013 {
1014         unsigned long addr = (unsigned long)event;
1015
1016         return (addr & ~PAGE_MASK) - (PAGE_SIZE - BUF_PAGE_SIZE);
1017 }
1018
1019 static inline int
1020 rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
1021                    struct ring_buffer_event *event)
1022 {
1023         unsigned long addr = (unsigned long)event;
1024         unsigned long index;
1025
1026         index = rb_event_index(event);
1027         addr &= PAGE_MASK;
1028
1029         return cpu_buffer->commit_page->page == (void *)addr &&
1030                 rb_commit_index(cpu_buffer) == index;
1031 }
1032
1033 static void
1034 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
1035 {
1036         /*
1037          * We only race with interrupts and NMIs on this CPU.
1038          * If we own the commit event, then we can commit
1039          * all others that interrupted us, since the interruptions
1040          * are in stack format (they finish before they come
1041          * back to us). This allows us to do a simple loop to
1042          * assign the commit to the tail.
1043          */
1044  again:
1045         while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
1046                 cpu_buffer->commit_page->page->commit =
1047                         cpu_buffer->commit_page->write;
1048                 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
1049                 cpu_buffer->write_stamp =
1050                         cpu_buffer->commit_page->page->time_stamp;
1051                 /* add barrier to keep gcc from optimizing too much */
1052                 barrier();
1053         }
1054         while (rb_commit_index(cpu_buffer) !=
1055                rb_page_write(cpu_buffer->commit_page)) {
1056                 cpu_buffer->commit_page->page->commit =
1057                         cpu_buffer->commit_page->write;
1058                 barrier();
1059         }
1060
1061         /* again, keep gcc from optimizing */
1062         barrier();
1063
1064         /*
1065          * If an interrupt came in just after the first while loop
1066          * and pushed the tail page forward, we will be left with
1067          * a dangling commit that will never go forward.
1068          */
1069         if (unlikely(cpu_buffer->commit_page != cpu_buffer->tail_page))
1070                 goto again;
1071 }
1072
1073 static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
1074 {
1075         cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp;
1076         cpu_buffer->reader_page->read = 0;
1077 }
1078
1079 static void rb_inc_iter(struct ring_buffer_iter *iter)
1080 {
1081         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1082
1083         /*
1084          * The iterator could be on the reader page (it starts there).
1085          * But the head could have moved, since the reader was
1086          * found. Check for this case and assign the iterator
1087          * to the head page instead of next.
1088          */
1089         if (iter->head_page == cpu_buffer->reader_page)
1090                 iter->head_page = cpu_buffer->head_page;
1091         else
1092                 rb_inc_page(cpu_buffer, &iter->head_page);
1093
1094         iter->read_stamp = iter->head_page->page->time_stamp;
1095         iter->head = 0;
1096 }
1097
1098 /**
1099  * ring_buffer_update_event - update event type and data
1100  * @event: the event to update
1101  * @type: the type of event
1102  * @length: the size of the event field in the ring buffer
1103  *
1104  * Update the type and data fields of the event. The length
1105  * is the actual size that is written to the ring buffer,
1106  * and with this, we can determine what to place into the
1107  * data field.
1108  */
1109 static void
1110 rb_update_event(struct ring_buffer_event *event,
1111                          unsigned type, unsigned length)
1112 {
1113         event->type_len = type;
1114
1115         switch (type) {
1116
1117         case RINGBUF_TYPE_PADDING:
1118         case RINGBUF_TYPE_TIME_EXTEND:
1119         case RINGBUF_TYPE_TIME_STAMP:
1120                 break;
1121
1122         case 0:
1123                 length -= RB_EVNT_HDR_SIZE;
1124                 if (length > RB_MAX_SMALL_DATA)
1125                         event->array[0] = length;
1126                 else
1127                         event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
1128                 break;
1129         default:
1130                 BUG();
1131         }
1132 }
1133
1134 static unsigned rb_calculate_event_length(unsigned length)
1135 {
1136         struct ring_buffer_event event; /* Used only for sizeof array */
1137
1138         /* zero length can cause confusion */
1139         if (!length)
1140                 length = 1;
1141
1142         if (length > RB_MAX_SMALL_DATA)
1143                 length += sizeof(event.array[0]);
1144
1145         length += RB_EVNT_HDR_SIZE;
1146         length = ALIGN(length, RB_ALIGNMENT);
1147
1148         return length;
1149 }
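/*
 * Worked example: a 12-byte payload is well below RB_MAX_SMALL_DATA, so its
 * length fits in the type_len field:
 *
 *	rb_calculate_event_length(12)
 *		= ALIGN(12 + RB_EVNT_HDR_SIZE, RB_ALIGNMENT) = 16 bytes
 *
 * rb_update_event() then encodes type_len = DIV_ROUND_UP(12, 4) = 3, and
 * rb_event_data_length() decodes that back to 3 * 4 + 4 = 16 bytes.
 */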
1150
1151 static inline void
1152 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
1153               struct buffer_page *tail_page,
1154               unsigned long tail, unsigned long length)
1155 {
1156         struct ring_buffer_event *event;
1157
1158         /*
1159          * Only the event that crossed the page boundary
1160          * must fill the old tail_page with padding.
1161          */
1162         if (tail >= BUF_PAGE_SIZE) {
1163                 local_sub(length, &tail_page->write);
1164                 return;
1165         }
1166
1167         event = __rb_page_index(tail_page, tail);
1168
1169         /*
1170          * If this event is bigger than the minimum size, then
1171          * we need to be careful that we don't subtract the
1172          * write counter enough to allow another writer to slip
1173          * in on this page.
1174          * We put in a discarded commit instead, to make sure
1175          * that this space is not used again.
1176          *
1177          * If we are less than the minimum size, we don't need to
1178          * worry about it.
1179          */
1180         if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) {
1181                 /* No room for any events */
1182
1183                 /* Mark the rest of the page with padding */
1184                 rb_event_set_padding(event);
1185
1186                 /* Set the write back to the previous setting */
1187                 local_sub(length, &tail_page->write);
1188                 return;
1189         }
1190
1191         /* Put in a discarded event */
1192         event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE;
1193         event->type_len = RINGBUF_TYPE_PADDING;
1194         /* time delta must be non zero */
1195         event->time_delta = 1;
1196         /* Account for this as an entry */
1197         local_inc(&tail_page->entries);
1198         local_inc(&cpu_buffer->entries);
1199
1200         /* Set write to end of buffer */
1201         length = (tail + length) - BUF_PAGE_SIZE;
1202         local_sub(length, &tail_page->write);
1203 }
1204
1205 static struct ring_buffer_event *
1206 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
1207              unsigned long length, unsigned long tail,
1208              struct buffer_page *commit_page,
1209              struct buffer_page *tail_page, u64 *ts)
1210 {
1211         struct buffer_page *next_page, *head_page, *reader_page;
1212         struct ring_buffer *buffer = cpu_buffer->buffer;
1213         bool lock_taken = false;
1214         unsigned long flags;
1215
1216         next_page = tail_page;
1217
1218         local_irq_save(flags);
1219         /*
1220          * Since the write to the buffer is still not
1221          * fully lockless, we must be careful with NMIs.
1222          * The locks in the writers are taken when a write
1223          * crosses to a new page. The locks protect against
1224          * races with the readers (this will soon be fixed
1225          * with a lockless solution).
1226          *
1227          * Because we can not protect against NMIs, and we
1228          * want to keep traces reentrant, we need to manage
1229          * what happens when we are in an NMI.
1230          *
1231          * NMIs can happen after we take the lock.
1232          * If we are in an NMI, only take the lock
1233          * if it is not already taken. Otherwise
1234          * simply fail.
1235          */
1236         if (unlikely(in_nmi())) {
1237                 if (!__raw_spin_trylock(&cpu_buffer->lock)) {
1238                         cpu_buffer->nmi_dropped++;
1239                         goto out_reset;
1240                 }
1241         } else
1242                 __raw_spin_lock(&cpu_buffer->lock);
1243
1244         lock_taken = true;
1245
1246         rb_inc_page(cpu_buffer, &next_page);
1247
1248         head_page = cpu_buffer->head_page;
1249         reader_page = cpu_buffer->reader_page;
1250
1251         /* we grabbed the lock before incrementing */
1252         if (RB_WARN_ON(cpu_buffer, next_page == reader_page))
1253                 goto out_reset;
1254
1255         /*
1256          * If for some reason, we had an interrupt storm that made
1257          * it all the way around the buffer, bail, and warn
1258          * about it.
1259          */
1260         if (unlikely(next_page == commit_page)) {
1261                 cpu_buffer->commit_overrun++;
1262                 goto out_reset;
1263         }
1264
1265         if (next_page == head_page) {
1266                 if (!(buffer->flags & RB_FL_OVERWRITE))
1267                         goto out_reset;
1268
1269                 /* tail_page has not moved yet? */
1270                 if (tail_page == cpu_buffer->tail_page) {
1271                         /* count overflows */
1272                         cpu_buffer->overrun +=
1273                                 local_read(&head_page->entries);
1274
1275                         rb_inc_page(cpu_buffer, &head_page);
1276                         cpu_buffer->head_page = head_page;
1277                         cpu_buffer->head_page->read = 0;
1278                 }
1279         }
1280
1281         /*
1282          * If the tail page is still the same as what we think
1283          * it is, then it is up to us to update the tail
1284          * pointer.
1285          */
1286         if (tail_page == cpu_buffer->tail_page) {
1287                 local_set(&next_page->write, 0);
1288                 local_set(&next_page->entries, 0);
1289                 local_set(&next_page->page->commit, 0);
1290                 cpu_buffer->tail_page = next_page;
1291
1292                 /* reread the time stamp */
1293                 *ts = rb_time_stamp(buffer, cpu_buffer->cpu);
1294                 cpu_buffer->tail_page->page->time_stamp = *ts;
1295         }
1296
1297         rb_reset_tail(cpu_buffer, tail_page, tail, length);
1298
1299         __raw_spin_unlock(&cpu_buffer->lock);
1300         local_irq_restore(flags);
1301
1302         /* fail and let the caller try again */
1303         return ERR_PTR(-EAGAIN);
1304
1305  out_reset:
1306         /* reset write */
1307         rb_reset_tail(cpu_buffer, tail_page, tail, length);
1308
1309         if (likely(lock_taken))
1310                 __raw_spin_unlock(&cpu_buffer->lock);
1311         local_irq_restore(flags);
1312         return NULL;
1313 }
1314
1315 static struct ring_buffer_event *
1316 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
1317                   unsigned type, unsigned long length, u64 *ts)
1318 {
1319         struct buffer_page *tail_page, *commit_page;
1320         struct ring_buffer_event *event;
1321         unsigned long tail, write;
1322
1323         commit_page = cpu_buffer->commit_page;
1324         /* we just need to protect against interrupts */
1325         barrier();
1326         tail_page = cpu_buffer->tail_page;
1327         write = local_add_return(length, &tail_page->write);
1328         tail = write - length;
1329
1330         /* See if we shot past the end of this buffer page */
1331         if (write > BUF_PAGE_SIZE)
1332                 return rb_move_tail(cpu_buffer, length, tail,
1333                                     commit_page, tail_page, ts);
1334
1335         /* We reserved something on the buffer */
1336
1337         if (RB_WARN_ON(cpu_buffer, write > BUF_PAGE_SIZE))
1338                 return NULL;
1339
1340         event = __rb_page_index(tail_page, tail);
1341         rb_update_event(event, type, length);
1342
1343         /* The passed in type is zero for DATA */
1344         if (likely(!type))
1345                 local_inc(&tail_page->entries);
1346
1347         /*
1348          * If this is the first commit on the page, then update
1349          * its timestamp.
1350          */
1351         if (!tail)
1352                 tail_page->page->time_stamp = *ts;
1353
1354         return event;
1355 }
1356
1357 static inline int
1358 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
1359                   struct ring_buffer_event *event)
1360 {
1361         unsigned long new_index, old_index;
1362         struct buffer_page *bpage;
1363         unsigned long index;
1364         unsigned long addr;
1365
1366         new_index = rb_event_index(event);
1367         old_index = new_index + rb_event_length(event);
1368         addr = (unsigned long)event;
1369         addr &= PAGE_MASK;
1370
1371         bpage = cpu_buffer->tail_page;
1372
1373         if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
1374                 /*
1375                  * This is on the tail page. It is possible that
1376                  * a write could come in and move the tail page
1377                  * and write to the next page. That is fine
1378                  * because we just shorten what is on this page.
1379                  */
1380                 index = local_cmpxchg(&bpage->write, old_index, new_index);
1381                 if (index == old_index)
1382                         return 1;
1383         }
1384
1385         /* could not discard */
1386         return 0;
1387 }
1388
1389 static int
1390 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1391                   u64 *ts, u64 *delta)
1392 {
1393         struct ring_buffer_event *event;
1394         static int once;
1395         int ret;
1396
1397         if (unlikely(*delta > (1ULL << 59) && !once++)) {
1398                 printk(KERN_WARNING "Delta way too big! %llu"
1399                        " ts=%llu write stamp = %llu\n",
1400                        (unsigned long long)*delta,
1401                        (unsigned long long)*ts,
1402                        (unsigned long long)cpu_buffer->write_stamp);
1403                 WARN_ON(1);
1404         }
1405
1406         /*
1407          * The delta is too big; we need to add a
1408          * new timestamp.
1409          */
1410         event = __rb_reserve_next(cpu_buffer,
1411                                   RINGBUF_TYPE_TIME_EXTEND,
1412                                   RB_LEN_TIME_EXTEND,
1413                                   ts);
1414         if (!event)
1415                 return -EBUSY;
1416
1417         if (PTR_ERR(event) == -EAGAIN)
1418                 return -EAGAIN;
1419
1420         /* Only a committed time event can update the write stamp */
1421         if (rb_event_is_commit(cpu_buffer, event)) {
1422                 /*
1423                  * If this is the first on the page, then it was
1424                  * updated with the page itself. Try to discard it
1425                  * and if we can't just make it zero.
1426                  */
1427                 if (rb_event_index(event)) {
1428                         event->time_delta = *delta & TS_MASK;
1429                         event->array[0] = *delta >> TS_SHIFT;
1430                 } else {
1431                         /* try to discard, since we do not need this */
1432                         if (!rb_try_to_discard(cpu_buffer, event)) {
1433                                 /* nope, just zero it */
1434                                 event->time_delta = 0;
1435                                 event->array[0] = 0;
1436                         }
1437                 }
1438                 cpu_buffer->write_stamp = *ts;
1439                 /* let the caller know this was the commit */
1440                 ret = 1;
1441         } else {
1442                 /* Try to discard the event */
1443                 if (!rb_try_to_discard(cpu_buffer, event)) {
1444                         /* Darn, this is just wasted space */
1445                         event->time_delta = 0;
1446                         event->array[0] = 0;
1447                 }
1448                 ret = 0;
1449         }
1450
1451         *delta = 0;
1452
1453         return ret;
1454 }
1455
1456 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
1457 {
1458         local_inc(&cpu_buffer->committing);
1459         local_inc(&cpu_buffer->commits);
1460 }
1461
1462 static void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
1463 {
1464         unsigned long commits;
1465
1466         if (RB_WARN_ON(cpu_buffer,
1467                        !local_read(&cpu_buffer->committing)))
1468                 return;
1469
1470  again:
1471         commits = local_read(&cpu_buffer->commits);
1472         /* synchronize with interrupts */
1473         barrier();
1474         if (local_read(&cpu_buffer->committing) == 1)
1475                 rb_set_commit_to_write(cpu_buffer);
1476
1477         local_dec(&cpu_buffer->committing);
1478
1479         /* synchronize with interrupts */
1480         barrier();
1481
1482         /*
1483          * Need to account for interrupts coming in between the
1484          * updating of the commit page and the clearing of the
1485          * committing counter.
1486          */
1487         if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
1488             !local_read(&cpu_buffer->committing)) {
1489                 local_inc(&cpu_buffer->committing);
1490                 goto again;
1491         }
1492 }
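/*
 * Illustrative nesting of the commit counters (assumed scenario: a task
 * writing on this CPU is interrupted by an NMI that also writes):
 *
 *	task:  rb_start_commit()   committing = 1, commits = 1
 *	NMI:   rb_start_commit()   committing = 2, commits = 2
 *	NMI:   rb_end_commit()     committing != 1, only decrements -> 1
 *	task:  rb_end_commit()     committing == 1, so it advances the commit
 *	                           page via rb_set_commit_to_write(), then
 *	                           decrements -> 0
 *
 * The final re-check of "commits" catches an interrupt that slips in between
 * advancing the commit page and clearing "committing".
 */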
1493
1494 static struct ring_buffer_event *
1495 rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
1496                       unsigned long length)
1497 {
1498         struct ring_buffer_event *event;
1499         u64 ts, delta = 0;
1500         int commit = 0;
1501         int nr_loops = 0;
1502
1503         rb_start_commit(cpu_buffer);
1504
1505         length = rb_calculate_event_length(length);
1506  again:
1507         /*
1508          * We allow for interrupts to reenter here and do a trace.
1509          * If one does, it will cause this original code to loop
1510          * back here. Even with heavy interrupts happening, this
1511          * should only happen a few times in a row. If this happens
1512          * 1000 times in a row, there must be either an interrupt
1513          * storm or we have something buggy.
1514          * Bail!
1515          */
1516         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
1517                 goto out_fail;
1518
1519         ts = rb_time_stamp(cpu_buffer->buffer, cpu_buffer->cpu);
1520
1521         /*
1522          * Only the first commit can update the timestamp.
1523          * Yes there is a race here. If an interrupt comes in
1524          * just after the conditional and it traces too, then it
1525          * will also check the deltas. More than one timestamp may
1526          * also be made. But only the entry that did the actual
1527          * commit will be something other than zero.
1528          */
1529         if (likely(cpu_buffer->tail_page == cpu_buffer->commit_page &&
1530                    rb_page_write(cpu_buffer->tail_page) ==
1531                    rb_commit_index(cpu_buffer))) {
1532                 u64 diff;
1533
1534                 diff = ts - cpu_buffer->write_stamp;
1535
1536                 /* make sure this diff is calculated here */
1537                 barrier();
1538
1539                 /* Did the write stamp get updated already? */
1540                 if (unlikely(ts < cpu_buffer->write_stamp))
1541                         goto get_event;
1542
1543                 delta = diff;
1544                 if (unlikely(test_time_stamp(delta))) {
1545
1546                         commit = rb_add_time_stamp(cpu_buffer, &ts, &delta);
1547                         if (commit == -EBUSY)
1548                                 goto out_fail;
1549
1550                         if (commit == -EAGAIN)
1551                                 goto again;
1552
1553                         RB_WARN_ON(cpu_buffer, commit < 0);
1554                 }
1555         }
1556
1557  get_event:
1558         event = __rb_reserve_next(cpu_buffer, 0, length, &ts);
1559         if (unlikely(PTR_ERR(event) == -EAGAIN))
1560                 goto again;
1561
1562         if (!event)
1563                 goto out_fail;
1564
1565         if (!rb_event_is_commit(cpu_buffer, event))
1566                 delta = 0;
1567
1568         event->time_delta = delta;
1569
1570         return event;
1571
1572  out_fail:
1573         rb_end_commit(cpu_buffer);
1574         return NULL;
1575 }
1576
1577 #define TRACE_RECURSIVE_DEPTH 16
1578
1579 static int trace_recursive_lock(void)
1580 {
1581         current->trace_recursion++;
1582
1583         if (likely(current->trace_recursion < TRACE_RECURSIVE_DEPTH))
1584                 return 0;
1585
1586         /* Disable all tracing before we do anything else */
1587         tracing_off_permanent();
1588
1589         printk_once(KERN_WARNING "Tracing recursion: depth[%ld]:"
1590                     "HC[%lu]:SC[%lu]:NMI[%lu]\n",
1591                     current->trace_recursion,
1592                     hardirq_count() >> HARDIRQ_SHIFT,
1593                     softirq_count() >> SOFTIRQ_SHIFT,
1594                     in_nmi());
1595
1596         WARN_ON_ONCE(1);
1597         return -1;
1598 }
1599
1600 static void trace_recursive_unlock(void)
1601 {
1602         WARN_ON_ONCE(!current->trace_recursion);
1603
1604         current->trace_recursion--;
1605 }
1606
1607 static DEFINE_PER_CPU(int, rb_need_resched);
1608
1609 /**
1610  * ring_buffer_lock_reserve - reserve a part of the buffer
1611  * @buffer: the ring buffer to reserve from
1612  * @length: the length of the data to reserve (excluding event header)
1613  *
1614  * Returns a reserved event on the ring buffer to copy directly to.
1615  * The user of this interface will need to get the body to write into
1616  * and can use the ring_buffer_event_data() interface.
1617  *
1618  * The length is the length of the data needed, not the event length
1619  * which also includes the event header.
1620  *
1621  * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
1622  * If NULL is returned, then nothing has been allocated or locked.
1623  */
1624 struct ring_buffer_event *
1625 ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
1626 {
1627         struct ring_buffer_per_cpu *cpu_buffer;
1628         struct ring_buffer_event *event;
1629         int cpu, resched;
1630
1631         if (ring_buffer_flags != RB_BUFFERS_ON)
1632                 return NULL;
1633
1634         if (atomic_read(&buffer->record_disabled))
1635                 return NULL;
1636
1637         /* If we are tracing schedule, we don't want to recurse */
1638         resched = ftrace_preempt_disable();
1639
1640         if (trace_recursive_lock())
1641                 goto out_nocheck;
1642
1643         cpu = raw_smp_processor_id();
1644
1645         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1646                 goto out;
1647
1648         cpu_buffer = buffer->buffers[cpu];
1649
1650         if (atomic_read(&cpu_buffer->record_disabled))
1651                 goto out;
1652
1653         if (length > BUF_MAX_DATA_SIZE)
1654                 goto out;
1655
1656         event = rb_reserve_next_event(cpu_buffer, length);
1657         if (!event)
1658                 goto out;
1659
1660         /*
1661          * Need to store resched state on this cpu.
1662          * Only the first needs to.
1663          */
1664
1665         if (preempt_count() == 1)
1666                 per_cpu(rb_need_resched, cpu) = resched;
1667
1668         return event;
1669
1670  out:
1671         trace_recursive_unlock();
1672
1673  out_nocheck:
1674         ftrace_preempt_enable(resched);
1675         return NULL;
1676 }
1677 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
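
/*
 * Illustrative usage sketch (not part of the original file): a hypothetical
 * writer reserves room for its payload, fills in the body returned by
 * ring_buffer_event_data(), and commits. struct my_entry and the variable
 * names are made up for the example.
 *
 *      struct ring_buffer_event *event;
 *      struct my_entry *entry;
 *
 *      event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *      if (!event)
 *              return;
 *      entry = ring_buffer_event_data(event);
 *      entry->value = 42;
 *      ring_buffer_unlock_commit(buffer, event);
 */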
1678
1679 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
1680                       struct ring_buffer_event *event)
1681 {
1682         local_inc(&cpu_buffer->entries);
1683
1684         /*
1685          * The event first in the commit queue updates the
1686          * time stamp.
1687          */
1688         if (rb_event_is_commit(cpu_buffer, event))
1689                 cpu_buffer->write_stamp += event->time_delta;
1690
1691         rb_end_commit(cpu_buffer);
1692 }
1693
1694 /**
1695  * ring_buffer_unlock_commit - commit a reserved event
1696  * @buffer: The buffer to commit to
1697  * @event: The event pointer to commit.
1698  *
1699  * This commits the data to the ring buffer, and releases any locks held.
1700  *
1701  * Must be paired with ring_buffer_lock_reserve.
1702  */
1703 int ring_buffer_unlock_commit(struct ring_buffer *buffer,
1704                               struct ring_buffer_event *event)
1705 {
1706         struct ring_buffer_per_cpu *cpu_buffer;
1707         int cpu = raw_smp_processor_id();
1708
1709         cpu_buffer = buffer->buffers[cpu];
1710
1711         rb_commit(cpu_buffer, event);
1712
1713         trace_recursive_unlock();
1714
1715         /*
1716          * Only the last preempt count needs to restore preemption.
1717          */
1718         if (preempt_count() == 1)
1719                 ftrace_preempt_enable(per_cpu(rb_need_resched, cpu));
1720         else
1721                 preempt_enable_no_resched_notrace();
1722
1723         return 0;
1724 }
1725 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
1726
1727 static inline void rb_event_discard(struct ring_buffer_event *event)
1728 {
1729         /* array[0] holds the actual length for the discarded event */
1730         event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
1731         event->type_len = RINGBUF_TYPE_PADDING;
1732         /* time delta must be non zero */
1733         if (!event->time_delta)
1734                 event->time_delta = 1;
1735 }
1736
1737 /**
1738  * ring_buffer_event_discard - discard any event in the ring buffer
1739  * @event: the event to discard
1740  *
1741  * Sometimes an event that is in the ring buffer needs to be ignored.
1742  * This function lets the user discard an event in the ring buffer
1743  * and then that event will not be read later.
1744  *
1745  * Note, it is up to the user to be careful with this, and protect
1746  * against races. If the user discards an event that has been consumed
1747  * it is possible that it could corrupt the ring buffer.
1748  */
1749 void ring_buffer_event_discard(struct ring_buffer_event *event)
1750 {
1751         rb_event_discard(event);
1752 }
1753 EXPORT_SYMBOL_GPL(ring_buffer_event_discard);
1754
1755 /**
1756  * ring_buffer_commit_discard - discard an event that has not been committed
1757  * @buffer: the ring buffer
1758  * @event: non committed event to discard
1759  *
1760  * This is similar to ring_buffer_event_discard but must only be
1761  * performed on an event that has not been committed yet. The difference
1762  * is that this will also try to free the event from the ring buffer
1763  * if another event has not been added behind it.
1764  *
1765  * If another event has been added behind it, it will set the event
1766  * up as discarded, and perform the commit.
1767  *
1768  * If this function is called, do not call ring_buffer_unlock_commit on
1769  * the event.
1770  */
1771 void ring_buffer_discard_commit(struct ring_buffer *buffer,
1772                                 struct ring_buffer_event *event)
1773 {
1774         struct ring_buffer_per_cpu *cpu_buffer;
1775         int cpu;
1776
1777         /* The event is discarded regardless */
1778         rb_event_discard(event);
1779
1780         cpu = smp_processor_id();
1781         cpu_buffer = buffer->buffers[cpu];
1782
1783         /*
1784          * This must only be called if the event has not been
1785          * committed yet. Thus we can assume that preemption
1786          * is still disabled.
1787          */
1788         RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
1789
1790         if (!rb_try_to_discard(cpu_buffer, event))
1791                 goto out;
1792
1793         /*
1794          * The commit is still visible by the reader, so we
1795          * must increment entries.
1796          */
1797         local_inc(&cpu_buffer->entries);
1798  out:
1799         rb_end_commit(cpu_buffer);
1800
1801         trace_recursive_unlock();
1802
1803         /*
1804          * Only the last preempt count needs to restore preemption.
1805          */
1806         if (preempt_count() == 1)
1807                 ftrace_preempt_enable(per_cpu(rb_need_resched, cpu));
1808         else
1809                 preempt_enable_no_resched_notrace();
1810
1811 }
1812 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit);
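
/*
 * Illustrative sketch (not part of the original file): the usual pattern for
 * filtered writes. The event is reserved and filled first; if it turns out
 * to be unwanted, ring_buffer_discard_commit() is called instead of
 * ring_buffer_unlock_commit(), never both. fill_entry() and
 * my_filter_drop() are made-up helpers; event and entry are declared as in
 * the reserve/commit sketch above.
 *
 *      event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *      if (!event)
 *              return;
 *      entry = ring_buffer_event_data(event);
 *      fill_entry(entry);
 *      if (my_filter_drop(entry))
 *              ring_buffer_discard_commit(buffer, event);
 *      else
 *              ring_buffer_unlock_commit(buffer, event);
 */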
1813
1814 /**
1815  * ring_buffer_write - write data to the buffer without reserving
1816  * @buffer: The ring buffer to write to.
1817  * @length: The length of the data being written (excluding the event header)
1818  * @data: The data to write to the buffer.
1819  *
1820  * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
1821  * one function. If you already have the data to write to the buffer, it
1822  * may be easier to simply call this function.
1823  *
1824  * Note, like ring_buffer_lock_reserve, the length is the length of the data
1825  * and not the length of the event which would hold the header.
1826  */
1827 int ring_buffer_write(struct ring_buffer *buffer,
1828                         unsigned long length,
1829                         void *data)
1830 {
1831         struct ring_buffer_per_cpu *cpu_buffer;
1832         struct ring_buffer_event *event;
1833         void *body;
1834         int ret = -EBUSY;
1835         int cpu, resched;
1836
1837         if (ring_buffer_flags != RB_BUFFERS_ON)
1838                 return -EBUSY;
1839
1840         if (atomic_read(&buffer->record_disabled))
1841                 return -EBUSY;
1842
1843         resched = ftrace_preempt_disable();
1844
1845         cpu = raw_smp_processor_id();
1846
1847         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1848                 goto out;
1849
1850         cpu_buffer = buffer->buffers[cpu];
1851
1852         if (atomic_read(&cpu_buffer->record_disabled))
1853                 goto out;
1854
1855         if (length > BUF_MAX_DATA_SIZE)
1856                 goto out;
1857
1858         event = rb_reserve_next_event(cpu_buffer, length);
1859         if (!event)
1860                 goto out;
1861
1862         body = rb_event_data(event);
1863
1864         memcpy(body, data, length);
1865
1866         rb_commit(cpu_buffer, event);
1867
1868         ret = 0;
1869  out:
1870         ftrace_preempt_enable(resched);
1871
1872         return ret;
1873 }
1874 EXPORT_SYMBOL_GPL(ring_buffer_write);
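
/*
 * Illustrative sketch (not part of the original file): when the payload is
 * already assembled, ring_buffer_write() replaces the reserve/commit pair
 * with a single call. struct my_entry is made up; a non-zero return means
 * the write could not be done (recording disabled or buffers off).
 *
 *      struct my_entry entry = { .value = 42 };
 *      int ret;
 *
 *      ret = ring_buffer_write(buffer, sizeof(entry), &entry);
 *      if (ret)
 *              return ret;
 */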
1875
1876 static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
1877 {
1878         struct buffer_page *reader = cpu_buffer->reader_page;
1879         struct buffer_page *head = cpu_buffer->head_page;
1880         struct buffer_page *commit = cpu_buffer->commit_page;
1881
1882         return reader->read == rb_page_commit(reader) &&
1883                 (commit == reader ||
1884                  (commit == head &&
1885                   head->read == rb_page_commit(commit)));
1886 }
1887
1888 /**
1889  * ring_buffer_record_disable - stop all writes into the buffer
1890  * @buffer: The ring buffer to stop writes to.
1891  *
1892  * This prevents all writes to the buffer. Any attempt to write
1893  * to the buffer after this will fail and return NULL.
1894  *
1895  * The caller should call synchronize_sched() after this.
1896  */
1897 void ring_buffer_record_disable(struct ring_buffer *buffer)
1898 {
1899         atomic_inc(&buffer->record_disabled);
1900 }
1901 EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
1902
1903 /**
1904  * ring_buffer_record_enable - enable writes to the buffer
1905  * @buffer: The ring buffer to enable writes
1906  *
1907  * Note, multiple disables will need the same number of enables
1908  * to truly enable the writing (much like preempt_disable).
1909  */
1910 void ring_buffer_record_enable(struct ring_buffer *buffer)
1911 {
1912         atomic_dec(&buffer->record_disabled);
1913 }
1914 EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
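
/*
 * Illustrative sketch (not part of the original file): temporarily stopping
 * all writers, for example around a consistent inspection of the buffer.
 * The synchronize_sched() lets writers that were already inside the
 * reserve/commit path finish before the buffer is touched. inspect_buffer()
 * stands in for whatever the caller does while writes are stopped.
 *
 *      ring_buffer_record_disable(buffer);
 *      synchronize_sched();
 *      inspect_buffer(buffer);
 *      ring_buffer_record_enable(buffer);
 */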
1915
1916 /**
1917  * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
1918  * @buffer: The ring buffer to stop writes to.
1919  * @cpu: The CPU buffer to stop
1920  *
1921  * This prevents all writes to the buffer. Any attempt to write
1922  * to the buffer after this will fail and return NULL.
1923  *
1924  * The caller should call synchronize_sched() after this.
1925  */
1926 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
1927 {
1928         struct ring_buffer_per_cpu *cpu_buffer;
1929
1930         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1931                 return;
1932
1933         cpu_buffer = buffer->buffers[cpu];
1934         atomic_inc(&cpu_buffer->record_disabled);
1935 }
1936 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
1937
1938 /**
1939  * ring_buffer_record_enable_cpu - enable writes to the buffer
1940  * @buffer: The ring buffer to enable writes
1941  * @cpu: The CPU to enable.
1942  *
1943  * Note, multiple disables will need the same number of enables
1944  * to truly enable the writing (much like preempt_disable).
1945  */
1946 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
1947 {
1948         struct ring_buffer_per_cpu *cpu_buffer;
1949
1950         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1951                 return;
1952
1953         cpu_buffer = buffer->buffers[cpu];
1954         atomic_dec(&cpu_buffer->record_disabled);
1955 }
1956 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);
1957
1958 /**
1959  * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
1960  * @buffer: The ring buffer
1961  * @cpu: The per CPU buffer to get the entries from.
1962  */
1963 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
1964 {
1965         struct ring_buffer_per_cpu *cpu_buffer;
1966         unsigned long ret;
1967
1968         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1969                 return 0;
1970
1971         cpu_buffer = buffer->buffers[cpu];
1972         ret = (local_read(&cpu_buffer->entries) - cpu_buffer->overrun)
1973                 - cpu_buffer->read;
1974
1975         return ret;
1976 }
1977 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);
1978
1979 /**
1980  * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
1981  * @buffer: The ring buffer
1982  * @cpu: The per CPU buffer to get the number of overruns from
1983  */
1984 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
1985 {
1986         struct ring_buffer_per_cpu *cpu_buffer;
1987         unsigned long ret;
1988
1989         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1990                 return 0;
1991
1992         cpu_buffer = buffer->buffers[cpu];
1993         ret = cpu_buffer->overrun;
1994
1995         return ret;
1996 }
1997 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);
1998
1999 /**
2000  * ring_buffer_nmi_dropped_cpu - get the number of nmis that were dropped
2001  * @buffer: The ring buffer
2002  * @cpu: The per CPU buffer to get the number of dropped NMIs from
2003  */
2004 unsigned long ring_buffer_nmi_dropped_cpu(struct ring_buffer *buffer, int cpu)
2005 {
2006         struct ring_buffer_per_cpu *cpu_buffer;
2007         unsigned long ret;
2008
2009         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2010                 return 0;
2011
2012         cpu_buffer = buffer->buffers[cpu];
2013         ret = cpu_buffer->nmi_dropped;
2014
2015         return ret;
2016 }
2017 EXPORT_SYMBOL_GPL(ring_buffer_nmi_dropped_cpu);
2018
2019 /**
2020  * ring_buffer_commit_overrun_cpu - get the number of overruns caused by commits
2021  * @buffer: The ring buffer
2022  * @cpu: The per CPU buffer to get the number of overruns from
2023  */
2024 unsigned long
2025 ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu)
2026 {
2027         struct ring_buffer_per_cpu *cpu_buffer;
2028         unsigned long ret;
2029
2030         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2031                 return 0;
2032
2033         cpu_buffer = buffer->buffers[cpu];
2034         ret = cpu_buffer->commit_overrun;
2035
2036         return ret;
2037 }
2038 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu);
2039
2040 /**
2041  * ring_buffer_entries - get the number of entries in a buffer
2042  * @buffer: The ring buffer
2043  *
2044  * Returns the total number of entries in the ring buffer
2045  * (all CPU entries)
2046  */
2047 unsigned long ring_buffer_entries(struct ring_buffer *buffer)
2048 {
2049         struct ring_buffer_per_cpu *cpu_buffer;
2050         unsigned long entries = 0;
2051         int cpu;
2052
2053         /* if you care about this being correct, lock the buffer */
2054         for_each_buffer_cpu(buffer, cpu) {
2055                 cpu_buffer = buffer->buffers[cpu];
2056                 entries += (local_read(&cpu_buffer->entries) -
2057                             cpu_buffer->overrun) - cpu_buffer->read;
2058         }
2059
2060         return entries;
2061 }
2062 EXPORT_SYMBOL_GPL(ring_buffer_entries);
2063
2064 /**
2065  * ring_buffer_overruns - get the number of overruns in the buffer
2066  * @buffer: The ring buffer
2067  *
2068  * Returns the total number of overruns in the ring buffer
2069  * (all CPU entries)
2070  */
2071 unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
2072 {
2073         struct ring_buffer_per_cpu *cpu_buffer;
2074         unsigned long overruns = 0;
2075         int cpu;
2076
2077         /* if you care about this being correct, lock the buffer */
2078         for_each_buffer_cpu(buffer, cpu) {
2079                 cpu_buffer = buffer->buffers[cpu];
2080                 overruns += cpu_buffer->overrun;
2081         }
2082
2083         return overruns;
2084 }
2085 EXPORT_SYMBOL_GPL(ring_buffer_overruns);
2086
2087 static void rb_iter_reset(struct ring_buffer_iter *iter)
2088 {
2089         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2090
2091         /* Iterator usage is expected to have record disabled */
2092         if (list_empty(&cpu_buffer->reader_page->list)) {
2093                 iter->head_page = cpu_buffer->head_page;
2094                 iter->head = cpu_buffer->head_page->read;
2095         } else {
2096                 iter->head_page = cpu_buffer->reader_page;
2097                 iter->head = cpu_buffer->reader_page->read;
2098         }
2099         if (iter->head)
2100                 iter->read_stamp = cpu_buffer->read_stamp;
2101         else
2102                 iter->read_stamp = iter->head_page->page->time_stamp;
2103 }
2104
2105 /**
2106  * ring_buffer_iter_reset - reset an iterator
2107  * @iter: The iterator to reset
2108  *
2109  * Resets the iterator, so that it will start from the beginning
2110  * again.
2111  */
2112 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
2113 {
2114         struct ring_buffer_per_cpu *cpu_buffer;
2115         unsigned long flags;
2116
2117         if (!iter)
2118                 return;
2119
2120         cpu_buffer = iter->cpu_buffer;
2121
2122         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2123         rb_iter_reset(iter);
2124         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2125 }
2126 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);
2127
2128 /**
2129  * ring_buffer_iter_empty - check if an iterator has no more to read
2130  * @iter: The iterator to check
2131  */
2132 int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
2133 {
2134         struct ring_buffer_per_cpu *cpu_buffer;
2135
2136         cpu_buffer = iter->cpu_buffer;
2137
2138         return iter->head_page == cpu_buffer->commit_page &&
2139                 iter->head == rb_commit_index(cpu_buffer);
2140 }
2141 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);
2142
2143 static void
2144 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
2145                      struct ring_buffer_event *event)
2146 {
2147         u64 delta;
2148
2149         switch (event->type_len) {
2150         case RINGBUF_TYPE_PADDING:
2151                 return;
2152
2153         case RINGBUF_TYPE_TIME_EXTEND:
2154                 delta = event->array[0];
2155                 delta <<= TS_SHIFT;
2156                 delta += event->time_delta;
2157                 cpu_buffer->read_stamp += delta;
2158                 return;
2159
2160         case RINGBUF_TYPE_TIME_STAMP:
2161                 /* FIXME: not implemented */
2162                 return;
2163
2164         case RINGBUF_TYPE_DATA:
2165                 cpu_buffer->read_stamp += event->time_delta;
2166                 return;
2167
2168         default:
2169                 BUG();
2170         }
2171         return;
2172 }
2173
2174 static void
2175 rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
2176                           struct ring_buffer_event *event)
2177 {
2178         u64 delta;
2179
2180         switch (event->type_len) {
2181         case RINGBUF_TYPE_PADDING:
2182                 return;
2183
2184         case RINGBUF_TYPE_TIME_EXTEND:
2185                 delta = event->array[0];
2186                 delta <<= TS_SHIFT;
2187                 delta += event->time_delta;
2188                 iter->read_stamp += delta;
2189                 return;
2190
2191         case RINGBUF_TYPE_TIME_STAMP:
2192                 /* FIXME: not implemented */
2193                 return;
2194
2195         case RINGBUF_TYPE_DATA:
2196                 iter->read_stamp += event->time_delta;
2197                 return;
2198
2199         default:
2200                 BUG();
2201         }
2202         return;
2203 }
2204
2205 static struct buffer_page *
2206 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
2207 {
2208         struct buffer_page *reader = NULL;
2209         unsigned long flags;
2210         int nr_loops = 0;
2211
2212         local_irq_save(flags);
2213         __raw_spin_lock(&cpu_buffer->lock);
2214
2215  again:
2216         /*
2217          * This should normally only loop twice. But because the
2218          * start of the reader inserts an empty page, it causes
2219          * a case where we will loop three times. There should be no
2220          * reason to loop four times (that I know of).
2221          */
2222         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
2223                 reader = NULL;
2224                 goto out;
2225         }
2226
2227         reader = cpu_buffer->reader_page;
2228
2229         /* If there's more to read, return this page */
2230         if (cpu_buffer->reader_page->read < rb_page_size(reader))
2231                 goto out;
2232
2233         /* Never should we have an index greater than the size */
2234         if (RB_WARN_ON(cpu_buffer,
2235                        cpu_buffer->reader_page->read > rb_page_size(reader)))
2236                 goto out;
2237
2238         /* check if we caught up to the tail */
2239         reader = NULL;
2240         if (cpu_buffer->commit_page == cpu_buffer->reader_page)
2241                 goto out;
2242
2243         /*
2244          * Splice the empty reader page into the list around the head.
2245          * Reset the reader page to size zero.
2246          */
2247
2248         reader = cpu_buffer->head_page;
2249         cpu_buffer->reader_page->list.next = reader->list.next;
2250         cpu_buffer->reader_page->list.prev = reader->list.prev;
2251
2252         local_set(&cpu_buffer->reader_page->write, 0);
2253         local_set(&cpu_buffer->reader_page->entries, 0);
2254         local_set(&cpu_buffer->reader_page->page->commit, 0);
2255
2256         /* Make the reader page now replace the head */
2257         reader->list.prev->next = &cpu_buffer->reader_page->list;
2258         reader->list.next->prev = &cpu_buffer->reader_page->list;
2259
2260         /*
2261          * If the tail is on the reader, then we must set the head
2262          * to the inserted page, otherwise we set it one before.
2263          */
2264         cpu_buffer->head_page = cpu_buffer->reader_page;
2265
2266         if (cpu_buffer->commit_page != reader)
2267                 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
2268
2269         /* Finally update the reader page to the new head */
2270         cpu_buffer->reader_page = reader;
2271         rb_reset_reader_page(cpu_buffer);
2272
2273         goto again;
2274
2275  out:
2276         __raw_spin_unlock(&cpu_buffer->lock);
2277         local_irq_restore(flags);
2278
2279         return reader;
2280 }
2281
2282 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
2283 {
2284         struct ring_buffer_event *event;
2285         struct buffer_page *reader;
2286         unsigned length;
2287
2288         reader = rb_get_reader_page(cpu_buffer);
2289
2290         /* This function should not be called when buffer is empty */
2291         if (RB_WARN_ON(cpu_buffer, !reader))
2292                 return;
2293
2294         event = rb_reader_event(cpu_buffer);
2295
2296         if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX
2297                         || rb_discarded_event(event))
2298                 cpu_buffer->read++;
2299
2300         rb_update_read_stamp(cpu_buffer, event);
2301
2302         length = rb_event_length(event);
2303         cpu_buffer->reader_page->read += length;
2304 }
2305
2306 static void rb_advance_iter(struct ring_buffer_iter *iter)
2307 {
2308         struct ring_buffer *buffer;
2309         struct ring_buffer_per_cpu *cpu_buffer;
2310         struct ring_buffer_event *event;
2311         unsigned length;
2312
2313         cpu_buffer = iter->cpu_buffer;
2314         buffer = cpu_buffer->buffer;
2315
2316         /*
2317          * Check if we are at the end of the buffer.
2318          */
2319         if (iter->head >= rb_page_size(iter->head_page)) {
2320                 /* discarded commits can make the page empty */
2321                 if (iter->head_page == cpu_buffer->commit_page)
2322                         return;
2323                 rb_inc_iter(iter);
2324                 return;
2325         }
2326
2327         event = rb_iter_head_event(iter);
2328
2329         length = rb_event_length(event);
2330
2331         /*
2332          * This should not be called to advance the header if we are
2333          * at the tail of the buffer.
2334          */
2335         if (RB_WARN_ON(cpu_buffer,
2336                        (iter->head_page == cpu_buffer->commit_page) &&
2337                        (iter->head + length > rb_commit_index(cpu_buffer))))
2338                 return;
2339
2340         rb_update_iter_read_stamp(iter, event);
2341
2342         iter->head += length;
2343
2344         /* check for end of page padding */
2345         if ((iter->head >= rb_page_size(iter->head_page)) &&
2346             (iter->head_page != cpu_buffer->commit_page))
2347                 rb_advance_iter(iter);
2348 }
2349
2350 static struct ring_buffer_event *
2351 rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
2352 {
2353         struct ring_buffer_per_cpu *cpu_buffer;
2354         struct ring_buffer_event *event;
2355         struct buffer_page *reader;
2356         int nr_loops = 0;
2357
2358         cpu_buffer = buffer->buffers[cpu];
2359
2360  again:
2361         /*
2362          * We repeat when a timestamp is encountered. It is possible
2363          * to get multiple timestamps from an interrupt entering just
2364          * as one timestamp is about to be written, or from discarded
2365          * commits. The most that we can have is the number on a single page.
2366          */
2367         if (RB_WARN_ON(cpu_buffer, ++nr_loops > RB_TIMESTAMPS_PER_PAGE))
2368                 return NULL;
2369
2370         reader = rb_get_reader_page(cpu_buffer);
2371         if (!reader)
2372                 return NULL;
2373
2374         event = rb_reader_event(cpu_buffer);
2375
2376         switch (event->type_len) {
2377         case RINGBUF_TYPE_PADDING:
2378                 if (rb_null_event(event))
2379                         RB_WARN_ON(cpu_buffer, 1);
2380                 /*
2381                  * Because the writer could be discarding every
2382                  * event it creates (which would probably be bad)
2383                  * if we were to go back to "again" then we may never
2384                  * catch up, and will trigger the warn on, or lock
2385                  * the box. Return the padding, and we will release
2386                  * the current locks, and try again.
2387                  */
2388                 rb_advance_reader(cpu_buffer);
2389                 return event;
2390
2391         case RINGBUF_TYPE_TIME_EXTEND:
2392                 /* Internal data, OK to advance */
2393                 rb_advance_reader(cpu_buffer);
2394                 goto again;
2395
2396         case RINGBUF_TYPE_TIME_STAMP:
2397                 /* FIXME: not implemented */
2398                 rb_advance_reader(cpu_buffer);
2399                 goto again;
2400
2401         case RINGBUF_TYPE_DATA:
2402                 if (ts) {
2403                         *ts = cpu_buffer->read_stamp + event->time_delta;
2404                         ring_buffer_normalize_time_stamp(buffer,
2405                                                          cpu_buffer->cpu, ts);
2406                 }
2407                 return event;
2408
2409         default:
2410                 BUG();
2411         }
2412
2413         return NULL;
2414 }
2415 EXPORT_SYMBOL_GPL(ring_buffer_peek);
2416
2417 static struct ring_buffer_event *
2418 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
2419 {
2420         struct ring_buffer *buffer;
2421         struct ring_buffer_per_cpu *cpu_buffer;
2422         struct ring_buffer_event *event;
2423         int nr_loops = 0;
2424
2425         if (ring_buffer_iter_empty(iter))
2426                 return NULL;
2427
2428         cpu_buffer = iter->cpu_buffer;
2429         buffer = cpu_buffer->buffer;
2430
2431  again:
2432         /*
2433          * We repeat when a timestamp is encountered.
2434          * We can get multiple timestamps by nested interrupts or also
2435          * if filtering is on (discarding commits). Since discarding
2436          * commits can be frequent we can get a lot of timestamps.
2437          * But we limit them by not adding timestamps if they begin
2438          * at the start of a page.
2439          */
2440         if (RB_WARN_ON(cpu_buffer, ++nr_loops > RB_TIMESTAMPS_PER_PAGE))
2441                 return NULL;
2442
2443         if (rb_per_cpu_empty(cpu_buffer))
2444                 return NULL;
2445
2446         event = rb_iter_head_event(iter);
2447
2448         switch (event->type_len) {
2449         case RINGBUF_TYPE_PADDING:
2450                 if (rb_null_event(event)) {
2451                         rb_inc_iter(iter);
2452                         goto again;
2453                 }
2454                 rb_advance_iter(iter);
2455                 return event;
2456
2457         case RINGBUF_TYPE_TIME_EXTEND:
2458                 /* Internal data, OK to advance */
2459                 rb_advance_iter(iter);
2460                 goto again;
2461
2462         case RINGBUF_TYPE_TIME_STAMP:
2463                 /* FIXME: not implemented */
2464                 rb_advance_iter(iter);
2465                 goto again;
2466
2467         case RINGBUF_TYPE_DATA:
2468                 if (ts) {
2469                         *ts = iter->read_stamp + event->time_delta;
2470                         ring_buffer_normalize_time_stamp(buffer,
2471                                                          cpu_buffer->cpu, ts);
2472                 }
2473                 return event;
2474
2475         default:
2476                 BUG();
2477         }
2478
2479         return NULL;
2480 }
2481 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
2482
2483 /**
2484  * ring_buffer_peek - peek at the next event to be read
2485  * @buffer: The ring buffer to read
2486  * @cpu: The cpu to peek at
2487  * @ts: The timestamp counter of this event.
2488  *
2489  * This will return the event that will be read next, but does
2490  * not consume the data.
2491  */
2492 struct ring_buffer_event *
2493 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
2494 {
2495         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
2496         struct ring_buffer_event *event;
2497         unsigned long flags;
2498
2499         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2500                 return NULL;
2501
2502  again:
2503         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2504         event = rb_buffer_peek(buffer, cpu, ts);
2505         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2506
2507         if (event && event->type_len == RINGBUF_TYPE_PADDING) {
2508                 cpu_relax();
2509                 goto again;
2510         }
2511
2512         return event;
2513 }
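
/*
 * Illustrative sketch (not part of the original file): peeking lets a reader
 * look at the next event and its timestamp without consuming it, for
 * example to decide whether a consuming read is worthwhile. "buffer" and
 * "cpu" are assumed to exist in the caller.
 *
 *      u64 ts;
 *      struct ring_buffer_event *event;
 *
 *      event = ring_buffer_peek(buffer, cpu, &ts);
 *      if (event)
 *              pr_debug("next event stamped at %llu\n",
 *                       (unsigned long long)ts);
 */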
2514
2515 /**
2516  * ring_buffer_iter_peek - peek at the next event to be read
2517  * @iter: The ring buffer iterator
2518  * @ts: The timestamp counter of this event.
2519  *
2520  * This will return the event that will be read next, but does
2521  * not increment the iterator.
2522  */
2523 struct ring_buffer_event *
2524 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
2525 {
2526         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2527         struct ring_buffer_event *event;
2528         unsigned long flags;
2529
2530  again:
2531         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2532         event = rb_iter_peek(iter, ts);
2533         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2534
2535         if (event && event->type_len == RINGBUF_TYPE_PADDING) {
2536                 cpu_relax();
2537                 goto again;
2538         }
2539
2540         return event;
2541 }
2542
2543 /**
2544  * ring_buffer_consume - return an event and consume it
2545  * @buffer: The ring buffer to get the next event from
2546  *
2547  * Returns the next event in the ring buffer, and that event is consumed.
2548  * Meaning that sequential reads will keep returning a different event,
2549  * and eventually empty the ring buffer if the producer is slower.
2550  */
2551 struct ring_buffer_event *
2552 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
2553 {
2554         struct ring_buffer_per_cpu *cpu_buffer;
2555         struct ring_buffer_event *event = NULL;
2556         unsigned long flags;
2557
2558  again:
2559         /* might be called in atomic */
2560         preempt_disable();
2561
2562         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2563                 goto out;
2564
2565         cpu_buffer = buffer->buffers[cpu];
2566         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2567
2568         event = rb_buffer_peek(buffer, cpu, ts);
2569         if (!event)
2570                 goto out_unlock;
2571
2572         rb_advance_reader(cpu_buffer);
2573
2574  out_unlock:
2575         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2576
2577  out:
2578         preempt_enable();
2579
2580         if (event && event->type_len == RINGBUF_TYPE_PADDING) {
2581                 cpu_relax();
2582                 goto again;
2583         }
2584
2585         return event;
2586 }
2587 EXPORT_SYMBOL_GPL(ring_buffer_consume);
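
/*
 * Illustrative sketch (not part of the original file): draining one CPU
 * buffer with consuming reads. Each successful call returns the next event
 * and advances the reader, so the loop ends when that cpu buffer is empty.
 * process_event() is a made-up consumer.
 *
 *      struct ring_buffer_event *event;
 *      u64 ts;
 *
 *      while ((event = ring_buffer_consume(buffer, cpu, &ts)) != NULL)
 *              process_event(ring_buffer_event_data(event), ts);
 */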
2588
2589 /**
2590  * ring_buffer_read_start - start a non consuming read of the buffer
2591  * @buffer: The ring buffer to read from
2592  * @cpu: The cpu buffer to iterate over
2593  *
2594  * This starts up an iteration through the buffer. It also disables
2595  * the recording to the buffer until the reading is finished.
2596  * This prevents the reading from being corrupted. This is not
2597  * a consuming read, so a producer is not expected.
2598  *
2599  * Must be paired with ring_buffer_read_finish.
2600  */
2601 struct ring_buffer_iter *
2602 ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
2603 {
2604         struct ring_buffer_per_cpu *cpu_buffer;
2605         struct ring_buffer_iter *iter;
2606         unsigned long flags;
2607
2608         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2609                 return NULL;
2610
2611         iter = kmalloc(sizeof(*iter), GFP_KERNEL);
2612         if (!iter)
2613                 return NULL;
2614
2615         cpu_buffer = buffer->buffers[cpu];
2616
2617         iter->cpu_buffer = cpu_buffer;
2618
2619         atomic_inc(&cpu_buffer->record_disabled);
2620         synchronize_sched();
2621
2622         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2623         __raw_spin_lock(&cpu_buffer->lock);
2624         rb_iter_reset(iter);
2625         __raw_spin_unlock(&cpu_buffer->lock);
2626         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2627
2628         return iter;
2629 }
2630 EXPORT_SYMBOL_GPL(ring_buffer_read_start);
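
/*
 * Illustrative sketch (not part of the original file): a non-consuming walk
 * over one CPU buffer. Recording on that cpu buffer stays disabled between
 * ring_buffer_read_start() and ring_buffer_read_finish(), so the iteration
 * sees a stable view. handle_event() is a made-up helper.
 *
 *      struct ring_buffer_iter *iter;
 *      struct ring_buffer_event *event;
 *      u64 ts;
 *
 *      iter = ring_buffer_read_start(buffer, cpu);
 *      if (!iter)
 *              return;
 *      while ((event = ring_buffer_read(iter, &ts)) != NULL)
 *              handle_event(event, ts);
 *      ring_buffer_read_finish(iter);
 */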
2631
2632 /**
2633  * ring_buffer_read_finish - finish reading the iterator of the buffer
2634  * @iter: The iterator retrieved by ring_buffer_read_start
2635  *
2636  * This re-enables the recording to the buffer, and frees the
2637  * iterator.
2638  */
2639 void
2640 ring_buffer_read_finish(struct ring_buffer_iter *iter)
2641 {
2642         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2643
2644         atomic_dec(&cpu_buffer->record_disabled);
2645         kfree(iter);
2646 }
2647 EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
2648
2649 /**
2650  * ring_buffer_read - read the next item in the ring buffer by the iterator
2651  * @iter: The ring buffer iterator
2652  * @ts: The time stamp of the event read.
2653  *
2654  * This reads the next event in the ring buffer and increments the iterator.
2655  */
2656 struct ring_buffer_event *
2657 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
2658 {
2659         struct ring_buffer_event *event;
2660         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2661         unsigned long flags;
2662
2663  again:
2664         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2665         event = rb_iter_peek(iter, ts);
2666         if (!event)
2667                 goto out;
2668
2669         rb_advance_iter(iter);
2670  out:
2671         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2672
2673         if (event && event->type_len == RINGBUF_TYPE_PADDING) {
2674                 cpu_relax();
2675                 goto again;
2676         }
2677
2678         return event;
2679 }
2680 EXPORT_SYMBOL_GPL(ring_buffer_read);
2681
2682 /**
2683  * ring_buffer_size - return the size of the ring buffer (in bytes)
2684  * @buffer: The ring buffer.
2685  */
2686 unsigned long ring_buffer_size(struct ring_buffer *buffer)
2687 {
2688         return BUF_PAGE_SIZE * buffer->pages;
2689 }
2690 EXPORT_SYMBOL_GPL(ring_buffer_size);
2691
2692 static void
2693 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
2694 {
2695         cpu_buffer->head_page
2696                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
2697         local_set(&cpu_buffer->head_page->write, 0);
2698         local_set(&cpu_buffer->head_page->entries, 0);
2699         local_set(&cpu_buffer->head_page->page->commit, 0);
2700
2701         cpu_buffer->head_page->read = 0;
2702
2703         cpu_buffer->tail_page = cpu_buffer->head_page;
2704         cpu_buffer->commit_page = cpu_buffer->head_page;
2705
2706         INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
2707         local_set(&cpu_buffer->reader_page->write, 0);
2708         local_set(&cpu_buffer->reader_page->entries, 0);
2709         local_set(&cpu_buffer->reader_page->page->commit, 0);
2710         cpu_buffer->reader_page->read = 0;
2711
2712         cpu_buffer->nmi_dropped = 0;
2713         cpu_buffer->commit_overrun = 0;
2714         cpu_buffer->overrun = 0;
2715         cpu_buffer->read = 0;
2716         local_set(&cpu_buffer->entries, 0);
2717         local_set(&cpu_buffer->committing, 0);
2718         local_set(&cpu_buffer->commits, 0);
2719
2720         cpu_buffer->write_stamp = 0;
2721         cpu_buffer->read_stamp = 0;
2722 }
2723
2724 /**
2725  * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
2726  * @buffer: The ring buffer to reset a per cpu buffer of
2727  * @cpu: The CPU buffer to be reset
2728  */
2729 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
2730 {
2731         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
2732         unsigned long flags;
2733
2734         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2735                 return;
2736
2737         atomic_inc(&cpu_buffer->record_disabled);
2738
2739         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2740
2741         __raw_spin_lock(&cpu_buffer->lock);
2742
2743         rb_reset_cpu(cpu_buffer);
2744
2745         __raw_spin_unlock(&cpu_buffer->lock);
2746
2747         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2748
2749         atomic_dec(&cpu_buffer->record_disabled);
2750 }
2751 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
2752
2753 /**
2754  * ring_buffer_reset - reset a ring buffer
2755  * @buffer: The ring buffer to reset all cpu buffers
2756  */
2757 void ring_buffer_reset(struct ring_buffer *buffer)
2758 {
2759         int cpu;
2760
2761         for_each_buffer_cpu(buffer, cpu)
2762                 ring_buffer_reset_cpu(buffer, cpu);
2763 }
2764 EXPORT_SYMBOL_GPL(ring_buffer_reset);
2765
2766 /**
2767  * ring_buffer_empty - is the ring buffer empty?
2768  * @buffer: The ring buffer to test
2769  */
2770 int ring_buffer_empty(struct ring_buffer *buffer)
2771 {
2772         struct ring_buffer_per_cpu *cpu_buffer;
2773         int cpu;
2774
2775         /* yes this is racy, but if you don't like the race, lock the buffer */
2776         for_each_buffer_cpu(buffer, cpu) {
2777                 cpu_buffer = buffer->buffers[cpu];
2778                 if (!rb_per_cpu_empty(cpu_buffer))
2779                         return 0;
2780         }
2781
2782         return 1;
2783 }
2784 EXPORT_SYMBOL_GPL(ring_buffer_empty);
2785
2786 /**
2787  * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
2788  * @buffer: The ring buffer
2789  * @cpu: The CPU buffer to test
2790  */
2791 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
2792 {
2793         struct ring_buffer_per_cpu *cpu_buffer;
2794         int ret;
2795
2796         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2797                 return 1;
2798
2799         cpu_buffer = buffer->buffers[cpu];
2800         ret = rb_per_cpu_empty(cpu_buffer);
2801
2802
2803         return ret;
2804 }
2805 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
2806
2807 /**
2808  * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
2809  * @buffer_a: One buffer to swap with
2810  * @buffer_b: The other buffer to swap with
2811  *
2812  * This function is useful for tracers that want to take a "snapshot"
2813  * of a CPU buffer and have another backup buffer lying around.
2814  * It is expected that the tracer handles the cpu buffer not being
2815  * used at the moment.
2816  */
2817 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
2818                          struct ring_buffer *buffer_b, int cpu)
2819 {
2820         struct ring_buffer_per_cpu *cpu_buffer_a;
2821         struct ring_buffer_per_cpu *cpu_buffer_b;
2822         int ret = -EINVAL;
2823
2824         if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
2825             !cpumask_test_cpu(cpu, buffer_b->cpumask))
2826                 goto out;
2827
2828         /* At least make sure the two buffers are somewhat the same */
2829         if (buffer_a->pages != buffer_b->pages)
2830                 goto out;
2831
2832         ret = -EAGAIN;
2833
2834         if (ring_buffer_flags != RB_BUFFERS_ON)
2835                 goto out;
2836
2837         if (atomic_read(&buffer_a->record_disabled))
2838                 goto out;
2839
2840         if (atomic_read(&buffer_b->record_disabled))
2841                 goto out;
2842
2843         cpu_buffer_a = buffer_a->buffers[cpu];
2844         cpu_buffer_b = buffer_b->buffers[cpu];
2845
2846         if (atomic_read(&cpu_buffer_a->record_disabled))
2847                 goto out;
2848
2849         if (atomic_read(&cpu_buffer_b->record_disabled))
2850                 goto out;
2851
2852         /*
2853          * We can't do a synchronize_sched here because this
2854          * function can be called in atomic context.
2855          * Normally this will be called from the same CPU as cpu.
2856          * If not it's up to the caller to protect this.
2857          */
2858         atomic_inc(&cpu_buffer_a->record_disabled);
2859         atomic_inc(&cpu_buffer_b->record_disabled);
2860
2861         buffer_a->buffers[cpu] = cpu_buffer_b;
2862         buffer_b->buffers[cpu] = cpu_buffer_a;
2863
2864         cpu_buffer_b->buffer = buffer_a;
2865         cpu_buffer_a->buffer = buffer_b;
2866
2867         atomic_dec(&cpu_buffer_a->record_disabled);
2868         atomic_dec(&cpu_buffer_b->record_disabled);
2869
2870         ret = 0;
2871 out:
2872         return ret;
2873 }
2874 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
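
/*
 * Illustrative sketch (not part of the original file): taking a per-cpu
 * "snapshot" by swapping the live cpu buffer into a spare ring buffer of
 * the same size, then reading the spare at leisure. "spare" is a made-up
 * second struct ring_buffer allocated by the caller with the same number
 * of pages; process_event() is a made-up consumer.
 *
 *      struct ring_buffer_event *event;
 *      u64 ts;
 *
 *      if (ring_buffer_swap_cpu(spare, buffer, cpu) == 0) {
 *              while ((event = ring_buffer_consume(spare, cpu, &ts)) != NULL)
 *                      process_event(event, ts);
 *      }
 */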
2875
2876 /**
2877  * ring_buffer_alloc_read_page - allocate a page to read from buffer
2878  * @buffer: the buffer to allocate for.
2879  *
2880  * This function is used in conjunction with ring_buffer_read_page.
2881  * When reading a full page from the ring buffer, these functions
2882  * can be used to speed up the process. The calling function should
2883  * allocate a few pages first with this function. Then when it
2884  * needs to get pages from the ring buffer, it passes the result
2885  * of this function into ring_buffer_read_page, which will swap
2886  * the page that was allocated, with the read page of the buffer.
2887  *
2888  * Returns:
2889  *  The page allocated, or NULL on error.
2890  */
2891 void *ring_buffer_alloc_read_page(struct ring_buffer *buffer)
2892 {
2893         struct buffer_data_page *bpage;
2894         unsigned long addr;
2895
2896         addr = __get_free_page(GFP_KERNEL);
2897         if (!addr)
2898                 return NULL;
2899
2900         bpage = (void *)addr;
2901
2902         rb_init_page(bpage);
2903
2904         return bpage;
2905 }
2906 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page);
2907
2908 /**
2909  * ring_buffer_free_read_page - free an allocated read page
2910  * @buffer: the buffer the page was allocated for
2911  * @data: the page to free
2912  *
2913  * Free a page allocated from ring_buffer_alloc_read_page.
2914  */
2915 void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data)
2916 {
2917         free_page((unsigned long)data);
2918 }
2919 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page);
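
/*
 * Illustrative sketch (not part of the original file): the full life cycle
 * of a read page, complementing the example in the ring_buffer_read_page()
 * comment below. process_page() is made up.
 *
 *      void *rpage = ring_buffer_alloc_read_page(buffer);
 *      int ret;
 *
 *      if (!rpage)
 *              return -ENOMEM;
 *      ret = ring_buffer_read_page(buffer, &rpage, PAGE_SIZE, cpu, 0);
 *      if (ret >= 0)
 *              process_page(rpage, ret);
 *      ring_buffer_free_read_page(buffer, rpage);
 */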
2920
2921 /**
2922  * ring_buffer_read_page - extract a page from the ring buffer
2923  * @buffer: buffer to extract from
2924  * @data_page: the page to use allocated from ring_buffer_alloc_read_page
2925  * @len: amount to extract
2926  * @cpu: the cpu of the buffer to extract
2927  * @full: should the extraction only happen when the page is full.
2928  *
2929  * This function will pull out a page from the ring buffer and consume it.
2930  * @data_page must be the address of the variable that was returned
2931  * from ring_buffer_alloc_read_page. This is because the page might be used
2932  * to swap with a page in the ring buffer.
2933  *
2934  * for example:
2935  *      rpage = ring_buffer_alloc_read_page(buffer);
2936  *      if (!rpage)
2937  *              return error;
2938  *      ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0);
2939  *      if (ret >= 0)
2940  *              process_page(rpage, ret);
2941  *
2942  * When @full is set, the read will only succeed when the writer is
2943  * completely off the reader page.
2944  *
2945  * Note: it is up to the calling functions to handle sleeps and wakeups.
2946  *  The ring buffer can be used anywhere in the kernel and can not
2947  *  blindly call wake_up. The layer that uses the ring buffer must be
2948  *  responsible for that.
2949  *
2950  * Returns:
2951  *  >=0 if data has been transferred, returns the offset of consumed data.
2952  *  <0 if no data has been transferred.
2953  */
2954 int ring_buffer_read_page(struct ring_buffer *buffer,
2955                           void **data_page, size_t len, int cpu, int full)
2956 {
2957         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
2958         struct ring_buffer_event *event;
2959         struct buffer_data_page *bpage;
2960         struct buffer_page *reader;
2961         unsigned long flags;
2962         unsigned int commit;
2963         unsigned int read;
2964         u64 save_timestamp;
2965         int ret = -1;
2966
2967         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2968                 goto out;
2969
2970         /*
2971          * If len is not big enough to hold the page header, then
2972          * we can not copy anything.
2973          */
2974         if (len <= BUF_PAGE_HDR_SIZE)
2975                 goto out;
2976
2977         len -= BUF_PAGE_HDR_SIZE;
2978
2979         if (!data_page)
2980                 goto out;
2981
2982         bpage = *data_page;
2983         if (!bpage)
2984                 goto out;
2985
2986         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2987
2988         reader = rb_get_reader_page(cpu_buffer);
2989         if (!reader)
2990                 goto out_unlock;
2991
2992         event = rb_reader_event(cpu_buffer);
2993
2994         read = reader->read;
2995         commit = rb_page_commit(reader);
2996
2997         /*
2998          * If this page has been partially read or
2999          * if len is not big enough to read the rest of the page or
3000          * a writer is still on the page, then
3001          * we must copy the data from the page to the buffer.
3002          * Otherwise, we can simply swap the page with the one passed in.
3003          */
3004         if (read || (len < (commit - read)) ||
3005             cpu_buffer->reader_page == cpu_buffer->commit_page) {
3006                 struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
3007                 unsigned int rpos = read;
3008                 unsigned int pos = 0;
3009                 unsigned int size;
3010
3011                 if (full)
3012                         goto out_unlock;
3013
3014                 if (len > (commit - read))
3015                         len = (commit - read);
3016
3017                 size = rb_event_length(event);
3018
3019                 if (len < size)
3020                         goto out_unlock;
3021
3022                 /* save the current timestamp, since the user will need it */
3023                 save_timestamp = cpu_buffer->read_stamp;
3024
3025                 /* Need to copy one event at a time */
3026                 do {
3027                         memcpy(bpage->data + pos, rpage->data + rpos, size);
3028
3029                         len -= size;
3030
3031                         rb_advance_reader(cpu_buffer);
3032                         rpos = reader->read;
3033                         pos += size;
3034
3035                         event = rb_reader_event(cpu_buffer);
3036                         size = rb_event_length(event);
3037                 } while (len > size);
3038
3039                 /* update bpage */
3040                 local_set(&bpage->commit, pos);
3041                 bpage->time_stamp = save_timestamp;
3042
3043                 /* we copied everything to the beginning */
3044                 read = 0;
3045         } else {
3046                 /* update the entry counter */
3047                 cpu_buffer->read += local_read(&reader->entries);
3048
3049                 /* swap the pages */
3050                 rb_init_page(bpage);
3051                 bpage = reader->page;
3052                 reader->page = *data_page;
3053                 local_set(&reader->write, 0);
3054                 local_set(&reader->entries, 0);
3055                 reader->read = 0;
3056                 *data_page = bpage;
3057         }
3058         ret = read;
3059
3060  out_unlock:
3061         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3062
3063  out:
3064         return ret;
3065 }
3066 EXPORT_SYMBOL_GPL(ring_buffer_read_page);
3067
3068 static ssize_t
3069 rb_simple_read(struct file *filp, char __user *ubuf,
3070                size_t cnt, loff_t *ppos)
3071 {
3072         unsigned long *p = filp->private_data;
3073         char buf[64];
3074         int r;
3075
3076         if (test_bit(RB_BUFFERS_DISABLED_BIT, p))
3077                 r = sprintf(buf, "permanently disabled\n");
3078         else
3079                 r = sprintf(buf, "%d\n", test_bit(RB_BUFFERS_ON_BIT, p));
3080
3081         return simple_read_from_buffer(ubuf, cnt, ppos, buf, r);
3082 }
3083
3084 static ssize_t
3085 rb_simple_write(struct file *filp, const char __user *ubuf,
3086                 size_t cnt, loff_t *ppos)
3087 {
3088         unsigned long *p = filp->private_data;
3089         char buf[64];
3090         unsigned long val;
3091         int ret;
3092
3093         if (cnt >= sizeof(buf))
3094                 return -EINVAL;
3095
3096         if (copy_from_user(&buf, ubuf, cnt))
3097                 return -EFAULT;
3098
3099         buf[cnt] = 0;
3100
3101         ret = strict_strtoul(buf, 10, &val);
3102         if (ret < 0)
3103                 return ret;
3104
3105         if (val)
3106                 set_bit(RB_BUFFERS_ON_BIT, p);
3107         else
3108                 clear_bit(RB_BUFFERS_ON_BIT, p);
3109
3110         (*ppos)++;
3111
3112         return cnt;
3113 }
3114
3115 static const struct file_operations rb_simple_fops = {
3116         .open           = tracing_open_generic,
3117         .read           = rb_simple_read,
3118         .write          = rb_simple_write,
3119 };
3120
3121
3122 static __init int rb_init_debugfs(void)
3123 {
3124         struct dentry *d_tracer;
3125
3126         d_tracer = tracing_init_dentry();
3127
3128         trace_create_file("tracing_on", 0644, d_tracer,
3129                             &ring_buffer_flags, &rb_simple_fops);
3130
3131         return 0;
3132 }
3133
3134 fs_initcall(rb_init_debugfs);
3135
3136 #ifdef CONFIG_HOTPLUG_CPU
3137 static int rb_cpu_notify(struct notifier_block *self,
3138                          unsigned long action, void *hcpu)
3139 {
3140         struct ring_buffer *buffer =
3141                 container_of(self, struct ring_buffer, cpu_notify);
3142         long cpu = (long)hcpu;
3143
3144         switch (action) {
3145         case CPU_UP_PREPARE:
3146         case CPU_UP_PREPARE_FROZEN:
3147                 if (cpumask_test_cpu(cpu, buffer->cpumask))
3148                         return NOTIFY_OK;
3149
3150                 buffer->buffers[cpu] =
3151                         rb_allocate_cpu_buffer(buffer, cpu);
3152                 if (!buffer->buffers[cpu]) {
3153                         WARN(1, "failed to allocate ring buffer on CPU %ld\n",
3154                              cpu);
3155                         return NOTIFY_OK;
3156                 }
3157                 smp_wmb();
3158                 cpumask_set_cpu(cpu, buffer->cpumask);
3159                 break;
3160         case CPU_DOWN_PREPARE:
3161         case CPU_DOWN_PREPARE_FROZEN:
3162                 /*
3163                  * Do nothing.
3164                  *  If we were to free the buffer, then the user would
3165                  *  lose any trace that was in the buffer.
3166                  */
3167                 break;
3168         default:
3169                 break;
3170         }
3171         return NOTIFY_OK;
3172 }
3173 #endif