lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-Id: <1404251250-22992-31-git-send-email-tst@schoebel-theuer.de>
Date:	Tue,  1 Jul 2014 23:47:10 +0200
From:	Thomas Schoebel-Theuer <tst@...oebel-theuer.de>
To:	linux-kernel@...r.kernel.org
Subject: [PATCH 30/50] mars: add new file drivers/block/mars/xio_bricks/xio_aio.c

Signed-off-by: Thomas Schoebel-Theuer <tst@...oebel-theuer.de>
---
 drivers/block/mars/xio_bricks/xio_aio.c | 1224 +++++++++++++++++++++++++++++++
 1 file changed, 1224 insertions(+)
 create mode 100644 drivers/block/mars/xio_bricks/xio_aio.c

diff --git a/drivers/block/mars/xio_bricks/xio_aio.c b/drivers/block/mars/xio_bricks/xio_aio.c
new file mode 100644
index 0000000..5356fdb
--- /dev/null
+++ b/drivers/block/mars/xio_bricks/xio_aio.c
@@ -0,0 +1,1224 @@
+/*  (c) 2010 Thomas Schoebel-Theuer / 1&1 Internet AG */
+
+#define XIO_DEBUGGING
+
+#include <linux/kernel.h>
+#include <linux/module.h>
+#include <linux/version.h>
+#include <linux/string.h>
+#include <linux/list.h>
+#include <linux/types.h>
+#include <linux/blkdev.h>
+#include <linux/spinlock.h>
+#include <linux/wait.h>
+#include <linux/file.h>
+
+#include <linux/xio.h>
+#include <linux/brick/lib_timing.h>
+#include <linux/lib_mapfree.h>
+
+#include <linux/xio/xio_aio.h>
+
+#define XIO_MAX_AIO			1024
+#define XIO_MAX_AIO_READ		32
+
+static struct timing_stats timings[3];
+
+struct threshold aio_submit_threshold = {
+	.thr_ban = &xio_global_ban,
+	.thr_limit = AIO_SUBMIT_MAX_LATENCY,
+	.thr_factor = 10,
+	.thr_plus = 10000,
+};
+EXPORT_SYMBOL_GPL(aio_submit_threshold);
+
+struct threshold aio_io_threshold[2] = {
+	[0] = {
+		.thr_ban = &xio_global_ban,
+		.thr_limit = AIO_IO_R_MAX_LATENCY,
+		.thr_factor = 100,
+		.thr_plus = 0,
+	},
+	[1] = {
+		.thr_ban = &xio_global_ban,
+		.thr_limit = AIO_IO_W_MAX_LATENCY,
+		.thr_factor = 100,
+		.thr_plus = 0,
+	},
+};
+EXPORT_SYMBOL_GPL(aio_io_threshold);
+
+struct threshold aio_sync_threshold = {
+	.thr_ban = &xio_global_ban,
+	.thr_limit = AIO_SYNC_MAX_LATENCY,
+	.thr_factor = 100,
+	.thr_plus = 0,
+};
+EXPORT_SYMBOL_GPL(aio_sync_threshold);
+
+int aio_sync_mode = 2;
+EXPORT_SYMBOL_GPL(aio_sync_mode);
+
+/************************ mmu faking (provisionary) ***********************/
+
+/* Kludge: our kernel threads will have no mm context, but need one
+ * for stuff like ioctx_alloc() / aio_setup_ring() etc
+ * which expect userspace resources.
+ * We fake one.
+ * TODO: factor out the userspace stuff from AIO such that
+ * this fake is no longer necessary.
+ * Even better: replace do_mmap() in AIO stuff by something
+ * more friendly to kernelspace apps.
+ */
+#include <linux/mmu_context.h>
+
+struct mm_struct *mm_fake = NULL;
+struct task_struct *mm_fake_task = NULL;
+atomic_t mm_fake_count = ATOMIC_INIT(0);
+
+static inline void set_fake(void)
+{
+	mm_fake = current->mm;
+	if (mm_fake) {
+		XIO_DBG("initialized fake\n");
+		mm_fake_task = current;
+		get_task_struct(current); /*  paired with put_task_struct() */
+		atomic_inc(&mm_fake->mm_count); /*  paired with mmdrop() */
+		atomic_inc(&mm_fake->mm_users); /*  paired with mmput() */
+	}
+}
+
+static inline void put_fake(void)
+{
+	int count = 0;
+
+	while (mm_fake && mm_fake_task) {
+		int remain = atomic_read(&mm_fake_count);
+
+		if (unlikely(remain != 0)) {
+			if (count++ < 10) {
+				XIO_WRN("cannot cleanup fake, remain = %d\n", remain);
+				brick_msleep(1000);
+				continue;
+			}
+			XIO_ERR("cannot cleanup fake, remain = %d\n", remain);
+			break;
+		} else {
+			XIO_DBG("cleaning up fake\n");
+			mmput(mm_fake);
+			mmdrop(mm_fake);
+			mm_fake = NULL;
+			put_task_struct(mm_fake_task);
+			mm_fake_task = NULL;
+		}
+	}
+}
+
+static inline void use_fake_mm(void)
+{
+	if (!current->mm && mm_fake) {
+		atomic_inc(&mm_fake_count);
+		XIO_DBG("using fake, count=%d\n", atomic_read(&mm_fake_count));
+		use_mm(mm_fake);
+	}
+}
+
+/* Cleanup faked mm, otherwise do_exit() will crash
+ */
+static inline void unuse_fake_mm(void)
+{
+	if (current->mm == mm_fake && mm_fake) {
+		XIO_DBG("unusing fake, count=%d\n", atomic_read(&mm_fake_count));
+		atomic_dec(&mm_fake_count);
+		unuse_mm(mm_fake);
+		current->mm = NULL;
+	}
+}
+
+/************************ own type definitions ***********************/
+
+/***************** some helpers *****************/
+
+static inline
+void _enqueue(struct aio_threadinfo *tinfo, struct aio_aio_aspect *aio_a, int prio, bool at_end)
+{
+	prio++;
+	if (unlikely(prio < 0))
+		prio = 0;
+	else if (unlikely(prio >= XIO_PRIO_NR))
+		prio = XIO_PRIO_NR - 1;
+
+	aio_a->enqueue_stamp = cpu_clock(raw_smp_processor_id());
+
+	spin_lock(&tinfo->lock);
+
+	if (at_end)
+		list_add_tail(&aio_a->io_head, &tinfo->aio_list[prio]);
+	else
+		list_add(&aio_a->io_head, &tinfo->aio_list[prio]);
+	tinfo->queued[prio]++;
+	atomic_inc(&tinfo->queued_sum);
+
+	spin_unlock(&tinfo->lock);
+
+	atomic_inc(&tinfo->total_enqueue_count);
+
+	wake_up_interruptible_all(&tinfo->event);
+}
+
+static inline
+struct aio_aio_aspect *_dequeue(struct aio_threadinfo *tinfo)
+{
+	struct aio_aio_aspect *aio_a = NULL;
+	int prio;
+
+	spin_lock(&tinfo->lock);
+
+	for (prio = 0; prio < XIO_PRIO_NR; prio++) {
+		struct list_head *start = &tinfo->aio_list[prio];
+		struct list_head *tmp = start->next;
+
+		if (tmp != start) {
+			list_del_init(tmp);
+			tinfo->queued[prio]--;
+			atomic_dec(&tinfo->queued_sum);
+			aio_a = container_of(tmp, struct aio_aio_aspect, io_head);
+			goto done;
+		}
+	}
+
+done:
+	spin_unlock(&tinfo->lock);
+
+	if (likely(aio_a && aio_a->object)) {
+		unsigned long long latency;
+
+		latency = cpu_clock(raw_smp_processor_id()) - aio_a->enqueue_stamp;
+		threshold_check(&aio_io_threshold[aio_a->object->io_rw & 1], latency);
+	}
+	return aio_a;
+}
+
+/***************** own brick * input * output operations *****************/
+
+static
+loff_t get_total_size(struct aio_output *output)
+{
+	struct file *file;
+	struct inode *inode;
+	loff_t min;
+
+	file = output->mf->mf_filp;
+	if (unlikely(!file)) {
+		XIO_ERR("file is not open\n");
+		return -EILSEQ;
+	}
+	if (unlikely(!file->f_mapping)) {
+		XIO_ERR("file %p has no mapping\n", file);
+		return -EILSEQ;
+	}
+	inode = file->f_mapping->host;
+	if (unlikely(!inode)) {
+		XIO_ERR("file %p has no inode\n", file);
+		return -EILSEQ;
+	}
+
+	min = i_size_read(inode);
+
+	/* Workaround for races in the page cache.
+	 * It appears that concurrent reads and writes seem to
+	 * result in inconsistent reads in some very rare cases, due to
+	 * races. Sometimes, the inode claims that the file has been already
+	 * appended by a write operation, but the data has not actually hit
+	 * the page cache, such that a concurrent read gets NULL blocks.
+	 */
+	if (!output->brick->is_static_device) {
+		loff_t max = 0;
+
+		mf_get_dirty(output->mf, &min, &max, 0, 99);
+	}
+
+	return min;
+}
+
+/* Prepare @aio for IO on @output: store the current file size in io_total_size, take an object reference, and allocate io_data if unset. Returns io_len or a negative errno. */
+static int aio_io_get(struct aio_output *output, struct aio_object *aio)
+{
+	loff_t total_size;
+
+	if (unlikely(!output->mf)) {
+		XIO_ERR("brick is not switched on\n");
+		return -EILSEQ;
+	}
+
+	if (unlikely(aio->io_len <= 0)) {
+		XIO_ERR("bad io_len=%d\n", aio->io_len);
+		return -EILSEQ;
+	}
+
+	total_size = get_total_size(output);
+	if (unlikely(total_size < 0))
+		return total_size;
+	aio->io_total_size = total_size;
+
+	if (aio->obj_initialized) {
+		obj_get(aio);
+		return aio->io_len;
+	}
+
+	/* Buffered IO: no buffer was supplied, so allocate one; do_dealloc makes aio_io_put() free it again. */
+	if (!aio->io_data) {
+		struct aio_aio_aspect *aio_a = aio_aio_get_aspect(output->brick, aio);
+
+		if (unlikely(!aio_a)) {
+			XIO_ERR("bad aio_a\n");
+			return -EILSEQ;
+		}
+		aio->io_data = brick_block_alloc(aio->io_pos, (aio_a->alloc_len = aio->io_len));
+		if (unlikely(!aio->io_data)) {
+			XIO_ERR("ENOMEM %d bytes\n", aio->io_len);
+			return -ENOMEM;
+		}
+		aio_a->do_dealloc = true;
+		atomic_inc(&output->total_alloc_count);
+		atomic_inc(&output->alloc_count);
+	}
+
+	obj_get_first(aio);
+	return aio->io_len;
+}
+
+static void aio_io_put(struct aio_output *output, struct aio_object *aio)
+{
+	struct file *file;
+	struct aio_aio_aspect *aio_a;
+
+	if (!obj_put(aio))
+		goto done;
+
+	if (likely(output->mf)) {
+		file = output->mf->mf_filp;
+		if (likely(file && file->f_mapping && file->f_mapping->host))
+			aio->io_total_size = get_total_size(output);
+	}
+
+	aio_a = aio_aio_get_aspect(output->brick, aio);
+	if (aio_a && aio_a->do_dealloc) {
+		brick_block_free(aio->io_data, aio_a->alloc_len);
+		atomic_dec(&output->alloc_count);
+	}
+	obj_free(aio);
+done:;
+}
+
+static
+void _complete(struct aio_output *output, struct aio_aio_aspect *aio_a, int err)
+{
+	struct aio_object *aio;
+
+	CHECK_PTR(aio_a, fatal);
+	aio = aio_a->object;
+	CHECK_PTR(aio, fatal);
+
+	if (err < 0) {
+		XIO_ERR("IO error %d at pos=%lld len=%d (aio=%p io_data=%p)\n",
+			err,
+			aio->io_pos,
+			aio->io_len,
+			aio,
+			aio->io_data);
+	} else {
+		aio_checksum(aio);
+		aio->io_flags |= AIO_UPTODATE;
+	}
+
+	CHECKED_CALLBACK(aio, err, err_found);
+
+done:
+	if (aio->io_rw)
+		atomic_dec(&output->write_count);
+	else
+		atomic_dec(&output->read_count);
+	mf_remove_dirty(output->mf, &aio_a->di);
+
+	aio_io_put(output, aio);
+	atomic_dec(&xio_global_io_flying);
+	goto out_return;
+err_found:
+	XIO_FAT("giving up...\n");
+	goto done;
+
+fatal:
+	XIO_FAT("bad pointer, giving up...\n");
+out_return:;
+}
+
+static
+void _complete_aio(struct aio_output *output, struct aio_object *aio, int err)
+{
+	struct aio_aio_aspect *aio_a;
+
+	obj_check(aio);
+	aio_a = aio_aio_get_aspect(output->brick, aio);
+	CHECK_PTR(aio_a, fatal);
+	_complete(output, aio_a, err);
+	goto out_return;
+fatal:
+	XIO_FAT("bad pointer, giving up...\n");
+out_return:;
+}
+
+static
+void _complete_all(struct list_head *tmp_list, struct aio_output *output, int err)
+{
+	while (!list_empty(tmp_list)) {
+		struct list_head *tmp = tmp_list->next;
+		struct aio_aio_aspect *aio_a = container_of(tmp, struct aio_aio_aspect, io_head);
+
+		list_del_init(tmp);
+		aio_a->di.dirty_stage = 3;
+		_complete(output, aio_a, err);
+	}
+}
+
+static void aio_io_io(struct aio_output *output, struct aio_object *aio)
+{
+	struct aio_threadinfo *tinfo = &output->tinfo[0];
+	struct aio_aio_aspect *aio_a;
+	int err = -EINVAL;
+
+	obj_get(aio);
+	atomic_inc(&xio_global_io_flying);
+
+	/*  statistics */
+	if (aio->io_rw) {
+		atomic_inc(&output->total_write_count);
+		atomic_inc(&output->write_count);
+	} else {
+		atomic_inc(&output->total_read_count);
+		atomic_inc(&output->read_count);
+	}
+
+	if (unlikely(!output->mf || !output->mf->mf_filp))
+		goto done;
+
+	mapfree_set(output->mf, aio->io_pos, -1);
+
+	aio_a = aio_aio_get_aspect(output->brick, aio);
+	if (unlikely(!aio_a))
+		goto done;
+
+	_enqueue(tinfo, aio_a, aio->io_prio, true);
+	goto out_return;
+done:
+	_complete_aio(output, aio, err);
+out_return:;
+}
+
+/* Submit the IO described by @aio_a (or an FDSYNC when @use_fdsync) as a single iocb via sys_io_submit(); returns its result, or -EBADF when output->fd is negative. */
+static int aio_submit(struct aio_output *output, struct aio_aio_aspect *aio_a, bool use_fdsync)
+{
+	struct aio_object *aio = aio_a->object;
+	mm_segment_t oldfs;
+	int res;
+
+	struct iocb iocb = {
+		.aio_data = (__u64)(unsigned long)aio_a, /* via unsigned long: avoids a pointer/integer size mismatch on 32bit */
+		.aio_lio_opcode = use_fdsync ? IOCB_CMD_FDSYNC : (aio->io_rw != 0 ? IOCB_CMD_PWRITE : IOCB_CMD_PREAD),
+		.aio_fildes = output->fd,
+		.aio_buf = (unsigned long)aio->io_data,
+		.aio_nbytes = aio->io_len,
+		.aio_offset = aio->io_pos,
+		/*  .aio_reqprio = something(aio->io_prio) field exists, but not yet implemented in kernelspace :( */
+	};
+	struct iocb *iocbp = &iocb;
+	unsigned long long latency;
+
+	if (unlikely(output->fd < 0)) {
+		XIO_ERR("bad fd = %d\n", output->fd);
+		res = -EBADF;
+		goto done;
+	}
+
+	oldfs = get_fs();
+	set_fs(get_ds());
+	latency = TIME_STATS(&timings[aio->io_rw & 1], res = sys_io_submit(output->ctxp, 1, &iocbp));
+	set_fs(oldfs);
+
+	threshold_check(&aio_submit_threshold, latency);
+
+	atomic_inc(&output->total_submit_count);
+
+	if (likely(res >= 0))
+		atomic_inc(&output->submit_count);
+	else if (likely(res == -EAGAIN))
+		atomic_inc(&output->total_again_count);
+	else
+		XIO_ERR("error = %d\n", res);
+
+done:
+	return res;
+}
+
+/* Submit an iocb whose aio_data is zero; aio_event_thread() skips events with zero data as dummy requests. Returns the sys_io_submit() result. */
+static int aio_submit_dummy(struct aio_output *output)
+{
+	mm_segment_t oldfs;
+	int res;
+	int dummy;
+	struct iocb iocb = {
+		.aio_buf = (__u64)(unsigned long)&dummy, /* via unsigned long, so that a 32bit address is zero-extended rather than sign-extended */
+	};
+	struct iocb *iocbp = &iocb;
+
+	oldfs = get_fs();
+	set_fs(get_ds());
+	res = sys_io_submit(output->ctxp, 1, &iocbp);
+	set_fs(oldfs);
+
+	if (likely(res >= 0))
+		atomic_inc(&output->submit_count);
+	return res;
+}
+
+static
+int aio_start_thread(
+	struct aio_output *output,
+	struct aio_threadinfo *tinfo,
+	int (*fn)(void *),
+	char class)
+{
+	int j;
+
+	for (j = 0; j < XIO_PRIO_NR; j++)
+		INIT_LIST_HEAD(&tinfo->aio_list[j]);
+	tinfo->output = output;
+	spin_lock_init(&tinfo->lock);
+	init_waitqueue_head(&tinfo->event);
+	init_waitqueue_head(&tinfo->terminate_event);
+	tinfo->terminated = false;
+	tinfo->thread = brick_thread_create(fn, tinfo, "xio_aio_%c%d", class, output->index);
+	if (unlikely(!tinfo->thread)) {
+		XIO_ERR("cannot create thread\n");
+		return -ENOENT;
+	}
+	return 0;
+}
+
+static
+void aio_stop_thread(struct aio_output *output, int i, bool do_submit_dummy)
+{
+	struct aio_threadinfo *tinfo = &output->tinfo[i];
+
+	if (tinfo->thread) {
+		XIO_DBG("stopping thread %d ...\n", i);
+		brick_thread_stop_nowait(tinfo->thread);
+
+/**/
+		if (do_submit_dummy) {
+			XIO_DBG("submitting dummy for wakeup %d...\n", i);
+			use_fake_mm();
+			aio_submit_dummy(output);
+			if (likely(current->mm))
+				unuse_fake_mm();
+		}
+
+		/*  wait for termination */
+		XIO_DBG("waiting for thread %d ...\n", i);
+		wait_event_interruptible_timeout(
+			tinfo->terminate_event,
+			tinfo->terminated,
+			(60 - i * 2) * HZ);
+		if (likely(tinfo->terminated))
+			brick_thread_stop(tinfo->thread);
+		else
+			XIO_ERR("thread %d did not terminate - leaving a zombie\n", i);
+	}
+}
+
+static
+int aio_sync(struct file *file)
+{
+	int err;
+
+	switch (aio_sync_mode) {
+	case 1:
+#if defined(S_BIAS) || (defined(RHEL_MAJOR) && (RHEL_MAJOR < 7))
+		err = vfs_fsync_range(file, file->f_path.dentry, 0, LLONG_MAX, 1);
+#else
+		err = vfs_fsync_range(file, 0, LLONG_MAX, 1);
+#endif
+		break;
+	case 2:
+#if defined(S_BIAS) || (defined(RHEL_MAJOR) && (RHEL_MAJOR < 7))
+		err = vfs_fsync_range(file, file->f_path.dentry, 0, LLONG_MAX, 0);
+#else
+		err = vfs_fsync_range(file, 0, LLONG_MAX, 0);
+#endif
+		break;
+	default:
+		err = filemap_write_and_wait_range(file->f_mapping, 0, LLONG_MAX);
+	}
+
+	return err;
+}
+
+static
+void aio_sync_all(struct aio_output *output, struct list_head *tmp_list)
+{
+	unsigned long long latency;
+	int err;
+
+	output->fdsync_active = true;
+	atomic_inc(&output->total_fdsync_count);
+
+	latency = TIME_STATS(
+		&timings[2],
+		err = aio_sync(output->mf->mf_filp)
+		);
+
+	threshold_check(&aio_sync_threshold, latency);
+
+	output->fdsync_active = false;
+	wake_up_interruptible_all(&output->fdsync_event);
+	if (err < 0)
+		XIO_ERR("FDSYNC error %d\n", err);
+
+	/* Signal completion for the whole list.
+	 * No locking needed, it's on the stack.
+	 */
+	_complete_all(tmp_list, output, err);
+}
+
+/* Workaround for non-implemented aio_fsync()
+ */
+static
+int aio_sync_thread(void *data)
+{
+	struct aio_threadinfo *tinfo = data;
+	struct aio_output *output = tinfo->output;
+
+	XIO_DBG("sync thread has started on '%s'.\n", output->brick->brick_path);
+	/* set_user_nice(current, -20); */
+
+	while (!brick_thread_should_stop() || atomic_read(&tinfo->queued_sum) > 0) {
+		LIST_HEAD(tmp_list);
+		int i;
+
+		output->fdsync_active = false;
+		wake_up_interruptible_all(&output->fdsync_event);
+
+		wait_event_interruptible_timeout(
+			tinfo->event,
+			atomic_read(&tinfo->queued_sum) > 0,
+			HZ / 4);
+
+		spin_lock(&tinfo->lock);
+		for (i = 0; i < XIO_PRIO_NR; i++) {
+			struct list_head *start = &tinfo->aio_list[i];
+
+			if (!list_empty(start)) {
+				/*  move over the whole list */
+				list_replace_init(start, &tmp_list);
+				atomic_sub(tinfo->queued[i], &tinfo->queued_sum);
+				tinfo->queued[i] = 0;
+				break;
+			}
+		}
+		spin_unlock(&tinfo->lock);
+
+		if (!list_empty(&tmp_list))
+			aio_sync_all(output, &tmp_list);
+	}
+
+	XIO_DBG("sync thread has stopped.\n");
+	tinfo->terminated = true;
+	wake_up_interruptible_all(&tinfo->terminate_event);
+	return 0;
+}
+
+static int aio_event_thread(void *data)
+{
+	struct aio_threadinfo *tinfo = data;
+	struct aio_output *output = tinfo->output;
+	struct aio_threadinfo *other = &output->tinfo[2];
+	struct io_event *events;
+	int err = -ENOMEM;
+
+	events = brick_mem_alloc(sizeof(struct io_event) * XIO_MAX_AIO_READ);
+
+	XIO_DBG("event thread has started.\n");
+
+	use_fake_mm();
+	if (!current->mm)
+		goto err;
+
+	err = aio_start_thread(output, &output->tinfo[2], aio_sync_thread, 'y');
+	if (unlikely(err < 0))
+		goto err;
+
+	while (!brick_thread_should_stop() || atomic_read(&tinfo->queued_sum) > 0) {
+		mm_segment_t oldfs;
+		int count;
+		int i;
+
+		struct timespec timeout = {
+			.tv_sec = 1,
+		};
+
+		oldfs = get_fs();
+		set_fs(get_ds());
+		/* TODO: don't timeout upon termination.
+		 * Probably we should submit a dummy request.
+		 */
+		count = sys_io_getevents(output->ctxp, 1, XIO_MAX_AIO_READ, events, &timeout);
+		set_fs(oldfs);
+
+		if (likely(count > 0))
+			atomic_sub(count, &output->submit_count);
+
+		for (i = 0; i < count; i++) {
+			struct aio_aio_aspect *aio_a = (void *)events[i].data;
+			struct aio_object *aio;
+			int err = events[i].res;
+
+			if (!aio_a)
+				continue; /*  this was a dummy request */
+
+			aio_a->di.dirty_stage = 2;
+			aio = aio_a->object;
+
+			mapfree_set(output->mf, aio->io_pos, aio->io_pos + aio->io_len);
+
+			if (output->brick->o_fdsync
+			   && err >= 0
+			   && aio->io_rw != READ
+			   && !aio->io_skip_sync
+			   && !aio_a->resubmit++) {
+				/*  workaround for non-implemented AIO FSYNC operation */
+				if (output->mf &&
+				    output->mf->mf_filp &&
+				    output->mf->mf_filp->f_op &&
+				    !output->mf->mf_filp->f_op->aio_fsync) {
+					_enqueue(other, aio_a, aio->io_prio, true);
+					continue;
+				}
+				err = aio_submit(output, aio_a, true);
+				if (likely(err >= 0))
+					continue;
+			}
+
+			aio_a->di.dirty_stage = 3;
+			_complete(output, aio_a, err);
+
+		}
+	}
+	err = 0;
+
+err:
+	XIO_DBG("event thread has stopped, err = %d\n", err);
+
+	aio_stop_thread(output, 2, false);
+
+	unuse_fake_mm();
+
+	tinfo->terminated = true;
+	wake_up_interruptible_all(&tinfo->terminate_event);
+	brick_mem_free(events);
+	return err;
+}
+
+#if 1
+/* Clear the fdtable slot of @fd in current->files. This should go to fs/open.c (as long as vfs_submit() is not implemented)
+ */
+#include <linux/fdtable.h>
+void fd_uninstall(unsigned int fd)
+{
+	struct files_struct *files = current->files;
+	struct fdtable *fdt;
+
+	XIO_DBG("fd = %d\n", (int)fd);
+	if (unlikely((int)fd < 0)) { /* fd is unsigned: without the cast this test could never fire */
+		XIO_ERR("bad fd = %d\n", (int)fd);
+		goto out_return;
+	}
+	spin_lock(&files->file_lock);
+	fdt = files_fdtable(files);
+	rcu_assign_pointer(fdt->fd[fd], NULL);
+	spin_unlock(&files->file_lock);
+out_return:;
+}
+EXPORT_SYMBOL(fd_uninstall);
+#endif
+
+static
+atomic_t ioctx_count = ATOMIC_INIT(0);
+
+static
+void _destroy_ioctx(struct aio_output *output)
+{
+	if (unlikely(!output))
+		goto done;
+
+	aio_stop_thread(output, 1, true);
+
+	use_fake_mm();
+
+	if (likely(output->ctxp)) {
+		mm_segment_t oldfs;
+		int err;
+
+		XIO_DBG("ioctx count = %d destroying %p\n", atomic_read(&ioctx_count), (void *)output->ctxp);
+		oldfs = get_fs();
+		set_fs(get_ds());
+		err = sys_io_destroy(output->ctxp);
+		set_fs(oldfs);
+		atomic_dec(&ioctx_count);
+		XIO_DBG("ioctx count = %d status = %d\n", atomic_read(&ioctx_count), err);
+		output->ctxp = 0;
+	}
+
+	if (likely(output->fd >= 0)) {
+		XIO_DBG("destroying fd %d\n", output->fd);
+		fd_uninstall(output->fd);
+		put_unused_fd(output->fd);
+		output->fd = -1;
+	}
+
+done:
+	if (likely(current->mm))
+		unuse_fake_mm();
+}
+
+static
+int _create_ioctx(struct aio_output *output)
+{
+	struct file *file;
+
+	mm_segment_t oldfs;
+	int err = -EINVAL;
+
+	CHECK_PTR_NULL(output, done);
+	CHECK_PTR_NULL(output->mf, done);
+	file = output->mf->mf_filp;
+	CHECK_PTR_NULL(file, done);
+
+	/* TODO: this is provisionary. We only need it for sys_io_submit()
+	 * which uses userspace concepts like file handles.
+	 * This should be accompanied by a future kernelspace vfs_submit() or
+	 * do_submit() which currently does not exist :(
+	 */
+	err = get_unused_fd();
+	XIO_DBG("file %p '%s' new fd = %d\n", file, output->mf->mf_name, err);
+	if (unlikely(err < 0)) {
+		XIO_ERR("cannot get fd, err=%d\n", err);
+		goto done;
+	}
+	output->fd = err;
+	fd_install(err, file);
+
+	use_fake_mm();
+
+	err = -ENOMEM;
+	if (unlikely(!current->mm)) {
+		XIO_ERR("cannot fake mm\n");
+		goto done;
+	}
+
+	XIO_DBG("ioctx count = %d old = %p\n", atomic_read(&ioctx_count), (void *)output->ctxp);
+	output->ctxp = 0;
+
+	oldfs = get_fs();
+	set_fs(get_ds());
+	err = sys_io_setup(XIO_MAX_AIO, &output->ctxp);
+	set_fs(oldfs);
+	if (likely(output->ctxp))
+		atomic_inc(&ioctx_count);
+	XIO_DBG("ioctx count = %d new = %p status = %d\n", atomic_read(&ioctx_count), (void *)output->ctxp, err);
+	if (unlikely(err < 0)) {
+		XIO_ERR("io_setup failed, err=%d\n", err);
+		goto done;
+	}
+
+	err = aio_start_thread(output, &output->tinfo[1], aio_event_thread, 'e');
+	if (unlikely(err < 0)) {
+		XIO_ERR("could not start event thread\n");
+		goto done;
+	}
+
+done:
+	if (likely(current->mm))
+		unuse_fake_mm();
+	return err;
+}
+
+/* Submit thread: dequeues requests from its threadinfo and passes them to aio_submit(), sleeping and retrying on -EAGAIN.
+ * Timed reads at/after EOF are shortened, requeued until io_timeout expires, or completed with -ENODATA. Returns 0.
+ */
+static int aio_submit_thread(void *data)
+{
+	struct aio_threadinfo *tinfo = data;
+	struct aio_output *output = tinfo->output;
+	int err = 0;
+
+	XIO_DBG("submit thread has started.\n");
+
+	use_fake_mm();
+
+	while (!brick_thread_should_stop() || atomic_read(&output->read_count) + atomic_read(&output->write_count) + atomic_read(&tinfo->queued_sum) > 0) {
+		struct aio_aio_aspect *aio_a;
+		struct aio_object *aio;
+		int sleeptime;
+		int status;
+
+		wait_event_interruptible_timeout(
+			tinfo->event,
+			atomic_read(&tinfo->queued_sum) > 0,
+			HZ / 4);
+
+		aio_a = _dequeue(tinfo);
+		if (!aio_a)
+			continue;
+
+		aio = aio_a->object;
+		status = -EINVAL;
+		CHECK_PTR(aio, error);
+
+		mapfree_set(output->mf, aio->io_pos, -1);
+
+		aio_a->di.dirty_stage = 0;
+		if (aio->io_rw)
+			mf_insert_dirty(output->mf, &aio_a->di);
+
+		aio->io_total_size = get_total_size(output);
+
+		/*  check for reads crossing the EOF boundary (special case) */
+		if (aio->io_timeout > 0 &&
+		    !aio->io_rw &&
+		    aio->io_pos + aio->io_len > aio->io_total_size) {
+			loff_t len = aio->io_total_size - aio->io_pos;
+
+			if (len > 0) {
+				if (aio->io_len > len)
+					aio->io_len = len;
+			} else {
+				if (!aio_a->start_jiffies)
+					aio_a->start_jiffies = jiffies;
+				if ((long long)jiffies - aio_a->start_jiffies <= aio->io_timeout) {
+					if (atomic_read(&tinfo->queued_sum) <= 0) {
+						atomic_inc(&output->total_msleep_count);
+						brick_msleep(1000 * 4 / HZ);
+					}
+					_enqueue(tinfo, aio_a, XIO_PRIO_LOW, true);
+					continue;
+				}
+				XIO_DBG("ENODATA %lld\n", len);
+				_complete(output, aio_a, -ENODATA);
+				continue;
+			}
+		}
+
+		sleeptime = 1;
+		for (;;) {
+			aio_a->di.dirty_stage = 1;
+			status = aio_submit(output, aio_a, false);
+
+			if (likely(status != -EAGAIN))
+				break;
+			aio_a->di.dirty_stage = 0;
+			atomic_inc(&output->total_delay_count);
+			brick_msleep(sleeptime);
+			if (sleeptime < 100)
+				sleeptime++;
+		}
+
+error: /* NOTE(review): also reached with an aio that CHECK_PTR() rejected - confirm _complete_aio() tolerates that */
+		if (unlikely(status < 0))
+			_complete_aio(output, aio, status);
+	}
+
+	XIO_DBG("submit thread has stopped, status = %d.\n", err);
+
+	if (likely(current->mm))
+		unuse_fake_mm();
+
+	tinfo->terminated = true;
+	wake_up_interruptible_all(&tinfo->terminate_event);
+	return err;
+}
+
+static int aio_get_info(struct aio_output *output, struct xio_info *info)
+{
+	struct file *file;
+
+	if (unlikely(!output || !output->mf))
+		return -EINVAL;
+	file = output->mf->mf_filp;
+	if (unlikely(!file || !file->f_mapping || !file->f_mapping->host))
+		return -EINVAL;
+
+	info->tf_align = 1;
+	info->tf_min_size = 1;
+	info->current_size = get_total_size(output);
+
+	XIO_DBG("determined file size = %lld\n", info->current_size);
+
+	return 0;
+}
+
+/*************** informational * statistics **************/
+
+static noinline
+char *aio_statistics(struct aio_brick *brick, int verbose)
+{
+	struct aio_output *output = brick->outputs[0];
+	char *res = brick_string_alloc(4096);
+	char *sync = NULL;
+	int pos = 0;
+
+	pos += report_timing(&timings[0], res + pos, 4096 - pos);
+	pos += report_timing(&timings[1], res + pos, 4096 - pos);
+	pos += report_timing(&timings[2], res + pos, 4096 - pos);
+
+	snprintf(res + pos, 4096 - pos,
+		 "total reads = %d writes = %d allocs = %d submits = %d again = %d delays = %d msleeps = %d fdsyncs = %d fdsync_waits = %d map_free = %d | flying reads = %d writes = %d allocs = %d submits = %d q0 = %d q1 = %d q2 = %d | total q0 = %d q1 = %d q2 = %d %s\n",
+		 atomic_read(&output->total_read_count),
+		 atomic_read(&output->total_write_count),
+		 atomic_read(&output->total_alloc_count),
+		 atomic_read(&output->total_submit_count),
+		 atomic_read(&output->total_again_count),
+		 atomic_read(&output->total_delay_count),
+		 atomic_read(&output->total_msleep_count),
+		 atomic_read(&output->total_fdsync_count),
+		 atomic_read(&output->total_fdsync_wait_count),
+		 atomic_read(&output->total_mapfree_count),
+		 atomic_read(&output->read_count),
+		 atomic_read(&output->write_count),
+		 atomic_read(&output->alloc_count),
+		 atomic_read(&output->submit_count),
+		 atomic_read(&output->tinfo[0].queued_sum),
+		 atomic_read(&output->tinfo[1].queued_sum),
+		 atomic_read(&output->tinfo[2].queued_sum),
+		 atomic_read(&output->tinfo[0].total_enqueue_count),
+		 atomic_read(&output->tinfo[1].total_enqueue_count),
+		 atomic_read(&output->tinfo[2].total_enqueue_count),
+		 sync ? sync : "");
+
+	if (sync)
+		brick_string_free(sync);
+
+	return res;
+}
+
+static noinline
+void aio_reset_statistics(struct aio_brick *brick)
+{
+	struct aio_output *output = brick->outputs[0];
+	int i;
+
+	atomic_set(&output->total_read_count, 0);
+	atomic_set(&output->total_write_count, 0);
+	atomic_set(&output->total_alloc_count, 0);
+	atomic_set(&output->total_submit_count, 0);
+	atomic_set(&output->total_again_count, 0);
+	atomic_set(&output->total_delay_count, 0);
+	atomic_set(&output->total_msleep_count, 0);
+	atomic_set(&output->total_fdsync_count, 0);
+	atomic_set(&output->total_fdsync_wait_count, 0);
+	atomic_set(&output->total_mapfree_count, 0);
+	for (i = 0; i < 3; i++) {
+		struct aio_threadinfo *tinfo = &output->tinfo[i];
+
+		atomic_set(&tinfo->total_enqueue_count, 0);
+	}
+}
+
+/*************** object * aspect constructors * destructors **************/
+
+static int aio_aio_aspect_init_fn(struct generic_aspect *_ini)
+{
+	struct aio_aio_aspect *ini = (void *)_ini;
+
+	INIT_LIST_HEAD(&ini->io_head);
+	INIT_LIST_HEAD(&ini->di.dirty_head);
+	ini->di.dirty_aio = ini->object;
+	return 0;
+}
+
+static void aio_aio_aspect_exit_fn(struct generic_aspect *_ini)
+{
+	struct aio_aio_aspect *ini = (void *)_ini;
+
+	CHECK_HEAD_EMPTY(&ini->di.dirty_head);
+	CHECK_HEAD_EMPTY(&ini->io_head);
+}
+
+XIO_MAKE_STATICS(aio);
+
+/********************* brick constructors * destructors *******************/
+
+static int aio_brick_construct(struct aio_brick *brick)
+{
+	return 0;
+}
+
+static int aio_switch(struct aio_brick *brick)
+{
+	static int index;
+	struct aio_output *output = brick->outputs[0];
+	const char *path = output->brick->brick_path;
+	int flags = O_RDWR | O_LARGEFILE;
+	int status = 0;
+
+	XIO_DBG("power.button = %d\n", brick->power.button);
+	if (!brick->power.button)
+		goto cleanup;
+
+	if (brick->power.on_led || output->mf)
+		goto done;
+
+	xio_set_power_off_led((void *)brick, false);
+
+	if (brick->o_creat) {
+		flags |= O_CREAT;
+		XIO_DBG("using O_CREAT on %s\n", path);
+	}
+	if (brick->o_direct) {
+		flags |= O_DIRECT;
+		XIO_DBG("using O_DIRECT on %s\n", path);
+	}
+
+	output->mf = mapfree_get(path, flags);
+	if (unlikely(!output->mf)) {
+		XIO_ERR("could not open file = '%s' flags = %d\n", path, flags);
+		status = -ENOENT;
+		goto err;
+	}
+
+	output->index = ++index;
+
+	status = _create_ioctx(output);
+	if (unlikely(status < 0)) {
+		XIO_ERR("could not create ioctx, status = %d\n", status);
+		goto err;
+	}
+
+	status = aio_start_thread(output, &output->tinfo[0], aio_submit_thread, 's');
+	if (unlikely(status < 0)) {
+		XIO_ERR("could not start theads, status = %d\n", status);
+		goto err;
+	}
+
+	XIO_DBG("opened file '%s'\n", path);
+	xio_set_power_on_led((void *)brick, true);
+
+done:
+	return 0;
+
+err:
+	XIO_ERR("status = %d\n", status);
+cleanup:
+	if (brick->power.off_led)
+		goto done;
+
+	xio_set_power_on_led((void *)brick, false);
+
+	aio_stop_thread(output, 0, false);
+
+	_destroy_ioctx(output);
+
+	xio_set_power_off_led((void *)brick,
+			  (output->tinfo[0].thread == NULL &&
+			   output->tinfo[1].thread == NULL &&
+			   output->tinfo[2].thread == NULL));
+
+	XIO_DBG("switch off off_led = %d status = %d\n", brick->power.off_led, status);
+	if (brick->power.off_led) {
+		if (output->mf) {
+			XIO_DBG("closing file = '%s'\n", output->mf->mf_name);
+			mapfree_put(output->mf);
+			output->mf = NULL;
+		}
+	}
+	return status;
+}
+
+static int aio_output_construct(struct aio_output *output)
+{
+	init_waitqueue_head(&output->fdsync_event);
+	output->fd = -1;
+	return 0;
+}
+
+static int aio_output_destruct(struct aio_output *output)
+{
+	if (unlikely(output->fd >= 0))
+		XIO_ERR("active fd = %d detected\n", output->fd);
+	return 0;
+}
+
+/************************ static structs ***********************/
+
+static struct aio_brick_ops aio_brick_ops = {
+	.brick_switch = aio_switch,
+	.brick_statistics = aio_statistics,
+	.reset_statistics = aio_reset_statistics,
+};
+
+static struct aio_output_ops aio_output_ops = {
+	.aio_get = aio_io_get,
+	.aio_put = aio_io_put,
+	.aio_io = aio_io_io,
+	.xio_get_info = aio_get_info,
+};
+
+const struct aio_input_type aio_input_type = {
+	.type_name = "aio_input",
+	.input_size = sizeof(struct aio_input),
+};
+
+static const struct aio_input_type *aio_input_types[] = {
+	&aio_input_type,
+};
+
+const struct aio_output_type aio_output_type = {
+	.type_name = "aio_output",
+	.output_size = sizeof(struct aio_output),
+	.master_ops = &aio_output_ops,
+	.output_construct = &aio_output_construct,
+	.output_destruct = &aio_output_destruct,
+};
+
+static const struct aio_output_type *aio_output_types[] = {
+	&aio_output_type,
+};
+
+const struct aio_brick_type aio_brick_type = {
+	.type_name = "aio_brick",
+	.brick_size = sizeof(struct aio_brick),
+	.max_inputs = 0,
+	.max_outputs = 1,
+	.master_ops = &aio_brick_ops,
+	.aspect_types = aio_aspect_types,
+	.default_input_types = aio_input_types,
+	.default_output_types = aio_output_types,
+	.brick_construct = &aio_brick_construct,
+};
+EXPORT_SYMBOL_GPL(aio_brick_type);
+
+/***************** module init stuff ************************/
+
+int __init init_xio_aio(void)
+{
+	XIO_DBG("init_aio()\n");
+	_aio_brick_type = (void *)&aio_brick_type;
+	set_fake();
+	return aio_register_brick_type();
+}
+
+void exit_xio_aio(void)
+{
+	XIO_DBG("exit_aio()\n");
+	put_fake();
+	aio_unregister_brick_type();
+}
-- 
2.0.0

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@...r.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ