ring_buffer: add paranoid check for buffer page
kernel/trace/ring_buffer.c
1 /*
2  * Generic ring buffer
3  *
4  * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
5  */
6 #include <linux/ring_buffer.h>
7 #include <linux/spinlock.h>
8 #include <linux/debugfs.h>
9 #include <linux/uaccess.h>
10 #include <linux/module.h>
11 #include <linux/percpu.h>
12 #include <linux/mutex.h>
13 #include <linux/sched.h>        /* used for sched_clock() (for now) */
14 #include <linux/init.h>
15 #include <linux/hash.h>
16 #include <linux/list.h>
17 #include <linux/fs.h>
18
19 /* Up this if you want to test the TIME_EXTENTS and normalization */
20 #define DEBUG_SHIFT 0
21
22 /* FIXME!!! */
23 u64 ring_buffer_time_stamp(int cpu)
24 {
25         /* shift to debug/test normalization and TIME_EXTENTS */
26         return sched_clock() << DEBUG_SHIFT;
27 }
28
29 void ring_buffer_normalize_time_stamp(int cpu, u64 *ts)
30 {
31         /* Just for testing the normalize function and deltas */
32         *ts >>= DEBUG_SHIFT;
33 }
34
35 #define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event))
36 #define RB_ALIGNMENT_SHIFT      2
37 #define RB_ALIGNMENT            (1 << RB_ALIGNMENT_SHIFT)
38 #define RB_MAX_SMALL_DATA       28
39
40 enum {
41         RB_LEN_TIME_EXTEND = 8,
42         RB_LEN_TIME_STAMP = 16,
43 };
44
45 /* inline for ring buffer fast paths */
46 static inline unsigned
47 rb_event_length(struct ring_buffer_event *event)
48 {
49         unsigned length;
50
51         switch (event->type) {
52         case RINGBUF_TYPE_PADDING:
53                 /* undefined */
54                 return -1;
55
56         case RINGBUF_TYPE_TIME_EXTEND:
57                 return RB_LEN_TIME_EXTEND;
58
59         case RINGBUF_TYPE_TIME_STAMP:
60                 return RB_LEN_TIME_STAMP;
61
62         case RINGBUF_TYPE_DATA:
63                 if (event->len)
64                         length = event->len << RB_ALIGNMENT_SHIFT;
65                 else
66                         length = event->array[0];
67                 return length + RB_EVNT_HDR_SIZE;
68         default:
69                 BUG();
70         }
71         /* not hit */
72         return 0;
73 }
74
75 /**
76  * ring_buffer_event_length - return the length of the event
77  * @event: the event to get the length of
78  */
79 unsigned ring_buffer_event_length(struct ring_buffer_event *event)
80 {
81         return rb_event_length(event);
82 }
83
84 /* inline for ring buffer fast paths */
85 static inline void *
86 rb_event_data(struct ring_buffer_event *event)
87 {
88         BUG_ON(event->type != RINGBUF_TYPE_DATA);
89         /* If length is in len field, then array[0] has the data */
90         if (event->len)
91                 return (void *)&event->array[0];
92         /* Otherwise length is in array[0] and array[1] has the data */
93         return (void *)&event->array[1];
94 }
95
96 /**
97  * ring_buffer_event_data - return the data of the event
98  * @event: the event to get the data from
99  */
100 void *ring_buffer_event_data(struct ring_buffer_event *event)
101 {
102         return rb_event_data(event);
103 }
104
105 #define for_each_buffer_cpu(buffer, cpu)                \
106         for_each_cpu_mask(cpu, buffer->cpumask)
107
108 #define TS_SHIFT        27
109 #define TS_MASK         ((1ULL << TS_SHIFT) - 1)
110 #define TS_DELTA_TEST   (~TS_MASK)
111
112 /*
113  * This hack stolen from mm/slob.c.
114  * We can store per page timing information in the page frame of the page.
115  * Thanks to Peter Zijlstra for suggesting this idea.
116  */
117 struct buffer_page {
118         union {
119                 struct {
120                         unsigned long    flags;         /* mandatory */
121                         atomic_t         _count;        /* mandatory */
122                         u64              time_stamp;    /* page time stamp */
123                         unsigned         size;          /* size of page data */
124                         struct list_head list;          /* list of free pages */
125                 };
126                 struct page page;
127         };
128 };
129
130 /*
131  * We need to fit the time_stamp delta into 27 bits.
132  */
133 static inline int test_time_stamp(u64 delta)
134 {
135         if (delta & TS_DELTA_TEST)
136                 return 1;
137         return 0;
138 }
139
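/*
 * Worked example (illustrative, assuming a nanosecond-resolution
 * sched_clock()): TS_SHIFT = 27 means a delta of up to 2^27 ns (~134 ms)
 * fits directly in an event's 27-bit time_delta field.  If two writes on a
 * CPU are further apart than that, test_time_stamp() returns 1 and a
 * RINGBUF_TYPE_TIME_EXTEND event is emitted to carry the upper bits of the
 * delta (see rb_add_time_stamp() below).
 */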
140 #define BUF_PAGE_SIZE PAGE_SIZE
141
142 /*
143  * head_page == tail_page && head == tail then buffer is empty.
144  */
145 struct ring_buffer_per_cpu {
146         int                             cpu;
147         struct ring_buffer              *buffer;
148         spinlock_t                      lock;
149         struct lock_class_key           lock_key;
150         struct list_head                pages;
151         unsigned long                   head;   /* read from head */
152         unsigned long                   tail;   /* write to tail */
153         struct buffer_page              *head_page;
154         struct buffer_page              *tail_page;
155         unsigned long                   overrun;
156         unsigned long                   entries;
157         u64                             write_stamp;
158         u64                             read_stamp;
159         atomic_t                        record_disabled;
160 };
161
162 struct ring_buffer {
163         unsigned long                   size;
164         unsigned                        pages;
165         unsigned                        flags;
166         int                             cpus;
167         cpumask_t                       cpumask;
168         atomic_t                        record_disabled;
169
170         struct mutex                    mutex;
171
172         struct ring_buffer_per_cpu      **buffers;
173 };
174
175 struct ring_buffer_iter {
176         struct ring_buffer_per_cpu      *cpu_buffer;
177         unsigned long                   head;
178         struct buffer_page              *head_page;
179         u64                             read_stamp;
180 };
181
182 #define RB_WARN_ON(buffer, cond)                        \
183         if (unlikely(cond)) {                           \
184                 atomic_inc(&buffer->record_disabled);   \
185                 WARN_ON(1);                             \
186                 return -1;                              \
187         }
188
189 /**
190  * rb_check_pages - integrity check of buffer pages
191  * @cpu_buffer: CPU buffer with pages to test
192  *
193  * As a safety measure we check to make sure the data pages have not
194  * been corrupted.
195  */
196 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
197 {
198         struct list_head *head = &cpu_buffer->pages;
199         struct buffer_page *page, *tmp;
200
201         RB_WARN_ON(cpu_buffer, head->next->prev != head);
202         RB_WARN_ON(cpu_buffer, head->prev->next != head);
203
204         list_for_each_entry_safe(page, tmp, head, list) {
205                 RB_WARN_ON(cpu_buffer, page->list.next->prev != &page->list);
206                 RB_WARN_ON(cpu_buffer, page->list.prev->next != &page->list);
207         }
208
209         return 0;
210 }
211
212 static unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
213 {
214         return cpu_buffer->head_page->size;
215 }
216
217 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
218                              unsigned nr_pages)
219 {
220         struct list_head *head = &cpu_buffer->pages;
221         struct buffer_page *page, *tmp;
222         unsigned long addr;
223         LIST_HEAD(pages);
224         unsigned i;
225
226         for (i = 0; i < nr_pages; i++) {
227                 addr = __get_free_page(GFP_KERNEL);
228                 if (!addr)
229                         goto free_pages;
230                 page = (struct buffer_page *)virt_to_page(addr);
231                 list_add(&page->list, &pages);
232         }
233
234         list_splice(&pages, head);
235
236         rb_check_pages(cpu_buffer);
237
238         return 0;
239
240  free_pages:
241         list_for_each_entry_safe(page, tmp, &pages, list) {
242                 list_del_init(&page->list);
243                 __free_page(&page->page);
244         }
245         return -ENOMEM;
246 }
247
248 static struct ring_buffer_per_cpu *
249 rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
250 {
251         struct ring_buffer_per_cpu *cpu_buffer;
252         int ret;
253
254         cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
255                                   GFP_KERNEL, cpu_to_node(cpu));
256         if (!cpu_buffer)
257                 return NULL;
258
259         cpu_buffer->cpu = cpu;
260         cpu_buffer->buffer = buffer;
261         spin_lock_init(&cpu_buffer->lock);
262         INIT_LIST_HEAD(&cpu_buffer->pages);
263
264         ret = rb_allocate_pages(cpu_buffer, buffer->pages);
265         if (ret < 0)
266                 goto fail_free_buffer;
267
268         cpu_buffer->head_page
269                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
270         cpu_buffer->tail_page
271                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
272
273         return cpu_buffer;
274
275  fail_free_buffer:
276         kfree(cpu_buffer);
277         return NULL;
278 }
279
280 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
281 {
282         struct list_head *head = &cpu_buffer->pages;
283         struct buffer_page *page, *tmp;
284
285         list_for_each_entry_safe(page, tmp, head, list) {
286                 list_del_init(&page->list);
287                 __free_page(&page->page);
288         }
289         kfree(cpu_buffer);
290 }
291
292 /*
293  * Causes compile errors if the struct buffer_page gets bigger
294  * than the struct page.
295  */
296 extern int ring_buffer_page_too_big(void);
297
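/*
 * Sketch of an equivalent check (assumption: BUILD_BUG_ON() from
 * <linux/kernel.h> is available in this tree):
 *
 *	BUILD_BUG_ON(sizeof(struct buffer_page) > sizeof(struct page));
 *
 * placed in ring_buffer_alloc() would fail at compile time.  The undefined
 * extern above instead produces a link error if the size comparison in
 * ring_buffer_alloc() is not optimized away, which is the "paranoid check"
 * this change adds.
 */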
298 /**
299  * ring_buffer_alloc - allocate a new ring_buffer
300  * @size: the size in bytes that is needed.
301  * @flags: attributes to set for the ring buffer.
302  *
303  * Currently the only flag that is available is the RB_FL_OVERWRITE
304  * flag. This flag means that the buffer will overwrite old data
305  * when the buffer wraps. If this flag is not set, the buffer will
306  * drop data when the tail hits the head.
307  */
308 struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
309 {
310         struct ring_buffer *buffer;
311         int bsize;
312         int cpu;
313
314         /* Paranoid! Optimizes out when all is well */
315         if (sizeof(struct buffer_page) > sizeof(struct page))
316                 ring_buffer_page_too_big();
317
318
319         /* keep it in its own cache line */
320         buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
321                          GFP_KERNEL);
322         if (!buffer)
323                 return NULL;
324
325         buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
326         buffer->flags = flags;
327
328         /* need at least two pages */
329         if (buffer->pages == 1)
330                 buffer->pages++;
331
332         buffer->cpumask = cpu_possible_map;
333         buffer->cpus = nr_cpu_ids;
334
335         bsize = sizeof(void *) * nr_cpu_ids;
336         buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
337                                   GFP_KERNEL);
338         if (!buffer->buffers)
339                 goto fail_free_buffer;
340
341         for_each_buffer_cpu(buffer, cpu) {
342                 buffer->buffers[cpu] =
343                         rb_allocate_cpu_buffer(buffer, cpu);
344                 if (!buffer->buffers[cpu])
345                         goto fail_free_buffers;
346         }
347
348         mutex_init(&buffer->mutex);
349
350         return buffer;
351
352  fail_free_buffers:
353         for_each_buffer_cpu(buffer, cpu) {
354                 if (buffer->buffers[cpu])
355                         rb_free_cpu_buffer(buffer->buffers[cpu]);
356         }
357         kfree(buffer->buffers);
358
359  fail_free_buffer:
360         kfree(buffer);
361         return NULL;
362 }
363
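/*
 * Usage sketch (illustrative only; the function below is hypothetical and
 * not part of this file): allocating a 1 MB overwriting buffer and freeing
 * it again.
 */
static int example_buffer_setup(void)
{
        struct ring_buffer *buffer;

        /* the size is rounded up to pages; at least two pages are used */
        buffer = ring_buffer_alloc(1024 * 1024, RB_FL_OVERWRITE);
        if (!buffer)
                return -ENOMEM;

        /* ... hand the buffer to a tracer, write and read events ... */

        ring_buffer_free(buffer);
        return 0;
}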
364 /**
365  * ring_buffer_free - free a ring buffer.
366  * @buffer: the buffer to free.
367  */
368 void
369 ring_buffer_free(struct ring_buffer *buffer)
370 {
371         int cpu;
372
373         for_each_buffer_cpu(buffer, cpu)
374                 rb_free_cpu_buffer(buffer->buffers[cpu]);
375
376         kfree(buffer);
377 }
378
379 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
380
381 static void
382 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
383 {
384         struct buffer_page *page;
385         struct list_head *p;
386         unsigned i;
387
388         atomic_inc(&cpu_buffer->record_disabled);
389         synchronize_sched();
390
391         for (i = 0; i < nr_pages; i++) {
392                 BUG_ON(list_empty(&cpu_buffer->pages));
393                 p = cpu_buffer->pages.next;
394                 page = list_entry(p, struct buffer_page, list);
395                 list_del_init(&page->list);
396                 __free_page(&page->page);
397         }
398         BUG_ON(list_empty(&cpu_buffer->pages));
399
400         rb_reset_cpu(cpu_buffer);
401
402         rb_check_pages(cpu_buffer);
403
404         atomic_dec(&cpu_buffer->record_disabled);
405
406 }
407
408 static void
409 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
410                 struct list_head *pages, unsigned nr_pages)
411 {
412         struct buffer_page *page;
413         struct list_head *p;
414         unsigned i;
415
416         atomic_inc(&cpu_buffer->record_disabled);
417         synchronize_sched();
418
419         for (i = 0; i < nr_pages; i++) {
420                 BUG_ON(list_empty(pages));
421                 p = pages->next;
422                 page = list_entry(p, struct buffer_page, list);
423                 list_del_init(&page->list);
424                 list_add_tail(&page->list, &cpu_buffer->pages);
425         }
426         rb_reset_cpu(cpu_buffer);
427
428         rb_check_pages(cpu_buffer);
429
430         atomic_dec(&cpu_buffer->record_disabled);
431 }
432
433 /**
434  * ring_buffer_resize - resize the ring buffer
435  * @buffer: the buffer to resize.
436  * @size: the new size.
437  *
438  * The tracer is responsible for making sure that the buffer is
439  * not being used while changing the size.
440  * Note: We may be able to change the above requirement by using
441  *  RCU synchronizations.
442  *
443  * Minimum size is 2 * BUF_PAGE_SIZE.
444  *
445  * Returns -1 on failure.
446  */
447 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
448 {
449         struct ring_buffer_per_cpu *cpu_buffer;
450         unsigned nr_pages, rm_pages, new_pages;
451         struct buffer_page *page, *tmp;
452         unsigned long buffer_size;
453         unsigned long addr;
454         LIST_HEAD(pages);
455         int i, cpu;
456
457         size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
458         size *= BUF_PAGE_SIZE;
459         buffer_size = buffer->pages * BUF_PAGE_SIZE;
460
461         /* we need a minimum of two pages */
462         if (size < BUF_PAGE_SIZE * 2)
463                 size = BUF_PAGE_SIZE * 2;
464
465         if (size == buffer_size)
466                 return size;
467
468         mutex_lock(&buffer->mutex);
469
470         nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
471
472         if (size < buffer_size) {
473
474                 /* easy case, just free pages */
475                 BUG_ON(nr_pages >= buffer->pages);
476
477                 rm_pages = buffer->pages - nr_pages;
478
479                 for_each_buffer_cpu(buffer, cpu) {
480                         cpu_buffer = buffer->buffers[cpu];
481                         rb_remove_pages(cpu_buffer, rm_pages);
482                 }
483                 goto out;
484         }
485
486         /*
487          * This is a bit more difficult. We only want to add pages
488          * when we can allocate enough for all CPUs. We do this
489          * by allocating all the pages and storing them on a local
490          * linked list. If we succeed in our allocation, then we
491          * add these pages to the cpu_buffers. Otherwise we just free
492          * them all and return -ENOMEM.
493          */
494         BUG_ON(nr_pages <= buffer->pages);
495         new_pages = nr_pages - buffer->pages;
496
497         for_each_buffer_cpu(buffer, cpu) {
498                 for (i = 0; i < new_pages; i++) {
499                         addr = __get_free_page(GFP_KERNEL);
500                         if (!addr)
501                                 goto free_pages;
502                         page = (struct buffer_page *)virt_to_page(addr);
503                         list_add(&page->list, &pages);
504                 }
505         }
506
507         for_each_buffer_cpu(buffer, cpu) {
508                 cpu_buffer = buffer->buffers[cpu];
509                 rb_insert_pages(cpu_buffer, &pages, new_pages);
510         }
511
512         BUG_ON(!list_empty(&pages));
513
514  out:
515         buffer->pages = nr_pages;
516         mutex_unlock(&buffer->mutex);
517
518         return size;
519
520  free_pages:
521         list_for_each_entry_safe(page, tmp, &pages, list) {
522                 list_del_init(&page->list);
523                 __free_page(&page->page);
524         }
525         mutex_unlock(&buffer->mutex);   /* don't leave the mutex held on failure */
            return -ENOMEM;
526 }
527
528 static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
529 {
530         return cpu_buffer->head_page == cpu_buffer->tail_page &&
531                 cpu_buffer->head == cpu_buffer->tail;
532 }
533
534 static inline int rb_null_event(struct ring_buffer_event *event)
535 {
536         return event->type == RINGBUF_TYPE_PADDING;
537 }
538
539 static inline void *rb_page_index(struct buffer_page *page, unsigned index)
540 {
541         void *addr = page_address(&page->page);
542
543         return addr + index;
544 }
545
546 static inline struct ring_buffer_event *
547 rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
548 {
549         return rb_page_index(cpu_buffer->head_page,
550                              cpu_buffer->head);
551 }
552
553 static inline struct ring_buffer_event *
554 rb_iter_head_event(struct ring_buffer_iter *iter)
555 {
556         return rb_page_index(iter->head_page,
557                              iter->head);
558 }
559
560 /*
561  * When the tail hits the head and the buffer is in overwrite mode,
562  * the head jumps to the next page and all content on the previous
563  * page is discarded. But before doing so, we update the overrun
564  * variable of the buffer.
565  */
566 static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
567 {
568         struct ring_buffer_event *event;
569         unsigned long head;
570
571         for (head = 0; head < rb_head_size(cpu_buffer);
572              head += rb_event_length(event)) {
573
574                 event = rb_page_index(cpu_buffer->head_page, head);
575                 BUG_ON(rb_null_event(event));
576                 /* Only count data entries */
577                 if (event->type != RINGBUF_TYPE_DATA)
578                         continue;
579                 cpu_buffer->overrun++;
580                 cpu_buffer->entries--;
581         }
582 }
583
584 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
585                                struct buffer_page **page)
586 {
587         struct list_head *p = (*page)->list.next;
588
589         if (p == &cpu_buffer->pages)
590                 p = p->next;
591
592         *page = list_entry(p, struct buffer_page, list);
593 }
594
595 static inline void
596 rb_add_stamp(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts)
597 {
598         cpu_buffer->tail_page->time_stamp = *ts;
599         cpu_buffer->write_stamp = *ts;
600 }
601
602 static void rb_reset_read_page(struct ring_buffer_per_cpu *cpu_buffer)
603 {
604         cpu_buffer->read_stamp = cpu_buffer->head_page->time_stamp;
605         cpu_buffer->head = 0;
606 }
607
608 static void
609 rb_reset_iter_read_page(struct ring_buffer_iter *iter)
610 {
611         iter->read_stamp = iter->head_page->time_stamp;
612         iter->head = 0;
613 }
614
615 /**
616  * rb_update_event - update event type and data
617  * @event: the event to update
618  * @type: the type of event
619  * @length: the size of the event field in the ring buffer
620  *
621  * Update the type and data fields of the event. The length
622  * is the actual size that is written to the ring buffer,
623  * and with this, we can determine what to place into the
624  * data field.
625  */
626 static inline void
627 rb_update_event(struct ring_buffer_event *event,
628                          unsigned type, unsigned length)
629 {
630         event->type = type;
631
632         switch (type) {
633
634         case RINGBUF_TYPE_PADDING:
635                 break;
636
637         case RINGBUF_TYPE_TIME_EXTEND:
638                 event->len =
639                         (RB_LEN_TIME_EXTEND + (RB_ALIGNMENT-1))
640                         >> RB_ALIGNMENT_SHIFT;
641                 break;
642
643         case RINGBUF_TYPE_TIME_STAMP:
644                 event->len =
645                         (RB_LEN_TIME_STAMP + (RB_ALIGNMENT-1))
646                         >> RB_ALIGNMENT_SHIFT;
647                 break;
648
649         case RINGBUF_TYPE_DATA:
650                 length -= RB_EVNT_HDR_SIZE;
651                 if (length > RB_MAX_SMALL_DATA) {
652                         event->len = 0;
653                         event->array[0] = length;
654                 } else
655                         event->len =
656                                 (length + (RB_ALIGNMENT-1))
657                                 >> RB_ALIGNMENT_SHIFT;
658                 break;
659         default:
660                 BUG();
661         }
662 }
663
664 static inline unsigned rb_calculate_event_length(unsigned length)
665 {
666         struct ring_buffer_event event; /* Used only for sizeof array */
667
668         /* zero length can cause confusions */
669         if (!length)
670                 length = 1;
671
672         if (length > RB_MAX_SMALL_DATA)
673                 length += sizeof(event.array[0]);
674
675         length += RB_EVNT_HDR_SIZE;
676         length = ALIGN(length, RB_ALIGNMENT);
677
678         return length;
679 }
680
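/*
 * Worked example (assuming the 4-byte event header declared in
 * <linux/ring_buffer.h>): a 10-byte payload gives
 * rb_calculate_event_length(10) = ALIGN(10 + 4, 4) = 16 bytes on the page;
 * rb_update_event() then stores len = 12 >> 2 = 3, and rb_event_length()
 * recovers (3 << 2) + 4 = 16.  A 100-byte payload exceeds RB_MAX_SMALL_DATA,
 * so its size goes into array[0] instead and the event occupies
 * ALIGN(100 + 4 + 4, 4) = 108 bytes, with the data starting at array[1].
 */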
681 static struct ring_buffer_event *
682 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
683                   unsigned type, unsigned long length, u64 *ts)
684 {
685         struct buffer_page *head_page, *tail_page;
686         unsigned long tail;
687         struct ring_buffer *buffer = cpu_buffer->buffer;
688         struct ring_buffer_event *event;
689
690         tail_page = cpu_buffer->tail_page;
691         head_page = cpu_buffer->head_page;
692         tail = cpu_buffer->tail;
693
694         if (tail + length > BUF_PAGE_SIZE) {
695                 struct buffer_page *next_page = tail_page;
696
697                 rb_inc_page(cpu_buffer, &next_page);
698
699                 if (next_page == head_page) {
700                         if (!(buffer->flags & RB_FL_OVERWRITE))
701                                 return NULL;
702
703                         /* count overflows */
704                         rb_update_overflow(cpu_buffer);
705
706                         rb_inc_page(cpu_buffer, &head_page);
707                         cpu_buffer->head_page = head_page;
708                         rb_reset_read_page(cpu_buffer);
709                 }
710
711                 if (tail != BUF_PAGE_SIZE) {
712                         event = rb_page_index(tail_page, tail);
713                         /* page padding */
714                         event->type = RINGBUF_TYPE_PADDING;
715                 }
716
717                 tail_page->size = tail;
718                 tail_page = next_page;
719                 tail_page->size = 0;
720                 tail = 0;
721                 cpu_buffer->tail_page = tail_page;
722                 cpu_buffer->tail = tail;
723                 rb_add_stamp(cpu_buffer, ts);
724         }
725
726         BUG_ON(tail + length > BUF_PAGE_SIZE);
727
728         event = rb_page_index(tail_page, tail);
729         rb_update_event(event, type, length);
730
731         return event;
732 }
733
734 static int
735 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
736                   u64 *ts, u64 *delta)
737 {
738         struct ring_buffer_event *event;
739         static int once;
740
741         if (unlikely(*delta > (1ULL << 59) && !once++)) {
742                 printk(KERN_WARNING "Delta way too big! %llu"
743                        " ts=%llu write stamp = %llu\n",
744                        *delta, *ts, cpu_buffer->write_stamp);
745                 WARN_ON(1);
746         }
747
748         /*
749          * The delta is too big; we need to add a
750          * new timestamp.
751          */
752         event = __rb_reserve_next(cpu_buffer,
753                                   RINGBUF_TYPE_TIME_EXTEND,
754                                   RB_LEN_TIME_EXTEND,
755                                   ts);
756         if (!event)
757                 return -1;
758
759         /* check to see if we went to the next page */
760         if (cpu_buffer->tail) {
761                 /* Still on same page, update timestamp */
762                 event->time_delta = *delta & TS_MASK;
763                 event->array[0] = *delta >> TS_SHIFT;
764                 /* commit the time event */
765                 cpu_buffer->tail +=
766                         rb_event_length(event);
767                 cpu_buffer->write_stamp = *ts;
768                 *delta = 0;
769         }
770
771         return 0;
772 }
773
774 static struct ring_buffer_event *
775 rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
776                       unsigned type, unsigned long length)
777 {
778         struct ring_buffer_event *event;
779         u64 ts, delta;
780
781         ts = ring_buffer_time_stamp(cpu_buffer->cpu);
782
783         if (cpu_buffer->tail) {
784                 delta = ts - cpu_buffer->write_stamp;
785
786                 if (test_time_stamp(delta)) {
787                         int ret;
788
789                         ret = rb_add_time_stamp(cpu_buffer, &ts, &delta);
790                         if (ret < 0)
791                                 return NULL;
792                 }
793         } else {
794                 rb_add_stamp(cpu_buffer, &ts);
795                 delta = 0;
796         }
797
798         event = __rb_reserve_next(cpu_buffer, type, length, &ts);
799         if (!event)
800                 return NULL;
801
802         /* If the reserve went to the next page, our delta is zero */
803         if (!cpu_buffer->tail)
804                 delta = 0;
805
806         event->time_delta = delta;
807
808         return event;
809 }
810
811 /**
812  * ring_buffer_lock_reserve - reserve a part of the buffer
813  * @buffer: the ring buffer to reserve from
814  * @length: the length of the data to reserve (excluding event header)
815  * @flags: a pointer to save the interrupt flags
816  *
817  * Returns a reserved event on the ring buffer to copy directly to.
818  * The user of this interface will need to get the body to write into
819  * and can use the ring_buffer_event_data() interface.
820  *
821  * The length is the length of the data needed, not the event length
822  * which also includes the event header.
823  *
824  * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
825  * If NULL is returned, then nothing has been allocated or locked.
826  */
827 struct ring_buffer_event *
828 ring_buffer_lock_reserve(struct ring_buffer *buffer,
829                          unsigned long length,
830                          unsigned long *flags)
831 {
832         struct ring_buffer_per_cpu *cpu_buffer;
833         struct ring_buffer_event *event;
834         int cpu;
835
836         if (atomic_read(&buffer->record_disabled))
837                 return NULL;
838
839         raw_local_irq_save(*flags);
840         cpu = raw_smp_processor_id();
841
842         if (!cpu_isset(cpu, buffer->cpumask))
843                 goto out_irq;
844
845         cpu_buffer = buffer->buffers[cpu];
846         spin_lock(&cpu_buffer->lock);
847
848         if (atomic_read(&cpu_buffer->record_disabled))
849                 goto no_record;
850
851         length = rb_calculate_event_length(length);
852         if (length > BUF_PAGE_SIZE)
853                 goto no_record;
854
855         event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length);
856         if (!event)
857                 goto no_record;
858
859         return event;
860
861  no_record:
862         spin_unlock(&cpu_buffer->lock);
863  out_irq:
864         local_irq_restore(*flags);
865         return NULL;
866 }
867
868 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
869                       struct ring_buffer_event *event)
870 {
871         cpu_buffer->tail += rb_event_length(event);
872         cpu_buffer->tail_page->size = cpu_buffer->tail;
873         cpu_buffer->write_stamp += event->time_delta;
874         cpu_buffer->entries++;
875 }
876
877 /**
878  * ring_buffer_unlock_commit - commit a reserved event
879  * @buffer: The buffer to commit to
880  * @event: The event pointer to commit.
881  * @flags: the interrupt flags received from ring_buffer_lock_reserve.
882  *
883  * This commits the data to the ring buffer, and releases any locks held.
884  *
885  * Must be paired with ring_buffer_lock_reserve.
886  */
887 int ring_buffer_unlock_commit(struct ring_buffer *buffer,
888                               struct ring_buffer_event *event,
889                               unsigned long flags)
890 {
891         struct ring_buffer_per_cpu *cpu_buffer;
892         int cpu = raw_smp_processor_id();
893
894         cpu_buffer = buffer->buffers[cpu];
895
896         assert_spin_locked(&cpu_buffer->lock);
897
898         rb_commit(cpu_buffer, event);
899
900         spin_unlock(&cpu_buffer->lock);
901         raw_local_irq_restore(flags);
902
903         return 0;
904 }
905
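/*
 * Usage sketch (illustrative only; the struct and function below are
 * hypothetical): reserving space, filling it in place, and committing.
 * A NULL return from the reserve means nothing was allocated or locked.
 */
struct example_entry {
        unsigned long   ip;
        unsigned long   parent_ip;
};

static void example_trace(struct ring_buffer *buffer, unsigned long ip,
                          unsigned long parent_ip)
{
        struct ring_buffer_event *event;
        struct example_entry *entry;
        unsigned long flags;

        event = ring_buffer_lock_reserve(buffer, sizeof(*entry), &flags);
        if (!event)
                return;

        entry = ring_buffer_event_data(event);
        entry->ip = ip;
        entry->parent_ip = parent_ip;

        ring_buffer_unlock_commit(buffer, event, flags);
}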
906 /**
907  * ring_buffer_write - write data to the buffer without reserving
908  * @buffer: The ring buffer to write to.
909  * @length: The length of the data being written (excluding the event header)
910  * @data: The data to write to the buffer.
911  *
912  * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
913  * one function. If you already have the data to write to the buffer, it
914  * may be easier to simply call this function.
915  *
916  * Note, like ring_buffer_lock_reserve, the length is the length of the data
917  * and not the length of the event which would hold the header.
918  */
919 int ring_buffer_write(struct ring_buffer *buffer,
920                         unsigned long length,
921                         void *data)
922 {
923         struct ring_buffer_per_cpu *cpu_buffer;
924         struct ring_buffer_event *event;
925         unsigned long event_length, flags;
926         void *body;
927         int ret = -EBUSY;
928         int cpu;
929
930         if (atomic_read(&buffer->record_disabled))
931                 return -EBUSY;
932
933         local_irq_save(flags);
934         cpu = raw_smp_processor_id();
935
936         if (!cpu_isset(cpu, buffer->cpumask))
937                 goto out_irq;
938
939         cpu_buffer = buffer->buffers[cpu];
940         spin_lock(&cpu_buffer->lock);
941
942         if (atomic_read(&cpu_buffer->record_disabled))
943                 goto out;
944
945         event_length = rb_calculate_event_length(length);
946         event = rb_reserve_next_event(cpu_buffer,
947                                       RINGBUF_TYPE_DATA, event_length);
948         if (!event)
949                 goto out;
950
951         body = rb_event_data(event);
952
953         memcpy(body, data, length);
954
955         rb_commit(cpu_buffer, event);
956
957         ret = 0;
958  out:
959         spin_unlock(&cpu_buffer->lock);
960  out_irq:
961         local_irq_restore(flags);
962
963         return ret;
964 }
965
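/*
 * Usage sketch (illustrative only; the wrapper below is hypothetical):
 * the one-shot form, for data that already exists in a local buffer.
 */
static int example_log(struct ring_buffer *buffer, void *msg, unsigned long len)
{
        /* returns 0 on success, -EBUSY if recording is disabled or the
         * event could not be reserved */
        return ring_buffer_write(buffer, len, msg);
}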
966 /**
967  * ring_buffer_lock - lock the ring buffer
968  * @buffer: The ring buffer to lock
969  * @flags: The place to store the interrupt flags
970  *
971  * This locks all the per CPU buffers.
972  *
973  * Must be unlocked by ring_buffer_unlock.
974  */
975 void ring_buffer_lock(struct ring_buffer *buffer, unsigned long *flags)
976 {
977         struct ring_buffer_per_cpu *cpu_buffer;
978         int cpu;
979
980         local_irq_save(*flags);
981
982         for_each_buffer_cpu(buffer, cpu) {
983                 cpu_buffer = buffer->buffers[cpu];
984                 spin_lock(&cpu_buffer->lock);
985         }
986 }
987
988 /**
989  * ring_buffer_unlock - unlock a locked buffer
990  * @buffer: The locked buffer to unlock
991  * @flags: The interrupt flags received by ring_buffer_lock
992  */
993 void ring_buffer_unlock(struct ring_buffer *buffer, unsigned long flags)
994 {
995         struct ring_buffer_per_cpu *cpu_buffer;
996         int cpu;
997
998         for (cpu = buffer->cpus - 1; cpu >= 0; cpu--) {
999                 if (!cpu_isset(cpu, buffer->cpumask))
1000                         continue;
1001                 cpu_buffer = buffer->buffers[cpu];
1002                 spin_unlock(&cpu_buffer->lock);
1003         }
1004
1005         local_irq_restore(flags);
1006 }
1007
1008 /**
1009  * ring_buffer_record_disable - stop all writes into the buffer
1010  * @buffer: The ring buffer to stop writes to.
1011  *
1012  * This prevents all writes to the buffer. Any attempt to write
1013  * to the buffer after this will fail and return NULL.
1014  *
1015  * The caller should call synchronize_sched() after this.
1016  */
1017 void ring_buffer_record_disable(struct ring_buffer *buffer)
1018 {
1019         atomic_inc(&buffer->record_disabled);
1020 }
1021
1022 /**
1023  * ring_buffer_record_enable - enable writes to the buffer
1024  * @buffer: The ring buffer to enable writes
1025  *
1026  * Note, multiple disables will need the same number of enables
1027  * to truly enable the writing (much like preempt_disable).
1028  */
1029 void ring_buffer_record_enable(struct ring_buffer *buffer)
1030 {
1031         atomic_dec(&buffer->record_disabled);
1032 }
1033
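/*
 * Usage sketch (illustrative only; the helper below is hypothetical):
 * quiescing writers before a non-consuming read, following the
 * synchronize_sched() advice in the comments above.
 */
static void example_quiesce(struct ring_buffer *buffer)
{
        ring_buffer_record_disable(buffer);
        /* wait for writers already inside the reserve/commit path */
        synchronize_sched();

        /* ... read or snapshot the buffer here ... */

        ring_buffer_record_enable(buffer);
}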
1034 /**
1035  * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
1036  * @buffer: The ring buffer to stop writes to.
1037  * @cpu: The CPU buffer to stop
1038  *
1039  * This prevents all writes to the buffer. Any attempt to write
1040  * to the buffer after this will fail and return NULL.
1041  *
1042  * The caller should call synchronize_sched() after this.
1043  */
1044 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
1045 {
1046         struct ring_buffer_per_cpu *cpu_buffer;
1047
1048         if (!cpu_isset(cpu, buffer->cpumask))
1049                 return;
1050
1051         cpu_buffer = buffer->buffers[cpu];
1052         atomic_inc(&cpu_buffer->record_disabled);
1053 }
1054
1055 /**
1056  * ring_buffer_record_enable_cpu - enable writes to the buffer
1057  * @buffer: The ring buffer to enable writes
1058  * @cpu: The CPU to enable.
1059  *
1060  * Note, multiple disables will need the same number of enables
1061  * to truly enable the writing (much like preempt_disable).
1062  */
1063 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
1064 {
1065         struct ring_buffer_per_cpu *cpu_buffer;
1066
1067         if (!cpu_isset(cpu, buffer->cpumask))
1068                 return;
1069
1070         cpu_buffer = buffer->buffers[cpu];
1071         atomic_dec(&cpu_buffer->record_disabled);
1072 }
1073
1074 /**
1075  * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
1076  * @buffer: The ring buffer
1077  * @cpu: The per CPU buffer to get the entries from.
1078  */
1079 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
1080 {
1081         struct ring_buffer_per_cpu *cpu_buffer;
1082
1083         if (!cpu_isset(cpu, buffer->cpumask))
1084                 return 0;
1085
1086         cpu_buffer = buffer->buffers[cpu];
1087         return cpu_buffer->entries;
1088 }
1089
1090 /**
1091  * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
1092  * @buffer: The ring buffer
1093  * @cpu: The per CPU buffer to get the number of overruns from
1094  */
1095 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
1096 {
1097         struct ring_buffer_per_cpu *cpu_buffer;
1098
1099         if (!cpu_isset(cpu, buffer->cpumask))
1100                 return 0;
1101
1102         cpu_buffer = buffer->buffers[cpu];
1103         return cpu_buffer->overrun;
1104 }
1105
1106 /**
1107  * ring_buffer_entries - get the number of entries in a buffer
1108  * @buffer: The ring buffer
1109  *
1110  * Returns the total number of entries in the ring buffer
1111  * (all CPU entries)
1112  */
1113 unsigned long ring_buffer_entries(struct ring_buffer *buffer)
1114 {
1115         struct ring_buffer_per_cpu *cpu_buffer;
1116         unsigned long entries = 0;
1117         int cpu;
1118
1119         /* if you care about this being correct, lock the buffer */
1120         for_each_buffer_cpu(buffer, cpu) {
1121                 cpu_buffer = buffer->buffers[cpu];
1122                 entries += cpu_buffer->entries;
1123         }
1124
1125         return entries;
1126 }
1127
1128 /**
1129  * ring_buffer_overruns - get the number of overruns in the buffer
1130  * @buffer: The ring buffer
1131  *
1132  * Returns the total number of overruns in the ring buffer
1133  * (all CPU entries)
1134  */
1135 unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
1136 {
1137         struct ring_buffer_per_cpu *cpu_buffer;
1138         unsigned long overruns = 0;
1139         int cpu;
1140
1141         /* if you care about this being correct, lock the buffer */
1142         for_each_buffer_cpu(buffer, cpu) {
1143                 cpu_buffer = buffer->buffers[cpu];
1144                 overruns += cpu_buffer->overrun;
1145         }
1146
1147         return overruns;
1148 }
1149
1150 /**
1151  * ring_buffer_iter_reset - reset an iterator
1152  * @iter: The iterator to reset
1153  *
1154  * Resets the iterator, so that it will start from the beginning
1155  * again.
1156  */
1157 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
1158 {
1159         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1160
1161         iter->head_page = cpu_buffer->head_page;
1162         iter->head = cpu_buffer->head;
1163         rb_reset_iter_read_page(iter);
1164 }
1165
1166 /**
1167  * ring_buffer_iter_empty - check if an iterator has no more to read
1168  * @iter: The iterator to check
1169  */
1170 int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
1171 {
1172         struct ring_buffer_per_cpu *cpu_buffer;
1173
1174         cpu_buffer = iter->cpu_buffer;
1175
1176         return iter->head_page == cpu_buffer->tail_page &&
1177                 iter->head == cpu_buffer->tail;
1178 }
1179
1180 static void
1181 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1182                      struct ring_buffer_event *event)
1183 {
1184         u64 delta;
1185
1186         switch (event->type) {
1187         case RINGBUF_TYPE_PADDING:
1188                 return;
1189
1190         case RINGBUF_TYPE_TIME_EXTEND:
1191                 delta = event->array[0];
1192                 delta <<= TS_SHIFT;
1193                 delta += event->time_delta;
1194                 cpu_buffer->read_stamp += delta;
1195                 return;
1196
1197         case RINGBUF_TYPE_TIME_STAMP:
1198                 /* FIXME: not implemented */
1199                 return;
1200
1201         case RINGBUF_TYPE_DATA:
1202                 cpu_buffer->read_stamp += event->time_delta;
1203                 return;
1204
1205         default:
1206                 BUG();
1207         }
1208         return;
1209 }
1210
1211 static void
1212 rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
1213                           struct ring_buffer_event *event)
1214 {
1215         u64 delta;
1216
1217         switch (event->type) {
1218         case RINGBUF_TYPE_PADDING:
1219                 return;
1220
1221         case RINGBUF_TYPE_TIME_EXTEND:
1222                 delta = event->array[0];
1223                 delta <<= TS_SHIFT;
1224                 delta += event->time_delta;
1225                 iter->read_stamp += delta;
1226                 return;
1227
1228         case RINGBUF_TYPE_TIME_STAMP:
1229                 /* FIXME: not implemented */
1230                 return;
1231
1232         case RINGBUF_TYPE_DATA:
1233                 iter->read_stamp += event->time_delta;
1234                 return;
1235
1236         default:
1237                 BUG();
1238         }
1239         return;
1240 }
1241
1242 static void rb_advance_head(struct ring_buffer_per_cpu *cpu_buffer)
1243 {
1244         struct ring_buffer_event *event;
1245         unsigned length;
1246
1247         /*
1248          * Check if we are at the end of the buffer.
1249          */
1250         if (cpu_buffer->head >= cpu_buffer->head_page->size) {
1251                 BUG_ON(cpu_buffer->head_page == cpu_buffer->tail_page);
1252                 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1253                 rb_reset_read_page(cpu_buffer);
1254                 return;
1255         }
1256
1257         event = rb_head_event(cpu_buffer);
1258
1259         if (event->type == RINGBUF_TYPE_DATA)
1260                 cpu_buffer->entries--;
1261
1262         length = rb_event_length(event);
1263
1264         /*
1265          * This should not be called to advance the head if we are
1266          * at the tail of the buffer.
1267          */
1268         BUG_ON((cpu_buffer->head_page == cpu_buffer->tail_page) &&
1269                (cpu_buffer->head + length > cpu_buffer->tail));
1270
1271         rb_update_read_stamp(cpu_buffer, event);
1272
1273         cpu_buffer->head += length;
1274
1275         /* check for end of page */
1276         if ((cpu_buffer->head >= cpu_buffer->head_page->size) &&
1277             (cpu_buffer->head_page != cpu_buffer->tail_page))
1278                 rb_advance_head(cpu_buffer);
1279 }
1280
1281 static void rb_advance_iter(struct ring_buffer_iter *iter)
1282 {
1283         struct ring_buffer *buffer;
1284         struct ring_buffer_per_cpu *cpu_buffer;
1285         struct ring_buffer_event *event;
1286         unsigned length;
1287
1288         cpu_buffer = iter->cpu_buffer;
1289         buffer = cpu_buffer->buffer;
1290
1291         /*
1292          * Check if we are at the end of the buffer.
1293          */
1294         if (iter->head >= iter->head_page->size) {
1295                 BUG_ON(iter->head_page == cpu_buffer->tail_page);
1296                 rb_inc_page(cpu_buffer, &iter->head_page);
1297                 rb_reset_iter_read_page(iter);
1298                 return;
1299         }
1300
1301         event = rb_iter_head_event(iter);
1302
1303         length = rb_event_length(event);
1304
1305         /*
1306          * This should not be called to advance the head if we are
1307          * at the tail of the buffer.
1308          */
1309         BUG_ON((iter->head_page == cpu_buffer->tail_page) &&
1310                (iter->head + length > cpu_buffer->tail));
1311
1312         rb_update_iter_read_stamp(iter, event);
1313
1314         iter->head += length;
1315
1316         /* check for end of page padding */
1317         if ((iter->head >= iter->head_page->size) &&
1318             (iter->head_page != cpu_buffer->tail_page))
1319                 rb_advance_iter(iter);
1320 }
1321
1322 /**
1323  * ring_buffer_peek - peek at the next event to be read
1324  * @buffer: The ring buffer to read
1325  * @cpu: The cpu to peek at
1326  * @ts: The timestamp counter of this event.
1327  *
1328  * This will return the event that will be read next, but does
1329  * not consume the data.
1330  */
1331 struct ring_buffer_event *
1332 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
1333 {
1334         struct ring_buffer_per_cpu *cpu_buffer;
1335         struct ring_buffer_event *event;
1336
1337         if (!cpu_isset(cpu, buffer->cpumask))
1338                 return NULL;
1339
1340         cpu_buffer = buffer->buffers[cpu];
1341
1342  again:
1343         if (rb_per_cpu_empty(cpu_buffer))
1344                 return NULL;
1345
1346         event = rb_head_event(cpu_buffer);
1347
1348         switch (event->type) {
1349         case RINGBUF_TYPE_PADDING:
1350                 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1351                 rb_reset_read_page(cpu_buffer);
1352                 goto again;
1353
1354         case RINGBUF_TYPE_TIME_EXTEND:
1355                 /* Internal data, OK to advance */
1356                 rb_advance_head(cpu_buffer);
1357                 goto again;
1358
1359         case RINGBUF_TYPE_TIME_STAMP:
1360                 /* FIXME: not implemented */
1361                 rb_advance_head(cpu_buffer);
1362                 goto again;
1363
1364         case RINGBUF_TYPE_DATA:
1365                 if (ts) {
1366                         *ts = cpu_buffer->read_stamp + event->time_delta;
1367                         ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1368                 }
1369                 return event;
1370
1371         default:
1372                 BUG();
1373         }
1374
1375         return NULL;
1376 }
1377
1378 /**
1379  * ring_buffer_iter_peek - peek at the next event to be read
1380  * @iter: The ring buffer iterator
1381  * @ts: The timestamp counter of this event.
1382  *
1383  * This will return the event that will be read next, but does
1384  * not increment the iterator.
1385  */
1386 struct ring_buffer_event *
1387 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
1388 {
1389         struct ring_buffer *buffer;
1390         struct ring_buffer_per_cpu *cpu_buffer;
1391         struct ring_buffer_event *event;
1392
1393         if (ring_buffer_iter_empty(iter))
1394                 return NULL;
1395
1396         cpu_buffer = iter->cpu_buffer;
1397         buffer = cpu_buffer->buffer;
1398
1399  again:
1400         if (rb_per_cpu_empty(cpu_buffer))
1401                 return NULL;
1402
1403         event = rb_iter_head_event(iter);
1404
1405         switch (event->type) {
1406         case RINGBUF_TYPE_PADDING:
1407                 rb_inc_page(cpu_buffer, &iter->head_page);
1408                 rb_reset_iter_read_page(iter);
1409                 goto again;
1410
1411         case RINGBUF_TYPE_TIME_EXTEND:
1412                 /* Internal data, OK to advance */
1413                 rb_advance_iter(iter);
1414                 goto again;
1415
1416         case RINGBUF_TYPE_TIME_STAMP:
1417                 /* FIXME: not implemented */
1418                 rb_advance_iter(iter);
1419                 goto again;
1420
1421         case RINGBUF_TYPE_DATA:
1422                 if (ts) {
1423                         *ts = iter->read_stamp + event->time_delta;
1424                         ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1425                 }
1426                 return event;
1427
1428         default:
1429                 BUG();
1430         }
1431
1432         return NULL;
1433 }
1434
1435 /**
1436  * ring_buffer_consume - return an event and consume it
1437  * @buffer: The ring buffer to get the next event from
1438  *
1439  * Returns the next event in the ring buffer, and that event is consumed.
1440  * Meaning that sequential reads will keep returning a different event,
1441  * and eventually empty the ring buffer if the producer is slower.
1442  */
1443 struct ring_buffer_event *
1444 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
1445 {
1446         struct ring_buffer_per_cpu *cpu_buffer;
1447         struct ring_buffer_event *event;
1448
1449         if (!cpu_isset(cpu, buffer->cpumask))
1450                 return NULL;
1451
1452         event = ring_buffer_peek(buffer, cpu, ts);
1453         if (!event)
1454                 return NULL;
1455
1456         cpu_buffer = buffer->buffers[cpu];
1457         rb_advance_head(cpu_buffer);
1458
1459         return event;
1460 }
1461
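/*
 * Usage sketch (illustrative only; the function below is hypothetical):
 * draining one CPU's events with the consuming interface.
 */
static void example_drain_cpu(struct ring_buffer *buffer, int cpu)
{
        struct ring_buffer_event *event;
        u64 ts;

        while ((event = ring_buffer_consume(buffer, cpu, &ts))) {
                void *data = ring_buffer_event_data(event);
                unsigned len = ring_buffer_event_length(event);

                /* ts has already been normalized by ring_buffer_peek() */
                printk(KERN_INFO "event: ts=%llu len=%u data=%p\n",
                       (unsigned long long)ts, len, data);
        }
}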
1462 /**
1463  * ring_buffer_read_start - start a non consuming read of the buffer
1464  * @buffer: The ring buffer to read from
1465  * @cpu: The cpu buffer to iterate over
1466  *
1467  * This starts up an iteration through the buffer. It also disables
1468  * the recording to the buffer until the reading is finished.
1469  * This prevents the reading from being corrupted. This is not
1470  * a consuming read, so a producer is not expected.
1471  *
1472  * Must be paired with ring_buffer_read_finish.
1473  */
1474 struct ring_buffer_iter *
1475 ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
1476 {
1477         struct ring_buffer_per_cpu *cpu_buffer;
1478         struct ring_buffer_iter *iter;
1479
1480         if (!cpu_isset(cpu, buffer->cpumask))
1481                 return NULL;
1482
1483         iter = kmalloc(sizeof(*iter), GFP_KERNEL);
1484         if (!iter)
1485                 return NULL;
1486
1487         cpu_buffer = buffer->buffers[cpu];
1488
1489         iter->cpu_buffer = cpu_buffer;
1490
1491         atomic_inc(&cpu_buffer->record_disabled);
1492         synchronize_sched();
1493
1494         spin_lock(&cpu_buffer->lock);
1495         iter->head = cpu_buffer->head;
1496         iter->head_page = cpu_buffer->head_page;
1497         rb_reset_iter_read_page(iter);
1498         spin_unlock(&cpu_buffer->lock);
1499
1500         return iter;
1501 }
1502
1503 /**
1504  * ring_buffer_read_finish - finish reading the iterator of the buffer
1505  * @iter: The iterator retrieved by ring_buffer_read_start
1506  *
1507  * This re-enables the recording to the buffer, and frees the
1508  * iterator.
1509  */
1510 void
1511 ring_buffer_read_finish(struct ring_buffer_iter *iter)
1512 {
1513         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1514
1515         atomic_dec(&cpu_buffer->record_disabled);
1516         kfree(iter);
1517 }
1518
1519 /**
1520  * ring_buffer_read - read the next item in the ring buffer by the iterator
1521  * @iter: The ring buffer iterator
1522  * @ts: The time stamp of the event read.
1523  *
1524  * This reads the next event in the ring buffer and increments the iterator.
1525  */
1526 struct ring_buffer_event *
1527 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
1528 {
1529         struct ring_buffer_event *event;
1530
1531         event = ring_buffer_iter_peek(iter, ts);
1532         if (!event)
1533                 return NULL;
1534
1535         rb_advance_iter(iter);
1536
1537         return event;
1538 }
1539
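/*
 * Usage sketch (illustrative only; the function below is hypothetical):
 * a non-consuming walk over one CPU buffer.  Writers to that CPU are
 * disabled for the lifetime of the iterator.
 */
static void example_dump_cpu(struct ring_buffer *buffer, int cpu)
{
        struct ring_buffer_iter *iter;
        struct ring_buffer_event *event;
        u64 ts;

        iter = ring_buffer_read_start(buffer, cpu);
        if (!iter)
                return;

        while ((event = ring_buffer_read(iter, &ts)))
                printk(KERN_INFO "ts=%llu len=%u\n",
                       (unsigned long long)ts,
                       ring_buffer_event_length(event));

        ring_buffer_read_finish(iter);
}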
1540 /**
1541  * ring_buffer_size - return the size of the ring buffer (in bytes)
1542  * @buffer: The ring buffer.
1543  */
1544 unsigned long ring_buffer_size(struct ring_buffer *buffer)
1545 {
1546         return BUF_PAGE_SIZE * buffer->pages;
1547 }
1548
1549 static void
1550 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
1551 {
1552         cpu_buffer->head_page
1553                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
1554         cpu_buffer->tail_page
1555                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
1556
1557         cpu_buffer->head = cpu_buffer->tail = 0;
1558         cpu_buffer->overrun = 0;
1559         cpu_buffer->entries = 0;
1560 }
1561
1562 /**
1563  * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
1564  * @buffer: The ring buffer to reset a per cpu buffer of
1565  * @cpu: The CPU buffer to be reset
1566  */
1567 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
1568 {
1569         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
1570         unsigned long flags;
1571
1572         if (!cpu_isset(cpu, buffer->cpumask))
1573                 return;
1574
1575         raw_local_irq_save(flags);
1576         spin_lock(&cpu_buffer->lock);
1577
1578         rb_reset_cpu(cpu_buffer);
1579
1580         spin_unlock(&cpu_buffer->lock);
1581         raw_local_irq_restore(flags);
1582 }
1583
1584 /**
1585  * ring_buffer_reset - reset a ring buffer
1586  * @buffer: The ring buffer to reset all cpu buffers
1587  */
1588 void ring_buffer_reset(struct ring_buffer *buffer)
1589 {
1590         unsigned long flags;
1591         int cpu;
1592
1593         ring_buffer_lock(buffer, &flags);
1594
1595         for_each_buffer_cpu(buffer, cpu)
1596                 rb_reset_cpu(buffer->buffers[cpu]);
1597
1598         ring_buffer_unlock(buffer, flags);
1599 }
1600
1601 /**
1602  * ring_buffer_empty - is the ring buffer empty?
1603  * @buffer: The ring buffer to test
1604  */
1605 int ring_buffer_empty(struct ring_buffer *buffer)
1606 {
1607         struct ring_buffer_per_cpu *cpu_buffer;
1608         int cpu;
1609
1610         /* yes this is racy, but if you don't like the race, lock the buffer */
1611         for_each_buffer_cpu(buffer, cpu) {
1612                 cpu_buffer = buffer->buffers[cpu];
1613                 if (!rb_per_cpu_empty(cpu_buffer))
1614                         return 0;
1615         }
1616         return 1;
1617 }
1618
1619 /**
1620  * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
1621  * @buffer: The ring buffer
1622  * @cpu: The CPU buffer to test
1623  */
1624 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
1625 {
1626         struct ring_buffer_per_cpu *cpu_buffer;
1627
1628         if (!cpu_isset(cpu, buffer->cpumask))
1629                 return 1;
1630
1631         cpu_buffer = buffer->buffers[cpu];
1632         return rb_per_cpu_empty(cpu_buffer);
1633 }
1634
1635 /**
1636  * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
1637  * @buffer_a: One buffer to swap with
1638  * @buffer_b: The other buffer to swap with
1639  *
1640  * This function is useful for tracers that want to take a "snapshot"
1641  * of a CPU buffer and have another backup buffer lying around.
1642  * It is expected that the tracer handles the cpu buffer not being
1643  * used at the moment.
1644  */
1645 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
1646                          struct ring_buffer *buffer_b, int cpu)
1647 {
1648         struct ring_buffer_per_cpu *cpu_buffer_a;
1649         struct ring_buffer_per_cpu *cpu_buffer_b;
1650
1651         if (!cpu_isset(cpu, buffer_a->cpumask) ||
1652             !cpu_isset(cpu, buffer_b->cpumask))
1653                 return -EINVAL;
1654
1655         /* At least make sure the two buffers are somewhat the same */
1656         if (buffer_a->size != buffer_b->size ||
1657             buffer_a->pages != buffer_b->pages)
1658                 return -EINVAL;
1659
1660         cpu_buffer_a = buffer_a->buffers[cpu];
1661         cpu_buffer_b = buffer_b->buffers[cpu];
1662
1663         /*
1664          * We can't do a synchronize_sched here because this
1665          * function can be called in atomic context.
1666          * Normally this will be called from the same CPU as cpu.
1667          * If not it's up to the caller to protect this.
1668          */
1669         atomic_inc(&cpu_buffer_a->record_disabled);
1670         atomic_inc(&cpu_buffer_b->record_disabled);
1671
1672         buffer_a->buffers[cpu] = cpu_buffer_b;
1673         buffer_b->buffers[cpu] = cpu_buffer_a;
1674
1675         cpu_buffer_b->buffer = buffer_a;
1676         cpu_buffer_a->buffer = buffer_b;
1677
1678         atomic_dec(&cpu_buffer_a->record_disabled);
1679         atomic_dec(&cpu_buffer_b->record_disabled);
1680
1681         return 0;
1682 }
1683
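/*
 * Usage sketch (illustrative only; the function and the spare buffer are
 * hypothetical): taking a per-CPU "snapshot" by swapping in a standby
 * buffer of the same size, then reading the swapped-out data at leisure.
 */
static int example_snapshot_cpu(struct ring_buffer *live,
                                struct ring_buffer *spare, int cpu)
{
        int ret;

        ret = ring_buffer_swap_cpu(live, spare, cpu);
        if (ret)
                return ret;     /* cpumask or size mismatch */

        /* 'spare' now holds the old contents of 'live' for this cpu */
        return 0;
}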