Message-ID: <5243922B.6070705@gmail.com>
Date: Thu, 26 Sep 2013 10:47:23 +0900
From: Akira Hayakawa <ruby.wktk@...il.com>
To: snitzer@...hat.com
CC: gregkh@...uxfoundation.org, devel@...verdev.osuosl.org,
linux-kernel@...r.kernel.org, dm-devel@...hat.com,
cesarb@...arb.net, joe@...ches.com, akpm@...ux-foundation.org,
agk@...hat.com, m.chehab@...sung.com, ejt@...hat.com,
dan.carpenter@...cle.com, ruby.wktk@...il.com
Subject: Re: Reworking dm-writeboost [was: Re: staging: Add dm-writeboost]

Hi Mike,

The monolithic source code (3.2k lines)
is now split into almost 20 *.c files
according to functionality and
data structures, in OOP style.

The aim of this posting
is to share what the split looks like.
I believe that reading even just the *.h files
will convince you that the split is clean.

The code is currently littered with
almost 20 version-switch macros
and WB* debug macros,
but I will clean them up
before sending the patch.

Again,
the latest code can be cloned with
git clone https://github.com/akiradeveloper/dm-writeboost.git
I will make a few updates to the source code this weekend,
so please track the repository to follow the latest version.
Below is only a snapshot.

Akira
---------- Summary ----------
33 Makefile
10 bigarray.h
19 cache-alloc.h
10 defer-barrier.h
8 dirty-sync.h
8 flush-daemon.h
10 format-cache.h
24 handle-io.h
16 hashtable.h
18 migrate-daemon.h
7 migrate-modulator.h
12 queue-flush-job.h
8 rambuf.h
13 recover.h
18 segment.h
8 superblock-recorder.h
9 target.h
30 util.h
384 writeboost.h
99 bigarray.c
192 cache-alloc.c
36 defer-barrier.c
33 dirty-sync.c
85 flush-daemon.c
234 format-cache.c
553 handle-io.c
109 hashtable.c
345 migrate-daemon.c
41 migrate-modulator.c
169 queue-flush-job.c
52 rambuf.c
308 recover.c
118 segment.c
61 superblock-recorder.c
376 target.c
126 util.c
---------- Makefile ----------
KERNEL_TREE := /lib/modules/$(shell uname -r)/build
# KERNEL_TREE := $(HOME)/linux-$(KERN_VERSION)
PWD := $(shell pwd)
# EXTRA_CFLAGS += -O0 -DCONFIG_DM_DEBUG -fno-inline #-Wall
# EXTRA_CFLAGS += -O2 -UCONFIG_DM_DEBUG
obj-m := dm-writeboost.o
dm-writeboost-objs := \
target.o \
handle-io.o \
queue-flush-job.o \
flush-daemon.o \
migrate-daemon.o \
migrate-modulator.o \
defer-barrier.o \
superblock-recorder.o \
dirty-sync.o \
bigarray.o \
segment.o \
hashtable.o \
cache-alloc.o \
format-cache.o \
recover.o \
rambuf.o \
util.o
all:
$(MAKE) -C $(KERNEL_TREE) M=$(PWD) modules
clean:
$(MAKE) -C $(KERNEL_TREE) M=$(PWD) clean
---------- bigarray.h ----------
#ifndef WRITEBOOST_BIGARRAY_H
#define WRITEBOOST_BIGARRAY_H
#include "writeboost.h"
struct bigarray;
struct bigarray *make_bigarray(size_t elemsize, size_t nr_elems);
void kill_bigarray(struct bigarray *);
void *bigarray_at(struct bigarray *, size_t i);
#endif
---------- cache-alloc.h ----------
#ifndef WRITEBOOST_CACHE_ALLOC_H
#define WRITEBOOST_CACHE_ALLOC_H
#include "writeboost.h"
#include "segment.h"
#include "flush-daemon.h"
#include "migrate-daemon.h"
#include "migrate-modulator.h"
#include "rambuf.h"
#include "hashtable.h"
#include "superblock-recorder.h"
#include "dirty-sync.h"
#include "recover.h"
#include "defer-barrier.h"
#include "handle-io.h"
int __must_check resume_cache(struct wb_cache *, struct dm_dev *);
void free_cache(struct wb_cache *);
#endif
---------- defer-barrier.h ----------
#ifndef WRITEBOOST_DEFER_BARRIER_H
#define WRITEBOOST_DEFER_BARRIER_H
#include "writeboost.h"
#include "queue-flush-job.h"
void queue_barrier_io(struct wb_cache *, struct bio *);
void flush_barrier_ios(struct work_struct *);
void barrier_deadline_proc(unsigned long data);
#endif
---------- dirty-sync.h ----------
#ifndef WRITEBOOST_DIRTY_SYNC_H
#define WRITEBOOST_DIRTY_SYNC_H
#include "writeboost.h"
#include "queue-flush-job.h"
void sync_proc(struct work_struct *);
#endif
---------- flush-daemon.h ----------
#ifndef WRITEBOOST_FLUSH_DAEMON_H
#define WRITEBOOST_FLUSH_DAEMON_H
#include "writeboost.h"
#include "util.h"
void flush_proc(struct work_struct *);
#endif
---------- format-cache.h ----------
#ifndef WRITEBOOST_FORMAT_CACHE_H
#define WRITEBOOST_FORMAT_CACHE_H
#include "writeboost.h"
#include "util.h"
#include "segment.h"
int __must_check audit_cache_device(struct dm_dev *, bool *cache_valid);
int __must_check format_cache_device(struct dm_dev *);
#endif
---------- handle-io.h ----------
#ifndef WRITEBOOST_HANDLE_IO_H
#define WRITEBOOST_HANDLE_IO_H
#include "writeboost.h"
#include "bigarray.h"
#include "util.h"
#include "defer-barrier.h"
#include "hashtable.h"
#include "segment.h"
#include "queue-flush-job.h"
int writeboost_map(struct dm_target *, struct bio *
#if LINUX_VERSION_CODE < PER_BIO_VERSION
, union map_info *
#endif
);
int writeboost_end_io(struct dm_target *, struct bio *, int error
#if LINUX_VERSION_CODE < PER_BIO_VERSION
, union map_info *
#endif
);
void inc_nr_dirty_caches(struct wb_device *);
void clear_stat(struct wb_cache *);
#endif
---------- hashtable.h ----------
#ifndef WRITEBOOST_HASHTABLE_H
#define WRITEBOOST_HASHTABLE_H
#include "writeboost.h"
#include "segment.h"
int __must_check ht_empty_init(struct wb_cache *);
cache_nr ht_hash(struct wb_cache *, struct lookup_key *);
struct metablock *ht_lookup(struct wb_cache *,
struct ht_head *, struct lookup_key *);
void ht_register(struct wb_cache *, struct ht_head *,
struct lookup_key *, struct metablock *);
void ht_del(struct wb_cache *, struct metablock *);
void discard_caches_inseg(struct wb_cache *,
struct segment_header *);
#endif
---------- migrate-daemon.h ----------
#ifndef WRITEBOOST_MIGRATE_DAEMON_H
#define WRITEBOOST_MIGRATE_DAEMON_H
#include "writeboost.h"
#include "util.h"
#include "segment.h"
u8 atomic_read_mb_dirtiness(struct segment_header *,
struct metablock *);
void cleanup_mb_if_dirty(struct wb_cache *,
struct segment_header *,
struct metablock *);
void migrate_proc(struct work_struct *);
void wait_for_migration(struct wb_cache *, size_t id);
#endif
---------- migrate-modulator.h ----------
#ifndef WRITEBOOST_MIGRATE_MODULATOR_H
#define WRITEBOOST_MIGRATE_MODULATOR_H
#include "writeboost.h"
void modulator_proc(struct work_struct *);
#endif
---------- queue-flush-job.h ----------
#ifndef WRITEBOOST_QUEUE_FLUSH_JOB
#define WRITEBOOST_QUEUE_FLUSH_JOB
#include "writeboost.h"
#include "segment.h"
#include "hashtable.h"
#include "util.h"
#include "migrate-daemon.h"
void queue_current_buffer(struct wb_cache *);
void flush_current_buffer(struct wb_cache *);
#endif
---------- rambuf.h ----------
#ifndef WRITEBOOST_RAMBUF_H
#define WRITEBOOST_RAMBUF_H
#include "writeboost.h"
int __must_check init_rambuf_pool(struct wb_cache *);
void free_rambuf_pool(struct wb_cache *);
#endif
---------- recover.h ----------
#ifndef WRITEBOOST_RECOVER_H
#define WRITEBOOST_RECOVER_H
#include "writeboost.h"
#include "util.h"
#include "segment.h"
#include "bigarray.h"
#include "hashtable.h"
#include "migrate-daemon.h"
#include "handle-io.h"
int __must_check recover_cache(struct wb_cache *);
#endif
---------- segment.h ----------
#ifndef WRITEBOOST_SEGMENT_H
#define WRITEBOOST_SEGMENT_H
#include "writeboost.h"
#include "segment.h"
#include "bigarray.h"
#include "util.h"
int __must_check init_segment_header_array(struct wb_cache *);
u64 calc_nr_segments(struct dm_dev *);
struct segment_header *get_segment_header_by_id(struct wb_cache *,
size_t segment_id);
sector_t calc_segment_header_start(size_t segment_idx);
sector_t calc_mb_start_sector(struct segment_header *, cache_nr mb_idx);
u32 calc_segment_lap(struct wb_cache *, size_t segment_id);
struct metablock *mb_at(struct wb_cache *, cache_nr idx);
bool is_on_buffer(struct wb_cache *, cache_nr mb_idx);
#endif
---------- superblock-recorder.h ----------
#ifndef WRITEBOOST_SUPERBLOCK_RECORDER_H
#define WRITEBOOST_SUPERBLOCK_RECORDER_H
#include "writeboost.h"
#include "util.h"
void recorder_proc(struct work_struct *);
#endif
---------- target.h ----------
#ifndef WRITEBOOST_TARGET_H
#define WRITEBOOST_TARGET_H
#include "writeboost.h"
#include "format-cache.h"
#include "cache-alloc.h"
#include "handle-io.h"
#include "util.h"
#endif
---------- util.h ----------
#ifndef WRITEBOOST_UTIL_H
#define WRITEBOOST_UTIL_H
#include "writeboost.h"
extern struct workqueue_struct *safe_io_wq;
extern struct dm_io_client *wb_io_client;
void *do_kmalloc_retry(size_t size, gfp_t flags, int lineno);
#define kmalloc_retry(size, flags) \
do_kmalloc_retry((size), (flags), __LINE__)
int dm_safe_io_internal(
struct dm_io_request *,
unsigned num_regions, struct dm_io_region *,
unsigned long *err_bits, bool thread, int lineno);
#define dm_safe_io(io_req, num_regions, regions, err_bits, thread) \
dm_safe_io_internal((io_req), (num_regions), (regions), \
(err_bits), (thread), __LINE__)
void dm_safe_io_retry_internal(
struct dm_io_request *,
unsigned num_regions, struct dm_io_region *,
bool thread, int lineno);
#define dm_safe_io_retry(io_req, num_regions, regions, thread) \
dm_safe_io_retry_internal((io_req), (num_regions), (regions), \
(thread), __LINE__)
sector_t dm_devsize(struct dm_dev *);
#endif
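To show how the wrappers above are meant to be used,
here is a minimal sketch that mirrors read_superblock_header()
further down; my_read_sector is a hypothetical helper
for illustration only and not part of the module:
/*
 * Sketch: read one sector synchronously through dm_safe_io().
 */
static int my_read_sector(struct dm_dev *dev, sector_t sector, void *buf)
{
	unsigned long err_bits = 0;
	struct dm_io_request io_req = {
		.client = wb_io_client,
		.bi_rw = READ,
		.notify.fn = NULL,
		.mem.type = DM_IO_KMEM,
		.mem.ptr.addr = buf,
	};
	struct dm_io_region region = {
		.bdev = dev->bdev,
		.sector = sector,
		.count = 1,
	};
	/* The macro expands to dm_safe_io_internal(..., __LINE__). */
	return dm_safe_io(&io_req, 1, &region, &err_bits, true);
}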
---------- writeboost.h ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@...il.com>
*
* This file is released under the GPL.
*/
#ifndef DM_WRITEBOOST_H
#define DM_WRITEBOOST_H
#define DM_MSG_PREFIX "writeboost"
#include <linux/module.h>
#include <linux/version.h>
#include <linux/list.h>
#include <linux/slab.h>
#include <linux/mutex.h>
#include <linux/sched.h>
#include <linux/timer.h>
#include <linux/device-mapper.h>
#include <linux/dm-io.h>
#define WBERR(f, args...) \
DMERR("err@%d " f, __LINE__, ## args)
#define WBWARN(f, args...) \
DMWARN("warn@%d " f, __LINE__, ## args)
#define WBINFO(f, args...) \
DMINFO("info@%d " f, __LINE__, ## args)
/*
* (1 << x) sectors, where
* 4 <= x <= 11.
* dm-writeboost supports a segment size of up to 1MB.
*
* All the comments below assume
* the segment size is the maximum 1MB.
*/
#define WB_SEGMENTSIZE_ORDER 11
/*
* By default,
* we allocate 64 * 1MB RAM buffers statically.
*/
#define NR_RAMBUF_POOL 64
/*
* The first 4KB (1 << 3 sectors) of each segment
* is for metadata.
*/
#define NR_CACHES_INSEG ((1 << (WB_SEGMENTSIZE_ORDER - 3)) - 1)
/*
* The Detail of the Disk Format
*
* Whole:
* Superblock(1MB) Segment(1MB) Segment(1MB) ...
* We reserve the first segment (1MB) as the superblock.
*
* Superblock(1MB):
* head <---- ----> tail
* superblock header(512B) ... superblock record(512B)
*
* Segment(1MB):
* segment_header_device(4KB) metablock_device(4KB) * NR_CACHES_INSEG
*/
/*
* Superblock Header
* The first sector of the superblock region.
* The values are fixed once the device is formatted.
*/
/*
* Magic Number
* "WBst"
*/
#define WRITEBOOST_MAGIC 0x57427374
struct superblock_header_device {
__le32 magic;
} __packed;
/*
* Superblock Record (Mutable)
* The last sector of the superblock region.
* Records the current cache status as needed.
*/
struct superblock_record_device {
__le64 last_migrated_segment_id;
} __packed;
/*
* Cache line index.
*
* dm-writeboost can support a cache device
* of size less than 4KB * (1 << 32),
* that is, 16TB.
*/
typedef u32 cache_nr;
/*
* Metadata of a 4KB cache line
*
* Dirtiness is defined for each sector
* in this cache line.
*/
struct metablock {
sector_t sector; /* key */
cache_nr idx; /* Const */
struct hlist_node ht_list;
/*
* 8 bit flags, one dirtiness bit
* for each sector in the cache line.
*
* The current implementation
* only recovers dirty caches.
* Recovering clean caches would complicate the code
* without being effective,
* since only a few of the caches are clean.
*/
u8 dirty_bits;
};
/*
* On-disk metablock
*/
struct metablock_device {
__le64 sector;
u8 dirty_bits;
__le32 lap;
} __packed;
#define SZ_MAX (~(size_t)0)
struct segment_header {
struct metablock mb_array[NR_CACHES_INSEG];
/*
* The ID increases monotonically.
* ID 0 is used to indicate that the segment is invalid;
* valid IDs are >= 1.
*/
u64 global_id;
/*
* A segment can be flushed half-done.
* length is the number of
* metablocks that must be counted
* when resuming.
*/
u8 length;
cache_nr start_idx; /* Const */
sector_t start_sector; /* Const */
struct list_head migrate_list;
/*
* This segment cannot be migrated
* to the backing store
* until it is flushed.
* A flushed segment resides on the cache device.
*/
struct completion flush_done;
/*
* This segment cannot be overwritten
* until it is migrated.
*/
struct completion migrate_done;
spinlock_t lock;
atomic_t nr_inflight_ios;
};
/*
* (Locking)
* Locking metablocks at their own granularity
* would need too much memory for lock structures.
* We lock a metablock only by locking the parent segment
* that includes the metablock.
*/
#define lockseg(seg, flags) spin_lock_irqsave(&(seg)->lock, flags)
#define unlockseg(seg, flags) spin_unlock_irqrestore(&(seg)->lock, flags)
/*
* On-disk segment header.
*
* Must be at most 4KB large.
*/
struct segment_header_device {
/* - FROM - At most 512 bytes for atomicity. --- */
__le64 global_id;
/*
* How many cache lines in this segment
* should be counted when resuming.
*/
u8 length;
/*
* The lap of rotation over the cache device;
* used to find the head and tail among the
* segments on the cache device.
*/
__le32 lap;
/* - TO -------------------------------------- */
/* This array must be located at the tail */
struct metablock_device mbarr[NR_CACHES_INSEG];
} __packed;
struct rambuffer {
void *data;
struct completion done;
};
enum STATFLAG {
STAT_WRITE = 0,
STAT_HIT,
STAT_ON_BUFFER,
STAT_FULLSIZE,
};
#define STATLEN (1 << 4)
struct lookup_key {
sector_t sector;
};
struct ht_head {
struct hlist_head ht_list;
};
struct wb_device;
struct wb_cache {
struct wb_device *wb;
struct dm_dev *device;
struct mutex io_lock;
cache_nr nr_caches; /* Const */
u64 nr_segments; /* Const */
struct bigarray *segment_header_array;
/*
* Chained hashtable
*
* Writeboost uses a chained hashtable
* for cache lookup.
* Cache discarding happens often and
* this structure fits our needs.
*/
struct bigarray *htable;
size_t htsize;
struct ht_head *null_head;
cache_nr cursor; /* Index that was written most recently */
struct segment_header *current_seg;
struct rambuffer *current_rambuf;
struct rambuffer *rambuf_pool;
u64 last_migrated_segment_id;
u64 last_flushed_segment_id;
u64 reserving_segment_id;
/*
* Flush daemon
*
* Writeboost first queues the segments to flush
* and the flush daemon asynchronously
* flushes them to the cache device.
*/
struct work_struct flush_work;
struct workqueue_struct *flush_wq;
spinlock_t flush_queue_lock;
struct list_head flush_queue;
wait_queue_head_t flush_wait_queue;
/*
* Deferred ACK for barriers.
*/
struct work_struct barrier_deadline_work;
struct timer_list barrier_deadline_timer;
struct bio_list barrier_ios;
unsigned long barrier_deadline_ms; /* param */
/*
* Migration daemon
*
* Migration also works in the background.
*
* If allow_migrate is true,
* the migrate daemon starts migration
* when there are segments to migrate.
*/
struct work_struct migrate_work;
struct workqueue_struct *migrate_wq;
bool allow_migrate; /* param */
/*
* Batched Migration
*
* Migration is done atomically
* with a number of segments batched.
*/
wait_queue_head_t migrate_wait_queue;
atomic_t migrate_fail_count;
atomic_t migrate_io_count;
struct list_head migrate_list;
u8 *dirtiness_snapshot;
void *migrate_buffer;
size_t nr_cur_batched_migration;
size_t nr_max_batched_migration; /* param */
/*
* Migration modulator
*
* This daemon turns migration
* on and off
* according to the load on the backing store.
*/
struct work_struct modulator_work;
bool enable_migration_modulator; /* param */
/*
* Superblock Recorder
*
* Update the superblock record
* periodically.
*/
struct work_struct recorder_work;
unsigned long update_record_interval; /* param */
/*
* Cache Synchronizer
*
* Sync the dirty writes
* periodically.
*/
struct work_struct sync_work;
unsigned long sync_interval; /* param */
/*
* on_terminate is set to true
* to notify all the background daemons to
* stop their operations.
*/
bool on_terminate;
atomic64_t stat[STATLEN];
};
struct wb_device {
struct dm_target *ti;
struct dm_dev *device;
struct wb_cache *cache;
u8 migrate_threshold;
atomic64_t nr_dirty_caches;
};
struct flush_job {
struct list_head flush_queue;
struct segment_header *seg;
/*
* The data to flush to cache device.
*/
struct rambuffer *rambuf;
/*
* List of bios with barrier flags.
*/
struct bio_list barrier_ios;
};
#define PER_BIO_VERSION KERNEL_VERSION(3, 8, 0)
#if LINUX_VERSION_CODE >= PER_BIO_VERSION
struct per_bio_data {
void *ptr;
};
#endif
#endif
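To make the dirty_bits semantics above concrete,
here is a small illustrative helper, not part of the module,
that computes the bits a partial write sets
the same way writeboost_map() does further down:
/*
 * Sketch: dirty_bits is a per-sector bitmap for a 4KB
 * (8-sector) cache line. A write covering sectors
 * [offset, offset + count) within the line sets one bit
 * per sector; a full-size write sets all 8 bits (255).
 * dirty_bits_for is a hypothetical helper for illustration.
 */
static inline u8 dirty_bits_for(u8 offset, u8 count)
{
	u8 i, bits = 0;
	if (count == 8)
		return 255;
	for (i = offset; i < (offset + count); i++)
		bits |= (1 << i);
	return bits;
}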
---------- bigarray.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@...il.com>
*
* This file is released under the GPL.
*/
/*
* An array-like structure
* that can contain millions of elements.
* The aim of this class is the same as
* flex_array.
* The reason we don't use flex_array is
* that it trades performance
* for resizability.
* struct bigarray is fast and lightweight.
*/
#include "bigarray.h"
struct part {
void *memory;
};
struct bigarray {
struct part *parts;
size_t nr_elems;
size_t elemsize;
};
#define ALLOC_SIZE (1 << 16)
static size_t nr_elems_in_part(struct bigarray *arr)
{
return ALLOC_SIZE / arr->elemsize;
}
static size_t nr_parts(struct bigarray *arr)
{
return dm_div_up(arr->nr_elems, nr_elems_in_part(arr));
}
struct bigarray *make_bigarray(size_t elemsize, size_t nr_elems)
{
size_t i, j;
struct part *part;
struct bigarray *arr = kmalloc(sizeof(*arr), GFP_KERNEL);
if (!arr) {
WBERR();
return NULL;
}
arr->elemsize = elemsize;
arr->nr_elems = nr_elems;
arr->parts = kmalloc(sizeof(struct part) * nr_parts(arr), GFP_KERNEL);
if (!arr->parts) {
WBERR();
goto bad_alloc_parts;
}
for (i = 0; i < nr_parts(arr); i++) {
part = arr->parts + i;
part->memory = kmalloc(ALLOC_SIZE, GFP_KERNEL);
if (!part->memory) {
WBERR();
for (j = 0; j < i; j++) {
part = arr->parts + j;
kfree(part->memory);
}
goto bad_alloc_parts_memory;
}
}
return arr;
bad_alloc_parts_memory:
kfree(arr->parts);
bad_alloc_parts:
kfree(arr);
return NULL;
}
void kill_bigarray(struct bigarray *arr)
{
size_t i;
for (i = 0; i < nr_parts(arr); i++) {
struct part *part = arr->parts + i;
kfree(part->memory);
}
kfree(arr->parts);
kfree(arr);
}
void *bigarray_at(struct bigarray *arr, size_t i)
{
size_t n = nr_elems_in_part(arr);
size_t j = i / n;
size_t k = i % n;
struct part *part = arr->parts + j;
return part->memory + (arr->elemsize * k);
}
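For reference, a minimal usage sketch of the bigarray API;
struct my_elem, my_bigarray_demo and the element count
are made up for illustration:
/*
 * Sketch: allocate, index and free a bigarray.
 * Elements are stored in 64KB parts, so millions of
 * small elements don't need one huge allocation.
 */
struct my_elem {
	u64 value;
};

static int my_bigarray_demo(void)
{
	size_t i, nr = 1000000;
	struct bigarray *arr = make_bigarray(sizeof(struct my_elem), nr);
	if (!arr)
		return -ENOMEM;
	for (i = 0; i < nr; i++) {
		struct my_elem *e = bigarray_at(arr, i);
		e->value = i;
	}
	kill_bigarray(arr);
	return 0;
}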
---------- cache-alloc.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@...il.com>
*
* This file is released under the GPL.
*/
/*
* Cache resume/free operations are provided.
* Resuming a cache constructs the in-core
* metadata structures from the metadata
* region on the cache device.
*/
#include "cache-alloc.h"
int __must_check resume_cache(struct wb_cache *cache, struct dm_dev *dev)
{
int r = 0;
cache->device = dev;
cache->nr_segments = calc_nr_segments(cache->device);
cache->nr_caches = cache->nr_segments * NR_CACHES_INSEG;
cache->on_terminate = false;
cache->allow_migrate = true;
cache->reserving_segment_id = 0;
mutex_init(&cache->io_lock);
cache->enable_migration_modulator = true;
cache->update_record_interval = 60;
cache->sync_interval = 60;
r = init_rambuf_pool(cache);
if (r) {
WBERR();
goto bad_init_rambuf_pool;
}
/*
* Select an arbitrary one as the initial RAM buffer.
*/
cache->current_rambuf = cache->rambuf_pool + 0;
r = init_segment_header_array(cache);
if (r) {
WBERR();
goto bad_alloc_segment_header_array;
}
r = ht_empty_init(cache);
if (r) {
WBERR();
goto bad_alloc_ht;
}
/*
* All in-core structures are allocated and
* initialized.
* Next, read metadata from the cache device.
*/
r = recover_cache(cache);
if (r) {
WBERR();
goto bad_recover;
}
/* Data structures for Migration */
cache->migrate_buffer = vmalloc(NR_CACHES_INSEG << 12);
if (!cache->migrate_buffer) {
WBERR();
r = -ENOMEM;
goto bad_alloc_migrate_buffer;
}
cache->dirtiness_snapshot = kmalloc(
NR_CACHES_INSEG,
GFP_KERNEL);
if (!cache->dirtiness_snapshot) {
WBERR();
r = -ENOMEM;
goto bad_alloc_dirtiness_snapshot;
}
cache->migrate_wq = create_singlethread_workqueue("migratewq");
if (!cache->migrate_wq) {
WBERR();
r = -ENOMEM;
goto bad_migratewq;
}
cache->flush_wq = create_singlethread_workqueue("flushwq");
if (!cache->flush_wq) {
WBERR();
r = -ENOMEM;
goto bad_flushwq;
}
/* Migration Daemon */
INIT_WORK(&cache->migrate_work, migrate_proc);
init_waitqueue_head(&cache->migrate_wait_queue);
INIT_LIST_HEAD(&cache->migrate_list);
atomic_set(&cache->migrate_fail_count, 0);
atomic_set(&cache->migrate_io_count, 0);
cache->nr_max_batched_migration = 1;
cache->nr_cur_batched_migration = 1;
queue_work(cache->migrate_wq, &cache->migrate_work);
/* Deferred ACK for barrier writes */
setup_timer(&cache->barrier_deadline_timer,
barrier_deadline_proc, (unsigned long) cache);
bio_list_init(&cache->barrier_ios);
/*
* The deadline is 3 ms by default.
* It takes about 2.5 us to process one bio,
* so 3 ms is long enough to process 255 bios.
* If the buffer doesn't get full within 3 ms,
* we can suspect that writes are starving
* while waiting for a formerly submitted barrier to complete.
*/
cache->barrier_deadline_ms = 3;
INIT_WORK(&cache->barrier_deadline_work, flush_barrier_ios);
/* Flush Daemon */
INIT_WORK(&cache->flush_work, flush_proc);
spin_lock_init(&cache->flush_queue_lock);
INIT_LIST_HEAD(&cache->flush_queue);
init_waitqueue_head(&cache->flush_wait_queue);
queue_work(cache->flush_wq, &cache->flush_work);
/* Migration Modulator */
INIT_WORK(&cache->modulator_work, modulator_proc);
schedule_work(&cache->modulator_work);
/* Superblock Recorder */
INIT_WORK(&cache->recorder_work, recorder_proc);
schedule_work(&cache->recorder_work);
/* Dirty Synchronizer */
INIT_WORK(&cache->sync_work, sync_proc);
schedule_work(&cache->sync_work);
clear_stat(cache);
return 0;
bad_flushwq:
destroy_workqueue(cache->migrate_wq);
bad_migratewq:
kfree(cache->dirtiness_snapshot);
bad_alloc_dirtiness_snapshot:
vfree(cache->migrate_buffer);
bad_alloc_migrate_buffer:
bad_recover:
kill_bigarray(cache->htable);
bad_alloc_ht:
kill_bigarray(cache->segment_header_array);
bad_alloc_segment_header_array:
free_rambuf_pool(cache);
bad_init_rambuf_pool:
kfree(cache);
return r;
}
void free_cache(struct wb_cache *cache)
{
cache->on_terminate = true;
/* Kill in-kernel daemons */
cancel_work_sync(&cache->sync_work);
cancel_work_sync(&cache->recorder_work);
cancel_work_sync(&cache->modulator_work);
cancel_work_sync(&cache->flush_work);
destroy_workqueue(cache->flush_wq);
cancel_work_sync(&cache->barrier_deadline_work);
cancel_work_sync(&cache->migrate_work);
destroy_workqueue(cache->migrate_wq);
kfree(cache->dirtiness_snapshot);
vfree(cache->migrate_buffer);
/* Destroy in-core structures */
kill_bigarray(cache->htable);
kill_bigarray(cache->segment_header_array);
free_rambuf_pool(cache);
}
---------- defer-barrier.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@...il.com>
*
* This file is released under the GPL.
*/
#include "defer-barrier.h"
void queue_barrier_io(struct wb_cache *cache, struct bio *bio)
{
mutex_lock(&cache->io_lock);
bio_list_add(&cache->barrier_ios, bio);
mutex_unlock(&cache->io_lock);
if (!timer_pending(&cache->barrier_deadline_timer))
mod_timer(&cache->barrier_deadline_timer,
msecs_to_jiffies(cache->barrier_deadline_ms));
}
void barrier_deadline_proc(unsigned long data)
{
struct wb_cache *cache = (struct wb_cache *) data;
schedule_work(&cache->barrier_deadline_work);
}
void flush_barrier_ios(struct work_struct *work)
{
struct wb_cache *cache =
container_of(work, struct wb_cache,
barrier_deadline_work);
if (bio_list_empty(&cache->barrier_ios))
return;
flush_current_buffer(cache);
}
---------- dirty-sync.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@...il.com>
*
* This file is released under the GPL.
*/
#include "dirty-sync.h"
void sync_proc(struct work_struct *work)
{
struct wb_cache *cache =
container_of(work, struct wb_cache, sync_work);
unsigned long intvl;
while (true) {
if (cache->on_terminate)
return;
/* sec -> ms */
intvl = cache->sync_interval * 1000;
if (!intvl) {
schedule_timeout_interruptible(msecs_to_jiffies(1000));
continue;
}
WBINFO();
flush_current_buffer(cache);
blkdev_issue_flush(cache->device->bdev, GFP_NOIO, NULL);
schedule_timeout_interruptible(msecs_to_jiffies(intvl));
}
}
---------- flush-daemon.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@...il.com>
*
* This file is released under the GPL.
*/
#include "flush-daemon.h"
void flush_proc(struct work_struct *work)
{
unsigned long flags;
struct wb_cache *cache =
container_of(work, struct wb_cache, flush_work);
while (true) {
struct flush_job *job;
struct segment_header *seg;
struct dm_io_request io_req;
struct dm_io_region region;
WBINFO();
spin_lock_irqsave(&cache->flush_queue_lock, flags);
while (list_empty(&cache->flush_queue)) {
spin_unlock_irqrestore(&cache->flush_queue_lock, flags);
wait_event_interruptible_timeout(
cache->flush_wait_queue,
(!list_empty(&cache->flush_queue)),
msecs_to_jiffies(100));
spin_lock_irqsave(&cache->flush_queue_lock, flags);
if (cache->on_terminate)
return;
}
/*
* Pop a flush_job from the list
* and flush it.
*/
job = list_first_entry(
&cache->flush_queue, struct flush_job, flush_queue);
list_del(&job->flush_queue);
spin_unlock_irqrestore(&cache->flush_queue_lock, flags);
seg = job->seg;
io_req = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = WRITE,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = job->rambuf->data,
};
region = (struct dm_io_region) {
.bdev = cache->device->bdev,
.sector = seg->start_sector,
.count = (seg->length + 1) << 3,
};
dm_safe_io_retry(&io_req, 1, &region, false);
cache->last_flushed_segment_id = seg->global_id;
complete_all(&seg->flush_done);
complete_all(&job->rambuf->done);
/*
* Deferred ACK
*/
if (!bio_list_empty(&job->barrier_ios)) {
struct bio *bio;
blkdev_issue_flush(cache->device->bdev, GFP_NOIO, NULL);
while ((bio = bio_list_pop(&job->barrier_ios)))
bio_endio(bio, 0);
mod_timer(&cache->barrier_deadline_timer,
msecs_to_jiffies(cache->barrier_deadline_ms));
}
kfree(job);
}
}
---------- format-cache.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@...il.com>
*
* This file is released under the GPL.
*/
#include "format-cache.h"
static int read_superblock_header(struct superblock_header_device *sup,
struct dm_dev *dev)
{
int r = 0;
struct dm_io_request io_req_sup;
struct dm_io_region region_sup;
void *buf = kmalloc(1 << SECTOR_SHIFT, GFP_KERNEL);
if (!buf) {
WBERR();
return -ENOMEM;
}
io_req_sup = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = READ,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
region_sup = (struct dm_io_region) {
.bdev = dev->bdev,
.sector = 0,
.count = 1,
};
r = dm_safe_io(&io_req_sup, 1, &region_sup, NULL, false);
if (r) {
WBERR();
kfree(buf);
return r;
}
memcpy(sup, buf, sizeof(*sup));
kfree(buf);
return 0;
}
static int audit_superblock_header(struct superblock_header_device *sup)
{
u32 magic = le32_to_cpu(sup->magic);
if (magic != WRITEBOOST_MAGIC) {
WBERR();
return -EINVAL;
}
return 0;
}
/*
* Check if the cache device is already formatted.
* Returns 0 iff this routine runs without failure.
* cache_valid is set to true iff the cache device
* is already formatted and need not be re-formatted.
*/
int __must_check audit_cache_device(struct dm_dev *dev,
bool *cache_valid)
{
int r = 0;
struct superblock_header_device sup;
r = read_superblock_header(&sup, dev);
if (r)
return r;
*cache_valid = audit_superblock_header(&sup) ? false : true;
return r;
}
static int format_superblock_header(struct dm_dev *dev)
{
int r = 0;
struct dm_io_request io_req_sup;
struct dm_io_region region_sup;
struct superblock_header_device sup = {
.magic = cpu_to_le32(WRITEBOOST_MAGIC),
};
void *buf = kzalloc(1 << SECTOR_SHIFT, GFP_KERNEL);
if (!buf) {
WBERR();
return -ENOMEM;
}
memcpy(buf, &sup, sizeof(sup));
io_req_sup = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = WRITE_FUA,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
region_sup = (struct dm_io_region) {
.bdev = dev->bdev,
.sector = 0,
.count = 1,
};
r = dm_safe_io(&io_req_sup, 1, &region_sup, NULL, false);
kfree(buf);
if (r) {
WBERR();
return r;
}
return 0;
}
struct format_segmd_context {
int err;
atomic64_t count;
};
static void format_segmd_endio(unsigned long error, void *__context)
{
struct format_segmd_context *context = __context;
if (error)
context->err = 1;
atomic64_dec(&context->count);
}
/*
* Format superblock header and
* all the metadata regions over the cache device.
*/
int __must_check format_cache_device(struct dm_dev *dev)
{
u64 i, nr_segments = calc_nr_segments(dev);
struct format_segmd_context context;
struct dm_io_request io_req_sup;
struct dm_io_region region_sup;
void *buf;
int r = 0;
/*
* Zeroing the full superblock
*/
buf = kzalloc(1 << 20, GFP_KERNEL);
if (!buf) {
WBERR();
return -ENOMEM;
}
io_req_sup = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = WRITE_FUA,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
region_sup = (struct dm_io_region) {
.bdev = dev->bdev,
.sector = 0,
.count = (1 << 11),
};
r = dm_safe_io(&io_req_sup, 1, &region_sup, NULL, false);
kfree(buf);
if (r) {
WBERR();
return r;
}
r = format_superblock_header(dev);
if (r)
return r;
/* Format the metadata regions */
/*
* Count the number of segments
*/
atomic64_set(&context.count, nr_segments);
context.err = 0;
buf = kzalloc(1 << 12, GFP_KERNEL);
if (!buf) {
WBERR();
return -ENOMEM;
}
/*
* Submit all the writes asynchronously.
*/
for (i = 0; i < nr_segments; i++) {
struct dm_io_request io_req_seg = {
.client = wb_io_client,
.bi_rw = WRITE,
.notify.fn = format_segmd_endio,
.notify.context = &context,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
struct dm_io_region region_seg = {
.bdev = dev->bdev,
.sector = calc_segment_header_start(i),
.count = (1 << 3),
};
r = dm_safe_io(&io_req_seg, 1, &region_seg, NULL, false);
if (r) {
WBERR();
break;
}
}
kfree(buf);
if (r) {
WBERR();
return r;
}
/*
* Wait for all the writes to complete.
*/
while (atomic64_read(&context.count))
schedule_timeout_interruptible(msecs_to_jiffies(100));
if (context.err) {
WBERR();
return -EIO;
}
return blkdev_issue_flush(dev->bdev, GFP_KERNEL, NULL);
}
---------- handle-io.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@...il.com>
*
* This file is released under the GPL.
*/
#include "handle-io.h"
void inc_nr_dirty_caches(struct wb_device *wb)
{
BUG_ON(!wb);
atomic64_inc(&wb->nr_dirty_caches);
}
static void dec_nr_dirty_caches(struct wb_device *wb)
{
BUG_ON(!wb);
atomic64_dec(&wb->nr_dirty_caches);
}
void cleanup_mb_if_dirty(struct wb_cache *cache,
struct segment_header *seg,
struct metablock *mb)
{
unsigned long flags;
bool b = false;
lockseg(seg, flags);
if (mb->dirty_bits) {
mb->dirty_bits = 0;
b = true;
}
unlockseg(seg, flags);
if (b)
dec_nr_dirty_caches(cache->wb);
}
u8 atomic_read_mb_dirtiness(struct segment_header *seg,
struct metablock *mb)
{
unsigned long flags;
u8 r;
lockseg(seg, flags);
r = mb->dirty_bits;
unlockseg(seg, flags);
return r;
}
static void inc_stat(struct wb_cache *cache,
int rw, bool found, bool on_buffer, bool fullsize)
{
atomic64_t *v;
int i = 0;
if (rw)
i |= (1 << STAT_WRITE);
if (found)
i |= (1 << STAT_HIT);
if (on_buffer)
i |= (1 << STAT_ON_BUFFER);
if (fullsize)
i |= (1 << STAT_FULLSIZE);
v = &cache->stat[i];
atomic64_inc(v);
}
void clear_stat(struct wb_cache *cache)
{
int i;
for (i = 0; i < STATLEN; i++) {
atomic64_t *v = &cache->stat[i];
atomic64_set(v, 0);
}
}
/*
* Migrate data on the cache device
*/
static void migrate_mb(struct wb_cache *cache, struct segment_header *seg,
struct metablock *mb, u8 dirty_bits, bool thread)
{
struct wb_device *wb = cache->wb;
if (!dirty_bits)
return;
if (dirty_bits == 255) {
void *buf = kmalloc_retry(1 << 12, GFP_NOIO);
struct dm_io_request io_req_r, io_req_w;
struct dm_io_region region_r, region_w;
io_req_r = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = READ,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
region_r = (struct dm_io_region) {
.bdev = cache->device->bdev,
.sector = calc_mb_start_sector(seg, mb->idx),
.count = (1 << 3),
};
dm_safe_io_retry(&io_req_r, 1, &region_r, thread);
io_req_w = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = WRITE_FUA,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
region_w = (struct dm_io_region) {
.bdev = wb->device->bdev,
.sector = mb->sector,
.count = (1 << 3),
};
dm_safe_io_retry(&io_req_w, 1, &region_w, thread);
kfree(buf);
} else {
void *buf = kmalloc_retry(1 << SECTOR_SHIFT, GFP_NOIO);
size_t i;
for (i = 0; i < 8; i++) {
bool bit_on = dirty_bits & (1 << i);
struct dm_io_request io_req_r, io_req_w;
struct dm_io_region region_r, region_w;
sector_t src;
if (!bit_on)
continue;
io_req_r = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = READ,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
/* A tmp variable just to avoid 80 cols rule */
src = calc_mb_start_sector(seg, mb->idx) + i;
region_r = (struct dm_io_region) {
.bdev = cache->device->bdev,
.sector = src,
.count = 1,
};
dm_safe_io_retry(&io_req_r, 1, &region_r, thread);
io_req_w = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = WRITE,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
region_w = (struct dm_io_region) {
.bdev = wb->device->bdev,
.sector = mb->sector + 1 * i,
.count = 1,
};
dm_safe_io_retry(&io_req_w, 1, &region_w, thread);
}
kfree(buf);
}
}
/*
* Migrate a cache block that is still on the RAM buffer.
* Calling this function is really rare.
*/
static void migrate_buffered_mb(struct wb_cache *cache,
struct metablock *mb, u8 dirty_bits)
{
struct wb_device *wb = cache->wb;
u8 i, k = 1 + (mb->idx % NR_CACHES_INSEG);
sector_t offset = (k << 3);
void *buf = kmalloc_retry(1 << SECTOR_SHIFT, GFP_NOIO);
for (i = 0; i < 8; i++) {
struct dm_io_request io_req;
struct dm_io_region region;
void *src;
sector_t dest;
bool bit_on = dirty_bits & (1 << i);
if (!bit_on)
continue;
src = cache->current_rambuf->data +
((offset + i) << SECTOR_SHIFT);
memcpy(buf, src, 1 << SECTOR_SHIFT);
io_req = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = WRITE_FUA,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
dest = mb->sector + 1 * i;
region = (struct dm_io_region) {
.bdev = wb->device->bdev,
.sector = dest,
.count = 1,
};
dm_safe_io_retry(&io_req, 1, &region, true);
}
kfree(buf);
}
static void bio_remap(struct bio *bio, struct dm_dev *dev, sector_t sector)
{
bio->bi_bdev = dev->bdev;
bio->bi_sector = sector;
}
static sector_t calc_cache_alignment(struct wb_cache *cache,
sector_t bio_sector)
{
return (bio_sector / (1 << 3)) * (1 << 3);
}
int writeboost_map(struct dm_target *ti, struct bio *bio
#if LINUX_VERSION_CODE < PER_BIO_VERSION
, union map_info *map_context
#endif
)
{
unsigned long flags;
struct segment_header *uninitialized_var(seg);
struct metablock *mb, *new_mb;
#if LINUX_VERSION_CODE >= PER_BIO_VERSION
struct per_bio_data *map_context;
#endif
sector_t bio_count, bio_offset, s;
bool bio_fullsize, found, on_buffer,
refresh_segment, b;
int rw;
struct lookup_key key;
struct ht_head *head;
cache_nr update_mb_idx, idx_inseg, k;
size_t start;
void *data;
struct wb_device *wb = ti->private;
struct wb_cache *cache = wb->cache;
struct dm_dev *orig = wb->device;
#if LINUX_VERSION_CODE >= PER_BIO_VERSION
map_context = dm_per_bio_data(bio, ti->per_bio_data_size);
#endif
map_context->ptr = NULL;
/*
* We pass discards through only to the backing store because
* blocks on the cache device are unlikely to be discarded.
*
* Discards usually arrive
* long after writing,
* so the block is likely to have been migrated already.
* Moreover,
* we discard the segment at the end of migration
* and that's enough for discarding blocks.
*/
if (bio->bi_rw & REQ_DISCARD) {
bio_remap(bio, orig, bio->bi_sector);
return DM_MAPIO_REMAPPED;
}
/*
* deferred ACK for barrier writes
*
* A bio with REQ_FLUSH is guaranteed
* to have no data,
* so simply queue it and return.
*/
if (bio->bi_rw & REQ_FLUSH) {
BUG_ON(bio->bi_size);
queue_barrier_io(cache, bio);
return DM_MAPIO_SUBMITTED;
}
bio_count = bio->bi_size >> SECTOR_SHIFT;
bio_fullsize = (bio_count == (1 << 3));
bio_offset = bio->bi_sector % (1 << 3);
rw = bio_data_dir(bio);
key = (struct lookup_key) {
.sector = calc_cache_alignment(cache, bio->bi_sector),
};
k = ht_hash(cache, &key);
head = bigarray_at(cache->htable, k);
/*
* (Locking)
* Why a mutex?
*
* The reason we use a mutex instead of an rw_semaphore,
* which would allow truly concurrent read access,
* is that a mutex is even lighter than an rw_semaphore.
* Since dm-writeboost is performance-centric software,
* the overhead of an rw_semaphore matters.
* All in all,
* since the exclusive region in the read path is small
* and cheap, using an rw_semaphore and letting the reads
* execute concurrently won't improve the performance
* as much as one might expect.
*/
mutex_lock(&cache->io_lock);
mb = ht_lookup(cache, head, &key);
if (mb) {
seg = ((void *) mb) - (mb->idx % NR_CACHES_INSEG) *
sizeof(struct metablock);
atomic_inc(&seg->nr_inflight_ios);
}
found = (mb != NULL);
on_buffer = false;
if (found)
on_buffer = is_on_buffer(cache, mb->idx);
inc_stat(cache, rw, found, on_buffer, bio_fullsize);
if (!rw) {
u8 dirty_bits;
mutex_unlock(&cache->io_lock);
if (!found) {
bio_remap(bio, orig, bio->bi_sector);
return DM_MAPIO_REMAPPED;
}
dirty_bits = atomic_read_mb_dirtiness(seg, mb);
if (unlikely(on_buffer)) {
if (dirty_bits)
migrate_buffered_mb(cache, mb, dirty_bits);
/*
* Cache class
* Live and Stable
*
* Live:
* The cache is on the RAM buffer.
*
* Stable:
* The cache is not on the RAM buffer
* but at least queued in flush_queue.
*/
/*
* (Locking)
* Dirtiness of a live cache
*
* We can assume the dirtiness of a cache only increases
* while it is on the buffer; we call such a cache live.
* This eases the locking because
* we don't have to worry that the dirtiness of
* a live cache fluctuates.
*/
atomic_dec(&seg->nr_inflight_ios);
bio_remap(bio, orig, bio->bi_sector);
return DM_MAPIO_REMAPPED;
}
wait_for_completion(&seg->flush_done);
if (likely(dirty_bits == 255)) {
bio_remap(bio,
cache->device,
calc_mb_start_sector(seg, mb->idx)
+ bio_offset);
map_context->ptr = seg;
} else {
/*
* (Locking)
* Dirtiness of a stable cache
*
* Unlike live caches, whose dirtiness
* doesn't fluctuate,
* stable caches, which are not on the buffer
* but on the cache device,
* may have their dirtiness decreased by processes
* other than the migrate daemon.
* This works fine
* because migrating the same cache twice
* doesn't break the cache consistency.
*/
migrate_mb(cache, seg, mb, dirty_bits, true);
cleanup_mb_if_dirty(cache, seg, mb);
atomic_dec(&seg->nr_inflight_ios);
bio_remap(bio, orig, bio->bi_sector);
}
return DM_MAPIO_REMAPPED;
}
if (found) {
if (unlikely(on_buffer)) {
mutex_unlock(&cache->io_lock);
update_mb_idx = mb->idx;
goto write_on_buffer;
} else {
u8 dirty_bits = atomic_read_mb_dirtiness(seg, mb);
/*
* First clean up the previous cache
* and migrate the cache if needed.
*/
bool needs_cleanup_prev_cache =
!bio_fullsize || !(dirty_bits == 255);
if (unlikely(needs_cleanup_prev_cache)) {
wait_for_completion(&seg->flush_done);
migrate_mb(cache, seg, mb, dirty_bits, true);
}
/*
* Fullsize dirty cache
* can be discarded without migration.
*/
cleanup_mb_if_dirty(cache, seg, mb);
ht_del(cache, mb);
atomic_dec(&seg->nr_inflight_ios);
goto write_not_found;
}
}
write_not_found:
;
/*
* If cache->cursor is 254, 509, ...
* that is the last cache line in the segment.
* We must flush the current segment and
* get the new one.
*/
refresh_segment = !((cache->cursor + 1) % NR_CACHES_INSEG);
if (refresh_segment)
queue_current_buffer(cache);
cache->cursor = (cache->cursor + 1) % cache->nr_caches;
/*
* update_mb_idx is the cache line index to update.
*/
update_mb_idx = cache->cursor;
seg = cache->current_seg;
atomic_inc(&seg->nr_inflight_ios);
new_mb = seg->mb_array + (update_mb_idx % NR_CACHES_INSEG);
new_mb->dirty_bits = 0;
ht_register(cache, head, &key, new_mb);
mutex_unlock(&cache->io_lock);
mb = new_mb;
write_on_buffer:
;
idx_inseg = update_mb_idx % NR_CACHES_INSEG;
s = (idx_inseg + 1) << 3;
b = false;
lockseg(seg, flags);
if (!mb->dirty_bits) {
seg->length++;
BUG_ON(seg->length > NR_CACHES_INSEG);
b = true;
}
if (likely(bio_fullsize)) {
mb->dirty_bits = 255;
} else {
u8 i;
u8 acc_bits = 0;
s += bio_offset;
for (i = bio_offset; i < (bio_offset+bio_count); i++)
acc_bits += (1 << i);
mb->dirty_bits |= acc_bits;
}
BUG_ON(!mb->dirty_bits);
unlockseg(seg, flags);
if (b)
inc_nr_dirty_caches(wb);
start = s << SECTOR_SHIFT;
data = bio_data(bio);
memcpy(cache->current_rambuf->data + start, data, bio->bi_size);
atomic_dec(&seg->nr_inflight_ios);
/*
* deferred ACK for barrier writes
*
* A bio with the REQ_FUA flag has data,
* so we run through the path for an
* ordinary bio, and the data is
* now stored in the RAM buffer.
* After that, queue it and return
* to defer completion.
*/
if (bio->bi_rw & REQ_FUA) {
queue_barrier_io(cache, bio);
return DM_MAPIO_SUBMITTED;
}
bio_endio(bio, 0);
return DM_MAPIO_SUBMITTED;
}
int writeboost_end_io(struct dm_target *ti, struct bio *bio, int error
#if LINUX_VERSION_CODE < PER_BIO_VERSION
, union map_info *map_context
#endif
)
{
struct segment_header *seg;
#if LINUX_VERSION_CODE >= PER_BIO_VERSION
struct per_bio_data *map_context =
dm_per_bio_data(bio, ti->per_bio_data_size);
#endif
if (!map_context->ptr)
return 0;
seg = map_context->ptr;
atomic_dec(&seg->nr_inflight_ios);
return 0;
}
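As an aside, the 16 counters in cache->stat are indexed
by the four flags combined in inc_stat() above;
a sketch of how an index could be decoded when reporting
(my_decode_stat_index is hypothetical, not in the module):
/*
 * Sketch: decode a stat[] index back into its four flags.
 * inc_stat() builds the index by OR-ing
 * (1 << STAT_WRITE), (1 << STAT_HIT),
 * (1 << STAT_ON_BUFFER) and (1 << STAT_FULLSIZE).
 */
static void my_decode_stat_index(int i, bool *write, bool *hit,
				 bool *on_buffer, bool *fullsize)
{
	*write = !!(i & (1 << STAT_WRITE));
	*hit = !!(i & (1 << STAT_HIT));
	*on_buffer = !!(i & (1 << STAT_ON_BUFFER));
	*fullsize = !!(i & (1 << STAT_FULLSIZE));
}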
---------- hashtable.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@...il.com>
*
* This file is released under the GPL.
*/
#include "hashtable.h"
/*
* Initialize the Hash Table.
*/
int __must_check ht_empty_init(struct wb_cache *cache)
{
cache_nr idx;
size_t i;
size_t nr_heads;
struct bigarray *arr;
cache->htsize = cache->nr_caches;
nr_heads = cache->htsize + 1;
arr = make_bigarray(sizeof(struct ht_head), nr_heads);
if (!arr) {
WBERR();
return -ENOMEM;
}
cache->htable = arr;
for (i = 0; i < nr_heads; i++) {
struct ht_head *hd = bigarray_at(arr, i);
INIT_HLIST_HEAD(&hd->ht_list);
}
/*
* Our hashtable has one special bucket called null head.
* Orphan metablocks are linked to the null head.
*/
cache->null_head = bigarray_at(cache->htable, cache->htsize);
for (idx = 0; idx < cache->nr_caches; idx++) {
struct metablock *mb = mb_at(cache, idx);
hlist_add_head(&mb->ht_list, &cache->null_head->ht_list);
}
return 0;
}
cache_nr ht_hash(struct wb_cache *cache, struct lookup_key *key)
{
return key->sector % cache->htsize;
}
static bool mb_hit(struct metablock *mb, struct lookup_key *key)
{
return mb->sector == key->sector;
}
void ht_del(struct wb_cache *cache, struct metablock *mb)
{
struct ht_head *null_head;
hlist_del(&mb->ht_list);
null_head = cache->null_head;
hlist_add_head(&mb->ht_list, &null_head->ht_list);
}
void ht_register(struct wb_cache *cache, struct ht_head *head,
struct lookup_key *key, struct metablock *mb)
{
hlist_del(&mb->ht_list);
hlist_add_head(&mb->ht_list, &head->ht_list);
mb->sector = key->sector;
}
struct metablock *ht_lookup(struct wb_cache *cache,
struct ht_head *head,
struct lookup_key *key)
{
struct metablock *mb, *found = NULL;
#if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 9, 0)
hlist_for_each_entry(mb, &head->ht_list, ht_list)
#else
struct hlist_node *pos;
hlist_for_each_entry(mb, pos, &head->ht_list, ht_list)
#endif
{
if (mb_hit(mb, key)) {
found = mb;
break;
}
}
return found;
}
/*
* Discard all the metablocks in a segment.
*/
void discard_caches_inseg(struct wb_cache *cache,
struct segment_header *seg)
{
u8 i;
for (i = 0; i < NR_CACHES_INSEG; i++) {
struct metablock *mb = seg->mb_array + i;
ht_del(cache, mb);
}
}
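A minimal sketch of the lookup pattern the callers use,
mirroring writeboost_map() and update_by_segment_header_device();
my_find_metablock is a hypothetical helper for illustration:
/*
 * Sketch: the hashtable is keyed by the 4KB-aligned sector.
 * Callers hash the key, fetch the bucket from the
 * bigarray-backed table and walk the chain.
 */
static struct metablock *my_find_metablock(struct wb_cache *cache,
					   sector_t aligned_sector)
{
	struct lookup_key key = { .sector = aligned_sector };
	cache_nr k = ht_hash(cache, &key);
	struct ht_head *head = bigarray_at(cache->htable, k);

	return ht_lookup(cache, head, &key); /* NULL if not cached */
}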
---------- migrate-daemon.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@...il.com>
*
* This file is released under the GPL.
*/
#include "migrate-daemon.h"
static void migrate_endio(unsigned long error, void *context)
{
struct wb_cache *cache = context;
if (error)
atomic_inc(&cache->migrate_fail_count);
if (atomic_dec_and_test(&cache->migrate_io_count))
wake_up_interruptible(&cache->migrate_wait_queue);
}
/*
* Submit the segment data at position k
* in the migrate buffer.
* Batched migration first gathers all the segments
* to migrate into the migrate buffer,
* so there are several segments' worth of data
* in the buffer.
* This function submits the one at position k.
*/
static void submit_migrate_io(struct wb_cache *cache,
struct segment_header *seg, size_t k)
{
u8 i, j;
size_t a = NR_CACHES_INSEG * k;
void *p = cache->migrate_buffer + (NR_CACHES_INSEG << 12) * k;
for (i = 0; i < seg->length; i++) {
struct metablock *mb = seg->mb_array + i;
struct wb_device *wb = cache->wb;
u8 dirty_bits = *(cache->dirtiness_snapshot + (a + i));
unsigned long offset;
void *base, *addr;
struct dm_io_request io_req_w;
struct dm_io_region region_w;
if (!dirty_bits)
continue;
offset = i << 12;
base = p + offset;
if (dirty_bits == 255) {
addr = base;
io_req_w = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = WRITE,
.notify.fn = migrate_endio,
.notify.context = cache,
.mem.type = DM_IO_VMA,
.mem.ptr.vma = addr,
};
region_w = (struct dm_io_region) {
.bdev = wb->device->bdev,
.sector = mb->sector,
.count = (1 << 3),
};
dm_safe_io_retry(&io_req_w, 1, &region_w, false);
} else {
for (j = 0; j < 8; j++) {
bool b = dirty_bits & (1 << j);
if (!b)
continue;
addr = base + (j << SECTOR_SHIFT);
io_req_w = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = WRITE,
.notify.fn = migrate_endio,
.notify.context = cache,
.mem.type = DM_IO_VMA,
.mem.ptr.vma = addr,
};
region_w = (struct dm_io_region) {
.bdev = wb->device->bdev,
.sector = mb->sector + j,
.count = 1,
};
dm_safe_io_retry(
&io_req_w, 1, &region_w, false);
}
}
}
}
static void memorize_dirty_state(struct wb_cache *cache,
struct segment_header *seg, size_t k,
size_t *migrate_io_count)
{
u8 i, j;
size_t a = NR_CACHES_INSEG * k;
void *p = cache->migrate_buffer + (NR_CACHES_INSEG << 12) * k;
struct metablock *mb;
struct dm_io_request io_req_r = {
.client = wb_io_client,
.bi_rw = READ,
.notify.fn = NULL,
.mem.type = DM_IO_VMA,
.mem.ptr.vma = p,
};
struct dm_io_region region_r = {
.bdev = cache->device->bdev,
.sector = seg->start_sector + (1 << 3),
.count = seg->length << 3,
};
dm_safe_io_retry(&io_req_r, 1, &region_r, false);
/*
* We take a snapshot of the dirtiness in the segments.
* The snapshot is at least as dirty as
* any future state of the segments,
* so we migrate the dirtiest possible
* state of the segments,
* which won't lose any dirty data that was acknowledged.
*/
for (i = 0; i < seg->length; i++) {
mb = seg->mb_array + i;
*(cache->dirtiness_snapshot + (a + i)) =
atomic_read_mb_dirtiness(seg, mb);
}
for (i = 0; i < seg->length; i++) {
u8 dirty_bits;
mb = seg->mb_array + i;
dirty_bits = *(cache->dirtiness_snapshot + (a + i));
if (!dirty_bits)
continue;
if (dirty_bits == 255) {
(*migrate_io_count)++;
} else {
for (j = 0; j < 8; j++) {
if (dirty_bits & (1 << j))
(*migrate_io_count)++;
}
}
}
}
static void cleanup_segment(struct wb_cache *cache, struct segment_header *seg)
{
u8 i;
for (i = 0; i < seg->length; i++) {
struct metablock *mb = seg->mb_array + i;
cleanup_mb_if_dirty(cache, seg, mb);
}
}
static void migrate_linked_segments(struct wb_cache *cache)
{
struct segment_header *seg;
size_t k, migrate_io_count = 0;
/*
* Memorize the dirty state to migrate before going in:
* - how many migration writes should be submitted atomically,
* - which cache lines are dirty to migrate,
* - etc.
*/
k = 0;
list_for_each_entry(seg, &cache->migrate_list, migrate_list) {
memorize_dirty_state(cache, seg, k, &migrate_io_count);
k++;
}
migrate_write:
atomic_set(&cache->migrate_io_count, migrate_io_count);
atomic_set(&cache->migrate_fail_count, 0);
k = 0;
list_for_each_entry(seg, &cache->migrate_list, migrate_list) {
submit_migrate_io(cache, seg, k);
k++;
}
wait_event_interruptible(cache->migrate_wait_queue,
atomic_read(&cache->migrate_io_count) == 0);
if (atomic_read(&cache->migrate_fail_count)) {
WBWARN("%u writebacks failed. retry.",
atomic_read(&cache->migrate_fail_count));
goto migrate_write;
}
BUG_ON(atomic_read(&cache->migrate_io_count));
list_for_each_entry(seg, &cache->migrate_list, migrate_list) {
cleanup_segment(cache, seg);
}
/*
* The segment may have a block
* that was acknowledged as a persistent write
* on the cache device.
* Migrating it in a non-persistent way
* would betray the client
* who received the ACK and
* expects the data to be persistent.
* Since it is difficult to know
* whether a cache block in a segment
* is in that state,
* we stay on the safe side
* by always
* migrating such data persistently.
*/
blkdev_issue_flush(cache->wb->device->bdev, GFP_NOIO, NULL);
/*
* Discarding the migrated regions
* can avoid unnecessary wear amplification in the future.
*
* But note that we should not discard
* the metablock region, because
* whether a discarded block
* returns a certain value
* depends on the vendor,
* and unexpected metablock data
* would break the cache.
*/
list_for_each_entry(seg, &cache->migrate_list, migrate_list) {
blkdev_issue_discard(cache->device->bdev,
seg->start_sector + (1 << 3),
seg->length << 3,
GFP_NOIO, 0);
}
}
void migrate_proc(struct work_struct *work)
{
struct wb_cache *cache =
container_of(work, struct wb_cache, migrate_work);
while (true) {
bool allow_migrate;
size_t i, nr_mig_candidates, nr_mig;
struct segment_header *seg, *tmp;
WBINFO();
if (cache->on_terminate)
return;
/*
* If reserving_segment_id > 0,
* migration should be immediate.
*/
allow_migrate = cache->reserving_segment_id ||
cache->allow_migrate;
if (!allow_migrate) {
schedule_timeout_interruptible(msecs_to_jiffies(1000));
continue;
}
nr_mig_candidates = cache->last_flushed_segment_id -
cache->last_migrated_segment_id;
if (!nr_mig_candidates) {
schedule_timeout_interruptible(msecs_to_jiffies(1000));
continue;
}
if (cache->nr_cur_batched_migration !=
cache->nr_max_batched_migration){
vfree(cache->migrate_buffer);
kfree(cache->dirtiness_snapshot);
cache->nr_cur_batched_migration =
cache->nr_max_batched_migration;
cache->migrate_buffer =
vmalloc(cache->nr_cur_batched_migration *
(NR_CACHES_INSEG << 12));
cache->dirtiness_snapshot =
kmalloc_retry(cache->nr_cur_batched_migration *
NR_CACHES_INSEG,
GFP_NOIO);
BUG_ON(!cache->migrate_buffer);
BUG_ON(!cache->dirtiness_snapshot);
}
/*
* Batched Migration:
* We will migrate at most nr_max_batched_migration
* segments at a time.
*/
nr_mig = min(nr_mig_candidates,
cache->nr_cur_batched_migration);
/*
* Add segments to migrate atomically.
*/
for (i = 1; i <= nr_mig; i++) {
seg = get_segment_header_by_id(
cache,
cache->last_migrated_segment_id + i);
list_add_tail(&seg->migrate_list, &cache->migrate_list);
}
migrate_linked_segments(cache);
/*
* (Locking)
* This is the only line of code that changes
* last_migrated_segment_id during runtime.
*/
cache->last_migrated_segment_id += nr_mig;
list_for_each_entry_safe(seg, tmp,
&cache->migrate_list,
migrate_list) {
complete_all(&seg->migrate_done);
list_del(&seg->migrate_list);
}
}
}
void wait_for_migration(struct wb_cache *cache, size_t id)
{
struct segment_header *seg = get_segment_header_by_id(cache, id);
/*
* Set reserving_segment_id to non-zero
* to force the migration daemon
* to complete migration of this segment
* immediately.
*/
cache->reserving_segment_id = id;
wait_for_completion(&seg->migrate_done);
cache->reserving_segment_id = 0;
}
---------- migrate-modulator.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@...il.com>
*
* This file is released under the GPL.
*/
#include "migrate-modulator.h"
void modulator_proc(struct work_struct *work)
{
struct wb_cache *cache =
container_of(work, struct wb_cache, modulator_work);
struct wb_device *wb = cache->wb;
struct hd_struct *hd = wb->device->bdev->bd_part;
unsigned long old = 0, new, util;
unsigned long intvl = 1000;
while (true) {
if (cache->on_terminate)
return;
new = jiffies_to_msecs(part_stat_read(hd, io_ticks));
if (!cache->enable_migration_modulator)
goto modulator_update;
util = (100 * (new - old)) / 1000;
WBINFO("%u", (unsigned) util);
if (util < wb->migrate_threshold)
cache->allow_migrate = true;
else
cache->allow_migrate = false;
modulator_update:
old = new;
schedule_timeout_interruptible(msecs_to_jiffies(intvl));
}
}
---------- queue-flush-job.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@...il.com>
*
* This file is released under the GPL.
*/
#include "queue-flush-job.h"
static u8 count_dirty_caches_remained(struct segment_header *seg)
{
u8 i, count = 0;
struct metablock *mb;
for (i = 0; i < seg->length; i++) {
mb = seg->mb_array + i;
if (mb->dirty_bits)
count++;
}
return count;
}
/*
* Prepare the metadata part of the segment data to flush.
* @dest The metadata part of the segment to flush
*/
static void prepare_segment_header_device(struct segment_header_device *dest,
struct wb_cache *cache,
struct segment_header *src)
{
cache_nr i;
u8 left, right;
dest->global_id = cpu_to_le64(src->global_id);
dest->length = src->length;
dest->lap = cpu_to_le32(calc_segment_lap(cache, src->global_id));
left = src->length - 1;
right = (cache->cursor) % NR_CACHES_INSEG;
BUG_ON(left != right);
for (i = 0; i < src->length; i++) {
struct metablock *mb = src->mb_array + i;
struct metablock_device *mbdev = &dest->mbarr[i];
mbdev->sector = cpu_to_le64(mb->sector);
mbdev->dirty_bits = mb->dirty_bits;
mbdev->lap = cpu_to_le32(dest->lap);
}
}
static void prepare_meta_rambuffer(void *rambuffer,
struct wb_cache *cache,
struct segment_header *seg)
{
prepare_segment_header_device(rambuffer, cache, seg);
}
/*
* Queue the current segment into the queue
* and prepare a new segment.
*/
static void queue_flushing(struct wb_cache *cache)
{
unsigned long flags;
struct segment_header *current_seg = cache->current_seg, *new_seg;
struct flush_job *job;
bool empty;
struct rambuffer *next_rambuf;
size_t n1 = 0, n2 = 0;
u64 next_id;
while (atomic_read(&current_seg->nr_inflight_ios)) {
n1++;
if (n1 == 100)
WBWARN();
schedule_timeout_interruptible(msecs_to_jiffies(1));
}
prepare_meta_rambuffer(cache->current_rambuf->data, cache,
cache->current_seg);
INIT_COMPLETION(current_seg->migrate_done);
INIT_COMPLETION(current_seg->flush_done);
job = kmalloc_retry(sizeof(*job), GFP_NOIO);
INIT_LIST_HEAD(&job->flush_queue);
job->seg = current_seg;
job->rambuf = cache->current_rambuf;
bio_list_init(&job->barrier_ios);
bio_list_merge(&job->barrier_ios, &cache->barrier_ios);
bio_list_init(&cache->barrier_ios);
spin_lock_irqsave(&cache->flush_queue_lock, flags);
empty = list_empty(&cache->flush_queue);
list_add_tail(&job->flush_queue, &cache->flush_queue);
spin_unlock_irqrestore(&cache->flush_queue_lock, flags);
if (empty)
wake_up_interruptible(&cache->flush_wait_queue);
next_id = current_seg->global_id + 1;
new_seg = get_segment_header_by_id(cache, next_id);
new_seg->global_id = next_id;
while (atomic_read(&new_seg->nr_inflight_ios)) {
n2++;
if (n2 == 100)
WBWARN();
schedule_timeout_interruptible(msecs_to_jiffies(1));
}
BUG_ON(count_dirty_caches_remained(new_seg));
discard_caches_inseg(cache, new_seg);
/*
* Set the cursor to the last of the flushed segment.
*/
cache->cursor = current_seg->start_idx + (NR_CACHES_INSEG - 1);
new_seg->length = 0;
next_rambuf = cache->rambuf_pool + (next_id % NR_RAMBUF_POOL);
wait_for_completion(&next_rambuf->done);
INIT_COMPLETION(next_rambuf->done);
cache->current_rambuf = next_rambuf;
cache->current_seg = new_seg;
}
void queue_current_buffer(struct wb_cache *cache)
{
/*
* Before we get the next segment
* we must wait until the segment is all clean.
* A clean segment has no
* log to flush and no dirty blocks to migrate.
*/
u64 next_id = cache->current_seg->global_id + 1;
struct segment_header *next_seg =
get_segment_header_by_id(cache, next_id);
wait_for_completion(&next_seg->flush_done);
wait_for_migration(cache, next_id);
queue_flushing(cache);
}
/*
* Flush all the dirty data at once
* but NOT persistently.
* Cleaning up the writes before termination
* is an example use case.
*/
void flush_current_buffer(struct wb_cache *cache)
{
struct segment_header *old_seg;
mutex_lock(&cache->io_lock);
old_seg = cache->current_seg;
queue_current_buffer(cache);
cache->cursor = (cache->cursor + 1) % cache->nr_caches;
cache->current_seg->length = 1;
mutex_unlock(&cache->io_lock);
wait_for_completion(&old_seg->flush_done);
}
---------- rambuf.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@...il.com>
*
* This file is released under the GPL.
*/
#include "rambuf.h"
int __must_check init_rambuf_pool(struct wb_cache *cache)
{
size_t i, j;
struct rambuffer *rambuf;
cache->rambuf_pool = kmalloc(sizeof(struct rambuffer) * NR_RAMBUF_POOL,
GFP_KERNEL);
if (!cache->rambuf_pool) {
WBERR();
return -ENOMEM;
}
for (i = 0; i < NR_RAMBUF_POOL; i++) {
rambuf = cache->rambuf_pool + i;
init_completion(&rambuf->done);
complete_all(&rambuf->done);
rambuf->data = kmalloc(
1 << (WB_SEGMENTSIZE_ORDER + SECTOR_SHIFT),
GFP_KERNEL);
if (!rambuf->data) {
WBERR();
for (j = 0; j < i; j++) {
rambuf = cache->rambuf_pool + j;
kfree(rambuf->data);
}
kfree(cache->rambuf_pool);
return -ENOMEM;
}
}
return 0;
}
void free_rambuf_pool(struct wb_cache *cache)
{
struct rambuffer *rambuf;
size_t i;
for (i = 0; i < NR_RAMBUF_POOL; i++) {
rambuf = cache->rambuf_pool + i;
kfree(rambuf->data);
}
kfree(cache->rambuf_pool);
}
---------- recover.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@...il.com>
*
* This file is released under the GPL.
*/
#include "recover.h"
static int __must_check
read_superblock_record(struct superblock_record_device *record,
struct wb_cache *cache)
{
int r = 0;
struct dm_io_request io_req;
struct dm_io_region region;
void *buf = kmalloc(1 << SECTOR_SHIFT, GFP_KERNEL);
if (!buf) {
WBERR();
return -ENOMEM;
}
io_req = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = READ,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
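/* The superblock record lives at the fixed sector (1 << 11) - 1 of the cache device. */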
region = (struct dm_io_region) {
.bdev = cache->device->bdev,
.sector = (1 << 11) - 1,
.count = 1,
};
r = dm_safe_io(&io_req, 1, &region, NULL, true);
if (r) {
WBERR();
kfree(buf);
return r;
}
memcpy(record, buf, sizeof(*record));
kfree(buf);
return r;
}
static int __must_check
read_segment_header_device(struct segment_header_device *dest,
struct wb_cache *cache, size_t segment_idx)
{
int r = 0;
struct dm_io_request io_req;
struct dm_io_region region;
void *buf = kmalloc(1 << 12, GFP_KERNEL);
if (!buf) {
WBERR();
return -ENOMEM;
}
io_req = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = READ,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
region = (struct dm_io_region) {
.bdev = cache->device->bdev,
.sector = calc_segment_header_start(segment_idx),
.count = (1 << 3),
};
r = dm_safe_io(&io_req, 1, &region, NULL, false);
if (r) {
WBERR();
kfree(buf);
return r;
}
memcpy(dest, buf, sizeof(*dest));
kfree(buf);
return r;
}
/*
* Read the on-disk metadata of the segment
* and update the in-core cache metadata structures
* such as the hash table.
*/
static void update_by_segment_header_device(struct wb_cache *cache,
struct segment_header_device *src)
{
cache_nr i;
struct segment_header *seg =
get_segment_header_by_id(cache, src->global_id);
seg->length = src->length;
INIT_COMPLETION(seg->migrate_done);
for (i = 0 ; i < src->length; i++) {
cache_nr k;
struct lookup_key key;
struct ht_head *head;
struct metablock *found, *mb = seg->mb_array + i;
struct metablock_device *mbdev = &src->mbarr[i];
if (!mbdev->dirty_bits)
continue;
mb->sector = le64_to_cpu(mbdev->sector);
mb->dirty_bits = mbdev->dirty_bits;
inc_nr_dirty_caches(cache->wb);
key = (struct lookup_key) {
.sector = mb->sector,
};
k = ht_hash(cache, &key);
head = bigarray_at(cache->htable, k);
found = ht_lookup(cache, head, &key);
if (found)
ht_del(cache, found);
ht_register(cache, head, &key, mb);
}
}
/*
* Only if the lap attribute of the header
* matches that of every metablock in it
* is the segment judged to have been flushed correctly
* and merged into the runtime structure.
* Otherwise, it is ignored.
*/
static bool checkup_atomicity(struct segment_header_device *header)
{
u8 i;
u32 a = le32_to_cpu(header->lap), b;
for (i = 0; i < header->length; i++) {
struct metablock_device *o;
o = header->mbarr + i;
b = le32_to_cpu(o->lap);
if (a != b)
return false;
}
return true;
}
int __must_check recover_cache(struct wb_cache *cache)
{
int r = 0;
struct segment_header_device *header;
struct segment_header *seg;
u64 i, j,
max_id, oldest_id, last_flushed_id, init_segment_id,
oldest_idx, nr_segments = cache->nr_segments,
header_id, record_id;
struct superblock_record_device uninitialized_var(record);
r = read_superblock_record(&record, cache);
if (r) {
WBERR();
return r;
}
WBINFO("%llu", record.last_migrated_segment_id);
record_id = le64_to_cpu(record.last_migrated_segment_id);
WBINFO("%llu", record_id);
header = kmalloc(sizeof(*header), GFP_KERNEL);
if (!header) {
WBERR();
return -ENOMEM;
}
/*
* Finding the oldest, non-zero id and its index.
*/
max_id = SZ_MAX;
oldest_id = max_id;
oldest_idx = 0;
for (i = 0; i < nr_segments; i++) {
r = read_segment_header_device(header, cache, i);
if (r) {
WBERR();
kfree(header);
return r;
}
header_id = le64_to_cpu(header->global_id);
if (header_id < 1)
continue;
if (header_id < oldest_id) {
oldest_idx = i;
oldest_id = header_id;
}
}
last_flushed_id = 0;
/*
* This is an invariant:
* we always start from the segment
* right after last_flushed_id.
*/
init_segment_id = last_flushed_id + 1;
/*
* If no segment was flushed
* then there is nothing to recover.
*/
if (oldest_id == max_id)
goto setup_init_segment;
/*
* What we have to do in the next loop is to
* revive the segments that are
* flushed but not yet migrated.
*/
/*
* Example:
* There are only 5 segments.
* The segments we will consider are those of id k+2 and k+3
* because they are dirty but not yet migrated.
*
* id: [ k+3 ][ k+4 ][ k ][ k+1 ][ k+2 ]
* last_flushed init_seg migrated last_migrated flushed
*/
for (i = oldest_idx; i < (nr_segments + oldest_idx); i++) {
j = i % nr_segments;
r = read_segment_header_device(header, cache, j);
if (r) {
WBERR();
kfree(header);
return r;
}
header_id = le64_to_cpu(header->global_id);
/*
* A valid global_id is > 0.
* Once we encounter a header whose id does not
* advance past last_flushed_id (e.g. global_id = 0),
* this segment and all that follow can be
* considered invalid.
*/
if (header_id <= last_flushed_id)
break;
if (!checkup_atomicity(header)) {
WBWARN("header atomicity broken id %llu",
header_id);
break;
}
/*
* Now the header is proven valid.
*/
last_flushed_id = header_id;
init_segment_id = last_flushed_id + 1;
/*
* If the data is already on the backing store,
* we ignore the segment.
*/
if (header_id <= record_id)
continue;
update_by_segment_header_device(cache, header);
}
setup_init_segment:
kfree(header);
seg = get_segment_header_by_id(cache, init_segment_id);
seg->global_id = init_segment_id;
atomic_set(&seg->nr_inflight_ios, 0);
cache->last_flushed_segment_id = seg->global_id - 1;
cache->last_migrated_segment_id =
cache->last_flushed_segment_id > cache->nr_segments ?
cache->last_flushed_segment_id - cache->nr_segments : 0;
if (record_id > cache->last_migrated_segment_id)
cache->last_migrated_segment_id = record_id;
WBINFO("%llu", cache->last_migrated_segment_id);
wait_for_migration(cache, seg->global_id);
discard_caches_inseg(cache, seg);
/*
* The cursor is set to the first element of the segment,
* which means that element will not be used.
*/
cache->cursor = seg->start_idx;
seg->length = 1;
cache->current_seg = seg;
return 0;
}
---------- segment.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@...il.com>
*
* This file is released under the GPL.
*/
#include "segment.h"
/*
* Get the in-core metablock of the given index.
*/
struct metablock *mb_at(struct wb_cache *cache, cache_nr idx)
{
u64 seg_idx = idx / NR_CACHES_INSEG;
struct segment_header *seg =
bigarray_at(cache->segment_header_array, seg_idx);
cache_nr idx_inseg = idx % NR_CACHES_INSEG;
return seg->mb_array + idx_inseg;
}
static void mb_array_empty_init(struct wb_cache *cache)
{
size_t i;
for (i = 0; i < cache->nr_caches; i++) {
struct metablock *mb = mb_at(cache, i);
INIT_HLIST_NODE(&mb->ht_list);
mb->idx = i;
mb->dirty_bits = 0;
}
}
int __must_check init_segment_header_array(struct wb_cache *cache)
{
u64 segment_idx, nr_segments = cache->nr_segments;
cache->segment_header_array =
make_bigarray(sizeof(struct segment_header), nr_segments);
if (!cache->segment_header_array) {
WBERR();
return -ENOMEM;
}
for (segment_idx = 0; segment_idx < nr_segments; segment_idx++) {
struct segment_header *seg =
bigarray_at(cache->segment_header_array, segment_idx);
seg->start_idx = NR_CACHES_INSEG * segment_idx;
seg->start_sector =
((segment_idx % nr_segments) + 1) *
(1 << WB_SEGMENTSIZE_ORDER);
seg->length = 0;
atomic_set(&seg->nr_inflight_ios, 0);
spin_lock_init(&seg->lock);
INIT_LIST_HEAD(&seg->migrate_list);
init_completion(&seg->flush_done);
complete_all(&seg->flush_done);
init_completion(&seg->migrate_done);
complete_all(&seg->migrate_done);
}
mb_array_empty_init(cache);
return 0;
}
/*
* Get the segment from the segment id.
* The Index of the segment is calculated from the segment id.
*/
struct segment_header *get_segment_header_by_id(struct wb_cache *cache,
size_t segment_id)
{
struct segment_header *r =
bigarray_at(cache->segment_header_array,
(segment_id - 1) % cache->nr_segments);
return r;
}
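/*
* The lap counts how many times the ring of segments has been
* walked through: ids 1..nr_segments are lap 1,
* ids nr_segments+1..2*nr_segments are lap 2, and so on.
*/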
u32 calc_segment_lap(struct wb_cache *cache, size_t segment_id)
{
u32 a = (segment_id - 1) / cache->nr_segments;
return a + 1;
}
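/*
* Metablock slots are 8 sectors (4KB) apart.
* Slot 0 of a segment holds the segment header,
* so metablock mb_idx starts (1 + mb_idx % NR_CACHES_INSEG) * 8
* sectors into the segment.
*/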
sector_t calc_mb_start_sector(struct segment_header *seg,
cache_nr mb_idx)
{
size_t k = 1 + (mb_idx % NR_CACHES_INSEG);
return seg->start_sector + (k << 3);
}
sector_t calc_segment_header_start(size_t segment_idx)
{
return (1 << WB_SEGMENTSIZE_ORDER) * (segment_idx + 1);
}
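/*
* The leading segment-sized area of the cache device is not used
* for segment data (segment 0 starts one segment into the device),
* hence the trailing -1.
*/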
u64 calc_nr_segments(struct dm_dev *dev)
{
sector_t devsize = dm_devsize(dev);
return devsize / (1 << WB_SEGMENTSIZE_ORDER) - 1;
}
bool is_on_buffer(struct wb_cache *cache, cache_nr mb_idx)
{
cache_nr start = cache->current_seg->start_idx;
if (mb_idx < start)
return false;
if (mb_idx >= (start + NR_CACHES_INSEG))
return false;
return true;
}
---------- superblock-recorder.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@...il.com>
*
* This file is released under the GPL.
*/
#include "superblock-recorder.h"
static void update_superblock_record(struct wb_cache *cache)
{
struct superblock_record_device o;
void *buf;
struct dm_io_request io_req;
struct dm_io_region region;
o.last_migrated_segment_id =
cpu_to_le64(cache->last_migrated_segment_id);
buf = kmalloc_retry(1 << SECTOR_SHIFT, GFP_NOIO | __GFP_ZERO);
memcpy(buf, &o, sizeof(o));
io_req = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = WRITE_FUA,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
region = (struct dm_io_region) {
.bdev = cache->device->bdev,
.sector = (1 << 11) - 1,
.count = 1,
};
dm_safe_io_retry(&io_req, 1, &region, true);
kfree(buf);
}
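/*
* Daemon that periodically records the last migrated segment id
* in the superblock record area.
* Setting update_record_interval to 0 pauses the recording.
*/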
void recorder_proc(struct work_struct *work)
{
struct wb_cache *cache =
container_of(work, struct wb_cache, recorder_work);
unsigned long intvl;
while (true) {
if (cache->on_terminate)
return;
/* sec -> ms */
intvl = cache->update_record_interval * 1000;
if (!intvl) {
schedule_timeout_interruptible(msecs_to_jiffies(1000));
continue;
}
WBINFO();
update_superblock_record(cache);
schedule_timeout_interruptible(msecs_to_jiffies(intvl));
}
}
---------- target.c ----------
/*
* writeboost
* Log-structured Caching for Linux
*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@...il.com>
*
* This file is released under the GPL.
*/
#include "target.h"
/*
* <backing dev> <cache dev>
*/
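/*
* Hypothetical construction example (device paths are placeholders):
* dmsetup create mywb --table "0 $(blockdev --getsz /dev/sdb) writeboost /dev/sdb /dev/sdc"
*/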
static int writeboost_ctr(struct dm_target *ti, unsigned int argc, char **argv)
{
int r = 0;
bool cache_valid;
struct wb_device *wb;
struct wb_cache *cache;
struct dm_dev *origdev, *cachedev;
#if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 6, 0)
r = dm_set_target_max_io_len(ti, (1 << 3));
if (r) {
WBERR();
return r;
}
#else
ti->split_io = (1 << 3);
#endif
wb = kzalloc(sizeof(*wb), GFP_KERNEL);
if (!wb) {
WBERR();
return -ENOMEM;
}
/*
* EMC's textbook on storage systems says
* a storage device should keep its disk
* utilization below 70%.
*/
wb->migrate_threshold = 70;
atomic64_set(&wb->nr_dirty_caches, 0);
r = dm_get_device(ti, argv[0], dm_table_get_mode(ti->table),
&origdev);
if (r) {
WBERR("%d", r);
goto bad_get_device_orig;
}
wb->device = origdev;
wb->cache = NULL;
r = dm_get_device(ti, argv[1], dm_table_get_mode(ti->table),
&cachedev);
if (r) {
WBERR("%d", r);
goto bad_get_device_cache;
}
r = audit_cache_device(cachedev, &cache_valid);
if (r) {
WBERR("%d", r);
/*
* If something goes wrong while auditing the cache,
* such as a read I/O error, it is dangerous either
* to format the device or to resume it trusting
* that the cache is valid. So we quit.
*/
goto bad_audit_cache;
}
if (!cache_valid) {
r = format_cache_device(cachedev);
if (r) {
WBERR("%d", r);
goto bad_format_cache;
}
}
cache = kzalloc(sizeof(*cache), GFP_KERNEL);
if (!cache) {
WBERR();
r = -ENOMEM;
goto bad_alloc_cache;
}
wb->cache = cache;
wb->cache->wb = wb;
r = resume_cache(cache, cachedev);
if (r) {
WBERR("%d", r);
goto bad_resume_cache;
}
wb->ti = ti;
ti->private = wb;
#if LINUX_VERSION_CODE >= PER_BIO_VERSION
ti->per_bio_data_size = sizeof(struct per_bio_data);
#endif
#if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 9, 0)
ti->num_flush_bios = 1;
ti->num_discard_bios = 1;
#else
ti->num_flush_requests = 1;
ti->num_discard_requests = 1;
#endif
ti->discard_zeroes_data_unsupported = true;
return 0;
bad_resume_cache:
kfree(cache);
bad_alloc_cache:
bad_format_cache:
bad_audit_cache:
dm_put_device(ti, cachedev);
bad_get_device_cache:
dm_put_device(ti, origdev);
bad_get_device_orig:
kfree(wb);
return r;
}
static void writeboost_dtr(struct dm_target *ti)
{
struct wb_device *wb = ti->private;
struct wb_cache *cache = wb->cache;
/*
* Synchronize all the dirty writes
* before termination.
*/
cache->sync_interval = 1;
free_cache(cache);
dm_put_device(wb->ti, cache->device);
kfree(cache);
dm_put_device(ti, wb->device);
ti->private = NULL;
kfree(wb);
}
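/*
* Tunables are changed at runtime via dmsetup message,
* e.g. (hypothetical device name):
* dmsetup message mywb 0 allow_migrate 1
*/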
static int writeboost_message(struct dm_target *ti, unsigned argc, char **argv)
{
struct wb_device *wb = ti->private;
struct wb_cache *cache = wb->cache;
char *cmd = argv[0];
unsigned long tmp;
if (!strcasecmp(cmd, "clear_stat")) {
struct wb_cache *cache = wb->cache;
clear_stat(cache);
return 0;
}
if (kstrtoul(argv[1], 10, &tmp))
return -EINVAL;
if (!strcasecmp(cmd, "allow_migrate")) {
if (tmp > 1)
return -EINVAL;
cache->allow_migrate = tmp;
return 0;
}
if (!strcasecmp(cmd, "enable_migration_modulator")) {
if (tmp > 1)
return -EINVAL;
cache->enable_migration_modulator = tmp;
return 0;
}
if (!strcasecmp(cmd, "barrier_deadline_ms")) {
if (tmp < 1)
return -EINVAL;
cache->barrier_deadline_ms = tmp;
return 0;
}
if (!strcasecmp(cmd, "nr_max_batched_migration")) {
if (tmp < 1)
return -EINVAL;
cache->nr_max_batched_migration = tmp;
return 0;
}
if (!strcasecmp(cmd, "migrate_threshold")) {
wb->migrate_threshold = tmp;
return 0;
}
if (!strcasecmp(cmd, "update_record_interval")) {
cache->update_record_interval = tmp;
return 0;
}
if (!strcasecmp(cmd, "sync_interval")) {
cache->sync_interval = tmp;
return 0;
}
return -EINVAL;
}
static int writeboost_merge(struct dm_target *ti, struct bvec_merge_data *bvm,
struct bio_vec *biovec, int max_size)
{
struct wb_device *wb = ti->private;
struct dm_dev *device = wb->device;
struct request_queue *q = bdev_get_queue(device->bdev);
if (!q->merge_bvec_fn)
return max_size;
bvm->bi_bdev = device->bdev;
return min(max_size, q->merge_bvec_fn(q, bvm, biovec));
}
static int writeboost_iterate_devices(struct dm_target *ti,
iterate_devices_callout_fn fn, void *data)
{
struct wb_device *wb = ti->private;
struct dm_dev *orig = wb->device;
sector_t start = 0;
sector_t len = dm_devsize(orig);
return fn(ti, orig, start, len, data);
}
static void writeboost_io_hints(struct dm_target *ti,
struct queue_limits *limits)
{
blk_limits_io_min(limits, 512);
blk_limits_io_opt(limits, 4096);
}
static
#if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 8, 0)
void
#else
int
#endif
writeboost_status(
struct dm_target *ti, status_type_t type,
#if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 6, 0)
unsigned flags,
#endif
char *result,
unsigned maxlen)
{
unsigned int sz = 0;
struct wb_device *wb = ti->private;
struct wb_cache *cache = wb->cache;
size_t i;
switch (type) {
case STATUSTYPE_INFO:
DMEMIT("%llu %llu %llu %llu %llu %u ",
(long long unsigned int)
atomic64_read(&wb->nr_dirty_caches),
(long long unsigned int) cache->nr_segments,
(long long unsigned int) cache->last_migrated_segment_id,
(long long unsigned int) cache->last_flushed_segment_id,
(long long unsigned int) cache->current_seg->global_id,
(unsigned int) cache->cursor);
for (i = 0; i < STATLEN; i++) {
atomic64_t *v;
if (i == (STATLEN-1))
break;
v = &cache->stat[i];
DMEMIT("%lu ", atomic64_read(v));
}
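/* Seven tunable key-value pairs follow. */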
DMEMIT("%d ", 7);
DMEMIT("barrier_deadline_ms %lu ",
cache->barrier_deadline_ms);
DMEMIT("allow_migrate %d ",
cache->allow_migrate ? 1 : 0);
DMEMIT("enable_migration_modulator %d ",
cache->enable_migration_modulator ? 1 : 0);
DMEMIT("migrate_threshold %d ", wb->migrate_threshold);
DMEMIT("nr_cur_batched_migration %lu ",
cache->nr_cur_batched_migration);
DMEMIT("sync_interval %lu ",
cache->sync_interval);
DMEMIT("update_record_interval %lu",
cache->update_record_interval);
break;
case STATUSTYPE_TABLE:
DMEMIT("%s %s", wb->device->name, wb->cache->device->name);
break;
}
#if LINUX_VERSION_CODE < KERNEL_VERSION(3, 8, 0)
return 0;
#endif
}
static struct target_type writeboost_target = {
.name = "writeboost",
.version = {0, 1, 0},
.module = THIS_MODULE,
.map = writeboost_map,
.ctr = writeboost_ctr,
.dtr = writeboost_dtr,
.end_io = writeboost_end_io,
.merge = writeboost_merge,
.message = writeboost_message,
.status = writeboost_status,
.io_hints = writeboost_io_hints,
.iterate_devices = writeboost_iterate_devices,
};
struct dm_io_client *wb_io_client;
struct workqueue_struct *safe_io_wq;
static int __init writeboost_module_init(void)
{
int r = 0;
r = dm_register_target(&writeboost_target);
if (r < 0) {
WBERR("%d", r);
return r;
}
r = -ENOMEM;
safe_io_wq = alloc_workqueue("safeiowq",
WQ_NON_REENTRANT | WQ_MEM_RECLAIM, 0);
if (!safe_io_wq) {
WBERR();
goto bad_wq;
}
wb_io_client = dm_io_client_create();
if (IS_ERR(wb_io_client)) {
WBERR();
r = PTR_ERR(wb_io_client);
goto bad_io_client;
}
return 0;
bad_io_client:
destroy_workqueue(safe_io_wq);
bad_wq:
dm_unregister_target(&writeboost_target);
return r;
}
static void __exit writeboost_module_exit(void)
{
dm_io_client_destroy(wb_io_client);
destroy_workqueue(safe_io_wq);
dm_unregister_target(&writeboost_target);
}
module_init(writeboost_module_init);
module_exit(writeboost_module_exit);
MODULE_AUTHOR("Akira Hayakawa <ruby.wktk@...il.com>");
MODULE_DESCRIPTION(DM_NAME " writeboost target");
MODULE_LICENSE("GPL");
---------- util.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@...il.com>
*
* This file is released under the GPL.
*/
#include "util.h"
void *do_kmalloc_retry(size_t size, gfp_t flags, int lineno)
{
size_t count = 0;
void *p;
retry_alloc:
p = kmalloc(size, flags);
if (!p) {
count++;
WBWARN("L%d size:%lu, count:%lu",
lineno, size, count);
schedule_timeout_interruptible(msecs_to_jiffies(1));
goto retry_alloc;
}
return p;
}
struct safe_io {
struct work_struct work;
int err;
unsigned long err_bits;
struct dm_io_request *io_req;
unsigned num_regions;
struct dm_io_region *regions;
};
static void safe_io_proc(struct work_struct *work)
{
struct safe_io *io = container_of(work, struct safe_io, work);
io->err_bits = 0;
io->err = dm_io(io->io_req, io->num_regions, io->regions,
&io->err_bits);
}
/*
* dm_io wrapper.
* @thread: run this operation in another thread (on safe_io_wq)
* to avoid deadlock.
*/
int dm_safe_io_internal(
struct dm_io_request *io_req,
unsigned num_regions, struct dm_io_region *regions,
unsigned long *err_bits, bool thread, int lineno)
{
int err;
dev_t dev;
if (thread) {
struct safe_io io = {
.io_req = io_req,
.regions = regions,
.num_regions = num_regions,
};
INIT_WORK_ONSTACK(&io.work, safe_io_proc);
queue_work(safe_io_wq, &io.work);
flush_work(&io.work);
err = io.err;
if (err_bits)
*err_bits = io.err_bits;
} else {
err = dm_io(io_req, num_regions, regions, err_bits);
}
dev = regions->bdev->bd_dev;
/* dm_io routines permit NULL for the err_bits pointer. */
if (err || (err_bits && *err_bits)) {
unsigned long eb;
if (!err_bits)
eb = (~(unsigned long)0);
else
eb = *err_bits;
WBERR("L%d err(%d, %lu), rw(%d), sector(%lu), dev(%u:%u)",
lineno, err, eb,
io_req->bi_rw, regions->sector,
MAJOR(dev), MINOR(dev));
}
return err;
}
void dm_safe_io_retry_internal(
struct dm_io_request *io_req,
unsigned num_regions, struct dm_io_region *regions,
bool thread, int lineno)
{
int err, count = 0;
unsigned long err_bits;
dev_t dev;
retry_io:
err_bits = 0;
err = dm_safe_io_internal(io_req, num_regions, regions, &err_bits,
thread, lineno);
dev = regions->bdev->bd_dev;
if (err || err_bits) {
count++;
WBWARN("L%d count(%d)", lineno, count);
schedule_timeout_interruptible(msecs_to_jiffies(1000));
goto retry_io;
}
if (count) {
WBWARN("L%d rw(%d), sector(%lu), dev(%u:%u)",
lineno,
io_req->bi_rw, regions->sector,
MAJOR(dev), MINOR(dev));
}
}
sector_t dm_devsize(struct dm_dev *dev)
{
return i_size_read(dev->bdev->bd_inode) >> SECTOR_SHIFT;
}