ocfs2_dlm: Silence a failed convert
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 22e6a5b..e57636c 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -95,11 +95,14 @@ static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st);
 static void dlm_request_all_locks_worker(struct dlm_work_item *item,
                                         void *data);
 static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data);
+static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
+                                     struct dlm_lock_resource *res,
+                                     u8 *real_master);
 
 static u64 dlm_get_next_mig_cookie(void);
 
-static spinlock_t dlm_reco_state_lock = SPIN_LOCK_UNLOCKED;
-static spinlock_t dlm_mig_cookie_lock = SPIN_LOCK_UNLOCKED;
+static DEFINE_SPINLOCK(dlm_reco_state_lock);
+static DEFINE_SPINLOCK(dlm_mig_cookie_lock);
 static u64 dlm_mig_cookie = 1;
 
 static u64 dlm_get_next_mig_cookie(void)
@@ -150,18 +153,16 @@ static inline void dlm_reset_recovery(struct dlm_ctxt *dlm)
 }
 
 /* Worker function used during recovery. */
-void dlm_dispatch_work(void *data)
+void dlm_dispatch_work(struct work_struct *work)
 {
-       struct dlm_ctxt *dlm = (struct dlm_ctxt *)data;
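+       /* the work_struct is embedded in the dlm_ctxt; recover the context */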
+       struct dlm_ctxt *dlm =
+               container_of(work, struct dlm_ctxt, dispatched_work);
        LIST_HEAD(tmp_list);
        struct list_head *iter, *iter2;
        struct dlm_work_item *item;
        dlm_workfunc_t *workfunc;
        int tot=0;
 
-       if (!dlm_joined(dlm))
-               return;
-
        spin_lock(&dlm->work_lock);
        list_splice_init(&dlm->work_list, &tmp_list);
        spin_unlock(&dlm->work_lock);
@@ -354,7 +355,7 @@ int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node)
 
 /* returns true if node is no longer in the domain
  * could be dead or just not joined */
-int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node)
+static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node)
 {
        int recovered;
        spin_lock(&dlm->spinlock);
@@ -753,7 +754,7 @@ static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node)
                }
                BUG_ON(num == dead_node);
 
-               ndata = kcalloc(1, sizeof(*ndata), GFP_NOFS);
+               ndata = kzalloc(sizeof(*ndata), GFP_NOFS);
                if (!ndata) {
                        dlm_destroy_recovery_area(dlm, dead_node);
                        return -ENOMEM;
@@ -838,7 +839,7 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data)
        }
        BUG_ON(lr->dead_node != dlm->reco.dead_node);
 
-       item = kcalloc(1, sizeof(*item), GFP_NOFS);
+       item = kzalloc(sizeof(*item), GFP_NOFS);
        if (!item) {
                dlm_put(dlm);
                return -ENOMEM;
@@ -1125,6 +1126,11 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
        if (total_locks == mres_total_locks)
                mres->flags |= DLM_MRES_ALL_DONE;
 
+       mlog(0, "%s:%.*s: sending mig lockres (%s) to %u\n",
+            dlm->name, res->lockname.len, res->lockname.name,
+            orig_flags & DLM_MRES_MIGRATION ? "migrate" : "recovery",
+            send_to);
+
        /* send it */
        ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres,
                                 sz, send_to, &status);
@@ -1209,6 +1215,34 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock,
        return 0;
 }
 
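+/* the sending node has a reference on the lockres but no locks to
+ * migrate; queue a single zeroed-out lock so the new master knows to
+ * set the refmap bit for that node (see dlm_is_dummy_lock) */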
+static void dlm_add_dummy_lock(struct dlm_ctxt *dlm,
+                              struct dlm_migratable_lockres *mres)
+{
+       struct dlm_lock dummy;
+       memset(&dummy, 0, sizeof(dummy));
+       dummy.ml.cookie = 0;
+       dummy.ml.type = LKM_IVMODE;
+       dummy.ml.convert_type = LKM_IVMODE;
+       dummy.ml.highest_blocked = LKM_IVMODE;
+       dummy.lksb = NULL;
+       dummy.ml.node = dlm->node_num;
+       dlm_add_lock_to_array(&dummy, mres, DLM_BLOCKED_LIST);
+}
+
+static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm,
+                                   struct dlm_migratable_lock *ml,
+                                   u8 *nodenum)
+{
+       if (unlikely(ml->cookie == 0 &&
+                    ml->type == LKM_IVMODE &&
+                    ml->convert_type == LKM_IVMODE &&
+                    ml->highest_blocked == LKM_IVMODE &&
+                    ml->list == DLM_BLOCKED_LIST)) {
+               *nodenum = ml->node;
+               return 1;
+       }
+       return 0;
+}
+
 int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
                         struct dlm_migratable_lockres *mres,
@@ -1256,6 +1290,14 @@ int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
                                goto error;
                }
        }
+       if (total_locks == 0) {
+               /* send a dummy lock to indicate a mastery reference only */
+               mlog(0, "%s:%.*s: sending dummy lock to %u, %s\n",
+                    dlm->name, res->lockname.len, res->lockname.name,
+                    send_to, flags & DLM_MRES_RECOVERY ? "recovery" :
+                    "migration");
+               dlm_add_dummy_lock(dlm, mres);
+       }
        /* flush any remaining locks */
        ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
        if (ret < 0)
@@ -1319,7 +1361,7 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
 
        ret = -ENOMEM;
        buf = kmalloc(be16_to_cpu(msg->data_len), GFP_NOFS);
-       item = kcalloc(1, sizeof(*item), GFP_NOFS);
+       item = kzalloc(sizeof(*item), GFP_NOFS);
        if (!buf || !item)
                goto leave;
 
@@ -1378,17 +1420,21 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
                spin_lock(&res->spinlock);
                res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
                spin_unlock(&res->spinlock);
+               wake_up(&res->wq);
 
                /* add an extra ref for just-allocated lockres 
                 * otherwise the lockres will be purged immediately */
                dlm_lockres_get(res);
-
        }
 
        /* at this point we have allocated everything we need,
         * and we have a hashed lockres with an extra ref and
         * the proper res->state flags. */
        ret = 0;
+       spin_lock(&res->spinlock);
+       /* drop this either when master requery finds a different master
+        * or when a lock is added by the recovery worker */
+       dlm_lockres_grab_inflight_ref(dlm, res);
        if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) {
                /* migration cannot have an unknown master */
                BUG_ON(!(mres->flags & DLM_MRES_RECOVERY));
@@ -1396,10 +1442,11 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
                          "unknown owner.. will need to requery: "
                          "%.*s\n", mres->lockname_len, mres->lockname);
        } else {
-               spin_lock(&res->spinlock);
+               /* take a reference now to pin the lockres, drop it
+                * when locks are added in the worker */
                dlm_change_lockres_owner(dlm, res, dlm->node_num);
-               spin_unlock(&res->spinlock);
        }
+       spin_unlock(&res->spinlock);
 
        /* queue up work for dlm_mig_lockres_worker */
        dlm_grab(dlm);  /* get an extra ref for the work item */
@@ -1455,6 +1502,9 @@ again:
                                   "this node will take it.\n",
                                   res->lockname.len, res->lockname.name);
                } else {
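+                       /* the requery found a different owner, so this
+                        * node will not get the lockres; drop the inflight
+                        * ref taken in dlm_mig_lockres_handler */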
+                       spin_lock(&res->spinlock);
+                       dlm_lockres_drop_inflight_ref(dlm, res);
+                       spin_unlock(&res->spinlock);
                        mlog(0, "master needs to respond to sender "
                                  "that node %u still owns %.*s\n",
                                  real_master, res->lockname.len,
@@ -1484,8 +1534,9 @@ leave:
 
 
 
-int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
-                              struct dlm_lock_resource *res, u8 *real_master)
+static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
+                                     struct dlm_lock_resource *res,
+                                     u8 *real_master)
 {
        struct dlm_node_iter iter;
        int nodenum;
@@ -1655,21 +1706,38 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
 {
        struct dlm_migratable_lock *ml;
        struct list_head *queue;
+       struct list_head *tmpq = NULL;
        struct dlm_lock *newlock = NULL;
        struct dlm_lockstatus *lksb = NULL;
        int ret = 0;
-       int i, bad;
+       int i, j, bad;
        struct list_head *iter;
        struct dlm_lock *lock = NULL;
+       u8 from = O2NM_MAX_NODES;
+       unsigned int added = 0;
 
        mlog(0, "running %d locks for this lockres\n", mres->num_locks);
        for (i=0; i<mres->num_locks; i++) {
                ml = &(mres->ml[i]);
+
+               if (dlm_is_dummy_lock(dlm, ml, &from)) {
+                       /* placeholder, just need to set the refmap bit */
+                       BUG_ON(mres->num_locks != 1);
+                       mlog(0, "%s:%.*s: dummy lock for %u\n",
+                            dlm->name, mres->lockname_len, mres->lockname,
+                            from);
+                       spin_lock(&res->spinlock);
+                       dlm_lockres_set_refmap_bit(from, res);
+                       spin_unlock(&res->spinlock);
+                       added++;
+                       break;
+               }
                BUG_ON(ml->highest_blocked != LKM_IVMODE);
                newlock = NULL;
                lksb = NULL;
 
                queue = dlm_list_num_to_pointer(res, ml->list);
+               tmpq = NULL;
 
                /* if the lock is for the local node it needs to
                 * be moved to the proper location within the queue.
@@ -1679,11 +1747,16 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
                        BUG_ON(!(mres->flags & DLM_MRES_MIGRATION));
 
                        spin_lock(&res->spinlock);
-                       list_for_each(iter, queue) {
-                               lock = list_entry (iter, struct dlm_lock, list);
-                               if (lock->ml.cookie != ml->cookie)
-                                       lock = NULL;
-                               else
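+                       /* the lock may sit on a different queue locally
+                        * than the one named in ml->list, so scan all
+                        * three queues for the matching cookie */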
+                       for (j = DLM_GRANTED_LIST; j <= DLM_BLOCKED_LIST; j++) {
+                               tmpq = dlm_list_idx_to_ptr(res, j);
+                               list_for_each(iter, tmpq) {
+                                       lock = list_entry(iter, struct dlm_lock, list);
+                                       if (lock->ml.cookie != ml->cookie)
+                                               lock = NULL;
+                                       else
+                                               break;
+                               }
+                               if (lock)
                                        break;
                        }
 
@@ -1695,10 +1768,18 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
                                               "with cookie %u:%llu!\n",
                                               dlm_get_lock_cookie_node(c),
                                               dlm_get_lock_cookie_seq(c));
+                               __dlm_print_one_lock_resource(res);
                                BUG();
                        }
                        BUG_ON(lock->ml.node != ml->node);
 
+                       if (tmpq != queue) {
+                               mlog(0, "lock was on %u instead of %u for %.*s\n",
+                                    j, ml->list, res->lockname.len, res->lockname.name);
+                               spin_unlock(&res->spinlock);
+                               continue;
+                       }
+
                        /* see NOTE above about why we do not update
                         * to match the master here */
 
@@ -1706,6 +1787,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
                        /* do not alter lock refcount.  switching lists. */
                        list_move_tail(&lock->list, queue);
                        spin_unlock(&res->spinlock);
+                       added++;
 
                        mlog(0, "just reordered a local lock!\n");
                        continue;
@@ -1812,12 +1894,22 @@ skip_lvb:
                if (!bad) {
                        dlm_lock_get(newlock);
                        list_add_tail(&newlock->list, queue);
+                       mlog(0, "%s:%.*s: added lock for node %u, "
+                            "setting refmap bit\n", dlm->name,
+                            res->lockname.len, res->lockname.name, ml->node);
+                       dlm_lockres_set_refmap_bit(ml->node, res);
+                       added++;
                }
                spin_unlock(&res->spinlock);
        }
        mlog(0, "done running all the locks\n");
 
 leave:
+       /* balance the ref taken when the work was queued */
+       spin_lock(&res->spinlock);
+       dlm_lockres_drop_inflight_ref(dlm, res);
+       spin_unlock(&res->spinlock);
+
        if (ret < 0) {
                mlog_errno(ret);
                if (newlock)
@@ -1930,9 +2022,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
                if (res->owner == dead_node) {
                        list_del_init(&res->recovering);
                        spin_lock(&res->spinlock);
+                       /* new_master has our reference from
+                        * the lock state sent during recovery */
                        dlm_change_lockres_owner(dlm, res, new_master);
                        res->state &= ~DLM_LOCK_RES_RECOVERING;
-                       if (!__dlm_lockres_unused(res))
+                       if (__dlm_lockres_has_locks(res))
                                __dlm_dirty_lockres(dlm, res);
                        spin_unlock(&res->spinlock);
                        wake_up(&res->wq);
@@ -1972,9 +2066,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
                                        dlm_lockres_put(res);
                                }
                                spin_lock(&res->spinlock);
+                               /* new_master has our reference from
+                                * the lock state sent during recovery */
                                dlm_change_lockres_owner(dlm, res, new_master);
                                res->state &= ~DLM_LOCK_RES_RECOVERING;
-                               if (!__dlm_lockres_unused(res))
+                               if (__dlm_lockres_has_locks(res))
                                        __dlm_dirty_lockres(dlm, res);
                                spin_unlock(&res->spinlock);
                                wake_up(&res->wq);
@@ -2043,6 +2139,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
 {
        struct list_head *iter, *tmpiter;
        struct dlm_lock *lock;
+       unsigned int freed = 0;
 
        /* this node is the lockres master:
         * 1) remove any stale locks for the dead node
@@ -2057,6 +2154,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
                if (lock->ml.node == dead_node) {
                        list_del_init(&lock->list);
                        dlm_lock_put(lock);
+                       freed++;
                }
        }
        list_for_each_safe(iter, tmpiter, &res->converting) {
@@ -2064,6 +2162,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
                if (lock->ml.node == dead_node) {
                        list_del_init(&lock->list);
                        dlm_lock_put(lock);
+                       freed++;
                }
        }
        list_for_each_safe(iter, tmpiter, &res->blocked) {
@@ -2071,9 +2170,23 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
                if (lock->ml.node == dead_node) {
                        list_del_init(&lock->list);
                        dlm_lock_put(lock);
+                       freed++;
                }
        }
 
+       if (freed) {
+               mlog(0, "%s:%.*s: freed %u locks for dead node %u, "
+                    "dropping ref from lockres\n", dlm->name,
+                    res->lockname.len, res->lockname.name, freed, dead_node);
+               BUG_ON(!test_bit(dead_node, res->refmap));
+               dlm_lockres_clear_refmap_bit(dead_node, res);
+       } else if (test_bit(dead_node, res->refmap)) {
+               mlog(0, "%s:%.*s: dead node %u had a ref, but had "
+                    "no locks and had not purged before dying\n", dlm->name,
+                    res->lockname.len, res->lockname.name, dead_node);
+               dlm_lockres_clear_refmap_bit(dead_node, res);
+       }
+
        /* do not kick thread yet */
        __dlm_dirty_lockres(dlm, res);
 }
@@ -2136,9 +2249,21 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
                        spin_lock(&res->spinlock);
                        /* zero the lvb if necessary */
                        dlm_revalidate_lvb(dlm, res, dead_node);
-                       if (res->owner == dead_node)
+                       if (res->owner == dead_node) {
+                               if (res->state & DLM_LOCK_RES_DROPPING_REF)
+                                       mlog(0, "%s:%.*s: owned by "
+                                            "dead node %u, this node was "
+                                            "dropping its ref when it died. "
+                                            "continue, dropping the flag.\n",
+                                            dlm->name, res->lockname.len,
+                                            res->lockname.name, dead_node);
+
+                               /* the wake_up for this will happen when the
+                                * RECOVERING flag is dropped later */
+                               res->state &= ~DLM_LOCK_RES_DROPPING_REF;
+
                                dlm_move_lockres_to_recovery_list(dlm, res);
-                       else if (res->owner == dlm->node_num) {
+                       } else if (res->owner == dlm->node_num) {
                                dlm_free_dead_locks(dlm, res, dead_node);
                                __dlm_lockres_calc_usage(dlm, res);
                        }
@@ -2281,7 +2406,8 @@ again:
        memset(&lksb, 0, sizeof(lksb));
 
        ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY,
-                     DLM_RECOVERY_LOCK_NAME, dlm_reco_ast, dlm, dlm_reco_bast);
+                     DLM_RECOVERY_LOCK_NAME, DLM_RECOVERY_LOCK_NAME_LEN,
+                     dlm_reco_ast, dlm, dlm_reco_bast);
 
        mlog(0, "%s: dlmlock($RECOVERY) returned %d, lksb=%d\n",
             dlm->name, ret, lksb.status);