lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [day] [month] [year] [list]
Date:	Wed, 22 Oct 2008 01:52:39 -0500
From:	Tom Zanussi <zanussi@...cast.net>
To:	Linux Kernel Mailing List <linux-kernel@...r.kernel.org>
Cc:	Pekka Enberg <penberg@...helsinki.fi>,
	Martin Bligh <mbligh@...gle.com>,
	Peter Zijlstra <a.p.zijlstra@...llo.nl>,
	prasad@...ux.vnet.ibm.com,
	Linus Torvalds <torvalds@...ux-foundation.org>,
	Thomas Gleixner <tglx@...utronix.de>,
	Mathieu Desnoyers <compudj@...stal.dyndns.org>,
	Steven Rostedt <rostedt@...dmis.org>, od@...e.com,
	"Frank Ch. Eigler" <fche@...hat.com>,
	Andrew Morton <akpm@...ux-foundation.org>, hch@....de,
	David Wilder <dwilder@...ibm.com>,
	Jens Axboe <jens.axboe@...cle.com>,
	Eduard - Gabriel Munteanu <eduard.munteanu@...ux360.ro>
Subject: [RFC PATCH 1/1] relay revamp 7, full patch

---
 block/blktrace.c                 |   69 +--
 include/linux/blktrace_api.h     |    7 +-
 include/linux/relay.h            |  255 ++-----
 include/linux/relay_pagewriter.h |  294 ++++++++
 kernel/Makefile                  |    2 +-
 kernel/relay.c                   | 1469 +++++++++++++++-----------------------
 kernel/relay_pagewriter.c        |  868 ++++++++++++++++++++++
 virt/kvm/kvm_trace.c             |   84 +--
 8 files changed, 1854 insertions(+), 1194 deletions(-)
 create mode 100644 include/linux/relay_pagewriter.h
 create mode 100644 kernel/relay_pagewriter.c

diff --git a/block/blktrace.c b/block/blktrace.c
index 85049a7..19e417c 100644
--- a/block/blktrace.c
+++ b/block/blktrace.c
@@ -35,7 +35,7 @@ static void trace_note(struct blk_trace *bt, pid_t pid, int action,
 {
 	struct blk_io_trace *t;
 
-	t = relay_reserve(bt->rchan, sizeof(*t) + len);
+	t = pagewriter_reserve(bt->pagewriter, sizeof(*t) + len);
 	if (t) {
 		const int cpu = smp_processor_id();
 
@@ -153,7 +153,7 @@ void __blk_add_trace(struct blk_trace *bt, sector_t sector, int bytes,
 	if (unlikely(tsk->btrace_seq != blktrace_seq))
 		trace_note_tsk(bt, tsk);
 
-	t = relay_reserve(bt->rchan, sizeof(*t) + pdu_len);
+	t = pagewriter_reserve(bt->pagewriter, sizeof(*t) + pdu_len);
 	if (t) {
 		cpu = smp_processor_id();
 		sequence = per_cpu_ptr(bt->sequence, cpu);
@@ -230,7 +230,7 @@ err:
 
 static void blk_trace_cleanup(struct blk_trace *bt)
 {
-	relay_close(bt->rchan);
+	pagewriter_close(bt->pagewriter);
 	debugfs_remove(bt->msg_file);
 	debugfs_remove(bt->dropped_file);
 	blk_remove_tree(bt->dir);
@@ -268,7 +268,8 @@ static ssize_t blk_dropped_read(struct file *filp, char __user *buffer,
 	struct blk_trace *bt = filp->private_data;
 	char buf[16];
 
-	snprintf(buf, sizeof(buf), "%u\n", atomic_read(&bt->dropped));
+	snprintf(buf, sizeof(buf), "%u\n",
+		 atomic_read(&bt->pagewriter->dropped));
 
 	return simple_read_from_buffer(buffer, count, ppos, buf, strlen(buf));
 }
@@ -317,43 +318,19 @@ static const struct file_operations blk_msg_fops = {
 	.write =	blk_msg_write,
 };
 
-/*
- * Keep track of how many times we encountered a full subbuffer, to aid
- * the user space app in telling how many lost events there were.
- */
-static int blk_subbuf_start_callback(struct rchan_buf *buf, void *subbuf,
-				     void *prev_subbuf, size_t prev_padding)
+static void blk_write_padding_callback(struct pagewriter_buf *buf,
+				       size_t length,
+				       void *reserved)
 {
-	struct blk_trace *bt;
+	struct blk_io_trace *t = reserved;
 
-	if (!relay_buf_full(buf))
-		return 1;
-
-	bt = buf->chan->private_data;
-	atomic_inc(&bt->dropped);
-	return 0;
-}
-
-static int blk_remove_buf_file_callback(struct dentry *dentry)
-{
-	debugfs_remove(dentry);
-	return 0;
-}
-
-static struct dentry *blk_create_buf_file_callback(const char *filename,
-						   struct dentry *parent,
-						   int mode,
-						   struct rchan_buf *buf,
-						   int *is_global)
-{
-	return debugfs_create_file(filename, mode, parent, buf,
-					&relay_file_operations);
+	t->magic = BLK_IO_TRACE_MAGIC | BLK_IO_TRACE_VERSION;
+	t->action = BLK_TN_PADDING;
+	t->pdu_len = length - sizeof(*t);
 }
 
-static struct rchan_callbacks blk_relay_callbacks = {
-	.subbuf_start		= blk_subbuf_start_callback,
-	.create_buf_file	= blk_create_buf_file_callback,
-	.remove_buf_file	= blk_remove_buf_file_callback,
+static struct pagewriter_callbacks blk_pagewriter_callbacks = {
+	.write_padding           = blk_write_padding_callback,
 };
 
 /*
@@ -365,6 +342,7 @@ int do_blk_trace_setup(struct request_queue *q, char *name, dev_t dev,
 	struct blk_trace *old_bt, *bt = NULL;
 	struct dentry *dir = NULL;
 	int ret, i;
+	int n_pages, n_pages_wakeup;
 
 	if (!buts->buf_size || !buts->buf_nr)
 		return -EINVAL;
@@ -400,7 +378,6 @@ int do_blk_trace_setup(struct request_queue *q, char *name, dev_t dev,
 
 	bt->dir = dir;
 	bt->dev = dev;
-	atomic_set(&bt->dropped, 0);
 
 	ret = -EIO;
 	bt->dropped_file = debugfs_create_file("dropped", 0444, dir, bt, &blk_dropped_fops);
@@ -411,9 +388,13 @@ int do_blk_trace_setup(struct request_queue *q, char *name, dev_t dev,
 	if (!bt->msg_file)
 		goto err;
 
-	bt->rchan = relay_open("trace", dir, buts->buf_size,
-				buts->buf_nr, &blk_relay_callbacks, bt);
-	if (!bt->rchan)
+	n_pages = (buts->buf_size * buts->buf_nr) / PAGE_SIZE;
+	n_pages_wakeup = buts->buf_size / PAGE_SIZE;
+	bt->pagewriter = pagewriter_open("trace", dir, n_pages, n_pages_wakeup,
+					 sizeof(struct blk_io_trace),
+					 &blk_pagewriter_callbacks, bt,
+					 PAGEWRITER_PAD_WRITES);
+	if (!bt->pagewriter)
 		goto err;
 
 	bt->act_mask = buts->act_mask;
@@ -446,8 +427,8 @@ err:
 			debugfs_remove(bt->dropped_file);
 		free_percpu(bt->sequence);
 		free_percpu(bt->msg_data);
-		if (bt->rchan)
-			relay_close(bt->rchan);
+		if (bt->pagewriter)
+			pagewriter_close(bt->pagewriter);
 		kfree(bt);
 	}
 	return ret;
@@ -500,7 +481,7 @@ int blk_trace_startstop(struct request_queue *q, int start)
 	} else {
 		if (bt->trace_state == Blktrace_running) {
 			bt->trace_state = Blktrace_stopped;
-			relay_flush(bt->rchan);
+			pagewriter_flush(bt->pagewriter);
 			ret = 0;
 		}
 	}
diff --git a/include/linux/blktrace_api.h b/include/linux/blktrace_api.h
index bdf505d..b14e6e4 100644
--- a/include/linux/blktrace_api.h
+++ b/include/linux/blktrace_api.h
@@ -3,7 +3,7 @@
 
 #ifdef __KERNEL__
 #include <linux/blkdev.h>
-#include <linux/relay.h>
+#include <linux/relay_pagewriter.h>
 #endif
 
 /*
@@ -62,6 +62,7 @@ enum blktrace_notify {
 	__BLK_TN_PROCESS = 0,		/* establish pid/name mapping */
 	__BLK_TN_TIMESTAMP,		/* include system clock */
 	__BLK_TN_MESSAGE,		/* Character string message */
+	__BLK_TN_PADDING,		/* Padding message */
 };
 
 
@@ -89,6 +90,7 @@ enum blktrace_notify {
 #define BLK_TN_PROCESS		(__BLK_TN_PROCESS | BLK_TC_ACT(BLK_TC_NOTIFY))
 #define BLK_TN_TIMESTAMP	(__BLK_TN_TIMESTAMP | BLK_TC_ACT(BLK_TC_NOTIFY))
 #define BLK_TN_MESSAGE		(__BLK_TN_MESSAGE | BLK_TC_ACT(BLK_TC_NOTIFY))
+#define BLK_TN_PADDING		(__BLK_TN_PADDING | BLK_TC_ACT(BLK_TC_NOTIFY))
 
 #define BLK_IO_TRACE_MAGIC	0x65617400
 #define BLK_IO_TRACE_VERSION	0x07
@@ -144,7 +146,7 @@ struct blk_user_trace_setup {
 #if defined(CONFIG_BLK_DEV_IO_TRACE)
 struct blk_trace {
 	int trace_state;
-	struct rchan *rchan;
+	struct pagewriter *pagewriter;
 	unsigned long *sequence;
 	unsigned char *msg_data;
 	u16 act_mask;
@@ -155,7 +157,6 @@ struct blk_trace {
 	struct dentry *dir;
 	struct dentry *dropped_file;
 	struct dentry *msg_file;
-	atomic_t dropped;
 };
 
 extern int blk_trace_ioctl(struct block_device *, unsigned, char __user *);
diff --git a/include/linux/relay.h b/include/linux/relay.h
index 953fc05..2c66026 100644
--- a/include/linux/relay.h
+++ b/include/linux/relay.h
@@ -3,6 +3,7 @@
  *
  * Copyright (C) 2002, 2003 - Tom Zanussi (zanussi@...ibm.com), IBM Corp
  * Copyright (C) 1999, 2000, 2001, 2002 - Karim Yaghmour (karim@...rsys.com)
+ * Copyright (C) 2008 - Tom Zanussi (tzanussi@...il.com)
  *
  * CONFIG_RELAY definitions and declarations
  */
@@ -18,37 +19,38 @@
 #include <linux/fs.h>
 #include <linux/poll.h>
 #include <linux/kref.h>
+#include <linux/pagevec.h>
 
-/* Needs a _much_ better name... */
-#define FIX_SIZE(x) ((((x) - 1) & PAGE_MASK) + PAGE_SIZE)
+/*
+ * relay channel flags
+ */
+#define RCHAN_GLOBAL_BUFFER		0x00000001	/* not using per-cpu */
 
 /*
- * Tracks changes to rchan/rchan_buf structs
+ * For page lists
  */
-#define RELAYFS_CHANNEL_VERSION		7
+struct relay_page {
+	struct page *page;
+	size_t len;
+	struct list_head list;
+	struct relay_page_callbacks *cb;
+	void *private_data;
+};
 
 /*
  * Per-cpu relay channel buffer
  */
-struct rchan_buf
-{
-	void *start;			/* start of channel buffer */
-	void *data;			/* start of current sub-buffer */
-	size_t offset;			/* current offset into sub-buffer */
-	size_t subbufs_produced;	/* count of sub-buffers produced */
-	size_t subbufs_consumed;	/* count of sub-buffers consumed */
+struct rchan_buf {
 	struct rchan *chan;		/* associated channel */
 	wait_queue_head_t read_wait;	/* reader wait queue */
 	struct timer_list timer; 	/* reader wake-up timer */
 	struct dentry *dentry;		/* channel file dentry */
 	struct kref kref;		/* channel buffer refcount */
-	struct page **page_array;	/* array of current buffer pages */
-	unsigned int page_count;	/* number of current buffer pages */
+	struct list_head pages;		/* current set of unconsumed pages */
+	size_t nr_pages;		/* number of unconsumed pages */
+	spinlock_t lock;		/* protect pages list */
+	size_t consumed_offset;		/* bytes consumed in cur page */
 	unsigned int finalized;		/* buffer has been finalized */
-	size_t *padding;		/* padding counts per sub-buffer */
-	size_t prev_padding;		/* temporary variable */
-	size_t bytes_consumed;		/* bytes consumed in cur read subbuf */
-	size_t early_bytes;		/* bytes consumed before VFS inited */
 	unsigned int cpu;		/* this buf's cpu */
 } ____cacheline_aligned;
 
@@ -57,20 +59,15 @@ struct rchan_buf
  */
 struct rchan
 {
-	u32 version;			/* the version of this struct */
-	size_t subbuf_size;		/* sub-buffer size */
-	size_t n_subbufs;		/* number of sub-buffers per buffer */
-	size_t alloc_size;		/* total buffer size allocated */
+	size_t n_pages_wakeup;		/* wake up readers after filling n */
 	struct rchan_callbacks *cb;	/* client callbacks */
 	struct kref kref;		/* channel refcount */
 	void *private_data;		/* for user-defined data */
-	size_t last_toobig;		/* tried to log event > subbuf size */
 	struct rchan_buf *buf[NR_CPUS]; /* per-cpu channel buffers */
-	int is_global;			/* One global buffer ? */
 	struct list_head list;		/* for channel list */
 	struct dentry *parent;		/* parent dentry passed to open */
-	int has_base_filename;		/* has a filename associated? */
 	char base_filename[NAME_MAX];	/* saved base filename */
+	unsigned long flags;		/* relay flags for this channel */
 };
 
 /*
@@ -79,53 +76,11 @@ struct rchan
 struct rchan_callbacks
 {
 	/*
-	 * subbuf_start - called on buffer-switch to a new sub-buffer
-	 * @buf: the channel buffer containing the new sub-buffer
-	 * @subbuf: the start of the new sub-buffer
-	 * @prev_subbuf: the start of the previous sub-buffer
-	 * @prev_padding: unused space at the end of previous sub-buffer
-	 *
-	 * The client should return 1 to continue logging, 0 to stop
-	 * logging.
-	 *
-	 * NOTE: subbuf_start will also be invoked when the buffer is
-	 *       created, so that the first sub-buffer can be initialized
-	 *       if necessary.  In this case, prev_subbuf will be NULL.
-	 *
-	 * NOTE: the client can reserve bytes at the beginning of the new
-	 *       sub-buffer by calling subbuf_start_reserve() in this callback.
-	 */
-	int (*subbuf_start) (struct rchan_buf *buf,
-			     void *subbuf,
-			     void *prev_subbuf,
-			     size_t prev_padding);
-
-	/*
-	 * buf_mapped - relay buffer mmap notification
-	 * @buf: the channel buffer
-	 * @filp: relay file pointer
-	 *
-	 * Called when a relay file is successfully mmapped
-	 */
-        void (*buf_mapped)(struct rchan_buf *buf,
-			   struct file *filp);
-
-	/*
-	 * buf_unmapped - relay buffer unmap notification
-	 * @buf: the channel buffer
-	 * @filp: relay file pointer
-	 *
-	 * Called when a relay file is successfully unmapped
-	 */
-        void (*buf_unmapped)(struct rchan_buf *buf,
-			     struct file *filp);
-	/*
 	 * create_buf_file - create file to represent a relay channel buffer
 	 * @filename: the name of the file to create
 	 * @parent: the parent of the file to create
 	 * @mode: the mode of the file to create
 	 * @buf: the channel buffer
-	 * @is_global: outparam - set non-zero if the buffer should be global
 	 *
 	 * Called during relay_open(), once for each per-cpu buffer,
 	 * to allow the client to create a file to be used to
@@ -136,17 +91,12 @@ struct rchan_callbacks
 	 * The callback should return the dentry of the file created
 	 * to represent the relay buffer.
 	 *
-	 * Setting the is_global outparam to a non-zero value will
-	 * cause relay_open() to create a single global buffer rather
-	 * than the default set of per-cpu buffers.
-	 *
 	 * See Documentation/filesystems/relayfs.txt for more info.
 	 */
 	struct dentry *(*create_buf_file)(const char *filename,
 					  struct dentry *parent,
 					  int mode,
-					  struct rchan_buf *buf,
-					  int *is_global);
+					  struct rchan_buf *buf);
 
 	/*
 	 * remove_buf_file - remove file representing a relay channel buffer
@@ -162,125 +112,60 @@ struct rchan_callbacks
 };
 
 /*
- * CONFIG_RELAY kernel API, kernel/relay.c
- */
-
-struct rchan *relay_open(const char *base_filename,
-			 struct dentry *parent,
-			 size_t subbuf_size,
-			 size_t n_subbufs,
-			 struct rchan_callbacks *cb,
-			 void *private_data);
-extern int relay_late_setup_files(struct rchan *chan,
-				  const char *base_filename,
-				  struct dentry *parent);
-extern void relay_close(struct rchan *chan);
-extern void relay_flush(struct rchan *chan);
-extern void relay_subbufs_consumed(struct rchan *chan,
-				   unsigned int cpu,
-				   size_t consumed);
-extern void relay_reset(struct rchan *chan);
-extern int relay_buf_full(struct rchan_buf *buf);
-
-extern size_t relay_switch_subbuf(struct rchan_buf *buf,
-				  size_t length);
-
-/**
- *	relay_write - write data into the channel
- *	@chan: relay channel
- *	@data: data to be written
- *	@length: number of bytes to write
- *
- *	Writes data into the current cpu's channel buffer.
- *
- *	Protects the buffer by disabling interrupts.  Use this
- *	if you might be logging from interrupt context.  Try
- *	__relay_write() if you know you	won't be logging from
- *	interrupt context.
- */
-static inline void relay_write(struct rchan *chan,
-			       const void *data,
-			       size_t length)
-{
-	unsigned long flags;
-	struct rchan_buf *buf;
-
-	local_irq_save(flags);
-	buf = chan->buf[smp_processor_id()];
-	if (unlikely(buf->offset + length > chan->subbuf_size))
-		length = relay_switch_subbuf(buf, length);
-	memcpy(buf->data + buf->offset, data, length);
-	buf->offset += length;
-	local_irq_restore(flags);
-}
-
-/**
- *	__relay_write - write data into the channel
- *	@chan: relay channel
- *	@data: data to be written
- *	@length: number of bytes to write
- *
- *	Writes data into the current cpu's channel buffer.
- *
- *	Protects the buffer by disabling preemption.  Use
- *	relay_write() if you might be logging from interrupt
- *	context.
+ * Relay page callbacks
  */
-static inline void __relay_write(struct rchan *chan,
-				 const void *data,
-				 size_t length)
+struct relay_page_callbacks
 {
-	struct rchan_buf *buf;
+	/*
+	 * page_released - notification that a page is ready for re-use
+	 * @page: the released page
+	 * @private_data: user-defined data associated with the page
+	 *
+	 * This callback is a notification that a given page has been
+	 * read by userspace and can be re-used.  Always called in
+	 * user context.
+	 */
+	void (*page_released) (struct page *page, void *private_data);
 
-	buf = chan->buf[get_cpu()];
-	if (unlikely(buf->offset + length > buf->chan->subbuf_size))
-		length = relay_switch_subbuf(buf, length);
-	memcpy(buf->data + buf->offset, data, length);
-	buf->offset += length;
-	put_cpu();
-}
+	/*
+	 * page_stolen - notification that a page has been stolen
+	 * @page: the stolen page
+	 * @private_data: user-defined data associated with the page
+	 *
+	 * This callback is a notification that a given page has been
+	 * stolen by userspace.  The owner may wish to replace it;
+	 * this gives it the opportunity to do so.  Always called in
+	 * user context.
+	 */
+	void (*page_stolen) (struct page *page, void *private_data);
+};
 
-/**
- *	relay_reserve - reserve slot in channel buffer
- *	@chan: relay channel
- *	@length: number of bytes to reserve
- *
- *	Returns pointer to reserved slot, NULL if full.
- *
- *	Reserves a slot in the current cpu's channel buffer.
- *	Does not protect the buffer at all - caller must provide
- *	appropriate synchronization.
+/*
+ * CONFIG_RELAY kernel API, kernel/relay.c
  */
-static inline void *relay_reserve(struct rchan *chan, size_t length)
-{
-	void *reserved;
-	struct rchan_buf *buf = chan->buf[smp_processor_id()];
-
-	if (unlikely(buf->offset + length > buf->chan->subbuf_size)) {
-		length = relay_switch_subbuf(buf, length);
-		if (!length)
-			return NULL;
-	}
-	reserved = buf->data + buf->offset;
-	buf->offset += length;
 
-	return reserved;
-}
-
-/**
- *	subbuf_start_reserve - reserve bytes at the start of a sub-buffer
- *	@buf: relay channel buffer
- *	@length: number of bytes to reserve
- *
- *	Helper function used to reserve bytes at the beginning of
- *	a sub-buffer in the subbuf_start() callback.
- */
-static inline void subbuf_start_reserve(struct rchan_buf *buf,
-					size_t length)
-{
-	BUG_ON(length >= buf->chan->subbuf_size - 1);
-	buf->offset = length;
-}
+extern struct rchan *relay_open(const char *base_filename,
+				struct dentry *parent,
+				size_t n_pages_wakeup,
+				struct rchan_callbacks *cb,
+				void *private_data,
+				unsigned long rchan_flags);
+extern void relay_add_partial_page(struct rchan *chan,
+				   struct page *page,
+				   size_t len,
+				   struct relay_page_callbacks *cb,
+				   void *private_data);
+extern void relay_add_page(struct rchan *chan,
+			   struct page *page,
+			   struct relay_page_callbacks *cb,
+			   void *private_data);
+extern void relay_add_pages(struct rchan *chan,
+			    struct pagevec *pages,
+			    struct relay_page_callbacks *cb,
+			    void *private_data);
+extern void relay_flush(struct rchan *chan);
+extern void relay_close(struct rchan *chan);
+extern void relay_reset(struct rchan *chan);
 
 /*
  * exported relay file operations, kernel/relay.c
diff --git a/include/linux/relay_pagewriter.h b/include/linux/relay_pagewriter.h
new file mode 100644
index 0000000..96b2c04
--- /dev/null
+++ b/include/linux/relay_pagewriter.h
@@ -0,0 +1,294 @@
+/*
+ * linux/include/linux/relay_pagewriter.h
+ *
+ * Copyright (C) 2002, 2003 - Tom Zanussi (zanussi@...ibm.com), IBM Corp
+ * Copyright (C) 1999, 2000, 2001, 2002 - Karim Yaghmour (karim@...rsys.com)
+ * Copyright (C) 2008 - Tom Zanussi (tzanussi@...il.com)
+ *
+ * CONFIG_RELAY definitions and declarations
+ */
+
+#ifndef _LINUX_RELAY_PAGEWRITER_H
+#define _LINUX_RELAY_PAGEWRITER_H
+
+#include <linux/types.h>
+#include <linux/sched.h>
+#include <linux/timer.h>
+#include <linux/wait.h>
+#include <linux/list.h>
+#include <linux/fs.h>
+#include <linux/poll.h>
+#include <linux/kref.h>
+#include <linux/relay.h>
+
+/*
+ * pagewriter flags
+ */
+#define PAGEWRITER_PAD_WRITES		0x00010000	/* don't cross pages */
+#define PAGEWRITER_FLIGHT_MODE		0x00020000	/* n_pages page ring */
+#define PAGEWRITER_LATE_SETUP		0x00040000	/* delay chan create */
+
+/*
+ * Per-cpu pagewriter buffer
+ */
+struct pagewriter_buf {
+	struct relay_page *page;	/* current write page */
+	void *data;			/* address of current page */
+	size_t offset;			/* current offset into page */
+	struct pagewriter *pagewriter;	/* associated pagewriter */
+	struct kref kref;		/* channel buffer refcount */
+	struct list_head pool;		/* current set of unused pages */
+	struct list_head empty_rpage_structs;	/* cached rpage structs */
+	spinlock_t lock;		/* protect pool */
+	size_t n_pages_flight;		/* number full flight pages written */
+	unsigned int cpu;		/* this buf's cpu */
+} ____cacheline_aligned;
+
+/*
+ * Pagewriter data structure
+ */
+struct pagewriter {
+	struct rchan *rchan;		/* associated relay channel */
+	struct pagewriter_callbacks *cb;	/* client callbacks */
+	size_t n_pages;			/* number of pages per buffer */
+	size_t n_pages_wakeup;		/* save for LATE */
+	struct kref kref;		/* channel refcount */
+	void *private_data;		/* for user-defined data */
+	struct pagewriter_buf *buf[NR_CPUS]; /* per-cpu channel buffers */
+	struct list_head list;		/* for channel list */
+	atomic_t dropped;		/* dropped events due to buffer-full */
+	char base_filename[NAME_MAX];	/* saved base filename, for LATE */
+	unsigned long flags;		/* pagewriter flags for this channel */
+	size_t end_reserve;		/* reserve at end of page for PAD  */
+};
+
+extern void pagewriter_pad_switch_page(struct pagewriter_buf *buf);
+extern void pagewriter_pad_flight_switch_page(struct pagewriter_buf *buf);
+extern void pagewriter_nopad_switch_page(struct pagewriter_buf *buf);
+extern void pagewriter_nopad_flight_switch_page(struct pagewriter_buf *buf);
+
+/*
+ * Pagewriter client callbacks
+ */
+struct pagewriter_callbacks {
+	/*
+	 * new_page - called on switch to a new page
+	 * @buf: the channel buffer containing the new page
+	 * @page_data: the start of the new page
+	 *
+	 * This is simply a notification that a new page has been
+	 * switched to.  The default version does nothing.  Clients
+	 * can use the channel private_data to track previous pages,
+	 * determine whether this is the first page, etc.
+	 *
+	 * NOTE: the client can reserve bytes at the beginning of the new
+	 *       page by calling page_start_reserve() in this callback.
+	 */
+	void (*new_page) (struct pagewriter_buf *buf,
+			  void *page_data);
+
+	/*
+	 * switch_page - page switch callback
+	 * @buf: the channel buffer
+	 *
+	 * This callback can be used to replace the complete write
+	 * path.  Normally clients wouldn't override this and would
+	 * use the default version instead.
+	 *
+	 * Switches to a new page and performs page-switch tasks.
+	 */
+	void (*switch_page)(struct pagewriter_buf *buf);
+
+	/*
+	 * write_padding - callback for writing padding events
+	 * @buf: the channel buffer
+	 * @length: the length of the padding
+	 * @reserved: a pointer to the start of padding
+	 *
+	 * This callback can be used to write a padding event when
+	 * pagewriter_reserve can't write a complete event.  The
+	 * length of the padding is guaranteed to be at least as large
+	 * as the end_reserve size passed into pagewriter_open().
+	 */
+	void (*write_padding)(struct pagewriter_buf *buf,
+			      size_t length,
+			      void *reserved);
+};
+
+/**
+ *	pagewriter_write - write data into the channel, without padding
+ *	@pagewriter: pagewriter
+ *	@data: data to be written
+ *	@length: number of bytes to write
+ *
+ *	Writes data into the current cpu's channel buffer, crossing
+ *	page boundaries.
+ *
+ *	Protects the buffer by disabling interrupts.  Use this if you
+ *	might be logging from interrupt context.  Try
+ *	__pagewriter_write() if you know you won't be logging from
+ *	interrupt context.
+ */
+static inline void pagewriter_write(struct pagewriter *pagewriter,
+				    const void *data,
+				    size_t length)
+{
+	size_t remainder = length;
+	struct pagewriter_buf *buf;
+	unsigned long flags;
+	void *reserved;
+
+	local_irq_save(flags);
+	buf = pagewriter->buf[smp_processor_id()];
+	reserved = buf->data + buf->offset;
+	if (buf->offset + length > PAGE_SIZE) {
+		if (!buf->data)
+			goto dropped;
+		if (length > PAGE_SIZE)
+			goto dropped;
+		remainder = length - (PAGE_SIZE - buf->offset);
+		pagewriter->cb->switch_page(buf);
+		if (!buf->data)
+			goto dropped;
+		length -= remainder;
+		memcpy(buf->data, data + length, remainder);
+	}
+	memcpy(reserved, data, length);
+	buf->offset += remainder;
+	local_irq_restore(flags);
+
+	return;
+dropped:
+	local_irq_restore(flags);
+	atomic_inc(&buf->pagewriter->dropped);
+}
+
+/**
+ *	__pagewriter_write - write data into the channel, without padding
+ *	@pagewriter: pagewriter
+ *	@data: data to be written
+ *	@length: number of bytes to write
+ *
+ *	Writes data into the current cpu's channel buffer, crossing
+ *	page boundaries.
+ *
+ *	Protects the buffer by disabling preemption.  Use
+ *	pagewriter_write() if you might be logging from interrupt
+ *	context.
+ */
+static inline void __pagewriter_write(struct pagewriter *pagewriter,
+				      const void *data,
+				      size_t length)
+{
+	size_t remainder = length;
+	struct pagewriter_buf *buf;
+	void *reserved;
+
+	buf = pagewriter->buf[get_cpu()];
+	reserved = buf->data + buf->offset;
+	if (buf->offset + length > PAGE_SIZE) {
+		if (!buf->data)
+			goto dropped;
+		if (length > PAGE_SIZE)
+			goto dropped;
+		remainder = length - (PAGE_SIZE - buf->offset);
+		pagewriter->cb->switch_page(buf);
+		if (!buf->data)
+			goto dropped;
+		length -= remainder;
+		memcpy(buf->data, data + length, remainder);
+	}
+	memcpy(reserved, data, length);
+	buf->offset += remainder;
+	put_cpu_no_resched();
+
+	return;
+dropped:
+	put_cpu_no_resched();
+	atomic_inc(&buf->pagewriter->dropped);
+}
+
+/**
+ *	pagewriter_reserve - reserve slot in channel buffer
+ *	@pagewriter: pagewriter
+ *	@length: number of bytes to reserve
+ *
+ *	Returns pointer to reserved slot, NULL if full.
+ *
+ *	Reserves a slot in the current cpu's channel buffer.
+ *	Does not protect the buffer at all - caller must provide
+ *	appropriate synchronization.
+ *
+ *	If the event won't fit, at least end_reserve bytes are
+ *	reserved for a padding event, and the write_padding() callback
+ *	function is called to allow the client to write the padding
+ *	event before switching to the next page.  The write_padding()
+ *	callback is passed a pointer to the start of the padding along
+ *	with its length.
+ */
+
+static inline void *pagewriter_reserve(struct pagewriter *pagewriter,
+				       size_t length)
+{
+	struct pagewriter_buf *buf;
+	void *reserved;
+
+	buf = pagewriter->buf[smp_processor_id()];
+	reserved = buf->data + buf->offset;
+	if (buf->offset + length > PAGE_SIZE - buf->pagewriter->end_reserve) {
+		size_t padding = PAGE_SIZE - buf->offset;
+		if (length != padding) {
+			if (!buf->data)
+				goto dropped;
+			if (length > PAGE_SIZE - buf->pagewriter->end_reserve)
+				goto dropped;
+			if (padding) {
+				reserved = buf->data + PAGE_SIZE - padding;
+				pagewriter->cb->write_padding(buf, padding,
+							      reserved);
+			}
+			pagewriter->cb->switch_page(buf);
+			if (!buf->data)
+				goto dropped;
+			reserved = buf->data;
+		}
+	}
+	buf->offset += length;
+
+	return reserved;
+dropped:
+	atomic_inc(&buf->pagewriter->dropped);
+	return NULL;
+}
+
+/**
+ *	page_start_reserve - reserve bytes at the start of a page
+ *	@buf: pagewriter channel buffer
+ *	@length: number of bytes to reserve
+ *
+ *	Helper function used to reserve bytes at the beginning of
+ *	a page in the new_page() callback.
+ */
+static inline void page_start_reserve(struct pagewriter_buf *buf,
+				      size_t length)
+{
+	BUG_ON(length >= PAGE_SIZE - buf->pagewriter->end_reserve - 1);
+	buf->offset = length;
+}
+
+extern struct pagewriter *pagewriter_open(const char *base_filename,
+					  struct dentry *parent,
+					  size_t n_pages,
+					  size_t n_pages_wakeup,
+					  size_t end_reserved,
+					  struct pagewriter_callbacks *cb,
+					  void *private_data,
+					  unsigned long rchan_flags);
+extern void pagewriter_flush(struct pagewriter *pagewriter);
+extern void pagewriter_close(struct pagewriter *pagewriter);
+extern void pagewriter_reset(struct pagewriter *pagewriter);
+extern void pagewriter_save_flight_data(struct pagewriter *pagewriter);
+extern int pagewriter_late_setup(struct pagewriter *pagewriter,
+				 struct dentry *parent);
+
+#endif /* _LINUX_RELAY_PAGEWRITER_H */
diff --git a/kernel/Makefile b/kernel/Makefile
index 066550a..81d28ce 100644
--- a/kernel/Makefile
+++ b/kernel/Makefile
@@ -80,7 +80,7 @@ obj-$(CONFIG_PREEMPT_RCU) += rcupreempt.o
 ifeq ($(CONFIG_PREEMPT_RCU),y)
 obj-$(CONFIG_RCU_TRACE) += rcupreempt_trace.o
 endif
-obj-$(CONFIG_RELAY) += relay.o
+obj-$(CONFIG_RELAY) += relay.o relay_pagewriter.o
 obj-$(CONFIG_SYSCTL) += utsname_sysctl.o
 obj-$(CONFIG_TASK_DELAY_ACCT) += delayacct.o
 obj-$(CONFIG_TASKSTATS) += taskstats.o tsacct.o
diff --git a/kernel/relay.c b/kernel/relay.c
index 8d13a78..04edb1d 100644
--- a/kernel/relay.c
+++ b/kernel/relay.c
@@ -5,6 +5,7 @@
  *
  * Copyright (C) 2002-2005 - Tom Zanussi (zanussi@...ibm.com), IBM Corp
  * Copyright (C) 1999-2005 - Karim Yaghmour (karim@...rsys.com)
+ * Copyright (C) 2008 - Tom Zanussi (tzanussi@...il.com)
  *
  * Moved to kernel/relay.c by Paul Mundt, 2006.
  * November 2006 - CPU hotplug support by Mathieu Desnoyers
@@ -18,400 +19,431 @@
 #include <linux/module.h>
 #include <linux/string.h>
 #include <linux/relay.h>
-#include <linux/vmalloc.h>
 #include <linux/mm.h>
 #include <linux/cpu.h>
 #include <linux/splice.h>
+#include <linux/debugfs.h>
 
 /* list of open channels, for cpu hotplug */
 static DEFINE_MUTEX(relay_channels_mutex);
 static LIST_HEAD(relay_channels);
 
+/* forward declarations */
+static void setup_callbacks(struct rchan *chan, struct rchan_callbacks *cb);
+static struct rchan_buf *relay_open_buf(struct rchan *chan, unsigned int cpu);
+static inline void relay_wakeup_readers(struct rchan_buf *buf);
+static void relay_close_buf(struct rchan_buf *buf);
+static void relay_destroy_channel(struct kref *kref);
+static inline struct relay_page *__relay_get_rpage(struct rchan_buf *buf);
+static inline void __relay_add_page(struct rchan_buf *buf,
+				    struct relay_page *rpage);
+static inline void __relay_add_page_nolock(struct rchan_buf *buf,
+					   struct relay_page *rpage);
+static void __relay_reset(struct rchan_buf *buf, unsigned int init);
+
 /*
- * close() vm_op implementation for relay file mapping.
+ * relay kernel API
  */
-static void relay_file_mmap_close(struct vm_area_struct *vma)
-{
-	struct rchan_buf *buf = vma->vm_private_data;
-	buf->chan->cb->buf_unmapped(buf, vma->vm_file);
-}
 
-/*
- * fault() vm_op implementation for relay file mapping.
+/**
+ *	relay_open - create a new relay channel
+ *	@base_filename: base name of files to create, %NULL for buffering only
+ *	@parent: dentry of parent directory, %NULL for root directory or buffer
+ *	@n_pages_wakeup: wakeup readers after this many pages, 0 means never
+ *	@cb: client callback functions
+ *	@private_data: user-defined data
+ *	@flags: relay channel flags
+ *
+ *	Returns channel pointer if successful, %NULL otherwise.
+ *
+ *	Creates per-cpu channel lists (or a single list if the
+ *	RCHAN_GLOBAL_BUFFER flag is used) to receive pages from
+ *	tracers via relay_add_page()/relay_add_pages().  These lists
+ *	will be drained by userspace via read(2), splice(2), or
+ *	sendfile(2).  Pages added to relay will be either returned to
+ *	their owners after userspace has finished reading them or the
+ *	owners will be notified if they've been stolen (see
+ *	relay_add_page).
+ *
+ *	buffer files will be named base_filename0...base_filenameN-1.
+ *	File permissions will be %S_IRUSR.
  */
-static int relay_buf_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
+struct rchan *relay_open(const char *base_filename,
+			 struct dentry *parent,
+			 size_t n_pages_wakeup,
+			 struct rchan_callbacks *cb,
+			 void *private_data,
+			 unsigned long rchan_flags)
 {
-	struct page *page;
-	struct rchan_buf *buf = vma->vm_private_data;
-	pgoff_t pgoff = vmf->pgoff;
+	unsigned int i;
+	struct rchan *chan;
 
-	if (!buf)
-		return VM_FAULT_OOM;
+	chan = kzalloc(sizeof(struct rchan), GFP_KERNEL);
+	if (!chan)
+		return NULL;
 
-	page = vmalloc_to_page(buf->start + (pgoff << PAGE_SHIFT));
-	if (!page)
-		return VM_FAULT_SIGBUS;
-	get_page(page);
-	vmf->page = page;
+	chan->n_pages_wakeup = n_pages_wakeup;
+	chan->parent = parent;
+	chan->flags = rchan_flags;
 
-	return 0;
-}
+	chan->private_data = private_data;
+	/* base_filename may be %NULL (buffering only) - don't deref it */
+	if (base_filename)
+		strlcpy(chan->base_filename, base_filename, NAME_MAX);
 
-/*
- * vm_ops for relay file mappings.
- */
-static struct vm_operations_struct relay_file_mmap_ops = {
-	.fault = relay_buf_fault,
-	.close = relay_file_mmap_close,
-};
+	setup_callbacks(chan, cb);
+	kref_init(&chan->kref);
 
-/*
- * allocate an array of pointers of struct page
- */
-static struct page **relay_alloc_page_array(unsigned int n_pages)
-{
-	struct page **array;
-	size_t pa_size = n_pages * sizeof(struct page *);
-
-	if (pa_size > PAGE_SIZE) {
-		array = vmalloc(pa_size);
-		if (array)
-			memset(array, 0, pa_size);
-	} else {
-		array = kzalloc(pa_size, GFP_KERNEL);
+	mutex_lock(&relay_channels_mutex);
+	for_each_online_cpu(i) {
+		chan->buf[i] = relay_open_buf(chan, i);
+		if (!chan->buf[i])
+			goto free_bufs;
 	}
-	return array;
+	list_add(&chan->list, &relay_channels);
+	mutex_unlock(&relay_channels_mutex);
+
+	return chan;
+
+free_bufs:
+	for_each_online_cpu(i) {
+		if (!chan->buf[i])
+			break;
+		relay_close_buf(chan->buf[i]);
+	}
+
+	kref_put(&chan->kref, relay_destroy_channel);
+	mutex_unlock(&relay_channels_mutex);
+	return NULL;
 }
+EXPORT_SYMBOL_GPL(relay_open);
 
-/*
- * free an array of pointers of struct page
- */
-static void relay_free_page_array(struct page **array)
-{
-	if (is_vmalloc_addr(array))
-		vfree(array);
-	else
-		kfree(array);
+/**
+ *	relay_add_partial_page - add a partial page to relay
+ *	@chan: the relay channel
+ *	@page: the page to add
+ *	@len: the length of data in the page
+ *	@cb: relay_page callbacks associated with the page
+ *	@private_data: user data to be associated with the relay_page
+ *
+ *	Add a partial page to relay, meaning a page containing <=
+ *	PAGE_SIZE bytes.  See comments for relay_add_page(); this is
+ *	the same except that it allows the length of data contained in
+ *	the page to be specified, if it contains less than a page's
+ *	worth (or even if it contains a full page's worth -
+ *	relay_add_page() actually calls this internally).
+ */
+void relay_add_partial_page(struct rchan *chan,
+			    struct page *page,
+			    size_t len,
+			    struct relay_page_callbacks *cb,
+			    void *private_data)
+{
+	struct relay_page *rpage;
+	struct rchan_buf *buf;
+
+	/* pick the current cpu's buffer; preemption re-enabled right away */
+	buf = chan->buf[get_cpu()];
+	put_cpu_no_resched();
+	/* GFP_ATOMIC allocation - can fail under memory pressure */
+	rpage = __relay_get_rpage(buf);
+
+	if (likely(rpage)) {
+		rpage->page = page;
+		rpage->len = len;
+		set_page_private(rpage->page, (unsigned long)buf);
+		rpage->cb = cb;
+		rpage->private_data = private_data;
+		__relay_add_page(buf, rpage);
+	}
+	/*
+	 * NOTE(review): if the rpage allocation fails the page is
+	 * silently dropped and the owner is never notified via
+	 * page_released()/page_stolen() - confirm this is intended.
+	 */
+}
+EXPORT_SYMBOL_GPL(relay_add_partial_page);
 
 /**
- *	relay_mmap_buf: - mmap channel buffer to process address space
- *	@buf: relay channel buffer
- *	@vma: vm_area_struct describing memory to be mapped
- *
- *	Returns 0 if ok, negative on error
+ *	relay_add_page - add a page to relay
+ *	@chan: the relay channel
+ *	@page: the page to add
+ *	@cb: relay_page callbacks associated with the page
+ *	@private_data: user data to be associated with the relay_page
  *
- *	Caller should already have grabbed mmap_sem.
+ *	Add a page to relay.  When the page has been read by
+ *	userspace, the owner will be notified.  If the page has been
+ *	copied and is available for re-use by the owner, the
+ *	relay_page_callbacks page_released() callback will be invoked.
+ *	If the page has been stolen, the owner will be notified of
+ *	this fact via the page_stolen() callback; because the
+ *	page_stolen() (and page_released()) callbacks are called from
+ *	user context, the owner can allocate a new page using
+ *	GFP_KERNEL if it wants to.
  */
-static int relay_mmap_buf(struct rchan_buf *buf, struct vm_area_struct *vma)
+void relay_add_page(struct rchan *chan,
+		    struct page *page,
+		    struct relay_page_callbacks *cb,
+		    void *private_data)
 {
-	unsigned long length = vma->vm_end - vma->vm_start;
-	struct file *filp = vma->vm_file;
-
-	if (!buf)
-		return -EBADF;
+	relay_add_partial_page(chan, page, PAGE_SIZE, cb, private_data);
+}
+EXPORT_SYMBOL_GPL(relay_add_page);
 
-	if (length != (unsigned long)buf->chan->alloc_size)
-		return -EINVAL;
+/**
+ *	relay_add_pages - add a set of pages to relay
+ *	@chan: the relay channel
+ *	@pages: the pages to add
+ *	@cb: relay_page callbacks associated with the pages
+ *	@private_data: user data to be associated with the relay_pages
+ *
+ *	Add a set of pages to relay.  The added pages are guaranteed
+ *	to be inserted together as a group and in the same order as in
+ *	the pagevec.  The comments for relay_add_page() apply in the
+ *	same way to relay_add_pages().
+ */
+void relay_add_pages(struct rchan *chan,
+		     struct pagevec *pages,
+		     struct relay_page_callbacks *cb,
+		     void *private_data)
+{
+	int i, nr_pages = pagevec_count(pages);
+	struct relay_page *rpage;
+	struct rchan_buf *buf;
+	unsigned long flags;
+
+	buf = chan->buf[get_cpu()];
+	put_cpu_no_resched();
+	spin_lock_irqsave(&buf->lock, flags);
+	/*
+	 * Insert the pages as a group, in pagevec order.  The previous
+	 * loop decremented i ("i--"), which never terminates normally
+	 * and indexes pages->pages[] out of bounds; advance forward.
+	 */
+	for (i = 0; i < nr_pages; i++) {
+		rpage = __relay_get_rpage(buf);
+
+		if (likely(rpage)) {
+			rpage->page = pages->pages[i];
+			rpage->len = PAGE_SIZE;
+			set_page_private(rpage->page, (unsigned long)buf);
+			rpage->cb = cb;
+			rpage->private_data = private_data;
+			__relay_add_page_nolock(buf, rpage);
+		}
+	}
+	spin_unlock_irqrestore(&buf->lock, flags);
+
+	relay_wakeup_readers(buf);
+}
+EXPORT_SYMBOL_GPL(relay_add_pages);
 
 /**
- *	relay_alloc_buf - allocate a channel buffer
- *	@buf: the buffer struct
- *	@size: total size of the buffer
+ *	relay_flush - flush the channel
+ *	@chan: the channel
  *
- *	Returns a pointer to the resulting buffer, %NULL if unsuccessful. The
- *	passed in size will get page aligned, if it isn't already.
+ *	Flushes all channel buffers, i.e. wakes up readers
  */
-static void *relay_alloc_buf(struct rchan_buf *buf, size_t *size)
+void relay_flush(struct rchan *chan)
 {
-	void *mem;
-	unsigned int i, j, n_pages;
+	unsigned int i;
+	size_t prev_wakeup;
 
-	*size = PAGE_ALIGN(*size);
-	n_pages = *size >> PAGE_SHIFT;
+	if (!chan)
+		return;
 
-	buf->page_array = relay_alloc_page_array(n_pages);
-	if (!buf->page_array)
-		return NULL;
+	/*
+	 * Read chan only after the NULL check above; the previous
+	 * version dereferenced chan in the declaration initializer
+	 * before checking it for NULL.
+	 */
+	prev_wakeup = chan->n_pages_wakeup;
+	if (prev_wakeup)
+		chan->n_pages_wakeup = 1;
 
-	for (i = 0; i < n_pages; i++) {
-		buf->page_array[i] = alloc_page(GFP_KERNEL);
-		if (unlikely(!buf->page_array[i]))
-			goto depopulate;
-		set_page_private(buf->page_array[i], (unsigned long)buf);
+	if (chan->flags & RCHAN_GLOBAL_BUFFER && chan->buf[0]) {
+		/* actually wake the global buffer's readers before returning */
+		relay_wakeup_readers(chan->buf[0]);
+		chan->n_pages_wakeup = prev_wakeup;
+		return;
 	}
-	mem = vmap(buf->page_array, n_pages, VM_MAP, PAGE_KERNEL);
-	if (!mem)
-		goto depopulate;
-
-	memset(mem, 0, *size);
-	buf->page_count = n_pages;
-	return mem;
-
-depopulate:
-	for (j = 0; j < i; j++)
-		__free_page(buf->page_array[j]);
-	relay_free_page_array(buf->page_array);
-	return NULL;
+
+	mutex_lock(&relay_channels_mutex);
+	for_each_possible_cpu(i)
+		if (chan->buf[i])
+			relay_wakeup_readers(chan->buf[i]);
+	mutex_unlock(&relay_channels_mutex);
+	chan->n_pages_wakeup = prev_wakeup;
 }
+EXPORT_SYMBOL_GPL(relay_flush);
 
 /**
- *	relay_create_buf - allocate and initialize a channel buffer
- *	@chan: the relay channel
+ *	relay_close - close the channel
+ *	@chan: the channel
  *
- *	Returns channel buffer if successful, %NULL otherwise.
+ *	Closes all channel buffers and frees the channel.
  */
-static struct rchan_buf *relay_create_buf(struct rchan *chan)
+void relay_close(struct rchan *chan)
 {
-	struct rchan_buf *buf = kzalloc(sizeof(struct rchan_buf), GFP_KERNEL);
-	if (!buf)
-		return NULL;
-
-	buf->padding = kmalloc(chan->n_subbufs * sizeof(size_t *), GFP_KERNEL);
-	if (!buf->padding)
-		goto free_buf;
+	unsigned int i;
 
-	buf->start = relay_alloc_buf(buf, &chan->alloc_size);
-	if (!buf->start)
-		goto free_buf;
+	if (!chan)
+		return;
 
-	buf->chan = chan;
-	kref_get(&buf->chan->kref);
-	return buf;
+	mutex_lock(&relay_channels_mutex);
+	if (chan->flags & RCHAN_GLOBAL_BUFFER && chan->buf[0])
+		relay_close_buf(chan->buf[0]);
+	else
+		for_each_possible_cpu(i)
+			if (chan->buf[i])
+				relay_close_buf(chan->buf[i]);
 
-free_buf:
-	kfree(buf->padding);
-	kfree(buf);
-	return NULL;
+	list_del(&chan->list);
+	kref_put(&chan->kref, relay_destroy_channel);
+	mutex_unlock(&relay_channels_mutex);
 }
+EXPORT_SYMBOL_GPL(relay_close);
 
 /**
- *	relay_destroy_channel - free the channel struct
- *	@kref: target kernel reference that contains the relay channel
+ *	relay_reset - reset the channel
+ *	@chan: the channel
  *
- *	Should only be called from kref_put().
- */
-static void relay_destroy_channel(struct kref *kref)
-{
-	struct rchan *chan = container_of(kref, struct rchan, kref);
-	kfree(chan);
-}
-
-/**
- *	relay_destroy_buf - destroy an rchan_buf struct and associated buffer
- *	@buf: the buffer struct
+ *	This has the effect of erasing all data from all channel buffers
+ *	and restarting the channel in its initial state.
+ *
+ *	NOTE. Care should be taken that the channel isn't actually
+ *	being used by anything when this call is made.
  */
-static void relay_destroy_buf(struct rchan_buf *buf)
+void relay_reset(struct rchan *chan)
 {
-	struct rchan *chan = buf->chan;
 	unsigned int i;
 
-	if (likely(buf->start)) {
-		vunmap(buf->start);
-		for (i = 0; i < buf->page_count; i++)
-			__free_page(buf->page_array[i]);
-		relay_free_page_array(buf->page_array);
+	if (!chan)
+		return;
+
+	if (chan->flags & RCHAN_GLOBAL_BUFFER && chan->buf[0]) {
+		__relay_reset(chan->buf[0], 0);
+		return;
 	}
-	chan->buf[buf->cpu] = NULL;
-	kfree(buf->padding);
-	kfree(buf);
-	kref_put(&chan->kref, relay_destroy_channel);
+
+	mutex_lock(&relay_channels_mutex);
+	for_each_online_cpu(i)
+		if (chan->buf[i])
+			__relay_reset(chan->buf[i], 0);
+	mutex_unlock(&relay_channels_mutex);
 }
+EXPORT_SYMBOL_GPL(relay_reset);
 
-/**
- *	relay_remove_buf - remove a channel buffer
- *	@kref: target kernel reference that contains the relay buffer
- *
- *	Removes the file from the fileystem, which also frees the
- *	rchan_buf_struct and the channel buffer.  Should only be called from
- *	kref_put().
+/*
+ * end relay kernel API
  */
-static void relay_remove_buf(struct kref *kref)
-{
-	struct rchan_buf *buf = container_of(kref, struct rchan_buf, kref);
-	buf->chan->cb->remove_buf_file(buf->dentry);
-	relay_destroy_buf(buf);
-}
 
 /**
- *	relay_buf_empty - boolean, is the channel buffer empty?
- *	@buf: channel buffer
- *
- *	Returns 1 if the buffer is empty, 0 otherwise.
+ *	relay_update_filesize - increase relay file i_size by length
+ *	@buf: relay channel buffer
+ *	@length: length to add
  */
-static int relay_buf_empty(struct rchan_buf *buf)
+static inline void relay_update_filesize(struct rchan_buf *buf, size_t length)
 {
-	return (buf->subbufs_produced - buf->subbufs_consumed) ? 0 : 1;
+	buf->dentry->d_inode->i_size +=	length;
 }
 
 /**
- *	relay_buf_full - boolean, is the channel buffer full?
- *	@buf: channel buffer
- *
- *	Returns 1 if the buffer is full, 0 otherwise.
+ *	__relay_get_rpage - get an empty relay page struct
+ *	@buf: the buffer struct
  */
-int relay_buf_full(struct rchan_buf *buf)
+static inline struct relay_page *__relay_get_rpage(struct rchan_buf *buf)
 {
-	size_t ready = buf->subbufs_produced - buf->subbufs_consumed;
-	return (ready >= buf->chan->n_subbufs) ? 1 : 0;
+	return kmalloc(sizeof(struct relay_page), GFP_ATOMIC);
 }
-EXPORT_SYMBOL_GPL(relay_buf_full);
-
-/*
- * High-level relay kernel API and associated functions.
- */
 
-/*
- * rchan_callback implementations defining default channel behavior.  Used
- * in place of corresponding NULL values in client callback struct.
- */
-
-/*
- * subbuf_start() default callback.  Does nothing.
- */
-static int subbuf_start_default_callback (struct rchan_buf *buf,
-					  void *subbuf,
-					  void *prev_subbuf,
-					  size_t prev_padding)
+/**
+ *	__relay_add_page_nolock - append an rpage to a buffer's page list
+ *	@buf: the buffer struct
+ *	@rpage: the relay_page to append
+ *
+ *	Caller must hold buf->lock (see __relay_add_page() and
+ *	relay_add_pages()).
+ */
+static inline void __relay_add_page_nolock(struct rchan_buf *buf,
+					   struct relay_page *rpage)
 {
-	if (relay_buf_full(buf))
-		return 0;
-
-	return 1;
+	list_add_tail(&rpage->list, &buf->pages);
+	buf->nr_pages++;
+	relay_update_filesize(buf, rpage->len);
 }
 
-/*
- * buf_mapped() default callback.  Does nothing.
- */
-static void buf_mapped_default_callback(struct rchan_buf *buf,
-					struct file *filp)
+/**
+ *	__relay_add_page - append an rpage under buf->lock, then wake readers
+ *	@buf: the buffer struct
+ *	@rpage: the relay_page to append
+ */
+static inline void __relay_add_page(struct rchan_buf *buf,
+				    struct relay_page *rpage)
 {
-}
+	unsigned long flags;
 
-/*
- * buf_unmapped() default callback.  Does nothing.
- */
-static void buf_unmapped_default_callback(struct rchan_buf *buf,
-					  struct file *filp)
-{
+	spin_lock_irqsave(&buf->lock, flags);
+	__relay_add_page_nolock(buf, rpage);
+	spin_unlock_irqrestore(&buf->lock, flags);
+
+	relay_wakeup_readers(buf);
 }
 
-/*
- * create_buf_file_create() default callback.  Does nothing.
+/**
+ *	__relay_remove_page - remove a page from relay
+ *	@buf: the buffer struct
+ *	@rpage: struct relay_page
  */
-static struct dentry *create_buf_file_default_callback(const char *filename,
-						       struct dentry *parent,
-						       int mode,
-						       struct rchan_buf *buf,
-						       int *is_global)
+static void __relay_remove_page(struct rchan_buf *buf,
+				struct relay_page *rpage)
 {
-	return NULL;
+	unsigned long flags;
+
+	spin_lock_irqsave(&buf->lock, flags);
+	list_del(&rpage->list);
+	buf->nr_pages--;
+	spin_unlock_irqrestore(&buf->lock, flags);
+
+	kfree(rpage);
 }
 
-/*
- * remove_buf_file() default callback.  Does nothing.
+/**
+ *	__relay_release_page - remove page from relay and notify owner
+ *	@buf: the buffer struct
+ *	@rpage: struct relay_page
  */
-static int remove_buf_file_default_callback(struct dentry *dentry)
+static void __relay_release_page(struct rchan_buf *buf,
+				 struct relay_page *rpage)
 {
-	return -EINVAL;
-}
+	if (rpage->cb && rpage->cb->page_released)
+		rpage->cb->page_released(rpage->page, rpage->private_data);
 
-/* relay channel default callbacks */
-static struct rchan_callbacks default_channel_callbacks = {
-	.subbuf_start = subbuf_start_default_callback,
-	.buf_mapped = buf_mapped_default_callback,
-	.buf_unmapped = buf_unmapped_default_callback,
-	.create_buf_file = create_buf_file_default_callback,
-	.remove_buf_file = remove_buf_file_default_callback,
-};
+	__relay_remove_page(buf, rpage);
+}
 
 /**
- *	wakeup_readers - wake up readers waiting on a channel
- *	@data: contains the channel buffer
+ *	relay_destroy_channel - free the channel struct
+ *	@kref: target kernel reference that contains the relay channel
  *
- *	This is the timer function used to defer reader waking.
+ *	Should only be called from kref_put().
  */
-static void wakeup_readers(unsigned long data)
+static void relay_destroy_channel(struct kref *kref)
 {
-	struct rchan_buf *buf = (struct rchan_buf *)data;
-	wake_up_interruptible(&buf->read_wait);
+	struct rchan *chan = container_of(kref, struct rchan, kref);
+	kfree(chan);
 }
 
 /**
- *	__relay_reset - reset a channel buffer
- *	@buf: the channel buffer
- *	@init: 1 if this is a first-time initialization
- *
- *	See relay_reset() for description of effect.
+ *	relay_destroy_buf - destroy an rchan_buf struct and release pages
+ *	@buf: the buffer struct
  */
-static void __relay_reset(struct rchan_buf *buf, unsigned int init)
+static void relay_destroy_buf(struct rchan_buf *buf)
 {
-	size_t i;
-
-	if (init) {
-		init_waitqueue_head(&buf->read_wait);
-		kref_init(&buf->kref);
-		setup_timer(&buf->timer, wakeup_readers, (unsigned long)buf);
-	} else
-		del_timer_sync(&buf->timer);
-
-	buf->subbufs_produced = 0;
-	buf->subbufs_consumed = 0;
-	buf->bytes_consumed = 0;
-	buf->finalized = 0;
-	buf->data = buf->start;
-	buf->offset = 0;
+	struct rchan *chan = buf->chan;
+	struct relay_page *rpage, *rpage2;
 
-	for (i = 0; i < buf->chan->n_subbufs; i++)
-		buf->padding[i] = 0;
+	list_for_each_entry_safe(rpage, rpage2, &buf->pages, list)
+		__relay_release_page(buf, rpage);
 
-	buf->chan->cb->subbuf_start(buf, buf->data, NULL, 0);
+	chan->buf[buf->cpu] = NULL;
+	kfree(buf);
+	kref_put(&chan->kref, relay_destroy_channel);
 }
 
 /**
- *	relay_reset - reset the channel
- *	@chan: the channel
- *
- *	This has the effect of erasing all data from all channel buffers
- *	and restarting the channel in its initial state.  The buffers
- *	are not freed, so any mappings are still in effect.
+ *	relay_remove_buf - remove a channel buffer
+ *	@kref: target kernel reference that contains the relay buffer
  *
- *	NOTE. Care should be taken that the channel isn't actually
- *	being used by anything when this call is made.
+ *	Removes the file from the filesystem, which also frees the
+ *	rchan_buf_struct and the channel buffer.  Should only be called from
+ *	kref_put().
  */
-void relay_reset(struct rchan *chan)
+static void relay_remove_buf(struct kref *kref)
 {
-	unsigned int i;
-
-	if (!chan)
-		return;
-
-	if (chan->is_global && chan->buf[0]) {
-		__relay_reset(chan->buf[0], 0);
-		return;
-	}
-
-	mutex_lock(&relay_channels_mutex);
-	for_each_online_cpu(i)
-		if (chan->buf[i])
-			__relay_reset(chan->buf[i], 0);
-	mutex_unlock(&relay_channels_mutex);
+	struct rchan_buf *buf = container_of(kref, struct rchan_buf, kref);
+	buf->chan->cb->remove_buf_file(buf->dentry);
+	relay_destroy_buf(buf);
 }
-EXPORT_SYMBOL_GPL(relay_reset);
 
-static inline void relay_set_buf_dentry(struct rchan_buf *buf,
-					struct dentry *dentry)
+/**
+ *	relay_close_buf - close a channel buffer
+ *	@buf: channel buffer
+ *
+ *	Marks the buffer finalized.  The channel buffer and channel
+ *	buffer data structure are then freed automatically when the
+ *	last reference is given up.
+ */
+static void relay_close_buf(struct rchan_buf *buf)
 {
-	buf->dentry = dentry;
-	buf->dentry->d_inode->i_size = buf->early_bytes;
+	buf->finalized = 1;
+	del_timer_sync(&buf->timer);
+	kref_put(&buf->kref, relay_remove_buf);
 }
 
 static struct dentry *relay_create_buf_file(struct rchan *chan,
@@ -428,14 +460,33 @@ static struct dentry *relay_create_buf_file(struct rchan *chan,
 
 	/* Create file in fs */
 	dentry = chan->cb->create_buf_file(tmpname, chan->parent,
-					   S_IRUSR, buf,
-					   &chan->is_global);
+					   S_IRUSR, buf);
 
 	kfree(tmpname);
 
 	return dentry;
 }
 
+/**
+ *	relay_create_buf - allocate and initialize a channel buffer
+ *	@chan: the relay channel
+ *
+ *	Returns channel buffer if successful, %NULL otherwise.
+ */
+static struct rchan_buf *relay_create_buf(struct rchan *chan)
+{
+	struct rchan_buf *buf = kzalloc(sizeof(struct rchan_buf), GFP_KERNEL);
+	if (!buf)
+		return NULL;
+
+	spin_lock_init(&buf->lock);
+	INIT_LIST_HEAD(&buf->pages);
+	buf->chan = chan;
+	kref_get(&buf->chan->kref);
+
+	return buf;
+}
+
 /*
  *	relay_open_buf - create a new relay channel buffer
  *
@@ -446,24 +497,23 @@ static struct rchan_buf *relay_open_buf(struct rchan *chan, unsigned int cpu)
  	struct rchan_buf *buf = NULL;
 	struct dentry *dentry;
 
- 	if (chan->is_global)
+	if (chan->flags & RCHAN_GLOBAL_BUFFER)
 		return chan->buf[0];
 
 	buf = relay_create_buf(chan);
 	if (!buf)
 		return NULL;
 
-	if (chan->has_base_filename) {
-		dentry = relay_create_buf_file(chan, buf, cpu);
-		if (!dentry)
-			goto free_buf;
-		relay_set_buf_dentry(buf, dentry);
-	}
+	dentry = relay_create_buf_file(chan, buf, cpu);
+	if (!dentry)
+		goto free_buf;
+	buf->dentry = dentry;
+	buf->dentry->d_inode->i_size = 0;
 
  	buf->cpu = cpu;
  	__relay_reset(buf, 1);
 
- 	if(chan->is_global) {
+	if (chan->flags & RCHAN_GLOBAL_BUFFER) {
  		chan->buf[0] = buf;
  		buf->cpu = 0;
   	}
@@ -476,393 +526,109 @@ free_buf:
 }
 
 /**
- *	relay_close_buf - close a channel buffer
- *	@buf: channel buffer
+ *	relay_wakeup_readers - wake up readers if applicable
+ *	@buf: relay channel buffer
  *
- *	Marks the buffer finalized and restores the default callbacks.
- *	The channel buffer and channel buffer data structure are then freed
- *	automatically when the last reference is given up.
+ *	Will wake up readers after each buf->n_pages_wakeup pages have
+ *	been produced.  To do no waking up, simply pass 0 into relay
+ *	open for this value.
  */
-static void relay_close_buf(struct rchan_buf *buf)
+static inline void relay_wakeup_readers(struct rchan_buf *buf)
 {
-	buf->finalized = 1;
-	del_timer_sync(&buf->timer);
-	kref_put(&buf->kref, relay_remove_buf);
-}
+	size_t wakeup = buf->chan->n_pages_wakeup;
 
-static void setup_callbacks(struct rchan *chan,
-				   struct rchan_callbacks *cb)
-{
-	if (!cb) {
-		chan->cb = &default_channel_callbacks;
-		return;
-	}
-
-	if (!cb->subbuf_start)
-		cb->subbuf_start = subbuf_start_default_callback;
-	if (!cb->buf_mapped)
-		cb->buf_mapped = buf_mapped_default_callback;
-	if (!cb->buf_unmapped)
-		cb->buf_unmapped = buf_unmapped_default_callback;
-	if (!cb->create_buf_file)
-		cb->create_buf_file = create_buf_file_default_callback;
-	if (!cb->remove_buf_file)
-		cb->remove_buf_file = remove_buf_file_default_callback;
-	chan->cb = cb;
+	/*
+	 * NOTE(review): relay_add_pages() bumps nr_pages several times
+	 * under buf->lock before calling here once, so the
+	 * "% wakeup == 0" test can step over a multiple and skip a
+	 * wakeup entirely - confirm this is acceptable.
+	 */
+	if (wakeup && (buf->nr_pages % wakeup == 0) &&
+	    (waitqueue_active(&buf->read_wait)))
+		/*
+		 * Calling wake_up_interruptible() from here
+		 * will deadlock if we happen to be logging
+		 * from the scheduler (trying to re-grab
+		 * rq->lock), so defer it.
+		 */
+		__mod_timer(&buf->timer, jiffies + 1);
 }
 
 /**
- * 	relay_hotcpu_callback - CPU hotplug callback
- * 	@nb: notifier block
- * 	@action: hotplug action to take
- * 	@hcpu: CPU number
+ *	wakeup_readers - wake up readers waiting on a channel
+ *	@data: contains the channel buffer
  *
- * 	Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
+ *	This is the timer function used to defer reader waking.
  */
-static int __cpuinit relay_hotcpu_callback(struct notifier_block *nb,
-				unsigned long action,
-				void *hcpu)
+static void wakeup_readers(unsigned long data)
 {
-	unsigned int hotcpu = (unsigned long)hcpu;
-	struct rchan *chan;
-
-	switch(action) {
-	case CPU_UP_PREPARE:
-	case CPU_UP_PREPARE_FROZEN:
-		mutex_lock(&relay_channels_mutex);
-		list_for_each_entry(chan, &relay_channels, list) {
-			if (chan->buf[hotcpu])
-				continue;
-			chan->buf[hotcpu] = relay_open_buf(chan, hotcpu);
-			if(!chan->buf[hotcpu]) {
-				printk(KERN_ERR
-					"relay_hotcpu_callback: cpu %d buffer "
-					"creation failed\n", hotcpu);
-				mutex_unlock(&relay_channels_mutex);
-				return NOTIFY_BAD;
-			}
-		}
-		mutex_unlock(&relay_channels_mutex);
-		break;
-	case CPU_DEAD:
-	case CPU_DEAD_FROZEN:
-		/* No need to flush the cpu : will be flushed upon
-		 * final relay_flush() call. */
-		break;
-	}
-	return NOTIFY_OK;
+	struct rchan_buf *buf = (struct rchan_buf *)data;
+	wake_up_interruptible(&buf->read_wait);
 }
 
 /**
- *	relay_open - create a new relay channel
- *	@base_filename: base name of files to create, %NULL for buffering only
- *	@parent: dentry of parent directory, %NULL for root directory or buffer
- *	@subbuf_size: size of sub-buffers
- *	@n_subbufs: number of sub-buffers
- *	@cb: client callback functions
- *	@private_data: user-defined data
- *
- *	Returns channel pointer if successful, %NULL otherwise.
+ *	__relay_reset - reset a channel buffer
+ *	@buf: the channel buffer
+ *	@init: 1 if this is a first-time initialization
  *
- *	Creates a channel buffer for each cpu using the sizes and
- *	attributes specified.  The created channel buffer files
- *	will be named base_filename0...base_filenameN-1.  File
- *	permissions will be %S_IRUSR.
+ *	See relay_reset() for description of effect.
  */
-struct rchan *relay_open(const char *base_filename,
-			 struct dentry *parent,
-			 size_t subbuf_size,
-			 size_t n_subbufs,
-			 struct rchan_callbacks *cb,
-			 void *private_data)
+static void __relay_reset(struct rchan_buf *buf, unsigned int init)
 {
-	unsigned int i;
-	struct rchan *chan;
-
-	if (!(subbuf_size && n_subbufs))
-		return NULL;
-
-	chan = kzalloc(sizeof(struct rchan), GFP_KERNEL);
-	if (!chan)
-		return NULL;
-
-	chan->version = RELAYFS_CHANNEL_VERSION;
-	chan->n_subbufs = n_subbufs;
-	chan->subbuf_size = subbuf_size;
-	chan->alloc_size = FIX_SIZE(subbuf_size * n_subbufs);
-	chan->parent = parent;
-	chan->private_data = private_data;
-	if (base_filename) {
-		chan->has_base_filename = 1;
-		strlcpy(chan->base_filename, base_filename, NAME_MAX);
-	}
-	setup_callbacks(chan, cb);
-	kref_init(&chan->kref);
-
-	mutex_lock(&relay_channels_mutex);
-	for_each_online_cpu(i) {
-		chan->buf[i] = relay_open_buf(chan, i);
-		if (!chan->buf[i])
-			goto free_bufs;
-	}
-	list_add(&chan->list, &relay_channels);
-	mutex_unlock(&relay_channels_mutex);
-
-	return chan;
-
-free_bufs:
-	for_each_online_cpu(i) {
-		if (!chan->buf[i])
-			break;
-		relay_close_buf(chan->buf[i]);
-	}
+	struct relay_page *rpage, *rpage2;
 
-	kref_put(&chan->kref, relay_destroy_channel);
-	mutex_unlock(&relay_channels_mutex);
-	return NULL;
-}
-EXPORT_SYMBOL_GPL(relay_open);
-
-struct rchan_percpu_buf_dispatcher {
-	struct rchan_buf *buf;
-	struct dentry *dentry;
-};
+	if (init) {
+		init_waitqueue_head(&buf->read_wait);
+		kref_init(&buf->kref);
+		setup_timer(&buf->timer, wakeup_readers, (unsigned long)buf);
+	} else
+		del_timer_sync(&buf->timer);
 
-/* Called in atomic context. */
-static void __relay_set_buf_dentry(void *info)
-{
-	struct rchan_percpu_buf_dispatcher *p = info;
+	list_for_each_entry_safe(rpage, rpage2, &buf->pages, list)
+		__relay_release_page(buf, rpage);
 
-	relay_set_buf_dentry(p->buf, p->dentry);
+	buf->consumed_offset = 0;
+	buf->finalized = 0;
 }
 
-/**
- *	relay_late_setup_files - triggers file creation
- *	@chan: channel to operate on
- *	@base_filename: base name of files to create
- *	@parent: dentry of parent directory, %NULL for root directory
- *
- *	Returns 0 if successful, non-zero otherwise.
- *
- *	Use to setup files for a previously buffer-only channel.
- *	Useful to do early tracing in kernel, before VFS is up, for example.
+/*
+ * create_buf_file() default callback.  Creates a debugfs file.
  */
-int relay_late_setup_files(struct rchan *chan,
-			   const char *base_filename,
-			   struct dentry *parent)
+static struct dentry *create_buf_file_default_callback(const char *filename,
+						       struct dentry *parent,
+						       int mode,
+						       struct rchan_buf *buf)
 {
-	int err = 0;
-	unsigned int i, curr_cpu;
-	unsigned long flags;
-	struct dentry *dentry;
-	struct rchan_percpu_buf_dispatcher disp;
-
-	if (!chan || !base_filename)
-		return -EINVAL;
-
-	strlcpy(chan->base_filename, base_filename, NAME_MAX);
-
-	mutex_lock(&relay_channels_mutex);
-	/* Is chan already set up? */
-	if (unlikely(chan->has_base_filename))
-		return -EEXIST;
-	chan->has_base_filename = 1;
-	chan->parent = parent;
-	curr_cpu = get_cpu();
-	/*
-	 * The CPU hotplug notifier ran before us and created buffers with
-	 * no files associated. So it's safe to call relay_setup_buf_file()
-	 * on all currently online CPUs.
-	 */
-	for_each_online_cpu(i) {
-		if (unlikely(!chan->buf[i])) {
-			printk(KERN_ERR "relay_late_setup_files: CPU %u "
-					"has no buffer, it must have!\n", i);
-			BUG();
-			err = -EINVAL;
-			break;
-		}
-
-		dentry = relay_create_buf_file(chan, chan->buf[i], i);
-		if (unlikely(!dentry)) {
-			err = -EINVAL;
-			break;
-		}
-
-		if (curr_cpu == i) {
-			local_irq_save(flags);
-			relay_set_buf_dentry(chan->buf[i], dentry);
-			local_irq_restore(flags);
-		} else {
-			disp.buf = chan->buf[i];
-			disp.dentry = dentry;
-			smp_mb();
-			/* relay_channels_mutex must be held, so wait. */
-			err = smp_call_function_single(i,
-						       __relay_set_buf_dentry,
-						       &disp, 1);
-		}
-		if (unlikely(err))
-			break;
-	}
-	put_cpu();
-	mutex_unlock(&relay_channels_mutex);
-
-	return err;
+	return debugfs_create_file(filename, mode, parent, buf,
+				   &relay_file_operations);
 }
 
-/**
- *	relay_switch_subbuf - switch to a new sub-buffer
- *	@buf: channel buffer
- *	@length: size of current event
- *
- *	Returns either the length passed in or 0 if full.
- *
- *	Performs sub-buffer-switch tasks such as invoking callbacks,
- *	updating padding counts, waking up readers, etc.
+/*
+ * remove_buf_file() default callback.  Removes debugfs file.
  */
-size_t relay_switch_subbuf(struct rchan_buf *buf, size_t length)
+static int remove_buf_file_default_callback(struct dentry *dentry)
 {
-	void *old, *new;
-	size_t old_subbuf, new_subbuf;
-
-	if (unlikely(length > buf->chan->subbuf_size))
-		goto toobig;
-
-	if (buf->offset != buf->chan->subbuf_size + 1) {
-		buf->prev_padding = buf->chan->subbuf_size - buf->offset;
-		old_subbuf = buf->subbufs_produced % buf->chan->n_subbufs;
-		buf->padding[old_subbuf] = buf->prev_padding;
-		buf->subbufs_produced++;
-		if (buf->dentry)
-			buf->dentry->d_inode->i_size +=
-				buf->chan->subbuf_size -
-				buf->padding[old_subbuf];
-		else
-			buf->early_bytes += buf->chan->subbuf_size -
-					    buf->padding[old_subbuf];
-		smp_mb();
-		if (waitqueue_active(&buf->read_wait))
-			/*
-			 * Calling wake_up_interruptible() from here
-			 * will deadlock if we happen to be logging
-			 * from the scheduler (trying to re-grab
-			 * rq->lock), so defer it.
-			 */
-			__mod_timer(&buf->timer, jiffies + 1);
-	}
-
-	old = buf->data;
-	new_subbuf = buf->subbufs_produced % buf->chan->n_subbufs;
-	new = buf->start + new_subbuf * buf->chan->subbuf_size;
-	buf->offset = 0;
-	if (!buf->chan->cb->subbuf_start(buf, new, old, buf->prev_padding)) {
-		buf->offset = buf->chan->subbuf_size + 1;
-		return 0;
-	}
-	buf->data = new;
-	buf->padding[new_subbuf] = 0;
-
-	if (unlikely(length + buf->offset > buf->chan->subbuf_size))
-		goto toobig;
-
-	return length;
-
-toobig:
-	buf->chan->last_toobig = length;
+	debugfs_remove(dentry);
 	return 0;
 }
-EXPORT_SYMBOL_GPL(relay_switch_subbuf);
-
-/**
- *	relay_subbufs_consumed - update the buffer's sub-buffers-consumed count
- *	@chan: the channel
- *	@cpu: the cpu associated with the channel buffer to update
- *	@subbufs_consumed: number of sub-buffers to add to current buf's count
- *
- *	Adds to the channel buffer's consumed sub-buffer count.
- *	subbufs_consumed should be the number of sub-buffers newly consumed,
- *	not the total consumed.
- *
- *	NOTE. Kernel clients don't need to call this function if the channel
- *	mode is 'overwrite'.
- */
-void relay_subbufs_consumed(struct rchan *chan,
-			    unsigned int cpu,
-			    size_t subbufs_consumed)
-{
-	struct rchan_buf *buf;
 
-	if (!chan)
-		return;
-
-	if (cpu >= NR_CPUS || !chan->buf[cpu])
-		return;
-
-	buf = chan->buf[cpu];
-	buf->subbufs_consumed += subbufs_consumed;
-	if (buf->subbufs_consumed > buf->subbufs_produced)
-		buf->subbufs_consumed = buf->subbufs_produced;
-}
-EXPORT_SYMBOL_GPL(relay_subbufs_consumed);
+/* relay channel default callbacks */
+static struct rchan_callbacks default_channel_callbacks = {
+	.create_buf_file = create_buf_file_default_callback,
+	.remove_buf_file = remove_buf_file_default_callback,
+};
 
-/**
- *	relay_close - close the channel
- *	@chan: the channel
- *
- *	Closes all channel buffers and frees the channel.
- */
-void relay_close(struct rchan *chan)
+static void setup_callbacks(struct rchan *chan, struct rchan_callbacks *cb)
 {
-	unsigned int i;
-
-	if (!chan)
+	if (!cb) {
+		chan->cb = &default_channel_callbacks;
 		return;
+	}
 
-	mutex_lock(&relay_channels_mutex);
-	if (chan->is_global && chan->buf[0])
-		relay_close_buf(chan->buf[0]);
-	else
-		for_each_possible_cpu(i)
-			if (chan->buf[i])
-				relay_close_buf(chan->buf[i]);
-
-	if (chan->last_toobig)
-		printk(KERN_WARNING "relay: one or more items not logged "
-		       "[item size (%Zd) > sub-buffer size (%Zd)]\n",
-		       chan->last_toobig, chan->subbuf_size);
-
-	list_del(&chan->list);
-	kref_put(&chan->kref, relay_destroy_channel);
-	mutex_unlock(&relay_channels_mutex);
+	/*
+	 * Fill in any missing entries with defaults.  Note that this
+	 * writes into the caller-owned callbacks struct.
+	 */
+	if (!cb->create_buf_file)
+		cb->create_buf_file = create_buf_file_default_callback;
+	if (!cb->remove_buf_file)
+		cb->remove_buf_file = remove_buf_file_default_callback;
+	chan->cb = cb;
 }
-EXPORT_SYMBOL_GPL(relay_close);
 
-/**
- *	relay_flush - close the channel
- *	@chan: the channel
- *
- *	Flushes all channel buffers, i.e. forces buffer switch.
+/*
+ * relay userspace implementations
  */
-void relay_flush(struct rchan *chan)
-{
-	unsigned int i;
-
-	if (!chan)
-		return;
-
-	if (chan->is_global && chan->buf[0]) {
-		relay_switch_subbuf(chan->buf[0], 0);
-		return;
-	}
-
-	mutex_lock(&relay_channels_mutex);
-	for_each_possible_cpu(i)
-		if (chan->buf[i])
-			relay_switch_subbuf(chan->buf[i], 0);
-	mutex_unlock(&relay_channels_mutex);
-}
-EXPORT_SYMBOL_GPL(relay_flush);
 
 /**
  *	relay_file_open - open file op for relay files
@@ -881,19 +647,6 @@ static int relay_file_open(struct inode *inode, struct file *filp)
 }
 
 /**
- *	relay_file_mmap - mmap file op for relay files
- *	@filp: the file
- *	@vma: the vma describing what to map
- *
- *	Calls upon relay_mmap_buf() to map the file into user space.
- */
-static int relay_file_mmap(struct file *filp, struct vm_area_struct *vma)
-{
-	struct rchan_buf *buf = filp->private_data;
-	return relay_mmap_buf(buf, vma);
-}
-
-/**
  *	relay_file_poll - poll file op for relay files
  *	@filp: the file
  *	@wait: poll table
@@ -910,7 +663,7 @@ static unsigned int relay_file_poll(struct file *filp, poll_table *wait)
 
 	if (filp->f_mode & FMODE_READ) {
 		poll_wait(filp, &buf->read_wait, wait);
-		if (!relay_buf_empty(buf))
+		if (buf->nr_pages)
 			mask |= POLLIN | POLLRDNORM;
 	}
 
@@ -933,179 +686,65 @@ static int relay_file_release(struct inode *inode, struct file *filp)
 	return 0;
 }
 
-/*
- *	relay_file_read_consume - update the consumed count for the buffer
+/**
+ *	relay_file_read_page_avail - return bytes available in next page
+ *	@buf: relay channel buffer
  */
-static void relay_file_read_consume(struct rchan_buf *buf,
-				    size_t read_pos,
-				    size_t bytes_consumed)
+static size_t relay_file_read_page_avail(struct rchan_buf *buf)
 {
-	size_t subbuf_size = buf->chan->subbuf_size;
-	size_t n_subbufs = buf->chan->n_subbufs;
-	size_t read_subbuf;
-
-	if (buf->subbufs_produced == buf->subbufs_consumed &&
-	    buf->offset == buf->bytes_consumed)
-		return;
+	unsigned long flags;
+	size_t avail = 0;
 
-	if (buf->bytes_consumed + bytes_consumed > subbuf_size) {
-		relay_subbufs_consumed(buf->chan, buf->cpu, 1);
-		buf->bytes_consumed = 0;
+	spin_lock_irqsave(&buf->lock, flags);
+	if (!list_empty(&buf->pages)) {
+		struct relay_page *rpage;
+		rpage = list_first_entry(&buf->pages, struct relay_page, list);
+		avail = rpage->len - buf->consumed_offset;
 	}
+	spin_unlock_irqrestore(&buf->lock, flags);
 
-	buf->bytes_consumed += bytes_consumed;
-	if (!read_pos)
-		read_subbuf = buf->subbufs_consumed % n_subbufs;
-	else
-		read_subbuf = read_pos / buf->chan->subbuf_size;
-	if (buf->bytes_consumed + buf->padding[read_subbuf] == subbuf_size) {
-		if ((read_subbuf == buf->subbufs_produced % n_subbufs) &&
-		    (buf->offset == subbuf_size))
-			return;
-		relay_subbufs_consumed(buf->chan, buf->cpu, 1);
-		buf->bytes_consumed = 0;
-	}
+	return avail;
 }
 
 /*
- *	relay_file_read_avail - boolean, are there unconsumed bytes available?
+ *	relay_consume - update the consumed count for the buffer
  */
-static int relay_file_read_avail(struct rchan_buf *buf, size_t read_pos)
+static void relay_consume(struct rchan_buf *buf, int bytes_consumed)
 {
-	size_t subbuf_size = buf->chan->subbuf_size;
-	size_t n_subbufs = buf->chan->n_subbufs;
-	size_t produced = buf->subbufs_produced;
-	size_t consumed = buf->subbufs_consumed;
-
-	relay_file_read_consume(buf, read_pos, 0);
-
-	consumed = buf->subbufs_consumed;
-
-	if (unlikely(buf->offset > subbuf_size)) {
-		if (produced == consumed)
-			return 0;
-		return 1;
-	}
-
-	if (unlikely(produced - consumed >= n_subbufs)) {
-		consumed = produced - n_subbufs + 1;
-		buf->subbufs_consumed = consumed;
-		buf->bytes_consumed = 0;
-	}
-
-	produced = (produced % n_subbufs) * subbuf_size + buf->offset;
-	consumed = (consumed % n_subbufs) * subbuf_size + buf->bytes_consumed;
-
-	if (consumed > produced)
-		produced += n_subbufs * subbuf_size;
-
-	if (consumed == produced) {
-		if (buf->offset == subbuf_size &&
-		    buf->subbufs_produced > buf->subbufs_consumed)
-			return 1;
-		return 0;
-	}
-
-	return 1;
-}
+	unsigned long flags;
+	struct relay_page *rpage;
 
-/**
- *	relay_file_read_subbuf_avail - return bytes available in sub-buffer
- *	@read_pos: file read position
- *	@buf: relay channel buffer
- */
-static size_t relay_file_read_subbuf_avail(size_t read_pos,
-					   struct rchan_buf *buf)
-{
-	size_t padding, avail = 0;
-	size_t read_subbuf, read_offset, write_subbuf, write_offset;
-	size_t subbuf_size = buf->chan->subbuf_size;
-
-	write_subbuf = (buf->data - buf->start) / subbuf_size;
-	write_offset = buf->offset > subbuf_size ? subbuf_size : buf->offset;
-	read_subbuf = read_pos / subbuf_size;
-	read_offset = read_pos % subbuf_size;
-	padding = buf->padding[read_subbuf];
-
-	if (read_subbuf == write_subbuf) {
-		if (read_offset + padding < write_offset)
-			avail = write_offset - (read_offset + padding);
-	} else
-		avail = (subbuf_size - padding) - read_offset;
+	spin_lock_irqsave(&buf->lock, flags);
+	rpage = list_first_entry(&buf->pages, struct relay_page, list);
+	spin_unlock_irqrestore(&buf->lock, flags);
 
-	return avail;
-}
+	buf->consumed_offset += bytes_consumed;
 
-/**
- *	relay_file_read_start_pos - find the first available byte to read
- *	@read_pos: file read position
- *	@buf: relay channel buffer
- *
- *	If the @read_pos is in the middle of padding, return the
- *	position of the first actually available byte, otherwise
- *	return the original value.
- */
-static size_t relay_file_read_start_pos(size_t read_pos,
-					struct rchan_buf *buf)
-{
-	size_t read_subbuf, padding, padding_start, padding_end;
-	size_t subbuf_size = buf->chan->subbuf_size;
-	size_t n_subbufs = buf->chan->n_subbufs;
-	size_t consumed = buf->subbufs_consumed % n_subbufs;
-
-	if (!read_pos)
-		read_pos = consumed * subbuf_size + buf->bytes_consumed;
-	read_subbuf = read_pos / subbuf_size;
-	padding = buf->padding[read_subbuf];
-	padding_start = (read_subbuf + 1) * subbuf_size - padding;
-	padding_end = (read_subbuf + 1) * subbuf_size;
-	if (read_pos >= padding_start && read_pos < padding_end) {
-		read_subbuf = (read_subbuf + 1) % n_subbufs;
-		read_pos = read_subbuf * subbuf_size;
+	if (buf->consumed_offset == rpage->len) {
+		__relay_release_page(buf, rpage);
+		buf->consumed_offset = 0;
 	}
-
-	return read_pos;
-}
-
-/**
- *	relay_file_read_end_pos - return the new read position
- *	@read_pos: file read position
- *	@buf: relay channel buffer
- *	@count: number of bytes to be read
- */
-static size_t relay_file_read_end_pos(struct rchan_buf *buf,
-				      size_t read_pos,
-				      size_t count)
-{
-	size_t read_subbuf, padding, end_pos;
-	size_t subbuf_size = buf->chan->subbuf_size;
-	size_t n_subbufs = buf->chan->n_subbufs;
-
-	read_subbuf = read_pos / subbuf_size;
-	padding = buf->padding[read_subbuf];
-	if (read_pos % subbuf_size + count + padding == subbuf_size)
-		end_pos = (read_subbuf + 1) * subbuf_size;
-	else
-		end_pos = read_pos + count;
-	if (end_pos >= subbuf_size * n_subbufs)
-		end_pos = 0;
-
-	return end_pos;
 }
 
 /*
- *	subbuf_read_actor - read up to one subbuf's worth of data
+ *	page_read_actor - read up to one page's worth of data
  */
-static int subbuf_read_actor(size_t read_start,
-			     struct rchan_buf *buf,
-			     size_t avail,
-			     read_descriptor_t *desc,
-			     read_actor_t actor)
+static int page_read_actor(struct rchan_buf *buf,
+			   size_t avail,
+			   read_descriptor_t *desc,
+			   read_actor_t actor)
 {
-	void *from;
+	struct relay_page *rpage;
+	unsigned long flags;
 	int ret = 0;
+	void *from;
+
+	spin_lock_irqsave(&buf->lock, flags);
+	rpage = list_first_entry(&buf->pages, struct relay_page, list);
+	spin_unlock_irqrestore(&buf->lock, flags);
 
-	from = buf->start + read_start;
+	from = page_address(rpage->page);
+	from += rpage->len - avail;
 	ret = avail;
 	if (copy_to_user(desc->arg.buf, from, avail)) {
 		desc->error = -EFAULT;
@@ -1118,22 +757,21 @@ static int subbuf_read_actor(size_t read_start,
 	return ret;
 }
 
-typedef int (*subbuf_actor_t) (size_t read_start,
-			       struct rchan_buf *buf,
-			       size_t avail,
-			       read_descriptor_t *desc,
-			       read_actor_t actor);
+typedef int (*page_actor_t) (struct rchan_buf *buf,
+			     size_t avail,
+			     read_descriptor_t *desc,
+			     read_actor_t actor);
 
 /*
- *	relay_file_read_subbufs - read count bytes, bridging subbuf boundaries
+ *	relay_file_read_pages - read count bytes, bridging page boundaries
  */
-static ssize_t relay_file_read_subbufs(struct file *filp, loff_t *ppos,
-					subbuf_actor_t subbuf_actor,
-					read_actor_t actor,
-					read_descriptor_t *desc)
+static ssize_t relay_file_read_pages(struct file *filp, loff_t *ppos,
+				     page_actor_t page_actor,
+				     read_actor_t actor,
+				     read_descriptor_t *desc)
 {
 	struct rchan_buf *buf = filp->private_data;
-	size_t read_start, avail;
+	size_t avail;
 	int ret;
 
 	if (!desc->count)
@@ -1141,22 +779,16 @@ static ssize_t relay_file_read_subbufs(struct file *filp, loff_t *ppos,
 
 	mutex_lock(&filp->f_path.dentry->d_inode->i_mutex);
 	do {
-		if (!relay_file_read_avail(buf, *ppos))
-			break;
-
-		read_start = relay_file_read_start_pos(*ppos, buf);
-		avail = relay_file_read_subbuf_avail(read_start, buf);
+		avail = relay_file_read_page_avail(buf);
 		if (!avail)
 			break;
-
 		avail = min(desc->count, avail);
-		ret = subbuf_actor(read_start, buf, avail, desc, actor);
+		ret = page_actor(buf, avail, desc, actor);
 		if (desc->error < 0)
 			break;
-
 		if (ret) {
-			relay_file_read_consume(buf, read_start, ret);
-			*ppos = relay_file_read_end_pos(buf, read_start, ret);
+			relay_consume(buf, ret);
+			*ppos += ret;
 		}
 	} while (desc->count && ret);
 	mutex_unlock(&filp->f_path.dentry->d_inode->i_mutex);
@@ -1174,27 +806,40 @@ static ssize_t relay_file_read(struct file *filp,
 	desc.count = count;
 	desc.arg.buf = buffer;
 	desc.error = 0;
-	return relay_file_read_subbufs(filp, ppos, subbuf_read_actor,
-				       NULL, &desc);
+	return relay_file_read_pages(filp, ppos, page_read_actor,
+				     NULL, &desc);
 }
 
-static void relay_consume_bytes(struct rchan_buf *rbuf, int bytes_consumed)
+static void relay_pipe_buf_release(struct pipe_inode_info *pipe,
+				   struct pipe_buffer *pipe_buf)
 {
-	rbuf->bytes_consumed += bytes_consumed;
+	struct rchan_buf *buf;
 
-	if (rbuf->bytes_consumed >= rbuf->chan->subbuf_size) {
-		relay_subbufs_consumed(rbuf->chan, rbuf->cpu, 1);
-		rbuf->bytes_consumed %= rbuf->chan->subbuf_size;
-	}
+	buf = (struct rchan_buf *)page_private(pipe_buf->page);
+	relay_consume(buf, pipe_buf->private);
 }
 
-static void relay_pipe_buf_release(struct pipe_inode_info *pipe,
-				   struct pipe_buffer *buf)
+static int relay_pipe_buf_steal(struct pipe_inode_info *pipe,
+				struct pipe_buffer *pipe_buf)
 {
-	struct rchan_buf *rbuf;
+	int ret;
+	struct rchan_buf *buf;
 
-	rbuf = (struct rchan_buf *)page_private(buf->page);
-	relay_consume_bytes(rbuf, buf->private);
+	buf = (struct rchan_buf *)page_private(pipe_buf->page);
+	ret = generic_pipe_buf_steal(pipe, pipe_buf);
+	if (!ret) {
+		struct relay_page *rpage;
+		unsigned long flags;
+		spin_lock_irqsave(&buf->lock, flags);
+		rpage = list_first_entry(&buf->pages, struct relay_page, list);
+		spin_unlock_irqrestore(&buf->lock, flags);
+		__relay_remove_page(buf, rpage);
+		if (rpage->cb && rpage->cb->page_stolen)
+			rpage->cb->page_stolen(pipe_buf->page,
+					       rpage->private_data);
+	}
+
+	return ret;
 }
 
 static struct pipe_buf_operations relay_pipe_buf_ops = {
@@ -1203,7 +848,7 @@ static struct pipe_buf_operations relay_pipe_buf_ops = {
 	.unmap = generic_pipe_buf_unmap,
 	.confirm = generic_pipe_buf_confirm,
 	.release = relay_pipe_buf_release,
-	.steal = generic_pipe_buf_steal,
+	.steal = relay_pipe_buf_steal,
 	.get = generic_pipe_buf_get,
 };
 
@@ -1212,24 +857,17 @@ static void relay_page_release(struct splice_pipe_desc *spd, unsigned int i)
 }
 
 /*
- *	subbuf_splice_actor - splice up to one subbuf's worth of data
+ *	page_splice_actor - splice available data
  */
-static int subbuf_splice_actor(struct file *in,
-			       loff_t *ppos,
-			       struct pipe_inode_info *pipe,
-			       size_t len,
-			       unsigned int flags,
-			       int *nonpad_ret)
+static int page_splice_actor(struct file *in,
+			     struct pipe_inode_info *pipe,
+			     size_t len,
+			     unsigned int flags)
 {
-	unsigned int pidx, poff, total_len, subbuf_pages, nr_pages, ret;
-	struct rchan_buf *rbuf = in->private_data;
-	unsigned int subbuf_size = rbuf->chan->subbuf_size;
-	uint64_t pos = (uint64_t) *ppos;
-	uint32_t alloc_size = (uint32_t) rbuf->chan->alloc_size;
-	size_t read_start = (size_t) do_div(pos, alloc_size);
-	size_t read_subbuf = read_start / subbuf_size;
-	size_t padding = rbuf->padding[read_subbuf];
-	size_t nonpad_end = read_subbuf * subbuf_size + subbuf_size - padding;
+	unsigned int poff, total_len, nr_pages, ret;
+	struct rchan_buf *buf = in->private_data;
+	struct relay_page *rpage;
+	unsigned long lflags;
 	struct page *pages[PIPE_BUFFERS];
 	struct partial_page partial[PIPE_BUFFERS];
 	struct splice_pipe_desc spd = {
@@ -1241,61 +879,38 @@ static int subbuf_splice_actor(struct file *in,
 		.spd_release = relay_page_release,
 	};
 
-	if (rbuf->subbufs_produced == rbuf->subbufs_consumed)
+	if (list_empty(&buf->pages))
 		return 0;
 
-	/*
-	 * Adjust read len, if longer than what is available
-	 */
-	if (len > (subbuf_size - read_start % subbuf_size))
-		len = subbuf_size - read_start % subbuf_size;
+	poff = buf->consumed_offset;
+	nr_pages = min_t(unsigned int, buf->nr_pages, PIPE_BUFFERS);
+	total_len = 0;
 
-	subbuf_pages = rbuf->chan->alloc_size >> PAGE_SHIFT;
-	pidx = (read_start / PAGE_SIZE) % subbuf_pages;
-	poff = read_start & ~PAGE_MASK;
-	nr_pages = min_t(unsigned int, subbuf_pages, PIPE_BUFFERS);
+	spin_lock_irqsave(&buf->lock, lflags);
+	list_for_each_entry(rpage, &buf->pages, list) {
+		unsigned int this_len;
 
-	for (total_len = 0; spd.nr_pages < nr_pages; spd.nr_pages++) {
-		unsigned int this_len, this_end, private;
-		unsigned int cur_pos = read_start + total_len;
+		if (spd.nr_pages >= nr_pages)
+			break;
 
 		if (!len)
 			break;
 
-		this_len = min_t(unsigned long, len, PAGE_SIZE - poff);
-		private = this_len;
+		this_len = min_t(unsigned long, len, rpage->len - poff);
 
-		spd.pages[spd.nr_pages] = rbuf->page_array[pidx];
+		spd.pages[spd.nr_pages] = rpage->page;
 		spd.partial[spd.nr_pages].offset = poff;
-
-		this_end = cur_pos + this_len;
-		if (this_end >= nonpad_end) {
-			this_len = nonpad_end - cur_pos;
-			private = this_len + padding;
-		}
 		spd.partial[spd.nr_pages].len = this_len;
-		spd.partial[spd.nr_pages].private = private;
+		spd.partial[spd.nr_pages].private = this_len;
 
 		len -= this_len;
 		total_len += this_len;
 		poff = 0;
-		pidx = (pidx + 1) % subbuf_pages;
-
-		if (this_end >= nonpad_end) {
-			spd.nr_pages++;
-			break;
-		}
+		spd.nr_pages++;
 	}
+	spin_unlock_irqrestore(&buf->lock, lflags);
 
-	if (!spd.nr_pages)
-		return 0;
-
-	ret = *nonpad_ret = splice_to_pipe(pipe, &spd);
-	if (ret < 0 || ret < total_len)
-		return ret;
-
-        if (read_start + ret == nonpad_end)
-                ret += padding;
+	ret = splice_to_pipe(pipe, &spd);
 
         return ret;
 }
@@ -1308,13 +923,12 @@ static ssize_t relay_file_splice_read(struct file *in,
 {
 	ssize_t spliced;
 	int ret;
-	int nonpad_ret = 0;
 
 	ret = 0;
 	spliced = 0;
 
 	while (len && !spliced) {
-		ret = subbuf_splice_actor(in, ppos, pipe, len, flags, &nonpad_ret);
+		ret = page_splice_actor(in, pipe, len, flags);
 		if (ret < 0)
 			break;
 		else if (!ret) {
@@ -1331,8 +945,7 @@ static ssize_t relay_file_splice_read(struct file *in,
 			len = 0;
 		else
 			len -= ret;
-		spliced += nonpad_ret;
-		nonpad_ret = 0;
+		spliced += ret;
 	}
 
 	if (spliced)
@@ -1344,7 +957,6 @@ static ssize_t relay_file_splice_read(struct file *in,
 const struct file_operations relay_file_operations = {
 	.open		= relay_file_open,
 	.poll		= relay_file_poll,
-	.mmap		= relay_file_mmap,
 	.read		= relay_file_read,
 	.llseek		= no_llseek,
 	.release	= relay_file_release,
@@ -1352,9 +964,50 @@ const struct file_operations relay_file_operations = {
 };
 EXPORT_SYMBOL_GPL(relay_file_operations);
 
-static __init int relay_init(void)
+/**
+ * 	relay_hotcpu_callback - CPU hotplug callback
+ * 	@nb: notifier block
+ * 	@action: hotplug action to take
+ * 	@hcpu: CPU number
+ *
+ * 	Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
+ */
+static int __cpuinit relay_hotcpu_callback(struct notifier_block *nb,
+				unsigned long action,
+				void *hcpu)
 {
+	unsigned int hotcpu = (unsigned long)hcpu;
+	struct rchan *chan;
+
+	switch (action) {
+	case CPU_UP_PREPARE:
+	case CPU_UP_PREPARE_FROZEN:
+		mutex_lock(&relay_channels_mutex);
+		list_for_each_entry(chan, &relay_channels, list) {
+			if (chan->buf[hotcpu])
+				continue;
+			chan->buf[hotcpu] = relay_open_buf(chan, hotcpu);
+			if (!chan->buf[hotcpu]) {
+				printk(KERN_ERR
+					"relay_hotcpu_callback: cpu %d buffer "
+					"creation failed\n", hotcpu);
+				mutex_unlock(&relay_channels_mutex);
+				return NOTIFY_BAD;
+			}
+		}
+		mutex_unlock(&relay_channels_mutex);
+		break;
+	case CPU_DEAD:
+	case CPU_DEAD_FROZEN:
+		/* No need to flush the cpu : will be flushed upon
+		 * final relay_flush() call. */
+		break;
+	}
+	return NOTIFY_OK;
+}
 
+static __init int relay_init(void)
+{
 	hotcpu_notifier(relay_hotcpu_callback, 0);
 	return 0;
 }
diff --git a/kernel/relay_pagewriter.c b/kernel/relay_pagewriter.c
new file mode 100644
index 0000000..2842d7e
--- /dev/null
+++ b/kernel/relay_pagewriter.c
@@ -0,0 +1,868 @@
+/*
+ * Provides per-cpu page writers and page pool management for current
+ * users of the relay interface.  Basically this provides functions to
+ * write into pages, feed them into a relay object for consumption by
+ * userspace, and reclaim them after they've been read.
+ *
+ * See Documentation/filesystems/relay.txt for an overview.
+ *
+ * Copyright (C) 2002-2005 - Tom Zanussi (zanussi@...ibm.com), IBM Corp
+ * Copyright (C) 1999-2005 - Karim Yaghmour (karim@...rsys.com)
+ * Copyright (C) 2008 - Tom Zanussi (tzanussi@...il.com)
+ *
+ * Moved to kernel/relay.c by Paul Mundt, 2006.
+ * November 2006 - CPU hotplug support by Mathieu Desnoyers
+ * 	(mathieu.desnoyers@...ymtl.ca)
+ *
+ * This file is released under the GPL.
+ */
+#include <linux/errno.h>
+#include <linux/stddef.h>
+#include <linux/slab.h>
+#include <linux/module.h>
+#include <linux/string.h>
+#include <linux/relay.h>
+#include <linux/vmalloc.h>
+#include <linux/mm.h>
+#include <linux/cpu.h>
+#include <linux/splice.h>
+#include <linux/relay_pagewriter.h>
+#include <linux/debugfs.h>
+
+/* list of open pagewriters, for cpu hotplug */
+static DEFINE_MUTEX(pagewriters_mutex);
+static LIST_HEAD(pagewriters);
+
+/* forward declarations */
+static void setup_callbacks(struct pagewriter *pagewriter,
+			    struct pagewriter_callbacks *cb,
+			    unsigned long flags);
+static void pagewriter_close_buf(struct pagewriter_buf *buf);
+static struct pagewriter_buf *pagewriter_open_buf(struct pagewriter *pw,
+						  unsigned int cpu);
+static void pagewriter_destroy(struct kref *kref);
+static void __pagewriter_reset(struct pagewriter_buf *buf, unsigned int init);
+static void pagewriter_save_flight_buf(struct pagewriter_buf *buf);
+static struct relay_page_callbacks pagewriter_relay_page_callbacks;
+static void add_empty_rpage_struct(struct pagewriter_buf *buf,
+				   struct relay_page *rpage);
+static inline void switch_to_next_page(struct pagewriter_buf *buf);
+
+/*
+ * pagewriter kernel API
+ */
+
+/**
+ *	pagewriter_open - create a new pagewriter
+ *	@base_filename: base name of files to create, %NULL for buffering only
+ *	@parent: dentry of parent directory, %NULL for root directory or buffer
+ *	@n_pages: number of pages to use for each buffer
+ *	@n_pages_wakeup: wakeup readers after this many pages, 0 means never
+ *	@end_reserve: reserve at least that for padding events, 0 if not needed
+ *	@cb: client callback functions
+ *	@private_data: user-defined data
+ *	@flags: channel flags, top half for pagewriter, bottom half for relay
+ *
+ *	Returns pagewriter pointer if successful, %NULL otherwise.
+ *
+ *	Creates a pagewriter page pool for each cpu using the sizes and
+ *	attributes specified.
+ */
+struct pagewriter *pagewriter_open(const char *base_filename,
+				   struct dentry *parent,
+				   size_t n_pages,
+				   size_t n_pages_wakeup,
+				   size_t end_reserve,
+				   struct pagewriter_callbacks *cb,
+				   void *private_data,
+				   unsigned long flags)
+{
+	unsigned int i;
+	struct pagewriter *pagewriter;
+
+	if (!n_pages)
+		return NULL;
+
+	pagewriter = kzalloc(sizeof(struct pagewriter), GFP_KERNEL);
+	if (!pagewriter)
+		return NULL;
+
+	if (flags & PAGEWRITER_LATE_SETUP) {
+		strlcpy(pagewriter->base_filename, base_filename, NAME_MAX);
+		pagewriter->n_pages_wakeup = n_pages_wakeup;
+	} else {
+		pagewriter->rchan = relay_open(base_filename, parent,
+					       n_pages_wakeup, NULL,
+					       private_data, flags);
+		if (!pagewriter->rchan) {
+			kfree(pagewriter);
+			return NULL;
+		}
+	}
+
+	pagewriter->flags = flags;
+	pagewriter->n_pages = n_pages;
+	pagewriter->end_reserve = end_reserve;
+	atomic_set(&pagewriter->dropped, 0);
+
+	pagewriter->private_data = private_data;
+	setup_callbacks(pagewriter, cb, flags);
+	kref_init(&pagewriter->kref);
+
+	mutex_lock(&pagewriters_mutex);
+	for_each_online_cpu(i) {
+		pagewriter->buf[i] = pagewriter_open_buf(pagewriter, i);
+		if (!pagewriter->buf[i])
+			goto free_bufs;
+	}
+	list_add(&pagewriter->list, &pagewriters);
+	mutex_unlock(&pagewriters_mutex);
+
+	return pagewriter;
+
+free_bufs:
+	for_each_online_cpu(i) {
+		if (!pagewriter->buf[i])
+			break;
+		pagewriter_close_buf(pagewriter->buf[i]);
+	}
+
+	relay_close(pagewriter->rchan);
+	kref_put(&pagewriter->kref, pagewriter_destroy);
+	kfree(pagewriter);
+	mutex_unlock(&pagewriters_mutex);
+	return NULL;
+}
+EXPORT_SYMBOL_GPL(pagewriter_open);
+
+/**
+ *	relay_page - send a full page to relay
+ *	@buf: the pagewriter buf
+ *
+ *	'relays' a full page i.e. sends it to relay.
+ */
+static void relay_page(struct pagewriter_buf *buf)
+{
+	kref_get(&buf->kref);
+	relay_add_page(buf->pagewriter->rchan, buf->page->page,
+		       &pagewriter_relay_page_callbacks, (void *)buf);
+	buf->page->page = NULL;
+}
+
+/**
+ *	relay_partial_page - send a partial page to relay
+ *	@buf: the pagewriter buf
+ *
+ *	'relays' a partial page i.e. sends it to relay.
+ */
+static void relay_partial_page(struct pagewriter_buf *buf, unsigned int len)
+{
+	kref_get(&buf->kref);
+	relay_add_partial_page(buf->pagewriter->rchan, buf->page->page, len,
+			       &pagewriter_relay_page_callbacks, (void *)buf);
+	buf->page->page = NULL;
+}
+
+/**
+ *	pagewriter_flush_page - flush a possibly partial page
+ *	@buf: the pagewriter buf
+ *	@len: the length of data in the page
+ *
+ *	Used to flush the current, probably partial, non-padded page.
+ */
+static void pagewriter_flush_page(struct pagewriter_buf *buf, unsigned int len)
+{
+	unsigned long flags;
+
+	if (len == PAGE_SIZE) {
+		buf->pagewriter->cb->switch_page(buf);
+		return;
+	}
+
+	flags = buf->pagewriter->flags;
+	if (flags & PAGEWRITER_FLIGHT_MODE || flags & PAGEWRITER_LATE_SETUP) {
+		unsigned long flags;
+		buf->page->len = len;
+		spin_lock_irqsave(&buf->lock, flags);
+		list_add_tail(&buf->page->list, &buf->pool);
+		spin_unlock_irqrestore(&buf->lock, flags);
+		buf->n_pages_flight++;
+		return;
+	}
+	relay_partial_page(buf, len);
+	add_empty_rpage_struct(buf, buf->page);
+	switch_to_next_page(buf);
+}
+
+/**
+ *	pagewriter_flush - flush the pagewriter
+ *	@pagewriter: the pagewriter
+ *
+ *	Flushes all channel buffers, i.e. forces page switch.
+ */
+void pagewriter_flush(struct pagewriter *pagewriter)
+{
+	unsigned int i;
+
+	if (!pagewriter)
+		return;
+
+	mutex_lock(&pagewriters_mutex);
+	for_each_possible_cpu(i) {
+		struct pagewriter_buf *buf = pagewriter->buf[i];
+		if (!buf)
+			continue;
+		if (buf->pagewriter->flags & PAGEWRITER_PAD_WRITES) {
+			size_t len = PAGE_SIZE - buf->offset;
+			void *pad = buf->data + buf->offset;
+			if (len)
+				pagewriter->cb->write_padding(buf, len, pad);
+			pagewriter->cb->switch_page(buf);
+		} else {
+			size_t len = buf->offset;
+			pagewriter_flush_page(buf, len);
+		}
+	}
+	relay_flush(pagewriter->rchan);
+	mutex_unlock(&pagewriters_mutex);
+}
+EXPORT_SYMBOL_GPL(pagewriter_flush);
+
+/**
+ *	pagewriter_close - close the pagewriter
+ *	@pagewriter: the pagewriter
+ *
+ *	Closes all buffers and frees their page pools, and also frees
+ *	the pagewriter.
+ */
+void pagewriter_close(struct pagewriter *pagewriter)
+{
+	unsigned int i;
+
+	if (!pagewriter)
+		return;
+
+	mutex_lock(&pagewriters_mutex);
+	for_each_possible_cpu(i)
+		if (pagewriter->buf[i])
+			pagewriter_close_buf(pagewriter->buf[i]);
+
+	relay_close(pagewriter->rchan);
+
+	list_del(&pagewriter->list);
+	kref_put(&pagewriter->kref, pagewriter_destroy);
+	mutex_unlock(&pagewriters_mutex);
+}
+EXPORT_SYMBOL_GPL(pagewriter_close);
+
+/**
+ *	pagewriter_reset - reset the pagewriter
+ *	@pagewriter: the pagewriter
+ *
+ *	This has the effect of erasing all data from the current page
+ *	and restarting the pagewriter in its initial state.
+ *
+ *	NOTE. Care should be taken that the pagewriter isn't actually
+ *	being used by anything when this call is made.
+ */
+void pagewriter_reset(struct pagewriter *pagewriter)
+{
+	unsigned int i;
+
+	if (!pagewriter)
+		return;
+
+	mutex_lock(&pagewriters_mutex);
+	for_each_online_cpu(i)
+		if (pagewriter->buf[i])
+			__pagewriter_reset(pagewriter->buf[i], 0);
+	mutex_unlock(&pagewriters_mutex);
+}
+EXPORT_SYMBOL_GPL(pagewriter_reset);
+
+/**
+ *	pagewriter_save_flight_data - log all pages dirtied in flight mode
+ *	@pagewriter: pagewriter
+ *
+ *	In flight mode (PAGEWRITER_FLIGHT_MODE), the pages written to
+ *	via the pagewriter_write/reserve functions are simply cycled
+ *	around the per-cpu page pools, and not sent to relay.  This
+ *	function provides a way, at the user's request, to simply
+ *	send all the dirty pages in the page pools to relay and
+ *	therefore onto their final destination e.g. disk or network.
+ *
+ *	The pagewriter and associated buffers will be in the same
+ *	state as if they were reset after this call.
+ */
+void pagewriter_save_flight_data(struct pagewriter *pagewriter)
+{
+	unsigned int i;
+
+	if (!pagewriter)
+		return;
+
+	mutex_lock(&pagewriters_mutex);
+	for_each_possible_cpu(i)
+		if (pagewriter->buf[i])
+			pagewriter_save_flight_buf(pagewriter->buf[i]);
+	relay_flush(pagewriter->rchan);
+	mutex_unlock(&pagewriters_mutex);
+}
+EXPORT_SYMBOL_GPL(pagewriter_save_flight_data);
+
+/**
+ *	pagewriter_late_setup - create relay channel and log early pages
+ *	@pagewriter: pagewriter
+ *	@parent: dentry of parent directory, %NULL for root directory
+ *
+ *	If the pagewriter was initially created in early mode
+ *	(PAGEWRITER_LATE_SETUP), this creates the relay channel and
+ *	sends all the early pages in the page pools to relay and
+ *	therefore onto their final destination e.g. disk or network.
+ *
+ *	Returns 0 if successful, non-zero otherwise.
+ *
+ *	Used to set up files for a previously buffer-only channel.
+ *	Useful to do early tracing in kernel, before VFS is up, for example.
+ */
+int pagewriter_late_setup(struct pagewriter *pagewriter,
+			  struct dentry *parent)
+{
+	if (!pagewriter)
+		return -EINVAL;
+
+	pagewriter->rchan = relay_open(pagewriter->base_filename,
+				       parent,
+				       pagewriter->n_pages_wakeup,
+				       NULL,
+				       pagewriter->private_data,
+				       pagewriter->flags);
+	if (!pagewriter->rchan)
+		return -ENOMEM;
+
+	pagewriter->flags &= ~PAGEWRITER_LATE_SETUP;
+	pagewriter_save_flight_data(pagewriter);
+
+	return 0;
+}
+EXPORT_SYMBOL_GPL(pagewriter_late_setup);
+
+/*
+ * end relay kernel API
+ */
+
+/**
+ *	pagewriter_get_free_page - get a free relay_page from the pool
+ *	@buf: the buffer struct
+ *
+ *	Returns relay page if successful, NULL if not.
+ */
+static struct relay_page *pagewriter_get_free_page(struct pagewriter_buf *buf)
+{
+	struct relay_page *rpage = NULL;
+	unsigned long flags;
+
+	spin_lock_irqsave(&buf->lock, flags);
+	if (!list_empty(&buf->pool)) {
+		rpage = list_first_entry(&buf->pool, struct relay_page, list);
+		list_del(&rpage->list);
+	}
+	spin_unlock_irqrestore(&buf->lock, flags);
+
+	return rpage;
+}
+
+static inline void switch_to_next_page(struct pagewriter_buf *buf)
+{
+	struct relay_page *new_page = pagewriter_get_free_page(buf);
+	if (!new_page) {
+		buf->page = NULL;
+		buf->data = NULL;
+		return;
+	}
+	buf->page = new_page;
+	buf->data = page_address(buf->page->page);
+	buf->offset = 0;
+	buf->pagewriter->cb->new_page(buf, buf->data);
+}
+
+/**
+ *	get_empty_rpage_struct - get an empty rpage_struct to hold a page
+ *	@buf: the buffer struct
+ *
+ *	Returns an rpage_struct if successful, NULL if not.
+ */
+static struct relay_page *get_empty_rpage_struct(struct pagewriter_buf *buf)
+{
+	struct relay_page *rpage = NULL;
+	unsigned long flags;
+
+	spin_lock_irqsave(&buf->lock, flags);
+	if (!list_empty(&buf->empty_rpage_structs)) {
+		rpage = list_first_entry(&buf->empty_rpage_structs,
+					 struct relay_page, list);
+		list_del(&rpage->list);
+	}
+	spin_unlock_irqrestore(&buf->lock, flags);
+
+	return rpage;
+}
+
+static void add_empty_rpage_struct_nolock(struct pagewriter_buf *buf,
+					  struct relay_page *rpage)
+{
+	list_add_tail(&rpage->list, &buf->empty_rpage_structs);
+}
+
+/**
+ *	add_empty_rpage_struct - add/return a free rpage_struct to the pool
+ *	@buf: buffer struct
+ *	@rpage: struct relay_page
+ */
+static void add_empty_rpage_struct(struct pagewriter_buf *buf,
+				   struct relay_page *rpage)
+{
+	unsigned long flags;
+
+	spin_lock_irqsave(&buf->lock, flags);
+	add_empty_rpage_struct_nolock(buf, rpage);
+	spin_unlock_irqrestore(&buf->lock, flags);
+}
+
+/**
+ *	pagewriter_destroy - free the pagewriter struct
+ *	@kref: target kernel reference that contains the relay channel
+ *
+ *	Should only be called from kref_put().
+ */
+static void pagewriter_destroy(struct kref *kref)
+{
+	struct pagewriter *pagewriter = container_of(kref, struct pagewriter,
+						     kref);
+	kfree(pagewriter);
+}
+
+/**
+ *	pagewriter_destroy_buf - destroy a pagewriter_buf struct and page pool
+ *	@buf: the buffer struct
+ */
+static void pagewriter_destroy_buf(struct pagewriter_buf *buf)
+{
+	struct pagewriter *pagewriter = buf->pagewriter;
+	struct relay_page *rpage, *rpage2;
+	unsigned long flags;
+
+	spin_lock_irqsave(&buf->lock, flags);
+	list_for_each_entry_safe(rpage, rpage2, &buf->pool, list) {
+		__free_page(rpage->page);
+		list_del(&rpage->list);
+		kfree(rpage);
+	}
+	spin_unlock_irqrestore(&buf->lock, flags);
+
+	pagewriter->buf[buf->cpu] = NULL;
+	kfree(buf);
+	kref_put(&pagewriter->kref, pagewriter_destroy);
+}
+
+/**
+ *	pagewriter_remove_buf - remove a pagewriter buffer
+ *	@kref: target kernel reference that contains the relay buffer
+ *
+ *	Frees the pagewriter_buf and the buffer's page pool.  Should
+ *	only be called from kref_put().
+ */
+static void pagewriter_remove_buf(struct kref *kref)
+{
+	struct pagewriter_buf *buf = container_of(kref, struct pagewriter_buf,
+						  kref);
+	pagewriter_destroy_buf(buf);
+}
+
+/**
+ *	pagewriter_close_buf - close a pagewriter buffer
+ *	@buf: channel buffer
+ *
+ *	The channel buffer and channel buffer data structure are freed
+ *	automatically when the last reference is given up.
+ */
+static void pagewriter_close_buf(struct pagewriter_buf *buf)
+{
+	kref_put(&buf->kref, pagewriter_remove_buf);
+}
+
+/**
+ *	pagewriter_add_free_page - add/return a free relay_page to the pool
+ *	@buf: the buffer struct
+ *	@rpage: relay_page to add
+ *
+ *	Returns the page to the free pool and drops the buf reference.
+ */
+static void pagewriter_add_free_page(struct pagewriter_buf *buf,
+				     struct relay_page *rpage)
+{
+	int was_empty = list_empty(&buf->pool);
+	unsigned long flags;
+
+	spin_lock_irqsave(&buf->lock, flags);
+	list_add_tail(&rpage->list, &buf->pool);
+	spin_unlock_irqrestore(&buf->lock, flags);
+
+	if (was_empty)
+		switch_to_next_page(buf);
+
+	kref_put(&buf->kref, pagewriter_remove_buf);
+}
+
+/**
+ *	pagewriter_alloc_pool - allocate a pool of pages for the buffer
+ *	@buf: the buffer struct
+ *
+ *	Allocates buf->pagewriter->n_pages zeroed pages, each wrapped in
+ *	a relay_page struct, and links them into buf->pool.  On failure
+ *	every partial allocation is undone.
+ *
+ *	Returns 0 if successful, -ENOMEM otherwise.
+ */
+static int pagewriter_alloc_pool(struct pagewriter_buf *buf)
+{
+	unsigned int i;
+	struct relay_page *rpage, *tmp;
+
+	for (i = 0; i < buf->pagewriter->n_pages; i++) {
+		rpage = kmalloc(sizeof(struct relay_page), GFP_KERNEL);
+		if (unlikely(!rpage))
+			goto depopulate;
+		rpage->page = alloc_page(GFP_KERNEL | __GFP_ZERO);
+		if (unlikely(!rpage->page)) {
+			/* rpage is not yet on the pool list, so the
+			 * cleanup loop below would never see it: free
+			 * it here or it leaks. */
+			kfree(rpage);
+			goto depopulate;
+		}
+		list_add_tail(&rpage->list, &buf->pool);
+	}
+
+	return 0;
+
+depopulate:
+	/* _safe iteration is mandatory because entries are unlinked as
+	 * we walk; also free the relay_page structs themselves, which
+	 * the previous cleanup leaked. */
+	list_for_each_entry_safe(rpage, tmp, &buf->pool, list) {
+		__free_page(rpage->page);
+		list_del(&rpage->list);
+		kfree(rpage);
+	}
+
+	return -ENOMEM;
+}
+
+/**
+ *	pagewriter_create_buf - allocate and initialize a buffer's page pool
+ *	@pw: the pagewriter
+ *
+ *	Allocates the pagewriter_buf, takes a reference on @pw, fills
+ *	the page pool and makes the first page current.
+ *
+ *	Returns pagewriter buffer if successful, %NULL otherwise.
+ */
+static struct pagewriter_buf *pagewriter_create_buf(struct pagewriter *pw)
+{
+	struct pagewriter_buf *buf = kzalloc(sizeof(struct pagewriter_buf),
+					     GFP_KERNEL);
+	if (!buf)
+		return NULL;
+
+	spin_lock_init(&buf->lock);
+	INIT_LIST_HEAD(&buf->pool);
+	INIT_LIST_HEAD(&buf->empty_rpage_structs);
+	buf->pagewriter = pw;
+	kref_get(&buf->pagewriter->kref);
+
+	if (pagewriter_alloc_pool(buf))
+		goto free_buf;
+
+	/* Make the first pooled page current so writes can proceed. */
+	switch_to_next_page(buf);
+
+	return buf;
+
+free_buf:
+	/* Drop the pagewriter reference taken above; the previous
+	 * error path leaked it. */
+	kref_put(&pw->kref, pagewriter_destroy);
+	kfree(buf);
+	return NULL;
+}
+
+/*
+ *	pagewriter_open_buf - create a new pagewriter buf with page pool
+ *	@pagewriter: the owning pagewriter
+ *	@cpu: the cpu this buffer belongs to
+ *
+ *	used by pagewriter_open() and CPU hotplug.  Returns the new
+ *	buffer, or NULL on allocation failure.
+ */
+static struct pagewriter_buf *pagewriter_open_buf(struct pagewriter *pagewriter,
+					     unsigned int cpu)
+{
+	struct pagewriter_buf *buf = NULL;
+
+	buf = pagewriter_create_buf(pagewriter);
+	if (!buf)
+		return NULL;
+
+	buf->cpu = cpu;
+
+	/* First-time reset: initializes the kref and the current page. */
+	__pagewriter_reset(buf, 1);
+
+	return buf;
+}
+
+/*
+ * new_page() default callback.  Intentionally empty: clients that
+ * don't care about page-start events get this no-op.
+ */
+static void new_page_default_callback(struct pagewriter_buf *buf,
+				      void *page_data)
+{
+}
+
+/*
+ * write_padding() default callback.  Intentionally empty: clients
+ * that don't need notification of padding written at page switch get
+ * this no-op.  NOTE(review): non-static, presumably declared in the
+ * public header for direct use -- confirm.
+ */
+void pagewriter_write_padding_default_callback(struct pagewriter_buf *buf,
+					       size_t length,
+					       void *reserved)
+{
+}
+
+/* pagewriter default callbacks, used when a client supplies no
+ * callback table; switch_page is filled in lazily by
+ * set_page_switch_cb() based on the channel flags. */
+static struct pagewriter_callbacks default_pagewriter_callbacks = {
+	.new_page = new_page_default_callback,
+	.write_padding = pagewriter_write_padding_default_callback,
+};
+
+/*
+ * Pick the switch_page implementation matching the channel flags:
+ * flight-recorder (or late-setup) channels recycle pages into the
+ * local pool, the others hand them straight to relay; each mode comes
+ * in a padding and a non-padding flavour.
+ */
+static void set_page_switch_cb(struct pagewriter_callbacks *cb,
+			       unsigned long flags)
+{
+	unsigned long flight = flags & (PAGEWRITER_FLIGHT_MODE |
+					PAGEWRITER_LATE_SETUP);
+	unsigned long pad = flags & PAGEWRITER_PAD_WRITES;
+
+	if (flight)
+		cb->switch_page = pad ? pagewriter_pad_flight_switch_page :
+			pagewriter_nopad_flight_switch_page;
+	else
+		cb->switch_page = pad ? pagewriter_pad_switch_page :
+			pagewriter_nopad_switch_page;
+}
+
+/*
+ * Install the client's callbacks on @pagewriter, substituting
+ * defaults for any it left unset.
+ *
+ * The previous version dereferenced a NULL @cb immediately after the
+ * !cb branch; fall back to the default table instead and let the
+ * normal fixup path fill in switch_page.
+ * NOTE(review): the default table is shared, so the first caller's
+ * flags determine its switch_page for everyone -- confirm acceptable.
+ */
+static void setup_callbacks(struct pagewriter *pagewriter,
+			    struct pagewriter_callbacks *cb,
+			    unsigned long flags)
+{
+	if (!cb)
+		cb = &default_pagewriter_callbacks;
+
+	if (!cb->switch_page)
+		set_page_switch_cb(cb, flags);
+	if (!cb->new_page)
+		cb->new_page = new_page_default_callback;
+	if (!cb->write_padding)
+		cb->write_padding = pagewriter_write_padding_default_callback;
+
+	pagewriter->cb = cb;
+}
+
+/**
+ * 	pagewriter_page_released_callback - relay_page page_released impl
+ * 	@page: the page released
+ * 	@private_data: contains associated pagewriter_buf
+ *
+ * 	relay has notified us that a page we gave it has been read and
+ * 	is now available for us to re-use.  We simply add it back to
+ * 	the page pool for that buf.
+ */
+static void pagewriter_page_released_callback(struct page *page,
+					      void *private_data)
+{
+	struct pagewriter_buf *buf = private_data;
+	struct relay_page *rpage = get_empty_rpage_struct(buf);
+
+	/* NOTE(review): rpage is used unchecked -- confirm
+	 * get_empty_rpage_struct() cannot return NULL. */
+	rpage->page = page;
+	pagewriter_add_free_page(buf, rpage);
+}
+
+/**
+ * 	pagewriter_page_stolen_callback - relay_page page_stolen impl
+ * 	@page: the page released
+ * 	@private_data: contains associated pagewriter_buf
+ *
+ * 	relay has notified us that a page we gave it has been stolen.
+ * 	We simply allocate a new one and add it to the page pool for
+ * 	that buf.
+ */
+static void pagewriter_page_stolen_callback(struct page *page,
+					    void *private_data)
+{
+	struct pagewriter_buf *buf = private_data;
+	struct relay_page *rpage;
+	struct page *new_page;
+
+	new_page = alloc_page(GFP_KERNEL | __GFP_ZERO);
+	/* On OOM the pool silently shrinks by one page, and the buf
+	 * reference that pagewriter_add_free_page() would drop is kept.
+	 * NOTE(review): confirm both are intended. */
+	if (unlikely(!new_page))
+		return;
+	/* page->private links the page back to its owning buf. */
+	set_page_private(new_page, (unsigned long)buf);
+	rpage = get_empty_rpage_struct(buf);
+
+	rpage->page = new_page;
+	pagewriter_add_free_page(buf, rpage);
+}
+
+/* Callbacks relay uses to hand pages back to (or take them from) us. */
+static struct relay_page_callbacks pagewriter_relay_page_callbacks = {
+	.page_released	= pagewriter_page_released_callback,
+	.page_stolen	= pagewriter_page_stolen_callback,
+};
+
+/**
+ *	pagewriter_pad_switch_page - switch to a new page
+ *	@buf: channel buffer
+ *
+ *	Page-switch callback for channels using padded writes: hands
+ *	the current page to relay via relay_page(), parks its
+ *	relay_page wrapper on the buffer's empty-structs list for
+ *	re-use, then makes the next pooled page current.
+ */
+void pagewriter_pad_switch_page(struct pagewriter_buf *buf)
+{
+	relay_page(buf);
+	add_empty_rpage_struct(buf, buf->page);
+	switch_to_next_page(buf);
+}
+EXPORT_SYMBOL_GPL(pagewriter_pad_switch_page);
+
+/**
+ *	pagewriter_pad_flight_switch_page - flight-mode page switch
+ *	@buf: channel buffer
+ *
+ *	Flight-recorder variant: instead of handing the full page to
+ *	relay, recycle it to the tail of the local pool and count it as
+ *	in-flight; pages only reach relay later via
+ *	pagewriter_save_flight_buf().
+ */
+void pagewriter_pad_flight_switch_page(struct pagewriter_buf *buf)
+{
+	unsigned long flags;
+
+	spin_lock_irqsave(&buf->lock, flags);
+	list_add_tail(&buf->page->list, &buf->pool);
+	spin_unlock_irqrestore(&buf->lock, flags);
+	buf->n_pages_flight++;
+
+	switch_to_next_page(buf);
+}
+EXPORT_SYMBOL_GPL(pagewriter_pad_flight_switch_page);
+
+/**
+ *	pagewriter_nopad_switch_page - page switch without padding
+ *	@buf: channel buffer
+ *
+ *	Body is identical to pagewriter_pad_switch_page(); presumably
+ *	the pad/nopad distinction matters only to the write paths that
+ *	choose between them -- TODO confirm the duplication is intended.
+ */
+void pagewriter_nopad_switch_page(struct pagewriter_buf *buf)
+{
+	relay_page(buf);
+	add_empty_rpage_struct(buf, buf->page);
+	switch_to_next_page(buf);
+}
+EXPORT_SYMBOL_GPL(pagewriter_nopad_switch_page);
+
+/**
+ *	pagewriter_nopad_flight_switch_page - flight-mode switch, no padding
+ *	@buf: channel buffer
+ *
+ *	Body is identical to pagewriter_pad_flight_switch_page(): the
+ *	full page is recycled to the local pool and counted as
+ *	in-flight rather than handed to relay -- TODO confirm the
+ *	duplication is intended.
+ */
+void pagewriter_nopad_flight_switch_page(struct pagewriter_buf *buf)
+{
+	unsigned long flags;
+
+	spin_lock_irqsave(&buf->lock, flags);
+	list_add_tail(&buf->page->list, &buf->pool);
+	spin_unlock_irqrestore(&buf->lock, flags);
+	buf->n_pages_flight++;
+
+	switch_to_next_page(buf);
+}
+EXPORT_SYMBOL_GPL(pagewriter_nopad_flight_switch_page);
+
+/**
+ *	__pagewriter_reset - reset a pagewriter
+ *	@buf: the channel buffer
+ *	@init: 1 if this is a first-time initialization
+ *
+ *	See pagewriter_reset() for description of effect.
+ */
+static void __pagewriter_reset(struct pagewriter_buf *buf, unsigned int init)
+{
+	if (init)
+		kref_init(&buf->kref);
+
+	/* Make a fresh pooled page current; buf->data legitimately
+	 * ends up NULL when the pool is empty. */
+	buf->page = pagewriter_get_free_page(buf);
+	buf->offset = 0;
+	if (buf->page)
+		buf->data = page_address(buf->page->page);
+	else
+		buf->data = NULL;
+	buf->n_pages_flight = 0;
+
+	/* NOTE(review): new_page() is invoked even when buf->data is
+	 * NULL -- confirm all client callbacks tolerate that. */
+	buf->pagewriter->cb->new_page(buf, buf->data);
+}
+
+/**
+ *	pagewriter_save_flight_buf - hand a flight buffer's pages to relay
+ *	@buf: the channel buffer
+ *
+ *	Flushes a flight-recorder buffer: switches away from the
+ *	current page, then walks the most recent n_pages_flight pooled
+ *	pages (capped at the pool size), handing each to relay -- full
+ *	pages via relay_page(), a partial page via
+ *	relay_partial_page() -- and parking their wrappers for re-use.
+ *	Finally resets the buffer for further writing.
+ */
+static void pagewriter_save_flight_buf(struct pagewriter_buf *buf)
+{
+	size_t first_page, n_pages = buf->n_pages_flight;
+	struct relay_page *first_rpage;
+	unsigned long flags;
+
+	buf->pagewriter->cb->switch_page(buf);
+
+	/* No more than a pool's worth of pages can be live at once. */
+	if (buf->n_pages_flight > buf->pagewriter->n_pages)
+		n_pages = buf->pagewriter->n_pages;
+
+	/* Skip the older entries; only the trailing n_pages matter. */
+	first_page = buf->pagewriter->n_pages - n_pages;
+
+	spin_lock_irqsave(&buf->lock, flags);
+	list_for_each_entry(first_rpage, &buf->pool, list)
+		if (!first_page--)
+			break;
+
+	/* NOTE(review): this loop iterates first_rpage but operates on
+	 * buf->page, and calls relay under buf->lock -- confirm
+	 * relay_page() advances buf->page and never sleeps or retakes
+	 * the lock. */
+	list_for_each_entry_from(first_rpage, &buf->pool, list) {
+		if (buf->page->len == PAGE_SIZE)
+			relay_page(buf);
+		else
+			relay_partial_page(buf, buf->page->len);
+		add_empty_rpage_struct_nolock(buf, buf->page);
+	}
+	spin_unlock_irqrestore(&buf->lock, flags);
+
+	__pagewriter_reset(buf, 0);
+}
+
+/**
+ * 	pagewriter_hotcpu_callback - CPU hotplug callback
+ * 	@nb: notifier block
+ * 	@action: hotplug action to take
+ * 	@hcpu: CPU number
+ *
+ * 	On CPU_UP_PREPARE, creates the per-cpu buffer for every open
+ * 	pagewriter that doesn't already have one.  CPU_DEAD needs no
+ * 	work: remaining data is flushed on the final relay_flush().
+ *
+ * 	Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
+ */
+static int __cpuinit pagewriter_hotcpu_callback(struct notifier_block *nb,
+						unsigned long action,
+						void *hcpu)
+{
+	unsigned int hotcpu = (unsigned long)hcpu;
+	struct pagewriter *pagewriter;
+
+	switch (action) {
+	case CPU_UP_PREPARE:
+	case CPU_UP_PREPARE_FROZEN:
+		mutex_lock(&pagewriters_mutex);
+		list_for_each_entry(pagewriter, &pagewriters, list) {
+			if (pagewriter->buf[hotcpu])
+				continue;
+			pagewriter->buf[hotcpu] =
+				pagewriter_open_buf(pagewriter, hotcpu);
+			if (!pagewriter->buf[hotcpu]) {
+				/* %u: hotcpu is unsigned int */
+				printk(KERN_ERR
+					"pagewriter_hotcpu_callback: cpu %u "
+				       "buffer creation failed\n", hotcpu);
+				mutex_unlock(&pagewriters_mutex);
+				return NOTIFY_BAD;
+			}
+		}
+		mutex_unlock(&pagewriters_mutex);
+		break;
+	case CPU_DEAD:
+	case CPU_DEAD_FROZEN:
+		/* No need to flush the cpu : will be flushed upon
+		 * final relay_flush() call. */
+		break;
+	}
+	return NOTIFY_OK;
+}
+
+/* Register for CPU hotplug so per-cpu buffers are created for CPUs
+ * that come online after a pagewriter is opened. */
+static __init int pagewriter_init(void)
+{
+	hotcpu_notifier(pagewriter_hotcpu_callback, 0);
+	return 0;
+}
+
+early_initcall(pagewriter_init);
diff --git a/virt/kvm/kvm_trace.c b/virt/kvm/kvm_trace.c
index 41dcc84..f5cab08 100644
--- a/virt/kvm/kvm_trace.c
+++ b/virt/kvm/kvm_trace.c
@@ -15,7 +15,7 @@
  */
 
 #include <linux/module.h>
-#include <linux/relay.h>
+#include <linux/relay_pagewriter.h>
 #include <linux/debugfs.h>
 #include <linux/ktime.h>
 
@@ -27,9 +27,9 @@
 
 struct kvm_trace {
 	int trace_state;
-	struct rchan *rchan;
+	struct pagewriter *pagewriter;
 	struct dentry *lost_file;
-	atomic_t lost_records;
+	int first_page;
 };
 static struct kvm_trace *kvm_trace;
 
@@ -84,7 +84,7 @@ static void kvm_add_trace(void *probe_private, void *call_data,
 	}
 
 	size = calc_rec_size(p->timestamp_in, extra * sizeof(u32));
-	relay_write(kt->rchan, &rec, size);
+	pagewriter_write(kt->pagewriter, &rec, size);
 }
 
 static struct kvm_trace_probe kvm_trace_probes[] = {
@@ -96,7 +96,7 @@ static int lost_records_get(void *data, u64 *val)
 {
 	struct kvm_trace *kt = data;
 
-	*val = atomic_read(&kt->lost_records);
+	*val = atomic_read(&kt->pagewriter->dropped);
 	return 0;
 }
 
@@ -107,56 +107,31 @@ DEFINE_SIMPLE_ATTRIBUTE(kvm_trace_lost_ops, lost_records_get, NULL, "%llu\n");
  *  many times we encountered a full subbuffer, to tell user space app the
  *  lost records there were.
  */
-static int kvm_subbuf_start_callback(struct rchan_buf *buf, void *subbuf,
-				     void *prev_subbuf, size_t prev_padding)
+static void kvm_new_page_callback(struct pagewriter_buf *buf,
+				  void *page_data)
 {
-	struct kvm_trace *kt;
-
-	if (!relay_buf_full(buf)) {
-		if (!prev_subbuf) {
-			/*
-			 * executed only once when the channel is opened
-			 * save metadata as first record
-			 */
-			subbuf_start_reserve(buf, sizeof(u32));
-			*(u32 *)subbuf = 0x12345678;
-		}
-
-		return 1;
+	struct kvm_trace *kt = buf->pagewriter->private_data;
+
+	if (kt->first_page) {
+		/*
+		 * executed only once when the channel is opened
+		 * save metadata as first record
+		 */
+		page_start_reserve(buf, sizeof(u32));
+		*(u32 *)page_data = 0x12345678;
+		kt->first_page = 0;
 	}
-
-	kt = buf->chan->private_data;
-	atomic_inc(&kt->lost_records);
-
-	return 0;
-}
-
-static struct dentry *kvm_create_buf_file_callack(const char *filename,
-						 struct dentry *parent,
-						 int mode,
-						 struct rchan_buf *buf,
-						 int *is_global)
-{
-	return debugfs_create_file(filename, mode, parent, buf,
-				   &relay_file_operations);
-}
-
-static int kvm_remove_buf_file_callback(struct dentry *dentry)
-{
-	debugfs_remove(dentry);
-	return 0;
 }
 
-static struct rchan_callbacks kvm_relay_callbacks = {
-	.subbuf_start 		= kvm_subbuf_start_callback,
-	.create_buf_file 	= kvm_create_buf_file_callack,
-	.remove_buf_file 	= kvm_remove_buf_file_callback,
+static struct pagewriter_callbacks kvm_pagewriter_callbacks = {
+	.new_page		= kvm_new_page_callback,
 };
 
 static int do_kvm_trace_enable(struct kvm_user_trace_setup *kuts)
 {
 	struct kvm_trace *kt;
 	int i, r = -ENOMEM;
+	int n_pages, n_pages_wakeup;
 
 	if (!kuts->buf_size || !kuts->buf_nr)
 		return -EINVAL;
@@ -166,15 +141,18 @@ static int do_kvm_trace_enable(struct kvm_user_trace_setup *kuts)
 		goto err;
 
 	r = -EIO;
-	atomic_set(&kt->lost_records, 0);
+	kt->first_page = 1;
 	kt->lost_file = debugfs_create_file("lost_records", 0444, kvm_debugfs_dir,
 					    kt, &kvm_trace_lost_ops);
 	if (!kt->lost_file)
 		goto err;
 
-	kt->rchan = relay_open("trace", kvm_debugfs_dir, kuts->buf_size,
-				kuts->buf_nr, &kvm_relay_callbacks, kt);
-	if (!kt->rchan)
+	n_pages = (kuts->buf_size * kuts->buf_nr) / PAGE_SIZE;
+	n_pages_wakeup = kuts->buf_size / PAGE_SIZE;
+	kt->pagewriter = pagewriter_open("trace", kvm_debugfs_dir, n_pages, 0,
+					 n_pages_wakeup,
+					 &kvm_pagewriter_callbacks, kt, 0UL);
+	if (!kt->pagewriter)
 		goto err;
 
 	kvm_trace = kt;
@@ -195,8 +173,8 @@ err:
 	if (kt) {
 		if (kt->lost_file)
 			debugfs_remove(kt->lost_file);
-		if (kt->rchan)
-			relay_close(kt->rchan);
+		if (kt->pagewriter)
+			pagewriter_close(kt->pagewriter);
 		kfree(kt);
 	}
 	return r;
@@ -228,7 +206,7 @@ static int kvm_trace_pause(void)
 
 	if (kt->trace_state == KVM_TRACE_STATE_RUNNING) {
 		kt->trace_state = KVM_TRACE_STATE_PAUSE;
-		relay_flush(kt->rchan);
+		pagewriter_flush(kt->pagewriter);
 		r = 0;
 	}
 
@@ -253,7 +231,7 @@ void kvm_trace_cleanup(void)
 			marker_probe_unregister(p->name, p->probe_func, p);
 		}
 
-		relay_close(kt->rchan);
+		pagewriter_close(kt->pagewriter);
 		debugfs_remove(kt->lost_file);
 		kfree(kt);
 	}
-- 
1.5.3.5



--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@...r.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ