[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-Id: <20190807222634.1723-2-john.ogness@linutronix.de>
Date: Thu, 8 Aug 2019 00:32:26 +0206
From: John Ogness <john.ogness@...utronix.de>
To: linux-kernel@...r.kernel.org
Cc: Peter Zijlstra <peterz@...radead.org>,
Petr Mladek <pmladek@...e.com>,
Sergey Senozhatsky <sergey.senozhatsky.work@...il.com>,
Steven Rostedt <rostedt@...dmis.org>,
Linus Torvalds <torvalds@...ux-foundation.org>,
Greg Kroah-Hartman <gregkh@...uxfoundation.org>,
Andrea Parri <andrea.parri@...rulasolutions.com>,
Thomas Gleixner <tglx@...utronix.de>,
Sergey Senozhatsky <sergey.senozhatsky@...il.com>,
Brendan Higgins <brendanhiggins@...gle.com>
Subject: [RFC PATCH v4 1/9] printk-rb: add a new printk ringbuffer implementation
See documentation for details.
For the real patch the "prb overview" documentation section in
kernel/printk/ringbuffer.c will be included in the commit message.
Signed-off-by: John Ogness <john.ogness@...utronix.de>
---
kernel/printk/Makefile | 3 +
kernel/printk/dataring.c | 761 +++++++++++++++++++++++++++++++++++
kernel/printk/dataring.h | 95 +++++
kernel/printk/numlist.c | 375 +++++++++++++++++
kernel/printk/numlist.h | 72 ++++
kernel/printk/ringbuffer.c | 800 +++++++++++++++++++++++++++++++++++++
kernel/printk/ringbuffer.h | 288 +++++++++++++
7 files changed, 2394 insertions(+)
create mode 100644 kernel/printk/dataring.c
create mode 100644 kernel/printk/dataring.h
create mode 100644 kernel/printk/numlist.c
create mode 100644 kernel/printk/numlist.h
create mode 100644 kernel/printk/ringbuffer.c
create mode 100644 kernel/printk/ringbuffer.h
diff --git a/kernel/printk/Makefile b/kernel/printk/Makefile
index 4d052fc6bcde..567999aa93af 100644
--- a/kernel/printk/Makefile
+++ b/kernel/printk/Makefile
@@ -1,4 +1,7 @@
# SPDX-License-Identifier: GPL-2.0-only
obj-y = printk.o
obj-$(CONFIG_PRINTK) += printk_safe.o
+obj-$(CONFIG_PRINTK) += ringbuffer.o
+obj-$(CONFIG_PRINTK) += numlist.o
+obj-$(CONFIG_PRINTK) += dataring.o
obj-$(CONFIG_A11Y_BRAILLE_CONSOLE) += braille.o
diff --git a/kernel/printk/dataring.c b/kernel/printk/dataring.c
new file mode 100644
index 000000000000..911bac593ec1
--- /dev/null
+++ b/kernel/printk/dataring.c
@@ -0,0 +1,761 @@
+// SPDX-License-Identifier: GPL-2.0
+
+#include "dataring.h"
+
+/**
+ * DOC: dataring overview
+ *
+ * A dataring is a lockless ringbuffer consisting of variable length data
+ * blocks, each of which are assigned an ID. The IDs map to descriptors, which
+ * contain metadata about the data block. The lookup function mapping IDs to
+ * descriptors is implemented by the user.
+ *
+ * Data Blocks
+ * -----------
+ * All ringbuffer data is stored within a single static byte array. This is
+ * to ensure that any pointers to the data (past and present) will always
+ * point to valid memory. This is important because the lockless readers
+ * and writers may preempt for long periods of time and when they resume may
+ * be working with expired pointers.
+ *
+ * Data blocks are specified by begin and end logical positions (lpos) that
+ * map directly to byte array offsets. Using logical positions indirectly
+ * provides tagged state references for the data blocks to avoid ABA issues
+ * when the ringbuffer wraps. The number of tagged states per index is::
+ *
+ * sizeof(long) / size of byte array
+ *
+ * If a data block starts near the end of the byte array but would extend
+ * beyond it, that data block is handled differently: a special "wrapping data
+ * block" is inserted in the space available at the end of the byte array and
+ * a "content data block" is placed at the beginning of the byte array. This
+ * can waste space at the end of the byte array, but simplifies the
+ * implementation by allowing writers to always work with contiguous buffers.
+ * For example, for a 1000 byte array, a descriptor may show a start lpos of
+ * 1950 and an end lpos of 2100. The data block associated with this
+ * descriptor is 100 bytes in size. Its ID is located in the "wrapping" data
+ * block (located at offset 950 of the byte array) and its data is found in
+ * the "content" data block (located at offset 0 of the byte array).
+ *
+ * Descriptors
+ * -----------
+ * A descriptor is a handle to a data block. How descriptors are structured
+ * and mapped to IDs is implemented by the user.
+ *
+ * Descriptors contain the begin (begin_lpos) and end (next_lpos) logical
+ * positions of the data block they represent. The end logical position
+ * matches the begin logical position of the adjacent data block.
+ *
+ * Why Descriptors?
+ * ----------------
+ * The data ringbuffer supports variable length entities, which means that
+ * data blocks will not always begin at a predictable offset of the byte
+ * array. This is a major problem for lockless writers that, for example, will
+ * compete to expire and reuse old data blocks when the ringbuffer is full.
+ * Without a predictable begin for the data blocks, a writer has no reliable
+ * information about the status of the "free" area. Are any flags or state
+ * variables already set or is it just garbage left over from previous usage?
+ *
+ * Descriptors allow safe and controlled access to data block metadata by
+ * providing predictable offsets for such metadata. This is key to supporting
+ * multiple concurrent lockless writers.
+ *
+ * Behavior
+ * --------
+ * The data ringbuffer allows writers to commit data without regard for
+ * readers. Readers must pre- and post-validate the data blocks they are
+ * processing to be sure the processed data is consistent. A function
+ * dataring_datablock_isvalid() is available for that. Readers can only
+ * iterate data blocks by utilizing an external implementation using
+ * descriptor lookups based on IDs.
+ *
+ * Writers commit data in two steps:
+ *
+ * (1) Reserve a new data block (dataring_push()).
+ * (2) Commit the data block (dataring_datablock_setid()).
+ *
+ * Once a data block is committed, it is available for recycling by another
+ * writer. Therefore, once committed, a writer must no longer access the data
+ * block.
+ *
+ * If data block reservation fails, it means the oldest reserved data block
+ * has not yet been committed by its writer. This acts as a blocker for any
+ * future data block reservation.
+ */
+
+#define DATA_SIZE(dr) (1 << (dr)->size_bits)
+#define DATA_SIZE_MASK(dr) (DATA_SIZE(dr) - 1)
+
+/**
+ * DATA_INDEX() - Determine the data array index from a logical position.
+ *
+ * @dr: The associated data ringbuffer.
+ *
+ * @lpos: A logical position used (or to be used) for a data block.
+ */
+#define DATA_INDEX(dr, lpos) ((lpos) & DATA_SIZE_MASK(dr))
+
+/**
+ * DATA_WRAPS() - Determine how many times the data array has wrapped.
+ *
+ * @dr: The associated data ringbuffer.
+ *
+ * @lpos: A logical position used (or to be used) for a data block.
+ *
+ * The number of wraps is useful when determining if one logical position
+ * is overtaking the data array index of another logical position.
+ */
+#define DATA_WRAPS(dr, lpos) ((lpos) >> (dr)->size_bits)
+
+/**
+ * DATA_THIS_WRAP_START_LPOS() - Get the position at the start of the wrap.
+ *
+ * @dr: The associated data ringbuffer.
+ *
+ * @lpos: A logical position used (or to be used) for a data block.
+ *
+ * Given a logical position, return the logical position if backed up to the
+ * beginning of the current wrap (data array index 0). This is useful when a
+ * data block extends beyond the end of the data array (i.e. wraps) and
+ * therefore needs to store its data at the beginning of the data array.
+ */
+#define DATA_THIS_WRAP_START_LPOS(dr, lpos) ((lpos) & ~DATA_SIZE_MASK(dr))
+
+/**
+ * to_datablock() - Get a data block pointer from a logical position.
+ *
+ * @dr: The associated data ringbuffer.
+ *
+ * @begin_lpos: The logical position of the beginning of the data block.
+ *
+ * Return: A pointer to a data block.
+ *
+ * Since multiple logical positions can map to the same array index, the
+ * returned data block may not be the data the caller wants. The caller is
+ * responsible for validating the data.
+ *
+ * The returned pointer has an address dependency to @begin_lpos.
+ */
+static struct dr_datablock *to_datablock(struct dataring *dr,
+ unsigned long begin_lpos)
+{
+ return (struct dr_datablock *)&dr->data[DATA_INDEX(dr, begin_lpos)];
+}
+
+/**
+ * to_db_size() - Increase a data size to account for ID and padding.
+ *
+ * @size: A pointer to a size value to modify.
+ *
+ * IMPORTANT: The alignment padding specified within ALIGN() must be greater
+ * than or equal to the size of @dr_datablock.id. This ensures that there is
+ * always space in the data array for the @id of a "wrapping" data block.
+ */
+static void to_db_size(unsigned int *size)
+{
+ *size += sizeof(struct dr_datablock);
+ /* Alignment padding must be >= sizeof(dr_datablock.id). */
+ *size = ALIGN(*size, sizeof(long));
+}
+
+/**
+ * _datablock_valid() - Check if given positions yield a valid data block.
+ *
+ * @dr: The associated data ringbuffer.
+ *
+ * @head_lpos: The newest data logical position.
+ *
+ * @tail_lpos: The oldest data logical position.
+ *
+ * @begin_lpos: The beginning logical position of the data block to check.
+ *
+ * @next_lpos: The logical position of the next adjacent data block.
+ * This value is used to identify the end of the data block.
+ *
+ * A data block is considered valid if it satisfies the two conditions:
+ *
+ * * tail_lpos <= begin_lpos < next_lpos <= head_lpos
+ * * tail_lpos is at most exactly 1 wrap behind head_lpos
+ *
+ * Return: true if the specified data block is valid.
+ */
+static bool _datablock_valid(struct dataring *dr,
+ unsigned long head_lpos, unsigned long tail_lpos,
+ unsigned long begin_lpos, unsigned long next_lpos)
+
+{
+ unsigned long size = DATA_SIZE(dr);
+
+ /* tail_lpos <= begin_lpos */
+ if (begin_lpos - tail_lpos >= size)
+ return false;
+
+ /* begin_lpos < next_lpos */
+ if (begin_lpos == next_lpos)
+ return false;
+ if (next_lpos - begin_lpos >= size)
+ return false;
+
+ /* next_lpos <= head_lpos */
+ if (head_lpos - next_lpos >= size)
+ return false;
+
+ /* at most exactly 1 wrap */
+ if (head_lpos - tail_lpos > size)
+ return false;
+
+ return true;
+}
+
+/**
+ * dataring_datablock_isvalid() - Check if a specified data block is valid.
+ *
+ * @dr: The associated data ringbuffer.
+ *
+ * @desc: A descriptor describing a data block to check.
+ *
+ * Return: true if the data block is valid, otherwise false.
+ */
+bool dataring_datablock_isvalid(struct dataring *dr, struct dr_desc *desc)
+{
+ return _datablock_valid(dr,
+ atomic_long_read(&dr->head_lpos),
+ atomic_long_read(&dr->tail_lpos),
+ READ_ONCE(desc->begin_lpos),
+ READ_ONCE(desc->next_lpos));
+}
+
+/**
+ * dataring_checksize() - Check if a data size is legal.
+ *
+ * @dr: The data ringbuffer to check against.
+ *
+ * @size: The size value requested by a writer.
+ *
+ * The size value is increased to include the data block structure size and
+ * any needed alignment padding.
+ *
+ * Return: true if the size is legal for the data ringbuffer, otherwise false.
+ */
+bool dataring_checksize(struct dataring *dr, unsigned int size)
+{
+ if (size == 0)
+ return false;
+
+ /* Ensure the alignment padded size fits in the data array. */
+ to_db_size(&size);
+ if (size >= DATA_SIZE(dr))
+ return false;
+
+ return true;
+}
+
+/**
+ * _dataring_pop() - Move tail forward, invalidating the oldest data block.
+ *
+ * @dr: The data ringbuffer containing the data block.
+ *
+ * @tail_lpos: The logical position of the oldest data block.
+ *
+ * This function expects to move the pointer to the oldest data block forward,
+ * thus invalidating the oldest data block. Before attempting to move the
+ * tail, it is verified that the data block is valid. An invalid data block
+ * means that another task has already moved the tail pointer forward.
+ *
+ * Return: The new/current value (logical position) of the tail.
+ *
+ * From the return value the caller can identify if the tail was moved
+ * forward. However, the caller does not know if it was the task that
+ * performed the move.
+ *
+ * If, after seeing a moved tail, the caller will be modifying @begin_lpos or
+ * @next_lpos of a descriptor or will be modifying the head, a full memory
+ * barrier is required before doing so. This ensures that if any update to a
+ * descriptor's @begin_lpos or @next_lpos or the data ringbuffer's head is
+ * visible, that the previous update to the tail is also visible. This avoids
+ * the possibility of failure to notice when another task has moved the tail.
+ *
+ * If the tail has not moved forward it means the @id for the data block was
+ * not set yet. In this case the tail cannot move forward.
+ */
+static unsigned long _dataring_pop(struct dataring *dr,
+ unsigned long tail_lpos)
+{
+ unsigned long new_tail_lpos;
+ unsigned long begin_lpos;
+ unsigned long next_lpos;
+ struct dr_datablock *db;
+ struct dr_desc *desc;
+
+ /*
+ * dA:
+ *
+ * @db has an address dependency on @tail_pos. Therefore @tail_lpos
+ * must be loaded before dB, which accesses @db.
+ */
+ db = to_datablock(dr, tail_lpos);
+
+ /*
+ * dB:
+ *
+ * When a writer has completed accessing its data block, it sets the
+ * @id thus making the data block available for invalidation. This
+ * _acquire() ensures that this task sees all data ringbuffer and
+ * descriptor values seen by the writer as @id was set. This is
+ * necessary to ensure that the data block can be correctly identified
+ * as valid (i.e. @begin_lpos, @next_lpos, @head_lpos are at least the
+ * values seen by that writer, which yielded a valid data block at
+ * that time). It is not enough to rely on the address dependency of
+ * @desc to @id because @head_lpos is not depedent on @id. This pairs
+ * with the _release() in dataring_datablock_setid().
+ *
+ * Memory barrier involvement:
+ *
+ * If dB reads from gA, then dC reads from fG.
+ * If dB reads from gA, then dD reads from fH.
+ * If dB reads from gA, then dE reads from fE.
+ *
+ * Note that if dB reads from gA, then dC cannot read from fC.
+ * Note that if dB reads from gA, then dD cannot read from fD.
+ *
+ * Relies on:
+ *
+ * RELEASE from fG to gA
+ * matching
+ * ADDRESS DEP. from dB to dC
+ *
+ * RELEASE from fH to gA
+ * matching
+ * ADDRESS DEP. from dB to dD
+ *
+ * RELEASE from fE to gA
+ * matching
+ * ACQUIRE from dB to dE
+ */
+ desc = dr->getdesc(smp_load_acquire(&db->id), dr->getdesc_arg);
+ if (!desc) {
+ /*
+ * The data block @id is invalid. The data block is either in
+ * use by the writer (@id not yet set) or has already been
+ * invalidated by another task and the data array area or
+ * descriptor have already been recycled. The latter case
+ * (descriptor already recycled) relies on the implementation
+ * of getdesc(), which, when using an smp_rmb(), must allow
+ * this task to see @tail_lpos as it was visible to the task
+ * that changed the ID-to-descriptor mapping. See the
+ * implementation of getdesc() for details.
+ */
+ goto out;
+ }
+
+ /*
+ * dC:
+ *
+ * Even though the data block @id was determined to be valid, it is
+ * possible that it is a data block recently made available and @id
+ * has not yet been initialized. The @id needs to be re-validated (dF)
+ * after checking if the descriptor points to the data block. Use
+ * _acquire() to ensure that the re-loading of @id occurs after
+ * loading @begin_lpos. This pairs with the _release() in
+ * dataring_push(). See fG for details.
+ */
+ begin_lpos = smp_load_acquire(&desc->begin_lpos);
+
+ if (begin_lpos != tail_lpos) {
+ /*
+ * @desc is not describing the data block at @tail_lpos. Since
+ * a data block and its descriptor always become valid before
+ * @id is set (see dB for details) the data block at
+ * @tail_lpos has already been invalidated.
+ */
+ goto out;
+ }
+
+ /* dD: */
+ next_lpos = READ_ONCE(desc->next_lpos);
+
+ if (!_datablock_valid(dr,
+ /* dE: */
+ atomic_long_read(&dr->head_lpos),
+ tail_lpos, begin_lpos, next_lpos)) {
+ /* Another task has already invalidated the data block. */
+ goto out;
+ }
+
+ /* dF: */
+ if (dr->getdesc(READ_ONCE(db->id), dr->getdesc_arg) != desc) {
+ /*
+ * The data block ID has changed. The rare case of an
+ * uninitialized @db->id matching the descriptor ID was hit.
+ * This is a special case and it applies to the failure of the
+ * previous @id check (dB).
+ */
+ goto out;
+ }
+
+ /* dG: */
+ new_tail_lpos = atomic_long_cmpxchg_relaxed(&dr->tail_lpos,
+ begin_lpos, next_lpos);
+ if (new_tail_lpos == begin_lpos)
+ return next_lpos;
+ return new_tail_lpos;
+out:
+ /*
+ * dH:
+ *
+ * Ensure that the updated @tail_lpos is visible if the data block has
+ * been invalidated. This pairs with the smp_mb() in dataring_push()
+ * (see fB for details) as well as with the ID synchronization used in
+ * the getdesc() implementation, which must guarantee that an
+ * smp_rmb() is sufficient for seeing an updated @tail_lpos (see the
+ * implementation of getdesc() for details).
+ */
+ smp_rmb();
+
+ /* dI: */
+ return atomic_long_read(&dr->tail_lpos);
+}
+
+/**
+ * get_new_lpos() - Determine the logical positions of a new data block.
+ *
+ * @dr: The data ringbuffer to contain the data.
+ *
+ * @size: The (alignment padded) size of the new data block.
+ *
+ * @begin_lpos_out: A pointer where to set the begin logical position value.
+ * This will be the beginning of the data block.
+ *
+ * @next_lpos_out: A pointer where to set the next logical position value.
+ * This value is used to identify the end of the data block.
+ *
+ * Based on the logical position @head_lpos, determine @begin_lpos and
+ * @next_lpos values for a new data block. If the data block would overwrite
+ * the tail data block, this function will invalidate the tail data block,
+ * thus providing itself space for the new data block.
+ *
+ * IMPORTANT: This function can push or see a pushed data ringbuffer tail.
+ * If the caller will be modifying @begin_lpos or @next_lpos of a descriptor
+ * or will be modifying the head, a full memory barrier is required before
+ * doing so.
+ *
+ * Return: true if logical positions were determined, otherwise false.
+ *
+ * This will only fail if it was not possible to invalidate the tail data
+ * block (i.e. the @id of the tail data block was not yet set by its writer).
+ */
+static bool get_new_lpos(struct dataring *dr, unsigned int size,
+ unsigned long *begin_lpos_out,
+ unsigned long *next_lpos_out)
+{
+ unsigned long data_begin_lpos;
+ unsigned long new_tail_lpos;
+ unsigned long begin_lpos;
+ unsigned long next_lpos;
+ unsigned long tail_lpos;
+
+ /* eA: #1 */
+ tail_lpos = atomic_long_read(&dr->tail_lpos);
+
+ for (;;) {
+ begin_lpos = atomic_long_read(&dr->head_lpos);
+ data_begin_lpos = begin_lpos;
+
+ for (;;) {
+ next_lpos = data_begin_lpos + size;
+
+ if (next_lpos - tail_lpos > DATA_SIZE(dr)) {
+ /* would overwrite oldest */
+
+ new_tail_lpos = _dataring_pop(dr, tail_lpos);
+ if (new_tail_lpos == tail_lpos)
+ return false;
+ /* eA: #2 */
+ tail_lpos = new_tail_lpos;
+ break;
+ }
+
+ if (DATA_WRAPS(dr, data_begin_lpos) ==
+ DATA_WRAPS(dr, next_lpos)) {
+ *begin_lpos_out = begin_lpos;
+ *next_lpos_out = next_lpos;
+ return true;
+ }
+
+ data_begin_lpos =
+ DATA_THIS_WRAP_START_LPOS(dr, next_lpos);
+ }
+ }
+}
+
+/**
+ * dataring_push() - Reserve a data block in the data array.
+ *
+ * @dr: The data ringbuffer to reserve data in.
+ *
+ * @size: The size to reserve.
+ *
+ * @desc: A pointer to a descriptor to store the data block information.
+ *
+ * @id: The ID of the descriptor to be associated.
+ * The data block will not be set with @id, but rather initialized with
+ * a value that is explicitly different than @id. This is to handle the
+ * case when newly available garbage by chance matches the descriptor
+ * ID.
+ *
+ * This function expects to move the head pointer forward. If this would
+ * result in overtaking the data array index of the tail, the tail data block
+ * will be invalidated.
+ *
+ * Return: A pointer to the reserved writer data, otherwise NULL.
+ *
+ * This will only fail if it was not possible to invalidate the tail data
+ * block.
+ */
+char *dataring_push(struct dataring *dr, unsigned int size,
+ struct dr_desc *desc, unsigned long id)
+{
+ unsigned long begin_lpos;
+ unsigned long next_lpos;
+ struct dr_datablock *db;
+ bool ret;
+
+ to_db_size(&size);
+
+ do {
+ /* fA: */
+ ret = get_new_lpos(dr, size, &begin_lpos, &next_lpos);
+
+ /*
+ * fB:
+ *
+ * The data ringbuffer tail may have been pushed (by this or
+ * any other task). The updated @tail_lpos must be visible to
+ * all observers before changes to @begin_lpos, @next_lpos, or
+ * @head_lpos by this task are visible in order to allow other
+ * tasks to recognize the invalidation of the data blocks.
+ * This pairs with the smp_rmb() in _dataring_pop() as well as
+ * any reader task using smp_rmb() to post-validate data that
+ * has been read from a data block.
+ *
+ * Memory barrier involvement:
+ *
+ * If dE reads from fE, then dI reads from fA->eA.
+ * If dC reads from fG, then dI reads from fA->eA.
+ * If dD reads from fH, then dI reads from fA->eA.
+ * If mC reads from fH, then mF reads from fA->eA.
+ *
+ * Relies on:
+ *
+ * FULL MB between fA->eA and fE
+ * matching
+ * RMB between dE and dI
+ *
+ * FULL MB between fA->eA and fG
+ * matching
+ * RMB between dC and dI
+ *
+ * FULL MB between fA->eA and fH
+ * matching
+ * RMB between dD and dI
+ *
+ * FULL MB between fA->eA and fH
+ * matching
+ * RMB between mC and mF
+ */
+ smp_mb();
+
+ if (!ret) {
+ /*
+ * Force @desc permanently invalid to minimize risk
+ * of the descriptor later unexpectedly being
+ * determined as valid due to overflowing/wrapping of
+ * @head_lpos. An unaligned @begin_lpos can never
+ * point to a data block and having the same value
+ * for @begin_lpos and @next_lpos is also invalid.
+ */
+
+ /* fC: */
+ WRITE_ONCE(desc->begin_lpos, 1);
+
+ /* fD: */
+ WRITE_ONCE(desc->next_lpos, 1);
+
+ return NULL;
+ }
+ /* fE: */
+ } while (atomic_long_cmpxchg_relaxed(&dr->head_lpos, begin_lpos,
+ next_lpos) != begin_lpos);
+
+ db = to_datablock(dr, begin_lpos);
+
+ /*
+ * fF:
+ *
+ * @db->id is a garbage value and could possibly match the @id. This
+ * would be a problem because the data block would be considered
+ * valid before the writer has finished with it (i.e. before the
+ * writer has set @id). Force some other ID value.
+ */
+ WRITE_ONCE(db->id, id - 1);
+
+ /*
+ * fG:
+ *
+ * Ensure that @db->id is initialized to a wrong ID value before
+ * setting @begin_lpos so that there is no risk of accidentally
+ * matching a data block to a descriptor before the writer is finished
+ * with it (i.e. before the writer has set the correct @id). This
+ * pairs with the _acquire() in _dataring_pop().
+ *
+ * Memory barrier involvement:
+ *
+ * If dC reads from fG, then dF reads from fF.
+ *
+ * Relies on:
+ *
+ * RELEASE from fF to fG
+ * matching
+ * ACQUIRE from dC to dF
+ */
+ smp_store_release(&desc->begin_lpos, begin_lpos);
+
+ /* fH: */
+ WRITE_ONCE(desc->next_lpos, next_lpos);
+
+ /* If this data block wraps, use @data from the content data block. */
+ if (DATA_WRAPS(dr, begin_lpos) != DATA_WRAPS(dr, next_lpos))
+ db = to_datablock(dr, 0);
+
+ return &db->data[0];
+}
+
+/**
+ * dataring_datablock_setid() - Set the @id for a data block.
+ *
+ * @dr: The data ringbuffer containing the data block.
+ *
+ * @desc: A descriptor of the data block to set the iD of.
+ *
+ * @id: The ID value to set.
+ *
+ * The data block ID is not a field within @desc, but rather is part of the
+ * data block within the data array (struct dr_datablock). Once this value is
+ * set (and only when it is set), it is possible for the data block to be
+ * invalidated. Upon calling this function, the writer data will be fully
+ * stored in the data ringbuffer. The writer must not perform any operations
+ * on the data block after calling this function.
+ */
+void dataring_datablock_setid(struct dataring *dr, struct dr_desc *desc,
+ unsigned long id)
+{
+ struct dr_datablock *db;
+
+ db = to_datablock(dr, READ_ONCE(desc->begin_lpos));
+
+ /*
+ * gA:
+ *
+ * Ensure that all information (within the data ringbuffer and the
+ * descriptor) for the valid data block are visible before allowing
+ * the data block to be invalidated. This includes all storage of
+ * writer data for this data block. After this _release() the writer
+ * is no longer allowed to write to the data block. It pairs with the
+ * load_acquire() in _dataring_pop(). See dB for details.
+ */
+ smp_store_release(&db->id, id);
+}
+
+/**
+ * dataring_pop() - Invalidate the tail data block.
+ *
+ * @dr: The data ringbuffer to invalidatd the tail data block in.
+ *
+ * This is a public wrapper for _dataring_pop(). See _dataring_pop() for
+ * more details about this function.
+ *
+ * Return: true if the tail was invalidated, otherwise false.
+ *
+ * This function returns true if it notices the tail was invalidated even if
+ * it was not the task the did the invalidation.
+ */
+bool dataring_pop(struct dataring *dr)
+{
+ unsigned long tail_lpos;
+
+ tail_lpos = atomic_long_read(&dr->tail_lpos);
+
+ return (_dataring_pop(dr, tail_lpos) != tail_lpos);
+}
+
+/**
+ * dataring_getdatablock() - Return the data block of a descriptor.
+ *
+ * @dr: The data ringbuffer containing the data block.
+ *
+ * @desc: The descriptor to retrieve the data block of.
+ *
+ * @size: A pointer to a variable to set the size of the data block.
+ * @size will include any alignment padding that may have been added.
+ *
+ * Since datablocks always contain contiguous data, the situation can occur
+ * where there is not enough space at the end of the array for a new data
+ * block. In this situation, two data blocks are created:
+ *
+ * * A "wrapping" data block at the end of the data array.
+ * * A "content" data block at the beginning of the data array.
+ *
+ * In this situation, @desc will contain the beginning logical position of the
+ * wrapping data block and the end logical position of the content data block
+ * Note that the ID is stored in the wrapping data block.
+ *
+ * This function transparently handles wrapping/content data blocks to return
+ * the data block with the data content. Note that @desc->id is a private
+ * field and thus must not be directly accessed by callers. For
+ * wrapping/content data blocks, @desc->id of the content data block is
+ * garbage.
+ *
+ * Return: A pointer to the data block. Also, the variable pointed to by @size
+ * is set to the size of the data block.
+ */
+struct dr_datablock *dataring_getdatablock(struct dataring *dr,
+ struct dr_desc *desc, int *size)
+{
+ unsigned long begin_lpos;
+ unsigned long next_lpos;
+
+ begin_lpos = READ_ONCE(desc->begin_lpos);
+ next_lpos = READ_ONCE(desc->next_lpos);
+
+ if (DATA_WRAPS(dr, begin_lpos) == DATA_WRAPS(dr, next_lpos)) {
+ *size = next_lpos - begin_lpos;
+ } else {
+ /* Use the content data block. */
+ *size = DATA_INDEX(dr, next_lpos);
+ begin_lpos = 0;
+ }
+ *size -= sizeof(struct dr_datablock);
+
+ return to_datablock(dr, begin_lpos);
+}
+
+/**
+ * dataring_block_copy() - Copy information referring to a data block.
+ *
+ * @dst: The data block to copy the information to.
+ *
+ * @src: The data block to copy the information from.
+ *
+ * This function only copies the data block information (i.e. begin and
+ * end logical positions for a data block). After calling this function
+ * both descriptors are referring to the same data block.
+ *
+ * This can be useful for tasks that want a local copy of a descriptor
+ * so that validation can be performed without concern of the descriptor
+ * values changing during validation.
+ */
+void dataring_desc_copy(struct dr_desc *dst, struct dr_desc *src)
+{
+ dst->begin_lpos = READ_ONCE(src->begin_lpos);
+ dst->next_lpos = READ_ONCE(src->next_lpos);
+}
diff --git a/kernel/printk/dataring.h b/kernel/printk/dataring.h
new file mode 100644
index 000000000000..346a455a335a
--- /dev/null
+++ b/kernel/printk/dataring.h
@@ -0,0 +1,95 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+
+#ifndef _KERNEL_PRINTK_DATARING_H
+#define _KERNEL_PRINTK_DATARING_H
+
+#include <linux/atomic.h>
+
+/**
+ * struct dr_datablock - A contiguous block of data reserved for a writer.
+ *
+ * @id: An ID assigned by the writer.
+ * This has no pre-initialized value. IMPORTANT: The size of this
+ * field MUST be less than or equal to the alignment padding used by
+ * ALIGN() in to_db_size() and @id must be located at the beginning of
+ * the structure.
+ *
+ * @data: The writer data.
+ *
+ * A pointer to the beginning of a data block can be directly typecasted to
+ * this structure to access the data block fields and their content.
+ */
+struct dr_datablock {
+ /* private */
+ unsigned long id;
+
+ /* public */
+ char data[0];
+};
+
+/**
+ * struct dr_desc - Meta data describing a data block.
+ *
+ * @begin_lpos: The logical position of the beginning of the data block.
+ * The array index can be derived from the logical position.
+ *
+ * @next_lpos: The logical position of the next data block.
+ * This is the @begin_lpos of the adjacent data block and is used
+ * to determine the size of the data block being described.
+ */
+struct dr_desc {
+ /* private */
+ unsigned long begin_lpos;
+ unsigned long next_lpos;
+};
+
+/**
+ * struct dataring - A data ringbuffer with support for entry IDs.
+ *
+ * @size_bits: The power-of-2 size of the ringbuffer's data array.
+ *
+ * @data: A pointer to the ringbuffer's data array.
+ *
+ * @head_lpos: The @next_lpos value of the most recently pushed data block.
+ * The array index can be derived from the logical position.
+ *
+ * @tail_lpos: The @begin_lpos value of the oldest data block.
+ * The array index can be derived from the logical position.
+ *
+ * @getdesc: A callback function to get a descriptor for a data block.
+ * IMPORTANT: The lookup must be implemented in such a way to
+ * ensure that if the ID of a descriptor has been updated, any
+ * changed data ringbuffer and descriptor fields can be
+ * synchronized using a pairing smp_rmb().
+ *
+ * @getdesc_arg: An argument that will be passed to the getdesc() callback.
+ *
+ * IDs are used in order to get a descriptor given an ID. It is the
+ * responsibility of the user to implement this functionality via
+ * dataring_datablock_setid() and dataring.getdesc().
+ */
+struct dataring {
+ /* private */
+ unsigned int size_bits;
+ char *data;
+ atomic_long_t head_lpos;
+ atomic_long_t tail_lpos;
+
+ struct dr_desc *(*getdesc)(unsigned long id, void *arg);
+ void *getdesc_arg;
+};
+
+bool dataring_checksize(struct dataring *dr, unsigned int size);
+
+bool dataring_pop(struct dataring *dr);
+char *dataring_push(struct dataring *dr, unsigned int size,
+ struct dr_desc *desc, unsigned long id);
+
+void dataring_datablock_setid(struct dataring *dr, struct dr_desc *desc,
+ unsigned long id);
+struct dr_datablock *dataring_getdatablock(struct dataring *dr,
+ struct dr_desc *desc, int *size);
+bool dataring_datablock_isvalid(struct dataring *dr, struct dr_desc *desc);
+void dataring_desc_copy(struct dr_desc *dst, struct dr_desc *src);
+
+#endif /* _KERNEL_PRINTK_DATARING_H */
diff --git a/kernel/printk/numlist.c b/kernel/printk/numlist.c
new file mode 100644
index 000000000000..df3f89e7f7fd
--- /dev/null
+++ b/kernel/printk/numlist.c
@@ -0,0 +1,375 @@
+// SPDX-License-Identifier: GPL-2.0
+
+#include <linux/sched.h>
+#include "numlist.h"
+
+/**
+ * DOC: numlist overview
+ *
+ * A numlist is a lockless FIFO queue list, where the nodes of the list are
+ * sequentially numbered. A numlist can be iterated over by non-consuming
+ * readers. The purpose of the sequence numbers is so that a non-consuming
+ * reader can identify if it has missed nodes (for example, if they were
+ * popped off the list before the non-consuming reader could finish reading
+ * them).
+ *
+ * IDs vs. Pointers
+ * ----------------
+ * Rather than nodes being linked by pointers, each node is given an ID and
+ * has the ID of the next node in the list. IDs are used in order to avoid an
+ * ABA problem between writers when assigning sequence numbers.
+ *
+ * The ABA problem using pointers can be seen if the list has one node and
+ * while CPU0 is adding a second node another CPU adds and removes nodes
+ * (recycling the node that CPU0 saw as the head)::
+ *
+ * CPU0 CPU1
+ * ---- ----
+ * (enter numlist_push())
+ * head = READ_ONCE(h->head);
+ * seq = READ_ONCE(head->seq);
+ * WRITE_ONCE(n->seq, seq + 1);
+ * WRITE_ONCE(n->next, NULL);
+ * numlist_push(); // add some node
+ * numlist_pop(); // remove "head"
+ * numlist_push(); // re-add "head"
+ * cmpxchg(&h->head, head, n);
+ * WRITE_ONCE(head->next, n);
+ * (exit numlist_push())
+ *
+ * Let the original head have a sequence number of 1. Then after the above
+ * scenario the list nodes could have the following sequence numbers::
+ *
+ * 1 -> 2 -> 3 -> 2
+ *
+ * The problem is that the cmpxchg() is succeeding even though the head that
+ * CPU0 wants to replace is not the head it expects. This problem is avoided
+ * by using IDs, which indirectly provide tagged state references for the
+ * nodes. The number of tagged states per node is::
+ *
+ * sizeof(long) / max number of nodes in the list
+ *
+ * Using IDs instead of pointers, the cmpxchg() will fail as it should.
+ *
+ * Numlist users are required to implement a function to map ID values to
+ * numlist nodes and are responsible for issuing IDs to numlist nodes.
+ *
+ * Behavior
+ * --------
+ * A numbered list has two conditions in order to pop the tail node from the
+ * list:
+ *
+ * * The tail node is not the only node on the list.
+ * * The tail node is not busy.
+ *
+ * The first condition is necessary to support writers adding new nodes to the
+ * list. Such writers must modify the "next pointer" of the previous head,
+ * which becomes complex/racey if that previous head could be removed from the
+ * list and recycled during the update. One consequence of this condition is
+ * that numbered lists must be initialized such that the tail and head are
+ * already pointing to a node.
+ *
+ * For the second condition, the term "busy" is defined by the user, who must
+ * implement a function to report if a node is busy. This gives the user
+ * control to decide if a node can be removed from the list.
+ *
+ * For non-consuming readers, the function numlist_read() needs to be called a
+ * second time after reading node data to ensure the node is still valid.
+ * Nodes can become invalid while being read by non-consuming readers.
+ */
+
+/**
+ * numlist_read() - Read the information stored within a node.
+ *
+ * @nl: The numbered list to use.
+ *
+ * @id: The ID of the node to read from.
+ *
+ * @seq: A pointer to a variable to store the sequence number of the node.
+ * This may be NULL if the caller is not interested in this value.
+ *
+ * @next_id: A pointer to a variable to store the next node ID in the list.
+ * This may be NULL if the caller is not interested in this value.
+ * If this function stores the node's own ID to @next_id, this is
+ * the last node on the list. Note that a node does not know its own
+ * ID. It is up to the caller to track the node IDs during
+ * traversal.
+ *
+ * This function can be used to read node contents for both a node that is
+ * part of a list or a node that is off-list. (Although, from this function it
+ * is not possible to identify if the node is part of a list.)
+ *
+ * Readers should call this function a second time after reading node data to
+ * ensure the node is still valid because it may have become invalid while the
+ * reader was reading the data.
+ *
+ * Return: true if the node was read, otherwise false.
+ *
+ * This function will fail if @id is not valid anytime during this function.
+ */
+bool numlist_read(struct numlist *nl, unsigned long id, unsigned long *seq,
+ unsigned long *next_id)
+{
+ struct nl_node *n;
+
+ n = nl->node(id, nl->node_arg);
+ if (!n)
+ return false;
+
+ if (seq) {
+ /*
+ * aA:
+ *
+ * Adresss dependency on @id.
+ */
+ *seq = READ_ONCE(n->seq);
+ }
+
+ if (next_id) {
+ /*
+ * aB:
+ *
+ * Adresss dependency on @id.
+ */
+ *next_id = READ_ONCE(n->next_id);
+ }
+
+ /*
+ * aC:
+ *
+ * Validate @seq and @next_id by making sure the node has not been
+ * recycled. This pairs with the smp_wmb() of the function that
+ * updates the ID, which is required to issue an smp_wmb() after
+ * updating the ID and before modifying fields of the node. See the
+ * smp_wmb() in prb_reserve() for details.
+ */
+ smp_rmb();
+
+ return (nl->node(id, nl->node_arg) != NULL);
+}
+
+/**
+ * numlist_read_tail() - Read the oldest node.
+ *
+ * @nl: The numbered list to use.
+ *
+ * @seq: A pointer to a variable to store the sequence number of the node.
+ * This may be NULL if the caller is not interested in this value.
+ *
+ * @next_id: A pointer to a variable to store the next node ID in the list.
+ * This may be NULL if the caller is not interested in this value.
+ *
+ * This function will not return until it has successfully read the tail
+ * node.
+ *
+ * Return: The ID of the tail node.
+ */
+unsigned long numlist_read_tail(struct numlist *nl, unsigned long *seq,
+ unsigned long *next_id)
+{
+ unsigned long tail_id;
+
+ tail_id = atomic_long_read(&nl->tail_id);
+
+ while (!numlist_read(nl, tail_id, seq, next_id)) {
+ /* @tail_id is invalid. Try again with an updated value. */
+
+ cpu_relax();
+
+ tail_id = atomic_long_read(&nl->tail_id);
+ }
+
+ return tail_id;
+}
+
+/**
+ * numlist_push() - Add a node to the list and assign it a sequence number.
+ *
+ * @nl: The numbered list to push to.
+ *
+ * @n: A node to push to the numbered list.
+ * The node must not already be part of a list.
+ *
+ * @id: The ID of the node.
+ *
+ * A node is added in two steps: The first step is to make this node the
+ * head, which causes a following push to add to this node. The second step is
+ * to update @next_id of the former head node to point to this one, which
+ * makes this node visible to any task that sees the former head node.
+ */
+void numlist_push(struct numlist *nl, struct nl_node *n, unsigned long id)
+{
+ unsigned long head_id;
+ unsigned long seq;
+ unsigned long r;
+
+ /*
+ * bA:
+ *
+ * Setup the node to be a list terminator: next_id == id.
+ */
+ WRITE_ONCE(n->next_id, id);
+
+ /* bB: #1 */
+ head_id = atomic_long_read(&nl->head_id);
+
+ for (;;) {
+ /* bC: */
+ while (!numlist_read(nl, head_id, &seq, NULL)) {
+ /*
+ * @head_id is invalid. Try again with an
+ * updated value.
+ */
+
+ cpu_relax();
+
+ /* bB: #2 */
+ head_id = atomic_long_read(&nl->head_id);
+ }
+
+ /*
+ * bD:
+ *
+ * Set @seq to +1 of @seq from the previous head.
+ *
+ * Memory barrier involvement:
+ *
+ * If bB reads from bE, then bC->aA reads from bD.
+ *
+ * Relies on:
+ *
+ * RELEASE from bD to bE
+ * matching
+ * ADDRESS DEP. from bB to bC->aA
+ */
+ WRITE_ONCE(n->seq, seq + 1);
+
+ /*
+ * bE:
+ *
+ * This store_release() guarantees that @seq and @next are
+ * stored before the node with @id is visible to any popping
+ * writers. It pairs with the address dependency between @id
+ * and @seq/@...t provided by numlist_read(). See bD and bF
+ * for details.
+ */
+ r = atomic_long_cmpxchg_release(&nl->head_id, head_id, id);
+ if (r == head_id)
+ break;
+
+ /* bB: #3 */
+ head_id = r;
+ }
+
+ n = nl->node(head_id, nl->node_arg);
+
+ /*
+ * The old head (which is still the list terminator), cannot be
+ * removed because the list will always have at least one node.
+ * Therefore @n must be non-NULL.
+ */
+
+ /*
+ * bF: the STORE part for @next_id
+ *
+ * Set @next_id of the previous head to @id.
+ *
+ * Memory barrier involvement:
+ *
+ * If bB reads from bE, then bF overwrites bA.
+ *
+ * Relies on:
+ *
+ * RELEASE from bA to bE
+ * matching
+ * ADDRESS DEP. from bB to bF
+ */
+ /*
+ * bG: the RELEASE part for @next_id
+ *
+ * This _release() guarantees that a reader will see the updates to
+ * this node's @seq/@...t_id if the reader saw the @next_id of the
+ * previous node in the list. It pairs with the address dependency
+ * between @id and @seq/@...t provided by numlist_read().
+ *
+ * Memory barrier involvement:
+ *
+ * If aB reads from bG, then aA' reads from bD, where aA' is in
+ * numlist_read() to read the node ID from bG.
+ * If aB reads from bG, then aB' reads from bA, where aB' is in
+ * numlist_read() to read the node ID from bG.
+ *
+ * Relies on:
+ *
+ * RELEASE from bG to bD
+ * matching
+ * ADDRESS DEP. from aB to aA'
+ *
+ * RELEASE from bG to bA
+ * matching
+ * ADDRESS DEP. from aB to aB'
+ */
+ smp_store_release(&n->next_id, id);
+}
+
+/**
+ * numlist_pop() - Remove the oldest node from the list.
+ *
+ * @nl: The numbered list from which to remove the tail node.
+ *
+ * The tail node can only be removed if two conditions are satisfied:
+ *
+ * * The node is not the only node on the list.
+ * * The node is not busy.
+ *
+ * If, during this function, another task removes the tail, this function
+ * will try again with the new tail.
+ *
+ * Return: The removed node or NULL if the tail node cannot be removed.
+ */
+struct nl_node *numlist_pop(struct numlist *nl)
+{
+ unsigned long tail_id;
+ unsigned long next_id;
+ unsigned long r;
+
+ /* cA: #1 */
+ tail_id = atomic_long_read(&nl->tail_id);
+
+ for (;;) {
+ /* cB */
+ while (!numlist_read(nl, tail_id, NULL, &next_id)) {
+ /*
+ * @tail_id is invalid. Try again with an
+ * updated value.
+ */
+
+ cpu_relax();
+
+ /* cA: #2 */
+ tail_id = atomic_long_read(&nl->tail_id);
+ }
+
+ /* Make sure the node is not the only node on the list. */
+ if (next_id == tail_id)
+ return NULL;
+
+ /*
+ * cC:
+ *
+ * Make sure the node is not busy.
+ */
+ if (nl->busy(tail_id, nl->busy_arg))
+ return NULL;
+
+ r = atomic_long_cmpxchg_relaxed(&nl->tail_id,
+ tail_id, next_id);
+ if (r == tail_id)
+ break;
+
+ /* cA: #3 */
+ tail_id = r;
+ }
+
+ return nl->node(tail_id, nl->node_arg);
+}
diff --git a/kernel/printk/numlist.h b/kernel/printk/numlist.h
new file mode 100644
index 000000000000..cdc3b21e6597
--- /dev/null
+++ b/kernel/printk/numlist.h
@@ -0,0 +1,72 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+
+#ifndef _KERNEL_PRINTK_NUMLIST_H
+#define _KERNEL_PRINTK_NUMLIST_H
+
+#include <linux/atomic.h>
+
+/**
+ * struct nl_node - A node of a numbered list.
+ *
+ * @seq: The sequence number assigned to this node.
+ *
+ * @next_id: The ID of the next node of the numbered list.
+ * If this is the node's own ID, this is the last node on the list.
+ * Note that a node does not know its own ID. It is up to the caller
+ * to track the node IDs during traversal.
+ *
+ * The fields are private and must not be directly accessed by users.
+ * numlist_read() is available for users to access these fields.
+ */
+struct nl_node {
+ /* private */
+ unsigned long seq;
+ unsigned long next_id;
+};
+
+/**
+ * struct numlist - A numbered list of nodes.
+ *
+ * @head_id: The ID of the most recently pushed node.
+ * Disregarding overflow, this node will have the highest sequence
+ * number.
+ *
+ * @tail_id: The ID of the oldest node.
+ * Disregarding overflow, this node will have the lowest sequence
+ * number.
+ *
+ * @node: A callback function to get a node for an ID.
+ * IDs must be implemented such that an smp_wmb() is issued after
+ * the ID of a node has been modified and before any further
+ * changes to the node are performed.
+ *
+ * @node_arg: An argument that will be passed to the node() callback.
+ *
+ * @busy: A callback function to determine if a node can be removed.
+ * Nodes that are "busy" will not be removed.
+ *
+ * @busy_arg: An argument that will be passed to the busy() callback.
+ *
+ * List nodes are sorted by their sequence numbers.
+ */
+struct numlist {
+ /* private */
+ atomic_long_t head_id;
+ atomic_long_t tail_id;
+
+ struct nl_node *(*node)(unsigned long id, void *arg);
+ void *node_arg;
+
+ bool (*busy)(unsigned long id, void *arg);
+ void *busy_arg;
+};
+
+void numlist_push(struct numlist *nl, struct nl_node *n, unsigned long id);
+struct nl_node *numlist_pop(struct numlist *nl);
+
+unsigned long numlist_read_tail(struct numlist *nl, unsigned long *seq,
+ unsigned long *next_id);
+bool numlist_read(struct numlist *nl, unsigned long id, unsigned long *seq,
+ unsigned long *next_id);
+
+#endif /* _KERNEL_PRINTK_NUMLIST_H */
diff --git a/kernel/printk/ringbuffer.c b/kernel/printk/ringbuffer.c
new file mode 100644
index 000000000000..59bf59aba3de
--- /dev/null
+++ b/kernel/printk/ringbuffer.c
@@ -0,0 +1,800 @@
+// SPDX-License-Identifier: GPL-2.0
+
+#include <linux/irqflags.h>
+#include <linux/string.h>
+#include <linux/err.h>
+#include "ringbuffer.h"
+
+/**
+ * DOC: prb overview
+ *
+ * As the name suggests, this ringbuffer was implemented specifically to
+ * serve the needs of the printk() infrastructure. The ringbuffer itself is
+ * not specific to printk and could be used for other purposes. However, the
+ * requirements and semantics of printk are rather unique. If you intend to
+ * use this ringbuffer for anything other than printk, you need to be very
+ * clear on its features, behavior, and limitations.
+ *
+ * Features
+ * --------
+ * * single global buffer
+ * * resides in initialized data section (available at early boot)
+ * * supports multiple concurrent lockless readers and writers
+ * * safe from any context (including NMI)
+ * * groups bytes into variable length data blocks (referenced by entries)
+ * * entries tagged with sequence numbers
+ *
+ * Terminology
+ * -----------
+ * * data block: A contiguous block of data containing an ID of an associated
+ * descriptor and the raw data from the writer.
+ *
+ * * descriptor: Meta data for a data block containing an ID, the logical
+ * positions of the associated data block, a unique sequence
+ * number, and a pointer to the next (newer) descriptor.
+ *
+ * * entry: A high level object used by the readers/writers that contains
+ * a descriptor as well as state information during the
+ * reserve/commit window.
+ *
+ * Data Structure
+ * --------------
+ * The ringbuffer implementation makes use of two internal data structures:
+ *
+ * * a data ringbuffer (dataring) to manage the raw data storage of entries
+ *
+ * * a numbered list (numlist) to sort and sequentially label committed
+ * entries
+ *
+ * Both dataring and numlist implementations make use of ID numbers to
+ * identify their entities. The ringbuffer implementation creates and manages
+ * those IDs using the same value for all data structures (i.e. ID=22 for a
+ * dataring data block and descriptor corresponds to ID=22 for the numlist
+ * node.)
+ *
+ * The ringbuffer implementation provides a high-level descriptor (prb_desc).
+ * This descriptor contains the ID, a dataring descriptor, and a numlist node,
+ * thus acting as the unifying entity for the various data structures.
+ *
+ * The descriptors are stored within a static array and IDs map directly to
+ * array offsets. The full range of IDs (unsigned long) is used in order to
+ * provide tagged state references for the descriptors to avoid ABA issues
+ * when the descriptors are recycled. The number of tagged states per
+ * descriptor is::
+ *
+ * sizeof(long) / number of descriptors in the array
+ *
+ * Behavior
+ * --------
+ * Since the printk ringbuffer is lockless, there exists no synchronization
+ * between readers and writers. Basically writers are the tasks in control and
+ * may overwrite any and all committed data at any time and from any context.
+ * For this reason readers can miss entries if they are overwritten before the
+ * reader was able to access the data. The reader API implementation is such
+ * that reader access to data is atomic, so there is no risk of readers having
+ * to deal with partial or corrupt data blocks. Also, entries include the
+ * sequence number of the associated descriptor so that readers can recognize
+ * if entries were missed.
+ *
+ * Writing to the ringbuffer consists of two steps:
+ *
+ * (1) Reserve data (prb_reserve()).
+ * (2) Commit the reserved data (prb_commit()).
+ *
+ * The sequence number is assigned on commit.
+ *
+ * Once committed, a writer must no longer access the data directly. This is
+ * because the data may have been overwritten and no longer exists. If a
+ * writer must access the data, it should either keep a private copy before
+ * committing or use the reader API to gain access to the data.
+ *
+ * Because of how the data backend (dataring) is implemented, data blocks that
+ * have been reserved but not yet committed act as blockers, preventing future
+ * writers from filling the ringbuffer beyond the location of the reserved but
+ * not yet committed data block region. For this reason it is important that
+ * writers perform both reserve and commit as quickly as possible. To assist
+ * with this, local interrupts are disabled during the reserve/commit window.
+ * Writers in NMI contexts can still preempt any other writers, but as long
+ * as these writers do not write a large amount of data with respect to the
+ * ringbuffer size, this should not become an issue.
+ *
+ * The reader API provides an iterator to traverse the list of committed
+ * entries. The iterator utilizes a user-provided entry buffer to copy into
+ * and validate the entry data upon traversal.
+ *
+ * Entry Lifecycle
+ * ---------------
+ * This is an overview of the lifecycle for an entry. It is meant to assist in
+ * understanding how the various data structures and their procedures are
+ * coordinated. Following is what happens within the reserve/commit window of
+ * a writer:
+ *
+ * (1) Reserve a descriptor (part 1 of 3): Invalidate the oldest data block by
+ * moving the dataring tail forward. (See the next step for why this was
+ * necessary.)
+ *
+ * (2) Reserve a descriptor (part 2 of 3): Remove the oldest descriptor from
+ * the committed list by removing the numlist tail. (Note that
+ * descriptors can only be removed if their associated data block is
+ * invalid. That invalidation was performed in the previous step.)
+ *
+ * (3) Reserve a descriptor (part 3 of 3): Invalidate the removed descriptor
+ * by modifying its ID.
+ *
+ * (4) Reserve data: Reserve a new data block by moving the dataring head
+ * forward.
+ *
+ * (5) Write data: Write the user data into the reserved data block.
+ *
+ * (6) Commit the data: Set the descriptor ID in the reserved data block.
+ *
+ * (7) Commit the descriptor (part 1 of 2): Add the descriptor to the
+ * committed list by setting it as the numlist head.
+ *
+ * (8) Commit the descriptor (part 2 of 2): Link the descriptor to the older
+ * committed entries by setting it as the "next pointer" of the former
+ * numlist head.
+ *
+ * Usage
+ * -----
+ * Here are some simple examples demonstrating writers and readers. For the
+ * examples it is assumed that a global ringbuffer is available::
+ *
+ * DECLARE_PRINTKRB(rb, 5, 7);
+ *
+ * This ringbuffer has a size of 4096 bytes, expects an average data size of
+ * 32 bytes, and allows up to 128 descriptors.
+ *
+ * Sample writer code::
+ *
+ * struct prb_reserved_entry e;
+ * char *s;
+ *
+ * s = prb_reserve(&e, &rb, 16);
+ * if (s) {
+ * snprintf(s, 16, "Hello, world!");
+ * prb_commit(&e);
+ * }
+ *
+ * Sample reader code::
+ *
+ * DECLARE_PRINTKRB_ENTRY(e, 64);
+ * struct prb_iterator iter;
+ * u64 last_seq = 0;
+ * int len;
+ * char *s;
+ *
+ * prb_for_each_entry(&iter, &rb, &e, len) {
+ * if (e.seq - last_seq != 1) {
+ * pr_warn("LOST %llu ENTRIES\n",
+ * e.seq - (last_seq + 1));
+ * }
+ * last_seq = e.seq;
+ *
+ * s = (char *)&e.buffer[0];
+ * if (len >= 64) {
+ * pr_warn("ENTRY %llu TRUNCATED\n", e.seq);
+ * s[64 - 1] = 0;
+ * }
+ * pr_info("%llu: %s\n", e.seq, s);
+ * }
+ */
+
+#define DESCS_COUNT(rb) (1 << (rb)->desc_count_bits)
+#define DESCS_COUNT_MASK(rb) (DESCS_COUNT(rb) - 1)
+
+/**
+ * to_desc() - Translate an ID to a descriptor.
+ *
+ * @rb: The ringbuffer containing the descriptor.
+ *
+ * @id: An ID value to translate.
+ *
+ * This is a low-level function to provide a safe mapping that always maps
+ * to a descriptor. Callers are responsible for checking the ID of the
+ * returned descriptor to see if the mapping was correct.
+ *
+ * Return: A pointer to a descriptor.
+ */
+static struct prb_desc *to_desc(struct printk_ringbuffer *rb,
+ unsigned long id)
+{
+ return &rb->descs[id & DESCS_COUNT_MASK(rb)];
+}
+
+/**
+ * prb_desc_node() - Numbered list callback to lookup a node from an ID.
+ *
+ * @id: The ID to lookup.
+ *
+ * @arg: The ringbuffer containing the numlist the node belongs to.
+ *
+ * Return: A pointer to the node or NULL if the ID is unknown.
+ *
+ * The returned pointer has an address dependency to @id.
+ */
+struct nl_node *prb_desc_node(unsigned long id, void *arg)
+{
+ struct prb_desc *d = to_desc(arg, id);
+
+ if (id != atomic_long_read(&d->id))
+ return NULL;
+
+ return &d->list;
+}
+
+/**
+ * prb_desc_busy() - Numbered list callback to report if a node is busy.
+ *
+ * @id: The ID of the node to check.
+ *
+ * @arg: The ringbuffer containing the numlist the node belongs to.
+ *
+ * This callback is used by numbered lists to determine if a node can be
+ * removed from the committed list. A descriptor is considered busy if the
+ * data block it references is valid. Data blocks must be invalidated before
+ * descriptors can be recycled.
+ *
+ * Return: true if the descriptor's data block is valid, otherwise false.
+ *
+ * If the specified ID is unknown, the (unknown) descriptor is reported as
+ * not busy.
+ */
+bool prb_desc_busy(unsigned long id, void *arg)
+{
+ struct printk_ringbuffer *rb = arg;
+ struct prb_desc *d = to_desc(rb, id);
+
+ /* hA: */
+ if (!dataring_datablock_isvalid(&rb->dr, &d->desc))
+ return false;
+
+ /*
+ * hB:
+ *
+ * Ensure that the data block that was just checked belongs to the
+ * expected descriptor. Writers modify the descriptor ID before making
+ * any other changes to the data block or descriptor. This pairs with
+ * smp_wmb() in prb_reserve(). See kB for details.
+ */
+ smp_rmb();
+
+ /* hC: */
+ return (id == atomic_long_read(&d->id));
+}
+
+/**
+ * prb_getdesc() - Data ringbuffer callback to lookup a descriptor from an ID.
+ *
+ * @id: The ID to lookup.
+ *
+ * @arg: The ringbuffer containing the dataring the descriptor belongs to.
+ *
+ * The data ringbuffer requires that the caller, by issuing an smp_rmb()
+ * after this function, will see data ringbuffer updates that were visible to
+ * the task that last updated the ID.
+ *
+ * Return: A pointer to the dataring descriptor or NULL if the ID is unknown.
+ *
+ * The returned pointer has an address dependency to @id.
+ */
+struct dr_desc *prb_getdesc(unsigned long id, void *arg)
+{
+ struct prb_desc *d = to_desc(arg, id);
+
+ /*
+ * iA:
+ *
+ * Since the values of @id correspond to the values of @d->id, it is
+ * enough that the ID updating task performs a _release() in
+ * assign_desc(). The smp_rmb() issued by the caller after calling
+ * this function pairs with that _release(). See jB for details.
+ */
+ if (id != atomic_long_read(&d->id))
+ return NULL;
+
+ /* iB: */
+ return &d->desc;
+}
+
+/**
+ * assign_desc() - Assign a descriptor to the caller.
+ *
+ * @e: The entry structure to store the assigned descriptor to.
+ *
+ * Find an available descriptor to assign to the caller. First it is checked
+ * if the tail descriptor from the committed list can be recycled. If not,
+ * perhaps a never-used descriptor is available. Otherwise, data blocks will
+ * be invalidated until the tail descriptor from the committed list can be
+ * recycled.
+ *
+ * Assigned descriptors are invalid until data has been reserved for them.
+ *
+ * Return: true if a descriptor was assigned, otherwise false.
+ *
+ * This will only fail if it was not possible to invalidate data blocks in
+ * order to recycle a descriptor. This can happen if a writer has reserved but
+ * not yet committed data and that reserved data is currently the oldest data.
+ */
+static bool assign_desc(struct prb_reserved_entry *e)
+{
+ struct printk_ringbuffer *rb = e->rb;
+ struct prb_desc *d;
+ struct nl_node *n;
+ unsigned long i;
+
+ for (;;) {
+ /*
+ * jA:
+ *
+ * Try to recycle a descriptor on the committed list.
+ */
+ n = numlist_pop(&rb->nl);
+ if (n) {
+ d = container_of(n, struct prb_desc, list);
+ break;
+ }
+
+ /* Fallback to static never-used descriptors. */
+ if (atomic_read(&rb->desc_next_unused) < DESCS_COUNT(rb)) {
+ i = atomic_fetch_inc(&rb->desc_next_unused);
+ if (i < DESCS_COUNT(rb)) {
+ d = &rb->descs[i];
+ atomic_long_set(&d->id, i);
+ break;
+ }
+ }
+
+ /*
+ * No descriptor available. Make one available for recycling
+ * by invalidating data (which some descriptor will be
+ * referencing).
+ */
+ if (!dataring_pop(&rb->dr))
+ return false;
+ }
+
+ /*
+ * jB:
+ *
+ * Modify the descriptor ID so that users of the descriptor see that
+ * it has been recycled. A _release() is used so that prb_getdesc()
+ * callers can see all data ringbuffer updates after issuing a
+ * pairing smb_rmb(). See iA for details.
+ *
+ * Memory barrier involvement:
+ *
+ * If dB->iA reads from jB, then dI reads the same value as
+ * jA->cD->hA.
+ *
+ * Relies on:
+ *
+ * RELEASE from jA->cD->hA to jB
+ * matching
+ * RMB between dB->iA and dI
+ */
+ atomic_long_set_release(&d->id, atomic_long_read(&d->id) +
+ DESCS_COUNT(rb));
+
+ e->desc = d;
+ return true;
+}
+
+/**
+ * prb_reserve() - Reserve data in the ringbuffer.
+ *
+ * @e: The entry structure to setup.
+ *
+ * @rb: The ringbuffer to reserve data in.
+ *
+ * @size: The size of the data to reserve.
+ *
+ * This is the public function available to writers to reserve data.
+ *
+ * Context: Any context. Disables local interrupts on success.
+ * Return: A pointer to the reserved data or an ERR_PTR if data could not be
+ * reserved.
+ *
+ * If the provided size is legal, this will only fail if it was not possible
+ * to invalidate the oldest data block. This can happen if a writer has
+ * reserved but not yet committed data and that reserved data is currently
+ * the oldest data.
+ *
+ * The ERR_PTR values and their meaning:
+ *
+ * * -EINVAL: illegal @size value
+ * * -EBUSY: failed to reserve a descriptor (@fail count incremented)
+ * * -ENOMEM: failed to reserve data (invalid descriptor committed)
+ */
+char *prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
+ unsigned int size)
+{
+ struct prb_desc *d;
+ unsigned long id;
+ char *buf;
+
+ if (!dataring_checksize(&rb->dr, size))
+ return ERR_PTR(-EINVAL);
+
+ e->rb = rb;
+
+ /*
+ * Disable interrupts during the reserve/commit window in order to
+ * minimize the number of reserved but not yet committed data blocks
+ * in the data ringbuffer. Although such data blocks are not bad per
+ * se, they act as blockers for writers once the data ringbuffer has
+ * wrapped back to them.
+ */
+ local_irq_save(e->irqflags);
+
+ /* kA: */
+ if (!assign_desc(e)) {
+ /* Failures to reserve descriptors are counted. */
+ atomic_long_inc(&rb->fail);
+ buf = ERR_PTR(-EBUSY);
+ goto err_out;
+ }
+
+ d = e->desc;
+
+ /*
+ * kB:
+ *
+ * The descriptor ID has been updated so that its users can see that
+ * it is now invalid. Issue an smp_wmb() so that upcoming changes to
+ * the descriptor will not be associated with the old descriptor ID.
+ * This pairs with the smp_rmb() of prb_desc_busy() (see hB for
+ * details) and the smp_rmb() within numlist_read() and the smp_rmb()
+ * of prb_iter_next_valid_entry() (see mD for details).
+ *
+ * Memory barrier involvement:
+ *
+ * If hA reads from kC, then hC reads from jB.
+ * If mC reads from kC, then mE reads from jB.
+ *
+ * Relies on:
+ *
+ * WMB between jB and kC
+ * matching
+ * RMB between hA and hC
+ *
+ * WMB between jB and kC
+ * matching
+ * RMB between mC and mE
+ */
+ smp_wmb();
+
+ id = atomic_long_read(&d->id);
+
+ /* kC: */
+ buf = dataring_push(&rb->dr, size, &d->desc, id);
+ if (!buf) {
+ /* Put the invalid descriptor on the committed list. */
+ numlist_push(&rb->nl, &d->list, id);
+ buf = ERR_PTR(-ENOMEM);
+ goto err_out;
+ }
+
+ return buf;
+err_out:
+ local_irq_restore(e->irqflags);
+ return buf;
+}
+EXPORT_SYMBOL(prb_reserve);
+
+/**
+ * prb_commit() - Commit (previously reserved) data to the ringbuffer.
+ *
+ * @e: The entry containing the reserved data information.
+ *
+ * This is the public function available to writers to commit data.
+ *
+ * Context: Any context. Enables local interrupts.
+ */
+void prb_commit(struct prb_reserved_entry *e)
+{
+ struct printk_ringbuffer *rb = e->rb;
+ struct prb_desc *d = e->desc;
+ unsigned long id;
+
+ id = atomic_long_read(&d->id);
+
+ /*
+ * lA:
+ *
+ * Commit all writer data to the data ringbuffer. By setting the ID,
+ * the data will be available for invalidation.
+ */
+ dataring_datablock_setid(&rb->dr, &d->desc, id);
+
+ /*
+ * lB:
+ *
+ * Add the descriptor to the committed list. Then it will be visible
+ * to readers and popping writers (as long as all preceding
+ * descriptors have also completed the numlist_push() call).
+ */
+ numlist_push(&rb->nl, &d->list, id);
+
+ /* The reserve/commit window is closed. Re-enable interrupts. */
+ local_irq_restore(e->irqflags);
+}
+EXPORT_SYMBOL(prb_commit);
+
+/**
+ * prb_iter_init() - Initialize an iterator.
+ *
+ * @iter: The iterator to initialize.
+ *
+ * @rb: The ringbuffer to associate with the iterator.
+ *
+ * @e: An entry structure to use during iteration.
+ *
+ * This is the public function available to readers to initialize an iterator.
+ *
+ * As an alternative, DECLARE_PRINTKRB_ITER() can be used.
+ *
+ * The interator is initialized to the beginning of the committed list (the
+ * oldest committed entry).
+ *
+ * Context: Any context.
+ */
+void prb_iter_init(struct prb_iterator *iter, struct printk_ringbuffer *rb,
+ struct prb_entry *e)
+{
+ iter->rb = rb;
+ iter->e = e;
+
+ iter->last_id = 0;
+ iter->last_seq = 0;
+ iter->next_id = 0;
+}
+EXPORT_SYMBOL(prb_iter_init);
+
+/**
+ * reset_iter() - Reset an iterator to the beginning of the committed list.
+ *
+ * @iter: The iterator to reset.
+ */
+static void reset_iter(struct prb_iterator *iter)
+{
+ unsigned long last_seq;
+
+ iter->next_id = numlist_read_tail(&iter->rb->nl, &last_seq, NULL);
+
+ /* Pretend the entry preceding the oldest entry was last seen. */
+ iter->last_seq = last_seq - 1;
+
+ /*
+ * @last_id is only significant in EOL situations, when it is equal to
+ * @next_id and the iterator wants to read the entry after @last_id as
+ * the next entry. Set @last_id to something other than @next_id. So
+ * that the iterator will read @next_id as the next entry.
+ */
+ iter->last_id = iter->next_id - 1;
+}
+
+/**
+ * setup_next() - Prepare an iterator to read the next entry.
+ *
+ * @nl: The numbered list used for the committed list.
+ *
+ * @iter: The iterator to prepare.
+ *
+ * Evaluate the current state of the iterator and committed list and determine
+ * if and how the iterator can proceed. For example, if the iterator
+ * previously hit the end of the list, there may now be new entries available
+ * for traversing.
+ *
+ * If the last seen entry no longer exists, the iterator is reset to the
+ * beginning of the committed list. The oldest available entry will be newer
+ * than the last seen entry.
+ *
+ * Return: true if a next entry is available for reading, otherwise false.
+ */
+static bool setup_next(struct numlist *nl, struct prb_iterator *iter)
+{
+ unsigned long next;
+
+ if (iter->last_id == iter->next_id) {
+ /* previously hit EOL, check for updated next */
+
+ if (!numlist_read(nl, iter->last_id, NULL, &next))
+ reset_iter(iter);
+ else if (next != iter->next_id)
+ iter->next_id = next;
+ else
+ return false;
+ }
+
+ return true;
+}
+
+/**
+ * prb_iter_next_valid_entry() - Traverse to and read the next (newer) entry.
+ *
+ * @iter: The iterator used for list traversal.
+ *
+ * This is the public function available to readers to traverse the committed
+ * entry list.
+ *
+ * If the iterator has not yet been used or has fallen behind and no longer
+ * has a reference to a valid entry, the next read entry will be the oldest
+ * in the committed list (which will be newer than the previously read entry).
+ *
+ * Context: Any context.
+ * Return: The size of the entry data or 0 if there is no next entry.
+ *
+ * The entry data is padded (if necessary) to allow alignment for following
+ * data blocks. Therefore the returned size value can be larger than the size
+ * reserved. If users want the exact size to be tracked, they should include
+ * this information within their data.
+ */
+int prb_iter_next_valid_entry(struct prb_iterator *iter)
+{
+ struct printk_ringbuffer *rb = iter->rb;
+ struct prb_entry *e = iter->e;
+ struct dataring *dr = &rb->dr;
+ struct numlist *nl = &rb->nl;
+ struct dr_datablock *db;
+ unsigned long next_id;
+ struct dr_desc desc;
+ struct prb_desc *d;
+ struct nl_node *n;
+ unsigned long seq;
+ unsigned long id;
+ int size;
+
+ if (!setup_next(nl, iter))
+ return 0;
+
+ for (;;) {
+ id = iter->next_id;
+
+ /* mA: */
+ if (!numlist_read(nl, id, &seq, &next_id)) {
+ /* @id not available */
+ reset_iter(iter);
+ continue;
+ }
+
+ if (seq != iter->last_seq + 1) {
+ /* @seq has an unexpected value */
+ reset_iter(iter);
+ continue;
+ }
+
+ /* mB: */
+ n = prb_desc_node(id, rb);
+ if (!n) {
+ /* @id has become invalid */
+ reset_iter(iter);
+ continue;
+ }
+
+ /* advance iter */
+ iter->last_id = id;
+ iter->last_seq = seq;
+ iter->next_id = next_id;
+
+ d = container_of(n, struct prb_desc, list);
+
+ /* get a local copy to allow non-racey validation */
+ dataring_desc_copy(&desc, &d->desc);
+
+ /* mC: */
+ if (dataring_datablock_isvalid(dr, &desc)) {
+ e->seq = seq;
+
+ db = dataring_getdatablock(dr, &desc, &size);
+ memcpy(&e->buffer[0], &db->data[0],
+ size > e->buffer_size ? e->buffer_size : size);
+
+ /*
+ * mD:
+ *
+ * Now that the data is copied, re-validate that @id
+ * and @desc are still valid. For @id validation, this
+ * pairs with the smp_wmb() in prb_reserve() (see kB
+ * for details). For @desc validation, this pairs with
+ * the smp_mb() in dataring_push() (see fB for
+ * details).
+ */
+ smp_rmb();
+
+ /* mE: */
+ if (prb_desc_node(id, rb) &&
+ /* mF: */
+ dataring_datablock_isvalid(dr, &desc)) {
+ return size;
+ }
+ }
+
+ /* hit EOL? */
+ if (next_id == id)
+ return 0;
+ }
+}
+EXPORT_SYMBOL(prb_iter_next_valid_entry);
+
+/**
+ * prb_iter_sync() - Position an iterator to that of another iterator.
+ *
+ * @dst: The iterator to modify.
+ *
+ * @src: The iterator to sync from.
+ *
+ * This is the public function available to readers to set an iterator to
+ * the position of another iterator. This is particularly useful for making
+ * backup copies of an iterator in case a form of rewinding is needed or if
+ * one iterator should continue where another left off.
+ *
+ * Note that the destination iterator must be previously initialized. The
+ * prb_entry provided during initialization will continue to be used. The
+ * iterator being sync'd from is allowed to be using a different prb_entry.
+ *
+ * Also note that if the iterator being sync'd from is traversing a
+ * different ringbuffer, the modified iterator will now also traverse that
+ * ringbuffer.
+ *
+ * Context: Any context.
+ *
+ * It is safe to call this function from any context and state. But note
+ * that this function is not atomic. Callers must not sync iterators that
+ * can be accessed by other tasks/contexts unless proper synchronization is
+ * used.
+ */
+void prb_iter_sync(struct prb_iterator *dst, struct prb_iterator *src)
+{
+ /* copy everything except the entry buffer */
+
+ dst->rb = src->rb;
+ dst->last_id = src->last_id;
+ dst->last_seq = src->last_seq;
+ dst->next_id = src->next_id;
+}
+EXPORT_SYMBOL(prb_iter_sync);
+
+/**
+ * prb_iter_peek_next_entry() - Check if there is a next (newer) entry.
+ *
+ * @iter: The iterator used for list traversal.
+ *
+ * This is the public function available to readers to check if a newer
+ * entry is available.
+ *
+ * Context: Any context.
+ * Return: true if there is a next entry, otherwise false.
+ */
+bool prb_iter_peek_next_entry(struct prb_iterator *iter)
+{
+ DECLARE_PRINTKRB_ENTRY(e, 1);
+ DECLARE_PRINTKRB_ITER(iter_copy, iter->rb, &e);
+
+ prb_iter_sync(&iter_copy, iter);
+
+ return (prb_iter_next_valid_entry(&iter_copy) != 0);
+}
+EXPORT_SYMBOL(prb_iter_peek_next_entry);
+
+/**
+ * prb_getfail() - Read the descriptor reservation failure counter.
+ *
+ * @rb: The ringbuffer whose counter to read.
+ *
+ * This is the public function available to readers to see how many descriptor
+ * reservation failures exist.
+ *
+ * The counter only counts failures to assign a descriptor. Failures due to
+ * failing to reserve data are not counted because the associated (invalid)
+ * descriptor with its sequence number will exist and readers will be able to
+ * identify that as a lost entry by seeing a jump in the sequence number.
+ *
+ * Context: Any context.
+ * Return: The number of descriptor reservation failures for this ringbuffer.
+ */
+unsigned long prb_getfail(struct printk_ringbuffer *rb)
+{
+ return atomic_long_read(&rb->fail);
+}
+EXPORT_SYMBOL(prb_getfail);
diff --git a/kernel/printk/ringbuffer.h b/kernel/printk/ringbuffer.h
new file mode 100644
index 000000000000..9fe54a09fbc2
--- /dev/null
+++ b/kernel/printk/ringbuffer.h
@@ -0,0 +1,288 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+
+#ifndef _LINUX_PRINTK_RINGBUFFER_H
+#define _LINUX_PRINTK_RINGBUFFER_H
+
+#include <linux/atomic.h>
+#include "numlist.h"
+#include "dataring.h"
+
+/**
+ * struct prb_desc - A descriptor representing an entry in the ringbuffer.
+ *
+ * @id: The ID of the descriptor.
+ * The descriptor array index can be derived from the ID.
+ *
+ * @desc: The dataring descriptor where the entry data is stored.
+ *
+ * @list: The numbered list node within the committed list.
+ *
+ * Descriptors include information about where (and if) data for this entry is
+ * stored within the ringbuffer's dataring. A descriptor may or may not be
+ * within a numbered list of committed descriptors.
+ */
+struct prb_desc {
+ /* private */
+ atomic_long_t id;
+ struct dr_desc desc;
+ struct nl_node list;
+};
+
+/**
+ * struct printk_ringbuffer - The ringbuffer structure.
+ *
+ * @desc_count_bits: The power-of-2 maximum amount of descriptors allowed.
+ *
+ * @descs: An array of all descriptors to use.
+ *
+ * @desc_next_unused: An index of the next available (never before used)
+ * descriptor. This value only increases until the
+ * maximum is reached.
+ *
+ * @nl: A numbered list of committed entries.
+ *
+ * @dr: The dataring used to manage the entry data.
+ *
+ * @fail: A counter tracking how often writers fail to reserve.
+ * This only tracks failure due to not being able to get a
+ * descriptor. Failure due to not being able to reserve
+ * space in the dataring is not counted because readers
+ * will notice a lost sequence number in that case.
+ */
+struct printk_ringbuffer {
+ /* private */
+ unsigned int desc_count_bits;
+ struct prb_desc *descs;
+ atomic_t desc_next_unused;
+
+ struct numlist nl;
+
+ struct dataring dr;
+
+ atomic_long_t fail;
+};
+
+/**
+ * struct prb_reserved_entry - Used by writers to reserve/commit an entry.
+ *
+ * @rb: The printk ringbuffer used for reserve/commit.
+ *
+ * @desc: A pointer to the descriptor of the reserved entry.
+ *
+ * @irqflags: Local IRQs are disabled during the reserve/commit window.
+ *
+ * A writer provides this structure when reserving and committing data. The
+ * values of all the members are set on reserve and are only valid until
+ * commit has been called.
+ */
+struct prb_reserved_entry {
+ /* private */
+ struct printk_ringbuffer *rb;
+ struct prb_desc *desc;
+ unsigned long irqflags;
+};
+
+/**
+ * struct prb_entry - Used by readers to read a ringbuffer entry.
+ *
+ * @seq: The sequence number of the entry.
+ *
+ * @buffer: A pointer to a reader-provided buffer.
+ * When reading an entry, the data is copied to this buffer.
+ *
+ * @buffer_size: The size of the reader-provided buffer.
+ *
+ * A reader initializes and provides this structure when traversing/reading
+ * the entries of the ringbuffer.
+ */
+struct prb_entry {
+ /* public */
+ unsigned long seq;
+ char *buffer;
+ int buffer_size;
+};
+
+/**
+ * struct prb_iterator - Used by readers to traverse the committed list.
+ *
+ * @rb: The printk ringbuffer being traversed.
+ *
+ * @e: A pointer to a reader-provided entry structure.
+ *
+ * @last_id: The ID of the last read entry.
+ *
+ * @last_seq: The sequence number of the last read entry.
+ *
+ * @next_id: The ID of the next (unread) entry.
+ *
+ * An iterator tracks the current position of a reader within the ringbuffer.
+ * Readers can notice if they have missed an entry by a jump in the sequence
+ * number.
+ */
+struct prb_iterator {
+ /* private */
+ struct printk_ringbuffer *rb;
+ struct prb_entry *e;
+
+ unsigned long last_id;
+ unsigned long last_seq;
+ unsigned long next_id;
+};
+
+/* writer interface */
+char *prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
+ unsigned int size);
+void prb_commit(struct prb_reserved_entry *e);
+
+/* reader interface */
+void prb_iter_init(struct prb_iterator *iter, struct printk_ringbuffer *rb,
+ struct prb_entry *e);
+int prb_iter_next_valid_entry(struct prb_iterator *iter);
+void prb_iter_sync(struct prb_iterator *dest, struct prb_iterator *src);
+bool prb_iter_peek_next_entry(struct prb_iterator *iter);
+
+/* utility functions */
+unsigned long prb_getfail(struct printk_ringbuffer *rb);
+
+/* prototypes for callbacks used by numlist and dataring, respectively */
+struct nl_node *prb_desc_node(unsigned long id, void *arg);
+bool prb_desc_busy(unsigned long id, void *arg);
+struct dr_desc *prb_getdesc(unsigned long id, void *arg);
+
+/**
+ * DECLARE_PRINTKRB() - Declare a printk ringbuffer.
+ *
+ * @name: The name for the ringbuffer structure variable.
+ *
+ * @avgdatabits: The average size of data as a power-of-2.
+ * If this value is greater than the actual average data size,
+ * there will not be enough descriptors available. This will
+ * lead to situations where data is invalidated in order to free
+ * up descriptors, rather than because the data ringbuffer is
+ * full. Generally, values that are less than the actual average
+ * data size are preferred.
+ *
+ * @descbits: The power-of-2 maximum amount of descriptors allowed.
+ *
+ * The size of the data array will be the average data size multiplied by the
+ * maximum amount of descriptors.
+ *
+ * As per numlist requirement of always having at least one node in the list,
+ * the ringbuffer structures are initialized such that:
+ *
+ * * the numlist head and tail point to descriptor 0
+ * * descriptor 0 has an invalid data block and is the terminating node
+ * * descriptor 1 will be the next descriptor
+ */
+#define DECLARE_PRINTKRB(name, avgdatabits, descbits) \
+char _##name##_data[(1 << ((avgdatabits) + (descbits))) + \
+ sizeof(long)] \
+ __aligned(__alignof__(long)); \
+struct prb_desc _##name##_descs[1 << (descbits)]; \
+struct printk_ringbuffer name = { \
+ .desc_count_bits = descbits, \
+ .descs = &_##name##_descs[0], \
+ .desc_next_unused = ATOMIC_INIT(1), \
+ .nl = { \
+ .head_id = ATOMIC_LONG_INIT(0), \
+ .tail_id = ATOMIC_LONG_INIT(0), \
+ .node = prb_desc_node, \
+ .node_arg = &name, \
+ .busy = prb_desc_busy, \
+ .busy_arg = &name, \
+ }, \
+ .dr = { \
+ .size_bits = (avgdatabits) + (descbits), \
+ .data = &_##name##_data[0], \
+ .head_lpos = ATOMIC_LONG_INIT(-111 * \
+ sizeof(long)), \
+ .tail_lpos = ATOMIC_LONG_INIT(-111 * \
+ sizeof(long)), \
+ .getdesc = prb_getdesc, \
+ .getdesc_arg = &name, \
+ }, \
+ .fail = ATOMIC_LONG_INIT(0), \
+}
+
+/**
+ * DECLARE_PRINTKRB_ENTRY() - Declare an entry structure.
+ *
+ * @name: The name for the entry structure variable.
+ *
+ * @size: The size of the associated reader buffer (also declared).
+ *
+ * This macro is particularly useful for static entry structures that should
+ * be immediately available and initialized. It is an alternative to the
+ * reader manually creating a buffer and setting the buffer and
+ * buffer_size fields of the structure.
+ *
+ * Note that this macro will declare the buffer as well. This could be a
+ * problem if this is used with a large buffer size within a stack frame.
+ */
+#define DECLARE_PRINTKRB_ENTRY(name, size) \
+char _##name##_entry_buf[size]; \
+struct prb_entry name = { \
+ .seq = 0, \
+ .buffer = &_##name##_entry_buf[0], \
+ .buffer_size = size, \
+}
+
+/**
+ * DECLARE_PRINTKRB_ITER() - Declare an iterator for readers.
+ *
+ * @name: The name for the iterator structure variable.
+ *
+ * @rbaddr: A pointer to a printk ringbuffer.
+ *
+ * @entryaddr: A pointer to an entry structure.
+ *
+ * This macro is particularly useful for static iterators that should be
+ * immediately available and initialized. It is an alternative to
+ * manually initializing an iterator with prb_iter_init().
+ */
+#define DECLARE_PRINTKRB_ITER(name, rbaddr, entryaddr) \
+struct prb_iterator name = { \
+ .rb = rbaddr, \
+ .e = entryaddr, \
+ .last_id = 0, \
+ .last_seq = 0, \
+ .next_id = 0, \
+}
+
+/**
+ * prb_for_each_entry() - Iterate all entries of a ringbuffer.
+ *
+ * @i: A pointer to an iterator.
+ *
+ * @r: The printk ringbuffer to iterate.
+ *
+ * @e: An entry structure to use during iteration.
+ *
+ * @l: An integer used to identify when the final entry is traversed.
+ *
+ * This macro initializes the iterator and traverses through all available
+ * ringbuffer entries.
+ *
+ * See prb_for_each_entry_continue() if you want to continue traversing using
+ * an iterator that has already begun traversal.
+ */
+#define prb_for_each_entry(i, r, e, l) \
+ for (prb_iter_init(i, r, e); (l = prb_iter_next_valid_entry(i)) != 0;)
+
+/**
+ * prb_for_each_entry_continue() - Continue iterating entries of a ringbuffer.
+ *
+ * @i: A pointer to an iterator.
+ *
+ * @l: An integer used to identify when the final entry is traversed.
+ *
+ * This macro expects the iterator to be initialized. It does not reset the
+ * iterator. If the iterator has already been used for some traversal, this
+ * macro will continue where the iterator left off.
+ *
+ * See prb_for_each_entry() if you want to iterate from the beginning.
+ */
+#define prb_for_each_entry_continue(i, l) \
+ for (; (l = prb_iter_next_valid_entry(i)) != 0;)
+
+#endif /*_LINUX_PRINTK_RINGBUFFER_H */
--
2.20.1
Powered by blists - more mailing lists