tunnels: fix netns vs proto registration ordering
[safe/jmp/linux-2.6] / fs / ocfs2 / dlmglue.c
index 27e43b0..c5e4a49 100644 (file)
 #include <linux/slab.h>
 #include <linux/highmem.h>
 #include <linux/mm.h>
-#include <linux/smp_lock.h>
-#include <linux/crc32.h>
 #include <linux/kthread.h>
 #include <linux/pagemap.h>
 #include <linux/debugfs.h>
 #include <linux/seq_file.h>
-
-#include <cluster/heartbeat.h>
-#include <cluster/nodemanager.h>
-#include <cluster/tcp.h>
-
-#include <dlm/dlmapi.h>
+#include <linux/time.h>
+#include <linux/quotaops.h>
 
 #define MLOG_MASK_PREFIX ML_DLM_GLUE
 #include <cluster/masklog.h>
 
 #include "ocfs2.h"
+#include "ocfs2_lockingver.h"
 
 #include "alloc.h"
 #include "dcache.h"
 #include "heartbeat.h"
 #include "inode.h"
 #include "journal.h"
+#include "stackglue.h"
 #include "slot_map.h"
 #include "super.h"
 #include "uptodate.h"
-#include "vote.h"
+#include "quota.h"
+#include "refcounttree.h"
 
 #include "buffer_head_io.h"
 
@@ -66,10 +63,15 @@ struct ocfs2_mask_waiter {
        struct completion       mw_complete;
        unsigned long           mw_mask;
        unsigned long           mw_goal;
+#ifdef CONFIG_OCFS2_FS_STATS
+       unsigned long long      mw_lock_start;
+#endif
 };
 
 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
+static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
+static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres);
 
 /*
  * Return value from ->downconvert_worker functions.
@@ -91,6 +93,9 @@ struct ocfs2_unblock_ctl {
        enum ocfs2_unblock_action unblock_action;
 };
 
+/* Lockdep class keys */
+struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES];
+
 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
                                        int new_level);
 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);
@@ -104,6 +109,41 @@ static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
                                     struct ocfs2_lock_res *lockres);
 
+static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres);
+
+static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
+                                           int new_level);
+static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
+                                        int blocking);
+
+#define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)
+
+/* This aids in debugging situations where a bad LVB might be involved. */
+static void ocfs2_dump_meta_lvb_info(u64 level,
+                                    const char *function,
+                                    unsigned int line,
+                                    struct ocfs2_lock_res *lockres)
+{
+       struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
+
+       mlog(level, "LVB information for %s (called from %s:%u):\n",
+            lockres->l_name, function, line);
+       mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
+            lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
+            be32_to_cpu(lvb->lvb_igeneration));
+       mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
+            (unsigned long long)be64_to_cpu(lvb->lvb_isize),
+            be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
+            be16_to_cpu(lvb->lvb_imode));
+       mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
+            "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
+            (long long)be64_to_cpu(lvb->lvb_iatime_packed),
+            (long long)be64_to_cpu(lvb->lvb_ictime_packed),
+            (long long)be64_to_cpu(lvb->lvb_imtime_packed),
+            be32_to_cpu(lvb->lvb_iattr));
+}
+
+
 /*
  * OCFS2 Lock Resource Operations
  *
@@ -125,10 +165,10 @@ struct ocfs2_lock_res_ops {
        struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);
 
        /*
-        * Optionally called in the downconvert (or "vote") thread
-        * after a successful downconvert. The lockres will not be
-        * referenced after this callback is called, so it is safe to
-        * free memory, etc.
+        * Optionally called in the downconvert thread after a
+        * successful downconvert. The lockres will not be referenced
+        * after this callback is called, so it is safe to free
+        * memory, etc.
         *
         * The exact semantics of when this is called are controlled
         * by ->downconvert_worker()
@@ -197,17 +237,12 @@ static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
        .flags          = 0,
 };
 
-static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
+static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
        .get_osb        = ocfs2_get_inode_osb,
        .check_downconvert = ocfs2_check_meta_downconvert,
        .set_lvb        = ocfs2_set_meta_lvb,
-       .flags          = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
-};
-
-static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
-       .get_osb        = ocfs2_get_inode_osb,
        .downconvert_worker = ocfs2_data_convert_worker,
-       .flags          = 0,
+       .flags          = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
 };
 
 static struct ocfs2_lock_res_ops ocfs2_super_lops = {
@@ -218,6 +253,14 @@ static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
        .flags          = 0,
 };
 
+static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = {
+       .flags          = 0,
+};
+
+static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = {
+       .flags          = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
+};
+
 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
        .get_osb        = ocfs2_get_dentry_osb,
        .post_unlock    = ocfs2_dentry_post_unlock,
@@ -230,10 +273,26 @@ static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
        .flags          = 0,
 };
 
+static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
+       .get_osb        = ocfs2_get_file_osb,
+       .flags          = 0,
+};
+
+static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = {
+       .set_lvb        = ocfs2_set_qinfo_lvb,
+       .get_osb        = ocfs2_get_qinfo_osb,
+       .flags          = LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB,
+};
+
+static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = {
+       .check_downconvert = ocfs2_check_refcount_downconvert,
+       .downconvert_worker = ocfs2_refcount_convert_worker,
+       .flags          = 0,
+};
+
 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
 {
        return lockres->l_type == OCFS2_LOCK_TYPE_META ||
-               lockres->l_type == OCFS2_LOCK_TYPE_DATA ||
                lockres->l_type == OCFS2_LOCK_TYPE_RW ||
                lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
 }
@@ -252,6 +311,19 @@ static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res
        return (struct ocfs2_dentry_lock *)lockres->l_priv;
 }
 
+static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres)
+{
+       BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO);
+
+       return (struct ocfs2_mem_dqinfo *)lockres->l_priv;
+}
+
+static inline struct ocfs2_refcount_tree *
+ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res)
+{
+       return container_of(res, struct ocfs2_refcount_tree, rf_lockres);
+}
+
 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
 {
        if (lockres->l_ops->get_osb)
@@ -263,12 +335,19 @@ static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *l
 static int ocfs2_lock_create(struct ocfs2_super *osb,
                             struct ocfs2_lock_res *lockres,
                             int level,
-                            int dlm_flags);
+                            u32 dlm_flags);
 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
                                                     int wanted);
-static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
-                                struct ocfs2_lock_res *lockres,
-                                int level);
+static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
+                                  struct ocfs2_lock_res *lockres,
+                                  int level, unsigned long caller_ip);
+static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb,
+                                       struct ocfs2_lock_res *lockres,
+                                       int level)
+{
+       __ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_);
+}
+
 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
@@ -277,17 +356,34 @@ static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
                                        struct ocfs2_lock_res *lockres);
 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
                                                int convert);
-#define ocfs2_log_dlm_error(_func, _stat, _lockres) do {       \
-       mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "  \
-               "resource %s: %s\n", dlm_errname(_stat), _func, \
-               _lockres->l_name, dlm_errmsg(_stat));           \
+#define ocfs2_log_dlm_error(_func, _err, _lockres) do {                                        \
+       if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY)                               \
+               mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n",        \
+                    _err, _func, _lockres->l_name);                                    \
+       else                                                                            \
+               mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n",  \
+                    _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name,  \
+                    (unsigned int)ocfs2_get_dentry_lock_ino(_lockres));                \
 } while (0)
-static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
-                                struct ocfs2_lock_res *lockres);
-static int ocfs2_meta_lock_update(struct inode *inode,
+static int ocfs2_downconvert_thread(void *arg);
+static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
+                                       struct ocfs2_lock_res *lockres);
+static int ocfs2_inode_lock_update(struct inode *inode,
                                  struct buffer_head **bh);
 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
 static inline int ocfs2_highest_compat_lock_level(int level);
+static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
+                                             int new_level);
+static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
+                                 struct ocfs2_lock_res *lockres,
+                                 int new_level,
+                                 int lvb,
+                                 unsigned int generation);
+static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
+                                       struct ocfs2_lock_res *lockres);
+static int ocfs2_cancel_convert(struct ocfs2_super *osb,
+                               struct ocfs2_lock_res *lockres);
+
 
 static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
                                  u64 blkno,
@@ -331,6 +427,75 @@ static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
        spin_unlock(&ocfs2_dlm_tracking_lock);
 }
 
+#ifdef CONFIG_OCFS2_FS_STATS
+static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
+{
+       res->l_lock_num_prmode = 0;
+       res->l_lock_num_prmode_failed = 0;
+       res->l_lock_total_prmode = 0;
+       res->l_lock_max_prmode = 0;
+       res->l_lock_num_exmode = 0;
+       res->l_lock_num_exmode_failed = 0;
+       res->l_lock_total_exmode = 0;
+       res->l_lock_max_exmode = 0;
+       res->l_lock_refresh = 0;
+}
+
+static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level,
+                                   struct ocfs2_mask_waiter *mw, int ret)
+{
+       unsigned long long *num, *sum;
+       unsigned int *max, *failed;
+       struct timespec ts = current_kernel_time();
+       unsigned long long time = timespec_to_ns(&ts) - mw->mw_lock_start;
+
+       if (level == LKM_PRMODE) {
+               num = &res->l_lock_num_prmode;
+               sum = &res->l_lock_total_prmode;
+               max = &res->l_lock_max_prmode;
+               failed = &res->l_lock_num_prmode_failed;
+       } else if (level == LKM_EXMODE) {
+               num = &res->l_lock_num_exmode;
+               sum = &res->l_lock_total_exmode;
+               max = &res->l_lock_max_exmode;
+               failed = &res->l_lock_num_exmode_failed;
+       } else
+               return;
+
+       (*num)++;
+       (*sum) += time;
+       if (time > *max)
+               *max = time;
+       if (ret)
+               (*failed)++;
+}
+
+static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
+{
+       lockres->l_lock_refresh++;
+}
+
+static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
+{
+       struct timespec ts = current_kernel_time();
+       mw->mw_lock_start = timespec_to_ns(&ts);
+}
+#else
+static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
+{
+}
+static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res,
+                          int level, struct ocfs2_mask_waiter *mw, int ret)
+{
+}
+static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
+{
+}
+static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
+{
+}
+#endif
+
 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
                                       struct ocfs2_lock_res *res,
                                       enum ocfs2_lock_type type,
@@ -341,15 +506,24 @@ static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
        res->l_ops           = ops;
        res->l_priv          = priv;
 
-       res->l_level         = LKM_IVMODE;
-       res->l_requested     = LKM_IVMODE;
-       res->l_blocking      = LKM_IVMODE;
+       res->l_level         = DLM_LOCK_IV;
+       res->l_requested     = DLM_LOCK_IV;
+       res->l_blocking      = DLM_LOCK_IV;
        res->l_action        = OCFS2_AST_INVALID;
        res->l_unlock_action = OCFS2_UNLOCK_INVALID;
 
        res->l_flags         = OCFS2_LOCK_INITIALIZED;
 
        ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
+
+       ocfs2_init_lock_stats(res);
+#ifdef CONFIG_DEBUG_LOCK_ALLOC
+       if (type != OCFS2_LOCK_TYPE_OPEN)
+               lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type],
+                                &lockdep_keys[type], 0);
+       else
+               res->l_lockdep_map.key = NULL;
+#endif
 }
 
 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
@@ -374,10 +548,7 @@ void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
                        ops = &ocfs2_inode_rw_lops;
                        break;
                case OCFS2_LOCK_TYPE_META:
-                       ops = &ocfs2_inode_meta_lops;
-                       break;
-               case OCFS2_LOCK_TYPE_DATA:
-                       ops = &ocfs2_inode_data_lops;
+                       ops = &ocfs2_inode_inode_lops;
                        break;
                case OCFS2_LOCK_TYPE_OPEN:
                        ops = &ocfs2_inode_open_lops;
@@ -400,6 +571,20 @@ static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
        return OCFS2_SB(inode->i_sb);
 }
 
+static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres)
+{
+       struct ocfs2_mem_dqinfo *info = lockres->l_priv;
+
+       return OCFS2_SB(info->dqi_gi.dqi_sb);
+}
+
+static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
+{
+       struct ocfs2_file_private *fp = lockres->l_priv;
+
+       return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
+}
+
 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
 {
        __be64 inode_blkno_be;
@@ -480,6 +665,63 @@ static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
                                   &ocfs2_rename_lops, osb);
 }
 
+static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res,
+                                        struct ocfs2_super *osb)
+{
+       /* nfs_sync lockres doesn't come from a slab so we call init
+        * once on it manually.  */
+       ocfs2_lock_res_init_once(res);
+       ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name);
+       ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC,
+                                  &ocfs2_nfs_sync_lops, osb);
+}
+
+static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res,
+                                           struct ocfs2_super *osb)
+{
+       ocfs2_lock_res_init_once(res);
+       ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name);
+       ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN,
+                                  &ocfs2_orphan_scan_lops, osb);
+}
+
+void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
+                             struct ocfs2_file_private *fp)
+{
+       struct inode *inode = fp->fp_file->f_mapping->host;
+       struct ocfs2_inode_info *oi = OCFS2_I(inode);
+
+       ocfs2_lock_res_init_once(lockres);
+       ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
+                             inode->i_generation, lockres->l_name);
+       ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
+                                  OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
+                                  fp);
+       lockres->l_flags |= OCFS2_LOCK_NOCACHE;
+}
+
+void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres,
+                              struct ocfs2_mem_dqinfo *info)
+{
+       ocfs2_lock_res_init_once(lockres);
+       ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type,
+                             0, lockres->l_name);
+       ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres,
+                                  OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops,
+                                  info);
+}
+
+void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres,
+                                 struct ocfs2_super *osb, u64 ref_blkno,
+                                 unsigned int generation)
+{
+       ocfs2_lock_res_init_once(lockres);
+       ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno,
+                             generation, lockres->l_name);
+       ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT,
+                                  &ocfs2_refcount_block_lops, osb);
+}
+
 void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
 {
        mlog_entry_void();
@@ -520,10 +762,10 @@ static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
        BUG_ON(!lockres);
 
        switch(level) {
-       case LKM_EXMODE:
+       case DLM_LOCK_EX:
                lockres->l_ex_holders++;
                break;
-       case LKM_PRMODE:
+       case DLM_LOCK_PR:
                lockres->l_ro_holders++;
                break;
        default:
@@ -541,11 +783,11 @@ static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
        BUG_ON(!lockres);
 
        switch(level) {
-       case LKM_EXMODE:
+       case DLM_LOCK_EX:
                BUG_ON(!lockres->l_ex_holders);
                lockres->l_ex_holders--;
                break;
-       case LKM_PRMODE:
+       case DLM_LOCK_PR:
                BUG_ON(!lockres->l_ro_holders);
                lockres->l_ro_holders--;
                break;
@@ -560,27 +802,25 @@ static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
  * lock types are added. */
 static inline int ocfs2_highest_compat_lock_level(int level)
 {
-       int new_level = LKM_EXMODE;
+       int new_level = DLM_LOCK_EX;
 
-       if (level == LKM_EXMODE)
-               new_level = LKM_NLMODE;
-       else if (level == LKM_PRMODE)
-               new_level = LKM_PRMODE;
+       if (level == DLM_LOCK_EX)
+               new_level = DLM_LOCK_NL;
+       else if (level == DLM_LOCK_PR)
+               new_level = DLM_LOCK_PR;
        return new_level;
 }
 
 static void lockres_set_flags(struct ocfs2_lock_res *lockres,
                              unsigned long newflags)
 {
-       struct list_head *pos, *tmp;
-       struct ocfs2_mask_waiter *mw;
+       struct ocfs2_mask_waiter *mw, *tmp;
 
        assert_spin_locked(&lockres->l_lock);
 
        lockres->l_flags = newflags;
 
-       list_for_each_safe(pos, tmp, &lockres->l_mask_waiters) {
-               mw = list_entry(pos, struct ocfs2_mask_waiter, mw_item);
+       list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
                if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
                        continue;
 
@@ -606,12 +846,12 @@ static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res
        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
-       BUG_ON(lockres->l_blocking <= LKM_NLMODE);
+       BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
 
        lockres->l_level = lockres->l_requested;
        if (lockres->l_level <=
            ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
-               lockres->l_blocking = LKM_NLMODE;
+               lockres->l_blocking = DLM_LOCK_NL;
                lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
        }
        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
@@ -630,7 +870,7 @@ static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lo
         * information is already up to data. Convert from NL to
         * *anything* however should mark ourselves as needing an
         * update */
-       if (lockres->l_level == LKM_NLMODE &&
+       if (lockres->l_level == DLM_LOCK_NL &&
            lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
                lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
 
@@ -644,10 +884,10 @@ static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *loc
 {
        mlog_entry_void();
 
-       BUG_ON((!lockres->l_flags & OCFS2_LOCK_BUSY));
+       BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
        BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
 
-       if (lockres->l_requested > LKM_NLMODE &&
+       if (lockres->l_requested > DLM_LOCK_NL &&
            !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
            lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
                lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
@@ -685,6 +925,113 @@ static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
        return needs_downconvert;
 }
 
+/*
+ * OCFS2_LOCK_PENDING and l_pending_gen.
+ *
+ * Why does OCFS2_LOCK_PENDING exist?  To close a race between setting
+ * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock().  See ocfs2_unblock_lock()
+ * for more details on the race.
+ *
+ * OCFS2_LOCK_PENDING closes the race quite nicely.  However, it introduces
+ * a race on itself.  In o2dlm, we can get the ast before ocfs2_dlm_lock()
+ * returns.  The ast clears OCFS2_LOCK_BUSY, and must therefore clear
+ * OCFS2_LOCK_PENDING at the same time.  When ocfs2_dlm_lock() returns,
+ * the caller is going to try to clear PENDING again.  If nothing else is
+ * happening, __lockres_clear_pending() sees PENDING is unset and does
+ * nothing.
+ *
+ * But what if another path (eg downconvert thread) has just started a
+ * new locking action?  The other path has re-set PENDING.  Our path
+ * cannot clear PENDING, because that will re-open the original race
+ * window.
+ *
+ * [Example]
+ *
+ * ocfs2_meta_lock()
+ *  ocfs2_cluster_lock()
+ *   set BUSY
+ *   set PENDING
+ *   drop l_lock
+ *   ocfs2_dlm_lock()
+ *    ocfs2_locking_ast()              ocfs2_downconvert_thread()
+ *     clear PENDING                    ocfs2_unblock_lock()
+ *                                       take_l_lock
+ *                                       !BUSY
+ *                                       ocfs2_prepare_downconvert()
+ *                                        set BUSY
+ *                                        set PENDING
+ *                                       drop l_lock
+ *   take l_lock
+ *   clear PENDING
+ *   drop l_lock
+ *                     <window>
+ *                                       ocfs2_dlm_lock()
+ *
+ * So as you can see, we now have a window where l_lock is not held,
+ * PENDING is not set, and ocfs2_dlm_lock() has not been called.
+ *
+ * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
+ * set by ocfs2_prepare_downconvert().  That wasn't nice.
+ *
+ * To solve this we introduce l_pending_gen.  A call to
+ * lockres_clear_pending() will only do so when it is passed a generation
+ * number that matches the lockres.  lockres_set_pending() will return the
+ * current generation number.  When ocfs2_cluster_lock() goes to clear
+ * PENDING, it passes the generation it got from set_pending().  In our
+ * example above, the generation numbers will *not* match.  Thus,
+ * ocfs2_cluster_lock() will not clear the PENDING set by
+ * ocfs2_prepare_downconvert().
+ */
+
+/* Unlocked version for ocfs2_locking_ast() */
+static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
+                                   unsigned int generation,
+                                   struct ocfs2_super *osb)
+{
+       assert_spin_locked(&lockres->l_lock);
+
+       /*
+        * The ast and locking functions can race us here.  The winner
+        * will clear pending, the loser will not.
+        */
+       if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
+           (lockres->l_pending_gen != generation))
+               return;
+
+       lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
+       lockres->l_pending_gen++;
+
+       /*
+        * The downconvert thread may have skipped us because we
+        * were PENDING.  Wake it up.
+        */
+       if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
+               ocfs2_wake_downconvert_thread(osb);
+}
+
+/* Locked version for callers of ocfs2_dlm_lock() */
+static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
+                                 unsigned int generation,
+                                 struct ocfs2_super *osb)
+{
+       unsigned long flags;
+
+       spin_lock_irqsave(&lockres->l_lock, flags);
+       __lockres_clear_pending(lockres, generation, osb);
+       spin_unlock_irqrestore(&lockres->l_lock, flags);
+}
+
+static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
+{
+       assert_spin_locked(&lockres->l_lock);
+       BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
+
+       lockres_or_flags(lockres, OCFS2_LOCK_PENDING);
+
+       return lockres->l_pending_gen;
+}
+
+
 static void ocfs2_blocking_ast(void *opaque, int level)
 {
        struct ocfs2_lock_res *lockres = opaque;
@@ -692,12 +1039,19 @@ static void ocfs2_blocking_ast(void *opaque, int level)
        int needs_downconvert;
        unsigned long flags;
 
-       BUG_ON(level <= LKM_NLMODE);
+       BUG_ON(level <= DLM_LOCK_NL);
 
        mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
             lockres->l_name, level, lockres->l_level,
             ocfs2_lock_type_string(lockres->l_type));
 
+       /*
+        * We can skip the bast for locks which don't enable caching -
+        * they'll be dropped at the earliest possible time anyway.
+        */
+       if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
+               return;
+
        spin_lock_irqsave(&lockres->l_lock, flags);
        needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
        if (needs_downconvert)
@@ -706,20 +1060,28 @@ static void ocfs2_blocking_ast(void *opaque, int level)
 
        wake_up(&lockres->l_event);
 
-       ocfs2_kick_vote_thread(osb);
+       ocfs2_wake_downconvert_thread(osb);
 }
 
 static void ocfs2_locking_ast(void *opaque)
 {
        struct ocfs2_lock_res *lockres = opaque;
-       struct dlm_lockstatus *lksb = &lockres->l_lksb;
+       struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
        unsigned long flags;
+       int status;
 
        spin_lock_irqsave(&lockres->l_lock, flags);
 
-       if (lksb->status != DLM_NORMAL) {
-               mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
-                    lockres->l_name, lksb->status);
+       status = ocfs2_dlm_lock_status(&lockres->l_lksb);
+
+       if (status == -EAGAIN) {
+               lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
+               goto out;
+       }
+
+       if (status) {
+               mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
+                    lockres->l_name, status);
                spin_unlock_irqrestore(&lockres->l_lock, flags);
                return;
        }
@@ -742,11 +1104,23 @@ static void ocfs2_locking_ast(void *opaque)
                     lockres->l_unlock_action);
                BUG();
        }
-
+out:
        /* set it to something invalid so if we get called again we
         * can catch it. */
        lockres->l_action = OCFS2_AST_INVALID;
 
+       /* Did we try to cancel this lock?  Clear that state */
+       if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
+               lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
+
+       /*
+        * We may have beaten the locking functions here.  We certainly
+        * know that dlm_lock() has been called :-)
+        * Because we can't have two lock calls in flight at once, we
+        * can use lockres->l_pending_gen.
+        */
+       __lockres_clear_pending(lockres, lockres->l_pending_gen,  osb);
+
        wake_up(&lockres->l_event);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 }
@@ -776,15 +1150,15 @@ static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
 static int ocfs2_lock_create(struct ocfs2_super *osb,
                             struct ocfs2_lock_res *lockres,
                             int level,
-                            int dlm_flags)
+                            u32 dlm_flags)
 {
        int ret = 0;
-       enum dlm_status status = DLM_NORMAL;
        unsigned long flags;
+       unsigned int gen;
 
        mlog_entry_void();
 
-       mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level,
+       mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level,
             dlm_flags);
 
        spin_lock_irqsave(&lockres->l_lock, flags);
@@ -797,24 +1171,23 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
        lockres->l_action = OCFS2_AST_ATTACH;
        lockres->l_requested = level;
        lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
+       gen = lockres_set_pending(lockres);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 
-       status = dlmlock(osb->dlm,
-                        level,
-                        &lockres->l_lksb,
-                        dlm_flags,
-                        lockres->l_name,
-                        OCFS2_LOCK_ID_MAX_LEN - 1,
-                        ocfs2_locking_ast,
-                        lockres,
-                        ocfs2_blocking_ast);
-       if (status != DLM_NORMAL) {
-               ocfs2_log_dlm_error("dlmlock", status, lockres);
-               ret = -EINVAL;
+       ret = ocfs2_dlm_lock(osb->cconn,
+                            level,
+                            &lockres->l_lksb,
+                            dlm_flags,
+                            lockres->l_name,
+                            OCFS2_LOCK_ID_MAX_LEN - 1,
+                            lockres);
+       lockres_clear_pending(lockres, gen, osb);
+       if (ret) {
+               ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
                ocfs2_recover_from_dlm_error(lockres, 1);
        }
 
-       mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name);
+       mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name);
 
 bail:
        mlog_exit(ret);
@@ -863,6 +1236,7 @@ static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
 {
        INIT_LIST_HEAD(&mw->mw_item);
        init_completion(&mw->mw_complete);
+       ocfs2_init_start_time(mw);
 }
 
 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
@@ -909,24 +1283,42 @@ static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
 
 }
 
-static int ocfs2_cluster_lock(struct ocfs2_super *osb,
-                             struct ocfs2_lock_res *lockres,
-                             int level,
-                             int lkm_flags,
-                             int arg_flags)
+static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
+                                            struct ocfs2_lock_res *lockres)
+{
+       int ret;
+
+       ret = wait_for_completion_interruptible(&mw->mw_complete);
+       if (ret)
+               lockres_remove_mask_waiter(lockres, mw);
+       else
+               ret = mw->mw_status;
+       /* Re-arm the completion in case we want to wait on it again */
+       INIT_COMPLETION(mw->mw_complete);
+       return ret;
+}
+
+static int __ocfs2_cluster_lock(struct ocfs2_super *osb,
+                               struct ocfs2_lock_res *lockres,
+                               int level,
+                               u32 lkm_flags,
+                               int arg_flags,
+                               int l_subclass,
+                               unsigned long caller_ip)
 {
        struct ocfs2_mask_waiter mw;
-       enum dlm_status status;
        int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
        int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
        unsigned long flags;
+       unsigned int gen;
+       int noqueue_attempted = 0;
 
        mlog_entry_void();
 
        ocfs2_init_mask_waiter(&mw);
 
        if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
-               lkm_flags |= LKM_VALBLK;
+               lkm_flags |= DLM_LKF_VALBLK;
 
 again:
        wait = 0;
@@ -954,18 +1346,6 @@ again:
                goto unlock;
        }
 
-       if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
-               /* lock has not been created yet. */
-               spin_unlock_irqrestore(&lockres->l_lock, flags);
-
-               ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
-               if (ret < 0) {
-                       mlog_errno(ret);
-                       goto out;
-               }
-               goto again;
-       }
-
        if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
            !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
                /* is the lock is currently blocked on behalf of
@@ -976,45 +1356,56 @@ again:
        }
 
        if (level > lockres->l_level) {
+               if (noqueue_attempted > 0) {
+                       ret = -EAGAIN;
+                       goto unlock;
+               }
+               if (lkm_flags & DLM_LKF_NOQUEUE)
+                       noqueue_attempted = 1;
+
                if (lockres->l_action != OCFS2_AST_INVALID)
                        mlog(ML_ERROR, "lockres %s has action %u pending\n",
                             lockres->l_name, lockres->l_action);
 
-               lockres->l_action = OCFS2_AST_CONVERT;
+               if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
+                       lockres->l_action = OCFS2_AST_ATTACH;
+                       lkm_flags &= ~DLM_LKF_CONVERT;
+               } else {
+                       lockres->l_action = OCFS2_AST_CONVERT;
+                       lkm_flags |= DLM_LKF_CONVERT;
+               }
+
                lockres->l_requested = level;
                lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
+               gen = lockres_set_pending(lockres);
                spin_unlock_irqrestore(&lockres->l_lock, flags);
 
-               BUG_ON(level == LKM_IVMODE);
-               BUG_ON(level == LKM_NLMODE);
+               BUG_ON(level == DLM_LOCK_IV);
+               BUG_ON(level == DLM_LOCK_NL);
 
                mlog(0, "lock %s, convert from %d to level = %d\n",
                     lockres->l_name, lockres->l_level, level);
 
                /* call dlm_lock to upgrade lock now */
-               status = dlmlock(osb->dlm,
-                                level,
-                                &lockres->l_lksb,
-                                lkm_flags|LKM_CONVERT,
-                                lockres->l_name,
-                                OCFS2_LOCK_ID_MAX_LEN - 1,
-                                ocfs2_locking_ast,
-                                lockres,
-                                ocfs2_blocking_ast);
-               if (status != DLM_NORMAL) {
-                       if ((lkm_flags & LKM_NOQUEUE) &&
-                           (status == DLM_NOTQUEUED))
-                               ret = -EAGAIN;
-                       else {
-                               ocfs2_log_dlm_error("dlmlock", status,
-                                                   lockres);
-                               ret = -EINVAL;
+               ret = ocfs2_dlm_lock(osb->cconn,
+                                    level,
+                                    &lockres->l_lksb,
+                                    lkm_flags,
+                                    lockres->l_name,
+                                    OCFS2_LOCK_ID_MAX_LEN - 1,
+                                    lockres);
+               lockres_clear_pending(lockres, gen, osb);
+               if (ret) {
+                       if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
+                           (ret != -EAGAIN)) {
+                               ocfs2_log_dlm_error("ocfs2_dlm_lock",
+                                                   ret, lockres);
                        }
                        ocfs2_recover_from_dlm_error(lockres, 1);
                        goto out;
                }
 
-               mlog(0, "lock %s, successfull return from dlmlock\n",
+               mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n",
                     lockres->l_name);
 
                /* At this point we've gone inside the dlm and need to
@@ -1054,22 +1445,51 @@ out:
                        goto again;
                mlog_errno(ret);
        }
+       ocfs2_update_lock_stats(lockres, level, &mw, ret);
 
+#ifdef CONFIG_DEBUG_LOCK_ALLOC
+       if (!ret && lockres->l_lockdep_map.key != NULL) {
+               if (level == DLM_LOCK_PR)
+                       rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass,
+                               !!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
+                               caller_ip);
+               else
+                       rwsem_acquire(&lockres->l_lockdep_map, l_subclass,
+                               !!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
+                               caller_ip);
+       }
+#endif
        mlog_exit(ret);
        return ret;
 }
 
-static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
-                                struct ocfs2_lock_res *lockres,
-                                int level)
+static inline int ocfs2_cluster_lock(struct ocfs2_super *osb,
+                                    struct ocfs2_lock_res *lockres,
+                                    int level,
+                                    u32 lkm_flags,
+                                    int arg_flags)
+{
+       return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags,
+                                   0, _RET_IP_);
+}
+
+
+static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
+                                  struct ocfs2_lock_res *lockres,
+                                  int level,
+                                  unsigned long caller_ip)
 {
        unsigned long flags;
 
        mlog_entry_void();
        spin_lock_irqsave(&lockres->l_lock, flags);
        ocfs2_dec_holders(lockres, level);
-       ocfs2_vote_on_unlock(osb, lockres);
+       ocfs2_downconvert_on_unlock(osb, lockres);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
+#ifdef CONFIG_DEBUG_LOCK_ALLOC
+       if (lockres->l_lockdep_map.key != NULL)
+               rwsem_release(&lockres->l_lockdep_map, 1, caller_ip);
+#endif
        mlog_exit_void();
 }
 
@@ -1078,9 +1498,9 @@ static int ocfs2_create_new_lock(struct ocfs2_super *osb,
                                 int ex,
                                 int local)
 {
-       int level =  ex ? LKM_EXMODE : LKM_PRMODE;
+       int level =  ex ? DLM_LOCK_EX : DLM_LOCK_PR;
        unsigned long flags;
-       int lkm_flags = local ? LKM_LOCAL : 0;
+       u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;
 
        spin_lock_irqsave(&lockres->l_lock, flags);
        BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
@@ -1123,16 +1543,10 @@ int ocfs2_create_new_inode_locks(struct inode *inode)
        }
 
        /*
-        * We don't want to use LKM_LOCAL on a meta data lock as they
+        * We don't want to use DLM_LKF_LOCAL on a meta data lock as they
         * don't use a generation in their lock names.
         */
-       ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1, 0);
-       if (ret) {
-               mlog_errno(ret);
-               goto bail;
-       }
-
-       ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1, 1);
+       ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
        if (ret) {
                mlog_errno(ret);
                goto bail;
@@ -1163,12 +1577,14 @@ int ocfs2_rw_lock(struct inode *inode, int write)
             (unsigned long long)OCFS2_I(inode)->ip_blkno,
             write ? "EXMODE" : "PRMODE");
 
-       if (ocfs2_mount_local(osb))
+       if (ocfs2_mount_local(osb)) {
+               mlog_exit(0);
                return 0;
+       }
 
        lockres = &OCFS2_I(inode)->ip_rw_lockres;
 
-       level = write ? LKM_EXMODE : LKM_PRMODE;
+       level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
 
        status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
                                    0);
@@ -1181,7 +1597,7 @@ int ocfs2_rw_lock(struct inode *inode, int write)
 
 void ocfs2_rw_unlock(struct inode *inode, int write)
 {
-       int level = write ? LKM_EXMODE : LKM_PRMODE;
+       int level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
        struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
        struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
@@ -1219,7 +1635,7 @@ int ocfs2_open_lock(struct inode *inode)
        lockres = &OCFS2_I(inode)->ip_open_lockres;
 
        status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
-                                   LKM_PRMODE, 0, 0);
+                                   DLM_LOCK_PR, 0, 0);
        if (status < 0)
                mlog_errno(status);
 
@@ -1247,16 +1663,16 @@ int ocfs2_try_open_lock(struct inode *inode, int write)
 
        lockres = &OCFS2_I(inode)->ip_open_lockres;
 
-       level = write ? LKM_EXMODE : LKM_PRMODE;
+       level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
 
        /*
         * The file system may already holding a PRMODE/EXMODE open lock.
-        * Since we pass LKM_NOQUEUE, the request won't block waiting on
+        * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on
         * other nodes and the -EAGAIN will indicate to the caller that
         * this inode is still in use.
         */
        status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
-                                   level, LKM_NOQUEUE, 0);
+                                   level, DLM_LKF_NOQUEUE, 0);
 
 out:
        mlog_exit(status);
@@ -1281,123 +1697,251 @@ void ocfs2_open_unlock(struct inode *inode)
 
        if(lockres->l_ro_holders)
                ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
-                                    LKM_PRMODE);
+                                    DLM_LOCK_PR);
        if(lockres->l_ex_holders)
                ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
-                                    LKM_EXMODE);
+                                    DLM_LOCK_EX);
 
 out:
        mlog_exit_void();
 }
 
-int ocfs2_data_lock_full(struct inode *inode,
-                        int write,
-                        int arg_flags)
+static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
+                                    int level)
 {
-       int status = 0, level;
-       struct ocfs2_lock_res *lockres;
-       struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
+       int ret;
+       struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
+       unsigned long flags;
+       struct ocfs2_mask_waiter mw;
 
-       BUG_ON(!inode);
+       ocfs2_init_mask_waiter(&mw);
 
-       mlog_entry_void();
+retry_cancel:
+       spin_lock_irqsave(&lockres->l_lock, flags);
+       if (lockres->l_flags & OCFS2_LOCK_BUSY) {
+               ret = ocfs2_prepare_cancel_convert(osb, lockres);
+               if (ret) {
+                       spin_unlock_irqrestore(&lockres->l_lock, flags);
+                       ret = ocfs2_cancel_convert(osb, lockres);
+                       if (ret < 0) {
+                               mlog_errno(ret);
+                               goto out;
+                       }
+                       goto retry_cancel;
+               }
+               lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
+               spin_unlock_irqrestore(&lockres->l_lock, flags);
 
-       mlog(0, "inode %llu take %s DATA lock\n",
-            (unsigned long long)OCFS2_I(inode)->ip_blkno,
-            write ? "EXMODE" : "PRMODE");
+               ocfs2_wait_for_mask(&mw);
+               goto retry_cancel;
+       }
 
-       /* We'll allow faking a readonly data lock for
-        * rodevices. */
-       if (ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) {
-               if (write) {
-                       status = -EROFS;
-                       mlog_errno(status);
+       ret = -ERESTARTSYS;
+       /*
+        * We may still have gotten the lock, in which case there's no
+        * point to restarting the syscall.
+        */
+       if (lockres->l_level == level)
+               ret = 0;
+
+       mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
+            lockres->l_flags, lockres->l_level, lockres->l_action);
+
+       spin_unlock_irqrestore(&lockres->l_lock, flags);
+
+out:
+       return ret;
+}
+
+/*
+ * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
+ * flock() calls. The locking approach this requires is sufficiently
+ * different from all other cluster lock types that we implement a
+ * seperate path to the "low-level" dlm calls. In particular:
+ *
+ * - No optimization of lock levels is done - we take at exactly
+ *   what's been requested.
+ *
+ * - No lock caching is employed. We immediately downconvert to
+ *   no-lock at unlock time. This also means flock locks never go on
+ *   the blocking list).
+ *
+ * - Since userspace can trivially deadlock itself with flock, we make
+ *   sure to allow cancellation of a misbehaving applications flock()
+ *   request.
+ *
+ * - Access to any flock lockres doesn't require concurrency, so we
+ *   can simplify the code by requiring the caller to guarantee
+ *   serialization of dlmglue flock calls.
+ */
+int ocfs2_file_lock(struct file *file, int ex, int trylock)
+{
+       int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
+       unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0;
+       unsigned long flags;
+       struct ocfs2_file_private *fp = file->private_data;
+       struct ocfs2_lock_res *lockres = &fp->fp_flock;
+       struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
+       struct ocfs2_mask_waiter mw;
+
+       ocfs2_init_mask_waiter(&mw);
+
+       if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
+           (lockres->l_level > DLM_LOCK_NL)) {
+               mlog(ML_ERROR,
+                    "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
+                    "level: %u\n", lockres->l_name, lockres->l_flags,
+                    lockres->l_level);
+               return -EINVAL;
+       }
+
+       spin_lock_irqsave(&lockres->l_lock, flags);
+       if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
+               lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
+               spin_unlock_irqrestore(&lockres->l_lock, flags);
+
+               /*
+                * Get the lock at NLMODE to start - that way we
+                * can cancel the upconvert request if need be.
+                */
+               ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0);
+               if (ret < 0) {
+                       mlog_errno(ret);
+                       goto out;
                }
-               goto out;
+
+               ret = ocfs2_wait_for_mask(&mw);
+               if (ret) {
+                       mlog_errno(ret);
+                       goto out;
+               }
+               spin_lock_irqsave(&lockres->l_lock, flags);
        }
 
-       if (ocfs2_mount_local(osb))
-               goto out;
+       lockres->l_action = OCFS2_AST_CONVERT;
+       lkm_flags |= DLM_LKF_CONVERT;
+       lockres->l_requested = level;
+       lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
 
-       lockres = &OCFS2_I(inode)->ip_data_lockres;
+       lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
+       spin_unlock_irqrestore(&lockres->l_lock, flags);
+
+       ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
+                            lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
+                            lockres);
+       if (ret) {
+               if (!trylock || (ret != -EAGAIN)) {
+                       ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
+                       ret = -EINVAL;
+               }
 
-       level = write ? LKM_EXMODE : LKM_PRMODE;
+               ocfs2_recover_from_dlm_error(lockres, 1);
+               lockres_remove_mask_waiter(lockres, &mw);
+               goto out;
+       }
 
-       status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level,
-                                   0, arg_flags);
-       if (status < 0 && status != -EAGAIN)
-               mlog_errno(status);
+       ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
+       if (ret == -ERESTARTSYS) {
+               /*
+                * Userspace can cause deadlock itself with
+                * flock(). Current behavior locally is to allow the
+                * deadlock, but abort the system call if a signal is
+                * received. We follow this example, otherwise a
+                * poorly written program could sit in kernel until
+                * reboot.
+                *
+                * Handling this is a bit more complicated for Ocfs2
+                * though. We can't exit this function with an
+                * outstanding lock request, so a cancel convert is
+                * required. We intentionally overwrite 'ret' - if the
+                * cancel fails and the lock was granted, it's easier
+                * to just bubble success back up to the user.
+                */
+               ret = ocfs2_flock_handle_signal(lockres, level);
+       } else if (!ret && (level > lockres->l_level)) {
+               /* Trylock failed asynchronously */
+               BUG_ON(!trylock);
+               ret = -EAGAIN;
+       }
 
 out:
-       mlog_exit(status);
-       return status;
+
+       mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
+            lockres->l_name, ex, trylock, ret);
+       return ret;
 }
 
-/* see ocfs2_meta_lock_with_page() */
-int ocfs2_data_lock_with_page(struct inode *inode,
-                             int write,
-                             struct page *page)
+void ocfs2_file_unlock(struct file *file)
 {
        int ret;
+       unsigned int gen;
+       unsigned long flags;
+       struct ocfs2_file_private *fp = file->private_data;
+       struct ocfs2_lock_res *lockres = &fp->fp_flock;
+       struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
+       struct ocfs2_mask_waiter mw;
 
-       ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK);
-       if (ret == -EAGAIN) {
-               unlock_page(page);
-               if (ocfs2_data_lock(inode, write) == 0)
-                       ocfs2_data_unlock(inode, write);
-               ret = AOP_TRUNCATED_PAGE;
+       ocfs2_init_mask_waiter(&mw);
+
+       if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
+               return;
+
+       if (lockres->l_level == DLM_LOCK_NL)
+               return;
+
+       mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
+            lockres->l_name, lockres->l_flags, lockres->l_level,
+            lockres->l_action);
+
+       spin_lock_irqsave(&lockres->l_lock, flags);
+       /*
+        * Fake a blocking ast for the downconvert code.
+        */
+       lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
+       lockres->l_blocking = DLM_LOCK_EX;
+
+       gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL);
+       lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
+       spin_unlock_irqrestore(&lockres->l_lock, flags);
+
+       ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen);
+       if (ret) {
+               mlog_errno(ret);
+               return;
        }
 
-       return ret;
+       ret = ocfs2_wait_for_mask(&mw);
+       if (ret)
+               mlog_errno(ret);
 }
 
-static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
-                                struct ocfs2_lock_res *lockres)
+static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
+                                       struct ocfs2_lock_res *lockres)
 {
        int kick = 0;
 
        mlog_entry_void();
 
        /* If we know that another node is waiting on our lock, kick
-        * the vote thread * pre-emptively when we reach a release
+        * the downconvert thread * pre-emptively when we reach a release
         * condition. */
        if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
                switch(lockres->l_blocking) {
-               case LKM_EXMODE:
+               case DLM_LOCK_EX:
                        if (!lockres->l_ex_holders && !lockres->l_ro_holders)
                                kick = 1;
                        break;
-               case LKM_PRMODE:
+               case DLM_LOCK_PR:
                        if (!lockres->l_ex_holders)
                                kick = 1;
                        break;
                default:
                        BUG();
-               }
-       }
-
-       if (kick)
-               ocfs2_kick_vote_thread(osb);
-
-       mlog_exit_void();
-}
-
-void ocfs2_data_unlock(struct inode *inode,
-                      int write)
-{
-       int level = write ? LKM_EXMODE : LKM_PRMODE;
-       struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres;
-       struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
-
-       mlog_entry_void();
-
-       mlog(0, "inode %llu drop %s DATA lock\n",
-            (unsigned long long)OCFS2_I(inode)->ip_blkno,
-            write ? "EXMODE" : "PRMODE");
+               }
+       }
 
-       if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
-           !ocfs2_mount_local(osb))
-               ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
+       if (kick)
+               ocfs2_wake_downconvert_thread(osb);
 
        mlog_exit_void();
 }
@@ -1421,16 +1965,16 @@ static u64 ocfs2_pack_timespec(struct timespec *spec)
 
 /* Call this with the lockres locked. I am reasonably sure we don't
  * need ip_lock in this function as anyone who would be changing those
- * values is supposed to be blocked in ocfs2_meta_lock right now. */
+ * values is supposed to be blocked in ocfs2_inode_lock right now. */
 static void __ocfs2_stuff_meta_lvb(struct inode *inode)
 {
        struct ocfs2_inode_info *oi = OCFS2_I(inode);
-       struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
+       struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
        struct ocfs2_meta_lvb *lvb;
 
        mlog_entry_void();
 
-       lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
+       lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
 
        /*
         * Invalidate the LVB of a deleted inode - this way other
@@ -1456,6 +2000,7 @@ static void __ocfs2_stuff_meta_lvb(struct inode *inode)
        lvb->lvb_imtime_packed =
                cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
        lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
+       lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features);
        lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
 
 out:
@@ -1474,14 +2019,14 @@ static void ocfs2_unpack_timespec(struct timespec *spec,
 static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
 {
        struct ocfs2_inode_info *oi = OCFS2_I(inode);
-       struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
+       struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
        struct ocfs2_meta_lvb *lvb;
 
        mlog_entry_void();
 
        mlog_meta_lvb(0, lockres);
 
-       lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
+       lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
 
        /* We're safe here without the lockres lock... */
        spin_lock(&oi->ip_lock);
@@ -1489,6 +2034,7 @@ static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
        i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
 
        oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
+       oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures);
        ocfs2_set_inode_flags(inode);
 
        /* fast-symlinks are a special case */
@@ -1515,9 +2061,10 @@ static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
                                              struct ocfs2_lock_res *lockres)
 {
-       struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
+       struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
 
-       if (lvb->lvb_version == OCFS2_LVB_VERSION
+       if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)
+           && lvb->lvb_version == OCFS2_LVB_VERSION
            && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
                return 1;
        return 0;
@@ -1581,12 +2128,12 @@ static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockre
 }
 
 /* may or may not return a bh if it went to disk. */
-static int ocfs2_meta_lock_update(struct inode *inode,
+static int ocfs2_inode_lock_update(struct inode *inode,
                                  struct buffer_head **bh)
 {
        int status = 0;
        struct ocfs2_inode_info *oi = OCFS2_I(inode);
-       struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
+       struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
        struct ocfs2_dinode *fe;
        struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
@@ -1611,7 +2158,7 @@ static int ocfs2_meta_lock_update(struct inode *inode,
 
        /* This will discard any caching information we might have had
         * for the inode metadata. */
-       ocfs2_metadata_cache_purge(inode);
+       ocfs2_metadata_cache_purge(INODE_CACHE(inode));
 
        ocfs2_extent_map_trunc(inode, 0);
 
@@ -1622,8 +2169,7 @@ static int ocfs2_meta_lock_update(struct inode *inode,
        } else {
                /* Boo, we have to go to disk. */
                /* read bh, cast, ocfs2_refresh_inode */
-               status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
-                                         bh, OCFS2_BH_CACHED, inode);
+               status = ocfs2_read_inode_block(inode, bh);
                if (status < 0) {
                        mlog_errno(status);
                        goto bail_refresh;
@@ -1631,18 +2177,14 @@ static int ocfs2_meta_lock_update(struct inode *inode,
                fe = (struct ocfs2_dinode *) (*bh)->b_data;
 
                /* This is a good chance to make sure we're not
-                * locking an invalid object.
+                * locking an invalid object.  ocfs2_read_inode_block()
+                * already checked that the inode block is sane.
                 *
                 * We bug on a stale inode here because we checked
                 * above whether it was wiped from disk. The wiping
                 * node provides a guarantee that we receive that
                 * message and can mark the inode before dropping any
                 * locks associated with it. */
-               if (!OCFS2_IS_VALID_DINODE(fe)) {
-                       OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
-                       status = -EIO;
-                       goto bail_refresh;
-               }
                mlog_bug_on_msg(inode->i_generation !=
                                le32_to_cpu(fe->i_generation),
                                "Invalid dinode %llu disk generation: %u "
@@ -1658,6 +2200,7 @@ static int ocfs2_meta_lock_update(struct inode *inode,
                                le32_to_cpu(fe->i_flags));
 
                ocfs2_refresh_inode(inode, fe);
+               ocfs2_track_lock_refresh(lockres);
        }
 
        status = 0;
@@ -1683,11 +2226,7 @@ static int ocfs2_assign_bh(struct inode *inode,
                return 0;
        }
 
-       status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
-                                 OCFS2_I(inode)->ip_blkno,
-                                 ret_bh,
-                                 OCFS2_BH_CACHED,
-                                 inode);
+       status = ocfs2_read_inode_block(inode, ret_bh);
        if (status < 0)
                mlog_errno(status);
 
@@ -1698,12 +2237,14 @@ static int ocfs2_assign_bh(struct inode *inode,
  * returns < 0 error if the callback will never be called, otherwise
  * the result of the lock will be communicated via the callback.
  */
-int ocfs2_meta_lock_full(struct inode *inode,
-                        struct buffer_head **ret_bh,
-                        int ex,
-                        int arg_flags)
+int ocfs2_inode_lock_full_nested(struct inode *inode,
+                                struct buffer_head **ret_bh,
+                                int ex,
+                                int arg_flags,
+                                int subclass)
 {
-       int status, level, dlm_flags, acquired;
+       int status, level, acquired;
+       u32 dlm_flags;
        struct ocfs2_lock_res *lockres = NULL;
        struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
        struct buffer_head *local_bh = NULL;
@@ -1730,16 +2271,16 @@ int ocfs2_meta_lock_full(struct inode *inode,
                goto local;
 
        if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
-               wait_event(osb->recovery_event,
-                          ocfs2_node_map_is_empty(osb, &osb->recovery_map));
+               ocfs2_wait_for_recovery(osb);
 
-       lockres = &OCFS2_I(inode)->ip_meta_lockres;
-       level = ex ? LKM_EXMODE : LKM_PRMODE;
+       lockres = &OCFS2_I(inode)->ip_inode_lockres;
+       level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
        dlm_flags = 0;
        if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
-               dlm_flags |= LKM_NOQUEUE;
+               dlm_flags |= DLM_LKF_NOQUEUE;
 
-       status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
+       status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags,
+                                     arg_flags, subclass, _RET_IP_);
        if (status < 0) {
                if (status != -EAGAIN && status != -EIOCBRETRY)
                        mlog_errno(status);
@@ -1754,8 +2295,7 @@ int ocfs2_meta_lock_full(struct inode *inode,
         * committed to owning this lock so we don't allow signals to
         * abort the operation. */
        if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
-               wait_event(osb->recovery_event,
-                          ocfs2_node_map_is_empty(osb, &osb->recovery_map));
+               ocfs2_wait_for_recovery(osb);
 
 local:
        /*
@@ -1772,11 +2312,11 @@ local:
        }
 
        /* This is fun. The caller may want a bh back, or it may
-        * not. ocfs2_meta_lock_update definitely wants one in, but
+        * not. ocfs2_inode_lock_update definitely wants one in, but
         * may or may not read one, depending on what's in the
         * LVB. The result of all of this is that we've *only* gone to
         * disk if we have to, so the complexity is worthwhile. */
-       status = ocfs2_meta_lock_update(inode, &local_bh);
+       status = ocfs2_inode_lock_update(inode, &local_bh);
        if (status < 0) {
                if (status != -ENOENT)
                        mlog_errno(status);
@@ -1798,7 +2338,7 @@ bail:
                        *ret_bh = NULL;
                }
                if (acquired)
-                       ocfs2_meta_unlock(inode, ex);
+                       ocfs2_inode_unlock(inode, ex);
        }
 
        if (local_bh)
@@ -1809,19 +2349,20 @@ bail:
 }
 
 /*
- * This is working around a lock inversion between tasks acquiring DLM locks
- * while holding a page lock and the vote thread which blocks dlm lock acquiry
- * while acquiring page locks.
+ * This is working around a lock inversion between tasks acquiring DLM
+ * locks while holding a page lock and the downconvert thread which
+ * blocks dlm lock acquiry while acquiring page locks.
  *
  * ** These _with_page variantes are only intended to be called from aop
  * methods that hold page locks and return a very specific *positive* error
  * code that aop methods pass up to the VFS -- test for errors with != 0. **
  *
- * The DLM is called such that it returns -EAGAIN if it would have blocked
- * waiting for the vote thread.  In that case we unlock our page so the vote
- * thread can make progress.  Once we've done this we have to return
- * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up
- * into the VFS who will then immediately retry the aop call.
+ * The DLM is called such that it returns -EAGAIN if it would have
+ * blocked waiting for the downconvert thread.  In that case we unlock
+ * our page so the downconvert thread can make progress.  Once we've
+ * done this we have to return AOP_TRUNCATED_PAGE so the aop method
+ * that called us can bubble that back up into the VFS who will then
+ * immediately retry the aop call.
  *
  * We do a blocking lock and immediate unlock before returning, though, so that
  * the lock has a great chance of being cached on this node by the time the VFS
@@ -1829,32 +2370,32 @@ bail:
  * ping locks back and forth, but that's a risk we're willing to take to avoid
  * the lock inversion simply.
  */
-int ocfs2_meta_lock_with_page(struct inode *inode,
+int ocfs2_inode_lock_with_page(struct inode *inode,
                              struct buffer_head **ret_bh,
                              int ex,
                              struct page *page)
 {
        int ret;
 
-       ret = ocfs2_meta_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
+       ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
        if (ret == -EAGAIN) {
                unlock_page(page);
-               if (ocfs2_meta_lock(inode, ret_bh, ex) == 0)
-                       ocfs2_meta_unlock(inode, ex);
+               if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
+                       ocfs2_inode_unlock(inode, ex);
                ret = AOP_TRUNCATED_PAGE;
        }
 
        return ret;
 }
 
-int ocfs2_meta_lock_atime(struct inode *inode,
+int ocfs2_inode_lock_atime(struct inode *inode,
                          struct vfsmount *vfsmnt,
                          int *level)
 {
        int ret;
 
        mlog_entry_void();
-       ret = ocfs2_meta_lock(inode, NULL, 0);
+       ret = ocfs2_inode_lock(inode, NULL, 0);
        if (ret < 0) {
                mlog_errno(ret);
                return ret;
@@ -1867,8 +2408,8 @@ int ocfs2_meta_lock_atime(struct inode *inode,
        if (ocfs2_should_update_atime(inode, vfsmnt)) {
                struct buffer_head *bh = NULL;
 
-               ocfs2_meta_unlock(inode, 0);
-               ret = ocfs2_meta_lock(inode, &bh, 1);
+               ocfs2_inode_unlock(inode, 0);
+               ret = ocfs2_inode_lock(inode, &bh, 1);
                if (ret < 0) {
                        mlog_errno(ret);
                        return ret;
@@ -1885,11 +2426,11 @@ int ocfs2_meta_lock_atime(struct inode *inode,
        return ret;
 }
 
-void ocfs2_meta_unlock(struct inode *inode,
+void ocfs2_inode_unlock(struct inode *inode,
                       int ex)
 {
-       int level = ex ? LKM_EXMODE : LKM_PRMODE;
-       struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
+       int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
+       struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
        struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
        mlog_entry_void();
@@ -1905,14 +2446,53 @@ void ocfs2_meta_unlock(struct inode *inode,
        mlog_exit_void();
 }
 
+int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno)
+{
+       struct ocfs2_lock_res *lockres;
+       struct ocfs2_orphan_scan_lvb *lvb;
+       int status = 0;
+
+       if (ocfs2_is_hard_readonly(osb))
+               return -EROFS;
+
+       if (ocfs2_mount_local(osb))
+               return 0;
+
+       lockres = &osb->osb_orphan_scan.os_lockres;
+       status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
+       if (status < 0)
+               return status;
+
+       lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
+       if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
+           lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION)
+               *seqno = be32_to_cpu(lvb->lvb_os_seqno);
+       else
+               *seqno = osb->osb_orphan_scan.os_seqno + 1;
+
+       return status;
+}
+
+void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno)
+{
+       struct ocfs2_lock_res *lockres;
+       struct ocfs2_orphan_scan_lvb *lvb;
+
+       if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) {
+               lockres = &osb->osb_orphan_scan.os_lockres;
+               lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
+               lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION;
+               lvb->lvb_os_seqno = cpu_to_be32(seqno);
+               ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
+       }
+}
+
 int ocfs2_super_lock(struct ocfs2_super *osb,
                     int ex)
 {
        int status = 0;
-       int level = ex ? LKM_EXMODE : LKM_PRMODE;
+       int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
        struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
-       struct buffer_head *bh;
-       struct ocfs2_slot_info *si = osb->slot_info;
 
        mlog_entry_void();
 
@@ -1938,16 +2518,13 @@ int ocfs2_super_lock(struct ocfs2_super *osb,
                goto bail;
        }
        if (status) {
-               bh = si->si_bh;
-               status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0,
-                                         si->si_inode);
-               if (status == 0)
-                       ocfs2_update_slot_info(si);
+               status = ocfs2_refresh_slot_info(osb);
 
                ocfs2_complete_lock_res_refresh(lockres, status);
 
                if (status < 0)
                        mlog_errno(status);
+               ocfs2_track_lock_refresh(lockres);
        }
 bail:
        mlog_exit(status);
@@ -1957,7 +2534,7 @@ bail:
 void ocfs2_super_unlock(struct ocfs2_super *osb,
                        int ex)
 {
-       int level = ex ? LKM_EXMODE : LKM_PRMODE;
+       int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
        struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
 
        if (!ocfs2_mount_local(osb))
@@ -1975,7 +2552,7 @@ int ocfs2_rename_lock(struct ocfs2_super *osb)
        if (ocfs2_mount_local(osb))
                return 0;
 
-       status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
+       status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
        if (status < 0)
                mlog_errno(status);
 
@@ -1987,13 +2564,41 @@ void ocfs2_rename_unlock(struct ocfs2_super *osb)
        struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
 
        if (!ocfs2_mount_local(osb))
-               ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
+               ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
+}
+
+int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex)
+{
+       int status;
+       struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
+
+       if (ocfs2_is_hard_readonly(osb))
+               return -EROFS;
+
+       if (ocfs2_mount_local(osb))
+               return 0;
+
+       status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE,
+                                   0, 0);
+       if (status < 0)
+               mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status);
+
+       return status;
+}
+
+void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex)
+{
+       struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
+
+       if (!ocfs2_mount_local(osb))
+               ocfs2_cluster_unlock(osb, lockres,
+                                    ex ? LKM_EXMODE : LKM_PRMODE);
 }
 
 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
 {
        int ret;
-       int level = ex ? LKM_EXMODE : LKM_PRMODE;
+       int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
        struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
        struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
 
@@ -2014,7 +2619,7 @@ int ocfs2_dentry_lock(struct dentry *dentry, int ex)
 
 void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
 {
-       int level = ex ? LKM_EXMODE : LKM_PRMODE;
+       int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
        struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
        struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
 
@@ -2142,7 +2747,7 @@ static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
 }
 
 /* So that debugfs.ocfs2 can determine which format is being used */
-#define OCFS2_DLM_DEBUG_STR_VERSION 1
+#define OCFS2_DLM_DEBUG_STR_VERSION 2
 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
 {
        int i;
@@ -2179,16 +2784,57 @@ static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
                   lockres->l_blocking);
 
        /* Dump the raw LVB */
-       lvb = lockres->l_lksb.lvb;
+       lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
        for(i = 0; i < DLM_LVB_LEN; i++)
                seq_printf(m, "0x%x\t", lvb[i]);
 
+#ifdef CONFIG_OCFS2_FS_STATS
+# define lock_num_prmode(_l)           (_l)->l_lock_num_prmode
+# define lock_num_exmode(_l)           (_l)->l_lock_num_exmode
+# define lock_num_prmode_failed(_l)    (_l)->l_lock_num_prmode_failed
+# define lock_num_exmode_failed(_l)    (_l)->l_lock_num_exmode_failed
+# define lock_total_prmode(_l)         (_l)->l_lock_total_prmode
+# define lock_total_exmode(_l)         (_l)->l_lock_total_exmode
+# define lock_max_prmode(_l)           (_l)->l_lock_max_prmode
+# define lock_max_exmode(_l)           (_l)->l_lock_max_exmode
+# define lock_refresh(_l)              (_l)->l_lock_refresh
+#else
+# define lock_num_prmode(_l)           (0ULL)
+# define lock_num_exmode(_l)           (0ULL)
+# define lock_num_prmode_failed(_l)    (0)
+# define lock_num_exmode_failed(_l)    (0)
+# define lock_total_prmode(_l)         (0ULL)
+# define lock_total_exmode(_l)         (0ULL)
+# define lock_max_prmode(_l)           (0)
+# define lock_max_exmode(_l)           (0)
+# define lock_refresh(_l)              (0)
+#endif
+       /* The following seq_print was added in version 2 of this output */
+       seq_printf(m, "%llu\t"
+                  "%llu\t"
+                  "%u\t"
+                  "%u\t"
+                  "%llu\t"
+                  "%llu\t"
+                  "%u\t"
+                  "%u\t"
+                  "%u\t",
+                  lock_num_prmode(lockres),
+                  lock_num_exmode(lockres),
+                  lock_num_prmode_failed(lockres),
+                  lock_num_exmode_failed(lockres),
+                  lock_total_prmode(lockres),
+                  lock_total_exmode(lockres),
+                  lock_max_prmode(lockres),
+                  lock_max_exmode(lockres),
+                  lock_refresh(lockres));
+
        /* End the line */
        seq_printf(m, "\n");
        return 0;
 }
 
-static struct seq_operations ocfs2_dlm_seq_ops = {
+static const struct seq_operations ocfs2_dlm_seq_ops = {
        .start =        ocfs2_dlm_seq_start,
        .stop =         ocfs2_dlm_seq_stop,
        .next =         ocfs2_dlm_seq_next,
@@ -2283,13 +2929,14 @@ static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
 int ocfs2_dlm_init(struct ocfs2_super *osb)
 {
        int status = 0;
-       u32 dlm_key;
-       struct dlm_ctxt *dlm = NULL;
+       struct ocfs2_cluster_connection *conn = NULL;
 
        mlog_entry_void();
 
-       if (ocfs2_mount_local(osb))
+       if (ocfs2_mount_local(osb)) {
+               osb->node_num = 0;
                goto local;
+       }
 
        status = ocfs2_dlm_init_debug(osb);
        if (status < 0) {
@@ -2297,72 +2944,87 @@ int ocfs2_dlm_init(struct ocfs2_super *osb)
                goto bail;
        }
 
-       /* launch vote thread */
-       osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote");
-       if (IS_ERR(osb->vote_task)) {
-               status = PTR_ERR(osb->vote_task);
-               osb->vote_task = NULL;
+       /* launch downconvert thread */
+       osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc");
+       if (IS_ERR(osb->dc_task)) {
+               status = PTR_ERR(osb->dc_task);
+               osb->dc_task = NULL;
                mlog_errno(status);
                goto bail;
        }
 
-       /* used by the dlm code to make message headers unique, each
-        * node in this domain must agree on this. */
-       dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));
-
        /* for now, uuid == domain */
-       dlm = dlm_register_domain(osb->uuid_str, dlm_key);
-       if (IS_ERR(dlm)) {
-               status = PTR_ERR(dlm);
+       status = ocfs2_cluster_connect(osb->osb_cluster_stack,
+                                      osb->uuid_str,
+                                      strlen(osb->uuid_str),
+                                      ocfs2_do_node_down, osb,
+                                      &conn);
+       if (status) {
                mlog_errno(status);
                goto bail;
        }
 
-       dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);
+       status = ocfs2_cluster_this_node(&osb->node_num);
+       if (status < 0) {
+               mlog_errno(status);
+               mlog(ML_ERROR,
+                    "could not find this host's node number\n");
+               ocfs2_cluster_disconnect(conn, 0);
+               goto bail;
+       }
 
 local:
        ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
        ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
+       ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb);
+       ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb);
 
-       osb->dlm = dlm;
+       osb->cconn = conn;
 
        status = 0;
 bail:
        if (status < 0) {
                ocfs2_dlm_shutdown_debug(osb);
-               if (osb->vote_task)
-                       kthread_stop(osb->vote_task);
+               if (osb->dc_task)
+                       kthread_stop(osb->dc_task);
        }
 
        mlog_exit(status);
        return status;
 }
 
-void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
+void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
+                       int hangup_pending)
 {
        mlog_entry_void();
 
-       dlm_unregister_eviction_cb(&osb->osb_eviction_cb);
-
        ocfs2_drop_osb_locks(osb);
 
-       if (osb->vote_task) {
-               kthread_stop(osb->vote_task);
-               osb->vote_task = NULL;
+       /*
+        * Now that we have dropped all locks and ocfs2_dismount_volume()
+        * has disabled recovery, the DLM won't be talking to us.  It's
+        * safe to tear things down before disconnecting the cluster.
+        */
+
+       if (osb->dc_task) {
+               kthread_stop(osb->dc_task);
+               osb->dc_task = NULL;
        }
 
        ocfs2_lock_res_free(&osb->osb_super_lockres);
        ocfs2_lock_res_free(&osb->osb_rename_lockres);
+       ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres);
+       ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres);
 
-       dlm_unregister_domain(osb->dlm);
-       osb->dlm = NULL;
+       ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
+       osb->cconn = NULL;
 
        ocfs2_dlm_shutdown_debug(osb);
 
        mlog_exit_void();
 }
 
-static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
+static void ocfs2_unlock_ast(void *opaque, int error)
 {
        struct ocfs2_lock_res *lockres = opaque;
        unsigned long flags;
@@ -2373,26 +3035,12 @@ static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
             lockres->l_unlock_action);
 
        spin_lock_irqsave(&lockres->l_lock, flags);
-       /* We tried to cancel a convert request, but it was already
-        * granted. All we want to do here is clear our unlock
-        * state. The wake_up call done at the bottom is redundant
-        * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
-        * hurt anything anyway */
-       if (status == DLM_CANCELGRANT &&
-           lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
-               mlog(0, "Got cancelgrant for %s\n", lockres->l_name);
-
-               /* We don't clear the busy flag in this case as it
-                * should have been cleared by the ast which the dlm
-                * has called. */
-               goto complete_unlock;
-       }
-
-       if (status != DLM_NORMAL) {
-               mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
-                    "unlock_action %d\n", status, lockres->l_name,
+       if (error) {
+               mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
+                    "unlock_action %d\n", error, lockres->l_name,
                     lockres->l_unlock_action);
                spin_unlock_irqrestore(&lockres->l_lock, flags);
+               mlog_exit_void();
                return;
        }
 
@@ -2400,20 +3048,22 @@ static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
        case OCFS2_UNLOCK_CANCEL_CONVERT:
                mlog(0, "Cancel convert success for %s\n", lockres->l_name);
                lockres->l_action = OCFS2_AST_INVALID;
+               /* Downconvert thread may have requeued this lock, we
+                * need to wake it. */
+               if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
+                       ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
                break;
        case OCFS2_UNLOCK_DROP_LOCK:
-               lockres->l_level = LKM_IVMODE;
+               lockres->l_level = DLM_LOCK_IV;
                break;
        default:
                BUG();
        }
 
        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
-complete_unlock:
        lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
-       spin_unlock_irqrestore(&lockres->l_lock, flags);
-
        wake_up(&lockres->l_event);
+       spin_unlock_irqrestore(&lockres->l_lock, flags);
 
        mlog_exit_void();
 }
@@ -2421,16 +3071,16 @@ complete_unlock:
 static int ocfs2_drop_lock(struct ocfs2_super *osb,
                           struct ocfs2_lock_res *lockres)
 {
-       enum dlm_status status;
+       int ret;
        unsigned long flags;
-       int lkm_flags = 0;
+       u32 lkm_flags = 0;
 
        /* We didn't get anywhere near actually using this lockres. */
        if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
                goto out;
 
        if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
-               lkm_flags |= LKM_VALBLK;
+               lkm_flags |= DLM_LKF_VALBLK;
 
        spin_lock_irqsave(&lockres->l_lock, flags);
 
@@ -2456,7 +3106,7 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb,
 
        if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
                if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
-                   lockres->l_level == LKM_EXMODE &&
+                   lockres->l_level == DLM_LOCK_EX &&
                    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
                        lockres->l_ops->set_lvb(lockres);
        }
@@ -2485,15 +3135,15 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb,
 
        mlog(0, "lock %s\n", lockres->l_name);
 
-       status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags,
-                          ocfs2_unlock_ast, lockres);
-       if (status != DLM_NORMAL) {
-               ocfs2_log_dlm_error("dlmunlock", status, lockres);
+       ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags,
+                              lockres);
+       if (ret) {
+               ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
                mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
-               dlm_print_one_lock(lockres->l_lksb.lockid);
+               ocfs2_dlm_dump_lksb(&lockres->l_lksb);
                BUG();
        }
-       mlog(0, "lock %s, successfull return from dlmunlock\n",
+       mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n",
             lockres->l_name);
 
        ocfs2_wait_on_busy_lock(lockres);
@@ -2504,7 +3154,7 @@ out:
 
 /* Mark the lockres as being dropped. It will no longer be
  * queued if blocking, but we still may have to wait on it
- * being dequeued from the vote thread before we can consider
+ * being dequeued from the downconvert thread before we can consider
  * it safe to drop. 
  *
  * You can *not* attempt to call cluster_lock on this lockres anymore. */
@@ -2548,6 +3198,8 @@ static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
 {
        ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
        ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
+       ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres);
+       ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres);
 }
 
 int ocfs2_drop_inode_locks(struct inode *inode)
@@ -2567,14 +3219,7 @@ int ocfs2_drop_inode_locks(struct inode *inode)
        status = err;
 
        err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
-                             &OCFS2_I(inode)->ip_data_lockres);
-       if (err < 0)
-               mlog_errno(err);
-       if (err < 0 && !status)
-               status = err;
-
-       err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
-                             &OCFS2_I(inode)->ip_meta_lockres);
+                             &OCFS2_I(inode)->ip_inode_lockres);
        if (err < 0)
                mlog_errno(err);
        if (err < 0 && !status)
@@ -2591,15 +3236,15 @@ int ocfs2_drop_inode_locks(struct inode *inode)
        return status;
 }
 
-static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
-                                     int new_level)
+static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
+                                             int new_level)
 {
        assert_spin_locked(&lockres->l_lock);
 
-       BUG_ON(lockres->l_blocking <= LKM_NLMODE);
+       BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
 
        if (lockres->l_level <= new_level) {
-               mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
+               mlog(ML_ERROR, "lockres->l_level (%d) <= new_level (%d)\n",
                     lockres->l_level, new_level);
                BUG();
        }
@@ -2610,33 +3255,33 @@ static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
        lockres->l_action = OCFS2_AST_DOWNCONVERT;
        lockres->l_requested = new_level;
        lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
+       return lockres_set_pending(lockres);
 }
 
 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
                                  struct ocfs2_lock_res *lockres,
                                  int new_level,
-                                 int lvb)
+                                 int lvb,
+                                 unsigned int generation)
 {
-       int ret, dlm_flags = LKM_CONVERT;
-       enum dlm_status status;
+       int ret;
+       u32 dlm_flags = DLM_LKF_CONVERT;
 
        mlog_entry_void();
 
        if (lvb)
-               dlm_flags |= LKM_VALBLK;
-
-       status = dlmlock(osb->dlm,
-                        new_level,
-                        &lockres->l_lksb,
-                        dlm_flags,
-                        lockres->l_name,
-                        OCFS2_LOCK_ID_MAX_LEN - 1,
-                        ocfs2_locking_ast,
-                        lockres,
-                        ocfs2_blocking_ast);
-       if (status != DLM_NORMAL) {
-               ocfs2_log_dlm_error("dlmlock", status, lockres);
-               ret = -EINVAL;
+               dlm_flags |= DLM_LKF_VALBLK;
+
+       ret = ocfs2_dlm_lock(osb->cconn,
+                            new_level,
+                            &lockres->l_lksb,
+                            dlm_flags,
+                            lockres->l_name,
+                            OCFS2_LOCK_ID_MAX_LEN - 1,
+                            lockres);
+       lockres_clear_pending(lockres, generation, osb);
+       if (ret) {
+               ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
                ocfs2_recover_from_dlm_error(lockres, 1);
                goto bail;
        }
@@ -2647,7 +3292,7 @@ bail:
        return ret;
 }
 
-/* returns 1 when the caller should unlock and call dlmunlock */
+/* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */
 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
                                        struct ocfs2_lock_res *lockres)
 {
@@ -2683,24 +3328,18 @@ static int ocfs2_cancel_convert(struct ocfs2_super *osb,
                                struct ocfs2_lock_res *lockres)
 {
        int ret;
-       enum dlm_status status;
 
        mlog_entry_void();
        mlog(0, "lock %s\n", lockres->l_name);
 
-       ret = 0;
-       status = dlmunlock(osb->dlm,
-                          &lockres->l_lksb,
-                          LKM_CANCEL,
-                          ocfs2_unlock_ast,
-                          lockres);
-       if (status != DLM_NORMAL) {
-               ocfs2_log_dlm_error("dlmunlock", status, lockres);
-               ret = -EINVAL;
+       ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
+                              DLM_LKF_CANCEL, lockres);
+       if (ret) {
+               ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
                ocfs2_recover_from_dlm_error(lockres, 0);
        }
 
-       mlog(0, "lock %s return from dlmunlock\n", lockres->l_name);
+       mlog(0, "lock %s return from ocfs2_dlm_unlock\n", lockres->l_name);
 
        mlog_exit(ret);
        return ret;
@@ -2715,6 +3354,7 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb,
        int new_level;
        int ret = 0;
        int set_lvb = 0;
+       unsigned int gen;
 
        mlog_entry_void();
 
@@ -2724,6 +3364,32 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb,
 
 recheck:
        if (lockres->l_flags & OCFS2_LOCK_BUSY) {
+               /* XXX
+                * This is a *big* race.  The OCFS2_LOCK_PENDING flag
+                * exists entirely for one reason - another thread has set
+                * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
+                *
+                * If we do ocfs2_cancel_convert() before the other thread
+                * calls dlm_lock(), our cancel will do nothing.  We will
+                * get no ast, and we will have no way of knowing the
+                * cancel failed.  Meanwhile, the other thread will call
+                * into dlm_lock() and wait...forever.
+                *
+                * Why forever?  Because another node has asked for the
+                * lock first; that's why we're here in unblock_lock().
+                *
+                * The solution is OCFS2_LOCK_PENDING.  When PENDING is
+                * set, we just requeue the unblock.  Only when the other
+                * thread has called dlm_lock() and cleared PENDING will
+                * we then cancel their request.
+                *
+                * All callers of dlm_lock() must set OCFS2_DLM_PENDING
+                * at the same time they set OCFS2_DLM_BUSY.  They must
+                * clear OCFS2_DLM_PENDING after dlm_lock() returns.
+                */
+               if (lockres->l_flags & OCFS2_LOCK_PENDING)
+                       goto leave_requeue;
+
                ctl->requeue = 1;
                ret = ocfs2_prepare_cancel_convert(osb, lockres);
                spin_unlock_irqrestore(&lockres->l_lock, flags);
@@ -2737,13 +3403,13 @@ recheck:
 
        /* if we're blocking an exclusive and we have *any* holders,
         * then requeue. */
-       if ((lockres->l_blocking == LKM_EXMODE)
+       if ((lockres->l_blocking == DLM_LOCK_EX)
            && (lockres->l_ex_holders || lockres->l_ro_holders))
                goto leave_requeue;
 
        /* If it's a PR we're blocking, then only
         * requeue if we've got any EX holders */
-       if (lockres->l_blocking == LKM_PRMODE &&
+       if (lockres->l_blocking == DLM_LOCK_PR &&
            lockres->l_ex_holders)
                goto leave_requeue;
 
@@ -2790,7 +3456,7 @@ downconvert:
        ctl->requeue = 0;
 
        if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
-               if (lockres->l_level == LKM_EXMODE)
+               if (lockres->l_level == DLM_LOCK_EX)
                        set_lvb = 1;
 
                /*
@@ -2803,9 +3469,11 @@ downconvert:
                        lockres->l_ops->set_lvb(lockres);
        }
 
-       ocfs2_prepare_downconvert(lockres, new_level);
+       gen = ocfs2_prepare_downconvert(lockres, new_level);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
-       ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
+       ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
+                                    gen);
+
 leave:
        mlog_exit(ret);
        return ret;
@@ -2827,6 +3495,9 @@ static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
                inode = ocfs2_lock_res_inode(lockres);
        mapping = inode->i_mapping;
 
+       if (!S_ISREG(inode->i_mode))
+               goto out;
+
        /*
         * We need this before the filemap_fdatawrite() so that it can
         * transfer the dirty bit from the PTE to the
@@ -2841,7 +3512,7 @@ static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
                     (unsigned long long)OCFS2_I(inode)->ip_blkno);
        }
        sync_mapping_buffers(mapping);
-       if (blocking == LKM_EXMODE) {
+       if (blocking == DLM_LOCK_EX) {
                truncate_inode_pages(mapping, 0);
        } else {
                /* We only need to wait on the I/O if we're not also
@@ -2852,25 +3523,34 @@ static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
                filemap_fdatawait(mapping);
        }
 
+out:
        return UNBLOCK_CONTINUE;
 }
 
-static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
-                                       int new_level)
+static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci,
+                                struct ocfs2_lock_res *lockres,
+                                int new_level)
 {
-       struct inode *inode = ocfs2_lock_res_inode(lockres);
-       int checkpointed = ocfs2_inode_fully_checkpointed(inode);
+       int checkpointed = ocfs2_ci_fully_checkpointed(ci);
 
-       BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
-       BUG_ON(lockres->l_level != LKM_EXMODE && !checkpointed);
+       BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
+       BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);
 
        if (checkpointed)
                return 1;
 
-       ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb));
+       ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci)));
        return 0;
 }
 
+static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
+                                       int new_level)
+{
+       struct inode *inode = ocfs2_lock_res_inode(lockres);
+
+       return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level);
+}
+
 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
 {
        struct inode *inode = ocfs2_lock_res_inode(lockres);
@@ -2880,7 +3560,7 @@ static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
 
 /*
  * Does the final reference drop on our dentry lock. Right now this
- * happens in the vote thread, but we could choose to simplify the
+ * happens in the downconvert thread, but we could choose to simplify the
  * dlmglue API and push these off to the ocfs2_wq in the future.
  */
 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
@@ -2926,7 +3606,7 @@ static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
         * valid. The downconvert code will retain a PR for this node,
         * so there's no further work to do.
         */
-       if (blocking == LKM_PRMODE)
+       if (blocking == DLM_LOCK_PR)
                return UNBLOCK_CONTINUE;
 
        /*
@@ -3000,8 +3680,210 @@ static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
        return UNBLOCK_CONTINUE_POST;
 }
 
-void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
-                               struct ocfs2_lock_res *lockres)
+static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
+                                           int new_level)
+{
+       struct ocfs2_refcount_tree *tree =
+                               ocfs2_lock_res_refcount_tree(lockres);
+
+       return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level);
+}
+
+static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
+                                        int blocking)
+{
+       struct ocfs2_refcount_tree *tree =
+                               ocfs2_lock_res_refcount_tree(lockres);
+
+       ocfs2_metadata_cache_purge(&tree->rf_ci);
+
+       return UNBLOCK_CONTINUE;
+}
+
+static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres)
+{
+       struct ocfs2_qinfo_lvb *lvb;
+       struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres);
+       struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
+                                           oinfo->dqi_gi.dqi_type);
+
+       mlog_entry_void();
+
+       lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
+       lvb->lvb_version = OCFS2_QINFO_LVB_VERSION;
+       lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace);
+       lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace);
+       lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms);
+       lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks);
+       lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk);
+       lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry);
+
+       mlog_exit_void();
+}
+
+void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex)
+{
+       struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
+       struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
+       int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
+
+       mlog_entry_void();
+       if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
+               ocfs2_cluster_unlock(osb, lockres, level);
+       mlog_exit_void();
+}
+
+static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo)
+{
+       struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
+                                           oinfo->dqi_gi.dqi_type);
+       struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
+       struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
+       struct buffer_head *bh = NULL;
+       struct ocfs2_global_disk_dqinfo *gdinfo;
+       int status = 0;
+
+       if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
+           lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) {
+               info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace);
+               info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace);
+               oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms);
+               oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks);
+               oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk);
+               oinfo->dqi_gi.dqi_free_entry =
+                                       be32_to_cpu(lvb->lvb_free_entry);
+       } else {
+               status = ocfs2_read_quota_block(oinfo->dqi_gqinode, 0, &bh);
+               if (status) {
+                       mlog_errno(status);
+                       goto bail;
+               }
+               gdinfo = (struct ocfs2_global_disk_dqinfo *)
+                                       (bh->b_data + OCFS2_GLOBAL_INFO_OFF);
+               info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace);
+               info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace);
+               oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms);
+               oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks);
+               oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk);
+               oinfo->dqi_gi.dqi_free_entry =
+                                       le32_to_cpu(gdinfo->dqi_free_entry);
+               brelse(bh);
+               ocfs2_track_lock_refresh(lockres);
+       }
+
+bail:
+       return status;
+}
+
+/* Lock quota info, this function expects at least shared lock on the quota file
+ * so that we can safely refresh quota info from disk. */
+int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex)
+{
+       struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
+       struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
+       int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
+       int status = 0;
+
+       mlog_entry_void();
+
+       /* On RO devices, locking really isn't needed... */
+       if (ocfs2_is_hard_readonly(osb)) {
+               if (ex)
+                       status = -EROFS;
+               goto bail;
+       }
+       if (ocfs2_mount_local(osb))
+               goto bail;
+
+       status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
+       if (status < 0) {
+               mlog_errno(status);
+               goto bail;
+       }
+       if (!ocfs2_should_refresh_lock_res(lockres))
+               goto bail;
+       /* OK, we have the lock but we need to refresh the quota info */
+       status = ocfs2_refresh_qinfo(oinfo);
+       if (status)
+               ocfs2_qinfo_unlock(oinfo, ex);
+       ocfs2_complete_lock_res_refresh(lockres, status);
+bail:
+       mlog_exit(status);
+       return status;
+}
+
+int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex)
+{
+       int status;
+       int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
+       struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
+       struct ocfs2_super *osb = lockres->l_priv;
+
+
+       if (ocfs2_is_hard_readonly(osb))
+               return -EROFS;
+
+       if (ocfs2_mount_local(osb))
+               return 0;
+
+       status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
+       if (status < 0)
+               mlog_errno(status);
+
+       return status;
+}
+
+void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex)
+{
+       int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
+       struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
+       struct ocfs2_super *osb = lockres->l_priv;
+
+       if (!ocfs2_mount_local(osb))
+               ocfs2_cluster_unlock(osb, lockres, level);
+}
+
+/*
+ * This is the filesystem locking protocol.  It provides the lock handling
+ * hooks for the underlying DLM.  It has a maximum version number.
+ * The version number allows interoperability with systems running at
+ * the same major number and an equal or smaller minor number.
+ *
+ * Whenever the filesystem does new things with locks (adds or removes a
+ * lock, orders them differently, does different things underneath a lock),
+ * the version must be changed.  The protocol is negotiated when joining
+ * the dlm domain.  A node may join the domain if its major version is
+ * identical to all other nodes and its minor version is greater than
+ * or equal to all other nodes.  When its minor version is greater than
+ * the other nodes, it will run at the minor version specified by the
+ * other nodes.
+ *
+ * If a locking change is made that will not be compatible with older
+ * versions, the major number must be increased and the minor version set
+ * to zero.  If a change merely adds a behavior that can be disabled when
+ * speaking to older versions, the minor version must be increased.  If a
+ * change adds a fully backwards compatible change (eg, LVB changes that
+ * are just ignored by older versions), the version does not need to be
+ * updated.
+ */
+static struct ocfs2_locking_protocol lproto = {
+       .lp_max_version = {
+               .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
+               .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
+       },
+       .lp_lock_ast            = ocfs2_locking_ast,
+       .lp_blocking_ast        = ocfs2_blocking_ast,
+       .lp_unlock_ast          = ocfs2_unlock_ast,
+};
+
+void ocfs2_set_locking_protocol(void)
+{
+       ocfs2_stack_glue_set_locking_protocol(&lproto);
+}
+
+
+static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
+                                      struct ocfs2_lock_res *lockres)
 {
        int status;
        struct ocfs2_unblock_ctl ctl = {0, 0,};
@@ -3019,7 +3901,7 @@ void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
        mlog(0, "lockres %s blocked.\n", lockres->l_name);
 
        /* Detect whether a lock has been marked as going away while
-        * the vote thread was processing other things. A lock can
+        * the downconvert thread was processing other things. A lock can
         * still be marked with OCFS2_LOCK_FREEING after this check,
         * but short circuiting here will still save us some
         * performance. */
@@ -3068,38 +3950,104 @@ static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
 
        lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
 
-       spin_lock(&osb->vote_task_lock);
+       spin_lock(&osb->dc_task_lock);
        if (list_empty(&lockres->l_blocked_list)) {
                list_add_tail(&lockres->l_blocked_list,
                              &osb->blocked_lock_list);
                osb->blocked_lock_count++;
        }
-       spin_unlock(&osb->vote_task_lock);
+       spin_unlock(&osb->dc_task_lock);
 
        mlog_exit_void();
 }
 
-/* This aids in debugging situations where a bad LVB might be involved. */
-void ocfs2_dump_meta_lvb_info(u64 level,
-                             const char *function,
-                             unsigned int line,
-                             struct ocfs2_lock_res *lockres)
+static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
 {
-       struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
+       unsigned long processed;
+       struct ocfs2_lock_res *lockres;
 
-       mlog(level, "LVB information for %s (called from %s:%u):\n",
-            lockres->l_name, function, line);
-       mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
-            lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
-            be32_to_cpu(lvb->lvb_igeneration));
-       mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
-            (unsigned long long)be64_to_cpu(lvb->lvb_isize),
-            be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
-            be16_to_cpu(lvb->lvb_imode));
-       mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
-            "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
-            (long long)be64_to_cpu(lvb->lvb_iatime_packed),
-            (long long)be64_to_cpu(lvb->lvb_ictime_packed),
-            (long long)be64_to_cpu(lvb->lvb_imtime_packed),
-            be32_to_cpu(lvb->lvb_iattr));
+       mlog_entry_void();
+
+       spin_lock(&osb->dc_task_lock);
+       /* grab this early so we know to try again if a state change and
+        * wake happens part-way through our work  */
+       osb->dc_work_sequence = osb->dc_wake_sequence;
+
+       processed = osb->blocked_lock_count;
+       while (processed) {
+               BUG_ON(list_empty(&osb->blocked_lock_list));
+
+               lockres = list_entry(osb->blocked_lock_list.next,
+                                    struct ocfs2_lock_res, l_blocked_list);
+               list_del_init(&lockres->l_blocked_list);
+               osb->blocked_lock_count--;
+               spin_unlock(&osb->dc_task_lock);
+
+               BUG_ON(!processed);
+               processed--;
+
+               ocfs2_process_blocked_lock(osb, lockres);
+
+               spin_lock(&osb->dc_task_lock);
+       }
+       spin_unlock(&osb->dc_task_lock);
+
+       mlog_exit_void();
+}
+
+static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
+{
+       int empty = 0;
+
+       spin_lock(&osb->dc_task_lock);
+       if (list_empty(&osb->blocked_lock_list))
+               empty = 1;
+
+       spin_unlock(&osb->dc_task_lock);
+       return empty;
+}
+
+static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
+{
+       int should_wake = 0;
+
+       spin_lock(&osb->dc_task_lock);
+       if (osb->dc_work_sequence != osb->dc_wake_sequence)
+               should_wake = 1;
+       spin_unlock(&osb->dc_task_lock);
+
+       return should_wake;
+}
+
+static int ocfs2_downconvert_thread(void *arg)
+{
+       int status = 0;
+       struct ocfs2_super *osb = arg;
+
+       /* only quit once we've been asked to stop and there is no more
+        * work available */
+       while (!(kthread_should_stop() &&
+               ocfs2_downconvert_thread_lists_empty(osb))) {
+
+               wait_event_interruptible(osb->dc_event,
+                                        ocfs2_downconvert_thread_should_wake(osb) ||
+                                        kthread_should_stop());
+
+               mlog(0, "downconvert_thread: awoken\n");
+
+               ocfs2_downconvert_thread_do_work(osb);
+       }
+
+       osb->dc_task = NULL;
+       return status;
+}
+
+void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
+{
+       spin_lock(&osb->dc_task_lock);
+       /* make sure the voting thread gets a swipe at whatever changes
+        * the caller may have made to the voting state */
+       osb->dc_wake_sequence++;
+       spin_unlock(&osb->dc_task_lock);
+       wake_up(&osb->dc_event);
 }