lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <20250830101413.602637-5-226562783+SigAttilio@users.noreply.github.com>
Date: Sat, 30 Aug 2025 12:14:06 +0200
From: Alessio Attilio <alessio.attilio.dev@...il.com>
To: gfs2@...ts.linux.dev
Cc: linux-kernel@...r.kernel.org,
	aahringo@...hat.com,
	teigland@...hat.com,
	Alessio Attilio <226562783+SigAttilio@...rs.noreply.github.com>
Subject: [PATCH 05/12] fix: improve rsb struct

---
 fs/dlm/lock.c | 885 +++++++++++++++++++++++---------------------------
 1 file changed, 414 insertions(+), 471 deletions(-)

diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index de77a0e9e2db..320a652e1691 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1388,591 +1388,534 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 		write_unlock_bh(&ls->ls_rsbtbl_lock);
 		dlm_free_rsb(r);
 		goto retry;
-	} else if (error) {
-		write_unlock_bh(&ls->ls_rsbtbl_lock);
-		/* should never happen */
-		dlm_free_rsb(r);
-		goto retry;
+	} else if (!error) {
+		list_add(&r->res_slow_list, &ls->ls_slow_active);
 	}
-
-	list_add(&r->res_slow_list, &ls->ls_slow_inactive);
 	write_unlock_bh(&ls->ls_rsbtbl_lock);
-
-	if (result)
-		*result = DLM_LU_ADD;
-	*r_nodeid = from_nodeid;
  out:
+	*r_ret = r;
 	return error;
 }
 
-int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
-		      int len, unsigned int flags, int *r_nodeid, int *result)
-{
-	int rv;
-	rcu_read_lock();
-	rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result);
-	rcu_read_unlock();
-	return rv;
-}
-
-static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
-{
-	struct dlm_rsb *r;
-
-	read_lock_bh(&ls->ls_rsbtbl_lock);
-	list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
-		if (r->res_hash == hash)
-			dlm_dump_rsb(r);
-	}
-	read_unlock_bh(&ls->ls_rsbtbl_lock);
-}
+/* During recovery, other nodes can send us new MSTCPY locks (from
+   dlm_recover_locks) before we've made ourself master (in
+   dlm_recover_masters). */
 
-void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
+static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
+			  uint32_t hash, int dir_nodeid, int from_nodeid,
+			  unsigned int flags, struct dlm_rsb **r_ret)
 {
 	struct dlm_rsb *r = NULL;
+	int our_nodeid = dlm_our_nodeid();
+	int recover = (flags & R_RECEIVE_RECOVER);
 	int error;
 
-	rcu_read_lock();
+ retry:
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-	if (!error)
-		goto out;
-
-	dlm_dump_rsb(r);
- out:
-	rcu_read_unlock();
-}
+	if (error)
+		goto do_new;
 
-static void deactivate_rsb(struct kref *kref)
-{
-	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
-	struct dlm_ls *ls = r->res_ls;
-	int our_nodeid = dlm_our_nodeid();
+	/* check if the rsb is in active state under read lock - likely path */
+	read_lock_bh(&ls->ls_rsbtbl_lock);
+	if (!rsb_flag(r, RSB_HASHED)) {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto do_new;
+	}
 
-	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
-	rsb_set_flag(r, RSB_INACTIVE);
-	list_move(&r->res_slow_list, &ls->ls_slow_inactive);
+	if (rsb_flag(r, RSB_INACTIVE)) {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto do_inactive;
+	}
 
 	/*
-	 * When the rsb becomes unused, there are two possibilities:
-	 * 1. Leave the inactive rsb in place (don't remove it).
-	 * 2. Add it to the scan list to be removed.
-	 *
-	 * 1 is done when the rsb is acting as the dir record
-	 * for a remotely mastered rsb.  The rsb must be left
-	 * in place as an inactive rsb to act as the dir record.
-	 *
-	 * 2 is done when a) the rsb is not the master and not the
-	 * dir record, b) when the rsb is both the master and the
-	 * dir record, c) when the rsb is master but not dir record.
-	 *
-	 * (If no directory is used, the rsb can always be removed.)
+	 * rsb is active, so we can't check master_nodeid without lock_rsb.
 	 */
-	if (dlm_no_directory(ls) ||
-	    (r->res_master_nodeid == our_nodeid ||
-	     dlm_dir_nodeid(r) != our_nodeid))
-		add_scan(ls, r);
-
-	if (r->res_lvbptr) {
-		dlm_free_lvb(r->res_lvbptr);
-		r->res_lvbptr = NULL;
-	}
-}
-
-void free_inactive_rsb(struct dlm_rsb *r)
-{
-	WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE));
 
-	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
-	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
-	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
-	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
-	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
-	DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r););
-	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
-	DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
+	kref_get(&r->res_ref);
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
 
-	dlm_free_rsb(r);
-}
+	goto out;
 
-/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
-   The rsb must exist as long as any lkb's for it do. */
 
-static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
-{
-	hold_rsb(r);
-	lkb->lkb_resource = r;
-}
+ do_inactive:
+	write_lock_bh(&ls->ls_rsbtbl_lock);
 
-static void detach_lkb(struct dlm_lkb *lkb)
-{
-	if (lkb->lkb_resource) {
-		put_rsb(lkb->lkb_resource);
-		lkb->lkb_resource = NULL;
+	/* See comment in find_rsb_dir. */
+	if (rsb_flag(r, RSB_HASHED)) {
+		if (!rsb_flag(r, RSB_INACTIVE)) {
+			write_unlock_bh(&ls->ls_rsbtbl_lock);
+			goto retry;
+		}
+	} else {
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto do_new;
 	}
-}
-
-static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
-		       unsigned long start, unsigned long end)
-{
-	struct xa_limit limit;
-	struct dlm_lkb *lkb;
-	int rv;
 
-	limit.max = end;
-	limit.min = start;
 
-	lkb = dlm_allocate_lkb();
-	if (!lkb)
-		return -ENOMEM;
-
-	lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV;
-	lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV;
-	lkb->lkb_last_cb_mode = DLM_LOCK_IV;
-	lkb->lkb_nodeid = -1;
-	lkb->lkb_grmode = DLM_LOCK_IV;
-	kref_init(&lkb->lkb_ref);
-	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
-	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
+	/*
+	 * rsb found inactive. No other thread is using this rsb because
+	 * it's inactive, so we can look at or update res_master_nodeid
+	 * without lock_rsb.
+	 */
 
-	write_lock_bh(&ls->ls_lkbxa_lock);
-	rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC);
-	write_unlock_bh(&ls->ls_lkbxa_lock);
+	if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
+		/* our rsb is not master, and another node has sent us a
+		   request; this should never happen */
+		log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d",
+			  from_nodeid, r->res_master_nodeid, dir_nodeid);
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		error = -ENOTBLK;
+		goto out;
+	}
 
-	if (rv < 0) {
-		log_error(ls, "create_lkb xa error %d", rv);
-		dlm_free_lkb(lkb);
-		return rv;
+	if (!recover && (r->res_master_nodeid != our_nodeid) &&
+	    (dir_nodeid == our_nodeid)) {
+		/* our rsb is not master, and we are dir; may as well fix it;
+		   this should never happen */
+		log_error(ls, "find_rsb inactive our %d master %d dir %d",
+			  our_nodeid, r->res_master_nodeid, dir_nodeid);
+		dlm_print_rsb(r);
+		r->res_master_nodeid = our_nodeid;
+		r->res_nodeid = 0;
 	}
 
-	*lkb_ret = lkb;
-	return 0;
-}
+	del_scan(ls, r);
+	list_move(&r->res_slow_list, &ls->ls_slow_active);
+	rsb_clear_flag(r, RSB_INACTIVE);
+	kref_init(&r->res_ref); /* ref is now used in active state */
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
-static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
-{
-	return _create_lkb(ls, lkb_ret, 1, ULONG_MAX);
-}
+	goto out;
 
-static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
-{
-	struct dlm_lkb *lkb;
 
-	rcu_read_lock();
-	lkb = xa_load(&ls->ls_lkbxa, lkid);
-	if (lkb) {
-		/* check if lkb is still part of lkbxa under lkbxa_lock as
-		 * the lkb_ref is tight to the lkbxa data structure, see
-		 * __put_lkb().
-		 */
-		read_lock_bh(&ls->ls_lkbxa_lock);
-		if (kref_read(&lkb->lkb_ref))
-			kref_get(&lkb->lkb_ref);
-		else
-			lkb = NULL;
-		read_unlock_bh(&ls->ls_lkbxa_lock);
-	}
-	rcu_read_unlock();
+ do_new:
+	/*
+	 * rsb not found
+	 */
 
-	*lkb_ret = lkb;
-	return lkb ? 0 : -ENOENT;
-}
+	if (error == -EBADR && !create)
+		goto out;
 
-static void kill_lkb(struct kref *kref)
-{
-	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
+	error = get_rsb_struct(ls, name, len, &r);
+	if (WARN_ON_ONCE(error))
+		goto out;
 
-	/* All work is done after the return from kref_put() so we
-	   can release the write_lock before the detach_lkb */
+	r->res_hash = hash;
+	r->res_dir_nodeid = dir_nodeid;
+	kref_init(&r->res_ref);
 
-	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
-}
+	if (from_dir) {
+		/* want to see how often this happens */
+		log_debug(ls, "find_rsb new from_dir %d recreate %s",
+			  from_nodeid, r->res_name);
+		r->res_master_nodeid = our_nodeid;
+		r->res_nodeid = 0;
+		goto out_add;
+	}
 
-/* __put_lkb() is used when an lkb may not have an rsb attached to
-   it so we need to provide the lockspace explicitly */
+	if (from_other && (dir_nodeid != our_nodeid)) {
+		/* should never happen */
+		log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
+			  from_nodeid, dir_nodeid, our_nodeid, r->res_name);
+		dlm_free_rsb(r);
+		r = NULL;
+		error = -ENOTBLK;
+		goto out;
+	}
 
-static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
-{
-	uint32_t lkid = lkb->lkb_id;
-	int rv;
+	if (from_other) {
+		log_debug(ls, "find_rsb new from_other %d dir %d %s",
+			  from_nodeid, dir_nodeid, r->res_name);
+	}
 
-	rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb,
-					&ls->ls_lkbxa_lock);
-	if (rv) {
-		xa_erase(&ls->ls_lkbxa, lkid);
-		write_unlock_bh(&ls->ls_lkbxa_lock);
+	if (dir_nodeid == our_nodeid) {
+		/* When we are the dir nodeid, we can set the master
+		   node immediately */
+		r->res_master_nodeid = our_nodeid;
+		r->res_nodeid = 0;
+	} else {
+		/* set_master will send_lookup to dir_nodeid */
+		r->res_master_nodeid = 0;
+		r->res_nodeid = -1;
+	}
 
-		detach_lkb(lkb);
+ out_add:
 
-		/* for local/process lkbs, lvbptr points to caller's lksb */
-		if (lkb->lkb_lvbptr && is_master_copy(lkb))
-			dlm_free_lvb(lkb->lkb_lvbptr);
-		dlm_free_lkb(lkb);
+	write_lock_bh(&ls->ls_rsbtbl_lock);
+	error = rsb_insert(r, &ls->ls_rsbtbl);
+	if (error == -EEXIST) {
+		/* somebody else was faster and it seems the
+		 * rsb exists now, we do a whole relookup
+		 */
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		dlm_free_rsb(r);
+		goto retry;
+	} else if (!error) {
+		list_add(&r->res_slow_list, &ls->ls_slow_active);
 	}
-
-	return rv;
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
+ out:
+	*r_ret = r;
+	return error;
 }
 
-int dlm_put_lkb(struct dlm_lkb *lkb)
-{
-	struct dlm_ls *ls;
+/* During recovery, other nodes can send us new MSTCPY locks (from
+   dlm_recover_locks) before we've made ourself master (in
+   dlm_recover_masters). */
 
-	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
-	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
+static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
+			  uint32_t hash, int dir_nodeid, int from_nodeid,
+			  unsigned int flags, struct dlm_rsb **r_ret)
+{
+	struct dlm_rsb *r = NULL;
+	int our_nodeid = dlm_our_nodeid();
+	int recover = (flags & R_RECEIVE_RECOVER);
+	int error;
 
-	ls = lkb->lkb_resource->res_ls;
-	return __put_lkb(ls, lkb);
-}
+ retry:
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
+	if (error)
+		goto do_new;
 
-/* This is only called to add a reference when the code already holds
-   a valid reference to the lkb, so there's no need for locking. */
+	/* check if the rsb is in active state under read lock - likely path */
+	read_lock_bh(&ls->ls_rsbtbl_lock);
+	if (!rsb_flag(r, RSB_HASHED)) {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto do_new;
+	}
 
-static inline void hold_lkb(struct dlm_lkb *lkb)
-{
-	kref_get(&lkb->lkb_ref);
-}
+	if (rsb_flag(r, RSB_INACTIVE)) {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto do_inactive;
+	}
 
-static void unhold_lkb_assert(struct kref *kref)
-{
-	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
+	/*
+	 * rsb is active, so we can't check master_nodeid without lock_rsb.
+	 */
 
-	DLM_ASSERT(false, dlm_print_lkb(lkb););
-}
+	kref_get(&r->res_ref);
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
 
-/* This is called when we need to remove a reference and are certain
-   it's not the last ref.  e.g. del_lkb is always called between a
-   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
-   put_lkb would work fine, but would involve unnecessary locking */
+	goto out;
 
-static inline void unhold_lkb(struct dlm_lkb *lkb)
-{
-	kref_put(&lkb->lkb_ref, unhold_lkb_assert);
-}
 
-static void lkb_add_ordered(struct list_head *new, struct list_head *head,
-			    int mode)
-{
-	struct dlm_lkb *lkb = NULL, *iter;
+ do_inactive:
+	write_lock_bh(&ls->ls_rsbtbl_lock);
 
-	list_for_each_entry(iter, head, lkb_statequeue)
-		if (iter->lkb_rqmode < mode) {
-			lkb = iter;
-			list_add_tail(new, &iter->lkb_statequeue);
-			break;
+	/* See comment in find_rsb_dir. */
+	if (rsb_flag(r, RSB_HASHED)) {
+		if (!rsb_flag(r, RSB_INACTIVE)) {
+			write_unlock_bh(&ls->ls_rsbtbl_lock);
+			goto retry;
 		}
+	} else {
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto do_new;
+	}
 
-	if (!lkb)
-		list_add_tail(new, head);
-}
 
-/* add/remove lkb to rsb's grant/convert/wait queue */
-
-static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
-{
-	kref_get(&lkb->lkb_ref);
-
-	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
-
-	lkb->lkb_timestamp = ktime_get();
-
-	lkb->lkb_status = status;
+	/*
+	 * rsb found inactive. No other thread is using this rsb because
+	 * it's inactive, so we can look at or update res_master_nodeid
+	 * without lock_rsb.
+	 */
 
-	switch (status) {
-	case DLM_LKSTS_WAITING:
-		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
-			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
-		else
-			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
-		break;
-	case DLM_LKSTS_GRANTED:
-		/* convention says granted locks kept in order of grmode */
-		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
-				lkb->lkb_grmode);
-		break;
-	case DLM_LKSTS_CONVERT:
-		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
-			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
-		else
-			list_add_tail(&lkb->lkb_statequeue,
-				      &r->res_convertqueue);
-		break;
-	default:
-		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
+	if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
+		/* our rsb is not master, and another node has sent us a
+		   request; this should never happen */
+		log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d",
+			  from_nodeid, r->res_master_nodeid, dir_nodeid);
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		error = -ENOTBLK;
+		goto out;
 	}
-}
-
-static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
-{
-	lkb->lkb_status = 0;
-	list_del(&lkb->lkb_statequeue);
-	unhold_lkb(lkb);
-}
-
-static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
-{
-	del_lkb(r, lkb);
-	add_lkb(r, lkb, sts);
-}
 
-static int msg_reply_type(int mstype)
-{
-	switch (mstype) {
-	case DLM_MSG_REQUEST:
-		return DLM_MSG_REQUEST_REPLY;
-	case DLM_MSG_CONVERT:
-		return DLM_MSG_CONVERT_REPLY;
-	case DLM_MSG_UNLOCK:
-		return DLM_MSG_UNLOCK_REPLY;
-	case DLM_MSG_CANCEL:
-		return DLM_MSG_CANCEL_REPLY;
-	case DLM_MSG_LOOKUP:
-		return DLM_MSG_LOOKUP_REPLY;
+	if (!recover && (r->res_master_nodeid != our_nodeid) &&
+	    (dir_nodeid == our_nodeid)) {
+		/* our rsb is not master, and we are dir; may as well fix it;
+		   this should never happen */
+		log_error(ls, "find_rsb inactive our %d master %d dir %d",
+			  our_nodeid, r->res_master_nodeid, dir_nodeid);
+		dlm_print_rsb(r);
+		r->res_master_nodeid = our_nodeid;
+		r->res_nodeid = 0;
 	}
-	return -1;
-}
-
-/* add/remove lkb from global waiters list of lkb's waiting for
-   a reply from a remote node */
 
-static void add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
-{
-	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+	del_scan(ls, r);
+	list_move(&r->res_slow_list, &ls->ls_slow_active);
+	rsb_clear_flag(r, RSB_INACTIVE);
+	kref_init(&r->res_ref); /* ref is now used in active state */
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
-	spin_lock_bh(&ls->ls_waiters_lock);
-	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
-		switch (mstype) {
-		case DLM_MSG_UNLOCK:
-			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
-			break;
-		case DLM_MSG_CANCEL:
-			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
-			break;
-		default:
-			/* should never happen as validate_lock_args() checks
-			 * on lkb_wait_type and validate_unlock_args() only
-			 * creates UNLOCK or CANCEL messages.
-			 */
-			WARN_ON_ONCE(1);
-			goto out;
-		}
-		lkb->lkb_wait_count++;
-		hold_lkb(lkb);
+	goto out;
 
-		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
-			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
-			  lkb->lkb_wait_count, dlm_iflags_val(lkb));
-		goto out;
-	}
 
-	DLM_ASSERT(!lkb->lkb_wait_count,
-		   dlm_print_lkb(lkb);
-		   printk("wait_count %d\n", lkb->lkb_wait_count););
+ do_new:
+	/*
+	 * rsb not found
+	 */
 
-	lkb->lkb_wait_count++;
-	lkb->lkb_wait_type = mstype;
-	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
-	hold_lkb(lkb);
-	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
- out:
-	spin_unlock_bh(&ls->ls_waiters_lock);
-}
+	if (error == -EBADR && !create)
+		goto out;
 
-/* We clear the RESEND flag because we might be taking an lkb off the waiters
-   list as part of process_requestqueue (e.g. a lookup that has an optimized
-   request reply on the requestqueue) between dlm_recover_waiters_pre() which
-   set RESEND and dlm_recover_waiters_post() */
+	error = get_rsb_struct(ls, name, len, &r);
+	if (WARN_ON_ONCE(error))
+		goto out;
 
-static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
-				const struct dlm_message *ms)
-{
-	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
-	int overlap_done = 0;
+	r->res_hash = hash;
+	r->res_dir_nodeid = dir_nodeid;
+	kref_init(&r->res_ref);
 
-	if (mstype == DLM_MSG_UNLOCK_REPLY &&
-	    test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
-		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
-		overlap_done = 1;
-		goto out_del;
+	if (from_dir) {
+		/* want to see how often this happens */
+		log_debug(ls, "find_rsb new from_dir %d recreate %s",
+			  from_nodeid, r->res_name);
+		r->res_master_nodeid = our_nodeid;
+		r->res_nodeid = 0;
+		goto out_add;
 	}
 
-	if (mstype == DLM_MSG_CANCEL_REPLY &&
-	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
-		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
-		overlap_done = 1;
-		goto out_del;
+	if (from_other && (dir_nodeid != our_nodeid)) {
+		/* should never happen */
+		log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
+			  from_nodeid, dir_nodeid, our_nodeid, r->res_name);
+		dlm_free_rsb(r);
+		r = NULL;
+		error = -ENOTBLK;
+		goto out;
 	}
 
-	/* Cancel state was preemptively cleared by a successful convert,
-	   see next comment, nothing to do. */
+	if (from_other) {
+		log_debug(ls, "find_rsb new from_other %d dir %d %s",
+			  from_nodeid, dir_nodeid, r->res_name);
+	}
 
-	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
-	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
-		log_debug(ls, "remwait %x cancel_reply wait_type %d",
-			  lkb->lkb_id, lkb->lkb_wait_type);
-		return -1;
+	if (dir_nodeid == our_nodeid) {
+		/* When we are the dir nodeid, we can set the master
+		   node immediately */
+		r->res_master_nodeid = our_nodeid;
+		r->res_nodeid = 0;
+	} else {
+		/* set_master will send_lookup to dir_nodeid */
+		r->res_master_nodeid = 0;
+		r->res_nodeid = -1;
 	}
 
-	/* Remove for the convert reply, and premptively remove for the
-	   cancel reply.  A convert has been granted while there's still
-	   an outstanding cancel on it (the cancel is moot and the result
-	   in the cancel reply should be 0).  We preempt the cancel reply
-	   because the app gets the convert result and then can follow up
-	   with another op, like convert.  This subsequent op would see the
-	   lingering state of the cancel and fail with -EBUSY. */
+ out_add:
 
-	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
-	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
-	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
-		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
-			  lkb->lkb_id);
-		lkb->lkb_wait_type = 0;
-		lkb->lkb_wait_count--;
-		unhold_lkb(lkb);
-		goto out_del;
+	write_lock_bh(&ls->ls_rsbtbl_lock);
+	error = rsb_insert(r, &ls->ls_rsbtbl);
+	if (error == -EEXIST) {
+		/* somebody else was faster and it seems the
+		 * rsb exists now, we do a whole relookup
+		 */
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		dlm_free_rsb(r);
+		goto retry;
+	} else if (!error) {
+		list_add(&r->res_slow_list, &ls->ls_slow_active);
 	}
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
+ out:
+	*r_ret = r;
+	return error;
+}
 
-	/* N.B. type of reply may not always correspond to type of original
-	   msg due to lookup->request optimization, verify others? */
+/* During recovery, other nodes can send us new MSTCPY locks (from
+   dlm_recover_locks) before we've made ourself master (in
+   dlm_recover_masters). */
 
-	if (lkb->lkb_wait_type) {
-		lkb->lkb_wait_type = 0;
-		goto out_del;
-	}
+static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
+			  uint32_t hash, int dir_nodeid, int from_nodeid,
+			  unsigned int flags, struct dlm_rsb **r_ret)
+{
+	struct dlm_rsb *r = NULL;
+	int our_nodeid = dlm_our_nodeid();
+	int recover = (flags & R_RECEIVE_RECOVER);
+	int error;
 
-	log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
-		  lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
-		  lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
-	return -1;
+ retry:
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
+	if (error)
+		goto do_new;
 
- out_del:
-	/* the force-unlock/cancel has completed and we haven't recvd a reply
-	   to the op that was in progress prior to the unlock/cancel; we
-	   give up on any reply to the earlier op.  FIXME: not sure when/how
-	   this would happen */
+	/* check if the rsb is in active state under read lock - likely path */
+	read_lock_bh(&ls->ls_rsbtbl_lock);
+	if (!rsb_flag(r, RSB_HASHED)) {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto do_new;
+	}
 
-	if (overlap_done && lkb->lkb_wait_type) {
-		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
-			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
-		lkb->lkb_wait_count--;
-		unhold_lkb(lkb);
-		lkb->lkb_wait_type = 0;
+	if (rsb_flag(r, RSB_INACTIVE)) {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto do_inactive;
 	}
 
-	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
+	/*
+	 * rsb is active, so we can't check master_nodeid without lock_rsb.
+	 */
 
-	clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
-	lkb->lkb_wait_count--;
-	if (!lkb->lkb_wait_count)
-		list_del_init(&lkb->lkb_wait_reply);
-	unhold_lkb(lkb);
-	return 0;
-}
+	kref_get(&r->res_ref);
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
 
-static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
-{
-	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
-	int error;
+	goto out;
 
-	spin_lock_bh(&ls->ls_waiters_lock);
-	error = _remove_from_waiters(lkb, mstype, NULL);
-	spin_unlock_bh(&ls->ls_waiters_lock);
-	return error;
-}
 
-/* Handles situations where we might be processing a "fake" or "local" reply in
- * the recovery context which stops any locking activity. Only debugfs might
- * change the lockspace waiters but they will held the recovery lock to ensure
- * remove_from_waiters_ms() in local case will be the only user manipulating the
- * lockspace waiters in recovery context.
- */
+ do_inactive:
+	write_lock_bh(&ls->ls_rsbtbl_lock);
 
-static int remove_from_waiters_ms(struct dlm_lkb *lkb,
-				  const struct dlm_message *ms, bool local)
-{
-	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
-	int error;
+	/* See comment in find_rsb_dir. */
+	if (rsb_flag(r, RSB_HASHED)) {
+		if (!rsb_flag(r, RSB_INACTIVE)) {
+			write_unlock_bh(&ls->ls_rsbtbl_lock);
+			goto retry;
+		}
+	} else {
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto do_new;
+	}
 
-	if (!local)
-		spin_lock_bh(&ls->ls_waiters_lock);
-	else
-		WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
-			     !dlm_locking_stopped(ls));
-	error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
-	if (!local)
-		spin_unlock_bh(&ls->ls_waiters_lock);
-	return error;
-}
 
-/* lkb is master or local copy */
+	/*
+	 * rsb found inactive. No other thread is using this rsb because
+	 * it's inactive, so we can look at or update res_master_nodeid
+	 * without lock_rsb.
+	 */
 
-static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
-{
-	int b, len = r->res_ls->ls_lvblen;
+	if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
+		/* our rsb is not master, and another node has sent us a
+		   request; this should never happen */
+		log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d",
+			  from_nodeid, r->res_master_nodeid, dir_nodeid);
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		error = -ENOTBLK;
+		goto out;
+	}
 
-	/* b=1 lvb returned to caller
-	   b=0 lvb written to rsb or invalidated
-	   b=-1 do nothing */
+	if (!recover && (r->res_master_nodeid != our_nodeid) &&
+	    (dir_nodeid == our_nodeid)) {
+		/* our rsb is not master, and we are dir; may as well fix it;
+		   this should never happen */
+		log_error(ls, "find_rsb inactive our %d master %d dir %d",
+			  our_nodeid, r->res_master_nodeid, dir_nodeid);
+		dlm_print_rsb(r);
+		r->res_master_nodeid = our_nodeid;
+		r->res_nodeid = 0;
+	}
 
-	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
+	del_scan(ls, r);
+	list_move(&r->res_slow_list, &ls->ls_slow_active);
+	rsb_clear_flag(r, RSB_INACTIVE);
+	kref_init(&r->res_ref); /* ref is now used in active state */
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
-	if (b == 1) {
-		if (!lkb->lkb_lvbptr)
-			return;
+	goto out;
 
-		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
-			return;
 
-		if (!r->res_lvbptr)
-			return;
+ do_new:
+	/*
+	 * rsb not found
+	 */
 
-		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
-		lkb->lkb_lvbseq = r->res_lvbseq;
+	if (error == -EBADR && !create)
+		goto out;
 
-	} else if (b == 0) {
-		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
-			rsb_set_flag(r, RSB_VALNOTVALID);
-			return;
-		}
+	error = get_rsb_struct(ls, name, len, &r);
+	if (WARN_ON_ONCE(error))
+		goto out;
 
-		if (!lkb->lkb_lvbptr)
-			return;
+	r->res_hash = hash;
+	r->res_dir_nodeid = dir_nodeid;
+	kref_init(&r->res_ref);
 
-		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
-			return;
+	if (from_dir) {
+		/* want to see how often this happens */
+		log_debug(ls, "find_rsb new from_dir %d recreate %s",
+			  from_nodeid, r->res_name);
+		r->res_master_nodeid = our_nodeid;
+		r->res_nodeid = 0;
+		goto out_add;
+	}
 
-		if (!r->res_lvbptr)
-			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
+	if (from_other && (dir_nodeid != our_nodeid)) {
+		/* should never happen */
+		log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
+			  from_nodeid, dir_nodeid, our_nodeid, r->res_name);
+		dlm_free_rsb(r);
+		r = NULL;
+		error = -ENOTBLK;
+		goto out;
+	}
 
-		if (!r->res_lvbptr)
-			return;
+	if (from_other) {
+		log_debug(ls, "find_rsb new from_other %d dir %d %s",
+			  from_nodeid, dir_nodeid, r->res_name);
+	}
 
-		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
-		r->res_lvbseq++;
-		lkb->lkb_lvbseq = r->res_lvbseq;
-		rsb_clear_flag(r, RSB_VALNOTVALID);
+	if (dir_nodeid == our_nodeid) {
+		/* When we are the dir nodeid, we can set the master
+		   node immediately */
+		r->res_master_nodeid = our_nodeid;
+		r->res_nodeid = 0;
+	} else {
+		/* set_master will send_lookup to dir_nodeid */
+		r->res_master_nodeid = 0;
+		r->res_nodeid = -1;
 	}
 
-	if (rsb_flag(r, RSB_VALNOTVALID))
-		set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
+ out_add:
+
+	write_lock_bh(&ls->ls_rsbtbl_lock);
+	error = rsb_insert(r, &ls->ls_rsbtbl);
+	if (error == -EEXIST) {
+		/* somebody else was faster and it seems the
+		 * rsb exists now, we do a whole relookup
+		 */
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		dlm_free_rsb(r);
+		goto retry;
+	} else if (!error) {
+		list_add(&r->res_slow_list, &ls->ls_slow_active);
+	}
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
+ out:
+	*r_ret = r;
+	return error;
 }
 
-static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+/* During recovery, other nodes can send us new MSTCPY locks (from
+   dlm_recover_locks) before we've made ourself master (in
+   dlm_recover_masters). */
+
+static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
+			  uint32_t hash, int dir_nodeid, int from_nodeid,
+			  unsigned int flags, struct dlm_rsb **r_ret)
 {
-	if (lkb->lkb_grmode < DLM_LOCK_PW)
-		return;
+	struct dlm_rsb *r = NULL;
+	int our_nodeid = dlm_our_nodeid();
+	int recover = (flags & R_RECEIVE_RECOVER);
+	int error;
 
-	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
-		rsb_set_flag(r, RSB_VALNOTVALID);
-		return;
-	}
+ retry:
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
+	if (error)
+		goto do_new;
 
-	if (!lkb->lkb_lvbptr)
-		return;
+	/* check if the rsb is in active state under read lock - likely path */
+	read_lock_bh(&ls->ls_rsbtbl_lock);
+	if (!rsb_flag(r, RSB_HASHED)) {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto do_new;
+	}
 
-	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
-		return;
+	if (rsb_flag(r, RSB_INACTIVE)) {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto do_inactive;
+	}
 
-	if (!r->res_lvbptr)
-		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
+	/*
+	 * rsb is active, so we can't check master_nodeid without lock_rsb.
+	 */
 
 	if (!r->res_lvbptr)
 		return;
-- 
2.48.1


Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ