lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-Id: <6bf93d24571024f4b7237c2f7aac191c3126cb13.1483138400.git.tst@schoebel-theuer.de>
Date:   Fri, 30 Dec 2016 23:57:55 +0100
From:   Thomas Schoebel-Theuer <tst@...oebel-theuer.de>
To:     linux-kernel@...r.kernel.org, tst@...oebel-theuer.de
Subject: [RFC 29/32] mars: add new module mars_main

Signed-off-by: Thomas Schoebel-Theuer <tst@...oebel-theuer.de>
---
 drivers/staging/mars/mars/mars_main.c | 6160 +++++++++++++++++++++++++++++++++
 1 file changed, 6160 insertions(+)
 create mode 100644 drivers/staging/mars/mars/mars_main.c

diff --git a/drivers/staging/mars/mars/mars_main.c b/drivers/staging/mars/mars/mars_main.c
new file mode 100644
index 000000000000..346e0fbeb9b2
--- /dev/null
+++ b/drivers/staging/mars/mars/mars_main.c
@@ -0,0 +1,6160 @@
+/*
+ * MARS Long Distance Replication Software
+ *
+ * Copyright (C) 2010-2014 Thomas Schoebel-Theuer
+ * Copyright (C) 2011-2014 1&1 Internet AG
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ */
+
+#define XIO_DEBUGGING
+
+/* This MUST be updated whenever INCOMPATIBLE changes are made to the
+ * symlink tree in /mars/ .
+ *
+ * Just adding a new symlink is usually not "incompatible", if
+ * other tools like marsadm just ignore it.
+ *
+ * "incompatible" means that something may BREAK.
+ */
+#define SYMLINK_TREE_VERSION		"0.1"
+
+#include <linux/kernel.h>
+#include <linux/module.h>
+#include <linux/string.h>
+
+#include <linux/genhd.h>
+#include <linux/blkdev.h>
+
+#include "strategy.h"
+
+#include <linux/wait.h>
+
+#include <linux/xio/lib_mapfree.h>
+
+/*  used brick types */
+#include <linux/xio/xio_server.h>
+#include <linux/xio/xio_client.h>
+#include <linux/xio/xio_copy.h>
+#include <linux/xio/xio_bio.h>
+#include <linux/xio/xio_sio.h>
+#include <linux/xio/xio_trans_logger.h>
+#include <linux/xio/xio_if.h>
+#include "mars_proc.h"
+
+#define REPLAY_TOLERANCE		(PAGE_SIZE + OVERHEAD)
+
+/*  TODO: add human-readable timestamps */
+#define XIO_INF_TO(channel, fmt, args...)				\
+	({								\
+		say_to(channel, SAY_INFO, "%s: " fmt, say_class[SAY_INFO], ##args);\
+		XIO_INF(fmt, ##args);					\
+	})
+
+#define XIO_WRN_TO(channel, fmt, args...)				\
+	({								\
+		say_to(channel, SAY_WARN, "%s: " fmt, say_class[SAY_WARN], ##args);\
+		XIO_WRN(fmt, ##args);					\
+	})
+
+#define XIO_ERR_TO(channel, fmt, args...)				\
+	({								\
+		say_to(channel, SAY_ERROR, "%s: " fmt, say_class[SAY_ERROR], ##args);\
+		XIO_ERR(fmt, ##args);					\
+	})
+
+loff_t raw_total_space;
+loff_t global_total_space;
+
+loff_t raw_remaining_space;
+loff_t global_remaining_space;
+
+int check_mars_space = 1;
+
+module_param_named(check_mars_space, check_mars_space, int, 0);
+
+int global_logrot_auto = 32;
+
+module_param_named(logrot_auto, global_logrot_auto, int, 0);
+
+int global_free_space_0 = CONFIG_MARS_MIN_SPACE_0;
+
+int global_free_space_1 = CONFIG_MARS_MIN_SPACE_1;
+
+int global_free_space_2 = CONFIG_MARS_MIN_SPACE_2;
+
+int global_free_space_3 = CONFIG_MARS_MIN_SPACE_3;
+
+int global_free_space_4 = CONFIG_MARS_MIN_SPACE_4;
+
+int _global_sync_want;
+int global_sync_want;
+
+int global_sync_nr;
+
+int global_sync_limit;
+
+int mars_rollover_interval = 3;
+
+module_param_named(mars_rollover_interval, mars_rollover_interval, int, 0);
+
+int mars_scan_interval = 5;
+
+module_param_named(mars_scan_interval, mars_scan_interval, int, 0);
+
+int mars_propagate_interval = 5;
+
+module_param_named(mars_propagate_interval, mars_propagate_interval, int, 0);
+
+int mars_sync_flip_interval = 60;
+
+module_param_named(mars_sync_flip_interval, mars_sync_flip_interval, int, 0);
+
+int mars_peer_abort = 7;
+
+int mars_fast_fullsync = 1;
+
+module_param_named(mars_fast_fullsync, mars_fast_fullsync, int, 0);
+
+int xio_throttle_start = 60;
+
+int xio_throttle_end = 90;
+
+int mars_emergency_mode;
+
+int mars_reset_emergency = 1;
+
+int mars_keep_msg = 10;
+
+#ifdef CONFIG_MARS_DEBUG
+#include <linux/reboot.h>
+
+int mars_crash_mode;
+int mars_hang_mode;
+
+void _crashme(int mode, bool do_sync)
+{
+	if (mode == mars_crash_mode) {
+		if (do_sync)
+			mars_sync();
+		emergency_restart();
+	}
+}
+
+#endif
+
+#define MARS_SYMLINK_MAX		1023
+
+struct key_value_pair {
+	const char *key;
+	char *val;
+	char *old_val;
+	unsigned long last_jiffies;
+	struct timespec system_stamp;
+	struct timespec lamport_stamp;
+};
+
+static inline
+void clear_vals(struct key_value_pair *start)
+{
+	while (start->key) {
+		brick_string_free(start->val);
+		start->val = NULL;
+		brick_string_free(start->old_val);
+		start->old_val = NULL;
+		start++;
+	}
+}
+
+static
+void show_vals(struct key_value_pair *start, const char *path, const char *add)
+{
+	while (start->key) {
+		char *dst = path_make("%s/actual-%s/msg-%s%s", path, my_id(), add, start->key);
+
+		/*  show the old message for some keep_time if no new one is available */
+		if (!start->val && start->old_val &&
+		    (long long)start->last_jiffies  + mars_keep_msg * HZ <= (long long)jiffies) {
+			start->val = start->old_val;
+			start->old_val = NULL;
+		}
+		if (start->val) {
+			char *src = path_make(
+			"%ld.%09ld %ld.%09ld %s",
+					      start->system_stamp.tv_sec, start->system_stamp.tv_nsec,
+					      start->lamport_stamp.tv_sec, start->lamport_stamp.tv_nsec,
+					      start->val);
+			mars_symlink(src, dst, NULL, 0);
+			brick_string_free(src);
+			brick_string_free(start->old_val);
+			start->old_val = start->val;
+			start->val = NULL;
+		} else {
+			mars_symlink("OK", dst, NULL, 0);
+			memset(&start->system_stamp, 0, sizeof(start->system_stamp));
+			memset(&start->lamport_stamp, 0, sizeof(start->lamport_stamp));
+			brick_string_free(start->old_val);
+			start->old_val = NULL;
+		}
+		brick_string_free(dst);
+		start++;
+	}
+}
+
+static inline
+void assign_keys(struct key_value_pair *start, const char **keys)
+{
+	while (*keys) {
+		start->key = *keys;
+		start++;
+		keys++;
+	}
+}
+
+static inline
+struct key_value_pair *find_key(struct key_value_pair *start, const char *key)
+{
+	while (start->key) {
+		if (!strcmp(start->key, key))
+			return start;
+		start++;
+	}
+	XIO_ERR("cannot find key '%s'\n", key);
+	return NULL;
+}
+
+static
+void _make_msg(int line, struct key_value_pair *pair, const char *fmt, ...)  __printf(3, 4);
+
+static
+void _make_msg(int line, struct key_value_pair *pair, const char *fmt, ...)
+{
+	int len;
+	va_list args;
+
+	if (unlikely(!pair || !pair->key)) {
+		XIO_ERR("bad pointer %p at line %d\n", pair, line);
+		goto out_return;
+	}
+	pair->last_jiffies = jiffies;
+	if (!pair->val) {
+		pair->val = brick_string_alloc(MARS_SYMLINK_MAX + 1);
+		len = 0;
+		if (!pair->system_stamp.tv_sec) {
+			pair->system_stamp = CURRENT_TIME;
+			get_lamport(&pair->lamport_stamp);
+		}
+	} else {
+		len = strnlen(pair->val, MARS_SYMLINK_MAX);
+		if (unlikely(len >= MARS_SYMLINK_MAX - 48))
+			goto out_return;
+		pair->val[len++] = ',';
+	}
+
+	va_start(args, fmt);
+	vsnprintf(pair->val + len, MARS_SYMLINK_MAX - 1 - len, fmt, args);
+	va_end(args);
+out_return:;
+}
+
+#define make_msg(pair, fmt, args...)					\
+	_make_msg(__LINE__, pair, fmt, ##args)
+
+static
+struct key_value_pair gbl_pairs[] = {
+	{ NULL }
+};
+
+#define make_gbl_msg(key, fmt, args...)					\
+	make_msg(find_key(gbl_pairs, key), fmt, ##args)
+
+static
+const char *rot_keys[] = {
+	/*  from _update_version_link() */
+	"err-versionlink-skip",
+	/*  from _update_info() */
+	"err-sequence-trash",
+	/*  from _is_switchover_possible() */
+	"inf-versionlink-not-yet-exist",
+	"inf-versionlink-not-equal",
+	"inf-replay-not-yet-finished",
+	"err-bad-log-name",
+	"err-log-not-contiguous",
+	"err-versionlink-not-readable",
+	"err-replaylink-not-readable",
+	"err-splitbrain-detected",
+	/*  from _update_file() */
+	"inf-fetch",
+	/*  from make_sync() */
+	"inf-sync",
+	/*  from make_log_step() */
+	"wrn-log-consecutive",
+	/*  from make_log_finalize() */
+	"inf-replay-start",
+	"wrn-space-low",
+	"err-space-low",
+	"err-emergency",
+	"err-replay-stop",
+	/*  from _check_logging_status() */
+	"inf-replay-tolerance",
+	"err-replay-size",
+	NULL,
+};
+
+#define make_rot_msg(rot, key, fmt, args...)				\
+	make_msg(find_key(&(rot)->msgs[0], key), fmt, ##args)
+
+#define IS_EXHAUSTED()		   (mars_emergency_mode > 0)
+#define IS_EMERGENCY_SECONDARY()   (mars_emergency_mode > 1)
+#define IS_EMERGENCY_PRIMARY()	   (mars_emergency_mode > 2)
+#define IS_JAMMED()		   (mars_emergency_mode > 3)
+
+static
+void _make_alivelink_str(const char *name, const char *src)
+{
+	char *dst = path_make("/mars/%s-%s", name, my_id());
+
+	if (!src || !dst) {
+		XIO_ERR("cannot make alivelink paths\n");
+		goto err;
+	}
+	XIO_DBG("'%s' -> '%s'\n", src, dst);
+	mars_symlink(src, dst, NULL, 0);
+err:
+	brick_string_free(dst);
+}
+
+static
+void _make_alivelink(const char *name, loff_t val)
+{
+	char *src = path_make("%lld", val);
+
+	_make_alivelink_str(name, src);
+	brick_string_free(src);
+}
+
+static
+int compute_emergency_mode(void)
+{
+	loff_t rest;
+	loff_t present;
+	loff_t limit = 0;
+	int mode = 4;
+	int this_mode = 0;
+
+	mars_remaining_space("/mars", &raw_total_space, &raw_remaining_space);
+	rest = raw_remaining_space;
+
+#define CHECK_LIMIT(LIMIT_VAR)						\
+do {									\
+	if (LIMIT_VAR > 0)						\
+		limit += (loff_t)LIMIT_VAR * 1024 * 1024;		\
+	if (rest < limit && !this_mode) {				\
+		this_mode = mode;					\
+	}								\
+	mode--;								\
+} while (0)
+
+	CHECK_LIMIT(global_free_space_4);
+	CHECK_LIMIT(global_free_space_3);
+	CHECK_LIMIT(global_free_space_2);
+	CHECK_LIMIT(global_free_space_1);
+
+	/* Decrease the emergeny mode only in single steps.
+	 */
+	if (mars_reset_emergency && mars_emergency_mode > 0 && mars_emergency_mode > this_mode)
+		mars_emergency_mode--;
+	else
+		mars_emergency_mode = this_mode;
+	_make_alivelink("emergency", mars_emergency_mode);
+
+	rest -= limit;
+	if (rest < 0)
+		rest = 0;
+	global_remaining_space = rest;
+	_make_alivelink("rest-space", rest / (1024 * 1024));
+
+	present = raw_total_space - limit;
+	global_total_space = present;
+
+	if (xio_throttle_start > 0 &&
+	    xio_throttle_end > xio_throttle_start &&
+	    present > 0) {
+		loff_t percent_used = 100 - (rest * 100 / present);
+
+		if (percent_used < xio_throttle_start)
+			if_throttle_start_size = 0;
+		else if (percent_used >= xio_throttle_end)
+			if_throttle_start_size = 1;
+		else
+			if_throttle_start_size = (
+			xio_throttle_end - percent_used) * 1024 / (xio_throttle_end - xio_throttle_start) + 1;
+	}
+
+	if (unlikely(present < global_free_space_0))
+		return -ENOSPC;
+	return 0;
+}
+
+/*****************************************************************/
+
+static struct task_struct *main_thread;
+
+typedef int (*main_worker_fn)(void *buf, struct mars_dent *dent);
+
+struct main_class {
+	char *cl_name;
+	int    cl_len;
+	char   cl_type;
+	bool   cl_hostcontext;
+	bool   cl_serial;
+	bool   cl_use_channel;
+	int    cl_father;
+
+	main_worker_fn cl_prepare;
+	main_worker_fn cl_forward;
+	main_worker_fn cl_backward;
+};
+
+/*  the order is important! */
+enum {
+	/*  root element: this must have index 0 */
+	CL_ROOT,
+	/*  global ID */
+	CL_UUID,
+	/*  global userspace */
+	CL_GLOBAL_USERSPACE,
+	CL_GLOBAL_USERSPACE_ITEMS,
+	/*  global todos */
+	CL_GLOBAL_TODO,
+	CL_GLOBAL_TODO_DELETE,
+	CL_GLOBAL_TODO_DELETED,
+	CL_DEFAULTS0,
+	CL_DEFAULTS,
+	CL_DEFAULTS_ITEMS0,
+	CL_DEFAULTS_ITEMS,
+	/*  replacement for DNS in kernelspace */
+	CL_IPS,
+	CL_PEERS,
+	CL_GBL_ACTUAL,
+	CL_GBL_ACTUAL_ITEMS,
+	CL_ALIVE,
+	CL_TIME,
+	CL_TREE,
+	CL_EMERGENCY,
+	CL_REST_SPACE,
+	/*  resource definitions */
+	CL_RESOURCE,
+	CL_RESOURCE_USERSPACE,
+	CL_RESOURCE_USERSPACE_ITEMS,
+	CL_RES_DEFAULTS0,
+	CL_RES_DEFAULTS,
+	CL_RES_DEFAULTS_ITEMS0,
+	CL_RES_DEFAULTS_ITEMS,
+	CL_TODO,
+	CL_TODO_ITEMS,
+	CL_ACTUAL,
+	CL_ACTUAL_ITEMS,
+	CL_DATA,
+	CL_SIZE,
+	CL_ACTSIZE,
+	CL_PRIMARY,
+	CL_CONNECT,
+	CL_TRANSFER,
+	CL_SYNC,
+	CL_VERIF,
+	CL_SYNCPOS,
+	CL_VERSION,
+	CL_LOG,
+	CL_REPLAYSTATUS,
+	CL_DEVICE,
+	CL_MAXNR,
+};
+
+/*********************************************************************/
+
+/*  needed for logfile rotation */
+
+#define MAX_INFOS			4
+
+struct mars_rotate {
+	struct list_head rot_head;
+	struct mars_global *global;
+	struct copy_brick *sync_brick;
+	struct mars_dent *replay_link;
+	struct xio_brick *bio_brick;
+	struct mars_dent *aio_dent;
+	struct xio_brick *aio_brick;
+	struct xio_info aio_info;
+	struct trans_logger_brick *trans_brick;
+	struct mars_dent *first_log;
+	struct mars_dent *relevant_log;
+	struct xio_brick *relevant_brick;
+	struct mars_dent *next_relevant_log;
+	struct xio_brick *next_relevant_brick;
+	struct mars_dent *prev_log;
+	struct mars_dent *next_log;
+	struct mars_dent *syncstatus_dent;
+	struct timespec sync_finish_stamp;
+	struct if_brick *if_brick;
+	struct client_brick *remote_brick;
+	const char *fetch_path;
+	const char *fetch_peer;
+	const char *preferred_peer;
+	const char *parent_path;
+	const char *parent_rest;
+	const char *fetch_next_origin;
+	struct say_channel *log_say;
+	struct copy_brick *fetch_brick;
+	struct rate_limiter replay_limiter;
+	struct rate_limiter sync_limiter;
+	struct rate_limiter fetch_limiter;
+	int inf_prev_sequence;
+	int inf_old_sequence;
+	long long flip_start;
+	loff_t dev_size;
+	loff_t start_pos;
+	loff_t end_pos;
+	int max_sequence;
+	int fetch_round;
+	int fetch_serial;
+	int fetch_next_serial;
+	int split_brain_serial;
+	int split_brain_round;
+	int fetch_next_is_available;
+	int relevant_serial;
+	int replay_code;
+	bool has_symlinks;
+	bool res_shutdown;
+	bool has_error;
+	bool has_double_logfile;
+	bool has_hole_logfile;
+	bool allow_update;
+	bool forbid_replay;
+	bool replay_mode;
+	bool todo_primary;
+	bool is_primary;
+	bool old_is_primary;
+	bool created_hole;
+	bool is_log_damaged;
+	bool has_emergency;
+	bool wants_sync;
+	bool gets_sync;
+	bool log_is_really_damaged;
+
+	/* protect the infs array against concurrent read/write */
+	spinlock_t inf_lock;
+	bool infs_is_dirty[MAX_INFOS];
+	struct trans_logger_info infs[MAX_INFOS];
+	struct key_value_pair msgs[sizeof(rot_keys) / sizeof(char *)];
+};
+
+static LIST_HEAD(rot_anchor);
+
+/*********************************************************************/
+
+/*  TUNING */
+
+int mars_mem_percent = 20;
+
+#define CONF_TRANS_SHADOW_LIMIT		(1024 * 128) /*  don't fill the hashtable too much */
+
+#define CONF_TRANS_BATCHLEN		64
+#define CONF_TRANS_PRIO			XIO_PRIO_HIGH
+#define CONF_TRANS_LOG_READS		false
+
+#define CONF_ALL_BATCHLEN		1
+#define CONF_ALL_PRIO			XIO_PRIO_NORMAL
+
+#define IF_SKIP_SYNC			true
+
+#define IF_MAX_PLUGGED			10000
+#define IF_READAHEAD			0
+
+#define BIO_READAHEAD			0
+#define BIO_NOIDLE			true
+#define BIO_SYNC			true
+#define BIO_UNPLUG			true
+
+#define COPY_APPEND_MODE		0
+#define COPY_PRIO			XIO_PRIO_LOW
+
+static
+int _set_trans_params(struct xio_brick *_brick, void *private)
+{
+	struct trans_logger_brick *trans_brick = (void *)_brick;
+
+	if (_brick->type != (void *)&trans_logger_brick_type) {
+		XIO_ERR("bad brick type\n");
+		return -EINVAL;
+	}
+	if (!trans_brick->q_phase[1].q_ordering) {
+		trans_brick->q_phase[0].q_batchlen = CONF_TRANS_BATCHLEN;
+		trans_brick->q_phase[1].q_batchlen = CONF_ALL_BATCHLEN;
+		trans_brick->q_phase[2].q_batchlen = CONF_ALL_BATCHLEN;
+		trans_brick->q_phase[3].q_batchlen = CONF_ALL_BATCHLEN;
+
+		trans_brick->q_phase[0].q_io_prio = CONF_TRANS_PRIO;
+		trans_brick->q_phase[1].q_io_prio = CONF_ALL_PRIO;
+		trans_brick->q_phase[2].q_io_prio = CONF_ALL_PRIO;
+		trans_brick->q_phase[3].q_io_prio = CONF_ALL_PRIO;
+
+		trans_brick->q_phase[1].q_ordering = true;
+		trans_brick->q_phase[3].q_ordering = true;
+
+		trans_brick->shadow_mem_limit = CONF_TRANS_SHADOW_LIMIT;
+		trans_brick->log_reads = CONF_TRANS_LOG_READS;
+	}
+	XIO_INF("name = '%s' path = '%s'\n", _brick->brick_name, _brick->brick_path);
+	return 1;
+}
+
+struct client_cookie {
+	bool limit_mode;
+	bool create_mode;
+};
+
+static
+int _set_client_params(struct xio_brick *_brick, void *private)
+{
+	struct client_brick *client_brick = (void *)_brick;
+	struct client_cookie *clc = private;
+
+	client_brick->limit_mode = clc ? clc->limit_mode : false;
+	client_brick->killme = true;
+	XIO_INF("name = '%s' path = '%s'\n", _brick->brick_name, _brick->brick_path);
+	return 1;
+}
+
+static
+int _set_sio_params(struct xio_brick *_brick, void *private)
+{
+	struct sio_brick *sio_brick = (void *)_brick;
+
+	if (_brick->type == (void *)&client_brick_type)
+		return _set_client_params(_brick, private);
+	if (_brick->type != (void *)&sio_brick_type) {
+		XIO_ERR("bad brick type\n");
+		return -EINVAL;
+	}
+	sio_brick->o_direct = false; /*  important! */
+	sio_brick->o_fdsync = true;
+	sio_brick->killme = true;
+	XIO_INF("name = '%s' path = '%s'\n", _brick->brick_name, _brick->brick_path);
+	return 1;
+}
+
+static
+int _set_bio_params(struct xio_brick *_brick, void *private)
+{
+	struct bio_brick *bio_brick;
+
+	if (_brick->type == (void *)&client_brick_type)
+		return _set_client_params(_brick, private);
+	if (_brick->type == (void *)&sio_brick_type)
+		return _set_sio_params(_brick, private);
+	if (_brick->type != (void *)&bio_brick_type) {
+		XIO_ERR("bad brick type\n");
+		return -EINVAL;
+	}
+	bio_brick = (void *)_brick;
+	bio_brick->ra_pages = BIO_READAHEAD;
+	bio_brick->do_noidle = BIO_NOIDLE;
+	bio_brick->do_sync = BIO_SYNC;
+	bio_brick->do_unplug = BIO_UNPLUG;
+	bio_brick->killme = true;
+	XIO_INF("name = '%s' path = '%s'\n", _brick->brick_name, _brick->brick_path);
+	return 1;
+}
+
+static
+int _set_if_params(struct xio_brick *_brick, void *private)
+{
+	struct if_brick *if_brick = (void *)_brick;
+	struct mars_rotate *rot = private;
+
+	if (_brick->type != (void *)&if_brick_type) {
+		XIO_ERR("bad brick type\n");
+		return -EINVAL;
+	}
+	if (likely(rot))
+		if_brick->max_size = rot->dev_size;
+	if_brick->max_plugged = IF_MAX_PLUGGED;
+	if_brick->readahead = IF_READAHEAD;
+	if_brick->skip_sync = IF_SKIP_SYNC;
+	XIO_INF("name = '%s' path = '%s' size = %lld\n", _brick->brick_name, _brick->brick_path, if_brick->dev_size);
+	return 1;
+}
+
+struct copy_cookie {
+	const char *argv[2];
+	const char *copy_path;
+	loff_t start_pos;
+	loff_t end_pos;
+	bool keep_running;
+	bool verify_mode;
+
+	const char *fullpath[2];
+	struct xio_output *output[2];
+	struct xio_info info[2];
+};
+
+static
+int _set_copy_params(struct xio_brick *_brick, void *private)
+{
+	struct copy_brick *copy_brick = (void *)_brick;
+	struct copy_cookie *cc = private;
+	int status = 1;
+
+	if (_brick->type != (void *)&copy_brick_type) {
+		XIO_ERR("bad brick type\n");
+		status = -EINVAL;
+		goto done;
+	}
+	copy_brick->append_mode = COPY_APPEND_MODE;
+	copy_brick->io_prio = COPY_PRIO;
+	copy_brick->verify_mode = cc->verify_mode;
+	copy_brick->repair_mode = true;
+	copy_brick->killme = true;
+	XIO_INF("name = '%s' path = '%s'\n", _brick->brick_name, _brick->brick_path);
+
+	/* Determine the copy area, switch on/off when necessary
+	 */
+	if (!copy_brick->power.button && copy_brick->power.off_led) {
+		int i;
+
+		copy_brick->copy_last = 0;
+		for (i = 0; i < 2; i++) {
+			status = cc->output[i]->ops->xio_get_info(cc->output[i], &cc->info[i]);
+			if (status < 0) {
+				XIO_WRN("cannot determine current size of '%s'\n", cc->argv[i]);
+				goto done;
+			}
+			XIO_DBG("%d '%s' current_size = %lld\n", i, cc->fullpath[i], cc->info[i].current_size);
+		}
+		copy_brick->copy_start = cc->info[1].current_size;
+		if (cc->start_pos != -1) {
+			copy_brick->copy_start = cc->start_pos;
+			if (unlikely(cc->start_pos > cc->info[0].current_size)) {
+				XIO_ERR(
+				"bad start position %lld is larger than actual size %lld on '%s'\n",
+				cc->start_pos,
+				cc->info[0].current_size,
+				cc->copy_path);
+				status = -EINVAL;
+				goto done;
+			}
+		}
+		XIO_DBG("copy_start = %lld\n", copy_brick->copy_start);
+		copy_brick->copy_end = cc->info[0].current_size;
+		if (cc->end_pos != -1) {
+			if (unlikely(cc->end_pos > copy_brick->copy_end)) {
+				XIO_ERR(
+				"target size %lld is larger than actual size %lld on source\n",
+				cc->end_pos,
+				copy_brick->copy_end);
+				status = -EINVAL;
+				goto done;
+			}
+			copy_brick->copy_end = cc->end_pos;
+			if (unlikely(cc->end_pos > cc->info[1].current_size)) {
+				XIO_ERR(
+				"bad end position %lld is larger than actual size %lld on target\n",
+				cc->end_pos,
+				cc->info[1].current_size);
+				status = -EINVAL;
+				goto done;
+			}
+		}
+		XIO_DBG("copy_end = %lld\n", copy_brick->copy_end);
+		if (copy_brick->copy_start < copy_brick->copy_end) {
+			status = 1;
+			XIO_DBG("copy switch on\n");
+		}
+	} else if (copy_brick->power.button && copy_brick->power.on_led &&
+		   !cc->keep_running &&
+		   copy_brick->copy_last == copy_brick->copy_end && copy_brick->copy_end > 0) {
+		status = 0;
+		XIO_DBG("copy switch off\n");
+	}
+
+done:
+	return status;
+}
+
+/*********************************************************************/
+
+/*  internal helpers */
+
+#define MARS_DELIM			','
+
+static int _parse_args(struct mars_dent *dent, char *str, int count)
+{
+	int i;
+	int status = -EINVAL;
+
+	if (!str)
+		goto done;
+	if (!dent->d_args)
+		dent->d_args = brick_strdup(str);
+	for (i = 0; i < count; i++) {
+		char *tmp;
+		int len;
+
+		if (!*str)
+			goto done;
+		if (i == count - 1) {
+			len = strlen(str);
+		} else {
+			char *tmp = strchr(str, MARS_DELIM);
+
+			if (!tmp)
+				goto done;
+			len = (tmp - str);
+		}
+		brick_string_free(dent->d_argv[i]);
+		tmp = brick_string_alloc(len + 1);
+		dent->d_argv[i] = tmp;
+		strncpy(dent->d_argv[i], str, len);
+		dent->d_argv[i][len] = '\0';
+
+		str += len;
+		if (i != count - 1)
+			str++;
+	}
+	status = 0;
+done:
+	if (status < 0) {
+		XIO_ERR(
+		"bad syntax '%s' (should have %d args), status = %d\n",
+		dent->d_args ? dent->d_args : "",
+		count,
+		status);
+	}
+	return status;
+}
+
+static
+int _check_switch(struct mars_global *global, const char *path)
+{
+	int status;
+	int res = 0;
+	struct mars_dent *allow_dent;
+
+	/* Upon shutdown, treat all switches as "off"
+	 */
+	if (!global->global_power.button)
+		goto done;
+
+	allow_dent = mars_find_dent(global, path);
+	if (!allow_dent || !allow_dent->link_val)
+		goto done;
+	status = kstrtoint(allow_dent->link_val, 10, &res);
+	(void)status; /*  treat errors as if the switch were set to 0 */
+	XIO_DBG("'%s' -> %d\n", path, res);
+
+done:
+	return res;
+}
+
+static
+int __check_allow(struct mars_global *global, struct mars_dent *parent, const char *name, const char *peer)
+{
+	int res = 0;
+	char *path = path_make("%s/todo-%s/%s", parent->d_path, peer, name);
+
+	if (!path)
+		goto done;
+
+	res = _check_switch(global, path);
+
+done:
+	brick_string_free(path);
+	return res;
+}
+
+static inline
+int _check_allow(struct mars_global *global, struct mars_dent *parent, const char *name)
+{
+	return __check_allow(global, parent, name, my_id());
+}
+
+#define skip_part(s) _skip_part(s, ',', ':')
+#define skip_sect(s) _skip_part(s, ':', 0)
+static inline
+int _skip_part(const char *str, const char del1, const char del2)
+{
+	int len = 0;
+
+	while (str[len] && str[len] != del1 && (!del2 || str[len] != del2))
+		len++;
+	return len;
+}
+
+static inline
+int skip_dir(const char *str)
+{
+	int len = 0;
+	int res = 0;
+
+	for (len = 0; str[len]; len++)
+		if (str[len] == '/')
+			res = len + 1;
+	return res;
+}
+
+static
+int parse_logfile_name(const char *str, int *seq, const char **host)
+{
+	char *_host;
+	int count;
+	int len = 0;
+	int len_host;
+
+	*seq = 0;
+	*host = NULL;
+
+	count = sscanf(str, "log-%d-%n", seq, &len);
+	if (unlikely(count != 1)) {
+		XIO_ERR("bad logfile name '%s', count=%d, len=%d\n", str, count, len);
+		return 0;
+	}
+
+	_host = brick_strdup(str + len);
+
+	len_host = skip_part(_host);
+	_host[len_host] = '\0';
+	*host = _host;
+	len += len_host;
+
+	return len;
+}
+
+static
+int compare_replaylinks(struct mars_rotate *rot, const char *hosta, const char *hostb)
+{
+	const char *linka = path_make("%s/replay-%s", rot->parent_path, hosta);
+	const char *linkb = path_make("%s/replay-%s", rot->parent_path, hostb);
+	const char *a = NULL;
+	const char *b = NULL;
+	int seqa;
+	int seqb;
+	int posa;
+	int posb;
+	loff_t offa = 0;
+	loff_t offb = -1;
+	loff_t taila = 0;
+	loff_t tailb = -1;
+	int count;
+	int res = -2;
+
+	if (unlikely(!linka || !linkb)) {
+		XIO_ERR("nen MEM");
+		goto done;
+	}
+
+	a = mars_readlink(linka);
+	if (unlikely(!a || !a[0])) {
+		XIO_ERR_TO(rot->log_say, "cannot read replaylink '%s'\n", linka);
+		goto done;
+	}
+	b = mars_readlink(linkb);
+	if (unlikely(!b || !b[0])) {
+		XIO_ERR_TO(rot->log_say, "cannot read replaylink '%s'\n", linkb);
+		goto done;
+	}
+
+	count = sscanf(a, "log-%d-%n", &seqa, &posa);
+	if (unlikely(count != 1))
+		XIO_ERR_TO(rot->log_say, "replay link '%s' -> '%s' is malformed\n", linka, a);
+	count = sscanf(b, "log-%d-%n", &seqb, &posb);
+	if (unlikely(count != 1))
+		XIO_ERR_TO(rot->log_say, "replay link '%s' -> '%s' is malformed\n", linkb, b);
+
+	if (seqa < seqb) {
+		res = -1;
+		goto done;
+	} else if (seqa > seqb) {
+		res = 1;
+		goto done;
+	}
+
+	posa += skip_part(a + posa);
+	posb += skip_part(b + posb);
+	if (unlikely(!a[posa++]))
+		XIO_ERR_TO(rot->log_say, "replay link '%s' -> '%s' is malformed\n", linka, a);
+	if (unlikely(!b[posb++]))
+		XIO_ERR_TO(rot->log_say, "replay link '%s' -> '%s' is malformed\n", linkb, b);
+
+	count = sscanf(a + posa, "%lld,%lld", &offa, &taila);
+	if (unlikely(count != 2))
+		XIO_ERR_TO(rot->log_say, "replay link '%s' -> '%s' is malformed\n", linka, a);
+	count = sscanf(b + posb, "%lld,%lld", &offb, &tailb);
+	if (unlikely(count != 2))
+		XIO_ERR_TO(rot->log_say, "replay link '%s' -> '%s' is malformed\n", linkb, b);
+
+	if (offa < offb)
+		res = -1;
+	else if (offa > offb)
+		res = 1;
+	else
+		res = 0;
+
+done:
+	brick_string_free(a);
+	brick_string_free(b);
+	brick_string_free(linka);
+	brick_string_free(linkb);
+	return res;
+}
+
+/*********************************************************************/
+
+/*  status display */
+
+static
+int _update_link_when_necessary(struct mars_rotate *rot, const char *type, const char *old, const char *new)
+{
+	char *check = NULL;
+	int status = -EINVAL;
+	bool res = false;
+
+	if (unlikely(!old || !new))
+		goto out;
+
+	/* Check whether something really has changed (avoid
+	 * useless/disturbing timestamp updates)
+	 */
+	check = mars_readlink(new);
+	if (check && !strcmp(check, old)) {
+		XIO_DBG("%s symlink '%s' -> '%s' has not changed\n", type, old, new);
+		res = 0;
+		goto out;
+	}
+
+	status = mars_symlink(old, new, NULL, 0);
+	if (unlikely(status < 0)) {
+		XIO_ERR_TO(
+		rot->log_say, "cannot create %s symlink '%s' -> '%s' status = %d\n", type, old, new, status);
+	} else {
+		res = 1;
+		XIO_DBG("made %s symlink '%s' -> '%s' status = %d\n", type, old, new, status);
+	}
+
+out:
+	brick_string_free(check);
+	return res;
+}
+
+static
+int _update_replay_link(struct mars_rotate *rot, struct trans_logger_info *inf)
+{
+	char *old = NULL;
+	char *new = NULL;
+	int res = 0;
+
+	old = path_make(
+	"log-%09d-%s,%lld,%lld",
+	inf->inf_sequence,
+	inf->inf_host,
+	inf->inf_min_pos,
+	inf->inf_max_pos - inf->inf_min_pos);
+	if (!old)
+		goto out;
+	new = path_make("%s/replay-%s", rot->parent_path, my_id());
+	if (!new)
+		goto out;
+
+	_crashme(1, true);
+
+	res = _update_link_when_necessary(rot, "replay", old, new);
+
+out:
+	brick_string_free(new);
+	brick_string_free(old);
+	return res;
+}
+
+static
+int _update_version_link(struct mars_rotate *rot, struct trans_logger_info *inf)
+{
+	char *data = brick_string_alloc(0);
+	char *old = brick_string_alloc(0);
+	char *new = NULL;
+	unsigned char *digest = brick_string_alloc(0);
+	char *prev = NULL;
+	char *prev_link = NULL;
+	char *prev_digest = NULL;
+	int len;
+	int i;
+	int res = 0;
+
+	if (likely(inf->inf_sequence > 1)) {
+		if (unlikely((inf->inf_sequence < rot->inf_prev_sequence ||
+			      inf->inf_sequence > rot->inf_prev_sequence + 1) &&
+			     rot->inf_prev_sequence != 0)) {
+			char *skip_path = path_make("%s/skip-check-%s", rot->parent_path, my_id());
+			char *skip_link = mars_readlink(skip_path);
+			char *msg = "";
+			int skip_nr = -1;
+			int nr_char = 0;
+
+			if (likely(skip_link && skip_link[0])) {
+				int status = sscanf(skip_link, "%d%n", &skip_nr, &nr_char);
+
+				(void)status; /* keep msg empty in case of errors */
+				msg = skip_link + nr_char;
+			}
+			brick_string_free(skip_path);
+			if (likely(skip_nr != inf->inf_sequence)) {
+				XIO_ERR_TO(
+				rot->log_say,
+				"SKIP in sequence numbers detected: %d != %d + 1\n",
+				inf->inf_sequence,
+				rot->inf_prev_sequence);
+				make_rot_msg(
+				rot,
+				"err-versionlink-skip",
+				"SKIP in sequence numbers detected: %d != %d + 1",
+				inf->inf_sequence,
+				rot->inf_prev_sequence);
+				brick_string_free(skip_link);
+				goto out;
+			}
+			XIO_WRN_TO(
+			rot->log_say,
+				    "you explicitly requested to SKIP sequence numbers from %d to %d%s\n",
+				    rot->inf_prev_sequence, inf->inf_sequence, msg);
+			brick_string_free(skip_link);
+		}
+		prev = path_make("%s/version-%09d-%s", rot->parent_path, inf->inf_sequence - 1, my_id());
+		if (unlikely(!prev)) {
+			XIO_ERR("no MEM\n");
+			goto out;
+		}
+		prev_link = mars_readlink(prev);
+		rot->inf_prev_sequence = inf->inf_sequence;
+	}
+
+	len = sprintf(
+	data, "%d,%s,%lld:%s", inf->inf_sequence, inf->inf_host, inf->inf_log_pos, prev_link ? prev_link : "");
+
+	XIO_DBG("data = '%s' len = %d\n", data, len);
+
+	xio_digest(digest, data, len);
+
+	len = 0;
+	for (i = 0; i < xio_digest_size; i++)
+		len += sprintf(old + len, "%02x", digest[i]);
+
+	if (likely(prev_link && prev_link[0])) {
+		char *tmp;
+
+		prev_digest = brick_strdup(prev_link);
+		/*  take the part before ':' */
+		for (tmp = prev_digest; *tmp; tmp++)
+			if (*tmp == ':')
+				break;
+		*tmp = '\0';
+	}
+
+	len += sprintf(
+	old + len,
+	",log-%09d-%s,%lld:%s",
+	inf->inf_sequence,
+	inf->inf_host,
+	inf->inf_log_pos,
+	prev_digest ? prev_digest : "");
+
+	new = path_make("%s/version-%09d-%s", rot->parent_path, inf->inf_sequence, my_id());
+	if (!new) {
+		XIO_ERR("no MEM\n");
+		goto out;
+	}
+
+	_crashme(2, true);
+
+	res = _update_link_when_necessary(rot, "version", old, new);
+
+out:
+	brick_string_free(new);
+	brick_string_free(prev);
+	brick_string_free(data);
+	brick_string_free(digest);
+	brick_string_free(old);
+	brick_string_free(prev_link);
+	brick_string_free(prev_digest);
+	return res;
+}
+
+static
+void _update_info(struct trans_logger_info *inf)
+{
+	struct mars_rotate *rot = inf->inf_private;
+	int hash;
+	unsigned long flags;
+
+	if (unlikely(!rot)) {
+		XIO_ERR("rot is NULL\n");
+		goto done;
+	}
+
+	XIO_DBG(
+	"inf = %p '%s' seq = %d min_pos = %lld max_pos = %lld log_pos = %lld is_replaying = %d is_logging = %d\n",
+		 inf,
+		 inf->inf_host,
+		 inf->inf_sequence,
+		 inf->inf_min_pos,
+		 inf->inf_max_pos,
+		 inf->inf_log_pos,
+		 inf->inf_is_replaying,
+		 inf->inf_is_logging);
+
+	hash = inf->inf_sequence % MAX_INFOS;
+	if (unlikely(rot->infs_is_dirty[hash])) {
+		if (unlikely(rot->infs[hash].inf_sequence != inf->inf_sequence)) {
+			XIO_ERR_TO(
+			rot->log_say,
+			"buffer %d: sequence trash %d -> %d. is the mars_main thread hanging?\n",
+			hash,
+			rot->infs[hash].inf_sequence,
+			inf->inf_sequence);
+			make_rot_msg(
+			rot,
+			"err-sequence-trash",
+			"buffer %d: sequence trash %d -> %d",
+			hash,
+			rot->infs[hash].inf_sequence,
+			inf->inf_sequence);
+		} else {
+			XIO_DBG("buffer %d is overwritten (sequence=%d)\n", hash, inf->inf_sequence);
+		}
+	}
+
+	spin_lock_irqsave(&rot->inf_lock, flags);
+	memcpy(&rot->infs[hash], inf, sizeof(struct trans_logger_info));
+	rot->infs_is_dirty[hash] = true;
+	spin_unlock_irqrestore(&rot->inf_lock, flags);
+
+	local_trigger();
+done:;
+}
+
+static
+void write_info_links(struct mars_rotate *rot)
+{
+	struct trans_logger_info inf;
+	int count = 0;
+
+	for (;;) {
+		unsigned long flags;
+		int hash = -1;
+		int min = 0;
+		int i;
+
+		spin_lock_irqsave(&rot->inf_lock, flags);
+		for (i = 0; i < MAX_INFOS; i++) {
+			if (!rot->infs_is_dirty[i])
+				continue;
+			if (!min || min > rot->infs[i].inf_sequence) {
+				min = rot->infs[i].inf_sequence;
+				hash = i;
+			}
+		}
+
+		if (hash < 0) {
+			spin_unlock_irqrestore(&rot->inf_lock, flags);
+			break;
+		}
+
+		rot->infs_is_dirty[hash] = false;
+		memcpy(&inf, &rot->infs[hash], sizeof(struct trans_logger_info));
+		spin_unlock_irqrestore(&rot->inf_lock, flags);
+
+		XIO_DBG(
+		"seq = %d min_pos = %lld max_pos = %lld log_pos = %lld is_replaying = %d is_logging = %d\n",
+			 inf.inf_sequence,
+			 inf.inf_min_pos,
+			 inf.inf_max_pos,
+			 inf.inf_log_pos,
+			 inf.inf_is_replaying,
+			 inf.inf_is_logging);
+
+		if (inf.inf_is_logging || inf.inf_is_replaying) {
+			count += _update_replay_link(rot, &inf);
+			count += _update_version_link(rot, &inf);
+			if (min > rot->inf_old_sequence) {
+				mars_sync();
+				rot->inf_old_sequence = min;
+			}
+		}
+	}
+	if (count) {
+		if (inf.inf_min_pos == inf.inf_max_pos)
+			local_trigger();
+		remote_trigger();
+	}
+}
+
+static
+void _make_new_replaylink(struct mars_rotate *rot, char *new_host, int new_sequence, loff_t end_pos)
+{
+	struct trans_logger_info inf = {
+		.inf_private = rot,
+		.inf_sequence = new_sequence,
+		.inf_min_pos = 0,
+		.inf_max_pos = 0,
+		.inf_log_pos = end_pos,
+		.inf_is_replaying = true,
+	};
+	strncpy(inf.inf_host, new_host, sizeof(inf.inf_host));
+
+	XIO_DBG("new_host = '%s' new_sequence = %d end_pos = %lld\n", new_host, new_sequence, end_pos);
+
+	_update_replay_link(rot, &inf);
+	_update_version_link(rot, &inf);
+
+	local_trigger();
+	remote_trigger();
+}
+
+static
+int __show_actual(const char *path, const char *name, int val)
+{
+	char *src;
+	char *dst = NULL;
+	int status = -EINVAL;
+
+	src = path_make("%d", val);
+	dst = path_make("%s/actual-%s/%s", path, my_id(), name);
+	status = -ENOMEM;
+	if (!dst)
+		goto done;
+
+	XIO_DBG("symlink '%s' -> '%s'\n", dst, src);
+	status = mars_symlink(src, dst, NULL, 0);
+
+done:
+	brick_string_free(src);
+	brick_string_free(dst);
+	return status;
+}
+
+static inline
+int _show_actual(const char *path, const char *name, bool val)
+{
+	return __show_actual(path, name, val ? 1 : 0);
+}
+
+static
+void _show_primary(struct mars_rotate *rot, struct mars_dent *parent)
+{
+	int status;
+
+	if (!rot || !parent)
+		goto out_return;
+	status = _show_actual(parent->d_path, "is-primary", rot->is_primary);
+	if (rot->is_primary != rot->old_is_primary) {
+		rot->old_is_primary = rot->is_primary;
+		remote_trigger();
+	}
+out_return:;
+}
+
+static
+void _show_brick_status(struct xio_brick *test, bool shutdown)
+{
+	const char *path;
+	char *src;
+	char *dst;
+	int status;
+
+	path = test->brick_path;
+	if (!path) {
+		XIO_WRN("bad path\n");
+		goto out_return;
+	}
+	if (*path != '/') {
+		XIO_WRN("bogus path '%s'\n", path);
+		goto out_return;
+	}
+
+	src = (test->power.on_led && !shutdown) ? "1" : "0";
+	dst = backskip_replace(path, '/', true, "/actual-%s/", my_id());
+	if (!dst)
+		goto out_return;
+
+	status = mars_symlink(src, dst, NULL, 0);
+	XIO_DBG("status symlink '%s' -> '%s' status = %d\n", dst, src, status);
+	brick_string_free(dst);
+out_return:;
+}
+
+static
+void _show_status_all(struct mars_global *global)
+{
+	struct list_head *tmp;
+
+	down_read(&global->brick_mutex);
+	for (tmp = global->brick_anchor.next; tmp != &global->brick_anchor; tmp = tmp->next) {
+		struct xio_brick *test;
+
+		test = container_of(tmp, struct xio_brick, global_brick_link);
+		if (!test->show_status)
+			continue;
+		_show_brick_status(test, false);
+	}
+	up_read(&global->brick_mutex);
+}
+
+static
+void _show_rate(struct mars_rotate *rot, struct rate_limiter *limiter, const char *basename)
+{
+	char *name;
+
+	rate_limit(limiter, 0);
+
+	name = path_make("ops-%s", basename);
+	__show_actual(rot->parent_path, name, limiter->lim_ops_rate);
+	brick_string_free(name);
+
+	name = path_make("amount-%s", basename);
+	__show_actual(rot->parent_path, name, limiter->lim_amount_rate);
+	brick_string_free(name);
+}
+
+/*********************************************************************/
+
+static
+int __make_copy(
+		struct mars_global *global,
+		struct mars_dent *belongs,
+		const char *switch_path,
+		const char *copy_path,
+		const char *parent,
+		const char *argv[],
+		struct key_value_pair *msg_pair,
+		loff_t start_pos, /*  -1 means at EOF of source */
+		loff_t end_pos,   /*  -1 means at EOF of target */
+		bool keep_running,
+		bool verify_mode,
+		bool limit_mode,
+		bool space_using_mode,
+		struct copy_brick **__copy)
+{
+	struct xio_brick *copy;
+	struct copy_cookie cc = {};
+
+	struct client_cookie clc[2] = {
+		{
+			.limit_mode = limit_mode,
+		},
+		{
+			.limit_mode = limit_mode,
+			.create_mode = true,
+		},
+	};
+	int i;
+	bool switch_copy;
+	int status = -EINVAL;
+
+	if (!switch_path || !global)
+		goto done;
+
+	/*  don't generate empty aio files if copy does not yet exist */
+	switch_copy = _check_switch(global, switch_path);
+	copy = mars_find_brick(global, &copy_brick_type, copy_path);
+	if (!copy && !switch_copy)
+		goto done;
+
+	/*  create/find predecessor aio bricks */
+	for (i = 0; i < 2; i++) {
+		struct xio_brick *aio;
+
+		cc.argv[i] = argv[i];
+		if (parent) {
+			cc.fullpath[i] = path_make("%s/%s", parent, argv[i]);
+			if (!cc.fullpath[i]) {
+				XIO_ERR("cannot make path '%s/%s'\n", parent, argv[i]);
+				goto done;
+			}
+		} else {
+			cc.fullpath[i] = argv[i];
+		}
+
+		aio =
+			make_brick_all(
+			global,
+				       NULL,
+				       _set_bio_params,
+				       &clc[i],
+				       NULL,
+				       (const struct generic_brick_type *)&bio_brick_type,
+				       (const struct generic_brick_type*[]){},
+				       switch_copy || (copy && !copy->power.off_led) ? 2 : -1,
+				       cc.fullpath[i],
+				       (const char *[]){},
+				       0);
+		if (!aio) {
+			XIO_DBG("cannot instantiate '%s'\n", cc.fullpath[i]);
+			make_msg(msg_pair, "cannot instantiate '%s'", cc.fullpath[i]);
+			goto done;
+		}
+		cc.output[i] = aio->outputs[0];
+		/* When switching off, use a short timeout for aborting.
+		 * Important on very slow networks (since a large number
+		 * of requests may be pending).
+		 */
+		aio->power.io_timeout = switch_copy ? 0 : 1;
+	}
+
+	cc.copy_path = copy_path;
+	cc.start_pos = start_pos;
+	cc.end_pos = end_pos;
+	cc.keep_running = keep_running;
+	cc.verify_mode = verify_mode;
+
+	copy =
+		make_brick_all(
+		global,
+			       belongs,
+			       _set_copy_params,
+			       &cc,
+			       cc.fullpath[1],
+			       (const struct generic_brick_type *)&copy_brick_type,
+			       (const struct generic_brick_type*[]){NULL, NULL, NULL, NULL},
+			       (!switch_copy || (IS_EMERGENCY_PRIMARY() && !space_using_mode)) ? -1 : 2,
+			       "%s",
+			       (const char *[]){"%s", "%s", "%s", "%s"},
+			       4,
+			       copy_path,
+			       cc.fullpath[0],
+			       cc.fullpath[0],
+			       cc.fullpath[1],
+			       cc.fullpath[1]);
+	if (copy) {
+		struct copy_brick *_copy = (void *)copy;
+
+		copy->show_status = _show_brick_status;
+		make_msg(
+		msg_pair,
+			 "from = '%s' to = '%s' on = %d start_pos = %lld end_pos = %lld actual_pos = %lld actual_stamp = %ld.%09ld ops_rate = %d amount_rate = %d read_fly = %d write_fly = %d error_code = %d nr_errors = %d",
+			 argv[0],
+			 argv[1],
+			 _copy->power.on_led,
+			 _copy->copy_start,
+			 _copy->copy_end,
+			 _copy->copy_last,
+			 _copy->copy_last_stamp.tv_sec, _copy->copy_last_stamp.tv_nsec,
+			 _copy->copy_limiter ? _copy->copy_limiter->lim_ops_rate : 0,
+			 _copy->copy_limiter ? _copy->copy_limiter->lim_amount_rate : 0,
+			 atomic_read(&_copy->copy_read_flight),
+			 atomic_read(&_copy->copy_write_flight),
+			 _copy->copy_error,
+			 _copy->copy_error_count);
+	}
+	if (__copy)
+		*__copy = (void *)copy;
+
+	status = 0;
+
+done:
+	XIO_DBG("status = %d\n", status);
+	for (i = 0; i < 2; i++) {
+		if (cc.fullpath[i] && cc.fullpath[i] != argv[i])
+			brick_string_free(cc.fullpath[i]);
+	}
+	return status;
+}
+
+/*********************************************************************/
+
+/*  remote workers */
+
+static
+rwlock_t peer_lock = __RW_LOCK_UNLOCKED(&peer_lock);
+
+static
+struct list_head peer_anchor = LIST_HEAD_INIT(peer_anchor);
+
+struct mars_peerinfo {
+	struct mars_global *global;
+	char *peer;
+	char *path;
+	struct xio_socket socket;
+	struct task_struct *peer_thread;
+
+	/* protect the following lists against concurrent read/write */
+	spinlock_t lock;
+	struct list_head peer_head;
+	struct list_head remote_dent_list;
+	unsigned long last_remote_jiffies;
+	int maxdepth;
+	bool to_remote_trigger;
+	bool from_remote_trigger;
+};
+
+static
+struct mars_peerinfo *find_peer(const char *peer_name)
+{
+	struct list_head *tmp;
+	struct mars_peerinfo *res = NULL;
+	unsigned long flags;
+
+	read_lock_irqsave(&peer_lock, flags);
+	for (tmp = peer_anchor.next; tmp != &peer_anchor; tmp = tmp->next) {
+		struct mars_peerinfo *peer = container_of(tmp, struct mars_peerinfo, peer_head);
+
+		if (!strcmp(peer->peer, peer_name)) {
+			res = peer;
+			break;
+		}
+	}
+	read_unlock_irqrestore(&peer_lock, flags);
+
+	return res;
+}
+
+static
+bool _is_usable_dir(const char *name)
+{
+	if (!strncmp(name, "resource-", 9) ||
+	    !strncmp(name, "todo-", 5) ||
+	    !strncmp(name, "actual-", 7) ||
+	    !strncmp(name, "defaults", 8)) {
+		return true;
+	}
+	return false;
+}
+
+static
+bool _is_peer_logfile(const char *name, const char *id)
+{
+	int len = strlen(name);
+	int idlen = id ? strlen(id) : 4 + 9 + 1;
+
+	if (len <= idlen ||
+	    strncmp(name, "log-", 4) != 0) {
+		XIO_DBG("not a logfile at all: '%s'\n", name);
+		return false;
+	}
+	if (id &&
+	    name[len - idlen - 1] == '-' &&
+	    strncmp(name + len - idlen, id, idlen) == 0) {
+		XIO_DBG("not a peer logfile: '%s'\n", name);
+		return false;
+	}
+	XIO_DBG("found peer logfile: '%s'\n", name);
+	return true;
+}
+
+static
+int _update_file(
+struct mars_dent *parent,
+const char *switch_path,
+const char *copy_path,
+const char *file,
+const char *peer,
+loff_t end_pos)
+{
+	struct mars_rotate *rot = parent->d_private;
+	struct mars_global *global = rot->global;
+	const char *tmp = path_make("%s@%s:%d", file, peer, xio_net_default_port + 1);
+	const char *argv[2] = { tmp, file };
+	struct copy_brick *copy = NULL;
+	struct key_value_pair *msg_pair = find_key(rot->msgs, "inf-fetch");
+	bool do_start = true;
+	int status = -ENOMEM;
+
+	if (unlikely(!tmp || !global))
+		goto done;
+
+	rot->fetch_round = 0;
+
+	if (rot->todo_primary | rot->is_primary) {
+		XIO_DBG("disallowing fetch, todo_primary=%d is_primary=%d\n", rot->todo_primary, rot->is_primary);
+		make_msg(
+		msg_pair, "disallowing fetch (todo_primary=%d is_primary=%d)", rot->todo_primary, rot->is_primary);
+		do_start = false;
+	}
+	if (do_start && !strcmp(peer, "(none)")) {
+		XIO_DBG("disabling fetch from unspecified peer / no primary designated\n");
+		make_msg(msg_pair, "disabling fetch from unspecified peer / no primary designated");
+		do_start = false;
+	}
+	if (do_start && !global->global_power.button) {
+		XIO_DBG("disabling fetch due to rmmod\n");
+		make_msg(msg_pair, "disabling fetch due to rmmod");
+		do_start = false;
+	}
+#if 0
+	/* Disabled for now. Re-enable this code after a new feature has been
+	 * implemented: when pause-replay is given on a secondary,
+	 * /dev/mars/mydata should appear in _readonly_ form.
+	 * You may then draw a backup from the readonly device without losing
+	 * redundancy, because the transactions logs will continue updating.
+	 * Until the new feature is implemented, use
+	 * "marsadm pause-replay $res; marsadm detach $res; mount -o ro /dev/lv/$res"
+	 * as a workaround. It is important that "fetch" remains enabled.
+	 *
+	 * Hint: "marsadm down" disables _all_ switches, including fetch,
+	 * thus it can / must be used for pausing everything.
+	 */
+	if (do_start && !_check_allow(global, parent, "attach")) {
+		XIO_DBG("disabling fetch due to detach\n");
+		make_msg(msg_pair, "disabling fetch due to detach");
+		do_start = false;
+	}
+#endif
+	if (do_start && !_check_allow(global, parent, "connect")) {
+		XIO_DBG("disabling fetch due to disconnect\n");
+		make_msg(msg_pair, "disabling fetch due to disconnect");
+		do_start = false;
+	}
+
+	XIO_DBG("src = '%s' dst = '%s'\n", tmp, file);
+	status = __make_copy(
+	global,
+	NULL,
+	do_start ? switch_path : "",
+	copy_path,
+	NULL,
+	argv,
+	msg_pair,
+	-1,
+	-1,
+	false,
+	false,
+	false,
+	true,
+	&copy);
+	if (status >= 0 && copy) {
+		copy->copy_limiter = &rot->fetch_limiter;
+		/*  FIXME: code is dead */
+		if (copy->append_mode && copy->power.on_led &&
+		    end_pos > copy->copy_end) {
+			XIO_DBG("appending to '%s' %lld => %lld\n", copy_path, copy->copy_end, end_pos);
+			/*  FIXME: use corrected length from xio_get_info() / see _set_copy_params() */
+			copy->copy_end = end_pos;
+		}
+	}
+
+done:
+	brick_string_free(tmp);
+	return status;
+}
+
+static
+int check_logfile(
+const char *peer,
+struct mars_dent *remote_dent,
+struct mars_dent *local_dent,
+struct mars_dent *parent,
+loff_t dst_size)
+{
+	loff_t src_size = remote_dent->stat_val.size;
+	struct mars_rotate *rot;
+	const char *switch_path = NULL;
+	struct copy_brick *fetch_brick;
+	int status = 0;
+
+	/*  correct the remote size when necessary */
+	if (remote_dent->d_corr_B > 0 && remote_dent->d_corr_B < src_size) {
+		XIO_DBG(
+		"logfile '%s' correcting src_size from %lld to %lld\n",
+		remote_dent->d_path,
+		src_size,
+		remote_dent->d_corr_B);
+		src_size = remote_dent->d_corr_B;
+	}
+
+	/*  plausibility checks */
+	if (unlikely(dst_size > src_size)) {
+		XIO_WRN("my local copy is larger than the remote one, ignoring\n");
+		status = -EINVAL;
+		goto done;
+	}
+
+	/*  check whether we are participating in that resource */
+	rot = parent->d_private;
+	if (!rot) {
+		XIO_WRN("parent has no rot info\n");
+		status = -EINVAL;
+		goto done;
+	}
+	if (!rot->fetch_path) {
+		XIO_WRN("parent has no fetch_path\n");
+		status = -EINVAL;
+		goto done;
+	}
+
+	/*  bookkeeping for serialization of logfile updates */
+	if (remote_dent->d_serial > rot->fetch_serial) {
+		rot->fetch_next_is_available++;
+		if (!rot->fetch_next_serial || !rot->fetch_next_origin) {
+			rot->fetch_next_serial = remote_dent->d_serial;
+			rot->fetch_next_origin = brick_strdup(remote_dent->d_rest);
+		} else if (
+		rot->fetch_next_serial == remote_dent->d_serial && strcmp(
+		rot->fetch_next_origin, remote_dent->d_rest)) {
+			rot->split_brain_round = 0;
+			rot->split_brain_serial = remote_dent->d_serial;
+			XIO_WRN(
+			"SPLIT BRAIN logfiles from '%s' and '%s' with same serial number %d detected!\n",
+				 rot->fetch_next_origin, remote_dent->d_rest, rot->split_brain_serial);
+		}
+	}
+
+	/*  check whether connection is allowed */
+	switch_path = path_make("%s/todo-%s/connect", parent->d_path, my_id());
+
+	/*  check whether copy is necessary */
+	fetch_brick = rot->fetch_brick;
+	XIO_DBG(
+	"fetch_brick = %p (remote '%s' %d) fetch_serial = %d\n",
+	fetch_brick,
+	remote_dent->d_path,
+	remote_dent->d_serial,
+	rot->fetch_serial);
+	if (fetch_brick) {
+		if (remote_dent->d_serial == rot->fetch_serial && rot->fetch_peer && !strcmp(peer, rot->fetch_peer)) {
+			/*  treat copy brick instance underway */
+			status = _update_file(
+			parent, switch_path, rot->fetch_path, remote_dent->d_path, peer, src_size);
+			XIO_DBG("re-update '%s' from peer '%s' status = %d\n", remote_dent->d_path, peer, status);
+		}
+	} else if (!rot->fetch_serial && rot->allow_update &&
+		   !rot->is_primary && !rot->old_is_primary &&
+		   (!rot->preferred_peer || !strcmp(rot->preferred_peer, peer)) &&
+		   (!rot->split_brain_serial || remote_dent->d_serial < rot->split_brain_serial) &&
+		   (dst_size < src_size || !local_dent)) {
+		/*  start copy brick instance */
+		status = _update_file(parent, switch_path, rot->fetch_path, remote_dent->d_path, peer, src_size);
+		XIO_DBG("update '%s' from peer '%s' status = %d\n", remote_dent->d_path, peer, status);
+		if (likely(status >= 0)) {
+			rot->fetch_serial = remote_dent->d_serial;
+			rot->fetch_next_is_available = 0;
+			brick_string_free(rot->fetch_peer);
+			rot->fetch_peer = brick_strdup(peer);
+		}
+	} else {
+		XIO_DBG(
+		"allow_update = %d src_size = %lld dst_size = %lld local_dent = %p\n",
+		rot->allow_update,
+		src_size,
+		dst_size,
+		local_dent);
+	}
+
+done:
+	brick_string_free(switch_path);
+	return status;
+}
+
+static
+int run_bone(struct mars_peerinfo *peer, struct mars_dent *remote_dent)
+{
+	int status = 0;
+	struct kstat local_stat = {};
+	const char *marker_path = NULL;
+	bool stat_ok;
+	bool update_mtime = true;
+	bool update_ctime = true;
+	bool run_trigger = false;
+
+	if (!strncmp(remote_dent->d_name, ".tmp", 4))
+		goto done;
+	if (!strncmp(remote_dent->d_name, ".deleted-", 9))
+		goto done;
+	if (!strncmp(remote_dent->d_name, "ignore", 6))
+		goto done;
+
+	/*  create / check markers (prevent concurrent updates) */
+	if (remote_dent->link_val && !strncmp(remote_dent->d_path, "/mars/todo-global/delete-", 25)) {
+		marker_path = backskip_replace(remote_dent->link_val, '/', true, "/.deleted-");
+		if (mars_stat(marker_path, &local_stat, true) < 0 ||
+		    timespec_compare(&remote_dent->stat_val.mtime, &local_stat.mtime) > 0) {
+			XIO_DBG(
+			"creating / updating marker '%s' mtime=%lu.%09lu\n",
+				 marker_path, remote_dent->stat_val.mtime.tv_sec, remote_dent->stat_val.mtime.tv_nsec);
+			mars_symlink("1", marker_path, &remote_dent->stat_val.mtime, 0);
+		}
+		if (remote_dent->d_serial < peer->global->deleted_my_border) {
+			XIO_DBG(
+			"ignoring deletion '%s' at border %d\n", remote_dent->d_path, peer->global->deleted_my_border);
+			goto done;
+		}
+	} else {
+		/*  check marker preventing concurrent updates from remote hosts when deletes are in progress */
+		marker_path = backskip_replace(remote_dent->d_path, '/', true, "/.deleted-");
+		if (mars_stat(marker_path, &local_stat, true) >= 0) {
+			if (timespec_compare(&remote_dent->stat_val.mtime, &local_stat.mtime) <= 0) {
+				XIO_DBG(
+				"marker '%s' exists, ignoring '%s' (new mtime=%lu.%09lu, marker mtime=%lu.%09lu)\n",
+					 marker_path, remote_dent->d_path,
+					 remote_dent->stat_val.mtime.tv_sec, remote_dent->stat_val.mtime.tv_nsec,
+					 local_stat.mtime.tv_sec, local_stat.mtime.tv_nsec);
+				goto done;
+			} else {
+				XIO_DBG(
+				"marker '%s' exists, overwriting '%s' (new mtime=%lu.%09lu, marker mtime=%lu.%09lu)\n",
+					 marker_path, remote_dent->d_path,
+					 remote_dent->stat_val.mtime.tv_sec, remote_dent->stat_val.mtime.tv_nsec,
+					 local_stat.mtime.tv_sec, local_stat.mtime.tv_nsec);
+			}
+		}
+	}
+
+	status = mars_stat(remote_dent->d_path, &local_stat, true);
+	stat_ok = (status >= 0);
+
+	if (stat_ok) {
+		update_mtime = timespec_compare(&remote_dent->stat_val.mtime, &local_stat.mtime) > 0;
+		update_ctime = timespec_compare(&remote_dent->stat_val.ctime, &local_stat.ctime) > 0;
+	}
+
+	if (S_ISDIR(remote_dent->stat_val.mode)) {
+		if (!_is_usable_dir(remote_dent->d_name)) {
+			XIO_DBG("ignoring directory '%s'\n", remote_dent->d_path);
+			goto done;
+		}
+		if (!stat_ok) {
+			status = mars_mkdir(remote_dent->d_path);
+			XIO_DBG("create directory '%s' status = %d\n", remote_dent->d_path, status);
+		}
+	} else if (S_ISLNK(remote_dent->stat_val.mode) && remote_dent->link_val) {
+		if (!stat_ok || update_mtime) {
+			status = mars_symlink(
+			remote_dent->link_val, remote_dent->d_path, &remote_dent->stat_val.mtime, __kuid_val(
+			remote_dent->stat_val.uid));
+			XIO_DBG(
+			"create symlink '%s' -> '%s' status = %d\n",
+			remote_dent->d_path,
+			remote_dent->link_val,
+			status);
+			run_trigger = true;
+		}
+	} else if (S_ISREG(remote_dent->stat_val.mode) && _is_peer_logfile(remote_dent->d_name, my_id())) {
+		const char *parent_path = backskip_replace(remote_dent->d_path, '/', false, "");
+
+		if (likely(parent_path)) {
+			struct mars_dent *parent = mars_find_dent(peer->global, parent_path);
+
+			if (unlikely(!parent)) {
+				XIO_DBG("ignoring non-existing local resource '%s'\n", parent_path);
+			/*  don't copy old / outdated logfiles */
+			} else {
+				struct mars_rotate *rot;
+
+				rot = parent->d_private;
+				if (rot && rot->relevant_serial > remote_dent->d_serial) {
+					XIO_DBG(
+					"ignoring outdated remote logfile '%s' behind %d\n",
+						 remote_dent->d_path, rot->relevant_serial);
+				} else {
+					struct mars_dent *local_dent;
+
+					local_dent = mars_find_dent(peer->global, remote_dent->d_path);
+					status = check_logfile(
+					peer->peer, remote_dent, local_dent, parent, local_stat.size);
+				}
+			}
+			brick_string_free(parent_path);
+		}
+	} else {
+		XIO_DBG("ignoring '%s'\n", remote_dent->d_path);
+	}
+
+done:
+	brick_string_free(marker_path);
+	if (status >= 0)
+		status = run_trigger ? 1 : 0;
+	return status;
+}
+
+static
+int run_bones(struct mars_peerinfo *peer)
+{
+	LIST_HEAD(tmp_list);
+	struct list_head *tmp;
+	unsigned long flags;
+	bool run_trigger = false;
+	int status = 0;
+
+	spin_lock_irqsave(&peer->lock, flags);
+	list_replace_init(&peer->remote_dent_list, &tmp_list);
+	spin_unlock_irqrestore(&peer->lock, flags);
+
+	XIO_DBG("remote_dent_list list_empty = %d\n", list_empty(&tmp_list));
+
+	for (tmp = tmp_list.next; tmp != &tmp_list; tmp = tmp->next) {
+		struct mars_dent *remote_dent = container_of(tmp, struct mars_dent, dent_link);
+
+		if (!remote_dent->d_path || !remote_dent->d_name) {
+			XIO_DBG("NULL\n");
+			continue;
+		}
+		status = run_bone(peer, remote_dent);
+		if (status > 0)
+			run_trigger = true;
+		/* XIO_DBG("path = '%s' worker status = %d\n", remote_dent->d_path, status); */
+	}
+
+	xio_free_dent_all(NULL, &tmp_list);
+
+	if (run_trigger)
+		local_trigger();
+	return status;
+}
+
+/*********************************************************************/
+
+/*  remote working infrastructure */
+
+static
+void _peer_cleanup(struct mars_peerinfo *peer)
+{
+	XIO_DBG("cleanup\n");
+	if (xio_socket_is_alive(&peer->socket)) {
+		XIO_DBG("really shutdown socket\n");
+		xio_shutdown_socket(&peer->socket);
+	}
+	xio_put_socket(&peer->socket);
+}
+
+static DECLARE_WAIT_QUEUE_HEAD(remote_event);
+
+static
+int peer_thread(void *data)
+{
+	struct mars_peerinfo *peer = data;
+	const char *real_peer;
+	struct sockaddr_storage src_sockaddr;
+	struct sockaddr_storage dst_sockaddr;
+
+	struct key_value_pair peer_pairs[] = {
+		{ peer->peer },
+		{ NULL }
+	};
+	int pause_time = 0;
+	bool do_kill = false;
+	int status;
+
+	if (!peer)
+		return -1;
+
+	real_peer = xio_translate_hostname(peer->peer);
+	XIO_INF("-------- peer thread starting on peer '%s' (%s)\n", peer->peer, real_peer);
+
+	status = xio_create_sockaddr(&src_sockaddr, my_id());
+	if (unlikely(status < 0)) {
+		XIO_ERR("unusable local address '%s' (%s)\n", real_peer, peer->peer);
+		goto done;
+	}
+
+	status = xio_create_sockaddr(&dst_sockaddr, real_peer);
+	if (unlikely(status < 0)) {
+		XIO_ERR("unusable remote address '%s' (%s)\n", real_peer, peer->peer);
+		goto done;
+	}
+
+	while (!brick_thread_should_stop()) {
+		struct mars_global tmp_global = {
+			.dent_anchor = LIST_HEAD_INIT(tmp_global.dent_anchor),
+			.brick_anchor = LIST_HEAD_INIT(tmp_global.brick_anchor),
+			.global_power = {
+				.button = true,
+			},
+			.main_event = __WAIT_QUEUE_HEAD_INITIALIZER(tmp_global.main_event),
+		};
+		LIST_HEAD(old_list);
+		unsigned long flags;
+
+		struct xio_cmd cmd = {
+			.cmd_str1 = peer->path,
+			.cmd_int1 = peer->maxdepth,
+		};
+
+		init_rwsem(&tmp_global.dent_mutex);
+		init_rwsem(&tmp_global.brick_mutex);
+
+		show_vals(peer_pairs, "/mars", "connection-from-");
+
+		if (!xio_socket_is_alive(&peer->socket)) {
+			make_msg(peer_pairs, "connection to '%s' (%s) is dead", peer->peer, real_peer);
+			brick_string_free(real_peer);
+			real_peer = xio_translate_hostname(peer->peer);
+			status = xio_create_sockaddr(&dst_sockaddr, real_peer);
+			if (unlikely(status < 0)) {
+				XIO_ERR("unusable remote address '%s' (%s)\n", real_peer, peer->peer);
+				make_msg(peer_pairs, "unusable remote address '%s' (%s)\n", real_peer, peer->peer);
+				brick_msleep(1000);
+				continue;
+			}
+			if (do_kill) {
+				do_kill = false;
+				_peer_cleanup(peer);
+				brick_msleep(1000);
+				continue;
+			}
+			if (!xio_net_is_alive) {
+				brick_msleep(1000);
+				continue;
+			}
+
+			status = xio_create_socket(&peer->socket, &src_sockaddr, &dst_sockaddr, &repl_tcp_params);
+			if (unlikely(status < 0)) {
+				XIO_INF(
+				"no connection to mars module on '%s' (%s) status = %d\n",
+				peer->peer,
+				real_peer,
+				status);
+				make_msg(
+				peer_pairs,
+				"connection to '%s' (%s) could not be established: status = %d",
+				peer->peer,
+				real_peer,
+				status);
+				brick_msleep(2000);
+				continue;
+			}
+			do_kill = true;
+			peer->socket.s_shutdown_on_err = true;
+			peer->socket.s_send_abort = mars_peer_abort;
+			peer->socket.s_recv_abort = mars_peer_abort;
+			XIO_DBG("successfully opened socket to '%s'\n", real_peer);
+			brick_msleep(100);
+			continue;
+		} else {
+			const char *new_peer;
+
+			/* check whether IP assignment has changed */
+			new_peer = xio_translate_hostname(peer->peer);
+			XIO_INF(
+			"AHA %d '%s' '%s'\n",
+				 xio_socket_is_alive(&peer->socket),
+				 new_peer, real_peer);
+			if (new_peer && real_peer && strcmp(new_peer, real_peer))
+				xio_shutdown_socket(&peer->socket);
+			brick_string_free(new_peer);
+		}
+
+		if (peer->from_remote_trigger) {
+			pause_time = 0;
+			peer->from_remote_trigger = false;
+			XIO_DBG("got notify from peer.\n");
+		}
+
+		status = 0;
+		if (peer->to_remote_trigger) {
+			pause_time = 0;
+			peer->to_remote_trigger = false;
+			XIO_DBG("sending notify to peer...\n");
+			cmd.cmd_code = CMD_NOTIFY;
+			status = xio_send_struct(&peer->socket, &cmd, xio_cmd_meta);
+		}
+
+		if (likely(status >= 0)) {
+			cmd.cmd_code = CMD_GETENTS;
+			status = xio_send_struct(&peer->socket, &cmd, xio_cmd_meta);
+		}
+		if (unlikely(status < 0)) {
+			XIO_WRN("communication error on send, status = %d\n", status);
+			if (do_kill) {
+				do_kill = false;
+				_peer_cleanup(peer);
+			}
+			brick_msleep(1000);
+			continue;
+		}
+
+		XIO_DBG("fetching remote dentry list\n");
+		status = xio_recv_dent_list(&peer->socket, &tmp_global.dent_anchor);
+		if (unlikely(status < 0)) {
+			XIO_WRN("communication error on receive, status = %d\n", status);
+			if (do_kill) {
+				do_kill = false;
+				_peer_cleanup(peer);
+			}
+			goto free_and_restart;
+			xio_free_dent_all(NULL, &tmp_global.dent_anchor);
+			brick_msleep(2000);
+			continue;
+		}
+
+		if (likely(!list_empty(&tmp_global.dent_anchor))) {
+			struct mars_dent *peer_uuid;
+			struct mars_dent *my_uuid;
+
+			XIO_DBG("got remote denties\n");
+
+			peer_uuid = mars_find_dent(&tmp_global, "/mars/uuid");
+			if (unlikely(!peer_uuid || !peer_uuid->link_val)) {
+				XIO_ERR("peer %s has no uuid\n", peer->peer);
+				make_msg(peer_pairs, "peer has no UUID");
+				goto free_and_restart;
+			}
+			my_uuid = mars_find_dent(mars_global, "/mars/uuid");
+			if (unlikely(!my_uuid || !my_uuid->link_val)) {
+				XIO_ERR("cannot determine my own uuid for peer %s\n", peer->peer);
+				make_msg(peer_pairs, "cannot determine my own uuid");
+				goto free_and_restart;
+			}
+			if (unlikely(strcmp(peer_uuid->link_val, my_uuid->link_val))) {
+				XIO_ERR(
+				"UUID mismatch for peer %s, you are trying to communicate with a foreign cluster!\n",
+				peer->peer);
+				make_msg(
+				peer_pairs,
+				"UUID mismatch, own cluster '%s' is trying to communicate with a foreign cluster '%s'",
+					 my_uuid->link_val, peer_uuid->link_val);
+				goto free_and_restart;
+			}
+
+			make_msg(peer_pairs, "CONNECTED %s(%s)", peer->peer, real_peer);
+
+			spin_lock_irqsave(&peer->lock, flags);
+
+			list_replace_init(&peer->remote_dent_list, &old_list);
+			list_replace_init(&tmp_global.dent_anchor, &peer->remote_dent_list);
+
+			spin_unlock_irqrestore(&peer->lock, flags);
+
+			peer->last_remote_jiffies = jiffies;
+
+			local_trigger();
+
+			xio_free_dent_all(NULL, &old_list);
+		}
+
+		brick_msleep(100);
+		if (!brick_thread_should_stop()) {
+			if (pause_time < mars_propagate_interval)
+				pause_time++;
+			wait_event_interruptible_timeout(
+			remote_event,
+							 (peer->to_remote_trigger | peer->from_remote_trigger) ||
+							 (mars_global && mars_global->main_trigger),
+							 pause_time * HZ);
+		}
+		continue;
+
+free_and_restart:
+		xio_free_dent_all(NULL, &tmp_global.dent_anchor);
+		brick_msleep(2000);
+	}
+
+	XIO_INF("-------- peer thread terminating\n");
+
+	make_msg(peer_pairs, "NOT connected %s(%s)", peer->peer, real_peer);
+	show_vals(peer_pairs, "/mars", "connection-from-");
+
+	if (do_kill)
+		_peer_cleanup(peer);
+
+done:
+	clear_vals(peer_pairs);
+	brick_string_free(real_peer);
+	return 0;
+}
+
+static
+void _make_alive(void)
+{
+	struct timespec now;
+	char *tmp;
+
+	get_lamport(&now);
+	tmp = path_make("%ld.%09ld", now.tv_sec, now.tv_nsec);
+	if (likely(tmp)) {
+		_make_alivelink_str("time", tmp);
+		brick_string_free(tmp);
+	}
+	_make_alivelink("alive", mars_global && mars_global->global_power.button ? 1 : 0);
+	_make_alivelink_str("tree", SYMLINK_TREE_VERSION);
+}
+
+void from_remote_trigger(void)
+{
+	struct list_head *tmp;
+	int count = 0;
+	unsigned long flags;
+
+	_make_alive();
+
+	read_lock_irqsave(&peer_lock, flags);
+	for (tmp = peer_anchor.next; tmp != &peer_anchor; tmp = tmp->next) {
+		struct mars_peerinfo *peer = container_of(tmp, struct mars_peerinfo, peer_head);
+
+		peer->from_remote_trigger = true;
+		count++;
+	}
+	read_unlock_irqrestore(&peer_lock, flags);
+
+	XIO_DBG("got trigger for %d peers\n", count);
+	wake_up_interruptible_all(&remote_event);
+}
+
+static
+void __remote_trigger(void)
+{
+	struct list_head *tmp;
+	int count = 0;
+	unsigned long flags;
+
+	read_lock_irqsave(&peer_lock, flags);
+	for (tmp = peer_anchor.next; tmp != &peer_anchor; tmp = tmp->next) {
+		struct mars_peerinfo *peer = container_of(tmp, struct mars_peerinfo, peer_head);
+
+		peer->to_remote_trigger = true;
+		count++;
+	}
+	read_unlock_irqrestore(&peer_lock, flags);
+
+	XIO_DBG("triggered %d peers\n", count);
+	wake_up_interruptible_all(&remote_event);
+}
+
+static
+bool is_shutdown(void)
+{
+	bool res = false;
+	int used = atomic_read(&global_mshadow_count);
+
+	if (used > 0) {
+		XIO_INF(
+		"global shutdown delayed: there are %d buffers in use, occupying %ld bytes\n", used, atomic64_read(
+		&global_mshadow_used));
+	} else {
+		int rounds = 3;
+
+		while ((used = atomic_read(&xio_global_io_flying)) <= 0) {
+			if (--rounds <= 0) {
+				res = true;
+				break;
+			}
+			brick_msleep(30);
+		}
+		if (!res)
+			XIO_INF("global shutdown delayed: there are %d IO requests flying\n", used);
+	}
+	return res;
+}
+
+/*********************************************************************/
+
+/*  helpers for worker functions */
+
+static int _kill_peer(struct mars_global *global, struct mars_peerinfo *peer)
+{
+	LIST_HEAD(tmp_list);
+	unsigned long flags;
+
+	if (!peer)
+		return 0;
+
+	write_lock_irqsave(&peer_lock, flags);
+	list_del_init(&peer->peer_head);
+	write_unlock_irqrestore(&peer_lock, flags);
+
+	XIO_INF("stopping peer thread...\n");
+	if (peer->peer_thread) {
+		brick_thread_stop(peer->peer_thread);
+		peer->peer_thread = NULL;
+	}
+	spin_lock_irqsave(&peer->lock, flags);
+	list_replace_init(&peer->remote_dent_list, &tmp_list);
+	spin_unlock_irqrestore(&peer->lock, flags);
+	xio_free_dent_all(NULL, &tmp_list);
+	brick_string_free(peer->peer);
+	brick_string_free(peer->path);
+	return 0;
+}
+
+static
+void peer_destruct(void *_peer)
+{
+	struct mars_peerinfo *peer = _peer;
+
+	if (likely(peer))
+		_kill_peer(peer->global, peer);
+}
+
+static int _make_peer(struct mars_global *global, struct mars_dent *dent, char *path)
+{
+	static int serial;
+	struct mars_peerinfo *peer;
+	char *mypeer;
+	char *parent_path;
+	int status = 0;
+
+	if (unlikely(!global ||
+		     !dent || !dent->link_val || !dent->d_parent)) {
+		XIO_DBG("cannot work\n");
+		return 0;
+	}
+	parent_path = dent->d_parent->d_path;
+	if (unlikely(!parent_path)) {
+		XIO_DBG("cannot work\n");
+		return 0;
+	}
+	mypeer = dent->d_rest;
+	if (!mypeer) {
+		status = _parse_args(dent, dent->link_val, 1);
+		if (status < 0)
+			goto done;
+		mypeer = dent->d_argv[0];
+	}
+
+	XIO_DBG("peer '%s'\n", mypeer);
+	if (!dent->d_private) {
+		unsigned long flags;
+
+		dent->d_private = brick_zmem_alloc(sizeof(struct mars_peerinfo));
+		dent->d_private_destruct = peer_destruct;
+		peer = dent->d_private;
+		peer->global = global;
+		peer->peer = brick_strdup(mypeer);
+		peer->path = brick_strdup(path);
+		peer->maxdepth = 2;
+		spin_lock_init(&peer->lock);
+		INIT_LIST_HEAD(&peer->peer_head);
+		INIT_LIST_HEAD(&peer->remote_dent_list);
+
+		write_lock_irqsave(&peer_lock, flags);
+		list_add_tail(&peer->peer_head, &peer_anchor);
+		write_unlock_irqrestore(&peer_lock, flags);
+	}
+
+	peer = dent->d_private;
+	if (!peer->peer_thread) {
+		peer->peer_thread = brick_thread_create(peer_thread, peer, "mars_peer%d", serial++);
+		if (unlikely(!peer->peer_thread)) {
+			XIO_ERR("cannot start peer thread\n");
+			return -1;
+		}
+		XIO_DBG("started peer thread\n");
+	}
+
+	/* This must be called by the main thread in order to
+	 * avoid nasty races.
+	 * The peer thread does nothing but fetching the dent list.
+	 */
+	status = run_bones(peer);
+
+done:
+	return status;
+}
+
+static int kill_scan(void *buf, struct mars_dent *dent)
+{
+	struct mars_global *global = buf;
+	struct mars_peerinfo *peer = dent->d_private;
+	int res;
+
+	if (!global || global->global_power.button || !peer)
+		return 0;
+	dent->d_private = NULL;
+	res = _kill_peer(global, peer);
+	brick_mem_free(peer);
+	return res;
+}
+
+static int make_scan(void *buf, struct mars_dent *dent)
+{
+	XIO_DBG("path = '%s' peer = '%s'\n", dent->d_path, dent->d_rest);
+	/*  don't connect to myself */
+	if (!strcmp(dent->d_rest, my_id()))
+		return 0;
+	return _make_peer(buf, dent, "/mars");
+}
+
+static
+int kill_any(void *buf, struct mars_dent *dent)
+{
+	struct mars_global *global = buf;
+	struct list_head *tmp;
+
+	if (global->global_power.button || !is_shutdown())
+		return 0;
+
+	for (tmp = dent->brick_list.next; tmp != &dent->brick_list; tmp = tmp->next) {
+		struct xio_brick *brick = container_of(tmp, struct xio_brick, dent_brick_link);
+
+		if (brick->nr_outputs > 0 && brick->outputs[0] && brick->outputs[0]->nr_connected) {
+			XIO_DBG(
+			"cannot kill dent '%s' because brick '%s' is wired\n", dent->d_path, brick->brick_path);
+			return 0;
+		}
+	}
+
+	XIO_DBG("killing dent = '%s'\n", dent->d_path);
+	xio_kill_dent(dent);
+	return 1;
+}
+
+/*********************************************************************/
+
+/*  handlers / helpers for logfile rotation */
+
+static
+void _create_new_logfile(const char *path)
+{
+	struct file *f;
+	const int flags = O_RDWR | O_CREAT | O_EXCL;
+	const int prot = 0600;
+
+	mm_segment_t oldfs;
+
+	oldfs = get_fs();
+	set_fs(get_ds());
+	f = filp_open(path, flags, prot);
+	set_fs(oldfs);
+	if (IS_ERR(f)) {
+		int err = PTR_ERR(f);
+
+		if (err == -EEXIST)
+			XIO_INF("logfile '%s' already exists\n", path);
+		else
+			XIO_ERR("could not create logfile '%s' status = %d\n", path, err);
+	} else {
+		XIO_DBG("created empty logfile '%s'\n", path);
+		mars_sync();
+		_crashme(10, false);
+		filp_close(f, NULL);
+		local_trigger();
+	}
+}
+
+static
+const char *__get_link_path(const char *_linkpath, const char **linkpath)
+{
+	const char *res = mars_readlink(_linkpath);
+
+	if (linkpath)
+		*linkpath = _linkpath;
+	else
+		brick_string_free(_linkpath);
+	return res;
+}
+
+static
+const char *get_replaylink(const char *parent_path, const char *host, const char **linkpath)
+{
+	const char * _linkpath = path_make("%s/replay-%s", parent_path, host);
+
+	return __get_link_path(_linkpath, linkpath);
+}
+
+static
+const char *get_versionlink(const char *parent_path, int seq, const char *host, const char **linkpath)
+{
+	const char * _linkpath = path_make("%s/version-%09d-%s", parent_path, seq, host);
+
+	return __get_link_path(_linkpath, linkpath);
+}
+
+static inline
+int _get_tolerance(struct mars_rotate *rot)
+{
+	if (rot->is_log_damaged)
+		return REPLAY_TOLERANCE;
+	return 0;
+}
+
+static
+bool is_switchover_possible(
+struct mars_rotate *rot, const char *old_log_path, const char *new_log_path, int replay_tolerance, bool skip_new)
+{
+	const char *old_log_name = old_log_path + skip_dir(old_log_path);
+	const char *new_log_name = new_log_path + skip_dir(new_log_path);
+	const char *old_host = NULL;
+	const char *new_host = NULL;
+	const char *own_versionlink_path = NULL;
+	const char *old_versionlink_path = NULL;
+	const char *new_versionlink_path = NULL;
+	const char *own_versionlink = NULL;
+	const char *old_versionlink = NULL;
+	const char *new_versionlink = NULL;
+	const char *own_replaylink_path = NULL;
+	const char *own_replaylink = NULL;
+	loff_t own_r_val;
+	loff_t own_v_val;
+	loff_t own_r_tail;
+	int old_log_seq;
+	int new_log_seq;
+	int own_r_offset;
+	int own_v_offset;
+	int own_r_len;
+	int own_v_len;
+	int len1;
+	int len2;
+	int offs2;
+	char dummy = 0;
+
+	bool res = false;
+
+	XIO_DBG(
+	"old_log = '%s' new_log = '%s' toler = %d skip_new = %d\n",
+		 old_log_path, new_log_path, replay_tolerance, skip_new);
+
+	/*  check precondition: is split brain already for sure? */
+	if (unlikely(rot->has_double_logfile)) {
+		XIO_WRN_TO(
+		rot->log_say,
+		"SPLIT BRAIN detected: multiple logfiles with sequence number %d exist\n",
+		rot->next_relevant_log->d_serial);
+		make_rot_msg(
+		rot,
+		"err-splitbrain-detected",
+		"SPLIT BRAIN detected: multiple logfiles with sequence number %d exist\n",
+		rot->next_relevant_log->d_serial);
+		goto done;
+	}
+
+	/*  parse the names */
+	if (unlikely(!parse_logfile_name(old_log_name, &old_log_seq, &old_host))) {
+		make_rot_msg(rot, "err-bad-log-name", "logfile name '%s' cannot be parsed", old_log_name);
+		goto done;
+	}
+	if (unlikely(!parse_logfile_name(new_log_name, &new_log_seq, &new_host))) {
+		make_rot_msg(rot, "err-bad-log-name", "logfile name '%s' cannot be parsed", new_log_name);
+		goto done;
+	}
+
+	/*  check: are the sequence numbers contiguous? */
+	if (unlikely(new_log_seq != old_log_seq + 1)) {
+		XIO_ERR_TO(
+		rot->log_say,
+		"logfile sequence numbers are not contiguous (%d != %d + 1), old_log_path='%s' new_log_path='%s'\n",
+		new_log_seq,
+		old_log_seq,
+		old_log_path,
+		new_log_path);
+		make_rot_msg(
+		rot,
+		"err-log-not-contiguous",
+		"logfile sequence numbers are not contiguous (%d != %d + 1) old_log_path='%s' new_log_path='%s'",
+		new_log_seq,
+		old_log_seq,
+		old_log_path,
+		new_log_path);
+		goto done;
+	}
+
+	/*  fetch all the versionlinks and test for their existence. */
+	own_versionlink = get_versionlink(rot->parent_path, old_log_seq, my_id(), &own_versionlink_path);
+	if (unlikely(!own_versionlink || !own_versionlink[0])) {
+		XIO_ERR_TO(rot->log_say, "cannot read my own versionlink '%s'\n", own_versionlink_path);
+		make_rot_msg(
+		rot, "err-versionlink-not-readable", "cannot read my own versionlink '%s'", own_versionlink_path);
+		goto done;
+	}
+	old_versionlink = get_versionlink(rot->parent_path, old_log_seq, old_host, &old_versionlink_path);
+	if (unlikely(!old_versionlink || !old_versionlink[0])) {
+		XIO_ERR_TO(rot->log_say, "cannot read old versionlink '%s'\n", old_versionlink_path);
+		make_rot_msg(
+		rot, "err-versionlink-not-readable", "cannot read old versionlink '%s'", old_versionlink_path);
+		goto done;
+	}
+	if (!skip_new && strcmp(new_host, my_id())) {
+		new_versionlink = get_versionlink(rot->parent_path, new_log_seq, new_host, &new_versionlink_path);
+		if (unlikely(!new_versionlink || !new_versionlink[0])) {
+			XIO_INF_TO(
+			rot->log_say,
+			"new versionlink '%s' does not yet exist, we must wait for it.\n",
+			new_versionlink_path);
+			make_rot_msg(
+			rot,
+			"inf-versionlink-not-yet-exist",
+			"we must wait for new versionlink '%s'",
+			new_versionlink_path);
+			goto done;
+		}
+	}
+
+	/*  check: are the versionlinks correct? */
+	if (unlikely(strcmp(own_versionlink, old_versionlink))) {
+		XIO_INF_TO(
+		rot->log_say,
+		"old logfile is not yet completeley transferred, own_versionlink '%s' -> '%s' != old_versionlink '%s' -> '%s'\n",
+		own_versionlink_path,
+		own_versionlink,
+		old_versionlink_path,
+		old_versionlink);
+		make_rot_msg(
+		rot,
+		"inf-versionlink-not-equal",
+		"old logfile is not yet completeley transferred (own_versionlink '%s' -> '%s' != old_versionlink '%s' -> '%s')",
+		own_versionlink_path,
+		own_versionlink,
+		old_versionlink_path,
+		old_versionlink);
+		goto done;
+	}
+
+	/*  check: did I fully replay my old logfile data? */
+	own_replaylink = get_replaylink(rot->parent_path, my_id(), &own_replaylink_path);
+	if (unlikely(!own_replaylink || !own_replaylink[0])) {
+		XIO_ERR_TO(rot->log_say, "cannot read my own replaylink '%s'\n", own_replaylink_path);
+		goto done;
+	}
+	own_r_len = skip_part(own_replaylink);
+	own_v_offset = skip_part(own_versionlink);
+	if (unlikely(!own_versionlink[own_v_offset++])) {
+		XIO_ERR_TO(
+		rot->log_say, "own version link '%s' -> '%s' is malformed\n", own_versionlink_path, own_versionlink);
+		make_rot_msg(
+		rot,
+		"err-replaylink-not-readable",
+		"own version link '%s' -> '%s' is malformed",
+		own_versionlink_path,
+		own_versionlink);
+		goto done;
+	}
+	own_v_len = skip_part(own_versionlink + own_v_offset);
+	if (unlikely(own_r_len != own_v_len ||
+		     strncmp(own_replaylink, own_versionlink + own_v_offset, own_r_len))) {
+		XIO_ERR_TO(
+		rot->log_say,
+		"internal problem: logfile name mismatch between '%s' and '%s'\n",
+		own_replaylink,
+		own_versionlink);
+		make_rot_msg(
+		rot,
+		"err-bad-log-name",
+		"internal problem: logfile name mismatch between '%s' and '%s'",
+		own_replaylink,
+		own_versionlink);
+		goto done;
+	}
+	if (unlikely(!own_replaylink[own_r_len])) {
+		XIO_ERR_TO(
+		rot->log_say, "own replay link '%s' -> '%s' is malformed\n", own_replaylink_path, own_replaylink);
+		make_rot_msg(
+		rot,
+		"err-replaylink-not-readable",
+		"own replay link '%s' -> '%s' is malformed",
+		own_replaylink_path,
+		own_replaylink);
+		goto done;
+	}
+	own_r_offset = own_r_len + 1;
+	if (unlikely(!own_versionlink[own_v_len])) {
+		XIO_ERR_TO(
+		rot->log_say, "own version link '%s' -> '%s' is malformed\n", own_versionlink_path, own_versionlink);
+		make_rot_msg(
+		rot,
+		"err-versionlink-not-readable",
+		"own version link '%s' -> '%s' is malformed",
+		own_versionlink_path,
+		own_versionlink);
+		goto done;
+	}
+	own_v_offset += own_r_len + 1;
+	own_r_len = skip_part(own_replaylink  + own_r_offset);
+	own_v_len = skip_part(own_versionlink + own_v_offset);
+	own_r_val = 0;
+	own_v_val = 0;
+	own_r_tail = 0;
+	if (sscanf(own_replaylink + own_r_offset, "%lld,%lld", &own_r_val, &own_r_tail) != 2) {
+		XIO_ERR_TO(
+		rot->log_say, "own replay link '%s' -> '%s' is malformed\n", own_replaylink_path, own_replaylink);
+		make_rot_msg(
+		rot,
+		"err-replaylink-not-readable",
+		"own replay link '%s' -> '%s' is malformed",
+		own_replaylink_path,
+		own_replaylink);
+		goto done;
+	}
+	/* SSCANF_TO_KSTRTO: kstros64 does not work because of the next char */
+	if (sscanf(own_versionlink + own_v_offset, "%lld%c", &own_v_val, &dummy) != 2) {
+		XIO_ERR_TO(
+		rot->log_say, "own version link '%s' -> '%s' is malformed\n", own_versionlink_path, own_versionlink);
+		make_rot_msg(
+		rot,
+		"err-versionlink-not-readable",
+		"own version link '%s' -> '%s' is malformed",
+		own_versionlink_path,
+		own_versionlink);
+		goto done;
+	}
+	if (unlikely(own_r_len > own_v_len || own_r_len + replay_tolerance < own_v_len)) {
+		XIO_INF_TO(
+		rot->log_say,
+		"log replay is not yet finished: '%s' and '%s' are reporting different positions.\n",
+		own_replaylink,
+		own_versionlink);
+		make_rot_msg(
+		rot,
+		"inf-replay-not-yet-finished",
+		"log replay is not yet finished: '%s' and '%s' are reporting different positions",
+		own_replaylink,
+		own_versionlink);
+		goto done;
+	}
+
+	/*  last check: is the new versionlink based on the old one? */
+	if (new_versionlink) {
+		len1 = skip_sect(own_versionlink);
+		offs2 = skip_sect(new_versionlink);
+		if (unlikely(!new_versionlink[offs2++])) {
+			XIO_ERR_TO(
+			rot->log_say,
+			"new version link '%s' -> '%s' is malformed\n",
+			new_versionlink_path,
+			new_versionlink);
+			make_rot_msg(
+			rot,
+			"err-versionlink-not-readable",
+			"new version link '%s' -> '%s' is malformed",
+			new_versionlink_path,
+			new_versionlink);
+			goto done;
+		}
+		len2 = skip_sect(new_versionlink + offs2);
+		if (unlikely(len1 != len2 ||
+			     strncmp(own_versionlink, new_versionlink + offs2, len1))) {
+			XIO_WRN_TO(
+			rot->log_say,
+			"VERSION MISMATCH old '%s' -> '%s' new '%s' -> '%s' ==(%d,%d) ===> check for SPLIT BRAIN!\n",
+			own_versionlink_path,
+			own_versionlink,
+			new_versionlink_path,
+			new_versionlink,
+			len1,
+			len2);
+			make_rot_msg(
+			rot,
+			"err-splitbrain-detected",
+			"VERSION MISMATCH old '%s' -> '%s' new '%s' -> '%s' ==(%d,%d) ===> check for SPLIT BRAIN",
+			own_versionlink_path,
+			own_versionlink,
+			new_versionlink_path,
+			new_versionlink,
+			len1,
+			len2);
+			goto done;
+		}
+	}
+
+	/*  report success */
+	res = true;
+	XIO_DBG("VERSION OK '%s' -> '%s'\n", own_versionlink_path, own_versionlink);
+
+done:
+	brick_string_free(old_host);
+	brick_string_free(new_host);
+	brick_string_free(own_versionlink_path);
+	brick_string_free(old_versionlink_path);
+	brick_string_free(new_versionlink_path);
+	brick_string_free(own_versionlink);
+	brick_string_free(old_versionlink);
+	brick_string_free(new_versionlink);
+	brick_string_free(own_replaylink_path);
+	brick_string_free(own_replaylink);
+	return res;
+}
+
+static
+void rot_destruct(void *_rot)
+{
+	struct mars_rotate *rot = _rot;
+
+	if (likely(rot)) {
+		list_del_init(&rot->rot_head);
+		write_info_links(rot);
+		del_channel(rot->log_say);
+		rot->log_say = NULL;
+		brick_string_free(rot->fetch_path);
+		brick_string_free(rot->fetch_peer);
+		brick_string_free(rot->preferred_peer);
+		brick_string_free(rot->parent_path);
+		brick_string_free(rot->parent_rest);
+		brick_string_free(rot->fetch_next_origin);
+		rot->fetch_path = NULL;
+		rot->fetch_peer = NULL;
+		rot->preferred_peer = NULL;
+		rot->parent_path = NULL;
+		rot->parent_rest = NULL;
+		rot->fetch_next_origin = NULL;
+		clear_vals(rot->msgs);
+	}
+}
+
+/* This must be called once at every round of logfile checking.
+ */
+static
+int make_log_init(void *buf, struct mars_dent *dent)
+{
+	struct mars_global *global = buf;
+	struct mars_dent *parent = dent->d_parent;
+	struct xio_brick *bio_brick;
+	struct xio_brick *aio_brick;
+	struct xio_brick *trans_brick;
+	struct mars_rotate *rot = parent->d_private;
+	struct mars_dent *replay_link;
+	struct mars_dent *aio_dent;
+	struct xio_output *output;
+	const char *parent_path;
+	const char *replay_path = NULL;
+	const char *aio_path = NULL;
+	bool switch_on;
+	int status = 0;
+
+	if (!global->global_power.button)
+		goto done;
+	status = -EINVAL;
+	CHECK_PTR(parent, done);
+	parent_path = parent->d_path;
+	CHECK_PTR(parent_path, done);
+
+	if (!rot) {
+		const char *fetch_path;
+
+		rot = brick_zmem_alloc(sizeof(struct mars_rotate));
+		spin_lock_init(&rot->inf_lock);
+		fetch_path = path_make("%s/logfile-update", parent_path);
+		if (unlikely(!fetch_path)) {
+			XIO_ERR("cannot create fetch_path\n");
+			brick_mem_free(rot);
+			status = -ENOMEM;
+			goto done;
+		}
+		rot->fetch_path = fetch_path;
+		rot->global = global;
+		parent->d_private = rot;
+		parent->d_private_destruct = rot_destruct;
+		list_add_tail(&rot->rot_head, &rot_anchor);
+		assign_keys(rot->msgs, rot_keys);
+	}
+
+	rot->replay_link = NULL;
+	rot->aio_dent = NULL;
+	rot->aio_brick = NULL;
+	rot->first_log = NULL;
+	rot->relevant_log = NULL;
+	rot->relevant_serial = 0;
+	rot->relevant_brick = NULL;
+	rot->next_relevant_log = NULL;
+	rot->prev_log = NULL;
+	rot->next_log = NULL;
+	brick_string_free(rot->fetch_next_origin);
+	rot->fetch_next_origin = NULL;
+	rot->max_sequence = 0;
+	/*  reset the split brain detector only when conflicts have gone for a number of rounds */
+	if (rot->split_brain_serial && rot->split_brain_round++ > 3)
+		rot->split_brain_serial = 0;
+	rot->fetch_next_serial = 0;
+	rot->has_error = false;
+	rot->wants_sync = false;
+	rot->has_symlinks = true;
+	brick_string_free(rot->preferred_peer);
+	rot->preferred_peer = NULL;
+
+	if (dent->link_val) {
+		int status = kstrtos64(dent->link_val, 10, &rot->dev_size);
+
+		(void)status; /* leave as before in case of errors */
+	}
+	if (!rot->parent_path) {
+		rot->parent_path = brick_strdup(parent_path);
+		rot->parent_rest = brick_strdup(parent->d_rest);
+	}
+
+	if (unlikely(!rot->log_say)) {
+		char *name = path_make("%s/logstatus-%s", parent_path, my_id());
+
+		if (likely(name)) {
+			rot->log_say = make_channel(name, false);
+			brick_string_free(name);
+		}
+	}
+
+	write_info_links(rot);
+
+	/* Fetch the replay status symlink.
+	 * It must exist, and its value will control everything.
+	 */
+	replay_path = path_make("%s/replay-%s", parent_path, my_id());
+	if (unlikely(!replay_path)) {
+		XIO_ERR("cannot make path\n");
+		status = -ENOMEM;
+		goto done;
+	}
+
+	replay_link = (void *)mars_find_dent(global, replay_path);
+	if (unlikely(!replay_link || !replay_link->link_val)) {
+		XIO_DBG("replay status symlink '%s' does not exist (%p)\n", replay_path, replay_link);
+		rot->allow_update = false;
+		status = -ENOENT;
+		goto done;
+	}
+
+	status = _parse_args(replay_link, replay_link->link_val, 3);
+	if (unlikely(status < 0))
+		goto done;
+	rot->replay_link = replay_link;
+
+	/* Fetch AIO dentry of the logfile.
+	 */
+	if (rot->trans_brick) {
+		struct trans_logger_input *trans_input = rot->trans_brick->inputs[rot->trans_brick->old_input_nr];
+
+		if (trans_input && trans_input->is_operating) {
+			aio_path = path_make(
+			"%s/log-%09d-%s", parent_path, trans_input->inf.inf_sequence, trans_input->inf.inf_host);
+			XIO_DBG(
+			"using logfile '%s' from trans_input %d (new=%d)\n",
+			aio_path,
+			rot->trans_brick->old_input_nr,
+			rot->trans_brick->log_input_nr);
+		}
+	}
+	if (!aio_path) {
+		aio_path = path_make("%s/%s", parent_path, replay_link->d_argv[0]);
+		XIO_DBG("using logfile '%s' from replay symlink\n", aio_path);
+	}
+	if (unlikely(!aio_path)) {
+		XIO_ERR("cannot make path\n");
+		status = -ENOMEM;
+		goto done;
+	}
+
+	aio_dent = (void *)mars_find_dent(global, aio_path);
+	if (unlikely(!aio_dent)) {
+		XIO_DBG("logfile '%s' does not exist\n", aio_path);
+		status = -ENOENT;
+		if (rot->todo_primary && !rot->is_primary && !rot->old_is_primary) {
+			int offset = strlen(aio_path) - strlen(my_id());
+
+			if (offset > 0 && aio_path[offset - 1] == '-' && !strcmp(aio_path + offset, my_id())) {
+				/*  try to create an empty logfile */
+				_create_new_logfile(aio_path);
+			}
+		}
+		goto done;
+	}
+	rot->aio_dent = aio_dent;
+
+	/*  check whether attach is allowed */
+	switch_on = _check_allow(global, parent, "attach");
+	if (switch_on && rot->res_shutdown) {
+		XIO_ERR("cannot start transaction logger: resource shutdown mode is currently active\n");
+		switch_on = false;
+	}
+
+	/* Fetch / make the AIO brick instance
+	 */
+	aio_brick =
+		make_brick_all(
+		global,
+			       aio_dent,
+			       _set_sio_params,
+			       NULL,
+			       aio_path,
+			       (const struct generic_brick_type *)&sio_brick_type,
+			       (const struct generic_brick_type*[]){},
+/**/			       rot->trans_brick || switch_on ? 2 : -1,
+			       "%s",
+			       (const char *[]){},
+			       0,
+			       aio_path);
+	rot->aio_brick = aio_brick;
+	status = 0;
+	if (unlikely(!aio_brick || !aio_brick->power.on_led))
+		goto done; /*  this may happen in case of detach */
+	bio_brick = rot->bio_brick;
+	if (unlikely(!bio_brick || !bio_brick->power.on_led))
+		goto done; /*  this may happen in case of detach */
+
+	/* Fetch the actual logfile size
+	 */
+	output = aio_brick->outputs[0];
+	status = output->ops->xio_get_info(output, &rot->aio_info);
+	if (status < 0) {
+		XIO_ERR("cannot get info on '%s'\n", aio_path);
+		goto done;
+	}
+	XIO_DBG("logfile '%s' size = %lld\n", aio_path, rot->aio_info.current_size);
+
+	if (rot->is_primary &&
+	    global_logrot_auto > 0 &&
+	    unlikely(rot->aio_info.current_size >= (loff_t)global_logrot_auto * 1024 * 1024 * 1024)) {
+		char *new_path = path_make("%s/log-%09d-%s", parent_path, aio_dent->d_serial + 1, my_id());
+
+		if (likely(new_path && !mars_find_dent(global, new_path))) {
+			XIO_INF(
+			"old logfile size = %lld, creating new logfile '%s'\n", rot->aio_info.current_size, new_path);
+			_create_new_logfile(new_path);
+		}
+		brick_string_free(new_path);
+	}
+
+	/* Fetch / make the transaction logger.
+	 * We deliberately "forget" to connect the log input here.
+	 * Will be carried out later in make_log_step().
+	 * The final switch-on will be started in make_log_finalize().
+	 */
+	trans_brick =
+		make_brick_all(
+		global,
+			       replay_link,
+			       _set_trans_params,
+			       NULL,
+			       aio_path,
+			       (const struct generic_brick_type *)&trans_logger_brick_type,
+			       (const struct generic_brick_type *[]){NULL},
+			       1, /*  create when necessary, but leave in current state otherwise */
+			       "%s/replay-%s",
+			       (const char *[]){"%s/data-%s"},
+			       1,
+			       parent_path,
+			       my_id(),
+			       parent_path,
+			       my_id());
+	rot->trans_brick = (void *)trans_brick;
+	status = -ENOENT;
+	if (!trans_brick)
+		goto done;
+	rot->trans_brick->kill_ptr = (void **)&rot->trans_brick;
+	rot->trans_brick->replay_limiter = &rot->replay_limiter;
+	/* For safety, default is to try an (unnecessary) replay in case
+	 * something goes wrong later.
+	 */
+	rot->replay_mode = true;
+
+	status = 0;
+
+done:
+	brick_string_free(aio_path);
+	brick_string_free(replay_path);
+	return status;
+}
+
+static
+bool _next_is_acceptable(struct mars_rotate *rot, struct mars_dent *old_dent, struct mars_dent *new_dent)
+{
+	/* Primaries are never allowed to consider logfiles not belonging to them.
+	 * Secondaries need this for replay, unfortunately.
+	 */
+	if ((rot->is_primary | rot->old_is_primary) ||
+	    (rot->trans_brick && rot->trans_brick->power.on_led && !rot->trans_brick->replay_mode)) {
+		if (new_dent->stat_val.size) {
+			XIO_WRN(
+			"logrotate impossible, '%s' size = %lld\n", new_dent->d_rest, new_dent->stat_val.size);
+			return false;
+		}
+		if (strcmp(new_dent->d_rest, my_id())) {
+			XIO_WRN("logrotate impossible, '%s'\n", new_dent->d_rest);
+			return false;
+		}
+	} else {
+		/* Only secondaries should check for contiguity,
+		 * primaries sometimes need holes for emergency mode.
+		 */
+		if (new_dent->d_serial != old_dent->d_serial + 1)
+			return false;
+	}
+	return true;
+}
+
+/* Note: this is strictly called in d_serial order.
+ * This is important!
+ */
+static
+int make_log_step(void *buf, struct mars_dent *dent)
+{
+	struct mars_global *global = buf;
+	struct mars_dent *parent = dent->d_parent;
+	struct mars_rotate *rot;
+	struct trans_logger_brick *trans_brick;
+	struct mars_dent *prev_log;
+	int replay_log_nr = 0;
+	int status = -EINVAL;
+
+	CHECK_PTR(parent, err);
+	rot = parent->d_private;
+	if (!rot)
+		goto err;
+	CHECK_PTR(rot, err);
+
+	status = 0;
+	trans_brick = rot->trans_brick;
+	if (!global->global_power.button || !dent->d_parent || !trans_brick || rot->has_error) {
+		XIO_DBG("nothing to do rot_error = %d\n", rot->has_error);
+		goto done;
+	}
+
+	/* Check for consecutiveness of logfiles
+	 */
+	prev_log = rot->next_log;
+	if (prev_log && prev_log->d_serial + 1 != dent->d_serial &&
+	    (!rot->replay_link || !rot->replay_link->d_argv[0] ||
+	     sscanf(rot->replay_link->d_argv[0], "log-%d", &replay_log_nr) != 1 ||
+	     dent->d_serial > replay_log_nr)) {
+		XIO_WRN_TO(
+		rot->log_say,
+		"transaction logs are not consecutive at '%s' (%d ~> %d)\n",
+		dent->d_path,
+		prev_log->d_serial,
+		dent->d_serial);
+		make_rot_msg(
+		rot,
+		"wrn-log-consecutive",
+		"transaction logs are not consecutive at '%s' (%d ~> %d)\n",
+		dent->d_path,
+		prev_log->d_serial,
+		dent->d_serial);
+	}
+
+	if (dent->d_serial > rot->max_sequence)
+		rot->max_sequence = dent->d_serial;
+
+	if (!rot->first_log)
+		rot->first_log = dent;
+
+	/* Skip any logfiles after the relevant one.
+	 * This should happen only when replaying multiple logfiles
+	 * in sequence, or when starting a new logfile for writing.
+	 */
+	status = 0;
+	if (rot->relevant_log) {
+		if (!rot->next_relevant_log) {
+			if (unlikely(dent->d_serial == rot->relevant_log->d_serial)) {
+				/*  always prefer the one created by myself */
+				if (!strcmp(rot->relevant_log->d_rest, my_id())) {
+					XIO_WRN(
+					"PREFER LOGFILE '%s' in front of '%s'\n",
+						 rot->relevant_log->d_path, dent->d_path);
+				} else if (!strcmp(dent->d_rest, my_id())) {
+					XIO_WRN(
+					"PREFER LOGFILE '%s' in front of '%s'\n",
+						 dent->d_path, rot->relevant_log->d_path);
+					rot->relevant_log = dent;
+				} else {
+					rot->has_double_logfile = true;
+					XIO_ERR(
+					"DOUBLE LOGFILES '%s' '%s'\n",
+						 dent->d_path, rot->relevant_log->d_path);
+				}
+			} else if (_next_is_acceptable(rot, rot->relevant_log, dent)) {
+				rot->next_relevant_log = dent;
+			} else if (dent->d_serial > rot->relevant_log->d_serial + 5) {
+				rot->has_hole_logfile = true;
+			}
+		} else { /*  check for double logfiles = > split brain */
+			if (unlikely(dent->d_serial == rot->next_relevant_log->d_serial)) {
+				/*  always prefer the one created by myself */
+				if (!strcmp(rot->next_relevant_log->d_rest, my_id())) {
+					XIO_WRN(
+					"PREFER LOGFILE '%s' in front of '%s'\n",
+					rot->next_relevant_log->d_path,
+					dent->d_path);
+				} else if (!strcmp(dent->d_rest, my_id())) {
+					XIO_WRN(
+					"PREFER LOGFILE '%s' in front of '%s'\n",
+					dent->d_path,
+					rot->next_relevant_log->d_path);
+					rot->next_relevant_log = dent;
+				} else {
+					rot->has_double_logfile = true;
+					XIO_ERR(
+					"DOUBLE LOGFILES '%s' '%s'\n", dent->d_path, rot->next_relevant_log->d_path);
+				}
+			} else if (dent->d_serial > rot->next_relevant_log->d_serial + 5) {
+				rot->has_hole_logfile = true;
+			}
+		}
+		XIO_DBG("next_relevant_log = %p\n", rot->next_relevant_log);
+		goto ok;
+	}
+
+	/* Preconditions
+	 */
+	if (!rot->replay_link || !rot->aio_dent || !rot->aio_brick) {
+		XIO_DBG("nothing to do on '%s'\n", dent->d_path);
+		goto ok;
+	}
+
+	/* Remember the relevant log.
+	 */
+	if (!rot->relevant_log && rot->aio_dent->d_serial == dent->d_serial) {
+		rot->relevant_serial = dent->d_serial;
+		rot->relevant_log = dent;
+		rot->has_double_logfile = false;
+		rot->has_hole_logfile = false;
+	}
+
+ok:
+	/* All ok: switch over the indicators.
+	 */
+	XIO_DBG("next_log = '%s'\n", dent->d_path);
+	rot->prev_log = rot->next_log;
+	rot->next_log = dent;
+
+done:
+	if (status < 0) {
+		XIO_DBG("rot_error status = %d\n", status);
+		rot->has_error = true;
+	}
+err:
+	return status;
+}
+
+/* Internal helper. Return codes:
+ * ret < 0 : error
+ * ret == 0 : not relevant
+ * ret == 1 : relevant, no transaction replay, switch to the next
+ * ret == 2 : relevant for transaction replay
+ * ret == 3 : relevant for appending
+ */
+static
+int _check_logging_status(
+struct mars_rotate *rot, int *log_nr, long long *oldpos_start, long long *oldpos_end, long long *newpos)
+{
+	struct mars_dent *dent = rot->relevant_log;
+	struct mars_dent *parent;
+	struct mars_global *global = NULL;
+	const char *vers_link = NULL;
+	int status = 0;
+
+	if (!dent)
+		goto done;
+
+	status = -EINVAL;
+	parent = dent->d_parent;
+	CHECK_PTR(parent, done);
+	global = rot->global;
+	CHECK_PTR_NULL(global, done);
+	CHECK_PTR(rot->replay_link, done);
+	CHECK_PTR(rot->aio_brick, done);
+	CHECK_PTR(rot->aio_dent, done);
+
+	XIO_DBG("    dent = '%s'\n", dent->d_path);
+	XIO_DBG("aio_dent = '%s'\n", rot->aio_dent->d_path);
+	if (unlikely(strcmp(dent->d_path, rot->aio_dent->d_path)))
+		goto done;
+
+	if (sscanf(rot->replay_link->d_argv[0], "log-%d", log_nr) != 1) {
+		XIO_ERR_TO(
+		rot->log_say, "replay link has malformed logfile number '%s'\n", rot->replay_link->d_argv[0]);
+		goto done;
+	}
+	if (kstrtos64(rot->replay_link->d_argv[1], 10, oldpos_start)) {
+		XIO_ERR_TO(
+		rot->log_say, "replay link has bad start position argument '%s'\n", rot->replay_link->d_argv[1]);
+		goto done;
+	}
+	if (kstrtos64(rot->replay_link->d_argv[2], 10, oldpos_end)) {
+		XIO_ERR_TO(
+		rot->log_say, "replay link has bad end position argument '%s'\n", rot->replay_link->d_argv[2]);
+		goto done;
+	}
+	*oldpos_end += *oldpos_start;
+	if (unlikely(*oldpos_end < *oldpos_start)) {
+		XIO_ERR_TO(rot->log_say, "replay link end_pos %lld < start_pos %lld\n", *oldpos_end, *oldpos_start);
+		/*  safety: use the smaller value, it does not hurt */
+		*oldpos_start = *oldpos_end;
+		if (unlikely(*oldpos_start < 0))
+			*oldpos_start = 0;
+	}
+
+	vers_link = get_versionlink(rot->parent_path, *log_nr, my_id(), NULL);
+	if (vers_link && vers_link[0]) {
+		long long vers_pos = 0;
+		int offset = 0;
+		int i;
+
+		for (i = 0; i < 2; i++) {
+			offset += skip_part(vers_link + offset);
+			if (unlikely(!vers_link[offset++])) {
+				XIO_ERR_TO(rot->log_say, "version link '%s' is malformed\n", vers_link);
+				goto check_pos;
+			}
+		}
+
+		sscanf(vers_link + offset, "%lld", &vers_pos);
+		if (vers_pos < *oldpos_start) {
+			XIO_WRN(
+			"versionlink has smaller startpos %lld < %lld\n",
+				 vers_pos, *oldpos_start);
+			/* for safety, take the minimum of both */
+			*oldpos_start = vers_pos;
+		} else if (vers_pos > *oldpos_start) {
+			XIO_WRN(
+			"versionlink has greater startpos %lld > %lld\n",
+				 vers_pos, *oldpos_start);
+		}
+	}
+check_pos:
+	*newpos = rot->aio_info.current_size;
+
+	if (unlikely(rot->aio_info.current_size < *oldpos_start)) {
+		XIO_ERR_TO(
+		rot->log_say,
+		"oops, bad replay position attempted at logfile '%s' (file length %lld should never be smaller than requested position %lld, is your filesystem corrupted?) => please repair this by hand\n",
+		rot->aio_dent->d_path,
+		rot->aio_info.current_size,
+		*oldpos_start);
+		make_rot_msg(
+		rot,
+		"err-replay-size",
+		"oops, bad replay position attempted at logfile '%s' (file length %lld should never be smaller than requested position %lld, is your filesystem corrupted?) => please repair this by hand",
+		rot->aio_dent->d_path,
+		rot->aio_info.current_size,
+		*oldpos_start);
+		status = -EBADF;
+		goto done;
+	}
+
+	status = 0;
+	if (rot->aio_info.current_size > *oldpos_start) {
+		if ((rot->aio_info.current_size - *oldpos_start < REPLAY_TOLERANCE ||
+		     (rot->log_is_really_damaged &&
+		      rot->todo_primary &&
+		      rot->relevant_log &&
+		      strcmp(rot->relevant_log->d_rest, my_id()))) &&
+		    (rot->todo_primary ||
+			(rot->relevant_log &&
+			 rot->next_relevant_log &&
+			 is_switchover_possible(
+			 rot, rot->relevant_log->d_path, rot->next_relevant_log->d_path, _get_tolerance(
+			 rot), false)))) {
+			XIO_INF_TO(
+			rot->log_say,
+			"TOLERANCE: transaction log '%s' is treated as fully applied\n",
+			rot->aio_dent->d_path);
+			make_rot_msg(
+			rot,
+			"inf-replay-tolerance",
+			"TOLERANCE: transaction log '%s' is treated as fully applied",
+			rot->aio_dent->d_path);
+			status = 1;
+		} else {
+			XIO_INF_TO(
+			rot->log_say,
+			"transaction log replay is necessary on '%s' from %lld to %lld (dirty region ends at %lld)\n",
+			rot->aio_dent->d_path,
+			*oldpos_start,
+			rot->aio_info.current_size,
+			*oldpos_end);
+			status = 2;
+		}
+	} else if (rot->next_relevant_log) {
+		XIO_INF_TO(
+		rot->log_say,
+		"transaction log '%s' is already applied, and the next one is available for switching\n",
+		rot->aio_dent->d_path);
+		status = 1;
+	} else if (rot->todo_primary) {
+		if (rot->aio_info.current_size > 0 || strcmp(dent->d_rest, my_id()) != 0) {
+			XIO_INF_TO(
+			rot->log_say,
+			"transaction log '%s' is already applied (would be usable for appending at position %lld, but a fresh logfile will be used for safety reasons)\n",
+			rot->aio_dent->d_path,
+			*oldpos_end);
+			status = 1;
+		} else {
+			XIO_INF_TO(
+			rot->log_say,
+			"empty transaction log '%s' is usable for me as a primary node\n",
+			rot->aio_dent->d_path);
+			status = 3;
+		}
+	} else {
+		XIO_DBG("transaction log '%s' is the last one, currently fully applied\n", rot->aio_dent->d_path);
+		status = 0;
+	}
+
+done:
+	brick_string_free(vers_link);
+	return status;
+}
+
+static
+int _make_logging_status(struct mars_rotate *rot)
+{
+	struct mars_dent *dent = rot->relevant_log;
+	struct mars_dent *parent;
+	struct mars_global *global = NULL;
+	struct trans_logger_brick *trans_brick;
+	int log_nr = 0;
+	loff_t start_pos = 0;
+	loff_t dirty_pos = 0;
+	loff_t end_pos = 0;
+	int status = 0;
+
+	if (!dent)
+		goto done;
+
+	status = -EINVAL;
+	parent = dent->d_parent;
+	CHECK_PTR(parent, done);
+	global = rot->global;
+	CHECK_PTR_NULL(global, done);
+
+	status = 0;
+	trans_brick = rot->trans_brick;
+	if (!global->global_power.button || !trans_brick || rot->has_error) {
+		XIO_DBG("nothing to do rot_error = %d\n", rot->has_error);
+		goto done;
+	}
+
+	/* Find current logging status.
+	 */
+	status = _check_logging_status(rot, &log_nr, &start_pos, &dirty_pos, &end_pos);
+	XIO_DBG(
+	"case = %d (todo_primary=%d is_primary=%d old_is_primary=%d)\n",
+	status,
+	rot->todo_primary,
+	rot->is_primary,
+	rot->old_is_primary);
+	if (status < 0)
+		goto done;
+	if (unlikely(start_pos < 0 || dirty_pos < start_pos || end_pos < dirty_pos)) {
+		XIO_ERR_TO(
+		rot->log_say,
+		"replay symlink has implausible values: start_pos = %lld dirty_pos = %lld end_pos = %lld\n",
+		start_pos,
+		dirty_pos,
+		end_pos);
+	}
+	/* Relevant or not?
+	 */
+	switch (status) {
+	case 0: /*  not relevant */
+		goto ok;
+	case 1: /* Relevant, and transaction replay already finished.
+		 * Allow switching over to a new logfile.
+		 */
+		if (!trans_brick->power.button && !trans_brick->power.on_led && trans_brick->power.off_led) {
+			if (rot->next_relevant_log && !rot->log_is_really_damaged) {
+				int replay_tolerance = _get_tolerance(rot);
+				bool skip_new = !!rot->todo_primary;
+
+				XIO_DBG(
+				"check switchover from '%s' to '%s' (size = %lld, skip_new = %d, replay_tolerance = %d)\n",
+				dent->d_path,
+				rot->next_relevant_log->d_path,
+				rot->next_relevant_log->stat_val.size,
+				skip_new,
+				replay_tolerance);
+				if (
+				is_switchover_possible(
+				rot, dent->d_path, rot->next_relevant_log->d_path, replay_tolerance, skip_new) ||
+				    (skip_new && !_check_allow(global, parent, "connect"))) {
+					XIO_INF_TO(
+					rot->log_say,
+					"start switchover from transaction log '%s' to '%s'\n",
+					dent->d_path,
+					rot->next_relevant_log->d_path);
+					_make_new_replaylink(
+					rot,
+					rot->next_relevant_log->d_rest,
+					rot->next_relevant_log->d_serial,
+					rot->next_relevant_log->stat_val.size);
+				}
+			} else if (rot->todo_primary) {
+				if (dent->d_serial > log_nr)
+					log_nr = dent->d_serial;
+				XIO_INF_TO(
+				rot->log_say,
+				"preparing new transaction log, number moves from %d to %d\n",
+				dent->d_serial,
+				log_nr + 1);
+				_make_new_replaylink(rot, my_id(), log_nr + 1, 0);
+			} else {
+				XIO_DBG("nothing to do on last transaction log '%s'\n", dent->d_path);
+			}
+		}
+		status = -EAGAIN;
+		goto done;
+	case 2: /*  relevant for transaction replay */
+		XIO_INF_TO(
+		rot->log_say,
+		"replaying transaction log '%s' from position %lld to %lld\n",
+		dent->d_path,
+		start_pos,
+		end_pos);
+		rot->replay_mode = true;
+		rot->start_pos = start_pos;
+		rot->end_pos = end_pos;
+		break;
+	case 3: /*  relevant for appending */
+		XIO_INF_TO(rot->log_say, "appending to transaction log '%s'\n", dent->d_path);
+		rot->replay_mode = false;
+		rot->start_pos = 0;
+		rot->end_pos = 0;
+		break;
+	default:
+		XIO_ERR_TO(rot->log_say, "bad internal status %d\n", status);
+		status = -EINVAL;
+		goto done;
+	}
+
+ok:
+	/* All ok: switch over the indicators.
+	 */
+	rot->prev_log = rot->next_log;
+	rot->next_log = dent;
+
+done:
+	if (status < 0) {
+		XIO_DBG("rot_error status = %d\n", status);
+		rot->has_error = true;
+	}
+	return status;
+}
+
+static
+void _init_trans_input(struct trans_logger_input *trans_input, struct mars_dent *log_dent, struct mars_rotate *rot)
+{
+	if (unlikely(trans_input->connect || trans_input->is_operating)) {
+		XIO_ERR("this should not happen\n");
+		goto out_return;
+	}
+
+	memset(&trans_input->inf, 0, sizeof(trans_input->inf));
+
+	strncpy(trans_input->inf.inf_host, log_dent->d_rest, sizeof(trans_input->inf.inf_host));
+	trans_input->inf.inf_sequence = log_dent->d_serial;
+	trans_input->inf.inf_private = rot;
+	trans_input->inf.inf_callback = _update_info;
+	XIO_DBG("initialized '%s' %d\n", trans_input->inf.inf_host, trans_input->inf.inf_sequence);
+out_return:;
+}
+
+static
+int _get_free_input(struct trans_logger_brick *trans_brick)
+{
+	int nr = (((trans_brick->log_input_nr - TL_INPUT_LOG1) + 1) % 2) + TL_INPUT_LOG1;
+	struct trans_logger_input *candidate;
+
+	candidate = trans_brick->inputs[nr];
+	if (unlikely(!candidate)) {
+		XIO_ERR("input nr = %d is corrupted!\n", nr);
+		return -EEXIST;
+	}
+	if (unlikely(candidate->is_operating || candidate->connect)) {
+		XIO_DBG(
+		"nr = %d unusable! is_operating = %d connect = %p\n", nr, candidate->is_operating, candidate->connect);
+		return -EEXIST;
+	}
+	XIO_DBG("got nr = %d\n", nr);
+	return nr;
+}
+
+static
+void _rotate_trans(struct mars_rotate *rot)
+{
+	struct trans_logger_brick *trans_brick = rot->trans_brick;
+	int old_nr = trans_brick->old_input_nr;
+	int log_nr = trans_brick->log_input_nr;
+	int next_nr;
+
+	XIO_DBG(
+	"log_input_nr = %d old_input_nr = %d next_relevant_log = %p\n", log_nr, old_nr, rot->next_relevant_log);
+
+	/*  try to cleanup old log */
+	if (log_nr != old_nr) {
+		struct trans_logger_input *trans_input = trans_brick->inputs[old_nr];
+		struct trans_logger_input *new_input = trans_brick->inputs[log_nr];
+
+		if (!trans_input->connect) {
+			XIO_DBG("ignoring unused old input %d\n", old_nr);
+		} else if (!new_input->is_operating) {
+			XIO_DBG("ignoring uninitialized new input %d\n", log_nr);
+		} else if (trans_input->is_operating &&
+			   trans_input->inf.inf_min_pos == trans_input->inf.inf_max_pos &&
+			   list_empty(&trans_input->pos_list) &&
+			   atomic_read(&trans_input->log_obj_count) <= 0) {
+			int status;
+
+			XIO_INF("cleanup old transaction log (%d -> %d)\n", old_nr, log_nr);
+			status = generic_disconnect((void *)trans_input);
+			if (unlikely(status < 0))
+				XIO_ERR("disconnect failed\n");
+			else
+				remote_trigger();
+		} else {
+			XIO_DBG(
+			"old transaction replay not yet finished: is_operating = %d pos %lld != %lld\n",
+				 trans_input->is_operating,
+				 trans_input->inf.inf_min_pos,
+				 trans_input->inf.inf_max_pos);
+		}
+	} else
+	/*  try to setup new log */
+	if (log_nr == trans_brick->new_input_nr &&
+	    rot->next_relevant_log &&
+	    (rot->next_relevant_log->d_serial == trans_brick->inputs[log_nr]->inf.inf_sequence + 1 ||
+	     trans_brick->cease_logging)) {
+		struct trans_logger_input *trans_input;
+		int status;
+
+		next_nr = _get_free_input(trans_brick);
+		if (unlikely(next_nr < 0)) {
+			XIO_ERR_TO(rot->log_say, "no free input\n");
+			goto done;
+		}
+
+		XIO_DBG("start switchover %d -> %d\n", old_nr, next_nr);
+
+		rot->next_relevant_brick =
+			make_brick_all(
+			rot->global,
+				       rot->next_relevant_log,
+				       _set_sio_params,
+				       NULL,
+				       rot->next_relevant_log->d_path,
+				       (const struct generic_brick_type *)&sio_brick_type,
+				       (const struct generic_brick_type *[]){},
+				       2, /*  create + activate */
+				       rot->next_relevant_log->d_path,
+				       (const char *[]){},
+				       0);
+		if (unlikely(!rot->next_relevant_brick)) {
+			XIO_ERR_TO(
+			rot->log_say, "could not open next transaction log '%s'\n", rot->next_relevant_log->d_path);
+			goto done;
+		}
+		trans_input = trans_brick->inputs[next_nr];
+		if (unlikely(!trans_input)) {
+			XIO_ERR_TO(rot->log_say, "internal log input does not exist\n");
+			goto done;
+		}
+
+		_init_trans_input(trans_input, rot->next_relevant_log, rot);
+
+		status = generic_connect((void *)trans_input, (void *)rot->next_relevant_brick->outputs[0]);
+		if (unlikely(status < 0)) {
+			XIO_ERR_TO(rot->log_say, "internal connect failed\n");
+			goto done;
+		}
+		trans_brick->new_input_nr = next_nr;
+		XIO_INF_TO(
+		rot->log_say,
+		"started logrotate switchover from '%s' to '%s'\n",
+		rot->relevant_log->d_path,
+		rot->next_relevant_log->d_path);
+		rot->replay_code = 0;
+	}
+done:;
+}
+
+static
+void _change_trans(struct mars_rotate *rot)
+{
+	struct trans_logger_brick *trans_brick = rot->trans_brick;
+
+	XIO_DBG(
+	"replay_mode = %d start_pos = %lld end_pos = %lld\n", trans_brick->replay_mode, rot->start_pos, rot->end_pos);
+
+	if (trans_brick->replay_mode) {
+		trans_brick->replay_start_pos = rot->start_pos;
+		trans_brick->replay_end_pos = rot->end_pos;
+	} else {
+		_rotate_trans(rot);
+	}
+}
+
+static
+int _start_trans(struct mars_rotate *rot)
+{
+	struct trans_logger_brick *trans_brick;
+	struct trans_logger_input *trans_input;
+	int nr;
+	int status;
+
+	/* Internal safety checks
+	 */
+	status = -EINVAL;
+	if (unlikely(!rot)) {
+		XIO_ERR("rot is NULL\n");
+		goto done;
+	}
+	if (unlikely(!rot->aio_brick || !rot->relevant_log)) {
+		XIO_ERR(
+		"aio %p or relevant log %p is missing, this should not happen\n", rot->aio_brick, rot->relevant_log);
+		goto done;
+	}
+	trans_brick = rot->trans_brick;
+	if (unlikely(!trans_brick)) {
+		XIO_ERR("logger instance does not exist\n");
+		goto done;
+	}
+
+	/* Update status when already working
+	 */
+	if (trans_brick->power.button || !trans_brick->power.off_led) {
+		_change_trans(rot);
+		status = 0;
+		goto done;
+	}
+
+	/* Further safety checks.
+	 */
+	if (unlikely(rot->relevant_brick)) {
+		XIO_ERR("log aio brick already present, this should not happen\n");
+		goto done;
+	}
+	if (
+	unlikely(
+	trans_brick->inputs[TL_INPUT_LOG1]->is_operating || trans_brick->inputs[TL_INPUT_LOG2]->is_operating)) {
+		XIO_ERR("some input is operating, this should not happen\n");
+		goto done;
+	}
+
+	/* Allocate new input slot
+	 */
+	nr = _get_free_input(trans_brick);
+	if (unlikely(nr < TL_INPUT_LOG1 || nr > TL_INPUT_LOG2)) {
+		XIO_ERR("bad new_input_nr = %d\n", nr);
+		goto done;
+	}
+	trans_brick->new_input_nr = nr;
+	trans_brick->old_input_nr = nr;
+	trans_brick->log_input_nr = nr;
+	trans_input = trans_brick->inputs[nr];
+	if (unlikely(!trans_input)) {
+		XIO_ERR("log input %d does not exist\n", nr);
+		goto done;
+	}
+
+	/* Open new transaction log
+	 */
+	rot->relevant_brick =
+		make_brick_all(
+		rot->global,
+			       rot->relevant_log,
+			       _set_sio_params,
+			       NULL,
+			       rot->relevant_log->d_path,
+			       (const struct generic_brick_type *)&sio_brick_type,
+			       (const struct generic_brick_type *[]){},
+			       2, /*  start always */
+			       rot->relevant_log->d_path,
+			       (const char *[]){},
+			       0);
+	if (unlikely(!rot->relevant_brick)) {
+		XIO_ERR("log aio brick '%s' not open\n", rot->relevant_log->d_path);
+		goto done;
+	}
+
+	/* Supply all relevant parameters
+	 */
+	trans_brick->replay_mode = rot->replay_mode;
+	trans_brick->replay_tolerance = REPLAY_TOLERANCE;
+	_init_trans_input(trans_input, rot->relevant_log, rot);
+	rot->replay_code = 0;
+
+	/* Connect to new transaction log
+	 */
+	status = generic_connect((void *)trans_input, (void *)rot->relevant_brick->outputs[0]);
+	if (unlikely(status < 0)) {
+		XIO_ERR("initial connect failed\n");
+		goto done;
+	}
+
+	_change_trans(rot);
+
+	/* Switch on....
+	 */
+	status = mars_power_button((void *)trans_brick, true, false);
+	XIO_DBG("status = %d\n", status);
+
+done:
+	return status;
+}
+
+static
+int _stop_trans(struct mars_rotate *rot, const char *parent_path)
+{
+	struct trans_logger_brick *trans_brick = rot->trans_brick;
+	int status = 0;
+
+	if (!trans_brick)
+		goto done;
+
+	/* Switch off temporarily....
+	 */
+	status = mars_power_button((void *)trans_brick, false, false);
+	XIO_DBG("status = %d\n", status);
+	if (status < 0)
+		goto done;
+
+	/* Disconnect old connection(s)
+	 */
+	if (trans_brick->power.off_led) {
+		int i;
+
+		for (i = TL_INPUT_LOG1; i <= TL_INPUT_LOG2; i++) {
+			struct trans_logger_input *trans_input;
+
+			trans_input = trans_brick->inputs[i];
+			if (trans_input && !trans_input->is_operating) {
+				if (trans_input->connect)
+					(void)generic_disconnect((void *)trans_input);
+			}
+		}
+	}
+	write_info_links(rot);
+
+done:
+	return status;
+}
+
+static
+int make_log_finalize(struct mars_global *global, struct mars_dent *dent)
+{
+	struct mars_dent *parent = dent->d_parent;
+	struct mars_rotate *rot;
+	struct trans_logger_brick *trans_brick;
+	struct copy_brick *fetch_brick;
+	bool is_attached;
+	bool is_stopped;
+	int status = -EINVAL;
+
+	CHECK_PTR(parent, err);
+	rot = parent->d_private;
+	if (!rot)
+		goto err;
+	CHECK_PTR(rot, err);
+	rot->has_symlinks = true;
+	trans_brick = rot->trans_brick;
+	status = 0;
+	if (!trans_brick) {
+		XIO_DBG("nothing to do\n");
+		goto done;
+	}
+
+	/* Handle jamming (a very exceptional state)
+	 */
+	if (IS_JAMMED()) {
+#ifndef CONFIG_MARS_DEBUG
+		brick_say_logging = 0;
+#endif
+		rot->has_emergency = true;
+		/* Report remote errors to clients when they
+		 * try to sync during emergency mode.
+		 */
+		if (rot->bio_brick && rot->bio_brick->mode_ptr)
+			*rot->bio_brick->mode_ptr = -EMEDIUMTYPE;
+		XIO_ERR_TO(rot->log_say, "DISK SPACE IS EXTREMELY LOW on %s\n", rot->parent_path);
+		make_rot_msg(rot, "err-space-low", "DISK SPACE IS EXTREMELY LOW");
+	} else if (IS_EXHAUSTED() && rot->has_emergency) {
+		XIO_ERR_TO(
+		rot->log_say,
+		"EMEGENCY MODE HYSTERESIS on %s: you need to free more space for recovery.\n",
+		rot->parent_path);
+		make_rot_msg(
+		rot, "err-space-low", "EMEGENCY MODE HYSTERESIS: you need to free more space for recovery.");
+	} else {
+		int limit = _check_allow(global, parent, "emergency-limit");
+
+		rot->has_emergency = (limit > 0 && global_remaining_space * 100 / global_total_space < limit);
+		XIO_DBG(
+		"has_emergency=%d limit=%d remaining_space=%lld total_space=%lld\n",
+			 rot->has_emergency, limit, global_remaining_space, global_total_space);
+		if (!rot->has_emergency && rot->bio_brick && rot->bio_brick->mode_ptr)
+			*rot->bio_brick->mode_ptr = 0;
+	}
+	_show_actual(parent->d_path, "has-emergency", rot->has_emergency);
+	if (rot->has_emergency) {
+		if (rot->todo_primary || rot->is_primary) {
+			trans_brick->cease_logging = true;
+			rot->inf_prev_sequence = 0; /*	disable checking */
+		}
+	} else {
+		if (!trans_logger_resume) {
+			XIO_INF_TO(
+			rot->log_say,
+			"emergency mode on %s could be turned off now, but /proc/sys/mars/logger_resume inhibits it.\n",
+			rot->parent_path);
+		} else {
+			trans_brick->cease_logging = false;
+			XIO_INF_TO(rot->log_say, "emergency mode on %s will be turned off again\n", rot->parent_path);
+		}
+	}
+	is_stopped = trans_brick->cease_logging | trans_brick->stopped_logging;
+	_show_actual(parent->d_path, "is-emergency", is_stopped);
+	if (is_stopped) {
+		XIO_ERR_TO(
+		rot->log_say,
+		"EMERGENCY MODE on %s: stopped transaction logging, and created a hole in the logfile sequence nubers.\n",
+		rot->parent_path);
+		make_rot_msg(
+		rot,
+		"err-emergency",
+		"EMERGENCY MODE on %s: stopped transaction logging, and created a hole in the logfile sequence nubers.\n",
+		rot->parent_path);
+		/* Create a hole in the sequence of logfile numbers.
+		 * The secondaries will later stumble over it.
+		 */
+		if (!rot->created_hole) {
+			int new_sequence = rot->max_sequence + 10;
+			char *new_vers = path_make("%s/version-%09d-%s", rot->parent_path, new_sequence, my_id());
+
+			char *new_vval = path_make(
+
+			"00000000000000000000000000000000,log-%09d-%s,0:", new_sequence, my_id());
+			char *new_path = path_make("%s/log-%09d-%s", rot->parent_path, new_sequence + 1, my_id());
+
+			if (likely(new_vers && new_vval && new_path &&
+				   !mars_find_dent(global, new_path))) {
+				XIO_INF_TO(rot->log_say, "EMERGENCY: creating new logfile '%s'\n", new_path);
+				mars_symlink(new_vval, new_vers, NULL, 0);
+				_create_new_logfile(new_path);
+				rot->created_hole = true;
+			}
+			brick_string_free(new_vers);
+			brick_string_free(new_vval);
+			brick_string_free(new_path);
+		}
+	} else {
+		rot->created_hole = false;
+	}
+
+	if (IS_EMERGENCY_SECONDARY()) {
+		if (!rot->todo_primary && rot->first_log && rot->first_log != rot->relevant_log) {
+			XIO_WRN_TO(
+			rot->log_say,
+			"EMERGENCY: ruthlessly freeing old logfile '%s', don't cry on any ramifications.\n",
+			rot->first_log->d_path);
+			make_rot_msg(
+			rot,
+			"wrn-space-low",
+			"EMERGENCY: ruthlessly freeing old logfile '%s'",
+			rot->first_log->d_path);
+			mars_unlink(rot->first_log->d_path);
+			rot->first_log->d_killme = true;
+			/*  give it a chance to cease deleting next time */
+			compute_emergency_mode();
+		} else if (IS_EMERGENCY_PRIMARY()) {
+			XIO_WRN_TO(rot->log_say, "EMERGENCY: the space on /mars/ is VERY low.\n");
+			make_rot_msg(rot, "wrn-space-low", "EMERGENCY: the space on /mars/ is VERY low.");
+		} else {
+			XIO_WRN_TO(rot->log_say, "EMERGENCY: the space on /mars/ is low.\n");
+			make_rot_msg(rot, "wrn-space-low", "EMERGENCY: the space on /mars/ is low.");
+		}
+	} else if (IS_EXHAUSTED()) {
+		XIO_WRN_TO(rot->log_say, "EMERGENCY: the space on /mars/ is becoming low.\n");
+		make_rot_msg(rot, "wrn-space-low", "EMERGENCY: the space on /mars/ is becoming low.");
+	}
+
+	rot->log_is_really_damaged = false;
+	if (trans_brick->replay_mode) {
+		if (trans_brick->replay_code > 0) {
+			XIO_INF_TO(
+			rot->log_say,
+			"logfile replay ended successfully at position %lld\n",
+			trans_brick->replay_current_pos);
+			if (rot->replay_code >= 0)
+				rot->replay_code = trans_brick->replay_code;
+		} else if (trans_brick->replay_code == -EAGAIN ||
+			   trans_brick->replay_end_pos - trans_brick->replay_current_pos < trans_brick->replay_tolerance) {
+			XIO_INF_TO(
+			rot->log_say,
+			"logfile replay stopped intermediately at position %lld\n",
+			trans_brick->replay_current_pos);
+		} else if (trans_brick->replay_code < 0) {
+			XIO_ERR_TO(
+			rot->log_say,
+			"logfile replay stopped with error = %d at position %lld\n",
+			trans_brick->replay_code,
+			trans_brick->replay_current_pos);
+			make_rot_msg(
+			rot,
+			"err-replay-stop",
+			"logfile replay stopped with error = %d at position %lld",
+			trans_brick->replay_code,
+			trans_brick->replay_current_pos);
+			rot->replay_code = trans_brick->replay_code;
+			rot->log_is_really_damaged = true;
+		} else if (rot->replay_code >= 0) {
+			rot->replay_code = trans_brick->replay_code;
+		}
+	} else {
+		rot->replay_code = 0;
+	}
+	__show_actual(parent->d_path, "replay-code", rot->replay_code);
+
+	/* Stopping is also possible in case of errors
+	 */
+	if (trans_brick->power.button && trans_brick->power.on_led && !trans_brick->power.off_led) {
+		bool do_stop = true;
+
+		if (trans_brick->replay_mode) {
+			rot->is_log_damaged =
+				trans_brick->replay_code == -EAGAIN &&
+				trans_brick->replay_end_pos - trans_brick->replay_current_pos < trans_brick->replay_tolerance;
+			do_stop = trans_brick->replay_code != 0 ||
+				!global->global_power.button ||
+				!_check_allow(global, parent, "allow-replay") ||
+				!_check_allow(global, parent, "attach");
+		} else {
+			do_stop =
+				trans_brick->outputs[0]->nr_connected <= 0 &&
+				(!rot->todo_primary ||
+				 !_check_allow(global, parent, "attach"));
+		}
+
+		XIO_DBG(
+		"replay_mode = %d replay_code = %d is_primary = %d nr_connected = %d do_stop = %d\n",
+			 trans_brick->replay_mode, trans_brick->replay_code,
+			 trans_brick->outputs[0]->nr_connected,
+			 rot->is_primary, (int)do_stop);
+
+		if (do_stop)
+			status = _stop_trans(rot, parent->d_path);
+		else
+			_change_trans(rot);
+		goto done;
+	}
+
+	/* Starting is only possible when no error occurred.
+	 */
+	if (!rot->relevant_log || rot->has_error) {
+		XIO_DBG("nothing to do\n");
+		goto done;
+	}
+
+	/* Start when necessary
+	 */
+	if (!trans_brick->power.button && !trans_brick->power.on_led && trans_brick->power.off_led) {
+		bool do_start;
+
+		status = _make_logging_status(rot);
+		if (status <= 0)
+			goto done;
+
+		rot->is_log_damaged = false;
+
+		do_start = (!rot->replay_mode ||
+			    (rot->start_pos != rot->end_pos &&
+			     _check_allow(global, parent, "allow-replay")));
+
+		if (do_start && rot->forbid_replay) {
+			XIO_INF("cannot start replay because sync wants to start\n");
+			make_rot_msg(rot, "inf-replay-start", "cannot start replay because sync wants to star");
+			do_start = false;
+		}
+
+		if (do_start && rot->sync_brick && !rot->sync_brick->power.off_led) {
+			XIO_INF("cannot start replay because sync is running\n");
+			make_rot_msg(rot, "inf-replay-start", "cannot start replay because sync is running");
+			do_start = false;
+		}
+
+		XIO_DBG(
+		"rot->replay_mode = %d rot->start_pos = %lld rot->end_pos = %lld | do_start = %d\n",
+		rot->replay_mode,
+		rot->start_pos,
+		rot->end_pos,
+		do_start);
+
+		if (do_start)
+			status = _start_trans(rot);
+	}
+
+done:
+	/*  check whether some copy has finished */
+	fetch_brick = (struct copy_brick *)mars_find_brick(global, &copy_brick_type, rot->fetch_path);
+	XIO_DBG("fetch_path = '%s' fetch_brick = %p\n", rot->fetch_path, fetch_brick);
+	if (fetch_brick &&
+	    (fetch_brick->power.off_led ||
+	     !global->global_power.button ||
+	     !_check_allow(global, parent, "connect") ||
+	     !_check_allow(global, parent, "attach") ||
+	     (fetch_brick->copy_last == fetch_brick->copy_end &&
+	      (rot->fetch_next_is_available > 0 ||
+	       rot->fetch_round++ > 3)))) {
+		int i;
+
+		for (i = 0; i < 4; i++) {
+			if (fetch_brick->inputs[i] && fetch_brick->inputs[i]->brick)
+				fetch_brick->inputs[i]->brick->power.io_timeout = 1;
+		}
+		status = xio_kill_brick((void *)fetch_brick);
+		if (status < 0)
+			XIO_ERR("could not kill fetch_brick, status = %d\n", status);
+		else
+			fetch_brick = NULL;
+		local_trigger();
+	}
+	rot->fetch_next_is_available = 0;
+	rot->fetch_brick = fetch_brick;
+	if (fetch_brick)
+		fetch_brick->kill_ptr = (void **)&rot->fetch_brick;
+	else
+		rot->fetch_serial = 0;
+	/*  remove trans_logger (when possible) upon detach */
+	is_attached = !!rot->trans_brick;
+	_show_actual(rot->parent_path, "is-attached", is_attached);
+
+	if (rot->trans_brick && rot->trans_brick->power.off_led && !rot->trans_brick->outputs[0]->nr_connected) {
+		bool do_attach = _check_allow(global, parent, "attach");
+
+		XIO_DBG("do_attach = %d\n", do_attach);
+		if (!do_attach) {
+			rot->trans_brick->killme = true;
+			rot->trans_brick = NULL;
+		}
+	}
+
+	_show_actual(
+	rot->parent_path,
+	"is-replaying",
+	rot->trans_brick && rot->trans_brick->replay_mode && !rot->trans_brick->power.off_led);
+	_show_rate(rot, &rot->replay_limiter, "replay_rate");
+	_show_actual(rot->parent_path, "is-copying", rot->fetch_brick && !rot->fetch_brick->power.off_led);
+	_show_rate(rot, &rot->fetch_limiter, "file_rate");
+	_show_actual(rot->parent_path, "is-syncing", rot->sync_brick && !rot->sync_brick->power.off_led);
+	_show_rate(rot, &rot->sync_limiter, "sync_rate");
+err:
+	return status;
+}
+
+/*********************************************************************/
+
+/*  specific handlers */
+
+static
+int make_primary(void *buf, struct mars_dent *dent)
+{
+	struct mars_global *global = buf;
+	struct mars_dent *parent;
+	struct mars_rotate *rot;
+	int status = -EINVAL;
+
+	parent = dent->d_parent;
+	CHECK_PTR(parent, done);
+	rot = parent->d_private;
+	if (!rot)
+		goto done;
+	CHECK_PTR(rot, done);
+
+	rot->has_symlinks = true;
+
+	rot->todo_primary =
+		global->global_power.button && dent->link_val && !strcmp(dent->link_val, my_id());
+	XIO_DBG("todo_primary = %d is_primary = %d\n", rot->todo_primary, rot->is_primary);
+	status = 0;
+
+done:
+	return status;
+}
+
+static
+int make_bio(void *buf, struct mars_dent *dent)
+{
+	struct mars_global *global = buf;
+	struct mars_rotate *rot;
+	struct xio_brick *brick;
+	bool switch_on;
+	int status = 0;
+
+	if (!global || !dent->d_parent)
+		goto done;
+	rot = dent->d_parent->d_private;
+	if (!rot)
+		goto done;
+
+	rot->has_symlinks = true;
+
+	switch_on = _check_allow(global, dent->d_parent, "attach");
+	if (switch_on && rot->res_shutdown) {
+		XIO_ERR("cannot access disk: resource shutdown mode is currently active\n");
+		switch_on = false;
+	}
+
+	brick =
+		make_brick_all(
+		global,
+			       dent,
+			       _set_bio_params,
+			       NULL,
+			       dent->d_path,
+			       (const struct generic_brick_type *)&bio_brick_type,
+			       (const struct generic_brick_type *[]){},
+			       switch_on ? 2 : -1,
+			       dent->d_path,
+			       (const char *[]){},
+			       0);
+	rot->bio_brick = brick;
+	if (unlikely(!brick)) {
+		status = -ENXIO;
+		goto done;
+	}
+
+	/* Report the actual size of the device.
+	 * It may be larger than the global size.
+	 */
+	if (brick && brick->power.on_led) {
+		struct xio_info info = {};
+		struct xio_output *output;
+		char *src = NULL;
+		char *dst = NULL;
+
+		output = brick->outputs[0];
+		status = output->ops->xio_get_info(output, &info);
+		if (status < 0) {
+			XIO_ERR("cannot get info on '%s'\n", dent->d_path);
+			goto done;
+		}
+		src = path_make("%lld", info.current_size);
+		dst = path_make("%s/actsize-%s", dent->d_parent->d_path, my_id());
+		if (src && dst)
+			(void)mars_symlink(src, dst, NULL, 0);
+		brick_string_free(src);
+		brick_string_free(dst);
+	}
+
+done:
+	return status;
+}
+
+static int make_replay(void *buf, struct mars_dent *dent)
+{
+	struct mars_global *global = buf;
+	struct mars_dent *parent = dent->d_parent;
+	int status = 0;
+
+	if (!global->global_power.button || !parent || !dent->link_val) {
+		XIO_DBG("nothing to do\n");
+		goto done;
+	}
+
+	status = make_log_finalize(global, dent);
+	if (status < 0) {
+		XIO_DBG("logger not initialized\n");
+		goto done;
+	}
+
+done:
+	return status;
+}
+
+static
+int make_dev_remote(struct mars_global *global, struct mars_dent *dent, struct mars_rotate *rot)
+{
+	struct mars_dent *parent = dent->d_parent;
+	char *primary;
+	char *status_path = NULL;
+	char *status_val = NULL;
+	char *client_path = NULL;
+	struct xio_brick *remote_brick;
+	struct xio_brick *dev_brick;
+	int switch_on = 0;
+	int status = -EINVAL;
+
+	primary = dent->d_argv[1];
+	if (!primary)
+		goto setup;
+
+	if (!global->global_power.button)
+		goto setup;
+
+	/* Check both the local and the remote attach switch.
+	 */
+	if (!_check_allow(global, dent->d_parent, "rattach"))
+		goto setup;
+	if (!__check_allow(global, dent->d_parent, "attach", primary))
+		goto setup;
+
+	/* Check whether designated primary is the correct one
+	 */
+	status_path = path_make(
+	"%s/primary",
+				parent->d_path, primary);
+	status_val = mars_readlink(status_path);
+	if (strcmp(status_val, primary))
+		goto setup;
+
+	brick_string_free(status_path);
+	brick_string_free(status_val);
+
+	/* In addition, check actual primary
+	 */
+	status_path = path_make(
+	"%s/actual-%s/is-primary",
+				parent->d_path, primary);
+	status_val = mars_readlink(status_path);
+	status = kstrtoint(status_val, 10, &switch_on);
+	if (unlikely(status)) {
+		switch_on = 0;
+		goto setup;
+	}
+
+	switch_on = 1;
+
+setup:
+
+	client_path = path_make(
+	"%s/replay-%s@%s",
+				parent->d_path, primary, primary);
+
+	remote_brick =
+		make_brick_all(
+		global,
+			       dent,
+			       _set_client_params,
+			       NULL,
+			       client_path,
+			       (const struct generic_brick_type *)&client_brick_type,
+			       (const struct generic_brick_type *[]){},
+			       switch_on || rot->if_brick ? 2 : -1,
+			       "%s",
+			       (const char *[]){},
+			       0,
+			       client_path);
+	rot->remote_brick = (void *)remote_brick;
+	if (remote_brick) {
+		remote_brick->kill_ptr = (void **)&rot->remote_brick;
+		/* When on, set the timeout to infinite.
+		 * This is necessary for prevention of IO errors reported to
+		 * filesystems like XFS. Some fs could run into problems when
+		 * other requests after the timeout could succeed again, e.g.
+		 * needing an xfs_repair (which is worse than hanging or
+		 * simple power loss).
+		 */
+		if (switch_on) {
+			remote_brick->power.io_timeout = -1;
+		} else {
+			remote_brick->power.io_timeout = 1;
+			remote_brick->killme = true;
+		}
+	} else {
+		switch_on = 0;
+	}
+
+	dev_brick =
+		make_brick_all(
+		global,
+			       dent,
+			       _set_if_params,
+			       rot,
+			       dent->d_argv[0],
+			       (const struct generic_brick_type *)&if_brick_type,
+			       (
+			       const struct generic_brick_type *[]){(
+			       const struct generic_brick_type *)&client_brick_type},
+			       switch_on || (rot->if_brick && atomic_read(&rot->if_brick->open_count) > 0) ? 2 : -1,
+			       "%s/device-%s",
+			       (const char *[]){client_path},
+			       1,
+			       parent->d_path,
+			       my_id());
+	rot->if_brick = (void *)dev_brick;
+	if (dev_brick) {
+		dev_brick->kill_ptr = (void **)&rot->if_brick;
+		if (!switch_on)
+			dev_brick->killme = true;
+	}
+
+	brick_string_free(status_path);
+	brick_string_free(status_val);
+	brick_string_free(client_path);
+	return status;
+}
+
+static
+int make_dev(void *buf, struct mars_dent *dent)
+{
+	struct mars_global *global = buf;
+	struct mars_dent *parent = dent->d_parent;
+	struct mars_rotate *rot = NULL;
+	struct xio_brick *dev_brick;
+	char *remote;
+	char *dev_name = NULL;
+	bool switch_on;
+	int open_count;
+	int status = 0;
+
+	if (!parent || !dent->link_val) {
+		XIO_ERR("nothing to do\n");
+		return -EINVAL;
+	}
+	rot = parent->d_private;
+	if (!rot || !rot->parent_path) {
+		XIO_DBG("nothing to do\n");
+		goto err;
+	}
+	if (strcmp(dent->d_rest, my_id())) {
+		XIO_DBG("nothing to do\n");
+		goto err;
+	}
+	rot->has_symlinks = true;
+	status = _parse_args(dent, dent->link_val, 1);
+	if (unlikely(status < 0))
+		goto done;
+
+	remote = strstr(dent->d_argv[0], "@");
+	if (remote) {
+		brick_string_free(dent->d_argv[1]);
+		dent->d_argv[1] = brick_strdup(remote + 1);
+		*remote = '\0';
+		status = make_dev_remote(global, dent, rot);
+		goto done;
+	}
+
+	if (!rot->trans_brick) {
+		XIO_DBG("transaction logger does not exist\n");
+		goto done;
+	}
+	if (rot->dev_size <= 0) {
+		XIO_WRN("trying to create device '%s' with zero size\n", dent->d_path);
+		goto done;
+	}
+
+	dev_name = path_make("mars/%s", dent->d_argv[0]);
+
+	switch_on =
+		(rot->if_brick && atomic_read(&rot->if_brick->open_count) > 0) ||
+		(rot->todo_primary &&
+		 !rot->trans_brick->replay_mode &&
+		 rot->trans_brick->power.on_led &&
+		 strcmp(dent->d_argv[0], "(none)") &&
+		 _check_allow(global, dent->d_parent, "attach"));
+	if (!global->global_power.button)
+		switch_on = false;
+	if (switch_on && rot->res_shutdown) {
+		XIO_ERR("cannot create device: resource shutdown mode is currently active\n");
+		switch_on = false;
+	}
+
+	dev_brick =
+		make_brick_all(
+		global,
+			       dent,
+			       _set_if_params,
+			       rot,
+			       dev_name,
+			       (const struct generic_brick_type *)&if_brick_type,
+			       (
+			       const struct generic_brick_type *[]){(
+			       const struct generic_brick_type *)&trans_logger_brick_type},
+			       switch_on ? 2 : -1,
+			       "%s/device-%s",
+			       (const char *[]){"%s/replay-%s"},
+			       1,
+			       parent->d_path,
+			       my_id(),
+			       parent->d_path,
+			       my_id());
+	rot->if_brick = (void *)dev_brick;
+	if (!dev_brick) {
+		XIO_DBG("device not shown\n");
+		goto done;
+	}
+	if (!switch_on) {
+		XIO_DBG("setting killme on if_brick\n");
+		dev_brick->killme = true;
+	}
+	dev_brick->kill_ptr = (void **)&rot->if_brick;
+	dev_brick->show_status = _show_brick_status;
+
+done:
+	open_count = 0;
+	if (rot->if_brick) {
+		_show_rate(rot, &rot->if_brick->io_limiter, "if_rate");
+		open_count = atomic_read(&rot->if_brick->open_count);
+	}
+	__show_actual(rot->parent_path, "open-count", open_count);
+	rot->is_primary =
+		rot->trans_brick &&
+		rot->trans_brick->power.on_led &&
+		!rot->trans_brick->replay_mode;
+	_show_primary(rot, parent);
+
+err:
+	brick_string_free(dev_name);
+	return status;
+}
+
+static
+int kill_dev(void *buf, struct mars_dent *dent)
+{
+	struct mars_dent *parent = dent->d_parent;
+	int status = kill_any(buf, dent);
+
+	if (status > 0 && parent) {
+		struct mars_rotate *rot = parent->d_private;
+
+		if (rot)
+			rot->if_brick = NULL;
+	}
+	return status;
+}
+
+static
+int _update_syncstatus(struct mars_rotate *rot, struct copy_brick *copy, char *peer)
+{
+	const char *src = NULL;
+	const char *dst = NULL;
+	const char *syncpos_path = NULL;
+	const char *peer_replay_path = NULL;
+	const char *peer_replay_link = NULL;
+	const char *peer_time_path = NULL;
+	int status = -EINVAL;
+
+	/* create syncpos symlink when necessary */
+	if (copy->copy_last == copy->copy_end && !rot->sync_finish_stamp.tv_sec) {
+		get_lamport(&rot->sync_finish_stamp);
+		XIO_DBG(
+		"sync finished at timestamp %lu\n",
+			 rot->sync_finish_stamp.tv_sec);
+		/* Give the remote replay position a chance to become
+		 * recent enough.
+		 */
+		remote_trigger();
+		status = -EAGAIN;
+		goto done;
+	}
+	if (rot->sync_finish_stamp.tv_sec) {
+		struct kstat peer_time_stat = {};
+
+		peer_time_path = path_make("/mars/tree-%s", peer);
+		status = mars_stat(peer_time_path, &peer_time_stat, true);
+		if (unlikely(status < 0)) {
+			XIO_ERR("cannot stat '%s'\n", peer_time_path);
+			goto done;
+		}
+
+		/* The syncpos tells us the replay position at the primary
+		 * which was effective at the moment when the local sync was done.
+		 * It is used to guarantee consistency:
+		 * before our underlying disk is _really_ consistent, not only
+		 * the sync must have finished, but additionally the local
+		 * replay must have grown (at least) until the same position
+		 * at which the primary was at that moment.
+		 * Therefore, we have to remember the replay position of
+		 * the primary at that moment.
+		 * And because of the network delays we must ensure
+		 * to get a recent enough remote version.
+		 */
+		syncpos_path = path_make("%s/syncpos-%s", rot->parent_path, my_id());
+		peer_replay_path = path_make("%s/replay-%s", rot->parent_path, peer);
+		peer_replay_link = mars_readlink(peer_replay_path);
+		if (unlikely(!peer_replay_link || !peer_replay_link[0])) {
+			XIO_ERR("cannot read peer replay link '%s'\n", peer_replay_path);
+			goto done;
+		}
+
+		_crashme(3, true);
+
+		status = _update_link_when_necessary(rot, "syncpos", peer_replay_link, syncpos_path);
+		/* Sync is only marked as finished when the syncpos
+		 * production was successful and timestamps are recent enough.
+		 */
+		if (unlikely(status < 0))
+			goto done;
+		if (timespec_compare(&peer_time_stat.mtime, &rot->sync_finish_stamp) < 0) {
+			XIO_INF(
+			"peer replay link '%s' is not recent enough, %lu < %lu\n",
+				 peer_replay_path,
+				 peer_time_stat.mtime.tv_sec,
+				 rot->sync_finish_stamp.tv_sec);
+			remote_trigger();
+			status = -EAGAIN;
+			goto done;
+		}
+	}
+
+	src = path_make("%lld", copy->copy_last);
+	dst = path_make("%s/syncstatus-%s", rot->parent_path, my_id());
+
+	_crashme(4, true);
+
+	status = _update_link_when_necessary(rot, "syncstatus", src, dst);
+
+	brick_string_free(src);
+	brick_string_free(dst);
+	src = path_make("%lld,%lld", copy->verify_ok_count, copy->verify_error_count);
+	dst = path_make("%s/verifystatus-%s", rot->parent_path, my_id());
+
+	_crashme(5, true);
+
+	(void)_update_link_when_necessary(rot, "verifystatus", src, dst);
+
+	memset(&rot->sync_finish_stamp, 0, sizeof(rot->sync_finish_stamp));
+done:
+	brick_string_free(src);
+	brick_string_free(dst);
+	brick_string_free(peer_replay_link);
+	brick_string_free(peer_replay_path);
+	brick_string_free(syncpos_path);
+	brick_string_free(peer_time_path);
+	return status;
+}
+
+static int make_sync(void *buf, struct mars_dent *dent)
+{
+	struct mars_global *global = buf;
+	struct mars_rotate *rot;
+	loff_t start_pos = 0;
+	loff_t end_pos = 0;
+	struct mars_dent *size_dent;
+	struct mars_dent *primary_dent;
+	struct mars_dent *syncfrom_dent;
+	char *peer;
+	struct copy_brick *copy = NULL;
+	char *tmp = NULL;
+	const char *switch_path = NULL;
+	const char *copy_path = NULL;
+	const char *src = NULL;
+	const char *dst = NULL;
+	bool do_start;
+	int status;
+
+	if (!dent->d_parent || !dent->link_val)
+		return 0;
+
+	/* Determine peer
+	 */
+	tmp = path_make("%s/primary", dent->d_parent->d_path);
+	primary_dent = (void *)mars_find_dent(global, tmp);
+	if (!primary_dent || !primary_dent->link_val) {
+		XIO_ERR("cannot determine primary, symlink '%s'\n", tmp);
+		status = 0;
+		goto done;
+	}
+	peer = primary_dent->link_val;
+
+	do_start = _check_allow(global, dent->d_parent, "attach");
+
+	/* Analyze replay position
+	 */
+	status = kstrtos64(dent->link_val, 10, &start_pos);
+	if (unlikely(status)) {
+		XIO_ERR("bad syncstatus symlink syntax '%s' (%s)\n", dent->link_val, dent->d_path);
+		status = -EINVAL;
+		goto done;
+	}
+
+	rot = dent->d_parent->d_private;
+	status = -ENOENT;
+	CHECK_PTR(rot, done);
+
+	rot->has_symlinks = true;
+	rot->allow_update = true;
+	rot->syncstatus_dent = dent;
+
+	/* Sync necessary?
+	 */
+	brick_string_free(tmp);
+	tmp = path_make("%s/size", dent->d_parent->d_path);
+	status = -ENOMEM;
+	if (unlikely(!tmp))
+		goto done;
+	size_dent = (void *)mars_find_dent(global, tmp);
+	if (!size_dent || !size_dent->link_val) {
+		XIO_ERR("cannot determine size '%s'\n", tmp);
+		status = -ENOENT;
+		goto done;
+	}
+	status = kstrtos64(size_dent->link_val, 10, &end_pos);
+	if (unlikely(status)) {
+		XIO_ERR("bad size symlink syntax '%s' (%s)\n", size_dent->link_val, tmp);
+		status = -EINVAL;
+		goto done;
+	}
+
+	/* Is sync necessary at all?
+	 */
+	if (start_pos >= end_pos) {
+		XIO_DBG("no data sync necessary, size = %lld\n", start_pos);
+		do_start = false;
+	}
+
+	/* Handle final waiting step when finished
+	 */
+	if (rot->sync_finish_stamp.tv_sec && do_start)
+		goto shortcut;
+
+	/* Don't sync when logfiles are discontiguous
+	 */
+	if (do_start && (rot->has_double_logfile | rot->has_hole_logfile)) {
+		XIO_WRN(
+		"no sync possible due to discontiguous logfiles %d ~!~ %d\n",
+			 rot->has_double_logfile, rot->has_hole_logfile);
+		if (do_start)
+			start_pos = 0;
+		do_start = false;
+	}
+
+	/* stop sync when primary is unknown
+	 */
+	if (!strcmp(peer, "(none)")) {
+		XIO_INF("cannot start sync, no primary is designated\n");
+		if (do_start)
+			start_pos = 0;
+		do_start = false;
+	}
+
+	/* Check syncfrom link (when existing)
+	 */
+	brick_string_free(tmp);
+	tmp = path_make("%s/syncfrom-%s", dent->d_parent->d_path, my_id());
+	syncfrom_dent = (void *)mars_find_dent(global, tmp);
+	if (do_start && syncfrom_dent && syncfrom_dent->link_val &&
+	    strcmp(syncfrom_dent->link_val, peer)) {
+		XIO_WRN(
+		"cannot start sync, primary has changed: '%s' != '%s'\n",
+			 syncfrom_dent->link_val, peer);
+		if (do_start)
+			start_pos = 0;
+		do_start = false;
+	}
+
+	/* Disallow contemporary sync & logfile_replay
+	 */
+	if (do_start &&
+	    rot->trans_brick &&
+	    !rot->trans_brick->power.off_led) {
+		XIO_INF("cannot start sync because logger is working\n");
+		do_start = false;
+	}
+
+	/* Disallow overwrite of newer data
+	 */
+	if (do_start)
+		write_info_links(rot);
+	rot->forbid_replay = (do_start && compare_replaylinks(rot, peer, my_id()) < 0);
+	if (rot->forbid_replay) {
+		XIO_INF("cannot start sync because my data is newer than the remote one at '%s'!\n", peer);
+		do_start = false;
+	}
+
+	/* Flip between replay and sync
+	 */
+	if (do_start && rot->replay_mode && rot->end_pos > rot->start_pos &&
+	    mars_sync_flip_interval >= 8) {
+		if (!rot->flip_start) {
+			rot->flip_start = jiffies;
+		} else if ((long long)jiffies - rot->flip_start > mars_sync_flip_interval * HZ) {
+			do_start = false;
+			rot->flip_start = jiffies + mars_sync_flip_interval * HZ;
+		}
+	} else {
+		rot->flip_start = 0;
+	}
+
+	XIO_DBG("initial sync '%s' => '%s' do_start = %d\n", src, dst, do_start);
+	/* Obey global sync limit
+	 */
+	rot->wants_sync = (do_start != 0);
+	if (rot->wants_sync && global_sync_limit > 0) {
+		do_start = rot->gets_sync;
+		if (!rot->gets_sync) {
+			XIO_INF_TO(
+			rot->log_say, "won't start sync because of parallelism limit %d\n", global_sync_limit);
+		}
+	}
+
+shortcut:
+	/* Start copy
+	 */
+	src = path_make("data-%s@%s:%d", peer, peer, xio_net_default_port + 2);
+	dst = path_make("data-%s", my_id());
+	copy_path = backskip_replace(dent->d_path, '/', true, "/copy-");
+
+	/*  check whether connection is allowed */
+	switch_path = path_make("%s/todo-%s/sync", dent->d_parent->d_path, my_id());
+
+	status = -ENOMEM;
+	if (unlikely(!src || !dst || !copy_path || !switch_path))
+		goto done;
+
+	/* Informational
+	 */
+	XIO_DBG(
+	"start_pos = %lld end_pos = %lld sync_finish_stamp=%lu do_start=%d\n",
+		 start_pos, end_pos, rot->sync_finish_stamp.tv_sec, do_start);
+
+	if (!do_start)
+		memset(&rot->sync_finish_stamp, 0, sizeof(rot->sync_finish_stamp));
+
+	/* Now do it....
+	 */
+	{
+		const char *argv[2] = { src, dst };
+
+		status = __make_copy(
+		global, dent,
+				     do_start ? switch_path : "",
+				     copy_path, dent->d_parent->d_path, argv, find_key(rot->msgs, "inf-sync"),
+				     start_pos, end_pos,
+				     true,
+				     mars_fast_fullsync > 0,
+				     true, false, &copy);
+		if (copy) {
+			copy->kill_ptr = (void **)&rot->sync_brick;
+			copy->copy_limiter = &rot->sync_limiter;
+		}
+		rot->sync_brick = copy;
+	}
+
+	/* Update syncstatus symlink
+	 */
+	if (status >= 0 && copy &&
+	    ((copy->power.button && copy->power.on_led) ||
+	     !copy->copy_start ||
+	     (copy->copy_last == copy->copy_end && copy->copy_end > 0))) {
+		status = _update_syncstatus(rot, copy, peer);
+	}
+
+done:
+	XIO_DBG("status = %d\n", status);
+	brick_string_free(tmp);
+	brick_string_free(src);
+	brick_string_free(dst);
+	brick_string_free(copy_path);
+	brick_string_free(switch_path);
+	return status;
+}
+
+static
+bool remember_peer(struct mars_rotate *rot, struct mars_peerinfo *peer)
+{
+	if (!peer || !rot || rot->preferred_peer)
+		return false;
+
+	if ((long long)peer->last_remote_jiffies + mars_scan_interval * HZ * 2 < (long long)jiffies)
+		return false;
+
+	rot->preferred_peer = brick_strdup(peer->peer);
+	return true;
+}
+
+static
+int make_connect(void *buf, struct mars_dent *dent)
+{
+	struct mars_rotate *rot;
+	struct mars_peerinfo *peer;
+	char *names;
+	char *this_name;
+	char *tmp;
+
+	if (unlikely(!dent->d_parent || !dent->link_val))
+		goto done;
+	rot = dent->d_parent->d_private;
+	if (unlikely(!rot))
+		goto done;
+
+	names = brick_strdup(dent->link_val);
+	for (tmp = this_name = names; *tmp; tmp++) {
+		if (*tmp == MARS_DELIM) {
+			*tmp = '\0';
+			peer = find_peer(this_name);
+			if (remember_peer(rot, peer))
+				goto found;
+			this_name = tmp + 1;
+		}
+	}
+	peer = find_peer(this_name);
+	remember_peer(rot, peer);
+
+found:
+	brick_string_free(names);
+done:
+	return 0;
+}
+
+static int prepare_delete(void *buf, struct mars_dent *dent)
+{
+	struct kstat stat;
+	struct kstat *to_delete = NULL;
+	struct mars_global *global = buf;
+	struct mars_dent *target;
+	struct mars_dent *response;
+	const char *marker_path = NULL;
+	const char *response_path = NULL;
+	struct xio_brick *brick;
+	int max_serial = 0;
+	int status;
+
+	if (!global || !dent || !dent->link_val || !dent->d_path)
+		goto err;
+
+	/*  create a marker which prevents concurrent updates from remote hosts */
+	marker_path = backskip_replace(dent->link_val, '/', true, "/.deleted-");
+	if (mars_stat(marker_path, &stat, true) < 0 ||
+	    timespec_compare(&dent->stat_val.mtime, &stat.mtime) > 0) {
+		XIO_DBG(
+		"creating / updating marker '%s' mtime=%lu.%09lu\n",
+			 marker_path, dent->stat_val.mtime.tv_sec, dent->stat_val.mtime.tv_nsec);
+		mars_symlink("1", marker_path, &dent->stat_val.mtime, 0);
+	}
+
+	brick = mars_find_brick(global, NULL, dent->link_val);
+	if (brick &&
+	    unlikely((brick->nr_outputs > 0 && brick->outputs[0] && brick->outputs[0]->nr_connected) ||
+		     (brick->type == (void *)&if_brick_type && !brick->power.off_led))) {
+		XIO_WRN("target '%s' cannot be deleted, its brick '%s' in use\n", dent->link_val, brick->brick_name);
+		goto done;
+	}
+
+	status = 0;
+	target = mars_find_dent(global, dent->link_val);
+	if (target) {
+		if (timespec_compare(&target->stat_val.mtime, &dent->stat_val.mtime) > 0) {
+			XIO_WRN("target '%s' has newer timestamp than deletion link, ignoring\n", dent->link_val);
+			status = -EAGAIN;
+			goto ok;
+		}
+		if (target->d_child_count) {
+			XIO_WRN("target '%s' has %d children, cannot kill\n", dent->link_val, target->d_child_count);
+			goto done;
+		}
+		target->d_killme = true;
+		XIO_DBG("target '%s' marked for removal\n", dent->link_val);
+		to_delete = &target->stat_val;
+	} else if (mars_stat(dent->link_val, &stat, true) >= 0) {
+		if (timespec_compare(&stat.mtime, &dent->stat_val.mtime) > 0) {
+			XIO_WRN("target '%s' has newer timestamp than deletion link, ignoring\n", dent->link_val);
+			status = -EAGAIN;
+			goto ok;
+		}
+		to_delete = &stat;
+	} else {
+		status = -EAGAIN;
+		XIO_DBG("target '%s' does no longer exist\n", dent->link_val);
+	}
+	if (to_delete) {
+		status = mars_unlink(dent->link_val);
+		XIO_DBG("unlink '%s', status = %d\n", dent->link_val, status);
+	}
+
+ok:
+	if (status < 0) {
+		XIO_DBG(
+		"deletion '%s' to target '%s' is accomplished\n",
+			 dent->d_path, dent->link_val);
+		if (dent->d_serial <= global->deleted_border) {
+			XIO_DBG("removing deletion symlink '%s'\n", dent->d_path);
+			dent->d_killme = true;
+			mars_unlink(dent->d_path);
+			XIO_DBG("removing marker '%s'\n", marker_path);
+			mars_unlink(marker_path);
+		}
+	}
+
+done:
+	/*  tell the world that we have seen this deletion... (even when not yet accomplished) */
+	response_path = path_make("/mars/todo-global/deleted-%s", my_id());
+	response = mars_find_dent(global, response_path);
+	if (response && response->link_val) {
+		int status = kstrtoint(response->link_val, 10, &max_serial);
+
+		(void)status; /* leave untouched in case of errors */
+	}
+	if (dent->d_serial > max_serial) {
+		char response_val[16];
+
+		max_serial = dent->d_serial;
+		global->deleted_my_border = max_serial;
+		snprintf(response_val, sizeof(response_val), "%09d", max_serial);
+		mars_symlink(response_val, response_path, NULL, 0);
+	}
+
+err:
+	brick_string_free(marker_path);
+	brick_string_free(response_path);
+	return 0;
+}
+
+static int check_deleted(void *buf, struct mars_dent *dent)
+{
+	struct mars_global *global = buf;
+	int serial = 0;
+	int status;
+
+	if (!global || !dent || !dent->link_val)
+		goto done;
+
+	status = kstrtoint(dent->link_val, 10, &serial);
+	if (unlikely(status || serial <= 0)) {
+		XIO_WRN("cannot parse symlink '%s' -> '%s'\n", dent->d_path, dent->link_val);
+		goto done;
+	}
+
+	if (!strcmp(dent->d_rest, my_id()))
+		global->deleted_my_border = serial;
+
+	/* Compute the minimum of the deletion progress among
+	 * the resource members.
+	 */
+	if (serial < global->deleted_min || !global->deleted_min)
+		global->deleted_min = serial;
+
+done:
+	return 0;
+}
+
+static
+int make_res(void *buf, struct mars_dent *dent)
+{
+	struct mars_rotate *rot = dent->d_private;
+
+	if (!rot) {
+		XIO_DBG("nothing to do\n");
+		goto done;
+	}
+
+	rot->has_symlinks = false;
+
+done:
+	return 0;
+}
+
+static
+int kill_res(void *buf, struct mars_dent *dent)
+{
+	struct mars_rotate *rot = dent->d_private;
+
+	if (unlikely(!rot || !rot->parent_path)) {
+		XIO_DBG("nothing to do\n");
+		goto done;
+	}
+
+	show_vals(rot->msgs, rot->parent_path, "");
+
+	if (unlikely(!rot->global)) {
+		XIO_DBG("nothing to do\n");
+		goto done;
+	}
+	if (rot->has_symlinks) {
+		XIO_DBG("symlinks were present, nothing to kill.\n");
+		goto done;
+	}
+
+	/*  this code is only executed in case of forced deletion of symlinks */
+	if (rot->if_brick || rot->sync_brick || rot->fetch_brick || rot->trans_brick) {
+		rot->res_shutdown = true;
+		XIO_WRN("resource '%s' has no symlinks, shutting down.\n", rot->parent_path);
+	}
+	if (rot->if_brick) {
+		if (atomic_read(&rot->if_brick->open_count) > 0) {
+			XIO_ERR("cannot destroy resource '%s': device is is use!\n", rot->parent_path);
+			goto done;
+		}
+		rot->if_brick->killme = true;
+		if (!rot->if_brick->power.off_led) {
+			int status = mars_power_button((void *)rot->if_brick, false, false);
+
+			XIO_INF("switching off resource '%s', device status = %d\n", rot->parent_path, status);
+		} else {
+			xio_kill_brick((void *)rot->if_brick);
+			rot->if_brick = NULL;
+		}
+	}
+	if (rot->sync_brick) {
+		rot->sync_brick->killme = true;
+		if (!rot->sync_brick->power.off_led) {
+			int status = mars_power_button((void *)rot->sync_brick, false, false);
+
+			XIO_INF("switching off resource '%s', sync status = %d\n", rot->parent_path, status);
+		}
+	}
+	if (rot->fetch_brick) {
+		rot->fetch_brick->killme = true;
+		if (!rot->fetch_brick->power.off_led) {
+			int status = mars_power_button((void *)rot->fetch_brick, false, false);
+
+			XIO_INF("switching off resource '%s', fetch status = %d\n", rot->parent_path, status);
+		}
+	}
+	if (rot->trans_brick) {
+		struct trans_logger_output *output = rot->trans_brick->outputs[0];
+
+		if (!output || output->nr_connected) {
+			XIO_ERR("cannot destroy resource '%s': trans_logger is is use!\n", rot->parent_path);
+			goto done;
+		}
+		rot->trans_brick->killme = true;
+		if (!rot->trans_brick->power.off_led) {
+			int status = mars_power_button((void *)rot->trans_brick, false, false);
+
+			XIO_INF("switching off resource '%s', logger status = %d\n", rot->parent_path, status);
+		}
+	}
+	if (!rot->if_brick && !rot->sync_brick && !rot->fetch_brick && !rot->trans_brick)
+		rot->res_shutdown = false;
+
+done:
+	return 0;
+}
+
+static
+int make_defaults(void *buf, struct mars_dent *dent)
+{
+	if (!dent->link_val)
+		goto done;
+
+	XIO_DBG("name = '%s' value = '%s'\n", dent->d_name, dent->link_val);
+
+	if (!strcmp(dent->d_name, "sync-limit")) {
+		int status = kstrtoint(dent->link_val, 10, &global_sync_limit);
+
+		(void)status; /* leave untouched in case of errors */
+	} else if (!strcmp(dent->d_name, "sync-pref-list")) {
+		const char *start;
+		struct list_head *tmp;
+		int len;
+		int want_count = 0;
+		int get_count = 0;
+
+		for (tmp = rot_anchor.next; tmp != &rot_anchor; tmp = tmp->next) {
+			struct mars_rotate *rot = container_of(tmp, struct mars_rotate, rot_head);
+
+			if (rot->wants_sync)
+				want_count++;
+			else
+				rot->gets_sync = false;
+			if (rot->sync_brick && rot->sync_brick->power.on_led)
+				get_count++;
+		}
+		global_sync_want = want_count;
+		global_sync_nr = get_count;
+
+		/*  prefer mentioned resources in the right order */
+		for (start = dent->link_val; *start && get_count < global_sync_limit; start += len) {
+			len = 1;
+			while (start[len] && start[len] != ',')
+				len++;
+			for (tmp = rot_anchor.next; tmp != &rot_anchor; tmp = tmp->next) {
+				struct mars_rotate *rot = container_of(tmp, struct mars_rotate, rot_head);
+
+				if (rot->wants_sync && rot->parent_rest && !strncmp(start, rot->parent_rest, len)) {
+					rot->gets_sync = true;
+					get_count++;
+					XIO_DBG(
+					"new get_count = %d res = '%s' wants_sync = %d gets_sync = %d\n",
+						 get_count, rot->parent_rest, rot->wants_sync, rot->gets_sync);
+					break;
+				}
+			}
+			if (start[len])
+				len++;
+		}
+		/*  fill up with unmentioned resources */
+		for (tmp = rot_anchor.next; tmp != &rot_anchor && get_count < global_sync_limit; tmp = tmp->next) {
+			struct mars_rotate *rot = container_of(tmp, struct mars_rotate, rot_head);
+
+			if (rot->wants_sync && !rot->gets_sync) {
+				rot->gets_sync = true;
+				get_count++;
+			}
+			XIO_DBG(
+			"new get_count = %d res = '%s' wants_sync = %d gets_sync = %d\n",
+				 get_count, rot->parent_rest, rot->wants_sync, rot->gets_sync);
+		}
+		XIO_DBG("final want_count = %d get_count = %d\n", want_count, get_count);
+	} else {
+		XIO_DBG("unimplemented default '%s'\n", dent->d_name);
+	}
+done:
+	return 0;
+}
+
+/*********************************************************************/
+
+/* Please keep the order the same as in the enum.
+ */
+static const struct main_class main_classes[] = {
+	/* Placeholder for root node /mars/
+	 */
+	[CL_ROOT] = {
+	},
+
+	/* UUID, indentifying the whole cluster.
+	 */
+	[CL_UUID] = {
+		.cl_name = "uuid",
+		.cl_len = 4,
+		.cl_type = 'l',
+		.cl_father = CL_ROOT,
+	},
+
+	/* Subdirectory for global userspace items...
+	 */
+	[CL_GLOBAL_USERSPACE] = {
+		.cl_name = "userspace",
+		.cl_len = 9,
+		.cl_type = 'd',
+		.cl_hostcontext = false,
+		.cl_father = CL_ROOT,
+	},
+	[CL_GLOBAL_USERSPACE_ITEMS] = {
+		.cl_name = "",
+		.cl_len = 0, /*  catch any */
+		.cl_type = 'l',
+		.cl_father = CL_GLOBAL_USERSPACE,
+	},
+
+	/* Subdirectory for defaults...
+	 */
+	[CL_DEFAULTS0] = {
+		.cl_name = "defaults",
+		.cl_len = 8,
+		.cl_type = 'd',
+		.cl_hostcontext = false,
+		.cl_father = CL_ROOT,
+	},
+	[CL_DEFAULTS] = {
+		.cl_name = "defaults-",
+		.cl_len = 9,
+		.cl_type = 'd',
+		.cl_hostcontext = true,
+		.cl_father = CL_ROOT,
+	},
+	/* ... and its contents
+	 */
+	[CL_DEFAULTS_ITEMS0] = {
+		.cl_name = "",
+		.cl_len = 0, /*  catch any */
+		.cl_type = 'l',
+		.cl_father = CL_DEFAULTS0,
+	},
+	[CL_DEFAULTS_ITEMS] = {
+		.cl_name = "",
+		.cl_len = 0, /*  catch any */
+		.cl_type = 'l',
+		.cl_father = CL_DEFAULTS,
+		.cl_forward = make_defaults,
+	},
+
+	/* Subdirectory for global controlling items...
+	 */
+	[CL_GLOBAL_TODO] = {
+		.cl_name = "todo-global",
+		.cl_len = 11,
+		.cl_type = 'd',
+		.cl_hostcontext = false,
+		.cl_father = CL_ROOT,
+	},
+	/* ... and its contents
+	 */
+	[CL_GLOBAL_TODO_DELETE] = {
+		.cl_name = "delete-",
+		.cl_len = 7,
+		.cl_type = 'l',
+		.cl_serial = true,
+		.cl_hostcontext = false, /*  ignore context, although present */
+		.cl_father = CL_GLOBAL_TODO,
+		.cl_prepare = prepare_delete,
+	},
+	[CL_GLOBAL_TODO_DELETED] = {
+		.cl_name = "deleted-",
+		.cl_len = 8,
+		.cl_type = 'l',
+		.cl_father = CL_GLOBAL_TODO,
+		.cl_prepare = check_deleted,
+	},
+
+	/* Directory containing the addresses of all peers
+	 */
+	[CL_IPS] = {
+		.cl_name = "ips",
+		.cl_len = 3,
+		.cl_type = 'd',
+		.cl_father = CL_ROOT,
+	},
+	/* Anyone participating in a MARS cluster must
+	 * be named here (symlink pointing to the IP address).
+	 * We have no DNS in kernel space.
+	 */
+	[CL_PEERS] = {
+		.cl_name = "ip-",
+		.cl_len = 3,
+		.cl_type = 'l',
+		.cl_father = CL_IPS,
+		.cl_forward = make_scan,
+		.cl_backward = kill_scan,
+	},
+	/* Subdirectory for actual state
+	 */
+	[CL_GBL_ACTUAL] = {
+		.cl_name = "actual-",
+		.cl_len = 7,
+		.cl_type = 'd',
+		.cl_hostcontext = false,
+		.cl_father = CL_ROOT,
+	},
+	/* ... and its contents
+	 */
+	[CL_GBL_ACTUAL_ITEMS] = {
+		.cl_name = "",
+		.cl_len = 0, /*  catch any */
+		.cl_type = 'l',
+		.cl_father = CL_GBL_ACTUAL,
+	},
+	/* Indicate aliveness of all cluster paritcipants
+	 * by the timestamp of this link.
+	 */
+	[CL_ALIVE] = {
+		.cl_name = "alive-",
+		.cl_len = 6,
+		.cl_type = 'l',
+		.cl_father = CL_ROOT,
+	},
+	[CL_TIME] = {
+		.cl_name = "time-",
+		.cl_len = 5,
+		.cl_type = 'l',
+		.cl_father = CL_ROOT,
+	},
+	/* Show version indication for symlink tree.
+	 */
+	[CL_TREE] = {
+		.cl_name = "tree-",
+		.cl_len = 5,
+		.cl_type = 'l',
+		.cl_father = CL_ROOT,
+	},
+	/* Indicate whether filesystem is full
+	 */
+	[CL_EMERGENCY] = {
+		.cl_name = "emergency-",
+		.cl_len = 10,
+		.cl_type = 'l',
+		.cl_father = CL_ROOT,
+	},
+	/* dto as percentage
+	 */
+	[CL_REST_SPACE] = {
+		.cl_name = "rest-space-",
+		.cl_len = 11,
+		.cl_type = 'l',
+		.cl_father = CL_ROOT,
+	},
+
+	/* Directory containing all items of a resource
+	 */
+	[CL_RESOURCE] = {
+		.cl_name = "resource-",
+		.cl_len = 9,
+		.cl_type = 'd',
+		.cl_use_channel = true,
+		.cl_father = CL_ROOT,
+		.cl_forward = make_res,
+		.cl_backward = kill_res,
+	},
+
+	/* Subdirectory for resource-specific userspace items...
+	 */
+	[CL_RESOURCE_USERSPACE] = {
+		.cl_name = "userspace",
+		.cl_len = 9,
+		.cl_type = 'd',
+		.cl_hostcontext = false,
+		.cl_father = CL_RESOURCE,
+	},
+	[CL_RESOURCE_USERSPACE_ITEMS] = {
+		.cl_name = "",
+		.cl_len = 0, /*  catch any */
+		.cl_type = 'l',
+		.cl_father = CL_RESOURCE_USERSPACE,
+	},
+
+	/* Subdirectory for defaults...
+	 */
+	[CL_RES_DEFAULTS0] = {
+		.cl_name = "defaults",
+		.cl_len = 8,
+		.cl_type = 'd',
+		.cl_hostcontext = false,
+		.cl_father = CL_RESOURCE,
+	},
+	[CL_RES_DEFAULTS] = {
+		.cl_name = "defaults-",
+		.cl_len = 9,
+		.cl_type = 'd',
+		.cl_hostcontext = false,
+		.cl_father = CL_RESOURCE,
+	},
+	/* ... and its contents
+	 */
+	[CL_RES_DEFAULTS_ITEMS0] = {
+		.cl_name = "",
+		.cl_len = 0, /*  catch any */
+		.cl_type = 'l',
+		.cl_father = CL_RES_DEFAULTS0,
+	},
+	[CL_RES_DEFAULTS_ITEMS] = {
+		.cl_name = "",
+		.cl_len = 0, /*  catch any */
+		.cl_type = 'l',
+		.cl_father = CL_RES_DEFAULTS,
+	},
+
+	/* Subdirectory for controlling items...
+	 */
+	[CL_TODO] = {
+		.cl_name = "todo-",
+		.cl_len = 5,
+		.cl_type = 'd',
+		.cl_hostcontext = false,
+		.cl_father = CL_RESOURCE,
+	},
+	/* ... and its contents
+	 */
+	[CL_TODO_ITEMS] = {
+		.cl_name = "",
+		.cl_len = 0, /*  catch any */
+		.cl_type = 'l',
+		.cl_father = CL_TODO,
+	},
+
+	/* Subdirectory for actual state
+	 */
+	[CL_ACTUAL] = {
+		.cl_name = "actual-",
+		.cl_len = 7,
+		.cl_type = 'd',
+		.cl_hostcontext = false,
+		.cl_father = CL_RESOURCE,
+	},
+	/* ... and its contents
+	 */
+	[CL_ACTUAL_ITEMS] = {
+		.cl_name = "",
+		.cl_len = 0, /*  catch any */
+		.cl_type = 'l',
+		.cl_father = CL_ACTUAL,
+	},
+
+	/* File or symlink to the real device / real (sparse) file
+	 * when hostcontext is missing, the corresponding peer will
+	 * not participate in that resource.
+	 */
+	[CL_DATA] = {
+		.cl_name = "data-",
+		.cl_len = 5,
+		.cl_type = 'F',
+		.cl_hostcontext = true,
+		.cl_father = CL_RESOURCE,
+		.cl_forward = make_bio,
+		.cl_backward = kill_any,
+	},
+	/* Symlink indicating the (common) size of the resource
+	 */
+	[CL_SIZE] = {
+		.cl_name = "size",
+		.cl_len = 4,
+		.cl_type = 'l',
+		.cl_hostcontext = false,
+		.cl_father = CL_RESOURCE,
+		.cl_forward = make_log_init,
+		.cl_backward = kill_any,
+	},
+	/* Dito for each individual size
+	 */
+	[CL_ACTSIZE] = {
+		.cl_name = "actsize-",
+		.cl_len = 8,
+		.cl_type = 'l',
+		.cl_hostcontext = true,
+		.cl_father = CL_RESOURCE,
+	},
+	/* Symlink pointing to the name of the primary node
+	 */
+	[CL_PRIMARY] = {
+		.cl_name = "primary",
+		.cl_len = 7,
+		.cl_type = 'l',
+		.cl_hostcontext = false,
+		.cl_father = CL_RESOURCE,
+		.cl_forward = make_primary,
+		.cl_backward = NULL,
+	},
+	/* Symlink for connection preferences
+	 */
+	[CL_CONNECT] = {
+		.cl_name = "connect-",
+		.cl_len = 8,
+		.cl_type = 'l',
+		.cl_hostcontext = true,
+		.cl_father = CL_RESOURCE,
+		.cl_forward = make_connect,
+	},
+	/* informational symlink indicating the current
+	 * status / start / pos / end of logfile transfers.
+	 */
+	[CL_TRANSFER] = {
+		.cl_name = "transferstatus-",
+		.cl_len = 15,
+		.cl_type = 'l',
+		.cl_hostcontext = true,
+		.cl_father = CL_RESOURCE,
+	},
+	/* symlink indicating the current status / end
+	 * of initial data sync.
+	 */
+	[CL_SYNC] = {
+		.cl_name = "syncstatus-",
+		.cl_len = 11,
+		.cl_type = 'l',
+		.cl_hostcontext = true,
+		.cl_father = CL_RESOURCE,
+		.cl_forward = make_sync,
+		.cl_backward = kill_any,
+	},
+	/* informational symlink for verify status
+	 * of initial data sync.
+	 */
+	[CL_VERIF] = {
+		.cl_name = "verifystatus-",
+		.cl_len = 13,
+		.cl_type = 'l',
+		.cl_hostcontext = true,
+		.cl_father = CL_RESOURCE,
+	},
+	/* informational symlink: after sync has finished,
+	 * keep a copy of the replay symlink from the primary.
+	 * when comparing the own replay symlink against this,
+	 * we can determine whether we are consistent.
+	 */
+	[CL_SYNCPOS] = {
+		.cl_name = "syncpos-",
+		.cl_len = 8,
+		.cl_type = 'l',
+		.cl_hostcontext = true,
+		.cl_father = CL_RESOURCE,
+	},
+	/* Passive symlink indicating the split-brain crypto hash
+	 */
+	[CL_VERSION] = {
+		.cl_name = "version-",
+		.cl_len = 8,
+		.cl_type = 'l',
+		.cl_serial = true,
+		.cl_hostcontext = false,
+		.cl_father = CL_RESOURCE,
+	},
+	/* Logfiles for transaction logger
+	 */
+	[CL_LOG] = {
+		.cl_name = "log-",
+		.cl_len = 4,
+		.cl_type = 'F',
+		.cl_serial = true,
+		.cl_hostcontext = false,
+		.cl_father = CL_RESOURCE,
+		.cl_forward = make_log_step,
+		.cl_backward = kill_any,
+	},
+	/* Symlink indicating the last state of
+	 * transaction log replay.
+	 */
+	[CL_REPLAYSTATUS] = {
+		.cl_name = "replay-",
+		.cl_len = 7,
+		.cl_type = 'l',
+		.cl_hostcontext = true,
+		.cl_father = CL_RESOURCE,
+		.cl_forward = make_replay,
+		.cl_backward = kill_any,
+	},
+
+	/* Name of the device appearing at the primary
+	 */
+	[CL_DEVICE] = {
+		.cl_name = "device-",
+		.cl_len = 7,
+		.cl_type = 'l',
+		.cl_hostcontext = false,
+		.cl_father = CL_RESOURCE,
+		.cl_forward = make_dev,
+		.cl_backward = kill_dev,
+	},
+
+	/* Quirk: when dead resources are recreated during a network partition,
+	 * this is used to void version number clashes in the
+	 * partitioned cluster.
+	 */
+	[CL_MAXNR] = {
+		.cl_name = "maxnr",
+		.cl_len = 5,
+		.cl_type = 'l',
+		.cl_father = CL_RESOURCE,
+	},
+	{}
+};
+
+/* Helper routine to pre-determine the relevance of a name from the filesystem.
+ */
+int main_checker(
+struct mars_dent *parent,
+const char *_name,
+int namlen,
+unsigned int d_type,
+int *prefix,
+int *serial,
+bool *use_channel)
+{
+	int class;
+	int status = -2;
+
+#ifdef XIO_DEBUGGING
+	const char *name = brick_strndup(_name, namlen);
+
+#else
+	const char *name = _name;
+
+#endif
+
+	/* XIO_DBG("trying '%s' '%s'\n", path, name); */
+	for (class = CL_ROOT + 1; ; class++) {
+		const struct main_class *test = &main_classes[class];
+		int len = test->cl_len;
+
+		if (!test->cl_name) { /*  end of table */
+			break;
+		}
+
+		/* XIO_DBG("   testing class '%s'\n", test->cl_name); */
+
+#ifdef XIO_DEBUGGING
+		if (len != strlen(test->cl_name)) {
+			XIO_ERR(
+			"internal table '%s' mismatch: %d != %d\n", test->cl_name, len, (int)strlen(test->cl_name));
+			len = strlen(test->cl_name);
+		}
+#endif
+
+		if (test->cl_father &&
+		    (!parent || parent->d_class != test->cl_father)) {
+			continue;
+		}
+
+		if (len > 0 &&
+		    (namlen < len || memcmp(name, test->cl_name, len))) {
+			continue;
+		}
+
+		/* XIO_DBG("path '%s/%s' matches class %d '%s'\n", path, name, class, test->cl_name); */
+
+		/*  check special contexts */
+		if (test->cl_serial) {
+			int plus = 0;
+			int count;
+
+			count = sscanf(name + len, "%d%n", serial, &plus);
+			if (count < 1) {
+				/* XIO_DBG("'%s' serial number mismatch at '%s'\n", name, name + len); */
+				continue;
+			}
+			/* XIO_DBG("'%s' serial number = %d\n", name, *serial); */
+			len += plus;
+			if (name[len] == '-')
+				len++;
+		}
+		if (prefix)
+			*prefix = len;
+		if (test->cl_hostcontext) {
+			if (memcmp(name + len, my_id(), namlen - len)) {
+				/* XIO_DBG("context mismatch '%s' at '%s'\n", name, name + len); */
+				continue;
+			}
+		}
+
+		/*  all ok */
+		status = class;
+		*use_channel = test->cl_use_channel;
+	}
+
+#ifdef XIO_DEBUGGING
+	brick_string_free(name);
+#endif
+	return status;
+}
+
+/* Do some syntactic checks, then delegate work to the real worker functions
+ * from the main_classes[] table.
+ */
+static int main_worker(struct mars_global *global, struct mars_dent *dent, bool prepare, bool direction)
+{
+	main_worker_fn worker;
+	int class = dent->d_class;
+
+	if (class < 0 || class >= sizeof(main_classes) / sizeof(struct main_class)) {
+		XIO_ERR("bad internal class %d of '%s'\n", class, dent->d_path);
+		return -EINVAL;
+	}
+	switch (main_classes[class].cl_type) {
+	case 'd':
+		if (!S_ISDIR(dent->stat_val.mode)) {
+			XIO_ERR("'%s' should be a directory, but is something else\n", dent->d_path);
+			return -EINVAL;
+		}
+		break;
+	case 'f':
+		if (!S_ISREG(dent->stat_val.mode)) {
+			XIO_ERR("'%s' should be a regular file, but is something else\n", dent->d_path);
+			return -EINVAL;
+		}
+		break;
+	case 'F':
+		if (!S_ISREG(dent->stat_val.mode) && !S_ISLNK(dent->stat_val.mode)) {
+			XIO_ERR("'%s' should be a regular file or a symlink, but is something else\n", dent->d_path);
+			return -EINVAL;
+		}
+		break;
+	case 'l':
+		if (!S_ISLNK(dent->stat_val.mode)) {
+			XIO_ERR("'%s' should be a symlink, but is something else\n", dent->d_path);
+			return -EINVAL;
+		}
+		break;
+	}
+	if (likely(class > CL_ROOT)) {
+		int father = main_classes[class].cl_father;
+
+		if (father == CL_ROOT) {
+			if (unlikely(dent->d_parent)) {
+				XIO_ERR("'%s' class %d is not at the root of the hierarchy\n", dent->d_path, class);
+				return -EINVAL;
+			}
+		} else if (unlikely(!dent->d_parent || dent->d_parent->d_class != father)) {
+			XIO_ERR(
+			"last component '%s' from '%s' is at the wrong position in the hierarchy (class = %d, parent_class = %d, parent = '%s')\n",
+			dent->d_name,
+			dent->d_path,
+			father,
+			dent->d_parent ? dent->d_parent->d_class : -9999,
+			dent->d_parent ? dent->d_parent->d_path : "");
+			return -EINVAL;
+		}
+	}
+	if (prepare)
+		worker = main_classes[class].cl_prepare;
+	else if (direction)
+		worker = main_classes[class].cl_backward;
+	else
+		worker = main_classes[class].cl_forward;
+	if (worker) {
+		int status;
+
+		if (!direction)
+			XIO_DBG(
+			"--- start working %s on '%s' rest='%s'\n",
+			direction ? "backward" : "forward",
+			dent->d_path,
+			dent->d_rest);
+		status = worker(global, (void *)dent);
+		XIO_DBG(
+		"--- done, worked %s on '%s', status = %d\n",
+		direction ? "backward" : "forward",
+		dent->d_path,
+		status);
+		return status;
+	}
+	return 0;
+}
+
+static struct mars_global _global = {
+	.dent_anchor = LIST_HEAD_INIT(_global.dent_anchor),
+	.brick_anchor = LIST_HEAD_INIT(_global.brick_anchor),
+	.global_power = {
+		.button = true,
+	},
+	.main_event = __WAIT_QUEUE_HEAD_INITIALIZER(_global.main_event),
+};
+
+static int _main_thread(void *data)
+{
+	long long last_rollover = jiffies;
+	char *id = my_id();
+	int status = 0;
+
+	init_rwsem(&_global.dent_mutex);
+	init_rwsem(&_global.brick_mutex);
+
+	mars_global = &_global;
+
+	if (!id || strlen(id) < 2) {
+		XIO_ERR("invalid hostname\n");
+		status = -EFAULT;
+		goto done;
+	}
+
+	XIO_INF("-------- starting as host '%s' ----------\n", id);
+
+	while (_global.global_power.button || !list_empty(&_global.brick_anchor)) {
+		int status;
+
+		XIO_DBG("-------- NEW ROUND ---------\n");
+
+		if (mars_mem_percent < 0)
+			mars_mem_percent = 0;
+		if (mars_mem_percent > 70)
+			mars_mem_percent = 70;
+		brick_global_memlimit = (long long)brick_global_memavail * mars_mem_percent / 100;
+
+		brick_msleep(100);
+
+		if (brick_thread_should_stop()) {
+			_global.global_power.button = false;
+			xio_net_is_alive = false;
+		}
+
+		_make_alive();
+
+		compute_emergency_mode();
+
+		XIO_DBG("-------- start worker ---------\n");
+		_global.deleted_min = 0;
+		status = mars_dent_work(
+		&_global, "/mars", sizeof(struct mars_dent), main_checker, main_worker, &_global, 3);
+		_global.deleted_border = _global.deleted_min;
+		XIO_DBG("-------- worker deleted_min = %d status = %d\n", _global.deleted_min, status);
+
+		if (!_global.global_power.button) {
+			status = xio_kill_brick_when_possible(
+			&_global, &_global.brick_anchor, false, (void *)&copy_brick_type, true);
+			XIO_DBG("kill copy bricks (when possible) = %d\n", status);
+		}
+
+		status = xio_kill_brick_when_possible(&_global, &_global.brick_anchor, false, NULL, false);
+		XIO_DBG("kill main bricks (when possible) = %d\n", status);
+
+		status = xio_kill_brick_when_possible(
+		&_global, &_global.brick_anchor, false, (void *)&client_brick_type, true);
+		XIO_DBG("kill client bricks (when possible) = %d\n", status);
+		status = xio_kill_brick_when_possible(
+		&_global, &_global.brick_anchor, false, (void *)&sio_brick_type, true);
+		XIO_DBG("kill sio    bricks (when possible) = %d\n", status);
+		status = xio_kill_brick_when_possible(
+		&_global, &_global.brick_anchor, false, (void *)&bio_brick_type, true);
+		XIO_DBG("kill bio    bricks (when possible) = %d\n", status);
+
+		if ((long long)jiffies + mars_rollover_interval * HZ >= last_rollover) {
+			last_rollover = jiffies;
+			rollover_all();
+		}
+
+		_show_status_all(&_global);
+		show_vals(gbl_pairs, "/mars", "");
+		show_statistics(&_global, "main");
+
+		XIO_DBG(
+		"ban_count = %d ban_renew_count = %d\n", xio_global_ban.ban_count, xio_global_ban.ban_renew_count);
+
+		brick_msleep(500);
+
+		wait_event_interruptible_timeout(_global.main_event, _global.main_trigger, mars_scan_interval * HZ);
+
+		_global.main_trigger = false;
+	}
+
+done:
+	XIO_INF("-------- cleaning up ----------\n");
+	remote_trigger();
+	brick_msleep(1000);
+
+	xio_free_dent_all(&_global, &_global.dent_anchor);
+	xio_kill_brick_all(&_global, &_global.brick_anchor, false);
+
+	_show_status_all(&_global);
+	show_vals(gbl_pairs, "/mars", "");
+	show_statistics(&_global, "main");
+
+	mars_global = NULL;
+
+	XIO_INF("-------- done status = %d ----------\n", status);
+	/* cleanup_mm(); */
+	return status;
+}
+
+static
+char *_xio_info(void)
+{
+	int max = PAGE_SIZE - 64;
+	char *txt;
+	struct list_head *tmp;
+	int dent_count = 0;
+	int brick_count = 0;
+	int pos = 0;
+
+	if (unlikely(!mars_global))
+		return NULL;
+
+	txt = brick_string_alloc(max);
+
+	txt[--max] = '\0'; /*  safeguard */
+
+	down_read(&mars_global->brick_mutex);
+	for (tmp = mars_global->brick_anchor.next; tmp != &mars_global->brick_anchor; tmp = tmp->next) {
+		struct xio_brick *test;
+
+		brick_count++;
+		test = container_of(tmp, struct xio_brick, global_brick_link);
+		pos += scnprintf(
+			txt + pos, max - pos,
+			"brick button=%d off=%d on=%d path='%s'\n",
+			test->power.button,
+			test->power.off_led,
+			test->power.on_led,
+			test->brick_path
+			);
+	}
+	up_read(&mars_global->brick_mutex);
+
+	pos += scnprintf(
+		txt + pos, max - pos,
+		"SUMMARY: brick_count=%d dent_count=%d\n",
+		brick_count,
+		dent_count
+		);
+
+	return txt;
+}
+
+#define INIT_MAX			32
+static char *exit_names[INIT_MAX];
+static void (*exit_fn[INIT_MAX])(void);
+static int exit_fn_nr;
+
+#define DO_INIT(name)							\
+	do {								\
+		XIO_DBG("=== starting module " #name "...\n");		\
+		status = init_##name();					\
+		if (status < 0)						\
+			goto done;					\
+		exit_names[exit_fn_nr] = #name;				\
+		exit_fn[exit_fn_nr++] = exit_##name;			\
+	} while (0)
+
+void (*_remote_trigger)(void);
+
+static void exit_main(void)
+{
+	XIO_DBG("====================== stopping everything...\n");
+	/*  TODO: make this thread-safe. */
+	if (main_thread) {
+		XIO_DBG("=== stopping main thread...\n");
+		local_trigger();
+		XIO_INF("stopping main thread...\n");
+		brick_thread_stop(main_thread);
+	}
+
+	xio_info = NULL;
+	_remote_trigger = NULL;
+
+	while (exit_fn_nr > 0) {
+		XIO_DBG("=== stopping module %s ...\n", exit_names[exit_fn_nr - 1]);
+		exit_fn[--exit_fn_nr]();
+	}
+
+	XIO_DBG("====================== stopped everything.\n");
+	exit_say();
+	printk(KERN_INFO "stopped MARS\n");
+	/* Workaround for nasty race: some kernel threads have not yet
+	 * really finished even _after_ kthread_stop() and may execute
+	 * some code which will disappear right after return from this
+	 * function.
+	 * A correct solution would probably need the help of the kernel
+	 * scheduler.
+	 */
+	brick_msleep(1000);
+}
+
+static int __init init_main(void)
+{
+	struct kstat dummy;
+	int status = mars_stat("/mars/uuid", &dummy, true);
+
+	if (unlikely(status < 0)) {
+		printk(
+		KERN_ERR "cannot load MARS: cluster UUID is missing. Mount /mars/, and/or use {create,join}-cluster first.\n");
+		return -ENOENT;
+	}
+
+	printk(KERN_INFO "loading MARS, tree_version=%s\n", SYMLINK_TREE_VERSION);
+
+	init_say(); /*	this must come first */
+
+	/* be careful: order is important!
+	 */
+	DO_INIT(brick_mem);
+	DO_INIT(brick);
+	DO_INIT(xio);
+	DO_INIT(xio_mapfree);
+	DO_INIT(xio_net);
+	DO_INIT(xio_client);
+	DO_INIT(xio_sio);
+	DO_INIT(xio_bio);
+	DO_INIT(xio_copy);
+	DO_INIT(log_format);
+	DO_INIT(xio_trans_logger);
+	DO_INIT(xio_if);
+
+	DO_INIT(sy);
+	DO_INIT(sy_net);
+	DO_INIT(xio_proc);
+
+#ifdef CONFIG_MARS_MEM_PREALLOC
+	brick_pre_reserve[5] = 64;
+	brick_mem_reserve();
+#endif
+
+	DO_INIT(xio_server);
+
+	status = compute_emergency_mode();
+	if (check_mars_space && unlikely(status < 0)) {
+		XIO_ERR("Sorry, your /mars/ filesystem is too small!\n");
+		goto done;
+	}
+	status = 0;
+
+	main_thread = brick_thread_create(_main_thread, NULL, "mars_main");
+	if (unlikely(!main_thread)) {
+		status = -ENOENT;
+		goto done;
+	}
+
+done:
+	if (status < 0) {
+		XIO_ERR("module init failed with status = %d, exiting.\n", status);
+		exit_main();
+	}
+	_remote_trigger = __remote_trigger;
+	xio_info = _xio_info;
+	return status;
+}
+
+/*  force module loading */
+const void *dummy1 = &client_brick_type;
+const void *dummy2 = &server_brick_type;
+
+MODULE_DESCRIPTION("XIO");
+MODULE_AUTHOR("Thomas Schoebel-Theuer <tst@...hoebel-theuer,1und1}.de>");
+MODULE_VERSION(SYMLINK_TREE_VERSION);
+MODULE_LICENSE("GPL");
+
+#ifndef CONFIG_MARS_DEBUG
+MODULE_INFO(debug, "production");
+#else
+MODULE_INFO(debug, "DEBUG");
+#endif
+#ifdef CONFIG_MARS_DEBUG_MEM
+MODULE_INFO(io, "BAD_PERFORMANCE");
+#endif
+#ifdef CONFIG_MARS_DEBUG_ORDER0
+MODULE_INFO(memory, "EVIL_PERFORMANCE");
+#endif
+
+module_init(init_main);
+module_exit(exit_main);
-- 
2.11.0

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ