lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-Id: <1454591299-30305-3-git-send-email-javier@javigon.com>
Date:	Thu,  4 Feb 2016 14:08:17 +0100
From:	"Javier González" <jg@...htnvm.io>
To:	mb@...htnvm.io
Cc:	linux-kernel@...r.kernel.org, linux-block@...r.kernel.org,
	Javier González <javier@...xlabs.com>
Subject: [RFC 2/4] lightnvm: add write buffering for rrpc

Flash controllers typically define flash pages as a collection of flash
sectors of typically 4K. Moreover, flash controllers might program flash
pages across several planes. This defines the write granurality at which
flash can be programmed. This is different for each flash controller. In
rrpc, the assumption has been that flash pages are 4K, thus writes could
be sent out to the media as they were received.

This patch adds a per-block write buffer to rrpc. Writes are flushed to
the media in chuncks of the minimum granurality allowed by the
controller, though the strategy can be changed to, for example, only
flush at the maximum supported (64 pages in nvme). Apart from enabling
the use of rrpc in real hardware, this buffering strategy will be used
for recovery; if a block becomes bad, a new block can be allocated to
persist valid data.

Signed-off-by: Javier González <javier@...xlabs.com>
---
 drivers/lightnvm/rrpc.c  | 851 ++++++++++++++++++++++++++++++++++++++---------
 drivers/lightnvm/rrpc.h  |  82 ++++-
 include/linux/lightnvm.h |   6 +-
 3 files changed, 771 insertions(+), 168 deletions(-)

diff --git a/drivers/lightnvm/rrpc.c b/drivers/lightnvm/rrpc.c
index 8187bf3..e9fb19d 100644
--- a/drivers/lightnvm/rrpc.c
+++ b/drivers/lightnvm/rrpc.c
@@ -16,11 +16,12 @@
 
 #include "rrpc.h"
 
-static struct kmem_cache *rrpc_gcb_cache, *rrpc_rq_cache;
+static struct kmem_cache *rrpc_gcb_cache, *rrpc_rq_cache, *rrpc_rrq_cache,
+			*rrpc_buf_rrq_cache, *rrpc_wb_cache, *rrpc_block_cache;
 static DECLARE_RWSEM(rrpc_lock);
 
 static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
-				struct nvm_rq *rqd, unsigned long flags);
+				struct rrpc_rq *rqd, unsigned long flags);
 
 #define rrpc_for_each_lun(rrpc, rlun, i) \
 		for ((i) = 0, rlun = &(rrpc)->luns[0]; \
@@ -62,53 +63,59 @@ static void rrpc_invalidate_range(struct rrpc *rrpc, sector_t slba,
 	spin_unlock(&rrpc->rev_lock);
 }
 
-static struct nvm_rq *rrpc_inflight_laddr_acquire(struct rrpc *rrpc,
+static void rrpc_release_and_free_rrqd(struct kref *ref)
+{
+	struct rrpc_rq *rrqd = container_of(ref, struct rrpc_rq, refs);
+	struct rrpc *rrpc = rrqd->addr->rblk->rlun->rrpc;
+
+	rrpc_unlock_rq(rrpc, rrqd);
+	mempool_free(rrqd, rrpc->rrq_pool);
+}
+
+static struct rrpc_rq *rrpc_inflight_laddr_acquire(struct rrpc *rrpc,
 					sector_t laddr, unsigned int pages)
 {
-	struct nvm_rq *rqd;
+	struct rrpc_rq *rrqd;
 	struct rrpc_inflight_rq *inf;
 
-	rqd = mempool_alloc(rrpc->rq_pool, GFP_ATOMIC);
-	if (!rqd)
+	rrqd = mempool_alloc(rrpc->rrq_pool, GFP_ATOMIC);
+	if (!rrqd)
 		return ERR_PTR(-ENOMEM);
+	kref_init(&rrqd->refs);
 
-	inf = rrpc_get_inflight_rq(rqd);
+	inf = rrpc_get_inflight_rq(rrqd);
 	if (rrpc_lock_laddr(rrpc, laddr, pages, inf)) {
-		mempool_free(rqd, rrpc->rq_pool);
+		mempool_free(rrqd, rrpc->rrq_pool);
 		return NULL;
 	}
 
-	return rqd;
+	return rrqd;
 }
 
-static void rrpc_inflight_laddr_release(struct rrpc *rrpc, struct nvm_rq *rqd)
+static void rrpc_inflight_laddr_release(struct rrpc *rrpc, struct rrpc_rq *rrqd)
 {
-	struct rrpc_inflight_rq *inf = rrpc_get_inflight_rq(rqd);
-
-	rrpc_unlock_laddr(rrpc, inf);
-
-	mempool_free(rqd, rrpc->rq_pool);
+	kref_put(&rrqd->refs, rrpc_release_and_free_rrqd);
 }
 
 static void rrpc_discard(struct rrpc *rrpc, struct bio *bio)
 {
 	sector_t slba = bio->bi_iter.bi_sector / NR_PHY_IN_LOG;
 	sector_t len = bio->bi_iter.bi_size / RRPC_EXPOSED_PAGE_SIZE;
-	struct nvm_rq *rqd;
+	struct rrpc_rq *rrqd;
 
 	do {
-		rqd = rrpc_inflight_laddr_acquire(rrpc, slba, len);
+		rrqd = rrpc_inflight_laddr_acquire(rrpc, slba, len);
 		schedule();
-	} while (!rqd);
+	} while (!rrqd);
 
-	if (IS_ERR(rqd)) {
+	if (IS_ERR(rrqd)) {
 		pr_err("rrpc: unable to acquire inflight IO\n");
 		bio_io_error(bio);
 		return;
 	}
 
 	rrpc_invalidate_range(rrpc, slba, len);
-	rrpc_inflight_laddr_release(rrpc, rqd);
+	rrpc_inflight_laddr_release(rrpc, rrqd);
 }
 
 static int block_is_full(struct rrpc *rrpc, struct rrpc_block *rblk)
@@ -166,8 +173,6 @@ static void rrpc_set_lun_cur(struct rrpc_lun *rlun, struct rrpc_block *rblk)
 {
 	struct rrpc *rrpc = rlun->rrpc;
 
-	BUG_ON(!rblk);
-
 	if (rlun->cur) {
 		spin_lock(&rlun->cur->lock);
 		WARN_ON(!block_is_full(rrpc, rlun->cur));
@@ -176,12 +181,107 @@ static void rrpc_set_lun_cur(struct rrpc_lun *rlun, struct rrpc_block *rblk)
 	rlun->cur = rblk;
 }
 
+static void rrpc_free_w_buffer(struct rrpc *rrpc, struct rrpc_block *rblk)
+{
+try:
+	spin_lock(&rblk->w_buf.s_lock);
+	if (atomic_read(&rblk->w_buf.refs) > 0) {
+		spin_unlock(&rblk->w_buf.s_lock);
+		schedule();
+		goto try;
+	}
+
+	mempool_free(rblk->w_buf.entries, rrpc->block_pool);
+	rblk->w_buf.entries = NULL;
+	spin_unlock(&rblk->w_buf.s_lock);
+
+	/* TODO: Reuse the same buffers if the block size is the same */
+	mempool_free(rblk->w_buf.data, rrpc->write_buf_pool);
+	kfree(rblk->w_buf.sync_bitmap);
+
+	rblk->w_buf.mem = NULL;
+	rblk->w_buf.subm = NULL;
+	rblk->w_buf.sync_bitmap = NULL;
+	rblk->w_buf.data = NULL;
+	rblk->w_buf.nentries = 0;
+	rblk->w_buf.cur_mem = 0;
+	rblk->w_buf.cur_subm = 0;
+}
+
+static void rrpc_put_blk(struct rrpc *rrpc, struct rrpc_block *rblk)
+{
+	struct rrpc_lun *rlun = rblk->rlun;
+	struct nvm_lun *lun = rlun->parent;
+	struct rrpc_w_buf *buf = &rblk->w_buf;
+
+try:
+	spin_lock(&buf->w_lock);
+	/* Flush inflight I/Os */
+	if (!bitmap_full(buf->sync_bitmap, buf->cur_mem)) {
+		spin_unlock(&buf->w_lock);
+		schedule();
+		goto try;
+	}
+	spin_unlock(&buf->w_lock);
+
+	if (rblk->w_buf.entries)
+		rrpc_free_w_buffer(rrpc, rblk);
+
+	spin_lock(&lun->lock);
+	nvm_put_blk_unlocked(rrpc->dev, rblk->parent);
+	list_del(&rblk->list);
+	spin_unlock(&lun->lock);
+}
+
+static void rrpc_put_blks(struct rrpc *rrpc)
+{
+	struct rrpc_lun *rlun;
+	int i;
+
+	for (i = 0; i < rrpc->nr_luns; i++) {
+		rlun = &rrpc->luns[i];
+		if (rlun->cur)
+			rrpc_put_blk(rrpc, rlun->cur);
+		if (rlun->gc_cur)
+			rrpc_put_blk(rrpc, rlun->gc_cur);
+	}
+}
+
 static struct rrpc_block *rrpc_get_blk(struct rrpc *rrpc, struct rrpc_lun *rlun,
 							unsigned long flags)
 {
+	struct nvm_dev *dev = rrpc->dev;
 	struct nvm_lun *lun = rlun->parent;
 	struct nvm_block *blk;
 	struct rrpc_block *rblk;
+	struct buf_entry *entries;
+	unsigned long *sync_bitmap;
+	void *data;
+	int nentries = dev->pgs_per_blk * dev->sec_per_pg;
+
+	data = mempool_alloc(rrpc->write_buf_pool, GFP_ATOMIC);
+	if (!data) {
+		pr_err("nvm: rrpc: cannot allocate write buffer for block\n");
+		return NULL;
+	}
+
+	entries = mempool_alloc(rrpc->block_pool, GFP_ATOMIC);
+	if (!entries) {
+		pr_err("nvm: rrpc: cannot allocate write buffer for block\n");
+		mempool_free(data, rrpc->write_buf_pool);
+		return NULL;
+	}
+
+	/* TODO: Mempool? */
+	sync_bitmap = kmalloc(BITS_TO_LONGS(nentries) *
+					sizeof(unsigned long), GFP_ATOMIC);
+	if (!sync_bitmap) {
+		mempool_free(data, rrpc->write_buf_pool);
+		mempool_free(entries, rrpc->block_pool);
+		return NULL;
+	}
+
+	bitmap_zero(sync_bitmap, nentries);
 
 	spin_lock(&lun->lock);
 	blk = nvm_get_blk_unlocked(rrpc->dev, rlun->parent, flags);
@@ -192,43 +292,34 @@ static struct rrpc_block *rrpc_get_blk(struct rrpc *rrpc, struct rrpc_lun *rlun,
 	}
 
 	rblk = &rlun->blocks[blk->id];
-	list_add_tail(&rblk->list, &rlun->open_list);
-	spin_unlock(&lun->lock);
 
 	blk->priv = rblk;
-	bitmap_zero(rblk->invalid_pages, rrpc->dev->pgs_per_blk);
+	bitmap_zero(rblk->invalid_pages, dev->pgs_per_blk);
 	rblk->next_page = 0;
 	rblk->nr_invalid_pages = 0;
-	atomic_set(&rblk->data_cmnt_size, 0);
+
+	rblk->w_buf.data = data;
+	rblk->w_buf.entries = entries;
+	rblk->w_buf.sync_bitmap = sync_bitmap;
+
+	rblk->w_buf.entries->data  = rblk->w_buf.data;
+	rblk->w_buf.mem = rblk->w_buf.entries;
+	rblk->w_buf.subm = rblk->w_buf.entries;
+	rblk->w_buf.nentries = nentries;
+	rblk->w_buf.cur_mem = 0;
+	rblk->w_buf.cur_subm = 0;
+
+	atomic_set(&rblk->w_buf.refs, 0);
+
+	spin_lock_init(&rblk->w_buf.w_lock);
+	spin_lock_init(&rblk->w_buf.s_lock);
+
+	list_add_tail(&rblk->list, &rlun->open_list);
+	spin_unlock(&lun->lock);
 
 	return rblk;
 }
 
-static void rrpc_put_blk(struct rrpc *rrpc, struct rrpc_block *rblk)
-{
-	struct rrpc_lun *rlun = rblk->rlun;
-	struct nvm_lun *lun = rlun->parent;
-
-	spin_lock(&lun->lock);
-	nvm_put_blk_unlocked(rrpc->dev, rblk->parent);
-	list_del(&rblk->list);
-	spin_unlock(&lun->lock);
-}
-
-static void rrpc_put_blks(struct rrpc *rrpc)
-{
-	struct rrpc_lun *rlun;
-	int i;
-
-	for (i = 0; i < rrpc->nr_luns; i++) {
-		rlun = &rrpc->luns[i];
-		if (rlun->cur)
-			rrpc_put_blk(rrpc, rlun->cur);
-		if (rlun->gc_cur)
-			rrpc_put_blk(rrpc, rlun->gc_cur);
-	}
-}
-
 static struct rrpc_lun *get_next_lun(struct rrpc *rrpc)
 {
 	int next = atomic_inc_return(&rrpc->next_lun);
@@ -247,6 +338,17 @@ static void rrpc_gc_kick(struct rrpc *rrpc)
 	}
 }
 
+static void rrpc_writer_kick(struct rrpc *rrpc)
+{
+	struct rrpc_lun *rlun;
+	unsigned int i;
+
+	for (i = 0; i < rrpc->nr_luns; i++) {
+		rlun = &rrpc->luns[i];
+		queue_work(rrpc->kw_wq, &rlun->ws_writer);
+	}
+}
+
 /*
  * timed GC every interval.
  */
@@ -282,7 +384,7 @@ static int rrpc_move_valid_pages(struct rrpc *rrpc, struct rrpc_block *rblk)
 {
 	struct request_queue *q = rrpc->dev->q;
 	struct rrpc_rev_addr *rev;
-	struct nvm_rq *rqd;
+	struct rrpc_rq *rrqd;
 	struct bio *bio;
 	struct page *page;
 	int slot;
@@ -306,7 +408,7 @@ static int rrpc_move_valid_pages(struct rrpc *rrpc, struct rrpc_block *rblk)
 	}
 
 	while ((slot = find_first_zero_bit(rblk->invalid_pages,
-					    nr_pgs_per_blk)) < nr_pgs_per_blk) {
+					nr_pgs_per_blk)) < nr_pgs_per_blk) {
 
 		/* Lock laddr */
 		phys_addr = (rblk->parent->id * nr_pgs_per_blk) + slot;
@@ -321,8 +423,8 @@ try:
 			continue;
 		}
 
-		rqd = rrpc_inflight_laddr_acquire(rrpc, rev->addr, 1);
-		if (IS_ERR_OR_NULL(rqd)) {
+		rrqd = rrpc_inflight_laddr_acquire(rrpc, rev->addr, 1);
+		if (IS_ERR_OR_NULL(rrqd)) {
 			spin_unlock(&rrpc->rev_lock);
 			schedule();
 			goto try;
@@ -339,14 +441,14 @@ try:
 		/* TODO: may fail when EXP_PG_SIZE > PAGE_SIZE */
 		bio_add_pc_page(q, bio, page, RRPC_EXPOSED_PAGE_SIZE, 0);
 
-		if (rrpc_submit_io(rrpc, bio, rqd, NVM_IOTYPE_GC)) {
+		if (rrpc_submit_io(rrpc, bio, rrqd, NVM_IOTYPE_GC)) {
 			pr_err("rrpc: gc read failed.\n");
-			rrpc_inflight_laddr_release(rrpc, rqd);
+			rrpc_inflight_laddr_release(rrpc, rrqd);
 			goto finished;
 		}
 		wait_for_completion_io(&wait);
 		if (bio->bi_error) {
-			rrpc_inflight_laddr_release(rrpc, rqd);
+			rrpc_inflight_laddr_release(rrpc, rrqd);
 			goto finished;
 		}
 
@@ -363,14 +465,16 @@ try:
 		/* turn the command around and write the data back to a new
 		 * address
 		 */
-		if (rrpc_submit_io(rrpc, bio, rqd, NVM_IOTYPE_GC)) {
+		if (rrpc_submit_io(rrpc, bio, rrqd, NVM_IOTYPE_GC)
+							!= NVM_IO_DONE) {
 			pr_err("rrpc: gc write failed.\n");
-			rrpc_inflight_laddr_release(rrpc, rqd);
+			rrpc_inflight_laddr_release(rrpc, rrqd);
 			goto finished;
 		}
+		bio_endio(bio);
 		wait_for_completion_io(&wait);
 
-		rrpc_inflight_laddr_release(rrpc, rqd);
+		/* rrpc_inflight_laddr_release(rrpc, rrqd); */
 		if (bio->bi_error)
 			goto finished;
 
@@ -514,6 +618,8 @@ static void rrpc_gc_queue(struct work_struct *work)
 	list_move_tail(&rblk->list, &rlun->closed_list);
 	spin_unlock(&lun->lock);
 
+	rrpc_free_w_buffer(rrpc, rblk);
+
 	mempool_free(gcb, rrpc->gcb_pool);
 	pr_debug("nvm: block '%lu' is full, allow GC (sched)\n",
 							rblk->parent->id);
@@ -663,43 +769,72 @@ static void rrpc_run_gc(struct rrpc *rrpc, struct rrpc_block *rblk)
 	queue_work(rrpc->kgc_wq, &gcb->ws_gc);
 }
 
-static void rrpc_end_io_write(struct rrpc *rrpc, struct rrpc_rq *rrqd,
-						sector_t laddr, uint8_t npages)
+static void rrpc_sync_buffer(struct rrpc *rrpc, struct rrpc_addr *p)
 {
-	struct rrpc_addr *p;
 	struct rrpc_block *rblk;
+	struct rrpc_w_buf *buf;
 	struct nvm_lun *lun;
-	int cmnt_size, i;
+	unsigned long bppa;
 
-	for (i = 0; i < npages; i++) {
-		p = &rrpc->trans_map[laddr + i];
-		rblk = p->rblk;
-		lun = rblk->parent->lun;
+	rblk = p->rblk;
+	buf = &rblk->w_buf;
+	lun = rblk->parent->lun;
 
-		cmnt_size = atomic_inc_return(&rblk->data_cmnt_size);
-		if (unlikely(cmnt_size == rrpc->dev->pgs_per_blk))
-			rrpc_run_gc(rrpc, rblk);
+	bppa = rrpc->dev->sec_per_blk * rblk->parent->id;
+
+	WARN_ON(test_and_set_bit((p->addr - bppa), buf->sync_bitmap));
+
+	if (unlikely(bitmap_full(buf->sync_bitmap, buf->nentries))) {
+		/* Write buffer out-of-bounds */
+		WARN_ON((buf->cur_mem != buf->nentries) &&
+					(buf->cur_mem != buf->cur_subm));
+
+		rrpc_run_gc(rrpc, rblk);
+	}
+}
+
+static void rrpc_end_io_write(struct rrpc *rrpc, struct nvm_rq *rqd,
+							uint8_t nr_pages)
+{
+	struct rrpc_buf_rq *brrqd = nvm_rq_to_pdu(rqd);
+	struct rrpc_rq *rrqd;
+	int i;
+
+	for (i = 0; i < nr_pages; i++) {
+		rrqd = brrqd[i].rrqd;
+		rrpc_sync_buffer(rrpc, brrqd[i].addr);
+		kref_put(&rrqd->refs, rrpc_release_and_free_rrqd);
 	}
+
+	mempool_free(brrqd, rrpc->m_rrq_pool);
+	rrpc_writer_kick(rrpc);
+}
+
+static void rrpc_end_io_read(struct rrpc *rrpc, struct nvm_rq *rqd,
+							uint8_t nr_pages)
+{
+	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
+
+	if (rqd->flags & NVM_IOTYPE_GC)
+		return;
+
+	rrpc_unlock_rq(rrpc, rrqd);
+	mempool_free(rrqd, rrpc->rrq_pool);
 }
 
 static void rrpc_end_io(struct nvm_rq *rqd)
 {
 	struct rrpc *rrpc = container_of(rqd->ins, struct rrpc, instance);
-	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
-	uint8_t npages = rqd->nr_pages;
-	sector_t laddr = rrpc_get_laddr(rqd->bio) - npages;
+	uint8_t nr_pages = rqd->nr_pages;
 
 	if (bio_data_dir(rqd->bio) == WRITE)
-		rrpc_end_io_write(rrpc, rrqd, laddr, npages);
+		rrpc_end_io_write(rrpc, rqd, nr_pages);
+	else
+		rrpc_end_io_read(rrpc, rqd, nr_pages);
 
 	bio_put(rqd->bio);
 
-	if (rrqd->flags & NVM_IOTYPE_GC)
-		return;
-
-	rrpc_unlock_rq(rrpc, rqd);
-
-	if (npages > 1)
+	if (nr_pages > 1)
 		nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
 	if (rqd->metadata)
 		nvm_dev_dma_free(rrpc->dev, rqd->metadata, rqd->dma_metadata);
@@ -708,20 +843,24 @@ static void rrpc_end_io(struct nvm_rq *rqd)
 }
 
 static int rrpc_read_ppalist_rq(struct rrpc *rrpc, struct bio *bio,
-			struct nvm_rq *rqd, unsigned long flags, int npages)
+			struct nvm_rq *rqd, struct rrpc_buf_rq *brrqd,
+			unsigned long flags, int nr_pages)
 {
-	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
+	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
+	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
 	struct rrpc_addr *gp;
 	sector_t laddr = rrpc_get_laddr(bio);
 	int is_gc = flags & NVM_IOTYPE_GC;
 	int i;
 
-	if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd)) {
+	if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
 		nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
+		mempool_free(rrqd, rrpc->rrq_pool);
+		mempool_free(rqd, rrpc->rq_pool);
 		return NVM_IO_REQUEUE;
 	}
 
-	for (i = 0; i < npages; i++) {
+	for (i = 0; i < nr_pages; i++) {
 		/* We assume that mapping occurs at 4KB granularity */
 		BUG_ON(!(laddr + i >= 0 && laddr + i < rrpc->nr_sects));
 		gp = &rrpc->trans_map[laddr + i];
@@ -734,8 +873,12 @@ static int rrpc_read_ppalist_rq(struct rrpc *rrpc, struct bio *bio,
 			rrpc_unlock_laddr(rrpc, r);
 			nvm_dev_dma_free(rrpc->dev, rqd->ppa_list,
 							rqd->dma_ppa_list);
+			mempool_free(rrqd, rrpc->rrq_pool);
+			mempool_free(rqd, rrpc->rq_pool);
 			return NVM_IO_DONE;
 		}
+
+		brrqd[i].addr = gp;
 	}
 
 	rqd->opcode = NVM_OP_HBREAD;
@@ -751,8 +894,11 @@ static int rrpc_read_rq(struct rrpc *rrpc, struct bio *bio, struct nvm_rq *rqd,
 	sector_t laddr = rrpc_get_laddr(bio);
 	struct rrpc_addr *gp;
 
-	if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd))
+	if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
+		mempool_free(rrqd, rrpc->rrq_pool);
+		mempool_free(rqd, rrpc->rq_pool);
 		return NVM_IO_REQUEUE;
+	}
 
 	BUG_ON(!(laddr >= 0 && laddr < rrpc->nr_sects));
 	gp = &rrpc->trans_map[laddr];
@@ -761,7 +907,9 @@ static int rrpc_read_rq(struct rrpc *rrpc, struct bio *bio, struct nvm_rq *rqd,
 		rqd->ppa_addr = rrpc_ppa_to_gaddr(rrpc->dev, gp->addr);
 	} else {
 		BUG_ON(is_gc);
-		rrpc_unlock_rq(rrpc, rqd);
+		rrpc_unlock_rq(rrpc, rrqd);
+		mempool_free(rrqd, rrpc->rrq_pool);
+		mempool_free(rqd, rrpc->rq_pool);
 		return NVM_IO_DONE;
 	}
 
@@ -771,120 +919,190 @@ static int rrpc_read_rq(struct rrpc *rrpc, struct bio *bio, struct nvm_rq *rqd,
 	return NVM_IO_OK;
 }
 
+/*
+ * Copy data from current bio to block write buffer. This if necessary
+ * to guarantee durability if a flash block becomes bad before all pages
+ * are written. This buffer is also used to write at the right page
+ * granurality
+ */
+static int rrpc_write_to_buffer(struct rrpc *rrpc, struct bio *bio,
+				struct rrpc_rq *rrqd, struct rrpc_addr *addr,
+				struct rrpc_w_buf *w_buf,
+				unsigned long flags)
+{
+	struct nvm_dev *dev = rrpc->dev;
+	unsigned int bio_len = bio_cur_bytes(bio);
+
+	if (bio_len != RRPC_EXPOSED_PAGE_SIZE)
+		return NVM_IO_ERR;
+
+	spin_lock(&w_buf->w_lock);
+
+	WARN_ON(w_buf->cur_mem == w_buf->nentries);
+
+	w_buf->mem->rrqd = rrqd;
+	w_buf->mem->addr = addr;
+	w_buf->mem->flags = flags;
+
+	memcpy(w_buf->mem->data, bio_data(bio), bio_len);
+
+	w_buf->cur_mem++;
+	if (likely(w_buf->cur_mem < w_buf->nentries)) {
+		w_buf->mem++;
+		w_buf->mem->data =
+				w_buf->data + (w_buf->cur_mem * dev->sec_size);
+	}
+
+	spin_unlock(&w_buf->w_lock);
+
+	return 0;
+}
+
 static int rrpc_write_ppalist_rq(struct rrpc *rrpc, struct bio *bio,
-			struct nvm_rq *rqd, unsigned long flags, int npages)
+			struct rrpc_rq *rrqd, unsigned long flags, int nr_pages)
 {
-	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
+	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
+	struct rrpc_w_buf *w_buf;
 	struct rrpc_addr *p;
+	struct rrpc_lun *rlun;
 	sector_t laddr = rrpc_get_laddr(bio);
 	int is_gc = flags & NVM_IOTYPE_GC;
+	int err;
 	int i;
 
-	if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd)) {
-		nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
+	if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
+		mempool_free(rrqd, rrpc->rrq_pool);
 		return NVM_IO_REQUEUE;
 	}
 
-	for (i = 0; i < npages; i++) {
+	for (i = 0; i < nr_pages; i++) {
+		kref_get(&rrqd->refs);
+
 		/* We assume that mapping occurs at 4KB granularity */
 		p = rrpc_map_page(rrpc, laddr + i, is_gc);
 		if (!p) {
 			BUG_ON(is_gc);
 			rrpc_unlock_laddr(rrpc, r);
-			nvm_dev_dma_free(rrpc->dev, rqd->ppa_list,
-							rqd->dma_ppa_list);
+			mempool_free(rrqd, rrpc->rrq_pool);
 			rrpc_gc_kick(rrpc);
 			return NVM_IO_REQUEUE;
 		}
 
-		rqd->ppa_list[i] = rrpc_ppa_to_gaddr(rrpc->dev,
-								p->addr);
+		w_buf = &p->rblk->w_buf;
+		rlun = p->rblk->rlun;
+
+		rrqd->addr = p;
+
+		err = rrpc_write_to_buffer(rrpc, bio, rrqd, p, w_buf, flags);
+		if (err) {
+			pr_err("rrpc: could not write to write buffer\n");
+			return err;
+		}
+
+		bio_advance(bio, RRPC_EXPOSED_PAGE_SIZE);
+
+		queue_work(rrpc->kw_wq, &rlun->ws_writer);
 	}
 
-	rqd->opcode = NVM_OP_HBWRITE;
+	if (kref_put(&rrqd->refs, rrpc_release_and_free_rrqd)) {
+		pr_err("rrpc: request reference counter dailed\n");
+		return NVM_IO_ERR;
+	}
 
-	return NVM_IO_OK;
+	return NVM_IO_DONE;
 }
 
 static int rrpc_write_rq(struct rrpc *rrpc, struct bio *bio,
-				struct nvm_rq *rqd, unsigned long flags)
+				struct rrpc_rq *rrqd, unsigned long flags)
 {
-	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
+	struct rrpc_w_buf *w_buf;
 	struct rrpc_addr *p;
+	struct rrpc_lun *rlun;
 	int is_gc = flags & NVM_IOTYPE_GC;
+	int err;
 	sector_t laddr = rrpc_get_laddr(bio);
 
-	if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd))
+	if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
+		mempool_free(rrqd, rrpc->rrq_pool);
 		return NVM_IO_REQUEUE;
+	}
 
 	p = rrpc_map_page(rrpc, laddr, is_gc);
 	if (!p) {
 		BUG_ON(is_gc);
-		rrpc_unlock_rq(rrpc, rqd);
+		rrpc_unlock_rq(rrpc, rrqd);
+		mempool_free(rrqd, rrpc->rrq_pool);
 		rrpc_gc_kick(rrpc);
 		return NVM_IO_REQUEUE;
 	}
 
-	rqd->ppa_addr = rrpc_ppa_to_gaddr(rrpc->dev, p->addr);
-	rqd->opcode = NVM_OP_HBWRITE;
+	w_buf = &p->rblk->w_buf;
+	rlun = p->rblk->rlun;
+
 	rrqd->addr = p;
 
-	return NVM_IO_OK;
+	err = rrpc_write_to_buffer(rrpc, bio, rrqd, p, w_buf, flags);
+	if (err) {
+		pr_err("rrpc: could not write to write buffer\n");
+		return err;
+	}
+
+	queue_work(rrpc->kw_wq, &rlun->ws_writer);
+	return NVM_IO_DONE;
 }
 
-static int rrpc_setup_rq(struct rrpc *rrpc, struct bio *bio,
-			struct nvm_rq *rqd, unsigned long flags, uint8_t npages)
+static int rrpc_submit_read(struct rrpc *rrpc, struct bio *bio,
+				struct rrpc_rq *rrqd, unsigned long flags)
 {
-	if (npages > 1) {
+	struct nvm_rq *rqd;
+	struct rrpc_buf_rq brrqd[rrpc->max_write_pgs];
+	uint8_t nr_pages = rrpc_get_pages(bio);
+	int err;
+
+	rqd = mempool_alloc(rrpc->rq_pool, GFP_KERNEL);
+	if (!rqd) {
+		pr_err_ratelimited("rrpc: not able to queue bio.");
+		bio_io_error(bio);
+		return NVM_IO_ERR;
+	}
+	rqd->metadata = NULL;
+	rqd->priv = rrqd;
+
+	if (nr_pages > 1) {
 		rqd->ppa_list = nvm_dev_dma_alloc(rrpc->dev, GFP_KERNEL,
-							&rqd->dma_ppa_list);
+						&rqd->dma_ppa_list);
 		if (!rqd->ppa_list) {
 			pr_err("rrpc: not able to allocate ppa list\n");
+			mempool_free(rrqd, rrpc->rrq_pool);
+			mempool_free(rqd, rrpc->rq_pool);
 			return NVM_IO_ERR;
 		}
 
-		if (bio_rw(bio) == WRITE)
-			return rrpc_write_ppalist_rq(rrpc, bio, rqd, flags,
-									npages);
-
-		return rrpc_read_ppalist_rq(rrpc, bio, rqd, flags, npages);
+		err = rrpc_read_ppalist_rq(rrpc, bio, rqd, brrqd, flags,
+								nr_pages);
+		if (err) {
+			mempool_free(rrqd, rrpc->rrq_pool);
+			mempool_free(rqd, rrpc->rq_pool);
+			return err;
+		}
+	} else {
+		err = rrpc_read_rq(rrpc, bio, rqd, flags);
+		if (err)
+			return err;
 	}
 
-	if (bio_rw(bio) == WRITE)
-		return rrpc_write_rq(rrpc, bio, rqd, flags);
-
-	return rrpc_read_rq(rrpc, bio, rqd, flags);
-}
-
-static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
-				struct nvm_rq *rqd, unsigned long flags)
-{
-	int err;
-	struct rrpc_rq *rrq = nvm_rq_to_pdu(rqd);
-	uint8_t nr_pages = rrpc_get_pages(bio);
-	int bio_size = bio_sectors(bio) << 9;
-
-	if (bio_size < rrpc->dev->sec_size)
-		return NVM_IO_ERR;
-	else if (bio_size > rrpc->dev->max_rq_size)
-		return NVM_IO_ERR;
-
-	err = rrpc_setup_rq(rrpc, bio, rqd, flags, nr_pages);
-	if (err)
-		return err;
-
 	bio_get(bio);
 	rqd->bio = bio;
 	rqd->ins = &rrpc->instance;
-	rqd->nr_pages = nr_pages;
-	rrq->flags = flags;
+	rqd->nr_pages = rrqd->nr_pages = nr_pages;
+	rqd->flags = flags;
 
 	err = nvm_submit_io(rrpc->dev, rqd);
 	if (err) {
 		pr_err("rrpc: I/O submission failed: %d\n", err);
 		bio_put(bio);
 		if (!(flags & NVM_IOTYPE_GC)) {
-			rrpc_unlock_rq(rrpc, rqd);
+			rrpc_unlock_rq(rrpc, rrqd);
 			if (rqd->nr_pages > 1)
 				nvm_dev_dma_free(rrpc->dev,
 			rqd->ppa_list, rqd->dma_ppa_list);
@@ -895,10 +1113,39 @@ static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
 	return NVM_IO_OK;
 }
 
+static int rrpc_buffer_write(struct rrpc *rrpc, struct bio *bio,
+				struct rrpc_rq *rrqd, unsigned long flags)
+{
+	uint8_t nr_pages = rrpc_get_pages(bio);
+
+	rrqd->nr_pages = nr_pages;
+
+	if (nr_pages > 1)
+		return rrpc_write_ppalist_rq(rrpc, bio, rrqd, flags, nr_pages);
+	else
+		return rrpc_write_rq(rrpc, bio, rrqd, flags);
+}
+
+static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
+				struct rrpc_rq *rrqd, unsigned long flags)
+{
+	int bio_size = bio_sectors(bio) << 9;
+
+	if (bio_size < rrpc->dev->sec_size)
+		return NVM_IO_ERR;
+	else if (bio_size > rrpc->dev->max_rq_size)
+		return NVM_IO_ERR;
+
+	if (bio_rw(bio) == READ)
+		return rrpc_submit_read(rrpc, bio, rrqd, flags);
+
+	return rrpc_buffer_write(rrpc, bio, rrqd, flags);
+}
+
 static blk_qc_t rrpc_make_rq(struct request_queue *q, struct bio *bio)
 {
 	struct rrpc *rrpc = q->queuedata;
-	struct nvm_rq *rqd;
+	struct rrpc_rq *rrqd;
 	int err;
 
 	if (bio->bi_rw & REQ_DISCARD) {
@@ -906,15 +1153,15 @@ static blk_qc_t rrpc_make_rq(struct request_queue *q, struct bio *bio)
 		return BLK_QC_T_NONE;
 	}
 
-	rqd = mempool_alloc(rrpc->rq_pool, GFP_KERNEL);
-	if (!rqd) {
-		pr_err_ratelimited("rrpc: not able to queue bio.");
+	rrqd = mempool_alloc(rrpc->rrq_pool, GFP_KERNEL);
+	if (!rrqd) {
+		pr_err_ratelimited("rrpc: not able to allocate rrqd.");
 		bio_io_error(bio);
 		return BLK_QC_T_NONE;
 	}
-	memset(rqd, 0, sizeof(struct nvm_rq));
+	kref_init(&rrqd->refs);
 
-	err = rrpc_submit_io(rrpc, bio, rqd, NVM_IOTYPE_NONE);
+	err = rrpc_submit_io(rrpc, bio, rrqd, NVM_IOTYPE_NONE);
 	switch (err) {
 	case NVM_IO_OK:
 		return BLK_QC_T_NONE;
@@ -932,10 +1179,221 @@ static blk_qc_t rrpc_make_rq(struct request_queue *q, struct bio *bio)
 		break;
 	}
 
-	mempool_free(rqd, rrpc->rq_pool);
 	return BLK_QC_T_NONE;
 }
 
+static int rrpc_alloc_page_in_bio(struct rrpc *rrpc, struct bio *bio,
+								void *data)
+{
+	struct page *page;
+	int err;
+
+	if (PAGE_SIZE != RRPC_EXPOSED_PAGE_SIZE)
+		return -1;
+
+	page = virt_to_page(data);
+	if (!page) {
+		pr_err("nvm: rrpc: could not alloc page\n");
+		return -1;
+	}
+
+	err = bio_add_page(bio, page, RRPC_EXPOSED_PAGE_SIZE, 0);
+	if (err != RRPC_EXPOSED_PAGE_SIZE) {
+		pr_err("nvm: rrpc: could not add page to bio\n");
+		return -1;
+	}
+
+	return 0;
+}
+
+static void rrpc_submit_write(struct work_struct *work)
+{
+	struct rrpc_lun *rlun = container_of(work, struct rrpc_lun, ws_writer);
+	struct rrpc *rrpc = rlun->rrpc;
+	struct nvm_dev *dev = rrpc->dev;
+	struct rrpc_addr *addr;
+	struct rrpc_rq *rrqd;
+	struct rrpc_buf_rq *brrqd;
+	void *data;
+	struct nvm_rq *rqd;
+	struct rrpc_block *rblk;
+	struct bio *bio;
+	int pgs_to_sync, pgs_avail;
+	int sync = NVM_SYNC_HARD;
+	int err;
+	int i;
+
+	/* Note that OS pages are typically mapped to flash page sectors, which
+	 * are 4K; a flash page might be formed of several sectors. Also,
+	 * controllers typically program flash pages across multiple planes.
+	 * This is the flash programing granurality, and the reason behind the
+	 * sync strategy performed in this write thread.
+	 */
+try:
+	spin_lock(&rlun->parent->lock);
+	list_for_each_entry(rblk, &rlun->open_list, list) {
+		if (!spin_trylock(&rblk->w_buf.w_lock))
+			continue;
+
+		/* If the write thread has already submitted all I/Os in the
+		 * write buffer for this block ignore that the block is in the
+		 * open list; it is on its way to the closed list. This enables
+		 * us to avoid taking a lock on the list.
+		 */
+		if (unlikely(rblk->w_buf.cur_subm == rblk->w_buf.nentries)) {
+			spin_unlock(&rblk->w_buf.w_lock);
+			spin_unlock(&rlun->parent->lock);
+			schedule();
+			goto try;
+		}
+		pgs_avail = rblk->w_buf.cur_mem - rblk->w_buf.cur_subm;
+
+		switch (sync) {
+		case NVM_SYNC_SOFT:
+			pgs_to_sync = (pgs_avail >= rrpc->max_write_pgs) ?
+					rrpc->max_write_pgs : 0;
+			break;
+		case NVM_SYNC_HARD:
+			if (pgs_avail >= rrpc->max_write_pgs)
+				pgs_to_sync = rrpc->max_write_pgs;
+			else if (pgs_avail >= rrpc->min_write_pgs)
+				pgs_to_sync = rrpc->min_write_pgs *
+					(pgs_avail / rrpc->min_write_pgs);
+			else
+				pgs_to_sync = pgs_avail; /* TODO: ADD PADDING */
+			break;
+		case NVM_SYNC_OPORT:
+			if (pgs_avail >= rrpc->max_write_pgs)
+				pgs_to_sync = rrpc->max_write_pgs;
+			else if (pgs_avail >= rrpc->min_write_pgs)
+				pgs_to_sync = rrpc->min_write_pgs *
+					(pgs_avail / rrpc->min_write_pgs);
+			else
+				pgs_to_sync = 0;
+		}
+
+		if (pgs_to_sync == 0) {
+			spin_unlock(&rblk->w_buf.w_lock);
+			continue;
+		}
+
+		bio = bio_alloc(GFP_ATOMIC, pgs_to_sync);
+		if (!bio) {
+			pr_err("nvm: rrpc: could not alloc write bio\n");
+			goto out1;
+		}
+
+		rqd = mempool_alloc(rrpc->rq_pool, GFP_ATOMIC);
+		if (!rqd) {
+			pr_err_ratelimited("rrpc: not able to create w req.");
+			goto out2;
+		}
+		rqd->metadata = NULL;
+
+		brrqd = mempool_alloc(rrpc->m_rrq_pool, GFP_ATOMIC);
+		if (!brrqd) {
+			pr_err_ratelimited("rrpc: not able to create w rea.");
+			goto out3;
+		}
+
+		bio->bi_iter.bi_sector = 0; /* artificial bio */
+		bio->bi_rw = WRITE;
+
+		rqd->opcode = NVM_OP_HBWRITE;
+		rqd->bio = bio;
+		rqd->ins = &rrpc->instance;
+		rqd->nr_pages = pgs_to_sync;
+		rqd->priv = brrqd;
+
+		if (pgs_to_sync == 1) {
+			rrqd = rblk->w_buf.subm->rrqd;
+			addr = rblk->w_buf.subm->addr;
+			rqd->flags = rblk->w_buf.subm->flags;
+			data = rblk->w_buf.subm->data;
+
+			err = rrpc_alloc_page_in_bio(rrpc, bio, data);
+			if (err) {
+				pr_err("rrpc: cannot allocate page in bio\n");
+				goto out4;
+			}
+
+			/* TODO: This address can be skipped */
+			if (addr->addr == ADDR_EMPTY)
+				pr_err_ratelimited("rrpc: submitting empty rq");
+
+			rqd->ppa_addr = rrpc_ppa_to_gaddr(dev, addr->addr);
+
+			brrqd[0].rrqd = rrqd;
+			brrqd[0].addr = addr;
+
+			rblk->w_buf.subm++;
+			rblk->w_buf.cur_subm++;
+
+			goto submit_io;
+		}
+
+		/* This bio will contain several pppas */
+		rqd->ppa_list = nvm_dev_dma_alloc(rrpc->dev, GFP_ATOMIC,
+							&rqd->dma_ppa_list);
+		if (!rqd->ppa_list) {
+			pr_err("rrpc: not able to allocate ppa list\n");
+			goto out4;
+		}
+
+		for (i = 0; i < pgs_to_sync; i++) {
+			rrqd = rblk->w_buf.subm->rrqd;
+			addr = rblk->w_buf.subm->addr;
+			rqd->flags = rblk->w_buf.subm->flags;
+			data = rblk->w_buf.subm->data;
+
+			err = rrpc_alloc_page_in_bio(rrpc, bio, data);
+			if (err) {
+				pr_err("rrpc: cannot allocate page in bio\n");
+				goto out5;
+			}
+
+			/* TODO: This address can be skipped */
+			if (addr->addr == ADDR_EMPTY)
+				pr_err_ratelimited("rrpc: submitting empty rq");
+
+			rqd->ppa_list[i] = rrpc_ppa_to_gaddr(dev, addr->addr);
+
+			brrqd[i].rrqd = rrqd;
+			brrqd[i].addr = addr;
+
+			rblk->w_buf.subm++;
+			rblk->w_buf.cur_subm++;
+		}
+
+submit_io:
+		WARN_ON(rblk->w_buf.cur_subm > rblk->w_buf.nentries);
+
+		spin_unlock(&rblk->w_buf.w_lock);
+
+		err = nvm_submit_io(dev, rqd);
+		if (err) {
+			pr_err("rrpc: I/O submission failed: %d\n", err);
+			mempool_free(rqd, rrpc->rq_pool);
+			bio_put(bio);
+		}
+	}
+
+	spin_unlock(&rlun->parent->lock);
+	return;
+
+out5:
+	nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
+out4:
+	mempool_free(brrqd, rrpc->m_rrq_pool);
+out3:
+	mempool_free(rqd, rrpc->rq_pool);
+out2:
+	bio_put(bio);
+out1:
+	spin_unlock(&rblk->w_buf.w_lock);
+	spin_unlock(&rlun->parent->lock);
+}
+
 static void rrpc_requeue(struct work_struct *work)
 {
 	struct rrpc *rrpc = container_of(work, struct rrpc, ws_requeue);
@@ -978,7 +1436,7 @@ static void rrpc_gc_free(struct rrpc *rrpc)
 
 static int rrpc_gc_init(struct rrpc *rrpc)
 {
-	rrpc->krqd_wq = alloc_workqueue("rrpc-lun", WQ_MEM_RECLAIM|WQ_UNBOUND,
+	rrpc->krqd_wq = alloc_workqueue("rrpc-lun", WQ_MEM_RECLAIM | WQ_UNBOUND,
 								rrpc->nr_luns);
 	if (!rrpc->krqd_wq)
 		return -ENOMEM;
@@ -1080,6 +1538,8 @@ static int rrpc_map_init(struct rrpc *rrpc)
 
 static int rrpc_core_init(struct rrpc *rrpc)
 {
+	struct nvm_dev *dev = rrpc->dev;
+
 	down_write(&rrpc_lock);
 	if (!rrpc_gcb_cache) {
 		rrpc_gcb_cache = kmem_cache_create("rrpc_gcb",
@@ -1089,14 +1549,70 @@ static int rrpc_core_init(struct rrpc *rrpc)
 			return -ENOMEM;
 		}
 
-		rrpc_rq_cache = kmem_cache_create("rrpc_rq",
-				sizeof(struct nvm_rq) + sizeof(struct rrpc_rq),
-				0, 0, NULL);
+		rrpc_rq_cache = kmem_cache_create("nvm_rq",
+					sizeof(struct nvm_rq), 0, 0, NULL);
 		if (!rrpc_rq_cache) {
 			kmem_cache_destroy(rrpc_gcb_cache);
 			up_write(&rrpc_lock);
 			return -ENOMEM;
 		}
+
+		rrpc_rrq_cache = kmem_cache_create("rrpc_rrq",
+					sizeof(struct rrpc_rq), 0, 0, NULL);
+		if (!rrpc_rrq_cache) {
+			kmem_cache_destroy(rrpc_gcb_cache);
+			kmem_cache_destroy(rrpc_rq_cache);
+			up_write(&rrpc_lock);
+			return -ENOMEM;
+		}
+
+		rrpc_buf_rrq_cache = kmem_cache_create("rrpc_m_rrq",
+			rrpc->max_write_pgs * sizeof(struct rrpc_buf_rq),
+			0, 0, NULL);
+		if (!rrpc_buf_rrq_cache) {
+			kmem_cache_destroy(rrpc_gcb_cache);
+			kmem_cache_destroy(rrpc_rq_cache);
+			kmem_cache_destroy(rrpc_rrq_cache);
+			up_write(&rrpc_lock);
+			return -ENOMEM;
+		}
+	}
+
+	/* we assume that sec->sec_size is the same as the page size exposed by
+	 * rrpc (4KB). We need extra logic otherwise
+	 */
+	if (!rrpc_block_cache) {
+		/* Write buffer: Allocate all buffer (for all block) at once. We
+		 * avoid having to allocate a memory from the pool for each IO
+		 * at the cost pre-allocating memory for the whole block when a
+		 * new block is allocated from the media manager.
+		 */
+		rrpc_wb_cache = kmem_cache_create("nvm_wb",
+			dev->pgs_per_blk * dev->sec_per_pg * dev->sec_size,
+			0, 0, NULL);
+		if (!rrpc_wb_cache) {
+			kmem_cache_destroy(rrpc_gcb_cache);
+			kmem_cache_destroy(rrpc_rq_cache);
+			kmem_cache_destroy(rrpc_rrq_cache);
+			kmem_cache_destroy(rrpc_buf_rrq_cache);
+			up_write(&rrpc_lock);
+			return -ENOMEM;
+		}
+
+		/* Write buffer entries */
+		rrpc_block_cache = kmem_cache_create("nvm_entry",
+			dev->pgs_per_blk * dev->sec_per_pg *
+			sizeof(struct buf_entry),
+			0, 0, NULL);
+		if (!rrpc_block_cache) {
+			kmem_cache_destroy(rrpc_gcb_cache);
+			kmem_cache_destroy(rrpc_rq_cache);
+			kmem_cache_destroy(rrpc_rrq_cache);
+			kmem_cache_destroy(rrpc_buf_rrq_cache);
+			kmem_cache_destroy(rrpc_wb_cache);
+			up_write(&rrpc_lock);
+			return -ENOMEM;
+		}
 	}
 	up_write(&rrpc_lock);
 
@@ -1113,17 +1629,45 @@ static int rrpc_core_init(struct rrpc *rrpc)
 	if (!rrpc->rq_pool)
 		return -ENOMEM;
 
+	rrpc->rrq_pool = mempool_create_slab_pool(64, rrpc_rrq_cache);
+	if (!rrpc->rrq_pool)
+		return -ENOMEM;
+
+	rrpc->m_rrq_pool = mempool_create_slab_pool(64, rrpc_buf_rrq_cache);
+	if (!rrpc->m_rrq_pool)
+		return -ENOMEM;
+
+	rrpc->block_pool = mempool_create_slab_pool(8, rrpc_block_cache);
+	if (!rrpc->block_pool)
+		return -ENOMEM;
+
+	rrpc->write_buf_pool = mempool_create_slab_pool(8, rrpc_wb_cache);
+	if (!rrpc->write_buf_pool)
+		return -ENOMEM;
+
 	spin_lock_init(&rrpc->inflights.lock);
 	INIT_LIST_HEAD(&rrpc->inflights.reqs);
 
+	rrpc->kw_wq = alloc_workqueue("rrpc-writer",
+				WQ_MEM_RECLAIM | WQ_UNBOUND, rrpc->nr_luns);
+	if (!rrpc->kw_wq)
+		return -ENOMEM;
+
 	return 0;
 }
 
 static void rrpc_core_free(struct rrpc *rrpc)
 {
+	if (rrpc->kw_wq)
+		destroy_workqueue(rrpc->kw_wq);
+
 	mempool_destroy(rrpc->page_pool);
 	mempool_destroy(rrpc->gcb_pool);
+	mempool_destroy(rrpc->m_rrq_pool);
+	mempool_destroy(rrpc->rrq_pool);
 	mempool_destroy(rrpc->rq_pool);
+	mempool_destroy(rrpc->block_pool);
+	mempool_destroy(rrpc->write_buf_pool);
 }
 
 static void rrpc_luns_free(struct rrpc *rrpc)
@@ -1164,6 +1708,8 @@ static int rrpc_luns_init(struct rrpc *rrpc, int lun_begin, int lun_end)
 		INIT_LIST_HEAD(&rlun->open_list);
 		INIT_LIST_HEAD(&rlun->closed_list);
 
+		INIT_WORK(&rlun->ws_writer, rrpc_submit_write);
+
 		INIT_WORK(&rlun->ws_gc, rrpc_lun_gc);
 		spin_lock_init(&rlun->lock);
 
@@ -1209,6 +1755,7 @@ static void rrpc_exit(void *private)
 
 	flush_workqueue(rrpc->krqd_wq);
 	flush_workqueue(rrpc->kgc_wq);
+	/* flush_workqueue(rrpc->kw_wq); */ /* TODO: Implement flush + padding*/
 
 	rrpc_free(rrpc);
 }
diff --git a/drivers/lightnvm/rrpc.h b/drivers/lightnvm/rrpc.h
index 868e91a..6e188b4 100644
--- a/drivers/lightnvm/rrpc.h
+++ b/drivers/lightnvm/rrpc.h
@@ -49,17 +49,67 @@ struct rrpc_inflight_rq {
 struct rrpc_rq {
 	struct rrpc_inflight_rq inflight_rq;
 	struct rrpc_addr *addr;
+	int nr_pages;
+
+	struct kref refs;
+};
+
+struct rrpc_buf_rq {
+	struct rrpc_addr *addr;
+	struct rrpc_rq *rrqd;
+};
+
+/* Sync strategies from write buffer to media */
+enum {
+	NVM_SYNC_SOFT	= 0x0,		/* Only submit at max_write_pgs
+					 * supported by the device. Typically 64
+					 * pages (256k).
+					 */
+	NVM_SYNC_HARD	= 0x1,		/* Submit the whole buffer. Add padding
+					 * if necessary to respect the device's
+					 * min_write_pgs.
+					 */
+	NVM_SYNC_OPORT	= 0x2,		/* Submit what we can, always respecting
+					 * the device's min_write_pgs.
+					 */
+};
+
+struct buf_entry {
+	struct rrpc_rq *rrqd;
+	void *data;
+	struct rrpc_addr *addr;
 	unsigned long flags;
 };
 
+struct rrpc_w_buf {
+	struct buf_entry *entries;	/* Entries */
+	struct buf_entry *mem;		/* Points to the next writable entry */
+	struct buf_entry *subm;		/* Points to the last submitted entry */
+	int cur_mem;			/* Current memory entry. Follows mem */
+	int cur_subm;			/* Entries have been submitted to dev */
+	int nentries;			/* Number of entries in write buffer */
+
+	void *data;			/* Actual data */
+	unsigned long *sync_bitmap;	/* Bitmap representing physical
+					 * addresses that have been synced to
+					 * the media
+					 */
+
+	atomic_t refs;
+
+	spinlock_t w_lock;
+	spinlock_t s_lock;
+};
+
 struct rrpc_block {
 	struct nvm_block *parent;
 	struct rrpc_lun *rlun;
 	struct list_head prio;
 	struct list_head list;
+	struct rrpc_w_buf w_buf;
 
 #define MAX_INVALID_PAGES_STORAGE 8
-	/* Bitmap for invalid page intries */
+	/* Bitmap for invalid page entries */
 	unsigned long invalid_pages[MAX_INVALID_PAGES_STORAGE];
 	/* points to the next writable page within a block */
 	unsigned int next_page;
@@ -67,7 +117,6 @@ struct rrpc_block {
 	unsigned int nr_invalid_pages;
 
 	spinlock_t lock;
-	atomic_t data_cmnt_size; /* data pages committed to stable storage */
 };
 
 struct rrpc_lun {
@@ -86,6 +135,7 @@ struct rrpc_lun {
 					 */
 
 	struct work_struct ws_gc;
+	struct work_struct ws_writer;
 
 	spinlock_t lock;
 };
@@ -136,10 +186,15 @@ struct rrpc {
 	mempool_t *page_pool;
 	mempool_t *gcb_pool;
 	mempool_t *rq_pool;
+	mempool_t *rrq_pool;
+	mempool_t *m_rrq_pool;
+	mempool_t *block_pool;
+	mempool_t *write_buf_pool;
 
 	struct timer_list gc_timer;
 	struct workqueue_struct *krqd_wq;
 	struct workqueue_struct *kgc_wq;
+	struct workqueue_struct *kw_wq;
 };
 
 struct rrpc_block_gc {
@@ -181,7 +236,7 @@ static inline int request_intersects(struct rrpc_inflight_rq *r,
 }
 
 static int __rrpc_lock_laddr(struct rrpc *rrpc, sector_t laddr,
-			     unsigned pages, struct rrpc_inflight_rq *r)
+				unsigned pages, struct rrpc_inflight_rq *r)
 {
 	sector_t laddr_end = laddr + pages - 1;
 	struct rrpc_inflight_rq *rtmp;
@@ -206,27 +261,26 @@ static int __rrpc_lock_laddr(struct rrpc *rrpc, sector_t laddr,
 }
 
 static inline int rrpc_lock_laddr(struct rrpc *rrpc, sector_t laddr,
-				 unsigned pages,
-				 struct rrpc_inflight_rq *r)
+				unsigned pages,
+				struct rrpc_inflight_rq *r)
 {
 	BUG_ON((laddr + pages) > rrpc->nr_sects);
 
 	return __rrpc_lock_laddr(rrpc, laddr, pages, r);
 }
 
-static inline struct rrpc_inflight_rq *rrpc_get_inflight_rq(struct nvm_rq *rqd)
+static inline struct rrpc_inflight_rq
+				*rrpc_get_inflight_rq(struct rrpc_rq *rrqd)
 {
-	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
-
 	return &rrqd->inflight_rq;
 }
 
 static inline int rrpc_lock_rq(struct rrpc *rrpc, struct bio *bio,
-							struct nvm_rq *rqd)
+							struct rrpc_rq *rrqd)
 {
 	sector_t laddr = rrpc_get_laddr(bio);
 	unsigned int pages = rrpc_get_pages(bio);
-	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
+	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
 
 	return rrpc_lock_laddr(rrpc, laddr, pages, r);
 }
@@ -241,12 +295,12 @@ static inline void rrpc_unlock_laddr(struct rrpc *rrpc,
 	spin_unlock_irqrestore(&rrpc->inflights.lock, flags);
 }
 
-static inline void rrpc_unlock_rq(struct rrpc *rrpc, struct nvm_rq *rqd)
+static inline void rrpc_unlock_rq(struct rrpc *rrpc, struct rrpc_rq *rrqd)
 {
-	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
-	uint8_t pages = rqd->nr_pages;
+	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
+	uint8_t nr_pages = rrqd->nr_pages;
 
-	BUG_ON((r->l_start + pages) > rrpc->nr_sects);
+	BUG_ON((r->l_start + nr_pages) > rrpc->nr_sects);
 
 	rrpc_unlock_laddr(rrpc, r);
 }
diff --git a/include/linux/lightnvm.h b/include/linux/lightnvm.h
index b94f2d5..eda9743 100644
--- a/include/linux/lightnvm.h
+++ b/include/linux/lightnvm.h
@@ -231,6 +231,8 @@ struct nvm_rq {
 	void *metadata;
 	dma_addr_t dma_metadata;
 
+	void *priv;
+
 	struct completion *wait;
 	nvm_end_io_fn *end_io;
 
@@ -243,12 +245,12 @@ struct nvm_rq {
 
 static inline struct nvm_rq *nvm_rq_from_pdu(void *pdu)
 {
-	return pdu - sizeof(struct nvm_rq);
+	return container_of(pdu, struct nvm_rq, priv);
 }
 
 static inline void *nvm_rq_to_pdu(struct nvm_rq *rqdata)
 {
-	return rqdata + 1;
+	return rqdata->priv;
 }
 
 struct nvm_block;
-- 
2.1.4

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ