X-Git-Url: http://ftp.safe.ca/?a=blobdiff_plain;f=fs%2Focfs2%2Fdlmglue.c;h=eae3d643a5e4316332b23cabb6f3d7edae4b7a5e;hb=ac11c827192272eabb68b8f4cf844066461d9690;hp=f7794306b2bd20a0dc2e1d1c9aec72e559bd4b8c;hpb=200bfae37a15e50e0f9aa5683958bdfc3fd55e05;p=safe%2Fjmp%2Flinux-2.6 diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c index f779430..eae3d64 100644 --- a/fs/ocfs2/dlmglue.c +++ b/fs/ocfs2/dlmglue.c @@ -27,17 +27,11 @@ #include #include #include -#include #include #include #include #include - -#include -#include -#include - -#include +#include #define MLOG_MASK_PREFIX ML_DLM_GLUE #include @@ -53,6 +47,7 @@ #include "heartbeat.h" #include "inode.h" #include "journal.h" +#include "stackglue.h" #include "slot_map.h" #include "super.h" #include "uptodate.h" @@ -65,6 +60,9 @@ struct ocfs2_mask_waiter { struct completion mw_complete; unsigned long mw_mask; unsigned long mw_goal; +#ifdef CONFIG_OCFS2_FS_STATS + unsigned long long mw_lock_start; +#endif }; static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres); @@ -113,7 +111,8 @@ static void ocfs2_dump_meta_lvb_info(u64 level, unsigned int line, struct ocfs2_lock_res *lockres) { - struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb; + struct ocfs2_meta_lvb *lvb = + (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb); mlog(level, "LVB information for %s (called from %s:%u):\n", lockres->l_name, function, line); @@ -259,31 +258,6 @@ static struct ocfs2_lock_res_ops ocfs2_flock_lops = { .flags = 0, }; -/* - * This is the filesystem locking protocol version. - * - * Whenever the filesystem does new things with locks (adds or removes a - * lock, orders them differently, does different things underneath a lock), - * the version must be changed. The protocol is negotiated when joining - * the dlm domain. A node may join the domain if its major version is - * identical to all other nodes and its minor version is greater than - * or equal to all other nodes. When its minor version is greater than - * the other nodes, it will run at the minor version specified by the - * other nodes. - * - * If a locking change is made that will not be compatible with older - * versions, the major number must be increased and the minor version set - * to zero. If a change merely adds a behavior that can be disabled when - * speaking to older versions, the minor version must be increased. If a - * change adds a fully backwards compatible change (eg, LVB changes that - * are just ignored by older versions), the version does not need to be - * updated. - */ -const struct dlm_protocol_version ocfs2_locking_protocol = { - .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, - .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, -}; - static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) { return lockres->l_type == OCFS2_LOCK_TYPE_META || @@ -316,7 +290,7 @@ static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *l static int ocfs2_lock_create(struct ocfs2_super *osb, struct ocfs2_lock_res *lockres, int level, - int dlm_flags); + u32 dlm_flags); static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, int wanted); static void ocfs2_cluster_unlock(struct ocfs2_super *osb, @@ -330,10 +304,9 @@ static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, struct ocfs2_lock_res *lockres); static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, int convert); -#define ocfs2_log_dlm_error(_func, _stat, _lockres) do { \ - mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on " \ - "resource %s: %s\n", dlm_errname(_stat), _func, \ - _lockres->l_name, dlm_errmsg(_stat)); \ +#define ocfs2_log_dlm_error(_func, _err, _lockres) do { \ + mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \ + _err, _func, _lockres->l_name); \ } while (0) static int ocfs2_downconvert_thread(void *arg); static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, @@ -342,12 +315,13 @@ static int ocfs2_inode_lock_update(struct inode *inode, struct buffer_head **bh); static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); static inline int ocfs2_highest_compat_lock_level(int level); -static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, - int new_level); +static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, + int new_level); static int ocfs2_downconvert_lock(struct ocfs2_super *osb, struct ocfs2_lock_res *lockres, int new_level, - int lvb); + int lvb, + unsigned int generation); static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, struct ocfs2_lock_res *lockres); static int ocfs2_cancel_convert(struct ocfs2_super *osb, @@ -396,6 +370,75 @@ static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res) spin_unlock(&ocfs2_dlm_tracking_lock); } +#ifdef CONFIG_OCFS2_FS_STATS +static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) +{ + res->l_lock_num_prmode = 0; + res->l_lock_num_prmode_failed = 0; + res->l_lock_total_prmode = 0; + res->l_lock_max_prmode = 0; + res->l_lock_num_exmode = 0; + res->l_lock_num_exmode_failed = 0; + res->l_lock_total_exmode = 0; + res->l_lock_max_exmode = 0; + res->l_lock_refresh = 0; +} + +static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level, + struct ocfs2_mask_waiter *mw, int ret) +{ + unsigned long long *num, *sum; + unsigned int *max, *failed; + struct timespec ts = current_kernel_time(); + unsigned long long time = timespec_to_ns(&ts) - mw->mw_lock_start; + + if (level == LKM_PRMODE) { + num = &res->l_lock_num_prmode; + sum = &res->l_lock_total_prmode; + max = &res->l_lock_max_prmode; + failed = &res->l_lock_num_prmode_failed; + } else if (level == LKM_EXMODE) { + num = &res->l_lock_num_exmode; + sum = &res->l_lock_total_exmode; + max = &res->l_lock_max_exmode; + failed = &res->l_lock_num_exmode_failed; + } else + return; + + (*num)++; + (*sum) += time; + if (time > *max) + *max = time; + if (ret) + (*failed)++; +} + +static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) +{ + lockres->l_lock_refresh++; +} + +static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) +{ + struct timespec ts = current_kernel_time(); + mw->mw_lock_start = timespec_to_ns(&ts); +} +#else +static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) +{ +} +static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, + int level, struct ocfs2_mask_waiter *mw, int ret) +{ +} +static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) +{ +} +static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) +{ +} +#endif + static void ocfs2_lock_res_init_common(struct ocfs2_super *osb, struct ocfs2_lock_res *res, enum ocfs2_lock_type type, @@ -406,15 +449,17 @@ static void ocfs2_lock_res_init_common(struct ocfs2_super *osb, res->l_ops = ops; res->l_priv = priv; - res->l_level = LKM_IVMODE; - res->l_requested = LKM_IVMODE; - res->l_blocking = LKM_IVMODE; + res->l_level = DLM_LOCK_IV; + res->l_requested = DLM_LOCK_IV; + res->l_blocking = DLM_LOCK_IV; res->l_action = OCFS2_AST_INVALID; res->l_unlock_action = OCFS2_UNLOCK_INVALID; res->l_flags = OCFS2_LOCK_INITIALIZED; ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug); + + ocfs2_init_lock_stats(res); } void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res) @@ -604,10 +649,10 @@ static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres, BUG_ON(!lockres); switch(level) { - case LKM_EXMODE: + case DLM_LOCK_EX: lockres->l_ex_holders++; break; - case LKM_PRMODE: + case DLM_LOCK_PR: lockres->l_ro_holders++; break; default: @@ -625,11 +670,11 @@ static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres, BUG_ON(!lockres); switch(level) { - case LKM_EXMODE: + case DLM_LOCK_EX: BUG_ON(!lockres->l_ex_holders); lockres->l_ex_holders--; break; - case LKM_PRMODE: + case DLM_LOCK_PR: BUG_ON(!lockres->l_ro_holders); lockres->l_ro_holders--; break; @@ -644,12 +689,12 @@ static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres, * lock types are added. */ static inline int ocfs2_highest_compat_lock_level(int level) { - int new_level = LKM_EXMODE; + int new_level = DLM_LOCK_EX; - if (level == LKM_EXMODE) - new_level = LKM_NLMODE; - else if (level == LKM_PRMODE) - new_level = LKM_PRMODE; + if (level == DLM_LOCK_EX) + new_level = DLM_LOCK_NL; + else if (level == DLM_LOCK_PR) + new_level = DLM_LOCK_PR; return new_level; } @@ -688,12 +733,12 @@ static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); - BUG_ON(lockres->l_blocking <= LKM_NLMODE); + BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); lockres->l_level = lockres->l_requested; if (lockres->l_level <= ocfs2_highest_compat_lock_level(lockres->l_blocking)) { - lockres->l_blocking = LKM_NLMODE; + lockres->l_blocking = DLM_LOCK_NL; lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); } lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); @@ -712,7 +757,7 @@ static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lo * information is already up to data. Convert from NL to * *anything* however should mark ourselves as needing an * update */ - if (lockres->l_level == LKM_NLMODE && + if (lockres->l_level == DLM_LOCK_NL && lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); @@ -729,7 +774,7 @@ static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *loc BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY))); BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); - if (lockres->l_requested > LKM_NLMODE && + if (lockres->l_requested > DLM_LOCK_NL && !(lockres->l_flags & OCFS2_LOCK_LOCAL) && lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); @@ -767,6 +812,113 @@ static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, return needs_downconvert; } +/* + * OCFS2_LOCK_PENDING and l_pending_gen. + * + * Why does OCFS2_LOCK_PENDING exist? To close a race between setting + * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock() + * for more details on the race. + * + * OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces + * a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock() + * returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear + * OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns, + * the caller is going to try to clear PENDING again. If nothing else is + * happening, __lockres_clear_pending() sees PENDING is unset and does + * nothing. + * + * But what if another path (eg downconvert thread) has just started a + * new locking action? The other path has re-set PENDING. Our path + * cannot clear PENDING, because that will re-open the original race + * window. + * + * [Example] + * + * ocfs2_meta_lock() + * ocfs2_cluster_lock() + * set BUSY + * set PENDING + * drop l_lock + * ocfs2_dlm_lock() + * ocfs2_locking_ast() ocfs2_downconvert_thread() + * clear PENDING ocfs2_unblock_lock() + * take_l_lock + * !BUSY + * ocfs2_prepare_downconvert() + * set BUSY + * set PENDING + * drop l_lock + * take l_lock + * clear PENDING + * drop l_lock + * + * ocfs2_dlm_lock() + * + * So as you can see, we now have a window where l_lock is not held, + * PENDING is not set, and ocfs2_dlm_lock() has not been called. + * + * The core problem is that ocfs2_cluster_lock() has cleared the PENDING + * set by ocfs2_prepare_downconvert(). That wasn't nice. + * + * To solve this we introduce l_pending_gen. A call to + * lockres_clear_pending() will only do so when it is passed a generation + * number that matches the lockres. lockres_set_pending() will return the + * current generation number. When ocfs2_cluster_lock() goes to clear + * PENDING, it passes the generation it got from set_pending(). In our + * example above, the generation numbers will *not* match. Thus, + * ocfs2_cluster_lock() will not clear the PENDING set by + * ocfs2_prepare_downconvert(). + */ + +/* Unlocked version for ocfs2_locking_ast() */ +static void __lockres_clear_pending(struct ocfs2_lock_res *lockres, + unsigned int generation, + struct ocfs2_super *osb) +{ + assert_spin_locked(&lockres->l_lock); + + /* + * The ast and locking functions can race us here. The winner + * will clear pending, the loser will not. + */ + if (!(lockres->l_flags & OCFS2_LOCK_PENDING) || + (lockres->l_pending_gen != generation)) + return; + + lockres_clear_flags(lockres, OCFS2_LOCK_PENDING); + lockres->l_pending_gen++; + + /* + * The downconvert thread may have skipped us because we + * were PENDING. Wake it up. + */ + if (lockres->l_flags & OCFS2_LOCK_BLOCKED) + ocfs2_wake_downconvert_thread(osb); +} + +/* Locked version for callers of ocfs2_dlm_lock() */ +static void lockres_clear_pending(struct ocfs2_lock_res *lockres, + unsigned int generation, + struct ocfs2_super *osb) +{ + unsigned long flags; + + spin_lock_irqsave(&lockres->l_lock, flags); + __lockres_clear_pending(lockres, generation, osb); + spin_unlock_irqrestore(&lockres->l_lock, flags); +} + +static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres) +{ + assert_spin_locked(&lockres->l_lock); + BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); + + lockres_or_flags(lockres, OCFS2_LOCK_PENDING); + + return lockres->l_pending_gen; +} + + static void ocfs2_blocking_ast(void *opaque, int level) { struct ocfs2_lock_res *lockres = opaque; @@ -774,7 +926,7 @@ static void ocfs2_blocking_ast(void *opaque, int level) int needs_downconvert; unsigned long flags; - BUG_ON(level <= LKM_NLMODE); + BUG_ON(level <= DLM_LOCK_NL); mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n", lockres->l_name, level, lockres->l_level, @@ -801,14 +953,22 @@ static void ocfs2_blocking_ast(void *opaque, int level) static void ocfs2_locking_ast(void *opaque) { struct ocfs2_lock_res *lockres = opaque; - struct dlm_lockstatus *lksb = &lockres->l_lksb; + struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); unsigned long flags; + int status; spin_lock_irqsave(&lockres->l_lock, flags); - if (lksb->status != DLM_NORMAL) { - mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n", - lockres->l_name, lksb->status); + status = ocfs2_dlm_lock_status(&lockres->l_lksb); + + if (status == -EAGAIN) { + lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); + goto out; + } + + if (status) { + mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n", + lockres->l_name, status); spin_unlock_irqrestore(&lockres->l_lock, flags); return; } @@ -831,11 +991,23 @@ static void ocfs2_locking_ast(void *opaque) lockres->l_unlock_action); BUG(); } - +out: /* set it to something invalid so if we get called again we * can catch it. */ lockres->l_action = OCFS2_AST_INVALID; + /* Did we try to cancel this lock? Clear that state */ + if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) + lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; + + /* + * We may have beaten the locking functions here. We certainly + * know that dlm_lock() has been called :-) + * Because we can't have two lock calls in flight at once, we + * can use lockres->l_pending_gen. + */ + __lockres_clear_pending(lockres, lockres->l_pending_gen, osb); + wake_up(&lockres->l_event); spin_unlock_irqrestore(&lockres->l_lock, flags); } @@ -865,15 +1037,15 @@ static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, static int ocfs2_lock_create(struct ocfs2_super *osb, struct ocfs2_lock_res *lockres, int level, - int dlm_flags) + u32 dlm_flags) { int ret = 0; - enum dlm_status status = DLM_NORMAL; unsigned long flags; + unsigned int gen; mlog_entry_void(); - mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level, + mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level, dlm_flags); spin_lock_irqsave(&lockres->l_lock, flags); @@ -886,24 +1058,23 @@ static int ocfs2_lock_create(struct ocfs2_super *osb, lockres->l_action = OCFS2_AST_ATTACH; lockres->l_requested = level; lockres_or_flags(lockres, OCFS2_LOCK_BUSY); + gen = lockres_set_pending(lockres); spin_unlock_irqrestore(&lockres->l_lock, flags); - status = dlmlock(osb->dlm, - level, - &lockres->l_lksb, - dlm_flags, - lockres->l_name, - OCFS2_LOCK_ID_MAX_LEN - 1, - ocfs2_locking_ast, - lockres, - ocfs2_blocking_ast); - if (status != DLM_NORMAL) { - ocfs2_log_dlm_error("dlmlock", status, lockres); - ret = -EINVAL; + ret = ocfs2_dlm_lock(osb->cconn, + level, + &lockres->l_lksb, + dlm_flags, + lockres->l_name, + OCFS2_LOCK_ID_MAX_LEN - 1, + lockres); + lockres_clear_pending(lockres, gen, osb); + if (ret) { + ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); ocfs2_recover_from_dlm_error(lockres, 1); } - mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name); + mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name); bail: mlog_exit(ret); @@ -952,6 +1123,7 @@ static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw) { INIT_LIST_HEAD(&mw->mw_item); init_completion(&mw->mw_complete); + ocfs2_init_start_time(mw); } static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw) @@ -1016,21 +1188,22 @@ static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw, static int ocfs2_cluster_lock(struct ocfs2_super *osb, struct ocfs2_lock_res *lockres, int level, - int lkm_flags, + u32 lkm_flags, int arg_flags) { struct ocfs2_mask_waiter mw; - enum dlm_status status; int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR); int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */ unsigned long flags; + unsigned int gen; + int noqueue_attempted = 0; mlog_entry_void(); ocfs2_init_mask_waiter(&mw); if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) - lkm_flags |= LKM_VALBLK; + lkm_flags |= DLM_LKF_VALBLK; again: wait = 0; @@ -1068,52 +1241,56 @@ again: } if (level > lockres->l_level) { + if (noqueue_attempted > 0) { + ret = -EAGAIN; + goto unlock; + } + if (lkm_flags & DLM_LKF_NOQUEUE) + noqueue_attempted = 1; + if (lockres->l_action != OCFS2_AST_INVALID) mlog(ML_ERROR, "lockres %s has action %u pending\n", lockres->l_name, lockres->l_action); if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { lockres->l_action = OCFS2_AST_ATTACH; - lkm_flags &= ~LKM_CONVERT; + lkm_flags &= ~DLM_LKF_CONVERT; } else { lockres->l_action = OCFS2_AST_CONVERT; - lkm_flags |= LKM_CONVERT; + lkm_flags |= DLM_LKF_CONVERT; } lockres->l_requested = level; lockres_or_flags(lockres, OCFS2_LOCK_BUSY); + gen = lockres_set_pending(lockres); spin_unlock_irqrestore(&lockres->l_lock, flags); - BUG_ON(level == LKM_IVMODE); - BUG_ON(level == LKM_NLMODE); + BUG_ON(level == DLM_LOCK_IV); + BUG_ON(level == DLM_LOCK_NL); mlog(0, "lock %s, convert from %d to level = %d\n", lockres->l_name, lockres->l_level, level); /* call dlm_lock to upgrade lock now */ - status = dlmlock(osb->dlm, - level, - &lockres->l_lksb, - lkm_flags, - lockres->l_name, - OCFS2_LOCK_ID_MAX_LEN - 1, - ocfs2_locking_ast, - lockres, - ocfs2_blocking_ast); - if (status != DLM_NORMAL) { - if ((lkm_flags & LKM_NOQUEUE) && - (status == DLM_NOTQUEUED)) - ret = -EAGAIN; - else { - ocfs2_log_dlm_error("dlmlock", status, - lockres); - ret = -EINVAL; + ret = ocfs2_dlm_lock(osb->cconn, + level, + &lockres->l_lksb, + lkm_flags, + lockres->l_name, + OCFS2_LOCK_ID_MAX_LEN - 1, + lockres); + lockres_clear_pending(lockres, gen, osb); + if (ret) { + if (!(lkm_flags & DLM_LKF_NOQUEUE) || + (ret != -EAGAIN)) { + ocfs2_log_dlm_error("ocfs2_dlm_lock", + ret, lockres); } ocfs2_recover_from_dlm_error(lockres, 1); goto out; } - mlog(0, "lock %s, successfull return from dlmlock\n", + mlog(0, "lock %s, successfull return from ocfs2_dlm_lock\n", lockres->l_name); /* At this point we've gone inside the dlm and need to @@ -1153,6 +1330,7 @@ out: goto again; mlog_errno(ret); } + ocfs2_update_lock_stats(lockres, level, &mw, ret); mlog_exit(ret); return ret; @@ -1177,9 +1355,9 @@ static int ocfs2_create_new_lock(struct ocfs2_super *osb, int ex, int local) { - int level = ex ? LKM_EXMODE : LKM_PRMODE; + int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; unsigned long flags; - int lkm_flags = local ? LKM_LOCAL : 0; + u32 lkm_flags = local ? DLM_LKF_LOCAL : 0; spin_lock_irqsave(&lockres->l_lock, flags); BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); @@ -1222,7 +1400,7 @@ int ocfs2_create_new_inode_locks(struct inode *inode) } /* - * We don't want to use LKM_LOCAL on a meta data lock as they + * We don't want to use DLM_LKF_LOCAL on a meta data lock as they * don't use a generation in their lock names. */ ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0); @@ -1261,7 +1439,7 @@ int ocfs2_rw_lock(struct inode *inode, int write) lockres = &OCFS2_I(inode)->ip_rw_lockres; - level = write ? LKM_EXMODE : LKM_PRMODE; + level = write ? DLM_LOCK_EX : DLM_LOCK_PR; status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0, 0); @@ -1274,7 +1452,7 @@ int ocfs2_rw_lock(struct inode *inode, int write) void ocfs2_rw_unlock(struct inode *inode, int write) { - int level = write ? LKM_EXMODE : LKM_PRMODE; + int level = write ? DLM_LOCK_EX : DLM_LOCK_PR; struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres; struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); @@ -1312,7 +1490,7 @@ int ocfs2_open_lock(struct inode *inode) lockres = &OCFS2_I(inode)->ip_open_lockres; status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, - LKM_PRMODE, 0, 0); + DLM_LOCK_PR, 0, 0); if (status < 0) mlog_errno(status); @@ -1340,16 +1518,16 @@ int ocfs2_try_open_lock(struct inode *inode, int write) lockres = &OCFS2_I(inode)->ip_open_lockres; - level = write ? LKM_EXMODE : LKM_PRMODE; + level = write ? DLM_LOCK_EX : DLM_LOCK_PR; /* * The file system may already holding a PRMODE/EXMODE open lock. - * Since we pass LKM_NOQUEUE, the request won't block waiting on + * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on * other nodes and the -EAGAIN will indicate to the caller that * this inode is still in use. */ status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, - level, LKM_NOQUEUE, 0); + level, DLM_LKF_NOQUEUE, 0); out: mlog_exit(status); @@ -1374,10 +1552,10 @@ void ocfs2_open_unlock(struct inode *inode) if(lockres->l_ro_holders) ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, - LKM_PRMODE); + DLM_LOCK_PR); if(lockres->l_ex_holders) ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, - LKM_EXMODE); + DLM_LOCK_EX); out: mlog_exit_void(); @@ -1453,8 +1631,8 @@ out: */ int ocfs2_file_lock(struct file *file, int ex, int trylock) { - int ret, level = ex ? LKM_EXMODE : LKM_PRMODE; - unsigned int lkm_flags = trylock ? LKM_NOQUEUE : 0; + int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; + unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0; unsigned long flags; struct ocfs2_file_private *fp = file->private_data; struct ocfs2_lock_res *lockres = &fp->fp_flock; @@ -1464,7 +1642,7 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock) ocfs2_init_mask_waiter(&mw); if ((lockres->l_flags & OCFS2_LOCK_BUSY) || - (lockres->l_level > LKM_NLMODE)) { + (lockres->l_level > DLM_LOCK_NL)) { mlog(ML_ERROR, "File lock \"%s\" has busy or locked state: flags: 0x%lx, " "level: %u\n", lockres->l_name, lockres->l_flags, @@ -1481,7 +1659,7 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock) * Get the lock at NLMODE to start - that way we * can cancel the upconvert request if need be. */ - ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0); + ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0); if (ret < 0) { mlog_errno(ret); goto out; @@ -1496,21 +1674,19 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock) } lockres->l_action = OCFS2_AST_CONVERT; - lkm_flags |= LKM_CONVERT; + lkm_flags |= DLM_LKF_CONVERT; lockres->l_requested = level; lockres_or_flags(lockres, OCFS2_LOCK_BUSY); lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); spin_unlock_irqrestore(&lockres->l_lock, flags); - ret = dlmlock(osb->dlm, level, &lockres->l_lksb, lkm_flags, - lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1, - ocfs2_locking_ast, lockres, ocfs2_blocking_ast); - if (ret != DLM_NORMAL) { - if (trylock && ret == DLM_NOTQUEUED) - ret = -EAGAIN; - else { - ocfs2_log_dlm_error("dlmlock", ret, lockres); + ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags, + lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1, + lockres); + if (ret) { + if (!trylock || (ret != -EAGAIN)) { + ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); ret = -EINVAL; } @@ -1537,6 +1713,10 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock) * to just bubble sucess back up to the user. */ ret = ocfs2_flock_handle_signal(lockres, level); + } else if (!ret && (level > lockres->l_level)) { + /* Trylock failed asynchronously */ + BUG_ON(!trylock); + ret = -EAGAIN; } out: @@ -1549,6 +1729,7 @@ out: void ocfs2_file_unlock(struct file *file) { int ret; + unsigned int gen; unsigned long flags; struct ocfs2_file_private *fp = file->private_data; struct ocfs2_lock_res *lockres = &fp->fp_flock; @@ -1560,7 +1741,7 @@ void ocfs2_file_unlock(struct file *file) if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) return; - if (lockres->l_level == LKM_NLMODE) + if (lockres->l_level == DLM_LOCK_NL) return; mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n", @@ -1572,13 +1753,13 @@ void ocfs2_file_unlock(struct file *file) * Fake a blocking ast for the downconvert code. */ lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); - lockres->l_blocking = LKM_EXMODE; + lockres->l_blocking = DLM_LOCK_EX; - ocfs2_prepare_downconvert(lockres, LKM_NLMODE); + gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL); lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); spin_unlock_irqrestore(&lockres->l_lock, flags); - ret = ocfs2_downconvert_lock(osb, lockres, LKM_NLMODE, 0); + ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen); if (ret) { mlog_errno(ret); return; @@ -1601,11 +1782,11 @@ static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, * condition. */ if (lockres->l_flags & OCFS2_LOCK_BLOCKED) { switch(lockres->l_blocking) { - case LKM_EXMODE: + case DLM_LOCK_EX: if (!lockres->l_ex_holders && !lockres->l_ro_holders) kick = 1; break; - case LKM_PRMODE: + case DLM_LOCK_PR: if (!lockres->l_ex_holders) kick = 1; break; @@ -1648,7 +1829,7 @@ static void __ocfs2_stuff_meta_lvb(struct inode *inode) mlog_entry_void(); - lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb; + lvb = (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb); /* * Invalidate the LVB of a deleted inode - this way other @@ -1700,7 +1881,7 @@ static void ocfs2_refresh_inode_from_lvb(struct inode *inode) mlog_meta_lvb(0, lockres); - lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb; + lvb = (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb); /* We're safe here without the lockres lock... */ spin_lock(&oi->ip_lock); @@ -1735,7 +1916,8 @@ static void ocfs2_refresh_inode_from_lvb(struct inode *inode) static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, struct ocfs2_lock_res *lockres) { - struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb; + struct ocfs2_meta_lvb *lvb = + (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb); if (lvb->lvb_version == OCFS2_LVB_VERSION && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation) @@ -1878,6 +2060,7 @@ static int ocfs2_inode_lock_update(struct inode *inode, le32_to_cpu(fe->i_flags)); ocfs2_refresh_inode(inode, fe); + ocfs2_track_lock_refresh(lockres); } status = 0; @@ -1923,7 +2106,8 @@ int ocfs2_inode_lock_full(struct inode *inode, int ex, int arg_flags) { - int status, level, dlm_flags, acquired; + int status, level, acquired; + u32 dlm_flags; struct ocfs2_lock_res *lockres = NULL; struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); struct buffer_head *local_bh = NULL; @@ -1950,14 +2134,13 @@ int ocfs2_inode_lock_full(struct inode *inode, goto local; if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) - wait_event(osb->recovery_event, - ocfs2_node_map_is_empty(osb, &osb->recovery_map)); + ocfs2_wait_for_recovery(osb); lockres = &OCFS2_I(inode)->ip_inode_lockres; - level = ex ? LKM_EXMODE : LKM_PRMODE; + level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; dlm_flags = 0; if (arg_flags & OCFS2_META_LOCK_NOQUEUE) - dlm_flags |= LKM_NOQUEUE; + dlm_flags |= DLM_LKF_NOQUEUE; status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags); if (status < 0) { @@ -1974,8 +2157,7 @@ int ocfs2_inode_lock_full(struct inode *inode, * committed to owning this lock so we don't allow signals to * abort the operation. */ if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) - wait_event(osb->recovery_event, - ocfs2_node_map_is_empty(osb, &osb->recovery_map)); + ocfs2_wait_for_recovery(osb); local: /* @@ -2109,7 +2291,7 @@ int ocfs2_inode_lock_atime(struct inode *inode, void ocfs2_inode_unlock(struct inode *inode, int ex) { - int level = ex ? LKM_EXMODE : LKM_PRMODE; + int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres; struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); @@ -2130,10 +2312,8 @@ int ocfs2_super_lock(struct ocfs2_super *osb, int ex) { int status = 0; - int level = ex ? LKM_EXMODE : LKM_PRMODE; + int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; - struct buffer_head *bh; - struct ocfs2_slot_info *si = osb->slot_info; mlog_entry_void(); @@ -2159,16 +2339,13 @@ int ocfs2_super_lock(struct ocfs2_super *osb, goto bail; } if (status) { - bh = si->si_bh; - status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0, - si->si_inode); - if (status == 0) - ocfs2_update_slot_info(si); + status = ocfs2_refresh_slot_info(osb); ocfs2_complete_lock_res_refresh(lockres, status); if (status < 0) mlog_errno(status); + ocfs2_track_lock_refresh(lockres); } bail: mlog_exit(status); @@ -2178,7 +2355,7 @@ bail: void ocfs2_super_unlock(struct ocfs2_super *osb, int ex) { - int level = ex ? LKM_EXMODE : LKM_PRMODE; + int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; if (!ocfs2_mount_local(osb)) @@ -2196,7 +2373,7 @@ int ocfs2_rename_lock(struct ocfs2_super *osb) if (ocfs2_mount_local(osb)) return 0; - status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0); + status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); if (status < 0) mlog_errno(status); @@ -2208,13 +2385,13 @@ void ocfs2_rename_unlock(struct ocfs2_super *osb) struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; if (!ocfs2_mount_local(osb)) - ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE); + ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); } int ocfs2_dentry_lock(struct dentry *dentry, int ex) { int ret; - int level = ex ? LKM_EXMODE : LKM_PRMODE; + int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; struct ocfs2_dentry_lock *dl = dentry->d_fsdata; struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); @@ -2235,7 +2412,7 @@ int ocfs2_dentry_lock(struct dentry *dentry, int ex) void ocfs2_dentry_unlock(struct dentry *dentry, int ex) { - int level = ex ? LKM_EXMODE : LKM_PRMODE; + int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; struct ocfs2_dentry_lock *dl = dentry->d_fsdata; struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); @@ -2363,7 +2540,7 @@ static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) } /* So that debugfs.ocfs2 can determine which format is being used */ -#define OCFS2_DLM_DEBUG_STR_VERSION 1 +#define OCFS2_DLM_DEBUG_STR_VERSION 2 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) { int i; @@ -2400,16 +2577,57 @@ static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) lockres->l_blocking); /* Dump the raw LVB */ - lvb = lockres->l_lksb.lvb; + lvb = ocfs2_dlm_lvb(&lockres->l_lksb); for(i = 0; i < DLM_LVB_LEN; i++) seq_printf(m, "0x%x\t", lvb[i]); +#ifdef CONFIG_OCFS2_FS_STATS +# define lock_num_prmode(_l) (_l)->l_lock_num_prmode +# define lock_num_exmode(_l) (_l)->l_lock_num_exmode +# define lock_num_prmode_failed(_l) (_l)->l_lock_num_prmode_failed +# define lock_num_exmode_failed(_l) (_l)->l_lock_num_exmode_failed +# define lock_total_prmode(_l) (_l)->l_lock_total_prmode +# define lock_total_exmode(_l) (_l)->l_lock_total_exmode +# define lock_max_prmode(_l) (_l)->l_lock_max_prmode +# define lock_max_exmode(_l) (_l)->l_lock_max_exmode +# define lock_refresh(_l) (_l)->l_lock_refresh +#else +# define lock_num_prmode(_l) (0ULL) +# define lock_num_exmode(_l) (0ULL) +# define lock_num_prmode_failed(_l) (0) +# define lock_num_exmode_failed(_l) (0) +# define lock_total_prmode(_l) (0ULL) +# define lock_total_exmode(_l) (0ULL) +# define lock_max_prmode(_l) (0) +# define lock_max_exmode(_l) (0) +# define lock_refresh(_l) (0) +#endif + /* The following seq_print was added in version 2 of this output */ + seq_printf(m, "%llu\t" + "%llu\t" + "%u\t" + "%u\t" + "%llu\t" + "%llu\t" + "%u\t" + "%u\t" + "%u\t", + lock_num_prmode(lockres), + lock_num_exmode(lockres), + lock_num_prmode_failed(lockres), + lock_num_exmode_failed(lockres), + lock_total_prmode(lockres), + lock_total_exmode(lockres), + lock_max_prmode(lockres), + lock_max_exmode(lockres), + lock_refresh(lockres)); + /* End the line */ seq_printf(m, "\n"); return 0; } -static struct seq_operations ocfs2_dlm_seq_ops = { +static const struct seq_operations ocfs2_dlm_seq_ops = { .start = ocfs2_dlm_seq_start, .stop = ocfs2_dlm_seq_stop, .next = ocfs2_dlm_seq_next, @@ -2504,13 +2722,14 @@ static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb) int ocfs2_dlm_init(struct ocfs2_super *osb) { int status = 0; - u32 dlm_key; - struct dlm_ctxt *dlm = NULL; + struct ocfs2_cluster_connection *conn = NULL; mlog_entry_void(); - if (ocfs2_mount_local(osb)) + if (ocfs2_mount_local(osb)) { + osb->node_num = 0; goto local; + } status = ocfs2_dlm_init_debug(osb); if (status < 0) { @@ -2527,26 +2746,31 @@ int ocfs2_dlm_init(struct ocfs2_super *osb) goto bail; } - /* used by the dlm code to make message headers unique, each - * node in this domain must agree on this. */ - dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str)); - /* for now, uuid == domain */ - dlm = dlm_register_domain(osb->uuid_str, dlm_key, - &osb->osb_locking_proto); - if (IS_ERR(dlm)) { - status = PTR_ERR(dlm); + status = ocfs2_cluster_connect(osb->osb_cluster_stack, + osb->uuid_str, + strlen(osb->uuid_str), + ocfs2_do_node_down, osb, + &conn); + if (status) { mlog_errno(status); goto bail; } - dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb); + status = ocfs2_cluster_this_node(&osb->node_num); + if (status < 0) { + mlog_errno(status); + mlog(ML_ERROR, + "could not find this host's node number\n"); + ocfs2_cluster_disconnect(conn, 0); + goto bail; + } local: ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); - osb->dlm = dlm; + osb->cconn = conn; status = 0; bail: @@ -2560,14 +2784,19 @@ bail: return status; } -void ocfs2_dlm_shutdown(struct ocfs2_super *osb) +void ocfs2_dlm_shutdown(struct ocfs2_super *osb, + int hangup_pending) { mlog_entry_void(); - dlm_unregister_eviction_cb(&osb->osb_eviction_cb); - ocfs2_drop_osb_locks(osb); + /* + * Now that we have dropped all locks and ocfs2_dismount_volume() + * has disabled recovery, the DLM won't be talking to us. It's + * safe to tear things down before disconnecting the cluster. + */ + if (osb->dc_task) { kthread_stop(osb->dc_task); osb->dc_task = NULL; @@ -2576,15 +2805,15 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb) ocfs2_lock_res_free(&osb->osb_super_lockres); ocfs2_lock_res_free(&osb->osb_rename_lockres); - dlm_unregister_domain(osb->dlm); - osb->dlm = NULL; + ocfs2_cluster_disconnect(osb->cconn, hangup_pending); + osb->cconn = NULL; ocfs2_dlm_shutdown_debug(osb); mlog_exit_void(); } -static void ocfs2_unlock_ast(void *opaque, enum dlm_status status) +static void ocfs2_unlock_ast(void *opaque, int error) { struct ocfs2_lock_res *lockres = opaque; unsigned long flags; @@ -2595,24 +2824,9 @@ static void ocfs2_unlock_ast(void *opaque, enum dlm_status status) lockres->l_unlock_action); spin_lock_irqsave(&lockres->l_lock, flags); - /* We tried to cancel a convert request, but it was already - * granted. All we want to do here is clear our unlock - * state. The wake_up call done at the bottom is redundant - * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't - * hurt anything anyway */ - if (status == DLM_CANCELGRANT && - lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { - mlog(0, "Got cancelgrant for %s\n", lockres->l_name); - - /* We don't clear the busy flag in this case as it - * should have been cleared by the ast which the dlm - * has called. */ - goto complete_unlock; - } - - if (status != DLM_NORMAL) { - mlog(ML_ERROR, "Dlm passes status %d for lock %s, " - "unlock_action %d\n", status, lockres->l_name, + if (error) { + mlog(ML_ERROR, "Dlm passes error %d for lock %s, " + "unlock_action %d\n", error, lockres->l_name, lockres->l_unlock_action); spin_unlock_irqrestore(&lockres->l_lock, flags); return; @@ -2624,14 +2838,13 @@ static void ocfs2_unlock_ast(void *opaque, enum dlm_status status) lockres->l_action = OCFS2_AST_INVALID; break; case OCFS2_UNLOCK_DROP_LOCK: - lockres->l_level = LKM_IVMODE; + lockres->l_level = DLM_LOCK_IV; break; default: BUG(); } lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); -complete_unlock: lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; spin_unlock_irqrestore(&lockres->l_lock, flags); @@ -2643,16 +2856,16 @@ complete_unlock: static int ocfs2_drop_lock(struct ocfs2_super *osb, struct ocfs2_lock_res *lockres) { - enum dlm_status status; + int ret; unsigned long flags; - int lkm_flags = 0; + u32 lkm_flags = 0; /* We didn't get anywhere near actually using this lockres. */ if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) goto out; if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) - lkm_flags |= LKM_VALBLK; + lkm_flags |= DLM_LKF_VALBLK; spin_lock_irqsave(&lockres->l_lock, flags); @@ -2678,7 +2891,7 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb, if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { if (lockres->l_flags & OCFS2_LOCK_ATTACHED && - lockres->l_level == LKM_EXMODE && + lockres->l_level == DLM_LOCK_EX && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) lockres->l_ops->set_lvb(lockres); } @@ -2707,15 +2920,15 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb, mlog(0, "lock %s\n", lockres->l_name); - status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags, - ocfs2_unlock_ast, lockres); - if (status != DLM_NORMAL) { - ocfs2_log_dlm_error("dlmunlock", status, lockres); + ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags, + lockres); + if (ret) { + ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); - dlm_print_one_lock(lockres->l_lksb.lockid); + ocfs2_dlm_dump_lksb(&lockres->l_lksb); BUG(); } - mlog(0, "lock %s, successfull return from dlmunlock\n", + mlog(0, "lock %s, successfull return from ocfs2_dlm_unlock\n", lockres->l_name); ocfs2_wait_on_busy_lock(lockres); @@ -2806,15 +3019,15 @@ int ocfs2_drop_inode_locks(struct inode *inode) return status; } -static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, - int new_level) +static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, + int new_level) { assert_spin_locked(&lockres->l_lock); - BUG_ON(lockres->l_blocking <= LKM_NLMODE); + BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); if (lockres->l_level <= new_level) { - mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n", + mlog(ML_ERROR, "lockres->l_level (%d) <= new_level (%d)\n", lockres->l_level, new_level); BUG(); } @@ -2825,33 +3038,33 @@ static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, lockres->l_action = OCFS2_AST_DOWNCONVERT; lockres->l_requested = new_level; lockres_or_flags(lockres, OCFS2_LOCK_BUSY); + return lockres_set_pending(lockres); } static int ocfs2_downconvert_lock(struct ocfs2_super *osb, struct ocfs2_lock_res *lockres, int new_level, - int lvb) + int lvb, + unsigned int generation) { - int ret, dlm_flags = LKM_CONVERT; - enum dlm_status status; + int ret; + u32 dlm_flags = DLM_LKF_CONVERT; mlog_entry_void(); if (lvb) - dlm_flags |= LKM_VALBLK; - - status = dlmlock(osb->dlm, - new_level, - &lockres->l_lksb, - dlm_flags, - lockres->l_name, - OCFS2_LOCK_ID_MAX_LEN - 1, - ocfs2_locking_ast, - lockres, - ocfs2_blocking_ast); - if (status != DLM_NORMAL) { - ocfs2_log_dlm_error("dlmlock", status, lockres); - ret = -EINVAL; + dlm_flags |= DLM_LKF_VALBLK; + + ret = ocfs2_dlm_lock(osb->cconn, + new_level, + &lockres->l_lksb, + dlm_flags, + lockres->l_name, + OCFS2_LOCK_ID_MAX_LEN - 1, + lockres); + lockres_clear_pending(lockres, generation, osb); + if (ret) { + ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); ocfs2_recover_from_dlm_error(lockres, 1); goto bail; } @@ -2862,7 +3075,7 @@ bail: return ret; } -/* returns 1 when the caller should unlock and call dlmunlock */ +/* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */ static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, struct ocfs2_lock_res *lockres) { @@ -2898,24 +3111,18 @@ static int ocfs2_cancel_convert(struct ocfs2_super *osb, struct ocfs2_lock_res *lockres) { int ret; - enum dlm_status status; mlog_entry_void(); mlog(0, "lock %s\n", lockres->l_name); - ret = 0; - status = dlmunlock(osb->dlm, - &lockres->l_lksb, - LKM_CANCEL, - ocfs2_unlock_ast, - lockres); - if (status != DLM_NORMAL) { - ocfs2_log_dlm_error("dlmunlock", status, lockres); - ret = -EINVAL; + ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, + DLM_LKF_CANCEL, lockres); + if (ret) { + ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); ocfs2_recover_from_dlm_error(lockres, 0); } - mlog(0, "lock %s return from dlmunlock\n", lockres->l_name); + mlog(0, "lock %s return from ocfs2_dlm_unlock\n", lockres->l_name); mlog_exit(ret); return ret; @@ -2930,6 +3137,7 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb, int new_level; int ret = 0; int set_lvb = 0; + unsigned int gen; mlog_entry_void(); @@ -2939,6 +3147,32 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb, recheck: if (lockres->l_flags & OCFS2_LOCK_BUSY) { + /* XXX + * This is a *big* race. The OCFS2_LOCK_PENDING flag + * exists entirely for one reason - another thread has set + * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock(). + * + * If we do ocfs2_cancel_convert() before the other thread + * calls dlm_lock(), our cancel will do nothing. We will + * get no ast, and we will have no way of knowing the + * cancel failed. Meanwhile, the other thread will call + * into dlm_lock() and wait...forever. + * + * Why forever? Because another node has asked for the + * lock first; that's why we're here in unblock_lock(). + * + * The solution is OCFS2_LOCK_PENDING. When PENDING is + * set, we just requeue the unblock. Only when the other + * thread has called dlm_lock() and cleared PENDING will + * we then cancel their request. + * + * All callers of dlm_lock() must set OCFS2_DLM_PENDING + * at the same time they set OCFS2_DLM_BUSY. They must + * clear OCFS2_DLM_PENDING after dlm_lock() returns. + */ + if (lockres->l_flags & OCFS2_LOCK_PENDING) + goto leave_requeue; + ctl->requeue = 1; ret = ocfs2_prepare_cancel_convert(osb, lockres); spin_unlock_irqrestore(&lockres->l_lock, flags); @@ -2952,13 +3186,13 @@ recheck: /* if we're blocking an exclusive and we have *any* holders, * then requeue. */ - if ((lockres->l_blocking == LKM_EXMODE) + if ((lockres->l_blocking == DLM_LOCK_EX) && (lockres->l_ex_holders || lockres->l_ro_holders)) goto leave_requeue; /* If it's a PR we're blocking, then only * requeue if we've got any EX holders */ - if (lockres->l_blocking == LKM_PRMODE && + if (lockres->l_blocking == DLM_LOCK_PR && lockres->l_ex_holders) goto leave_requeue; @@ -3005,7 +3239,7 @@ downconvert: ctl->requeue = 0; if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { - if (lockres->l_level == LKM_EXMODE) + if (lockres->l_level == DLM_LOCK_EX) set_lvb = 1; /* @@ -3018,9 +3252,11 @@ downconvert: lockres->l_ops->set_lvb(lockres); } - ocfs2_prepare_downconvert(lockres, new_level); + gen = ocfs2_prepare_downconvert(lockres, new_level); spin_unlock_irqrestore(&lockres->l_lock, flags); - ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb); + ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb, + gen); + leave: mlog_exit(ret); return ret; @@ -3059,7 +3295,7 @@ static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, (unsigned long long)OCFS2_I(inode)->ip_blkno); } sync_mapping_buffers(mapping); - if (blocking == LKM_EXMODE) { + if (blocking == DLM_LOCK_EX) { truncate_inode_pages(mapping, 0); } else { /* We only need to wait on the I/O if we're not also @@ -3080,8 +3316,8 @@ static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, struct inode *inode = ocfs2_lock_res_inode(lockres); int checkpointed = ocfs2_inode_fully_checkpointed(inode); - BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE); - BUG_ON(lockres->l_level != LKM_EXMODE && !checkpointed); + BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR); + BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed); if (checkpointed) return 1; @@ -3145,7 +3381,7 @@ static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, * valid. The downconvert code will retain a PR for this node, * so there's no further work to do. */ - if (blocking == LKM_PRMODE) + if (blocking == DLM_LOCK_PR) return UNBLOCK_CONTINUE; /* @@ -3219,6 +3455,45 @@ static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, return UNBLOCK_CONTINUE_POST; } +/* + * This is the filesystem locking protocol. It provides the lock handling + * hooks for the underlying DLM. It has a maximum version number. + * The version number allows interoperability with systems running at + * the same major number and an equal or smaller minor number. + * + * Whenever the filesystem does new things with locks (adds or removes a + * lock, orders them differently, does different things underneath a lock), + * the version must be changed. The protocol is negotiated when joining + * the dlm domain. A node may join the domain if its major version is + * identical to all other nodes and its minor version is greater than + * or equal to all other nodes. When its minor version is greater than + * the other nodes, it will run at the minor version specified by the + * other nodes. + * + * If a locking change is made that will not be compatible with older + * versions, the major number must be increased and the minor version set + * to zero. If a change merely adds a behavior that can be disabled when + * speaking to older versions, the minor version must be increased. If a + * change adds a fully backwards compatible change (eg, LVB changes that + * are just ignored by older versions), the version does not need to be + * updated. + */ +static struct ocfs2_locking_protocol lproto = { + .lp_max_version = { + .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, + .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, + }, + .lp_lock_ast = ocfs2_locking_ast, + .lp_blocking_ast = ocfs2_blocking_ast, + .lp_unlock_ast = ocfs2_unlock_ast, +}; + +void ocfs2_set_locking_protocol(void) +{ + ocfs2_stack_glue_set_locking_protocol(&lproto); +} + + static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, struct ocfs2_lock_res *lockres) {