X-Git-Url: http://ftp.safe.ca/?a=blobdiff_plain;f=fs%2Fdlm%2Flock.c;h=2d3d1027ce2bbdfbc30fef5631b1ce378deaf05e;hb=53492b1de46a7576170e865062ffcfc93bb5650b;hp=3c4d570477b4655b93486b52921e9245d7cdccc1;hpb=c85d65e91430db94ae9ce0cf38b56e496658b642;p=safe%2Fjmp%2Flinux-2.6 diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 3c4d570..2d3d102 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -1,7 +1,7 @@ /****************************************************************************** ******************************************************************************* ** -** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved. +** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -88,7 +88,6 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, static int receive_extralen(struct dlm_message *ms); static void do_purge(struct dlm_ls *ls, int nodeid, int pid); static void del_timeout(struct dlm_lkb *lkb); -void dlm_timeout_warn(struct dlm_lkb *lkb); /* * Lock compatibilty matrix - thanks Steve @@ -166,7 +165,7 @@ void dlm_print_lkb(struct dlm_lkb *lkb) lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type); } -void dlm_print_rsb(struct dlm_rsb *r) +static void dlm_print_rsb(struct dlm_rsb *r) { printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n", r->res_nodeid, r->res_flags, r->res_first_lkid, @@ -300,6 +299,11 @@ static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) rv = -ETIMEDOUT; } + if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) { + lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL; + rv = -EDEADLK; + } + lkb->lkb_lksb->sb_status = rv; lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags; @@ -330,7 +334,7 @@ static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len) { struct dlm_rsb *r; - r = allocate_rsb(ls, len); + r = dlm_allocate_rsb(ls, len); if (!r) return NULL; @@ -432,11 +436,15 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen, { struct dlm_rsb *r, *tmp; uint32_t hash, bucket; - int error = 0; + int error = -EINVAL; + + if (namelen > DLM_RESNAME_MAXLEN) + goto out; if (dlm_no_directory(ls)) flags |= R_CREATE; + error = 0; hash = jhash(name, namelen, 0); bucket = hash & (ls->ls_rsbtbl_size - 1); @@ -473,7 +481,7 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen, error = _search_rsb(ls, name, namelen, bucket, 0, &tmp); if (!error) { write_unlock(&ls->ls_rsbtbl[bucket].lock); - free_rsb(r); + dlm_free_rsb(r); r = tmp; goto out; } @@ -485,12 +493,6 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen, return error; } -int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen, - unsigned int flags, struct dlm_rsb **r_ret) -{ - return find_rsb(ls, name, namelen, flags, r_ret); -} - /* This is only called to add a reference when the code already holds a valid reference to the rsb, so there's no need for locking. */ @@ -514,7 +516,7 @@ static void toss_rsb(struct kref *kref) list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss); r->res_toss_time = jiffies; if (r->res_lvbptr) { - free_lvb(r->res_lvbptr); + dlm_free_lvb(r->res_lvbptr); r->res_lvbptr = NULL; } } @@ -584,7 +586,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) uint32_t lkid = 0; uint16_t bucket; - lkb = allocate_lkb(ls); + lkb = dlm_allocate_lkb(ls); if (!lkb) return -ENOMEM; @@ -678,8 +680,8 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) /* for local/process lkbs, lvbptr points to caller's lksb */ if (lkb->lkb_lvbptr && is_master_copy(lkb)) - free_lvb(lkb->lkb_lvbptr); - free_lkb(lkb); + dlm_free_lvb(lkb->lkb_lvbptr); + dlm_free_lkb(lkb); return 1; } else { write_unlock(&ls->ls_lkbtbl[bucket].lock); @@ -983,7 +985,7 @@ static int shrink_bucket(struct dlm_ls *ls, int b) if (is_master(r)) dir_remove(r); - free_rsb(r); + dlm_free_rsb(r); count++; } else { write_unlock(&ls->ls_rsbtbl[b].lock); @@ -1010,17 +1012,18 @@ static void add_timeout(struct dlm_lkb *lkb) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; - if (is_master_copy(lkb)) + if (is_master_copy(lkb)) { + lkb->lkb_timestamp = jiffies; return; - - if (lkb->lkb_exflags & DLM_LKF_TIMEOUT) - goto add_it; + } if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) { lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN; goto add_it; } + if (lkb->lkb_exflags & DLM_LKF_TIMEOUT) + goto add_it; return; add_it: @@ -1098,8 +1101,8 @@ void dlm_scan_timeout(struct dlm_ls *ls) } if (do_cancel) { - log_debug("timeout cancel %x node %d %s", lkb->lkb_id, - lkb->lkb_nodeid, r->res_name); + log_debug(ls, "timeout cancel %x node %d %s", + lkb->lkb_id, lkb->lkb_nodeid, r->res_name); lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN; lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL; del_timeout(lkb); @@ -1165,7 +1168,7 @@ static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) return; if (!r->res_lvbptr) - r->res_lvbptr = allocate_lvb(r->res_ls); + r->res_lvbptr = dlm_allocate_lvb(r->res_ls); if (!r->res_lvbptr) return; @@ -1197,7 +1200,7 @@ static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) return; if (!r->res_lvbptr) - r->res_lvbptr = allocate_lvb(r->res_ls); + r->res_lvbptr = dlm_allocate_lvb(r->res_ls); if (!r->res_lvbptr) return; @@ -1223,6 +1226,8 @@ static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1]; if (b == 1) { int len = receive_extralen(ms); + if (len > DLM_RESNAME_MAXLEN) + len = DLM_RESNAME_MAXLEN; memcpy(lkb->lkb_lvbptr, ms->m_extra, len); lkb->lkb_lvbseq = ms->m_lvbseq; } @@ -1664,9 +1669,10 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, with a deadlk here, we'd have to generate something like grant_lock with the deadlk error.) */ -/* returns the highest requested mode of all blocked conversions */ +/* Returns the highest requested mode of all blocked conversions; sets + cw if there's a blocked conversion to DLM_LOCK_CW. */ -static int grant_pending_convert(struct dlm_rsb *r, int high) +static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw) { struct dlm_lkb *lkb, *s; int hi, demoted, quit, grant_restart, demote_restart; @@ -1703,6 +1709,9 @@ static int grant_pending_convert(struct dlm_rsb *r, int high) } hi = max_t(int, lkb->lkb_rqmode, hi); + + if (cw && lkb->lkb_rqmode == DLM_LOCK_CW) + *cw = 1; } if (grant_restart) @@ -1715,29 +1724,52 @@ static int grant_pending_convert(struct dlm_rsb *r, int high) return max_t(int, high, hi); } -static int grant_pending_wait(struct dlm_rsb *r, int high) +static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw) { struct dlm_lkb *lkb, *s; list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) { if (can_be_granted(r, lkb, 0, NULL)) grant_lock_pending(r, lkb); - else + else { high = max_t(int, lkb->lkb_rqmode, high); + if (lkb->lkb_rqmode == DLM_LOCK_CW) + *cw = 1; + } } return high; } +/* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked + on either the convert or waiting queue. + high is the largest rqmode of all locks blocked on the convert or + waiting queue. */ + +static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw) +{ + if (gr->lkb_grmode == DLM_LOCK_PR && cw) { + if (gr->lkb_highbast < DLM_LOCK_EX) + return 1; + return 0; + } + + if (gr->lkb_highbast < high && + !__dlm_compat_matrix[gr->lkb_grmode+1][high+1]) + return 1; + return 0; +} + static void grant_pending_locks(struct dlm_rsb *r) { struct dlm_lkb *lkb, *s; int high = DLM_LOCK_IV; + int cw = 0; DLM_ASSERT(is_master(r), dlm_dump_rsb(r);); - high = grant_pending_convert(r, high); - high = grant_pending_wait(r, high); + high = grant_pending_convert(r, high, &cw); + high = grant_pending_wait(r, high, &cw); if (high == DLM_LOCK_IV) return; @@ -1745,27 +1777,41 @@ static void grant_pending_locks(struct dlm_rsb *r) /* * If there are locks left on the wait/convert queue then send blocking * ASTs to granted locks based on the largest requested mode (high) - * found above. FIXME: highbast < high comparison not valid for PR/CW. + * found above. */ list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) { - if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) && - !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) { - queue_bast(r, lkb, high); + if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) { + if (cw && high == DLM_LOCK_PR) + queue_bast(r, lkb, DLM_LOCK_CW); + else + queue_bast(r, lkb, high); lkb->lkb_highbast = high; } } } +static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq) +{ + if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) || + (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) { + if (gr->lkb_highbast < DLM_LOCK_EX) + return 1; + return 0; + } + + if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq)) + return 1; + return 0; +} + static void send_bast_queue(struct dlm_rsb *r, struct list_head *head, struct dlm_lkb *lkb) { struct dlm_lkb *gr; list_for_each_entry(gr, head, lkb_statequeue) { - if (gr->lkb_bastaddr && - gr->lkb_highbast < lkb->lkb_rqmode && - !modes_compat(gr, lkb)) { + if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) { queue_bast(r, gr, lkb->lkb_rqmode); gr->lkb_highbast = lkb->lkb_rqmode; } @@ -1805,7 +1851,7 @@ static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb) static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) { struct dlm_ls *ls = r->res_ls; - int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid(); + int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid(); if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) { rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); @@ -1839,7 +1885,7 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) return 1; } - for (;;) { + for (i = 0; i < 2; i++) { /* It's possible for dlm_scand to remove an old rsb for this same resource from the toss list, us to create a new one, look up the master locally, and find it @@ -1853,6 +1899,8 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) log_debug(ls, "dir_lookup error %d %s", error, r->res_name); schedule(); } + if (error && error != -EEXIST) + return error; if (ret_nodeid == our_nodeid) { r->res_first_lkid = 0; @@ -1894,8 +1942,11 @@ static void confirm_master(struct dlm_rsb *r, int error) break; case -EAGAIN: - /* the remote master didn't queue our NOQUEUE request; - make a waiting lkb the first_lkid */ + case -EBADR: + case -ENOTBLK: + /* the remote request failed and won't be retried (it was + a NOQUEUE, or has been canceled/unlocked); make a waiting + lkb the first_lkid */ r->res_first_lkid = 0; @@ -1905,8 +1956,7 @@ static void confirm_master(struct dlm_rsb *r, int error) list_del_init(&lkb->lkb_rsb_lookup); r->res_first_lkid = lkb->lkb_id; _request_lock(r, lkb); - } else - r->res_nodeid = -1; + } break; default: @@ -1915,8 +1965,11 @@ static void confirm_master(struct dlm_rsb *r, int error) } static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags, - int namelen, unsigned long timeout_cs, void *ast, - void *astarg, void *bast, struct dlm_args *args) + int namelen, unsigned long timeout_cs, + void (*ast) (void *astparam), + void *astparam, + void (*bast) (void *astparam, int mode), + struct dlm_args *args) { int rv = -EINVAL; @@ -1966,9 +2019,9 @@ static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags, an active lkb cannot be modified before locking the rsb */ args->flags = flags; - args->astaddr = ast; - args->astparam = (long) astarg; - args->bastaddr = bast; + args->astfn = ast; + args->astparam = astparam; + args->bastfn = bast; args->timeout = timeout_cs; args->mode = mode; args->lksb = lksb; @@ -1987,7 +2040,7 @@ static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args) return -EINVAL; args->flags = flags; - args->astparam = (long) astarg; + args->astparam = astarg; return 0; } @@ -2017,9 +2070,9 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, lkb->lkb_exflags = args->flags; lkb->lkb_sbflags = 0; - lkb->lkb_astaddr = args->astaddr; + lkb->lkb_astfn = args->astfn; lkb->lkb_astparam = args->astparam; - lkb->lkb_bastaddr = args->bastaddr; + lkb->lkb_bastfn = args->bastfn; lkb->lkb_rqmode = args->mode; lkb->lkb_lksb = args->lksb; lkb->lkb_lvbptr = args->lksb->sb_lvbptr; @@ -2061,17 +2114,18 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) /* an lkb may be waiting for an rsb lookup to complete where the lookup was initiated by another lock */ - if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) { - if (!list_empty(&lkb->lkb_rsb_lookup)) { + if (!list_empty(&lkb->lkb_rsb_lookup)) { + if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) { log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id); list_del_init(&lkb->lkb_rsb_lookup); queue_cast(lkb->lkb_resource, lkb, args->flags & DLM_LKF_CANCEL ? -DLM_ECANCEL : -DLM_EUNLOCK); unhold_lkb(lkb); /* undoes create_lkb() */ - rv = -EBUSY; - goto out; } + /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */ + rv = -EBUSY; + goto out; } /* cancel not allowed with another cancel/unlock in progress */ @@ -2229,7 +2283,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) before we try again to grant this one. */ if (is_demoted(lkb)) { - grant_pending_convert(r, DLM_LOCK_IV); + grant_pending_convert(r, DLM_LOCK_IV, NULL); if (_can_be_granted(r, lkb, 1)) { grant_lock(r, lkb); queue_cast(r, lkb, 0); @@ -2588,7 +2642,7 @@ static int _create_message(struct dlm_ls *ls, int mb_len, pass into lowcomms_commit and a message buffer (mb) that we write our data into */ - mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb); + mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb); if (!mh) return -ENOBUFS; @@ -2665,9 +2719,9 @@ static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb, /* m_result and m_bastmode are set from function args, not from lkb fields */ - if (lkb->lkb_bastaddr) + if (lkb->lkb_bastfn) ms->m_asts |= AST_BAST; - if (lkb->lkb_astaddr) + if (lkb->lkb_astfn) ms->m_asts |= AST_COMP; /* compare with switch in create_message; send_remove() doesn't @@ -2939,15 +2993,27 @@ static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb, if (lkb->lkb_exflags & DLM_LKF_VALBLK) { if (!lkb->lkb_lvbptr) - lkb->lkb_lvbptr = allocate_lvb(ls); + lkb->lkb_lvbptr = dlm_allocate_lvb(ls); if (!lkb->lkb_lvbptr) return -ENOMEM; len = receive_extralen(ms); + if (len > DLM_RESNAME_MAXLEN) + len = DLM_RESNAME_MAXLEN; memcpy(lkb->lkb_lvbptr, ms->m_extra, len); } return 0; } +static void fake_bastfn(void *astparam, int mode) +{ + log_print("fake_bastfn should not be called"); +} + +static void fake_astfn(void *astparam) +{ + log_print("fake_astfn should not be called"); +} + static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, struct dlm_message *ms) { @@ -2956,14 +3022,13 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, lkb->lkb_remid = ms->m_lkid; lkb->lkb_grmode = DLM_LOCK_IV; lkb->lkb_rqmode = ms->m_rqmode; - lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST); - lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP); - DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb);); + lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL; + lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL; if (lkb->lkb_exflags & DLM_LKF_VALBLK) { /* lkb was just created so there won't be an lvb yet */ - lkb->lkb_lvbptr = allocate_lvb(ls); + lkb->lkb_lvbptr = dlm_allocate_lvb(ls); if (!lkb->lkb_lvbptr) return -ENOMEM; } @@ -2974,16 +3039,6 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, struct dlm_message *ms) { - if (lkb->lkb_nodeid != ms->m_header.h_nodeid) { - log_error(ls, "convert_args nodeid %d %d lkid %x %x", - lkb->lkb_nodeid, ms->m_header.h_nodeid, - lkb->lkb_id, lkb->lkb_remid); - return -EINVAL; - } - - if (!is_master_copy(lkb)) - return -EINVAL; - if (lkb->lkb_status != DLM_LKSTS_GRANTED) return -EBUSY; @@ -2999,8 +3054,6 @@ static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, struct dlm_message *ms) { - if (!is_master_copy(lkb)) - return -EINVAL; if (receive_lvb(ls, lkb, ms)) return -ENOMEM; return 0; @@ -3016,6 +3069,50 @@ static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms) lkb->lkb_remid = ms->m_lkid; } +/* This is called after the rsb is locked so that we can safely inspect + fields in the lkb. */ + +static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms) +{ + int from = ms->m_header.h_nodeid; + int error = 0; + + switch (ms->m_type) { + case DLM_MSG_CONVERT: + case DLM_MSG_UNLOCK: + case DLM_MSG_CANCEL: + if (!is_master_copy(lkb) || lkb->lkb_nodeid != from) + error = -EINVAL; + break; + + case DLM_MSG_CONVERT_REPLY: + case DLM_MSG_UNLOCK_REPLY: + case DLM_MSG_CANCEL_REPLY: + case DLM_MSG_GRANT: + case DLM_MSG_BAST: + if (!is_process_copy(lkb) || lkb->lkb_nodeid != from) + error = -EINVAL; + break; + + case DLM_MSG_REQUEST_REPLY: + if (!is_process_copy(lkb)) + error = -EINVAL; + else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from) + error = -EINVAL; + break; + + default: + error = -EINVAL; + } + + if (error) + log_error(lkb->lkb_resource->res_ls, + "ignore invalid message %d from %d %x %x %x %d", + ms->m_type, from, lkb->lkb_id, lkb->lkb_remid, + lkb->lkb_flags, lkb->lkb_nodeid); + return error; +} + static void receive_request(struct dlm_ls *ls, struct dlm_message *ms) { struct dlm_lkb *lkb; @@ -3077,17 +3174,21 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + receive_flags(lkb, ms); error = receive_convert_args(ls, lkb, ms); if (error) - goto out; + goto out_reply; reply = !down_conversion(lkb); error = do_convert(r, lkb); - out: + out_reply: if (reply) send_convert_reply(r, lkb, error); - + out: unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); @@ -3113,15 +3214,19 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + receive_flags(lkb, ms); error = receive_unlock_args(ls, lkb, ms); if (error) - goto out; + goto out_reply; error = do_unlock(r, lkb); - out: + out_reply: send_unlock_reply(r, lkb, error); - + out: unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); @@ -3149,9 +3254,13 @@ static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + error = do_cancel(r, lkb); send_cancel_reply(r, lkb, error); - + out: unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); @@ -3170,22 +3279,26 @@ static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_grant no lkb"); + log_debug(ls, "receive_grant from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); r = lkb->lkb_resource; hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + receive_flags_reply(lkb, ms); if (is_altmode(lkb)) munge_altmode(lkb, ms); grant_lock_pc(r, lkb, ms); queue_cast(r, lkb, 0); - + out: unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); @@ -3199,18 +3312,22 @@ static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_bast no lkb"); + log_debug(ls, "receive_bast from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); r = lkb->lkb_resource; hold_rsb(r); lock_rsb(r); - queue_bast(r, lkb, ms->m_bastmode); + error = validate_message(lkb, ms); + if (error) + goto out; + queue_bast(r, lkb, ms->m_bastmode); + out: unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); @@ -3276,15 +3393,19 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_request_reply no lkb"); + log_debug(ls, "receive_request_reply from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); r = lkb->lkb_resource; hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + mstype = lkb->lkb_wait_type; error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); if (error) @@ -3336,6 +3457,7 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) if (is_overlap(lkb)) { /* we'll ignore error in cancel/unlock reply */ queue_cast_overlap(r, lkb); + confirm_master(r, result); unhold_lkb(lkb); /* undoes create_lkb() */ } else _request_lock(r, lkb); @@ -3416,6 +3538,10 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + /* stub reply can happen with waiters_mutex held */ error = remove_from_waiters_ms(lkb, ms); if (error) @@ -3434,10 +3560,10 @@ static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_convert_reply no lkb"); + log_debug(ls, "receive_convert_reply from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); _receive_convert_reply(lkb, ms); dlm_put_lkb(lkb); @@ -3451,6 +3577,10 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + /* stub reply can happen with waiters_mutex held */ error = remove_from_waiters_ms(lkb, ms); if (error) @@ -3482,10 +3612,10 @@ static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_unlock_reply no lkb"); + log_debug(ls, "receive_unlock_reply from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); _receive_unlock_reply(lkb, ms); dlm_put_lkb(lkb); @@ -3499,6 +3629,10 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + /* stub reply can happen with waiters_mutex held */ error = remove_from_waiters_ms(lkb, ms); if (error) @@ -3510,8 +3644,7 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) case -DLM_ECANCEL: receive_flags_reply(lkb, ms); revert_lock_pc(r, lkb); - if (ms->m_result) - queue_cast(r, lkb, -DLM_ECANCEL); + queue_cast(r, lkb, -DLM_ECANCEL); break; case 0: break; @@ -3531,10 +3664,10 @@ static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_cancel_reply no lkb"); + log_debug(ls, "receive_cancel_reply from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); _receive_cancel_reply(lkb, ms); dlm_put_lkb(lkb); @@ -3592,53 +3725,13 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) dlm_put_lkb(lkb); } -int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery) +static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms) { - struct dlm_message *ms = (struct dlm_message *) hd; - struct dlm_ls *ls; - int error = 0; - - if (!recovery) - dlm_message_in(ms); - - ls = dlm_find_lockspace_global(hd->h_lockspace); - if (!ls) { - log_print("drop message %d from %d for unknown lockspace %d", - ms->m_type, nodeid, hd->h_lockspace); - return -EINVAL; - } - - /* recovery may have just ended leaving a bunch of backed-up requests - in the requestqueue; wait while dlm_recoverd clears them */ - - if (!recovery) - dlm_wait_requestqueue(ls); - - /* recovery may have just started while there were a bunch of - in-flight requests -- save them in requestqueue to be processed - after recovery. we can't let dlm_recvd block on the recovery - lock. if dlm_recoverd is calling this function to clear the - requestqueue, it needs to be interrupted (-EINTR) if another - recovery operation is starting. */ - - while (1) { - if (dlm_locking_stopped(ls)) { - if (recovery) { - error = -EINTR; - goto out; - } - error = dlm_add_requestqueue(ls, nodeid, hd); - if (error == -EAGAIN) - continue; - else { - error = -EINTR; - goto out; - } - } - - if (dlm_lock_recovery_try(ls)) - break; - schedule(); + if (!dlm_is_member(ls, ms->m_header.h_nodeid)) { + log_debug(ls, "ignore non-member message %d from %d %x %x %d", + ms->m_type, ms->m_header.h_nodeid, ms->m_lkid, + ms->m_remid, ms->m_result); + return; } switch (ms->m_type) { @@ -3715,17 +3808,90 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery) log_error(ls, "unknown message type %d", ms->m_type); } - dlm_unlock_recovery(ls); - out: - dlm_put_lockspace(ls); dlm_astd_wake(); - return error; } +/* If the lockspace is in recovery mode (locking stopped), then normal + messages are saved on the requestqueue for processing after recovery is + done. When not in recovery mode, we wait for dlm_recoverd to drain saved + messages off the requestqueue before we process new ones. This occurs right + after recovery completes when we transition from saving all messages on + requestqueue, to processing all the saved messages, to processing new + messages as they arrive. */ -/* - * Recovery related - */ +static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms, + int nodeid) +{ + if (dlm_locking_stopped(ls)) { + dlm_add_requestqueue(ls, nodeid, ms); + } else { + dlm_wait_requestqueue(ls); + _receive_message(ls, ms); + } +} + +/* This is called by dlm_recoverd to process messages that were saved on + the requestqueue. */ + +void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms) +{ + _receive_message(ls, ms); +} + +/* This is called by the midcomms layer when something is received for + the lockspace. It could be either a MSG (normal message sent as part of + standard locking activity) or an RCOM (recovery message sent as part of + lockspace recovery). */ + +void dlm_receive_buffer(union dlm_packet *p, int nodeid) +{ + struct dlm_header *hd = &p->header; + struct dlm_ls *ls; + int type = 0; + + switch (hd->h_cmd) { + case DLM_MSG: + dlm_message_in(&p->message); + type = p->message.m_type; + break; + case DLM_RCOM: + dlm_rcom_in(&p->rcom); + type = p->rcom.rc_type; + break; + default: + log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid); + return; + } + + if (hd->h_nodeid != nodeid) { + log_print("invalid h_nodeid %d from %d lockspace %x", + hd->h_nodeid, nodeid, hd->h_lockspace); + return; + } + + ls = dlm_find_lockspace_global(hd->h_lockspace); + if (!ls) { + if (dlm_config.ci_log_debug) + log_print("invalid lockspace %x from %d cmd %d type %d", + hd->h_lockspace, nodeid, hd->h_cmd, type); + + if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS) + dlm_send_ls_not_ready(nodeid, &p->rcom); + return; + } + + /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to + be inactive (in this ls) before transitioning to recovery mode */ + + down_read(&ls->ls_recv_active); + if (hd->h_cmd == DLM_MSG) + dlm_receive_message(ls, &p->message, nodeid); + else + dlm_receive_rcom(ls, &p->rcom, nodeid); + up_read(&ls->ls_recv_active); + + dlm_put_lockspace(ls); +} static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb) { @@ -3734,6 +3900,7 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb) ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; ls->ls_stub_ms.m_result = -EINPROGRESS; ls->ls_stub_ms.m_flags = lkb->lkb_flags; + ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; _receive_convert_reply(lkb, &ls->ls_stub_ms); /* Same special case as in receive_rcom_lock_args() */ @@ -3775,6 +3942,7 @@ static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb) void dlm_recover_waiters_pre(struct dlm_ls *ls) { struct dlm_lkb *lkb, *safe; + int wait_type, stub_unlock_result, stub_cancel_result; mutex_lock(&ls->ls_waiters_mutex); @@ -3793,7 +3961,33 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) if (!waiter_needs_recovery(ls, lkb)) continue; - switch (lkb->lkb_wait_type) { + wait_type = lkb->lkb_wait_type; + stub_unlock_result = -DLM_EUNLOCK; + stub_cancel_result = -DLM_ECANCEL; + + /* Main reply may have been received leaving a zero wait_type, + but a reply for the overlapping op may not have been + received. In that case we need to fake the appropriate + reply for the overlap op. */ + + if (!wait_type) { + if (is_overlap_cancel(lkb)) { + wait_type = DLM_MSG_CANCEL; + if (lkb->lkb_grmode == DLM_LOCK_IV) + stub_cancel_result = 0; + } + if (is_overlap_unlock(lkb)) { + wait_type = DLM_MSG_UNLOCK; + if (lkb->lkb_grmode == DLM_LOCK_IV) + stub_unlock_result = -ENOENT; + } + + log_debug(ls, "rwpre overlap %x %x %d %d %d", + lkb->lkb_id, lkb->lkb_flags, wait_type, + stub_cancel_result, stub_unlock_result); + } + + switch (wait_type) { case DLM_MSG_REQUEST: lkb->lkb_flags |= DLM_IFL_RESEND; @@ -3806,8 +4000,9 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) case DLM_MSG_UNLOCK: hold_lkb(lkb); ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY; - ls->ls_stub_ms.m_result = -DLM_EUNLOCK; + ls->ls_stub_ms.m_result = stub_unlock_result; ls->ls_stub_ms.m_flags = lkb->lkb_flags; + ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; _receive_unlock_reply(lkb, &ls->ls_stub_ms); dlm_put_lkb(lkb); break; @@ -3815,15 +4010,16 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) case DLM_MSG_CANCEL: hold_lkb(lkb); ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY; - ls->ls_stub_ms.m_result = -DLM_ECANCEL; + ls->ls_stub_ms.m_result = stub_cancel_result; ls->ls_stub_ms.m_flags = lkb->lkb_flags; + ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; _receive_cancel_reply(lkb, &ls->ls_stub_ms); dlm_put_lkb(lkb); break; default: - log_error(ls, "invalid lkb wait_type %d", - lkb->lkb_wait_type); + log_error(ls, "invalid lkb wait_type %d %d", + lkb->lkb_wait_type, wait_type); } schedule(); } @@ -4091,32 +4287,34 @@ static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid, return NULL; } +/* needs at least dlm_rcom + rcom_lock */ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, struct dlm_rsb *r, struct dlm_rcom *rc) { struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; - int lvblen; lkb->lkb_nodeid = rc->rc_header.h_nodeid; - lkb->lkb_ownpid = rl->rl_ownpid; - lkb->lkb_remid = rl->rl_lkid; - lkb->lkb_exflags = rl->rl_exflags; - lkb->lkb_flags = rl->rl_flags & 0x0000FFFF; + lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid); + lkb->lkb_remid = le32_to_cpu(rl->rl_lkid); + lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags); + lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF; lkb->lkb_flags |= DLM_IFL_MSTCPY; - lkb->lkb_lvbseq = rl->rl_lvbseq; + lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq); lkb->lkb_rqmode = rl->rl_rqmode; lkb->lkb_grmode = rl->rl_grmode; /* don't set lkb_status because add_lkb wants to itself */ - lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST); - lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP); + lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL; + lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL; if (lkb->lkb_exflags & DLM_LKF_VALBLK) { - lkb->lkb_lvbptr = allocate_lvb(ls); + int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) - + sizeof(struct rcom_lock); + if (lvblen > ls->ls_lvblen) + return -EINVAL; + lkb->lkb_lvbptr = dlm_allocate_lvb(ls); if (!lkb->lkb_lvbptr) return -ENOMEM; - lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) - - sizeof(struct rcom_lock); memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen); } @@ -4124,7 +4322,8 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, The real granted mode of these converting locks cannot be determined until all locks have been rebuilt on the rsb (recover_conversion) */ - if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) { + if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) && + middle_conversion(lkb)) { rl->rl_status = DLM_LKSTS_CONVERT; lkb->lkb_grmode = DLM_LOCK_IV; rsb_set_flag(r, RSB_RECOVER_CONVERT); @@ -4139,6 +4338,7 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, the given values and send back our lkid. We send back our lkid by sending back the rcom_lock struct we got but with the remid field filled in. */ +/* needs at least dlm_rcom + rcom_lock */ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) { struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; @@ -4151,13 +4351,14 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) goto out; } - error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r); + error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), + R_MASTER, &r); if (error) goto out; lock_rsb(r); - lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid); + lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid)); if (lkb) { error = -EEXIST; goto out_remid; @@ -4180,18 +4381,20 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) out_remid: /* this is the new value returned to the lock holder for saving in its process-copy lkb */ - rl->rl_remid = lkb->lkb_id; + rl->rl_remid = cpu_to_le32(lkb->lkb_id); out_unlock: unlock_rsb(r); put_rsb(r); out: if (error) - log_print("recover_master_copy %d %x", error, rl->rl_lkid); - rl->rl_result = error; + log_debug(ls, "recover_master_copy %d %x", error, + le32_to_cpu(rl->rl_lkid)); + rl->rl_result = cpu_to_le32(error); return error; } +/* needs at least dlm_rcom + rcom_lock */ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) { struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; @@ -4199,15 +4402,16 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) struct dlm_lkb *lkb; int error; - error = find_lkb(ls, rl->rl_lkid, &lkb); + error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb); if (error) { - log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid); + log_error(ls, "recover_process_copy no lkid %x", + le32_to_cpu(rl->rl_lkid)); return error; } DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); - error = rl->rl_result; + error = le32_to_cpu(rl->rl_result); r = lkb->lkb_resource; hold_rsb(r); @@ -4226,7 +4430,7 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) log_debug(ls, "master copy exists %x", lkb->lkb_id); /* fall through */ case 0: - lkb->lkb_remid = rl->rl_remid; + lkb->lkb_remid = le32_to_cpu(rl->rl_remid); break; default: log_error(ls, "dlm_recover_process_copy unknown error %d %x", @@ -4270,12 +4474,12 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, } } - /* After ua is attached to lkb it will be freed by free_lkb(). + /* After ua is attached to lkb it will be freed by dlm_free_lkb(). When DLM_IFL_USER is set, the dlm knows that this is a userspace lock and that lkb_astparam is the dlm_user_args structure. */ error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs, - DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args); + fake_astfn, ua, fake_bastfn, &args); lkb->lkb_flags |= DLM_IFL_USER; ua->old_mode = DLM_LOCK_IV; @@ -4328,7 +4532,7 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, /* user can change the params on its lock when it converts it, or add an lvb that didn't exist before */ - ua = (struct dlm_user_args *)lkb->lkb_astparam; + ua = lkb->lkb_ua; if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) { ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL); @@ -4349,7 +4553,7 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, ua->old_mode = lkb->lkb_grmode; error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs, - DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args); + fake_astfn, ua, fake_bastfn, &args); if (error) goto out_put; @@ -4379,11 +4583,12 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, if (error) goto out; - ua = (struct dlm_user_args *)lkb->lkb_astparam; + ua = lkb->lkb_ua; if (lvb_in && ua->lksb.sb_lvbptr) memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN); - ua->castparam = ua_tmp->castparam; + if (ua_tmp->castparam) + ua->castparam = ua_tmp->castparam; ua->user_lksb = ua_tmp->user_lksb; error = set_unlock_args(flags, ua, &args); @@ -4427,8 +4632,9 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, if (error) goto out; - ua = (struct dlm_user_args *)lkb->lkb_astparam; - ua->castparam = ua_tmp->castparam; + ua = lkb->lkb_ua; + if (ua_tmp->castparam) + ua->castparam = ua_tmp->castparam; ua->user_lksb = ua_tmp->user_lksb; error = set_unlock_args(flags, ua, &args); @@ -4450,12 +4656,59 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, return error; } +int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid) +{ + struct dlm_lkb *lkb; + struct dlm_args args; + struct dlm_user_args *ua; + struct dlm_rsb *r; + int error; + + dlm_lock_recovery(ls); + + error = find_lkb(ls, lkid, &lkb); + if (error) + goto out; + + ua = lkb->lkb_ua; + + error = set_unlock_args(flags, ua, &args); + if (error) + goto out_put; + + /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */ + + r = lkb->lkb_resource; + hold_rsb(r); + lock_rsb(r); + + error = validate_unlock_args(lkb, &args); + if (error) + goto out_r; + lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL; + + error = _cancel_lock(r, lkb); + out_r: + unlock_rsb(r); + put_rsb(r); + + if (error == -DLM_ECANCEL) + error = 0; + /* from validate_unlock_args() */ + if (error == -EBUSY) + error = 0; + out_put: + dlm_put_lkb(lkb); + out: + dlm_unlock_recovery(ls); + return error; +} + /* lkb's that are removed from the waiters list by revert are just left on the orphans list with the granted orphan locks, to be freed by purge */ static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) { - struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam; struct dlm_args args; int error; @@ -4464,7 +4717,7 @@ static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans); mutex_unlock(&ls->ls_orphans_mutex); - set_unlock_args(0, ua, &args); + set_unlock_args(0, lkb->lkb_ua, &args); error = cancel_lock(ls, lkb, &args); if (error == -DLM_ECANCEL) @@ -4477,11 +4730,10 @@ static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) { - struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam; struct dlm_args args; int error; - set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args); + set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args); error = unlock_lock(ls, lkb, &args); if (error == -DLM_EUNLOCK) @@ -4534,6 +4786,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) lkb = del_proc_lock(ls, proc); if (!lkb) break; + del_timeout(lkb); if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) orphan_proc_lock(ls, lkb); else @@ -4556,6 +4809,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) } list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) { + lkb->lkb_ast_type = 0; list_del(&lkb->lkb_astqueue); dlm_put_lkb(lkb); }