lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <20250830101413.602637-12-226562783+SigAttilio@users.noreply.github.com>
Date: Sat, 30 Aug 2025 12:14:13 +0200
From: Alessio Attilio <alessio.attilio.dev@...il.com>
To: gfs2@...ts.linux.dev
Cc: linux-kernel@...r.kernel.org,
	aahringo@...hat.com,
	teigland@...hat.com,
	Alessio Attilio <226562783+SigAttilio@...rs.noreply.github.com>
Subject: [PATCH 12/12] fix: imrpove l/c error

---
 fs/dlm/lock.c | 635 +++++++++++++++++++++++++++++++++++---------------
 1 file changed, 442 insertions(+), 193 deletions(-)

diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 6b0ab9e1bfcc..4dfde93f4056 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -5513,264 +5513,513 @@ static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
 				rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
 			}
 
-int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
-		    uint32_t flags, uint32_t lkid, char *lvb_in)
-{
-	struct dlm_lkb *lkb;
-	struct dlm_args args;
-	struct dlm_user_args *ua;
-	int error;
+			del_lkb(r, lkb);
 
-	dlm_lock_recovery(ls);
+			/* this put should free the lkb */
+			if (!dlm_put_lkb(lkb))
+				log_error(ls, "purged dead lkb not released");
 
-	error = find_lkb(ls, lkid, &lkb);
-	if (error)
-		goto out;
+			rsb_set_flag(r, RSB_RECOVER_GRANT);
 
-	trace_dlm_unlock_start(ls, lkb, flags);
+			(*count)++;
+		}
+	}
+}
 
-	ua = lkb->lkb_ua;
+/* Get rid of locks held by nodes that are gone. */
 
-	if (lvb_in && ua->lksb.sb_lvbptr)
-		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
-	if (ua_tmp->castparam)
-		ua->castparam = ua_tmp->castparam;
-	ua->user_lksb = ua_tmp->user_lksb;
+void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
+{
+	struct dlm_rsb *r;
+	struct dlm_member *memb;
+	int nodes_count = 0;
+	int nodeid_gone = 0;
+	unsigned int lkb_count = 0;
 
-	error = set_unlock_args(flags, ua, &args);
-	if (error)
-		goto out_put;
+	/* cache one removed nodeid to optimize the common
+	   case of a single node removed */
 
-	error = unlock_lock(ls, lkb, &args);
+	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
+		nodes_count++;
+		nodeid_gone = memb->nodeid;
+	}
 
-	if (error == -DLM_EUNLOCK)
-		error = 0;
-	/* from validate_unlock_args() */
-	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
-		error = 0;
-	if (error)
-		goto out_put;
+	if (!nodes_count)
+		return;
 
-	spin_lock_bh(&ua->proc->locks_spin);
-	/* dlm_user_add_cb() may have already taken lkb off the proc list */
-	if (!list_empty(&lkb->lkb_ownqueue))
-		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
-	spin_unlock_bh(&ua->proc->locks_spin);
- out_put:
-	trace_dlm_unlock_end(ls, lkb, flags, error);
-	dlm_put_lkb(lkb);
- out:
-	dlm_unlock_recovery(ls);
-	kfree(ua_tmp);
-	return error;
+	list_for_each_entry(r, root_list, res_root_list) {
+		lock_rsb(r);
+		if (r->res_nodeid != -1 && is_master(r)) {
+			purge_dead_list(ls, r, &r->res_grantqueue,
+					nodeid_gone, &lkb_count);
+			purge_dead_list(ls, r, &r->res_convertqueue,
+					nodeid_gone, &lkb_count);
+			purge_dead_list(ls, r, &r->res_waitqueue,
+					nodeid_gone, &lkb_count);
+		}
+		unlock_rsb(r);
+
+		cond_resched();
+	}
+
+	if (lkb_count)
+		log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
+			  lkb_count, nodes_count);
 }
 
-int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
-		    uint32_t flags, uint32_t lkid)
+static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
 {
-	struct dlm_lkb *lkb;
-	struct dlm_args args;
-	struct dlm_user_args *ua;
-	int error;
+	struct dlm_rsb *r;
 
-	dlm_lock_recovery(ls);
+	read_lock_bh(&ls->ls_rsbtbl_lock);
+	list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
+		if (!rsb_flag(r, RSB_RECOVER_GRANT))
+			continue;
+		if (!is_master(r)) {
+			rsb_clear_flag(r, RSB_RECOVER_GRANT);
+			continue;
+		}
+		hold_rsb(r);
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
+		return r;
+	}
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
+	return NULL;
+}
 
-	error = find_lkb(ls, lkid, &lkb);
-	if (error)
-		goto out;
+/*
+ * Attempt to grant locks on resources that we are the master of.
+ * Locks may have become grantable during recovery because locks
+ * from departed nodes have been purged (or not rebuilt), allowing
+ * previously blocked locks to now be granted.  The subset of rsb's
+ * we are interested in are those with lkb's on either the convert or
+ * waiting queues.
+ *
+ * Simplest would be to go through each master rsb and check for non-empty
+ * convert or waiting queues, and attempt to grant on those rsbs.
+ * Checking the queues requires lock_rsb, though, for which we'd need
+ * to release the rsbtbl lock.  This would make iterating through all
+ * rsb's very inefficient.  So, we rely on earlier recovery routines
+ * to set RECOVER_GRANT on any rsb's that we should attempt to grant
+ * locks for.
+ */
 
-	trace_dlm_unlock_start(ls, lkb, flags);
+void dlm_recover_grant(struct dlm_ls *ls)
+{
+	struct dlm_rsb *r;
+	unsigned int count = 0;
+	unsigned int rsb_count = 0;
+	unsigned int lkb_count = 0;
+
+	while (1) {
+		r = find_grant_rsb(ls);
+		if (!r)
+			break;
 
-	ua = lkb->lkb_ua;
-	if (ua_tmp->castparam)
-		ua->castparam = ua_tmp->castparam;
-	ua->user_lksb = ua_tmp->user_lksb;
+		rsb_count++;
+		count = 0;
+		lock_rsb(r);
+		/* the RECOVER_GRANT flag is checked in the grant path */
+		grant_pending_locks(r, &count);
+		rsb_clear_flag(r, RSB_RECOVER_GRANT);
+		lkb_count += count;
+		confirm_master(r, 0);
+		unlock_rsb(r);
+		put_rsb(r);
+		cond_resched();
+	}
 
-	error = set_unlock_args(flags, ua, &args);
-	if (error)
-		goto out_put;
+	if (lkb_count)
+		log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
+			  lkb_count, rsb_count);
+}
 
-	error = cancel_lock(ls, lkb, &args);
+static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
+					 uint32_t remid)
+{
+	struct dlm_lkb *lkb;
 
-	if (error == -DLM_ECANCEL)
-		error = 0;
-	/* from validate_unlock_args() */
-	if (error == -EBUSY)
-		error = 0;
- out_put:
-	trace_dlm_unlock_end(ls, lkb, flags, error);
-	dlm_put_lkb(lkb);
- out:
-	dlm_unlock_recovery(ls);
-	kfree(ua_tmp);
-	return error;
+	list_for_each_entry(lkb, head, lkb_statequeue) {
+		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
+			return lkb;
+	}
+	return NULL;
 }
 
-int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
+static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
+				    uint32_t remid)
 {
 	struct dlm_lkb *lkb;
-	struct dlm_args args;
-	struct dlm_user_args *ua;
-	struct dlm_rsb *r;
-	int error;
 
-	dlm_lock_recovery(ls);
+	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
+	if (lkb)
+		return lkb;
+	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
+	if (lkb)
+		return lkb;
+	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
+	if (lkb)
+		return lkb;
+	return NULL;
+}
+
+/* needs at least dlm_rcom + rcom_lock */
+static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
+				  struct dlm_rsb *r, const struct dlm_rcom *rc)
+{
+	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
+
+	lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
+	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
+	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
+	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
+	dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
+	set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
+	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
+	lkb->lkb_rqmode = rl->rl_rqmode;
+	lkb->lkb_grmode = rl->rl_grmode;
+	/* don't set lkb_status because add_lkb wants to itself */
+
+	lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
+	lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
+
+	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
+		int lvblen = le16_to_cpu(rc->rc_header.h_length) -
+			sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
+		if (lvblen > ls->ls_lvblen)
+			return -EINVAL;
+		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
+		if (!lkb->lkb_lvbptr)
+			return -ENOMEM;
+		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
+	}
 
-	error = find_lkb(ls, lkid, &lkb);
-	if (error)
-		goto out;
+	/* Conversions between PR and CW (middle modes) need special handling.
+	   The real granted mode of these converting locks cannot be determined
+	   until all locks have been rebuilt on the rsb (recover_conversion) */
 
-	trace_dlm_unlock_start(ls, lkb, flags);
+	if (rl->rl_status == DLM_LKSTS_CONVERT && middle_conversion(lkb)) {
+		/* We may need to adjust grmode depending on other granted locks. */
+		log_limit(ls, "%s %x middle convert in progress", __func__,
+			 lkb->lkb_id);
 
-	ua = lkb->lkb_ua;
+		/* We sent this lock to the new master. The new master will
+		 * tell us when it's granted.  We no longer need a reply, so
+		 * use a fake reply to put the lkb into the right state.
+		 */
+		hold_lkb(lkb);
+		memset(ms_local, 0, sizeof(struct dlm_message));
+		ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
+		ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
+		ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
+		_receive_convert_reply(lkb, ms_local, true);
+		unhold_lkb(lkb);
 
-	error = set_unlock_args(flags, ua, &args);
-	if (error)
-		goto out_put;
+	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
+		set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
+	}
 
-	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
+	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
+	   conversions are async; there's no reply from the remote master */
+}
 
-	r = lkb->lkb_resource;
-	hold_rsb(r);
-	lock_rsb(r);
+/* A waiting lkb needs recovery if the master node has failed, or
+   the master node is changing (only when no directory is used) */
 
-	error = validate_unlock_args(lkb, &args);
-	if (error)
-		goto out_r;
-	set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
+static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
+				 int dir_nodeid)
+{
+	if (dlm_no_directory(ls))
+		return 1;
 
-	error = _cancel_lock(r, lkb);
- out_r:
-	unlock_rsb(r);
-	put_rsb(r);
+	if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
+		return 1;
 
-	if (error == -DLM_ECANCEL)
-		error = 0;
-	/* from validate_unlock_args() */
-	if (error == -EBUSY)
-		error = 0;
- out_put:
-	trace_dlm_unlock_end(ls, lkb, flags, error);
-	dlm_put_lkb(lkb);
- out:
-	dlm_unlock_recovery(ls);
-	return error;
+	return 0;
 }
 
-/* lkb's that are removed from the waiters list by revert are just left on the
-   orphans list with the granted orphan locks, to be freed by purge */
+/* Recovery for locks that are waiting for replies from nodes that are now
+   gone.  We can just complete unlocks and cancels by faking a reply from the
+   dead node.  Requests and up-conversions we flag to be resent after
+   recovery.  Down-conversions can just be completed with a fake reply like
+   unlocks.  Conversions between PR and CW need special attention. */
 
-static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
+void dlm_recover_waiters_pre(struct dlm_ls *ls)
 {
-	struct dlm_args args;
-	int error;
+	struct dlm_lkb *lkb, *safe;
+	struct dlm_message *ms_local;
+	int wait_type, local_unlock_result, local_cancel_result;
+	int dir_nodeid;
 
-	hold_lkb(lkb); /* reference for the ls_orphans list */
-	spin_lock_bh(&ls->ls_orphans_lock);
-	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
-	spin_unlock_bh(&ls->ls_orphans_lock);
+	ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
+	if (!ms_local)
+		return;
 
-	set_unlock_args(0, lkb->lkb_ua, &args);
+	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
 
-	error = cancel_lock(ls, lkb, &args);
-	if (error == -DLM_ECANCEL)
-		error = 0;
-	return error;
-}
+		dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
 
-/* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
-   granted.  Regardless of what rsb queue the lock is on, it's removed and
-   freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
-   if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
+		/* exclude debug messages about unlocks because there can be so
+		   many and they aren't very interesting */
 
-static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
-{
-	struct dlm_args args;
-	int error;
+		if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
+			log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
+				  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
+				  lkb->lkb_id,
+				  lkb->lkb_remid,
+				  lkb->lkb_wait_type,
+				  lkb->lkb_resource->res_nodeid,
+				  lkb->lkb_nodeid,
+				  lkb->lkb_wait_nodeid,
+				  dir_nodeid);
+		}
 
-	set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
-			lkb->lkb_ua, &args);
+		/* all outstanding lookups, regardless of destination  will be
+		   resent after recovery is done */
 
-	error = unlock_lock(ls, lkb, &args);
-	if (error == -DLM_EUNLOCK)
-		error = 0;
-	return error;
-}
+		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
+			set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
+			continue;
+		}
 
-/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
-   (which does lock_rsb) due to deadlock with receiving a message that does
-   lock_rsb followed by dlm_user_add_cb() */
+		if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
+			continue;
 
-static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
-				     struct dlm_user_proc *proc)
-{
-	struct dlm_lkb *lkb = NULL;
+		wait_type = lkb->lkb_wait_type;
+		local_unlock_result = -DLM_EUNLOCK;
+		local_cancel_result = -DLM_ECANCEL;
 
-	spin_lock_bh(&ls->ls_clear_proc_locks);
-	if (list_empty(&proc->locks))
-		goto out;
+		/* Main reply may have been received leaving a zero wait_type,
+		   but a reply for the overlapping op may not have been
+		   received.  In that case we need to fake the appropriate
+		   reply for the overlap op. */
 
-	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
-	list_del_init(&lkb->lkb_ownqueue);
+		if (!wait_type) {
+			if (is_overlap_cancel(lkb)) {
+				wait_type = DLM_MSG_CANCEL;
+				if (lkb->lkb_grmode == DLM_LOCK_IV)
+					local_cancel_result = 0;
+			}
+			if (is_overlap_unlock(lkb)) {
+				wait_type = DLM_MSG_UNLOCK;
+				if (lkb->lkb_grmode == DLM_LOCK_IV)
+					local_unlock_result = -ENOENT;
+			}
 
-	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
-		set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
-	else
-		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
- out:
-	spin_unlock_bh(&ls->ls_clear_proc_locks);
-	return lkb;
-}
+			log_debug(ls, "rwpre overlap %x %x %d %d %d",
+				  lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
+				  local_cancel_result, local_unlock_result);
+		}
 
-/* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
-   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
-   which we clear here. */
+		switch (wait_type) {
 
-/* proc CLOSING flag is set so no more device_reads should look at proc->asts
-   list, and no more device_writes should add lkb's to proc->locks list; so we
-   shouldn't need to take asts_spin or locks_spin here.  this assumes that
-   device reads/writes/closes are serialized -- FIXME: we may need to serialize
-   them ourself. */
+		case DLM_MSG_REQUEST:
+			set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
+			break;
+
+		case DLM_MSG_CONVERT:
+			recover_convert_waiter(ls, lkb, ms_local);
+			break;
+
+		case DLM_MSG_UNLOCK:
+			hold_lkb(lkb);
+			memset(ms_local, 0, sizeof(struct dlm_message));
+			ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
+			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
+			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
+			_receive_unlock_reply(lkb, ms_local, true);
+			dlm_put_lkb(lkb);
+			break;
+
+		case DLM_MSG_CANCEL:
+			hold_lkb(lkb);
+			memset(ms_local, 0, sizeof(struct dlm_message));
+			ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
+			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
+			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
+			_receive_cancel_reply(lkb, ms_local, true);
+			dlm_put_lkb(lkb);
+			break;
+
+		default:
+			log_error(ls, "invalid lkb wait_type %d %d",
+				  lkb->lkb_wait_type, wait_type);
+		}
+		schedule();
+	}
+	kfree(ms_local);
+}
 
-void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
+static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
 {
-	struct dlm_callback *cb, *cb_safe;
-	struct dlm_lkb *lkb, *safe;
+	struct dlm_lkb *lkb = NULL, *iter;
 
-	dlm_lock_recovery(ls);
+	spin_lock_bh(&ls->ls_waiters_lock);
+	list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
+		if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
+			hold_lkb(iter);
+			lkb = iter;
+			break;
+		}
+	}
+	spin_unlock_bh(&ls->ls_waiters_lock);
+
+	return lkb;
+}
+
+/*
+ * Forced state reset for locks that were in the middle of remote operations
+ * when recovery happened (i.e. lkbs that were on the waiters list, waiting
+ * for a reply from a remote operation.)  The lkbs remaining on the waiters
+ * list need to be reevaluated; some may need resending to a different node
+ * than previously, and some may now need local handling rather than remote.
+ *
+ * First, the lkb state for the voided remote operation is forcibly reset,
+ * equivalent to what remove_from_waiters() would normally do:
+ * . lkb removed from ls_waiters list
+ * . lkb wait_type cleared
+ * . lkb waiters_count cleared
+ * . lkb ref count decremented for each waiters_count (almost always 1,
+ *   but possibly 2 in case of cancel/unlock overlapping, which means
+ *   two remote replies were being expected for the lkb.)
+ *
+ * Second, the lkb is reprocessed like an original operation would be,
+ * by passing it to _request_lock or _convert_lock, which will either
+ * process the lkb operation locally, or send it to a remote node again
+ * and put the lkb back onto the waiters list.
+ *
+ * When reprocessing the lkb, we may find that it's flagged for an overlapping
+ * force-unlock or cancel, either from before recovery began, or after recovery
+ * finished.  If this is the case, the unlock/cancel is done directly, and the
+ * original operation is not initiated again (no _request_lock/_convert_lock.)
+ */
+
+int dlm_recover_waiters_post(struct dlm_ls *ls)
+{
+	struct dlm_lkb *lkb;
+	struct dlm_rsb *r;
+	int error = 0, mstype, err, oc, ou;
 
 	while (1) {
-		lkb = del_proc_lock(ls, proc);
+		if (dlm_locking_stopped(ls)) {
+			log_debug(ls, "recover_waiters_post aborted");
+			error = -EINTR;
+			break;
+		}
+
+		/* 
+		 * Find an lkb from the waiters list that's been affected by
+		 * recovery node changes, and needs to be reprocessed.  Does
+		 * hold_lkb(), adding a refcount.
+		 */
+		lkb = find_resend_waiter(ls);
 		if (!lkb)
 			break;
-		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
-			orphan_proc_lock(ls, lkb);
-		else
-			unlock_proc_lock(ls, lkb);
 
-		/* this removes the reference for the proc->locks list
-		   added by dlm_user_request, it may result in the lkb
-		   being freed */
+		r = lkb->lkb_resource;
+		hold_rsb(r);
+		lock_rsb(r);
 
-		dlm_put_lkb(lkb);
-	}
+		/*
+		 * If the lkb has been flagged for a force unlock or cancel,
+		 * then the reprocessing below will be replaced by just doing
+		 * the unlock/cancel directly.
+		 */
+		mstype = lkb->lkb_wait_type;
+		oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
+					&lkb->lkb_iflags);
+		ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
+					&lkb->lkb_iflags);
+		err = 0;
+
+		log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
+			  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
+			  "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
+			  r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
+			  dlm_dir_nodeid(r), oc, ou);
 
-	spin_lock_bh(&ls->ls_clear_proc_locks);
+		/*
+		 * No reply to the pre-recovery operation will now be received,
+		 * so a forced equivalent of remove_from_waiters() is needed to
+		 * reset the waiters state that was in place before recovery.
+		 */
 
-	/* in-progress unlocks */
-	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
-		list_del_init(&lkb->lkb_ownqueue);
-		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
-		dlm_put_lkb(lkb);
-	}
+		clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
 
-	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
-		list_del(&cb->list);
-		dlm_free_cb(cb);
+		/* Forcibly clear wait_type */
+		lkb->lkb_wait_type = 0;
+
+		/*
+		 * Forcibly reset wait_count and associated refcount.  The
+		 * wait_count will almost always be 1, but in case of an
+		 * overlapping unlock/cancel it could be 2: see where
+		 * add_to_waiters() finds the lkb is already on the waiters
+		 * list and does lkb_wait_count++; hold_lkb().
+		 */
+		while (lkb->lkb_wait_count) {
+			lkb->lkb_wait_count--;
+			unhold_lkb(lkb);
+		}
+
+		/* Forcibly remove from waiters list */
+		spin_lock_bh(&ls->ls_waiters_lock);
+		list_del_init(&lkb->lkb_wait_reply);
+		spin_unlock_bh(&ls->ls_waiters_lock);
+
+		/*
+		 * The lkb is now clear of all prior waiters state and can be
+		 * processed locally, or sent to remote node again, or directly
+		 * cancelled/unlocked.
+		 */
+
+		if (oc || ou) {
+			/* do an unlock or cancel instead of resending */
+			switch (mstype) {
+			case DLM_MSG_LOOKUP:
+			case DLM_MSG_REQUEST:
+				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
+							-DLM_ECANCEL);
+				unhold_lkb(lkb); /* undoes create_lkb() */
+				break;
+			case DLM_MSG_CONVERT:
+				if (oc) {
+					queue_cast(r, lkb, -DLM_ECANCEL);
+				} else {
+					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
+					_unlock_lock(r, lkb);
+				}
+				break;
+			default:
+				err = 1;
+			}
+		} else {
+			switch (mstype) {
+			case DLM_MSG_LOOKUP:
+			case DLM_MSG_REQUEST:
+				_request_lock(r, lkb);
+				if (r->res_nodeid != -1 && is_master(r))
+					confirm_master(r, 0);
+				break;
+			case DLM_MSG_CONVERT:
+				_convert_lock(r, lkb);
+				break;
+			default:
+				err = 1;
+			}
+		}
+
+		if (err) {
+			log_error(ls, "waiter %x msg %d r_nodeid %d "
+				  "dir_nodeid %d overlap %d %d",
+				  lkb->lkb_id, mstype, r->res_nodeid,
+				  dlm_dir_nodeid(r), oc, ou);
+		}
+		unlock_rsb(r);
+		put_rsb(r);
+		dlm_put_lkb(lkb);
 	}
 
-	spin_unlock_bh(&ls->ls_clear_proc_locks);
-	dlm_unlock_recovery(ls);
+	return error;
 }
 
 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
-- 
2.48.1


Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ