[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <20250830101413.602637-10-226562783+SigAttilio@users.noreply.github.com>
Date: Sat, 30 Aug 2025 12:14:11 +0200
From: Alessio Attilio <alessio.attilio.dev@...il.com>
To: gfs2@...ts.linux.dev
Cc: linux-kernel@...r.kernel.org,
aahringo@...hat.com,
teigland@...hat.com,
Alessio Attilio <226562783+SigAttilio@...rs.noreply.github.com>
Subject: [PATCH 10/12] fix: improve dlm_rsb struct
---
fs/dlm/lock.c | 889 +++++++++++++++++++++++++-------------------------
1 file changed, 451 insertions(+), 438 deletions(-)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 3ead785d8dbe..4236b38aae94 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -4470,545 +4470,558 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
continue;
}
- case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
- receive_lookup_reply(ls, ms);
- break;
+ if (deadlk) {
+ /*
+ * If DLM_LKF_NODLCKWT flag is set and conversion
+ * deadlock is detected, we request blocking AST and
+ * down (or cancel) conversion.
+ */
+ if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
+ if (lkb->lkb_highbast < lkb->lkb_rqmode) {
+ queue_bast(r, lkb, lkb->lkb_rqmode);
+ lkb->lkb_highbast = lkb->lkb_rqmode;
+ }
+ } else {
+ log_print("WARN: pending deadlock %x node %d %s",
+ lkb->lkb_id, lkb->lkb_nodeid,
+ r->res_name);
+ dlm_dump_rsb(r);
+ }
+ continue;
+ }
- /* other messages */
+ hi = max_t(int, lkb->lkb_rqmode, hi);
- case cpu_to_le32(DLM_MSG_PURGE):
- receive_purge(ls, ms);
- break;
+ if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
+ *cw = 1;
+ }
- default:
- log_error(ls, "unknown message type %d",
- le32_to_cpu(ms->m_type));
+ if (grant_restart)
+ goto restart;
+ if (demote_restart && !quit) {
+ quit = 1;
+ goto restart;
}
- /*
- * When checking for ENOENT, we're checking the result of
- * find_lkb(m_remid):
- *
- * The lock id referenced in the message wasn't found. This may
- * happen in normal usage for the async messages and cancel, so
- * only use log_debug for them.
- *
- * Some errors are expected and normal.
- */
+ return max_t(int, high, hi);
+}
- if (error == -ENOENT && noent) {
- log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
- le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
- le32_to_cpu(ms->m_header.h_nodeid),
- le32_to_cpu(ms->m_lkid), saved_seq);
- } else if (error == -ENOENT) {
- log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
- le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
- le32_to_cpu(ms->m_header.h_nodeid),
- le32_to_cpu(ms->m_lkid), saved_seq);
-
- if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
- dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
- }
-
- if (error == -EINVAL) {
- log_error(ls, "receive %d inval from %d lkid %x remid %x "
- "saved_seq %u",
- le32_to_cpu(ms->m_type),
- le32_to_cpu(ms->m_header.h_nodeid),
- le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
- saved_seq);
- }
-}
-
-/* If the lockspace is in recovery mode (locking stopped), then normal
- messages are saved on the requestqueue for processing after recovery is
- done. When not in recovery mode, we wait for dlm_recoverd to drain saved
- messages off the requestqueue before we process new ones. This occurs right
- after recovery completes when we transition from saving all messages on
- requestqueue, to processing all the saved messages, to processing new
- messages as they arrive. */
-
-static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
- int nodeid)
-{
-try_again:
- read_lock_bh(&ls->ls_requestqueue_lock);
- if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
- /* If we were a member of this lockspace, left, and rejoined,
- other nodes may still be sending us messages from the
- lockspace generation before we left. */
- if (WARN_ON_ONCE(!ls->ls_generation)) {
- read_unlock_bh(&ls->ls_requestqueue_lock);
- log_limit(ls, "receive %d from %d ignore old gen",
- le32_to_cpu(ms->m_type), nodeid);
- return;
- }
+static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
+ unsigned int *count)
+{
+ struct dlm_lkb *lkb, *s;
- read_unlock_bh(&ls->ls_requestqueue_lock);
- write_lock_bh(&ls->ls_requestqueue_lock);
- /* recheck because we hold writelock now */
- if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
- write_unlock_bh(&ls->ls_requestqueue_lock);
- goto try_again;
+ list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
+ if (can_be_granted(r, lkb, 0, 0, NULL)) {
+ grant_lock_pending(r, lkb);
+ if (count)
+ (*count)++;
+ } else {
+ high = max_t(int, lkb->lkb_rqmode, high);
+ if (lkb->lkb_rqmode == DLM_LOCK_CW)
+ *cw = 1;
}
-
- dlm_add_requestqueue(ls, nodeid, ms);
- write_unlock_bh(&ls->ls_requestqueue_lock);
- } else {
- _receive_message(ls, ms, 0);
- read_unlock_bh(&ls->ls_requestqueue_lock);
}
+
+ return high;
}
-/* This is called by dlm_recoverd to process messages that were saved on
- the requestqueue. */
+/* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
+ on either the convert or waiting queue.
+ high is the largest rqmode of all locks blocked on the convert or
+ waiting queue. */
-void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
- uint32_t saved_seq)
+static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
{
- _receive_message(ls, ms, saved_seq);
-}
+ if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
+ if (gr->lkb_highbast < DLM_LOCK_EX)
+ return 1;
+ return 0;
+ }
-/* This is called by the midcomms layer when something is received for
- the lockspace. It could be either a MSG (normal message sent as part of
- standard locking activity) or an RCOM (recovery message sent as part of
- lockspace recovery). */
+ if (gr->lkb_highbast < high &&
+ !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
+ return 1;
+ return 0;
+}
-void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
+static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
{
- const struct dlm_header *hd = &p->header;
- struct dlm_ls *ls;
- int type = 0;
+ struct dlm_lkb *lkb, *s;
+ int high = DLM_LOCK_IV;
+ int cw = 0;
- switch (hd->h_cmd) {
- case DLM_MSG:
- type = le32_to_cpu(p->message.m_type);
- break;
- case DLM_RCOM:
- type = le32_to_cpu(p->rcom.rc_type);
- break;
- default:
- log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
+ if (!is_master(r)) {
+ log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
+ dlm_dump_rsb(r);
return;
}
- if (le32_to_cpu(hd->h_nodeid) != nodeid) {
- log_print("invalid h_nodeid %d from %d lockspace %x",
- le32_to_cpu(hd->h_nodeid), nodeid,
- le32_to_cpu(hd->u.h_lockspace));
+ high = grant_pending_convert(r, high, &cw, count);
+ high = grant_pending_wait(r, high, &cw, count);
+
+ if (high == DLM_LOCK_IV)
return;
- }
- ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
- if (!ls) {
- if (dlm_config.ci_log_debug) {
- printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
- "%u from %d cmd %d type %d\n",
- le32_to_cpu(hd->u.h_lockspace), nodeid,
- hd->h_cmd, type);
+ /*
+ * If there are locks left on the wait/convert queue then send blocking
+ * ASTs to granted locks based on the largest requested mode (high)
+ * found above.
+ */
+
+ list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
+ if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
+ if (cw && high == DLM_LOCK_PR &&
+ lkb->lkb_grmode == DLM_LOCK_PR)
+ queue_bast(r, lkb, DLM_LOCK_CW);
+ else
+ queue_bast(r, lkb, high);
+ lkb->lkb_highbast = high;
}
+ }
+}
- if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
- dlm_send_ls_not_ready(nodeid, &p->rcom);
- return;
+static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
+{
+ if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
+ (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
+ if (gr->lkb_highbast < DLM_LOCK_EX)
+ return 1;
+ return 0;
}
- /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
- be inactive (in this ls) before transitioning to recovery mode */
+ if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
+ return 1;
+ return 0;
+}
- read_lock_bh(&ls->ls_recv_active);
- if (hd->h_cmd == DLM_MSG)
- dlm_receive_message(ls, &p->message, nodeid);
- else if (hd->h_cmd == DLM_RCOM)
- dlm_receive_rcom(ls, &p->rcom, nodeid);
- else
- log_error(ls, "invalid h_cmd %d from %d lockspace %x",
- hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
- read_unlock_bh(&ls->ls_recv_active);
+static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
+ struct dlm_lkb *lkb)
+{
+ struct dlm_lkb *gr;
- dlm_put_lockspace(ls);
+ list_for_each_entry(gr, head, lkb_statequeue) {
+ /* skip self when sending basts to convertqueue */
+ if (gr == lkb)
+ continue;
+ if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
+ queue_bast(r, gr, lkb->lkb_rqmode);
+ gr->lkb_highbast = lkb->lkb_rqmode;
+ }
+ }
}
-static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
- struct dlm_message *ms_local)
+static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
- if (middle_conversion(lkb)) {
- log_rinfo(ls, "%s %x middle convert in progress", __func__,
- lkb->lkb_id);
+ send_bast_queue(r, &r->res_grantqueue, lkb);
+}
- /* We sent this lock to the new master. The new master will
- * tell us when it's granted. We no longer need a reply, so
- * use a fake reply to put the lkb into the right state.
- */
- hold_lkb(lkb);
- memset(ms_local, 0, sizeof(struct dlm_message));
- ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
- ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
- ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
- _receive_convert_reply(lkb, ms_local, true);
- unhold_lkb(lkb);
+static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
+{
+ send_bast_queue(r, &r->res_grantqueue, lkb);
+ send_bast_queue(r, &r->res_convertqueue, lkb);
+}
- } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
- set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
- }
+/* set_master(r, lkb) -- set the master nodeid of a resource
- /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
- conversions are async; there's no reply from the remote master */
-}
+ The purpose of this function is to set the nodeid field in the given
+ lkb using the nodeid field in the given rsb. If the rsb's nodeid is
+ known, it can just be copied to the lkb and the function will return
+ 0. If the rsb's nodeid is _not_ known, it needs to be looked up
+ before it can be copied to the lkb.
+
+ When the rsb nodeid is being looked up remotely, the initial lkb
+ causing the lookup is kept on the ls_waiters list waiting for the
+ lookup reply. Other lkb's waiting for the same rsb lookup are kept
+ on the rsb's res_lookup list until the master is verified.
-/* A waiting lkb needs recovery if the master node has failed, or
- the master node is changing (only when no directory is used) */
+ Return values:
+ 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
+ 1: the rsb master is not available and the lkb has been placed on
+ a wait queue
+*/
-static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
- int dir_nodeid)
+static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
- if (dlm_no_directory(ls))
- return 1;
+ int our_nodeid = dlm_our_nodeid();
+
+ if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
+ rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
+ r->res_first_lkid = lkb->lkb_id;
+ lkb->lkb_nodeid = r->res_nodeid;
+ return 0;
+ }
- if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
+ if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
+ list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
return 1;
+ }
- return 0;
-}
+ if (r->res_master_nodeid == our_nodeid) {
+ lkb->lkb_nodeid = 0;
+ return 0;
+ }
+
+ if (r->res_master_nodeid) {
+ lkb->lkb_nodeid = r->res_master_nodeid;
+ return 0;
+ }
-/* Recovery for locks that are waiting for replies from nodes that are now
- gone. We can just complete unlocks and cancels by faking a reply from the
- dead node. Requests and up-conversions we flag to be resent after
- recovery. Down-conversions can just be completed with a fake reply like
- unlocks. Conversions between PR and CW need special attention. */
+ if (dlm_dir_nodeid(r) == our_nodeid) {
+ /* This is a somewhat unusual case; find_rsb will usually
+ have set res_master_nodeid when dir nodeid is local, but
+ there are cases where we become the dir node after we've
+ passed find_rsb and go through _request_lock again.
+ confirm_master() or process_lookup_list() needs to be
+ called after this. */
+ log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
+ lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
+ r->res_name);
+ r->res_master_nodeid = our_nodeid;
+ r->res_nodeid = 0;
+ lkb->lkb_nodeid = 0;
+ return 0;
+ }
+
+ r->res_first_lkid = lkb->lkb_id;
+ send_lookup(r, lkb);
+ return 1;
+}
-void dlm_recover_waiters_pre(struct dlm_ls *ls)
+static void process_lookup_list(struct dlm_rsb *r)
{
struct dlm_lkb *lkb, *safe;
- struct dlm_message *ms_local;
- int wait_type, local_unlock_result, local_cancel_result;
- int dir_nodeid;
- ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
- if (!ms_local)
+ list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
+ list_del_init(&lkb->lkb_rsb_lookup);
+ _request_lock(r, lkb);
+ }
+}
+
+/* confirm_master -- confirm (or deny) an rsb's master nodeid */
+
+static void confirm_master(struct dlm_rsb *r, int error)
+{
+ struct dlm_lkb *lkb;
+
+ if (!r->res_first_lkid)
return;
- list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
+ switch (error) {
+ case 0:
+ case -EINPROGRESS:
+ r->res_first_lkid = 0;
+ process_lookup_list(r);
+ break;
- dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
+ case -EAGAIN:
+ case -EBADR:
+ case -ENOTBLK:
+ /* the remote request failed and won't be retried (it was
+ a NOQUEUE, or has been canceled/unlocked); make a waiting
+ lkb the first_lkid */
- /* exclude debug messages about unlocks because there can be so
- many and they aren't very interesting */
+ r->res_first_lkid = 0;
- if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
- log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
- "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
- lkb->lkb_id,
- lkb->lkb_remid,
- lkb->lkb_wait_type,
- lkb->lkb_resource->res_nodeid,
- lkb->lkb_nodeid,
- lkb->lkb_wait_nodeid,
- dir_nodeid);
+ if (!list_empty(&r->res_lookup)) {
+ lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
+ lkb_rsb_lookup);
+ list_del_init(&lkb->lkb_rsb_lookup);
+ r->res_first_lkid = lkb->lkb_id;
+ _request_lock(r, lkb);
}
+ break;
- /* all outstanding lookups, regardless of destination will be
- resent after recovery is done */
+ default:
+ log_error(r->res_ls, "confirm_master unknown error %d", error);
+ }
+}
- if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
- set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
- continue;
- }
+static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
+ int namelen, void (*ast)(void *astparam),
+ void *astparam,
+ void (*bast)(void *astparam, int mode),
+ struct dlm_args *args)
+{
+ int rv = -EINVAL;
- if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
- continue;
+ /* check for invalid arg usage */
- wait_type = lkb->lkb_wait_type;
- local_unlock_result = -DLM_EUNLOCK;
- local_cancel_result = -DLM_ECANCEL;
+ if (mode < 0 || mode > DLM_LOCK_EX)
+ goto out;
- /* Main reply may have been received leaving a zero wait_type,
- but a reply for the overlapping op may not have been
- received. In that case we need to fake the appropriate
- reply for the overlap op. */
+ if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
+ goto out;
- if (!wait_type) {
- if (is_overlap_cancel(lkb)) {
- wait_type = DLM_MSG_CANCEL;
- if (lkb->lkb_grmode == DLM_LOCK_IV)
- local_cancel_result = 0;
- }
- if (is_overlap_unlock(lkb)) {
- wait_type = DLM_MSG_UNLOCK;
- if (lkb->lkb_grmode == DLM_LOCK_IV)
- local_unlock_result = -ENOENT;
- }
+ if (flags & DLM_LKF_CANCEL)
+ goto out;
- log_debug(ls, "rwpre overlap %x %x %d %d %d",
- lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
- local_cancel_result, local_unlock_result);
- }
+ if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
+ goto out;
- switch (wait_type) {
+ if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
+ goto out;
- case DLM_MSG_REQUEST:
- set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
- break;
+ if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
+ goto out;
- case DLM_MSG_CONVERT:
- recover_convert_waiter(ls, lkb, ms_local);
- break;
+ if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
+ goto out;
- case DLM_MSG_UNLOCK:
- hold_lkb(lkb);
- memset(ms_local, 0, sizeof(struct dlm_message));
- ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
- ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
- ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
- _receive_unlock_reply(lkb, ms_local, true);
- dlm_put_lkb(lkb);
- break;
+ if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
+ goto out;
- case DLM_MSG_CANCEL:
- hold_lkb(lkb);
- memset(ms_local, 0, sizeof(struct dlm_message));
- ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
- ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
- ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
- _receive_cancel_reply(lkb, ms_local, true);
- dlm_put_lkb(lkb);
- break;
+ if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
+ goto out;
- default:
- log_error(ls, "invalid lkb wait_type %d %d",
- lkb->lkb_wait_type, wait_type);
- }
- schedule();
- }
- kfree(ms_local);
+ if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
+ goto out;
+
+ if (!ast || !lksb)
+ goto out;
+
+ if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
+ goto out;
+
+ if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
+ goto out;
+
+ /* these args will be copied to the lkb in validate_lock_args,
+ it cannot be done now because when converting locks, fields in
+ an active lkb cannot be modified before locking the rsb */
+
+ args->flags = flags;
+ args->astfn = ast;
+ args->astparam = astparam;
+ args->bastfn = bast;
+ args->mode = mode;
+ args->lksb = lksb;
+ rv = 0;
+ out:
+ return rv;
}
-static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
+static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
{
- struct dlm_lkb *lkb = NULL, *iter;
+ if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
+ DLM_LKF_FORCEUNLOCK))
+ return -EINVAL;
- spin_lock_bh(&ls->ls_waiters_lock);
- list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
- if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
- hold_lkb(iter);
- lkb = iter;
- break;
- }
- }
- spin_unlock_bh(&ls->ls_waiters_lock);
+ if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
+ return -EINVAL;
- return lkb;
+ args->flags = flags;
+ args->astparam = astarg;
+ return 0;
}
-/*
- * Forced state reset for locks that were in the middle of remote operations
- * when recovery happened (i.e. lkbs that were on the waiters list, waiting
- * for a reply from a remote operation.) The lkbs remaining on the waiters
- * list need to be reevaluated; some may need resending to a different node
- * than previously, and some may now need local handling rather than remote.
- *
- * First, the lkb state for the voided remote operation is forcibly reset,
- * equivalent to what remove_from_waiters() would normally do:
- * . lkb removed from ls_waiters list
- * . lkb wait_type cleared
- * . lkb waiters_count cleared
- * . lkb ref count decremented for each waiters_count (almost always 1,
- * but possibly 2 in case of cancel/unlock overlapping, which means
- * two remote replies were being expected for the lkb.)
- *
- * Second, the lkb is reprocessed like an original operation would be,
- * by passing it to _request_lock or _convert_lock, which will either
- * process the lkb operation locally, or send it to a remote node again
- * and put the lkb back onto the waiters list.
- *
- * When reprocessing the lkb, we may find that it's flagged for an overlapping
- * force-unlock or cancel, either from before recovery began, or after recovery
- * finished. If this is the case, the unlock/cancel is done directly, and the
- * original operation is not initiated again (no _request_lock/_convert_lock.)
- */
-
-int dlm_recover_waiters_post(struct dlm_ls *ls)
+static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
+ struct dlm_args *args)
{
- struct dlm_lkb *lkb;
- struct dlm_rsb *r;
- int error = 0, mstype, err, oc, ou;
+ int rv = -EBUSY;
- while (1) {
- if (dlm_locking_stopped(ls)) {
- log_debug(ls, "recover_waiters_post aborted");
- error = -EINTR;
- break;
- }
+ if (args->flags & DLM_LKF_CONVERT) {
+ if (lkb->lkb_status != DLM_LKSTS_GRANTED)
+ goto out;
- /*
- * Find an lkb from the waiters list that's been affected by
- * recovery node changes, and needs to be reprocessed. Does
- * hold_lkb(), adding a refcount.
- */
- lkb = find_resend_waiter(ls);
- if (!lkb)
- break;
+ /* lock not allowed if there's any op in progress */
+ if (lkb->lkb_wait_type || lkb->lkb_wait_count)
+ goto out;
- r = lkb->lkb_resource;
- hold_rsb(r);
- lock_rsb(r);
+ if (is_overlap(lkb))
+ goto out;
- /*
- * If the lkb has been flagged for a force unlock or cancel,
- * then the reprocessing below will be replaced by just doing
- * the unlock/cancel directly.
- */
- mstype = lkb->lkb_wait_type;
- oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
- &lkb->lkb_iflags);
- ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
- &lkb->lkb_iflags);
- err = 0;
-
- log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
- "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
- "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
- r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
- dlm_dir_nodeid(r), oc, ou);
+ rv = -EINVAL;
+ if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
+ goto out;
- /*
- * No reply to the pre-recovery operation will now be received,
- * so a forced equivalent of remove_from_waiters() is needed to
- * reset the waiters state that was in place before recovery.
- */
+ if (args->flags & DLM_LKF_QUECVT &&
+ !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
+ goto out;
+ }
- clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
+ lkb->lkb_exflags = args->flags;
+ dlm_set_sbflags_val(lkb, 0);
+ lkb->lkb_astfn = args->astfn;
+ lkb->lkb_astparam = args->astparam;
+ lkb->lkb_bastfn = args->bastfn;
+ lkb->lkb_rqmode = args->mode;
+ lkb->lkb_lksb = args->lksb;
+ lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
+ lkb->lkb_ownpid = (int) current->pid;
+ rv = 0;
+ out:
+ switch (rv) {
+ case 0:
+ break;
+ case -EINVAL:
+ /* annoy the user because dlm usage is wrong */
+ WARN_ON(1);
+ log_error(ls, "%s %d %x %x %x %d %d", __func__,
+ rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
+ lkb->lkb_status, lkb->lkb_wait_type);
+ break;
+ default:
+ log_debug(ls, "%s %d %x %x %x %d %d", __func__,
+ rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
+ lkb->lkb_status, lkb->lkb_wait_type);
+ break;
+ }
- /* Forcibly clear wait_type */
- lkb->lkb_wait_type = 0;
+ return rv;
+}
- /*
- * Forcibly reset wait_count and associated refcount. The
- * wait_count will almost always be 1, but in case of an
- * overlapping unlock/cancel it could be 2: see where
- * add_to_waiters() finds the lkb is already on the waiters
- * list and does lkb_wait_count++; hold_lkb().
- */
- while (lkb->lkb_wait_count) {
- lkb->lkb_wait_count--;
- unhold_lkb(lkb);
- }
+/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
+ for success */
- /* Forcibly remove from waiters list */
- spin_lock_bh(&ls->ls_waiters_lock);
- list_del_init(&lkb->lkb_wait_reply);
- spin_unlock_bh(&ls->ls_waiters_lock);
+/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
+ because there may be a lookup in progress and it's valid to do
+ cancel/unlockf on it */
- /*
- * The lkb is now clear of all prior waiters state and can be
- * processed locally, or sent to remote node again, or directly
- * cancelled/unlocked.
- */
+static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
+{
+ struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+ int rv = -EBUSY;
- if (oc || ou) {
- /* do an unlock or cancel instead of resending */
- switch (mstype) {
- case DLM_MSG_LOOKUP:
- case DLM_MSG_REQUEST:
- queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
- -DLM_ECANCEL);
- unhold_lkb(lkb); /* undoes create_lkb() */
- break;
- case DLM_MSG_CONVERT:
- if (oc) {
- queue_cast(r, lkb, -DLM_ECANCEL);
- } else {
- lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
- _unlock_lock(r, lkb);
- }
- break;
- default:
- err = 1;
- }
- } else {
- switch (mstype) {
- case DLM_MSG_LOOKUP:
- case DLM_MSG_REQUEST:
- _request_lock(r, lkb);
- if (r->res_nodeid != -1 && is_master(r))
- confirm_master(r, 0);
- break;
- case DLM_MSG_CONVERT:
- _convert_lock(r, lkb);
- break;
- default:
- err = 1;
- }
- }
+ /* normal unlock not allowed if there's any op in progress */
+ if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
+ (lkb->lkb_wait_type || lkb->lkb_wait_count))
+ goto out;
- if (err) {
- log_error(ls, "waiter %x msg %d r_nodeid %d "
- "dir_nodeid %d overlap %d %d",
- lkb->lkb_id, mstype, r->res_nodeid,
- dlm_dir_nodeid(r), oc, ou);
+ /* an lkb may be waiting for an rsb lookup to complete where the
+ lookup was initiated by another lock */
+
+ if (!list_empty(&lkb->lkb_rsb_lookup)) {
+ if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
+ log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
+ list_del_init(&lkb->lkb_rsb_lookup);
+ queue_cast(lkb->lkb_resource, lkb,
+ args->flags & DLM_LKF_CANCEL ?
+ -DLM_ECANCEL : -DLM_EUNLOCK);
+ unhold_lkb(lkb); /* undoes create_lkb() */
}
- unlock_rsb(r);
- put_rsb(r);
- dlm_put_lkb(lkb);
+ /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
+ goto out;
}
- return error;
-}
+ rv = -EINVAL;
+ if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
+ log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
+ dlm_print_lkb(lkb);
+ goto out;
+ }
-static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
- struct list_head *list)
-{
- struct dlm_lkb *lkb, *safe;
+ /* an lkb may still exist even though the lock is EOL'ed due to a
+ * cancel, unlock or failed noqueue request; an app can't use these
+ * locks; return same error as if the lkid had not been found at all
+ */
- list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
- if (!is_master_copy(lkb))
- continue;
+ if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
+ log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
+ rv = -ENOENT;
+ goto out;
+ }
- /* don't purge lkbs we've added in recover_master_copy for
- the current recovery seq */
+ if (is_overlap_unlock(lkb))
+ goto out;
- if (lkb->lkb_recover_seq == ls->ls_recover_seq)
- continue;
+ /* cancel not allowed with another cancel/unlock in progress */
- del_lkb(r, lkb);
+ if (args->flags & DLM_LKF_CANCEL) {
+ if (lkb->lkb_exflags & DLM_LKF_CANCEL)
+ goto out;
- /* this put should free the lkb */
- if (!dlm_put_lkb(lkb))
- log_error(ls, "purged mstcpy lkb not released");
+ if (is_overlap_cancel(lkb))
+ goto out;
+
+ if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
+ set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
+ rv = -EBUSY;
+ goto out;
+ }
+
+ /* there's nothing to cancel */
+ if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
+ !lkb->lkb_wait_type) {
+ rv = -EBUSY;
+ goto out;
+ }
+
+ switch (lkb->lkb_wait_type) {
+ case DLM_MSG_LOOKUP:
+ case DLM_MSG_REQUEST:
+ set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
+ rv = -EBUSY;
+ goto out;
+ case DLM_MSG_UNLOCK:
+ case DLM_MSG_CANCEL:
+ goto out;
+ }
+ /* add_to_waiters() will set OVERLAP_CANCEL */
+ goto out_ok;
}
-}
-void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
-{
- struct dlm_ls *ls = r->res_ls;
+ /* do we need to allow a force-unlock if there's a normal unlock
+ already in progress? in what conditions could the normal unlock
+ fail such that we'd want to send a force-unlock to be sure? */
- purge_mstcpy_list(ls, r, &r->res_grantqueue);
- purge_mstcpy_list(ls, r, &r->res_convertqueue);
- purge_mstcpy_list(ls, r, &r->res_waitqueue);
-}
+ if (args->flags & DLM_LKF_FORCEUNLOCK) {
+ if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
+ goto out;
-static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
- struct list_head *list,
- int nodeid_gone, unsigned int *count)
-{
- struct dlm_lkb *lkb, *safe;
+ if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
+ set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
+ rv = -EBUSY;
+ goto out;
+ }
- list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
- if (!is_master_copy(lkb))
- continue;
+ switch (lkb->lkb_wait_type) {
+ case DLM_MSG_LOOKUP:
+ case DLM_MSG_REQUEST:
+ set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
+ rv = -EBUSY;
+ goto out;
+ case DLM_MSG_UNLOCK:
+ goto out;
+ }
+ /* add_to_waiters() will set OVERLAP_UNLOCK */
+ }
- if ((lkb->lkb_nodeid == nodeid_gone) ||
- dlm_is_removed(ls, lkb->lkb_nodeid)) {
+ out_ok:
+ /* an overlapping op shouldn't blow away exflags from other op */
+ lkb->lkb_exflags |= args->flags;
+ dlm_set_sbflags_val(lkb, 0);
+ lkb->lkb_astparam = args->astparam;
+ rv = 0;
+ out:
+ switch (rv) {
+ case 0:
+ break;
+ case -EINVAL:
+ /* annoy the user because dlm usage is wrong */
+ WARN_ON(1);
+ log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
+ lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
+ args->flags, lkb->lkb_wait_type,
+ lkb->lkb_resource->res_name);
+ break;
+ default:
+ log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
+ lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
+ args->flags, lkb->lkb_wait_type,
+ lkb->lkb_resource->res_name);
+ break;
+ }
- /* tell recover_lvb to invalidate the lvb
- because a node holding EX/PW failed */
- if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
- (lkb->lkb_grmode >= DLM_LOCK_PW)) {
- rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
- }
+ return rv;
+}
- del_lkb(r, lkb);
+/*
+ * Four stage 4 varieties:
+ * do_request(), do_convert(), do_unlock(), do_cancel()
+ * These are called on the master node for the given lock and
+ * from the central locking logic.
+ */
/* this put should free the lkb */
if (!dlm_put_lkb(lkb))
--
2.48.1
Powered by blists - more mailing lists