lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite for Android: free password hash cracker in your pocket
[<prev] [next>] [day] [month] [year] [list]
Date:	Mon, 23 Jul 2012 13:17:44 -0500
From:	David Teigland <teigland@...hat.com>
To:	linux-kernel@...r.kernel.org
Subject: [PATCH 3/6] dlm: fix race between remove and lookup

It was possible for a remove message on an old
rsb to be sent after a lookup message on a new
rsb, where the rsbs were for the same resource
name.  This could lead to a missing directory
entry for the new rsb.

It is fixed by keeping a copy of the resource
name being removed until after the remove has
been sent.  A lookup checks if this in-progress
remove matches the name it is looking up.

Signed-off-by: David Teigland <teigland@...hat.com>
---
 fs/dlm/dlm_internal.h |   13 ++++
 fs/dlm/lock.c         |  181 +++++++++++++++++++++++++++++++++++++++----------
 fs/dlm/lockspace.c    |   21 +++++-
 3 files changed, 176 insertions(+), 39 deletions(-)

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index a5f82d5..9d3e485 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -498,6 +498,13 @@ struct rcom_lock {
 	char			rl_lvb[0];
 };
 
+/*
+ * The max number of resources per rsbtbl bucket that shrink will attempt
+ * to remove in each iteration.
+ */
+
+#define DLM_REMOVE_NAMES_MAX 8
+
 struct dlm_ls {
 	struct list_head	ls_list;	/* list of lockspaces */
 	dlm_lockspace_t		*ls_local_handle;
@@ -531,6 +538,12 @@ struct dlm_ls {
 	int			ls_new_rsb_count;
 	struct list_head	ls_new_rsb;	/* new rsb structs */
 
+	spinlock_t		ls_remove_spin;
+	char			ls_remove_name[DLM_RESNAME_MAXLEN+1];
+	char			*ls_remove_names[DLM_REMOVE_NAMES_MAX];
+	int			ls_remove_len;
+	int			ls_remove_lens[DLM_REMOVE_NAMES_MAX];
+
 	struct list_head	ls_nodes;	/* current nodes in ls */
 	struct list_head	ls_nodes_gone;	/* dead node list, recovery */
 	int			ls_num_nodes;	/* number of nodes in ls */
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index d9ee1b9..c7c6cf9 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1624,65 +1624,170 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
 	return error;
 }
 
-/* FIXME: make this more efficient */
+/* If there's an rsb for the same resource being removed, ensure
+   that the remove message is sent before the new lookup message.
+   It should be rare to need a delay here, but if not, then it may
+   be worthwhile to add a proper wait mechanism rather than a delay. */
 
-static int shrink_bucket(struct dlm_ls *ls, int b)
+static void wait_pending_remove(struct dlm_rsb *r)
 {
-	struct rb_node *n;
+	struct dlm_ls *ls = r->res_ls;
+ restart:
+	spin_lock(&ls->ls_remove_spin);
+	if (ls->ls_remove_len &&
+	    !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
+		log_debug(ls, "delay lookup for remove dir %d %s",
+		  	  r->res_dir_nodeid, r->res_name);
+		spin_unlock(&ls->ls_remove_spin);
+		msleep(1);
+		goto restart;
+	}
+	spin_unlock(&ls->ls_remove_spin);
+}
+
+/*
+ * ls_remove_spin protects ls_remove_name and ls_remove_len which are
+ * read by other threads in wait_pending_remove.  ls_remove_names
+ * and ls_remove_lens are only used by the scan thread, so they do
+ * not need protection.
+ */
+
+static void shrink_bucket(struct dlm_ls *ls, int b)
+{
+	struct rb_node *n, *next;
 	struct dlm_rsb *r;
+	char *name;
 	int our_nodeid = dlm_our_nodeid();
-	int count = 0, found;
+	int remote_count = 0;
+	int i, len, rv;
 
-	for (;;) {
-		found = 0;
-		spin_lock(&ls->ls_rsbtbl[b].lock);
-		for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
-			r = rb_entry(n, struct dlm_rsb, res_hashnode);
+	memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
 
-			/* If we're the directory record for this rsb, and
-			   we're not the master of it, then we need to wait
-			   for the master node to send us a dir remove for
-			   before removing the dir record. */
+	spin_lock(&ls->ls_rsbtbl[b].lock);
+	for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
+		next = rb_next(n);
+		r = rb_entry(n, struct dlm_rsb, res_hashnode);
 
-			if (!dlm_no_directory(ls) && !is_master(r) &&
-			    (dlm_dir_nodeid(r) == our_nodeid)) {
-				continue;
-			}
+		/* If we're the directory record for this rsb, and
+		   we're not the master of it, then we need to wait
+		   for the master node to send us a dir remove for
+		   before removing the dir record. */
 
-			if (!time_after_eq(jiffies, r->res_toss_time +
-					   dlm_config.ci_toss_secs * HZ))
-				continue;
-			found = 1;
-			break;
+		if (!dlm_no_directory(ls) &&
+		    (r->res_master_nodeid != our_nodeid) &&
+		    (dlm_dir_nodeid(r) == our_nodeid)) {
+			continue;
 		}
 
-		if (!found) {
-			spin_unlock(&ls->ls_rsbtbl[b].lock);
-			break;
+		if (!time_after_eq(jiffies, r->res_toss_time +
+				   dlm_config.ci_toss_secs * HZ)) {
+			continue;
 		}
 
-		if (kref_put(&r->res_ref, kill_rsb)) {
-			rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
-			spin_unlock(&ls->ls_rsbtbl[b].lock);
+		if (!dlm_no_directory(ls) &&
+		    (r->res_master_nodeid == our_nodeid) &&
+		    (dlm_dir_nodeid(r) != our_nodeid)) {
 
 			/* We're the master of this rsb but we're not
 			   the directory record, so we need to tell the
 			   dir node to remove the dir record. */
 
-			if (!dlm_no_directory(ls) && is_master(r) &&
-			    (dlm_dir_nodeid(r) != our_nodeid)) {
-				send_remove(r);
-			}
+			ls->ls_remove_lens[remote_count] = r->res_length;
+			memcpy(ls->ls_remove_names[remote_count], r->res_name,
+			       DLM_RESNAME_MAXLEN);
+			remote_count++;
 
-			dlm_free_rsb(r);
-			count++;
-		} else {
-			spin_unlock(&ls->ls_rsbtbl[b].lock);
+			if (remote_count >= DLM_REMOVE_NAMES_MAX)
+				break;
+			continue;
+		}
+
+		if (!kref_put(&r->res_ref, kill_rsb)) {
 			log_error(ls, "tossed rsb in use %s", r->res_name);
+			continue;
 		}
+
+		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+		dlm_free_rsb(r);
 	}
+	spin_unlock(&ls->ls_rsbtbl[b].lock);
 
-	return count;
+	/*
+	 * While searching for rsb's to free, we found some that require
+	 * remote removal.  We leave them in place and find them again here
+	 * so there is a very small gap between removing them from the toss
+	 * list and sending the removal.  Keeping this gap small is
+	 * important to keep us (the master node) from being out of sync
+	 * with the remote dir node for very long.
+	 *
+	 * From the time the rsb is removed from toss until just after
+	 * send_remove, the rsb name is saved in ls_remove_name.  A new
+	 * lookup checks this to ensure that a new lookup message for the
+	 * same resource name is not sent just before the remove message.
+	 */
+
+	for (i = 0; i < remote_count; i++) {
+		name = ls->ls_remove_names[i];
+		len = ls->ls_remove_lens[i];
+
+		spin_lock(&ls->ls_rsbtbl[b].lock);
+		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
+		if (rv) {
+			spin_unlock(&ls->ls_rsbtbl[b].lock);
+			log_debug(ls, "remove_name not toss %s", name);
+			continue;
+		}
+
+		if (r->res_master_nodeid != our_nodeid) {
+			spin_unlock(&ls->ls_rsbtbl[b].lock);
+			log_debug(ls, "remove_name master %d dir %d our %d %s",
+				  r->res_master_nodeid, r->res_dir_nodeid,
+				  our_nodeid, name);
+			continue;
+		}
+
+		if (r->res_dir_nodeid == our_nodeid) {
+			/* should never happen */
+			spin_unlock(&ls->ls_rsbtbl[b].lock);
+			log_error(ls, "remove_name dir %d master %d our %d %s",
+				  r->res_dir_nodeid, r->res_master_nodeid,
+				  our_nodeid, name);
+			continue;
+		}
+
+		if (!time_after_eq(jiffies, r->res_toss_time +
+				   dlm_config.ci_toss_secs * HZ)) {
+			spin_unlock(&ls->ls_rsbtbl[b].lock);
+			log_debug(ls, "remove_name toss_time %lu now %lu %s",
+				  r->res_toss_time, jiffies, name);
+			continue;
+		}
+
+		if (!kref_put(&r->res_ref, kill_rsb)) {
+			spin_unlock(&ls->ls_rsbtbl[b].lock);
+			log_error(ls, "remove_name in use %s", name);
+			continue;
+		}
+
+		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+
+		/* block lookup of same name until we've sent remove */
+		spin_lock(&ls->ls_remove_spin);
+		ls->ls_remove_len = len;
+		memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
+		spin_unlock(&ls->ls_remove_spin);
+		spin_unlock(&ls->ls_rsbtbl[b].lock);
+
+		send_remove(r);
+
+		/* allow lookup of name again */
+		spin_lock(&ls->ls_remove_spin);
+		ls->ls_remove_len = 0;
+		memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
+		spin_unlock(&ls->ls_remove_spin);
+
+		dlm_free_rsb(r);
+	}
 }
 
 void dlm_scan_rsbs(struct dlm_ls *ls)
@@ -2608,6 +2713,8 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
 		return 0;
 	}
 
+	wait_pending_remove(r);
+
 	r->res_first_lkid = lkb->lkb_id;
 	send_lookup(r, lkb);
 	return 1;
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index d4d3b31..952557d 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -506,6 +506,15 @@ static int new_lockspace(const char *name, const char *cluster,
 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
 	}
 
+	spin_lock_init(&ls->ls_remove_spin);
+
+	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
+		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
+						 GFP_KERNEL);
+		if (!ls->ls_remove_names[i])
+			goto out_rsbtbl;
+	}
+
 	idr_init(&ls->ls_lkbidr);
 	spin_lock_init(&ls->ls_lkbidr_spin);
 
@@ -556,7 +565,7 @@ static int new_lockspace(const char *name, const char *cluster,
 
 	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
 	if (!ls->ls_recover_buf)
-		goto out_lkbfree;
+		goto out_lkbidr;
 
 	ls->ls_slot = 0;
 	ls->ls_num_slots = 0;
@@ -640,8 +649,13 @@ static int new_lockspace(const char *name, const char *cluster,
 	spin_unlock(&lslist_lock);
 	idr_destroy(&ls->ls_recover_idr);
 	kfree(ls->ls_recover_buf);
- out_lkbfree:
+ out_lkbidr:
 	idr_destroy(&ls->ls_lkbidr);
+	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
+		if (ls->ls_remove_names[i])
+			kfree(ls->ls_remove_names[i]);
+	}
+ out_rsbtbl:
 	vfree(ls->ls_rsbtbl);
  out_lsfree:
 	if (do_unreg)
@@ -796,6 +810,9 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 
 	vfree(ls->ls_rsbtbl);
 
+	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
+		kfree(ls->ls_remove_names[i]);
+
 	while (!list_empty(&ls->ls_new_rsb)) {
 		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
 				       res_hashchain);
-- 
1.7.10.1.362.g242cab3

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@...r.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ