WorkStruct: make allyesconfig
[safe/jmp/linux-2.6] / fs / ocfs2 / dlm / dlmrecovery.c
index 4e0aada..fb3e2b0 100644 (file)
@@ -95,11 +95,14 @@ static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st);
 static void dlm_request_all_locks_worker(struct dlm_work_item *item,
                                         void *data);
 static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data);
+static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
+                                     struct dlm_lock_resource *res,
+                                     u8 *real_master);
 
 static u64 dlm_get_next_mig_cookie(void);
 
-static spinlock_t dlm_reco_state_lock = SPIN_LOCK_UNLOCKED;
-static spinlock_t dlm_mig_cookie_lock = SPIN_LOCK_UNLOCKED;
+static DEFINE_SPINLOCK(dlm_reco_state_lock);
+static DEFINE_SPINLOCK(dlm_mig_cookie_lock);
 static u64 dlm_mig_cookie = 1;
 
 static u64 dlm_get_next_mig_cookie(void)
@@ -150,19 +153,29 @@ static inline void dlm_reset_recovery(struct dlm_ctxt *dlm)
 }
 
 /* Worker function used during recovery. */
-void dlm_dispatch_work(void *data)
+void dlm_dispatch_work(struct work_struct *work)
 {
-       struct dlm_ctxt *dlm = (struct dlm_ctxt *)data;
+       struct dlm_ctxt *dlm =
+               container_of(work, struct dlm_ctxt, dispatched_work);
        LIST_HEAD(tmp_list);
        struct list_head *iter, *iter2;
        struct dlm_work_item *item;
        dlm_workfunc_t *workfunc;
+       int tot=0;
+
+       if (!dlm_joined(dlm))
+               return;
 
        spin_lock(&dlm->work_lock);
        list_splice_init(&dlm->work_list, &tmp_list);
        spin_unlock(&dlm->work_lock);
 
        list_for_each_safe(iter, iter2, &tmp_list) {
+               tot++;
+       }
+       mlog(0, "%s: work thread has %d work items\n", dlm->name, tot);
+
+       list_for_each_safe(iter, iter2, &tmp_list) {
                item = list_entry(iter, struct dlm_work_item, list);
                workfunc = item->func;
                list_del_init(&item->list);
@@ -343,6 +356,18 @@ int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node)
        return dead;
 }
 
+/* Returns nonzero once @node's bit is clear in dlm->recovery_map,
+ * i.e. recovery for that node has completed (or was never pending).
+ * NOTE(review): original comment said "no longer in the domain /
+ * could be dead or just not joined" -- a node that was never dead
+ * also reads as clear here; confirm callers expect that. */
+static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node)
+{
+       int recovered;
+       /* dlm->spinlock protects the recovery_map bitmap */
+       spin_lock(&dlm->spinlock);
+       recovered = !test_bit(node, dlm->recovery_map);
+       spin_unlock(&dlm->spinlock);
+       return recovered;
+}
+
+
 int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
 {
        if (timeout) {
@@ -361,6 +386,24 @@ int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
        return 0;
 }
 
+/* Block until recovery of @node completes (its bit clears in
+ * dlm->recovery_map), waking on dlm->dlm_reco_thread_wq.  A nonzero
+ * @timeout bounds the wait to that many milliseconds; zero waits
+ * indefinitely.  Always returns 0 for now, so callers cannot tell a
+ * timeout from an actual recovery notification. */
+int dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout)
+{
+       if (timeout) {
+               mlog(0, "%s: waiting %dms for notification of "
+                    "recovery of node %u\n", dlm->name, timeout, node);
+               wait_event_timeout(dlm->dlm_reco_thread_wq,
+                          dlm_is_node_recovered(dlm, node),
+                          msecs_to_jiffies(timeout));
+       } else {
+               mlog(0, "%s: waiting indefinitely for notification "
+                    "of recovery of node %u\n", dlm->name, node);
+               wait_event(dlm->dlm_reco_thread_wq,
+                          dlm_is_node_recovered(dlm, node));
+       }
+       /* for now, return 0 */
+       return 0;
+}
+
 /* callers of the top-level api calls (dlmlock/dlmunlock) should
  * block on the dlm->reco.event when recovery is in progress.
  * the dlm recovery thread will set this state when it begins
@@ -379,6 +422,13 @@ static int dlm_in_recovery(struct dlm_ctxt *dlm)
 
 void dlm_wait_for_recovery(struct dlm_ctxt *dlm)
 {
+       /* Debug aid: if recovery is in progress, log the current
+        * recovery state (recovery master, dead node) before blocking,
+        * so stalls waiting on dlm->reco.event can be diagnosed. */
+       if (dlm_in_recovery(dlm)) {
+               mlog(0, "%s: reco thread %d in recovery: "
+                    "state=%d, master=%u, dead=%u\n",
+                    dlm->name, dlm->dlm_reco_thread_task->pid,
+                    dlm->reco.state, dlm->reco.new_master,
+                    dlm->reco.dead_node);
+       }
+       /* woken by dlm_end_recovery() setting !dlm_in_recovery() */
        wait_event(dlm->reco.event, !dlm_in_recovery(dlm));
 }
 
@@ -707,7 +757,7 @@ static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node)
                }
                BUG_ON(num == dead_node);
 
-               ndata = kcalloc(1, sizeof(*ndata), GFP_KERNEL);
+               ndata = kcalloc(1, sizeof(*ndata), GFP_NOFS);
                if (!ndata) {
                        dlm_destroy_recovery_area(dlm, dead_node);
                        return -ENOMEM;
@@ -792,14 +842,14 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data)
        }
        BUG_ON(lr->dead_node != dlm->reco.dead_node);
 
-       item = kcalloc(1, sizeof(*item), GFP_KERNEL);
+       item = kcalloc(1, sizeof(*item), GFP_NOFS);
        if (!item) {
                dlm_put(dlm);
                return -ENOMEM;
        }
 
        /* this will get freed by dlm_request_all_locks_worker */
-       buf = (char *) __get_free_page(GFP_KERNEL);
+       buf = (char *) __get_free_page(GFP_NOFS);
        if (!buf) {
                kfree(item);
                dlm_put(dlm);
@@ -814,7 +864,7 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data)
        spin_lock(&dlm->work_lock);
        list_add_tail(&item->list, &dlm->work_list);
        spin_unlock(&dlm->work_lock);
-       schedule_work(&dlm->dispatched_work);
+       queue_work(dlm->dlm_worker, &dlm->dispatched_work);
 
        dlm_put(dlm);
        return 0;
@@ -1272,8 +1322,8 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
                mlog(0, "all done flag.  all lockres data received!\n");
 
        ret = -ENOMEM;
-       buf = kmalloc(be16_to_cpu(msg->data_len), GFP_KERNEL);
-       item = kcalloc(1, sizeof(*item), GFP_KERNEL);
+       buf = kmalloc(be16_to_cpu(msg->data_len), GFP_NOFS);
+       item = kcalloc(1, sizeof(*item), GFP_NOFS);
        if (!buf || !item)
                goto leave;
 
@@ -1364,7 +1414,7 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
        spin_lock(&dlm->work_lock);
        list_add_tail(&item->list, &dlm->work_list);
        spin_unlock(&dlm->work_lock);
-       schedule_work(&dlm->dispatched_work);
+       queue_work(dlm->dlm_worker, &dlm->dispatched_work);
 
 leave:
        dlm_put(dlm);
@@ -1438,8 +1488,9 @@ leave:
 
 
 
-int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
-                              struct dlm_lock_resource *res, u8 *real_master)
+static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
+                                     struct dlm_lock_resource *res,
+                                     u8 *real_master)
 {
        struct dlm_node_iter iter;
        int nodenum;
@@ -2235,7 +2286,8 @@ again:
        memset(&lksb, 0, sizeof(lksb));
 
        ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY,
-                     DLM_RECOVERY_LOCK_NAME, dlm_reco_ast, dlm, dlm_reco_bast);
+                     DLM_RECOVERY_LOCK_NAME, DLM_RECOVERY_LOCK_NAME_LEN,
+                     dlm_reco_ast, dlm, dlm_reco_bast);
 
        mlog(0, "%s: dlmlock($RECOVERY) returned %d, lksb=%d\n",
             dlm->name, ret, lksb.status);
@@ -2541,6 +2593,7 @@ stage2:
                                mlog(ML_ERROR, "node %u went down after this "
                                     "node finished recovery.\n", nodenum);
                                ret = 0;
+                               continue;
                        }
                        break;
                }