lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [thread-next>] [day] [month] [year] [list]
Message-ID: <20201021125324.ualpvrxvzyie6d7d@linutronix.de>
Date:   Wed, 21 Oct 2020 14:53:24 +0200
From:   Sebastian Andrzej Siewior <bigeasy@...utronix.de>
To:     Thomas Gleixner <tglx@...utronix.de>
Cc:     LKML <linux-kernel@...r.kernel.org>,
        linux-rt-users <linux-rt-users@...r.kernel.org>,
        Steven Rostedt <rostedt@...dmis.org>
Subject: [ANNOUNCE] v5.9.1-rt18

Dear RT folks!

I'm pleased to announce the v5.9.1-rt18 patch set. 

Changes since v5.9.1-rt17:

  - Update the migrate-disable series by Peter Zijlstra to v3. Include
    also fixes discussed in the thread.

  - UP builds did not boot since the replace of the migrate-disable
    code. Reported by Christian Egger. Fixed as a part of v3 by Peter
    Zijlstra.

  - Rebase the printk code on top of the ringer buffer designed for
    printk which was merged in the v5.10 merge window. Patches by John
    Ogness.

Known issues
     - It has been pointed out that due to changes to the printk code the
       internal buffer representation changed. This is only an issue if tools
       like `crash' are used to extract the printk buffer from a kernel memory
       image.

The delta patch against v5.9.1-rt17 is appended below and can be found here:
 
     https://cdn.kernel.org/pub/linux/kernel/projects/rt/5.9/incr/patch-5.9.1-rt17-rt18.patch.xz

You can get this release via the git tree at:

    git://git.kernel.org/pub/scm/linux/kernel/git/rt/linux-rt-devel.git v5.9.1-rt18

The RT patch against v5.9.1 can be found here:

    https://cdn.kernel.org/pub/linux/kernel/projects/rt/5.9/older/patch-5.9.1-rt18.patch.xz

The split quilt queue is available at:

    https://cdn.kernel.org/pub/linux/kernel/projects/rt/5.9/older/patches-5.9.1-rt18.tar.xz

Sebastian

diff --git a/Documentation/admin-guide/kdump/gdbmacros.txt b/Documentation/admin-guide/kdump/gdbmacros.txt
index 220d0a80ca2c9..82aecdcae8a6c 100644
--- a/Documentation/admin-guide/kdump/gdbmacros.txt
+++ b/Documentation/admin-guide/kdump/gdbmacros.txt
@@ -170,57 +170,82 @@ document trapinfo
 	address the kernel panicked.
 end
 
-define dump_log_idx
-	set $idx = $arg0
-	if ($argc > 1)
-		set $prev_flags = $arg1
+define dump_record
+	set var $desc = $arg0
+	set var $info = $arg1
+	if ($argc > 2)
+		set var $prev_flags = $arg2
 	else
-		set $prev_flags = 0
-	end
-	set $msg = ((struct printk_log *) (log_buf + $idx))
-	set $prefix = 1
-	set $newline = 1
-	set $log = log_buf + $idx + sizeof(*$msg)
-
-	# prev & LOG_CONT && !(msg->flags & LOG_PREIX)
-	if (($prev_flags & 8) && !($msg->flags & 4))
-		set $prefix = 0
+		set var $prev_flags = 0
 	end
 
-	# msg->flags & LOG_CONT
-	if ($msg->flags & 8)
+	set var $prefix = 1
+	set var $newline = 1
+
+	set var $begin = $desc->text_blk_lpos.begin % (1U << prb->text_data_ring.size_bits)
+	set var $next = $desc->text_blk_lpos.next % (1U << prb->text_data_ring.size_bits)
+
+	# handle data-less record
+	if ($begin & 1)
+		set var $text_len = 0
+		set var $log = ""
+	else
+		# handle wrapping data block
+		if ($begin > $next)
+			set var $begin = 0
+		end
+
+		# skip over descriptor id
+		set var $begin = $begin + sizeof(long)
+
+		# handle truncated message
+		if ($next - $begin < $info->text_len)
+			set var $text_len = $next - $begin
+		else
+			set var $text_len = $info->text_len
+		end
+
+		set var $log = &prb->text_data_ring.data[$begin]
+	end
+
+	# prev & LOG_CONT && !(info->flags & LOG_PREIX)
+	if (($prev_flags & 8) && !($info->flags & 4))
+		set var $prefix = 0
+	end
+
+	# info->flags & LOG_CONT
+	if ($info->flags & 8)
 		# (prev & LOG_CONT && !(prev & LOG_NEWLINE))
 		if (($prev_flags & 8) && !($prev_flags & 2))
-			set $prefix = 0
+			set var $prefix = 0
 		end
-		# (!(msg->flags & LOG_NEWLINE))
-		if (!($msg->flags & 2))
-			set $newline = 0
+		# (!(info->flags & LOG_NEWLINE))
+		if (!($info->flags & 2))
+			set var $newline = 0
 		end
 	end
 
 	if ($prefix)
-		printf "[%5lu.%06lu] ", $msg->ts_nsec / 1000000000, $msg->ts_nsec % 1000000000
+		printf "[%5lu.%06lu] ", $info->ts_nsec / 1000000000, $info->ts_nsec % 1000000000
 	end
-	if ($msg->text_len != 0)
-		eval "printf \"%%%d.%ds\", $log", $msg->text_len, $msg->text_len
+	if ($text_len)
+		eval "printf \"%%%d.%ds\", $log", $text_len, $text_len
 	end
 	if ($newline)
 		printf "\n"
 	end
-	if ($msg->dict_len > 0)
-		set $dict = $log + $msg->text_len
-		set $idx = 0
-		set $line = 1
-		while ($idx < $msg->dict_len)
-			if ($line)
-				printf " "
-				set $line = 0
-			end
-			set $c = $dict[$idx]
+
+	# handle dictionary data
+
+	set var $dict = &$info->dev_info.subsystem[0]
+	set var $dict_len = sizeof($info->dev_info.subsystem)
+	if ($dict[0] != '\0')
+		printf " SUBSYSTEM="
+		set var $idx = 0
+		while ($idx < $dict_len)
+			set var $c = $dict[$idx]
 			if ($c == '\0')
-				printf "\n"
-				set $line = 1
+				loop_break
 			else
 				if ($c < ' ' || $c >= 127 || $c == '\\')
 					printf "\\x%02x", $c
@@ -228,33 +253,67 @@ define dump_log_idx
 					printf "%c", $c
 				end
 			end
-			set $idx = $idx + 1
+			set var $idx = $idx + 1
+		end
+		printf "\n"
+	end
+
+	set var $dict = &$info->dev_info.device[0]
+	set var $dict_len = sizeof($info->dev_info.device)
+	if ($dict[0] != '\0')
+		printf " DEVICE="
+		set var $idx = 0
+		while ($idx < $dict_len)
+			set var $c = $dict[$idx]
+			if ($c == '\0')
+				loop_break
+			else
+				if ($c < ' ' || $c >= 127 || $c == '\\')
+					printf "\\x%02x", $c
+				else
+					printf "%c", $c
+				end
+			end
+			set var $idx = $idx + 1
 		end
 		printf "\n"
 	end
 end
-document dump_log_idx
-	Dump a single log given its index in the log buffer.  The first
-	parameter is the index into log_buf, the second is optional and
-	specified the previous log buffer's flags, used for properly
-	formatting continued lines.
+document dump_record
+	Dump a single record. The first parameter is the descriptor,
+	the second parameter is the info, the third parameter is
+	optional and specifies the previous record's flags, used for
+	properly formatting continued lines.
 end
 
 define dmesg
-	set $i = log_first_idx
-	set $end_idx = log_first_idx
-	set $prev_flags = 0
+	# definitions from kernel/printk/printk_ringbuffer.h
+	set var $desc_committed = 1
+	set var $desc_finalized = 2
+	set var $desc_sv_bits = sizeof(long) * 8
+	set var $desc_flags_shift = $desc_sv_bits - 2
+	set var $desc_flags_mask = 3 << $desc_flags_shift
+	set var $id_mask = ~$desc_flags_mask
+
+	set var $desc_count = 1U << prb->desc_ring.count_bits
+	set var $prev_flags = 0
+
+	set var $id = prb->desc_ring.tail_id.counter
+	set var $end_id = prb->desc_ring.head_id.counter
 
 	while (1)
-		set $msg = ((struct printk_log *) (log_buf + $i))
-		if ($msg->len == 0)
-			set $i = 0
-		else
-			dump_log_idx $i $prev_flags
-			set $i = $i + $msg->len
-			set $prev_flags = $msg->flags
+		set var $desc = &prb->desc_ring.descs[$id % $desc_count]
+		set var $info = &prb->desc_ring.infos[$id % $desc_count]
+
+		# skip non-committed record
+		set var $state = 3 & ($desc->state_var.counter >> $desc_flags_shift)
+		if ($state == $desc_committed || $state == $desc_finalized)
+			dump_record $desc $info $prev_flags
+			set var $prev_flags = $info->flags
 		end
-		if ($i == $end_idx)
+
+		set var $id = ($id + 1) & $id_mask
+		if ($id == $end_id)
 			loop_break
 		end
 	end
diff --git a/Documentation/admin-guide/kdump/vmcoreinfo.rst b/Documentation/admin-guide/kdump/vmcoreinfo.rst
index 2baad0bfb09d0..e44a6c01f3362 100644
--- a/Documentation/admin-guide/kdump/vmcoreinfo.rst
+++ b/Documentation/admin-guide/kdump/vmcoreinfo.rst
@@ -189,50 +189,123 @@ from this.
 Free areas descriptor. User-space tools use this value to iterate the
 free_area ranges. MAX_ORDER is used by the zone buddy allocator.
 
-log_first_idx
--------------
+prb
+---
 
-Index of the first record stored in the buffer log_buf. Used by
-user-space tools to read the strings in the log_buf.
+A pointer to the printk ringbuffer (struct printk_ringbuffer). This
+may be pointing to the static boot ringbuffer or the dynamically
+allocated ringbuffer, depending on when the the core dump occurred.
+Used by user-space tools to read the active kernel log buffer.
 
-log_buf
--------
+printk_rb_static
+----------------
 
-Console output is written to the ring buffer log_buf at index
-log_first_idx. Used to get the kernel log.
+A pointer to the static boot printk ringbuffer. If @prb has a
+different value, this is useful for viewing the initial boot messages,
+which may have been overwritten in the dynamically allocated
+ringbuffer.
 
-log_buf_len
------------
-
-log_buf's length.
-
-clear_idx
+clear_seq
 ---------
 
-The index that the next printk() record to read after the last clear
-command. It indicates the first record after the last SYSLOG_ACTION
-_CLEAR, like issued by 'dmesg -c'. Used by user-space tools to dump
-the dmesg log.
+The sequence number of the printk() record after the last clear
+command. It indicates the first record after the last
+SYSLOG_ACTION_CLEAR, like issued by 'dmesg -c'. Used by user-space
+tools to dump a subset of the dmesg log.
 
-log_next_idx
-------------
+printk_ringbuffer
+-----------------
 
-The index of the next record to store in the buffer log_buf. Used to
-compute the index of the current buffer position.
+The size of a printk_ringbuffer structure. This structure contains all
+information required for accessing the various components of the
+kernel log buffer.
 
-printk_log
-----------
+(printk_ringbuffer, desc_ring|text_data_ring|dict_data_ring|fail)
+-----------------------------------------------------------------
 
-The size of a structure printk_log. Used to compute the size of
-messages, and extract dmesg log. It encapsulates header information for
-log_buf, such as timestamp, syslog level, etc.
+Offsets for the various components of the printk ringbuffer. Used by
+user-space tools to view the kernel log buffer without requiring the
+declaration of the structure.
 
-(printk_log, ts_nsec|len|text_len|dict_len)
--------------------------------------------
+prb_desc_ring
+-------------
 
-It represents field offsets in struct printk_log. User space tools
-parse it and check whether the values of printk_log's members have been
-changed.
+The size of the prb_desc_ring structure. This structure contains
+information about the set of record descriptors.
+
+(prb_desc_ring, count_bits|descs|head_id|tail_id)
+-------------------------------------------------
+
+Offsets for the fields describing the set of record descriptors. Used
+by user-space tools to be able to traverse the descriptors without
+requiring the declaration of the structure.
+
+prb_desc
+--------
+
+The size of the prb_desc structure. This structure contains
+information about a single record descriptor.
+
+(prb_desc, info|state_var|text_blk_lpos|dict_blk_lpos)
+------------------------------------------------------
+
+Offsets for the fields describing a record descriptors. Used by
+user-space tools to be able to read descriptors without requiring
+the declaration of the structure.
+
+prb_data_blk_lpos
+-----------------
+
+The size of the prb_data_blk_lpos structure. This structure contains
+information about where the text or dictionary data (data block) is
+located within the respective data ring.
+
+(prb_data_blk_lpos, begin|next)
+-------------------------------
+
+Offsets for the fields describing the location of a data block. Used
+by user-space tools to be able to locate data blocks without
+requiring the declaration of the structure.
+
+printk_info
+-----------
+
+The size of the printk_info structure. This structure contains all
+the meta-data for a record.
+
+(printk_info, seq|ts_nsec|text_len|dict_len|caller_id)
+------------------------------------------------------
+
+Offsets for the fields providing the meta-data for a record. Used by
+user-space tools to be able to read the information without requiring
+the declaration of the structure.
+
+prb_data_ring
+-------------
+
+The size of the prb_data_ring structure. This structure contains
+information about a set of data blocks.
+
+(prb_data_ring, size_bits|data|head_lpos|tail_lpos)
+---------------------------------------------------
+
+Offsets for the fields describing a set of data blocks. Used by
+user-space tools to be able to access the data blocks without
+requiring the declaration of the structure.
+
+atomic_long_t
+-------------
+
+The size of the atomic_long_t structure. Used by user-space tools to
+be able to copy the full structure, regardless of its
+architecture-specific implementation.
+
+(atomic_long_t, counter)
+------------------------
+
+Offset for the long value of an atomic_long_t variable. Used by
+user-space tools to access the long value without requiring the
+architecture-specific declaration.
 
 (free_area.free_list, MIGRATE_TYPES)
 ------------------------------------
diff --git a/Documentation/printk-ringbuffer.txt b/Documentation/printk-ringbuffer.txt
deleted file mode 100644
index 6bde5dbd8545b..0000000000000
--- a/Documentation/printk-ringbuffer.txt
+++ /dev/null
@@ -1,377 +0,0 @@
-struct printk_ringbuffer
-------------------------
-John Ogness <john.ogness@...utronix.de>
-
-Overview
-~~~~~~~~
-As the name suggests, this ring buffer was implemented specifically to serve
-the needs of the printk() infrastructure. The ring buffer itself is not
-specific to printk and could be used for other purposes. _However_, the
-requirements and semantics of printk are rather unique. If you intend to use
-this ring buffer for anything other than printk, you need to be very clear on
-its features, behavior, and pitfalls.
-
-Features
-^^^^^^^^
-The printk ring buffer has the following features:
-
-- single global buffer
-- resides in initialized data section (available at early boot)
-- lockless readers
-- supports multiple writers
-- supports multiple non-consuming readers
-- safe from any context (including NMI)
-- groups bytes into variable length blocks (referenced by entries)
-- entries tagged with sequence numbers
-
-Behavior
-^^^^^^^^
-Since the printk ring buffer readers are lockless, there exists no
-synchronization between readers and writers. Basically writers are the tasks
-in control and may overwrite any and all committed data at any time and from
-any context. For this reason readers can miss entries if they are overwritten
-before the reader was able to access the data. The reader API implementation
-is such that reader access to entries is atomic, so there is no risk of
-readers having to deal with partial or corrupt data. Also, entries are
-tagged with sequence numbers so readers can recognize if entries were missed.
-
-Writing to the ring buffer consists of 2 steps. First a writer must reserve
-an entry of desired size. After this step the writer has exclusive access
-to the memory region. Once the data has been written to memory, it needs to
-be committed to the ring buffer. After this step the entry has been inserted
-into the ring buffer and assigned an appropriate sequence number.
-
-Once committed, a writer must no longer access the data directly. This is
-because the data may have been overwritten and no longer exists. If a
-writer must access the data, it should either keep a private copy before
-committing the entry or use the reader API to gain access to the data.
-
-Because of how the data backend is implemented, entries that have been
-reserved but not yet committed act as barriers, preventing future writers
-from filling the ring buffer beyond the location of the reserved but not
-yet committed entry region. For this reason it is *important* that writers
-perform both reserve and commit as quickly as possible. Also, be aware that
-preemption and local interrupts are disabled and writing to the ring buffer
-is processor-reentrant locked during the reserve/commit window. Writers in
-NMI contexts can still preempt any other writers, but as long as these
-writers do not write a large amount of data with respect to the ring buffer
-size, this should not become an issue.
-
-API
-~~~
-
-Declaration
-^^^^^^^^^^^
-The printk ring buffer can be instantiated as a static structure:
-
- /* declare a static struct printk_ringbuffer */
- #define DECLARE_STATIC_PRINTKRB(name, szbits, cpulockptr)
-
-The value of szbits specifies the size of the ring buffer in bits. The
-cpulockptr field is a pointer to a prb_cpulock struct that is used to
-perform processor-reentrant spin locking for the writers. It is specified
-externally because it may be used for multiple ring buffers (or other
-code) to synchronize writers without risk of deadlock.
-
-Here is an example of a declaration of a printk ring buffer specifying a
-32KB (2^15) ring buffer:
-
-....
-DECLARE_STATIC_PRINTKRB_CPULOCK(rb_cpulock);
-DECLARE_STATIC_PRINTKRB(rb, 15, &rb_cpulock);
-....
-
-If writers will be using multiple ring buffers and the ordering of that usage
-is not clear, the same prb_cpulock should be used for both ring buffers.
-
-Writer API
-^^^^^^^^^^
-The writer API consists of 2 functions. The first is to reserve an entry in
-the ring buffer, the second is to commit that data to the ring buffer. The
-reserved entry information is stored within a provided `struct prb_handle`.
-
- /* reserve an entry */
- char *prb_reserve(struct prb_handle *h, struct printk_ringbuffer *rb,
-                   unsigned int size);
-
- /* commit a reserved entry to the ring buffer */
- void prb_commit(struct prb_handle *h);
-
-Here is an example of a function to write data to a ring buffer:
-
-....
-int write_data(struct printk_ringbuffer *rb, char *data, int size)
-{
-    struct prb_handle h;
-    char *buf;
-
-    buf = prb_reserve(&h, rb, size);
-    if (!buf)
-        return -1;
-    memcpy(buf, data, size);
-    prb_commit(&h);
-
-    return 0;
-}
-....
-
-Pitfalls
-++++++++
-Be aware that prb_reserve() can fail. A retry might be successful, but it
-depends entirely on whether or not the next part of the ring buffer to
-overwrite belongs to reserved but not yet committed entries of other writers.
-Writers can use the prb_inc_lost() function to allow readers to notice that a
-message was lost.
-
-Reader API
-^^^^^^^^^^
-The reader API utilizes a `struct prb_iterator` to track the reader's
-position in the ring buffer.
-
- /* declare a pre-initialized static iterator for a ring buffer */
- #define DECLARE_STATIC_PRINTKRB_ITER(name, rbaddr)
-
- /* initialize iterator for a ring buffer (if static macro NOT used) */
- void prb_iter_init(struct prb_iterator *iter,
-                    struct printk_ringbuffer *rb, u64 *seq);
-
- /* make a deep copy of an iterator */
- void prb_iter_copy(struct prb_iterator *dest,
-                    struct prb_iterator *src);
-
- /* non-blocking, advance to next entry (and read the data) */
- int prb_iter_next(struct prb_iterator *iter, char *buf,
-                   int size, u64 *seq);
-
- /* blocking, advance to next entry (and read the data) */
- int prb_iter_wait_next(struct prb_iterator *iter, char *buf,
-                        int size, u64 *seq);
-
- /* position iterator at the entry seq */
- int prb_iter_seek(struct prb_iterator *iter, u64 seq);
-
- /* read data at current position */
- int prb_iter_data(struct prb_iterator *iter, char *buf,
-                   int size, u64 *seq);
-
-Typically prb_iter_data() is not needed because the data can be retrieved
-directly with prb_iter_next().
-
-Here is an example of a non-blocking function that will read all the data in
-a ring buffer:
-
-....
-void read_all_data(struct printk_ringbuffer *rb, char *buf, int size)
-{
-    struct prb_iterator iter;
-    u64 prev_seq = 0;
-    u64 seq;
-    int ret;
-
-    prb_iter_init(&iter, rb, NULL);
-
-    for (;;) {
-        ret = prb_iter_next(&iter, buf, size, &seq);
-        if (ret > 0) {
-            if (seq != ++prev_seq) {
-                /* "seq - prev_seq" entries missed */
-                prev_seq = seq;
-            }
-            /* process buf here */
-        } else if (ret == 0) {
-            /* hit the end, done */
-            break;
-        } else if (ret < 0) {
-            /*
-             * iterator is invalid, a writer overtook us, reset the
-             * iterator and keep going, entries were missed
-             */
-            prb_iter_init(&iter, rb, NULL);
-        }
-    }
-}
-....
-
-Pitfalls
-++++++++
-The reader's iterator can become invalid at any time because the reader was
-overtaken by a writer. Typically the reader should reset the iterator back
-to the current oldest entry (which will be newer than the entry the reader
-was at) and continue, noting the number of entries that were missed.
-
-Utility API
-^^^^^^^^^^^
-Several functions are available as convenience for external code.
-
- /* query the size of the data buffer */
- int prb_buffer_size(struct printk_ringbuffer *rb);
-
- /* skip a seq number to signify a lost record */
- void prb_inc_lost(struct printk_ringbuffer *rb);
-
- /* processor-reentrant spin lock */
- void prb_lock(struct prb_cpulock *cpu_lock, unsigned int *cpu_store);
-
- /* processor-reentrant spin unlock */
- void prb_lock(struct prb_cpulock *cpu_lock, unsigned int *cpu_store);
-
-Pitfalls
-++++++++
-Although the value returned by prb_buffer_size() does represent an absolute
-upper bound, the amount of data that can be stored within the ring buffer
-is actually less because of the additional storage space of a header for each
-entry.
-
-The prb_lock() and prb_unlock() functions can be used to synchronize between
-ring buffer writers and other external activities. The function of a
-processor-reentrant spin lock is to disable preemption and local interrupts
-and synchronize against other processors. It does *not* protect against
-multiple contexts of a single processor, i.e NMI.
-
-Implementation
-~~~~~~~~~~~~~~
-This section describes several of the implementation concepts and details to
-help developers better understand the code.
-
-Entries
-^^^^^^^
-All ring buffer data is stored within a single static byte array. The reason
-for this is to ensure that any pointers to the data (past and present) will
-always point to valid memory. This is important because the lockless readers
-may be preempted for long periods of time and when they resume may be working
-with expired pointers.
-
-Entries are identified by start index and size. (The start index plus size
-is the start index of the next entry.) The start index is not simply an
-offset into the byte array, but rather a logical position (lpos) that maps
-directly to byte array offsets.
-
-For example, for a byte array of 1000, an entry may have have a start index
-of 100. Another entry may have a start index of 1100. And yet another 2100.
-All of these entry are pointing to the same memory region, but only the most
-recent entry is valid. The other entries are pointing to valid memory, but
-represent entries that have been overwritten.
-
-Note that due to overflowing, the most recent entry is not necessarily the one
-with the highest lpos value. Indeed, the printk ring buffer initializes its
-data such that an overflow happens relatively quickly in order to validate the
-handling of this situation. The implementation assumes that an lpos (unsigned
-long) will never completely wrap while a reader is preempted. If this were to
-become an issue, the seq number (which never wraps) could be used to increase
-the robustness of handling this situation.
-
-Buffer Wrapping
-^^^^^^^^^^^^^^^
-If an entry starts near the end of the byte array but would extend beyond it,
-a special terminating entry (size = -1) is inserted into the byte array and
-the real entry is placed at the beginning of the byte array. This can waste
-space at the end of the byte array, but simplifies the implementation by
-allowing writers to always work with contiguous buffers.
-
-Note that the size field is the first 4 bytes of the entry header. Also note
-that calc_next() always ensures that there are at least 4 bytes left at the
-end of the byte array to allow room for a terminating entry.
-
-Ring Buffer Pointers
-^^^^^^^^^^^^^^^^^^^^
-Three pointers (lpos values) are used to manage the ring buffer:
-
- - _tail_: points to the oldest entry
- - _head_: points to where the next new committed entry will be
- - _reserve_: points to where the next new reserved entry will be
-
-These pointers always maintain a logical ordering:
-
- tail <= head <= reserve
-
-The reserve pointer moves forward when a writer reserves a new entry. The
-head pointer moves forward when a writer commits a new entry.
-
-The reserve pointer cannot overwrite the tail pointer in a wrap situation. In
-such a situation, the tail pointer must be "pushed forward", thus
-invalidating that oldest entry. Readers identify if they are accessing a
-valid entry by ensuring their entry pointer is `>= tail && < head`.
-
-If the tail pointer is equal to the head pointer, it cannot be pushed and any
-reserve operation will fail. The only resolution is for writers to commit
-their reserved entries.
-
-Processor-Reentrant Locking
-^^^^^^^^^^^^^^^^^^^^^^^^^^^
-The purpose of the processor-reentrant locking is to limit the interruption
-scenarios of writers to 2 contexts. This allows for a simplified
-implementation where:
-
-- The reserve/commit window only exists on 1 processor at a time. A reserve
-  can never fail due to uncommitted entries of other processors.
-
-- When committing entries, it is trivial to handle the situation when
-  subsequent entries have already been committed, i.e. managing the head
-  pointer.
-
-Performance
-~~~~~~~~~~~
-Some basic tests were performed on a quad Intel(R) Xeon(R) CPU E5-2697 v4 at
-2.30GHz (36 cores / 72 threads). All tests involved writing a total of
-32,000,000 records at an average of 33 bytes each. Each writer was pinned to
-its own CPU and would write as fast as it could until a total of 32,000,000
-records were written. All tests involved 2 readers that were both pinned
-together to another CPU. Each reader would read as fast as it could and track
-how many of the 32,000,000 records it could read. All tests used a ring buffer
-of 16KB in size, which holds around 350 records (header + data for each
-entry).
-
-The only difference between the tests is the number of writers (and thus also
-the number of records per writer). As more writers are added, the time to
-write a record increases. This is because data pointers, modified via cmpxchg,
-and global data access in general become more contended.
-
-1 writer
-^^^^^^^^
- runtime: 0m 18s
- reader1: 16219900/32000000 (50%) records
- reader2: 16141582/32000000 (50%) records
-
-2 writers
-^^^^^^^^^
- runtime: 0m 32s
- reader1: 16327957/32000000 (51%) records
- reader2: 16313988/32000000 (50%) records
-
-4 writers
-^^^^^^^^^
- runtime: 0m 42s
- reader1: 16421642/32000000 (51%) records
- reader2: 16417224/32000000 (51%) records
-
-8 writers
-^^^^^^^^^
- runtime: 0m 43s
- reader1: 16418300/32000000 (51%) records
- reader2: 16432222/32000000 (51%) records
-
-16 writers
-^^^^^^^^^^
- runtime: 0m 54s
- reader1: 16539189/32000000 (51%) records
- reader2: 16542711/32000000 (51%) records
-
-32 writers
-^^^^^^^^^^
- runtime: 1m 13s
- reader1: 16731808/32000000 (52%) records
- reader2: 16735119/32000000 (52%) records
-
-Comments
-^^^^^^^^
-It is particularly interesting to compare/contrast the 1-writer and 32-writer
-tests. Despite the writing of the 32,000,000 records taking over 4 times
-longer, the readers (which perform no cmpxchg) were still unable to keep up.
-This shows that the memory contention between the increasing number of CPUs
-also has a dramatic effect on readers.
-
-It should also be noted that in all cases each reader was able to read >=50%
-of the records. This means that a single reader would have been able to keep
-up with the writer(s) in all cases, becoming slightly easier as more writers
-are added. This was the purpose of pinning 2 readers to 1 CPU: to observe how
-maximum reader performance changes.
diff --git a/MAINTAINERS b/MAINTAINERS
index 867157311dc8b..7ae63272d994c 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -13960,6 +13960,7 @@ PRINTK
 M:	Petr Mladek <pmladek@...e.com>
 M:	Sergey Senozhatsky <sergey.senozhatsky@...il.com>
 R:	Steven Rostedt <rostedt@...dmis.org>
+R:	John Ogness <john.ogness@...utronix.de>
 S:	Maintained
 F:	include/linux/printk.h
 F:	kernel/printk/
diff --git a/drivers/base/core.c b/drivers/base/core.c
index bb5806a2bd4ca..f90e9f77bf8c2 100644
--- a/drivers/base/core.c
+++ b/drivers/base/core.c
@@ -4061,22 +4061,21 @@ void device_shutdown(void)
  */
 
 #ifdef CONFIG_PRINTK
-static int
-create_syslog_header(const struct device *dev, char *hdr, size_t hdrlen)
+static void
+set_dev_info(const struct device *dev, struct dev_printk_info *dev_info)
 {
 	const char *subsys;
-	size_t pos = 0;
+
+	memset(dev_info, 0, sizeof(*dev_info));
 
 	if (dev->class)
 		subsys = dev->class->name;
 	else if (dev->bus)
 		subsys = dev->bus->name;
 	else
-		return 0;
+		return;
 
-	pos += snprintf(hdr + pos, hdrlen - pos, "SUBSYSTEM=%s", subsys);
-	if (pos >= hdrlen)
-		goto overflow;
+	strscpy(dev_info->subsystem, subsys, sizeof(dev_info->subsystem));
 
 	/*
 	 * Add device identifier DEVICE=:
@@ -4092,41 +4091,28 @@ create_syslog_header(const struct device *dev, char *hdr, size_t hdrlen)
 			c = 'b';
 		else
 			c = 'c';
-		pos++;
-		pos += snprintf(hdr + pos, hdrlen - pos,
-				"DEVICE=%c%u:%u",
-				c, MAJOR(dev->devt), MINOR(dev->devt));
+
+		snprintf(dev_info->device, sizeof(dev_info->device),
+			 "%c%u:%u", c, MAJOR(dev->devt), MINOR(dev->devt));
 	} else if (strcmp(subsys, "net") == 0) {
 		struct net_device *net = to_net_dev(dev);
 
-		pos++;
-		pos += snprintf(hdr + pos, hdrlen - pos,
-				"DEVICE=n%u", net->ifindex);
+		snprintf(dev_info->device, sizeof(dev_info->device),
+			 "n%u", net->ifindex);
 	} else {
-		pos++;
-		pos += snprintf(hdr + pos, hdrlen - pos,
-				"DEVICE=+%s:%s", subsys, dev_name(dev));
+		snprintf(dev_info->device, sizeof(dev_info->device),
+			 "+%s:%s", subsys, dev_name(dev));
 	}
-
-	if (pos >= hdrlen)
-		goto overflow;
-
-	return pos;
-
-overflow:
-	dev_WARN(dev, "device/subsystem name too long");
-	return 0;
 }
 
 int dev_vprintk_emit(int level, const struct device *dev,
 		     const char *fmt, va_list args)
 {
-	char hdr[128];
-	size_t hdrlen;
+	struct dev_printk_info dev_info;
 
-	hdrlen = create_syslog_header(dev, hdr, sizeof(hdr));
+	set_dev_info(dev, &dev_info);
 
-	return vprintk_emit(0, level, hdrlen ? hdr : NULL, hdrlen, fmt, args);
+	return vprintk_emit(0, level, &dev_info, fmt, args);
 }
 EXPORT_SYMBOL(dev_vprintk_emit);
 
diff --git a/fs/proc/kmsg.c b/fs/proc/kmsg.c
index 18ed6e4e0c7e7..b38ad552887fb 100644
--- a/fs/proc/kmsg.c
+++ b/fs/proc/kmsg.c
@@ -18,6 +18,8 @@
 #include <linux/uaccess.h>
 #include <asm/io.h>
 
+extern wait_queue_head_t log_wait;
+
 static int kmsg_open(struct inode * inode, struct file * file)
 {
 	return do_syslog(SYSLOG_ACTION_OPEN, NULL, 0, SYSLOG_FROM_PROC);
@@ -40,7 +42,7 @@ static ssize_t kmsg_read(struct file *file, char __user *buf,
 
 static __poll_t kmsg_poll(struct file *file, poll_table *wait)
 {
-	poll_wait(file, printk_wait_queue(), wait);
+	poll_wait(file, &log_wait, wait);
 	if (do_syslog(SYSLOG_ACTION_SIZE_UNREAD, NULL, 0, SYSLOG_FROM_PROC))
 		return EPOLLIN | EPOLLRDNORM;
 	return 0;
diff --git a/include/linux/console.h b/include/linux/console.h
index 1badb57ba82f3..00d7437a92e11 100644
--- a/include/linux/console.h
+++ b/include/linux/console.h
@@ -137,6 +137,7 @@ static inline int con_debug_leave(void)
 #define CON_ANYTIME	(16) /* Safe to call when cpu is offline */
 #define CON_BRL		(32) /* Used for a braille device */
 #define CON_EXTENDED	(64) /* Use the extended output format a la /dev/kmsg */
+#define CON_HANDOVER	(128) /* Device was previously a boot console. */
 
 struct console {
 	char	name[16];
@@ -151,8 +152,8 @@ struct console {
 	short	flags;
 	short	index;
 	int	cflag;
-	unsigned long printk_seq;
-	int	wrote_history;
+	atomic64_t printk_seq;
+	struct task_struct *thread;
 	void	*data;
 	struct	 console *next;
 };
diff --git a/include/linux/crash_core.h b/include/linux/crash_core.h
index 6594dbc34a374..206bde8308b2d 100644
--- a/include/linux/crash_core.h
+++ b/include/linux/crash_core.h
@@ -55,6 +55,9 @@ phys_addr_t paddr_vmcoreinfo_note(void);
 #define VMCOREINFO_OFFSET(name, field) \
 	vmcoreinfo_append_str("OFFSET(%s.%s)=%lu\n", #name, #field, \
 			      (unsigned long)offsetof(struct name, field))
+#define VMCOREINFO_TYPE_OFFSET(name, field) \
+	vmcoreinfo_append_str("OFFSET(%s.%s)=%lu\n", #name, #field, \
+			      (unsigned long)offsetof(name, field))
 #define VMCOREINFO_LENGTH(name, value) \
 	vmcoreinfo_append_str("LENGTH(%s)=%lu\n", #name, (unsigned long)value)
 #define VMCOREINFO_NUMBER(name) \
diff --git a/include/linux/dev_printk.h b/include/linux/dev_printk.h
index 3028b644b4fbd..6f009559ee540 100644
--- a/include/linux/dev_printk.h
+++ b/include/linux/dev_printk.h
@@ -21,6 +21,14 @@
 
 struct device;
 
+#define PRINTK_INFO_SUBSYSTEM_LEN	16
+#define PRINTK_INFO_DEVICE_LEN		48
+
+struct dev_printk_info {
+	char subsystem[PRINTK_INFO_SUBSYSTEM_LEN];
+	char device[PRINTK_INFO_DEVICE_LEN];
+};
+
 #ifdef CONFIG_PRINTK
 
 __printf(3, 0) __cold
diff --git a/include/linux/kmsg_dump.h b/include/linux/kmsg_dump.h
index 25f6652c05d53..3378bcbe585ea 100644
--- a/include/linux/kmsg_dump.h
+++ b/include/linux/kmsg_dump.h
@@ -45,8 +45,10 @@ struct kmsg_dumper {
 	bool registered;
 
 	/* private state of the kmsg iterator */
-	u64 line_seq;
-	u64 buffer_end_seq;
+	u32 cur_idx;
+	u32 next_idx;
+	u64 cur_seq;
+	u64 next_seq;
 };
 
 #ifdef CONFIG_PRINTK
diff --git a/include/linux/preempt.h b/include/linux/preempt.h
index e72b67e0ced8c..8a47b9b1bade1 100644
--- a/include/linux/preempt.h
+++ b/include/linux/preempt.h
@@ -236,11 +236,16 @@ do { \
 		__preempt_schedule(); \
 } while (0)
 
+/*
+ * open code preempt_check_resched() because it is not exported to modules and
+ * used by local_unlock() or bpf_enable_instrumentation().
+ */
 #define preempt_lazy_enable() \
 do { \
 	dec_preempt_lazy_count(); \
 	barrier(); \
-	preempt_check_resched(); \
+	if (should_resched(0)) \
+		__preempt_schedule(); \
 } while (0)
 
 #else /* !CONFIG_PREEMPTION */
@@ -441,7 +446,19 @@ static inline void preempt_notifier_init(struct preempt_notifier *notifier,
 extern void migrate_disable(void);
 extern void migrate_enable(void);
 
-#else /* !(CONFIG_SMP && CONFIG_PREEMPT_RT) */
+#elif defined(CONFIG_PREEMPT_RT)
+
+static inline void migrate_disable(void)
+{
+	preempt_lazy_disable();
+}
+
+static inline void migrate_enable(void)
+{
+	preempt_lazy_enable();
+}
+
+#else /* !CONFIG_PREEMPT_RT */
 
 /**
  * migrate_disable - Prevent migration of the current task
diff --git a/include/linux/printk.h b/include/linux/printk.h
index 4318e2190408a..c49d5bb3f8ffa 100644
--- a/include/linux/printk.h
+++ b/include/linux/printk.h
@@ -59,7 +59,6 @@ static inline const char *printk_skip_headers(const char *buffer)
  */
 #define CONSOLE_LOGLEVEL_DEFAULT CONFIG_CONSOLE_LOGLEVEL_DEFAULT
 #define CONSOLE_LOGLEVEL_QUIET	 CONFIG_CONSOLE_LOGLEVEL_QUIET
-#define CONSOLE_LOGLEVEL_EMERGENCY CONFIG_CONSOLE_LOGLEVEL_EMERGENCY
 
 extern int console_printk[];
 
@@ -67,7 +66,6 @@ extern int console_printk[];
 #define default_message_loglevel (console_printk[1])
 #define minimum_console_loglevel (console_printk[2])
 #define default_console_loglevel (console_printk[3])
-#define emergency_console_loglevel (console_printk[4])
 
 static inline void console_silent(void)
 {
@@ -149,10 +147,12 @@ static inline __printf(1, 2) __cold
 void early_printk(const char *s, ...) { }
 #endif
 
+struct dev_printk_info;
+
 #ifdef CONFIG_PRINTK
-asmlinkage __printf(5, 0)
+asmlinkage __printf(4, 0)
 int vprintk_emit(int facility, int level,
-		 const char *dict, size_t dictlen,
+		 const struct dev_printk_info *dev_info,
 		 const char *fmt, va_list args);
 
 asmlinkage __printf(1, 0)
@@ -193,7 +193,6 @@ __printf(1, 2) void dump_stack_set_arch_desc(const char *fmt, ...);
 void dump_stack_print_info(const char *log_lvl);
 void show_regs_print_info(const char *log_lvl);
 extern asmlinkage void dump_stack(void) __cold;
-struct wait_queue_head *printk_wait_queue(void);
 #else
 static inline __printf(1, 0)
 int vprintk(const char *s, va_list args)
@@ -257,7 +256,6 @@ static inline void show_regs_print_info(const char *log_lvl)
 static inline void dump_stack(void)
 {
 }
-
 #endif
 
 extern int kptr_restrict;
diff --git a/include/linux/printk_ringbuffer.h b/include/linux/printk_ringbuffer.h
deleted file mode 100644
index afd03305d2066..0000000000000
--- a/include/linux/printk_ringbuffer.h
+++ /dev/null
@@ -1,114 +0,0 @@
-/* SPDX-License-Identifier: GPL-2.0 */
-#ifndef _LINUX_PRINTK_RINGBUFFER_H
-#define _LINUX_PRINTK_RINGBUFFER_H
-
-#include <linux/irq_work.h>
-#include <linux/atomic.h>
-#include <linux/percpu.h>
-#include <linux/wait.h>
-
-struct prb_cpulock {
-	atomic_t owner;
-	unsigned long __percpu *irqflags;
-};
-
-struct printk_ringbuffer {
-	void *buffer;
-	unsigned int size_bits;
-
-	u64 seq;
-	atomic_long_t lost;
-
-	atomic_long_t tail;
-	atomic_long_t head;
-	atomic_long_t reserve;
-
-	struct prb_cpulock *cpulock;
-	atomic_t ctx;
-
-	struct wait_queue_head *wq;
-	atomic_long_t wq_counter;
-	struct irq_work *wq_work;
-};
-
-struct prb_entry {
-	unsigned int size;
-	u64 seq;
-	char data[0];
-};
-
-struct prb_handle {
-	struct printk_ringbuffer *rb;
-	unsigned int cpu;
-	struct prb_entry *entry;
-};
-
-#define DECLARE_STATIC_PRINTKRB_CPULOCK(name)				\
-static DEFINE_PER_CPU(unsigned long, _##name##_percpu_irqflags);	\
-static struct prb_cpulock name = {					\
-	.owner = ATOMIC_INIT(-1),					\
-	.irqflags = &_##name##_percpu_irqflags,				\
-}
-
-#define PRB_INIT ((unsigned long)-1)
-
-#define DECLARE_STATIC_PRINTKRB_ITER(name, rbaddr)			\
-static struct prb_iterator name = {					\
-	.rb = rbaddr,							\
-	.lpos = PRB_INIT,						\
-}
-
-struct prb_iterator {
-	struct printk_ringbuffer *rb;
-	unsigned long lpos;
-};
-
-#define DECLARE_STATIC_PRINTKRB(name, szbits, cpulockptr)		\
-static char _##name##_buffer[1 << (szbits)]				\
-	__aligned(__alignof__(long));					\
-static DECLARE_WAIT_QUEUE_HEAD(_##name##_wait);				\
-static void _##name##_wake_work_func(struct irq_work *irq_work)		\
-{									\
-	wake_up_interruptible_all(&_##name##_wait);			\
-}									\
-static struct irq_work _##name##_wake_work = {				\
-	.func = _##name##_wake_work_func,				\
-	.flags = ATOMIC_INIT(IRQ_WORK_LAZY),				\
-};									\
-static struct printk_ringbuffer name = {				\
-	.buffer = &_##name##_buffer[0],					\
-	.size_bits = szbits,						\
-	.seq = 0,							\
-	.lost = ATOMIC_LONG_INIT(0),					\
-	.tail = ATOMIC_LONG_INIT(-111 * sizeof(long)),			\
-	.head = ATOMIC_LONG_INIT(-111 * sizeof(long)),			\
-	.reserve = ATOMIC_LONG_INIT(-111 * sizeof(long)),		\
-	.cpulock = cpulockptr,						\
-	.ctx = ATOMIC_INIT(0),						\
-	.wq = &_##name##_wait,						\
-	.wq_counter = ATOMIC_LONG_INIT(0),				\
-	.wq_work = &_##name##_wake_work,				\
-}
-
-/* writer interface */
-char *prb_reserve(struct prb_handle *h, struct printk_ringbuffer *rb,
-		  unsigned int size);
-void prb_commit(struct prb_handle *h);
-
-/* reader interface */
-void prb_iter_init(struct prb_iterator *iter, struct printk_ringbuffer *rb,
-		   u64 *seq);
-void prb_iter_copy(struct prb_iterator *dest, struct prb_iterator *src);
-int prb_iter_next(struct prb_iterator *iter, char *buf, int size, u64 *seq);
-int prb_iter_wait_next(struct prb_iterator *iter, char *buf, int size,
-		       u64 *seq);
-int prb_iter_seek(struct prb_iterator *iter, u64 seq);
-int prb_iter_data(struct prb_iterator *iter, char *buf, int size, u64 *seq);
-
-/* utility functions */
-int prb_buffer_size(struct printk_ringbuffer *rb);
-void prb_inc_lost(struct printk_ringbuffer *rb);
-void prb_lock(struct prb_cpulock *cpu_lock, unsigned int *cpu_store);
-void prb_unlock(struct prb_cpulock *cpu_lock, unsigned int cpu_store);
-
-#endif /*_LINUX_PRINTK_RINGBUFFER_H */
diff --git a/include/linux/ratelimit.h b/include/linux/ratelimit.h
index 5ca206a41d678..b17e0cd0a30cf 100644
--- a/include/linux/ratelimit.h
+++ b/include/linux/ratelimit.h
@@ -28,7 +28,7 @@ static inline void ratelimit_state_exit(struct ratelimit_state *rs)
 		return;
 
 	if (rs->missed) {
-		pr_info("%s: %d output lines suppressed due to ratelimiting\n",
+		pr_warn("%s: %d output lines suppressed due to ratelimiting\n",
 			current->comm, rs->missed);
 		rs->missed = 0;
 	}
diff --git a/init/Kconfig b/init/Kconfig
index 7743d6e62a06a..c48887283f88a 100644
--- a/init/Kconfig
+++ b/init/Kconfig
@@ -682,7 +682,8 @@ config IKHEADERS
 
 config LOG_BUF_SHIFT
 	int "Kernel log buffer size (16 => 64KB, 17 => 128KB)"
-	range 12 25
+	range 12 25 if !H8300
+	range 12 19 if H8300
 	default 17
 	depends on PRINTK
 	help
diff --git a/kernel/printk/Makefile b/kernel/printk/Makefile
index 7b219d824c0fb..59cb24e25f004 100644
--- a/kernel/printk/Makefile
+++ b/kernel/printk/Makefile
@@ -1,3 +1,4 @@
 # SPDX-License-Identifier: GPL-2.0-only
 obj-y	= printk.o
 obj-$(CONFIG_A11Y_BRAILLE_CONSOLE)	+= braille.o
+obj-$(CONFIG_PRINTK)	+= printk_ringbuffer.o
diff --git a/kernel/printk/printk.c b/kernel/printk/printk.c
index ee7008c436ca1..78a277ea5c351 100644
--- a/kernel/printk/printk.c
+++ b/kernel/printk/printk.c
@@ -46,10 +46,10 @@
 #include <linux/uio.h>
 #include <linux/kthread.h>
 #include <linux/clocksource.h>
-#include <linux/printk_ringbuffer.h>
 #include <linux/sched/clock.h>
 #include <linux/sched/debug.h>
 #include <linux/sched/task_stack.h>
+#include <linux/kdb.h>
 
 #include <linux/uaccess.h>
 #include <asm/sections.h>
@@ -58,15 +58,15 @@
 #define CREATE_TRACE_POINTS
 #include <trace/events/printk.h>
 
+#include "printk_ringbuffer.h"
 #include "console_cmdline.h"
 #include "braille.h"
 
-int console_printk[5] = {
+int console_printk[4] = {
 	CONSOLE_LOGLEVEL_DEFAULT,	/* console_loglevel */
 	MESSAGE_LOGLEVEL_DEFAULT,	/* default_message_loglevel */
 	CONSOLE_LOGLEVEL_MIN,		/* minimum_console_loglevel */
 	CONSOLE_LOGLEVEL_DEFAULT,	/* default_console_loglevel */
-	CONSOLE_LOGLEVEL_EMERGENCY,	/* emergency_console_loglevel */
 };
 EXPORT_SYMBOL_GPL(console_printk);
 
@@ -80,6 +80,9 @@ EXPORT_SYMBOL(ignore_console_lock_warning);
 int oops_in_progress;
 EXPORT_SYMBOL(oops_in_progress);
 
+/* Set to enable sync mode. Once set, it is never cleared. */
+static bool sync_mode;
+
 /*
  * console_sem protects the console_drivers list, and also
  * provides serialisation for access to the entire console
@@ -276,30 +279,22 @@ enum con_msg_format_flags {
 static int console_msg_format = MSG_FORMAT_DEFAULT;
 
 /*
- * The printk log buffer consists of a chain of concatenated variable
- * length records. Every record starts with a record header, containing
- * the overall length of the record.
+ * The printk log buffer consists of a sequenced collection of records, each
+ * containing variable length message text. Every record also contains its
+ * own meta-data (@info).
  *
- * The heads to the first and last entry in the buffer, as well as the
- * sequence numbers of these entries are maintained when messages are
- * stored.
+ * Every record meta-data carries the timestamp in microseconds, as well as
+ * the standard userspace syslog level and syslog facility. The usual kernel
+ * messages use LOG_KERN; userspace-injected messages always carry a matching
+ * syslog facility, by default LOG_USER. The origin of every message can be
+ * reliably determined that way.
  *
- * If the heads indicate available messages, the length in the header
- * tells the start next message. A length == 0 for the next message
- * indicates a wrap-around to the beginning of the buffer.
+ * The human readable log message of a record is available in @text, the
+ * length of the message text in @text_len. The stored message is not
+ * terminated.
  *
- * Every record carries the monotonic timestamp in microseconds, as well as
- * the standard userspace syslog level and syslog facility. The usual
- * kernel messages use LOG_KERN; userspace-injected messages always carry
- * a matching syslog facility, by default LOG_USER. The origin of every
- * message can be reliably determined that way.
- *
- * The human readable log message directly follows the message header. The
- * length of the message text is stored in the header, the stored message
- * is not terminated.
- *
- * Optionally, a message can carry a dictionary of properties (key/value pairs),
- * to provide userspace with a machine-readable message context.
+ * Optionally, a record can carry a dictionary of properties (key/value
+ * pairs), to provide userspace with a machine-readable message context.
  *
  * Examples for well-defined, commonly used property names are:
  *   DEVICE=b12:8               device identifier
@@ -309,25 +304,22 @@ static int console_msg_format = MSG_FORMAT_DEFAULT;
  *                                +sound:card0  subsystem:devname
  *   SUBSYSTEM=pci              driver-core subsystem name
  *
- * Valid characters in property names are [a-zA-Z0-9.-_]. The plain text value
- * follows directly after a '=' character. Every property is terminated by
- * a '\0' character. The last property is not terminated.
+ * Valid characters in property names are [a-zA-Z0-9.-_]. Property names
+ * and values are terminated by a '\0' character.
  *
- * Example of a message structure:
- *   0000  ff 8f 00 00 00 00 00 00      monotonic time in nsec
- *   0008  34 00                        record is 52 bytes long
- *   000a        0b 00                  text is 11 bytes long
- *   000c              1f 00            dictionary is 23 bytes long
- *   000e                    03 00      LOG_KERN (facility) LOG_ERR (level)
- *   0010  69 74 27 73 20 61 20 6c      "it's a l"
- *         69 6e 65                     "ine"
- *   001b           44 45 56 49 43      "DEVIC"
- *         45 3d 62 38 3a 32 00 44      "E=b8:2\0D"
- *         52 49 56 45 52 3d 62 75      "RIVER=bu"
- *         67                           "g"
- *   0032     00 00 00                  padding to next message header
+ * Example of record values:
+ *   record.text_buf                = "it's a line" (unterminated)
+ *   record.info.seq                = 56
+ *   record.info.ts_nsec            = 36863
+ *   record.info.text_len           = 11
+ *   record.info.facility           = 0 (LOG_KERN)
+ *   record.info.flags              = 0
+ *   record.info.level              = 3 (LOG_ERR)
+ *   record.info.caller_id          = 299 (task 299)
+ *   record.info.dev_info.subsystem = "pci" (terminated)
+ *   record.info.dev_info.device    = "+pci:0000:00:01.0" (terminated)
  *
- * The 'struct printk_log' buffer header must never be directly exported to
+ * The 'struct printk_info' buffer must never be directly exported to
  * userspace, it is a kernel-private implementation detail that might
  * need to be changed in the future, when the requirements change.
  *
@@ -347,40 +339,23 @@ enum log_flags {
 	LOG_CONT	= 8,	/* text is a fragment of a continuation line */
 };
 
-struct printk_log {
-	u64 ts_nsec;		/* timestamp in nanoseconds */
-	u16 cpu;		/* cpu that generated record */
-	u16 len;		/* length of entire record */
-	u16 text_len;		/* length of text buffer */
-	u16 dict_len;		/* length of dictionary buffer */
-	u8 facility;		/* syslog facility */
-	u8 flags:5;		/* internal record flags */
-	u8 level:3;		/* syslog level */
-#ifdef CONFIG_PRINTK_CALLER
-	u32 caller_id;            /* thread id or processor id */
-#endif
-}
-#ifdef CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS
-__packed __aligned(4)
-#endif
-;
-
-DECLARE_STATIC_PRINTKRB_CPULOCK(printk_cpulock);
+/* The syslog_lock protects syslog_* variables. */
+static DEFINE_SPINLOCK(syslog_lock);
+#define syslog_lock_irq() spin_lock_irq(&syslog_lock)
+#define syslog_unlock_irq() spin_unlock_irq(&syslog_lock)
+#define syslog_lock_irqsave(flags) spin_lock_irqsave(&syslog_lock, flags)
+#define syslog_unlock_irqrestore(flags) spin_unlock_irqrestore(&syslog_lock, flags)
 
 #ifdef CONFIG_PRINTK
-/* record buffer */
-DECLARE_STATIC_PRINTKRB(printk_rb, CONFIG_LOG_BUF_SHIFT, &printk_cpulock);
-
-static DEFINE_MUTEX(syslog_lock);
-DECLARE_STATIC_PRINTKRB_ITER(syslog_iter, &printk_rb);
-
-/* the last printk record to read by syslog(READ) or /proc/kmsg */
+DECLARE_WAIT_QUEUE_HEAD(log_wait);
+/* All 3 protected by @syslog_lock. */
+/* the next printk record to read by syslog(READ) or /proc/kmsg */
 static u64 syslog_seq;
 static size_t syslog_partial;
 static bool syslog_time;
 
 /* the next printk record to read after the last 'clear' command */
-static u64 clear_seq;
+static atomic64_t clear_seq = ATOMIC64_INIT(0);
 
 #ifdef CONFIG_PRINTK_CALLER
 #define PREFIX_MAX		48
@@ -392,76 +367,80 @@ static u64 clear_seq;
 #define LOG_LEVEL(v)		((v) & 0x07)
 #define LOG_FACILITY(v)		((v) >> 3 & 0xff)
 
+/* record buffer */
+#define LOG_ALIGN __alignof__(unsigned long)
+#define __LOG_BUF_LEN (1 << CONFIG_LOG_BUF_SHIFT)
+#define LOG_BUF_LEN_MAX (u32)(1 << 31)
+static char __log_buf[__LOG_BUF_LEN] __aligned(LOG_ALIGN);
+static char *log_buf = __log_buf;
+static u32 log_buf_len = __LOG_BUF_LEN;
+
+/*
+ * Define the average message size. This only affects the number of
+ * descriptors that will be available. Underestimating is better than
+ * overestimating (too many available descriptors is better than not enough).
+ */
+#define PRB_AVGBITS 5	/* 32 character average length */
+
+#if CONFIG_LOG_BUF_SHIFT <= PRB_AVGBITS
+#error CONFIG_LOG_BUF_SHIFT value too small.
+#endif
+_DEFINE_PRINTKRB(printk_rb_static, CONFIG_LOG_BUF_SHIFT - PRB_AVGBITS,
+		 PRB_AVGBITS, &__log_buf[0]);
+
+static struct printk_ringbuffer printk_rb_dynamic;
+
+static struct printk_ringbuffer *prb = &printk_rb_static;
+
+/*
+ * We cannot access per-CPU data (e.g. per-CPU flush irq_work) before
+ * per_cpu_areas are initialised. This variable is set to true when
+ * it's safe to access per-CPU data.
+ */
+static bool __printk_percpu_data_ready __read_mostly;
+
+static bool printk_percpu_data_ready(void)
+{
+	return __printk_percpu_data_ready;
+}
+
 /* Return log buffer address */
 char *log_buf_addr_get(void)
 {
-	return printk_rb.buffer;
+	return log_buf;
 }
 
 /* Return log buffer size */
 u32 log_buf_len_get(void)
 {
-	return (1 << printk_rb.size_bits);
+	return log_buf_len;
 }
 
-/* human readable text of the record */
-static char *log_text(const struct printk_log *msg)
+/*
+ * Define how much of the log buffer we could take at maximum. The value
+ * must be greater than two. Note that only half of the buffer is available
+ * when the index points to the middle.
+ */
+#define MAX_LOG_TAKE_PART 4
+static const char trunc_msg[] = "<truncated>";
+
+static void truncate_msg(u16 *text_len, u16 *trunc_msg_len)
 {
-	return (char *)msg + sizeof(struct printk_log);
-}
+	/*
+	 * The message should not take the whole buffer. Otherwise, it might
+	 * get removed too soon.
+	 */
+	u32 max_text_len = log_buf_len / MAX_LOG_TAKE_PART;
 
-/* optional key/value pair dictionary attached to the record */
-static char *log_dict(const struct printk_log *msg)
-{
-	return (char *)msg + sizeof(struct printk_log) + msg->text_len;
-}
+	if (*text_len > max_text_len)
+		*text_len = max_text_len;
 
-static void printk_emergency(char *buffer, int level, u64 ts_nsec, u16 cpu,
-			     char *text, u16 text_len);
-
-/* insert record into the buffer, discard old ones, update heads */
-static int log_store(u32 caller_id, int facility, int level,
-		     enum log_flags flags, u64 ts_nsec, u16 cpu,
-		     const char *dict, u16 dict_len,
-		     const char *text, u16 text_len)
-{
-	struct printk_log *msg;
-	struct prb_handle h;
-	char *rbuf;
-	u32 size;
-
-	size = sizeof(*msg) + text_len + dict_len;
-
-	rbuf = prb_reserve(&h, &printk_rb, size);
-	if (!rbuf) {
-		/*
-		 * An emergency message would have been printed, but
-		 * it cannot be stored in the log.
-		 */
-		prb_inc_lost(&printk_rb);
-		return 0;
-	}
-
-	/* fill message */
-	msg = (struct printk_log *)rbuf;
-	memcpy(log_text(msg), text, text_len);
-	msg->text_len = text_len;
-	memcpy(log_dict(msg), dict, dict_len);
-	msg->dict_len = dict_len;
-	msg->facility = facility;
-	msg->level = level & 7;
-	msg->flags = flags & 0x1f;
-	msg->ts_nsec = ts_nsec;
-#ifdef CONFIG_PRINTK_CALLER
-	msg->caller_id = caller_id;
-#endif
-	msg->cpu = cpu;
-	msg->len = size;
-
-	/* insert message */
-	prb_commit(&h);
-
-	return msg->text_len;
+	/* enable the warning message (if there is room) */
+	*trunc_msg_len = strlen(trunc_msg);
+	if (*text_len >= *trunc_msg_len)
+		*text_len -= *trunc_msg_len;
+	else
+		*trunc_msg_len = 0;
 }
 
 int dmesg_restrict = IS_ENABLED(CONFIG_SECURITY_DMESG_RESTRICT);
@@ -513,13 +492,13 @@ static void append_char(char **pp, char *e, char c)
 		*(*pp)++ = c;
 }
 
-static ssize_t msg_print_ext_header(char *buf, size_t size,
-				    struct printk_log *msg, u64 seq)
+static ssize_t info_print_ext_header(char *buf, size_t size,
+				     struct printk_info *info)
 {
-	u64 ts_usec = msg->ts_nsec;
+	u64 ts_usec = info->ts_nsec;
 	char caller[20];
 #ifdef CONFIG_PRINTK_CALLER
-	u32 id = msg->caller_id;
+	u32 id = info->caller_id;
 
 	snprintf(caller, sizeof(caller), ",caller=%c%u",
 		 id & 0x80000000 ? 'C' : 'T', id & ~0x80000000);
@@ -529,14 +508,14 @@ static ssize_t msg_print_ext_header(char *buf, size_t size,
 
 	do_div(ts_usec, 1000);
 
-	return scnprintf(buf, size, "%u,%llu,%llu,%c%s,%hu;",
-			 (msg->facility << 3) | msg->level, seq, ts_usec,
-			 msg->flags & LOG_CONT ? 'c' : '-', caller, msg->cpu);
+	return scnprintf(buf, size, "%u,%llu,%llu,%c%s;",
+			 (info->facility << 3) | info->level, info->seq,
+			 ts_usec, info->flags & LOG_CONT ? 'c' : '-', caller);
 }
 
-static ssize_t msg_print_ext_body(char *buf, size_t size,
-				  char *dict, size_t dict_len,
-				  char *text, size_t text_len)
+static ssize_t msg_add_ext_text(char *buf, size_t size,
+				const char *text, size_t text_len,
+				unsigned char endc)
 {
 	char *p = buf, *e = buf + size;
 	size_t i;
@@ -550,50 +529,56 @@ static ssize_t msg_print_ext_body(char *buf, size_t size,
 		else
 			append_char(&p, e, c);
 	}
-	append_char(&p, e, '\n');
-
-	if (dict_len) {
-		bool line = true;
-
-		for (i = 0; i < dict_len; i++) {
-			unsigned char c = dict[i];
-
-			if (line) {
-				append_char(&p, e, ' ');
-				line = false;
-			}
-
-			if (c == '\0') {
-				append_char(&p, e, '\n');
-				line = true;
-				continue;
-			}
-
-			if (c < ' ' || c >= 127 || c == '\\') {
-				p += scnprintf(p, e - p, "\\x%02x", c);
-				continue;
-			}
-
-			append_char(&p, e, c);
-		}
-		append_char(&p, e, '\n');
-	}
+	append_char(&p, e, endc);
 
 	return p - buf;
 }
 
-#define PRINTK_SPRINT_MAX (LOG_LINE_MAX + PREFIX_MAX)
-#define PRINTK_RECORD_MAX (sizeof(struct printk_log) + \
-				CONSOLE_EXT_LOG_MAX + PRINTK_SPRINT_MAX)
+static ssize_t msg_add_dict_text(char *buf, size_t size,
+				 const char *key, const char *val)
+{
+	size_t val_len = strlen(val);
+	ssize_t len;
+
+	if (!val_len)
+		return 0;
+
+	len = msg_add_ext_text(buf, size, "", 0, ' ');	/* dict prefix */
+	len += msg_add_ext_text(buf + len, size - len, key, strlen(key), '=');
+	len += msg_add_ext_text(buf + len, size - len, val, val_len, '\n');
+
+	return len;
+}
+
+static ssize_t msg_print_ext_body(char *buf, size_t size,
+				  char *text, size_t text_len,
+				  struct dev_printk_info *dev_info)
+{
+	ssize_t len;
+
+	len = msg_add_ext_text(buf, size, text, text_len, '\n');
+
+	if (!dev_info)
+		goto out;
+
+	len += msg_add_dict_text(buf + len, size - len, "SUBSYSTEM",
+				 dev_info->subsystem);
+	len += msg_add_dict_text(buf + len, size - len, "DEVICE",
+				 dev_info->device);
+out:
+	return len;
+}
 
 /* /dev/kmsg - userspace message inject/listen interface */
 struct devkmsg_user {
 	u64 seq;
-	struct prb_iterator iter;
 	struct ratelimit_state rs;
 	struct mutex lock;
 	char buf[CONSOLE_EXT_LOG_MAX];
-	char msgbuf[PRINTK_RECORD_MAX];
+
+	struct printk_info info;
+	char text_buf[CONSOLE_EXT_LOG_MAX];
+	struct printk_record record;
 };
 
 static __printf(3, 4) __cold
@@ -603,7 +588,7 @@ int devkmsg_emit(int facility, int level, const char *fmt, ...)
 	int r;
 
 	va_start(args, fmt);
-	r = vprintk_emit(facility, level, NULL, 0, fmt, args);
+	r = vprintk_emit(facility, level, NULL, fmt, args);
 	va_end(args);
 
 	return r;
@@ -676,11 +661,9 @@ static ssize_t devkmsg_read(struct file *file, char __user *buf,
 			    size_t count, loff_t *ppos)
 {
 	struct devkmsg_user *user = file->private_data;
-	struct prb_iterator backup_iter;
-	struct printk_log *msg;
-	ssize_t ret;
+	struct printk_record *r = &user->record;
 	size_t len;
-	u64 seq;
+	ssize_t ret;
 
 	if (!user)
 		return -EBADF;
@@ -689,63 +672,42 @@ static ssize_t devkmsg_read(struct file *file, char __user *buf,
 	if (ret)
 		return ret;
 
-	/* make a backup copy in case there is a problem */
-	prb_iter_copy(&backup_iter, &user->iter);
+	if (!prb_read_valid(prb, user->seq, r)) {
+		if (file->f_flags & O_NONBLOCK) {
+			ret = -EAGAIN;
+			goto out;
+		}
 
-	if (file->f_flags & O_NONBLOCK) {
-		ret = prb_iter_next(&user->iter, &user->msgbuf[0],
-				      sizeof(user->msgbuf), &seq);
-	} else {
-		ret = prb_iter_wait_next(&user->iter, &user->msgbuf[0],
-					   sizeof(user->msgbuf), &seq);
+		ret = wait_event_interruptible(log_wait,
+					prb_read_valid(prb, user->seq, r));
+		if (ret)
+			goto out;
 	}
-	if (ret == 0) {
-		/* end of list */
-		ret = -EAGAIN;
-		goto out;
-	} else if (ret == -EINVAL) {
-		/* iterator invalid, return error and reset */
+
+	if (user->seq < prb_first_valid_seq(prb)) {
+		/* our last seen message is gone, return error and reset */
+		user->seq = prb_first_valid_seq(prb);
 		ret = -EPIPE;
-		prb_iter_init(&user->iter, &printk_rb, &user->seq);
-		goto out;
-	} else if (ret < 0) {
-		/* interrupted by signal */
 		goto out;
 	}
 
-	user->seq++;
-	if (user->seq < seq) {
-		ret = -EPIPE;
-		goto restore_out;
-	}
-
-	msg = (struct printk_log *)&user->msgbuf[0];
-	len = msg_print_ext_header(user->buf, sizeof(user->buf),
-				   msg, user->seq);
+	len = info_print_ext_header(user->buf, sizeof(user->buf), r->info);
 	len += msg_print_ext_body(user->buf + len, sizeof(user->buf) - len,
-				  log_dict(msg), msg->dict_len,
-				  log_text(msg), msg->text_len);
+				  &r->text_buf[0], r->info->text_len,
+				  &r->info->dev_info);
+
+	user->seq = r->info->seq + 1;
 
 	if (len > count) {
 		ret = -EINVAL;
-		goto restore_out;
+		goto out;
 	}
 
 	if (copy_to_user(buf, user->buf, len)) {
 		ret = -EFAULT;
-		goto restore_out;
+		goto out;
 	}
-
 	ret = len;
-	goto out;
-restore_out:
-	/*
-	 * There was an error, but this message should not be
-	 * lost because of it. Restore the backup and setup
-	 * seq so that it will work with the next read.
-	 */
-	prb_iter_copy(&user->iter, &backup_iter);
-	user->seq = seq - 1;
 out:
 	mutex_unlock(&user->lock);
 	return ret;
@@ -762,22 +724,17 @@ static ssize_t devkmsg_read(struct file *file, char __user *buf,
 static loff_t devkmsg_llseek(struct file *file, loff_t offset, int whence)
 {
 	struct devkmsg_user *user = file->private_data;
-	loff_t ret;
-	u64 seq;
+	loff_t ret = 0;
 
 	if (!user)
 		return -EBADF;
 	if (offset)
 		return -ESPIPE;
 
-	ret = mutex_lock_interruptible(&user->lock);
-	if (ret)
-		return ret;
-
 	switch (whence) {
 	case SEEK_SET:
 		/* the first record */
-		prb_iter_init(&user->iter, &printk_rb, &user->seq);
+		user->seq = prb_first_valid_seq(prb);
 		break;
 	case SEEK_DATA:
 		/*
@@ -785,87 +742,35 @@ static loff_t devkmsg_llseek(struct file *file, loff_t offset, int whence)
 		 * like issued by 'dmesg -c'. Reading /dev/kmsg itself
 		 * changes no global state, and does not clear anything.
 		 */
-		for (;;) {
-			prb_iter_init(&user->iter, &printk_rb, &seq);
-			ret = prb_iter_seek(&user->iter, clear_seq);
-			if (ret > 0) {
-				/* seeked to clear seq */
-				user->seq = clear_seq;
-				break;
-			} else if (ret == 0) {
-				/*
-				 * The end of the list was hit without
-				 * ever seeing the clear seq. Just
-				 * seek to the beginning of the list.
-				 */
-				prb_iter_init(&user->iter, &printk_rb,
-					      &user->seq);
-				break;
-			}
-			/* iterator invalid, start over */
-
-			/* reset clear_seq if it is no longer available */
-			if (seq > clear_seq)
-				clear_seq = 0;
-		}
-		ret = 0;
+		user->seq = atomic64_read(&clear_seq);
 		break;
 	case SEEK_END:
 		/* after the last record */
-		for (;;) {
-			ret = prb_iter_next(&user->iter, NULL, 0, &user->seq);
-			if (ret == 0)
-				break;
-			else if (ret > 0)
-				continue;
-			/* iterator invalid, start over */
-			prb_iter_init(&user->iter, &printk_rb, &user->seq);
-		}
-		ret = 0;
+		user->seq = prb_next_seq(prb);
 		break;
 	default:
 		ret = -EINVAL;
 	}
-
-	mutex_unlock(&user->lock);
 	return ret;
 }
 
-struct wait_queue_head *printk_wait_queue(void)
-{
-	/* FIXME: using prb internals! */
-	return printk_rb.wq;
-}
-
 static __poll_t devkmsg_poll(struct file *file, poll_table *wait)
 {
 	struct devkmsg_user *user = file->private_data;
-	struct prb_iterator iter;
 	__poll_t ret = 0;
-	int rbret;
-	u64 seq;
 
 	if (!user)
 		return EPOLLERR|EPOLLNVAL;
 
-	poll_wait(file, printk_wait_queue(), wait);
+	poll_wait(file, &log_wait, wait);
 
-	mutex_lock(&user->lock);
-
-	/* use copy so no actual iteration takes place */
-	prb_iter_copy(&iter, &user->iter);
-
-	rbret = prb_iter_next(&iter, &user->msgbuf[0],
-			      sizeof(user->msgbuf), &seq);
-	if (rbret == 0)
-		goto out;
-
-	ret = EPOLLIN|EPOLLRDNORM;
-
-	if (rbret < 0 || (seq - user->seq) != 1)
-		ret |= EPOLLERR|EPOLLPRI;
-out:
-	mutex_unlock(&user->lock);
+	if (prb_read_valid(prb, user->seq, NULL)) {
+		/* return error when data has vanished underneath us */
+		if (user->seq < prb_first_valid_seq(prb))
+			ret = EPOLLIN|EPOLLRDNORM|EPOLLERR|EPOLLPRI;
+		else
+			ret = EPOLLIN|EPOLLRDNORM;
+	}
 
 	return ret;
 }
@@ -895,7 +800,10 @@ static int devkmsg_open(struct inode *inode, struct file *file)
 
 	mutex_init(&user->lock);
 
-	prb_iter_init(&user->iter, &printk_rb, &user->seq);
+	prb_rec_init_rd(&user->record, &user->info,
+			&user->text_buf[0], sizeof(user->text_buf));
+
+	user->seq = prb_first_valid_seq(prb);
 
 	file->private_data = user;
 	return 0;
@@ -935,23 +843,64 @@ const struct file_operations kmsg_fops = {
  */
 void log_buf_vmcoreinfo_setup(void)
 {
+	struct dev_printk_info *dev_info = NULL;
+
+	VMCOREINFO_SYMBOL(prb);
+	VMCOREINFO_SYMBOL(printk_rb_static);
+	VMCOREINFO_SYMBOL(clear_seq);
+
 	/*
-	 * Export struct printk_log size and field offsets. User space tools can
+	 * Export struct size and field offsets. User space tools can
 	 * parse it and detect any changes to structure down the line.
 	 */
-	VMCOREINFO_STRUCT_SIZE(printk_log);
-	VMCOREINFO_OFFSET(printk_log, ts_nsec);
-	VMCOREINFO_OFFSET(printk_log, len);
-	VMCOREINFO_OFFSET(printk_log, text_len);
-	VMCOREINFO_OFFSET(printk_log, dict_len);
-#ifdef CONFIG_PRINTK_CALLER
-	VMCOREINFO_OFFSET(printk_log, caller_id);
-#endif
+
+	VMCOREINFO_SIZE(atomic64_t);
+	VMCOREINFO_TYPE_OFFSET(atomic64_t, counter);
+
+	VMCOREINFO_STRUCT_SIZE(printk_ringbuffer);
+	VMCOREINFO_OFFSET(printk_ringbuffer, desc_ring);
+	VMCOREINFO_OFFSET(printk_ringbuffer, text_data_ring);
+	VMCOREINFO_OFFSET(printk_ringbuffer, fail);
+
+	VMCOREINFO_STRUCT_SIZE(prb_desc_ring);
+	VMCOREINFO_OFFSET(prb_desc_ring, count_bits);
+	VMCOREINFO_OFFSET(prb_desc_ring, descs);
+	VMCOREINFO_OFFSET(prb_desc_ring, infos);
+	VMCOREINFO_OFFSET(prb_desc_ring, head_id);
+	VMCOREINFO_OFFSET(prb_desc_ring, tail_id);
+
+	VMCOREINFO_STRUCT_SIZE(prb_desc);
+	VMCOREINFO_OFFSET(prb_desc, state_var);
+	VMCOREINFO_OFFSET(prb_desc, text_blk_lpos);
+
+	VMCOREINFO_STRUCT_SIZE(prb_data_blk_lpos);
+	VMCOREINFO_OFFSET(prb_data_blk_lpos, begin);
+	VMCOREINFO_OFFSET(prb_data_blk_lpos, next);
+
+	VMCOREINFO_STRUCT_SIZE(printk_info);
+	VMCOREINFO_OFFSET(printk_info, seq);
+	VMCOREINFO_OFFSET(printk_info, ts_nsec);
+	VMCOREINFO_OFFSET(printk_info, text_len);
+	VMCOREINFO_OFFSET(printk_info, caller_id);
+	VMCOREINFO_OFFSET(printk_info, dev_info);
+
+	VMCOREINFO_STRUCT_SIZE(dev_printk_info);
+	VMCOREINFO_OFFSET(dev_printk_info, subsystem);
+	VMCOREINFO_LENGTH(printk_info_subsystem, sizeof(dev_info->subsystem));
+	VMCOREINFO_OFFSET(dev_printk_info, device);
+	VMCOREINFO_LENGTH(printk_info_device, sizeof(dev_info->device));
+
+	VMCOREINFO_STRUCT_SIZE(prb_data_ring);
+	VMCOREINFO_OFFSET(prb_data_ring, size_bits);
+	VMCOREINFO_OFFSET(prb_data_ring, data);
+	VMCOREINFO_OFFSET(prb_data_ring, head_lpos);
+	VMCOREINFO_OFFSET(prb_data_ring, tail_lpos);
+
+	VMCOREINFO_SIZE(atomic_long_t);
+	VMCOREINFO_TYPE_OFFSET(atomic_long_t, counter);
 }
 #endif
 
-/* FIXME: no support for buffer resizing */
-#if 0
 /* requested log_buf_len from kernel cmdline */
 static unsigned long __initdata new_log_buf_len;
 
@@ -1017,15 +966,59 @@ static void __init log_buf_add_cpu(void)
 #else /* !CONFIG_SMP */
 static inline void log_buf_add_cpu(void) {}
 #endif /* CONFIG_SMP */
-#endif /* 0 */
+
+static void __init set_percpu_data_ready(void)
+{
+	__printk_percpu_data_ready = true;
+}
+
+static unsigned int __init add_to_rb(struct printk_ringbuffer *rb,
+				     struct printk_record *r)
+{
+	struct prb_reserved_entry e;
+	struct printk_record dest_r;
+
+	prb_rec_init_wr(&dest_r, r->info->text_len);
+
+	if (!prb_reserve(&e, rb, &dest_r))
+		return 0;
+
+	memcpy(&dest_r.text_buf[0], &r->text_buf[0], r->info->text_len);
+	dest_r.info->text_len = r->info->text_len;
+	dest_r.info->facility = r->info->facility;
+	dest_r.info->level = r->info->level;
+	dest_r.info->flags = r->info->flags;
+	dest_r.info->ts_nsec = r->info->ts_nsec;
+	dest_r.info->caller_id = r->info->caller_id;
+	memcpy(&dest_r.info->dev_info, &r->info->dev_info, sizeof(dest_r.info->dev_info));
+
+	prb_final_commit(&e);
+
+	return prb_record_text_space(&e);
+}
+
+static char setup_text_buf[LOG_LINE_MAX] __initdata;
 
 void __init setup_log_buf(int early)
 {
-/* FIXME: no support for buffer resizing */
-#if 0
-	unsigned long flags;
+	struct printk_info *new_infos;
+	unsigned int new_descs_count;
+	struct prb_desc *new_descs;
+	struct printk_info info;
+	struct printk_record r;
+	size_t new_descs_size;
+	size_t new_infos_size;
 	char *new_log_buf;
 	unsigned int free;
+	u64 seq;
+
+	/*
+	 * Some archs call setup_log_buf() multiple times - first is very
+	 * early, e.g. from setup_arch(), and second - when percpu_areas
+	 * are initialised.
+	 */
+	if (!early)
+		set_percpu_data_ready();
 
 	if (log_buf != __log_buf)
 		return;
@@ -1036,25 +1029,71 @@ void __init setup_log_buf(int early)
 	if (!new_log_buf_len)
 		return;
 
-	new_log_buf = memblock_alloc(new_log_buf_len, LOG_ALIGN);
-	if (unlikely(!new_log_buf)) {
-		pr_err("log_buf_len: %lu bytes not available\n",
-			new_log_buf_len);
+	new_descs_count = new_log_buf_len >> PRB_AVGBITS;
+	if (new_descs_count == 0) {
+		pr_err("new_log_buf_len: %lu too small\n", new_log_buf_len);
 		return;
 	}
 
-	logbuf_lock_irqsave(flags);
+	new_log_buf = memblock_alloc(new_log_buf_len, LOG_ALIGN);
+	if (unlikely(!new_log_buf)) {
+		pr_err("log_buf_len: %lu text bytes not available\n",
+		       new_log_buf_len);
+		return;
+	}
+
+	new_descs_size = new_descs_count * sizeof(struct prb_desc);
+	new_descs = memblock_alloc(new_descs_size, LOG_ALIGN);
+	if (unlikely(!new_descs)) {
+		pr_err("log_buf_len: %zu desc bytes not available\n",
+		       new_descs_size);
+		goto err_free_log_buf;
+	}
+
+	new_infos_size = new_descs_count * sizeof(struct printk_info);
+	new_infos = memblock_alloc(new_infos_size, LOG_ALIGN);
+	if (unlikely(!new_infos)) {
+		pr_err("log_buf_len: %zu info bytes not available\n",
+		       new_infos_size);
+		goto err_free_descs;
+	}
+
+	prb_rec_init_rd(&r, &info, &setup_text_buf[0], sizeof(setup_text_buf));
+
+	prb_init(&printk_rb_dynamic,
+		 new_log_buf, ilog2(new_log_buf_len),
+		 new_descs, ilog2(new_descs_count),
+		 new_infos);
+
 	log_buf_len = new_log_buf_len;
 	log_buf = new_log_buf;
 	new_log_buf_len = 0;
-	free = __LOG_BUF_LEN - log_next_idx;
-	memcpy(log_buf, __log_buf, __LOG_BUF_LEN);
-	logbuf_unlock_irqrestore(flags);
+
+	free = __LOG_BUF_LEN;
+	prb_for_each_record(0, &printk_rb_static, seq, &r)
+		free -= add_to_rb(&printk_rb_dynamic, &r);
+
+	/*
+	 * This is early enough that everything is still running on the
+	 * boot CPU and interrupts are disabled. So no new messages will
+	 * appear during the transition to the dynamic buffer.
+	 */
+	prb = &printk_rb_dynamic;
+
+	if (seq != prb_next_seq(&printk_rb_static)) {
+		pr_err("dropped %llu messages\n",
+		       prb_next_seq(&printk_rb_static) - seq);
+	}
 
 	pr_info("log_buf_len: %u bytes\n", log_buf_len);
 	pr_info("early log buf free: %u(%u%%)\n",
 		free, (free * 100) / __LOG_BUF_LEN);
-#endif
+	return;
+
+err_free_descs:
+	memblock_free(__pa(new_descs), new_descs_size);
+err_free_log_buf:
+	memblock_free(__pa(new_log_buf), new_log_buf_len);
 }
 
 static bool __read_mostly ignore_loglevel;
@@ -1135,11 +1174,6 @@ static inline void boot_delay_msec(int level)
 static bool printk_time = IS_ENABLED(CONFIG_PRINTK_TIME);
 module_param_named(time, printk_time, bool, S_IRUGO | S_IWUSR);
 
-static size_t print_cpu(u16 cpu, char *buf)
-{
-	return sprintf(buf, "%03hu: ", cpu);
-}
-
 static size_t print_syslog(unsigned int level, char *buf)
 {
 	return sprintf(buf, "<%u>", level);
@@ -1166,104 +1200,169 @@ static size_t print_caller(u32 id, char *buf)
 #define print_caller(id, buf) 0
 #endif
 
-static size_t print_prefix(const struct printk_log *msg, bool syslog,
-			   bool time, char *buf)
+static size_t info_print_prefix(const struct printk_info  *info, bool syslog,
+				bool time, char *buf)
 {
 	size_t len = 0;
 
 	if (syslog)
-		len = print_syslog((msg->facility << 3) | msg->level, buf);
+		len = print_syslog((info->facility << 3) | info->level, buf);
 
 	if (time)
-		len += print_time(msg->ts_nsec, buf + len);
+		len += print_time(info->ts_nsec, buf + len);
 
-	len += print_caller(msg->caller_id, buf + len);
+	len += print_caller(info->caller_id, buf + len);
 
 	if (IS_ENABLED(CONFIG_PRINTK_CALLER) || time) {
 		buf[len++] = ' ';
 		buf[len] = '\0';
 	}
-	len += print_cpu(msg->cpu, buf + len);
 
 	return len;
 }
 
-static size_t msg_print_text(const struct printk_log *msg, bool syslog,
-			     bool time, char *buf, size_t size)
+/*
+ * Prepare the record for printing. The text is shifted within the given
+ * buffer to avoid a need for another one. The following operations are
+ * done:
+ *
+ *   - Add prefix for each line.
+ *   - Add the trailing newline that has been removed in vprintk_store().
+ *   - Drop truncated lines that do not longer fit into the buffer.
+ *
+ * Return: The length of the updated/prepared text, including the added
+ * prefixes and the newline. The dropped line(s) are not counted.
+ */
+static size_t record_print_text(struct printk_record *r, bool syslog,
+				bool time)
 {
-	const char *text = log_text(msg);
-	size_t text_size = msg->text_len;
-	size_t len = 0;
+	size_t text_len = r->info->text_len;
+	size_t buf_size = r->text_buf_size;
+	char *text = r->text_buf;
 	char prefix[PREFIX_MAX];
-	const size_t prefix_len = print_prefix(msg, syslog, time, prefix);
+	bool truncated = false;
+	size_t prefix_len;
+	size_t line_len;
+	size_t len = 0;
+	char *next;
 
-	do {
-		const char *next = memchr(text, '\n', text_size);
-		size_t text_len;
+	/*
+	 * If the message was truncated because the buffer was not large
+	 * enough, treat the available text as if it were the full text.
+	 */
+	if (text_len > buf_size)
+		text_len = buf_size;
 
+	prefix_len = info_print_prefix(r->info, syslog, time, prefix);
+
+	/*
+	 * @text_len: bytes of unprocessed text
+	 * @line_len: bytes of current line _without_ newline
+	 * @text:     pointer to beginning of current line
+	 * @len:      number of bytes prepared in r->text_buf
+	 */
+	for (;;) {
+		next = memchr(text, '\n', text_len);
 		if (next) {
-			text_len = next - text;
-			next++;
-			text_size -= next - text;
+			line_len = next - text;
 		} else {
-			text_len = text_size;
+			/* Drop truncated line(s). */
+			if (truncated)
+				break;
+			line_len = text_len;
 		}
 
-		if (buf) {
-			if (prefix_len + text_len + 1 >= size - len)
+		/*
+		 * Truncate the text if there is not enough space to add the
+		 * prefix and a trailing newline.
+		 */
+		if (len + prefix_len + text_len + 1 > buf_size) {
+			/* Drop even the current line if no space. */
+			if (len + prefix_len + line_len + 1 > buf_size)
 				break;
 
-			memcpy(buf + len, prefix, prefix_len);
-			len += prefix_len;
-			memcpy(buf + len, text, text_len);
-			len += text_len;
-			buf[len++] = '\n';
-		} else {
-			/* SYSLOG_ACTION_* buffer size only calculation */
-			len += prefix_len + text_len + 1;
+			text_len = buf_size - len - prefix_len - 1;
+			truncated = true;
 		}
 
-		text = next;
-	} while (text);
+		memmove(text + prefix_len, text, text_len);
+		memcpy(text, prefix, prefix_len);
+
+		len += prefix_len + line_len + 1;
+
+		if (text_len == line_len) {
+			/*
+			 * Add the trailing newline removed in
+			 * vprintk_store().
+			 */
+			text[prefix_len + line_len] = '\n';
+			break;
+		}
+
+		/*
+		 * Advance beyond the added prefix and the related line with
+		 * its newline.
+		 */
+		text += prefix_len + line_len + 1;
+
+		/*
+		 * The remaining text has only decreased by the line with its
+		 * newline.
+		 *
+		 * Note that @text_len can become zero. It happens when @text
+		 * ended with a newline (either due to truncation or the
+		 * original string ending with "\n\n"). The loop is correctly
+		 * repeated and (if not truncated) an empty line with a prefix
+		 * will be prepared.
+		 */
+		text_len -= line_len + 1;
+	}
 
 	return len;
 }
 
-static int syslog_print(char __user *buf, int size, char *text,
-			char *msgbuf, int *locked)
+static size_t get_record_print_text_size(struct printk_info *info,
+					 unsigned int line_count,
+					 bool syslog, bool time)
 {
-	struct prb_iterator iter;
-	struct printk_log *msg;
+	char prefix[PREFIX_MAX];
+	size_t prefix_len;
+
+	prefix_len = info_print_prefix(info, syslog, time, prefix);
+
+	/*
+	 * Each line will be preceded with a prefix. The intermediate
+	 * newlines are already within the text, but a final trailing
+	 * newline will be added.
+	 */
+	return ((prefix_len * line_count) + info->text_len + 1);
+}
+
+static int syslog_print(char __user *buf, int size)
+{
+	struct printk_info info;
+	struct printk_record r;
+	char *text;
 	int len = 0;
-	u64 seq;
-	int ret;
+
+	text = kmalloc(LOG_LINE_MAX + PREFIX_MAX, GFP_KERNEL);
+	if (!text)
+		return -ENOMEM;
+
+	prb_rec_init_rd(&r, &info, text, LOG_LINE_MAX + PREFIX_MAX);
 
 	while (size > 0) {
 		size_t n;
 		size_t skip;
 
-		for (;;) {
-			prb_iter_copy(&iter, &syslog_iter);
-			ret = prb_iter_next(&iter, msgbuf,
-					    PRINTK_RECORD_MAX, &seq);
-			if (ret < 0) {
-				/* messages are gone, move to first one */
-				prb_iter_init(&syslog_iter, &printk_rb,
-					      &syslog_seq);
-				syslog_partial = 0;
-				continue;
-			}
+		syslog_lock_irq();
+		if (!prb_read_valid(prb, syslog_seq, &r)) {
+			syslog_unlock_irq();
 			break;
 		}
-		if (ret == 0)
-			break;
-
-		/*
-		 * If messages have been missed, the partial tracker
-		 * is no longer valid and must be reset.
-		 */
-		if (syslog_seq > 0 && seq - 1 != syslog_seq) {
-			syslog_seq = seq - 1;
+		if (r.info->seq != syslog_seq) {
+			/* message is gone, move to next valid one */
+			syslog_seq = r.info->seq;
 			syslog_partial = 0;
 		}
 
@@ -1274,213 +1373,124 @@ static int syslog_print(char __user *buf, int size, char *text,
 		if (!syslog_partial)
 			syslog_time = printk_time;
 
-		msg = (struct printk_log *)msgbuf;
-
 		skip = syslog_partial;
-		n = msg_print_text(msg, true, syslog_time, text,
-				   PRINTK_SPRINT_MAX);
+		n = record_print_text(&r, true, syslog_time);
 		if (n - syslog_partial <= size) {
 			/* message fits into buffer, move forward */
-			prb_iter_next(&syslog_iter, NULL, 0, &syslog_seq);
+			syslog_seq = r.info->seq + 1;
 			n -= syslog_partial;
 			syslog_partial = 0;
-		} else if (!len) {
+		} else if (!len){
 			/* partial read(), remember position */
 			n = size;
 			syslog_partial += n;
 		} else
 			n = 0;
+		syslog_unlock_irq();
 
 		if (!n)
 			break;
 
-		mutex_unlock(&syslog_lock);
 		if (copy_to_user(buf, text + skip, n)) {
 			if (!len)
 				len = -EFAULT;
-			*locked = 0;
 			break;
 		}
-		ret = mutex_lock_interruptible(&syslog_lock);
 
 		len += n;
 		size -= n;
 		buf += n;
-
-		if (ret) {
-			if (!len)
-				len = ret;
-			*locked = 0;
-			break;
-		}
 	}
 
+	kfree(text);
 	return len;
 }
 
-static int count_remaining(struct prb_iterator *iter, u64 until_seq,
-			   char *msgbuf, int size, bool records, bool time)
-{
-	struct prb_iterator local_iter;
-	struct printk_log *msg;
-	int len = 0;
-	u64 seq;
-	int ret;
-
-	prb_iter_copy(&local_iter, iter);
-	for (;;) {
-		ret = prb_iter_next(&local_iter, msgbuf, size, &seq);
-		if (ret == 0) {
-			break;
-		} else if (ret < 0) {
-			/* the iter is invalid, restart from head */
-			prb_iter_init(&local_iter, &printk_rb, NULL);
-			len = 0;
-			continue;
-		}
-
-		if (until_seq && seq >= until_seq)
-			break;
-
-		if (records) {
-			len++;
-		} else {
-			msg = (struct printk_log *)msgbuf;
-			len += msg_print_text(msg, true, time, NULL, 0);
-		}
-	}
-
-	return len;
-}
-
-static void syslog_clear(void)
-{
-	struct prb_iterator iter;
-	int ret;
-
-	prb_iter_init(&iter, &printk_rb, &clear_seq);
-	for (;;) {
-		ret = prb_iter_next(&iter, NULL, 0, &clear_seq);
-		if (ret == 0)
-			break;
-		else if (ret < 0)
-			prb_iter_init(&iter, &printk_rb, &clear_seq);
-	}
-}
-
 static int syslog_print_all(char __user *buf, int size, bool clear)
 {
-	struct prb_iterator iter;
-	struct printk_log *msg;
-	char *msgbuf = NULL;
-	char *text = NULL;
-	int textlen;
-	u64 seq = 0;
+	struct printk_info info;
+	unsigned int line_count;
+	struct printk_record r;
+	u64 newest_seq;
+	u64 clr_seq;
+	char *text;
 	int len = 0;
+	u64 seq;
 	bool time;
-	int ret;
 
-	text = kmalloc(PRINTK_SPRINT_MAX, GFP_KERNEL);
+	text = kmalloc(LOG_LINE_MAX + PREFIX_MAX, GFP_KERNEL);
 	if (!text)
 		return -ENOMEM;
-	msgbuf = kmalloc(PRINTK_RECORD_MAX, GFP_KERNEL);
-	if (!msgbuf) {
-		kfree(text);
-		return -ENOMEM;
-	}
 
 	time = printk_time;
+	clr_seq = atomic64_read(&clear_seq);
 
 	/*
-	 * Setup iter to last event before clear. Clear may
-	 * be lost, but keep going with a best effort.
+	 * Find first record that fits, including all following records,
+	 * into the user-provided buffer for this dump.
 	 */
-	prb_iter_init(&iter, &printk_rb, NULL);
-	prb_iter_seek(&iter, clear_seq);
 
-	/* count the total bytes after clear */
-	len = count_remaining(&iter, 0, msgbuf, PRINTK_RECORD_MAX,
-			      false, time);
+	prb_for_each_info(clr_seq, prb, seq, &info, &line_count)
+		len += get_record_print_text_size(&info, line_count, true, time);
 
-	/* move iter forward until length fits into the buffer */
-	while (len > size) {
-		ret = prb_iter_next(&iter, msgbuf,
-				    PRINTK_RECORD_MAX, &seq);
-		if (ret == 0) {
+	/*
+	 * Keep track of the latest in case new records are coming in fast
+	 * and overwriting the older records.
+	 */
+	newest_seq = seq;
+
+	/*
+	 * Move first record forward until length fits into the buffer. This
+	 * is a best effort attempt. If @newest_seq is reached because the
+	 * ringbuffer is wrapping too fast, just start filling the buffer
+	 * from there.
+	 */
+	prb_for_each_info(clr_seq, prb, seq, &info, &line_count) {
+		if (len <= size || info.seq > newest_seq)
 			break;
-		} else if (ret < 0) {
-			/*
-			 * The iter is now invalid so clear will
-			 * also be invalid. Restart from the head.
-			 */
-			prb_iter_init(&iter, &printk_rb, NULL);
-			len = count_remaining(&iter, 0, msgbuf,
-					      PRINTK_RECORD_MAX, false, time);
-			continue;
-		}
-
-		msg = (struct printk_log *)msgbuf;
-		len -= msg_print_text(msg, true, time, NULL, 0);
-
-		if (clear)
-			clear_seq = seq;
+		len -= get_record_print_text_size(&info, line_count, true, time);
 	}
 
-	/* copy messages to buffer */
+	prb_rec_init_rd(&r, &info, text, LOG_LINE_MAX + PREFIX_MAX);
+
 	len = 0;
-	while (len >= 0 && len < size) {
-		if (clear)
-			clear_seq = seq;
+	prb_for_each_record(seq, prb, seq, &r) {
+		int textlen;
 
-		ret = prb_iter_next(&iter, msgbuf,
-				    PRINTK_RECORD_MAX, &seq);
-		if (ret == 0) {
-			break;
-		} else if (ret < 0) {
-			/*
-			 * The iter is now invalid. Make a best
-			 * effort to grab the rest of the log
-			 * from the new head.
-			 */
-			prb_iter_init(&iter, &printk_rb, NULL);
-			continue;
-		}
+		textlen = record_print_text(&r, true, time);
 
-		msg = (struct printk_log *)msgbuf;
-		textlen = msg_print_text(msg, true, time, text,
-					 PRINTK_SPRINT_MAX);
-		if (textlen < 0) {
-			len = textlen;
+		if (len + textlen > size) {
+			seq--;
 			break;
 		}
 
-		if (len + textlen > size)
-			break;
-
 		if (copy_to_user(buf + len, text, textlen))
 			len = -EFAULT;
 		else
 			len += textlen;
+
+		if (len < 0)
+			break;
 	}
 
-	if (clear && !seq)
-		syslog_clear();
+	if (clear)
+		atomic64_set(&clear_seq, seq);
 
 	kfree(text);
-	kfree(msgbuf);
 	return len;
 }
 
+static void syslog_clear(void)
+{
+	atomic64_set(&clear_seq, prb_next_seq(prb));
+}
+
 int do_syslog(int type, char __user *buf, int len, int source)
 {
 	bool clear = false;
 	static int saved_console_loglevel = LOGLEVEL_DEFAULT;
-	struct prb_iterator iter;
-	char *msgbuf = NULL;
-	char *text = NULL;
-	int locked;
 	int error;
-	int ret;
+	u64 seq;
 
 	error = check_syslog_permissions(type, source);
 	if (error)
@@ -1498,54 +1508,19 @@ int do_syslog(int type, char __user *buf, int len, int source)
 			return 0;
 		if (!access_ok(buf, len))
 			return -EFAULT;
-
-		text = kmalloc(PRINTK_SPRINT_MAX, GFP_KERNEL);
-		msgbuf = kmalloc(PRINTK_RECORD_MAX, GFP_KERNEL);
-		if (!text || !msgbuf) {
-			error = -ENOMEM;
-			goto out;
-		}
-
-		error = mutex_lock_interruptible(&syslog_lock);
+		syslog_lock_irq();
+		seq = syslog_seq;
+		syslog_unlock_irq();
+		error = wait_event_interruptible(log_wait,
+				prb_read_valid(prb, seq, NULL));
 		if (error)
-			goto out;
-
-		/*
-		 * Wait until a first message is available. Use a copy
-		 * because no iteration should occur for syslog now.
-		 */
-		for (;;) {
-			prb_iter_copy(&iter, &syslog_iter);
-
-			mutex_unlock(&syslog_lock);
-			ret = prb_iter_wait_next(&iter, NULL, 0, NULL);
-			if (ret == -ERESTARTSYS) {
-				error = ret;
-				goto out;
-			}
-			error = mutex_lock_interruptible(&syslog_lock);
-			if (error)
-				goto out;
-
-			if (ret == -EINVAL) {
-				prb_iter_init(&syslog_iter, &printk_rb,
-					      &syslog_seq);
-				syslog_partial = 0;
-				continue;
-			}
-			break;
-		}
-
-		/* print as much as will fit in the user buffer */
-		locked = 1;
-		error = syslog_print(buf, len, text, msgbuf, &locked);
-		if (locked)
-			mutex_unlock(&syslog_lock);
+			return error;
+		error = syslog_print(buf, len);
 		break;
 	/* Read/clear last kernel messages */
 	case SYSLOG_ACTION_READ_CLEAR:
 		clear = true;
-		/* FALL THRU */
+		fallthrough;
 	/* Read last kernel messages */
 	case SYSLOG_ACTION_READ_ALL:
 		if (!buf || len < 0)
@@ -1585,43 +1560,44 @@ int do_syslog(int type, char __user *buf, int len, int source)
 		break;
 	/* Number of chars in the log buffer */
 	case SYSLOG_ACTION_SIZE_UNREAD:
-		msgbuf = kmalloc(PRINTK_RECORD_MAX, GFP_KERNEL);
-		if (!msgbuf)
-			return -ENOMEM;
-
-		error = mutex_lock_interruptible(&syslog_lock);
-		if (error)
-			goto out;
-
+		syslog_lock_irq();
+		if (syslog_seq < prb_first_valid_seq(prb)) {
+			/* messages are gone, move to first one */
+			syslog_seq = prb_first_valid_seq(prb);
+			syslog_partial = 0;
+		}
 		if (source == SYSLOG_FROM_PROC) {
 			/*
 			 * Short-cut for poll(/"proc/kmsg") which simply checks
 			 * for pending data, not the size; return the count of
 			 * records, not the length.
 			 */
-			error = count_remaining(&syslog_iter, 0, msgbuf,
-						PRINTK_RECORD_MAX, true,
-						printk_time);
+			error = prb_next_seq(prb) - syslog_seq;
 		} else {
-			error = count_remaining(&syslog_iter, 0, msgbuf,
-						PRINTK_RECORD_MAX, false,
-						printk_time);
+			bool time = syslog_partial ? syslog_time : printk_time;
+			struct printk_info info;
+			unsigned int line_count;
+			u64 seq;
+
+			prb_for_each_info(syslog_seq, prb, seq, &info,
+					  &line_count) {
+				error += get_record_print_text_size(&info, line_count,
+								    true, time);
+				time = printk_time;
+			}
 			error -= syslog_partial;
 		}
-
-		mutex_unlock(&syslog_lock);
+		syslog_unlock_irq();
 		break;
 	/* Size of the log buffer */
 	case SYSLOG_ACTION_SIZE_BUFFER:
-		error = prb_buffer_size(&printk_rb);
+		error = log_buf_len;
 		break;
 	default:
 		error = -EINVAL;
 		break;
 	}
-out:
-	kfree(msgbuf);
-	kfree(text);
+
 	return error;
 }
 
@@ -1630,11 +1606,135 @@ SYSCALL_DEFINE3(syslog, int, type, char __user *, buf, int, len)
 	return do_syslog(type, buf, len, SYSLOG_FROM_READER);
 }
 
+/*
+ * The per-cpu sprint buffers are used with interrupts disabled, so each CPU
+ * only requires 2 buffers: for non-NMI and NMI contexts. Recursive printk()
+ * calls are handled by the global sprint buffers.
+ */
+#define SPRINT_CTX_DEPTH 2
+
+/* Static sprint buffers for early boot (only 1 CPU) and recursion. */
+static DECLARE_BITMAP(sprint_global_buffer_map, SPRINT_CTX_DEPTH);
+static char sprint_global_buffer[SPRINT_CTX_DEPTH][PREFIX_MAX + LOG_LINE_MAX];
+
+struct sprint_buffers {
+	char		buf[SPRINT_CTX_DEPTH][PREFIX_MAX + LOG_LINE_MAX];
+	atomic_t	index;
+};
+
+static DEFINE_PER_CPU(struct sprint_buffers, percpu_sprint_buffers);
+
+/*
+ * Acquire an unused buffer, returning its index. If no buffer is
+ * available, @count is returned.
+ */
+static int _get_sprint_buf(unsigned long *map, int count)
+{
+	int index;
+
+	do {
+		index = find_first_zero_bit(map, count);
+		if (index == count)
+			break;
+	/*
+	 * Guarantee map changes are ordered for the other CPUs.
+	 * Pairs with clear_bit() in _put_sprint_buf().
+	 */
+	} while (test_and_set_bit(index, map));
+
+	return index;
+}
+
+/* Mark the buffer @index as unused. */
+static void _put_sprint_buf(unsigned long *map, unsigned int count, unsigned int index)
+{
+	/*
+	 * Guarantee map changes are ordered for the other CPUs.
+	 * Pairs with test_and_set_bit() in _get_sprint_buf().
+	 */
+	clear_bit(index, map);
+}
+
+/*
+ * Get a buffer sized PREFIX_MAX+LOG_LINE_MAX for sprinting. On success, @id
+ * is set and interrupts are disabled. @id is used to put back the buffer.
+ *
+ * @id is non-negative for per-cpu buffers, negative for global buffers.
+ */
+static char *get_sprint_buf(int *id, unsigned long *flags)
+{
+	struct sprint_buffers *bufs;
+	unsigned int index;
+	unsigned int cpu;
+
+	local_irq_save(*flags);
+	cpu = get_cpu();
+
+	if (printk_percpu_data_ready()) {
+
+		/*
+		 * First try with per-cpu pool. Note that the last
+		 * buffer is reserved for NMI context.
+		 */
+		bufs = per_cpu_ptr(&percpu_sprint_buffers, cpu);
+		index = atomic_read(&bufs->index);
+		if (index < (SPRINT_CTX_DEPTH - 1) ||
+		    (in_nmi() && index < SPRINT_CTX_DEPTH)) {
+			atomic_set(&bufs->index, index + 1);
+			*id = cpu;
+			return &bufs->buf[index][0];
+		}
+	}
+
+	/*
+	 * Fallback to global pool.
+	 *
+	 * The global pool will only ever be used if per-cpu data is not ready
+	 * yet or printk recurses. Recursion will not occur unless printk is
+	 * having internal issues.
+	 */
+	index = _get_sprint_buf(sprint_global_buffer_map, SPRINT_CTX_DEPTH);
+	if (index != SPRINT_CTX_DEPTH) {
+		/* Convert to global buffer representation. */
+		*id = -index - 1;
+		return &sprint_global_buffer[index][0];
+	}
+
+	/* Failed to get a buffer. */
+	put_cpu();
+	local_irq_restore(*flags);
+	return NULL;
+}
+
+/* Put back an sprint buffer and restore interrupts. */
+static void put_sprint_buf(int id, unsigned long flags)
+{
+	struct sprint_buffers *bufs;
+	unsigned int index;
+	unsigned int cpu;
+
+	if (id >= 0) {
+		cpu = id;
+		bufs = per_cpu_ptr(&percpu_sprint_buffers, cpu);
+		index = atomic_read(&bufs->index);
+		atomic_set(&bufs->index, index - 1);
+	} else {
+		/* Convert from global buffer representation. */
+		index = -id - 1;
+		_put_sprint_buf(sprint_global_buffer_map,
+				SPRINT_CTX_DEPTH, index);
+	}
+
+	put_cpu();
+	local_irq_restore(flags);
+}
+
 int printk_delay_msec __read_mostly;
 
 static inline void printk_delay(int level)
 {
 	boot_delay_msec(level);
+
 	if (unlikely(printk_delay_msec)) {
 		int m = printk_delay_msec;
 
@@ -1645,168 +1745,116 @@ static inline void printk_delay(int level)
 	}
 }
 
-static void print_console_dropped(struct console *con, u64 count)
+static bool kernel_sync_mode(void)
 {
-	char text[64];
-	int len;
-
-	len = sprintf(text, "** %llu printk message%s dropped **\n",
-		      count, count > 1 ? "s" : "");
-	con->write(con, text, len);
+	return (oops_in_progress || sync_mode);
 }
 
-static void format_text(struct printk_log *msg, u64 seq,
-			char *ext_text, size_t *ext_len,
-			char *text, size_t *len, bool time)
+static bool console_can_sync(struct console *con)
 {
-	if (suppress_message_printing(msg->level)) {
-		/*
-		 * Skip record that has level above the console
-		 * loglevel and update each console's local seq.
-		 */
-		*len = 0;
-		*ext_len = 0;
-		return;
-	}
-
-	*len = msg_print_text(msg, console_msg_format & MSG_FORMAT_SYSLOG,
-			      time, text, PRINTK_SPRINT_MAX);
-	if (nr_ext_console_drivers) {
-		*ext_len = msg_print_ext_header(ext_text, CONSOLE_EXT_LOG_MAX,
-						msg, seq);
-		*ext_len += msg_print_ext_body(ext_text + *ext_len,
-					       CONSOLE_EXT_LOG_MAX - *ext_len,
-					       log_dict(msg), msg->dict_len,
-					       log_text(msg), msg->text_len);
-	} else {
-		*ext_len = 0;
-	}
-}
-
-static void printk_write_history(struct console *con, u64 master_seq)
-{
-	struct prb_iterator iter;
-	bool time = printk_time;
-	static char *ext_text;
-	static char *text;
-	static char *buf;
-	u64 seq;
-
-	ext_text = kmalloc(CONSOLE_EXT_LOG_MAX, GFP_KERNEL);
-	text = kmalloc(PRINTK_SPRINT_MAX, GFP_KERNEL);
-	buf = kmalloc(PRINTK_RECORD_MAX, GFP_KERNEL);
-	if (!ext_text || !text || !buf)
-		return;
-
 	if (!(con->flags & CON_ENABLED))
-		goto out;
-
-	if (!con->write)
-		goto out;
-
-	if (!cpu_online(raw_smp_processor_id()) &&
-	    !(con->flags & CON_ANYTIME))
-		goto out;
-
-	prb_iter_init(&iter, &printk_rb, NULL);
-
-	for (;;) {
-		struct printk_log *msg;
-		size_t ext_len;
-		size_t len;
-		int ret;
-
-		ret = prb_iter_next(&iter, buf, PRINTK_RECORD_MAX, &seq);
-		if (ret == 0) {
-			break;
-		} else if (ret < 0) {
-			prb_iter_init(&iter, &printk_rb, NULL);
-			continue;
-		}
-
-		if (seq > master_seq)
-			break;
-
-		con->printk_seq++;
-		if (con->printk_seq < seq) {
-			print_console_dropped(con, seq - con->printk_seq);
-			con->printk_seq = seq;
-		}
-
-		msg = (struct printk_log *)buf;
-		format_text(msg, master_seq, ext_text, &ext_len, text,
-			    &len, time);
-
-		if (len == 0 && ext_len == 0)
-			continue;
-
-		if (con->flags & CON_EXTENDED)
-			con->write(con, ext_text, ext_len);
-		else
-			con->write(con, text, len);
-
-		printk_delay(msg->level);
-	}
-out:
-	con->wrote_history = 1;
-	kfree(ext_text);
-	kfree(text);
-	kfree(buf);
+		return false;
+	if (con->write_atomic && kernel_sync_mode())
+		return true;
+	if (con->write_atomic && (con->flags & CON_HANDOVER) && !con->thread)
+		return true;
+	if (con->write && (con->flags & CON_BOOT) && !con->thread)
+		return true;
+	return false;
 }
 
-/*
- * Call the console drivers, asking them to write out
- * log_buf[start] to log_buf[end - 1].
- * The console_lock must be held.
- */
-static void call_console_drivers(u64 seq, const char *ext_text, size_t ext_len,
-				 const char *text, size_t len, int level,
-				 int facility)
+static bool call_sync_console_driver(struct console *con, const char *text, size_t text_len)
+{
+	if (!(con->flags & CON_ENABLED))
+		return false;
+	if (con->write_atomic && kernel_sync_mode())
+		con->write_atomic(con, text, text_len);
+	else if (con->write_atomic && (con->flags & CON_HANDOVER) && !con->thread)
+		con->write_atomic(con, text, text_len);
+	else if (con->write && (con->flags & CON_BOOT) && !con->thread)
+		con->write(con, text, text_len);
+	else
+		return false;
+
+	return true;
+}
+
+static bool any_console_can_sync(void)
 {
 	struct console *con;
 
-	trace_console_rcuidle(text, len);
+	for_each_console(con) {
+		if (console_can_sync(con))
+			return true;
+	}
+	return false;
+}
+
+static bool have_atomic_console(void)
+{
+	struct console *con;
 
 	for_each_console(con) {
 		if (!(con->flags & CON_ENABLED))
 			continue;
-		if (!con->wrote_history) {
-			if (con->flags & CON_PRINTBUFFER) {
-				printk_write_history(con, seq);
-				continue;
-			}
-			con->wrote_history = 1;
-			con->printk_seq = seq - 1;
-		}
-		if (con->flags & CON_BOOT && facility == 0) {
-			/* skip boot messages, already printed */
-			if (con->printk_seq < seq)
-				con->printk_seq = seq;
-			continue;
-		}
-		if (!con->write)
-			continue;
-		if (!cpu_online(raw_smp_processor_id()) &&
-		    !(con->flags & CON_ANYTIME))
-			continue;
-		if (con->printk_seq >= seq)
-			continue;
-
-		con->printk_seq++;
-		if (con->printk_seq < seq) {
-			print_console_dropped(con, seq - con->printk_seq);
-			con->printk_seq = seq;
-		}
-
-		/* for supressed messages, only seq is updated */
-		if (len == 0 && ext_len == 0)
-			continue;
-
-		if (con->flags & CON_EXTENDED)
-			con->write(con, ext_text, ext_len);
-		else
-			con->write(con, text, len);
+		if (con->write_atomic)
+			return true;
 	}
+	return false;
+}
+
+static bool print_sync(struct console *con, char *buf, size_t buf_size, u64 *seq)
+{
+	struct printk_info info;
+	struct printk_record r;
+	size_t text_len;
+
+	prb_rec_init_rd(&r, &info, buf, buf_size);
+
+	if (!prb_read_valid(prb, *seq, &r))
+		return false;
+
+	text_len = record_print_text(&r, console_msg_format & MSG_FORMAT_SYSLOG, printk_time);
+
+	if (!call_sync_console_driver(con, buf, text_len))
+		return false;
+
+	*seq = r.info->seq;
+
+	touch_softlockup_watchdog_sync();
+	clocksource_touch_watchdog();
+	rcu_cpu_stall_reset();
+	touch_nmi_watchdog();
+
+	if (text_len)
+		printk_delay(r.info->level);
+
+	return true;
+}
+
+static void print_sync_until(u64 seq, struct console *con, char *buf, size_t buf_size)
+{
+	unsigned int flags;
+	u64 printk_seq;
+
+	if (!con) {
+		for_each_console(con) {
+			if (console_can_sync(con))
+				print_sync_until(seq, con, buf, buf_size);
+		}
+		return;
+	}
+
+	console_atomic_lock(&flags);
+	for (;;) {
+		printk_seq = atomic64_read(&con->printk_seq);
+		if (printk_seq >= seq)
+			break;
+		if (!print_sync(con, buf, buf_size, &printk_seq))
+			break;
+		atomic64_set(&con->printk_seq, printk_seq + 1);
+	}
+	console_atomic_unlock(flags);
 }
 
 static inline u32 printk_caller_id(void)
@@ -1815,105 +1863,39 @@ static inline u32 printk_caller_id(void)
 		0x80000000 + raw_smp_processor_id();
 }
 
-/*
- * Continuation lines are buffered, and not committed to the record buffer
- * until the line is complete, or a race forces it. The line fragments
- * though, are printed immediately to the consoles to ensure everything has
- * reached the console in case of a kernel crash.
- */
-static struct cont {
-	char buf[LOG_LINE_MAX];
-	size_t len;			/* length == 0 means unused buffer */
-	u32 caller_id;			/* printk_caller_id() of first print */
-	int cpu_owner;			/* cpu of first print */
-	u64 ts_nsec;			/* time of first print */
-	u8 level;			/* log level of first message */
-	u8 facility;			/* log facility of first message */
-	enum log_flags flags;		/* prefix, newline flags */
-} cont[2];
-
-static void cont_flush(int ctx)
-{
-	struct cont *c = &cont[ctx];
-
-	if (c->len == 0)
-		return;
-
-	log_store(c->caller_id, c->facility, c->level, c->flags,
-		  c->ts_nsec, c->cpu_owner, NULL, 0, c->buf, c->len);
-	c->len = 0;
-}
-
-static void cont_add(int ctx, int cpu, u32 caller_id, int facility, int level,
-		     enum log_flags flags, const char *text, size_t len)
-{
-	struct cont *c = &cont[ctx];
-
-	if (cpu != c->cpu_owner || !(flags & LOG_CONT))
-		cont_flush(ctx);
-
-	/* If the line gets too long, split it up in separate records. */
-	while (c->len + len > sizeof(c->buf))
-		cont_flush(ctx);
-
-	if (!c->len) {
-		c->facility = facility;
-		c->level = level;
-		c->caller_id = caller_id;
-		c->ts_nsec = local_clock();
-		c->flags = flags;
-		c->cpu_owner = cpu;
-	}
-
-	memcpy(c->buf + c->len, text, len);
-	c->len += len;
-
-	// The original flags come from the first line,
-	// but later continuations can add a newline.
-	if (flags & LOG_NEWLINE) {
-		c->flags |= LOG_NEWLINE;
-		cont_flush(ctx);
-	}
-}
-
-/* ring buffer used as memory allocator for temporary sprint buffers */
-DECLARE_STATIC_PRINTKRB(sprint_rb,
-			ilog2(PRINTK_RECORD_MAX + sizeof(struct prb_entry) +
-			      sizeof(long)) + 2, &printk_cpulock);
-
-asmlinkage int vprintk_emit(int facility, int level,
-			    const char *dict, size_t dictlen,
-			    const char *fmt, va_list args)
+__printf(4, 0)
+static int vprintk_store(int facility, int level,
+			 const struct dev_printk_info *dev_info,
+			 const char *fmt, va_list args)
 {
 	const u32 caller_id = printk_caller_id();
-	int ctx = !!in_nmi();
+	struct prb_reserved_entry e;
 	enum log_flags lflags = 0;
-	int printed_len = 0;
-	struct prb_handle h;
-	size_t text_len;
+	bool final_commit = false;
+	unsigned long irqflags;
+	struct printk_record r;
+	u16 trunc_msg_len = 0;
+	int sprint_id;
+	u16 text_len;
 	u64 ts_nsec;
+	int ret = 0;
 	char *text;
-	char *rbuf;
-	int cpu;
+	u64 seq;
 
 	ts_nsec = local_clock();
 
-	rbuf = prb_reserve(&h, &sprint_rb, PRINTK_SPRINT_MAX);
-	if (!rbuf) {
-		prb_inc_lost(&printk_rb);
-		return printed_len;
-	}
-
-	cpu = raw_smp_processor_id();
+	/* No buffer is available if printk has recursed too much. */
+	text = get_sprint_buf(&sprint_id, &irqflags);
+	if (!text)
+		return 0;
 
 	/*
-	 * If this turns out to be an emergency message, there
-	 * may need to be a prefix added. Leave room for it.
+	 * The printf needs to come first; we need the syslog
+	 * prefix which might be passed-in as a parameter.
 	 */
-	text = rbuf + PREFIX_MAX;
-	text_len = vscnprintf(text, PRINTK_SPRINT_MAX - PREFIX_MAX, fmt, args);
+	text_len = vscnprintf(text, LOG_LINE_MAX, fmt, args);
 
-	/* strip and flag a trailing newline */
+	/* mark and strip a trailing newline */
 	if (text_len && text[text_len-1] == '\n') {
 		text_len--;
 		lflags |= LOG_NEWLINE;
@@ -1941,38 +1923,108 @@ asmlinkage int vprintk_emit(int facility, int level,
 	if (level == LOGLEVEL_DEFAULT)
 		level = default_message_loglevel;
 
-	if (dict)
+	if (dev_info)
 		lflags |= LOG_NEWLINE;
 
-	/*
-	 * NOTE:
-	 * - rbuf points to beginning of allocated buffer
-	 * - text points to beginning of text
-	 * - there is room before text for prefix
-	 */
-	if (facility == 0) {
-		/* only the kernel can create emergency messages */
-		printk_emergency(rbuf, level & 7, ts_nsec, cpu, text, text_len);
+	if (lflags & LOG_CONT) {
+		prb_rec_init_wr(&r, text_len);
+		if (prb_reserve_in_last(&e, prb, &r, caller_id, LOG_LINE_MAX)) {
+			seq = r.info->seq;
+			memcpy(&r.text_buf[r.info->text_len], text, text_len);
+			r.info->text_len += text_len;
+			if (lflags & LOG_NEWLINE) {
+				r.info->flags |= LOG_NEWLINE;
+				prb_final_commit(&e);
+				final_commit = true;
+			} else {
+				prb_commit(&e);
+			}
+			ret = text_len;
+			goto out;
+		}
 	}
 
+	/* Store it in the record log */
+
+	prb_rec_init_wr(&r, text_len);
+
+	if (!prb_reserve(&e, prb, &r)) {
+		/* truncate the message if it is too long for empty buffer */
+		truncate_msg(&text_len, &trunc_msg_len);
+		prb_rec_init_wr(&r, text_len + trunc_msg_len);
+		/* survive when the log buffer is too small for trunc_msg */
+		if (!prb_reserve(&e, prb, &r))
+			goto out;
+	}
+
+	seq = r.info->seq;
+
+	/* fill message */
+	memcpy(&r.text_buf[0], text, text_len);
+	if (trunc_msg_len)
+		memcpy(&r.text_buf[text_len], trunc_msg, trunc_msg_len);
+	r.info->text_len = text_len + trunc_msg_len;
+	r.info->facility = facility;
+	r.info->level = level & 7;
+	r.info->flags = lflags & 0x1f;
+	r.info->ts_nsec = ts_nsec;
+	r.info->caller_id = caller_id;
+	if (dev_info)
+		memcpy(&r.info->dev_info, dev_info, sizeof(r.info->dev_info));
+
+	/* insert message */
 	if ((lflags & LOG_CONT) || !(lflags & LOG_NEWLINE)) {
-		 cont_add(ctx, cpu, caller_id, facility, level, lflags, text, text_len);
-		 printed_len = text_len;
+		prb_commit(&e);
 	} else {
-		if (cpu == cont[ctx].cpu_owner)
-			cont_flush(ctx);
-		printed_len = log_store(caller_id, facility, level, lflags, ts_nsec, cpu,
-					dict, dictlen, text, text_len);
+		prb_final_commit(&e);
+		final_commit = true;
 	}
 
-	prb_commit(&h);
+	ret = text_len + trunc_msg_len;
+out:
+	/* only the kernel may perform synchronous printing */
+	if (facility == 0 && final_commit && any_console_can_sync())
+		print_sync_until(seq + 1, NULL, text, PREFIX_MAX + LOG_LINE_MAX);
+
+	put_sprint_buf(sprint_id, irqflags);
+	return ret;
+}
+
+asmlinkage int vprintk_emit(int facility, int level,
+			    const struct dev_printk_info *dev_info,
+			    const char *fmt, va_list args)
+{
+	int printed_len;
+
+	/* Suppress unimportant messages after panic happens */
+	if (unlikely(suppress_printk))
+		return 0;
+
+	if (level == LOGLEVEL_SCHED)
+		level = LOGLEVEL_DEFAULT;
+
+	printed_len = vprintk_store(facility, level, dev_info, fmt, args);
+
+	wake_up_klogd();
 	return printed_len;
 }
 EXPORT_SYMBOL(vprintk_emit);
 
-static __printf(1, 0) int vprintk_func(const char *fmt, va_list args)
+ __printf(1, 0)
+static int vprintk_default(const char *fmt, va_list args)
 {
-	return vprintk_emit(0, LOGLEVEL_DEFAULT, NULL, 0, fmt, args);
+	return vprintk_emit(0, LOGLEVEL_DEFAULT, NULL, fmt, args);
+}
+
+__printf(1, 0)
+static int vprintk_func(const char *fmt, va_list args)
+{
+#ifdef CONFIG_KGDB_KDB
+	/* Allow to pass printk() to kdb but avoid a recursion. */
+	if (unlikely(kdb_trap_printk && kdb_printf_cpu < 0))
+		return vkdb_printf(KDB_MSGSRC_PRINTK, fmt, args);
+#endif
+	return vprintk_default(fmt, args);
 }
 
 asmlinkage int vprintk(const char *fmt, va_list args)
@@ -2014,6 +2066,35 @@ asmlinkage __visible int printk(const char *fmt, ...)
 	return r;
 }
 EXPORT_SYMBOL(printk);
+
+#else /* CONFIG_PRINTK */
+
+#define LOG_LINE_MAX		0
+#define PREFIX_MAX		0
+#define printk_time		false
+
+#define prb_read_valid(rb, seq, r)	false
+#define prb_first_valid_seq(rb)		0
+
+static u64 syslog_seq;
+
+static size_t record_print_text(const struct printk_record *r,
+				bool syslog, bool time)
+{
+	return 0;
+}
+static ssize_t info_print_ext_header(char *buf, size_t size,
+				     struct printk_info *info)
+{
+	return 0;
+}
+static ssize_t msg_print_ext_body(char *buf, size_t size,
+				  char *text, size_t text_len,
+				  struct dev_printk_info *dev_info) { return 0; }
+static void call_console_drivers(const char *ext_text, size_t ext_len,
+				 const char *text, size_t len) {}
+static bool suppress_message_printing(int level) { return false; }
+
 #endif /* CONFIG_PRINTK */
 
 #ifdef CONFIG_EARLY_PRINTK
@@ -2256,6 +2337,12 @@ EXPORT_SYMBOL(is_console_locked);
  * Releases the console_lock which the caller holds on the console system
  * and the console driver list.
  *
+ * While the console_lock was held, console output may have been buffered
+ * by printk().  If this is the case, console_unlock(); emits
+ * the output prior to releasing the lock.
+ *
+ * If there is output waiting, we wake /dev/kmsg and syslog() users.
+ *
  * console_unlock(); may be called from any context.
  */
 void console_unlock(void)
@@ -2317,11 +2404,21 @@ void console_unblank(void)
  */
 void console_flush_on_panic(enum con_flush_mode mode)
 {
-	/*
-	 * FIXME: This is currently a NOP. Emergency messages will have been
-	 * printed, but what about if write_atomic is not available on the
-	 * console? What if the printk kthread is still alive?
-	 */
+	struct console *c;
+	u64 seq;
+
+	if (!console_trylock())
+		return;
+
+	console_may_schedule = 0;
+
+	if (mode == CONSOLE_REPLAY_ALL) {
+		seq = prb_first_valid_seq(prb);
+		for_each_console(c)
+			atomic64_set(&c->printk_seq, seq);
+	}
+
+	console_unlock();
 }
 
 /*
@@ -2434,6 +2531,8 @@ static int try_enable_new_console(struct console *newcon, bool user_specified)
 	return -ENOENT;
 }
 
+static void console_try_thread(struct console *con);
+
 /*
  * The console driver calls this routine during kernel initialization
  * to register the console printing procedure with printk() and to
@@ -2478,6 +2577,8 @@ void register_console(struct console *newcon)
 		}
 	}
 
+	newcon->thread = NULL;
+
 	if (console_drivers && console_drivers->flags & CON_BOOT)
 		bcon = console_drivers;
 
@@ -2519,8 +2620,10 @@ void register_console(struct console *newcon)
 	 * the real console are the same physical device, it's annoying to
 	 * see the beginning boot messages twice
 	 */
-	if (bcon && ((newcon->flags & (CON_CONSDEV | CON_BOOT)) == CON_CONSDEV))
+	if (bcon && ((newcon->flags & (CON_CONSDEV | CON_BOOT)) == CON_CONSDEV)) {
 		newcon->flags &= ~CON_PRINTBUFFER;
+		newcon->flags |= CON_HANDOVER;
+	}
 
 	/*
 	 *	Put this console in the list - keep the
@@ -2542,6 +2645,12 @@ void register_console(struct console *newcon)
 	if (newcon->flags & CON_EXTENDED)
 		nr_ext_console_drivers++;
 
+	if (newcon->flags & CON_PRINTBUFFER)
+		atomic64_set(&newcon->printk_seq, 0);
+	else
+		atomic64_set(&newcon->printk_seq, prb_next_seq(prb));
+
+	console_try_thread(newcon);
 	console_unlock();
 	console_sysfs_notify();
 
@@ -2551,10 +2660,6 @@ void register_console(struct console *newcon)
 	 * boot consoles, real consoles, etc - this is to ensure that end
 	 * users know there might be something in the kernel's log buffer that
 	 * went to the bootconsole (that they do not see on the real console)
-	 *
-	 * This message is also important because it will trigger the
-	 * printk kthread to begin dumping the log buffer to the newly
-	 * registered console.
 	 */
 	pr_info("%sconsole [%s%d] enabled\n",
 		(newcon->flags & CON_BOOT) ? "boot" : "" ,
@@ -2619,6 +2724,9 @@ int unregister_console(struct console *console)
 	console_unlock();
 	console_sysfs_notify();
 
+	if (console->thread && !IS_ERR(console->thread))
+		kthread_stop(console->thread);
+
 	if (console->exit)
 		res = console->exit(console);
 
@@ -2662,6 +2770,154 @@ void __init console_init(void)
 	}
 }
 
+static int printk_kthread_func(void *data)
+{
+	struct console *con = data;
+	unsigned long dropped = 0;
+	struct printk_info info;
+	struct printk_record r;
+	char *ext_text = NULL;
+	size_t dropped_len;
+	char *dropped_text;
+	int ret = -ENOMEM;
+	char *write_text;
+	u64 printk_seq;
+	size_t len;
+	char *text;
+	int error;
+	u64 seq;
+
+	if (con->flags & CON_EXTENDED) {
+		ext_text = kmalloc(CONSOLE_EXT_LOG_MAX, GFP_KERNEL);
+		if (!ext_text)
+			return ret;
+	}
+	text = kmalloc(LOG_LINE_MAX + PREFIX_MAX, GFP_KERNEL);
+	dropped_text = kmalloc(64, GFP_KERNEL);
+	if (!text || !dropped_text)
+		goto out;
+
+	if (con->flags & CON_EXTENDED)
+		write_text = ext_text;
+	else
+		write_text = text;
+
+	seq = atomic64_read(&con->printk_seq);
+
+	prb_rec_init_rd(&r, &info, text, LOG_LINE_MAX + PREFIX_MAX);
+
+	for (;;) {
+		error = wait_event_interruptible(log_wait,
+				prb_read_valid(prb, seq, &r) || kthread_should_stop());
+
+		if (kthread_should_stop())
+			break;
+
+		if (error)
+			continue;
+
+		if (seq != r.info->seq) {
+			dropped += r.info->seq - seq;
+			seq = r.info->seq;
+		}
+
+		seq++;
+
+		if (!(con->flags & CON_ENABLED))
+			continue;
+
+		if (suppress_message_printing(r.info->level))
+			continue;
+
+		if (con->flags & CON_EXTENDED) {
+			len = info_print_ext_header(ext_text,
+				CONSOLE_EXT_LOG_MAX,
+				r.info);
+			len += msg_print_ext_body(ext_text + len,
+				CONSOLE_EXT_LOG_MAX - len,
+				&r.text_buf[0], r.info->text_len,
+				&r.info->dev_info);
+		} else {
+			len = record_print_text(&r,
+				console_msg_format & MSG_FORMAT_SYSLOG,
+				printk_time);
+		}
+
+		printk_seq = atomic64_read(&con->printk_seq);
+
+		console_lock();
+		console_may_schedule = 0;
+
+		if (kernel_sync_mode() && con->write_atomic) {
+			console_unlock();
+			break;
+		}
+
+		if (!(con->flags & CON_EXTENDED) && dropped) {
+			dropped_len = snprintf(dropped_text, 64,
+					       "** %lu printk messages dropped **\n",
+					       dropped);
+			dropped = 0;
+
+			con->write(con, dropped_text, dropped_len);
+			printk_delay(r.info->level);
+		}
+
+		con->write(con, write_text, len);
+		if (len)
+			printk_delay(r.info->level);
+
+		atomic64_cmpxchg_relaxed(&con->printk_seq, printk_seq, seq);
+
+		console_unlock();
+	}
+out:
+	kfree(dropped_text);
+	kfree(text);
+	kfree(ext_text);
+	pr_info("%sconsole [%s%d]: printing thread stopped\n",
+		(con->flags & CON_BOOT) ? "boot" : "" ,
+		con->name, con->index);
+	return ret;
+}
+
+static void start_printk_kthread(struct console *con)
+{
+	con->thread = kthread_run(printk_kthread_func, con,
+				  "pr/%s%d", con->name, con->index);
+	if (IS_ERR(con->thread)) {
+		pr_err("%sconsole [%s%d]: unable to start printing thread\n",
+			(con->flags & CON_BOOT) ? "boot" : "" ,
+			con->name, con->index);
+		return;
+	}
+	pr_info("%sconsole [%s%d]: printing thread started\n",
+		(con->flags & CON_BOOT) ? "boot" : "" ,
+		con->name, con->index);
+}
+
+static bool kthreads_started;
+
+static void console_try_thread(struct console *con)
+{
+	unsigned long irqflags;
+	int sprint_id;
+	char *buf;
+
+	if (kthreads_started) {
+		start_printk_kthread(con);
+		return;
+	}
+
+	buf = get_sprint_buf(&sprint_id, &irqflags);
+	if (!buf)
+		return;
+
+	print_sync_until(prb_next_seq(prb), con, buf, PREFIX_MAX + LOG_LINE_MAX);
+
+	put_sprint_buf(sprint_id, irqflags);
+}
+
 /*
  * Some boot consoles access data that is in the init section and which will
  * be discarded after the initcalls have been run. To make sure that no code
@@ -2701,6 +2957,13 @@ static int __init printk_late_init(void)
 			unregister_console(con);
 		}
 	}
+
+	console_lock();
+	for_each_console(con)
+		start_printk_kthread(con);
+	kthreads_started = true;
+	console_unlock();
+
 	ret = cpuhp_setup_state_nocalls(CPUHP_PRINTK_DEAD, "printk:dead", NULL,
 					console_cpu_notify);
 	WARN_ON(ret < 0);
@@ -2712,75 +2975,43 @@ static int __init printk_late_init(void)
 late_initcall(printk_late_init);
 
 #if defined CONFIG_PRINTK
-static int printk_kthread_func(void *data)
+/*
+ * Delayed printk version, for scheduler-internal messages:
+ */
+#define PRINTK_PENDING_WAKEUP	0x01
+
+static DEFINE_PER_CPU(int, printk_pending);
+
+static void wake_up_klogd_work_func(struct irq_work *irq_work)
 {
-	struct prb_iterator iter;
-	struct printk_log *msg;
-	size_t ext_len;
-	char *ext_text;
-	u64 master_seq;
-	size_t len;
-	char *text;
-	char *buf;
-	int ret;
+	int pending = __this_cpu_xchg(printk_pending, 0);
 
-	ext_text = kmalloc(CONSOLE_EXT_LOG_MAX, GFP_KERNEL);
-	text = kmalloc(PRINTK_SPRINT_MAX, GFP_KERNEL);
-	buf = kmalloc(PRINTK_RECORD_MAX, GFP_KERNEL);
-	if (!ext_text || !text || !buf)
-		return -1;
-
-	prb_iter_init(&iter, &printk_rb, NULL);
-
-	/* the printk kthread never exits */
-	for (;;) {
-		ret = prb_iter_wait_next(&iter, buf,
-					 PRINTK_RECORD_MAX, &master_seq);
-		if (ret == -ERESTARTSYS) {
-			continue;
-		} else if (ret < 0) {
-			/* iterator invalid, start over */
-			prb_iter_init(&iter, &printk_rb, NULL);
-			continue;
-		}
-
-		msg = (struct printk_log *)buf;
-		format_text(msg, master_seq, ext_text, &ext_len, text,
-			    &len, printk_time);
-
-		console_lock();
-		console_may_schedule = 0;
-		call_console_drivers(master_seq, ext_text, ext_len, text, len,
-				     msg->level, msg->facility);
-		if (len > 0 || ext_len > 0)
-			printk_delay(msg->level);
-		console_unlock();
-	}
-
-	kfree(ext_text);
-	kfree(text);
-	kfree(buf);
-
-	return 0;
+	if (pending & PRINTK_PENDING_WAKEUP)
+		wake_up_interruptible(&log_wait);
 }
 
-static int __init init_printk_kthread(void)
-{
-	struct task_struct *thread;
+static DEFINE_PER_CPU(struct irq_work, wake_up_klogd_work) = {
+	.func = wake_up_klogd_work_func,
+	.flags = ATOMIC_INIT(IRQ_WORK_LAZY),
+};
 
-	thread = kthread_run(printk_kthread_func, NULL, "printk");
-	if (IS_ERR(thread)) {
-		pr_err("printk: unable to create printing thread\n");
-		return PTR_ERR(thread);
+void wake_up_klogd(void)
+{
+	if (!printk_percpu_data_ready())
+		return;
+
+	preempt_disable();
+	if (waitqueue_active(&log_wait)) {
+		this_cpu_or(printk_pending, PRINTK_PENDING_WAKEUP);
+		irq_work_queue(this_cpu_ptr(&wake_up_klogd_work));
 	}
-
-	return 0;
+	preempt_enable();
 }
-late_initcall(init_printk_kthread);
 
-__printf(1, 0) static int vprintk_deferred(const char *fmt, va_list args)
+__printf(1, 0)
+static int vprintk_deferred(const char *fmt, va_list args)
 {
-	return vprintk_emit(0, LOGLEVEL_DEFAULT, NULL, 0, fmt, args);
+	return vprintk_emit(0, LOGLEVEL_DEFAULT, NULL, fmt, args);
 }
 
 int printk_deferred(const char *fmt, ...)
@@ -2909,6 +3140,66 @@ const char *kmsg_dump_reason_str(enum kmsg_dump_reason reason)
 }
 EXPORT_SYMBOL_GPL(kmsg_dump_reason_str);
 
+/**
+ * pr_flush() - Wait for printing threads to catch up.
+ *
+ * @timeout_ms:        The maximum time (in ms) to wait.
+ * @reset_on_progress: Reset the timeout if forward progress is seen.
+ *
+ * A value of 0 for @timeout_ms means no waiting will occur. A value of -1
+ * represents infinite waiting.
+ *
+ * If @reset_on_progress is true, the timeout will be reset whenever any
+ * printer has been seen to make some forward progress.
+ *
+ * Context: Any context if @timeout_ms is 0. Otherwise process context and
+ * may sleep if a printer is not caught up.
+ * Return: true if all enabled printers are caught up.
+ */
+static bool pr_flush(int timeout_ms, bool reset_on_progress)
+{
+	int remaining = timeout_ms;
+	struct console *con;
+	u64 last_diff = 0;
+	u64 printk_seq;
+	u64 diff;
+	u64 seq;
+
+	seq = prb_next_seq(prb);
+
+	for (;;) {
+		diff = 0;
+
+		for_each_console(con) {
+			if (!(con->flags & CON_ENABLED))
+				continue;
+			printk_seq = atomic64_read(&con->printk_seq);
+			if (printk_seq < seq)
+				diff += seq - printk_seq;
+		}
+
+		if (diff != last_diff && reset_on_progress)
+			remaining = timeout_ms;
+
+		if (!diff || remaining == 0)
+			break;
+
+		if (remaining < 0) {
+			msleep(100);
+		} else if (remaining < 100) {
+			msleep(remaining);
+			remaining = 0;
+		} else {
+			msleep(100);
+			remaining -= 100;
+		}
+
+		last_diff = diff;
+	}
+
+	return (diff == 0);
+}
+
 /**
  * kmsg_dump - dump kernel log to kernel message dumpers.
  * @reason: the reason (oops, panic etc) for dumping
@@ -2919,9 +3210,26 @@ EXPORT_SYMBOL_GPL(kmsg_dump_reason_str);
  */
 void kmsg_dump(enum kmsg_dump_reason reason)
 {
-	struct kmsg_dumper dumper_local;
 	struct kmsg_dumper *dumper;
 
+	if (!oops_in_progress) {
+		/*
+		 * If atomic consoles are available, activate kernel sync mode
+		 * to make sure any final messages are visible. The trailing
+		 * printk message is important to flush any pending messages.
+		 */
+		if (have_atomic_console()) {
+			sync_mode = true;
+			pr_info("enabled sync mode\n");
+		}
+
+		/*
+		 * Give the printing threads time to flush, allowing up to 1
+		 * second of no printing forward progress before giving up.
+		 */
+		pr_flush(1000, true);
+	}
+
 	rcu_read_lock();
 	list_for_each_entry_rcu(dumper, &dump_list, list) {
 		enum kmsg_dump_reason max_reason = dumper->max_reason;
@@ -2937,18 +3245,16 @@ void kmsg_dump(enum kmsg_dump_reason reason)
 		if (reason > max_reason)
 			continue;
 
-		/*
-		 * use a local copy to avoid modifying the
-		 * iterator used by any other cpus/contexts
-		 */
-		memcpy(&dumper_local, dumper, sizeof(dumper_local));
-
 		/* initialize iterator with data about the stored records */
-		dumper_local.active = true;
-		kmsg_dump_rewind(&dumper_local);
+		dumper->active = true;
+
+		kmsg_dump_rewind_nolock(dumper);
 
 		/* invoke dumper which will iterate over records */
-		dumper_local.dump(&dumper_local, reason);
+		dumper->dump(dumper, reason);
+
+		/* reset iterator */
+		dumper->active = false;
 	}
 	rcu_read_unlock();
 }
@@ -2975,67 +3281,38 @@ void kmsg_dump(enum kmsg_dump_reason reason)
 bool kmsg_dump_get_line_nolock(struct kmsg_dumper *dumper, bool syslog,
 			       char *line, size_t size, size_t *len)
 {
-	struct prb_iterator iter;
-	struct printk_log *msg;
-	struct prb_handle h;
-	bool cont = false;
-	char *msgbuf;
-	char *rbuf;
-	size_t l;
-	u64 seq;
-	int ret;
+	struct printk_info info;
+	unsigned int line_count;
+	struct printk_record r;
+	size_t l = 0;
+	bool ret = false;
+
+	prb_rec_init_rd(&r, &info, line, size);
 
 	if (!dumper->active)
-		return cont;
+		goto out;
 
-	rbuf = prb_reserve(&h, &sprint_rb, PRINTK_RECORD_MAX);
-	if (!rbuf)
-		return cont;
-	msgbuf = rbuf;
-retry:
-	for (;;) {
-		prb_iter_init(&iter, &printk_rb, &seq);
-
-		if (dumper->line_seq == seq) {
-			/* already where we want to be */
-			break;
-		} else if (dumper->line_seq < seq) {
-			/* messages are gone, move to first available one */
-			dumper->line_seq = seq;
-			break;
+	/* Read text or count text lines? */
+	if (line) {
+		if (!prb_read_valid(prb, dumper->cur_seq, &r))
+			goto out;
+		l = record_print_text(&r, syslog, printk_time);
+	} else {
+		if (!prb_read_valid_info(prb, dumper->cur_seq,
+					 &info, &line_count)) {
+			goto out;
 		}
+		l = get_record_print_text_size(&info, line_count, syslog,
+					       printk_time);
 
-		ret = prb_iter_seek(&iter, dumper->line_seq);
-		if (ret > 0) {
-			/* seeked to line_seq */
-			break;
-		} else if (ret == 0) {
-			/*
-			 * The end of the list was hit without ever seeing
-			 * line_seq. Reset it to the beginning of the list.
-			 */
-			prb_iter_init(&iter, &printk_rb, &dumper->line_seq);
-			break;
-		}
-		/* iterator invalid, start over */
 	}
 
-	ret = prb_iter_next(&iter, msgbuf, PRINTK_RECORD_MAX,
-			    &dumper->line_seq);
-	if (ret == 0)
-		goto out;
-	else if (ret < 0)
-		goto retry;
-
-	msg = (struct printk_log *)msgbuf;
-	l = msg_print_text(msg, syslog, printk_time, line, size);
-
+	dumper->cur_seq = r.info->seq + 1;
+	ret = true;
+out:
 	if (len)
 		*len = l;
-	cont = true;
-out:
-	prb_commit(&h);
-	return cont;
+	return ret;
 }
 
 /**
@@ -3058,11 +3335,7 @@ bool kmsg_dump_get_line_nolock(struct kmsg_dumper *dumper, bool syslog,
 bool kmsg_dump_get_line(struct kmsg_dumper *dumper, bool syslog,
 			char *line, size_t size, size_t *len)
 {
-	bool ret;
-
-	ret = kmsg_dump_get_line_nolock(dumper, syslog, line, size, len);
-
-	return ret;
+	return kmsg_dump_get_line_nolock(dumper, syslog, line, size, len);
 }
 EXPORT_SYMBOL_GPL(kmsg_dump_get_line);
 
@@ -3072,7 +3345,7 @@ EXPORT_SYMBOL_GPL(kmsg_dump_get_line);
  * @syslog: include the "<4>" prefixes
  * @buf: buffer to copy the line to
  * @size: maximum size of the buffer
- * @len: length of line placed into buffer
+ * @len_out: length of line placed into buffer
  *
  * Start at the end of the kmsg buffer and fill the provided buffer
  * with as many of the the *youngest* kmsg records that fit into it.
@@ -3086,103 +3359,74 @@ EXPORT_SYMBOL_GPL(kmsg_dump_get_line);
  * read.
  */
 bool kmsg_dump_get_buffer(struct kmsg_dumper *dumper, bool syslog,
-			  char *buf, size_t size, size_t *len)
+			  char *buf, size_t size, size_t *len_out)
 {
-	struct prb_iterator iter;
+	struct printk_info info;
+	unsigned int line_count;
+	struct printk_record r;
+	u64 seq;
+	u64 next_seq;
+	size_t len = 0;
+	bool ret = false;
 	bool time = printk_time;
-	struct printk_log *msg;
-	u64 new_end_seq = 0;
-	struct prb_handle h;
-	bool cont = false;
-	char *msgbuf;
-	u64 end_seq;
-	int textlen;
-	u64 seq = 0;
-	char *rbuf;
-	int l = 0;
-	int ret;
 
-	if (!dumper->active)
-		return cont;
-
-	rbuf = prb_reserve(&h, &sprint_rb, PRINTK_RECORD_MAX);
-	if (!rbuf)
-		return cont;
-	msgbuf = rbuf;
-
-	prb_iter_init(&iter, &printk_rb, NULL);
-
-	/*
-	 * seek to the start record, which is set/modified
-	 * by kmsg_dump_get_line_nolock()
-	 */
-	ret = prb_iter_seek(&iter, dumper->line_seq);
-	if (ret <= 0)
-		prb_iter_init(&iter, &printk_rb, &seq);
-
-	/* work with a local end seq to have a constant value */
-	end_seq = dumper->buffer_end_seq;
-	if (!end_seq) {
-		/* initialize end seq to "infinity" */
-		end_seq = -1;
-		dumper->buffer_end_seq = end_seq;
-	}
-retry:
-	if (seq >= end_seq)
+	if (!dumper->active || !buf || !size)
 		goto out;
 
-	/* count the total bytes after seq */
-	textlen = count_remaining(&iter, end_seq, msgbuf,
-				  PRINTK_RECORD_MAX, 0, time);
-
-	/* move iter forward until length fits into the buffer */
-	while (textlen > size) {
-		ret = prb_iter_next(&iter, msgbuf, PRINTK_RECORD_MAX, &seq);
-		if (ret == 0) {
-			break;
-		} else if (ret < 0 || seq >= end_seq) {
-			prb_iter_init(&iter, &printk_rb, &seq);
-			goto retry;
-		}
-
-		msg = (struct printk_log *)msgbuf;
-		textlen -= msg_print_text(msg, true, time, NULL, 0);
+	if (dumper->cur_seq < prb_first_valid_seq(prb)) {
+		/* messages are gone, move to first available one */
+		dumper->cur_seq = prb_first_valid_seq(prb);
 	}
 
-	/* save end seq for the next interation */
-	new_end_seq = seq + 1;
+	/* last entry */
+	if (dumper->cur_seq >= dumper->next_seq)
+		goto out;
 
-	/* copy messages to buffer */
-	while (l < size) {
-		ret = prb_iter_next(&iter, msgbuf, PRINTK_RECORD_MAX, &seq);
-		if (ret == 0) {
+	/*
+	 * Find first record that fits, including all following records,
+	 * into the user-provided buffer for this dump.
+	 */
+
+	prb_for_each_info(dumper->cur_seq, prb, seq, &info, &line_count) {
+		if (info.seq >= dumper->next_seq)
 			break;
-		} else if (ret < 0) {
-			/*
-			 * iterator (and thus also the start position)
-			 * invalid, start over from beginning of list
-			 */
-			prb_iter_init(&iter, &printk_rb, NULL);
-			continue;
-		}
-
-		if (seq >= end_seq)
-			break;
-
-		msg = (struct printk_log *)msgbuf;
-		textlen = msg_print_text(msg, syslog, time, buf + l, size - l);
-		if (textlen > 0)
-			l += textlen;
-		cont = true;
+		len += get_record_print_text_size(&info, line_count, true, time);
 	}
 
-	if (cont && len)
-		*len = l;
+	/*
+	 * Move first record forward until length fits into the buffer. This
+	 * is a best effort attempt. If @dumper->next_seq is reached because
+	 * the ringbuffer is wrapping too fast, just start filling the buffer
+	 * from there.
+	 */
+	prb_for_each_info(dumper->cur_seq, prb, seq, &info, &line_count) {
+		if (len <= size || info.seq >= dumper->next_seq)
+			break;
+		len -= get_record_print_text_size(&info, line_count, true, time);
+	}
+
+	/* Keep track of the last message for the next interation. */
+	next_seq = seq;
+
+	prb_rec_init_rd(&r, &info, buf, size);
+
+	len = 0;
+	prb_for_each_record(seq, prb, seq, &r) {
+		if (r.info->seq >= dumper->next_seq)
+			break;
+
+		len += record_print_text(&r, syslog, time);
+
+		/* Adjust record to store to remaining buffer space. */
+		prb_rec_init_rd(&r, &info, buf + len, size - len);
+	}
+
+	dumper->next_seq = next_seq;
+	ret = true;
 out:
-	prb_commit(&h);
-	if (new_end_seq)
-		dumper->buffer_end_seq = new_end_seq;
-	return cont;
+	if (len_out)
+		*len_out = len;
+	return ret;
 }
 EXPORT_SYMBOL_GPL(kmsg_dump_get_buffer);
 
@@ -3193,13 +3437,11 @@ EXPORT_SYMBOL_GPL(kmsg_dump_get_buffer);
  * Reset the dumper's iterator so that kmsg_dump_get_line() and
  * kmsg_dump_get_buffer() can be called again and used multiple
  * times within the same dumper.dump() callback.
- *
- * The function is similar to kmsg_dump_rewind(), but grabs no locks.
  */
 void kmsg_dump_rewind_nolock(struct kmsg_dumper *dumper)
 {
-	dumper->line_seq = 0;
-	dumper->buffer_end_seq = 0;
+	dumper->cur_seq = atomic64_read(&clear_seq);
+	dumper->next_seq = prb_next_seq(prb);
 }
 
 /**
@@ -3216,76 +3458,95 @@ void kmsg_dump_rewind(struct kmsg_dumper *dumper)
 }
 EXPORT_SYMBOL_GPL(kmsg_dump_rewind);
 
-static bool console_can_emergency(int level)
-{
-	struct console *con;
+#endif
 
-	for_each_console(con) {
-		if (!(con->flags & CON_ENABLED))
-			continue;
-		if (con->write_atomic && oops_in_progress)
-			return true;
-		if (con->write && (con->flags & CON_BOOT))
+struct prb_cpulock {
+	atomic_t owner;
+	unsigned long __percpu *irqflags;
+};
+
+#define DECLARE_STATIC_PRINTKRB_CPULOCK(name)				\
+static DEFINE_PER_CPU(unsigned long, _##name##_percpu_irqflags);	\
+static struct prb_cpulock name = {					\
+	.owner = ATOMIC_INIT(-1),					\
+	.irqflags = &_##name##_percpu_irqflags,				\
+}
+
+static bool __prb_trylock(struct prb_cpulock *cpu_lock,
+			  unsigned int *cpu_store)
+{
+	unsigned long *flags;
+	unsigned int cpu;
+
+	cpu = get_cpu();
+
+	*cpu_store = atomic_read(&cpu_lock->owner);
+	/* memory barrier to ensure the current lock owner is visible */
+	smp_rmb();
+	if (*cpu_store == -1) {
+		flags = per_cpu_ptr(cpu_lock->irqflags, cpu);
+		local_irq_save(*flags);
+		if (atomic_try_cmpxchg_acquire(&cpu_lock->owner,
+					       cpu_store, cpu)) {
 			return true;
+		}
+		local_irq_restore(*flags);
+	} else if (*cpu_store == cpu) {
+		return true;
 	}
+
+	put_cpu();
 	return false;
 }
 
-static void call_emergency_console_drivers(int level, const char *text,
-					   size_t text_len)
+/*
+ * prb_lock: Perform a processor-reentrant spin lock.
+ * @cpu_lock: A pointer to the lock object.
+ * @cpu_store: A "flags" pointer to store lock status information.
+ *
+ * If no processor has the lock, the calling processor takes the lock and
+ * becomes the owner. If the calling processor is already the owner of the
+ * lock, this function succeeds immediately. If lock is locked by another
+ * processor, this function spins until the calling processor becomes the
+ * owner.
+ *
+ * It is safe to call this function from any context and state.
+ */
+static void prb_lock(struct prb_cpulock *cpu_lock, unsigned int *cpu_store)
 {
-	struct console *con;
-
-	for_each_console(con) {
-		if (!(con->flags & CON_ENABLED))
-			continue;
-		if (con->write_atomic && oops_in_progress) {
-			con->write_atomic(con, text, text_len);
-			continue;
-		}
-		if (con->write && (con->flags & CON_BOOT)) {
-			con->write(con, text, text_len);
-			continue;
-		}
+	for (;;) {
+		if (__prb_trylock(cpu_lock, cpu_store))
+			break;
+		cpu_relax();
 	}
 }
 
-static void printk_emergency(char *buffer, int level, u64 ts_nsec, u16 cpu,
-			     char *text, u16 text_len)
+/*
+ * prb_unlock: Perform a processor-reentrant spin unlock.
+ * @cpu_lock: A pointer to the lock object.
+ * @cpu_store: A "flags" object storing lock status information.
+ *
+ * Release the lock. The calling processor must be the owner of the lock.
+ *
+ * It is safe to call this function from any context and state.
+ */
+static void prb_unlock(struct prb_cpulock *cpu_lock, unsigned int cpu_store)
 {
-	struct printk_log msg;
-	size_t prefix_len;
+	unsigned long *flags;
+	unsigned int cpu;
 
-	if (!console_can_emergency(level))
-		return;
+	cpu = atomic_read(&cpu_lock->owner);
+	atomic_set_release(&cpu_lock->owner, cpu_store);
 
-	msg.level = level;
-	msg.ts_nsec = ts_nsec;
-	msg.cpu = cpu;
-	msg.facility = 0;
+	if (cpu_store == -1) {
+		flags = per_cpu_ptr(cpu_lock->irqflags, cpu);
+		local_irq_restore(*flags);
+	}
 
-	/* "text" must have PREFIX_MAX preceding bytes available */
-
-	prefix_len = print_prefix(&msg,
-				  console_msg_format & MSG_FORMAT_SYSLOG,
-				  printk_time, buffer);
-	/* move the prefix forward to the beginning of the message text */
-	text -= prefix_len;
-	memmove(text, buffer, prefix_len);
-	text_len += prefix_len;
-
-	text[text_len++] = '\n';
-
-	call_emergency_console_drivers(level, text, text_len);
-
-	touch_softlockup_watchdog_sync();
-	clocksource_touch_watchdog();
-	rcu_cpu_stall_reset();
-	touch_nmi_watchdog();
-
-	printk_delay(level);
+	put_cpu();
 }
-#endif
+
+DECLARE_STATIC_PRINTKRB_CPULOCK(printk_cpulock);
 
 void console_atomic_lock(unsigned int *flags)
 {
diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
new file mode 100644
index 0000000000000..24a960a89aa89
--- /dev/null
+++ b/kernel/printk/printk_ringbuffer.c
@@ -0,0 +1,2086 @@
+// SPDX-License-Identifier: GPL-2.0
+
+#include <linux/kernel.h>
+#include <linux/irqflags.h>
+#include <linux/string.h>
+#include <linux/errno.h>
+#include <linux/bug.h>
+#include "printk_ringbuffer.h"
+
+/**
+ * DOC: printk_ringbuffer overview
+ *
+ * Data Structure
+ * --------------
+ * The printk_ringbuffer is made up of 3 internal ringbuffers:
+ *
+ *   desc_ring
+ *     A ring of descriptors and their meta data (such as sequence number,
+ *     timestamp, loglevel, etc.) as well as internal state information about
+ *     the record and logical positions specifying where in the other
+ *     ringbuffer the text strings are located.
+ *
+ *   text_data_ring
+ *     A ring of data blocks. A data block consists of an unsigned long
+ *     integer (ID) that maps to a desc_ring index followed by the text
+ *     string of the record.
+ *
+ * The internal state information of a descriptor is the key element to allow
+ * readers and writers to locklessly synchronize access to the data.
+ *
+ * Implementation
+ * --------------
+ *
+ * Descriptor Ring
+ * ~~~~~~~~~~~~~~~
+ * The descriptor ring is an array of descriptors. A descriptor contains
+ * essential meta data to track the data of a printk record using
+ * blk_lpos structs pointing to associated text data blocks (see
+ * "Data Rings" below). Each descriptor is assigned an ID that maps
+ * directly to index values of the descriptor array and has a state. The ID
+ * and the state are bitwise combined into a single descriptor field named
+ * @state_var, allowing ID and state to be synchronously and atomically
+ * updated.
+ *
+ * Descriptors have four states:
+ *
+ *   reserved
+ *     A writer is modifying the record.
+ *
+ *   committed
+ *     The record and all its data are written. A writer can reopen the
+ *     descriptor (transitioning it back to reserved), but in the committed
+ *     state the data is consistent.
+ *
+ *   finalized
+ *     The record and all its data are complete and available for reading. A
+ *     writer cannot reopen the descriptor.
+ *
+ *   reusable
+ *     The record exists, but its text and/or meta data may no longer be
+ *     available.
+ *
+ * Querying the @state_var of a record requires providing the ID of the
+ * descriptor to query. This can yield a possible fifth (pseudo) state:
+ *
+ *   miss
+ *     The descriptor being queried has an unexpected ID.
+ *
+ * The descriptor ring has a @tail_id that contains the ID of the oldest
+ * descriptor and @head_id that contains the ID of the newest descriptor.
+ *
+ * When a new descriptor should be created (and the ring is full), the tail
+ * descriptor is invalidated by first transitioning to the reusable state and
+ * then invalidating all tail data blocks up to and including the data blocks
+ * associated with the tail descriptor (for the text ring). Then
+ * @tail_id is advanced, followed by advancing @head_id. And finally the
+ * @state_var of the new descriptor is initialized to the new ID and reserved
+ * state.
+ *
+ * The @tail_id can only be advanced if the new @tail_id would be in the
+ * committed or reusable queried state. This makes it possible that a valid
+ * sequence number of the tail is always available.
+ *
+ * Descriptor Finalization
+ * ~~~~~~~~~~~~~~~~~~~~~~~
+ * When a writer calls the commit function prb_commit(), record data is
+ * fully stored and is consistent within the ringbuffer. However, a writer can
+ * reopen that record, claiming exclusive access (as with prb_reserve()), and
+ * modify that record. When finished, the writer must again commit the record.
+ *
+ * In order for a record to be made available to readers (and also become
+ * recyclable for writers), it must be finalized. A finalized record cannot be
+ * reopened and can never become "unfinalized". Record finalization can occur
+ * in three different scenarios:
+ *
+ *   1) A writer can simultaneously commit and finalize its record by calling
+ *      prb_final_commit() instead of prb_commit().
+ *
+ *   2) When a new record is reserved and the previous record has been
+ *      committed via prb_commit(), that previous record is automatically
+ *      finalized.
+ *
+ *   3) When a record is committed via prb_commit() and a newer record
+ *      already exists, the record being committed is automatically finalized.
+ *
+ * Data Ring
+ * ~~~~~~~~~
+ * The text data ring is a byte array composed of data blocks. Data blocks are
+ * referenced by blk_lpos structs that point to the logical position of the
+ * beginning of a data block and the beginning of the next adjacent data
+ * block. Logical positions are mapped directly to index values of the byte
+ * array ringbuffer.
+ *
+ * Each data block consists of an ID followed by the writer data. The ID is
+ * the identifier of a descriptor that is associated with the data block. A
+ * given data block is considered valid if all of the following conditions
+ * are met:
+ *
+ *   1) The descriptor associated with the data block is in the committed
+ *      or finalized queried state.
+ *
+ *   2) The blk_lpos struct within the descriptor associated with the data
+ *      block references back to the same data block.
+ *
+ *   3) The data block is within the head/tail logical position range.
+ *
+ * If the writer data of a data block would extend beyond the end of the
+ * byte array, only the ID of the data block is stored at the logical
+ * position and the full data block (ID and writer data) is stored at the
+ * beginning of the byte array. The referencing blk_lpos will point to the
+ * ID before the wrap and the next data block will be at the logical
+ * position adjacent the full data block after the wrap.
+ *
+ * Data rings have a @tail_lpos that points to the beginning of the oldest
+ * data block and a @head_lpos that points to the logical position of the
+ * next (not yet existing) data block.
+ *
+ * When a new data block should be created (and the ring is full), tail data
+ * blocks will first be invalidated by putting their associated descriptors
+ * into the reusable state and then pushing the @tail_lpos forward beyond
+ * them. Then the @head_lpos is pushed forward and is associated with a new
+ * descriptor. If a data block is not valid, the @tail_lpos cannot be
+ * advanced beyond it.
+ *
+ * Info Array
+ * ~~~~~~~~~~
+ * The general meta data of printk records are stored in printk_info structs,
+ * stored in an array with the same number of elements as the descriptor ring.
+ * Each info corresponds to the descriptor of the same index in the
+ * descriptor ring. Info validity is confirmed by evaluating the corresponding
+ * descriptor before and after loading the info.
+ *
+ * Usage
+ * -----
+ * Here are some simple examples demonstrating writers and readers. For the
+ * examples a global ringbuffer (test_rb) is available (which is not the
+ * actual ringbuffer used by printk)::
+ *
+ *	DEFINE_PRINTKRB(test_rb, 15, 5);
+ *
+ * This ringbuffer allows up to 32768 records (2 ^ 15) and has a size of
+ * 1 MiB (2 ^ (15 + 5)) for text data.
+ *
+ * Sample writer code::
+ *
+ *	const char *textstr = "message text";
+ *	struct prb_reserved_entry e;
+ *	struct printk_record r;
+ *
+ *	// specify how much to allocate
+ *	prb_rec_init_wr(&r, strlen(textstr) + 1);
+ *
+ *	if (prb_reserve(&e, &test_rb, &r)) {
+ *		snprintf(r.text_buf, r.text_buf_size, "%s", textstr);
+ *
+ *		r.info->text_len = strlen(textstr);
+ *		r.info->ts_nsec = local_clock();
+ *		r.info->caller_id = printk_caller_id();
+ *
+ *		// commit and finalize the record
+ *		prb_final_commit(&e);
+ *	}
+ *
+ * Note that additional writer functions are available to extend a record
+ * after it has been committed but not yet finalized. This can be done as
+ * long as no new records have been reserved and the caller is the same.
+ *
+ * Sample writer code (record extending)::
+ *
+ *		// alternate rest of previous example
+ *
+ *		r.info->text_len = strlen(textstr);
+ *		r.info->ts_nsec = local_clock();
+ *		r.info->caller_id = printk_caller_id();
+ *
+ *		// commit the record (but do not finalize yet)
+ *		prb_commit(&e);
+ *	}
+ *
+ *	...
+ *
+ *	// specify additional 5 bytes text space to extend
+ *	prb_rec_init_wr(&r, 5);
+ *
+ *	// try to extend, but only if it does not exceed 32 bytes
+ *	if (prb_reserve_in_last(&e, &test_rb, &r, printk_caller_id()), 32) {
+ *		snprintf(&r.text_buf[r.info->text_len],
+ *			 r.text_buf_size - r.info->text_len, "hello");
+ *
+ *		r.info->text_len += 5;
+ *
+ *		// commit and finalize the record
+ *		prb_final_commit(&e);
+ *	}
+ *
+ * Sample reader code::
+ *
+ *	struct printk_info info;
+ *	struct printk_record r;
+ *	char text_buf[32];
+ *	u64 seq;
+ *
+ *	prb_rec_init_rd(&r, &info, &text_buf[0], sizeof(text_buf));
+ *
+ *	prb_for_each_record(0, &test_rb, &seq, &r) {
+ *		if (info.seq != seq)
+ *			pr_warn("lost %llu records\n", info.seq - seq);
+ *
+ *		if (info.text_len > r.text_buf_size) {
+ *			pr_warn("record %llu text truncated\n", info.seq);
+ *			text_buf[r.text_buf_size - 1] = 0;
+ *		}
+ *
+ *		pr_info("%llu: %llu: %s\n", info.seq, info.ts_nsec,
+ *			&text_buf[0]);
+ *	}
+ *
+ * Note that additional less convenient reader functions are available to
+ * allow complex record access.
+ *
+ * ABA Issues
+ * ~~~~~~~~~~
+ * To help avoid ABA issues, descriptors are referenced by IDs (array index
+ * values combined with tagged bits counting array wraps) and data blocks are
+ * referenced by logical positions (array index values combined with tagged
+ * bits counting array wraps). However, on 32-bit systems the number of
+ * tagged bits is relatively small such that an ABA incident is (at least
+ * theoretically) possible. For example, if 4 million maximally sized (1KiB)
+ * printk messages were to occur in NMI context on a 32-bit system, the
+ * interrupted context would not be able to recognize that the 32-bit integer
+ * completely wrapped and thus represents a different data block than the one
+ * the interrupted context expects.
+ *
+ * To help combat this possibility, additional state checking is performed
+ * (such as using cmpxchg() even though set() would suffice). These extra
+ * checks are commented as such and will hopefully catch any ABA issue that
+ * a 32-bit system might experience.
+ *
+ * Memory Barriers
+ * ~~~~~~~~~~~~~~~
+ * Multiple memory barriers are used. To simplify proving correctness and
+ * generating litmus tests, lines of code related to memory barriers
+ * (loads, stores, and the associated memory barriers) are labeled::
+ *
+ *	LMM(function:letter)
+ *
+ * Comments reference the labels using only the "function:letter" part.
+ *
+ * The memory barrier pairs and their ordering are:
+ *
+ *   desc_reserve:D / desc_reserve:B
+ *     push descriptor tail (id), then push descriptor head (id)
+ *
+ *   desc_reserve:D / data_push_tail:B
+ *     push data tail (lpos), then set new descriptor reserved (state)
+ *
+ *   desc_reserve:D / desc_push_tail:C
+ *     push descriptor tail (id), then set new descriptor reserved (state)
+ *
+ *   desc_reserve:D / prb_first_seq:C
+ *     push descriptor tail (id), then set new descriptor reserved (state)
+ *
+ *   desc_reserve:F / desc_read:D
+ *     set new descriptor id and reserved (state), then allow writer changes
+ *
+ *   data_alloc:A (or data_realloc:A) / desc_read:D
+ *     set old descriptor reusable (state), then modify new data block area
+ *
+ *   data_alloc:A (or data_realloc:A) / data_push_tail:B
+ *     push data tail (lpos), then modify new data block area
+ *
+ *   _prb_commit:B / desc_read:B
+ *     store writer changes, then set new descriptor committed (state)
+ *
+ *   desc_reopen_last:A / _prb_commit:B
+ *     set descriptor reserved (state), then read descriptor data
+ *
+ *   _prb_commit:B / desc_reserve:D
+ *     set new descriptor committed (state), then check descriptor head (id)
+ *
+ *   data_push_tail:D / data_push_tail:A
+ *     set descriptor reusable (state), then push data tail (lpos)
+ *
+ *   desc_push_tail:B / desc_reserve:D
+ *     set descriptor reusable (state), then push descriptor tail (id)
+ */
+
+#define DATA_SIZE(data_ring)		_DATA_SIZE((data_ring)->size_bits)
+#define DATA_SIZE_MASK(data_ring)	(DATA_SIZE(data_ring) - 1)
+
+#define DESCS_COUNT(desc_ring)		_DESCS_COUNT((desc_ring)->count_bits)
+#define DESCS_COUNT_MASK(desc_ring)	(DESCS_COUNT(desc_ring) - 1)
+
+/* Determine the data array index from a logical position. */
+#define DATA_INDEX(data_ring, lpos)	((lpos) & DATA_SIZE_MASK(data_ring))
+
+/* Determine the desc array index from an ID or sequence number. */
+#define DESC_INDEX(desc_ring, n)	((n) & DESCS_COUNT_MASK(desc_ring))
+
+/* Determine how many times the data array has wrapped. */
+#define DATA_WRAPS(data_ring, lpos)	((lpos) >> (data_ring)->size_bits)
+
+/* Determine if a logical position refers to a data-less block. */
+#define LPOS_DATALESS(lpos)		((lpos) & 1UL)
+#define BLK_DATALESS(blk)		(LPOS_DATALESS((blk)->begin) && \
+					 LPOS_DATALESS((blk)->next))
+
+/* Get the logical position at index 0 of the current wrap. */
+#define DATA_THIS_WRAP_START_LPOS(data_ring, lpos) \
+((lpos) & ~DATA_SIZE_MASK(data_ring))
+
+/* Get the ID for the same index of the previous wrap as the given ID. */
+#define DESC_ID_PREV_WRAP(desc_ring, id) \
+DESC_ID((id) - DESCS_COUNT(desc_ring))
+
+/*
+ * A data block: mapped directly to the beginning of the data block area
+ * specified as a logical position within the data ring.
+ *
+ * @id:   the ID of the associated descriptor
+ * @data: the writer data
+ *
+ * Note that the size of a data block is only known by its associated
+ * descriptor.
+ */
+struct prb_data_block {
+	unsigned long	id;
+	char		data[0];
+};
+
+/*
+ * Return the descriptor associated with @n. @n can be either a
+ * descriptor ID or a sequence number.
+ */
+static struct prb_desc *to_desc(struct prb_desc_ring *desc_ring, u64 n)
+{
+	return &desc_ring->descs[DESC_INDEX(desc_ring, n)];
+}
+
+/*
+ * Return the printk_info associated with @n. @n can be either a
+ * descriptor ID or a sequence number.
+ */
+static struct printk_info *to_info(struct prb_desc_ring *desc_ring, u64 n)
+{
+	return &desc_ring->infos[DESC_INDEX(desc_ring, n)];
+}
+
+static struct prb_data_block *to_block(struct prb_data_ring *data_ring,
+				       unsigned long begin_lpos)
+{
+	return (void *)&data_ring->data[DATA_INDEX(data_ring, begin_lpos)];
+}
+
+/*
+ * Increase the data size to account for data block meta data plus any
+ * padding so that the adjacent data block is aligned on the ID size.
+ */
+static unsigned int to_blk_size(unsigned int size)
+{
+	struct prb_data_block *db = NULL;
+
+	size += sizeof(*db);
+	size = ALIGN(size, sizeof(db->id));
+	return size;
+}
+
+/*
+ * Sanity checker for reserve size. The ringbuffer code assumes that a data
+ * block does not exceed the maximum possible size that could fit within the
+ * ringbuffer. This function provides that basic size check so that the
+ * assumption is safe.
+ */
+static bool data_check_size(struct prb_data_ring *data_ring, unsigned int size)
+{
+	struct prb_data_block *db = NULL;
+
+	if (size == 0)
+		return true;
+
+	/*
+	 * Ensure the alignment padded size could possibly fit in the data
+	 * array. The largest possible data block must still leave room for
+	 * at least the ID of the next block.
+	 */
+	size = to_blk_size(size);
+	if (size > DATA_SIZE(data_ring) - sizeof(db->id))
+		return false;
+
+	return true;
+}
+
+/* Query the state of a descriptor. */
+static enum desc_state get_desc_state(unsigned long id,
+				      unsigned long state_val)
+{
+	if (id != DESC_ID(state_val))
+		return desc_miss;
+
+	return DESC_STATE(state_val);
+}
+
+/*
+ * Get a copy of a specified descriptor and return its queried state. If the
+ * descriptor is in an inconsistent state (miss or reserved), the caller can
+ * only expect the descriptor's @state_var field to be valid.
+ *
+ * The sequence number and caller_id can be optionally retrieved. Like all
+ * non-state_var data, they are only valid if the descriptor is in a
+ * consistent state.
+ */
+static enum desc_state desc_read(struct prb_desc_ring *desc_ring,
+				 unsigned long id, struct prb_desc *desc_out,
+				 u64 *seq_out, u32 *caller_id_out)
+{
+	struct printk_info *info = to_info(desc_ring, id);
+	struct prb_desc *desc = to_desc(desc_ring, id);
+	atomic_long_t *state_var = &desc->state_var;
+	enum desc_state d_state;
+	unsigned long state_val;
+
+	/* Check the descriptor state. */
+	state_val = atomic_long_read(state_var); /* LMM(desc_read:A) */
+	d_state = get_desc_state(id, state_val);
+	if (d_state == desc_miss || d_state == desc_reserved) {
+		/*
+		 * The descriptor is in an inconsistent state. Set at least
+		 * @state_var so that the caller can see the details of
+		 * the inconsistent state.
+		 */
+		goto out;
+	}
+
+	/*
+	 * Guarantee the state is loaded before copying the descriptor
+	 * content. This avoids copying obsolete descriptor content that might
+	 * not apply to the descriptor state. This pairs with _prb_commit:B.
+	 *
+	 * Memory barrier involvement:
+	 *
+	 * If desc_read:A reads from _prb_commit:B, then desc_read:C reads
+	 * from _prb_commit:A.
+	 *
+	 * Relies on:
+	 *
+	 * WMB from _prb_commit:A to _prb_commit:B
+	 *    matching
+	 * RMB from desc_read:A to desc_read:C
+	 */
+	smp_rmb(); /* LMM(desc_read:B) */
+
+	/*
+	 * Copy the descriptor data. The data is not valid until the
+	 * state has been re-checked. A memcpy() for all of @desc
+	 * cannot be used because of the atomic_t @state_var field.
+	 */
+	memcpy(&desc_out->text_blk_lpos, &desc->text_blk_lpos,
+	       sizeof(desc_out->text_blk_lpos)); /* LMM(desc_read:C) */
+	if (seq_out)
+		*seq_out = info->seq; /* also part of desc_read:C */
+	if (caller_id_out)
+		*caller_id_out = info->caller_id; /* also part of desc_read:C */
+
+	/*
+	 * 1. Guarantee the descriptor content is loaded before re-checking
+	 *    the state. This avoids reading an obsolete descriptor state
+	 *    that may not apply to the copied content. This pairs with
+	 *    desc_reserve:F.
+	 *
+	 *    Memory barrier involvement:
+	 *
+	 *    If desc_read:C reads from desc_reserve:G, then desc_read:E
+	 *    reads from desc_reserve:F.
+	 *
+	 *    Relies on:
+	 *
+	 *    WMB from desc_reserve:F to desc_reserve:G
+	 *       matching
+	 *    RMB from desc_read:C to desc_read:E
+	 *
+	 * 2. Guarantee the record data is loaded before re-checking the
+	 *    state. This avoids reading an obsolete descriptor state that may
+	 *    not apply to the copied data. This pairs with data_alloc:A and
+	 *    data_realloc:A.
+	 *
+	 *    Memory barrier involvement:
+	 *
+	 *    If copy_data:A reads from data_alloc:B, then desc_read:E
+	 *    reads from desc_make_reusable:A.
+	 *
+	 *    Relies on:
+	 *
+	 *    MB from desc_make_reusable:A to data_alloc:B
+	 *       matching
+	 *    RMB from desc_read:C to desc_read:E
+	 *
+	 *    Note: desc_make_reusable:A and data_alloc:B can be different
+	 *          CPUs. However, the data_alloc:B CPU (which performs the
+	 *          full memory barrier) must have previously seen
+	 *          desc_make_reusable:A.
+	 */
+	smp_rmb(); /* LMM(desc_read:D) */
+
+	/*
+	 * The data has been copied. Return the current descriptor state,
+	 * which may have changed since the load above.
+	 */
+	state_val = atomic_long_read(state_var); /* LMM(desc_read:E) */
+	d_state = get_desc_state(id, state_val);
+out:
+	atomic_long_set(&desc_out->state_var, state_val);
+	return d_state;
+}
+
+/*
+ * Take a specified descriptor out of the finalized state by attempting
+ * the transition from finalized to reusable. Either this context or some
+ * other context will have been successful.
+ */
+static void desc_make_reusable(struct prb_desc_ring *desc_ring,
+			       unsigned long id)
+{
+	unsigned long val_finalized = DESC_SV(id, desc_finalized);
+	unsigned long val_reusable = DESC_SV(id, desc_reusable);
+	struct prb_desc *desc = to_desc(desc_ring, id);
+	atomic_long_t *state_var = &desc->state_var;
+
+	atomic_long_cmpxchg_relaxed(state_var, val_finalized,
+				    val_reusable); /* LMM(desc_make_reusable:A) */
+}
+
+/*
+ * Given the text data ring, put the associated descriptor of each
+ * data block from @lpos_begin until @lpos_end into the reusable state.
+ *
+ * If there is any problem making the associated descriptor reusable, either
+ * the descriptor has not yet been finalized or another writer context has
+ * already pushed the tail lpos past the problematic data block. Regardless,
+ * on error the caller can re-load the tail lpos to determine the situation.
+ */
+static bool data_make_reusable(struct printk_ringbuffer *rb,
+			       struct prb_data_ring *data_ring,
+			       unsigned long lpos_begin,
+			       unsigned long lpos_end,
+			       unsigned long *lpos_out)
+{
+	struct prb_desc_ring *desc_ring = &rb->desc_ring;
+	struct prb_data_block *blk;
+	enum desc_state d_state;
+	struct prb_desc desc;
+	struct prb_data_blk_lpos *blk_lpos = &desc.text_blk_lpos;
+	unsigned long id;
+
+	/* Loop until @lpos_begin has advanced to or beyond @lpos_end. */
+	while ((lpos_end - lpos_begin) - 1 < DATA_SIZE(data_ring)) {
+		blk = to_block(data_ring, lpos_begin);
+
+		/*
+		 * Load the block ID from the data block. This is a data race
+		 * against a writer that may have newly reserved this data
+		 * area. If the loaded value matches a valid descriptor ID,
+		 * the blk_lpos of that descriptor will be checked to make
+		 * sure it points back to this data block. If the check fails,
+		 * the data area has been recycled by another writer.
+		 */
+		id = blk->id; /* LMM(data_make_reusable:A) */
+
+		d_state = desc_read(desc_ring, id, &desc,
+				    NULL, NULL); /* LMM(data_make_reusable:B) */
+
+		switch (d_state) {
+		case desc_miss:
+		case desc_reserved:
+		case desc_committed:
+			return false;
+		case desc_finalized:
+			/*
+			 * This data block is invalid if the descriptor
+			 * does not point back to it.
+			 */
+			if (blk_lpos->begin != lpos_begin)
+				return false;
+			desc_make_reusable(desc_ring, id);
+			break;
+		case desc_reusable:
+			/*
+			 * This data block is invalid if the descriptor
+			 * does not point back to it.
+			 */
+			if (blk_lpos->begin != lpos_begin)
+				return false;
+			break;
+		}
+
+		/* Advance @lpos_begin to the next data block. */
+		lpos_begin = blk_lpos->next;
+	}
+
+	*lpos_out = lpos_begin;
+	return true;
+}
+
+/*
+ * Advance the data ring tail to at least @lpos. This function puts
+ * descriptors into the reusable state if the tail is pushed beyond
+ * their associated data block.
+ */
+static bool data_push_tail(struct printk_ringbuffer *rb,
+			   struct prb_data_ring *data_ring,
+			   unsigned long lpos)
+{
+	unsigned long tail_lpos_new;
+	unsigned long tail_lpos;
+	unsigned long next_lpos;
+
+	/* If @lpos is from a data-less block, there is nothing to do. */
+	if (LPOS_DATALESS(lpos))
+		return true;
+
+	/*
+	 * Any descriptor states that have transitioned to reusable due to the
+	 * data tail being pushed to this loaded value will be visible to this
+	 * CPU. This pairs with data_push_tail:D.
+	 *
+	 * Memory barrier involvement:
+	 *
+	 * If data_push_tail:A reads from data_push_tail:D, then this CPU can
+	 * see desc_make_reusable:A.
+	 *
+	 * Relies on:
+	 *
+	 * MB from desc_make_reusable:A to data_push_tail:D
+	 *    matches
+	 * READFROM from data_push_tail:D to data_push_tail:A
+	 *    thus
+	 * READFROM from desc_make_reusable:A to this CPU
+	 */
+	tail_lpos = atomic_long_read(&data_ring->tail_lpos); /* LMM(data_push_tail:A) */
+
+	/*
+	 * Loop until the tail lpos is at or beyond @lpos. This condition
+	 * may already be satisfied, resulting in no full memory barrier
+	 * from data_push_tail:D being performed. However, since this CPU
+	 * sees the new tail lpos, any descriptor states that transitioned to
+	 * the reusable state must already be visible.
+	 */
+	while ((lpos - tail_lpos) - 1 < DATA_SIZE(data_ring)) {
+		/*
+		 * Make all descriptors reusable that are associated with
+		 * data blocks before @lpos.
+		 */
+		if (!data_make_reusable(rb, data_ring, tail_lpos, lpos,
+					&next_lpos)) {
+			/*
+			 * 1. Guarantee the block ID loaded in
+			 *    data_make_reusable() is performed before
+			 *    reloading the tail lpos. The failed
+			 *    data_make_reusable() may be due to a newly
+			 *    recycled data area causing the tail lpos to
+			 *    have been previously pushed. This pairs with
+			 *    data_alloc:A and data_realloc:A.
+			 *
+			 *    Memory barrier involvement:
+			 *
+			 *    If data_make_reusable:A reads from data_alloc:B,
+			 *    then data_push_tail:C reads from
+			 *    data_push_tail:D.
+			 *
+			 *    Relies on:
+			 *
+			 *    MB from data_push_tail:D to data_alloc:B
+			 *       matching
+			 *    RMB from data_make_reusable:A to
+			 *    data_push_tail:C
+			 *
+			 *    Note: data_push_tail:D and data_alloc:B can be
+			 *          different CPUs. However, the data_alloc:B
+			 *          CPU (which performs the full memory
+			 *          barrier) must have previously seen
+			 *          data_push_tail:D.
+			 *
+			 * 2. Guarantee the descriptor state loaded in
+			 *    data_make_reusable() is performed before
+			 *    reloading the tail lpos. The failed
+			 *    data_make_reusable() may be due to a newly
+			 *    recycled descriptor causing the tail lpos to
+			 *    have been previously pushed. This pairs with
+			 *    desc_reserve:D.
+			 *
+			 *    Memory barrier involvement:
+			 *
+			 *    If data_make_reusable:B reads from
+			 *    desc_reserve:F, then data_push_tail:C reads
+			 *    from data_push_tail:D.
+			 *
+			 *    Relies on:
+			 *
+			 *    MB from data_push_tail:D to desc_reserve:F
+			 *       matching
+			 *    RMB from data_make_reusable:B to
+			 *    data_push_tail:C
+			 *
+			 *    Note: data_push_tail:D and desc_reserve:F can
+			 *          be different CPUs. However, the
+			 *          desc_reserve:F CPU (which performs the
+			 *          full memory barrier) must have previously
+			 *          seen data_push_tail:D.
+			 */
+			smp_rmb(); /* LMM(data_push_tail:B) */
+
+			tail_lpos_new = atomic_long_read(&data_ring->tail_lpos
+							); /* LMM(data_push_tail:C) */
+			if (tail_lpos_new == tail_lpos)
+				return false;
+
+			/* Another CPU pushed the tail. Try again. */
+			tail_lpos = tail_lpos_new;
+			continue;
+		}
+
+		/*
+		 * Guarantee any descriptor states that have transitioned to
+		 * reusable are stored before pushing the tail lpos. A full
+		 * memory barrier is needed since other CPUs may have made
+		 * the descriptor states reusable. This pairs with
+		 * data_push_tail:A.
+		 */
+		if (atomic_long_try_cmpxchg(&data_ring->tail_lpos, &tail_lpos,
+					    next_lpos)) { /* LMM(data_push_tail:D) */
+			break;
+		}
+	}
+
+	return true;
+}
+
+/*
+ * Advance the desc ring tail. This function advances the tail by one
+ * descriptor, thus invalidating the oldest descriptor. Before advancing
+ * the tail, the tail descriptor is made reusable and all data blocks up to
+ * and including the descriptor's data block are invalidated (i.e. the data
+ * ring tail is pushed past the data block of the descriptor being made
+ * reusable).
+ */
+static bool desc_push_tail(struct printk_ringbuffer *rb,
+			   unsigned long tail_id)
+{
+	struct prb_desc_ring *desc_ring = &rb->desc_ring;
+	enum desc_state d_state;
+	struct prb_desc desc;
+
+	d_state = desc_read(desc_ring, tail_id, &desc, NULL, NULL);
+
+	switch (d_state) {
+	case desc_miss:
+		/*
+		 * If the ID is exactly 1 wrap behind the expected, it is
+		 * in the process of being reserved by another writer and
+		 * must be considered reserved.
+		 */
+		if (DESC_ID(atomic_long_read(&desc.state_var)) ==
+		    DESC_ID_PREV_WRAP(desc_ring, tail_id)) {
+			return false;
+		}
+
+		/*
+		 * The ID has changed. Another writer must have pushed the
+		 * tail and recycled the descriptor already. Success is
+		 * returned because the caller is only interested in the
+		 * specified tail being pushed, which it was.
+		 */
+		return true;
+	case desc_reserved:
+	case desc_committed:
+		return false;
+	case desc_finalized:
+		desc_make_reusable(desc_ring, tail_id);
+		break;
+	case desc_reusable:
+		break;
+	}
+
+	/*
+	 * Data blocks must be invalidated before their associated
+	 * descriptor can be made available for recycling. Invalidating
+	 * them later is not possible because there is no way to trust
+	 * data blocks once their associated descriptor is gone.
+	 */
+
+	if (!data_push_tail(rb, &rb->text_data_ring, desc.text_blk_lpos.next))
+		return false;
+
+	/*
+	 * Check the next descriptor after @tail_id before pushing the tail
+	 * to it because the tail must always be in a finalized or reusable
+	 * state. The implementation of prb_first_seq() relies on this.
+	 *
+	 * A successful read implies that the next descriptor is less than or
+	 * equal to @head_id so there is no risk of pushing the tail past the
+	 * head.
+	 */
+	d_state = desc_read(desc_ring, DESC_ID(tail_id + 1), &desc,
+			    NULL, NULL); /* LMM(desc_push_tail:A) */
+
+	if (d_state == desc_finalized || d_state == desc_reusable) {
+		/*
+		 * Guarantee any descriptor states that have transitioned to
+		 * reusable are stored before pushing the tail ID. This allows
+		 * verifying the recycled descriptor state. A full memory
+		 * barrier is needed since other CPUs may have made the
+		 * descriptor states reusable. This pairs with desc_reserve:D.
+		 */
+		atomic_long_cmpxchg(&desc_ring->tail_id, tail_id,
+				    DESC_ID(tail_id + 1)); /* LMM(desc_push_tail:B) */
+	} else {
+		/*
+		 * Guarantee the last state load from desc_read() is before
+		 * reloading @tail_id in order to see a new tail ID in the
+		 * case that the descriptor has been recycled. This pairs
+		 * with desc_reserve:D.
+		 *
+		 * Memory barrier involvement:
+		 *
+		 * If desc_push_tail:A reads from desc_reserve:F, then
+		 * desc_push_tail:D reads from desc_push_tail:B.
+		 *
+		 * Relies on:
+		 *
+		 * MB from desc_push_tail:B to desc_reserve:F
+		 *    matching
+		 * RMB from desc_push_tail:A to desc_push_tail:D
+		 *
+		 * Note: desc_push_tail:B and desc_reserve:F can be different
+		 *       CPUs. However, the desc_reserve:F CPU (which performs
+		 *       the full memory barrier) must have previously seen
+		 *       desc_push_tail:B.
+		 */
+		smp_rmb(); /* LMM(desc_push_tail:C) */
+
+		/*
+		 * Re-check the tail ID. The descriptor following @tail_id is
+		 * not in an allowed tail state. But if the tail has since
+		 * been moved by another CPU, then it does not matter.
+		 */
+		if (atomic_long_read(&desc_ring->tail_id) == tail_id) /* LMM(desc_push_tail:D) */
+			return false;
+	}
+
+	return true;
+}
+
+/* Reserve a new descriptor, invalidating the oldest if necessary. */
+static bool desc_reserve(struct printk_ringbuffer *rb, unsigned long *id_out)
+{
+	struct prb_desc_ring *desc_ring = &rb->desc_ring;
+	unsigned long prev_state_val;
+	unsigned long id_prev_wrap;
+	struct prb_desc *desc;
+	unsigned long head_id;
+	unsigned long id;
+
+	head_id = atomic_long_read(&desc_ring->head_id); /* LMM(desc_reserve:A) */
+
+	do {
+		desc = to_desc(desc_ring, head_id);
+
+		id = DESC_ID(head_id + 1);
+		id_prev_wrap = DESC_ID_PREV_WRAP(desc_ring, id);
+
+		/*
+		 * Guarantee the head ID is read before reading the tail ID.
+		 * Since the tail ID is updated before the head ID, this
+		 * guarantees that @id_prev_wrap is never ahead of the tail
+		 * ID. This pairs with desc_reserve:D.
+		 *
+		 * Memory barrier involvement:
+		 *
+		 * If desc_reserve:A reads from desc_reserve:D, then
+		 * desc_reserve:C reads from desc_push_tail:B.
+		 *
+		 * Relies on:
+		 *
+		 * MB from desc_push_tail:B to desc_reserve:D
+		 *    matching
+		 * RMB from desc_reserve:A to desc_reserve:C
+		 *
+		 * Note: desc_push_tail:B and desc_reserve:D can be different
+		 *       CPUs. However, the desc_reserve:D CPU (which performs
+		 *       the full memory barrier) must have previously seen
+		 *       desc_push_tail:B.
+		 */
+		smp_rmb(); /* LMM(desc_reserve:B) */
+
+		if (id_prev_wrap == atomic_long_read(&desc_ring->tail_id
+						    )) { /* LMM(desc_reserve:C) */
+			/*
+			 * Make space for the new descriptor by
+			 * advancing the tail.
+			 */
+			if (!desc_push_tail(rb, id_prev_wrap))
+				return false;
+		}
+
+		/*
+		 * 1. Guarantee the tail ID is read before validating the
+		 *    recycled descriptor state. A read memory barrier is
+		 *    sufficient for this. This pairs with desc_push_tail:B.
+		 *
+		 *    Memory barrier involvement:
+		 *
+		 *    If desc_reserve:C reads from desc_push_tail:B, then
+		 *    desc_reserve:E reads from desc_make_reusable:A.
+		 *
+		 *    Relies on:
+		 *
+		 *    MB from desc_make_reusable:A to desc_push_tail:B
+		 *       matching
+		 *    RMB from desc_reserve:C to desc_reserve:E
+		 *
+		 *    Note: desc_make_reusable:A and desc_push_tail:B can be
+		 *          different CPUs. However, the desc_push_tail:B CPU
+		 *          (which performs the full memory barrier) must have
+		 *          previously seen desc_make_reusable:A.
+		 *
+		 * 2. Guarantee the tail ID is stored before storing the head
+		 *    ID. This pairs with desc_reserve:B.
+		 *
+		 * 3. Guarantee any data ring tail changes are stored before
+		 *    recycling the descriptor. Data ring tail changes can
+		 *    happen via desc_push_tail()->data_push_tail(). A full
+		 *    memory barrier is needed since another CPU may have
+		 *    pushed the data ring tails. This pairs with
+		 *    data_push_tail:B.
+		 *
+		 * 4. Guarantee a new tail ID is stored before recycling the
+		 *    descriptor. A full memory barrier is needed since
+		 *    another CPU may have pushed the tail ID. This pairs
+		 *    with desc_push_tail:C and this also pairs with
+		 *    prb_first_seq:C.
+		 *
+		 * 5. Guarantee the head ID is stored before trying to
+		 *    finalize the previous descriptor. This pairs with
+		 *    _prb_commit:B.
+		 */
+	} while (!atomic_long_try_cmpxchg(&desc_ring->head_id, &head_id,
+					  id)); /* LMM(desc_reserve:D) */
+
+	desc = to_desc(desc_ring, id);
+
+	/*
+	 * If the descriptor has been recycled, verify the old state val.
+	 * See "ABA Issues" about why this verification is performed.
+	 */
+	prev_state_val = atomic_long_read(&desc->state_var); /* LMM(desc_reserve:E) */
+	if (prev_state_val &&
+	    get_desc_state(id_prev_wrap, prev_state_val) != desc_reusable) {
+		WARN_ON_ONCE(1);
+		return false;
+	}
+
+	/*
+	 * Assign the descriptor a new ID and set its state to reserved.
+	 * See "ABA Issues" about why cmpxchg() instead of set() is used.
+	 *
+	 * Guarantee the new descriptor ID and state is stored before making
+	 * any other changes. A write memory barrier is sufficient for this.
+	 * This pairs with desc_read:D.
+	 */
+	if (!atomic_long_try_cmpxchg(&desc->state_var, &prev_state_val,
+			DESC_SV(id, desc_reserved))) { /* LMM(desc_reserve:F) */
+		WARN_ON_ONCE(1);
+		return false;
+	}
+
+	/* Now data in @desc can be modified: LMM(desc_reserve:G) */
+
+	*id_out = id;
+	return true;
+}
+
+/* Determine the end of a data block. */
+static unsigned long get_next_lpos(struct prb_data_ring *data_ring,
+				   unsigned long lpos, unsigned int size)
+{
+	unsigned long begin_lpos;
+	unsigned long next_lpos;
+
+	begin_lpos = lpos;
+	next_lpos = lpos + size;
+
+	/* First check if the data block does not wrap. */
+	if (DATA_WRAPS(data_ring, begin_lpos) == DATA_WRAPS(data_ring, next_lpos))
+		return next_lpos;
+
+	/* Wrapping data blocks store their data at the beginning. */
+	return (DATA_THIS_WRAP_START_LPOS(data_ring, next_lpos) + size);
+}
+
+/*
+ * Allocate a new data block, invalidating the oldest data block(s)
+ * if necessary. This function also associates the data block with
+ * a specified descriptor.
+ */
+static char *data_alloc(struct printk_ringbuffer *rb,
+			struct prb_data_ring *data_ring, unsigned int size,
+			struct prb_data_blk_lpos *blk_lpos, unsigned long id)
+{
+	struct prb_data_block *blk;
+	unsigned long begin_lpos;
+	unsigned long next_lpos;
+
+	if (size == 0) {
+		/* Specify a data-less block. */
+		blk_lpos->begin = NO_LPOS;
+		blk_lpos->next = NO_LPOS;
+		return NULL;
+	}
+
+	size = to_blk_size(size);
+
+	begin_lpos = atomic_long_read(&data_ring->head_lpos);
+
+	do {
+		next_lpos = get_next_lpos(data_ring, begin_lpos, size);
+
+		if (!data_push_tail(rb, data_ring, next_lpos - DATA_SIZE(data_ring))) {
+			/* Failed to allocate, specify a data-less block. */
+			blk_lpos->begin = FAILED_LPOS;
+			blk_lpos->next = FAILED_LPOS;
+			return NULL;
+		}
+
+		/*
+		 * 1. Guarantee any descriptor states that have transitioned
+		 *    to reusable are stored before modifying the newly
+		 *    allocated data area. A full memory barrier is needed
+		 *    since other CPUs may have made the descriptor states
+		 *    reusable. See data_push_tail:A about why the reusable
+		 *    states are visible. This pairs with desc_read:D.
+		 *
+		 * 2. Guarantee any updated tail lpos is stored before
+		 *    modifying the newly allocated data area. Another CPU may
+		 *    be in data_make_reusable() and is reading a block ID
+		 *    from this area. data_make_reusable() can handle reading
+		 *    a garbage block ID value, but then it must be able to
+		 *    load a new tail lpos. A full memory barrier is needed
+		 *    since other CPUs may have updated the tail lpos. This
+		 *    pairs with data_push_tail:B.
+		 */
+	} while (!atomic_long_try_cmpxchg(&data_ring->head_lpos, &begin_lpos,
+					  next_lpos)); /* LMM(data_alloc:A) */
+
+	blk = to_block(data_ring, begin_lpos);
+	blk->id = id; /* LMM(data_alloc:B) */
+
+	if (DATA_WRAPS(data_ring, begin_lpos) != DATA_WRAPS(data_ring, next_lpos)) {
+		/* Wrapping data blocks store their data at the beginning. */
+		blk = to_block(data_ring, 0);
+
+		/*
+		 * Store the ID on the wrapped block for consistency.
+		 * The printk_ringbuffer does not actually use it.
+		 */
+		blk->id = id;
+	}
+
+	blk_lpos->begin = begin_lpos;
+	blk_lpos->next = next_lpos;
+
+	return &blk->data[0];
+}
+
+/*
+ * Try to resize an existing data block associated with the descriptor
+ * specified by @id. If the resized data block should become wrapped, it
+ * copies the old data to the new data block. If @size yields a data block
+ * with the same or less size, the data block is left as is.
+ *
+ * Fail if this is not the last allocated data block or if there is not
+ * enough space or it is not possible make enough space.
+ *
+ * Return a pointer to the beginning of the entire data buffer or NULL on
+ * failure.
+ */
+static char *data_realloc(struct printk_ringbuffer *rb,
+			  struct prb_data_ring *data_ring, unsigned int size,
+			  struct prb_data_blk_lpos *blk_lpos, unsigned long id)
+{
+	struct prb_data_block *blk;
+	unsigned long head_lpos;
+	unsigned long next_lpos;
+	bool wrapped;
+
+	/* Reallocation only works if @blk_lpos is the newest data block. */
+	head_lpos = atomic_long_read(&data_ring->head_lpos);
+	if (head_lpos != blk_lpos->next)
+		return NULL;
+
+	/* Keep track if @blk_lpos was a wrapping data block. */
+	wrapped = (DATA_WRAPS(data_ring, blk_lpos->begin) != DATA_WRAPS(data_ring, blk_lpos->next));
+
+	size = to_blk_size(size);
+
+	next_lpos = get_next_lpos(data_ring, blk_lpos->begin, size);
+
+	/* If the data block does not increase, there is nothing to do. */
+	if (head_lpos - next_lpos < DATA_SIZE(data_ring)) {
+		if (wrapped)
+			blk = to_block(data_ring, 0);
+		else
+			blk = to_block(data_ring, blk_lpos->begin);
+		return &blk->data[0];
+	}
+
+	if (!data_push_tail(rb, data_ring, next_lpos - DATA_SIZE(data_ring)))
+		return NULL;
+
+	/* The memory barrier involvement is the same as data_alloc:A. */
+	if (!atomic_long_try_cmpxchg(&data_ring->head_lpos, &head_lpos,
+				     next_lpos)) { /* LMM(data_realloc:A) */
+		return NULL;
+	}
+
+	blk = to_block(data_ring, blk_lpos->begin);
+
+	if (DATA_WRAPS(data_ring, blk_lpos->begin) != DATA_WRAPS(data_ring, next_lpos)) {
+		struct prb_data_block *old_blk = blk;
+
+		/* Wrapping data blocks store their data at the beginning. */
+		blk = to_block(data_ring, 0);
+
+		/*
+		 * Store the ID on the wrapped block for consistency.
+		 * The printk_ringbuffer does not actually use it.
+		 */
+		blk->id = id;
+
+		if (!wrapped) {
+			/*
+			 * Since the allocated space is now in the newly
+			 * created wrapping data block, copy the content
+			 * from the old data block.
+			 */
+			memcpy(&blk->data[0], &old_blk->data[0],
+			       (blk_lpos->next - blk_lpos->begin) - sizeof(blk->id));
+		}
+	}
+
+	blk_lpos->next = next_lpos;
+
+	return &blk->data[0];
+}
+
+/* Return the number of bytes used by a data block. */
+static unsigned int space_used(struct prb_data_ring *data_ring,
+			       struct prb_data_blk_lpos *blk_lpos)
+{
+	/* Data-less blocks take no space. */
+	if (BLK_DATALESS(blk_lpos))
+		return 0;
+
+	if (DATA_WRAPS(data_ring, blk_lpos->begin) == DATA_WRAPS(data_ring, blk_lpos->next)) {
+		/* Data block does not wrap. */
+		return (DATA_INDEX(data_ring, blk_lpos->next) -
+			DATA_INDEX(data_ring, blk_lpos->begin));
+	}
+
+	/*
+	 * For wrapping data blocks, the trailing (wasted) space is
+	 * also counted.
+	 */
+	return (DATA_INDEX(data_ring, blk_lpos->next) +
+		DATA_SIZE(data_ring) - DATA_INDEX(data_ring, blk_lpos->begin));
+}
+
+/*
+ * Given @blk_lpos, return a pointer to the writer data from the data block
+ * and calculate the size of the data part. A NULL pointer is returned if
+ * @blk_lpos specifies values that could never be legal.
+ *
+ * This function (used by readers) performs strict validation on the lpos
+ * values to possibly detect bugs in the writer code. A WARN_ON_ONCE() is
+ * triggered if an internal error is detected.
+ */
+static const char *get_data(struct prb_data_ring *data_ring,
+			    struct prb_data_blk_lpos *blk_lpos,
+			    unsigned int *data_size)
+{
+	struct prb_data_block *db;
+
+	/* Data-less data block description. */
+	if (BLK_DATALESS(blk_lpos)) {
+		if (blk_lpos->begin == NO_LPOS && blk_lpos->next == NO_LPOS) {
+			*data_size = 0;
+			return "";
+		}
+		return NULL;
+	}
+
+	/* Regular data block: @begin less than @next and in same wrap. */
+	if (DATA_WRAPS(data_ring, blk_lpos->begin) == DATA_WRAPS(data_ring, blk_lpos->next) &&
+	    blk_lpos->begin < blk_lpos->next) {
+		db = to_block(data_ring, blk_lpos->begin);
+		*data_size = blk_lpos->next - blk_lpos->begin;
+
+	/* Wrapping data block: @begin is one wrap behind @next. */
+	} else if (DATA_WRAPS(data_ring, blk_lpos->begin + DATA_SIZE(data_ring)) ==
+		   DATA_WRAPS(data_ring, blk_lpos->next)) {
+		db = to_block(data_ring, 0);
+		*data_size = DATA_INDEX(data_ring, blk_lpos->next);
+
+	/* Illegal block description. */
+	} else {
+		WARN_ON_ONCE(1);
+		return NULL;
+	}
+
+	/* A valid data block will always be aligned to the ID size. */
+	if (WARN_ON_ONCE(blk_lpos->begin != ALIGN(blk_lpos->begin, sizeof(db->id))) ||
+	    WARN_ON_ONCE(blk_lpos->next != ALIGN(blk_lpos->next, sizeof(db->id)))) {
+		return NULL;
+	}
+
+	/* A valid data block will always have at least an ID. */
+	if (WARN_ON_ONCE(*data_size < sizeof(db->id)))
+		return NULL;
+
+	/* Subtract block ID space from size to reflect data size. */
+	*data_size -= sizeof(db->id);
+
+	return &db->data[0];
+}
+
+/*
+ * Attempt to transition the newest descriptor from committed back to reserved
+ * so that the record can be modified by a writer again. This is only possible
+ * if the descriptor is not yet finalized and the provided @caller_id matches.
+ */
+static struct prb_desc *desc_reopen_last(struct prb_desc_ring *desc_ring,
+					 u32 caller_id, unsigned long *id_out)
+{
+	unsigned long prev_state_val;
+	enum desc_state d_state;
+	struct prb_desc desc;
+	struct prb_desc *d;
+	unsigned long id;
+	u32 cid;
+
+	id = atomic_long_read(&desc_ring->head_id);
+
+	/*
+	 * To reduce unnecessarily reopening, first check if the descriptor
+	 * state and caller ID are correct.
+	 */
+	d_state = desc_read(desc_ring, id, &desc, NULL, &cid);
+	if (d_state != desc_committed || cid != caller_id)
+		return NULL;
+
+	d = to_desc(desc_ring, id);
+
+	prev_state_val = DESC_SV(id, desc_committed);
+
+	/*
+	 * Guarantee the reserved state is stored before reading any
+	 * record data. A full memory barrier is needed because @state_var
+	 * modification is followed by reading. This pairs with _prb_commit:B.
+	 *
+	 * Memory barrier involvement:
+	 *
+	 * If desc_reopen_last:A reads from _prb_commit:B, then
+	 * prb_reserve_in_last:A reads from _prb_commit:A.
+	 *
+	 * Relies on:
+	 *
+	 * WMB from _prb_commit:A to _prb_commit:B
+	 *    matching
+	 * MB If desc_reopen_last:A to prb_reserve_in_last:A
+	 */
+	if (!atomic_long_try_cmpxchg(&d->state_var, &prev_state_val,
+			DESC_SV(id, desc_reserved))) { /* LMM(desc_reopen_last:A) */
+		return NULL;
+	}
+
+	*id_out = id;
+	return d;
+}
+
+/**
+ * prb_reserve_in_last() - Re-reserve and extend the space in the ringbuffer
+ *                         used by the newest record.
+ *
+ * @e:         The entry structure to setup.
+ * @rb:        The ringbuffer to re-reserve and extend data in.
+ * @r:         The record structure to allocate buffers for.
+ * @caller_id: The caller ID of the caller (reserving writer).
+ * @max_size:  Fail if the extended size would be greater than this.
+ *
+ * This is the public function available to writers to re-reserve and extend
+ * data.
+ *
+ * The writer specifies the text size to extend (not the new total size) by
+ * setting the @text_buf_size field of @r. To ensure proper initialization
+ * of @r, prb_rec_init_wr() should be used.
+ *
+ * This function will fail if @caller_id does not match the caller ID of the
+ * newest record. In that case the caller must reserve new data using
+ * prb_reserve().
+ *
+ * Context: Any context. Disables local interrupts on success.
+ * Return: true if text data could be extended, otherwise false.
+ *
+ * On success:
+ *
+ *   - @r->text_buf points to the beginning of the entire text buffer.
+ *
+ *   - @r->text_buf_size is set to the new total size of the buffer.
+ *
+ *   - @r->info is not touched so that @r->info->text_len could be used
+ *     to append the text.
+ *
+ *   - prb_record_text_space() can be used on @e to query the new
+ *     actually used space.
+ *
+ * Important: All @r->info fields will already be set with the current values
+ *            for the record. I.e. @r->info->text_len will be less than
+ *            @text_buf_size. Writers can use @r->info->text_len to know
+ *            where concatenation begins and writers should update
+ *            @r->info->text_len after concatenating.
+ */
+bool prb_reserve_in_last(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
+			 struct printk_record *r, u32 caller_id, unsigned int max_size)
+{
+	struct prb_desc_ring *desc_ring = &rb->desc_ring;
+	struct printk_info *info;
+	unsigned int data_size;
+	struct prb_desc *d;
+	unsigned long id;
+
+	local_irq_save(e->irqflags);
+
+	/* Transition the newest descriptor back to the reserved state. */
+	d = desc_reopen_last(desc_ring, caller_id, &id);
+	if (!d) {
+		local_irq_restore(e->irqflags);
+		goto fail_reopen;
+	}
+
+	/* Now the writer has exclusive access: LMM(prb_reserve_in_last:A) */
+
+	info = to_info(desc_ring, id);
+
+	/*
+	 * Set the @e fields here so that prb_commit() can be used if
+	 * anything fails from now on.
+	 */
+	e->rb = rb;
+	e->id = id;
+
+	/*
+	 * desc_reopen_last() checked the caller_id, but there was no
+	 * exclusive access at that point. The descriptor may have
+	 * changed since then.
+	 */
+	if (caller_id != info->caller_id)
+		goto fail;
+
+	if (BLK_DATALESS(&d->text_blk_lpos)) {
+		if (WARN_ON_ONCE(info->text_len != 0)) {
+			pr_warn_once("wrong text_len value (%hu, expecting 0)\n",
+				     info->text_len);
+			info->text_len = 0;
+		}
+
+		if (!data_check_size(&rb->text_data_ring, r->text_buf_size))
+			goto fail;
+
+		if (r->text_buf_size > max_size)
+			goto fail;
+
+		r->text_buf = data_alloc(rb, &rb->text_data_ring, r->text_buf_size,
+					 &d->text_blk_lpos, id);
+	} else {
+		if (!get_data(&rb->text_data_ring, &d->text_blk_lpos, &data_size))
+			goto fail;
+
+		/*
+		 * Increase the buffer size to include the original size. If
+		 * the meta data (@text_len) is not sane, use the full data
+		 * block size.
+		 */
+		if (WARN_ON_ONCE(info->text_len > data_size)) {
+			pr_warn_once("wrong text_len value (%hu, expecting <=%u)\n",
+				     info->text_len, data_size);
+			info->text_len = data_size;
+		}
+		r->text_buf_size += info->text_len;
+
+		if (!data_check_size(&rb->text_data_ring, r->text_buf_size))
+			goto fail;
+
+		if (r->text_buf_size > max_size)
+			goto fail;
+
+		r->text_buf = data_realloc(rb, &rb->text_data_ring, r->text_buf_size,
+					   &d->text_blk_lpos, id);
+	}
+	if (r->text_buf_size && !r->text_buf)
+		goto fail;
+
+	r->info = info;
+
+	e->text_space = space_used(&rb->text_data_ring, &d->text_blk_lpos);
+
+	return true;
+fail:
+	prb_commit(e);
+	/* prb_commit() re-enabled interrupts. */
+fail_reopen:
+	/* Make it clear to the caller that the re-reserve failed. */
+	memset(r, 0, sizeof(*r));
+	return false;
+}
+
+/*
+ * Attempt to finalize a specified descriptor. If this fails, the descriptor
+ * is either already final or it will finalize itself when the writer commits.
+ */
+static void desc_make_final(struct prb_desc_ring *desc_ring, unsigned long id)
+{
+	unsigned long prev_state_val = DESC_SV(id, desc_committed);
+	struct prb_desc *d = to_desc(desc_ring, id);
+
+	atomic_long_cmpxchg_relaxed(&d->state_var, prev_state_val,
+			DESC_SV(id, desc_finalized)); /* LMM(desc_make_final:A) */
+}
+
+/**
+ * prb_reserve() - Reserve space in the ringbuffer.
+ *
+ * @e:  The entry structure to setup.
+ * @rb: The ringbuffer to reserve data in.
+ * @r:  The record structure to allocate buffers for.
+ *
+ * This is the public function available to writers to reserve data.
+ *
+ * The writer specifies the text size to reserve by setting the
+ * @text_buf_size field of @r. To ensure proper initialization of @r,
+ * prb_rec_init_wr() should be used.
+ *
+ * Context: Any context. Disables local interrupts on success.
+ * Return: true if at least text data could be allocated, otherwise false.
+ *
+ * On success, the fields @info and @text_buf of @r will be set by this
+ * function and should be filled in by the writer before committing. Also
+ * on success, prb_record_text_space() can be used on @e to query the actual
+ * space used for the text data block.
+ *
+ * Important: @info->text_len needs to be set correctly by the writer in
+ *            order for data to be readable and/or extended. Its value
+ *            is initialized to 0.
+ */
+bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
+		 struct printk_record *r)
+{
+	struct prb_desc_ring *desc_ring = &rb->desc_ring;
+	struct printk_info *info;
+	struct prb_desc *d;
+	unsigned long id;
+	u64 seq;
+
+	if (!data_check_size(&rb->text_data_ring, r->text_buf_size))
+		goto fail;
+
+	/*
+	 * Descriptors in the reserved state act as blockers to all further
+	 * reservations once the desc_ring has fully wrapped. Disable
+	 * interrupts during the reserve/commit window in order to minimize
+	 * the likelihood of this happening.
+	 */
+	local_irq_save(e->irqflags);
+
+	if (!desc_reserve(rb, &id)) {
+		/* Descriptor reservation failures are tracked. */
+		atomic_long_inc(&rb->fail);
+		local_irq_restore(e->irqflags);
+		goto fail;
+	}
+
+	d = to_desc(desc_ring, id);
+	info = to_info(desc_ring, id);
+
+	/*
+	 * All @info fields (except @seq) are cleared and must be filled in
+	 * by the writer. Save @seq before clearing because it is used to
+	 * determine the new sequence number.
+	 */
+	seq = info->seq;
+	memset(info, 0, sizeof(*info));
+
+	/*
+	 * Set the @e fields here so that prb_commit() can be used if
+	 * text data allocation fails.
+	 */
+	e->rb = rb;
+	e->id = id;
+
+	/*
+	 * Initialize the sequence number if it has "never been set".
+	 * Otherwise just increment it by a full wrap.
+	 *
+	 * @seq is considered "never been set" if it has a value of 0,
+	 * _except_ for @infos[0], which was specially setup by the ringbuffer
+	 * initializer and therefore is always considered as set.
+	 *
+	 * See the "Bootstrap" comment block in printk_ringbuffer.h for
+	 * details about how the initializer bootstraps the descriptors.
+	 */
+	if (seq == 0 && DESC_INDEX(desc_ring, id) != 0)
+		info->seq = DESC_INDEX(desc_ring, id);
+	else
+		info->seq = seq + DESCS_COUNT(desc_ring);
+
+	/*
+	 * New data is about to be reserved. Once that happens, previous
+	 * descriptors are no longer able to be extended. Finalize the
+	 * previous descriptor now so that it can be made available to
+	 * readers. (For seq==0 there is no previous descriptor.)
+	 */
+	if (info->seq > 0)
+		desc_make_final(desc_ring, DESC_ID(id - 1));
+
+	r->text_buf = data_alloc(rb, &rb->text_data_ring, r->text_buf_size,
+				 &d->text_blk_lpos, id);
+	/* If text data allocation fails, a data-less record is committed. */
+	if (r->text_buf_size && !r->text_buf) {
+		prb_commit(e);
+		/* prb_commit() re-enabled interrupts. */
+		goto fail;
+	}
+
+	r->info = info;
+
+	/* Record full text space used by record. */
+	e->text_space = space_used(&rb->text_data_ring, &d->text_blk_lpos);
+
+	return true;
+fail:
+	/* Make it clear to the caller that the reserve failed. */
+	memset(r, 0, sizeof(*r));
+	return false;
+}
+
+/* Commit the data (possibly finalizing it) and restore interrupts. */
+static void _prb_commit(struct prb_reserved_entry *e, unsigned long state_val)
+{
+	struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
+	struct prb_desc *d = to_desc(desc_ring, e->id);
+	unsigned long prev_state_val = DESC_SV(e->id, desc_reserved);
+
+	/* Now the writer has finished all writing: LMM(_prb_commit:A) */
+
+	/*
+	 * Set the descriptor as committed. See "ABA Issues" about why
+	 * cmpxchg() instead of set() is used.
+	 *
+	 * 1  Guarantee all record data is stored before the descriptor state
+	 *    is stored as committed. A write memory barrier is sufficient
+	 *    for this. This pairs with desc_read:B and desc_reopen_last:A.
+	 *
+	 * 2. Guarantee the descriptor state is stored as committed before
+	 *    re-checking the head ID in order to possibly finalize this
+	 *    descriptor. This pairs with desc_reserve:D.
+	 *
+	 *    Memory barrier involvement:
+	 *
+	 *    If prb_commit:A reads from desc_reserve:D, then
+	 *    desc_make_final:A reads from _prb_commit:B.
+	 *
+	 *    Relies on:
+	 *
+	 *    MB _prb_commit:B to prb_commit:A
+	 *       matching
+	 *    MB desc_reserve:D to desc_make_final:A
+	 */
+	if (!atomic_long_try_cmpxchg(&d->state_var, &prev_state_val,
+			DESC_SV(e->id, state_val))) { /* LMM(_prb_commit:B) */
+		WARN_ON_ONCE(1);
+	}
+
+	/* Restore interrupts, the reserve/commit window is finished. */
+	local_irq_restore(e->irqflags);
+}
+
+/**
+ * prb_commit() - Commit (previously reserved) data to the ringbuffer.
+ *
+ * @e: The entry containing the reserved data information.
+ *
+ * This is the public function available to writers to commit data.
+ *
+ * Note that the data is not yet available to readers until it is finalized.
+ * Finalizing happens automatically when space for the next record is
+ * reserved.
+ *
+ * See prb_final_commit() for a version of this function that finalizes
+ * immediately.
+ *
+ * Context: Any context. Enables local interrupts.
+ */
+void prb_commit(struct prb_reserved_entry *e)
+{
+	struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
+	unsigned long head_id;
+
+	_prb_commit(e, desc_committed);
+
+	/*
+	 * If this descriptor is no longer the head (i.e. a new record has
+	 * been allocated), extending the data for this record is no longer
+	 * allowed and therefore it must be finalized.
+	 */
+	head_id = atomic_long_read(&desc_ring->head_id); /* LMM(prb_commit:A) */
+	if (head_id != e->id)
+		desc_make_final(desc_ring, e->id);
+}
+
+/**
+ * prb_final_commit() - Commit and finalize (previously reserved) data to
+ *                      the ringbuffer.
+ *
+ * @e: The entry containing the reserved data information.
+ *
+ * This is the public function available to writers to commit+finalize data.
+ *
+ * By finalizing, the data is made immediately available to readers.
+ *
+ * This function should only be used if there are no intentions of extending
+ * this data using prb_reserve_in_last().
+ *
+ * Context: Any context. Enables local interrupts.
+ */
+void prb_final_commit(struct prb_reserved_entry *e)
+{
+	_prb_commit(e, desc_finalized);
+}
+
+/*
+ * Count the number of lines in provided text. All text has at least 1 line
+ * (even if @text_size is 0). Each '\n' processed is counted as an additional
+ * line.
+ */
+static unsigned int count_lines(const char *text, unsigned int text_size)
+{
+	unsigned int next_size = text_size;
+	unsigned int line_count = 1;
+	const char *next = text;
+
+	while (next_size) {
+		next = memchr(next, '\n', next_size);
+		if (!next)
+			break;
+		line_count++;
+		next++;
+		next_size = text_size - (next - text);
+	}
+
+	return line_count;
+}
+
+/*
+ * Given @blk_lpos, copy an expected @len of data into the provided buffer.
+ * If @line_count is provided, count the number of lines in the data.
+ *
+ * This function (used by readers) performs strict validation on the data
+ * size to possibly detect bugs in the writer code. A WARN_ON_ONCE() is
+ * triggered if an internal error is detected.
+ */
+static bool copy_data(struct prb_data_ring *data_ring,
+		      struct prb_data_blk_lpos *blk_lpos, u16 len, char *buf,
+		      unsigned int buf_size, unsigned int *line_count)
+{
+	unsigned int data_size;
+	const char *data;
+
+	/* Caller might not want any data. */
+	if ((!buf || !buf_size) && !line_count)
+		return true;
+
+	data = get_data(data_ring, blk_lpos, &data_size);
+	if (!data)
+		return false;
+
+	/*
+	 * Actual cannot be less than expected. It can be more than expected
+	 * because of the trailing alignment padding.
+	 *
+	 * Note that invalid @len values can occur because the caller loads
+	 * the value during an allowed data race.
+	 */
+	if (data_size < (unsigned int)len)
+		return false;
+
+	/* Caller interested in the line count? */
+	if (line_count)
+		*line_count = count_lines(data, data_size);
+
+	/* Caller interested in the data content? */
+	if (!buf || !buf_size)
+		return true;
+
+	data_size = min_t(u16, buf_size, len);
+
+	memcpy(&buf[0], data, data_size); /* LMM(copy_data:A) */
+	return true;
+}
+
+/*
+ * This is an extended version of desc_read(). It gets a copy of a specified
+ * descriptor. However, it also verifies that the record is finalized and has
+ * the sequence number @seq. On success, 0 is returned.
+ *
+ * Error return values:
+ * -EINVAL: A finalized record with sequence number @seq does not exist.
+ * -ENOENT: A finalized record with sequence number @seq exists, but its data
+ *          is not available. This is a valid record, so readers should
+ *          continue with the next record.
+ */
+static int desc_read_finalized_seq(struct prb_desc_ring *desc_ring,
+				   unsigned long id, u64 seq,
+				   struct prb_desc *desc_out)
+{
+	struct prb_data_blk_lpos *blk_lpos = &desc_out->text_blk_lpos;
+	enum desc_state d_state;
+	u64 s;
+
+	d_state = desc_read(desc_ring, id, desc_out, &s, NULL);
+
+	/*
+	 * An unexpected @id (desc_miss) or @seq mismatch means the record
+	 * does not exist. A descriptor in the reserved or committed state
+	 * means the record does not yet exist for the reader.
+	 */
+	if (d_state == desc_miss ||
+	    d_state == desc_reserved ||
+	    d_state == desc_committed ||
+	    s != seq) {
+		return -EINVAL;
+	}
+
+	/*
+	 * A descriptor in the reusable state may no longer have its data
+	 * available; report it as existing but with lost data. Or the record
+	 * may actually be a record with lost data.
+	 */
+	if (d_state == desc_reusable ||
+	    (blk_lpos->begin == FAILED_LPOS && blk_lpos->next == FAILED_LPOS)) {
+		return -ENOENT;
+	}
+
+	return 0;
+}
+
+/*
+ * Copy the ringbuffer data from the record with @seq to the provided
+ * @r buffer. On success, 0 is returned.
+ *
+ * See desc_read_finalized_seq() for error return values.
+ */
+static int prb_read(struct printk_ringbuffer *rb, u64 seq,
+		    struct printk_record *r, unsigned int *line_count)
+{
+	struct prb_desc_ring *desc_ring = &rb->desc_ring;
+	struct printk_info *info = to_info(desc_ring, seq);
+	struct prb_desc *rdesc = to_desc(desc_ring, seq);
+	atomic_long_t *state_var = &rdesc->state_var;
+	struct prb_desc desc;
+	unsigned long id;
+	int err;
+
+	/* Extract the ID, used to specify the descriptor to read. */
+	id = DESC_ID(atomic_long_read(state_var));
+
+	/* Get a local copy of the correct descriptor (if available). */
+	err = desc_read_finalized_seq(desc_ring, id, seq, &desc);
+
+	/*
+	 * If @r is NULL, the caller is only interested in the availability
+	 * of the record.
+	 */
+	if (err || !r)
+		return err;
+
+	/* If requested, copy meta data. */
+	if (r->info)
+		memcpy(r->info, info, sizeof(*(r->info)));
+
+	/* Copy text data. If it fails, this is a data-less record. */
+	if (!copy_data(&rb->text_data_ring, &desc.text_blk_lpos, info->text_len,
+		       r->text_buf, r->text_buf_size, line_count)) {
+		return -ENOENT;
+	}
+
+	/* Ensure the record is still finalized and has the same @seq. */
+	return desc_read_finalized_seq(desc_ring, id, seq, &desc);
+}
+
+/* Get the sequence number of the tail descriptor. */
+static u64 prb_first_seq(struct printk_ringbuffer *rb)
+{
+	struct prb_desc_ring *desc_ring = &rb->desc_ring;
+	enum desc_state d_state;
+	struct prb_desc desc;
+	unsigned long id;
+	u64 seq;
+
+	for (;;) {
+		id = atomic_long_read(&rb->desc_ring.tail_id); /* LMM(prb_first_seq:A) */
+
+		d_state = desc_read(desc_ring, id, &desc, &seq, NULL); /* LMM(prb_first_seq:B) */
+
+		/*
+		 * This loop will not be infinite because the tail is
+		 * _always_ in the finalized or reusable state.
+		 */
+		if (d_state == desc_finalized || d_state == desc_reusable)
+			break;
+
+		/*
+		 * Guarantee the last state load from desc_read() is before
+		 * reloading @tail_id in order to see a new tail in the case
+		 * that the descriptor has been recycled. This pairs with
+		 * desc_reserve:D.
+		 *
+		 * Memory barrier involvement:
+		 *
+		 * If prb_first_seq:B reads from desc_reserve:F, then
+		 * prb_first_seq:A reads from desc_push_tail:B.
+		 *
+		 * Relies on:
+		 *
+		 * MB from desc_push_tail:B to desc_reserve:F
+		 *    matching
+		 * RMB prb_first_seq:B to prb_first_seq:A
+		 */
+		smp_rmb(); /* LMM(prb_first_seq:C) */
+	}
+
+	return seq;
+}
+
+/*
+ * Non-blocking read of a record. Updates @seq to the last finalized record
+ * (which may have no data available).
+ *
+ * See the description of prb_read_valid() and prb_read_valid_info()
+ * for details.
+ */
+static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
+			    struct printk_record *r, unsigned int *line_count)
+{
+	u64 tail_seq;
+	int err;
+
+	while ((err = prb_read(rb, *seq, r, line_count))) {
+		tail_seq = prb_first_seq(rb);
+
+		if (*seq < tail_seq) {
+			/*
+			 * Behind the tail. Catch up and try again. This
+			 * can happen for -ENOENT and -EINVAL cases.
+			 */
+			*seq = tail_seq;
+
+		} else if (err == -ENOENT) {
+			/* Record exists, but no data available. Skip. */
+			(*seq)++;
+
+		} else {
+			/* Non-existent/non-finalized record. Must stop. */
+			return false;
+		}
+	}
+
+	return true;
+}
+
+/**
+ * prb_read_valid() - Non-blocking read of a requested record or (if gone)
+ *                    the next available record.
+ *
+ * @rb:  The ringbuffer to read from.
+ * @seq: The sequence number of the record to read.
+ * @r:   A record data buffer to store the read record to.
+ *
+ * This is the public function available to readers to read a record.
+ *
+ * The reader provides the @info and @text_buf buffers of @r to be
+ * filled in. Any of the buffer pointers can be set to NULL if the reader
+ * is not interested in that data. To ensure proper initialization of @r,
+ * prb_rec_init_rd() should be used.
+ *
+ * Context: Any context.
+ * Return: true if a record was read, otherwise false.
+ *
+ * On success, the reader must check r->info.seq to see which record was
+ * actually read. This allows the reader to detect dropped records.
+ *
+ * Failure means @seq refers to a not yet written record.
+ */
+bool prb_read_valid(struct printk_ringbuffer *rb, u64 seq,
+		    struct printk_record *r)
+{
+	return _prb_read_valid(rb, &seq, r, NULL);
+}
+
+/**
+ * prb_read_valid_info() - Non-blocking read of meta data for a requested
+ *                         record or (if gone) the next available record.
+ *
+ * @rb:         The ringbuffer to read from.
+ * @seq:        The sequence number of the record to read.
+ * @info:       A buffer to store the read record meta data to.
+ * @line_count: A buffer to store the number of lines in the record text.
+ *
+ * This is the public function available to readers to read only the
+ * meta data of a record.
+ *
+ * The reader provides the @info, @line_count buffers to be filled in.
+ * Either of the buffer pointers can be set to NULL if the reader is not
+ * interested in that data.
+ *
+ * Context: Any context.
+ * Return: true if a record's meta data was read, otherwise false.
+ *
+ * On success, the reader must check info->seq to see which record meta data
+ * was actually read. This allows the reader to detect dropped records.
+ *
+ * Failure means @seq refers to a not yet written record.
+ */
+bool prb_read_valid_info(struct printk_ringbuffer *rb, u64 seq,
+			 struct printk_info *info, unsigned int *line_count)
+{
+	struct printk_record r;
+
+	prb_rec_init_rd(&r, info, NULL, 0);
+
+	return _prb_read_valid(rb, &seq, &r, line_count);
+}
+
+/**
+ * prb_first_valid_seq() - Get the sequence number of the oldest available
+ *                         record.
+ *
+ * @rb: The ringbuffer to get the sequence number from.
+ *
+ * This is the public function available to readers to see what the
+ * first/oldest valid sequence number is.
+ *
+ * This provides readers a starting point to begin iterating the ringbuffer.
+ *
+ * Context: Any context.
+ * Return: The sequence number of the first/oldest record or, if the
+ *         ringbuffer is empty, 0 is returned.
+ */
+u64 prb_first_valid_seq(struct printk_ringbuffer *rb)
+{
+	u64 seq = 0;
+
+	if (!_prb_read_valid(rb, &seq, NULL, NULL))
+		return 0;
+
+	return seq;
+}
+
+/**
+ * prb_next_seq() - Get the sequence number after the last available record.
+ *
+ * @rb:  The ringbuffer to get the sequence number from.
+ *
+ * This is the public function available to readers to see what the next
+ * newest sequence number available to readers will be.
+ *
+ * This provides readers a sequence number to jump to if all currently
+ * available records should be skipped.
+ *
+ * Context: Any context.
+ * Return: The sequence number of the next newest (not yet available) record
+ *         for readers.
+ */
+u64 prb_next_seq(struct printk_ringbuffer *rb)
+{
+	u64 seq = 0;
+
+	/* Search forward from the oldest descriptor. */
+	while (_prb_read_valid(rb, &seq, NULL, NULL))
+		seq++;
+
+	return seq;
+}
+
+/**
+ * prb_init() - Initialize a ringbuffer to use provided external buffers.
+ *
+ * @rb:       The ringbuffer to initialize.
+ * @text_buf: The data buffer for text data.
+ * @textbits: The size of @text_buf as a power-of-2 value.
+ * @descs:    The descriptor buffer for ringbuffer records.
+ * @descbits: The count of @descs items as a power-of-2 value.
+ * @infos:    The printk_info buffer for ringbuffer records.
+ *
+ * This is the public function available to writers to setup a ringbuffer
+ * during runtime using provided buffers.
+ *
+ * This must match the initialization of DEFINE_PRINTKRB().
+ *
+ * Context: Any context.
+ */
+void prb_init(struct printk_ringbuffer *rb,
+	      char *text_buf, unsigned int textbits,
+	      struct prb_desc *descs, unsigned int descbits,
+	      struct printk_info *infos)
+{
+	memset(descs, 0, _DESCS_COUNT(descbits) * sizeof(descs[0]));
+	memset(infos, 0, _DESCS_COUNT(descbits) * sizeof(infos[0]));
+
+	rb->desc_ring.count_bits = descbits;
+	rb->desc_ring.descs = descs;
+	rb->desc_ring.infos = infos;
+	atomic_long_set(&rb->desc_ring.head_id, DESC0_ID(descbits));
+	atomic_long_set(&rb->desc_ring.tail_id, DESC0_ID(descbits));
+
+	rb->text_data_ring.size_bits = textbits;
+	rb->text_data_ring.data = text_buf;
+	atomic_long_set(&rb->text_data_ring.head_lpos, BLK0_LPOS(textbits));
+	atomic_long_set(&rb->text_data_ring.tail_lpos, BLK0_LPOS(textbits));
+
+	atomic_long_set(&rb->fail, 0);
+
+	atomic_long_set(&(descs[_DESCS_COUNT(descbits) - 1].state_var), DESC0_SV(descbits));
+	descs[_DESCS_COUNT(descbits) - 1].text_blk_lpos.begin = FAILED_LPOS;
+	descs[_DESCS_COUNT(descbits) - 1].text_blk_lpos.next = FAILED_LPOS;
+
+	infos[0].seq = -(u64)_DESCS_COUNT(descbits);
+	infos[_DESCS_COUNT(descbits) - 1].seq = 0;
+}
+
+/**
+ * prb_record_text_space() - Query the full actual used ringbuffer space for
+ *                           the text data of a reserved entry.
+ *
+ * @e: The successfully reserved entry to query.
+ *
+ * This is the public function available to writers to see how much actual
+ * space is used in the ringbuffer to store the text data of the specified
+ * entry.
+ *
+ * This function is only valid if @e has been successfully reserved using
+ * prb_reserve().
+ *
+ * Context: Any context.
+ * Return: The size in bytes used by the text data of the associated record.
+ */
+unsigned int prb_record_text_space(struct prb_reserved_entry *e)
+{
+	return e->text_space;
+}
diff --git a/kernel/printk/printk_ringbuffer.h b/kernel/printk/printk_ringbuffer.h
new file mode 100644
index 0000000000000..5dc9d022db070
--- /dev/null
+++ b/kernel/printk/printk_ringbuffer.h
@@ -0,0 +1,382 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+
+#ifndef _KERNEL_PRINTK_RINGBUFFER_H
+#define _KERNEL_PRINTK_RINGBUFFER_H
+
+#include <linux/atomic.h>
+#include <linux/dev_printk.h>
+
+/*
+ * Meta information about each stored message.
+ *
+ * All fields are set by the printk code except for @seq, which is
+ * set by the ringbuffer code.
+ */
+struct printk_info {
+	u64	seq;		/* sequence number */
+	u64	ts_nsec;	/* timestamp in nanoseconds */
+	u16	text_len;	/* length of text message */
+	u8	facility;	/* syslog facility */
+	u8	flags:5;	/* internal record flags */
+	u8	level:3;	/* syslog level */
+	u32	caller_id;	/* thread id or processor id */
+
+	struct dev_printk_info	dev_info;
+};
+
+/*
+ * A structure providing the buffers, used by writers and readers.
+ *
+ * Writers:
+ * Using prb_rec_init_wr(), a writer sets @text_buf_size before calling
+ * prb_reserve(). On success, prb_reserve() sets @info and @text_buf to
+ * buffers reserved for that writer.
+ *
+ * Readers:
+ * Using prb_rec_init_rd(), a reader sets all fields before calling
+ * prb_read_valid(). Note that the reader provides the @info and @text_buf,
+ * buffers. On success, the struct pointed to by @info will be filled and
+ * the char array pointed to by @text_buf will be filled with text data.
+ */
+struct printk_record {
+	struct printk_info	*info;
+	char			*text_buf;
+	unsigned int		text_buf_size;
+};
+
+/* Specifies the logical position and span of a data block. */
+struct prb_data_blk_lpos {
+	unsigned long	begin;
+	unsigned long	next;
+};
+
+/*
+ * A descriptor: the complete meta-data for a record.
+ *
+ * @state_var: A bitwise combination of descriptor ID and descriptor state.
+ */
+struct prb_desc {
+	atomic_long_t			state_var;
+	struct prb_data_blk_lpos	text_blk_lpos;
+};
+
+/* A ringbuffer of "ID + data" elements. */
+struct prb_data_ring {
+	unsigned int	size_bits;
+	char		*data;
+	atomic_long_t	head_lpos;
+	atomic_long_t	tail_lpos;
+};
+
+/* A ringbuffer of "struct prb_desc" elements. */
+struct prb_desc_ring {
+	unsigned int		count_bits;
+	struct prb_desc		*descs;
+	struct printk_info	*infos;
+	atomic_long_t		head_id;
+	atomic_long_t		tail_id;
+};
+
+/*
+ * The high level structure representing the printk ringbuffer.
+ *
+ * @fail: Count of failed prb_reserve() calls where not even a data-less
+ *        record was created.
+ */
+struct printk_ringbuffer {
+	struct prb_desc_ring	desc_ring;
+	struct prb_data_ring	text_data_ring;
+	atomic_long_t		fail;
+};
+
+/*
+ * Used by writers as a reserve/commit handle.
+ *
+ * @rb:         Ringbuffer where the entry is reserved.
+ * @irqflags:   Saved irq flags to restore on entry commit.
+ * @id:         ID of the reserved descriptor.
+ * @text_space: Total occupied buffer space in the text data ring, including
+ *              ID, alignment padding, and wrapping data blocks.
+ *
+ * This structure is an opaque handle for writers. Its contents are only
+ * to be used by the ringbuffer implementation.
+ */
+struct prb_reserved_entry {
+	struct printk_ringbuffer	*rb;
+	unsigned long			irqflags;
+	unsigned long			id;
+	unsigned int			text_space;
+};
+
+/* The possible responses of a descriptor state-query. */
+enum desc_state {
+	desc_miss	=  -1,	/* ID mismatch (pseudo state) */
+	desc_reserved	= 0x0,	/* reserved, in use by writer */
+	desc_committed	= 0x1,	/* committed by writer, could get reopened */
+	desc_finalized	= 0x2,	/* committed, no further modification allowed */
+	desc_reusable	= 0x3,	/* free, not yet used by any writer */
+};
+
+#define _DATA_SIZE(sz_bits)	(1UL << (sz_bits))
+#define _DESCS_COUNT(ct_bits)	(1U << (ct_bits))
+#define DESC_SV_BITS		(sizeof(unsigned long) * 8)
+#define DESC_FLAGS_SHIFT	(DESC_SV_BITS - 2)
+#define DESC_FLAGS_MASK		(3UL << DESC_FLAGS_SHIFT)
+#define DESC_STATE(sv)		(3UL & (sv >> DESC_FLAGS_SHIFT))
+#define DESC_SV(id, state)	(((unsigned long)state << DESC_FLAGS_SHIFT) | id)
+#define DESC_ID_MASK		(~DESC_FLAGS_MASK)
+#define DESC_ID(sv)		((sv) & DESC_ID_MASK)
+#define FAILED_LPOS		0x1
+#define NO_LPOS			0x3
+
+#define FAILED_BLK_LPOS	\
+{				\
+	.begin	= FAILED_LPOS,	\
+	.next	= FAILED_LPOS,	\
+}
+
+/*
+ * Descriptor Bootstrap
+ *
+ * The descriptor array is minimally initialized to allow immediate usage
+ * by readers and writers. The requirements that the descriptor array
+ * initialization must satisfy:
+ *
+ *   Req1
+ *     The tail must point to an existing (committed or reusable) descriptor.
+ *     This is required by the implementation of prb_first_seq().
+ *
+ *   Req2
+ *     Readers must see that the ringbuffer is initially empty.
+ *
+ *   Req3
+ *     The first record reserved by a writer is assigned sequence number 0.
+ *
+ * To satisfy Req1, the tail initially points to a descriptor that is
+ * minimally initialized (having no data block, i.e. data-less with the
+ * data block's lpos @begin and @next values set to FAILED_LPOS).
+ *
+ * To satisfy Req2, the initial tail descriptor is initialized to the
+ * reusable state. Readers recognize reusable descriptors as existing
+ * records, but skip over them.
+ *
+ * To satisfy Req3, the last descriptor in the array is used as the initial
+ * head (and tail) descriptor. This allows the first record reserved by a
+ * writer (head + 1) to be the first descriptor in the array. (Only the first
+ * descriptor in the array could have a valid sequence number of 0.)
+ *
+ * The first time a descriptor is reserved, it is assigned a sequence number
+ * with the value of the array index. A "first time reserved" descriptor can
+ * be recognized because it has a sequence number of 0 but does not have an
+ * index of 0. (Only the first descriptor in the array could have a valid
+ * sequence number of 0.) After the first reservation, all future reservations
+ * (recycling) simply involve incrementing the sequence number by the array
+ * count.
+ *
+ *   Hack #1
+ *     Only the first descriptor in the array is allowed to have the sequence
+ *     number 0. In this case it is not possible to recognize if it is being
+ *     reserved the first time (set to index value) or has been reserved
+ *     previously (increment by the array count). This is handled by _always_
+ *     incrementing the sequence number by the array count when reserving the
+ *     first descriptor in the array. In order to satisfy Req3, the sequence
+ *     number of the first descriptor in the array is initialized to minus
+ *     the array count. Then, upon the first reservation, it is incremented
+ *     to 0, thus satisfying Req3.
+ *
+ *   Hack #2
+ *     prb_first_seq() can be called at any time by readers to retrieve the
+ *     sequence number of the tail descriptor. However, due to Req2 and Req3,
+ *     initially there are no records to report the sequence number of
+ *     (sequence numbers are u64 and there is nothing less than 0). To handle
+ *     this, the sequence number of the initial tail descriptor is initialized
+ *     to 0. Technically this is incorrect, because there is no record with
+ *     sequence number 0 (yet) and the tail descriptor is not the first
+ *     descriptor in the array. But it allows prb_read_valid() to correctly
+ *     report the existence of a record for _any_ given sequence number at all
+ *     times. Bootstrapping is complete when the tail is pushed the first
+ *     time, thus finally pointing to the first descriptor reserved by a
+ *     writer, which has the assigned sequence number 0.
+ */
+
+/*
+ * Initiating Logical Value Overflows
+ *
+ * Both logical position (lpos) and ID values can be mapped to array indexes
+ * but may experience overflows during the lifetime of the system. To ensure
+ * that printk_ringbuffer can handle the overflows for these types, initial
+ * values are chosen that map to the correct initial array indexes, but will
+ * result in overflows soon.
+ *
+ *   BLK0_LPOS
+ *     The initial @head_lpos and @tail_lpos for data rings. It is at index
+ *     0 and the lpos value is such that it will overflow on the first wrap.
+ *
+ *   DESC0_ID
+ *     The initial @head_id and @tail_id for the desc ring. It is at the last
+ *     index of the descriptor array (see Req3 above) and the ID value is such
+ *     that it will overflow on the second wrap.
+ */
+#define BLK0_LPOS(sz_bits)	(-(_DATA_SIZE(sz_bits)))
+#define DESC0_ID(ct_bits)	DESC_ID(-(_DESCS_COUNT(ct_bits) + 1))
+#define DESC0_SV(ct_bits)	DESC_SV(DESC0_ID(ct_bits), desc_reusable)
+
+/*
+ * Define a ringbuffer with an external text data buffer. The same as
+ * DEFINE_PRINTKRB() but requires specifying an external buffer for the
+ * text data.
+ *
+ * Note: The specified external buffer must be of the size:
+ *       2 ^ (descbits + avgtextbits)
+ */
+#define _DEFINE_PRINTKRB(name, descbits, avgtextbits, text_buf)			\
+static struct prb_desc _##name##_descs[_DESCS_COUNT(descbits)] = {				\
+	/* the initial head and tail */								\
+	[_DESCS_COUNT(descbits) - 1] = {							\
+		/* reusable */									\
+		.state_var	= ATOMIC_INIT(DESC0_SV(descbits)),				\
+		/* no associated data block */							\
+		.text_blk_lpos	= FAILED_BLK_LPOS,						\
+	},											\
+};												\
+static struct printk_info _##name##_infos[_DESCS_COUNT(descbits)] = {				\
+	/* this will be the first record reserved by a writer */				\
+	[0] = {											\
+		/* will be incremented to 0 on the first reservation */				\
+		.seq = -(u64)_DESCS_COUNT(descbits),						\
+	},											\
+	/* the initial head and tail */								\
+	[_DESCS_COUNT(descbits) - 1] = {							\
+		/* reports the first seq value during the bootstrap phase */			\
+		.seq = 0,									\
+	},											\
+};												\
+static struct printk_ringbuffer name = {							\
+	.desc_ring = {										\
+		.count_bits	= descbits,							\
+		.descs		= &_##name##_descs[0],						\
+		.infos		= &_##name##_infos[0],						\
+		.head_id	= ATOMIC_INIT(DESC0_ID(descbits)),				\
+		.tail_id	= ATOMIC_INIT(DESC0_ID(descbits)),				\
+	},											\
+	.text_data_ring = {									\
+		.size_bits	= (avgtextbits) + (descbits),					\
+		.data		= text_buf,							\
+		.head_lpos	= ATOMIC_LONG_INIT(BLK0_LPOS((avgtextbits) + (descbits))),	\
+		.tail_lpos	= ATOMIC_LONG_INIT(BLK0_LPOS((avgtextbits) + (descbits))),	\
+	},											\
+	.fail			= ATOMIC_LONG_INIT(0),						\
+}
+
+/**
+ * DEFINE_PRINTKRB() - Define a ringbuffer.
+ *
+ * @name:        The name of the ringbuffer variable.
+ * @descbits:    The number of descriptors as a power-of-2 value.
+ * @avgtextbits: The average text data size per record as a power-of-2 value.
+ *
+ * This is a macro for defining a ringbuffer and all internal structures
+ * such that it is ready for immediate use. See _DEFINE_PRINTKRB() for a
+ * variant where the text data buffer can be specified externally.
+ */
+#define DEFINE_PRINTKRB(name, descbits, avgtextbits)				\
+static char _##name##_text[1U << ((avgtextbits) + (descbits))]			\
+			__aligned(__alignof__(unsigned long));			\
+_DEFINE_PRINTKRB(name, descbits, avgtextbits, &_##name##_text[0])
+
+/* Writer Interface */
+
+/**
+ * prb_rec_init_wd() - Initialize a buffer for writing records.
+ *
+ * @r:             The record to initialize.
+ * @text_buf_size: The needed text buffer size.
+ */
+static inline void prb_rec_init_wr(struct printk_record *r,
+				   unsigned int text_buf_size)
+{
+	r->info = NULL;
+	r->text_buf = NULL;
+	r->text_buf_size = text_buf_size;
+}
+
+bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
+		 struct printk_record *r);
+bool prb_reserve_in_last(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
+			 struct printk_record *r, u32 caller_id, unsigned int max_size);
+void prb_commit(struct prb_reserved_entry *e);
+void prb_final_commit(struct prb_reserved_entry *e);
+
+void prb_init(struct printk_ringbuffer *rb,
+	      char *text_buf, unsigned int text_buf_size,
+	      struct prb_desc *descs, unsigned int descs_count_bits,
+	      struct printk_info *infos);
+unsigned int prb_record_text_space(struct prb_reserved_entry *e);
+
+/* Reader Interface */
+
+/**
+ * prb_rec_init_rd() - Initialize a buffer for reading records.
+ *
+ * @r:             The record to initialize.
+ * @info:          A buffer to store record meta-data.
+ * @text_buf:      A buffer to store text data.
+ * @text_buf_size: The size of @text_buf.
+ *
+ * Initialize all the fields that a reader is interested in. All arguments
+ * (except @r) are optional. Only record data for arguments that are
+ * non-NULL or non-zero will be read.
+ */
+static inline void prb_rec_init_rd(struct printk_record *r,
+				   struct printk_info *info,
+				   char *text_buf, unsigned int text_buf_size)
+{
+	r->info = info;
+	r->text_buf = text_buf;
+	r->text_buf_size = text_buf_size;
+}
+
+/**
+ * prb_for_each_record() - Iterate over the records of a ringbuffer.
+ *
+ * @from: The sequence number to begin with.
+ * @rb:   The ringbuffer to iterate over.
+ * @s:    A u64 to store the sequence number on each iteration.
+ * @r:    A printk_record to store the record on each iteration.
+ *
+ * This is a macro for conveniently iterating over a ringbuffer.
+ * Note that @s may not be the sequence number of the record on each
+ * iteration. For the sequence number, @r->info->seq should be checked.
+ *
+ * Context: Any context.
+ */
+#define prb_for_each_record(from, rb, s, r) \
+for ((s) = from; prb_read_valid(rb, s, r); (s) = (r)->info->seq + 1)
+
+/**
+ * prb_for_each_info() - Iterate over the meta data of a ringbuffer.
+ *
+ * @from: The sequence number to begin with.
+ * @rb:   The ringbuffer to iterate over.
+ * @s:    A u64 to store the sequence number on each iteration.
+ * @i:    A printk_info to store the record meta data on each iteration.
+ * @lc:   An unsigned int to store the text line count of each record.
+ *
+ * This is a macro for conveniently iterating over a ringbuffer.
+ * Note that @s may not be the sequence number of the record on each
+ * iteration. For the sequence number, @r->info->seq should be checked.
+ *
+ * Context: Any context.
+ */
+#define prb_for_each_info(from, rb, s, i, lc) \
+for ((s) = from; prb_read_valid_info(rb, s, i, lc); (s) = (i)->seq + 1)
+
+bool prb_read_valid(struct printk_ringbuffer *rb, u64 seq,
+		    struct printk_record *r);
+bool prb_read_valid_info(struct printk_ringbuffer *rb, u64 seq,
+			 struct printk_info *info, unsigned int *line_count);
+
+u64 prb_first_valid_seq(struct printk_ringbuffer *rb);
+u64 prb_next_seq(struct printk_ringbuffer *rb);
+
+#endif /* _KERNEL_PRINTK_RINGBUFFER_H */
diff --git a/kernel/sched/core.c b/kernel/sched/core.c
index 29d8062ec4f5c..2d54f1e7ef867 100644
--- a/kernel/sched/core.c
+++ b/kernel/sched/core.c
@@ -2103,7 +2103,75 @@ struct set_affinity_pending {
 };
 
 /*
- * This function is wildly self concurrent, consider at least 3 times.
+ * This function is wildly self concurrent; here be dragons.
+ *
+ *
+ * When given a valid mask, __set_cpus_allowed_ptr() must block until the
+ * designated task is enqueued on an allowed CPU. If that task is currently
+ * running, we have to kick it out using the CPU stopper.
+ *
+ * Migrate-Disable comes along and tramples all over our nice sandcastle.
+ * Consider:
+ *
+ *     Initial conditions: P0->cpus_mask = [0, 1]
+ *
+ *     P0@...0                  P1
+ *
+ *     migrate_disable();
+ *     <preempted>
+ *                              set_cpus_allowed_ptr(P0, [1]);
+ *
+ * P1 *cannot* return from this set_cpus_allowed_ptr() call until P0 executes
+ * its outermost migrate_enable() (i.e. it exits its Migrate-Disable region).
+ * This means we need the following scheme:
+ *
+ *     P0@...0                  P1
+ *
+ *     migrate_disable();
+ *     <preempted>
+ *                              set_cpus_allowed_ptr(P0, [1]);
+ *                                <blocks>
+ *     <resumes>
+ *     migrate_enable();
+ *       __set_cpus_allowed_ptr();
+ *       <wakes local stopper>
+ *                         `--> <woken on migration completion>
+ *
+ * Now the fun stuff: there may be several P1-like tasks, i.e. multiple
+ * concurrent set_cpus_allowed_ptr(P0, [*]) calls. CPU affinity changes of any
+ * task p are serialized by p->pi_lock, which we can leverage: the one that
+ * should come into effect at the end of the Migrate-Disable region is the last
+ * one. This means we only need to track a single cpumask (i.e. p->cpus_mask),
+ * but we still need to properly signal those waiting tasks at the appropriate
+ * moment.
+ *
+ * This is implemented using struct set_affinity_pending. The first
+ * __set_cpus_allowed_ptr() caller within a given Migrate-Disable region will
+ * setup an instance of that struct and install it on the targeted task_struct.
+ * Any and all further callers will reuse that instance. Those then wait for
+ * a completion signaled at the tail of the CPU stopper callback (1), triggered
+ * on the end of the Migrate-Disable region (i.e. outermost migrate_enable()).
+ *
+ *
+ * (1) In the cases covered above. There is one more where the completion is
+ * signaled within affine_move_task() itself: when a subsequent affinity request
+ * cancels the need for an active migration. Consider:
+ *
+ *     Initial conditions: P0->cpus_mask = [0, 1]
+ *
+ *     P0@...0            P1                             P2
+ *
+ *     migrate_disable();
+ *     <preempted>
+ *                        set_cpus_allowed_ptr(P0, [1]);
+ *                          <blocks>
+ *                                                       set_cpus_allowed_ptr(P0, [0, 1]);
+ *                                                         <signal completion>
+ *                          <awakes>
+ *
+ * Note that the above is safe vs a concurrent migrate_enable(), as any
+ * pending affinity completion is preceded an uninstallion of
+ * p->migration_pending done with p->pi_lock held.
  */
 static int affine_move_task(struct rq *rq, struct rq_flags *rf,
 			    struct task_struct *p, int dest_cpu, unsigned int flags)
@@ -2127,6 +2195,7 @@ static int affine_move_task(struct rq *rq, struct rq_flags *rf,
 
 		pending = p->migration_pending;
 		if (pending) {
+			refcount_inc(&pending->refs);
 			p->migration_pending = NULL;
 			complete = true;
 		}
@@ -2146,6 +2215,7 @@ static int affine_move_task(struct rq *rq, struct rq_flags *rf,
 	if (!(flags & SCA_MIGRATE_ENABLE)) {
 		/* serialized by p->pi_lock */
 		if (!p->migration_pending) {
+			/* Install the request */
 			refcount_set(&my_pending.refs, 1);
 			init_completion(&my_pending.done);
 			p->migration_pending = &my_pending;
@@ -2184,7 +2254,11 @@ static int affine_move_task(struct rq *rq, struct rq_flags *rf,
 	}
 
 	if (task_running(rq, p) || p->state == TASK_WAKING) {
-
+		/*
+		 * Lessen races (and headaches) by delegating
+		 * is_migration_disabled(p) checks to the stopper, which will
+		 * run on the same CPU as said p.
+		 */
 		task_rq_unlock(rq, p, rf);
 		stop_one_cpu(cpu_of(rq), migration_cpu_stop, &arg);
 
@@ -2209,6 +2283,10 @@ static int affine_move_task(struct rq *rq, struct rq_flags *rf,
 	if (refcount_dec_and_test(&pending->refs))
 		wake_up_var(&pending->refs);
 
+	/*
+	 * Block the original owner of &pending until all subsequent callers
+	 * have seen the completion and decremented the refcount
+	 */
 	wait_var_event(&my_pending.refs, !refcount_read(&my_pending.refs));
 
 	return 0;
@@ -2257,8 +2335,17 @@ static int __set_cpus_allowed_ptr(struct task_struct *p,
 		goto out;
 	}
 
-	if (!(flags & SCA_MIGRATE_ENABLE) && cpumask_equal(&p->cpus_mask, new_mask))
-		goto out;
+	if (!(flags & SCA_MIGRATE_ENABLE)) {
+		if (cpumask_equal(&p->cpus_mask, new_mask))
+			goto out;
+
+		if (WARN_ON_ONCE(p == current &&
+				 is_migration_disabled(p) &&
+				 !cpumask_test_cpu(task_cpu(p), new_mask))) {
+			ret = -EBUSY;
+			goto out;
+		}
+	}
 
 	/*
 	 * Picking a ~random cpu helps in cases where we are changing affinity
@@ -3960,20 +4047,19 @@ static inline void balance_callbacks(struct rq *rq, struct callback_head *head)
 	}
 }
 
-static bool balance_push(struct rq *rq);
+static void balance_push(struct rq *rq);
 
 static inline void balance_switch(struct rq *rq)
 {
-	if (unlikely(rq->balance_flags)) {
-		/*
-		 * Run the balance_callbacks, except on hotplug
-		 * when we need to push the current task away.
-		 */
-		if (!IS_ENABLED(CONFIG_HOTPLUG_CPU) ||
-		    !(rq->balance_flags & BALANCE_PUSH) ||
-		    !balance_push(rq))
-			__balance_callbacks(rq);
+	if (likely(!rq->balance_flags))
+		return;
+
+	if (rq->balance_flags & BALANCE_PUSH) {
+		balance_push(rq);
+		return;
 	}
+
+	__balance_callbacks(rq);
 }
 
 #else
@@ -7233,7 +7319,7 @@ static DEFINE_PER_CPU(struct cpu_stop_work, push_work);
 /*
  * Ensure we only run per-cpu kthreads once the CPU goes !active.
  */
-static bool balance_push(struct rq *rq)
+static void balance_push(struct rq *rq)
 {
 	struct task_struct *push_task = rq->curr;
 
@@ -7262,7 +7348,7 @@ static bool balance_push(struct rq *rq)
 			rcuwait_wake_up(&rq->hotplug_wait);
 			raw_spin_lock(&rq->lock);
 		}
-		return false;
+		return;
 	}
 
 	get_task_struct(push_task);
@@ -7279,8 +7365,6 @@ static bool balance_push(struct rq *rq)
 	 * which is_per_cpu_kthread() and will push this task away.
 	 */
 	raw_spin_lock(&rq->lock);
-
-	return true;
 }
 
 static void balance_push_set(int cpu, bool on)
@@ -7313,12 +7397,11 @@ static void balance_hotplug_wait(void)
 
 #else
 
-static inline bool balance_push(struct rq *rq)
+static inline void balance_push(struct rq *rq)
 {
-	return false;
 }
 
-static void balance_push_set(int cpu, bool on)
+static inline void balance_push_set(int cpu, bool on)
 {
 }
 
diff --git a/kernel/sched/deadline.c b/kernel/sched/deadline.c
index 15320ede2f456..6df71d487ed06 100644
--- a/kernel/sched/deadline.c
+++ b/kernel/sched/deadline.c
@@ -1978,8 +1978,8 @@ static int find_later_rq(struct task_struct *task)
 				return this_cpu;
 			}
 
-			best_cpu = cpumask_first_and(later_mask,
-							sched_domain_span(sd));
+			best_cpu = cpumask_any_and_distribute(later_mask,
+							      sched_domain_span(sd));
 			/*
 			 * Last chance: if a CPU being in both later_mask
 			 * and current sd span is valid, that becomes our
@@ -2105,6 +2105,9 @@ static int push_dl_task(struct rq *rq)
 		return 0;
 
 retry:
+	if (is_migration_disabled(next_task))
+		return 0;
+
 	if (WARN_ON(next_task == rq->curr))
 		return 0;
 
@@ -2336,6 +2339,9 @@ static void rq_online_dl(struct rq *rq)
 /* Assumes rq->lock is held */
 static void rq_offline_dl(struct rq *rq)
 {
+	if (rq->dl.overloaded)
+		dl_clear_overload(rq);
+
 	cpudl_clear(&rq->rd->cpudl, rq->cpu);
 	cpudl_clear_freecpu(&rq->rd->cpudl, rq->cpu);
 }
diff --git a/kernel/sched/rt.c b/kernel/sched/rt.c
index e90a69b3e85c0..03f7b397716dd 100644
--- a/kernel/sched/rt.c
+++ b/kernel/sched/rt.c
@@ -2289,6 +2289,9 @@ static void rq_online_rt(struct rq *rq)
 /* Assumes rq->lock is held */
 static void rq_offline_rt(struct rq *rq)
 {
+	if (rq->rt.overloaded)
+		rt_clear_overload(rq);
+
 	__disable_runtime(rq);
 
 	cpupri_set(&rq->rd->cpupri, rq->cpu, CPUPRI_INVALID);
diff --git a/lib/Kconfig.debug b/lib/Kconfig.debug
index cd5f1440c5bea..16fcda68c2b6b 100644
--- a/lib/Kconfig.debug
+++ b/lib/Kconfig.debug
@@ -61,23 +61,6 @@ config CONSOLE_LOGLEVEL_QUIET
 	  will be used as the loglevel. IOW passing "quiet" will be the
 	  equivalent of passing "loglevel=<CONSOLE_LOGLEVEL_QUIET>"
 
-config CONSOLE_LOGLEVEL_EMERGENCY
-	int "Emergency console loglevel (1-15)"
-	range 1 15
-	default "5"
-	help
-	  The loglevel to determine if a console message is an emergency
-	  message.
-
-	  If supported by the console driver, emergency messages will be
-	  flushed to the console immediately. This can cause significant system
-	  latencies so the value should be set such that only significant
-	  messages are classified as emergency messages.
-
-	  Setting a default here is equivalent to passing in
-	  emergency_loglevel=<x> in the kernel bootargs. emergency_loglevel=<x>
-	  continues to override whatever value is specified here as well.
-
 config MESSAGE_LOGLEVEL_DEFAULT
 	int "Default message log level (1-7)"
 	range 1 7
diff --git a/lib/Makefile b/lib/Makefile
index e2822830764a1..a4a4c6864f518 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -32,7 +32,7 @@ KCSAN_SANITIZE_random32.o := n
 
 lib-y := ctype.o string.o vsprintf.o cmdline.o \
 	 rbtree.o radix-tree.o timerqueue.o xarray.o \
-	 idr.o extable.o sha1.o irq_regs.o argv_split.o printk_ringbuffer.o \
+	 idr.o extable.o sha1.o irq_regs.o argv_split.o \
 	 flex_proportions.o ratelimit.o show_mem.o \
 	 is_single_threaded.o plist.o decompress.o kobject_uevent.o \
 	 earlycpio.o seq_buf.o siphash.o dec_and_lock.o \
diff --git a/lib/bust_spinlocks.c b/lib/bust_spinlocks.c
index c6e083323d1b9..8be59f84eaeaf 100644
--- a/lib/bust_spinlocks.c
+++ b/lib/bust_spinlocks.c
@@ -26,6 +26,7 @@ void bust_spinlocks(int yes)
 		unblank_screen();
 #endif
 		console_unblank();
-		--oops_in_progress;
+		if (--oops_in_progress == 0)
+			wake_up_klogd();
 	}
 }
diff --git a/lib/printk_ringbuffer.c b/lib/printk_ringbuffer.c
deleted file mode 100644
index 9a31d7dbdc005..0000000000000
--- a/lib/printk_ringbuffer.c
+++ /dev/null
@@ -1,589 +0,0 @@
-// SPDX-License-Identifier: GPL-2.0
-#include <linux/sched.h>
-#include <linux/smp.h>
-#include <linux/string.h>
-#include <linux/errno.h>
-#include <linux/printk_ringbuffer.h>
-
-#define PRB_SIZE(rb) (1 << rb->size_bits)
-#define PRB_SIZE_BITMASK(rb) (PRB_SIZE(rb) - 1)
-#define PRB_INDEX(rb, lpos) (lpos & PRB_SIZE_BITMASK(rb))
-#define PRB_WRAPS(rb, lpos) (lpos >> rb->size_bits)
-#define PRB_WRAP_LPOS(rb, lpos, xtra) \
-	((PRB_WRAPS(rb, lpos) + xtra) << rb->size_bits)
-#define PRB_DATA_SIZE(e) (e->size - sizeof(struct prb_entry))
-#define PRB_DATA_ALIGN sizeof(long)
-
-static bool __prb_trylock(struct prb_cpulock *cpu_lock,
-			  unsigned int *cpu_store)
-{
-	unsigned long *flags;
-	unsigned int cpu;
-
-	cpu = get_cpu();
-
-	*cpu_store = atomic_read(&cpu_lock->owner);
-	/* memory barrier to ensure the current lock owner is visible */
-	smp_rmb();
-	if (*cpu_store == -1) {
-		flags = per_cpu_ptr(cpu_lock->irqflags, cpu);
-		local_irq_save(*flags);
-		if (atomic_try_cmpxchg_acquire(&cpu_lock->owner,
-					       cpu_store, cpu)) {
-			return true;
-		}
-		local_irq_restore(*flags);
-	} else if (*cpu_store == cpu) {
-		return true;
-	}
-
-	put_cpu();
-	return false;
-}
-
-/*
- * prb_lock: Perform a processor-reentrant spin lock.
- * @cpu_lock: A pointer to the lock object.
- * @cpu_store: A "flags" pointer to store lock status information.
- *
- * If no processor has the lock, the calling processor takes the lock and
- * becomes the owner. If the calling processor is already the owner of the
- * lock, this function succeeds immediately. If lock is locked by another
- * processor, this function spins until the calling processor becomes the
- * owner.
- *
- * It is safe to call this function from any context and state.
- */
-void prb_lock(struct prb_cpulock *cpu_lock, unsigned int *cpu_store)
-{
-	for (;;) {
-		if (__prb_trylock(cpu_lock, cpu_store))
-			break;
-		cpu_relax();
-	}
-}
-
-/*
- * prb_unlock: Perform a processor-reentrant spin unlock.
- * @cpu_lock: A pointer to the lock object.
- * @cpu_store: A "flags" object storing lock status information.
- *
- * Release the lock. The calling processor must be the owner of the lock.
- *
- * It is safe to call this function from any context and state.
- */
-void prb_unlock(struct prb_cpulock *cpu_lock, unsigned int cpu_store)
-{
-	unsigned long *flags;
-	unsigned int cpu;
-
-	cpu = atomic_read(&cpu_lock->owner);
-	atomic_set_release(&cpu_lock->owner, cpu_store);
-
-	if (cpu_store == -1) {
-		flags = per_cpu_ptr(cpu_lock->irqflags, cpu);
-		local_irq_restore(*flags);
-	}
-
-	put_cpu();
-}
-
-static struct prb_entry *to_entry(struct printk_ringbuffer *rb,
-				  unsigned long lpos)
-{
-	char *buffer = rb->buffer;
-	buffer += PRB_INDEX(rb, lpos);
-	return (struct prb_entry *)buffer;
-}
-
-static int calc_next(struct printk_ringbuffer *rb, unsigned long tail,
-		     unsigned long lpos, int size, unsigned long *calced_next)
-{
-	unsigned long next_lpos;
-	int ret = 0;
-again:
-	next_lpos = lpos + size;
-	if (next_lpos - tail > PRB_SIZE(rb))
-		return -1;
-
-	if (PRB_WRAPS(rb, lpos) != PRB_WRAPS(rb, next_lpos)) {
-		lpos = PRB_WRAP_LPOS(rb, next_lpos, 0);
-		ret |= 1;
-		goto again;
-	}
-
-	*calced_next = next_lpos;
-	return ret;
-}
-
-static bool push_tail(struct printk_ringbuffer *rb, unsigned long tail)
-{
-	unsigned long new_tail;
-	struct prb_entry *e;
-	unsigned long head;
-
-	if (tail != atomic_long_read(&rb->tail))
-		return true;
-
-	e = to_entry(rb, tail);
-	if (e->size != -1)
-		new_tail = tail + e->size;
-	else
-		new_tail = PRB_WRAP_LPOS(rb, tail, 1);
-
-	/* make sure the new tail does not overtake the head */
-	head = atomic_long_read(&rb->head);
-	if (head - new_tail > PRB_SIZE(rb))
-		return false;
-
-	atomic_long_cmpxchg(&rb->tail, tail, new_tail);
-	return true;
-}
-
-/*
- * prb_commit: Commit a reserved entry to the ring buffer.
- * @h: An entry handle referencing the data entry to commit.
- *
- * Commit data that has been reserved using prb_reserve(). Once the data
- * block has been committed, it can be invalidated at any time. If a writer
- * is interested in using the data after committing, the writer should make
- * its own copy first or use the prb_iter_ reader functions to access the
- * data in the ring buffer.
- *
- * It is safe to call this function from any context and state.
- */
-void prb_commit(struct prb_handle *h)
-{
-	struct printk_ringbuffer *rb = h->rb;
-	bool changed = false;
-	struct prb_entry *e;
-	unsigned long head;
-	unsigned long res;
-
-	for (;;) {
-		if (atomic_read(&rb->ctx) != 1) {
-			/* the interrupted context will fixup head */
-			atomic_dec(&rb->ctx);
-			break;
-		}
-		/* assign sequence numbers before moving head */
-		head = atomic_long_read(&rb->head);
-		res = atomic_long_read(&rb->reserve);
-		while (head != res) {
-			e = to_entry(rb, head);
-			if (e->size == -1) {
-				head = PRB_WRAP_LPOS(rb, head, 1);
-				continue;
-			}
-			while (atomic_long_read(&rb->lost)) {
-				atomic_long_dec(&rb->lost);
-				rb->seq++;
-			}
-			e->seq = ++rb->seq;
-			head += e->size;
-			changed = true;
-		}
-		atomic_long_set_release(&rb->head, res);
-
-		atomic_dec(&rb->ctx);
-
-		if (atomic_long_read(&rb->reserve) == res)
-			break;
-		atomic_inc(&rb->ctx);
-	}
-
-	prb_unlock(rb->cpulock, h->cpu);
-
-	if (changed) {
-		atomic_long_inc(&rb->wq_counter);
-		if (wq_has_sleeper(rb->wq)) {
-#ifdef CONFIG_IRQ_WORK
-			irq_work_queue(rb->wq_work);
-#else
-			if (!in_nmi())
-				wake_up_interruptible_all(rb->wq);
-#endif
-		}
-	}
-}
-
-/*
- * prb_reserve: Reserve an entry within a ring buffer.
- * @h: An entry handle to be setup and reference an entry.
- * @rb: A ring buffer to reserve data within.
- * @size: The number of bytes to reserve.
- *
- * Reserve an entry of at least @size bytes to be used by the caller. If
- * successful, the data region of the entry belongs to the caller and cannot
- * be invalidated by any other task/context. For this reason, the caller
- * should call prb_commit() as quickly as possible in order to avoid preventing
- * other tasks/contexts from reserving data in the case that the ring buffer
- * has wrapped.
- *
- * It is safe to call this function from any context and state.
- *
- * Returns a pointer to the reserved entry (and @h is setup to reference that
- * entry) or NULL if it was not possible to reserve data.
- */
-char *prb_reserve(struct prb_handle *h, struct printk_ringbuffer *rb,
-		  unsigned int size)
-{
-	unsigned long tail, res1, res2;
-	int ret;
-
-	if (size == 0)
-		return NULL;
-	size += sizeof(struct prb_entry);
-	size += PRB_DATA_ALIGN - 1;
-	size &= ~(PRB_DATA_ALIGN - 1);
-	if (size >= PRB_SIZE(rb))
-		return NULL;
-
-	h->rb = rb;
-	prb_lock(rb->cpulock, &h->cpu);
-
-	atomic_inc(&rb->ctx);
-
-	do {
-		for (;;) {
-			tail = atomic_long_read(&rb->tail);
-			res1 = atomic_long_read(&rb->reserve);
-			ret = calc_next(rb, tail, res1, size, &res2);
-			if (ret >= 0)
-				break;
-			if (!push_tail(rb, tail)) {
-				prb_commit(h);
-				return NULL;
-			}
-		}
-	} while (!atomic_long_try_cmpxchg_acquire(&rb->reserve, &res1, res2));
-
-	h->entry = to_entry(rb, res1);
-
-	if (ret) {
-		/* handle wrap */
-		h->entry->size = -1;
-		h->entry = to_entry(rb, PRB_WRAP_LPOS(rb, res2, 0));
-	}
-
-	h->entry->size = size;
-
-	return &h->entry->data[0];
-}
-
-/*
- * prb_iter_copy: Copy an iterator.
- * @dest: The iterator to copy to.
- * @src: The iterator to copy from.
- *
- * Make a deep copy of an iterator. This is particularly useful for making
- * backup copies of an iterator in case a form of rewinding it needed.
- *
- * It is safe to call this function from any context and state. But
- * note that this function is not atomic. Callers should not make copies
- * to/from iterators that can be accessed by other tasks/contexts.
- */
-void prb_iter_copy(struct prb_iterator *dest, struct prb_iterator *src)
-{
-	memcpy(dest, src, sizeof(*dest));
-}
-
-/*
- * prb_iter_init: Initialize an iterator for a ring buffer.
- * @iter: The iterator to initialize.
- * @rb: A ring buffer to that @iter should iterate.
- * @seq: The sequence number of the position preceding the first record.
- *       May be NULL.
- *
- * Initialize an iterator to be used with a specified ring buffer. If @seq
- * is non-NULL, it will be set such that prb_iter_next() will provide a
- * sequence value of "@seq + 1" if no records were missed.
- *
- * It is safe to call this function from any context and state.
- */
-void prb_iter_init(struct prb_iterator *iter, struct printk_ringbuffer *rb,
-		   u64 *seq)
-{
-	memset(iter, 0, sizeof(*iter));
-	iter->rb = rb;
-	iter->lpos = PRB_INIT;
-
-	if (!seq)
-		return;
-
-	for (;;) {
-		struct prb_iterator tmp_iter;
-		int ret;
-
-		prb_iter_copy(&tmp_iter, iter);
-
-		ret = prb_iter_next(&tmp_iter, NULL, 0, seq);
-		if (ret < 0)
-			continue;
-
-		if (ret == 0)
-			*seq = 0;
-		else
-			(*seq)--;
-		break;
-	}
-}
-
-static bool is_valid(struct printk_ringbuffer *rb, unsigned long lpos)
-{
-	unsigned long head, tail;
-
-	tail = atomic_long_read(&rb->tail);
-	head = atomic_long_read(&rb->head);
-	head -= tail;
-	lpos -= tail;
-
-	if (lpos >= head)
-		return false;
-	return true;
-}
-
-/*
- * prb_iter_data: Retrieve the record data at the current position.
- * @iter: Iterator tracking the current position.
- * @buf: A buffer to store the data of the record. May be NULL.
- * @size: The size of @buf. (Ignored if @buf is NULL.)
- * @seq: The sequence number of the record. May be NULL.
- *
- * If @iter is at a record, provide the data and/or sequence number of that
- * record (if specified by the caller).
- *
- * It is safe to call this function from any context and state.
- *
- * Returns >=0 if the current record contains valid data (returns 0 if @buf
- * is NULL or returns the size of the data block if @buf is non-NULL) or
- * -EINVAL if @iter is now invalid.
- */
-int prb_iter_data(struct prb_iterator *iter, char *buf, int size, u64 *seq)
-{
-	struct printk_ringbuffer *rb = iter->rb;
-	unsigned long lpos = iter->lpos;
-	unsigned int datsize = 0;
-	struct prb_entry *e;
-
-	if (buf || seq) {
-		e = to_entry(rb, lpos);
-		if (!is_valid(rb, lpos))
-			return -EINVAL;
-		/* memory barrier to ensure valid lpos */
-		smp_rmb();
-		if (buf) {
-			datsize = PRB_DATA_SIZE(e);
-			/* memory barrier to ensure load of datsize */
-			smp_rmb();
-			if (!is_valid(rb, lpos))
-				return -EINVAL;
-			if (PRB_INDEX(rb, lpos) + datsize >
-			    PRB_SIZE(rb) - PRB_DATA_ALIGN) {
-				return -EINVAL;
-			}
-			if (size > datsize)
-				size = datsize;
-			memcpy(buf, &e->data[0], size);
-		}
-		if (seq)
-			*seq = e->seq;
-		/* memory barrier to ensure loads of entry data */
-		smp_rmb();
-	}
-
-	if (!is_valid(rb, lpos))
-		return -EINVAL;
-
-	return datsize;
-}
-
-/*
- * prb_iter_next: Advance to the next record.
- * @iter: Iterator tracking the current position.
- * @buf: A buffer to store the data of the next record. May be NULL.
- * @size: The size of @buf. (Ignored if @buf is NULL.)
- * @seq: The sequence number of the next record. May be NULL.
- *
- * If a next record is available, @iter is advanced and (if specified)
- * the data and/or sequence number of that record are provided.
- *
- * It is safe to call this function from any context and state.
- *
- * Returns 1 if @iter was advanced, 0 if @iter is at the end of the list, or
- * -EINVAL if @iter is now invalid.
- */
-int prb_iter_next(struct prb_iterator *iter, char *buf, int size, u64 *seq)
-{
-	struct printk_ringbuffer *rb = iter->rb;
-	unsigned long next_lpos;
-	struct prb_entry *e;
-	unsigned int esize;
-
-	if (iter->lpos == PRB_INIT) {
-		next_lpos = atomic_long_read(&rb->tail);
-	} else {
-		if (!is_valid(rb, iter->lpos))
-			return -EINVAL;
-		/* memory barrier to ensure valid lpos */
-		smp_rmb();
-		e = to_entry(rb, iter->lpos);
-		esize = e->size;
-		/* memory barrier to ensure load of size */
-		smp_rmb();
-		if (!is_valid(rb, iter->lpos))
-			return -EINVAL;
-		next_lpos = iter->lpos + esize;
-	}
-	if (next_lpos == atomic_long_read(&rb->head))
-		return 0;
-	if (!is_valid(rb, next_lpos))
-		return -EINVAL;
-	/* memory barrier to ensure valid lpos */
-	smp_rmb();
-
-	iter->lpos = next_lpos;
-	e = to_entry(rb, iter->lpos);
-	esize = e->size;
-	/* memory barrier to ensure load of size */
-	smp_rmb();
-	if (!is_valid(rb, iter->lpos))
-		return -EINVAL;
-	if (esize == -1)
-		iter->lpos = PRB_WRAP_LPOS(rb, iter->lpos, 1);
-
-	if (prb_iter_data(iter, buf, size, seq) < 0)
-		return -EINVAL;
-
-	return 1;
-}
-
-/*
- * prb_iter_wait_next: Advance to the next record, blocking if none available.
- * @iter: Iterator tracking the current position.
- * @buf: A buffer to store the data of the next record. May be NULL.
- * @size: The size of @buf. (Ignored if @buf is NULL.)
- * @seq: The sequence number of the next record. May be NULL.
- *
- * If a next record is already available, this function works like
- * prb_iter_next(). Otherwise block interruptible until a next record is
- * available.
- *
- * When a next record is available, @iter is advanced and (if specified)
- * the data and/or sequence number of that record are provided.
- *
- * This function might sleep.
- *
- * Returns 1 if @iter was advanced, -EINVAL if @iter is now invalid, or
- * -ERESTARTSYS if interrupted by a signal.
- */
-int prb_iter_wait_next(struct prb_iterator *iter, char *buf, int size, u64 *seq)
-{
-	unsigned long last_seen;
-	int ret;
-
-	for (;;) {
-		last_seen = atomic_long_read(&iter->rb->wq_counter);
-
-		ret = prb_iter_next(iter, buf, size, seq);
-		if (ret != 0)
-			break;
-
-		ret = wait_event_interruptible(*iter->rb->wq,
-			last_seen != atomic_long_read(&iter->rb->wq_counter));
-		if (ret < 0)
-			break;
-	}
-
-	return ret;
-}
-
-/*
- * prb_iter_seek: Seek forward to a specific record.
- * @iter: Iterator to advance.
- * @seq: Record number to advance to.
- *
- * Advance @iter such that a following call to prb_iter_data() will provide
- * the contents of the specified record. If a record is specified that does
- * not yet exist, advance @iter to the end of the record list.
- *
- * Note that iterators cannot be rewound. So if a record is requested that
- * exists but is previous to @iter in position, @iter is considered invalid.
- *
- * It is safe to call this function from any context and state.
- *
- * Returns 1 on succces, 0 if specified record does not yet exist (@iter is
- * now at the end of the list), or -EINVAL if @iter is now invalid.
- */
-int prb_iter_seek(struct prb_iterator *iter, u64 seq)
-{
-	u64 cur_seq;
-	int ret;
-
-	/* first check if the iterator is already at the wanted seq */
-	if (seq == 0) {
-		if (iter->lpos == PRB_INIT)
-			return 1;
-		else
-			return -EINVAL;
-	}
-	if (iter->lpos != PRB_INIT) {
-		if (prb_iter_data(iter, NULL, 0, &cur_seq) >= 0) {
-			if (cur_seq == seq)
-				return 1;
-			if (cur_seq > seq)
-				return -EINVAL;
-		}
-	}
-
-	/* iterate to find the wanted seq */
-	for (;;) {
-		ret = prb_iter_next(iter, NULL, 0, &cur_seq);
-		if (ret <= 0)
-			break;
-
-		if (cur_seq == seq)
-			break;
-
-		if (cur_seq > seq) {
-			ret = -EINVAL;
-			break;
-		}
-	}
-
-	return ret;
-}
-
-/*
- * prb_buffer_size: Get the size of the ring buffer.
- * @rb: The ring buffer to get the size of.
- *
- * Return the number of bytes used for the ring buffer entry storage area.
- * Note that this area stores both entry header and entry data. Therefore
- * this represents an upper bound to the amount of data that can be stored
- * in the ring buffer.
- *
- * It is safe to call this function from any context and state.
- *
- * Returns the size in bytes of the entry storage area.
- */
-int prb_buffer_size(struct printk_ringbuffer *rb)
-{
-	return PRB_SIZE(rb);
-}
-
-/*
- * prb_inc_lost: Increment the seq counter to signal a lost record.
- * @rb: The ring buffer to increment the seq of.
- *
- * Increment the seq counter so that a seq number is intentially missing
- * for the readers. This allows readers to identify that a record is
- * missing. A writer will typically use this function if prb_reserve()
- * fails.
- *
- * It is safe to call this function from any context and state.
- */
-void prb_inc_lost(struct printk_ringbuffer *rb)
-{
-	atomic_long_inc(&rb->lost);
-}
diff --git a/localversion-rt b/localversion-rt
index 1e584b47c987e..9e7cd66d9f44f 100644
--- a/localversion-rt
+++ b/localversion-rt
@@ -1 +1 @@
--rt17
+-rt18
diff --git a/scripts/gdb/linux/dmesg.py b/scripts/gdb/linux/dmesg.py
index 2fa7bb83885f0..a92c55bd8de54 100644
--- a/scripts/gdb/linux/dmesg.py
+++ b/scripts/gdb/linux/dmesg.py
@@ -16,8 +16,13 @@ import sys
 
 from linux import utils
 
-printk_log_type = utils.CachedType("struct printk_log")
-
+printk_info_type = utils.CachedType("struct printk_info")
+prb_data_blk_lpos_type = utils.CachedType("struct prb_data_blk_lpos")
+prb_desc_type = utils.CachedType("struct prb_desc")
+prb_desc_ring_type = utils.CachedType("struct prb_desc_ring")
+prb_data_ring_type = utils.CachedType("struct prb_data_ring")
+printk_ringbuffer_type = utils.CachedType("struct printk_ringbuffer")
+atomic_long_type = utils.CachedType("atomic_long_t")
 
 class LxDmesg(gdb.Command):
     """Print Linux kernel log buffer."""
@@ -26,44 +31,110 @@ printk_log_type = utils.CachedType("struct printk_log")
         super(LxDmesg, self).__init__("lx-dmesg", gdb.COMMAND_DATA)
 
     def invoke(self, arg, from_tty):
-        log_buf_addr = int(str(gdb.parse_and_eval(
-            "(void *)'printk.c'::log_buf")).split()[0], 16)
-        log_first_idx = int(gdb.parse_and_eval("'printk.c'::log_first_idx"))
-        log_next_idx = int(gdb.parse_and_eval("'printk.c'::log_next_idx"))
-        log_buf_len = int(gdb.parse_and_eval("'printk.c'::log_buf_len"))
-
         inf = gdb.inferiors()[0]
-        start = log_buf_addr + log_first_idx
-        if log_first_idx < log_next_idx:
-            log_buf_2nd_half = -1
-            length = log_next_idx - log_first_idx
-            log_buf = utils.read_memoryview(inf, start, length).tobytes()
-        else:
-            log_buf_2nd_half = log_buf_len - log_first_idx
-            a = utils.read_memoryview(inf, start, log_buf_2nd_half)
-            b = utils.read_memoryview(inf, log_buf_addr, log_next_idx)
-            log_buf = a.tobytes() + b.tobytes()
 
-        length_offset = printk_log_type.get_type()['len'].bitpos // 8
-        text_len_offset = printk_log_type.get_type()['text_len'].bitpos // 8
-        time_stamp_offset = printk_log_type.get_type()['ts_nsec'].bitpos // 8
-        text_offset = printk_log_type.get_type().sizeof
+        # read in prb structure
+        prb_addr = int(str(gdb.parse_and_eval("(void *)'printk.c'::prb")).split()[0], 16)
+        sz = printk_ringbuffer_type.get_type().sizeof
+        prb = utils.read_memoryview(inf, prb_addr, sz).tobytes()
 
-        pos = 0
-        while pos < log_buf.__len__():
-            length = utils.read_u16(log_buf, pos + length_offset)
-            if length == 0:
-                if log_buf_2nd_half == -1:
-                    gdb.write("Corrupted log buffer!\n")
+        # read in descriptor ring structure
+        off = printk_ringbuffer_type.get_type()['desc_ring'].bitpos // 8
+        addr = prb_addr + off
+        sz = prb_desc_ring_type.get_type().sizeof
+        desc_ring = utils.read_memoryview(inf, addr, sz).tobytes()
+
+        # read in descriptor array
+        off = prb_desc_ring_type.get_type()['count_bits'].bitpos // 8
+        desc_ring_count = 1 << utils.read_u32(desc_ring, off)
+        desc_sz = prb_desc_type.get_type().sizeof
+        off = prb_desc_ring_type.get_type()['descs'].bitpos // 8
+        addr = utils.read_ulong(desc_ring, off)
+        descs = utils.read_memoryview(inf, addr, desc_sz * desc_ring_count).tobytes()
+
+        # read in info array
+        info_sz = printk_info_type.get_type().sizeof
+        off = prb_desc_ring_type.get_type()['infos'].bitpos // 8
+        addr = utils.read_ulong(desc_ring, off)
+        infos = utils.read_memoryview(inf, addr, info_sz * desc_ring_count).tobytes()
+
+        # read in text data ring structure
+        off = printk_ringbuffer_type.get_type()['text_data_ring'].bitpos // 8
+        addr = prb_addr + off
+        sz = prb_data_ring_type.get_type().sizeof
+        text_data_ring = utils.read_memoryview(inf, addr, sz).tobytes()
+
+        # read in text data
+        off = prb_data_ring_type.get_type()['size_bits'].bitpos // 8
+        text_data_sz = 1 << utils.read_u32(text_data_ring, off)
+        off = prb_data_ring_type.get_type()['data'].bitpos // 8
+        addr = utils.read_ulong(text_data_ring, off)
+        text_data = utils.read_memoryview(inf, addr, text_data_sz).tobytes()
+
+        counter_off = atomic_long_type.get_type()['counter'].bitpos // 8
+
+        sv_off = prb_desc_type.get_type()['state_var'].bitpos // 8
+
+        off = prb_desc_type.get_type()['text_blk_lpos'].bitpos // 8
+        begin_off = off + (prb_data_blk_lpos_type.get_type()['begin'].bitpos // 8)
+        next_off = off + (prb_data_blk_lpos_type.get_type()['next'].bitpos // 8)
+
+        ts_off = printk_info_type.get_type()['ts_nsec'].bitpos // 8
+        len_off = printk_info_type.get_type()['text_len'].bitpos // 8
+
+        # definitions from kernel/printk/printk_ringbuffer.h
+        desc_committed = 1
+        desc_finalized = 2
+        desc_sv_bits = utils.get_long_type().sizeof * 8
+        desc_flags_shift = desc_sv_bits - 2
+        desc_flags_mask = 3 << desc_flags_shift
+        desc_id_mask = ~desc_flags_mask
+
+        # read in tail and head descriptor ids
+        off = prb_desc_ring_type.get_type()['tail_id'].bitpos // 8
+        tail_id = utils.read_u64(desc_ring, off + counter_off)
+        off = prb_desc_ring_type.get_type()['head_id'].bitpos // 8
+        head_id = utils.read_u64(desc_ring, off + counter_off)
+
+        did = tail_id
+        while True:
+            ind = did % desc_ring_count
+            desc_off = desc_sz * ind
+            info_off = info_sz * ind
+
+            # skip non-committed record
+            state = 3 & (utils.read_u64(descs, desc_off + sv_off +
+                                        counter_off) >> desc_flags_shift)
+            if state != desc_committed and state != desc_finalized:
+                if did == head_id:
                     break
-                pos = log_buf_2nd_half
+                did = (did + 1) & desc_id_mask
                 continue
 
-            text_len = utils.read_u16(log_buf, pos + text_len_offset)
-            text_start = pos + text_offset
-            text = log_buf[text_start:text_start + text_len].decode(
-                encoding='utf8', errors='replace')
-            time_stamp = utils.read_u64(log_buf, pos + time_stamp_offset)
+            begin = utils.read_ulong(descs, desc_off + begin_off) % text_data_sz
+            end = utils.read_ulong(descs, desc_off + next_off) % text_data_sz
+
+            # handle data-less record
+            if begin & 1 == 1:
+                text = ""
+            else:
+                # handle wrapping data block
+                if begin > end:
+                    begin = 0
+
+                # skip over descriptor id
+                text_start = begin + utils.get_long_type().sizeof
+
+                text_len = utils.read_u16(infos, info_off + len_off)
+
+                # handle truncated message
+                if end - text_start < text_len:
+                    text_len = end - text_start
+
+                text = text_data[text_start:text_start + text_len].decode(
+                    encoding='utf8', errors='replace')
+
+            time_stamp = utils.read_u64(infos, info_off + ts_off)
 
             for line in text.splitlines():
                 msg = u"[{time:12.6f}] {line}\n".format(
@@ -75,7 +146,9 @@ printk_log_type = utils.CachedType("struct printk_log")
                     msg = msg.encode(encoding='utf8', errors='replace')
                 gdb.write(msg)
 
-            pos += length
+            if did == head_id:
+                break
+            did = (did + 1) & desc_id_mask
 
 
 LxDmesg()
diff --git a/scripts/gdb/linux/utils.py b/scripts/gdb/linux/utils.py
index ea94221dbd392..ff7c1799d588f 100644
--- a/scripts/gdb/linux/utils.py
+++ b/scripts/gdb/linux/utils.py
@@ -123,6 +123,13 @@ target_endianness = None
         return read_u32(buffer, offset + 4) + (read_u32(buffer, offset) << 32)
 
 
+def read_ulong(buffer, offset):
+    if get_long_type().sizeof == 8:
+        return read_u64(buffer, offset)
+    else:
+        return read_u32(buffer, offset)
+
+
 target_arch = None
 
 

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ