ocfs2: Remove pointless !!
[safe/jmp/linux-2.6] / fs / ocfs2 / dlmglue.c
index 4590376..eae3d64 100644 (file)
 #include <linux/slab.h>
 #include <linux/highmem.h>
 #include <linux/mm.h>
-#include <linux/crc32.h>
 #include <linux/kthread.h>
 #include <linux/pagemap.h>
 #include <linux/debugfs.h>
 #include <linux/seq_file.h>
-
-#include <cluster/heartbeat.h>
-#include <cluster/nodemanager.h>
-#include <cluster/tcp.h>
+#include <linux/time.h>
 
 #define MLOG_MASK_PREFIX ML_DLM_GLUE
 #include <cluster/masklog.h>
@@ -64,6 +60,9 @@ struct ocfs2_mask_waiter {
        struct completion       mw_complete;
        unsigned long           mw_mask;
        unsigned long           mw_goal;
+#ifdef CONFIG_OCFS2_FS_STATS
+       unsigned long long      mw_lock_start;
+#endif
 };
 
 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
@@ -259,31 +258,6 @@ static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
        .flags          = 0,
 };
 
-/*
- * This is the filesystem locking protocol version.
- *
- * Whenever the filesystem does new things with locks (adds or removes a
- * lock, orders them differently, does different things underneath a lock),
- * the version must be changed.  The protocol is negotiated when joining
- * the dlm domain.  A node may join the domain if its major version is
- * identical to all other nodes and its minor version is greater than
- * or equal to all other nodes.  When its minor version is greater than
- * the other nodes, it will run at the minor version specified by the
- * other nodes.
- *
- * If a locking change is made that will not be compatible with older
- * versions, the major number must be increased and the minor version set
- * to zero.  If a change merely adds a behavior that can be disabled when
- * speaking to older versions, the minor version must be increased.  If a
- * change adds a fully backwards compatible change (eg, LVB changes that
- * are just ignored by older versions), the version does not need to be
- * updated.
- */
-const struct dlm_protocol_version ocfs2_locking_protocol = {
-       .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
-       .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
-};
-
 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
 {
        return lockres->l_type == OCFS2_LOCK_TYPE_META ||
@@ -341,12 +315,13 @@ static int ocfs2_inode_lock_update(struct inode *inode,
                                  struct buffer_head **bh);
 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
 static inline int ocfs2_highest_compat_lock_level(int level);
-static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
-                                     int new_level);
+static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
+                                             int new_level);
 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
                                  struct ocfs2_lock_res *lockres,
                                  int new_level,
-                                 int lvb);
+                                 int lvb,
+                                 unsigned int generation);
 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
                                        struct ocfs2_lock_res *lockres);
 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
@@ -395,6 +370,75 @@ static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
        spin_unlock(&ocfs2_dlm_tracking_lock);
 }
 
+#ifdef CONFIG_OCFS2_FS_STATS
+static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
+{
+       res->l_lock_num_prmode = 0;
+       res->l_lock_num_prmode_failed = 0;
+       res->l_lock_total_prmode = 0;
+       res->l_lock_max_prmode = 0;
+       res->l_lock_num_exmode = 0;
+       res->l_lock_num_exmode_failed = 0;
+       res->l_lock_total_exmode = 0;
+       res->l_lock_max_exmode = 0;
+       res->l_lock_refresh = 0;
+}
+
+static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level,
+                                   struct ocfs2_mask_waiter *mw, int ret)
+{
+       unsigned long long *num, *sum;
+       unsigned int *max, *failed;
+       struct timespec ts = current_kernel_time();
+       unsigned long long time = timespec_to_ns(&ts) - mw->mw_lock_start;
+
+       if (level == LKM_PRMODE) {
+               num = &res->l_lock_num_prmode;
+               sum = &res->l_lock_total_prmode;
+               max = &res->l_lock_max_prmode;
+               failed = &res->l_lock_num_prmode_failed;
+       } else if (level == LKM_EXMODE) {
+               num = &res->l_lock_num_exmode;
+               sum = &res->l_lock_total_exmode;
+               max = &res->l_lock_max_exmode;
+               failed = &res->l_lock_num_exmode_failed;
+       } else
+               return;
+
+       (*num)++;
+       (*sum) += time;
+       if (time > *max)
+               *max = time;
+       if (ret)
+               (*failed)++;
+}
+
+static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
+{
+       lockres->l_lock_refresh++;
+}
+
+static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
+{
+       struct timespec ts = current_kernel_time();
+       mw->mw_lock_start = timespec_to_ns(&ts);
+}
+#else
+static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
+{
+}
+static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res,
+                          int level, struct ocfs2_mask_waiter *mw, int ret)
+{
+}
+static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
+{
+}
+static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
+{
+}
+#endif
+
 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
                                       struct ocfs2_lock_res *res,
                                       enum ocfs2_lock_type type,
@@ -414,6 +458,8 @@ static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
        res->l_flags         = OCFS2_LOCK_INITIALIZED;
 
        ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
+
+       ocfs2_init_lock_stats(res);
 }
 
 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
@@ -766,6 +812,113 @@ static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
        return needs_downconvert;
 }
 
+/*
+ * OCFS2_LOCK_PENDING and l_pending_gen.
+ *
+ * Why does OCFS2_LOCK_PENDING exist?  To close a race between setting
+ * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock().  See ocfs2_unblock_lock()
+ * for more details on the race.
+ *
+ * OCFS2_LOCK_PENDING closes the race quite nicely.  However, it introduces
+ * a race on itself.  In o2dlm, we can get the ast before ocfs2_dlm_lock()
+ * returns.  The ast clears OCFS2_LOCK_BUSY, and must therefore clear
+ * OCFS2_LOCK_PENDING at the same time.  When ocfs2_dlm_lock() returns,
+ * the caller is going to try to clear PENDING again.  If nothing else is
+ * happening, __lockres_clear_pending() sees PENDING is unset and does
+ * nothing.
+ *
+ * But what if another path (eg downconvert thread) has just started a
+ * new locking action?  The other path has re-set PENDING.  Our path
+ * cannot clear PENDING, because that will re-open the original race
+ * window.
+ *
+ * [Example]
+ *
+ * ocfs2_meta_lock()
+ *  ocfs2_cluster_lock()
+ *   set BUSY
+ *   set PENDING
+ *   drop l_lock
+ *   ocfs2_dlm_lock()
+ *    ocfs2_locking_ast()              ocfs2_downconvert_thread()
+ *     clear PENDING                    ocfs2_unblock_lock()
+ *                                       take_l_lock
+ *                                       !BUSY
+ *                                       ocfs2_prepare_downconvert()
+ *                                        set BUSY
+ *                                        set PENDING
+ *                                       drop l_lock
+ *   take l_lock
+ *   clear PENDING
+ *   drop l_lock
+ *                     <window>
+ *                                       ocfs2_dlm_lock()
+ *
+ * So as you can see, we now have a window where l_lock is not held,
+ * PENDING is not set, and ocfs2_dlm_lock() has not been called.
+ *
+ * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
+ * set by ocfs2_prepare_downconvert().  That wasn't nice.
+ *
+ * To solve this we introduce l_pending_gen.  A call to
+ * lockres_clear_pending() will only do so when it is passed a generation
+ * number that matches the lockres.  lockres_set_pending() will return the
+ * current generation number.  When ocfs2_cluster_lock() goes to clear
+ * PENDING, it passes the generation it got from set_pending().  In our
+ * example above, the generation numbers will *not* match.  Thus,
+ * ocfs2_cluster_lock() will not clear the PENDING set by
+ * ocfs2_prepare_downconvert().
+ */
+
+/* Unlocked version for ocfs2_locking_ast() */
+static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
+                                   unsigned int generation,
+                                   struct ocfs2_super *osb)
+{
+       assert_spin_locked(&lockres->l_lock);
+
+       /*
+        * The ast and locking functions can race us here.  The winner
+        * will clear pending, the loser will not.
+        */
+       if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
+           (lockres->l_pending_gen != generation))
+               return;
+
+       lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
+       lockres->l_pending_gen++;
+
+       /*
+        * The downconvert thread may have skipped us because we
+        * were PENDING.  Wake it up.
+        */
+       if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
+               ocfs2_wake_downconvert_thread(osb);
+}
+
+/* Locked version for callers of ocfs2_dlm_lock() */
+static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
+                                 unsigned int generation,
+                                 struct ocfs2_super *osb)
+{
+       unsigned long flags;
+
+       spin_lock_irqsave(&lockres->l_lock, flags);
+       __lockres_clear_pending(lockres, generation, osb);
+       spin_unlock_irqrestore(&lockres->l_lock, flags);
+}
+
+static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
+{
+       assert_spin_locked(&lockres->l_lock);
+       BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
+
+       lockres_or_flags(lockres, OCFS2_LOCK_PENDING);
+
+       return lockres->l_pending_gen;
+}
+
+
 static void ocfs2_blocking_ast(void *opaque, int level)
 {
        struct ocfs2_lock_res *lockres = opaque;
@@ -800,14 +953,22 @@ static void ocfs2_blocking_ast(void *opaque, int level)
 static void ocfs2_locking_ast(void *opaque)
 {
        struct ocfs2_lock_res *lockres = opaque;
+       struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
        unsigned long flags;
+       int status;
 
        spin_lock_irqsave(&lockres->l_lock, flags);
 
-       if (ocfs2_dlm_lock_status(&lockres->l_lksb)) {
+       status = ocfs2_dlm_lock_status(&lockres->l_lksb);
+
+       if (status == -EAGAIN) {
+               lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
+               goto out;
+       }
+
+       if (status) {
                mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
-                    lockres->l_name,
-                    ocfs2_dlm_lock_status(&lockres->l_lksb));
+                    lockres->l_name, status);
                spin_unlock_irqrestore(&lockres->l_lock, flags);
                return;
        }
@@ -830,11 +991,23 @@ static void ocfs2_locking_ast(void *opaque)
                     lockres->l_unlock_action);
                BUG();
        }
-
+out:
        /* set it to something invalid so if we get called again we
         * can catch it. */
        lockres->l_action = OCFS2_AST_INVALID;
 
+       /* Did we try to cancel this lock?  Clear that state */
+       if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
+               lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
+
+       /*
+        * We may have beaten the locking functions here.  We certainly
+        * know that dlm_lock() has been called :-)
+        * Because we can't have two lock calls in flight at once, we
+        * can use lockres->l_pending_gen.
+        */
+       __lockres_clear_pending(lockres, lockres->l_pending_gen,  osb);
+
        wake_up(&lockres->l_event);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 }
@@ -868,6 +1041,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
 {
        int ret = 0;
        unsigned long flags;
+       unsigned int gen;
 
        mlog_entry_void();
 
@@ -884,15 +1058,17 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
        lockres->l_action = OCFS2_AST_ATTACH;
        lockres->l_requested = level;
        lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
+       gen = lockres_set_pending(lockres);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 
-       ret = ocfs2_dlm_lock(osb->dlm,
+       ret = ocfs2_dlm_lock(osb->cconn,
                             level,
                             &lockres->l_lksb,
                             dlm_flags,
                             lockres->l_name,
                             OCFS2_LOCK_ID_MAX_LEN - 1,
                             lockres);
+       lockres_clear_pending(lockres, gen, osb);
        if (ret) {
                ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
                ocfs2_recover_from_dlm_error(lockres, 1);
@@ -947,6 +1123,7 @@ static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
 {
        INIT_LIST_HEAD(&mw->mw_item);
        init_completion(&mw->mw_complete);
+       ocfs2_init_start_time(mw);
 }
 
 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
@@ -1018,6 +1195,8 @@ static int ocfs2_cluster_lock(struct ocfs2_super *osb,
        int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
        int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
        unsigned long flags;
+       unsigned int gen;
+       int noqueue_attempted = 0;
 
        mlog_entry_void();
 
@@ -1062,6 +1241,13 @@ again:
        }
 
        if (level > lockres->l_level) {
+               if (noqueue_attempted > 0) {
+                       ret = -EAGAIN;
+                       goto unlock;
+               }
+               if (lkm_flags & DLM_LKF_NOQUEUE)
+                       noqueue_attempted = 1;
+
                if (lockres->l_action != OCFS2_AST_INVALID)
                        mlog(ML_ERROR, "lockres %s has action %u pending\n",
                             lockres->l_name, lockres->l_action);
@@ -1076,6 +1262,7 @@ again:
 
                lockres->l_requested = level;
                lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
+               gen = lockres_set_pending(lockres);
                spin_unlock_irqrestore(&lockres->l_lock, flags);
 
                BUG_ON(level == DLM_LOCK_IV);
@@ -1085,13 +1272,14 @@ again:
                     lockres->l_name, lockres->l_level, level);
 
                /* call dlm_lock to upgrade lock now */
-               ret = ocfs2_dlm_lock(osb->dlm,
+               ret = ocfs2_dlm_lock(osb->cconn,
                                     level,
                                     &lockres->l_lksb,
                                     lkm_flags,
                                     lockres->l_name,
                                     OCFS2_LOCK_ID_MAX_LEN - 1,
                                     lockres);
+               lockres_clear_pending(lockres, gen, osb);
                if (ret) {
                        if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
                            (ret != -EAGAIN)) {
@@ -1142,6 +1330,7 @@ out:
                        goto again;
                mlog_errno(ret);
        }
+       ocfs2_update_lock_stats(lockres, level, &mw, ret);
 
        mlog_exit(ret);
        return ret;
@@ -1442,8 +1631,8 @@ out:
  */
 int ocfs2_file_lock(struct file *file, int ex, int trylock)
 {
-       int ret, level = ex ? LKM_EXMODE : LKM_PRMODE;
-       unsigned int lkm_flags = trylock ? LKM_NOQUEUE : 0;
+       int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
+       unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0;
        unsigned long flags;
        struct ocfs2_file_private *fp = file->private_data;
        struct ocfs2_lock_res *lockres = &fp->fp_flock;
@@ -1470,7 +1659,7 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock)
                 * Get the lock at NLMODE to start - that way we
                 * can cancel the upconvert request if need be.
                 */
-               ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
+               ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0);
                if (ret < 0) {
                        mlog_errno(ret);
                        goto out;
@@ -1485,14 +1674,14 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock)
        }
 
        lockres->l_action = OCFS2_AST_CONVERT;
-       lkm_flags |= LKM_CONVERT;
+       lkm_flags |= DLM_LKF_CONVERT;
        lockres->l_requested = level;
        lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
 
        lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 
-       ret = ocfs2_dlm_lock(osb->dlm, level, &lockres->l_lksb, lkm_flags,
+       ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
                             lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
                             lockres);
        if (ret) {
@@ -1524,6 +1713,10 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock)
                 * to just bubble sucess back up to the user.
                 */
                ret = ocfs2_flock_handle_signal(lockres, level);
+       } else if (!ret && (level > lockres->l_level)) {
+               /* Trylock failed asynchronously */
+               BUG_ON(!trylock);
+               ret = -EAGAIN;
        }
 
 out:
@@ -1536,6 +1729,7 @@ out:
 void ocfs2_file_unlock(struct file *file)
 {
        int ret;
+       unsigned int gen;
        unsigned long flags;
        struct ocfs2_file_private *fp = file->private_data;
        struct ocfs2_lock_res *lockres = &fp->fp_flock;
@@ -1547,7 +1741,7 @@ void ocfs2_file_unlock(struct file *file)
        if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
                return;
 
-       if (lockres->l_level == LKM_NLMODE)
+       if (lockres->l_level == DLM_LOCK_NL)
                return;
 
        mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
@@ -1561,11 +1755,11 @@ void ocfs2_file_unlock(struct file *file)
        lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
        lockres->l_blocking = DLM_LOCK_EX;
 
-       ocfs2_prepare_downconvert(lockres, LKM_NLMODE);
+       gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL);
        lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 
-       ret = ocfs2_downconvert_lock(osb, lockres, LKM_NLMODE, 0);
+       ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen);
        if (ret) {
                mlog_errno(ret);
                return;
@@ -1866,6 +2060,7 @@ static int ocfs2_inode_lock_update(struct inode *inode,
                                le32_to_cpu(fe->i_flags));
 
                ocfs2_refresh_inode(inode, fe);
+               ocfs2_track_lock_refresh(lockres);
        }
 
        status = 0;
@@ -2150,6 +2345,7 @@ int ocfs2_super_lock(struct ocfs2_super *osb,
 
                if (status < 0)
                        mlog_errno(status);
+               ocfs2_track_lock_refresh(lockres);
        }
 bail:
        mlog_exit(status);
@@ -2344,7 +2540,7 @@ static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
 }
 
 /* So that debugfs.ocfs2 can determine which format is being used */
-#define OCFS2_DLM_DEBUG_STR_VERSION 1
+#define OCFS2_DLM_DEBUG_STR_VERSION 2
 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
 {
        int i;
@@ -2385,6 +2581,47 @@ static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
        for(i = 0; i < DLM_LVB_LEN; i++)
                seq_printf(m, "0x%x\t", lvb[i]);
 
+#ifdef CONFIG_OCFS2_FS_STATS
+# define lock_num_prmode(_l)           (_l)->l_lock_num_prmode
+# define lock_num_exmode(_l)           (_l)->l_lock_num_exmode
+# define lock_num_prmode_failed(_l)    (_l)->l_lock_num_prmode_failed
+# define lock_num_exmode_failed(_l)    (_l)->l_lock_num_exmode_failed
+# define lock_total_prmode(_l)         (_l)->l_lock_total_prmode
+# define lock_total_exmode(_l)         (_l)->l_lock_total_exmode
+# define lock_max_prmode(_l)           (_l)->l_lock_max_prmode
+# define lock_max_exmode(_l)           (_l)->l_lock_max_exmode
+# define lock_refresh(_l)              (_l)->l_lock_refresh
+#else
+# define lock_num_prmode(_l)           (0ULL)
+# define lock_num_exmode(_l)           (0ULL)
+# define lock_num_prmode_failed(_l)    (0)
+# define lock_num_exmode_failed(_l)    (0)
+# define lock_total_prmode(_l)         (0ULL)
+# define lock_total_exmode(_l)         (0ULL)
+# define lock_max_prmode(_l)           (0)
+# define lock_max_exmode(_l)           (0)
+# define lock_refresh(_l)              (0)
+#endif
+       /* The following seq_print was added in version 2 of this output */
+       seq_printf(m, "%llu\t"
+                  "%llu\t"
+                  "%u\t"
+                  "%u\t"
+                  "%llu\t"
+                  "%llu\t"
+                  "%u\t"
+                  "%u\t"
+                  "%u\t",
+                  lock_num_prmode(lockres),
+                  lock_num_exmode(lockres),
+                  lock_num_prmode_failed(lockres),
+                  lock_num_exmode_failed(lockres),
+                  lock_total_prmode(lockres),
+                  lock_total_exmode(lockres),
+                  lock_max_prmode(lockres),
+                  lock_max_exmode(lockres),
+                  lock_refresh(lockres));
+
        /* End the line */
        seq_printf(m, "\n");
        return 0;
@@ -2485,13 +2722,14 @@ static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
 int ocfs2_dlm_init(struct ocfs2_super *osb)
 {
        int status = 0;
-       u32 dlm_key;
-       struct dlm_ctxt *dlm = NULL;
+       struct ocfs2_cluster_connection *conn = NULL;
 
        mlog_entry_void();
 
-       if (ocfs2_mount_local(osb))
+       if (ocfs2_mount_local(osb)) {
+               osb->node_num = 0;
                goto local;
+       }
 
        status = ocfs2_dlm_init_debug(osb);
        if (status < 0) {
@@ -2508,26 +2746,31 @@ int ocfs2_dlm_init(struct ocfs2_super *osb)
                goto bail;
        }
 
-       /* used by the dlm code to make message headers unique, each
-        * node in this domain must agree on this. */
-       dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));
-
        /* for now, uuid == domain */
-       dlm = dlm_register_domain(osb->uuid_str, dlm_key,
-                                 &osb->osb_locking_proto);
-       if (IS_ERR(dlm)) {
-               status = PTR_ERR(dlm);
+       status = ocfs2_cluster_connect(osb->osb_cluster_stack,
+                                      osb->uuid_str,
+                                      strlen(osb->uuid_str),
+                                      ocfs2_do_node_down, osb,
+                                      &conn);
+       if (status) {
                mlog_errno(status);
                goto bail;
        }
 
-       dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);
+       status = ocfs2_cluster_this_node(&osb->node_num);
+       if (status < 0) {
+               mlog_errno(status);
+               mlog(ML_ERROR,
+                    "could not find this host's node number\n");
+               ocfs2_cluster_disconnect(conn, 0);
+               goto bail;
+       }
 
 local:
        ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
        ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
 
-       osb->dlm = dlm;
+       osb->cconn = conn;
 
        status = 0;
 bail:
@@ -2541,14 +2784,19 @@ bail:
        return status;
 }
 
-void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
+void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
+                       int hangup_pending)
 {
        mlog_entry_void();
 
-       dlm_unregister_eviction_cb(&osb->osb_eviction_cb);
-
        ocfs2_drop_osb_locks(osb);
 
+       /*
+        * Now that we have dropped all locks and ocfs2_dismount_volume()
+        * has disabled recovery, the DLM won't be talking to us.  It's
+        * safe to tear things down before disconnecting the cluster.
+        */
+
        if (osb->dc_task) {
                kthread_stop(osb->dc_task);
                osb->dc_task = NULL;
@@ -2557,8 +2805,8 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
        ocfs2_lock_res_free(&osb->osb_super_lockres);
        ocfs2_lock_res_free(&osb->osb_rename_lockres);
 
-       dlm_unregister_domain(osb->dlm);
-       osb->dlm = NULL;
+       ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
+       osb->cconn = NULL;
 
        ocfs2_dlm_shutdown_debug(osb);
 
@@ -2576,23 +2824,7 @@ static void ocfs2_unlock_ast(void *opaque, int error)
             lockres->l_unlock_action);
 
        spin_lock_irqsave(&lockres->l_lock, flags);
-       /* We tried to cancel a convert request, but it was already
-        * granted. All we want to do here is clear our unlock
-        * state. The wake_up call done at the bottom is redundant
-        * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
-        * hurt anything anyway */
-       if (error == -DLM_ECANCEL &&
-           lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
-               mlog(0, "Got cancelgrant for %s\n", lockres->l_name);
-
-               /* We don't clear the busy flag in this case as it
-                * should have been cleared by the ast which the dlm
-                * has called. */
-               goto complete_unlock;
-       }
-
-       /* DLM_EUNLOCK is the success code for unlock */
-       if (error != -DLM_EUNLOCK) {
+       if (error) {
                mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
                     "unlock_action %d\n", error, lockres->l_name,
                     lockres->l_unlock_action);
@@ -2613,7 +2845,6 @@ static void ocfs2_unlock_ast(void *opaque, int error)
        }
 
        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
-complete_unlock:
        lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 
@@ -2689,13 +2920,12 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb,
 
        mlog(0, "lock %s\n", lockres->l_name);
 
-       ret = ocfs2_dlm_unlock(osb->dlm, &lockres->l_lksb, lkm_flags,
+       ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags,
                               lockres);
        if (ret) {
                ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
                mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
-               /* XXX Need to abstract this */
-               dlm_print_one_lock(lockres->l_lksb.lksb_o2dlm.lockid);
+               ocfs2_dlm_dump_lksb(&lockres->l_lksb);
                BUG();
        }
        mlog(0, "lock %s, successfull return from ocfs2_dlm_unlock\n",
@@ -2789,8 +3019,8 @@ int ocfs2_drop_inode_locks(struct inode *inode)
        return status;
 }
 
-static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
-                                     int new_level)
+static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
+                                             int new_level)
 {
        assert_spin_locked(&lockres->l_lock);
 
@@ -2808,12 +3038,14 @@ static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
        lockres->l_action = OCFS2_AST_DOWNCONVERT;
        lockres->l_requested = new_level;
        lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
+       return lockres_set_pending(lockres);
 }
 
 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
                                  struct ocfs2_lock_res *lockres,
                                  int new_level,
-                                 int lvb)
+                                 int lvb,
+                                 unsigned int generation)
 {
        int ret;
        u32 dlm_flags = DLM_LKF_CONVERT;
@@ -2823,13 +3055,14 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
        if (lvb)
                dlm_flags |= DLM_LKF_VALBLK;
 
-       ret = ocfs2_dlm_lock(osb->dlm,
+       ret = ocfs2_dlm_lock(osb->cconn,
                             new_level,
                             &lockres->l_lksb,
                             dlm_flags,
                             lockres->l_name,
                             OCFS2_LOCK_ID_MAX_LEN - 1,
                             lockres);
+       lockres_clear_pending(lockres, generation, osb);
        if (ret) {
                ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
                ocfs2_recover_from_dlm_error(lockres, 1);
@@ -2882,7 +3115,7 @@ static int ocfs2_cancel_convert(struct ocfs2_super *osb,
        mlog_entry_void();
        mlog(0, "lock %s\n", lockres->l_name);
 
-       ret = ocfs2_dlm_unlock(osb->dlm, &lockres->l_lksb,
+       ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
                               DLM_LKF_CANCEL, lockres);
        if (ret) {
                ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
@@ -2904,6 +3137,7 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb,
        int new_level;
        int ret = 0;
        int set_lvb = 0;
+       unsigned int gen;
 
        mlog_entry_void();
 
@@ -2913,6 +3147,32 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb,
 
 recheck:
        if (lockres->l_flags & OCFS2_LOCK_BUSY) {
+               /* XXX
+                * This is a *big* race.  The OCFS2_LOCK_PENDING flag
+                * exists entirely for one reason - another thread has set
+                * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
+                *
+                * If we do ocfs2_cancel_convert() before the other thread
+                * calls dlm_lock(), our cancel will do nothing.  We will
+                * get no ast, and we will have no way of knowing the
+                * cancel failed.  Meanwhile, the other thread will call
+                * into dlm_lock() and wait...forever.
+                *
+                * Why forever?  Because another node has asked for the
+                * lock first; that's why we're here in unblock_lock().
+                *
+                * The solution is OCFS2_LOCK_PENDING.  When PENDING is
+                * set, we just requeue the unblock.  Only when the other
+                * thread has called dlm_lock() and cleared PENDING will
+                * we then cancel their request.
+                *
+                * All callers of dlm_lock() must set OCFS2_DLM_PENDING
+                * at the same time they set OCFS2_DLM_BUSY.  They must
+                * clear OCFS2_DLM_PENDING after dlm_lock() returns.
+                */
+               if (lockres->l_flags & OCFS2_LOCK_PENDING)
+                       goto leave_requeue;
+
                ctl->requeue = 1;
                ret = ocfs2_prepare_cancel_convert(osb, lockres);
                spin_unlock_irqrestore(&lockres->l_lock, flags);
@@ -2992,9 +3252,11 @@ downconvert:
                        lockres->l_ops->set_lvb(lockres);
        }
 
-       ocfs2_prepare_downconvert(lockres, new_level);
+       gen = ocfs2_prepare_downconvert(lockres, new_level);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
-       ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
+       ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
+                                    gen);
+
 leave:
        mlog_exit(ret);
        return ret;
@@ -3193,22 +3455,44 @@ static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
        return UNBLOCK_CONTINUE_POST;
 }
 
+/*
+ * This is the filesystem locking protocol.  It provides the lock handling
+ * hooks for the underlying DLM.  It has a maximum version number.
+ * The version number allows interoperability with systems running at
+ * the same major number and an equal or smaller minor number.
+ *
+ * Whenever the filesystem does new things with locks (adds or removes a
+ * lock, orders them differently, does different things underneath a lock),
+ * the version must be changed.  The protocol is negotiated when joining
+ * the dlm domain.  A node may join the domain if its major version is
+ * identical to all other nodes and its minor version is greater than
+ * or equal to all other nodes.  When its minor version is greater than
+ * the other nodes, it will run at the minor version specified by the
+ * other nodes.
+ *
+ * If a locking change is made that will not be compatible with older
+ * versions, the major number must be increased and the minor version set
+ * to zero.  If a change merely adds a behavior that can be disabled when
+ * speaking to older versions, the minor version must be increased.  If a
+ * change adds a fully backwards compatible change (eg, LVB changes that
+ * are just ignored by older versions), the version does not need to be
+ * updated.
+ */
 static struct ocfs2_locking_protocol lproto = {
+       .lp_max_version = {
+               .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
+               .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
+       },
        .lp_lock_ast            = ocfs2_locking_ast,
        .lp_blocking_ast        = ocfs2_blocking_ast,
        .lp_unlock_ast          = ocfs2_unlock_ast,
 };
 
-/* This interface isn't the final one, hence the less-than-perfect names */
-void dlmglue_init_stack(void)
+void ocfs2_set_locking_protocol(void)
 {
-       o2cb_get_stack(&lproto);
+       ocfs2_stack_glue_set_locking_protocol(&lproto);
 }
 
-void dlmglue_exit_stack(void)
-{
-       o2cb_put_stack();
-}
 
 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
                                       struct ocfs2_lock_res *lockres)