Merge branch 'for-next' into for-linus
[safe/jmp/linux-2.6] / fs / ocfs2 / dlmglue.c
index 6e6cc0a..50c4ee8 100644 (file)
@@ -32,6 +32,7 @@
 #include <linux/debugfs.h>
 #include <linux/seq_file.h>
 #include <linux/time.h>
+#include <linux/quotaops.h>
 
 #define MLOG_MASK_PREFIX ML_DLM_GLUE
 #include <cluster/masklog.h>
@@ -51,6 +52,8 @@
 #include "slot_map.h"
 #include "super.h"
 #include "uptodate.h"
+#include "quota.h"
+#include "refcounttree.h"
 
 #include "buffer_head_io.h"
 
@@ -68,6 +71,7 @@ struct ocfs2_mask_waiter {
 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
+static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres);
 
 /*
  * Return value from ->downconvert_worker functions.
@@ -89,6 +93,9 @@ struct ocfs2_unblock_ctl {
        enum ocfs2_unblock_action unblock_action;
 };
 
+/* Lockdep class keys */
+struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES];
+
 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
                                        int new_level);
 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);
@@ -102,6 +109,12 @@ static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
                                     struct ocfs2_lock_res *lockres);
 
+static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres);
+
+static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
+                                           int new_level);
+static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
+                                        int blocking);
 
 #define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)
 
@@ -111,8 +124,7 @@ static void ocfs2_dump_meta_lvb_info(u64 level,
                                     unsigned int line,
                                     struct ocfs2_lock_res *lockres)
 {
-       struct ocfs2_meta_lvb *lvb =
-               (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);
+       struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
 
        mlog(level, "LVB information for %s (called from %s:%u):\n",
             lockres->l_name, function, line);
@@ -241,6 +253,14 @@ static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
        .flags          = 0,
 };
 
+static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = {
+       .flags          = 0,
+};
+
+static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = {
+       .flags          = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
+};
+
 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
        .get_osb        = ocfs2_get_dentry_osb,
        .post_unlock    = ocfs2_dentry_post_unlock,
@@ -258,6 +278,18 @@ static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
        .flags          = 0,
 };
 
+static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = {
+       .set_lvb        = ocfs2_set_qinfo_lvb,
+       .get_osb        = ocfs2_get_qinfo_osb,
+       .flags          = LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB,
+};
+
+static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = {
+       .check_downconvert = ocfs2_check_refcount_downconvert,
+       .downconvert_worker = ocfs2_refcount_convert_worker,
+       .flags          = 0,
+};
+
 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
 {
        return lockres->l_type == OCFS2_LOCK_TYPE_META ||
@@ -265,6 +297,11 @@ static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
                lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
 }
 
+static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
+{
+       return container_of(lksb, struct ocfs2_lock_res, l_lksb);
+}
+
 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
 {
        BUG_ON(!ocfs2_is_inode_lock(lockres));
@@ -279,6 +316,19 @@ static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res
        return (struct ocfs2_dentry_lock *)lockres->l_priv;
 }
 
+static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres)
+{
+       BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO);
+
+       return (struct ocfs2_mem_dqinfo *)lockres->l_priv;
+}
+
+static inline struct ocfs2_refcount_tree *
+ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res)
+{
+       return container_of(res, struct ocfs2_refcount_tree, rf_lockres);
+}
+
 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
 {
        if (lockres->l_ops->get_osb)
@@ -293,9 +343,16 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
                             u32 dlm_flags);
 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
                                                     int wanted);
-static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
-                                struct ocfs2_lock_res *lockres,
-                                int level);
+static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
+                                  struct ocfs2_lock_res *lockres,
+                                  int level, unsigned long caller_ip);
+static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb,
+                                       struct ocfs2_lock_res *lockres,
+                                       int level)
+{
+       __ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_);
+}
+
 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
@@ -304,9 +361,14 @@ static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
                                        struct ocfs2_lock_res *lockres);
 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
                                                int convert);
-#define ocfs2_log_dlm_error(_func, _err, _lockres) do {                        \
-       mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \
-            _err, _func, _lockres->l_name);                            \
+#define ocfs2_log_dlm_error(_func, _err, _lockres) do {                                        \
+       if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY)                               \
+               mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n",        \
+                    _err, _func, _lockres->l_name);                                    \
+       else                                                                            \
+               mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n",  \
+                    _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name,  \
+                    (unsigned int)ocfs2_get_dentry_lock_ino(_lockres));                \
 } while (0)
 static int ocfs2_downconvert_thread(void *arg);
 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
@@ -460,6 +522,13 @@ static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
        ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
 
        ocfs2_init_lock_stats(res);
+#ifdef CONFIG_DEBUG_LOCK_ALLOC
+       if (type != OCFS2_LOCK_TYPE_OPEN)
+               lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type],
+                                &lockdep_keys[type], 0);
+       else
+               res->l_lockdep_map.key = NULL;
+#endif
 }
 
 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
@@ -507,6 +576,13 @@ static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
        return OCFS2_SB(inode->i_sb);
 }
 
+static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres)
+{
+       struct ocfs2_mem_dqinfo *info = lockres->l_priv;
+
+       return OCFS2_SB(info->dqi_gi.dqi_sb);
+}
+
 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
 {
        struct ocfs2_file_private *fp = lockres->l_priv;
@@ -594,6 +670,26 @@ static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
                                   &ocfs2_rename_lops, osb);
 }
 
+static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res,
+                                        struct ocfs2_super *osb)
+{
+       /* nfs_sync lockres doesn't come from a slab so we call init
+        * once on it manually.  */
+       ocfs2_lock_res_init_once(res);
+       ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name);
+       ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC,
+                                  &ocfs2_nfs_sync_lops, osb);
+}
+
+static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res,
+                                           struct ocfs2_super *osb)
+{
+       ocfs2_lock_res_init_once(res);
+       ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name);
+       ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN,
+                                  &ocfs2_orphan_scan_lops, osb);
+}
+
 void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
                              struct ocfs2_file_private *fp)
 {
@@ -609,6 +705,28 @@ void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
        lockres->l_flags |= OCFS2_LOCK_NOCACHE;
 }
 
+void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres,
+                              struct ocfs2_mem_dqinfo *info)
+{
+       ocfs2_lock_res_init_once(lockres);
+       ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type,
+                             0, lockres->l_name);
+       ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres,
+                                  OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops,
+                                  info);
+}
+
+void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres,
+                                 struct ocfs2_super *osb, u64 ref_blkno,
+                                 unsigned int generation)
+{
+       ocfs2_lock_res_init_once(lockres);
+       ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno,
+                             generation, lockres->l_name);
+       ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT,
+                                  &ocfs2_refcount_block_lops, osb);
+}
+
 void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
 {
        mlog_entry_void();
@@ -762,6 +880,14 @@ static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lo
                lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
 
        lockres->l_level = lockres->l_requested;
+
+       /*
+        * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing
+        * the OCFS2_LOCK_BUSY flag to prevent the dc thread from
+        * downconverting the lock before the upconvert has fully completed.
+        */
+       lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
+
        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
 
        mlog_exit_void();
@@ -794,8 +920,6 @@ static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
 
        assert_spin_locked(&lockres->l_lock);
 
-       lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
-
        if (level > lockres->l_blocking) {
                /* only schedule a downconvert if we haven't already scheduled
                 * one that goes low enough to satisfy the level we're
@@ -808,6 +932,13 @@ static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
                lockres->l_blocking = level;
        }
 
+       mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n",
+            lockres->l_name, level, lockres->l_level, lockres->l_blocking,
+            needs_downconvert);
+
+       if (needs_downconvert)
+               lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
+
        mlog_exit(needs_downconvert);
        return needs_downconvert;
 }
@@ -918,18 +1049,17 @@ static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
        return lockres->l_pending_gen;
 }
 
-
-static void ocfs2_blocking_ast(void *opaque, int level)
+static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
 {
-       struct ocfs2_lock_res *lockres = opaque;
+       struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
        struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
        int needs_downconvert;
        unsigned long flags;
 
        BUG_ON(level <= DLM_LOCK_NL);
 
-       mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
-            lockres->l_name, level, lockres->l_level,
+       mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, "
+            "type %s\n", lockres->l_name, level, lockres->l_level,
             ocfs2_lock_type_string(lockres->l_type));
 
        /*
@@ -950,9 +1080,9 @@ static void ocfs2_blocking_ast(void *opaque, int level)
        ocfs2_wake_downconvert_thread(osb);
 }
 
-static void ocfs2_locking_ast(void *opaque)
+static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb)
 {
-       struct ocfs2_lock_res *lockres = opaque;
+       struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
        struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
        unsigned long flags;
        int status;
@@ -973,6 +1103,10 @@ static void ocfs2_locking_ast(void *opaque)
                return;
        }
 
+       mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, "
+            "level %d => %d\n", lockres->l_name, lockres->l_action,
+            lockres->l_unlock_action, lockres->l_level, lockres->l_requested);
+
        switch(lockres->l_action) {
        case OCFS2_AST_ATTACH:
                ocfs2_generic_handle_attach_action(lockres);
@@ -985,8 +1119,8 @@ static void ocfs2_locking_ast(void *opaque)
                ocfs2_generic_handle_downconvert_action(lockres);
                break;
        default:
-               mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
-                    "lockres flags = 0x%lx, unlock action: %u\n",
+               mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, "
+                    "flags 0x%lx, unlock: %u\n",
                     lockres->l_name, lockres->l_action, lockres->l_flags,
                     lockres->l_unlock_action);
                BUG();
@@ -1012,6 +1146,88 @@ out:
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 }
 
+static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
+{
+       struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
+       unsigned long flags;
+
+       mlog_entry_void();
+
+       mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n",
+            lockres->l_name, lockres->l_unlock_action);
+
+       spin_lock_irqsave(&lockres->l_lock, flags);
+       if (error) {
+               mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
+                    "unlock_action %d\n", error, lockres->l_name,
+                    lockres->l_unlock_action);
+               spin_unlock_irqrestore(&lockres->l_lock, flags);
+               mlog_exit_void();
+               return;
+       }
+
+       switch(lockres->l_unlock_action) {
+       case OCFS2_UNLOCK_CANCEL_CONVERT:
+               mlog(0, "Cancel convert success for %s\n", lockres->l_name);
+               lockres->l_action = OCFS2_AST_INVALID;
+               /* Downconvert thread may have requeued this lock, we
+                * need to wake it. */
+               if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
+                       ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
+               break;
+       case OCFS2_UNLOCK_DROP_LOCK:
+               lockres->l_level = DLM_LOCK_IV;
+               break;
+       default:
+               BUG();
+       }
+
+       lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
+       lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
+       wake_up(&lockres->l_event);
+       spin_unlock_irqrestore(&lockres->l_lock, flags);
+
+       mlog_exit_void();
+}
+
+/*
+ * This is the filesystem locking protocol.  It provides the lock handling
+ * hooks for the underlying DLM.  It has a maximum version number.
+ * The version number allows interoperability with systems running at
+ * the same major number and an equal or smaller minor number.
+ *
+ * Whenever the filesystem does new things with locks (adds or removes a
+ * lock, orders them differently, does different things underneath a lock),
+ * the version must be changed.  The protocol is negotiated when joining
+ * the dlm domain.  A node may join the domain if its major version is
+ * identical to all other nodes and its minor version is greater than
+ * or equal to all other nodes.  When its minor version is greater than
+ * the other nodes, it will run at the minor version specified by the
+ * other nodes.
+ *
+ * If a locking change is made that will not be compatible with older
+ * versions, the major number must be increased and the minor version set
+ * to zero.  If a change merely adds a behavior that can be disabled when
+ * speaking to older versions, the minor version must be increased.  If a
+ * change adds a fully backwards compatible change (eg, LVB changes that
+ * are just ignored by older versions), the version does not need to be
+ * updated.
+ */
+static struct ocfs2_locking_protocol lproto = {
+       .lp_max_version = {
+               .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
+               .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
+       },
+       .lp_lock_ast            = ocfs2_locking_ast,
+       .lp_blocking_ast        = ocfs2_blocking_ast,
+       .lp_unlock_ast          = ocfs2_unlock_ast,
+};
+
+void ocfs2_set_locking_protocol(void)
+{
+       ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version);
+}
+
 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
                                                int convert)
 {
@@ -1020,6 +1236,7 @@ static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
        mlog_entry_void();
        spin_lock_irqsave(&lockres->l_lock, flags);
        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
+       lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
        if (convert)
                lockres->l_action = OCFS2_AST_INVALID;
        else
@@ -1066,8 +1283,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
                             &lockres->l_lksb,
                             dlm_flags,
                             lockres->l_name,
-                            OCFS2_LOCK_ID_MAX_LEN - 1,
-                            lockres);
+                            OCFS2_LOCK_ID_MAX_LEN - 1);
        lockres_clear_pending(lockres, gen, osb);
        if (ret) {
                ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
@@ -1185,11 +1401,13 @@ static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
        return ret;
 }
 
-static int ocfs2_cluster_lock(struct ocfs2_super *osb,
-                             struct ocfs2_lock_res *lockres,
-                             int level,
-                             u32 lkm_flags,
-                             int arg_flags)
+static int __ocfs2_cluster_lock(struct ocfs2_super *osb,
+                               struct ocfs2_lock_res *lockres,
+                               int level,
+                               u32 lkm_flags,
+                               int arg_flags,
+                               int l_subclass,
+                               unsigned long caller_ip)
 {
        struct ocfs2_mask_waiter mw;
        int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
@@ -1208,13 +1426,13 @@ static int ocfs2_cluster_lock(struct ocfs2_super *osb,
 again:
        wait = 0;
 
+       spin_lock_irqsave(&lockres->l_lock, flags);
+
        if (catch_signals && signal_pending(current)) {
                ret = -ERESTARTSYS;
-               goto out;
+               goto unlock;
        }
 
-       spin_lock_irqsave(&lockres->l_lock, flags);
-
        mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
                        "Cluster lock called on freeing lockres %s! flags "
                        "0x%lx\n", lockres->l_name, lockres->l_flags);
@@ -1231,6 +1449,25 @@ again:
                goto unlock;
        }
 
+       if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) {
+               /*
+                * We've upconverted. If the lock now has a level we can
+                * work with, we take it. If, however, the lock is not at the
+                * required level, we go thru the full cycle. One way this could
+                * happen is if a process requesting an upconvert to PR is
+                * closely followed by another requesting upconvert to an EX.
+                * If the process requesting EX lands here, we want it to
+                * continue attempting to upconvert and let the process
+                * requesting PR take the lock.
+                * If multiple processes request upconvert to PR, the first one
+                * here will take the lock. The others will have to go thru the
+                * OCFS2_LOCK_BLOCKED check to ensure that there is no pending
+                * downconvert request.
+                */
+               if (level <= lockres->l_level)
+                       goto update_holders;
+       }
+
        if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
            !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
                /* is the lock is currently blocked on behalf of
@@ -1268,7 +1505,7 @@ again:
                BUG_ON(level == DLM_LOCK_IV);
                BUG_ON(level == DLM_LOCK_NL);
 
-               mlog(0, "lock %s, convert from %d to level = %d\n",
+               mlog(ML_BASTS, "lockres %s, convert from %d to %d\n",
                     lockres->l_name, lockres->l_level, level);
 
                /* call dlm_lock to upgrade lock now */
@@ -1277,8 +1514,7 @@ again:
                                     &lockres->l_lksb,
                                     lkm_flags,
                                     lockres->l_name,
-                                    OCFS2_LOCK_ID_MAX_LEN - 1,
-                                    lockres);
+                                    OCFS2_LOCK_ID_MAX_LEN - 1);
                lockres_clear_pending(lockres, gen, osb);
                if (ret) {
                        if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
@@ -1290,7 +1526,7 @@ again:
                        goto out;
                }
 
-               mlog(0, "lock %s, successfull return from ocfs2_dlm_lock\n",
+               mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n",
                     lockres->l_name);
 
                /* At this point we've gone inside the dlm and need to
@@ -1301,11 +1537,14 @@ again:
                goto again;
        }
 
+update_holders:
        /* Ok, if we get here then we're good to go. */
        ocfs2_inc_holders(lockres, level);
 
        ret = 0;
 unlock:
+       lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
+
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 out:
        /*
@@ -1332,13 +1571,37 @@ out:
        }
        ocfs2_update_lock_stats(lockres, level, &mw, ret);
 
+#ifdef CONFIG_DEBUG_LOCK_ALLOC
+       if (!ret && lockres->l_lockdep_map.key != NULL) {
+               if (level == DLM_LOCK_PR)
+                       rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass,
+                               !!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
+                               caller_ip);
+               else
+                       rwsem_acquire(&lockres->l_lockdep_map, l_subclass,
+                               !!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
+                               caller_ip);
+       }
+#endif
        mlog_exit(ret);
        return ret;
 }
 
-static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
-                                struct ocfs2_lock_res *lockres,
-                                int level)
+static inline int ocfs2_cluster_lock(struct ocfs2_super *osb,
+                                    struct ocfs2_lock_res *lockres,
+                                    int level,
+                                    u32 lkm_flags,
+                                    int arg_flags)
+{
+       return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags,
+                                   0, _RET_IP_);
+}
+
+
+static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
+                                  struct ocfs2_lock_res *lockres,
+                                  int level,
+                                  unsigned long caller_ip)
 {
        unsigned long flags;
 
@@ -1347,6 +1610,10 @@ static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
        ocfs2_dec_holders(lockres, level);
        ocfs2_downconvert_on_unlock(osb, lockres);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
+#ifdef CONFIG_DEBUG_LOCK_ALLOC
+       if (lockres->l_lockdep_map.key != NULL)
+               rwsem_release(&lockres->l_lockdep_map, 1, caller_ip);
+#endif
        mlog_exit_void();
 }
 
@@ -1434,8 +1701,10 @@ int ocfs2_rw_lock(struct inode *inode, int write)
             (unsigned long long)OCFS2_I(inode)->ip_blkno,
             write ? "EXMODE" : "PRMODE");
 
-       if (ocfs2_mount_local(osb))
+       if (ocfs2_mount_local(osb)) {
+               mlog_exit(0);
                return 0;
+       }
 
        lockres = &OCFS2_I(inode)->ip_rw_lockres;
 
@@ -1612,7 +1881,7 @@ out:
  * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
  * flock() calls. The locking approach this requires is sufficiently
  * different from all other cluster lock types that we implement a
- * seperate path to the "low-level" dlm calls. In particular:
+ * separate path to the "low-level" dlm calls. In particular:
  *
  * - No optimization of lock levels is done - we take at exactly
  *   what's been requested.
@@ -1682,8 +1951,7 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock)
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 
        ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
-                            lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
-                            lockres);
+                            lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1);
        if (ret) {
                if (!trylock || (ret != -EAGAIN)) {
                        ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
@@ -1710,7 +1978,7 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock)
                 * outstanding lock request, so a cancel convert is
                 * required. We intentionally overwrite 'ret' - if the
                 * cancel fails and the lock was granted, it's easier
-                * to just bubble sucess back up to the user.
+                * to just bubble success back up to the user.
                 */
                ret = ocfs2_flock_handle_signal(lockres, level);
        } else if (!ret && (level > lockres->l_level)) {
@@ -1829,7 +2097,7 @@ static void __ocfs2_stuff_meta_lvb(struct inode *inode)
 
        mlog_entry_void();
 
-       lvb = (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);
+       lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
 
        /*
         * Invalidate the LVB of a deleted inode - this way other
@@ -1881,7 +2149,7 @@ static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
 
        mlog_meta_lvb(0, lockres);
 
-       lvb = (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);
+       lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
 
        /* We're safe here without the lockres lock... */
        spin_lock(&oi->ip_lock);
@@ -1916,10 +2184,10 @@ static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
                                              struct ocfs2_lock_res *lockres)
 {
-       struct ocfs2_meta_lvb *lvb =
-               (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);
+       struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
 
-       if (lvb->lvb_version == OCFS2_LVB_VERSION
+       if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)
+           && lvb->lvb_version == OCFS2_LVB_VERSION
            && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
                return 1;
        return 0;
@@ -2013,7 +2281,7 @@ static int ocfs2_inode_lock_update(struct inode *inode,
 
        /* This will discard any caching information we might have had
         * for the inode metadata. */
-       ocfs2_metadata_cache_purge(inode);
+       ocfs2_metadata_cache_purge(INODE_CACHE(inode));
 
        ocfs2_extent_map_trunc(inode, 0);
 
@@ -2024,7 +2292,7 @@ static int ocfs2_inode_lock_update(struct inode *inode,
        } else {
                /* Boo, we have to go to disk. */
                /* read bh, cast, ocfs2_refresh_inode */
-               status = ocfs2_read_block(inode, oi->ip_blkno, bh);
+               status = ocfs2_read_inode_block(inode, bh);
                if (status < 0) {
                        mlog_errno(status);
                        goto bail_refresh;
@@ -2032,18 +2300,14 @@ static int ocfs2_inode_lock_update(struct inode *inode,
                fe = (struct ocfs2_dinode *) (*bh)->b_data;
 
                /* This is a good chance to make sure we're not
-                * locking an invalid object.
+                * locking an invalid object.  ocfs2_read_inode_block()
+                * already checked that the inode block is sane.
                 *
                 * We bug on a stale inode here because we checked
                 * above whether it was wiped from disk. The wiping
                 * node provides a guarantee that we receive that
                 * message and can mark the inode before dropping any
                 * locks associated with it. */
-               if (!OCFS2_IS_VALID_DINODE(fe)) {
-                       OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
-                       status = -EIO;
-                       goto bail_refresh;
-               }
                mlog_bug_on_msg(inode->i_generation !=
                                le32_to_cpu(fe->i_generation),
                                "Invalid dinode %llu disk generation: %u "
@@ -2085,7 +2349,7 @@ static int ocfs2_assign_bh(struct inode *inode,
                return 0;
        }
 
-       status = ocfs2_read_block(inode, OCFS2_I(inode)->ip_blkno, ret_bh);
+       status = ocfs2_read_inode_block(inode, ret_bh);
        if (status < 0)
                mlog_errno(status);
 
@@ -2096,10 +2360,11 @@ static int ocfs2_assign_bh(struct inode *inode,
  * returns < 0 error if the callback will never be called, otherwise
  * the result of the lock will be communicated via the callback.
  */
-int ocfs2_inode_lock_full(struct inode *inode,
-                        struct buffer_head **ret_bh,
-                        int ex,
-                        int arg_flags)
+int ocfs2_inode_lock_full_nested(struct inode *inode,
+                                struct buffer_head **ret_bh,
+                                int ex,
+                                int arg_flags,
+                                int subclass)
 {
        int status, level, acquired;
        u32 dlm_flags;
@@ -2137,7 +2402,8 @@ int ocfs2_inode_lock_full(struct inode *inode,
        if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
                dlm_flags |= DLM_LKF_NOQUEUE;
 
-       status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
+       status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags,
+                                     arg_flags, subclass, _RET_IP_);
        if (status < 0) {
                if (status != -EAGAIN && status != -EIOCBRETRY)
                        mlog_errno(status);
@@ -2303,6 +2569,47 @@ void ocfs2_inode_unlock(struct inode *inode,
        mlog_exit_void();
 }
 
+int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno)
+{
+       struct ocfs2_lock_res *lockres;
+       struct ocfs2_orphan_scan_lvb *lvb;
+       int status = 0;
+
+       if (ocfs2_is_hard_readonly(osb))
+               return -EROFS;
+
+       if (ocfs2_mount_local(osb))
+               return 0;
+
+       lockres = &osb->osb_orphan_scan.os_lockres;
+       status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
+       if (status < 0)
+               return status;
+
+       lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
+       if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
+           lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION)
+               *seqno = be32_to_cpu(lvb->lvb_os_seqno);
+       else
+               *seqno = osb->osb_orphan_scan.os_seqno + 1;
+
+       return status;
+}
+
+void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno)
+{
+       struct ocfs2_lock_res *lockres;
+       struct ocfs2_orphan_scan_lvb *lvb;
+
+       if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) {
+               lockres = &osb->osb_orphan_scan.os_lockres;
+               lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
+               lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION;
+               lvb->lvb_os_seqno = cpu_to_be32(seqno);
+               ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
+       }
+}
+
 int ocfs2_super_lock(struct ocfs2_super *osb,
                     int ex)
 {
@@ -2383,6 +2690,34 @@ void ocfs2_rename_unlock(struct ocfs2_super *osb)
                ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
 }
 
+int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex)
+{
+       int status;
+       struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
+
+       if (ocfs2_is_hard_readonly(osb))
+               return -EROFS;
+
+       if (ocfs2_mount_local(osb))
+               return 0;
+
+       status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE,
+                                   0, 0);
+       if (status < 0)
+               mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status);
+
+       return status;
+}
+
+void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex)
+{
+       struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
+
+       if (!ocfs2_mount_local(osb))
+               ocfs2_cluster_unlock(osb, lockres,
+                                    ex ? LKM_EXMODE : LKM_PRMODE);
+}
+
 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
 {
        int ret;
@@ -2745,7 +3080,7 @@ int ocfs2_dlm_init(struct ocfs2_super *osb)
        status = ocfs2_cluster_connect(osb->osb_cluster_stack,
                                       osb->uuid_str,
                                       strlen(osb->uuid_str),
-                                      ocfs2_do_node_down, osb,
+                                      &lproto, ocfs2_do_node_down, osb,
                                       &conn);
        if (status) {
                mlog_errno(status);
@@ -2764,6 +3099,8 @@ int ocfs2_dlm_init(struct ocfs2_super *osb)
 local:
        ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
        ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
+       ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb);
+       ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb);
 
        osb->cconn = conn;
 
@@ -2799,6 +3136,8 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
 
        ocfs2_lock_res_free(&osb->osb_super_lockres);
        ocfs2_lock_res_free(&osb->osb_rename_lockres);
+       ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres);
+       ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres);
 
        ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
        osb->cconn = NULL;
@@ -2808,45 +3147,6 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
        mlog_exit_void();
 }
 
-static void ocfs2_unlock_ast(void *opaque, int error)
-{
-       struct ocfs2_lock_res *lockres = opaque;
-       unsigned long flags;
-
-       mlog_entry_void();
-
-       mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
-            lockres->l_unlock_action);
-
-       spin_lock_irqsave(&lockres->l_lock, flags);
-       if (error) {
-               mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
-                    "unlock_action %d\n", error, lockres->l_name,
-                    lockres->l_unlock_action);
-               spin_unlock_irqrestore(&lockres->l_lock, flags);
-               return;
-       }
-
-       switch(lockres->l_unlock_action) {
-       case OCFS2_UNLOCK_CANCEL_CONVERT:
-               mlog(0, "Cancel convert success for %s\n", lockres->l_name);
-               lockres->l_action = OCFS2_AST_INVALID;
-               break;
-       case OCFS2_UNLOCK_DROP_LOCK:
-               lockres->l_level = DLM_LOCK_IV;
-               break;
-       default:
-               BUG();
-       }
-
-       lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
-       lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
-       wake_up(&lockres->l_event);
-       spin_unlock_irqrestore(&lockres->l_lock, flags);
-
-       mlog_exit_void();
-}
-
 static int ocfs2_drop_lock(struct ocfs2_super *osb,
                           struct ocfs2_lock_res *lockres)
 {
@@ -2914,15 +3214,14 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb,
 
        mlog(0, "lock %s\n", lockres->l_name);
 
-       ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags,
-                              lockres);
+       ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags);
        if (ret) {
                ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
                mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
                ocfs2_dlm_dump_lksb(&lockres->l_lksb);
                BUG();
        }
-       mlog(0, "lock %s, successfull return from ocfs2_dlm_unlock\n",
+       mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n",
             lockres->l_name);
 
        ocfs2_wait_on_busy_lock(lockres);
@@ -2934,7 +3233,7 @@ out:
 /* Mark the lockres as being dropped. It will no longer be
  * queued if blocking, but we still may have to wait on it
  * being dequeued from the downconvert thread before we can consider
- * it safe to drop. 
+ * it safe to drop.
  *
  * You can *not* attempt to call cluster_lock on this lockres anymore. */
 void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
@@ -2977,6 +3276,8 @@ static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
 {
        ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
        ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
+       ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres);
+       ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres);
 }
 
 int ocfs2_drop_inode_locks(struct inode *inode)
@@ -3021,13 +3322,20 @@ static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
        BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
 
        if (lockres->l_level <= new_level) {
-               mlog(ML_ERROR, "lockres->l_level (%d) <= new_level (%d)\n",
-                    lockres->l_level, new_level);
+               mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, "
+                    "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, "
+                    "block %d, pgen %d\n", lockres->l_name, lockres->l_level,
+                    new_level, list_empty(&lockres->l_blocked_list),
+                    list_empty(&lockres->l_mask_waiters), lockres->l_type,
+                    lockres->l_flags, lockres->l_ro_holders,
+                    lockres->l_ex_holders, lockres->l_action,
+                    lockres->l_unlock_action, lockres->l_requested,
+                    lockres->l_blocking, lockres->l_pending_gen);
                BUG();
        }
 
-       mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
-            lockres->l_name, new_level, lockres->l_blocking);
+       mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n",
+            lockres->l_name, lockres->l_level, new_level, lockres->l_blocking);
 
        lockres->l_action = OCFS2_AST_DOWNCONVERT;
        lockres->l_requested = new_level;
@@ -3046,6 +3354,9 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
 
        mlog_entry_void();
 
+       mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name,
+            lockres->l_level, new_level);
+
        if (lvb)
                dlm_flags |= DLM_LKF_VALBLK;
 
@@ -3054,8 +3365,7 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
                             &lockres->l_lksb,
                             dlm_flags,
                             lockres->l_name,
-                            OCFS2_LOCK_ID_MAX_LEN - 1,
-                            lockres);
+                            OCFS2_LOCK_ID_MAX_LEN - 1);
        lockres_clear_pending(lockres, generation, osb);
        if (ret) {
                ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
@@ -3076,14 +3386,12 @@ static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
        assert_spin_locked(&lockres->l_lock);
 
        mlog_entry_void();
-       mlog(0, "lock %s\n", lockres->l_name);
 
        if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
                /* If we're already trying to cancel a lock conversion
                 * then just drop the spinlock and allow the caller to
                 * requeue this lock. */
-
-               mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
+               mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name);
                return 0;
        }
 
@@ -3098,6 +3406,8 @@ static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
                        "lock %s, invalid flags: 0x%lx\n",
                        lockres->l_name, lockres->l_flags);
 
+       mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
+
        return 1;
 }
 
@@ -3107,16 +3417,15 @@ static int ocfs2_cancel_convert(struct ocfs2_super *osb,
        int ret;
 
        mlog_entry_void();
-       mlog(0, "lock %s\n", lockres->l_name);
 
        ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
-                              DLM_LKF_CANCEL, lockres);
+                              DLM_LKF_CANCEL);
        if (ret) {
                ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
                ocfs2_recover_from_dlm_error(lockres, 0);
        }
 
-       mlog(0, "lock %s return from ocfs2_dlm_unlock\n", lockres->l_name);
+       mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
 
        mlog_exit(ret);
        return ret;
@@ -3129,6 +3438,7 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb,
        unsigned long flags;
        int blocking;
        int new_level;
+       int level;
        int ret = 0;
        int set_lvb = 0;
        unsigned int gen;
@@ -3137,9 +3447,17 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb,
 
        spin_lock_irqsave(&lockres->l_lock, flags);
 
-       BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
-
 recheck:
+       /*
+        * Is it still blocking? If not, we have no more work to do.
+        */
+       if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) {
+               BUG_ON(lockres->l_blocking != DLM_LOCK_NL);
+               spin_unlock_irqrestore(&lockres->l_lock, flags);
+               ret = 0;
+               goto leave;
+       }
+
        if (lockres->l_flags & OCFS2_LOCK_BUSY) {
                /* XXX
                 * This is a *big* race.  The OCFS2_LOCK_PENDING flag
@@ -3164,8 +3482,11 @@ recheck:
                 * at the same time they set OCFS2_DLM_BUSY.  They must
                 * clear OCFS2_DLM_PENDING after dlm_lock() returns.
                 */
-               if (lockres->l_flags & OCFS2_LOCK_PENDING)
+               if (lockres->l_flags & OCFS2_LOCK_PENDING) {
+                       mlog(ML_BASTS, "lockres %s, ReQ: Pending\n",
+                            lockres->l_name);
                        goto leave_requeue;
+               }
 
                ctl->requeue = 1;
                ret = ocfs2_prepare_cancel_convert(osb, lockres);
@@ -3178,31 +3499,70 @@ recheck:
                goto leave;
        }
 
+       /*
+        * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is
+        * set when the ast is received for an upconvert just before the
+        * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast
+        * on the heels of the ast, we want to delay the downconvert just
+        * enough to allow the up requestor to do its task. Because this
+        * lock is in the blocked queue, the lock will be downconverted
+        * as soon as the requestor is done with the lock.
+        */
+       if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING)
+               goto leave_requeue;
+
+       /*
+        * How can we block and yet be at NL?  We were trying to upconvert
+        * from NL and got canceled.  The code comes back here, and now
+        * we notice and clear BLOCKING.
+        */
+       if (lockres->l_level == DLM_LOCK_NL) {
+               BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders);
+               mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name);
+               lockres->l_blocking = DLM_LOCK_NL;
+               lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
+               spin_unlock_irqrestore(&lockres->l_lock, flags);
+               goto leave;
+       }
+
        /* if we're blocking an exclusive and we have *any* holders,
         * then requeue. */
        if ((lockres->l_blocking == DLM_LOCK_EX)
-           && (lockres->l_ex_holders || lockres->l_ro_holders))
+           && (lockres->l_ex_holders || lockres->l_ro_holders)) {
+               mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n",
+                    lockres->l_name, lockres->l_ex_holders,
+                    lockres->l_ro_holders);
                goto leave_requeue;
+       }
 
        /* If it's a PR we're blocking, then only
         * requeue if we've got any EX holders */
        if (lockres->l_blocking == DLM_LOCK_PR &&
-           lockres->l_ex_holders)
+           lockres->l_ex_holders) {
+               mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n",
+                    lockres->l_name, lockres->l_ex_holders);
                goto leave_requeue;
+       }
 
        /*
         * Can we get a lock in this state if the holder counts are
         * zero? The meta data unblock code used to check this.
         */
        if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
-           && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
+           && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) {
+               mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n",
+                    lockres->l_name);
                goto leave_requeue;
+       }
 
        new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
 
        if (lockres->l_ops->check_downconvert
-           && !lockres->l_ops->check_downconvert(lockres, new_level))
+           && !lockres->l_ops->check_downconvert(lockres, new_level)) {
+               mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n",
+                    lockres->l_name);
                goto leave_requeue;
+       }
 
        /* If we get here, then we know that there are no more
         * incompatible holders (and anyone asking for an incompatible
@@ -3215,17 +3575,24 @@ recheck:
         * may sleep, so we save off a copy of what we're blocking as
         * it may change while we're not holding the spin lock. */
        blocking = lockres->l_blocking;
+       level = lockres->l_level;
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 
        ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
 
-       if (ctl->unblock_action == UNBLOCK_STOP_POST)
+       if (ctl->unblock_action == UNBLOCK_STOP_POST) {
+               mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n",
+                    lockres->l_name);
                goto leave;
+       }
 
        spin_lock_irqsave(&lockres->l_lock, flags);
-       if (blocking != lockres->l_blocking) {
+       if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) {
                /* If this changed underneath us, then we can't drop
                 * it just yet. */
+               mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, "
+                    "Recheck\n", lockres->l_name, blocking,
+                    lockres->l_blocking, level, lockres->l_level);
                goto recheck;
        }
 
@@ -3304,11 +3671,11 @@ out:
        return UNBLOCK_CONTINUE;
 }
 
-static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
-                                       int new_level)
+static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci,
+                                struct ocfs2_lock_res *lockres,
+                                int new_level)
 {
-       struct inode *inode = ocfs2_lock_res_inode(lockres);
-       int checkpointed = ocfs2_inode_fully_checkpointed(inode);
+       int checkpointed = ocfs2_ci_fully_checkpointed(ci);
 
        BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
        BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);
@@ -3316,10 +3683,18 @@ static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
        if (checkpointed)
                return 1;
 
-       ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb));
+       ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci)));
        return 0;
 }
 
+static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
+                                       int new_level)
+{
+       struct inode *inode = ocfs2_lock_res_inode(lockres);
+
+       return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level);
+}
+
 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
 {
        struct inode *inode = ocfs2_lock_res_inode(lockres);
@@ -3449,44 +3824,168 @@ static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
        return UNBLOCK_CONTINUE_POST;
 }
 
-/*
- * This is the filesystem locking protocol.  It provides the lock handling
- * hooks for the underlying DLM.  It has a maximum version number.
- * The version number allows interoperability with systems running at
- * the same major number and an equal or smaller minor number.
- *
- * Whenever the filesystem does new things with locks (adds or removes a
- * lock, orders them differently, does different things underneath a lock),
- * the version must be changed.  The protocol is negotiated when joining
- * the dlm domain.  A node may join the domain if its major version is
- * identical to all other nodes and its minor version is greater than
- * or equal to all other nodes.  When its minor version is greater than
- * the other nodes, it will run at the minor version specified by the
- * other nodes.
- *
- * If a locking change is made that will not be compatible with older
- * versions, the major number must be increased and the minor version set
- * to zero.  If a change merely adds a behavior that can be disabled when
- * speaking to older versions, the minor version must be increased.  If a
- * change adds a fully backwards compatible change (eg, LVB changes that
- * are just ignored by older versions), the version does not need to be
- * updated.
- */
-static struct ocfs2_locking_protocol lproto = {
-       .lp_max_version = {
-               .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
-               .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
-       },
-       .lp_lock_ast            = ocfs2_locking_ast,
-       .lp_blocking_ast        = ocfs2_blocking_ast,
-       .lp_unlock_ast          = ocfs2_unlock_ast,
-};
+static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
+                                           int new_level)
+{
+       struct ocfs2_refcount_tree *tree =
+                               ocfs2_lock_res_refcount_tree(lockres);
 
-void ocfs2_set_locking_protocol(void)
+       return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level);
+}
+
+static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
+                                        int blocking)
+{
+       struct ocfs2_refcount_tree *tree =
+                               ocfs2_lock_res_refcount_tree(lockres);
+
+       ocfs2_metadata_cache_purge(&tree->rf_ci);
+
+       return UNBLOCK_CONTINUE;
+}
+
+static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres)
+{
+       struct ocfs2_qinfo_lvb *lvb;
+       struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres);
+       struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
+                                           oinfo->dqi_gi.dqi_type);
+
+       mlog_entry_void();
+
+       lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
+       lvb->lvb_version = OCFS2_QINFO_LVB_VERSION;
+       lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace);
+       lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace);
+       lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms);
+       lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks);
+       lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk);
+       lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry);
+
+       mlog_exit_void();
+}
+
+void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex)
+{
+       struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
+       struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
+       int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
+
+       mlog_entry_void();
+       if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
+               ocfs2_cluster_unlock(osb, lockres, level);
+       mlog_exit_void();
+}
+
+static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo)
+{
+       struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
+                                           oinfo->dqi_gi.dqi_type);
+       struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
+       struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
+       struct buffer_head *bh = NULL;
+       struct ocfs2_global_disk_dqinfo *gdinfo;
+       int status = 0;
+
+       if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
+           lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) {
+               info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace);
+               info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace);
+               oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms);
+               oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks);
+               oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk);
+               oinfo->dqi_gi.dqi_free_entry =
+                                       be32_to_cpu(lvb->lvb_free_entry);
+       } else {
+               status = ocfs2_read_quota_block(oinfo->dqi_gqinode, 0, &bh);
+               if (status) {
+                       mlog_errno(status);
+                       goto bail;
+               }
+               gdinfo = (struct ocfs2_global_disk_dqinfo *)
+                                       (bh->b_data + OCFS2_GLOBAL_INFO_OFF);
+               info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace);
+               info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace);
+               oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms);
+               oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks);
+               oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk);
+               oinfo->dqi_gi.dqi_free_entry =
+                                       le32_to_cpu(gdinfo->dqi_free_entry);
+               brelse(bh);
+               ocfs2_track_lock_refresh(lockres);
+       }
+
+bail:
+       return status;
+}
+
+/* Lock quota info, this function expects at least shared lock on the quota file
+ * so that we can safely refresh quota info from disk. */
+int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex)
 {
-       ocfs2_stack_glue_set_locking_protocol(&lproto);
+       struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
+       struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
+       int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
+       int status = 0;
+
+       mlog_entry_void();
+
+       /* On RO devices, locking really isn't needed... */
+       if (ocfs2_is_hard_readonly(osb)) {
+               if (ex)
+                       status = -EROFS;
+               goto bail;
+       }
+       if (ocfs2_mount_local(osb))
+               goto bail;
+
+       status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
+       if (status < 0) {
+               mlog_errno(status);
+               goto bail;
+       }
+       if (!ocfs2_should_refresh_lock_res(lockres))
+               goto bail;
+       /* OK, we have the lock but we need to refresh the quota info */
+       status = ocfs2_refresh_qinfo(oinfo);
+       if (status)
+               ocfs2_qinfo_unlock(oinfo, ex);
+       ocfs2_complete_lock_res_refresh(lockres, status);
+bail:
+       mlog_exit(status);
+       return status;
 }
 
+int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex)
+{
+       int status;
+       int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
+       struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
+       struct ocfs2_super *osb = lockres->l_priv;
+
+
+       if (ocfs2_is_hard_readonly(osb))
+               return -EROFS;
+
+       if (ocfs2_mount_local(osb))
+               return 0;
+
+       status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
+       if (status < 0)
+               mlog_errno(status);
+
+       return status;
+}
+
+void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex)
+{
+       int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
+       struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
+       struct ocfs2_super *osb = lockres->l_priv;
+
+       if (!ocfs2_mount_local(osb))
+               ocfs2_cluster_unlock(osb, lockres, level);
+}
 
 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
                                       struct ocfs2_lock_res *lockres)
@@ -3504,7 +4003,7 @@ static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
        BUG_ON(!lockres);
        BUG_ON(!lockres->l_ops);
 
-       mlog(0, "lockres %s blocked.\n", lockres->l_name);
+       mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name);
 
        /* Detect whether a lock has been marked as going away while
         * the downconvert thread was processing other things. A lock can
@@ -3527,7 +4026,7 @@ unqueue:
        } else
                ocfs2_schedule_blocked_lock(osb, lockres);
 
-       mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
+       mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name,
             ctl.requeue ? "yes" : "no");
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 
@@ -3549,7 +4048,7 @@ static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
                /* Do not schedule a lock for downconvert when it's on
                 * the way to destruction - any nodes wanting access
                 * to the resource will get it soon. */
-               mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
+               mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n",
                     lockres->l_name, lockres->l_flags);
                return;
        }