X-Git-Url: http://ftp.safe.ca/?a=blobdiff_plain;f=fs%2Fdlm%2Flock.c;h=9c0c1db1e10534aa2a44b9634ff9790859d542cc;hb=84c664730374248adaf420c0846a6158d64413c7;hp=d8d6e729f96b669b5a6ed16bfb92c776cfc4744c;hpb=7d3c1feb80913ba4253c3517d48b9b3741c44fc9;p=safe%2Fjmp%2Flinux-2.6 diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index d8d6e72..9c0c1db 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -1,7 +1,7 @@ /****************************************************************************** ******************************************************************************* ** -** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved. +** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -82,10 +82,12 @@ static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode); static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb); static int send_remove(struct dlm_rsb *r); static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); +static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, struct dlm_message *ms); static int receive_extralen(struct dlm_message *ms); static void do_purge(struct dlm_ls *ls, int nodeid, int pid); +static void del_timeout(struct dlm_lkb *lkb); /* * Lock compatibilty matrix - thanks Steve @@ -163,7 +165,7 @@ void dlm_print_lkb(struct dlm_lkb *lkb) lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type); } -void dlm_print_rsb(struct dlm_rsb *r) +static void dlm_print_rsb(struct dlm_rsb *r) { printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n", r->res_nodeid, r->res_flags, r->res_first_lkid, @@ -194,17 +196,17 @@ void dlm_dump_rsb(struct dlm_rsb *r) /* Threads cannot use the lockspace while it's being recovered */ -static inline void lock_recovery(struct dlm_ls *ls) +static inline void dlm_lock_recovery(struct dlm_ls *ls) { down_read(&ls->ls_in_recovery); } -static inline void unlock_recovery(struct dlm_ls *ls) +void dlm_unlock_recovery(struct dlm_ls *ls) { up_read(&ls->ls_in_recovery); } -static inline int lock_recovery_try(struct dlm_ls *ls) +int dlm_lock_recovery_try(struct dlm_ls *ls) { return down_read_trylock(&ls->ls_in_recovery); } @@ -286,12 +288,26 @@ static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) if (is_master_copy(lkb)) return; + del_timeout(lkb); + DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb);); + /* if the operation was a cancel, then return -DLM_ECANCEL, if a + timeout caused the cancel then return -ETIMEDOUT */ + if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) { + lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL; + rv = -ETIMEDOUT; + } + + if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) { + lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL; + rv = -EDEADLK; + } + lkb->lkb_lksb->sb_status = rv; lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags; - dlm_add_ast(lkb, AST_COMP); + dlm_add_ast(lkb, AST_COMP, 0); } static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb) @@ -302,12 +318,12 @@ static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb) static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode) { + lkb->lkb_time_bast = ktime_get(); + if (is_master_copy(lkb)) send_bast(r, lkb, rqmode); - else { - lkb->lkb_bastmode = rqmode; - dlm_add_ast(lkb, AST_BAST); - } + else + dlm_add_ast(lkb, AST_BAST, rqmode); } /* @@ -318,7 +334,7 @@ static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len) { struct dlm_rsb *r; - r = allocate_rsb(ls, len); + r = dlm_allocate_rsb(ls, len); if (!r) return NULL; @@ -347,6 +363,7 @@ static int search_rsb_list(struct list_head *head, char *name, int len, if (len == r->res_length && !memcmp(name, r->res_name, len)) goto found; } + *r_ret = NULL; return -EBADR; found: @@ -395,9 +412,9 @@ static int search_rsb(struct dlm_ls *ls, char *name, int len, int b, unsigned int flags, struct dlm_rsb **r_ret) { int error; - write_lock(&ls->ls_rsbtbl[b].lock); + spin_lock(&ls->ls_rsbtbl[b].lock); error = _search_rsb(ls, name, len, b, flags, r_ret); - write_unlock(&ls->ls_rsbtbl[b].lock); + spin_unlock(&ls->ls_rsbtbl[b].lock); return error; } @@ -418,13 +435,17 @@ static int search_rsb(struct dlm_ls *ls, char *name, int len, int b, static int find_rsb(struct dlm_ls *ls, char *name, int namelen, unsigned int flags, struct dlm_rsb **r_ret) { - struct dlm_rsb *r, *tmp; + struct dlm_rsb *r = NULL, *tmp; uint32_t hash, bucket; - int error = 0; + int error = -EINVAL; + + if (namelen > DLM_RESNAME_MAXLEN) + goto out; if (dlm_no_directory(ls)) flags |= R_CREATE; + error = 0; hash = jhash(name, namelen, 0); bucket = hash & (ls->ls_rsbtbl_size - 1); @@ -457,28 +478,22 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen, r->res_nodeid = nodeid; } - write_lock(&ls->ls_rsbtbl[bucket].lock); + spin_lock(&ls->ls_rsbtbl[bucket].lock); error = _search_rsb(ls, name, namelen, bucket, 0, &tmp); if (!error) { - write_unlock(&ls->ls_rsbtbl[bucket].lock); - free_rsb(r); + spin_unlock(&ls->ls_rsbtbl[bucket].lock); + dlm_free_rsb(r); r = tmp; goto out; } list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list); - write_unlock(&ls->ls_rsbtbl[bucket].lock); + spin_unlock(&ls->ls_rsbtbl[bucket].lock); error = 0; out: *r_ret = r; return error; } -int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen, - unsigned int flags, struct dlm_rsb **r_ret) -{ - return find_rsb(ls, name, namelen, flags, r_ret); -} - /* This is only called to add a reference when the code already holds a valid reference to the rsb, so there's no need for locking. */ @@ -502,7 +517,7 @@ static void toss_rsb(struct kref *kref) list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss); r->res_toss_time = jiffies; if (r->res_lvbptr) { - free_lvb(r->res_lvbptr); + dlm_free_lvb(r->res_lvbptr); r->res_lvbptr = NULL; } } @@ -515,9 +530,9 @@ static void put_rsb(struct dlm_rsb *r) struct dlm_ls *ls = r->res_ls; uint32_t bucket = r->res_bucket; - write_lock(&ls->ls_rsbtbl[bucket].lock); + spin_lock(&ls->ls_rsbtbl[bucket].lock); kref_put(&r->res_ref, toss_rsb); - write_unlock(&ls->ls_rsbtbl[bucket].lock); + spin_unlock(&ls->ls_rsbtbl[bucket].lock); } void dlm_put_rsb(struct dlm_rsb *r) @@ -572,7 +587,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) uint32_t lkid = 0; uint16_t bucket; - lkb = allocate_lkb(ls); + lkb = dlm_allocate_lkb(ls); if (!lkb) return -ENOMEM; @@ -581,6 +596,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) kref_init(&lkb->lkb_ref); INIT_LIST_HEAD(&lkb->lkb_ownqueue); INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); + INIT_LIST_HEAD(&lkb->lkb_time_list); get_random_bytes(&bucket, sizeof(bucket)); bucket &= (ls->ls_lkbtbl_size - 1); @@ -665,8 +681,8 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) /* for local/process lkbs, lvbptr points to caller's lksb */ if (lkb->lkb_lvbptr && is_master_copy(lkb)) - free_lvb(lkb->lkb_lvbptr); - free_lkb(lkb); + dlm_free_lvb(lkb->lkb_lvbptr); + dlm_free_lkb(lkb); return 1; } else { write_unlock(&ls->ls_lkbtbl[bucket].lock); @@ -728,6 +744,8 @@ static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status) DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb);); + lkb->lkb_timestamp = ktime_get(); + lkb->lkb_status = status; switch (status) { @@ -817,7 +835,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype) lkb->lkb_wait_count++; hold_lkb(lkb); - log_debug(ls, "add overlap %x cur %d new %d count %d flags %x", + log_debug(ls, "addwait %x cur %d overlap %d count %d f %x", lkb->lkb_id, lkb->lkb_wait_type, mstype, lkb->lkb_wait_count, lkb->lkb_flags); goto out; @@ -833,7 +851,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype) list_add(&lkb->lkb_wait_reply, &ls->ls_waiters); out: if (error) - log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s", + log_error(ls, "addwait error %x %d flags %x %d %d %s", lkb->lkb_id, error, lkb->lkb_flags, mstype, lkb->lkb_wait_type, lkb->lkb_resource->res_name); mutex_unlock(&ls->ls_waiters_mutex); @@ -845,23 +863,55 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype) request reply on the requestqueue) between dlm_recover_waiters_pre() which set RESEND and dlm_recover_waiters_post() */ -static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype) +static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype, + struct dlm_message *ms) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; int overlap_done = 0; if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) { + log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id); lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; overlap_done = 1; goto out_del; } if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) { + log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id); lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; overlap_done = 1; goto out_del; } + /* Cancel state was preemptively cleared by a successful convert, + see next comment, nothing to do. */ + + if ((mstype == DLM_MSG_CANCEL_REPLY) && + (lkb->lkb_wait_type != DLM_MSG_CANCEL)) { + log_debug(ls, "remwait %x cancel_reply wait_type %d", + lkb->lkb_id, lkb->lkb_wait_type); + return -1; + } + + /* Remove for the convert reply, and premptively remove for the + cancel reply. A convert has been granted while there's still + an outstanding cancel on it (the cancel is moot and the result + in the cancel reply should be 0). We preempt the cancel reply + because the app gets the convert result and then can follow up + with another op, like convert. This subsequent op would see the + lingering state of the cancel and fail with -EBUSY. */ + + if ((mstype == DLM_MSG_CONVERT_REPLY) && + (lkb->lkb_wait_type == DLM_MSG_CONVERT) && + is_overlap_cancel(lkb) && ms && !ms->m_result) { + log_debug(ls, "remwait %x convert_reply zap overlap_cancel", + lkb->lkb_id); + lkb->lkb_wait_type = 0; + lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; + lkb->lkb_wait_count--; + goto out_del; + } + /* N.B. type of reply may not always correspond to type of original msg due to lookup->request optimization, verify others? */ @@ -870,8 +920,8 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype) goto out_del; } - log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d", - lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type); + log_error(ls, "remwait error %x reply %d flags %x no wait_type", + lkb->lkb_id, mstype, lkb->lkb_flags); return -1; out_del: @@ -881,7 +931,7 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype) this would happen */ if (overlap_done && lkb->lkb_wait_type) { - log_error(ls, "remove_from_waiters %x reply %d give up on %d", + log_error(ls, "remwait error %x reply %d wait_type %d overlap", lkb->lkb_id, mstype, lkb->lkb_wait_type); lkb->lkb_wait_count--; lkb->lkb_wait_type = 0; @@ -903,7 +953,7 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype) int error; mutex_lock(&ls->ls_waiters_mutex); - error = _remove_from_waiters(lkb, mstype); + error = _remove_from_waiters(lkb, mstype, NULL); mutex_unlock(&ls->ls_waiters_mutex); return error; } @@ -918,7 +968,7 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms) if (ms != &ls->ls_stub_ms) mutex_lock(&ls->ls_waiters_mutex); - error = _remove_from_waiters(lkb, ms->m_type); + error = _remove_from_waiters(lkb, ms->m_type, ms); if (ms != &ls->ls_stub_ms) mutex_unlock(&ls->ls_waiters_mutex); return error; @@ -949,7 +999,7 @@ static int shrink_bucket(struct dlm_ls *ls, int b) for (;;) { found = 0; - write_lock(&ls->ls_rsbtbl[b].lock); + spin_lock(&ls->ls_rsbtbl[b].lock); list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss, res_hashchain) { if (!time_after_eq(jiffies, r->res_toss_time + @@ -960,20 +1010,20 @@ static int shrink_bucket(struct dlm_ls *ls, int b) } if (!found) { - write_unlock(&ls->ls_rsbtbl[b].lock); + spin_unlock(&ls->ls_rsbtbl[b].lock); break; } if (kref_put(&r->res_ref, kill_rsb)) { list_del(&r->res_hashchain); - write_unlock(&ls->ls_rsbtbl[b].lock); + spin_unlock(&ls->ls_rsbtbl[b].lock); if (is_master(r)) dir_remove(r); - free_rsb(r); + dlm_free_rsb(r); count++; } else { - write_unlock(&ls->ls_rsbtbl[b].lock); + spin_unlock(&ls->ls_rsbtbl[b].lock); log_error(ls, "tossed rsb in use %s", r->res_name); } } @@ -985,15 +1035,135 @@ void dlm_scan_rsbs(struct dlm_ls *ls) { int i; - if (dlm_locking_stopped(ls)) - return; - for (i = 0; i < ls->ls_rsbtbl_size; i++) { shrink_bucket(ls, i); + if (dlm_locking_stopped(ls)) + break; cond_resched(); } } +static void add_timeout(struct dlm_lkb *lkb) +{ + struct dlm_ls *ls = lkb->lkb_resource->res_ls; + + if (is_master_copy(lkb)) + return; + + if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) && + !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) { + lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN; + goto add_it; + } + if (lkb->lkb_exflags & DLM_LKF_TIMEOUT) + goto add_it; + return; + + add_it: + DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb);); + mutex_lock(&ls->ls_timeout_mutex); + hold_lkb(lkb); + list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout); + mutex_unlock(&ls->ls_timeout_mutex); +} + +static void del_timeout(struct dlm_lkb *lkb) +{ + struct dlm_ls *ls = lkb->lkb_resource->res_ls; + + mutex_lock(&ls->ls_timeout_mutex); + if (!list_empty(&lkb->lkb_time_list)) { + list_del_init(&lkb->lkb_time_list); + unhold_lkb(lkb); + } + mutex_unlock(&ls->ls_timeout_mutex); +} + +/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and + lkb_lksb_timeout without lock_rsb? Note: we can't lock timeout_mutex + and then lock rsb because of lock ordering in add_timeout. We may need + to specify some special timeout-related bits in the lkb that are just to + be accessed under the timeout_mutex. */ + +void dlm_scan_timeout(struct dlm_ls *ls) +{ + struct dlm_rsb *r; + struct dlm_lkb *lkb; + int do_cancel, do_warn; + s64 wait_us; + + for (;;) { + if (dlm_locking_stopped(ls)) + break; + + do_cancel = 0; + do_warn = 0; + mutex_lock(&ls->ls_timeout_mutex); + list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) { + + wait_us = ktime_to_us(ktime_sub(ktime_get(), + lkb->lkb_timestamp)); + + if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) && + wait_us >= (lkb->lkb_timeout_cs * 10000)) + do_cancel = 1; + + if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) && + wait_us >= dlm_config.ci_timewarn_cs * 10000) + do_warn = 1; + + if (!do_cancel && !do_warn) + continue; + hold_lkb(lkb); + break; + } + mutex_unlock(&ls->ls_timeout_mutex); + + if (!do_cancel && !do_warn) + break; + + r = lkb->lkb_resource; + hold_rsb(r); + lock_rsb(r); + + if (do_warn) { + /* clear flag so we only warn once */ + lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN; + if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT)) + del_timeout(lkb); + dlm_timeout_warn(lkb); + } + + if (do_cancel) { + log_debug(ls, "timeout cancel %x node %d %s", + lkb->lkb_id, lkb->lkb_nodeid, r->res_name); + lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN; + lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL; + del_timeout(lkb); + _cancel_lock(r, lkb); + } + + unlock_rsb(r); + unhold_rsb(r); + dlm_put_lkb(lkb); + } +} + +/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping + dlm_recoverd before checking/setting ls_recover_begin. */ + +void dlm_adjust_timeouts(struct dlm_ls *ls) +{ + struct dlm_lkb *lkb; + u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin); + + ls->ls_recover_begin = 0; + mutex_lock(&ls->ls_timeout_mutex); + list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) + lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us); + mutex_unlock(&ls->ls_timeout_mutex); +} + /* lkb is master or local copy */ static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) @@ -1032,7 +1202,7 @@ static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) return; if (!r->res_lvbptr) - r->res_lvbptr = allocate_lvb(r->res_ls); + r->res_lvbptr = dlm_allocate_lvb(r->res_ls); if (!r->res_lvbptr) return; @@ -1064,7 +1234,7 @@ static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) return; if (!r->res_lvbptr) - r->res_lvbptr = allocate_lvb(r->res_ls); + r->res_lvbptr = dlm_allocate_lvb(r->res_ls); if (!r->res_lvbptr) return; @@ -1090,6 +1260,8 @@ static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1]; if (b == 1) { int len = receive_extralen(ms); + if (len > DLM_RESNAME_MAXLEN) + len = DLM_RESNAME_MAXLEN; memcpy(lkb->lkb_lvbptr, ms->m_extra, len); lkb->lkb_lvbseq = ms->m_lvbseq; } @@ -1275,10 +1447,8 @@ static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb) * queue for one resource. The granted mode of each lock blocks the requested * mode of the other lock." * - * Part 2: if the granted mode of lkb is preventing the first lkb in the - * convert queue from being granted, then demote lkb (set grmode to NL). - * This second form requires that we check for conv-deadlk even when - * now == 0 in _can_be_granted(). + * Part 2: if the granted mode of lkb is preventing an earlier lkb in the + * convert queue from being granted, then deadlk/demote lkb. * * Example: * Granted Queue: empty @@ -1287,41 +1457,52 @@ static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb) * * The first lock can't be granted because of the granted mode of the second * lock and the second lock can't be granted because it's not first in the - * list. We demote the granted mode of the second lock (the lkb passed to this - * function). + * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we + * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK + * flag set and return DEMOTED in the lksb flags. + * + * Originally, this function detected conv-deadlk in a more limited scope: + * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or + * - if lkb1 was the first entry in the queue (not just earlier), and was + * blocked by the granted mode of lkb2, and there was nothing on the + * granted queue preventing lkb1 from being granted immediately, i.e. + * lkb2 was the only thing preventing lkb1 from being granted. + * + * That second condition meant we'd only say there was conv-deadlk if + * resolving it (by demotion) would lead to the first lock on the convert + * queue being granted right away. It allowed conversion deadlocks to exist + * between locks on the convert queue while they couldn't be granted anyway. * - * After the resolution, the "grant pending" function needs to go back and try - * to grant locks on the convert queue again since the first lock can now be - * granted. + * Now, we detect and take action on conversion deadlocks immediately when + * they're created, even if they may not be immediately consequential. If + * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted + * mode that would prevent lkb1's conversion from being granted, we do a + * deadlk/demote on lkb2 right away and don't let it onto the convert queue. + * I think this means that the lkb_is_ahead condition below should always + * be zero, i.e. there will never be conv-deadlk between two locks that are + * both already on the convert queue. */ -static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb) +static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2) { - struct dlm_lkb *this, *first = NULL, *self = NULL; + struct dlm_lkb *lkb1; + int lkb_is_ahead = 0; - list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) { - if (!first) - first = this; - if (this == lkb) { - self = lkb; + list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) { + if (lkb1 == lkb2) { + lkb_is_ahead = 1; continue; } - if (!modes_compat(this, lkb) && !modes_compat(lkb, this)) - return 1; - } - - /* if lkb is on the convert queue and is preventing the first - from being granted, then there's deadlock and we demote lkb. - multiple converting locks may need to do this before the first - converting lock can be granted. */ - - if (self && self != first) { - if (!modes_compat(lkb, first) && - !queue_conflict(&rsb->res_grantqueue, first)) - return 1; + if (!lkb_is_ahead) { + if (!modes_compat(lkb2, lkb1)) + return 1; + } else { + if (!modes_compat(lkb2, lkb1) && + !modes_compat(lkb1, lkb2)) + return 1; + } } - return 0; } @@ -1450,42 +1631,57 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now) if (!now && !conv && list_empty(&r->res_convertqueue) && first_in_list(lkb, &r->res_waitqueue)) return 1; - out: - /* - * The following, enabled by CONVDEADLK, departs from VMS. - */ - - if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) && - conversion_deadlock_detect(r, lkb)) { - lkb->lkb_grmode = DLM_LOCK_NL; - lkb->lkb_sbflags |= DLM_SBF_DEMOTED; - } - return 0; } -/* - * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a - * simple way to provide a big optimization to applications that can use them. - */ - -static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now) +static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, + int *err) { - uint32_t flags = lkb->lkb_exflags; int rv; int8_t alt = 0, rqmode = lkb->lkb_rqmode; + int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV); + + if (err) + *err = 0; rv = _can_be_granted(r, lkb, now); if (rv) goto out; - if (lkb->lkb_sbflags & DLM_SBF_DEMOTED) + /* + * The CONVDEADLK flag is non-standard and tells the dlm to resolve + * conversion deadlocks by demoting grmode to NL, otherwise the dlm + * cancels one of the locks. + */ + + if (is_convert && can_be_queued(lkb) && + conversion_deadlock_detect(r, lkb)) { + if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) { + lkb->lkb_grmode = DLM_LOCK_NL; + lkb->lkb_sbflags |= DLM_SBF_DEMOTED; + } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) { + if (err) + *err = -EDEADLK; + else { + log_print("can_be_granted deadlock %x now %d", + lkb->lkb_id, now); + dlm_dump_rsb(r); + } + } goto out; + } + + /* + * The ALTPR and ALTCW flags are non-standard and tell the dlm to try + * to grant a request in a mode other than the normal rqmode. It's a + * simple way to provide a big optimization to applications that can + * use them. + */ - if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR) + if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR)) alt = DLM_LOCK_PR; - else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW) + else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW)) alt = DLM_LOCK_CW; if (alt) { @@ -1500,10 +1696,21 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now) return rv; } -static int grant_pending_convert(struct dlm_rsb *r, int high) +/* FIXME: I don't think that can_be_granted() can/will demote or find deadlock + for locks pending on the convert list. Once verified (watch for these + log_prints), we should be able to just call _can_be_granted() and not + bother with the demote/deadlk cases here (and there's no easy way to deal + with a deadlk here, we'd have to generate something like grant_lock with + the deadlk error.) */ + +/* Returns the highest requested mode of all blocked conversions; sets + cw if there's a blocked conversion to DLM_LOCK_CW. */ + +static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw) { struct dlm_lkb *lkb, *s; int hi, demoted, quit, grant_restart, demote_restart; + int deadlk; quit = 0; restart: @@ -1513,14 +1720,32 @@ static int grant_pending_convert(struct dlm_rsb *r, int high) list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) { demoted = is_demoted(lkb); - if (can_be_granted(r, lkb, 0)) { + deadlk = 0; + + if (can_be_granted(r, lkb, 0, &deadlk)) { grant_lock_pending(r, lkb); grant_restart = 1; - } else { - hi = max_t(int, lkb->lkb_rqmode, hi); - if (!demoted && is_demoted(lkb)) - demote_restart = 1; + continue; + } + + if (!demoted && is_demoted(lkb)) { + log_print("WARN: pending demoted %x node %d %s", + lkb->lkb_id, lkb->lkb_nodeid, r->res_name); + demote_restart = 1; + continue; + } + + if (deadlk) { + log_print("WARN: pending deadlock %x node %d %s", + lkb->lkb_id, lkb->lkb_nodeid, r->res_name); + dlm_dump_rsb(r); + continue; } + + hi = max_t(int, lkb->lkb_rqmode, hi); + + if (cw && lkb->lkb_rqmode == DLM_LOCK_CW) + *cw = 1; } if (grant_restart) @@ -1533,29 +1758,52 @@ static int grant_pending_convert(struct dlm_rsb *r, int high) return max_t(int, high, hi); } -static int grant_pending_wait(struct dlm_rsb *r, int high) +static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw) { struct dlm_lkb *lkb, *s; list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) { - if (can_be_granted(r, lkb, 0)) + if (can_be_granted(r, lkb, 0, NULL)) grant_lock_pending(r, lkb); - else + else { high = max_t(int, lkb->lkb_rqmode, high); + if (lkb->lkb_rqmode == DLM_LOCK_CW) + *cw = 1; + } } return high; } +/* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked + on either the convert or waiting queue. + high is the largest rqmode of all locks blocked on the convert or + waiting queue. */ + +static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw) +{ + if (gr->lkb_grmode == DLM_LOCK_PR && cw) { + if (gr->lkb_highbast < DLM_LOCK_EX) + return 1; + return 0; + } + + if (gr->lkb_highbast < high && + !__dlm_compat_matrix[gr->lkb_grmode+1][high+1]) + return 1; + return 0; +} + static void grant_pending_locks(struct dlm_rsb *r) { struct dlm_lkb *lkb, *s; int high = DLM_LOCK_IV; + int cw = 0; DLM_ASSERT(is_master(r), dlm_dump_rsb(r);); - high = grant_pending_convert(r, high); - high = grant_pending_wait(r, high); + high = grant_pending_convert(r, high, &cw); + high = grant_pending_wait(r, high, &cw); if (high == DLM_LOCK_IV) return; @@ -1563,27 +1811,42 @@ static void grant_pending_locks(struct dlm_rsb *r) /* * If there are locks left on the wait/convert queue then send blocking * ASTs to granted locks based on the largest requested mode (high) - * found above. FIXME: highbast < high comparison not valid for PR/CW. + * found above. */ list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) { - if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) && - !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) { - queue_bast(r, lkb, high); + if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) { + if (cw && high == DLM_LOCK_PR && + lkb->lkb_grmode == DLM_LOCK_PR) + queue_bast(r, lkb, DLM_LOCK_CW); + else + queue_bast(r, lkb, high); lkb->lkb_highbast = high; } } } +static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq) +{ + if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) || + (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) { + if (gr->lkb_highbast < DLM_LOCK_EX) + return 1; + return 0; + } + + if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq)) + return 1; + return 0; +} + static void send_bast_queue(struct dlm_rsb *r, struct list_head *head, struct dlm_lkb *lkb) { struct dlm_lkb *gr; list_for_each_entry(gr, head, lkb_statequeue) { - if (gr->lkb_bastaddr && - gr->lkb_highbast < lkb->lkb_rqmode && - !modes_compat(gr, lkb)) { + if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) { queue_bast(r, gr, lkb->lkb_rqmode); gr->lkb_highbast = lkb->lkb_rqmode; } @@ -1623,7 +1886,7 @@ static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb) static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) { struct dlm_ls *ls = r->res_ls; - int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid(); + int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid(); if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) { rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); @@ -1657,7 +1920,7 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) return 1; } - for (;;) { + for (i = 0; i < 2; i++) { /* It's possible for dlm_scand to remove an old rsb for this same resource from the toss list, us to create a new one, look up the master locally, and find it @@ -1671,6 +1934,8 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) log_debug(ls, "dir_lookup error %d %s", error, r->res_name); schedule(); } + if (error && error != -EEXIST) + return error; if (ret_nodeid == our_nodeid) { r->res_first_lkid = 0; @@ -1712,8 +1977,11 @@ static void confirm_master(struct dlm_rsb *r, int error) break; case -EAGAIN: - /* the remote master didn't queue our NOQUEUE request; - make a waiting lkb the first_lkid */ + case -EBADR: + case -ENOTBLK: + /* the remote request failed and won't be retried (it was + a NOQUEUE, or has been canceled/unlocked); make a waiting + lkb the first_lkid */ r->res_first_lkid = 0; @@ -1723,8 +1991,7 @@ static void confirm_master(struct dlm_rsb *r, int error) list_del_init(&lkb->lkb_rsb_lookup); r->res_first_lkid = lkb->lkb_id; _request_lock(r, lkb); - } else - r->res_nodeid = -1; + } break; default: @@ -1733,8 +2000,11 @@ static void confirm_master(struct dlm_rsb *r, int error) } static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags, - int namelen, uint32_t parent_lkid, void *ast, - void *astarg, void *bast, struct dlm_args *args) + int namelen, unsigned long timeout_cs, + void (*ast) (void *astparam), + void *astparam, + void (*bast) (void *astparam, int mode), + struct dlm_args *args) { int rv = -EINVAL; @@ -1776,10 +2046,6 @@ static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags, if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr) goto out; - /* parent/child locks not yet supported */ - if (parent_lkid) - goto out; - if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid) goto out; @@ -1788,9 +2054,10 @@ static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags, an active lkb cannot be modified before locking the rsb */ args->flags = flags; - args->astaddr = ast; - args->astparam = (long) astarg; - args->bastaddr = bast; + args->astfn = ast; + args->astparam = astparam; + args->bastfn = bast; + args->timeout = timeout_cs; args->mode = mode; args->lksb = lksb; rv = 0; @@ -1808,7 +2075,7 @@ static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args) return -EINVAL; args->flags = flags; - args->astparam = (long) astarg; + args->astparam = astarg; return 0; } @@ -1838,15 +2105,21 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, lkb->lkb_exflags = args->flags; lkb->lkb_sbflags = 0; - lkb->lkb_astaddr = args->astaddr; + lkb->lkb_astfn = args->astfn; lkb->lkb_astparam = args->astparam; - lkb->lkb_bastaddr = args->bastaddr; + lkb->lkb_bastfn = args->bastfn; lkb->lkb_rqmode = args->mode; lkb->lkb_lksb = args->lksb; lkb->lkb_lvbptr = args->lksb->sb_lvbptr; lkb->lkb_ownpid = (int) current->pid; + lkb->lkb_timeout_cs = args->timeout; rv = 0; out: + if (rv) + log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s", + rv, lkb->lkb_id, lkb->lkb_flags, args->flags, + lkb->lkb_status, lkb->lkb_wait_type, + lkb->lkb_resource->res_name); return rv; } @@ -1881,17 +2154,18 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) /* an lkb may be waiting for an rsb lookup to complete where the lookup was initiated by another lock */ - if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) { - if (!list_empty(&lkb->lkb_rsb_lookup)) { + if (!list_empty(&lkb->lkb_rsb_lookup)) { + if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) { log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id); list_del_init(&lkb->lkb_rsb_lookup); queue_cast(lkb->lkb_resource, lkb, args->flags & DLM_LKF_CANCEL ? -DLM_ECANCEL : -DLM_EUNLOCK); unhold_lkb(lkb); /* undoes create_lkb() */ - rv = -EBUSY; - goto out; } + /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */ + rv = -EBUSY; + goto out; } /* cancel not allowed with another cancel/unlock in progress */ @@ -1903,12 +2177,22 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) if (is_overlap(lkb)) goto out; + /* don't let scand try to do a cancel */ + del_timeout(lkb); + if (lkb->lkb_flags & DLM_IFL_RESEND) { lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; rv = -EBUSY; goto out; } + /* there's nothing to cancel */ + if (lkb->lkb_status == DLM_LKSTS_GRANTED && + !lkb->lkb_wait_type) { + rv = -EBUSY; + goto out; + } + switch (lkb->lkb_wait_type) { case DLM_MSG_LOOKUP: case DLM_MSG_REQUEST: @@ -1934,6 +2218,9 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) if (is_overlap_unlock(lkb)) goto out; + /* don't let scand try to do a cancel */ + del_timeout(lkb); + if (lkb->lkb_flags & DLM_IFL_RESEND) { lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; rv = -EBUSY; @@ -1984,7 +2271,7 @@ static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb) { int error = 0; - if (can_be_granted(r, lkb, 1)) { + if (can_be_granted(r, lkb, 1, NULL)) { grant_lock(r, lkb); queue_cast(r, lkb, 0); goto out; @@ -1994,6 +2281,7 @@ static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb) error = -EINPROGRESS; add_lkb(r, lkb, DLM_LKSTS_WAITING); send_blocking_asts(r, lkb); + add_timeout(lkb); goto out; } @@ -2009,16 +2297,32 @@ static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb) static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) { int error = 0; + int deadlk = 0; /* changing an existing lock may allow others to be granted */ - if (can_be_granted(r, lkb, 1)) { + if (can_be_granted(r, lkb, 1, &deadlk)) { grant_lock(r, lkb); queue_cast(r, lkb, 0); grant_pending_locks(r); goto out; } + /* can_be_granted() detected that this lock would block in a conversion + deadlock, so we leave it on the granted queue and return EDEADLK in + the ast for the convert. */ + + if (deadlk) { + /* it's left on the granted queue */ + log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s", + lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status, + lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name); + revert_lock(r, lkb); + queue_cast(r, lkb, -EDEADLK); + error = -EDEADLK; + goto out; + } + /* is_demoted() means the can_be_granted() above set the grmode to NL, and left us on the granted queue. This auto-demotion (due to CONVDEADLK) might mean other locks, and/or this lock, are @@ -2026,7 +2330,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) before we try again to grant this one. */ if (is_demoted(lkb)) { - grant_pending_convert(r, DLM_LOCK_IV); + grant_pending_convert(r, DLM_LOCK_IV, NULL); if (_can_be_granted(r, lkb, 1)) { grant_lock(r, lkb); queue_cast(r, lkb, 0); @@ -2041,6 +2345,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) del_lkb(r, lkb); add_lkb(r, lkb, DLM_LKSTS_CONVERT); send_blocking_asts(r, lkb); + add_timeout(lkb); goto out; } @@ -2274,7 +2579,7 @@ int dlm_lock(dlm_lockspace_t *lockspace, if (!ls) return -EINVAL; - lock_recovery(ls); + dlm_lock_recovery(ls); if (convert) error = find_lkb(ls, lksb->sb_lkid, &lkb); @@ -2284,7 +2589,7 @@ int dlm_lock(dlm_lockspace_t *lockspace, if (error) goto out; - error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast, + error = set_lock_args(mode, lksb, flags, namelen, 0, ast, astarg, bast, &args); if (error) goto out_put; @@ -2299,10 +2604,10 @@ int dlm_lock(dlm_lockspace_t *lockspace, out_put: if (convert || error) __put_lkb(ls, lkb); - if (error == -EAGAIN) + if (error == -EAGAIN || error == -EDEADLK) error = 0; out: - unlock_recovery(ls); + dlm_unlock_recovery(ls); dlm_put_lockspace(ls); return error; } @@ -2322,7 +2627,7 @@ int dlm_unlock(dlm_lockspace_t *lockspace, if (!ls) return -EINVAL; - lock_recovery(ls); + dlm_lock_recovery(ls); error = find_lkb(ls, lkid, &lkb); if (error) @@ -2344,7 +2649,7 @@ int dlm_unlock(dlm_lockspace_t *lockspace, out_put: dlm_put_lkb(lkb); out: - unlock_recovery(ls); + dlm_unlock_recovery(ls); dlm_put_lockspace(ls); return error; } @@ -2384,7 +2689,7 @@ static int _create_message(struct dlm_ls *ls, int mb_len, pass into lowcomms_commit and a message buffer (mb) that we write our data into */ - mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb); + mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb); if (!mh) return -ENOBUFS; @@ -2461,9 +2766,9 @@ static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb, /* m_result and m_bastmode are set from function args, not from lkb fields */ - if (lkb->lkb_bastaddr) + if (lkb->lkb_bastfn) ms->m_asts |= AST_BAST; - if (lkb->lkb_astaddr) + if (lkb->lkb_astfn) ms->m_asts |= AST_COMP; /* compare with switch in create_message; send_remove() doesn't @@ -2735,15 +3040,27 @@ static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb, if (lkb->lkb_exflags & DLM_LKF_VALBLK) { if (!lkb->lkb_lvbptr) - lkb->lkb_lvbptr = allocate_lvb(ls); + lkb->lkb_lvbptr = dlm_allocate_lvb(ls); if (!lkb->lkb_lvbptr) return -ENOMEM; len = receive_extralen(ms); + if (len > DLM_RESNAME_MAXLEN) + len = DLM_RESNAME_MAXLEN; memcpy(lkb->lkb_lvbptr, ms->m_extra, len); } return 0; } +static void fake_bastfn(void *astparam, int mode) +{ + log_print("fake_bastfn should not be called"); +} + +static void fake_astfn(void *astparam) +{ + log_print("fake_astfn should not be called"); +} + static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, struct dlm_message *ms) { @@ -2752,14 +3069,13 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, lkb->lkb_remid = ms->m_lkid; lkb->lkb_grmode = DLM_LOCK_IV; lkb->lkb_rqmode = ms->m_rqmode; - lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST); - lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP); - DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb);); + lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL; + lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL; if (lkb->lkb_exflags & DLM_LKF_VALBLK) { /* lkb was just created so there won't be an lvb yet */ - lkb->lkb_lvbptr = allocate_lvb(ls); + lkb->lkb_lvbptr = dlm_allocate_lvb(ls); if (!lkb->lkb_lvbptr) return -ENOMEM; } @@ -2770,16 +3086,6 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, struct dlm_message *ms) { - if (lkb->lkb_nodeid != ms->m_header.h_nodeid) { - log_error(ls, "convert_args nodeid %d %d lkid %x %x", - lkb->lkb_nodeid, ms->m_header.h_nodeid, - lkb->lkb_id, lkb->lkb_remid); - return -EINVAL; - } - - if (!is_master_copy(lkb)) - return -EINVAL; - if (lkb->lkb_status != DLM_LKSTS_GRANTED) return -EBUSY; @@ -2795,8 +3101,6 @@ static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, struct dlm_message *ms) { - if (!is_master_copy(lkb)) - return -EINVAL; if (receive_lvb(ls, lkb, ms)) return -ENOMEM; return 0; @@ -2812,6 +3116,50 @@ static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms) lkb->lkb_remid = ms->m_lkid; } +/* This is called after the rsb is locked so that we can safely inspect + fields in the lkb. */ + +static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms) +{ + int from = ms->m_header.h_nodeid; + int error = 0; + + switch (ms->m_type) { + case DLM_MSG_CONVERT: + case DLM_MSG_UNLOCK: + case DLM_MSG_CANCEL: + if (!is_master_copy(lkb) || lkb->lkb_nodeid != from) + error = -EINVAL; + break; + + case DLM_MSG_CONVERT_REPLY: + case DLM_MSG_UNLOCK_REPLY: + case DLM_MSG_CANCEL_REPLY: + case DLM_MSG_GRANT: + case DLM_MSG_BAST: + if (!is_process_copy(lkb) || lkb->lkb_nodeid != from) + error = -EINVAL; + break; + + case DLM_MSG_REQUEST_REPLY: + if (!is_process_copy(lkb)) + error = -EINVAL; + else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from) + error = -EINVAL; + break; + + default: + error = -EINVAL; + } + + if (error) + log_error(lkb->lkb_resource->res_ls, + "ignore invalid message %d from %d %x %x %x %d", + ms->m_type, from, lkb->lkb_id, lkb->lkb_remid, + lkb->lkb_flags, lkb->lkb_nodeid); + return error; +} + static void receive_request(struct dlm_ls *ls, struct dlm_message *ms) { struct dlm_lkb *lkb; @@ -2873,17 +3221,21 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + receive_flags(lkb, ms); error = receive_convert_args(ls, lkb, ms); if (error) - goto out; + goto out_reply; reply = !down_conversion(lkb); error = do_convert(r, lkb); - out: + out_reply: if (reply) send_convert_reply(r, lkb, error); - + out: unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); @@ -2909,15 +3261,19 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + receive_flags(lkb, ms); error = receive_unlock_args(ls, lkb, ms); if (error) - goto out; + goto out_reply; error = do_unlock(r, lkb); - out: + out_reply: send_unlock_reply(r, lkb, error); - + out: unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); @@ -2945,9 +3301,13 @@ static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + error = do_cancel(r, lkb); send_cancel_reply(r, lkb, error); - + out: unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); @@ -2966,22 +3326,26 @@ static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_grant no lkb"); + log_debug(ls, "receive_grant from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); r = lkb->lkb_resource; hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + receive_flags_reply(lkb, ms); if (is_altmode(lkb)) munge_altmode(lkb, ms); grant_lock_pc(r, lkb, ms); queue_cast(r, lkb, 0); - + out: unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); @@ -2995,18 +3359,22 @@ static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_bast no lkb"); + log_debug(ls, "receive_bast from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); r = lkb->lkb_resource; hold_rsb(r); lock_rsb(r); - queue_bast(r, lkb, ms->m_bastmode); + error = validate_message(lkb, ms); + if (error) + goto out; + queue_bast(r, lkb, ms->m_bastmode); + out: unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); @@ -3072,15 +3440,19 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_request_reply no lkb"); + log_debug(ls, "receive_request_reply from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); r = lkb->lkb_resource; hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + mstype = lkb->lkb_wait_type; error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); if (error) @@ -3111,9 +3483,10 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) lkb->lkb_remid = ms->m_lkid; if (is_altmode(lkb)) munge_altmode(lkb, ms); - if (result) + if (result) { add_lkb(r, lkb, DLM_LKSTS_WAITING); - else { + add_timeout(lkb); + } else { grant_lock_pc(r, lkb, ms); queue_cast(r, lkb, 0); } @@ -3131,6 +3504,7 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) if (is_overlap(lkb)) { /* we'll ignore error in cancel/unlock reply */ queue_cast_overlap(r, lkb); + confirm_master(r, result); unhold_lkb(lkb); /* undoes create_lkb() */ } else _request_lock(r, lkb); @@ -3172,6 +3546,12 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, queue_cast(r, lkb, -EAGAIN); break; + case -EDEADLK: + receive_flags_reply(lkb, ms); + revert_lock_pc(r, lkb); + queue_cast(r, lkb, -EDEADLK); + break; + case -EINPROGRESS: /* convert was queued on remote master */ receive_flags_reply(lkb, ms); @@ -3179,6 +3559,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, munge_demoted(lkb, ms); del_lkb(r, lkb); add_lkb(r, lkb, DLM_LKSTS_CONVERT); + add_timeout(lkb); break; case 0: @@ -3204,6 +3585,10 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + /* stub reply can happen with waiters_mutex held */ error = remove_from_waiters_ms(lkb, ms); if (error) @@ -3222,10 +3607,10 @@ static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_convert_reply no lkb"); + log_debug(ls, "receive_convert_reply from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); _receive_convert_reply(lkb, ms); dlm_put_lkb(lkb); @@ -3239,6 +3624,10 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + /* stub reply can happen with waiters_mutex held */ error = remove_from_waiters_ms(lkb, ms); if (error) @@ -3270,10 +3659,10 @@ static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_unlock_reply no lkb"); + log_debug(ls, "receive_unlock_reply from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); _receive_unlock_reply(lkb, ms); dlm_put_lkb(lkb); @@ -3287,6 +3676,10 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + /* stub reply can happen with waiters_mutex held */ error = remove_from_waiters_ms(lkb, ms); if (error) @@ -3298,8 +3691,7 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) case -DLM_ECANCEL: receive_flags_reply(lkb, ms); revert_lock_pc(r, lkb); - if (ms->m_result) - queue_cast(r, lkb, -DLM_ECANCEL); + queue_cast(r, lkb, -DLM_ECANCEL); break; case 0: break; @@ -3319,10 +3711,10 @@ static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_cancel_reply no lkb"); + log_debug(ls, "receive_cancel_reply from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); _receive_cancel_reply(lkb, ms); dlm_put_lkb(lkb); @@ -3380,53 +3772,13 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) dlm_put_lkb(lkb); } -int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery) +static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms) { - struct dlm_message *ms = (struct dlm_message *) hd; - struct dlm_ls *ls; - int error = 0; - - if (!recovery) - dlm_message_in(ms); - - ls = dlm_find_lockspace_global(hd->h_lockspace); - if (!ls) { - log_print("drop message %d from %d for unknown lockspace %d", - ms->m_type, nodeid, hd->h_lockspace); - return -EINVAL; - } - - /* recovery may have just ended leaving a bunch of backed-up requests - in the requestqueue; wait while dlm_recoverd clears them */ - - if (!recovery) - dlm_wait_requestqueue(ls); - - /* recovery may have just started while there were a bunch of - in-flight requests -- save them in requestqueue to be processed - after recovery. we can't let dlm_recvd block on the recovery - lock. if dlm_recoverd is calling this function to clear the - requestqueue, it needs to be interrupted (-EINTR) if another - recovery operation is starting. */ - - while (1) { - if (dlm_locking_stopped(ls)) { - if (recovery) { - error = -EINTR; - goto out; - } - error = dlm_add_requestqueue(ls, nodeid, hd); - if (error == -EAGAIN) - continue; - else { - error = -EINTR; - goto out; - } - } - - if (lock_recovery_try(ls)) - break; - schedule(); + if (!dlm_is_member(ls, ms->m_header.h_nodeid)) { + log_debug(ls, "ignore non-member message %d from %d %x %x %d", + ms->m_type, ms->m_header.h_nodeid, ms->m_lkid, + ms->m_remid, ms->m_result); + return; } switch (ms->m_type) { @@ -3503,17 +3855,90 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery) log_error(ls, "unknown message type %d", ms->m_type); } - unlock_recovery(ls); - out: - dlm_put_lockspace(ls); dlm_astd_wake(); - return error; } +/* If the lockspace is in recovery mode (locking stopped), then normal + messages are saved on the requestqueue for processing after recovery is + done. When not in recovery mode, we wait for dlm_recoverd to drain saved + messages off the requestqueue before we process new ones. This occurs right + after recovery completes when we transition from saving all messages on + requestqueue, to processing all the saved messages, to processing new + messages as they arrive. */ -/* - * Recovery related - */ +static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms, + int nodeid) +{ + if (dlm_locking_stopped(ls)) { + dlm_add_requestqueue(ls, nodeid, ms); + } else { + dlm_wait_requestqueue(ls); + _receive_message(ls, ms); + } +} + +/* This is called by dlm_recoverd to process messages that were saved on + the requestqueue. */ + +void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms) +{ + _receive_message(ls, ms); +} + +/* This is called by the midcomms layer when something is received for + the lockspace. It could be either a MSG (normal message sent as part of + standard locking activity) or an RCOM (recovery message sent as part of + lockspace recovery). */ + +void dlm_receive_buffer(union dlm_packet *p, int nodeid) +{ + struct dlm_header *hd = &p->header; + struct dlm_ls *ls; + int type = 0; + + switch (hd->h_cmd) { + case DLM_MSG: + dlm_message_in(&p->message); + type = p->message.m_type; + break; + case DLM_RCOM: + dlm_rcom_in(&p->rcom); + type = p->rcom.rc_type; + break; + default: + log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid); + return; + } + + if (hd->h_nodeid != nodeid) { + log_print("invalid h_nodeid %d from %d lockspace %x", + hd->h_nodeid, nodeid, hd->h_lockspace); + return; + } + + ls = dlm_find_lockspace_global(hd->h_lockspace); + if (!ls) { + if (dlm_config.ci_log_debug) + log_print("invalid lockspace %x from %d cmd %d type %d", + hd->h_lockspace, nodeid, hd->h_cmd, type); + + if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS) + dlm_send_ls_not_ready(nodeid, &p->rcom); + return; + } + + /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to + be inactive (in this ls) before transitioning to recovery mode */ + + down_read(&ls->ls_recv_active); + if (hd->h_cmd == DLM_MSG) + dlm_receive_message(ls, &p->message, nodeid); + else + dlm_receive_rcom(ls, &p->rcom, nodeid); + up_read(&ls->ls_recv_active); + + dlm_put_lockspace(ls); +} static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb) { @@ -3522,6 +3947,7 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb) ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; ls->ls_stub_ms.m_result = -EINPROGRESS; ls->ls_stub_ms.m_flags = lkb->lkb_flags; + ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; _receive_convert_reply(lkb, &ls->ls_stub_ms); /* Same special case as in receive_rcom_lock_args() */ @@ -3563,6 +3989,7 @@ static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb) void dlm_recover_waiters_pre(struct dlm_ls *ls) { struct dlm_lkb *lkb, *safe; + int wait_type, stub_unlock_result, stub_cancel_result; mutex_lock(&ls->ls_waiters_mutex); @@ -3581,7 +4008,33 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) if (!waiter_needs_recovery(ls, lkb)) continue; - switch (lkb->lkb_wait_type) { + wait_type = lkb->lkb_wait_type; + stub_unlock_result = -DLM_EUNLOCK; + stub_cancel_result = -DLM_ECANCEL; + + /* Main reply may have been received leaving a zero wait_type, + but a reply for the overlapping op may not have been + received. In that case we need to fake the appropriate + reply for the overlap op. */ + + if (!wait_type) { + if (is_overlap_cancel(lkb)) { + wait_type = DLM_MSG_CANCEL; + if (lkb->lkb_grmode == DLM_LOCK_IV) + stub_cancel_result = 0; + } + if (is_overlap_unlock(lkb)) { + wait_type = DLM_MSG_UNLOCK; + if (lkb->lkb_grmode == DLM_LOCK_IV) + stub_unlock_result = -ENOENT; + } + + log_debug(ls, "rwpre overlap %x %x %d %d %d", + lkb->lkb_id, lkb->lkb_flags, wait_type, + stub_cancel_result, stub_unlock_result); + } + + switch (wait_type) { case DLM_MSG_REQUEST: lkb->lkb_flags |= DLM_IFL_RESEND; @@ -3594,8 +4047,9 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) case DLM_MSG_UNLOCK: hold_lkb(lkb); ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY; - ls->ls_stub_ms.m_result = -DLM_EUNLOCK; + ls->ls_stub_ms.m_result = stub_unlock_result; ls->ls_stub_ms.m_flags = lkb->lkb_flags; + ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; _receive_unlock_reply(lkb, &ls->ls_stub_ms); dlm_put_lkb(lkb); break; @@ -3603,15 +4057,16 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) case DLM_MSG_CANCEL: hold_lkb(lkb); ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY; - ls->ls_stub_ms.m_result = -DLM_ECANCEL; + ls->ls_stub_ms.m_result = stub_cancel_result; ls->ls_stub_ms.m_flags = lkb->lkb_flags; + ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; _receive_cancel_reply(lkb, &ls->ls_stub_ms); dlm_put_lkb(lkb); break; default: - log_error(ls, "invalid lkb wait_type %d", - lkb->lkb_wait_type); + log_error(ls, "invalid lkb wait_type %d %d", + lkb->lkb_wait_type, wait_type); } schedule(); } @@ -3813,7 +4268,7 @@ static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket) { struct dlm_rsb *r, *r_ret = NULL; - read_lock(&ls->ls_rsbtbl[bucket].lock); + spin_lock(&ls->ls_rsbtbl[bucket].lock); list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) { if (!rsb_flag(r, RSB_LOCKS_PURGED)) continue; @@ -3822,7 +4277,7 @@ static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket) r_ret = r; break; } - read_unlock(&ls->ls_rsbtbl[bucket].lock); + spin_unlock(&ls->ls_rsbtbl[bucket].lock); return r_ret; } @@ -3879,32 +4334,34 @@ static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid, return NULL; } +/* needs at least dlm_rcom + rcom_lock */ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, struct dlm_rsb *r, struct dlm_rcom *rc) { struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; - int lvblen; lkb->lkb_nodeid = rc->rc_header.h_nodeid; - lkb->lkb_ownpid = rl->rl_ownpid; - lkb->lkb_remid = rl->rl_lkid; - lkb->lkb_exflags = rl->rl_exflags; - lkb->lkb_flags = rl->rl_flags & 0x0000FFFF; + lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid); + lkb->lkb_remid = le32_to_cpu(rl->rl_lkid); + lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags); + lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF; lkb->lkb_flags |= DLM_IFL_MSTCPY; - lkb->lkb_lvbseq = rl->rl_lvbseq; + lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq); lkb->lkb_rqmode = rl->rl_rqmode; lkb->lkb_grmode = rl->rl_grmode; /* don't set lkb_status because add_lkb wants to itself */ - lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST); - lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP); + lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL; + lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL; if (lkb->lkb_exflags & DLM_LKF_VALBLK) { - lkb->lkb_lvbptr = allocate_lvb(ls); + int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) - + sizeof(struct rcom_lock); + if (lvblen > ls->ls_lvblen) + return -EINVAL; + lkb->lkb_lvbptr = dlm_allocate_lvb(ls); if (!lkb->lkb_lvbptr) return -ENOMEM; - lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) - - sizeof(struct rcom_lock); memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen); } @@ -3912,7 +4369,8 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, The real granted mode of these converting locks cannot be determined until all locks have been rebuilt on the rsb (recover_conversion) */ - if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) { + if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) && + middle_conversion(lkb)) { rl->rl_status = DLM_LKSTS_CONVERT; lkb->lkb_grmode = DLM_LOCK_IV; rsb_set_flag(r, RSB_RECOVER_CONVERT); @@ -3927,6 +4385,7 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, the given values and send back our lkid. We send back our lkid by sending back the rcom_lock struct we got but with the remid field filled in. */ +/* needs at least dlm_rcom + rcom_lock */ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) { struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; @@ -3939,13 +4398,14 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) goto out; } - error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r); + error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), + R_MASTER, &r); if (error) goto out; lock_rsb(r); - lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid); + lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid)); if (lkb) { error = -EEXIST; goto out_remid; @@ -3968,18 +4428,20 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) out_remid: /* this is the new value returned to the lock holder for saving in its process-copy lkb */ - rl->rl_remid = lkb->lkb_id; + rl->rl_remid = cpu_to_le32(lkb->lkb_id); out_unlock: unlock_rsb(r); put_rsb(r); out: if (error) - log_print("recover_master_copy %d %x", error, rl->rl_lkid); - rl->rl_result = error; + log_debug(ls, "recover_master_copy %d %x", error, + le32_to_cpu(rl->rl_lkid)); + rl->rl_result = cpu_to_le32(error); return error; } +/* needs at least dlm_rcom + rcom_lock */ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) { struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; @@ -3987,15 +4449,16 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) struct dlm_lkb *lkb; int error; - error = find_lkb(ls, rl->rl_lkid, &lkb); + error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb); if (error) { - log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid); + log_error(ls, "recover_process_copy no lkid %x", + le32_to_cpu(rl->rl_lkid)); return error; } DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); - error = rl->rl_result; + error = le32_to_cpu(rl->rl_result); r = lkb->lkb_resource; hold_rsb(r); @@ -4014,7 +4477,7 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) log_debug(ls, "master copy exists %x", lkb->lkb_id); /* fall through */ case 0: - lkb->lkb_remid = rl->rl_remid; + lkb->lkb_remid = le32_to_cpu(rl->rl_remid); break; default: log_error(ls, "dlm_recover_process_copy unknown error %d %x", @@ -4034,13 +4497,13 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, int mode, uint32_t flags, void *name, unsigned int namelen, - uint32_t parent_lkid) + unsigned long timeout_cs) { struct dlm_lkb *lkb; struct dlm_args args; int error; - lock_recovery(ls); + dlm_lock_recovery(ls); error = create_lkb(ls, &lkb); if (error) { @@ -4049,7 +4512,7 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, } if (flags & DLM_LKF_VALBLK) { - ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL); + ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS); if (!ua->lksb.sb_lvbptr) { kfree(ua); __put_lkb(ls, lkb); @@ -4058,12 +4521,12 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, } } - /* After ua is attached to lkb it will be freed by free_lkb(). + /* After ua is attached to lkb it will be freed by dlm_free_lkb(). When DLM_IFL_USER is set, the dlm knows that this is a userspace lock and that lkb_astparam is the dlm_user_args structure. */ - error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid, - DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args); + error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs, + fake_astfn, ua, fake_bastfn, &args); lkb->lkb_flags |= DLM_IFL_USER; ua->old_mode = DLM_LOCK_IV; @@ -4094,19 +4557,20 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks); spin_unlock(&ua->proc->locks_spin); out: - unlock_recovery(ls); + dlm_unlock_recovery(ls); return error; } int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, - int mode, uint32_t flags, uint32_t lkid, char *lvb_in) + int mode, uint32_t flags, uint32_t lkid, char *lvb_in, + unsigned long timeout_cs) { struct dlm_lkb *lkb; struct dlm_args args; struct dlm_user_args *ua; int error; - lock_recovery(ls); + dlm_lock_recovery(ls); error = find_lkb(ls, lkid, &lkb); if (error) @@ -4115,10 +4579,10 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, /* user can change the params on its lock when it converts it, or add an lvb that didn't exist before */ - ua = (struct dlm_user_args *)lkb->lkb_astparam; + ua = lkb->lkb_ua; if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) { - ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL); + ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS); if (!ua->lksb.sb_lvbptr) { error = -ENOMEM; goto out_put; @@ -4127,6 +4591,7 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, if (lvb_in && ua->lksb.sb_lvbptr) memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN); + ua->xid = ua_tmp->xid; ua->castparam = ua_tmp->castparam; ua->castaddr = ua_tmp->castaddr; ua->bastparam = ua_tmp->bastparam; @@ -4134,19 +4599,19 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, ua->user_lksb = ua_tmp->user_lksb; ua->old_mode = lkb->lkb_grmode; - error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST, - ua, DLM_FAKE_USER_AST, &args); + error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs, + fake_astfn, ua, fake_bastfn, &args); if (error) goto out_put; error = convert_lock(ls, lkb, &args); - if (error == -EINPROGRESS || error == -EAGAIN) + if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK) error = 0; out_put: dlm_put_lkb(lkb); out: - unlock_recovery(ls); + dlm_unlock_recovery(ls); kfree(ua_tmp); return error; } @@ -4159,17 +4624,18 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, struct dlm_user_args *ua; int error; - lock_recovery(ls); + dlm_lock_recovery(ls); error = find_lkb(ls, lkid, &lkb); if (error) goto out; - ua = (struct dlm_user_args *)lkb->lkb_astparam; + ua = lkb->lkb_ua; if (lvb_in && ua->lksb.sb_lvbptr) memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN); - ua->castparam = ua_tmp->castparam; + if (ua_tmp->castparam) + ua->castparam = ua_tmp->castparam; ua->user_lksb = ua_tmp->user_lksb; error = set_unlock_args(flags, ua, &args); @@ -4194,7 +4660,7 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, out_put: dlm_put_lkb(lkb); out: - unlock_recovery(ls); + dlm_unlock_recovery(ls); kfree(ua_tmp); return error; } @@ -4207,14 +4673,15 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, struct dlm_user_args *ua; int error; - lock_recovery(ls); + dlm_lock_recovery(ls); error = find_lkb(ls, lkid, &lkb); if (error) goto out; - ua = (struct dlm_user_args *)lkb->lkb_astparam; - ua->castparam = ua_tmp->castparam; + ua = lkb->lkb_ua; + if (ua_tmp->castparam) + ua->castparam = ua_tmp->castparam; ua->user_lksb = ua_tmp->user_lksb; error = set_unlock_args(flags, ua, &args); @@ -4231,17 +4698,64 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, out_put: dlm_put_lkb(lkb); out: - unlock_recovery(ls); + dlm_unlock_recovery(ls); kfree(ua_tmp); return error; } +int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid) +{ + struct dlm_lkb *lkb; + struct dlm_args args; + struct dlm_user_args *ua; + struct dlm_rsb *r; + int error; + + dlm_lock_recovery(ls); + + error = find_lkb(ls, lkid, &lkb); + if (error) + goto out; + + ua = lkb->lkb_ua; + + error = set_unlock_args(flags, ua, &args); + if (error) + goto out_put; + + /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */ + + r = lkb->lkb_resource; + hold_rsb(r); + lock_rsb(r); + + error = validate_unlock_args(lkb, &args); + if (error) + goto out_r; + lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL; + + error = _cancel_lock(r, lkb); + out_r: + unlock_rsb(r); + put_rsb(r); + + if (error == -DLM_ECANCEL) + error = 0; + /* from validate_unlock_args() */ + if (error == -EBUSY) + error = 0; + out_put: + dlm_put_lkb(lkb); + out: + dlm_unlock_recovery(ls); + return error; +} + /* lkb's that are removed from the waiters list by revert are just left on the orphans list with the granted orphan locks, to be freed by purge */ static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) { - struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam; struct dlm_args args; int error; @@ -4250,7 +4764,7 @@ static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans); mutex_unlock(&ls->ls_orphans_mutex); - set_unlock_args(0, ua, &args); + set_unlock_args(0, lkb->lkb_ua, &args); error = cancel_lock(ls, lkb, &args); if (error == -DLM_ECANCEL) @@ -4263,11 +4777,10 @@ static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) { - struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam; struct dlm_args args; int error; - set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args); + set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args); error = unlock_lock(ls, lkb, &args); if (error == -DLM_EUNLOCK) @@ -4314,12 +4827,13 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) { struct dlm_lkb *lkb, *safe; - lock_recovery(ls); + dlm_lock_recovery(ls); while (1) { lkb = del_proc_lock(ls, proc); if (!lkb) break; + del_timeout(lkb); if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) orphan_proc_lock(ls, lkb); else @@ -4342,12 +4856,13 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) } list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) { + lkb->lkb_ast_type = 0; list_del(&lkb->lkb_astqueue); dlm_put_lkb(lkb); } mutex_unlock(&ls->ls_clear_proc_locks); - unlock_recovery(ls); + dlm_unlock_recovery(ls); } static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) @@ -4429,12 +4944,12 @@ int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc, if (nodeid != dlm_our_nodeid()) { error = send_purge(ls, nodeid, pid); } else { - lock_recovery(ls); + dlm_lock_recovery(ls); if (pid == current->pid) purge_proc_locks(ls, proc); else do_purge(ls, nodeid, pid); - unlock_recovery(ls); + dlm_unlock_recovery(ls); } return error; }