#include <linux/cpu.h>
#include <linux/fs.h>
+#include <asm/local.h>
#include "trace.h"
/*
}
EXPORT_SYMBOL_GPL(tracing_is_on);
-#include "trace.h"
-
#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT 4U
#define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
static inline int rb_null_event(struct ring_buffer_event *event)
{
- return event->type_len == RINGBUF_TYPE_PADDING
- && event->time_delta == 0;
-}
-
-static inline int rb_discarded_event(struct ring_buffer_event *event)
-{
- return event->type_len == RINGBUF_TYPE_PADDING && event->time_delta;
+ return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
}
static void rb_event_set_padding(struct ring_buffer_event *event)
{
+ /* padding has a NULL time_delta */
event->type_len = RINGBUF_TYPE_PADDING;
event->time_delta = 0;
}
int ret;
ret = trace_seq_printf(s, "\tfield: u64 timestamp;\t"
- "offset:0;\tsize:%u;\n",
- (unsigned int)sizeof(field.time_stamp));
+ "offset:0;\tsize:%u;\tsigned:%u;\n",
+ (unsigned int)sizeof(field.time_stamp),
+ (unsigned int)is_signed_type(u64));
ret = trace_seq_printf(s, "\tfield: local_t commit;\t"
- "offset:%u;\tsize:%u;\n",
+ "offset:%u;\tsize:%u;\tsigned:%u;\n",
(unsigned int)offsetof(typeof(field), commit),
- (unsigned int)sizeof(field.commit));
+ (unsigned int)sizeof(field.commit),
+ (unsigned int)is_signed_type(long));
ret = trace_seq_printf(s, "\tfield: char data;\t"
- "offset:%u;\tsize:%u;\n",
+ "offset:%u;\tsize:%u;\tsigned:%u;\n",
(unsigned int)offsetof(typeof(field), data),
- (unsigned int)BUF_PAGE_SIZE);
+ (unsigned int)BUF_PAGE_SIZE,
+ (unsigned int)is_signed_type(char));
return ret;
}
int cpu;
struct ring_buffer *buffer;
spinlock_t reader_lock; /* serialize readers */
- raw_spinlock_t lock;
+ arch_spinlock_t lock;
struct lock_class_key lock_key;
struct list_head *pages;
struct buffer_page *head_page; /* read from head */
struct ring_buffer_per_cpu *cpu_buffer;
unsigned long head;
struct buffer_page *head_page;
+ struct buffer_page *cache_reader_page;
+ unsigned long cache_read;
u64 read_stamp;
};
/* buffer may be either ring_buffer or ring_buffer_per_cpu */
-#define RB_WARN_ON(buffer, cond) \
- ({ \
- int _____ret = unlikely(cond); \
- if (_____ret) { \
- atomic_inc(&buffer->record_disabled); \
- WARN_ON(1); \
- } \
- _____ret; \
+#define RB_WARN_ON(b, cond) \
+ ({ \
+ int _____ret = unlikely(cond); \
+ if (_____ret) { \
+ if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
+ struct ring_buffer_per_cpu *__b = \
+ (void *)b; \
+ atomic_inc(&__b->buffer->record_disabled); \
+ } else \
+ atomic_inc(&b->record_disabled); \
+ WARN_ON(1); \
+ } \
+ _____ret; \
})
/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0
-static inline u64 rb_time_stamp(struct ring_buffer *buffer, int cpu)
+static inline u64 rb_time_stamp(struct ring_buffer *buffer)
{
/* shift to debug/test normalization and TIME_EXTENTS */
return buffer->clock() << DEBUG_SHIFT;
u64 time;
preempt_disable_notrace();
- time = rb_time_stamp(buffer, cpu);
+ time = rb_time_stamp(buffer);
preempt_enable_no_resched_notrace();
return time;
}
/*
- * rb_is_head_page - test if the give page is the head page
+ * rb_is_head_page - test if the given page is the head page
*
* Because the reader may move the head_page pointer, we can
* not trust what the head page is (it may be pointing to
val &= ~RB_FLAG_MASK;
- ret = (unsigned long)cmpxchg(&list->next,
- val | old_flag, val | new_flag);
+ ret = cmpxchg((unsigned long *)&list->next,
+ val | old_flag, val | new_flag);
/* check if the reader took the page */
if ((ret & ~RB_FLAG_MASK) != val)
val = *ptr & ~RB_FLAG_MASK;
val |= RB_PAGE_HEAD;
- ret = cmpxchg(ptr, val, &new->list);
+ ret = cmpxchg(ptr, val, (unsigned long)&new->list);
return ret == val;
}
cpu_buffer->buffer = buffer;
spin_lock_init(&cpu_buffer->reader_lock);
lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
- cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED;
+ cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
GFP_KERNEL, cpu_to_node(cpu));
put_online_cpus();
+ kfree(buffer->buffers);
free_cpumask_var(buffer->cpumask);
kfree(buffer);
struct list_head *p;
unsigned i;
- atomic_inc(&cpu_buffer->record_disabled);
- synchronize_sched();
-
+ spin_lock_irq(&cpu_buffer->reader_lock);
rb_head_page_deactivate(cpu_buffer);
for (i = 0; i < nr_pages; i++) {
return;
rb_reset_cpu(cpu_buffer);
-
rb_check_pages(cpu_buffer);
- atomic_dec(&cpu_buffer->record_disabled);
-
+ spin_unlock_irq(&cpu_buffer->reader_lock);
}
static void
struct list_head *p;
unsigned i;
- atomic_inc(&cpu_buffer->record_disabled);
- synchronize_sched();
-
spin_lock_irq(&cpu_buffer->reader_lock);
rb_head_page_deactivate(cpu_buffer);
list_add_tail(&bpage->list, cpu_buffer->pages);
}
rb_reset_cpu(cpu_buffer);
- spin_unlock_irq(&cpu_buffer->reader_lock);
-
rb_check_pages(cpu_buffer);
- atomic_dec(&cpu_buffer->record_disabled);
+ spin_unlock_irq(&cpu_buffer->reader_lock);
}
/**
* @buffer: the buffer to resize.
* @size: the new size.
*
- * The tracer is responsible for making sure that the buffer is
- * not being used while changing the size.
- * Note: We may be able to change the above requirement by using
- * RCU synchronizations.
- *
* Minimum size is 2 * BUF_PAGE_SIZE.
*
* Returns -1 on failure.
if (size == buffer_size)
return size;
+ atomic_inc(&buffer->record_disabled);
+
+ /* Make sure all writers are done with this buffer. */
+ synchronize_sched();
+
mutex_lock(&buffer->mutex);
get_online_cpus();
put_online_cpus();
mutex_unlock(&buffer->mutex);
+ atomic_dec(&buffer->record_disabled);
+
return size;
free_pages:
}
put_online_cpus();
mutex_unlock(&buffer->mutex);
+ atomic_dec(&buffer->record_disabled);
return -ENOMEM;
/*
out_fail:
put_online_cpus();
mutex_unlock(&buffer->mutex);
+ atomic_dec(&buffer->record_disabled);
return -1;
}
EXPORT_SYMBOL_GPL(ring_buffer_resize);
event->type_len = RINGBUF_TYPE_PADDING;
/* time delta must be non zero */
event->time_delta = 1;
- /* Account for this as an entry */
- local_inc(&tail_page->entries);
- local_inc(&cpu_buffer->entries);
/* Set write to end of buffer */
length = (tail + length) - BUF_PAGE_SIZE;
static struct ring_buffer_event *
rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
unsigned long length, unsigned long tail,
- struct buffer_page *commit_page,
struct buffer_page *tail_page, u64 *ts)
{
+ struct buffer_page *commit_page = cpu_buffer->commit_page;
struct ring_buffer *buffer = cpu_buffer->buffer;
struct buffer_page *next_page;
int ret;
* Nested commits always have zero deltas, so
* just reread the time stamp
*/
- *ts = rb_time_stamp(buffer, cpu_buffer->cpu);
+ *ts = rb_time_stamp(buffer);
next_page->page->time_stamp = *ts;
}
__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
unsigned type, unsigned long length, u64 *ts)
{
- struct buffer_page *tail_page, *commit_page;
+ struct buffer_page *tail_page;
struct ring_buffer_event *event;
unsigned long tail, write;
- commit_page = cpu_buffer->commit_page;
- /* we just need to protect against interrupts */
- barrier();
tail_page = cpu_buffer->tail_page;
write = local_add_return(length, &tail_page->write);
/* See if we shot pass the end of this buffer page */
if (write > BUF_PAGE_SIZE)
return rb_move_tail(cpu_buffer, length, tail,
- commit_page, tail_page, ts);
+ tail_page, ts);
/* We reserved something on the buffer */
}
static struct ring_buffer_event *
-rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
+rb_reserve_next_event(struct ring_buffer *buffer,
+ struct ring_buffer_per_cpu *cpu_buffer,
unsigned long length)
{
struct ring_buffer_event *event;
rb_start_commit(cpu_buffer);
+#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
+ /*
+ * Due to the ability to swap a cpu buffer from a buffer
+ * it is possible it was swapped before we committed.
+ * (committing stops a swap). We check for it here and
+ * if it happened, we have to fail the write.
+ */
+ barrier();
+ if (unlikely(ACCESS_ONCE(cpu_buffer->buffer) != buffer)) {
+ local_dec(&cpu_buffer->committing);
+ local_dec(&cpu_buffer->commits);
+ return NULL;
+ }
+#endif
+
length = rb_calculate_event_length(length);
again:
/*
if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
goto out_fail;
- ts = rb_time_stamp(cpu_buffer->buffer, cpu_buffer->cpu);
+ ts = rb_time_stamp(cpu_buffer->buffer);
/*
* Only the first commit can update the timestamp.
if (length > BUF_MAX_DATA_SIZE)
goto out;
- event = rb_reserve_next_event(cpu_buffer, length);
+ event = rb_reserve_next_event(buffer, cpu_buffer, length);
if (!event)
goto out;
}
EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
-static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
+static void
+rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer,
struct ring_buffer_event *event)
{
- local_inc(&cpu_buffer->entries);
-
/*
* The event first in the commit queue updates the
* time stamp.
*/
if (rb_event_is_commit(cpu_buffer, event))
cpu_buffer->write_stamp += event->time_delta;
+}
+static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
+ struct ring_buffer_event *event)
+{
+ local_inc(&cpu_buffer->entries);
+ rb_update_write_stamp(cpu_buffer, event);
rb_end_commit(cpu_buffer);
}
event->time_delta = 1;
}
-/**
- * ring_buffer_event_discard - discard any event in the ring buffer
- * @event: the event to discard
- *
- * Sometimes a event that is in the ring buffer needs to be ignored.
- * This function lets the user discard an event in the ring buffer
- * and then that event will not be read later.
- *
- * Note, it is up to the user to be careful with this, and protect
- * against races. If the user discards an event that has been consumed
- * it is possible that it could corrupt the ring buffer.
+/*
+ * Decrement the entries to the page that an event is on.
+ * The event does not even need to exist, only the pointer
+ * to the page it is on. This may only be called before the commit
+ * takes place.
*/
-void ring_buffer_event_discard(struct ring_buffer_event *event)
+static inline void
+rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer,
+ struct ring_buffer_event *event)
{
- rb_event_discard(event);
+ unsigned long addr = (unsigned long)event;
+ struct buffer_page *bpage = cpu_buffer->commit_page;
+ struct buffer_page *start;
+
+ addr &= PAGE_MASK;
+
+ /* Do the likely case first */
+ if (likely(bpage->page == (void *)addr)) {
+ local_dec(&bpage->entries);
+ return;
+ }
+
+ /*
+ * Because the commit page may be on the reader page we
+ * start with the next page and check the end loop there.
+ */
+ rb_inc_page(cpu_buffer, &bpage);
+ start = bpage;
+ do {
+ if (bpage->page == (void *)addr) {
+ local_dec(&bpage->entries);
+ return;
+ }
+ rb_inc_page(cpu_buffer, &bpage);
+ } while (bpage != start);
+
+ /* commit not part of this buffer?? */
+ RB_WARN_ON(cpu_buffer, 1);
}
-EXPORT_SYMBOL_GPL(ring_buffer_event_discard);
/**
* ring_buffer_commit_discard - discard an event that has not been committed
* @buffer: the ring buffer
* @event: non committed event to discard
*
- * This is similar to ring_buffer_event_discard but must only be
- * performed on an event that has not been committed yet. The difference
- * is that this will also try to free the event from the ring buffer
+ * Sometimes an event that is in the ring buffer needs to be ignored.
+ * This function lets the user discard an event in the ring buffer
+ * and then that event will not be read later.
+ *
+ * This function only works if it is called before the the item has been
+ * committed. It will try to free the event from the ring buffer
* if another event has not been added behind it.
*
* If another event has been added behind it, it will set the event
*/
RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
- if (!rb_try_to_discard(cpu_buffer, event))
+ rb_decrement_entry(cpu_buffer, event);
+ if (rb_try_to_discard(cpu_buffer, event))
goto out;
/*
* The commit is still visible by the reader, so we
- * must increment entries.
+ * must still update the timestamp.
*/
- local_inc(&cpu_buffer->entries);
+ rb_update_write_stamp(cpu_buffer, event);
out:
rb_end_commit(cpu_buffer);
if (length > BUF_MAX_DATA_SIZE)
goto out;
- event = rb_reserve_next_event(cpu_buffer, length);
+ event = rb_reserve_next_event(buffer, cpu_buffer, length);
if (!event)
goto out;
EXPORT_SYMBOL_GPL(ring_buffer_entries);
/**
- * ring_buffer_overrun_cpu - get the number of overruns in buffer
+ * ring_buffer_overruns - get the number of overruns in buffer
* @buffer: The ring buffer
*
* Returns the total number of overruns in the ring buffer
iter->read_stamp = cpu_buffer->read_stamp;
else
iter->read_stamp = iter->head_page->page->time_stamp;
+ iter->cache_reader_page = cpu_buffer->reader_page;
+ iter->cache_read = cpu_buffer->read;
}
/**
int ret;
local_irq_save(flags);
- __raw_spin_lock(&cpu_buffer->lock);
+ arch_spin_lock(&cpu_buffer->lock);
again:
/*
* Splice the empty reader page into the list around the head.
*/
reader = rb_set_head_page(cpu_buffer);
- cpu_buffer->reader_page->list.next = reader->list.next;
+ cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);
cpu_buffer->reader_page->list.prev = reader->list.prev;
/*
*
* Now make the new head point back to the reader page.
*/
- reader->list.next->prev = &cpu_buffer->reader_page->list;
+ rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
/* Finally update the reader page to the new head */
goto again;
out:
- __raw_spin_unlock(&cpu_buffer->lock);
+ arch_spin_unlock(&cpu_buffer->lock);
local_irq_restore(flags);
return reader;
event = rb_reader_event(cpu_buffer);
- if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX
- || rb_discarded_event(event))
+ if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
cpu_buffer->read++;
rb_update_read_stamp(cpu_buffer, event);
}
static struct ring_buffer_event *
-rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
+rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts)
{
- struct ring_buffer_per_cpu *cpu_buffer;
struct ring_buffer_event *event;
struct buffer_page *reader;
int nr_loops = 0;
- cpu_buffer = buffer->buffers[cpu];
-
again:
/*
* We repeat when a timestamp is encountered. It is possible
* the box. Return the padding, and we will release
* the current locks, and try again.
*/
- rb_advance_reader(cpu_buffer);
return event;
case RINGBUF_TYPE_TIME_EXTEND:
case RINGBUF_TYPE_DATA:
if (ts) {
*ts = cpu_buffer->read_stamp + event->time_delta;
- ring_buffer_normalize_time_stamp(buffer,
+ ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
cpu_buffer->cpu, ts);
}
return event;
struct ring_buffer_event *event;
int nr_loops = 0;
- if (ring_buffer_iter_empty(iter))
- return NULL;
-
cpu_buffer = iter->cpu_buffer;
buffer = cpu_buffer->buffer;
+ /*
+ * Check if someone performed a consuming read to
+ * the buffer. A consuming read invalidates the iterator
+ * and we need to reset the iterator in this case.
+ */
+ if (unlikely(iter->cache_read != cpu_buffer->read ||
+ iter->cache_reader_page != cpu_buffer->reader_page))
+ rb_iter_reset(iter);
+
again:
+ if (ring_buffer_iter_empty(iter))
+ return NULL;
+
/*
* We repeat when a timestamp is encountered.
* We can get multiple timestamps by nested interrupts or also
if (rb_per_cpu_empty(cpu_buffer))
return NULL;
+ if (iter->head >= local_read(&iter->head_page->page->commit)) {
+ rb_inc_iter(iter);
+ goto again;
+ }
+
event = rb_iter_head_event(iter);
switch (event->type_len) {
* buffer too. A one time deal is all you get from reading
* the ring buffer from an NMI.
*/
- if (likely(!in_nmi() && !oops_in_progress))
+ if (likely(!in_nmi()))
return 1;
tracing_off_permanent();
local_irq_save(flags);
if (dolock)
spin_lock(&cpu_buffer->reader_lock);
- event = rb_buffer_peek(buffer, cpu, ts);
+ event = rb_buffer_peek(cpu_buffer, ts);
+ if (event && event->type_len == RINGBUF_TYPE_PADDING)
+ rb_advance_reader(cpu_buffer);
if (dolock)
spin_unlock(&cpu_buffer->reader_lock);
local_irq_restore(flags);
- if (event && event->type_len == RINGBUF_TYPE_PADDING) {
- cpu_relax();
+ if (event && event->type_len == RINGBUF_TYPE_PADDING)
goto again;
- }
return event;
}
event = rb_iter_peek(iter, ts);
spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
- if (event && event->type_len == RINGBUF_TYPE_PADDING) {
- cpu_relax();
+ if (event && event->type_len == RINGBUF_TYPE_PADDING)
goto again;
- }
return event;
}
if (dolock)
spin_lock(&cpu_buffer->reader_lock);
- event = rb_buffer_peek(buffer, cpu, ts);
- if (!event)
- goto out_unlock;
-
- rb_advance_reader(cpu_buffer);
+ event = rb_buffer_peek(cpu_buffer, ts);
+ if (event)
+ rb_advance_reader(cpu_buffer);
- out_unlock:
if (dolock)
spin_unlock(&cpu_buffer->reader_lock);
local_irq_restore(flags);
out:
preempt_enable();
- if (event && event->type_len == RINGBUF_TYPE_PADDING) {
- cpu_relax();
+ if (event && event->type_len == RINGBUF_TYPE_PADDING)
goto again;
- }
return event;
}
synchronize_sched();
spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
- __raw_spin_lock(&cpu_buffer->lock);
+ arch_spin_lock(&cpu_buffer->lock);
rb_iter_reset(iter);
- __raw_spin_unlock(&cpu_buffer->lock);
+ arch_spin_unlock(&cpu_buffer->lock);
spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
return iter;
struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
unsigned long flags;
- again:
spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+ again:
event = rb_iter_peek(iter, ts);
if (!event)
goto out;
+ if (event->type_len == RINGBUF_TYPE_PADDING)
+ goto again;
+
rb_advance_iter(iter);
out:
spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
- if (event && event->type_len == RINGBUF_TYPE_PADDING) {
- cpu_relax();
- goto again;
- }
-
return event;
}
EXPORT_SYMBOL_GPL(ring_buffer_read);
spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
- __raw_spin_lock(&cpu_buffer->lock);
+ if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
+ goto out;
+
+ arch_spin_lock(&cpu_buffer->lock);
rb_reset_cpu(cpu_buffer);
- __raw_spin_unlock(&cpu_buffer->lock);
+ arch_spin_unlock(&cpu_buffer->lock);
+ out:
spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
atomic_dec(&cpu_buffer->record_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
+#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
/**
* ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
* @buffer_a: One buffer to swap with
atomic_inc(&cpu_buffer_a->record_disabled);
atomic_inc(&cpu_buffer_b->record_disabled);
+ ret = -EBUSY;
+ if (local_read(&cpu_buffer_a->committing))
+ goto out_dec;
+ if (local_read(&cpu_buffer_b->committing))
+ goto out_dec;
+
buffer_a->buffers[cpu] = cpu_buffer_b;
buffer_b->buffers[cpu] = cpu_buffer_a;
cpu_buffer_b->buffer = buffer_a;
cpu_buffer_a->buffer = buffer_b;
+ ret = 0;
+
+out_dec:
atomic_dec(&cpu_buffer_a->record_disabled);
atomic_dec(&cpu_buffer_b->record_disabled);
-
- ret = 0;
out:
return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
+#endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */
/**
* ring_buffer_alloc_read_page - allocate a page to read from buffer