lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <20250830101413.602637-3-226562783+SigAttilio@users.noreply.github.com>
Date: Sat, 30 Aug 2025 12:14:04 +0200
From: Alessio Attilio <alessio.attilio.dev@...il.com>
To: gfs2@...ts.linux.dev
Cc: linux-kernel@...r.kernel.org,
	aahringo@...hat.com,
	teigland@...hat.com,
	Alessio Attilio <226562783+SigAttilio@...rs.noreply.github.com>
Subject: [PATCH 03/12] refactor: unify active/inactive logic in find_rsb_nodir

---
 fs/dlm/lock.c | 355 ++++++++++++++++++++------------------------------
 1 file changed, 141 insertions(+), 214 deletions(-)

diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 0165b3b7e36a..c62ec235c047 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1068,269 +1068,196 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 		list_add(&r->res_slow_list, &ls->ls_slow_active);
 	}
 	write_unlock_bh(&ls->ls_rsbtbl_lock);
-
  out:
 	*r_ret = r;
 	return error;
 }
 
-/*
- * rsb rcu usage
- *
- * While rcu read lock is held, the rsb cannot be freed,
- * which allows a lookup optimization.
- *
- * Two threads are accessing the same rsb concurrently,
- * the first (A) is trying to use the rsb, the second (B)
- * is trying to free the rsb.
- *
- * thread A                 thread B
- * (trying to use rsb)      (trying to free rsb)
- *
- * A1. rcu read lock
- * A2. rsbtbl read lock
- * A3. look up rsb in rsbtbl
- * A4. rsbtbl read unlock
- *                          B1. rsbtbl write lock
- *                          B2. look up rsb in rsbtbl
- *                          B3. remove rsb from rsbtbl
- *                          B4. clear rsb HASHED flag
- *                          B5. rsbtbl write unlock
- *                          B6. begin freeing rsb using rcu...
- *
- * (rsb is inactive, so try to make it active again)
- * A5. read rsb HASHED flag (safe because rsb is not freed yet)
- * A6. the rsb HASHED flag is not set, which it means the rsb
- *     is being removed from rsbtbl and freed, so don't use it.
- * A7. rcu read unlock
- *
- *                          B7. ...finish freeing rsb using rcu
- * A8. create a new rsb
- *
- * Without the rcu optimization, steps A5-8 would need to do
- * an extra rsbtbl lookup:
- * A5. rsbtbl write lock
- * A6. look up rsb in rsbtbl, not found
- * A7. rsbtbl write unlock
- * A8. create a new rsb
- */
+/* During recovery, other nodes can send us new MSTCPY locks (from
+   dlm_recover_locks) before we've made ourself master (in
+   dlm_recover_masters). */
 
-static int find_rsb(struct dlm_ls *ls, const void *name, int len,
-		    int from_nodeid, unsigned int flags,
-		    struct dlm_rsb **r_ret)
+static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
+			  uint32_t hash, int dir_nodeid, int from_nodeid,
+			  unsigned int flags, struct dlm_rsb **r_ret)
 {
-	int dir_nodeid;
-	uint32_t hash;
-	int rv;
+	struct dlm_rsb *r = NULL;
+	int our_nodeid = dlm_our_nodeid();
+	int recover = (flags & R_RECEIVE_RECOVER);
+	int error;
 
-	if (len > DLM_RESNAME_MAXLEN)
-		return -EINVAL;
+ retry:
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
+	if (error)
+		goto do_new;
 
-	hash = jhash(name, len, 0);
-	dir_nodeid = dlm_hash2nodeid(ls, hash);
+	/* check if the rsb is in active state under read lock - likely path */
+	read_lock_bh(&ls->ls_rsbtbl_lock);
+	if (!rsb_flag(r, RSB_HASHED)) {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto do_new;
+	}
 
-	rcu_read_lock();
-	if (dlm_no_directory(ls))
-		rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid,
-				      from_nodeid, flags, r_ret);
-	else
-		rv = find_rsb_dir(ls, name, len, hash, dir_nodeid,
-				    from_nodeid, flags, r_ret);
-	rcu_read_unlock();
-	return rv;
-}
+	if (rsb_flag(r, RSB_INACTIVE)) {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto do_inactive;
+	}
 
-/* we have received a request and found that res_master_nodeid != our_nodeid,
-   so we need to return an error or make ourself the master */
+	/*
+	 * rsb is active, so we can't check master_nodeid without lock_rsb.
+	 */
 
-static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
-				  int from_nodeid)
-{
-	if (dlm_no_directory(ls)) {
-		log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
-			  from_nodeid, r->res_master_nodeid,
-			  r->res_dir_nodeid);
-		dlm_print_rsb(r);
-		return -ENOTBLK;
-	}
+	kref_get(&r->res_ref);
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
+
+	goto out;
 
-	if (from_nodeid != r->res_dir_nodeid) {
-		/* our rsb is not master, and another node (not the dir node)
-	   	   has sent us a request.  this is much more common when our
-	   	   master_nodeid is zero, so limit debug to non-zero.  */
 
-		if (r->res_master_nodeid) {
-			log_debug(ls, "validate master from_other %d master %d "
-				  "dir %d first %x %s", from_nodeid,
-				  r->res_master_nodeid, r->res_dir_nodeid,
-				  r->res_first_lkid, r->res_name);
+ do_inactive:
+	write_lock_bh(&ls->ls_rsbtbl_lock);
+
+	/* See comment in find_rsb_dir. */
+	if (rsb_flag(r, RSB_HASHED)) {
+		if (!rsb_flag(r, RSB_INACTIVE)) {
+			write_unlock_bh(&ls->ls_rsbtbl_lock);
+			goto retry;
 		}
-		return -ENOTBLK;
 	} else {
-		/* our rsb is not master, but the dir nodeid has sent us a
-	   	   request; this could happen with master 0 / res_nodeid -1 */
-
-		if (r->res_master_nodeid) {
-			log_error(ls, "validate master from_dir %d master %d "
-				  "first %x %s",
-				  from_nodeid, r->res_master_nodeid,
-				  r->res_first_lkid, r->res_name);
-		}
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto do_new;
+	}
+
+
+	/*
+	 * rsb found inactive. No other thread is using this rsb because
+	 * it's inactive, so we can look at or update res_master_nodeid
+	 * without lock_rsb.
+	 */
+
+	if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
+		/* our rsb is not master, and another node has sent us a
+		   request; this should never happen */
+		log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d",
+			  from_nodeid, r->res_master_nodeid, dir_nodeid);
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		error = -ENOTBLK;
+		goto out;
+	}
 
-		r->res_master_nodeid = dlm_our_nodeid();
+	if (!recover && (r->res_master_nodeid != our_nodeid) &&
+	    (dir_nodeid == our_nodeid)) {
+		/* our rsb is not master, and we are dir; may as well fix it;
+		   this should never happen */
+		log_error(ls, "find_rsb inactive our %d master %d dir %d",
+			  our_nodeid, r->res_master_nodeid, dir_nodeid);
+		dlm_print_rsb(r);
+		r->res_master_nodeid = our_nodeid;
 		r->res_nodeid = 0;
-		return 0;
 	}
-}
 
-static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
-				int from_nodeid, bool is_inactive, unsigned int flags,
-				int *r_nodeid, int *result)
-{
-	int fix_master = (flags & DLM_LU_RECOVER_MASTER);
-	int from_master = (flags & DLM_LU_RECOVER_DIR);
+	del_scan(ls, r);
+	list_move(&r->res_slow_list, &ls->ls_slow_active);
+	rsb_clear_flag(r, RSB_INACTIVE);
+	kref_init(&r->res_ref); /* ref is now used in active state */
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
-	if (r->res_dir_nodeid != our_nodeid) {
-		/* should not happen, but may as well fix it and carry on */
-		log_error(ls, "%s res_dir %d our %d %s", __func__,
-			  r->res_dir_nodeid, our_nodeid, r->res_name);
-		r->res_dir_nodeid = our_nodeid;
-	}
+	goto out;
 
-	if (fix_master && r->res_master_nodeid && dlm_is_removed(ls, r->res_master_nodeid)) {
-		/* Recovery uses this function to set a new master when
-		 * the previous master failed.  Setting NEW_MASTER will
-		 * force dlm_recover_masters to call recover_master on this
-		 * rsb even though the res_nodeid is no longer removed.
-		 */
 
-		r->res_master_nodeid = from_nodeid;
-		r->res_nodeid = from_nodeid;
-		rsb_set_flag(r, RSB_NEW_MASTER);
+ do_new:
+	/*
+	 * rsb not found
+	 */
 
-		if (is_inactive) {
-			/* I don't think we should ever find it inactive. */
-			log_error(ls, "%s fix_master inactive", __func__);
-			dlm_dump_rsb(r);
-		}
-	}
+	if (error == -EBADR && !create)
+		goto out;
 
-	if (from_master && (r->res_master_nodeid != from_nodeid)) {
-		/* this will happen if from_nodeid became master during
-		 * a previous recovery cycle, and we aborted the previous
-		 * cycle before recovering this master value
-		 */
+	error = get_rsb_struct(ls, name, len, &r);
+	if (WARN_ON_ONCE(error))
+		goto out;
 
-		log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
-			  __func__, from_nodeid, r->res_master_nodeid,
-			  r->res_nodeid, r->res_first_lkid, r->res_name);
+	r->res_hash = hash;
+	r->res_dir_nodeid = dir_nodeid;
+	kref_init(&r->res_ref);
 
-		if (r->res_master_nodeid == our_nodeid) {
-			log_error(ls, "from_master %d our_master", from_nodeid);
-			dlm_dump_rsb(r);
-			goto ret_assign;
-		}
+	if (from_dir) {
+		/* want to see how often this happens */
+		log_debug(ls, "find_rsb new from_dir %d recreate %s",
+			  from_nodeid, r->res_name);
+		r->res_master_nodeid = our_nodeid;
+		r->res_nodeid = 0;
+		goto out_add;
+	}
 
-		r->res_master_nodeid = from_nodeid;
-		r->res_nodeid = from_nodeid;
-		rsb_set_flag(r, RSB_NEW_MASTER);
+	if (from_other && (dir_nodeid != our_nodeid)) {
+		/* should never happen */
+		log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
+			  from_nodeid, dir_nodeid, our_nodeid, r->res_name);
+		dlm_free_rsb(r);
+		r = NULL;
+		error = -ENOTBLK;
+		goto out;
 	}
 
-	if (!r->res_master_nodeid) {
-		/* this will happen if recovery happens while we're looking
-		 * up the master for this rsb
-		 */
+	if (from_other) {
+		log_debug(ls, "find_rsb new from_other %d dir %d %s",
+			  from_nodeid, dir_nodeid, r->res_name);
+	}
 
-		log_debug(ls, "%s master 0 to %d first %x %s", __func__,
-			  from_nodeid, r->res_first_lkid, r->res_name);
-		r->res_master_nodeid = from_nodeid;
-		r->res_nodeid = from_nodeid;
+	if (dir_nodeid == our_nodeid) {
+		/* When we are the dir nodeid, we can set the master
+		   node immediately */
+		r->res_master_nodeid = our_nodeid;
+		r->res_nodeid = 0;
+	} else {
+		/* set_master will send_lookup to dir_nodeid */
+		r->res_master_nodeid = 0;
+		r->res_nodeid = -1;
 	}
 
-	if (!from_master && !fix_master &&
-	    (r->res_master_nodeid == from_nodeid)) {
-		/* this can happen when the master sends remove, the dir node
-		 * finds the rsb on the active list and ignores the remove,
-		 * and the former master sends a lookup
-		 */
+ out_add:
 
-		log_limit(ls, "%s from master %d flags %x first %x %s",
-			  __func__, from_nodeid, flags, r->res_first_lkid,
-			  r->res_name);
+	write_lock_bh(&ls->ls_rsbtbl_lock);
+	error = rsb_insert(r, &ls->ls_rsbtbl);
+	if (error == -EEXIST) {
+		/* somebody else was faster and it seems the
+		 * rsb exists now, we do a whole relookup
+		 */
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		dlm_free_rsb(r);
+		goto retry;
+	} else if (!error) {
+		list_add(&r->res_slow_list, &ls->ls_slow_active);
 	}
-
- ret_assign:
-	*r_nodeid = r->res_master_nodeid;
-	if (result)
-		*result = DLM_LU_MATCH;
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
+ out:
+	*r_ret = r;
+	return error;
 }
 
-/*
- * We're the dir node for this res and another node wants to know the
- * master nodeid.  During normal operation (non recovery) this is only
- * called from receive_lookup(); master lookups when the local node is
- * the dir node are done by find_rsb().
- *
- * normal operation, we are the dir node for a resource
- * . _request_lock
- * . set_master
- * . send_lookup
- * . receive_lookup
- * . dlm_master_lookup flags 0
- *
- * recover directory, we are rebuilding dir for all resources
- * . dlm_recover_directory
- * . dlm_rcom_names
- *   remote node sends back the rsb names it is master of and we are dir of
- * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
- *   we either create new rsb setting remote node as master, or find existing
- *   rsb and set master to be the remote node.
- *
- * recover masters, we are finding the new master for resources
- * . dlm_recover_masters
- * . recover_master
- * . dlm_send_rcom_lookup
- * . receive_rcom_lookup
- * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
- */
+/* During recovery, other nodes can send us new MSTCPY locks (from
+   dlm_recover_locks) before we've made ourself master (in
+   dlm_recover_masters). */
 
-static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
-			      int len, unsigned int flags, int *r_nodeid, int *result)
+static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
+			  uint32_t hash, int dir_nodeid, int from_nodeid,
+			  unsigned int flags, struct dlm_rsb **r_ret)
 {
 	struct dlm_rsb *r = NULL;
-	uint32_t hash;
 	int our_nodeid = dlm_our_nodeid();
-	int dir_nodeid, error;
-
-	if (len > DLM_RESNAME_MAXLEN)
-		return -EINVAL;
-
-	if (from_nodeid == our_nodeid) {
-		log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
-			  our_nodeid, flags);
-		return -EINVAL;
-	}
-
-	hash = jhash(name, len, 0);
-	dir_nodeid = dlm_hash2nodeid(ls, hash);
-	if (dir_nodeid != our_nodeid) {
-		log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
-			  from_nodeid, dir_nodeid, our_nodeid, hash,
-			  ls->ls_num_nodes);
-		*r_nodeid = -1;
-		return -EINVAL;
-	}
+	int recover = (flags & R_RECEIVE_RECOVER);
+	int error;
 
  retry:
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 	if (error)
-		goto not_found;
+		goto do_new;
 
-	/* check if the rsb is active under read lock - likely path */
+	/* check if the rsb is in active state under read lock - likely path */
 	read_lock_bh(&ls->ls_rsbtbl_lock);
 	if (!rsb_flag(r, RSB_HASHED)) {
 		read_unlock_bh(&ls->ls_rsbtbl_lock);
-		goto not_found;
+		goto do_new;
 	}
 
 	if (rsb_flag(r, RSB_INACTIVE)) {
-- 
2.48.1


Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ