/******************************************************************************
*******************************************************************************
**
-** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
+** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
break;
case -EAGAIN:
- /* the remote master didn't queue our NOQUEUE request;
- make a waiting lkb the first_lkid */
+ case -EBADR:
+ case -ENOTBLK:
+ /* the remote request failed and won't be retried (it was
+ a NOQUEUE, or has been canceled/unlocked); make a waiting
+ lkb the first_lkid */
r->res_first_lkid = 0;
/* an lkb may be waiting for an rsb lookup to complete where the
lookup was initiated by another lock */
- if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
- if (!list_empty(&lkb->lkb_rsb_lookup)) {
+ if (!list_empty(&lkb->lkb_rsb_lookup)) {
+ if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
list_del_init(&lkb->lkb_rsb_lookup);
queue_cast(lkb->lkb_resource, lkb,
args->flags & DLM_LKF_CANCEL ?
-DLM_ECANCEL : -DLM_EUNLOCK);
unhold_lkb(lkb); /* undoes create_lkb() */
- rv = -EBUSY;
- goto out;
}
+ /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
+ rv = -EBUSY;
+ goto out;
}
/* cancel not allowed with another cancel/unlock in progress */
lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
- DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
-
if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
/* lkb was just created so there won't be an lvb yet */
lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_message *ms)
{
- if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
- log_error(ls, "convert_args nodeid %d %d lkid %x %x",
- lkb->lkb_nodeid, ms->m_header.h_nodeid,
- lkb->lkb_id, lkb->lkb_remid);
- return -EINVAL;
- }
-
- if (!is_master_copy(lkb))
- return -EINVAL;
-
if (lkb->lkb_status != DLM_LKSTS_GRANTED)
return -EBUSY;
static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_message *ms)
{
- if (!is_master_copy(lkb))
- return -EINVAL;
if (receive_lvb(ls, lkb, ms))
return -ENOMEM;
return 0;
lkb->lkb_remid = ms->m_lkid;
}
+/* Sanity-check a received message against the lkb it refers to, before the
+   message is acted on.
+
+   This is called after the rsb is locked so that we can safely inspect
+   fields in the lkb.
+
+   lkb: the lock the message was resolved to
+   ms:  the received message; only m_type and m_header.h_nodeid are used
+        for the check itself
+
+   Returns 0 when the message is acceptable.  Returns -EINVAL, after
+   logging an error, when the lkb is not the kind of copy expected for the
+   message type, when the sending node does not match lkb_nodeid, or when
+   the message type is not one of those listed in the switch below. */
+
+static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
+{
+ /* nodeid of the sender, taken from the message header */
+ int from = ms->m_header.h_nodeid;
+ int error = 0;
+
+ switch (ms->m_type) {
+ /* for these types the lkb must be a master copy and lkb_nodeid must
+    match the sender */
+ case DLM_MSG_CONVERT:
+ case DLM_MSG_UNLOCK:
+ case DLM_MSG_CANCEL:
+ if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
+ error = -EINVAL;
+ break;
+
+ /* for these types the lkb must be a process copy and lkb_nodeid must
+    match the sender */
+ case DLM_MSG_CONVERT_REPLY:
+ case DLM_MSG_UNLOCK_REPLY:
+ case DLM_MSG_CANCEL_REPLY:
+ case DLM_MSG_GRANT:
+ case DLM_MSG_BAST:
+ if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
+ error = -EINVAL;
+ break;
+
+ /* a request reply also needs a process copy, but an lkb_nodeid of -1
+    is accepted as well as one matching the sender (presumably -1 means
+    the master is not yet known -- confirm against where lkb_nodeid is
+    set) */
+ case DLM_MSG_REQUEST_REPLY:
+ if (!is_process_copy(lkb))
+ error = -EINVAL;
+ else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
+ error = -EINVAL;
+ break;
+
+ /* any other message type is rejected by this check */
+ default:
+ error = -EINVAL;
+ }
+
+ /* log the rejected message together with the lkb fields that were
+    inspected, so the mismatch can be diagnosed */
+ if (error)
+ log_error(lkb->lkb_resource->res_ls,
+ "ignore invalid message %d from %d %x %x %x %d",
+ ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
+ lkb->lkb_flags, lkb->lkb_nodeid);
+ return error;
+}
+
static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb;
hold_rsb(r);
lock_rsb(r);
+ error = validate_message(lkb, ms);
+ if (error)
+ goto out;
+
receive_flags(lkb, ms);
error = receive_convert_args(ls, lkb, ms);
if (error)
- goto out;
+ goto out_reply;
reply = !down_conversion(lkb);
error = do_convert(r, lkb);
- out:
+ out_reply:
if (reply)
send_convert_reply(r, lkb, error);
-
+ out:
unlock_rsb(r);
put_rsb(r);
dlm_put_lkb(lkb);
hold_rsb(r);
lock_rsb(r);
+ error = validate_message(lkb, ms);
+ if (error)
+ goto out;
+
receive_flags(lkb, ms);
error = receive_unlock_args(ls, lkb, ms);
if (error)
- goto out;
+ goto out_reply;
error = do_unlock(r, lkb);
- out:
+ out_reply:
send_unlock_reply(r, lkb, error);
-
+ out:
unlock_rsb(r);
put_rsb(r);
dlm_put_lkb(lkb);
hold_rsb(r);
lock_rsb(r);
+ error = validate_message(lkb, ms);
+ if (error)
+ goto out;
+
error = do_cancel(r, lkb);
send_cancel_reply(r, lkb, error);
-
+ out:
unlock_rsb(r);
put_rsb(r);
dlm_put_lkb(lkb);
error = find_lkb(ls, ms->m_remid, &lkb);
if (error) {
- log_error(ls, "receive_grant no lkb");
+ log_debug(ls, "receive_grant from %d no lkb %x",
+ ms->m_header.h_nodeid, ms->m_remid);
return;
}
- DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
r = lkb->lkb_resource;
hold_rsb(r);
lock_rsb(r);
+ error = validate_message(lkb, ms);
+ if (error)
+ goto out;
+
receive_flags_reply(lkb, ms);
if (is_altmode(lkb))
munge_altmode(lkb, ms);
grant_lock_pc(r, lkb, ms);
queue_cast(r, lkb, 0);
-
+ out:
unlock_rsb(r);
put_rsb(r);
dlm_put_lkb(lkb);
error = find_lkb(ls, ms->m_remid, &lkb);
if (error) {
- log_error(ls, "receive_bast no lkb");
+ log_debug(ls, "receive_bast from %d no lkb %x",
+ ms->m_header.h_nodeid, ms->m_remid);
return;
}
- DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
r = lkb->lkb_resource;
hold_rsb(r);
lock_rsb(r);
- queue_bast(r, lkb, ms->m_bastmode);
+ error = validate_message(lkb, ms);
+ if (error)
+ goto out;
+ queue_bast(r, lkb, ms->m_bastmode);
+ out:
unlock_rsb(r);
put_rsb(r);
dlm_put_lkb(lkb);
error = find_lkb(ls, ms->m_remid, &lkb);
if (error) {
- log_error(ls, "receive_request_reply no lkb");
+ log_debug(ls, "receive_request_reply from %d no lkb %x",
+ ms->m_header.h_nodeid, ms->m_remid);
return;
}
- DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
r = lkb->lkb_resource;
hold_rsb(r);
lock_rsb(r);
+ error = validate_message(lkb, ms);
+ if (error)
+ goto out;
+
mstype = lkb->lkb_wait_type;
error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
if (error)
if (is_overlap(lkb)) {
/* we'll ignore error in cancel/unlock reply */
queue_cast_overlap(r, lkb);
+ confirm_master(r, result);
unhold_lkb(lkb); /* undoes create_lkb() */
} else
_request_lock(r, lkb);
hold_rsb(r);
lock_rsb(r);
+ error = validate_message(lkb, ms);
+ if (error)
+ goto out;
+
/* stub reply can happen with waiters_mutex held */
error = remove_from_waiters_ms(lkb, ms);
if (error)
error = find_lkb(ls, ms->m_remid, &lkb);
if (error) {
- log_error(ls, "receive_convert_reply no lkb");
+ log_debug(ls, "receive_convert_reply from %d no lkb %x",
+ ms->m_header.h_nodeid, ms->m_remid);
return;
}
- DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
_receive_convert_reply(lkb, ms);
dlm_put_lkb(lkb);
hold_rsb(r);
lock_rsb(r);
+ error = validate_message(lkb, ms);
+ if (error)
+ goto out;
+
/* stub reply can happen with waiters_mutex held */
error = remove_from_waiters_ms(lkb, ms);
if (error)
error = find_lkb(ls, ms->m_remid, &lkb);
if (error) {
- log_error(ls, "receive_unlock_reply no lkb");
+ log_debug(ls, "receive_unlock_reply from %d no lkb %x",
+ ms->m_header.h_nodeid, ms->m_remid);
return;
}
- DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
_receive_unlock_reply(lkb, ms);
dlm_put_lkb(lkb);
hold_rsb(r);
lock_rsb(r);
+ error = validate_message(lkb, ms);
+ if (error)
+ goto out;
+
/* stub reply can happen with waiters_mutex held */
error = remove_from_waiters_ms(lkb, ms);
if (error)
error = find_lkb(ls, ms->m_remid, &lkb);
if (error) {
- log_error(ls, "receive_cancel_reply no lkb");
+ log_debug(ls, "receive_cancel_reply from %d no lkb %x",
+ ms->m_header.h_nodeid, ms->m_remid);
return;
}
- DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
_receive_cancel_reply(lkb, ms);
dlm_put_lkb(lkb);
static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
{
+ if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
+ log_debug(ls, "ignore non-member message %d from %d %x %x %d",
+ ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
+ ms->m_remid, ms->m_result);
+ return;
+ }
+
switch (ms->m_type) {
/* messages sent to a master node */
ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
ls->ls_stub_ms.m_result = -EINPROGRESS;
ls->ls_stub_ms.m_flags = lkb->lkb_flags;
+ ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
_receive_convert_reply(lkb, &ls->ls_stub_ms);
/* Same special case as in receive_rcom_lock_args() */
void dlm_recover_waiters_pre(struct dlm_ls *ls)
{
struct dlm_lkb *lkb, *safe;
+ int wait_type, stub_unlock_result, stub_cancel_result;
mutex_lock(&ls->ls_waiters_mutex);
if (!waiter_needs_recovery(ls, lkb))
continue;
- switch (lkb->lkb_wait_type) {
+ wait_type = lkb->lkb_wait_type;
+ stub_unlock_result = -DLM_EUNLOCK;
+ stub_cancel_result = -DLM_ECANCEL;
+
+ /* Main reply may have been received leaving a zero wait_type,
+ but a reply for the overlapping op may not have been
+ received. In that case we need to fake the appropriate
+ reply for the overlap op. */
+
+ if (!wait_type) {
+ if (is_overlap_cancel(lkb)) {
+ wait_type = DLM_MSG_CANCEL;
+ if (lkb->lkb_grmode == DLM_LOCK_IV)
+ stub_cancel_result = 0;
+ }
+ if (is_overlap_unlock(lkb)) {
+ wait_type = DLM_MSG_UNLOCK;
+ if (lkb->lkb_grmode == DLM_LOCK_IV)
+ stub_unlock_result = -ENOENT;
+ }
+
+ log_debug(ls, "rwpre overlap %x %x %d %d %d",
+ lkb->lkb_id, lkb->lkb_flags, wait_type,
+ stub_cancel_result, stub_unlock_result);
+ }
+
+ switch (wait_type) {
case DLM_MSG_REQUEST:
lkb->lkb_flags |= DLM_IFL_RESEND;
case DLM_MSG_UNLOCK:
hold_lkb(lkb);
ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
- ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
+ ls->ls_stub_ms.m_result = stub_unlock_result;
ls->ls_stub_ms.m_flags = lkb->lkb_flags;
+ ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
_receive_unlock_reply(lkb, &ls->ls_stub_ms);
dlm_put_lkb(lkb);
break;
case DLM_MSG_CANCEL:
hold_lkb(lkb);
ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
- ls->ls_stub_ms.m_result = -DLM_ECANCEL;
+ ls->ls_stub_ms.m_result = stub_cancel_result;
ls->ls_stub_ms.m_flags = lkb->lkb_flags;
+ ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
_receive_cancel_reply(lkb, &ls->ls_stub_ms);
dlm_put_lkb(lkb);
break;
default:
- log_error(ls, "invalid lkb wait_type %d",
- lkb->lkb_wait_type);
+ log_error(ls, "invalid lkb wait_type %d %d",
+ lkb->lkb_wait_type, wait_type);
}
schedule();
}