Ring buffer backend, with page allocation, data read/write API, cpu hotplug management. Signed-off-by: Mathieu Desnoyers --- include/linux/ringbuffer/backend.h | 141 +++++ include/linux/ringbuffer/backend_internal.h | 418 +++++++++++++++ include/linux/ringbuffer/backend_types.h | 80 ++ lib/ringbuffer/Makefile | 1 lib/ringbuffer/ring_buffer_backend.c | 755 ++++++++++++++++++++++++++++ 5 files changed, 1395 insertions(+) Index: linux.trees.git/lib/ringbuffer/Makefile =================================================================== --- /dev/null 1970-01-01 00:00:00.000000000 +0000 +++ linux.trees.git/lib/ringbuffer/Makefile 2010-07-09 18:09:01.000000000 -0400 @@ -0,0 +1 @@ +obj-y += ring_buffer_backend.o Index: linux.trees.git/include/linux/ringbuffer/backend.h =================================================================== --- /dev/null 1970-01-01 00:00:00.000000000 +0000 +++ linux.trees.git/include/linux/ringbuffer/backend.h 2010-07-09 18:11:39.000000000 -0400 @@ -0,0 +1,141 @@ +#ifndef _LINUX_RING_BUFFER_BACKEND_H +#define _LINUX_RING_BUFFER_BACKEND_H + +/* + * linux/ringbuffer/backend.h + * + * Copyright (C) 2008-2010 - Mathieu Desnoyers + * + * Ring buffer backend (API). + * + * Dual LGPL v2.1/GPL v2 license. + * + * Credits to Steven Rostedt for proposing to use an extra-subbuffer owned by + * the reader in flight recorder mode. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +/* Internal helpers */ +#include +#include + +/* Ring buffer backend API */ + +/* Ring buffer backend access (read/write) */ + +extern size_t ring_buffer_read(struct ring_buffer_backend *bufb, + size_t offset, void *dest, size_t len); + +extern int __ring_buffer_copy_to_user(struct ring_buffer_backend *bufb, + size_t offset, void __user *dest, + size_t len); + +extern int ring_buffer_read_cstr(struct ring_buffer_backend *bufb, + size_t offset, void *dest, size_t len); + +extern struct page ** +ring_buffer_read_get_page(struct ring_buffer_backend *bufb, size_t offset, + void ***virt); + +/* + * Return the address where a given offset is located. + * Should be used to get the current subbuffer header pointer. Given we know + * it's never on a page boundary, it's safe to write directly to this address, + * as long as the write is never bigger than a page size. + */ +extern void * +ring_buffer_offset_address(struct ring_buffer_backend *bufb, + size_t offset); +extern void * +ring_buffer_read_offset_address(struct ring_buffer_backend *bufb, + size_t offset); + +/** + * ring_buffer_write - write data to a buffer backend + * @config : ring buffer instance configuration + * @ctx: ring buffer context. (input arguments only) + * @src : source pointer to copy from + * @len : length of data to copy + * + * This function copies "len" bytes of data from a source pointer to a buffer + * backend, at the current context offset. This is more or less a buffer + * backend-specific memcpy() operation. Calls the slow path (_ring_buffer_write) + * if copy is crossing a page boundary. + */ +static inline +void ring_buffer_write(const struct ring_buffer_config *config, + struct ring_buffer_ctx *ctx, + const void *src, size_t len) +{ + struct ring_buffer_backend *bufb = &ctx->buf->backend; + struct channel_backend *chanb = &ctx->chan->backend; + size_t sbidx, index; + size_t offset = ctx->buf_offset; + ssize_t pagecpy; + struct ring_buffer_backend_pages *rpages; + unsigned long sb_bindex, id; + + offset &= chanb->buf_size - 1; + sbidx = offset >> chanb->subbuf_size_order; + index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT; + pagecpy = min_t(size_t, len, (-offset) & ~PAGE_MASK); + id = bufb->buf_wsb[sbidx].id; + sb_bindex = subbuffer_id_get_index(config, id); + rpages = bufb->array[sb_bindex]; + CHAN_WARN_ON(ctx->chan, + config->mode == RING_BUFFER_OVERWRITE + && subbuffer_id_is_noref(config, id)); + if (likely(pagecpy == len)) + ring_buffer_do_copy(config, + rpages->p[index].virt + + (offset & ~PAGE_MASK), + src, len); + else + _ring_buffer_write(bufb, offset, src, len, 0); + ctx->buf_offset += len; +} + +/* + * This accessor counts the number of unread records in a buffer. + * It only provides a consistent value if no reads not writes are performed + * concurrently. + */ +static inline +unsigned long ring_buffer_get_records_unread( + const struct ring_buffer_config *config, + struct ring_buffer *buf) +{ + struct ring_buffer_backend *bufb = &buf->backend; + struct ring_buffer_backend_pages *pages; + unsigned long records_unread = 0, sb_bindex, id; + unsigned int i; + + for (i = 0; i < bufb->chan->backend.num_subbuf; i++) { + id = bufb->buf_wsb[i].id; + sb_bindex = subbuffer_id_get_index(config, id); + pages = bufb->array[sb_bindex]; + records_unread += v_read(config, &pages->records_unread); + } + if (config->mode == RING_BUFFER_OVERWRITE) { + id = bufb->buf_rsb.id; + sb_bindex = subbuffer_id_get_index(config, id); + pages = bufb->array[sb_bindex]; + records_unread += v_read(config, &pages->records_unread); + } + return records_unread; +} + +ssize_t ring_buffer_file_splice_read(struct file *in, loff_t *ppos, + struct pipe_inode_info *pipe, size_t len, + unsigned int flags); +loff_t ring_buffer_no_llseek(struct file *file, loff_t offset, int origin); + +#endif /* _LINUX_RING_BUFFER_BACKEND_H */ Index: linux.trees.git/include/linux/ringbuffer/backend_internal.h =================================================================== --- /dev/null 1970-01-01 00:00:00.000000000 +0000 +++ linux.trees.git/include/linux/ringbuffer/backend_internal.h 2010-07-09 18:11:58.000000000 -0400 @@ -0,0 +1,418 @@ +#ifndef _LINUX_RING_BUFFER_BACKEND_INTERNAL_H +#define _LINUX_RING_BUFFER_BACKEND_INTERNAL_H + +/* + * linux/ringbuffer/backend_internal.h + * + * Copyright (C) 2008-2010 - Mathieu Desnoyers + * + * Ring buffer backend (internal helpers). + * + * Dual LGPL v2.1/GPL v2 license. + */ + +#include +#include +#include +#include + +/* Ring buffer backend API presented to the frontend */ + +/* Ring buffer and channel backend create/free */ + +int ring_buffer_backend_create(struct ring_buffer_backend *bufb, + struct channel_backend *chan, + int cpu); +void channel_backend_unregister_notifiers(struct channel_backend *chanb); +void ring_buffer_backend_free(struct ring_buffer_backend *bufb); +int channel_backend_init(struct channel_backend *chanb, + const char *name, + const struct ring_buffer_config *config, + void *priv, size_t subbuf_size, + size_t num_subbuf); +void channel_backend_free(struct channel_backend *chanb); + +void ring_buffer_backend_reset(struct ring_buffer_backend *bufb); +void channel_backend_reset(struct channel_backend *chanb); + +int ring_buffer_backend_init(void); +void ring_buffer_backend_exit(void); + +extern void _ring_buffer_write(struct ring_buffer_backend *bufb, + size_t offset, const void *src, size_t len, + ssize_t pagecpy); + +/* + * Subbuffer ID bits for overwrite mode. Need to fit within a single word to be + * exchanged atomically. + * + * Top half word, except lowest bit, belongs to "offset", which is used to keep + * to count the produced buffers. For overwrite mode, this provides the + * consumer with the capacity to read subbuffers in order, handling the + * situation where producers would write up to 2^15 buffers (or 2^31 for 64-bit + * systems) concurrently with a single execution of get_subbuf (between offset + * sampling and subbuffer ID exchange). + */ + +#define HALF_ULONG_BITS (BITS_PER_LONG >> 1) + +#define SB_ID_OFFSET_SHIFT (HALF_ULONG_BITS + 1) +#define SB_ID_OFFSET_COUNT (1UL << SB_ID_OFFSET_SHIFT) +#define SB_ID_OFFSET_MASK (~(SB_ID_OFFSET_COUNT - 1)) +/* + * Lowest bit of top word half belongs to noref. Used only for overwrite mode. + */ +#define SB_ID_NOREF_SHIFT (SB_ID_OFFSET_SHIFT - 1) +#define SB_ID_NOREF_COUNT (1UL << SB_ID_NOREF_SHIFT) +#define SB_ID_NOREF_MASK SB_ID_NOREF_COUNT +/* + * In overwrite mode: lowest half of word is used for index. + * Limit of 2^16 subbuffers per buffer on 32-bit, 2^32 on 64-bit. + * In producer-consumer mode: whole word used for index. + */ +#define SB_ID_INDEX_SHIFT 0 +#define SB_ID_INDEX_COUNT (1UL << SB_ID_INDEX_SHIFT) +#define SB_ID_INDEX_MASK (SB_ID_NOREF_COUNT - 1) + +/* + * Construct the subbuffer id from offset, index and noref. Use only the index + * for producer-consumer mode (offset and noref are only used in overwrite + * mode). + */ +static inline +unsigned long subbuffer_id(const struct ring_buffer_config *config, + unsigned long offset, unsigned long noref, + unsigned long index) +{ + if (config->mode == RING_BUFFER_OVERWRITE) + return (offset << SB_ID_OFFSET_SHIFT) + | (noref << SB_ID_NOREF_SHIFT) + | index; + else + return index; +} + +/* + * Compare offset with the offset contained within id. Return 1 if the offset + * bits are identical, else 0. + */ +static inline +int subbuffer_id_compare_offset(const struct ring_buffer_config *config, + unsigned long id, unsigned long offset) +{ + return (id & SB_ID_OFFSET_MASK) == (offset << SB_ID_OFFSET_SHIFT); +} + +static inline +unsigned long subbuffer_id_get_index(const struct ring_buffer_config *config, + unsigned long id) +{ + if (config->mode == RING_BUFFER_OVERWRITE) + return id & SB_ID_INDEX_MASK; + else + return id; +} + +static inline +unsigned long subbuffer_id_is_noref(const struct ring_buffer_config *config, + unsigned long id) +{ + if (config->mode == RING_BUFFER_OVERWRITE) + return !!(id & SB_ID_NOREF_MASK); + else + return 1; +} + +/* + * Only used by reader on subbuffer ID it has exclusive access to. No volatile + * needed. + */ +static inline +void subbuffer_id_set_noref(const struct ring_buffer_config *config, + unsigned long *id) +{ + if (config->mode == RING_BUFFER_OVERWRITE) + *id |= SB_ID_NOREF_MASK; +} + +static inline +void subbuffer_id_set_noref_offset(const struct ring_buffer_config *config, + unsigned long *id, unsigned long offset) +{ + unsigned long tmp; + + if (config->mode == RING_BUFFER_OVERWRITE) { + tmp = *id; + tmp &= ~SB_ID_OFFSET_MASK; + tmp |= offset << SB_ID_OFFSET_SHIFT; + tmp |= SB_ID_NOREF_MASK; + /* Volatile store, read concurrently by readers. */ + ACCESS_ONCE(*id) = tmp; + } +} + +/* No volatile access, since already used locally */ +static inline +void subbuffer_id_clear_noref(const struct ring_buffer_config *config, + unsigned long *id) +{ + if (config->mode == RING_BUFFER_OVERWRITE) + *id &= ~SB_ID_NOREF_MASK; +} + +/* + * For overwrite mode, cap the number of subbuffers per buffer to: + * 2^16 on 32-bit architectures + * 2^32 on 64-bit architectures + * This is required to fit in the index part of the ID. Return 0 on success, + * -EPERM on failure. + */ +static inline +int subbuffer_id_check_index(const struct ring_buffer_config *config, + unsigned long num_subbuf) +{ + if (config->mode == RING_BUFFER_OVERWRITE) + return (num_subbuf > (1UL << HALF_ULONG_BITS)) ? -EPERM : 0; + else + return 0; +} + +static inline +void subbuffer_count_record(const struct ring_buffer_config *config, + struct ring_buffer_backend *bufb, + unsigned long idx) +{ + unsigned long sb_bindex; + + sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id); + v_inc(config, &bufb->array[sb_bindex]->records_commit); +} + +/* + * Reader has exclusive subbuffer access for record consumption. No need to + * perform the decrement atomically. + */ +static inline +void subbuffer_consume_record(const struct ring_buffer_config *config, + struct ring_buffer_backend *bufb) +{ + unsigned long sb_bindex; + + sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id); + CHAN_WARN_ON(bufb->chan, + !v_read(config, &bufb->array[sb_bindex]->records_unread)); + /* Non-atomic decrement protected by exclusive subbuffer access */ + _v_dec(config, &bufb->array[sb_bindex]->records_unread); + v_inc(config, &bufb->records_read); +} + +static inline +unsigned long subbuffer_get_records_count( + const struct ring_buffer_config *config, + struct ring_buffer_backend *bufb, + unsigned long idx) +{ + unsigned long sb_bindex; + + sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id); + return v_read(config, &bufb->array[sb_bindex]->records_commit); +} + +/* + * Must be executed at subbuffer delivery when the writer has _exclusive_ + * subbuffer access. See ring_buffer_check_deliver() for details. + * ring_buffer_get_records_count() must be called to get the records count + * before this function, because it resets the records_commit count. + */ +static inline +unsigned long subbuffer_count_records_overrun( + const struct ring_buffer_config *config, + struct ring_buffer_backend *bufb, + unsigned long idx) +{ + struct ring_buffer_backend_pages *pages; + unsigned long overruns, sb_bindex; + + sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id); + pages = bufb->array[sb_bindex]; + overruns = v_read(config, &pages->records_unread); + v_set(config, &pages->records_unread, + v_read(config, &pages->records_commit)); + v_set(config, &pages->records_commit, 0); + + return overruns; +} + +static inline +void subbuffer_set_data_size(const struct ring_buffer_config *config, + struct ring_buffer_backend *bufb, + unsigned long idx, + unsigned long data_size) +{ + struct ring_buffer_backend_pages *pages; + unsigned long sb_bindex; + + sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id); + pages = bufb->array[sb_bindex]; + pages->data_size = data_size; +} + +static inline +unsigned long subbuffer_get_read_data_size( + const struct ring_buffer_config *config, + struct ring_buffer_backend *bufb) +{ + struct ring_buffer_backend_pages *pages; + unsigned long sb_bindex; + + sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id); + pages = bufb->array[sb_bindex]; + return pages->data_size; +} + +static inline +unsigned long subbuffer_get_data_size( + const struct ring_buffer_config *config, + struct ring_buffer_backend *bufb, + unsigned long idx) +{ + struct ring_buffer_backend_pages *pages; + unsigned long sb_bindex; + + sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id); + pages = bufb->array[sb_bindex]; + return pages->data_size; +} + +/** + * ring_buffer_clear_noref - Clear the noref subbuffer flag, called by writer. + */ +static inline +void ring_buffer_clear_noref(const struct ring_buffer_config *config, + struct ring_buffer_backend *bufb, + unsigned long idx) +{ + unsigned long id, new_id; + + if (config->mode != RING_BUFFER_OVERWRITE) + return; + + /* + * Performing a volatile access to read the sb_pages, because we want to + * read a coherent version of the pointer and the associated noref flag. + */ + id = ACCESS_ONCE(bufb->buf_wsb[idx].id); + for (;;) { + /* This check is called on the fast path for each record. */ + if (likely(!subbuffer_id_is_noref(config, id))) { + /* + * Store after load dependency ordering the writes to + * the subbuffer after load and test of the noref flag + * matches the memory barrier implied by the cmpxchg() + * in update_read_sb_index(). + */ + return; /* Already writing to this buffer */ + } + new_id = id; + subbuffer_id_clear_noref(config, &new_id); + new_id = cmpxchg(&bufb->buf_wsb[idx].id, id, new_id); + if (likely(new_id == id)) + break; + id = new_id; + } +} + +/** + * ring_buffer_set_noref_offset - Set the noref subbuffer flag and offset, + * called by writer. + */ +static inline +void ring_buffer_set_noref_offset(const struct ring_buffer_config *config, + struct ring_buffer_backend *bufb, + unsigned long idx, + unsigned long offset) +{ + if (config->mode != RING_BUFFER_OVERWRITE) + return; + + /* + * Because ring_buffer_set_noref() is only called by a single thread + * (the one which updated the cc_sb value), there are no concurrent + * updates to take care of: other writers have not updated cc_sb, so + * they cannot set the noref flag, and concurrent readers cannot modify + * the pointer because the noref flag is not set yet. + * The smp_wmb() in ring_buffer_commit() takes care of ordering writes + * to the subbuffer before this set noref operation. + * subbuffer_set_noref() uses a volatile store to deal with concurrent + * readers of the noref flag. + */ + CHAN_WARN_ON(bufb->chan, + subbuffer_id_is_noref(config, bufb->buf_wsb[idx].id)); + /* + * Memory barrier that ensures counter stores are ordered before set + * noref and offset. + */ + smp_mb(); + subbuffer_id_set_noref_offset(config, &bufb->buf_wsb[idx].id, offset); +} + +/** + * update_read_sb_index - Read-side subbuffer index update. + */ +static inline +int update_read_sb_index(const struct ring_buffer_config *config, + struct ring_buffer_backend *bufb, + struct channel_backend *chanb, + unsigned long consumed_idx, + unsigned long consumed_count) +{ + unsigned long old_id, new_id; + + if (config->mode == RING_BUFFER_OVERWRITE) { + /* + * Exchange the target writer subbuffer with our own unused + * subbuffer. No need to use ACCESS_ONCE() here to read the + * old_wpage, because the value read will be confirmed by the + * following cmpxchg(). + */ + old_id = bufb->buf_wsb[consumed_idx].id; + if (unlikely(!subbuffer_id_is_noref(config, old_id))) + return -EAGAIN; + /* + * Make sure the offset count we are expecting matches the one + * indicated by the writer. + */ + if (unlikely(!subbuffer_id_compare_offset(config, old_id, + consumed_count))) + return -EAGAIN; + CHAN_WARN_ON(bufb->chan, + !subbuffer_id_is_noref(config, bufb->buf_rsb.id)); + new_id = cmpxchg(&bufb->buf_wsb[consumed_idx].id, old_id, + bufb->buf_rsb.id); + if (unlikely(old_id != new_id)) + return -EAGAIN; + bufb->buf_rsb.id = new_id; + subbuffer_id_clear_noref(config, &bufb->buf_rsb.id); + } else { + /* No page exchange, use the writer page directly */ + bufb->buf_rsb.id = bufb->buf_wsb[consumed_idx].id; + subbuffer_id_clear_noref(config, &bufb->buf_rsb.id); + } + return 0; +} + +/* + * Use the architecture-specific memcpy implementation for constant-sized + * inputs, but rely on an inline memcpy for length statically unknown. + * The function call to memcpy is just way too expensive for a fast path. + */ +#define ring_buffer_do_copy(config, dest, src, len) \ +do { \ + size_t __len = (len); \ + if (__builtin_constant_p(len)) \ + memcpy((dest), (src), __len); \ + else \ + inline_memcpy((dest), (src), __len); \ +} while (0) + +#endif /* _LINUX_RING_BUFFER_BACKEND_INTERNAL_H */ Index: linux.trees.git/lib/ringbuffer/ring_buffer_backend.c =================================================================== --- /dev/null 1970-01-01 00:00:00.000000000 +0000 +++ linux.trees.git/lib/ringbuffer/ring_buffer_backend.c 2010-07-09 18:13:38.000000000 -0400 @@ -0,0 +1,755 @@ +/* + * ring_buffer_backend.c + * + * Copyright (C) 2005-2010 - Mathieu Desnoyers + * + * Dual LGPL v2.1/GPL v2 license. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +/** + * ring_buffer_backend_allocate - allocate a channel buffer + * @config: ring buffer instance configuration + * @buf: the buffer struct + * @size: total size of the buffer + * @num_subbuf: number of subbuffers + * @extra_reader_sb: need extra subbuffer for reader + */ +static +int ring_buffer_backend_allocate(const struct ring_buffer_config *config, + struct ring_buffer_backend *bufb, + size_t size, size_t num_subbuf, + int extra_reader_sb) +{ + struct channel_backend *chanb = &bufb->chan->backend; + unsigned long j, num_pages, num_pages_per_subbuf, page_idx = 0; + unsigned long subbuf_size, mmap_offset = 0; + unsigned long num_subbuf_alloc; + struct page **pages; + void **virt; + unsigned long i; + + num_pages = size >> PAGE_SHIFT; + num_pages_per_subbuf = num_pages >> get_count_order(num_subbuf); + subbuf_size = chanb->subbuf_size; + num_subbuf_alloc = num_subbuf; + + if (extra_reader_sb) { + num_pages += num_pages_per_subbuf; /* Add pages for reader */ + num_subbuf_alloc++; + } + + pages = kmalloc_node(ALIGN(sizeof(*pages) * num_pages, + 1 << INTERNODE_CACHE_SHIFT), + GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0))); + if (unlikely(!pages)) + goto pages_error; + + virt = kmalloc_node(ALIGN(sizeof(*virt) * num_pages, + 1 << INTERNODE_CACHE_SHIFT), + GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0))); + if (unlikely(!virt)) + goto virt_error; + + bufb->array = kmalloc_node(ALIGN(sizeof(*bufb->array) + * num_subbuf_alloc, + 1 << INTERNODE_CACHE_SHIFT), + GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0))); + if (unlikely(!bufb->array)) + goto array_error; + + for (i = 0; i < num_pages; i++) { + pages[i] = alloc_pages_node(cpu_to_node(max(bufb->cpu, 0)), + GFP_KERNEL | __GFP_ZERO, 0); + if (unlikely(!pages[i])) + goto depopulate; + virt[i] = page_address(pages[i]); + } + bufb->num_pages_per_subbuf = num_pages_per_subbuf; + + /* Allocate backend pages array elements */ + for (i = 0; i < num_subbuf_alloc; i++) { + bufb->array[i] = + kzalloc_node(ALIGN( + sizeof(struct ring_buffer_backend_pages) + + sizeof(struct ring_buffer_backend_page) + * num_pages_per_subbuf, + 1 << INTERNODE_CACHE_SHIFT), + GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0))); + if (!bufb->array[i]) + goto free_array; + } + + /* Allocate write-side subbuffer table */ + bufb->buf_wsb = kzalloc_node(ALIGN( + sizeof(struct ring_buffer_backend_subbuffer) + * num_subbuf, + 1 << INTERNODE_CACHE_SHIFT), + GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0))); + if (unlikely(!bufb->buf_wsb)) + goto free_array; + + for (i = 0; i < num_subbuf; i++) + bufb->buf_wsb[i].id = subbuffer_id(config, 0, 1, i); + + /* Assign read-side subbuffer table */ + if (extra_reader_sb) + bufb->buf_rsb.id = subbuffer_id(config, 0, 1, + num_subbuf_alloc - 1); + else + bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0); + + /* Assign pages to page index */ + for (i = 0; i < num_subbuf_alloc; i++) { + for (j = 0; j < num_pages_per_subbuf; j++) { + CHAN_WARN_ON(chanb, page_idx > num_pages); + bufb->array[i]->p[j].virt = virt[page_idx]; + bufb->array[i]->p[j].page = pages[page_idx]; + page_idx++; + } + if (config->output == RING_BUFFER_MMAP) { + bufb->array[i]->mmap_offset = mmap_offset; + mmap_offset += subbuf_size; + } + } + + /* + * If kmalloc ever uses vmalloc underneath, make sure the buffer pages + * will not fault. + */ + vmalloc_sync_all(); + kfree(virt); + kfree(pages); + return 0; + +free_array: + for (i = 0; (i < num_subbuf_alloc && bufb->array[i]); i++) + kfree(bufb->array[i]); +depopulate: + /* Free all allocated pages */ + for (i = 0; (i < num_pages && pages[i]); i++) + __free_page(pages[i]); + kfree(bufb->array); +array_error: + kfree(virt); +virt_error: + kfree(pages); +pages_error: + return -ENOMEM; +} + +int ring_buffer_backend_create(struct ring_buffer_backend *bufb, + struct channel_backend *chanb, + int cpu) +{ + const struct ring_buffer_config *config = chanb->config; + + bufb->chan = container_of(chanb, struct channel, backend); + bufb->cpu = cpu; + + return ring_buffer_backend_allocate(config, bufb, chanb->buf_size, + chanb->num_subbuf, + chanb->extra_reader_sb); +} + +void ring_buffer_backend_free(struct ring_buffer_backend *bufb) +{ + struct channel_backend *chanb = &bufb->chan->backend; + unsigned long i, j, num_subbuf_alloc; + + num_subbuf_alloc = chanb->num_subbuf; + if (chanb->extra_reader_sb) + num_subbuf_alloc++; + + kfree(bufb->buf_wsb); + for (i = 0; i < num_subbuf_alloc; i++) { + for (j = 0; j < bufb->num_pages_per_subbuf; j++) + __free_page(bufb->array[i]->p[j].page); + kfree(bufb->array[i]); + } + kfree(bufb->array); + bufb->allocated = 0; +} + +void ring_buffer_backend_reset(struct ring_buffer_backend *bufb) +{ + struct channel_backend *chanb = &bufb->chan->backend; + const struct ring_buffer_config *config = chanb->config; + unsigned long num_subbuf_alloc; + unsigned int i; + + num_subbuf_alloc = chanb->num_subbuf; + if (chanb->extra_reader_sb) + num_subbuf_alloc++; + + for (i = 0; i < chanb->num_subbuf; i++) + bufb->buf_wsb[i].id = subbuffer_id(config, 0, 1, i); + if (chanb->extra_reader_sb) + bufb->buf_rsb.id = subbuffer_id(config, 0, 1, + num_subbuf_alloc - 1); + else + bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0); + + for (i = 0; i < num_subbuf_alloc; i++) { + /* Don't reset mmap_offset */ + v_set(config, &bufb->array[i]->records_commit, 0); + v_set(config, &bufb->array[i]->records_unread, 0); + bufb->array[i]->data_size = 0; + /* Don't reset backend page and virt addresses */ + } + /* Don't reset num_pages_per_subbuf, cpu, allocated */ + v_set(config, &bufb->records_read, 0); +} + +/* + * The frontend is responsible for also calling ring_buffer_backend_reset for + * each buffer when calling channel_backend_reset. + */ +void channel_backend_reset(struct channel_backend *chanb) +{ + struct channel *chan = container_of(chanb, struct channel, backend); + const struct ring_buffer_config *config = chanb->config; + + /* + * Don't reset buf_size, subbuf_size, subbuf_size_order, + * num_subbuf_order, buf_size_order, extra_reader_sb, num_subbuf, + * priv, notifiers, config, cpumask and name. + */ + chanb->start_tsc = config->cb.ring_buffer_clock_read(chan); +} + +#ifdef CONFIG_HOTPLUG_CPU +/** + * ring_buffer_cpu_hp_callback - CPU hotplug callback + * @nb: notifier block + * @action: hotplug action to take + * @hcpu: CPU number + * + * Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD) + */ +static +int __cpuinit ring_buffer_cpu_hp_callback(struct notifier_block *nb, + unsigned long action, + void *hcpu) +{ + unsigned int cpu = (unsigned long)hcpu; + struct channel_backend *chanb = container_of(nb, struct channel_backend, + cpu_hp_notifier); + const struct ring_buffer_config *config = chanb->config; + struct ring_buffer *buf; + int ret; + + CHAN_WARN_ON(chanb, config->alloc == RING_BUFFER_ALLOC_GLOBAL); + + switch (action) { + case CPU_UP_PREPARE: + case CPU_UP_PREPARE_FROZEN: + buf = per_cpu_ptr(chanb->buf, cpu); + ret = ring_buffer_create(buf, chanb, cpu); + if (ret) { + printk(KERN_ERR + "ring_buffer_cpu_hp_callback: cpu %d " + "buffer creation failed\n", cpu); + return NOTIFY_BAD; + } + break; + case CPU_DEAD: + case CPU_DEAD_FROZEN: + /* No need to do a buffer switch here, because it will happen + * when tracing is stopped, or will be done by switch timer CPU + * DEAD callback. */ + break; + } + return NOTIFY_OK; +} +#endif + +/** + * channel_backend_init - initialize a channel backend + * @chanb: channel backend + * @name: channel name + * @config: client ring buffer configuration + * @priv: client private data + * @parent: dentry of parent directory, %NULL for root directory + * @subbuf_size: size of sub-buffers (> PAGE_SIZE, power of 2) + * @num_subbuf: number of sub-buffers (power of 2) + * + * Returns channel pointer if successful, %NULL otherwise. + * + * Creates per-cpu channel buffers using the sizes and attributes + * specified. The created channel buffer files will be named + * name_0...name_N-1. File permissions will be %S_IRUSR. + * + * Called with CPU hotplug disabled. + */ +int channel_backend_init(struct channel_backend *chanb, + const char *name, + const struct ring_buffer_config *config, + void *priv, size_t subbuf_size, + size_t num_subbuf) +{ + struct channel *chan = container_of(chanb, struct channel, backend); + unsigned int i; + int ret; + + if (!name) + return -EPERM; + + if (!(subbuf_size && num_subbuf)) + return -EPERM; + + /* Check that the subbuffer size is larger than a page. */ + CHAN_WARN_ON(chanb, subbuf_size < PAGE_SIZE); + + /* + * Make sure the number of subbuffers and subbuffer size are power of 2. + */ + CHAN_WARN_ON(chanb, hweight32(subbuf_size) != 1); + CHAN_WARN_ON(chanb, hweight32(num_subbuf) != 1); + + ret = subbuffer_id_check_index(config, num_subbuf); + if (ret) + return ret; + + chanb->priv = priv; + chanb->buf_size = num_subbuf * subbuf_size; + chanb->subbuf_size = subbuf_size; + chanb->buf_size_order = get_count_order(chanb->buf_size); + chanb->subbuf_size_order = get_count_order(subbuf_size); + chanb->num_subbuf_order = get_count_order(num_subbuf); + chanb->extra_reader_sb = + (config->mode == RING_BUFFER_OVERWRITE) ? 1 : 0; + chanb->num_subbuf = num_subbuf; + strlcpy(chanb->name, name, NAME_MAX); + chanb->config = config; + + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { + if (!zalloc_cpumask_var(&chanb->cpumask, GFP_KERNEL)) + return -ENOMEM; + } + + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { + /* Allocating the buffer per-cpu structures */ + chanb->buf = alloc_percpu(struct ring_buffer); + if (!chanb->buf) + goto free_cpumask; + + /* + * In case of non-hotplug cpu, if the ring-buffer is allocated + * in early initcall, it will not be notified of secondary cpus. + * In that off case, we need to allocate for all possible cpus. + */ +#ifdef CONFIG_HOTPLUG_CPU + /* + * buf->backend.allocated test takes care of concurrent CPU + * hotplug. + * Priority higher than frontend, so we create the ring buffer + * before we start the timer. + */ + chanb->cpu_hp_notifier.notifier_call = + ring_buffer_cpu_hp_callback; + chanb->cpu_hp_notifier.priority = 5; + register_hotcpu_notifier(&chanb->cpu_hp_notifier); + + get_online_cpus(); + for_each_online_cpu(i) { + ret = ring_buffer_create(per_cpu_ptr(chanb->buf, i), + chanb, i); + if (ret) + goto free_bufs; /* cpu hotplug locked */ + } + put_online_cpus(); +#else + for_each_possible_cpu(i) { + ret = ring_buffer_create(per_cpu_ptr(chanb->buf, i), + chanb, i); + if (ret) + goto free_bufs; /* cpu hotplug locked */ + } +#endif + } else { + chanb->buf = kzalloc(sizeof(struct ring_buffer), GFP_KERNEL); + if (!chanb->buf) + goto free_cpumask; + ret = ring_buffer_create(chanb->buf, chanb, -1); + if (ret) + goto free_bufs; + } + chanb->start_tsc = config->cb.ring_buffer_clock_read(chan); + + return 0; + +free_bufs: + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { + for_each_possible_cpu(i) { + struct ring_buffer *buf = per_cpu_ptr(chanb->buf, i); + + if (!buf->backend.allocated) + continue; + ring_buffer_free(buf); + } +#ifdef CONFIG_HOTPLUG_CPU + put_online_cpus(); +#endif + free_percpu(chanb->buf); + } else + kfree(chanb->buf); +free_cpumask: + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) + free_cpumask_var(chanb->cpumask); + return -ENOMEM; +} + +/** + * channel_backend_unregister_notifiers - unregister notifiers + * @chan: the channel + * + * Holds CPU hotplug. + */ +void channel_backend_unregister_notifiers(struct channel_backend *chanb) +{ + const struct ring_buffer_config *config = chanb->config; + + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) + unregister_hotcpu_notifier(&chanb->cpu_hp_notifier); +} + +/** + * channel_backend_free - destroy the channel + * @chan: the channel + * + * Destroy all channel buffers and frees the channel. + */ +void channel_backend_free(struct channel_backend *chanb) +{ + const struct ring_buffer_config *config = chanb->config; + unsigned int i; + + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { + for_each_possible_cpu(i) { + struct ring_buffer *buf = per_cpu_ptr(chanb->buf, i); + + if (!buf->backend.allocated) + continue; + ring_buffer_free(buf); + } + free_cpumask_var(chanb->cpumask); + free_percpu(chanb->buf); + } else { + struct ring_buffer *buf = chanb->buf; + + CHAN_WARN_ON(chanb, !buf->backend.allocated); + ring_buffer_free(buf); + kfree(buf); + } +} + +/** + * ring_buffer_write - write data to a ring_buffer buffer. + * @bufb : buffer backend + * @offset : offset within the buffer + * @src : source address + * @len : length to write + * @pagecpy : page size copied so far + */ +void _ring_buffer_write(struct ring_buffer_backend *bufb, size_t offset, + const void *src, size_t len, ssize_t pagecpy) +{ + struct channel_backend *chanb = &bufb->chan->backend; + const struct ring_buffer_config *config = chanb->config; + size_t sbidx, index; + struct ring_buffer_backend_pages *rpages; + unsigned long sb_bindex, id; + + do { + len -= pagecpy; + src += pagecpy; + offset += pagecpy; + sbidx = offset >> chanb->subbuf_size_order; + index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT; + + /* + * Underlying layer should never ask for writes across + * subbuffers. + */ + CHAN_WARN_ON(chanb, offset >= chanb->buf_size); + + pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK)); + id = bufb->buf_wsb[sbidx].id; + sb_bindex = subbuffer_id_get_index(config, id); + rpages = bufb->array[sb_bindex]; + CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE + && subbuffer_id_is_noref(config, id)); + ring_buffer_do_copy(config, + rpages->p[index].virt + + (offset & ~PAGE_MASK), + src, pagecpy); + } while (unlikely(len != pagecpy)); +} +EXPORT_SYMBOL_GPL(_ring_buffer_write); + +/** + * ring_buffer_read - read data from ring_buffer_buffer. + * @bufb : buffer backend + * @offset : offset within the buffer + * @dest : destination address + * @len : length to copy to destination + * + * Should be protected by get_subbuf/put_subbuf. + * Returns the length copied. + */ +size_t ring_buffer_read(struct ring_buffer_backend *bufb, size_t offset, + void *dest, size_t len) +{ + struct channel_backend *chanb = &bufb->chan->backend; + const struct ring_buffer_config *config = chanb->config; + size_t index; + ssize_t pagecpy, orig_len; + struct ring_buffer_backend_pages *rpages; + unsigned long sb_bindex, id; + + orig_len = len; + offset &= chanb->buf_size - 1; + index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT; + if (unlikely(!len)) + return 0; + for (;;) { + pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK)); + id = bufb->buf_rsb.id; + sb_bindex = subbuffer_id_get_index(config, id); + rpages = bufb->array[sb_bindex]; + CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE + && subbuffer_id_is_noref(config, id)); + memcpy(dest, rpages->p[index].virt + (offset & ~PAGE_MASK), + pagecpy); + len -= pagecpy; + if (likely(!len)) + break; + dest += pagecpy; + offset += pagecpy; + index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT; + /* + * Underlying layer should never ask for reads across + * subbuffers. + */ + CHAN_WARN_ON(chanb, offset >= chanb->buf_size); + } + return orig_len; +} +EXPORT_SYMBOL_GPL(ring_buffer_read); + +/** + * ring_buffer_copy_to_user - read data from ring_buffer_buffer to userspace + * @bufb : buffer backend + * @offset : offset within the buffer + * @dest : destination userspace address + * @len : length to copy to destination + * + * Should be protected by get_subbuf/put_subbuf. + * access_ok() must have been performed on dest addresses prior to call this + * function. + * Returns -EFAULT on error, 0 if ok. + */ +int __ring_buffer_copy_to_user(struct ring_buffer_backend *bufb, + size_t offset, void __user *dest, size_t len) +{ + struct channel_backend *chanb = &bufb->chan->backend; + const struct ring_buffer_config *config = chanb->config; + size_t index; + ssize_t pagecpy, orig_len; + struct ring_buffer_backend_pages *rpages; + unsigned long sb_bindex, id; + + orig_len = len; + offset &= chanb->buf_size - 1; + index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT; + if (unlikely(!len)) + return 0; + for (;;) { + pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK)); + id = bufb->buf_rsb.id; + sb_bindex = subbuffer_id_get_index(config, id); + rpages = bufb->array[sb_bindex]; + CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE + && subbuffer_id_is_noref(config, id)); + if (__copy_to_user(dest, + rpages->p[index].virt + (offset & ~PAGE_MASK), + pagecpy)) + return -EFAULT; + len -= pagecpy; + if (likely(!len)) + break; + dest += pagecpy; + offset += pagecpy; + index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT; + /* + * Underlying layer should never ask for reads across + * subbuffers. + */ + CHAN_WARN_ON(chanb, offset >= chanb->buf_size); + } + return 0; +} +EXPORT_SYMBOL_GPL(__ring_buffer_copy_to_user); + +/** + * ring_buffer_read_cstr - read a C-style string from ring_buffer_buffer. + * @bufb : buffer backend + * @offset : offset within the buffer + * @dest : destination address + * @len : destination's length + * + * return string's length + * Should be protected by get_subbuf/put_subbuf. + */ +int ring_buffer_read_cstr(struct ring_buffer_backend *bufb, size_t offset, + void *dest, size_t len) +{ + struct channel_backend *chanb = &bufb->chan->backend; + const struct ring_buffer_config *config = chanb->config; + size_t index; + ssize_t pagecpy, pagelen, strpagelen, orig_offset; + char *str; + struct ring_buffer_backend_pages *rpages; + unsigned long sb_bindex, id; + + offset &= chanb->buf_size - 1; + index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT; + orig_offset = offset; + for (;;) { + id = bufb->buf_rsb.id; + sb_bindex = subbuffer_id_get_index(config, id); + rpages = bufb->array[sb_bindex]; + CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE + && subbuffer_id_is_noref(config, id)); + str = (char *)rpages->p[index].virt + (offset & ~PAGE_MASK); + pagelen = PAGE_SIZE - (offset & ~PAGE_MASK); + strpagelen = strnlen(str, pagelen); + if (len) { + pagecpy = min_t(size_t, len, strpagelen); + if (dest) { + memcpy(dest, str, pagecpy); + dest += pagecpy; + } + len -= pagecpy; + } + offset += strpagelen; + index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT; + if (strpagelen < pagelen) + break; + /* + * Underlying layer should never ask for reads across + * subbuffers. + */ + CHAN_WARN_ON(chanb, offset >= chanb->buf_size); + } + if (dest && len) + ((char *)dest)[0] = 0; + return offset - orig_offset; +} +EXPORT_SYMBOL_GPL(ring_buffer_read_cstr); + +/** + * ring_buffer_read_get_page - Get a whole page to read from + * @bufb : buffer backend + * @offset : offset within the buffer + * @virt : pointer to page address (output) + * + * Should be protected by get_subbuf/put_subbuf. + * Returns the pointer to the page struct pointer. + */ +struct page **ring_buffer_read_get_page(struct ring_buffer_backend *bufb, + size_t offset, void ***virt) +{ + size_t index; + struct ring_buffer_backend_pages *rpages; + struct channel_backend *chanb = &bufb->chan->backend; + const struct ring_buffer_config *config = chanb->config; + unsigned long sb_bindex, id; + + offset &= chanb->buf_size - 1; + index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT; + id = bufb->buf_rsb.id; + sb_bindex = subbuffer_id_get_index(config, id); + rpages = bufb->array[sb_bindex]; + CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE + && subbuffer_id_is_noref(config, id)); + *virt = &rpages->p[index].virt; + return &rpages->p[index].page; +} +EXPORT_SYMBOL_GPL(ring_buffer_read_get_page); + +/** + * ring_buffer_read_offset_address - get address of a location within the buffer + * @bufb : buffer backend + * @offset : offset within the buffer. + * + * Return the address where a given offset is located (for read). + * Should be used to get the current subbuffer header pointer. Given we know + * it's never on a page boundary, it's safe to write directly to this address, + * as long as the write is never bigger than a page size. + */ +void *ring_buffer_read_offset_address(struct ring_buffer_backend *bufb, + size_t offset) +{ + size_t index; + struct ring_buffer_backend_pages *rpages; + struct channel_backend *chanb = &bufb->chan->backend; + const struct ring_buffer_config *config = chanb->config; + unsigned long sb_bindex, id; + + offset &= chanb->buf_size - 1; + index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT; + id = bufb->buf_rsb.id; + sb_bindex = subbuffer_id_get_index(config, id); + rpages = bufb->array[sb_bindex]; + CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE + && subbuffer_id_is_noref(config, id)); + return rpages->p[index].virt + (offset & ~PAGE_MASK); +} +EXPORT_SYMBOL_GPL(ring_buffer_read_offset_address); + +/** + * ring_buffer_offset_address - get address of a location within the buffer + * @bufb : buffer backend + * @offset : offset within the buffer. + * + * Return the address where a given offset is located. + * Should be used to get the current subbuffer header pointer. Given we know + * it's always at the beginning of a page, it's safe to write directly to this + * address, as long as the write is never bigger than a page size. + */ +void *ring_buffer_offset_address(struct ring_buffer_backend *bufb, + size_t offset) +{ + size_t sbidx, index; + struct ring_buffer_backend_pages *rpages; + struct channel_backend *chanb = &bufb->chan->backend; + const struct ring_buffer_config *config = chanb->config; + unsigned long sb_bindex, id; + + offset &= chanb->buf_size - 1; + sbidx = offset >> chanb->subbuf_size_order; + index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT; + id = bufb->buf_wsb[sbidx].id; + sb_bindex = subbuffer_id_get_index(config, id); + rpages = bufb->array[sb_bindex]; + CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE + && subbuffer_id_is_noref(config, id)); + return rpages->p[index].virt + (offset & ~PAGE_MASK); +} +EXPORT_SYMBOL_GPL(ring_buffer_offset_address); Index: linux.trees.git/include/linux/ringbuffer/backend_types.h =================================================================== --- /dev/null 1970-01-01 00:00:00.000000000 +0000 +++ linux.trees.git/include/linux/ringbuffer/backend_types.h 2010-07-09 18:09:01.000000000 -0400 @@ -0,0 +1,80 @@ +#ifndef _LINUX_RING_BUFFER_BACKEND_TYPES_H +#define _LINUX_RING_BUFFER_BACKEND_TYPES_H + +/* + * linux/ringbuffer/backend_types.h + * + * Copyright (C) 2008-2010 - Mathieu Desnoyers + * + * Ring buffer backend (types). + * + * Dual LGPL v2.1/GPL v2 license. + */ + +#include +#include + +struct ring_buffer_backend_page { + void *virt; /* page virtual address (cached) */ + struct page *page; /* pointer to page structure */ +}; + +struct ring_buffer_backend_pages { + unsigned long mmap_offset; /* offset of the subbuffer in mmap */ + union v_atomic records_commit; /* current records committed count */ + union v_atomic records_unread; /* records to read */ + unsigned long data_size; /* Amount of data to read from subbuf */ + struct ring_buffer_backend_page p[]; +}; + +struct ring_buffer_backend_subbuffer { + /* Identifier for subbuf backend pages. Exchanged atomically. */ + unsigned long id; /* backend subbuffer identifier */ +}; + +/* + * Forward declaration of frontend-specific channel and ring_buffer. + */ +struct channel; +struct ring_buffer; + +struct ring_buffer_backend { + /* Array of ring_buffer_backend_subbuffer for writer */ + struct ring_buffer_backend_subbuffer *buf_wsb; + /* ring_buffer_backend_subbuffer for reader */ + struct ring_buffer_backend_subbuffer buf_rsb; + /* + * Pointer array of backend pages, for whole buffer. + * Indexed by ring_buffer_backend_subbuffer identifier (id) index. + */ + struct ring_buffer_backend_pages **array; + unsigned int num_pages_per_subbuf; + + struct channel *chan; /* Associated channel */ + int cpu; /* This buffer's cpu. -1 if global. */ + union v_atomic records_read; /* Number of records read */ + unsigned int allocated:1; /* Bool: is buffer allocated ? */ +}; + +struct channel_backend { + unsigned long buf_size; /* Size of the buffer */ + unsigned long subbuf_size; /* Sub-buffer size */ + unsigned int subbuf_size_order; /* Order of sub-buffer size */ + unsigned int num_subbuf_order; /* + * Order of number of sub-buffers/buffer + * for writer. + */ + unsigned int buf_size_order; /* Order of buffer size */ + int extra_reader_sb:1; /* Bool: has extra reader subbuffer */ + struct ring_buffer *buf; /* Channel per-cpu buffers */ + + unsigned long num_subbuf; /* Number of sub-buffers for writer */ + u64 start_tsc; /* Channel creation TSC value */ + void *priv; /* Client-specific information */ + struct notifier_block cpu_hp_notifier; /* CPU hotplug notifier */ + const struct ring_buffer_config *config; /* Ring buffer configuration */ + cpumask_var_t cpumask; /* Allocated per-cpu buffers cpumask */ + char name[NAME_MAX]; /* Channel name */ +}; + +#endif /* _LINUX_RING_BUFFER_BACKEND_TYPES_H */ -- To unsubscribe from this list: send the line "unsubscribe linux-kernel" in the body of a message to majordomo@vger.kernel.org More majordomo info at http://vger.kernel.org/majordomo-info.html Please read the FAQ at http://www.tux.org/lkml/