dlm: check for null in device_write
[safe/jmp/linux-2.6] / fs / dlm / recover.c
index 34876f6..80aba5b 100644 (file)
@@ -44,7 +44,7 @@
 static void dlm_wait_timer_fn(unsigned long data)
 {
        struct dlm_ls *ls = (struct dlm_ls *) data;
-       mod_timer(&ls->ls_timer, jiffies + (dlm_config.recover_timer * HZ));
+       mod_timer(&ls->ls_timer, jiffies + (dlm_config.ci_recover_timer * HZ));
        wake_up(&ls->ls_wait_general);
 }
 
@@ -55,7 +55,7 @@ int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
        init_timer(&ls->ls_timer);
        ls->ls_timer.function = dlm_wait_timer_fn;
        ls->ls_timer.data = (long) ls;
-       ls->ls_timer.expires = jiffies + (dlm_config.recover_timer * HZ);
+       ls->ls_timer.expires = jiffies + (dlm_config.ci_recover_timer * HZ);
        add_timer(&ls->ls_timer);
 
        wait_event(ls->ls_wait_general, testfn(ls) || dlm_recovery_stopped(ls));
@@ -94,7 +94,7 @@ void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
 
 static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status)
 {
-       struct dlm_rcom *rc = (struct dlm_rcom *) ls->ls_recover_buf;
+       struct dlm_rcom *rc = ls->ls_recover_buf;
        struct dlm_member *memb;
        int error = 0, delay;
 
@@ -123,7 +123,7 @@ static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status)
 
 static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status)
 {
-       struct dlm_rcom *rc = (struct dlm_rcom *) ls->ls_recover_buf;
+       struct dlm_rcom *rc = ls->ls_recover_buf;
        int error = 0, delay = 0, nodeid = ls->ls_low_nodeid;
 
        for (;;) {
@@ -252,6 +252,7 @@ static void recover_list_clear(struct dlm_ls *ls)
        spin_lock(&ls->ls_recover_list_lock);
        list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) {
                list_del_init(&r->res_recover_list);
+               r->res_recover_locks_count = 0;
                dlm_put_rsb(r);
                ls->ls_recover_list_count--;
        }
@@ -305,7 +306,8 @@ static void set_master_lkbs(struct dlm_rsb *r)
 /*
  * Propogate the new master nodeid to locks
  * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
- * The NEW_MASTER2 flag tells recover_lvb() which rsb's to consider.
+ * The NEW_MASTER2 flag tells recover_lvb() and set_locks_purged() which
+ * rsb's to consider.
  */
 
 static void set_new_master(struct dlm_rsb *r, int nodeid)
@@ -395,7 +397,9 @@ int dlm_recover_masters(struct dlm_ls *ls)
 
                if (dlm_no_directory(ls))
                        count += recover_master_static(r);
-               else if (!is_master(r) && dlm_is_removed(ls, r->res_nodeid)) {
+               else if (!is_master(r) &&
+                        (dlm_is_removed(ls, r->res_nodeid) ||
+                         rsb_flag(r, RSB_NEW_MASTER))) {
                        recover_master(r);
                        count++;
                }
@@ -472,24 +476,13 @@ static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head)
        return error;
 }
 
-static int all_queues_empty(struct dlm_rsb *r)
-{
-       if (!list_empty(&r->res_grantqueue) ||
-           !list_empty(&r->res_convertqueue) ||
-           !list_empty(&r->res_waitqueue))
-               return 0;
-       return 1;
-}
-
 static int recover_locks(struct dlm_rsb *r)
 {
        int error = 0;
 
        lock_rsb(r);
-       if (all_queues_empty(r))
-               goto out;
 
-       DLM_ASSERT(!r->res_recover_locks_count, dlm_print_rsb(r););
+       DLM_ASSERT(!r->res_recover_locks_count, dlm_dump_rsb(r););
 
        error = recover_locks_queue(r, &r->res_grantqueue);
        if (error)
@@ -556,7 +549,7 @@ int dlm_recover_locks(struct dlm_ls *ls)
 
 void dlm_recovered_lock(struct dlm_rsb *r)
 {
-       DLM_ASSERT(rsb_flag(r, RSB_NEW_MASTER), dlm_print_rsb(r););
+       DLM_ASSERT(rsb_flag(r, RSB_NEW_MASTER), dlm_dump_rsb(r););
 
        r->res_recover_locks_count--;
        if (!r->res_recover_locks_count) {
@@ -636,7 +629,7 @@ static void recover_lvb(struct dlm_rsb *r)
                goto out;
 
        if (!r->res_lvbptr) {
-               r->res_lvbptr = allocate_lvb(r->res_ls);
+               r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
                if (!r->res_lvbptr)
                        goto out;
        }
@@ -681,6 +674,16 @@ static void recover_conversion(struct dlm_rsb *r)
        }
 }
 
+/* We've become the new master for this rsb and waiting/converting locks may
+   need to be granted in dlm_grant_after_purge() due to locks that may have
+   existed from a removed node. */
+
+static void set_locks_purged(struct dlm_rsb *r)
+{
+       if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
+               rsb_set_flag(r, RSB_LOCKS_PURGED);
+}
+
 void dlm_recover_rsbs(struct dlm_ls *ls)
 {
        struct dlm_rsb *r;
@@ -694,10 +697,13 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
                if (is_master(r)) {
                        if (rsb_flag(r, RSB_RECOVER_CONVERT))
                                recover_conversion(r);
+                       if (rsb_flag(r, RSB_NEW_MASTER2))
+                               set_locks_purged(r);
                        recover_lvb(r);
                        count++;
                }
                rsb_clear_flag(r, RSB_RECOVER_CONVERT);
+               rsb_clear_flag(r, RSB_NEW_MASTER2);
                unlock_rsb(r);
        }
        up_read(&ls->ls_root_sem);
@@ -725,6 +731,20 @@ int dlm_create_root_list(struct dlm_ls *ls)
                        list_add(&r->res_root_list, &ls->ls_root_list);
                        dlm_hold_rsb(r);
                }
+
+               /* If we're using a directory, add tossed rsbs to the root
+                  list; they'll have entries created in the new directory,
+                  but no other recovery steps should do anything with them. */
+
+               if (dlm_no_directory(ls)) {
+                       read_unlock(&ls->ls_rsbtbl[i].lock);
+                       continue;
+               }
+
+               list_for_each_entry(r, &ls->ls_rsbtbl[i].toss, res_hashchain) {
+                       list_add(&r->res_root_list, &ls->ls_root_list);
+                       dlm_hold_rsb(r);
+               }
                read_unlock(&ls->ls_rsbtbl[i].lock);
        }
  out:
@@ -744,6 +764,11 @@ void dlm_release_root_list(struct dlm_ls *ls)
        up_write(&ls->ls_root_sem);
 }
 
+/* If not using a directory, clear the entire toss list, there's no benefit to
+   caching the master value since it's fixed.  If we are using a dir, keep the
+   rsb's we're the master of.  Recovery will add them to the root list and from
+   there they'll be entered in the rebuilt directory. */
+
 void dlm_clear_toss_list(struct dlm_ls *ls)
 {
        struct dlm_rsb *r, *safe;
@@ -753,8 +778,10 @@ void dlm_clear_toss_list(struct dlm_ls *ls)
                write_lock(&ls->ls_rsbtbl[i].lock);
                list_for_each_entry_safe(r, safe, &ls->ls_rsbtbl[i].toss,
                                         res_hashchain) {
-                       list_del(&r->res_hashchain);
-                       free_rsb(r);
+                       if (dlm_no_directory(ls) || !is_master(r)) {
+                               list_del(&r->res_hashchain);
+                               dlm_free_rsb(r);
+                       }
                }
                write_unlock(&ls->ls_rsbtbl[i].lock);
        }