lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-Id: <20190607162349.18199-2-john.ogness@linutronix.de>
Date:   Fri,  7 Jun 2019 18:29:48 +0206
From:   John Ogness <john.ogness@...utronix.de>
To:     linux-kernel@...r.kernel.org
Cc:     Peter Zijlstra <peterz@...radead.org>,
        Petr Mladek <pmladek@...e.com>,
        Sergey Senozhatsky <sergey.senozhatsky.work@...il.com>,
        Steven Rostedt <rostedt@...dmis.org>,
        Linus Torvalds <torvalds@...ux-foundation.org>,
        Greg Kroah-Hartman <gregkh@...uxfoundation.org>,
        Andrea Parri <andrea.parri@...rulasolutions.com>,
        Thomas Gleixner <tglx@...utronix.de>,
        Sergey Senozhatsky <sergey.senozhatsky@...il.com>
Subject: [RFC PATCH v2 1/2] printk-rb: add a new printk ringbuffer implementation

See documentation for details.

Signed-off-by: John Ogness <john.ogness@...utronix.de>
---
 Documentation/core-api/index.rst             |   1 +
 Documentation/core-api/printk-ringbuffer.rst | 104 +++
 include/linux/printk_ringbuffer.h            | 238 +++++++
 lib/printk_ringbuffer.c                      | 924 +++++++++++++++++++++++++++
 4 files changed, 1267 insertions(+)
 create mode 100644 Documentation/core-api/printk-ringbuffer.rst
 create mode 100644 include/linux/printk_ringbuffer.h
 create mode 100644 lib/printk_ringbuffer.c

diff --git a/Documentation/core-api/index.rst b/Documentation/core-api/index.rst
index ee1bb8983a88..0ab649134577 100644
--- a/Documentation/core-api/index.rst
+++ b/Documentation/core-api/index.rst
@@ -27,6 +27,7 @@ Core utilities
    errseq
    printk-formats
    circular-buffers
+   printk-ringbuffer
    generic-radix-tree
    memory-allocation
    mm-api
diff --git a/Documentation/core-api/printk-ringbuffer.rst b/Documentation/core-api/printk-ringbuffer.rst
new file mode 100644
index 000000000000..5634c2010ed8
--- /dev/null
+++ b/Documentation/core-api/printk-ringbuffer.rst
@@ -0,0 +1,104 @@
+=====================
+The printk Ringbuffer
+=====================
+
+:Author: John Ogness <john.ogness@...utronix.de>
+
+
+.. Contents:
+
+ (*) Overview
+     - Features
+     - Terminology
+     - Behavior
+     - Data Blocks
+     - Descriptors
+     - Why Descriptors?
+
+ (*) Memory Barriers
+     - Writers
+     - Readers
+
+ (*) Structures, Macros, Functions
+
+ (*) Examples
+     - Writer
+     - Reader
+
+
+Overview
+========
+
+.. kernel-doc:: lib/printk_ringbuffer.c
+   :doc: prb overview
+
+
+Memory Barriers
+===============
+
+.. kernel-doc:: lib/printk_ringbuffer.c
+   :doc: memory barriers
+
+
+Examples
+========
+
+Here are some simple examples demonstrating writers and readers. For the
+examples it is assumed that a global ringbuffer is available::
+
+	DECLARE_PRINTKRB(rb, 7, 5);
+
+This expects an average data size of 128 bytes and allows up to 32
+descriptors.
+
+
+Writer
+------
+
+Sample writer code::
+
+	struct prb_reserved_entry e;
+	char *s;
+
+	s = prb_reserve(&e, &rb, 32);
+	if (s) {
+		sprintf(s, "Hello, world!");
+		prb_commit(&e);
+	}
+
+
+Reader
+------
+
+Sample reader code::
+
+	DECLARE_PRINTKRB_ENTRY(entry, 128);
+	DECLARE_PRINTKRB_ITER(iter, &rb, &entry);
+	u64 last_seq = 0;
+	int len;
+	char *s;
+
+	prb_for_each_entry(&iter, len) {
+		if (entry.seq - last_seq != 1) {
+			printf("LOST %llu ENTRIES\n",
+				entry.seq - (last_seq + 1));
+		}
+		last_seq = entry.seq;
+
+		s = (char *)&entry.buffer[0];
+		if (len >= 128)
+			s[128 - 1] = 0;
+		printf("data: %s\n", s);
+	}
+
+
+Structures, Macros, Functions
+=============================
+
+Following is a description of all the printk ringbuffer data structures,
+macros, and functions, most of which are private. Public interfaces are
+explicitly mentioned/marked as so.
+
+.. kernel-doc:: include/linux/printk_ringbuffer.h
+.. kernel-doc:: lib/printk_ringbuffer.c
+   :functions:
diff --git a/include/linux/printk_ringbuffer.h b/include/linux/printk_ringbuffer.h
new file mode 100644
index 000000000000..569980a61c0a
--- /dev/null
+++ b/include/linux/printk_ringbuffer.h
@@ -0,0 +1,238 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef _LINUX_PRINTK_RINGBUFFER_H
+#define _LINUX_PRINTK_RINGBUFFER_H
+
+#include <linux/atomic.h>
+
+/**
+ * struct prb_list - An abstract linked list of items.
+ * @oldest: The oldest item on the list.
+ * @newest: The newest item on the list.
+ *
+ * Items are represented as integers (logical array indexes) into an external
+ * array. For the data array they represent the beginning of the data for that
+ * item. For the descriptor array they represent the array element of the
+ * descriptor.
+ *
+ * Traversing the list requires that the items also include integers to the
+ * next item in the list. Note that only the descriptor list is ever
+ * traversed.
+ */
+struct prb_list {
+	/* private */
+	unsigned long oldest;
+	unsigned long newest;
+};
+
+/**
+ * struct prb_descr - A descriptor representing an entry in the ringbuffer.
+ * @seq: The sequence number of the entry.
+ * @id: The descriptor id.
+ *      The location of the descriptor within the descriptor array can be 
+ *      determined from this value.
+ * @data: The logical position of the data for this entry.
+ *        The location of the beginning of the data within the data array
+ *        can be determined from this value.
+ * @data_next: The logical position of the data next to this entry.
+ *             This is used to determine the length of the data as well as
+ *             identify where the next data begins.
+ * @next: The id of the next (newer) descriptor in the linked list.
+ *        A value of EOL means it is the last descriptor in the list.
+ *
+ * Descriptors are used to identify where the data for each entry is and
+ * also provide an ordering for readers. Entry ordering is based on the
+ * descriptor linked list (not the ordering of data in the data array).
+ */
+struct prb_descr {
+	/* private */
+	u64 seq;
+	unsigned long id;
+	unsigned long data;
+	unsigned long data_next;
+	unsigned long next;
+};
+
+/**
+ * struct printk_ringbuffer - The ringbuffer structure.
+ * @data_array_size_bits: The size of the data array as a power-of-2.
+ * @data_array: A pointer to the data array.
+ * @data_list: A list of entry data.
+ *             Since the data list is not traversed, this list is only used to
+ *             mark the contiguous section of the data array that is in use.
+ * @descr_max_count: The maximum amount of descriptors allowed.
+ * @descr_array: A pointer to the descriptor array.
+ * @descr_list: A list of entry descriptors.
+ *              The list can be traversed from oldest to newest.
+ * @descr_next: An index of the next available (never before used) descriptor.
+ *              This value only increases until the maximum is reached.
+ * @lost: A counter tracking how often writers failed to write.
+ *        This is only provided as convenience. It does not increment
+ *        automatically. Writers must increment it after they have determined
+ *        that a write failed.
+ */
+struct printk_ringbuffer {
+	/* private */
+	unsigned int data_array_size_bits;
+	char *data_array;
+	struct prb_list data_list;
+
+	unsigned int descr_max_count;
+	struct prb_descr *descr_array;
+	struct prb_list descr_list;
+
+	atomic_t descr_next;
+
+	atomic_long_t lost;
+};
+
+/**
+ * struct prb_reserved_entry - Used by writers to reserve/commit data.
+ * @rb: The printk ringbuffer used for reserve/commit.
+ * @descr: A pointer to the descriptor assigned to the reserved data.
+ * @id: The descriptor's id value, set on reserve.
+ * @data: The descriptor's data value, set on reserve.
+ * @data_next: The descriptor's future data_next value, set on commit.
+ * @irqflags: Local IRQs are disabled during the reserve/commit window.
+ *
+ * A writer provides this structure when reserving and committing data. The
+ * values of all the members are set on reserve and are only valid until
+ * commit.
+ */
+struct prb_reserved_entry {
+	/* private */
+	struct printk_ringbuffer *rb;
+	struct prb_descr *descr;
+	unsigned long id;
+	unsigned long data;
+	unsigned long data_next;
+	unsigned long irqflags;
+};
+
+/**
+ * struct prb_entry - Used by readers to read a ringbuffer entry.
+ * @seq: The sequence number of the entry descriptor.
+ * @buffer: A pointer to a reader-provided buffer.
+ *          When reading an entry, the data is copied to this buffer.
+ * @buffer_size: The size of the reader-provided buffer.
+ *
+ * A reader initializes and provides this structure when traversing/reading
+ * the entries of the ringbuffer.
+ */
+struct prb_entry {
+	/* public */
+	u64 seq;
+	char *buffer;
+	int buffer_size;
+};
+
+/**
+ * struct prb_iterator - Used by readers to traverse a descriptor list.
+ * @rb: The printk ringbuffer being traversed.
+ * @e: A pointer to a reader-provided entry structure.
+ * @id: The id of the descriptor last accessed.
+ * @id_next: The id of the next (newer) descriptor to access.
+ */
+struct prb_iterator {
+	/* private */
+	struct printk_ringbuffer *rb;
+	struct prb_entry *e;
+	unsigned long id;
+	unsigned long id_next;
+};
+
+/**
+ * DECLARE_PRINTKRB() - Declare a printk ringbuffer.
+ * @name: The name for the ringbuffer structure variable.
+ * @avgdatabits: The average size of data as a power-of-2.
+ *               If this value is too small, it will not be possible to store
+ *               as many entries as desired. If this value is too large, there
+ *               will be some wasted space in the data array because there are
+ *               not enough descriptors. Generally values that are too large
+ *               are preferred over those that are too small.
+ * @descrbits: The number of descriptors (desired entries) as a power-of-2.
+ *
+ * The size of the data array will be the average data size multiplied by the
+ * number of descriptors.
+ */
+#define DECLARE_PRINTKRB(name, avgdatabits, descrbits)			\
+char _##name##_data_array[(1 << ((avgdatabits) + (descrbits))) +	\
+				 sizeof(long)]				\
+	__aligned(__alignof__(long));					\
+struct prb_descr _##name##_descr_array[1 << (descrbits)];		\
+struct printk_ringbuffer name = {					\
+	.data_array_size_bits = (avgdatabits) + (descrbits),		\
+	.data_array = &_##name##_data_array[0],				\
+	.data_list.oldest = -111 * sizeof(long),			\
+	.data_list.newest = -111 * sizeof(long),			\
+	.descr_max_count = 1 << (descrbits),				\
+	.descr_array = &_##name##_descr_array[0],			\
+	.descr_next = ATOMIC_INIT(0),					\
+	.descr_list.oldest = 0,						\
+	.descr_list.newest = 0,						\
+	.lost = ATOMIC_LONG_INIT(0),					\
+}
+
+/**
+ * DECLARE_PRINTKRB_ENTRY() - Declare an entry structure.
+ * @name: The name for the entry structure variable.
+ * @size: The size of the associated reader buffer (also declared).
+ *
+ * This macro is particularly useful for static entry structures that should be
+ * immediately available and initialized. It is an alternative to the reader
+ * manually setting the buffer and buffer_size members of the structure.
+ *
+ * Note that this macro will declare the buffer as well. This could be a
+ * problem if this is used with a large buffer size within a stack frame.
+ */
+#define DECLARE_PRINTKRB_ENTRY(name, size)				\
+char _##name##_entry_buf[size];						\
+struct prb_entry name = {						\
+	.buffer_size = size,						\
+	.buffer = &_##name##_entry_buf[0],				\
+}
+
+/**
+ * DECLARE_PRINTKRB_ITER() - Declare an iterator for readers.
+ * @name: The name for the iterator structure variable.
+ * @rbaddr: A pointer to a printk ringbuffer.
+ * @entryaddr: A pointer to an entry structure.
+ *
+ * This macro is particularly useful for static iterators that should be
+ * immediately available and initialized. It is an alternative to
+ * manually initializing an iterator with prb_iter_init().
+ */
+#define DECLARE_PRINTKRB_ITER(name, rbaddr, entryaddr)			\
+struct prb_iterator name = {						\
+	.rb = rbaddr,							\
+        .e = entryaddr,							\
+        .id = 0,							\
+        .id_next = 0,							\
+}
+
+/* writer interface */
+char *prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
+		  unsigned int size);
+void prb_commit(struct prb_reserved_entry *e);
+
+/* reader interface */
+void prb_iter_init(struct prb_iterator *iter, struct printk_ringbuffer *rb,
+		   struct prb_entry *e);
+int prb_iter_next_valid_entry(struct prb_iterator *iter);
+bool prb_iter_peek_next_entry(struct prb_iterator *iter);
+
+/**
+ * prb_for_each_entry() - Iterate through all the entries of a ringbuffer.
+ * @i: A pointer to an iterator.
+ * @l: An integer used to identify when the last entry is traversed.
+ *
+ * This macro expects the iterator to be initialized. It also does not reset
+ * the iterator. So if the iterator has already been used for some traversal,
+ * this macro will continue where the iterator left off.
+ */
+#define prb_for_each_entry(i, l) \
+	for (; (l = prb_iter_next_valid_entry(i)) != 0;)
+
+/* utility functions */
+void prb_inc_lost(struct printk_ringbuffer *rb);
+
+#endif /*_LINUX_PRINTK_RINGBUFFER_H */
diff --git a/lib/printk_ringbuffer.c b/lib/printk_ringbuffer.c
new file mode 100644
index 000000000000..d0b2b6a549b0
--- /dev/null
+++ b/lib/printk_ringbuffer.c
@@ -0,0 +1,924 @@
+// SPDX-License-Identifier: GPL-2.0
+#include <linux/sched.h>
+#include <linux/smp.h>
+#include <linux/string.h>
+#include <linux/errno.h>
+#include <linux/printk_ringbuffer.h>
+
+/**
+ * DOC: prb overview
+ *
+ * As the name suggests, this ringbuffer was implemented specifically to
+ * serve the needs of the printk() infrastructure. The ringbuffer itself is
+ * not specific to printk and could be used for other purposes. However, the
+ * requirements and semantics of printk are rather unique. If you intend to use
+ * this ringbuffer for anything other than printk, you need to be very clear on
+ * its features, behavior, and pitfalls.
+ *
+ * Features
+ * --------
+ * * single global buffer
+ * * resides in initialized data section (available at early boot)
+ * * supports multiple concurrent lockless writers
+ * * supports multiple concurrent lockless readers
+ * * safe from any context (including NMI)
+ * * groups bytes into variable length data blocks (referenced by entries)
+ * * entries tagged with sequence numbers
+ *
+ * Terminology
+ * -----------
+ * data block - A contiguous block of data containing an id to an associated
+ *              descriptor and the raw data from the writer.
+ * descriptor - Meta data for a data block containing an id, the logical
+ *              positions of the associated data block, a unique sequence
+ *              number, and a pointer to the next (newer) descriptor.
+ * entry - A high level object used by the readers/writers that contains a
+ *         descriptor as well as state information during the reserve/commit
+ *         window.
+ *
+ * Behavior
+ * --------
+ * Since the printk ringbuffer is lockless, there exists no synchronization
+ * between readers and writers. Basically writers are the tasks in control and
+ * may overwrite any and all committed data at any time and from any context.
+ * For this reason readers can miss entries if they are overwritten before the
+ * reader was able to access the data. The reader API implementation is such
+ * that reader access to data is atomic, so there is no risk of readers having
+ * to deal with partial or corrupt data blocks. Also, entries include the
+ * sequence number of the associated descriptor so that readers can recognize
+ * if entries were missed.
+ *
+ * Writing to the ringbuffer consists of 2 steps. First a writer must reserve
+ * a data block of desired size. After this step the writer has exclusive
+ * access to the memory region. Once the writer's data has been written to
+ * memory, the entry needs to be committed to the ringbuffer. After this step
+ * the data has been inserted into the ringbuffer and assigned an appropriate
+ * sequence number.
+ *
+ * Once committed, a writer must no longer access the data directly. This is
+ * because the data may have been overwritten and no longer exists. If a
+ * writer must access the data, it should either keep a private copy before
+ * committing or use the reader API to gain access to the data.
+ *
+ * Because of how the data backend is implemented, data blocks that have been
+ * reserved but not yet committed act as barriers, preventing future writers
+ * from filling the ringbuffer beyond the location of the reserved but not
+ * yet committed data block region. For this reason it is important that
+ * writers perform both reserve and commit as quickly as possible. Also, be
+ * aware that local interrupts are disabled during the reserve/commit window.
+ * Writers in NMI contexts can still preempt any other writers, but as long
+ * as these writers do not write a large amount of data with respect to the
+ * ringbuffer size, this should not become an issue.
+ *
+ * Data Blocks
+ * -----------
+ * All ring buffer data is stored within a single static byte array. The reason
+ * for this is to ensure that any pointers to the data (past and present) will
+ * always point to valid memory. This is important because the lockless readers
+ * and writers may preempt for long periods of time and when they resume may be
+ * working with expired pointers.
+ *
+ * Data blocks are specified by start and end indices. (The end index has the
+ * same value as the start index of the neighboring data block.) But indices
+ * are not simply offsets into the byte array. They are logical position
+ * values (lpos) that always increase but map directly to byte array offsets.
+ *
+ * For example, for a byte array of 1000, a data block may have a start
+ * lpos of 100. Another data block may have a start lpos of 1100. And yet
+ * another 2100. All of these data blocks are pointing to the same data, but
+ * only the most recent data block is valid. The other data blocks are pointing
+ * to valid memory, but represent data blocks that have been overwritten.
+ *
+ * Also note that due to overflowing, the most recent data block is not
+ * necessarily the one with the highest lpos. Indeed, the printk ring buffer
+ * initializes its data such that an overflow happens relatively quickly in
+ * order to validate the handling of this situation.
+ *
+ * If a data block starts near the end of the byte array but would extend
+ * beyond it, that data block is handled differently: a special "wrapping data
+ * block" is inserted into the byte array and the real data block is placed at
+ * the beginning of the byte array. This can waste space at the end of the byte
+ * array, but simplifies the implementation by allowing writers to always work
+ * with contiguous buffers. For example, for a 1000 byte array, a descriptor
+ * may show a start lpos of 1950 and an end lpos of 2100. The data block
+ * associated with this descriptor is 100 bytes in size and its data is found
+ * at offset 0 of the byte array.
+ *
+ * Descriptors
+ * -----------
+ * A descriptor is a handle to a data block. Like data blocks, all descriptors
+ * are also stored in their own single static array. The reasoning is the same
+ * as for the data blocks: pointers to descriptors should point to valid
+ * memory at all times, even if the descriptor itself has become invalid.
+ *
+ * Descriptors contain the start (data) and end (data_next) lpos of the data
+ * block they represent. They also have their own id that works like the lpos
+ * for data blocks: values that always increase but map directly to the
+ * descriptor array offset.
+ *
+ * For example, for a descriptor array of 10, a descriptor may have an id of
+ * 1, another of 11, and another of 21. All of these descriptor ids are
+ * pointing to the same descriptor, but only the most recent descriptor id
+ * is valid. The other values represent descriptors that have become invalid.
+ *
+ * Why Descriptors?
+ * ----------------
+ * At first glance it may seem as though descriptors are an unnecessary
+ * abstraction layer added over the data array. After all, couldn't we just put
+ * the fields of a descriptor into a structure at the head of the data block?
+ * The answer is no. The reason is that the printk ring buffer supports
+ * variable length records, which means that data blocks will not always begin
+ * at a predictable offset of the byte array. This is a major problem for
+ * lockless writers that, for example, will need to expire old data blocks
+ * when the ringbuffer is full. A writer has no way of knowing if it is
+ * allowed to push the oldest pointer. Flags could not be used because upon
+ * pushing the newest pointer (reserve), initially random data will be set
+ * that could falsely indicate the flag status. Introducing a third
+ * "committed" pointer also will not help because now the problem is
+ * advancing the committed pointer. (Reserve ordering does not match commit
+ * ordering.) Even using cmpxchg() will not help because random data could
+ * potentially match the metadata to replace.
+ *
+ * Descriptors allow safe and controlled access to data block metadata by
+ * providing predictable offsets for such metadata. This is key to supporting
+ * multiple concurrent lockless writers.
+ */
+
+/**
+ * DOC: memory barriers
+ *
+ * Writers
+ * -------
+ * The main issue with writers is expiring/invalidating old data blocks in
+ * order to create new data blocks. This is performed in 6 steps that must
+ * be observed in order by all writers to allow cooperation. Here is a list
+ * of the 6 steps and the named acquire/release memory barrier pairs that
+ * are used to synchronize them:
+ *
+ * * old data invalidation (MB1): Pushing rb.data_list.oldest forward.
+ *   Necessary for identifying if data has been expired.
+ *
+ * * new data reservation (MB2): Pushing rb.data_list.newest forward.
+ *   Necessary for validating data.
+ *
+ * * assign the data block to a descriptor (MB3): Setting data block id to
+ *   descriptor id. Necessary for finding the descriptor associated with
+ *   the data block.
+ *
+ * * commit data (MB4): Setting data block data_next. (Now data block is
+ *   valid). Necessary for validating data.
+ *
+ * * make descriptor newest (MB5): Setting rb.descr_list.newest to descriptor.
+ *   (Now following new descriptors will be linked to this one.) Necessary for
+ *   ensuring the descriptor's next is set to EOL before adding to the list.
+ *
+ * * link descriptor to previous newest (MB6): Setting the next of the
+ *   previous descriptor to this one. Necessary for correctly identifying if
+ *   a descriptor is the only descriptor on the list.
+ *
+ * Readers
+ * -------
+ * Readers only make use of smp_rmb() to ensure that certain critical load
+ * operations are performed in an order that allows readers to evaluate if
+ * the data they read is really valid.
+ */
+
+/* end of list marker */
+#define EOL 0
+
+/**
+ * struct prb_datablock - A data block.
+ * @id: The descriptor id that is associated with this data block.
+ * @data: The data committed by the writer.
+ */
+struct prb_datablock {
+	unsigned long id;
+	char data[0];
+};
+
+#define DATAARRAY_SIZE(rb) (1 << rb->data_array_size_bits)
+#define DATAARRAY_SIZE_BITMASK(rb) (DATAARRAY_SIZE(rb) - 1)
+
+/**
+ * DATA_INDEX() - Determine the data array index from logical position.
+ * @rb: The associated ringbuffer.
+ * @lpos: The logical position (data/data_next).
+ */
+#define DATA_INDEX(rb, lpos) (lpos & DATAARRAY_SIZE_BITMASK(rb))
+
+/**
+ * DATA_WRAPS() - Determine how many times the data array has wrapped.
+ * @rb: The associated ringbuffer.
+ * @lpos: The logical position (data/data_next).
+ *
+ * The number of wraps is useful when determining if one logical position
+ * is overtaking the data array index of another logical position.
+ */
+#define DATA_WRAPS(rb, lpos) (lpos >> rb->data_array_size_bits)
+
+/**
+ * DATA_THIS_WRAP_START_LPOS() - Get the position at the start of the wrap.
+ * @rb: The associated ringbuffer.
+ * @lpos: The logical position (data/data_next).
+ *
+ * Given a logical position, return the logical position if backed up to the
+ * beginning (data array index 0) of the current wrap. This is used when a
+ * data block wraps and therefore needs to begin at the beginning of the data
+ * array (for the next wrap).
+ */
+#define DATA_THIS_WRAP_START_LPOS(rb, lpos) \
+	(DATA_WRAPS(rb, lpos) << rb->data_array_size_bits)
+
+#define DATA_ALIGN sizeof(long)
+#define DATA_ALIGN_SIZE(sz) \
+	((sz + (DATA_ALIGN - 1)) & ~(DATA_ALIGN - 1))
+
+#define DESCR_COUNT_BITMASK(rb) (rb->descr_max_count - 1)
+
+/**
+ * DESCR_INDEX() - Determine the descriptor array index from the id.
+ * @rb: The associated ringbuffer.
+ * @id: The descriptor id.
+ */
+#define DESCR_INDEX(rb, id) (id & DESCR_COUNT_BITMASK(rb))
+
+#define TO_DATABLOCK(rb, lpos) \
+	((struct prb_datablock *)&rb->data_array[DATA_INDEX(rb, lpos)])
+#define TO_DESCR(rb, id) \
+	(&rb->descr_array[DESCR_INDEX(rb, id)])
+
+/**
+ * data_valid() - Check if a data block is valid.
+ * @rb: The ringbuffer containing the data.
+ * @oldest_data: The oldest data logical position.
+ * @newest_data: The newest data logical position.
+ * @data: The logical position for the data block to check.
+ * @data_next: The logical position for the data block next to this one.
+ *             This value is used to identify the end of the data block.
+ *
+ * A data block is considered valid if it satisfies the two conditions:
+ *
+ * * oldest_data <= data < data_next <= newest_data
+ * * oldest_data is at most exactly 1 wrap behind newest_data
+ *
+ * Return: true if the specified data block is valid.
+ */
+static inline bool data_valid(struct printk_ringbuffer *rb,
+			      unsigned long oldest_data,
+			      unsigned long newest_data,
+			      unsigned long data, unsigned long data_next)
+
+{
+	return ((data - oldest_data) < DATAARRAY_SIZE(rb) &&
+		data_next != data &&
+		(data_next - data) < DATAARRAY_SIZE(rb) &&
+		(newest_data - data_next) < DATAARRAY_SIZE(rb) &&
+		(newest_data - oldest_data) <= DATAARRAY_SIZE(rb));
+}
+
+/**
+ * add_descr_list() - Add a descriptor to the descriptor list.
+ * @e: An entry that has already reserved data.
+ *
+ * The provided entry contains a pointer to a descriptor that has already
+ * been reserved for this entry. However, the reserved descriptor is not
+ * yet on the list. Add this descriptor as the newest item.
+ *
+ * A descriptor is added in two steps. The first step is to make this
+ * descriptor the newest. The second step is to update the "next" field of
+ * the former newest item to point to this item.
+ */
+static void add_descr_list(struct prb_reserved_entry *e)
+{
+	struct printk_ringbuffer *rb = e->rb;
+	struct prb_list *l = &rb->descr_list;
+	struct prb_descr *d = e->descr;
+	struct prb_descr *newest_d;
+	unsigned long newest_id;
+
+	/* set as newest */
+	do {
+		/* MB5: synchronize add descr */
+		newest_id = smp_load_acquire(&l->newest);
+		newest_d = TO_DESCR(rb, newest_id);
+
+		if (newest_id == EOL)
+			WRITE_ONCE(d->seq, 1);
+		else
+			WRITE_ONCE(d->seq, READ_ONCE(newest_d->seq) + 1);
+		/*
+		 * MB5: synchronize add descr
+		 *
+		 * In particular: next written before cmpxchg
+		 */
+	} while (cmpxchg_release(&l->newest, newest_id, e->id) != newest_id);
+
+	if (unlikely(newest_id == EOL)) {
+		/* no previous newest means we *are* the list, set oldest */
+
+		/*
+		 * MB UNPAIRED
+		 *
+		 * In particular: Force cmpxchg _after_ cmpxchg on newest.
+		 */
+		WARN_ON_ONCE(cmpxchg_release(&l->oldest, EOL, e->id) != EOL);
+	} else {
+		/* link to previous chain */
+
+		/*
+		 * MB6: synchronize link descr
+		 *
+		 * In particular: Force cmpxchg _after_ cmpxchg on newest.
+		 */
+		WARN_ON_ONCE(cmpxchg_release(&newest_d->next,
+					     EOL, e->id) != EOL);
+	}
+}
+
+/**
+ * remove_oldest_descr() - Remove the oldest descriptor from the list.
+ * @rb: The ringbuffer from which to remove the oldest descriptor.
+ *
+ * The oldest descriptor can be removed from the descriptor list on two
+ * conditions:
+ *
+ * * The data block for the descriptor is invalid.
+ * * The descriptor is not the only descriptor on the list.
+ *
+ * If, during this function, another task removes the oldest, this function
+ * will try again.
+ *
+ * Return: The removed descriptor or NULL if the oldest descriptor cannot
+ *         be removed.
+ */
+static struct prb_descr *remove_oldest_descr(struct printk_ringbuffer *rb)
+{
+	struct prb_list *l = &rb->descr_list;
+	unsigned long oldest_id;
+	struct prb_descr *d;
+	unsigned long next;
+
+	for (;;) {
+		oldest_id = READ_ONCE(l->oldest);
+
+		/* list empty */
+		if (oldest_id == EOL)
+			return NULL;
+
+		d = TO_DESCR(rb, oldest_id);
+
+		/* only descriptors with _invalid_ data can be removed */
+		if (data_valid(rb, READ_ONCE(rb->data_list.oldest),
+			       READ_ONCE(rb->data_list.newest),
+			       READ_ONCE(d->data),
+			       READ_ONCE(d->data_next))) {
+			return NULL;
+		}
+
+		/*
+		 * MB6: synchronize link descr
+		 *
+		 * In particular: l->oldest is loaded as a data dependency so
+		 * d->next and the following l->oldest will load afterwards,
+		 * respectively.
+		 */
+		next = smp_load_acquire(&d->next);
+
+		if (next == EOL && READ_ONCE(l->oldest) == oldest_id) {
+			/*
+			 * The oldest has no next, so this is a list of one
+			 * descriptor. Lists must always have at least one
+			 * descriptor.
+			 */
+			return NULL;
+		}
+
+		if (cmpxchg(&l->oldest, oldest_id, next) == oldest_id) {
+			/* removed successfully */
+			break;
+		}
+
+		/* oldest descriptor removed by another task, try again */
+	}
+
+	return d;
+}
+
+/**
+ * expire_oldest_data() - Invalidate the oldest data block.
+ * @rb: The ringbuffer containing the data block.
+ * @oldest_lpos: The logical position of the oldest data block.
+ *
+ * This function expects to "push" the pointer to the oldest data block
+ * forward, thus invalidating the oldest data block. However, before pushing,
+ * it is verified if the data block is valid. (For example, if the data block
+ * was reserved but not yet committed, it is not permitted to invalidate the
+ * "in use by a writer" data.)
+ *
+ * If the data is valid, it will be associated with a descriptor, which will
+ * then provide the necessary information to validate the data.
+ *
+ * Return: true if the oldest data was invalidated (regardless if this
+ *         task was the one that did it or not), otherwise false.
+ */
+static bool expire_oldest_data(struct printk_ringbuffer *rb,
+			       unsigned long oldest_lpos)
+{
+	unsigned long newest_lpos;
+	struct prb_datablock *b;
+	unsigned long data_next;
+	struct prb_descr *d;
+	unsigned long data;
+
+	/* MB2: synchronize data reservation */
+	newest_lpos = smp_load_acquire(&rb->data_list.newest);
+
+	b = TO_DATABLOCK(rb, oldest_lpos);
+
+	/* MB3: synchronize descr setup */
+	d = TO_DESCR(rb, smp_load_acquire(&b->id));
+
+	data = READ_ONCE(d->data);
+
+	/* sanity check to see if b->id was correct */
+	if (oldest_lpos != data)
+		goto out;
+
+	/* MB4: synchronize commit */
+	data_next = smp_load_acquire(&d->data_next);
+
+	if (!data_valid(rb, oldest_lpos, newest_lpos, data, data_next))
+		goto out;
+
+	/* MB1: synchronize data invalidation */
+	cmpxchg_release(&rb->data_list.oldest, data, data_next);
+
+	/* Some task (maybe this one) successfully expired the oldest data. */
+	return true;
+out:
+	return (oldest_lpos != READ_ONCE(rb->data_list.oldest));
+}
+
+/**
+ * get_new_lpos() - Determine the logical positions of a new data block.
+ * @rb: The ringbuffer to contain the data.
+ * @size: The size of the new data block.
+ * @data: A pointer to the start logical position value to be set.
+ *        It is set to the value of data_list.newest that was read.
+ * @data_next: A pointer to the end logical position value to be set.
+ *             This value is used to identify the end of the data block.
+ *
+ * Starting at the current newest position, compute the data and data_next
+ * values. A block that would straddle a wrap is placed at the wrap start
+ * instead (data stays put). If the block would overwrite the oldest data
+ * block, that block is invalidated first to make room.
+ *
+ * Return: true if logical positions were determined, otherwise false.
+ *
+ * Nothing is reserved yet: data_list.newest is only read. Failure means the
+ * oldest data block could not be invalidated, which can happen if it is
+ * reserved by a writer but not yet committed.
+ */
+static bool get_new_lpos(struct printk_ringbuffer *rb, unsigned int size,
+			 unsigned long *data, unsigned long *data_next)
+{
+	unsigned long oldest_lpos;
+	unsigned long data_begin;
+
+	for (;;) {
+		*data = READ_ONCE(rb->data_list.newest);
+		data_begin = *data;
+
+		for (;;) {
+			*data_next = data_begin + size;
+
+			/* MB1: pairs with cmpxchg in expire_oldest_data() */
+			oldest_lpos = smp_load_acquire(&rb->data_list.oldest);
+
+			if (*data_next - oldest_lpos > DATAARRAY_SIZE(rb)) {
+				/* would overwrite oldest: expire, retry */
+				if (!expire_oldest_data(rb, oldest_lpos))
+					return false;
+				break;
+			}
+
+			if (DATA_WRAPS(rb, data_begin) ==
+			    DATA_WRAPS(rb, *data_next)) {
+				return true;
+			}
+
+			data_begin = DATA_THIS_WRAP_START_LPOS(rb, *data_next);
+		}
+	}
+}
+
+/**
+ * assign_descr() - Assign a descriptor to an entry.
+ * @e: The entry to assign a descriptor to.
+ *
+ * Find an available descriptor for the entry. The oldest descriptor is
+ * tried first (its new id is the old id advanced by descr_max_count); if
+ * it cannot be removed, a never-used descriptor is taken when one exists.
+ *
+ * If neither is available, the oldest data block is invalidated and the
+ * search is repeated.
+ *
+ * Return: true if e->id and e->descr were set, otherwise false.
+ *
+ * This will only fail if it was not possible to invalidate the oldest data
+ * block. This can happen if a writer has reserved but not yet committed data
+ * and that reserved data is currently the oldest data.
+ */
+static bool assign_descr(struct prb_reserved_entry *e)
+{
+	struct printk_ringbuffer *rb = e->rb;
+	struct prb_descr *d;
+	unsigned long id;
+
+	for (;;) {
+		/* preferably recycle the (invalid) descriptor at oldest */
+		d = remove_oldest_descr(rb);
+		if (d) {
+			id = READ_ONCE(d->id) + rb->descr_max_count;
+			/*
+			 * EOL is reserved as the list terminator, so it
+			 * must never be handed out as a descriptor id;
+			 * skip ahead by one more round if it comes up.
+			 */
+			if (id == EOL)
+				id += rb->descr_max_count;
+
+			/*
+			 * Readers at this descriptor can still move forward
+			 * until prb_reserve() stores the new id.
+			 */
+			break;
+		}
+
+		/* fall back to a never-used descriptor, if any remain */
+		if (atomic_read(&rb->descr_next) < rb->descr_max_count) {
+			id = atomic_fetch_inc(&rb->descr_next);
+			if (id < rb->descr_max_count) {
+				d = &rb->descr_array[id];
+				break;
+			}
+		}
+
+		/* no descriptors: expire the oldest data, then retry */
+		/* MB1: pairs with cmpxchg in expire_oldest_data() */
+		if (!expire_oldest_data(rb,
+				smp_load_acquire(&rb->data_list.oldest))) {
+			return false;
+		}
+	}
+
+	e->id = id;
+	e->descr = d;
+	return true;
+}
+
+/**
+ * data_reserve() - Reserve data in the data array.
+ * @e: The entry to reserve data for; its data and data_next are set.
+ * @size: The size to reserve.
+ *
+ * This function "pushes" the pointer to the newest data block forward,
+ * recomputing the positions whenever another writer got there first. If
+ * the push would overtake the oldest data, that data is invalidated.
+ *
+ * Return: true if data was reserved, otherwise false.
+ *
+ * This will only fail if it was not possible to invalidate the oldest data
+ * block. This can happen if a writer has reserved but not yet committed data
+ * and that reserved data is currently the oldest data.
+ */
+static bool data_reserve(struct prb_reserved_entry *e, unsigned int size)
+{
+	struct printk_ringbuffer *rb = e->rb;
+
+	do {
+		if (!get_new_lpos(rb, size, &e->data, &e->data_next))
+			return false;
+		/* MB2: pairs with load of newest in expire_oldest_data() */
+	} while (cmpxchg_release(&rb->data_list.newest,
+				 e->data, e->data_next) != e->data);
+
+	return true;
+}
+
+/**
+ * prb_reserve() - Reserve data in the ringbuffer.
+ * @e: The entry structure to set up; pass it to prb_commit() afterwards.
+ * @rb: The ringbuffer to reserve data in.
+ * @size: The size of the data to reserve.
+ *
+ * This is the public function available to writers to reserve data.
+ *
+ * Context: Any context. Disables local interrupts on success.
+ * Return: A pointer to the reserved data or NULL if data could not be
+ *         reserved.
+ *
+ * A size of 0, or one exceeding the data array even before the header and
+ * alignment padding are added (which could otherwise wrap), is rejected.
+ * Beyond that, this only fails if the oldest data block is reserved but not
+ * yet committed by a writer and therefore cannot be invalidated.
+ */
+char *prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
+		  unsigned int size)
+{
+	struct prb_datablock *b;
+	struct prb_descr *d;
+	char *buf;
+
+	if (size == 0 || size > DATAARRAY_SIZE(rb))
+		return NULL;
+	/* pad for header and alignment; huge sizes were rejected above */
+	size += sizeof(struct prb_datablock);
+	size = DATA_ALIGN_SIZE(size);
+	if (size > DATAARRAY_SIZE(rb))
+		return NULL;
+
+	e->rb = rb;
+
+	local_irq_save(e->irqflags);
+
+	if (!assign_descr(e))
+		goto err_out;
+
+	d = e->descr;
+	WRITE_ONCE(d->id, e->id);
+
+	if (!data_reserve(e, size)) {
+		/* list the descriptor anyway: invalid, but still traversable */
+		WRITE_ONCE(d->next, EOL);
+		add_descr_list(e);
+		goto err_out;
+	}
+	/* data_next stays equal to data until prb_commit() sets the end */
+	WRITE_ONCE(d->data, e->data);
+	WRITE_ONCE(d->data_next, e->data);
+
+	if (DATA_WRAPS(rb, e->data) != DATA_WRAPS(rb, e->data_next)) {
+		b = TO_DATABLOCK(rb, 0);
+		WRITE_ONCE(b->id, e->id);
+	} else {
+		b = TO_DATABLOCK(rb, e->data);
+	}
+	buf = &b->data[0];
+	/* the block at e->data carries the id, wrapped or not */
+	b = TO_DATABLOCK(rb, e->data);
+
+	/* MB3: pairs with the load of b->id in expire_oldest_data() */
+	smp_store_release(&b->id, e->id);
+
+	return buf;
+err_out:
+	local_irq_restore(e->irqflags);
+	return NULL;
+}
+EXPORT_SYMBOL(prb_reserve);
+
+/**
+ * prb_commit() - Commit (previously reserved) data to the ringbuffer.
+ * @e: The entry that was set up by a successful prb_reserve().
+ *
+ * This is the public function available to writers to commit data.
+ *
+ * Context: Any context. Restores the irq state saved by prb_reserve().
+ */
+void prb_commit(struct prb_reserved_entry *e)
+{
+	struct prb_descr *d = e->descr;
+
+	WRITE_ONCE(d->next, EOL);
+
+	/* MB4: pairs with the load of data_next in expire_oldest_data() */
+	smp_store_release(&d->data_next, e->data_next);
+
+	/* from this point on, the data could be expired */
+
+	add_descr_list(e);
+
+	/* now the descriptor is visible to the readers */
+
+	local_irq_restore(e->irqflags);
+}
+EXPORT_SYMBOL(prb_commit);
+
+/**
+ * get_datablock() - Locate the real data block for a pair of positions.
+ * @rb: The ringbuffer containing the data block.
+ * @data: The logical position where the data block begins.
+ * @data_next: The logical position of the data block following this one,
+ *             i.e. the position that marks the end of this data block.
+ * @size: Where to store the size of the data block.
+ *
+ * Data blocks are always contiguous. When a new data block does not fit in
+ * the room left at the end of the data array, two data blocks are created
+ * instead of one:
+ *
+ * * A "wrapping" data block at the end of the data array.
+ * * The real data block at the beginning of the data array.
+ *
+ * The descriptor then holds the beginning position of the wrapping data
+ * block and the end position of the real data block. This function detects
+ * that case by comparing DATA_WRAPS() of both positions and always returns
+ * the real data block and its size. (Note that the descriptor id from the
+ * wrapping data block is used.)
+ *
+ * Return: A pointer to the data block structure. Also, size is set to the
+ *         size of the data block without its struct prb_datablock header.
+ */
+static struct prb_datablock *get_datablock(struct printk_ringbuffer *rb,
+					   unsigned long data,
+					   unsigned long data_next, int *size)
+{
+	unsigned long begin = data;
+	int len = data_next - data;
+
+	if (DATA_WRAPS(rb, data) != DATA_WRAPS(rb, data_next)) {
+		len = DATA_INDEX(rb, data_next);
+		begin = 0;
+	}
+	*size = len - sizeof(struct prb_datablock);
+	return TO_DATABLOCK(rb, begin);
+}
+
+/**
+ * prb_iter_init() - Prepare an iterator structure for use.
+ * @iter: The iterator to initialize.
+ * @rb: The ringbuffer the iterator will traverse.
+ * @e: The entry structure to use while iterating.
+ *
+ * This is the public function available to readers to set up their
+ * iterator. Both id fields start as EOL, i.e. no entry was visited yet.
+ *
+ * As an alternative, DECLARE_PRINTKRB_ITER() could be used.
+ *
+ * Context: Any context.
+ */
+void prb_iter_init(struct prb_iterator *iter, struct printk_ringbuffer *rb,
+		   struct prb_entry *e)
+{
+	iter->id_next = EOL;
+	iter->id = EOL;
+	iter->e = e;
+	iter->rb = rb;
+}
+EXPORT_SYMBOL(prb_iter_init);
+
+/**
+ * iter_peek_next_id() - Determine the next (newer) descriptor id.
+ * @iter: The iterator used for list traversal.
+ *
+ * A cached iter->id_next is returned as is. If it is EOL, the current
+ * descriptor's next is re-read. An unused iterator, or one whose current
+ * descriptor has since changed id, restarts at the oldest descriptor.
+ *
+ * Return: The next descriptor id, or EOL if there is no next descriptor.
+ *         The iterator itself is not modified.
+ */
+static unsigned long iter_peek_next_id(struct prb_iterator *iter)
+{
+	struct printk_ringbuffer *rb = iter->rb;
+	unsigned long next_id = iter->id_next;
+	struct prb_descr *d;
+
+	if (iter->id == EOL) {
+		next_id = READ_ONCE(rb->descr_list.oldest);
+	} else if (iter->id_next == EOL) {
+		d = TO_DESCR(rb, iter->id);
+		next_id = READ_ONCE(d->next);
+		/* id changed: the descr was reused, restart at the oldest */
+		if (READ_ONCE(d->id) != iter->id)
+			next_id = READ_ONCE(rb->descr_list.oldest);
+	}
+
+	return next_id;
+}
+
+/**
+ * prb_iter_peek_next_entry() - Test whether a next (newer) entry exists.
+ * @iter: The iterator used for list traversal.
+ *
+ * This is the public function available to readers to find out whether a
+ * newer entry is available. The iterator is not advanced.
+ *
+ * Context: Any context.
+ * Return: true if there is a next entry, otherwise false.
+ */
+bool prb_iter_peek_next_entry(struct prb_iterator *iter)
+{
+	return iter_peek_next_id(iter) != EOL;
+}
+EXPORT_SYMBOL(prb_iter_peek_next_entry);
+
+/**
+ * prb_iter_next_valid_entry() - Traverse to and read the next (newer) entry.
+ * @iter: The iterator used for list traversal. Its entry receives the data.
+ *
+ * This is the public function available to readers to traverse the entry
+ * list. Descriptors whose data is not (or no longer) valid are skipped.
+ *
+ * If the iterator has not yet been used or has fallen behind and no longer
+ * has a pointer to a valid descriptor, the next descriptor will be the oldest
+ * descriptor in the list.
+ *
+ * Context: Any context.
+ * Return: The size of the entry data or 0 if there is no next entry.
+ *
+ * The entry data is padded (if necessary) to allow alignment of following
+ * data blocks, so the size can be larger than the size reserved; users who
+ * need the exact size should store it within their data. At most
+ * e->buffer_size bytes are copied, even if the returned size is larger.
+ */
+int prb_iter_next_valid_entry(struct prb_iterator *iter)
+{
+	struct printk_ringbuffer *rb = iter->rb;
+	struct prb_entry *e = iter->e;
+	struct prb_datablock *b;
+	unsigned long data_next;
+	unsigned long next_id;
+	struct prb_descr *d;
+	unsigned long data;
+	int size;
+
+	iter->id_next = iter_peek_next_id(iter);
+
+	while (iter->id_next != EOL) {
+		d = TO_DESCR(rb, iter->id_next);
+		data = READ_ONCE(d->data);
+		data_next = READ_ONCE(d->data_next);
+
+		/*
+		 * The data pointers must be loaded before the id and
+		 * list positions that are used to validate them.
+		 */
+		smp_rmb();
+
+		if (READ_ONCE(d->id) == iter->id_next &&
+		    data_valid(rb, READ_ONCE(rb->data_list.oldest),
+			       READ_ONCE(rb->data_list.newest),
+			       data, data_next)) {
+			/* looks valid: copy it out, then check again */
+			b = get_datablock(rb, data, data_next, &size);
+
+			memcpy(&e->buffer[0], &b->data[0],
+			       size > e->buffer_size ? e->buffer_size : size);
+			e->seq = READ_ONCE(d->seq);
+
+			/*
+			 * The copy of data/seq must be complete before
+			 * rechecking that nothing was overwritten meanwhile.
+			 */
+			smp_rmb();
+
+			if (READ_ONCE(d->id) == iter->id_next &&
+			    data_valid(rb,
+				       READ_ONCE(rb->data_list.oldest),
+				       READ_ONCE(rb->data_list.newest),
+				       data,
+				       data_next)) {
+
+				iter->id = iter->id_next;
+				iter->id_next = READ_ONCE(d->next);
+				return size;
+			}
+		}
+		/* no valid data here (or it changed mid-copy): move on */
+		next_id = READ_ONCE(d->next);
+
+		/*
+		 * The next id must be loaded before rechecking that
+		 * the descr still has the expected id. (For traversal
+		 * it is enough for the id to be consistent.)
+		 */
+		smp_rmb();
+
+		if (READ_ONCE(d->id) == iter->id_next) {
+			iter->id = iter->id_next;
+			iter->id_next = next_id;
+		} else {
+			iter->id = EOL;
+			iter->id_next = READ_ONCE(rb->descr_list.oldest);
+		}
+	}
+
+	return 0;
+}
+EXPORT_SYMBOL(prb_iter_next_valid_entry);
+
+/**
+ * prb_inc_lost() - Increment internal lost counter.
+ * @rb: The ringbuffer whose counter to increment.
+ *
+ * This is the public function available to writers to update statistics about
+ * failed writes where the writer has given up. rb->lost is bumped atomically.
+ *
+ * Context: Any context.
+ */
+void prb_inc_lost(struct printk_ringbuffer *rb)
+{
+	atomic_long_inc(&rb->lost);
+}
+EXPORT_SYMBOL(prb_inc_lost);
-- 
2.11.0

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ