lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <20250830101413.602637-9-226562783+SigAttilio@users.noreply.github.com>
Date: Sat, 30 Aug 2025 12:14:10 +0200
From: Alessio Attilio <alessio.attilio.dev@...il.com>
To: gfs2@...ts.linux.dev
Cc: linux-kernel@...r.kernel.org,
	aahringo@...hat.com,
	teigland@...hat.com,
	Alessio Attilio <226562783+SigAttilio@...rs.noreply.github.com>
Subject: [PATCH 09/12] fix: improve lkb struct

---
 fs/dlm/lock.c | 937 +++++++++++++++++++++++++-------------------------
 1 file changed, 472 insertions(+), 465 deletions(-)

diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index f7d3d154e2a9..3ead785d8dbe 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -3883,585 +3883,592 @@ static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
 	int b, len = r->res_ls->ls_lvblen;
 
-	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
-	our_nodeid = dlm_our_nodeid();
+	/* b=1 lvb returned to caller
+	   b=0 lvb written to rsb or invalidated
+	   b=-1 do nothing */
 
-	len = receive_extralen(ms);
+	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
 
-	error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
-				  &ret_nodeid, NULL);
-
-	/* Optimization: we're master so treat lookup as a request */
-	if (!error && ret_nodeid == our_nodeid) {
-		receive_request(ls, ms);
-		return;
-	}
-	send_lookup_reply(ls, ms, ret_nodeid, error);
-}
+	if (b == 1) {
+		if (!lkb->lkb_lvbptr)
+			return;
 
-static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
-{
-	char name[DLM_RESNAME_MAXLEN+1];
-	struct dlm_rsb *r;
-	int rv, len, dir_nodeid, from_nodeid;
+		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
+			return;
 
-	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
+		if (!r->res_lvbptr)
+			return;
 
-	len = receive_extralen(ms);
+		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
+		lkb->lkb_lvbseq = r->res_lvbseq;
 
-	if (len > DLM_RESNAME_MAXLEN) {
-		log_error(ls, "receive_remove from %d bad len %d",
-			  from_nodeid, len);
-		return;
-	}
+	} else if (b == 0) {
+		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
+			rsb_set_flag(r, RSB_VALNOTVALID);
+			return;
+		}
 
-	dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
-	if (dir_nodeid != dlm_our_nodeid()) {
-		log_error(ls, "receive_remove from %d bad nodeid %d",
-			  from_nodeid, dir_nodeid);
-		return;
-	}
+		if (!lkb->lkb_lvbptr)
+			return;
 
-	/*
-	 * Look for inactive rsb, if it's there, free it.
-	 * If the rsb is active, it's being used, and we should ignore this
-	 * message.  This is an expected race between the dir node sending a
-	 * request to the master node at the same time as the master node sends
-	 * a remove to the dir node.  The resolution to that race is for the
-	 * dir node to ignore the remove message, and the master node to
-	 * recreate the master rsb when it gets a request from the dir node for
-	 * an rsb it doesn't have.
-	 */
+		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
+			return;
 
-	memset(name, 0, sizeof(name));
-	memcpy(name, ms->m_extra, len);
+		if (!r->res_lvbptr)
+			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
 
-	rcu_read_lock();
-	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-	if (rv) {
-		rcu_read_unlock();
-		/* should not happen */
-		log_error(ls, "%s from %d not found %s", __func__,
-			  from_nodeid, name);
-		return;
-	}
+		if (!r->res_lvbptr)
+			return;
 
-	write_lock_bh(&ls->ls_rsbtbl_lock);
-	if (!rsb_flag(r, RSB_HASHED)) {
-		rcu_read_unlock();
-		write_unlock_bh(&ls->ls_rsbtbl_lock);
-		/* should not happen */
-		log_error(ls, "%s from %d got removed during removal %s",
-			  __func__, from_nodeid, name);
-		return;
+		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
+		r->res_lvbseq++;
+		lkb->lkb_lvbseq = r->res_lvbseq;
+		rsb_clear_flag(r, RSB_VALNOTVALID);
 	}
-	/* at this stage the rsb can only being freed here */
-	rcu_read_unlock();
 
-	if (!rsb_flag(r, RSB_INACTIVE)) {
-		if (r->res_master_nodeid != from_nodeid) {
-			/* should not happen */
-			log_error(ls, "receive_remove on active rsb from %d master %d",
-				  from_nodeid, r->res_master_nodeid);
-			dlm_print_rsb(r);
-			write_unlock_bh(&ls->ls_rsbtbl_lock);
-			return;
-		}
+	if (rsb_flag(r, RSB_VALNOTVALID))
+		set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
+}
 
-		/* Ignore the remove message, see race comment above. */
+static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+{
+	if (lkb->lkb_grmode < DLM_LOCK_PW)
+		return;
 
-		log_debug(ls, "receive_remove from %d master %d first %x %s",
-			  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
-			  name);
-		write_unlock_bh(&ls->ls_rsbtbl_lock);
+	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
+		rsb_set_flag(r, RSB_VALNOTVALID);
 		return;
 	}
 
-	if (r->res_master_nodeid != from_nodeid) {
-		log_error(ls, "receive_remove inactive from %d master %d",
-			  from_nodeid, r->res_master_nodeid);
-		dlm_print_rsb(r);
-		write_unlock_bh(&ls->ls_rsbtbl_lock);
+	if (!lkb->lkb_lvbptr)
 		return;
-	}
 
-	list_del(&r->res_slow_list);
-	rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
-			       dlm_rhash_rsb_params);
-	rsb_clear_flag(r, RSB_HASHED);
-	write_unlock_bh(&ls->ls_rsbtbl_lock);
+	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
+		return;
 
-	free_inactive_rsb(r);
-}
+	if (!r->res_lvbptr)
+		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
 
-static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
-{
-	do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
-}
+	if (!r->res_lvbptr)
+		return;
 
-static int receive_request_reply(struct dlm_ls *ls,
-				 const struct dlm_message *ms)
-{
-	struct dlm_lkb *lkb;
-	struct dlm_rsb *r;
-	int error, mstype, result;
-	int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
+	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
+	r->res_lvbseq++;
+	rsb_clear_flag(r, RSB_VALNOTVALID);
+}
 
-	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
-	if (error)
-		return error;
+/* lkb is process copy (pc) */
 
-	r = lkb->lkb_resource;
-	hold_rsb(r);
-	lock_rsb(r);
+static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
+			    const struct dlm_message *ms)
+{
+	int b;
 
-	error = validate_message(lkb, ms);
-	if (error)
-		goto out;
+	if (!lkb->lkb_lvbptr)
+		return;
 
-	mstype = lkb->lkb_wait_type;
-	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
-	if (error) {
-		log_error(ls, "receive_request_reply %x remote %d %x result %d",
-			  lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
-			  from_dlm_errno(le32_to_cpu(ms->m_result)));
-		dlm_dump_rsb(r);
-		goto out;
-	}
+	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
+		return;
 
-	/* Optimization: the dir node was also the master, so it took our
-	   lookup as a request and sent request reply instead of lookup reply */
-	if (mstype == DLM_MSG_LOOKUP) {
-		r->res_master_nodeid = from_nodeid;
-		r->res_nodeid = from_nodeid;
-		lkb->lkb_nodeid = from_nodeid;
+	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
+	if (b == 1) {
+		int len = receive_extralen(ms);
+		if (len > r->res_ls->ls_lvblen)
+			len = r->res_ls->ls_lvblen;
+		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
+		lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
 	}
+}
 
-	/* this is the value returned from do_request() on the master */
-	result = from_dlm_errno(le32_to_cpu(ms->m_result));
+/* Manipulate lkb's on rsb's convert/granted/waiting queues
+   remove_lock -- used for unlock, removes lkb from granted
+   revert_lock -- used for cancel, moves lkb from convert to granted
+   grant_lock  -- used for request and convert, adds lkb to granted or
+                  moves lkb from convert or waiting to granted
 
-	switch (result) {
-	case -EAGAIN:
-		/* request would block (be queued) on remote master */
-		queue_cast(r, lkb, -EAGAIN);
-		confirm_master(r, -EAGAIN);
-		unhold_lkb(lkb); /* undoes create_lkb() */
-		break;
+   Each of these is used for master or local copy lkb's.  There is
+   also a _pc() variation used to make the corresponding change on
+   a process copy (pc) lkb. */
 
-	case -EINPROGRESS:
-	case 0:
-		/* request was queued or granted on remote master */
-		receive_flags_reply(lkb, ms, false);
-		lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
-		if (is_altmode(lkb))
-			munge_altmode(lkb, ms);
-		if (result) {
-			add_lkb(r, lkb, DLM_LKSTS_WAITING);
-		} else {
-			grant_lock_pc(r, lkb, ms);
-			queue_cast(r, lkb, 0);
-		}
-		confirm_master(r, result);
-		break;
+static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+{
+	del_lkb(r, lkb);
+	lkb->lkb_grmode = DLM_LOCK_IV;
+	/* this unhold undoes the original ref from create_lkb()
+	   so this leads to the lkb being freed */
+	unhold_lkb(lkb);
+}
 
-	case -EBADR:
-	case -ENOTBLK:
-		/* find_rsb failed to find rsb or rsb wasn't master */
-		log_limit(ls, "receive_request_reply %x from %d %d "
-			  "master %d dir %d first %x %s", lkb->lkb_id,
-			  from_nodeid, result, r->res_master_nodeid,
-			  r->res_dir_nodeid, r->res_first_lkid, r->res_name);
-
-		if (r->res_dir_nodeid != dlm_our_nodeid() &&
-		    r->res_master_nodeid != dlm_our_nodeid()) {
-			/* cause _request_lock->set_master->send_lookup */
-			r->res_master_nodeid = 0;
-			r->res_nodeid = -1;
-			lkb->lkb_nodeid = -1;
-		}
+static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+{
+	set_lvb_unlock(r, lkb);
+	_remove_lock(r, lkb);
+}
 
-		if (is_overlap(lkb)) {
-			/* we'll ignore error in cancel/unlock reply */
-			queue_cast_overlap(r, lkb);
-			confirm_master(r, result);
-			unhold_lkb(lkb); /* undoes create_lkb() */
-		} else {
-			_request_lock(r, lkb);
+static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
+{
+	_remove_lock(r, lkb);
+}
 
-			if (r->res_master_nodeid == dlm_our_nodeid())
-				confirm_master(r, 0);
-		}
-		break;
+/* returns: 0 did nothing
+	    1 moved lock to granted
+	   -1 removed lock */
 
-	default:
-		log_error(ls, "receive_request_reply %x error %d",
-			  lkb->lkb_id, result);
-	}
+static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+{
+	int rv = 0;
 
-	if ((result == 0 || result == -EINPROGRESS) &&
-	    test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
-		log_debug(ls, "receive_request_reply %x result %d unlock",
-			  lkb->lkb_id, result);
-		clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
-		send_unlock(r, lkb);
-	} else if ((result == -EINPROGRESS) &&
-		   test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
-				      &lkb->lkb_iflags)) {
-		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
-		clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
-		send_cancel(r, lkb);
-	} else {
-		clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
-		clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
-	}
- out:
-	unlock_rsb(r);
-	put_rsb(r);
-	dlm_put_lkb(lkb);
-	return 0;
-}
+	lkb->lkb_rqmode = DLM_LOCK_IV;
 
-static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
-				    const struct dlm_message *ms, bool local)
-{
-	/* this is the value returned from do_convert() on the master */
-	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
-	case -EAGAIN:
-		/* convert would block (be queued) on remote master */
-		queue_cast(r, lkb, -EAGAIN);
+	switch (lkb->lkb_status) {
+	case DLM_LKSTS_GRANTED:
 		break;
-
-	case -EDEADLK:
-		receive_flags_reply(lkb, ms, local);
-		revert_lock_pc(r, lkb);
-		queue_cast(r, lkb, -EDEADLK);
+	case DLM_LKSTS_CONVERT:
+		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
+		rv = 1;
 		break;
-
-	case -EINPROGRESS:
-		/* convert was queued on remote master */
-		receive_flags_reply(lkb, ms, local);
-		if (is_demoted(lkb))
-			munge_demoted(lkb);
+	case DLM_LKSTS_WAITING:
 		del_lkb(r, lkb);
-		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
-		break;
-
-	case 0:
-		/* convert was granted on remote master */
-		receive_flags_reply(lkb, ms, local);
-		if (is_demoted(lkb))
-			munge_demoted(lkb);
-		grant_lock_pc(r, lkb, ms);
-		queue_cast(r, lkb, 0);
+		lkb->lkb_grmode = DLM_LOCK_IV;
+		/* this unhold undoes the original ref from create_lkb()
+		   so this leads to the lkb being freed */
+		unhold_lkb(lkb);
+		rv = -1;
 		break;
-
 	default:
-		log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
-			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
-			  le32_to_cpu(ms->m_lkid),
-			  from_dlm_errno(le32_to_cpu(ms->m_result)));
-		dlm_print_rsb(r);
-		dlm_print_lkb(lkb);
+		log_print("invalid status for revert %d", lkb->lkb_status);
 	}
+	return rv;
 }
 
-static void _receive_convert_reply(struct dlm_lkb *lkb,
-				   const struct dlm_message *ms, bool local)
+static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
-	struct dlm_rsb *r = lkb->lkb_resource;
-	int error;
-
-	hold_rsb(r);
-	lock_rsb(r);
-
-	error = validate_message(lkb, ms);
-	if (error)
-		goto out;
+	return revert_lock(r, lkb);
+}
 
-	error = remove_from_waiters_ms(lkb, ms, local);
-	if (error)
-		goto out;
+static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+{
+	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
+		lkb->lkb_grmode = lkb->lkb_rqmode;
+		if (lkb->lkb_status)
+			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
+		else
+			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
+	}
 
-	__receive_convert_reply(r, lkb, ms, local);
- out:
-	unlock_rsb(r);
-	put_rsb(r);
+	lkb->lkb_rqmode = DLM_LOCK_IV;
+	lkb->lkb_highbast = 0;
 }
 
-static int receive_convert_reply(struct dlm_ls *ls,
-				 const struct dlm_message *ms)
+static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
-	struct dlm_lkb *lkb;
-	int error;
-
-	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
-	if (error)
-		return error;
+	set_lvb_lock(r, lkb);
+	_grant_lock(r, lkb);
+}
 
-	_receive_convert_reply(lkb, ms, false);
-	dlm_put_lkb(lkb);
-	return 0;
+static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
+			  const struct dlm_message *ms)
+{
+	set_lvb_lock_pc(r, lkb, ms);
+	_grant_lock(r, lkb);
 }
 
-static void _receive_unlock_reply(struct dlm_lkb *lkb,
-				  const struct dlm_message *ms, bool local)
+/* called by grant_pending_locks() which means an async grant message must
+   be sent to the requesting node in addition to granting the lock if the
+   lkb belongs to a remote node. */
+
+static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
-	struct dlm_rsb *r = lkb->lkb_resource;
-	int error;
+	grant_lock(r, lkb);
+	if (is_master_copy(lkb))
+		send_grant(r, lkb);
+	else
+		queue_cast(r, lkb, 0);
+}
 
-	hold_rsb(r);
-	lock_rsb(r);
+/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
+   change the granted/requested modes.  We're munging things accordingly in
+   the process copy.
+   CONVDEADLK: our grmode may have been forced down to NL to resolve a
+   conversion deadlock
+   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
+   compatible with other granted locks */
 
-	error = validate_message(lkb, ms);
-	if (error)
-		goto out;
+static void munge_demoted(struct dlm_lkb *lkb)
+{
+	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
+		log_print("munge_demoted %x invalid modes gr %d rq %d",
+			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
+		return;
+	}
 
-	error = remove_from_waiters_ms(lkb, ms, local);
-	if (error)
-		goto out;
+	lkb->lkb_grmode = DLM_LOCK_NL;
+}
 
-	/* this is the value returned from do_unlock() on the master */
+static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
+{
+	if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
+	    ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
+		log_print("munge_altmode %x invalid reply type %d",
+			  lkb->lkb_id, le32_to_cpu(ms->m_type));
+		return;
+	}
 
-	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
-	case -DLM_EUNLOCK:
-		receive_flags_reply(lkb, ms, local);
-		remove_lock_pc(r, lkb);
-		queue_cast(r, lkb, -DLM_EUNLOCK);
-		break;
-	case -ENOENT:
-		break;
-	default:
-		log_error(r->res_ls, "receive_unlock_reply %x error %d",
-			  lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
+	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
+		lkb->lkb_rqmode = DLM_LOCK_PR;
+	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
+		lkb->lkb_rqmode = DLM_LOCK_CW;
+	else {
+		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
+		dlm_print_lkb(lkb);
 	}
- out:
-	unlock_rsb(r);
-	put_rsb(r);
 }
 
-static int receive_unlock_reply(struct dlm_ls *ls,
-				const struct dlm_message *ms)
+static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
 {
-	struct dlm_lkb *lkb;
-	int error;
-
-	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
-	if (error)
-		return error;
+	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
+					   lkb_statequeue);
+	if (lkb->lkb_id == first->lkb_id)
+		return 1;
 
-	_receive_unlock_reply(lkb, ms, false);
-	dlm_put_lkb(lkb);
 	return 0;
 }
 
-static void _receive_cancel_reply(struct dlm_lkb *lkb,
-				  const struct dlm_message *ms, bool local)
+/* Check if the given lkb conflicts with another lkb on the queue. */
+
+static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
 {
-	struct dlm_rsb *r = lkb->lkb_resource;
-	int error;
+	struct dlm_lkb *this;
 
-	hold_rsb(r);
-	lock_rsb(r);
+	list_for_each_entry(this, head, lkb_statequeue) {
+		if (this == lkb)
+			continue;
+		if (!modes_compat(this, lkb))
+			return 1;
+	}
+	return 0;
+}
 
-	error = validate_message(lkb, ms);
-	if (error)
-		goto out;
+/*
+ * "A conversion deadlock arises with a pair of lock requests in the converting
+ * queue for one resource.  The granted mode of each lock blocks the requested
+ * mode of the other lock."
+ *
+ * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
+ * convert queue from being granted, then deadlk/demote lkb.
+ *
+ * Example:
+ * Granted Queue: empty
+ * Convert Queue: NL->EX (first lock)
+ *                PR->EX (second lock)
+ *
+ * The first lock can't be granted because of the granted mode of the second
+ * lock and the second lock can't be granted because it's not first in the
+ * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
+ * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
+ * flag set and return DEMOTED in the lksb flags.
+ *
+ * Originally, this function detected conv-deadlk in a more limited scope:
+ * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
+ * - if lkb1 was the first entry in the queue (not just earlier), and was
+ *   blocked by the granted mode of lkb2, and there was nothing on the
+ *   granted queue preventing lkb1 from being granted immediately, i.e.
+ *   lkb2 was the only thing preventing lkb1 from being granted.
+ *
+ * That second condition meant we'd only say there was conv-deadlk if
+ * resolving it (by demotion) would lead to the first lock on the convert
+ * queue being granted right away.  It allowed conversion deadlocks to exist
+ * between locks on the convert queue while they couldn't be granted anyway.
+ *
+ * Now, we detect and take action on conversion deadlocks immediately when
+ * they're created, even if they may not be immediately consequential.  If
+ * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
+ * mode that would prevent lkb1's conversion from being granted, we do a
+ * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
+ * I think this means that the lkb_is_ahead condition below should always
+ * be zero, i.e. there will never be conv-deadlk between two locks that are
+ * both already on the convert queue.
+ */
 
-	error = remove_from_waiters_ms(lkb, ms, local);
-	if (error)
-		goto out;
+static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
+{
+	struct dlm_lkb *lkb1;
+	int lkb_is_ahead = 0;
 
-	/* this is the value returned from do_cancel() on the master */
+	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
+		if (lkb1 == lkb2) {
+			lkb_is_ahead = 1;
+			continue;
+		}
 
-	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
-	case -DLM_ECANCEL:
-		receive_flags_reply(lkb, ms, local);
-		revert_lock_pc(r, lkb);
-		queue_cast(r, lkb, -DLM_ECANCEL);
-		break;
-	case 0:
-		break;
-	default:
-		log_error(r->res_ls, "receive_cancel_reply %x error %d",
-			  lkb->lkb_id,
-			  from_dlm_errno(le32_to_cpu(ms->m_result)));
+		if (!lkb_is_ahead) {
+			if (!modes_compat(lkb2, lkb1))
+				return 1;
+		} else {
+			if (!modes_compat(lkb2, lkb1) &&
+			    !modes_compat(lkb1, lkb2))
+				return 1;
+		}
 	}
- out:
-	unlock_rsb(r);
-	put_rsb(r);
+	return 0;
 }
 
-static int receive_cancel_reply(struct dlm_ls *ls,
-				const struct dlm_message *ms)
+/*
+ * Return 1 if the lock can be granted, 0 otherwise.
+ * Also detect and resolve conversion deadlocks.
+ *
+ * lkb is the lock to be granted
+ *
+ * now is 1 if the function is being called in the context of the
+ * immediate request, it is 0 if called later, after the lock has been
+ * queued.
+ *
+ * recover is 1 if dlm_recover_grant() is trying to grant conversions
+ * after recovery.
+ *
+ * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
+ */
+
+static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
+			   int recover)
 {
-	struct dlm_lkb *lkb;
-	int error;
+	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
 
-	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
-	if (error)
-		return error;
+	/*
+	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
+	 * a new request for a NL mode lock being blocked.
+	 *
+	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
+	 * request, then it would be granted.  In essence, the use of this flag
+	 * tells the Lock Manager to expedite this request by not considering
+	 * what may be in the CONVERTING or WAITING queues...  As of this
+	 * writing, the EXPEDITE flag can be used only with new requests for NL
+	 * mode locks.  This flag is not valid for conversion requests.
+	 *
+	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
+	 * conversion or used with a non-NL requested mode.  We also know an
+	 * EXPEDITE request is always granted immediately, so now must always
+	 * be 1.  The full condition to grant an expedite request: (now &&
+	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
+	 * therefore be shortened to just checking the flag.
+	 */
 
-	_receive_cancel_reply(lkb, ms, false);
-	dlm_put_lkb(lkb);
-	return 0;
-}
+	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
+		return 1;
 
-static void receive_lookup_reply(struct dlm_ls *ls,
-				 const struct dlm_message *ms)
-{
-	struct dlm_lkb *lkb;
-	struct dlm_rsb *r;
-	int error, ret_nodeid;
-	int do_lookup_list = 0;
+	/*
+	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
+	 * added to the remaining conditions.
+	 */
 
-	error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
-	if (error) {
-		log_error(ls, "%s no lkid %x", __func__,
-			  le32_to_cpu(ms->m_lkid));
-		return;
-	}
+	if (queue_conflict(&r->res_grantqueue, lkb))
+		return 0;
 
-	/* ms->m_result is the value returned by dlm_master_lookup on dir node
-	   FIXME: will a non-zero error ever be returned? */
+	/*
+	 * 6-3: By default, a conversion request is immediately granted if the
+	 * requested mode is compatible with the modes of all other granted
+	 * locks
+	 */
 
-	r = lkb->lkb_resource;
-	hold_rsb(r);
-	lock_rsb(r);
+	if (queue_conflict(&r->res_convertqueue, lkb))
+		return 0;
 
-	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
-	if (error)
-		goto out;
+	/*
+	 * The RECOVER_GRANT flag means dlm_recover_grant() is granting
+	 * locks for a recovered rsb, on which lkb's have been rebuilt.
+	 * The lkb's may have been rebuilt on the queues in a different
+	 * order than they were in on the previous master.  So, granting
+	 * queued conversions in order after recovery doesn't make sense
+	 * since the order hasn't been preserved anyway.  The new order
+	 * could also have created a new "in place" conversion deadlock.
+	 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
+	 * After recovery, there would be no granted locks, and possibly
+	 * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
+	 * recovery, grant conversions without considering order.
+	 */
 
-	ret_nodeid = le32_to_cpu(ms->m_nodeid);
+	if (conv && recover)
+		return 1;
 
-	/* We sometimes receive a request from the dir node for this
-	   rsb before we've received the dir node's loookup_reply for it.
-	   The request from the dir node implies we're the master, so we set
-	   ourself as master in receive_request_reply, and verify here that
-	   we are indeed the master. */
+	/*
+	 * 6-5: But the default algorithm for deciding whether to grant or
+	 * queue conversion requests does not by itself guarantee that such
+	 * requests are serviced on a "first come first serve" basis.  This, in
+	 * turn, can lead to a phenomenon known as "indefinite postponement".
+	 *
+	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
+	 * the system service employed to request a lock conversion.  This flag
+	 * forces certain conversion requests to be queued, even if they are
+	 * compatible with the granted modes of other locks on the same
+	 * resource.  Thus, the use of this flag results in conversion requests
+	 * being ordered on a "first come first serve" basis.
+	 *
+	 * DCT: This condition is all about new conversions being able to occur
+	 * "in place" while the lock remains on the granted queue (assuming
+	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
+	 * doesn't _have_ to go onto the convert queue where it's processed in
+	 * order.  The "now" variable is necessary to distinguish converts
+	 * being received and processed for the first time now, because once a
+	 * convert is moved to the conversion queue the condition below applies
+	 * requiring fifo granting.
+	 */
 
-	if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
-		/* This should never happen */
-		log_error(ls, "receive_lookup_reply %x from %d ret %d "
-			  "master %d dir %d our %d first %x %s",
-			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
-			  ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
-			  dlm_our_nodeid(), r->res_first_lkid, r->res_name);
-	}
+	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
+		return 1;
 
-	if (ret_nodeid == dlm_our_nodeid()) {
-		r->res_master_nodeid = ret_nodeid;
-		r->res_nodeid = 0;
-		do_lookup_list = 1;
-		r->res_first_lkid = 0;
-	} else if (ret_nodeid == -1) {
-		/* the remote node doesn't believe it's the dir node */
-		log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
-			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
-		r->res_master_nodeid = 0;
-		r->res_nodeid = -1;
-		lkb->lkb_nodeid = -1;
-	} else {
-		/* set_master() will set lkb_nodeid from r */
-		r->res_master_nodeid = ret_nodeid;
-		r->res_nodeid = ret_nodeid;
-	}
+	/*
+	 * Even if the convert is compat with all granted locks,
+	 * QUECVT forces it behind other locks on the convert queue.
+	 */
 
-	if (is_overlap(lkb)) {
-		log_debug(ls, "receive_lookup_reply %x unlock %x",
-			  lkb->lkb_id, dlm_iflags_val(lkb));
-		queue_cast_overlap(r, lkb);
-		unhold_lkb(lkb); /* undoes create_lkb() */
-		goto out_list;
+	if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
+		if (list_empty(&r->res_convertqueue))
+			return 1;
+		else
+			return 0;
 	}
 
-	_request_lock(r, lkb);
+	/*
+	 * The NOORDER flag is set to avoid the standard vms rules on grant
+	 * order.
+	 */
 
- out_list:
-	if (do_lookup_list)
-		process_lookup_list(r);
- out:
-	unlock_rsb(r);
-	put_rsb(r);
-	dlm_put_lkb(lkb);
-}
+	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
+		return 1;
 
-static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
-			     uint32_t saved_seq)
-{
-	int error = 0, noent = 0;
+	/*
+	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
+	 * granted until all other conversion requests ahead of it are granted
+	 * and/or canceled.
+	 */
 
-	if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
-		log_limit(ls, "receive %d from non-member %d %x %x %d",
-			  le32_to_cpu(ms->m_type),
-			  le32_to_cpu(ms->m_header.h_nodeid),
-			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
-			  from_dlm_errno(le32_to_cpu(ms->m_result)));
-		return;
-	}
+	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
+		return 1;
 
-	switch (ms->m_type) {
+	/*
+	 * 6-4: By default, a new request is immediately granted only if all
+	 * three of the following conditions are satisfied when the request is
+	 * issued:
+	 * - The queue of ungranted conversion requests for the resource is
+	 *   empty.
+	 * - The queue of ungranted new requests for the resource is empty.
+	 * - The mode of the new request is compatible with the most
+	 *   restrictive mode of all granted locks on the resource.
+	 */
 
-	/* messages sent to a master node */
+	if (now && !conv && list_empty(&r->res_convertqueue) &&
+	    list_empty(&r->res_waitqueue))
+		return 1;
 
-	case cpu_to_le32(DLM_MSG_REQUEST):
-		error = receive_request(ls, ms);
-		break;
+	/*
+	 * 6-4: Once a lock request is in the queue of ungranted new requests,
+	 * it cannot be granted until the queue of ungranted conversion
+	 * requests is empty, all ungranted new requests ahead of it are
+	 * granted and/or canceled, and it is compatible with the granted mode
+	 * of the most restrictive lock granted on the resource.
+	 */
 
-	case cpu_to_le32(DLM_MSG_CONVERT):
-		error = receive_convert(ls, ms);
-		break;
+	if (!now && !conv && list_empty(&r->res_convertqueue) &&
+	    first_in_list(lkb, &r->res_waitqueue))
+		return 1;
 
-	case cpu_to_le32(DLM_MSG_UNLOCK):
-		error = receive_unlock(ls, ms);
-		break;
+	return 0;
+}
 
-	case cpu_to_le32(DLM_MSG_CANCEL):
-		noent = 1;
-		error = receive_cancel(ls, ms);
-		break;
+static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
+			  int recover, int *err)
+{
+	int rv;
+	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
+	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
 
-	/* messages sent from a master node (replies to above) */
+	if (err)
+		*err = 0;
 
-	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
-		error = receive_request_reply(ls, ms);
-		break;
+	rv = _can_be_granted(r, lkb, now, recover);
+	if (rv)
+		goto out;
 
-	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
-		error = receive_convert_reply(ls, ms);
-		break;
+	/*
+	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
+	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
+	 * cancels one of the locks.
+	 */
 
-	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
-		error = receive_unlock_reply(ls, ms);
-		break;
+	if (is_convert && can_be_queued(lkb) &&
+	    conversion_deadlock_detect(r, lkb)) {
+		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
+			lkb->lkb_grmode = DLM_LOCK_NL;
+			set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
+		} else if (err) {
+			*err = -EDEADLK;
+		} else {
+			log_print("can_be_granted deadlock %x now %d",
+				  lkb->lkb_id, now);
+			dlm_dump_rsb(r);
+		}
+		goto out;
+	}
 
-	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
-		error = receive_cancel_reply(ls, ms);
-		break;
+	/*
+	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
+	 * to grant a request in a mode other than the normal rqmode.  It's a
+	 * simple way to provide a big optimization to applications that can
+	 * use them.
+	 */
 
-	/* messages sent from a master node (only two types of async msg) */
+	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
+		alt = DLM_LOCK_PR;
+	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
+		alt = DLM_LOCK_CW;
 
-	case cpu_to_le32(DLM_MSG_GRANT):
-		noent = 1;
-		error = receive_grant(ls, ms);
-		break;
+	if (alt) {
+		lkb->lkb_rqmode = alt;
+		rv = _can_be_granted(r, lkb, now, 0);
+		if (rv)
+			set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
+		else
+			lkb->lkb_rqmode = rqmode;
+	}
+ out:
+	return rv;
+}
 
-	case cpu_to_le32(DLM_MSG_BAST):
-		noent = 1;
-		error = receive_bast(ls, ms);
-		break;
+/* Returns the highest requested mode of all blocked conversions; sets
+   cw if there's a blocked conversion to DLM_LOCK_CW. */
 
-	/* messages sent to a dir node */
+static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
+				 unsigned int *count)
+{
+	struct dlm_lkb *lkb, *s;
+	int recover = rsb_flag(r, RSB_RECOVER_GRANT);
+	int hi, demoted, quit, grant_restart, demote_restart;
+	int deadlk;
 
-	case cpu_to_le32(DLM_MSG_LOOKUP):
-		receive_lookup(ls, ms);
-		break;
+	quit = 0;
+ restart:
+	grant_restart = 0;
+	demote_restart = 0;
+	hi = DLM_LOCK_IV;
 
-	case cpu_to_le32(DLM_MSG_REMOVE):
-		receive_remove(ls, ms);
-		break;
+	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
+		demoted = is_demoted(lkb);
+		deadlk = 0;
 
-	/* messages sent from a dir node (remove has no reply) */
+		if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
+			grant_lock_pending(r, lkb);
+			grant_restart = 1;
+			if (count)
+				(*count)++;
+			continue;
+		}
+
+		if (!demoted && is_demoted(lkb)) {
+			log_print("WARN: pending demoted %x node %d %s",
+				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
+			demote_restart = 1;
+			continue;
+		}
 
 	case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
 		receive_lookup_reply(ls, ms);
-- 
2.48.1


Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ