lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite for Android: free password hash cracker in your pocket
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <20250830101413.602637-8-226562783+SigAttilio@users.noreply.github.com>
Date: Sat, 30 Aug 2025 12:14:09 +0200
From: Alessio Attilio <alessio.attilio.dev@...il.com>
To: gfs2@...ts.linux.dev
Cc: linux-kernel@...r.kernel.org,
	aahringo@...hat.com,
	teigland@...hat.com,
	Alessio Attilio <226562783+SigAttilio@...rs.noreply.github.com>
Subject: [PATCH 08/12] fix: improve lvb struct

---
 fs/dlm/lock.c | 1096 +++++++++++++++++++++++++++++--------------------
 1 file changed, 641 insertions(+), 455 deletions(-)

diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 1eae9cdb4fcd..f7d3d154e2a9 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -3102,600 +3102,786 @@ static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r)
 	spin_lock_bh(&ls->ls_scan_lock);
 	r->res_toss_time = 0;
 
-	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
-	if (error)
+	/* if the rsb is not queued do nothing */
+	if (list_empty(&r->res_scan_list))
 		goto out;
 
-	send_args(r, lkb, ms);
-
-	ms->m_result = 0;
+	/* get the first element before delete */
+	first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb,
+				 res_scan_list);
+	list_del_init(&r->res_scan_list);
+	/* check if the first element was the rsb we deleted */
+	if (first == r) {
+		/* try to get the new first element, if the list
+		 * is empty now try to delete the timer, if we are
+		 * too late we don't care.
+		 *
+		 * if the list isn't empty and a new first element got
+		 * in place, set the new timer expire time.
+		 */
+		first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
+						 res_scan_list);
+		if (!first)
+			timer_delete(&ls->ls_scan_timer);
+		else
+			enable_scan_timer(ls, first->res_toss_time);
+	}
 
-	error = send_message(mh, ms, r->res_name, r->res_length);
- out:
-	return error;
+out:
+	spin_unlock_bh(&ls->ls_scan_lock);
 }
 
-static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
+static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r)
 {
-	struct dlm_message *ms;
-	struct dlm_mhandle *mh;
-	int to_nodeid, error;
-
-	to_nodeid = lkb->lkb_nodeid;
+	int our_nodeid = dlm_our_nodeid();
+	struct dlm_rsb *first;
 
-	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
-	if (error)
-		goto out;
+	/* A dir record for a remote master rsb should never be on the scan list. */
+	WARN_ON(!dlm_no_directory(ls) &&
+		(r->res_master_nodeid != our_nodeid) &&
+		(dlm_dir_nodeid(r) == our_nodeid));
 
-	send_args(r, lkb, ms);
+	/* An active rsb should never be on the scan list. */
+	WARN_ON(!rsb_flag(r, RSB_INACTIVE));
 
-	ms->m_bastmode = cpu_to_le32(mode);
+	/* An rsb should not already be on the scan list. */
+	WARN_ON(!list_empty(&r->res_scan_list));
 
-	error = send_message(mh, ms, r->res_name, r->res_length);
- out:
-	return error;
+	spin_lock_bh(&ls->ls_scan_lock);
+	/* set the new rsb absolute expire time in the rsb */
+	r->res_toss_time = rsb_toss_jiffies();
+	if (list_empty(&ls->ls_scan_list)) {
+		/* if the queue is empty add the element and it's
+		 * our new expire time
+		 */
+		list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
+		enable_scan_timer(ls, r->res_toss_time);
+	} else {
+		/* try to get the maybe new first element and then add
+		 * this rsb with the oldest expire time to the end
+		 * of the queue. If the list was empty before, this
+		 * rsb's expire time is our next expiration; if it wasn't,
+		 * the now new first element is our new expiration time.
+		 */
+		first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
+						 res_scan_list);
+		list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
+		if (!first)
+			enable_scan_timer(ls, r->res_toss_time);
+		else
+			enable_scan_timer(ls, first->res_toss_time);
+	}
+	spin_unlock_bh(&ls->ls_scan_lock);
 }
 
-static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
-{
-	struct dlm_message *ms;
-	struct dlm_mhandle *mh;
-	int to_nodeid, error;
-
-	to_nodeid = dlm_dir_nodeid(r);
-
-	add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
-	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
-	if (error)
-		goto fail;
-
-	send_args(r, lkb, ms);
-
-	error = send_message(mh, ms, r->res_name, r->res_length);
-	if (error)
-		goto fail;
-	return 0;
+/* if we hit contention we retry the trylock in 250 ms.
+ * if there is any other mod_timer in between we don't care
+ * that the timer expires earlier; this is only for the
+ * unlikely case that nothing happened in this time.
+ */
+#define DLM_TOSS_TIMER_RETRY	(jiffies + msecs_to_jiffies(250))
 
- fail:
-	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
-	return error;
-}
+/* Called by lockspace scan_timer to free unused rsb's. */
 
-static int send_remove(struct dlm_rsb *r)
+void dlm_rsb_scan(struct timer_list *timer)
 {
-	struct dlm_message *ms;
-	struct dlm_mhandle *mh;
-	int to_nodeid, error;
-
-	to_nodeid = dlm_dir_nodeid(r);
+	struct dlm_ls *ls = timer_container_of(ls, timer, ls_scan_timer);
+	int our_nodeid = dlm_our_nodeid();
+	struct dlm_rsb *r;
+	int rv;
 
-	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
-	if (error)
-		goto out;
+	while (1) {
+		/* interrupting point to leave iteration when
+		 * recovery waits for timer_delete_sync(), recovery
+		 * will take care to delete everything in scan list.
+		 */
+		if (dlm_locking_stopped(ls))
+			break;
 
-	memcpy(ms->m_extra, r->res_name, r->res_length);
-	ms->m_hash = cpu_to_le32(r->res_hash);
+		rv = spin_trylock(&ls->ls_scan_lock);
+		if (!rv) {
+			/* rearm the retry timer */
+			enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
+			break;
+		}
 
-	error = send_message(mh, ms, r->res_name, r->res_length);
- out:
-	return error;
-}
+		r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
+					     res_scan_list);
+		if (!r) {
+			/* the next add_scan will enable the timer again */
+			spin_unlock(&ls->ls_scan_lock);
+			break;
+		}
 
-static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
-			     int mstype, int rv)
-{
-	struct dlm_message *ms;
-	struct dlm_mhandle *mh;
-	int to_nodeid, error;
+		/*
+		 * If the first rsb is not yet expired, then stop because the
+		 * list is sorted with nearest expiration first.
+		 */
+		if (time_before(jiffies, r->res_toss_time)) {
+			/* rearm with the next rsb to expire in the future */
+			enable_scan_timer(ls, r->res_toss_time);
+			spin_unlock(&ls->ls_scan_lock);
+			break;
+		}
 
-	to_nodeid = lkb->lkb_nodeid;
+		/* in find_rsb_dir/nodir these locks are taken in the
+		 * reverse order, however this is only a trylock; if we
+		 * hit possible contention we try it again later.
+		 */
+		rv = write_trylock(&ls->ls_rsbtbl_lock);
+		if (!rv) {
+			spin_unlock(&ls->ls_scan_lock);
+			/* rearm the retry timer */
+			enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
+			break;
+		}
 
-	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
-	if (error)
-		goto out;
+		list_del(&r->res_slow_list);
+		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
+				       dlm_rhash_rsb_params);
+		rsb_clear_flag(r, RSB_HASHED);
 
-	send_args(r, lkb, ms);
+		/* ls_rsbtbl_lock is not needed when calling send_remove() */
+		write_unlock(&ls->ls_rsbtbl_lock);
 
-	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
+		list_del_init(&r->res_scan_list);
+		spin_unlock(&ls->ls_scan_lock);
 
-	error = send_message(mh, ms, r->res_name, r->res_length);
- out:
-	return error;
-}
+		/* An rsb that is a dir record for a remote master rsb
+		 * cannot be removed, and should not have a timer enabled.
+		 */
+		WARN_ON(!dlm_no_directory(ls) &&
+			(r->res_master_nodeid != our_nodeid) &&
+			(dlm_dir_nodeid(r) == our_nodeid));
 
-static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
-{
-	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
-}
+		/* We're the master of this rsb but we're not
+		 * the directory record, so we need to tell the
+		 * dir node to remove the dir record
+		 */
+		if (!dlm_no_directory(ls) &&
+		    (r->res_master_nodeid == our_nodeid) &&
+		    (dlm_dir_nodeid(r) != our_nodeid))
+			send_remove(r);
 
-static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
-{
-	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
+		free_inactive_rsb(r);
+	}
 }
 
-static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
-{
-	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
-}
+/* Allocate a new rsb and initialize it for the given name.
+   Returns -ENOMEM if dlm_allocate_rsb() fails; otherwise stores
+   the new rsb in *r_ret and returns 0. */
 
-static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
+static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
+			  struct dlm_rsb **r_ret)
 {
-	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
-}
+	struct dlm_rsb *r;
 
-static int send_lookup_reply(struct dlm_ls *ls,
-			     const struct dlm_message *ms_in, int ret_nodeid,
-			     int rv)
-{
-	struct dlm_rsb *r = &ls->ls_local_rsb;
-	struct dlm_message *ms;
-	struct dlm_mhandle *mh;
-	int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
+	r = dlm_allocate_rsb();
+	if (!r)
+		return -ENOMEM;
 
-	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
-	if (error)
-		goto out;
+	r->res_ls = ls;
+	r->res_length = len;
+	memcpy(r->res_name, name, len);
+	spin_lock_init(&r->res_lock);
 
-	ms->m_lkid = ms_in->m_lkid;
-	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
-	ms->m_nodeid = cpu_to_le32(ret_nodeid);
+	INIT_LIST_HEAD(&r->res_lookup);
+	INIT_LIST_HEAD(&r->res_grantqueue);
+	INIT_LIST_HEAD(&r->res_convertqueue);
+	INIT_LIST_HEAD(&r->res_waitqueue);
+	INIT_LIST_HEAD(&r->res_root_list);
+	INIT_LIST_HEAD(&r->res_scan_list);
+	INIT_LIST_HEAD(&r->res_recover_list);
+	INIT_LIST_HEAD(&r->res_masters_list);
 
-	error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
- out:
-	return error;
+	*r_ret = r;
+	return 0;
 }
 
-/* which args we save from a received message depends heavily on the type
-   of message, unlike the send side where we can safely send everything about
-   the lkb for any type of message */
-
-static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
+int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
+			struct dlm_rsb **r_ret)
 {
-	lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
-	dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
-	dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
-}
+	char key[DLM_RESNAME_MAXLEN] = {};
 
-static void receive_flags_reply(struct dlm_lkb *lkb,
-				const struct dlm_message *ms,
-				bool local)
-{
-	if (local)
-		return;
+	memcpy(key, name, len);
+	*r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params);
+	if (*r_ret)
+		return 0;
 
-	dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
-	dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
+	return -EBADR;
 }
 
-static int receive_extralen(const struct dlm_message *ms)
+static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
 {
-	return (le16_to_cpu(ms->m_header.h_length) -
-		sizeof(struct dlm_message));
-}
+	int rv;
 
-static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
-		       const struct dlm_message *ms)
-{
-	int len;
+	rv = rhashtable_insert_fast(rhash, &rsb->res_node,
+				    dlm_rhash_rsb_params);
+	if (!rv)
+		rsb_set_flag(rsb, RSB_HASHED);
 
-	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
-		if (!lkb->lkb_lvbptr)
-			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
-		if (!lkb->lkb_lvbptr)
-			return -ENOMEM;
-		len = receive_extralen(ms);
-		if (len > ls->ls_lvblen)
-			len = ls->ls_lvblen;
-		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
-	}
-	return 0;
+	return rv;
 }
 
-static void fake_bastfn(void *astparam, int mode)
-{
-	log_print("fake_bastfn should not be called");
-}
+/*
+ * Find rsb in rsbtbl and potentially create/add one
+ *
+ * Delaying the release of rsb's has a similar benefit to applications keeping
+ * NL locks on an rsb, but without the guarantee that the cached master value
+ * will still be valid when the rsb is reused.  Apps aren't always smart enough
+ * to keep NL locks on an rsb that they may lock again shortly; this can lead
+ * to excessive master lookups and removals if we don't delay the release.
+ *
+ * Searching for an rsb means looking through both the normal list and toss
+ * list.  When found on the toss list the rsb is moved to the normal list with
+ * ref count of 1; when found on normal list the ref count is incremented.
+ *
+ * rsb's on the keep list are being used locally and refcounted.
+ * rsb's on the toss list are not being used locally, and are not refcounted.
+ *
+ * The toss list rsb's were either
+ * - previously used locally but not any more (were on keep list, then
+ *   moved to toss list when last refcount dropped)
+ * - created and put on toss list as a directory record for a lookup
+ *   (we are the dir node for the res, but are not using the res right now,
+ *   but some other node is)
+ *
+ * The purpose of find_rsb() is to return a refcounted rsb for local use.
+ * So, if the given rsb is on the toss list, it is moved to the keep list
+ * before being returned.
+ *
+ * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no
+ * more refcounts exist, so the rsb is moved from the keep list to the
+ * toss list.
+ *
+ * rsb's on both keep and toss lists are used for doing a name to master
+ * lookups.  rsb's that are in use locally (and being refcounted) are on
+ * the keep list, rsb's that are not in use locally (not refcounted) and
+ * only exist for name/master lookups are on the toss list.
+ *
+ * rsb's on the toss list whose dir_nodeid is not local can have stale
+ * name/master mappings.  So, remote requests on such rsb's can potentially
+ * return with an error, which means the mapping is stale and needs to
+ * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
+ * first_lkid is to keep only a single outstanding request on an rsb
+ * while that rsb has a potentially stale master.)
+ */
 
-static void fake_astfn(void *astparam)
+static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
+			uint32_t hash, int dir_nodeid, int from_nodeid,
+			unsigned int flags, struct dlm_rsb **r_ret)
 {
-	log_print("fake_astfn should not be called");
-}
+	struct dlm_rsb *r = NULL;
+	int our_nodeid = dlm_our_nodeid();
+	int from_local = 0;
+	int from_other = 0;
+	int from_dir = 0;
+	int create = 0;
+	int error;
 
-static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
-				const struct dlm_message *ms)
-{
-	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
-	lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
-	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
-	lkb->lkb_grmode = DLM_LOCK_IV;
-	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
+	if (flags & R_RECEIVE_REQUEST) {
+		if (from_nodeid == dir_nodeid)
+			from_dir = 1;
+		else
+			from_other = 1;
+	} else if (flags & R_REQUEST) {
+		from_local = 1;
+	}
 
-	lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
-	lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
+	/*
+	 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
+	 * from_nodeid has sent us a lock in dlm_recover_locks, believing
+	 * we're the new master.  Our local recovery may not have set
+	 * res_master_nodeid to our_nodeid yet, so allow either.  Don't
+	 * create the rsb; dlm_recover_process_copy() will handle EBADR
+	 * by resending.
+	 *
+	 * If someone sends us a request, we are the dir node, and we do
+	 * not find the rsb anywhere, then recreate it.  This happens if
+	 * someone sends us a request after we have removed/freed an rsb.
+	 * (They sent a request instead of lookup because they are using
+	 * an rsb taken from their scan list.)
+	 */
 
-	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
-		/* lkb was just created so there won't be an lvb yet */
-		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
-		if (!lkb->lkb_lvbptr)
-			return -ENOMEM;
+	if (from_local || from_dir ||
+	    (from_other && (dir_nodeid == our_nodeid))) {
+		create = 1;
 	}
 
-	return 0;
-}
-
-static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
-				const struct dlm_message *ms)
-{
-	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
-		return -EBUSY;
+ retry:
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
+	if (error)
+		goto do_new;
 
-	if (receive_lvb(ls, lkb, ms))
-		return -ENOMEM;
+	/* check if the rsb is active under read lock - likely path */
+	read_lock_bh(&ls->ls_rsbtbl_lock);
+	if (!rsb_flag(r, RSB_HASHED)) {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
+		error = -EBADR;
+		goto do_new;
+	}
 
-	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
-	lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
+	/*
+	 * rsb is active, so we can't check master_nodeid without lock_rsb.
+	 */
 
-	return 0;
-}
+	if (rsb_flag(r, RSB_INACTIVE)) {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto do_inactive;
+	}
 
-static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
-			       const struct dlm_message *ms)
-{
-	if (receive_lvb(ls, lkb, ms))
-		return -ENOMEM;
-	return 0;
-}
+	kref_get(&r->res_ref);
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
+	goto out;
 
-/* We fill in the local-lkb fields with the info that send_xxxx_reply()
-   uses to send a reply and that the remote end uses to process the reply. */
 
-static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
-{
-	struct dlm_lkb *lkb = &ls->ls_local_lkb;
-	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
-	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
-}
+ do_inactive:
+	write_lock_bh(&ls->ls_rsbtbl_lock);
 
-/* This is called after the rsb is locked so that we can safely inspect
-   fields in the lkb. */
+	/*
+	 * The expectation here is that the rsb will have HASHED and
+	 * INACTIVE flags set, and that the rsb can be moved from
+	 * inactive back to active again.  However, between releasing
+	 * the read lock and acquiring the write lock, this rsb could
+	 * have been removed from rsbtbl, and had HASHED cleared, to
+	 * be freed.  To deal with this case, we would normally need
+	 * to repeat dlm_search_rsb_tree while holding the write lock,
+	 * but rcu allows us to simply check the HASHED flag, because
+	 * the rcu read lock means the rsb will not be freed yet.
+	 * If the HASHED flag is not set, then the rsb is being freed,
+	 * so we add a new rsb struct.  If the HASHED flag is set,
+	 * and INACTIVE is not set, it means another thread has
+	 * made the rsb active, as we're expecting to do here, and
+	 * we just repeat the lookup (this will be very unlikely.)
+	 */
+	if (rsb_flag(r, RSB_HASHED)) {
+		if (!rsb_flag(r, RSB_INACTIVE)) {
+			write_unlock_bh(&ls->ls_rsbtbl_lock);
+			goto retry;
+		}
+	} else {
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		error = -EBADR;
+		goto do_new;
+	}
 
-static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
-{
-	int from = le32_to_cpu(ms->m_header.h_nodeid);
-	int error = 0;
+	/*
+	 * rsb found inactive (master_nodeid may be out of date unless
+	 * we are the dir_nodeid or were the master)  No other thread
+	 * is using this rsb because it's inactive, so we can
+	 * look at or update res_master_nodeid without lock_rsb.
+	 */
 
-	/* currently mixing of user/kernel locks are not supported */
-	if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
-	    !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
-		log_error(lkb->lkb_resource->res_ls,
-			  "got user dlm message for a kernel lock");
-		error = -EINVAL;
+	if ((r->res_master_nodeid != our_nodeid) && from_other) {
+		/* our rsb was not master, and another node (not the dir node)
+		   has sent us a request */
+		log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s",
+			  from_nodeid, r->res_master_nodeid, dir_nodeid,
+			  r->res_name);
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		error = -ENOTBLK;
 		goto out;
 	}
 
-	switch (ms->m_type) {
-	case cpu_to_le32(DLM_MSG_CONVERT):
-	case cpu_to_le32(DLM_MSG_UNLOCK):
-	case cpu_to_le32(DLM_MSG_CANCEL):
-		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
-			error = -EINVAL;
-		break;
+	if ((r->res_master_nodeid != our_nodeid) && from_dir) {
+		/* don't think this should ever happen */
+		log_error(ls, "find_rsb inactive from_dir %d master %d",
+			  from_nodeid, r->res_master_nodeid);
+		dlm_print_rsb(r);
+		/* fix it and go on */
+		r->res_master_nodeid = our_nodeid;
+		r->res_nodeid = 0;
+		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
+		r->res_first_lkid = 0;
+	}
 
-	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
-	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
-	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
-	case cpu_to_le32(DLM_MSG_GRANT):
-	case cpu_to_le32(DLM_MSG_BAST):
-		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
-			error = -EINVAL;
-		break;
+	if (from_local && (r->res_master_nodeid != our_nodeid)) {
+		/* Because we have held no locks on this rsb,
+		   res_master_nodeid could have become stale. */
+		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
+		r->res_first_lkid = 0;
+	}
+
+	/* we always deactivate scan timer for the rsb, when
+	 * we move it out of the inactive state as rsb state
+	 * can be changed and scan timers are only for inactive
+	 * rsbs.
+	 */
+	del_scan(ls, r);
+	list_move(&r->res_slow_list, &ls->ls_slow_active);
+	rsb_clear_flag(r, RSB_INACTIVE);
+	kref_init(&r->res_ref); /* ref is now used in active state */
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
-	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
-		if (!is_process_copy(lkb))
-			error = -EINVAL;
-		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
-			error = -EINVAL;
-		break;
+	goto out;
 
-	default:
-		error = -EINVAL;
-	}
 
-out:
-	if (error)
-		log_error(lkb->lkb_resource->res_ls,
-			  "ignore invalid message %d from %d %x %x %x %d",
-			  le32_to_cpu(ms->m_type), from, lkb->lkb_id,
-			  lkb->lkb_remid, dlm_iflags_val(lkb),
-			  lkb->lkb_nodeid);
-	return error;
-}
+ do_new:
+	/*
+	 * rsb not found
+	 */
 
-static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
-{
-	struct dlm_lkb *lkb;
-	struct dlm_rsb *r;
-	int from_nodeid;
-	int error, namelen = 0;
+	if (error == -EBADR && !create)
+		goto out;
 
-	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
+	error = get_rsb_struct(ls, name, len, &r);
+	if (WARN_ON_ONCE(error))
+		goto out;
 
-	error = create_lkb(ls, &lkb);
-	if (error)
-		goto fail;
+	r->res_hash = hash;
+	r->res_dir_nodeid = dir_nodeid;
+	kref_init(&r->res_ref);
 
-	receive_flags(lkb, ms);
-	set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
-	error = receive_request_args(ls, lkb, ms);
-	if (error) {
-		__put_lkb(ls, lkb);
-		goto fail;
+	if (from_dir) {
+		/* want to see how often this happens */
+		log_debug(ls, "find_rsb new from_dir %d recreate %s",
+			  from_nodeid, r->res_name);
+		r->res_master_nodeid = our_nodeid;
+		r->res_nodeid = 0;
+		goto out_add;
 	}
 
-	/* The dir node is the authority on whether we are the master
-	   for this rsb or not, so if the master sends us a request, we should
-	   recreate the rsb if we've destroyed it.   This race happens when we
-	   send a remove message to the dir node at the same time that the dir
-	   node sends us a request for the rsb. */
-
-	namelen = receive_extralen(ms);
-
-	error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
-			 R_RECEIVE_REQUEST, &r);
-	if (error) {
-		__put_lkb(ls, lkb);
-		goto fail;
+	if (from_other && (dir_nodeid != our_nodeid)) {
+		/* should never happen */
+		log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
+			  from_nodeid, dir_nodeid, our_nodeid, r->res_name);
+		dlm_free_rsb(r);
+		r = NULL;
+		error = -ENOTBLK;
+		goto out;
 	}
 
-	lock_rsb(r);
-
-	if (r->res_master_nodeid != dlm_our_nodeid()) {
-		error = validate_master_nodeid(ls, r, from_nodeid);
-		if (error) {
-			unlock_rsb(r);
-			put_rsb(r);
-			__put_lkb(ls, lkb);
-			goto fail;
-		}
+	if (from_other) {
+		log_debug(ls, "find_rsb new from_other %d dir %d %s",
+			  from_nodeid, dir_nodeid, r->res_name);
 	}
 
-	attach_lkb(r, lkb);
-	error = do_request(r, lkb);
-	send_request_reply(r, lkb, error);
-	do_request_effects(r, lkb, error);
-
-	unlock_rsb(r);
-	put_rsb(r);
-
-	if (error == -EINPROGRESS)
-		error = 0;
-	if (error)
-		dlm_put_lkb(lkb);
-	return 0;
+	if (dir_nodeid == our_nodeid) {
+		/* When we are the dir nodeid, we can set the master
+		   node immediately */
+		r->res_master_nodeid = our_nodeid;
+		r->res_nodeid = 0;
+	} else {
+		/* set_master will send_lookup to dir_nodeid */
+		r->res_master_nodeid = 0;
+		r->res_nodeid = -1;
+	}
 
- fail:
-	/* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
-	   and do this receive_request again from process_lookup_list once
-	   we get the lookup reply.  This would avoid a many repeated
-	   ENOTBLK request failures when the lookup reply designating us
-	   as master is delayed. */
+ out_add:
 
-	if (error != -ENOTBLK) {
-		log_limit(ls, "receive_request %x from %d %d",
-			  le32_to_cpu(ms->m_lkid), from_nodeid, error);
+	write_lock_bh(&ls->ls_rsbtbl_lock);
+	error = rsb_insert(r, &ls->ls_rsbtbl);
+	if (error == -EEXIST) {
+		/* somebody else was faster and it seems the
+		 * rsb exists now, we do a whole relookup
+		 */
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		dlm_free_rsb(r);
+		goto retry;
+	} else if (!error) {
+		list_add(&r->res_slow_list, &ls->ls_slow_active);
 	}
-
-	setup_local_lkb(ls, ms);
-	send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
+ out:
+	*r_ret = r;
 	return error;
 }
 
-static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
+/* FIXME: if this lkb is the only lock we hold on the rsb, then set
+   MASTER_UNCERTAIN to force the next request on the rsb to confirm
+   that the master is still correct. */
+
+static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
-	struct dlm_lkb *lkb;
-	struct dlm_rsb *r;
-	int error, reply = 1;
+	int rv;
+	bool only = true;
+	struct dlm_lkb *iter;
+
+	/* Caller should hold lock_rsb(r) when invoking send_unlock(); it's
+	 * therefore safe to inspect the rsb queues without additional locking.
+	 * If this lkb is the only locally-held lock on the rsb, mark the
+	 * master as uncertain so a subsequent request will reconfirm the
+	 * master via lookup.
+	 */
 
-	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
-	if (error)
-		goto fail;
+	/* Check grant queue for any other lkbs */
+	list_for_each_entry(iter, &r->res_grantqueue, lkb_statequeue) {
+		if (iter != lkb) {
+			only = false;
+			break;
+		}
+	}
 
-	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
-		log_error(ls, "receive_convert %x remid %x recover_seq %llu "
-			  "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
-			  (unsigned long long)lkb->lkb_recover_seq,
-			  le32_to_cpu(ms->m_header.h_nodeid),
-			  le32_to_cpu(ms->m_lkid));
-		error = -ENOENT;
-		dlm_put_lkb(lkb);
-		goto fail;
+	/* Check convert queue if still only */
+	if (only) {
+		list_for_each_entry(iter, &r->res_convertqueue, lkb_statequeue) {
+			if (iter != lkb) {
+				only = false;
+				break;
+			}
+		}
 	}
 
-	r = lkb->lkb_resource;
+	/* Check wait queue if still only */
+	if (only) {
+		list_for_each_entry(iter, &r->res_waitqueue, lkb_statequeue) {
+			if (iter != lkb) {
+				only = false;
+				break;
+			}
+		}
+	}
 
-	hold_rsb(r);
-	lock_rsb(r);
+	if (only)
+		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
 
-	error = validate_message(lkb, ms);
-	if (error)
-		goto out;
+	rv = send_common(r, lkb, DLM_MSG_UNLOCK);
+	return rv;
+}
 
-	receive_flags(lkb, ms);
+/* add/remove lkb to rsb's grant/convert/wait queue */
 
-	error = receive_convert_args(ls, lkb, ms);
-	if (error) {
-		send_convert_reply(r, lkb, error);
-		goto out;
-	}
+static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
+{
+	kref_get(&lkb->lkb_ref);
 
-	reply = !down_conversion(lkb);
+	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
 
-	error = do_convert(r, lkb);
-	if (reply)
-		send_convert_reply(r, lkb, error);
-	do_convert_effects(r, lkb, error);
- out:
-	unlock_rsb(r);
-	put_rsb(r);
-	dlm_put_lkb(lkb);
-	return 0;
+	lkb->lkb_timestamp = ktime_get();
 
- fail:
-	setup_local_lkb(ls, ms);
-	send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
-	return error;
+	lkb->lkb_status = status;
+
+	switch (status) {
+	case DLM_LKSTS_WAITING:
+		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
+			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
+		else
+			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
+		break;
+	case DLM_LKSTS_GRANTED:
+		/* convention says granted locks kept in order of grmode */
+		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
+				lkb->lkb_grmode);
+		break;
+	case DLM_LKSTS_CONVERT:
+		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
+			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
+		else
+			list_add_tail(&lkb->lkb_statequeue,
+				      &r->res_convertqueue);
+		break;
+	default:
+		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
+	}
 }
 
-static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
+static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
-	struct dlm_lkb *lkb;
-	struct dlm_rsb *r;
-	int error;
+	lkb->lkb_status = 0;
+	list_del(&lkb->lkb_statequeue);
+	unhold_lkb(lkb);
+}
 
-	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
-	if (error)
-		goto fail;
+static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
+{
+	del_lkb(r, lkb);
+	add_lkb(r, lkb, sts);
+}
 
-	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
-		log_error(ls, "receive_unlock %x remid %x remote %d %x",
-			  lkb->lkb_id, lkb->lkb_remid,
-			  le32_to_cpu(ms->m_header.h_nodeid),
-			  le32_to_cpu(ms->m_lkid));
-		error = -ENOENT;
-		dlm_put_lkb(lkb);
-		goto fail;
+static int msg_reply_type(int mstype)
+{
+	switch (mstype) {
+	case DLM_MSG_REQUEST:
+		return DLM_MSG_REQUEST_REPLY;
+	case DLM_MSG_CONVERT:
+		return DLM_MSG_CONVERT_REPLY;
+	case DLM_MSG_UNLOCK:
+		return DLM_MSG_UNLOCK_REPLY;
+	case DLM_MSG_CANCEL:
+		return DLM_MSG_CANCEL_REPLY;
+	case DLM_MSG_LOOKUP:
+		return DLM_MSG_LOOKUP_REPLY;
 	}
+	return -1;
+}
 
-	r = lkb->lkb_resource;
-
-	hold_rsb(r);
-	lock_rsb(r);
+/* add/remove lkb from global waiters list of lkb's waiting for
+   a reply from a remote node */
 
-	error = validate_message(lkb, ms);
-	if (error)
-		goto out;
+static void add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
+{
+	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 
-	receive_flags(lkb, ms);
+	spin_lock_bh(&ls->ls_waiters_lock);
+	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
+		switch (mstype) {
+		case DLM_MSG_UNLOCK:
+			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
+			break;
+		case DLM_MSG_CANCEL:
+			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
+			break;
+		default:
+			/* should never happen as validate_lock_args() checks
+			 * on lkb_wait_type and validate_unlock_args() only
+			 * creates UNLOCK or CANCEL messages.
+			 */
+			WARN_ON_ONCE(1);
+			goto out;
+		}
+		lkb->lkb_wait_count++;
+		hold_lkb(lkb);
 
-	error = receive_unlock_args(ls, lkb, ms);
-	if (error) {
-		send_unlock_reply(r, lkb, error);
+		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
+			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
+			  lkb->lkb_wait_count, dlm_iflags_val(lkb));
 		goto out;
 	}
 
-	error = do_unlock(r, lkb);
-	send_unlock_reply(r, lkb, error);
-	do_unlock_effects(r, lkb, error);
- out:
-	unlock_rsb(r);
-	put_rsb(r);
-	dlm_put_lkb(lkb);
-	return 0;
+	DLM_ASSERT(!lkb->lkb_wait_count,
+		   dlm_print_lkb(lkb);
+		   printk("wait_count %d\n", lkb->lkb_wait_count););
 
- fail:
-	setup_local_lkb(ls, ms);
-	send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
-	return error;
+	lkb->lkb_wait_count++;
+	lkb->lkb_wait_type = mstype;
+	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
+	hold_lkb(lkb);
+	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
+ out:
+	spin_unlock_bh(&ls->ls_waiters_lock);
 }
 
-static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
+/* We clear the RESEND flag because we might be taking an lkb off the waiters
+   list as part of process_requestqueue (e.g. a lookup that has an optimized
+   request reply on the requestqueue) between dlm_recover_waiters_pre() which
+   set RESEND and dlm_recover_waiters_post() */
+
+static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
+				const struct dlm_message *ms)
 {
-	struct dlm_lkb *lkb;
-	struct dlm_rsb *r;
-	int error;
+	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+	int overlap_done = 0;
 
-	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
-	if (error)
-		goto fail;
+	if (mstype == DLM_MSG_UNLOCK_REPLY &&
+	    test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
+		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
+		overlap_done = 1;
+		goto out_del;
+	}
 
-	receive_flags(lkb, ms);
+	if (mstype == DLM_MSG_CANCEL_REPLY &&
+	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
+		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
+		overlap_done = 1;
+		goto out_del;
+	}
 
-	r = lkb->lkb_resource;
+	/* Cancel state was preemptively cleared by a successful convert,
+	   see next comment, nothing to do. */
 
-	hold_rsb(r);
-	lock_rsb(r);
+	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
+	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
+		log_debug(ls, "remwait %x cancel_reply wait_type %d",
+			  lkb->lkb_id, lkb->lkb_wait_type);
+		return -1;
+	}
 
-	error = validate_message(lkb, ms);
-	if (error)
-		goto out;
+	/* Remove for the convert reply, and preemptively remove for the
+	   cancel reply.  A convert has been granted while there's still
+	   an outstanding cancel on it (the cancel is moot and the result
+	   in the cancel reply should be 0).  We preempt the cancel reply
+	   because the app gets the convert result and then can follow up
+	   with another op, like convert.  This subsequent op would see the
+	   lingering state of the cancel and fail with -EBUSY. */
 
-	error = do_cancel(r, lkb);
-	send_cancel_reply(r, lkb, error);
-	do_cancel_effects(r, lkb, error);
- out:
-	unlock_rsb(r);
-	put_rsb(r);
-	dlm_put_lkb(lkb);
-	return 0;
+	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
+	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
+	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
+		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
+			  lkb->lkb_id);
+		lkb->lkb_wait_type = 0;
+		lkb->lkb_wait_count--;
+		unhold_lkb(lkb);
+		goto out_del;
+	}
 
- fail:
-	setup_local_lkb(ls, ms);
-	send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
-	return error;
-}
+	/* N.B. type of reply may not always correspond to type of original
+	   msg due to lookup->request optimization, verify others? */
 
-static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
-{
-	struct dlm_lkb *lkb;
-	struct dlm_rsb *r;
-	int error;
+	if (lkb->lkb_wait_type) {
+		lkb->lkb_wait_type = 0;
+		goto out_del;
+	}
 
-	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
-	if (error)
-		return error;
+	log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
+		  lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
+		  lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
+	return -1;
 
-	r = lkb->lkb_resource;
+ out_del:
+	/* the force-unlock/cancel has completed and we haven't received a reply
+	   to the op that was in progress prior to the unlock/cancel; we
+	   give up on any reply to the earlier op.
+	   NOTE: not sure when/how this would happen */
 
-	hold_rsb(r);
-	lock_rsb(r);
+	if (overlap_done && lkb->lkb_wait_type) {
+		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
+			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
+		lkb->lkb_wait_count--;
+		unhold_lkb(lkb);
+		lkb->lkb_wait_type = 0;
+	}
 
-	error = validate_message(lkb, ms);
-	if (error)
-		goto out;
+	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
 
-	receive_flags_reply(lkb, ms, false);
-	if (is_altmode(lkb))
-		munge_altmode(lkb, ms);
-	grant_lock_pc(r, lkb, ms);
-	queue_cast(r, lkb, 0);
- out:
-	unlock_rsb(r);
-	put_rsb(r);
-	dlm_put_lkb(lkb);
+	clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
+	lkb->lkb_wait_count--;
+	if (!lkb->lkb_wait_count)
+		list_del_init(&lkb->lkb_wait_reply);
+	unhold_lkb(lkb);
 	return 0;
 }
 
-static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
+static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
 {
-	struct dlm_lkb *lkb;
-	struct dlm_rsb *r;
+	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 	int error;
 
-	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
-	if (error)
-		return error;
-
-	r = lkb->lkb_resource;
+	spin_lock_bh(&ls->ls_waiters_lock);
+	error = _remove_from_waiters(lkb, mstype, NULL);
+	spin_unlock_bh(&ls->ls_waiters_lock);
+	return error;
+}
 
-	hold_rsb(r);
-	lock_rsb(r);
+/* Handles situations where we might be processing a "fake" or "local" reply in
+ * the recovery context which stops any locking activity. Only debugfs might
+ * change the lockspace waiters but they will hold the recovery lock to ensure
+ * remove_from_waiters_ms() in local case will be the only user manipulating the
+ * lockspace waiters in recovery context.
+ */
 
-	error = validate_message(lkb, ms);
-	if (error)
-		goto out;
+static int remove_from_waiters_ms(struct dlm_lkb *lkb,
+				  const struct dlm_message *ms, bool local)
+{
+	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+	int error;
 
-	queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
-	lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
- out:
-	unlock_rsb(r);
-	put_rsb(r);
-	dlm_put_lkb(lkb);
-	return 0;
+	if (!local)
+		spin_lock_bh(&ls->ls_waiters_lock);
+	else
+		WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
+			     !dlm_locking_stopped(ls));
+	error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
+	if (!local)
+		spin_unlock_bh(&ls->ls_waiters_lock);
+	return error;
 }
 
-static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
+/* lkb is master or local copy */
+
+static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
-	int len, error, ret_nodeid, from_nodeid, our_nodeid;
+	int b, len = r->res_ls->ls_lvblen;
 
 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
 	our_nodeid = dlm_our_nodeid();
-- 
2.48.1


Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ