tunnels: fix netns vs proto registration ordering
[safe/jmp/linux-2.6] / fs / ocfs2 / journal.c
index c2e654e..bf34c49 100644 (file)
@@ -28,6 +28,8 @@
 #include <linux/slab.h>
 #include <linux/highmem.h>
 #include <linux/kthread.h>
+#include <linux/time.h>
+#include <linux/random.h>
 
 #define MLOG_MASK_PREFIX ML_JOURNAL
 #include <cluster/masklog.h>
@@ -35,6 +37,7 @@
 #include "ocfs2.h"
 
 #include "alloc.h"
+#include "blockcheck.h"
 #include "dir.h"
 #include "dlmglue.h"
 #include "extent_map.h"
 #include "slot_map.h"
 #include "super.h"
 #include "sysfile.h"
+#include "uptodate.h"
+#include "quota.h"
 
 #include "buffer_head_io.h"
 
 DEFINE_SPINLOCK(trans_inc_lock);
 
+#define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000
+
 static int ocfs2_force_read_journal(struct inode *inode);
 static int ocfs2_recover_node(struct ocfs2_super *osb,
-                             int node_num);
+                             int node_num, int slot_num);
 static int __ocfs2_recovery_thread(void *arg);
 static int ocfs2_commit_cache(struct ocfs2_super *osb);
-static int ocfs2_wait_on_mount(struct ocfs2_super *osb);
+static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota);
 static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb,
-                                     int dirty);
+                                     int dirty, int replayed);
 static int ocfs2_trylock_journal(struct ocfs2_super *osb,
                                 int slot_num);
 static int ocfs2_recover_orphans(struct ocfs2_super *osb,
                                 int slot);
 static int ocfs2_commit_thread(void *arg);
+static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
+                                           int slot_num,
+                                           struct ocfs2_dinode *la_dinode,
+                                           struct ocfs2_dinode *tl_dinode,
+                                           struct ocfs2_quota_recovery *qrec);
+
+static inline int ocfs2_wait_on_mount(struct ocfs2_super *osb)
+{
+       return __ocfs2_wait_on_mount(osb, 0);
+}
+
+static inline int ocfs2_wait_on_quotas(struct ocfs2_super *osb)
+{
+       return __ocfs2_wait_on_mount(osb, 1);
+}
+
+/*
+ * This replay_map is to track online/offline slots, so we could recover
+ * offline slots during recovery and mount
+ */
+
+enum ocfs2_replay_state {
+       REPLAY_UNNEEDED = 0,    /* Replay is not needed, so ignore this map */
+       REPLAY_NEEDED,          /* Replay slots marked in rm_replay_slots */
+       REPLAY_DONE             /* Replay was already queued */
+};
+
+struct ocfs2_replay_map {
+       unsigned int rm_slots;
+       enum ocfs2_replay_state rm_state;
+       unsigned char rm_replay_slots[0];
+};
+
+void ocfs2_replay_map_set_state(struct ocfs2_super *osb, int state)
+{
+       if (!osb->replay_map)
+               return;
+
+       /* If we've already queued the replay, we don't have any more to do */
+       if (osb->replay_map->rm_state == REPLAY_DONE)
+               return;
+
+       osb->replay_map->rm_state = state;
+}
+
+int ocfs2_compute_replay_slots(struct ocfs2_super *osb)
+{
+       struct ocfs2_replay_map *replay_map;
+       int i, node_num;
+
+       /* If replay map is already set, we don't do it again */
+       if (osb->replay_map)
+               return 0;
+
+       replay_map = kzalloc(sizeof(struct ocfs2_replay_map) +
+                            (osb->max_slots * sizeof(char)), GFP_KERNEL);
+
+       if (!replay_map) {
+               mlog_errno(-ENOMEM);
+               return -ENOMEM;
+       }
+
+       spin_lock(&osb->osb_lock);
+
+       replay_map->rm_slots = osb->max_slots;
+       replay_map->rm_state = REPLAY_UNNEEDED;
+
+       /* set rm_replay_slots for offline slot(s) */
+       for (i = 0; i < replay_map->rm_slots; i++) {
+               if (ocfs2_slot_to_node_num_locked(osb, i, &node_num) == -ENOENT)
+                       replay_map->rm_replay_slots[i] = 1;
+       }
+
+       osb->replay_map = replay_map;
+       spin_unlock(&osb->osb_lock);
+       return 0;
+}
+
+void ocfs2_queue_replay_slots(struct ocfs2_super *osb)
+{
+       struct ocfs2_replay_map *replay_map = osb->replay_map;
+       int i;
+
+       if (!replay_map)
+               return;
+
+       if (replay_map->rm_state != REPLAY_NEEDED)
+               return;
+
+       for (i = 0; i < replay_map->rm_slots; i++)
+               if (replay_map->rm_replay_slots[i])
+                       ocfs2_queue_recovery_completion(osb->journal, i, NULL,
+                                                       NULL, NULL);
+       replay_map->rm_state = REPLAY_DONE;
+}
+
+void ocfs2_free_replay_slots(struct ocfs2_super *osb)
+{
+       struct ocfs2_replay_map *replay_map = osb->replay_map;
+
+       if (!osb->replay_map)
+               return;
+
+       kfree(replay_map);
+       osb->replay_map = NULL;
+}
+
+int ocfs2_recovery_init(struct ocfs2_super *osb)
+{
+       struct ocfs2_recovery_map *rm;
+
+       mutex_init(&osb->recovery_lock);
+       osb->disable_recovery = 0;
+       osb->recovery_thread_task = NULL;
+       init_waitqueue_head(&osb->recovery_event);
+
+       rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
+                    osb->max_slots * sizeof(unsigned int),
+                    GFP_KERNEL);
+       if (!rm) {
+               mlog_errno(-ENOMEM);
+               return -ENOMEM;
+       }
+
+       rm->rm_entries = (unsigned int *)((char *)rm +
+                                         sizeof(struct ocfs2_recovery_map));
+       osb->recovery_map = rm;
+
+       return 0;
+}
+
+/* we can't grab the goofy sem lock from inside wait_event, so we use
+ * memory barriers to make sure that we'll see the null task before
+ * being woken up */
+static int ocfs2_recovery_thread_running(struct ocfs2_super *osb)
+{
+       mb();
+       return osb->recovery_thread_task != NULL;
+}
+
+void ocfs2_recovery_exit(struct ocfs2_super *osb)
+{
+       struct ocfs2_recovery_map *rm;
+
+       /* disable any new recovery threads and wait for any currently
+        * running ones to exit. Do this before setting the vol_state. */
+       mutex_lock(&osb->recovery_lock);
+       osb->disable_recovery = 1;
+       mutex_unlock(&osb->recovery_lock);
+       wait_event(osb->recovery_event, !ocfs2_recovery_thread_running(osb));
+
+       /* At this point, we know that no more recovery threads can be
+        * launched, so wait for any recovery completion work to
+        * complete. */
+       flush_workqueue(ocfs2_wq);
+
+       /*
+        * Now that recovery is shut down, and the osb is about to be
+        * freed,  the osb_lock is not taken here.
+        */
+       rm = osb->recovery_map;
+       /* XXX: Should we bug if there are dirty entries? */
+
+       kfree(rm);
+}
+
+static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
+                                    unsigned int node_num)
+{
+       int i;
+       struct ocfs2_recovery_map *rm = osb->recovery_map;
+
+       assert_spin_locked(&osb->osb_lock);
+
+       for (i = 0; i < rm->rm_used; i++) {
+               if (rm->rm_entries[i] == node_num)
+                       return 1;
+       }
+
+       return 0;
+}
+
+/* Behaves like test-and-set.  Returns the previous value */
+static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
+                                 unsigned int node_num)
+{
+       struct ocfs2_recovery_map *rm = osb->recovery_map;
+
+       spin_lock(&osb->osb_lock);
+       if (__ocfs2_recovery_map_test(osb, node_num)) {
+               spin_unlock(&osb->osb_lock);
+               return 1;
+       }
+
+       /* XXX: Can this be exploited? Not from o2dlm... */
+       BUG_ON(rm->rm_used >= osb->max_slots);
+
+       rm->rm_entries[rm->rm_used] = node_num;
+       rm->rm_used++;
+       spin_unlock(&osb->osb_lock);
+
+       return 0;
+}
+
+static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
+                                    unsigned int node_num)
+{
+       int i;
+       struct ocfs2_recovery_map *rm = osb->recovery_map;
+
+       spin_lock(&osb->osb_lock);
+
+       for (i = 0; i < rm->rm_used; i++) {
+               if (rm->rm_entries[i] == node_num)
+                       break;
+       }
+
+       if (i < rm->rm_used) {
+               /* XXX: be careful with the pointer math */
+               memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]),
+                       (rm->rm_used - i - 1) * sizeof(unsigned int));
+               rm->rm_used--;
+       }
+
+       spin_unlock(&osb->osb_lock);
+}
 
 static int ocfs2_commit_cache(struct ocfs2_super *osb)
 {
@@ -84,9 +317,9 @@ static int ocfs2_commit_cache(struct ocfs2_super *osb)
                goto finally;
        }
 
-       journal_lock_updates(journal->j_journal);
-       status = journal_flush(journal->j_journal);
-       journal_unlock_updates(journal->j_journal);
+       jbd2_journal_lock_updates(journal->j_journal);
+       status = jbd2_journal_flush(journal->j_journal);
+       jbd2_journal_unlock_updates(journal->j_journal);
        if (status < 0) {
                up_write(&journal->j_trans_barrier);
                mlog_errno(status);
@@ -125,15 +358,13 @@ handle_t *ocfs2_start_trans(struct ocfs2_super *osb, int max_buffs)
        BUG_ON(osb->journal->j_state == OCFS2_JOURNAL_FREE);
        BUG_ON(max_buffs <= 0);
 
-       /* JBD might support this, but our journalling code doesn't yet. */
-       if (journal_current_handle()) {
-               mlog(ML_ERROR, "Recursive transaction attempted!\n");
-               BUG();
-       }
+       /* Nested transaction? Just return the handle... */
+       if (journal_current_handle())
+               return jbd2_journal_start(journal, max_buffs);
 
        down_read(&osb->journal->j_trans_barrier);
 
-       handle = journal_start(journal, max_buffs);
+       handle = jbd2_journal_start(journal, max_buffs);
        if (IS_ERR(handle)) {
                up_read(&osb->journal->j_trans_barrier);
 
@@ -154,16 +385,18 @@ handle_t *ocfs2_start_trans(struct ocfs2_super *osb, int max_buffs)
 int ocfs2_commit_trans(struct ocfs2_super *osb,
                       handle_t *handle)
 {
-       int ret;
+       int ret, nested;
        struct ocfs2_journal *journal = osb->journal;
 
        BUG_ON(!handle);
 
-       ret = journal_stop(handle);
+       nested = handle->h_ref > 1;
+       ret = jbd2_journal_stop(handle);
        if (ret < 0)
                mlog_errno(ret);
 
-       up_read(&journal->j_trans_barrier);
+       if (!nested)
+               up_read(&journal->j_trans_barrier);
 
        return ret;
 }
@@ -173,7 +406,7 @@ int ocfs2_commit_trans(struct ocfs2_super *osb,
  * transaction. extend_trans will either extend the current handle by
  * nblocks, or commit it and start a new one with nblocks credits.
  *
- * This might call journal_restart() which will commit dirty buffers
+ * This might call jbd2_journal_restart() which will commit dirty buffers
  * and then restart the transaction. Before calling
  * ocfs2_extend_trans(), any changed blocks should have been
  * dirtied. After calling it, all blocks which need to be changed must
@@ -198,10 +431,10 @@ int ocfs2_extend_trans(handle_t *handle, int nblocks)
 
        mlog(0, "Trying to extend transaction by %d blocks\n", nblocks);
 
-#ifdef OCFS2_DEBUG_FS
+#ifdef CONFIG_OCFS2_DEBUG_FS
        status = 1;
 #else
-       status = journal_extend(handle, nblocks);
+       status = jbd2_journal_extend(handle, nblocks);
        if (status < 0) {
                mlog_errno(status);
                goto bail;
@@ -209,8 +442,10 @@ int ocfs2_extend_trans(handle_t *handle, int nblocks)
 #endif
 
        if (status > 0) {
-               mlog(0, "journal_extend failed, trying journal_restart\n");
-               status = journal_restart(handle, nblocks);
+               mlog(0,
+                    "jbd2_journal_extend failed, trying "
+                    "jbd2_journal_restart\n");
+               status = jbd2_journal_restart(handle, nblocks);
                if (status < 0) {
                        mlog_errno(status);
                        goto bail;
@@ -224,14 +459,167 @@ bail:
        return status;
 }
 
-int ocfs2_journal_access(handle_t *handle,
-                        struct inode *inode,
-                        struct buffer_head *bh,
-                        int type)
+struct ocfs2_triggers {
+       struct jbd2_buffer_trigger_type ot_triggers;
+       int                             ot_offset;
+};
+
+static inline struct ocfs2_triggers *to_ocfs2_trigger(struct jbd2_buffer_trigger_type *triggers)
+{
+       return container_of(triggers, struct ocfs2_triggers, ot_triggers);
+}
+
+static void ocfs2_commit_trigger(struct jbd2_buffer_trigger_type *triggers,
+                                struct buffer_head *bh,
+                                void *data, size_t size)
+{
+       struct ocfs2_triggers *ot = to_ocfs2_trigger(triggers);
+
+       /*
+        * We aren't guaranteed to have the superblock here, so we
+        * must unconditionally compute the ecc data.
+        * __ocfs2_journal_access() will only set the triggers if
+        * metaecc is enabled.
+        */
+       ocfs2_block_check_compute(data, size, data + ot->ot_offset);
+}
+
+/*
+ * Quota blocks have their own trigger because the struct ocfs2_block_check
+ * offset depends on the blocksize.
+ */
+static void ocfs2_dq_commit_trigger(struct jbd2_buffer_trigger_type *triggers,
+                                struct buffer_head *bh,
+                                void *data, size_t size)
+{
+       struct ocfs2_disk_dqtrailer *dqt =
+               ocfs2_block_dqtrailer(size, data);
+
+       /*
+        * We aren't guaranteed to have the superblock here, so we
+        * must unconditionally compute the ecc data.
+        * __ocfs2_journal_access() will only set the triggers if
+        * metaecc is enabled.
+        */
+       ocfs2_block_check_compute(data, size, &dqt->dq_check);
+}
+
+/*
+ * Directory blocks also have their own trigger because the
+ * struct ocfs2_block_check offset depends on the blocksize.
+ */
+static void ocfs2_db_commit_trigger(struct jbd2_buffer_trigger_type *triggers,
+                                struct buffer_head *bh,
+                                void *data, size_t size)
+{
+       struct ocfs2_dir_block_trailer *trailer =
+               ocfs2_dir_trailer_from_size(size, data);
+
+       /*
+        * We aren't guaranteed to have the superblock here, so we
+        * must unconditionally compute the ecc data.
+        * __ocfs2_journal_access() will only set the triggers if
+        * metaecc is enabled.
+        */
+       ocfs2_block_check_compute(data, size, &trailer->db_check);
+}
+
+static void ocfs2_abort_trigger(struct jbd2_buffer_trigger_type *triggers,
+                               struct buffer_head *bh)
+{
+       mlog(ML_ERROR,
+            "ocfs2_abort_trigger called by JBD2.  bh = 0x%lx, "
+            "bh->b_blocknr = %llu\n",
+            (unsigned long)bh,
+            (unsigned long long)bh->b_blocknr);
+
+       /* We aren't guaranteed to have the superblock here - but if we
+        * don't, it'll just crash. */
+       ocfs2_error(bh->b_assoc_map->host->i_sb,
+                   "JBD2 has aborted our journal, ocfs2 cannot continue\n");
+}
+
+static struct ocfs2_triggers di_triggers = {
+       .ot_triggers = {
+               .t_commit = ocfs2_commit_trigger,
+               .t_abort = ocfs2_abort_trigger,
+       },
+       .ot_offset      = offsetof(struct ocfs2_dinode, i_check),
+};
+
+static struct ocfs2_triggers eb_triggers = {
+       .ot_triggers = {
+               .t_commit = ocfs2_commit_trigger,
+               .t_abort = ocfs2_abort_trigger,
+       },
+       .ot_offset      = offsetof(struct ocfs2_extent_block, h_check),
+};
+
+static struct ocfs2_triggers rb_triggers = {
+       .ot_triggers = {
+               .t_commit = ocfs2_commit_trigger,
+               .t_abort = ocfs2_abort_trigger,
+       },
+       .ot_offset      = offsetof(struct ocfs2_refcount_block, rf_check),
+};
+
+static struct ocfs2_triggers gd_triggers = {
+       .ot_triggers = {
+               .t_commit = ocfs2_commit_trigger,
+               .t_abort = ocfs2_abort_trigger,
+       },
+       .ot_offset      = offsetof(struct ocfs2_group_desc, bg_check),
+};
+
+static struct ocfs2_triggers db_triggers = {
+       .ot_triggers = {
+               .t_commit = ocfs2_db_commit_trigger,
+               .t_abort = ocfs2_abort_trigger,
+       },
+};
+
+static struct ocfs2_triggers xb_triggers = {
+       .ot_triggers = {
+               .t_commit = ocfs2_commit_trigger,
+               .t_abort = ocfs2_abort_trigger,
+       },
+       .ot_offset      = offsetof(struct ocfs2_xattr_block, xb_check),
+};
+
+static struct ocfs2_triggers dq_triggers = {
+       .ot_triggers = {
+               .t_commit = ocfs2_dq_commit_trigger,
+               .t_abort = ocfs2_abort_trigger,
+       },
+};
+
+static struct ocfs2_triggers dr_triggers = {
+       .ot_triggers = {
+               .t_commit = ocfs2_commit_trigger,
+               .t_abort = ocfs2_abort_trigger,
+       },
+       .ot_offset      = offsetof(struct ocfs2_dx_root_block, dr_check),
+};
+
+static struct ocfs2_triggers dl_triggers = {
+       .ot_triggers = {
+               .t_commit = ocfs2_commit_trigger,
+               .t_abort = ocfs2_abort_trigger,
+       },
+       .ot_offset      = offsetof(struct ocfs2_dx_leaf, dl_check),
+};
+
+static int __ocfs2_journal_access(handle_t *handle,
+                                 struct ocfs2_caching_info *ci,
+                                 struct buffer_head *bh,
+                                 struct ocfs2_triggers *triggers,
+                                 int type)
 {
        int status;
+       struct ocfs2_super *osb =
+               OCFS2_SB(ocfs2_metadata_cache_get_super(ci));
 
-       BUG_ON(!inode);
+       BUG_ON(!ci || !ci->ci_ops);
        BUG_ON(!handle);
        BUG_ON(!bh);
 
@@ -250,30 +638,32 @@ int ocfs2_journal_access(handle_t *handle,
                BUG();
        }
 
-       /* Set the current transaction information on the inode so
+       /* Set the current transaction information on the ci so
         * that the locking code knows whether it can drop it's locks
-        * on this inode or not. We're protected from the commit
+        * on this ci or not. We're protected from the commit
         * thread updating the current transaction id until
         * ocfs2_commit_trans() because ocfs2_start_trans() took
         * j_trans_barrier for us. */
-       ocfs2_set_inode_lock_trans(OCFS2_SB(inode->i_sb)->journal, inode);
+       ocfs2_set_ci_lock_trans(osb->journal, ci);
 
-       mutex_lock(&OCFS2_I(inode)->ip_io_mutex);
+       ocfs2_metadata_cache_io_lock(ci);
        switch (type) {
        case OCFS2_JOURNAL_ACCESS_CREATE:
        case OCFS2_JOURNAL_ACCESS_WRITE:
-               status = journal_get_write_access(handle, bh);
+               status = jbd2_journal_get_write_access(handle, bh);
                break;
 
        case OCFS2_JOURNAL_ACCESS_UNDO:
-               status = journal_get_undo_access(handle, bh);
+               status = jbd2_journal_get_undo_access(handle, bh);
                break;
 
        default:
                status = -EINVAL;
-               mlog(ML_ERROR, "Uknown access type!\n");
+               mlog(ML_ERROR, "Unknown access type!\n");
        }
-       mutex_unlock(&OCFS2_I(inode)->ip_io_mutex);
+       if (!status && ocfs2_meta_ecc(osb) && triggers)
+               jbd2_journal_set_triggers(bh, &triggers->ot_triggers);
+       ocfs2_metadata_cache_io_unlock(ci);
 
        if (status < 0)
                mlog(ML_ERROR, "Error %d getting %d access to buffer!\n",
@@ -283,6 +673,67 @@ int ocfs2_journal_access(handle_t *handle,
        return status;
 }
 
+int ocfs2_journal_access_di(handle_t *handle, struct ocfs2_caching_info *ci,
+                           struct buffer_head *bh, int type)
+{
+       return __ocfs2_journal_access(handle, ci, bh, &di_triggers, type);
+}
+
+int ocfs2_journal_access_eb(handle_t *handle, struct ocfs2_caching_info *ci,
+                           struct buffer_head *bh, int type)
+{
+       return __ocfs2_journal_access(handle, ci, bh, &eb_triggers, type);
+}
+
+int ocfs2_journal_access_rb(handle_t *handle, struct ocfs2_caching_info *ci,
+                           struct buffer_head *bh, int type)
+{
+       return __ocfs2_journal_access(handle, ci, bh, &rb_triggers,
+                                     type);
+}
+
+int ocfs2_journal_access_gd(handle_t *handle, struct ocfs2_caching_info *ci,
+                           struct buffer_head *bh, int type)
+{
+       return __ocfs2_journal_access(handle, ci, bh, &gd_triggers, type);
+}
+
+int ocfs2_journal_access_db(handle_t *handle, struct ocfs2_caching_info *ci,
+                           struct buffer_head *bh, int type)
+{
+       return __ocfs2_journal_access(handle, ci, bh, &db_triggers, type);
+}
+
+int ocfs2_journal_access_xb(handle_t *handle, struct ocfs2_caching_info *ci,
+                           struct buffer_head *bh, int type)
+{
+       return __ocfs2_journal_access(handle, ci, bh, &xb_triggers, type);
+}
+
+int ocfs2_journal_access_dq(handle_t *handle, struct ocfs2_caching_info *ci,
+                           struct buffer_head *bh, int type)
+{
+       return __ocfs2_journal_access(handle, ci, bh, &dq_triggers, type);
+}
+
+int ocfs2_journal_access_dr(handle_t *handle, struct ocfs2_caching_info *ci,
+                           struct buffer_head *bh, int type)
+{
+       return __ocfs2_journal_access(handle, ci, bh, &dr_triggers, type);
+}
+
+int ocfs2_journal_access_dl(handle_t *handle, struct ocfs2_caching_info *ci,
+                           struct buffer_head *bh, int type)
+{
+       return __ocfs2_journal_access(handle, ci, bh, &dl_triggers, type);
+}
+
+int ocfs2_journal_access(handle_t *handle, struct ocfs2_caching_info *ci,
+                        struct buffer_head *bh, int type)
+{
+       return __ocfs2_journal_access(handle, ci, bh, NULL, type);
+}
+
 int ocfs2_journal_dirty(handle_t *handle,
                        struct buffer_head *bh)
 {
@@ -291,7 +742,7 @@ int ocfs2_journal_dirty(handle_t *handle,
        mlog_entry("(bh->b_blocknr=%llu)\n",
                   (unsigned long long)bh->b_blocknr);
 
-       status = journal_dirty_metadata(handle, bh);
+       status = jbd2_journal_dirty_metadata(handle, bh);
        if (status < 0)
                mlog(ML_ERROR, "Could not dirty metadata buffer. "
                     "(bh->b_blocknr=%llu)\n",
@@ -301,19 +752,7 @@ int ocfs2_journal_dirty(handle_t *handle,
        return status;
 }
 
-int ocfs2_journal_dirty_data(handle_t *handle,
-                            struct buffer_head *bh)
-{
-       int err = journal_dirty_data(handle, bh);
-       if (err)
-               mlog_errno(err);
-       /* TODO: When we can handle it, abort the handle and go RO on
-        * error here. */
-
-       return err;
-}
-
-#define OCFS2_DEFAULT_COMMIT_INTERVAL  (HZ * JBD_DEFAULT_MAX_COMMIT_AGE)
+#define OCFS2_DEFAULT_COMMIT_INTERVAL  (HZ * JBD2_DEFAULT_MAX_COMMIT_AGE)
 
 void ocfs2_set_journal_params(struct ocfs2_super *osb)
 {
@@ -326,9 +765,9 @@ void ocfs2_set_journal_params(struct ocfs2_super *osb)
        spin_lock(&journal->j_state_lock);
        journal->j_commit_interval = commit_interval;
        if (osb->s_mount_opt & OCFS2_MOUNT_BARRIER)
-               journal->j_flags |= JFS_BARRIER;
+               journal->j_flags |= JBD2_BARRIER;
        else
-               journal->j_flags &= ~JFS_BARRIER;
+               journal->j_flags &= ~JBD2_BARRIER;
        spin_unlock(&journal->j_state_lock);
 }
 
@@ -393,14 +832,14 @@ int ocfs2_journal_init(struct ocfs2_journal *journal, int *dirty)
        mlog(0, "inode->ip_clusters = %u\n", OCFS2_I(inode)->ip_clusters);
 
        /* call the kernels journal init function now */
-       j_journal = journal_init_inode(inode);
+       j_journal = jbd2_journal_init_inode(inode);
        if (j_journal == NULL) {
                mlog(ML_ERROR, "Linux journal layer error\n");
                status = -EINVAL;
                goto done;
        }
 
-       mlog(0, "Returned from journal_init_inode\n");
+       mlog(0, "Returned from jbd2_journal_init_inode\n");
        mlog(0, "j_journal->j_maxlen = %u\n", j_journal->j_maxlen);
 
        *dirty = (le32_to_cpu(di->id1.journal1.ij_flags) &
@@ -419,8 +858,7 @@ done:
        if (status < 0) {
                if (inode_lock)
                        ocfs2_inode_unlock(inode, 1);
-               if (bh != NULL)
-                       brelse(bh);
+               brelse(bh);
                if (inode) {
                        OCFS2_I(inode)->ip_open_count--;
                        iput(inode);
@@ -431,8 +869,18 @@ done:
        return status;
 }
 
+static void ocfs2_bump_recovery_generation(struct ocfs2_dinode *di)
+{
+       le32_add_cpu(&(di->id1.journal1.ij_recovery_generation), 1);
+}
+
+static u32 ocfs2_get_recovery_generation(struct ocfs2_dinode *di)
+{
+       return le32_to_cpu(di->id1.journal1.ij_recovery_generation);
+}
+
 static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb,
-                                     int dirty)
+                                     int dirty, int replayed)
 {
        int status;
        unsigned int flags;
@@ -443,17 +891,11 @@ static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb,
        mlog_entry_void();
 
        fe = (struct ocfs2_dinode *)bh->b_data;
-       if (!OCFS2_IS_VALID_DINODE(fe)) {
-               /* This is called from startup/shutdown which will
-                * handle the errors in a specific manner, so no need
-                * to call ocfs2_error() here. */
-               mlog(ML_ERROR, "Journal dinode %llu  has invalid "
-                    "signature: %.*s",
-                    (unsigned long long)le64_to_cpu(fe->i_blkno), 7,
-                    fe->i_signature);
-               status = -EIO;
-               goto out;
-       }
+
+       /* The journal bh on the osb always comes from ocfs2_journal_init()
+        * and was validated there inside ocfs2_inode_lock_full().  It's a
+        * code bug if we mess it up. */
+       BUG_ON(!OCFS2_IS_VALID_DINODE(fe));
 
        flags = le32_to_cpu(fe->id1.journal1.ij_flags);
        if (dirty)
@@ -462,11 +904,14 @@ static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb,
                flags &= ~OCFS2_JOURNAL_DIRTY_FL;
        fe->id1.journal1.ij_flags = cpu_to_le32(flags);
 
-       status = ocfs2_write_block(osb, bh, journal->j_inode);
+       if (replayed)
+               ocfs2_bump_recovery_generation(fe);
+
+       ocfs2_compute_meta_ecc(osb->sb, bh->b_data, &fe->i_check);
+       status = ocfs2_write_block(osb, bh, INODE_CACHE(journal->j_inode));
        if (status < 0)
                mlog_errno(status);
 
-out:
        mlog_exit(status);
        return status;
 }
@@ -495,7 +940,7 @@ void ocfs2_journal_shutdown(struct ocfs2_super *osb)
        if (journal->j_state != OCFS2_JOURNAL_LOADED)
                goto done;
 
-       /* need to inc inode use count as journal_destroy will iput. */
+       /* need to inc inode use count - jbd2_journal_destroy will iput. */
        if (!igrab(inode))
                BUG();
 
@@ -524,9 +969,9 @@ void ocfs2_journal_shutdown(struct ocfs2_super *osb)
        BUG_ON(atomic_read(&(osb->journal->j_num_trans)) != 0);
 
        if (ocfs2_mount_local(osb)) {
-               journal_lock_updates(journal->j_journal);
-               status = journal_flush(journal->j_journal);
-               journal_unlock_updates(journal->j_journal);
+               jbd2_journal_lock_updates(journal->j_journal);
+               status = jbd2_journal_flush(journal->j_journal);
+               jbd2_journal_unlock_updates(journal->j_journal);
                if (status < 0)
                        mlog_errno(status);
        }
@@ -536,13 +981,14 @@ void ocfs2_journal_shutdown(struct ocfs2_super *osb)
                 * Do not toggle if flush was unsuccessful otherwise
                 * will leave dirty metadata in a "clean" journal
                 */
-               status = ocfs2_journal_toggle_dirty(osb, 0);
+               status = ocfs2_journal_toggle_dirty(osb, 0, 0);
                if (status < 0)
                        mlog_errno(status);
        }
 
        /* Shutdown the kernel journal system */
-       journal_destroy(journal->j_journal);
+       jbd2_journal_destroy(journal->j_journal);
+       journal->j_journal = NULL;
 
        OCFS2_I(inode)->ip_open_count--;
 
@@ -567,31 +1013,30 @@ static void ocfs2_clear_journal_error(struct super_block *sb,
 {
        int olderr;
 
-       olderr = journal_errno(journal);
+       olderr = jbd2_journal_errno(journal);
        if (olderr) {
                mlog(ML_ERROR, "File system error %d recorded in "
                     "journal %u.\n", olderr, slot);
                mlog(ML_ERROR, "File system on device %s needs checking.\n",
                     sb->s_id);
 
-               journal_ack_err(journal);
-               journal_clear_err(journal);
+               jbd2_journal_ack_err(journal);
+               jbd2_journal_clear_err(journal);
        }
 }
 
-int ocfs2_journal_load(struct ocfs2_journal *journal, int local)
+int ocfs2_journal_load(struct ocfs2_journal *journal, int local, int replayed)
 {
        int status = 0;
        struct ocfs2_super *osb;
 
        mlog_entry_void();
 
-       if (!journal)
-               BUG();
+       BUG_ON(!journal);
 
        osb = journal->j_osb;
 
-       status = journal_load(journal->j_journal);
+       status = jbd2_journal_load(journal->j_journal);
        if (status < 0) {
                mlog(ML_ERROR, "Failed to load journal!\n");
                goto done;
@@ -599,7 +1044,7 @@ int ocfs2_journal_load(struct ocfs2_journal *journal, int local)
 
        ocfs2_clear_journal_error(osb->sb, journal->j_journal, osb->slot_num);
 
-       status = ocfs2_journal_toggle_dirty(osb, 1);
+       status = ocfs2_journal_toggle_dirty(osb, 1, replayed);
        if (status < 0) {
                mlog_errno(status);
                goto done;
@@ -635,13 +1080,13 @@ int ocfs2_journal_wipe(struct ocfs2_journal *journal, int full)
 
        BUG_ON(!journal);
 
-       status = journal_wipe(journal->j_journal, full);
+       status = jbd2_journal_wipe(journal->j_journal, full);
        if (status < 0) {
                mlog_errno(status);
                goto bail;
        }
 
-       status = ocfs2_journal_toggle_dirty(journal->j_osb, 0);
+       status = ocfs2_journal_toggle_dirty(journal->j_osb, 0, 0);
        if (status < 0)
                mlog_errno(status);
 
@@ -650,6 +1095,23 @@ bail:
        return status;
 }
 
+static int ocfs2_recovery_completed(struct ocfs2_super *osb)
+{
+       int empty;
+       struct ocfs2_recovery_map *rm = osb->recovery_map;
+
+       spin_lock(&osb->osb_lock);
+       empty = (rm->rm_used == 0);
+       spin_unlock(&osb->osb_lock);
+
+       return empty;
+}
+
+void ocfs2_wait_for_recovery(struct ocfs2_super *osb)
+{
+       wait_event(osb->recovery_event, ocfs2_recovery_completed(osb));
+}
+
 /*
  * JBD Might read a cached version of another nodes journal file. We
  * don't want this as this file changes often and we get no
@@ -687,9 +1149,8 @@ static int ocfs2_force_read_journal(struct inode *inode)
 
                /* We are reading journal data which should not
                 * be put in the uptodate cache */
-               status = ocfs2_read_blocks(OCFS2_SB(inode->i_sb),
-                                          p_blkno, p_blocks, bhs, 0,
-                                          NULL);
+               status = ocfs2_read_blocks_sync(OCFS2_SB(inode->i_sb),
+                                               p_blkno, p_blocks, bhs);
                if (status < 0) {
                        mlog_errno(status);
                        goto bail;
@@ -705,8 +1166,7 @@ static int ocfs2_force_read_journal(struct inode *inode)
 
 bail:
        for(i = 0; i < CONCURRENT_JOURNAL_FILL; i++)
-               if (bhs[i])
-                       brelse(bhs[i]);
+               brelse(bhs[i]);
        mlog_exit(status);
        return status;
 }
@@ -716,6 +1176,7 @@ struct ocfs2_la_recovery_item {
        int                     lri_slot;
        struct ocfs2_dinode     *lri_la_dinode;
        struct ocfs2_dinode     *lri_tl_dinode;
+       struct ocfs2_quota_recovery *lri_qrec;
 };
 
 /* Does the second half of the recovery process. By this point, the
@@ -736,6 +1197,7 @@ void ocfs2_complete_recovery(struct work_struct *work)
        struct ocfs2_super *osb = journal->j_osb;
        struct ocfs2_dinode *la_dinode, *tl_dinode;
        struct ocfs2_la_recovery_item *item, *n;
+       struct ocfs2_quota_recovery *qrec;
        LIST_HEAD(tmp_la_list);
 
        mlog_entry_void();
@@ -751,6 +1213,8 @@ void ocfs2_complete_recovery(struct work_struct *work)
 
                mlog(0, "Complete recovery for slot %d\n", item->lri_slot);
 
+               ocfs2_wait_on_quotas(osb);
+
                la_dinode = item->lri_la_dinode;
                if (la_dinode) {
                        mlog(0, "Clean up local alloc %llu\n",
@@ -781,6 +1245,16 @@ void ocfs2_complete_recovery(struct work_struct *work)
                if (ret < 0)
                        mlog_errno(ret);
 
+               qrec = item->lri_qrec;
+               if (qrec) {
+                       mlog(0, "Recovering quota files");
+                       ret = ocfs2_finish_quota_recovery(osb, qrec,
+                                                         item->lri_slot);
+                       if (ret < 0)
+                               mlog_errno(ret);
+                       /* Recovery info is already freed now */
+               }
+
                kfree(item);
        }
 
@@ -794,7 +1268,8 @@ void ocfs2_complete_recovery(struct work_struct *work)
 static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
                                            int slot_num,
                                            struct ocfs2_dinode *la_dinode,
-                                           struct ocfs2_dinode *tl_dinode)
+                                           struct ocfs2_dinode *tl_dinode,
+                                           struct ocfs2_quota_recovery *qrec)
 {
        struct ocfs2_la_recovery_item *item;
 
@@ -809,6 +1284,9 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
                if (tl_dinode)
                        kfree(tl_dinode);
 
+               if (qrec)
+                       ocfs2_free_quota_recovery(qrec);
+
                mlog_errno(-ENOMEM);
                return;
        }
@@ -817,6 +1295,7 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
        item->lri_la_dinode = la_dinode;
        item->lri_slot = slot_num;
        item->lri_tl_dinode = tl_dinode;
+       item->lri_qrec = qrec;
 
        spin_lock(&journal->j_lock);
        list_add_tail(&item->lri_list, &journal->j_la_cleanups);
@@ -825,29 +1304,46 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
 }
 
 /* Called by the mount code to queue recovery the last part of
- * recovery for it's own slot. */
+ * recovery for it's own and offline slot(s). */
 void ocfs2_complete_mount_recovery(struct ocfs2_super *osb)
 {
        struct ocfs2_journal *journal = osb->journal;
 
-       if (osb->dirty) {
-               /* No need to queue up our truncate_log as regular
-                * cleanup will catch that. */
-               ocfs2_queue_recovery_completion(journal,
-                                               osb->slot_num,
-                                               osb->local_alloc_copy,
-                                               NULL);
-               ocfs2_schedule_truncate_log_flush(osb, 0);
+       /* No need to queue up our truncate_log as regular cleanup will catch
+        * that */
+       ocfs2_queue_recovery_completion(journal, osb->slot_num,
+                                       osb->local_alloc_copy, NULL, NULL);
+       ocfs2_schedule_truncate_log_flush(osb, 0);
 
-               osb->local_alloc_copy = NULL;
-               osb->dirty = 0;
+       osb->local_alloc_copy = NULL;
+       osb->dirty = 0;
+
+       /* queue to recover orphan slots for all offline slots */
+       ocfs2_replay_map_set_state(osb, REPLAY_NEEDED);
+       ocfs2_queue_replay_slots(osb);
+       ocfs2_free_replay_slots(osb);
+}
+
+void ocfs2_complete_quota_recovery(struct ocfs2_super *osb)
+{
+       if (osb->quota_rec) {
+               ocfs2_queue_recovery_completion(osb->journal,
+                                               osb->slot_num,
+                                               NULL,
+                                               NULL,
+                                               osb->quota_rec);
+               osb->quota_rec = NULL;
        }
 }
 
 static int __ocfs2_recovery_thread(void *arg)
 {
-       int status, node_num;
+       int status, node_num, slot_num;
        struct ocfs2_super *osb = arg;
+       struct ocfs2_recovery_map *rm = osb->recovery_map;
+       int *rm_quota = NULL;
+       int rm_quota_used = 0, i;
+       struct ocfs2_quota_recovery *qrec;
 
        mlog_entry_void();
 
@@ -856,6 +1352,11 @@ static int __ocfs2_recovery_thread(void *arg)
                goto bail;
        }
 
+       rm_quota = kzalloc(osb->max_slots * sizeof(int), GFP_NOFS);
+       if (!rm_quota) {
+               status = -ENOMEM;
+               goto bail;
+       }
 restart:
        status = ocfs2_super_lock(osb, 1);
        if (status < 0) {
@@ -863,48 +1364,99 @@ restart:
                goto bail;
        }
 
-       while(!ocfs2_node_map_is_empty(osb, &osb->recovery_map)) {
-               node_num = ocfs2_node_map_first_set_bit(osb,
-                                                       &osb->recovery_map);
-               if (node_num == O2NM_INVALID_NODE_NUM) {
-                       mlog(0, "Out of nodes to recover.\n");
-                       break;
-               }
+       status = ocfs2_compute_replay_slots(osb);
+       if (status < 0)
+               mlog_errno(status);
 
-               status = ocfs2_recover_node(osb, node_num);
-               if (status < 0) {
+       /* queue recovery for our own slot */
+       ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL,
+                                       NULL, NULL);
+
+       spin_lock(&osb->osb_lock);
+       while (rm->rm_used) {
+               /* It's always safe to remove entry zero, as we won't
+                * clear it until ocfs2_recover_node() has succeeded. */
+               node_num = rm->rm_entries[0];
+               spin_unlock(&osb->osb_lock);
+               mlog(0, "checking node %d\n", node_num);
+               slot_num = ocfs2_node_num_to_slot(osb, node_num);
+               if (slot_num == -ENOENT) {
+                       status = 0;
+                       mlog(0, "no slot for this node, so no recovery"
+                            "required.\n");
+                       goto skip_recovery;
+               }
+               mlog(0, "node %d was using slot %d\n", node_num, slot_num);
+
+               /* It is a bit subtle with quota recovery. We cannot do it
+                * immediately because we have to obtain cluster locks from
+                * quota files and we also don't want to just skip it because
+                * then quota usage would be out of sync until some node takes
+                * the slot. So we remember which nodes need quota recovery
+                * and when everything else is done, we recover quotas. */
+               for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++);
+               if (i == rm_quota_used)
+                       rm_quota[rm_quota_used++] = slot_num;
+
+               status = ocfs2_recover_node(osb, node_num, slot_num);
+skip_recovery:
+               if (!status) {
+                       ocfs2_recovery_map_clear(osb, node_num);
+               } else {
                        mlog(ML_ERROR,
                             "Error %d recovering node %d on device (%u,%u)!\n",
                             status, node_num,
                             MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
                        mlog(ML_ERROR, "Volume requires unmount.\n");
-                       continue;
                }
 
-               ocfs2_recovery_map_clear(osb, node_num);
+               spin_lock(&osb->osb_lock);
+       }
+       spin_unlock(&osb->osb_lock);
+       mlog(0, "All nodes recovered\n");
+
+       /* Refresh all journal recovery generations from disk */
+       status = ocfs2_check_journals_nolocks(osb);
+       status = (status == -EROFS) ? 0 : status;
+       if (status < 0)
+               mlog_errno(status);
+
+       /* Now it is right time to recover quotas... We have to do this under
+        * superblock lock so that noone can start using the slot (and crash)
+        * before we recover it */
+       for (i = 0; i < rm_quota_used; i++) {
+               qrec = ocfs2_begin_quota_recovery(osb, rm_quota[i]);
+               if (IS_ERR(qrec)) {
+                       status = PTR_ERR(qrec);
+                       mlog_errno(status);
+                       continue;
+               }
+               ocfs2_queue_recovery_completion(osb->journal, rm_quota[i],
+                                               NULL, NULL, qrec);
        }
+
        ocfs2_super_unlock(osb, 1);
 
-       /* We always run recovery on our own orphan dir - the dead
-        * node(s) may have disallowd a previos inode delete. Re-processing
-        * is therefore required. */
-       ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL,
-                                       NULL);
+       /* queue recovery for offline slots */
+       ocfs2_queue_replay_slots(osb);
 
 bail:
        mutex_lock(&osb->recovery_lock);
-       if (!status &&
-           !ocfs2_node_map_is_empty(osb, &osb->recovery_map)) {
+       if (!status && !ocfs2_recovery_completed(osb)) {
                mutex_unlock(&osb->recovery_lock);
                goto restart;
        }
 
+       ocfs2_free_replay_slots(osb);
        osb->recovery_thread_task = NULL;
        mb(); /* sync with ocfs2_recovery_thread_running */
        wake_up(&osb->recovery_event);
 
        mutex_unlock(&osb->recovery_lock);
 
+       if (rm_quota)
+               kfree(rm_quota);
+
        mlog_exit(status);
        /* no one is callint kthread_stop() for us so the kthread() api
         * requires that we call do_exit().  And it isn't exported, but
@@ -924,8 +1476,8 @@ void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num)
 
        /* People waiting on recovery will wait on
         * the recovery map to empty. */
-       if (!ocfs2_recovery_map_set(osb, node_num))
-               mlog(0, "node %d already be in recovery.\n", node_num);
+       if (ocfs2_recovery_map_set(osb, node_num))
+               mlog(0, "node %d already in recovery map.\n", node_num);
 
        mlog(0, "starting recovery thread...\n");
 
@@ -946,6 +1498,42 @@ out:
        mlog_exit_void();
 }
 
+static int ocfs2_read_journal_inode(struct ocfs2_super *osb,
+                                   int slot_num,
+                                   struct buffer_head **bh,
+                                   struct inode **ret_inode)
+{
+       int status = -EACCES;
+       struct inode *inode = NULL;
+
+       BUG_ON(slot_num >= osb->max_slots);
+
+       inode = ocfs2_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE,
+                                           slot_num);
+       if (!inode || is_bad_inode(inode)) {
+               mlog_errno(status);
+               goto bail;
+       }
+       SET_INODE_JOURNAL(inode);
+
+       status = ocfs2_read_inode_block_full(inode, bh, OCFS2_BH_IGNORE_CACHE);
+       if (status < 0) {
+               mlog_errno(status);
+               goto bail;
+       }
+
+       status = 0;
+
+bail:
+       if (inode) {
+               if (status || !ret_inode)
+                       iput(inode);
+               else
+                       *ret_inode = inode;
+       }
+       return status;
+}
+
 /* Does the actual journal replay and marks the journal inode as
  * clean. Will only replay if the journal inode is marked dirty. */
 static int ocfs2_replay_journal(struct ocfs2_super *osb,
@@ -959,22 +1547,36 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb,
        struct ocfs2_dinode *fe;
        journal_t *journal = NULL;
        struct buffer_head *bh = NULL;
+       u32 slot_reco_gen;
 
-       inode = ocfs2_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE,
-                                           slot_num);
-       if (inode == NULL) {
-               status = -EACCES;
+       status = ocfs2_read_journal_inode(osb, slot_num, &bh, &inode);
+       if (status) {
                mlog_errno(status);
                goto done;
        }
-       if (is_bad_inode(inode)) {
-               status = -EACCES;
-               iput(inode);
-               inode = NULL;
-               mlog_errno(status);
+
+       fe = (struct ocfs2_dinode *)bh->b_data;
+       slot_reco_gen = ocfs2_get_recovery_generation(fe);
+       brelse(bh);
+       bh = NULL;
+
+       /*
+        * As the fs recovery is asynchronous, there is a small chance that
+        * another node mounted (and recovered) the slot before the recovery
+        * thread could get the lock. To handle that, we dirty read the journal
+        * inode for that slot to get the recovery generation. If it is
+        * different than what we expected, the slot has been recovered.
+        * If not, it needs recovery.
+        */
+       if (osb->slot_recovery_generations[slot_num] != slot_reco_gen) {
+               mlog(0, "Slot %u already recovered (old/new=%u/%u)\n", slot_num,
+                    osb->slot_recovery_generations[slot_num], slot_reco_gen);
+               osb->slot_recovery_generations[slot_num] = slot_reco_gen;
+               status = -EBUSY;
                goto done;
        }
-       SET_INODE_JOURNAL(inode);
+
+       /* Continue with recovery as the journal has not yet been recovered */
 
        status = ocfs2_inode_lock_full(inode, &bh, 1, OCFS2_META_LOCK_RECOVERY);
        if (status < 0) {
@@ -988,12 +1590,18 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb,
        fe = (struct ocfs2_dinode *) bh->b_data;
 
        flags = le32_to_cpu(fe->id1.journal1.ij_flags);
+       slot_reco_gen = ocfs2_get_recovery_generation(fe);
 
        if (!(flags & OCFS2_JOURNAL_DIRTY_FL)) {
                mlog(0, "No recovery required for node %d\n", node_num);
+               /* Refresh recovery generation for the slot */
+               osb->slot_recovery_generations[slot_num] = slot_reco_gen;
                goto done;
        }
 
+       /* we need to run complete recovery for offline orphan slots */
+       ocfs2_replay_map_set_state(osb, REPLAY_NEEDED);
+
        mlog(ML_NOTICE, "Recovering node %d from slot %d on device (%u,%u)\n",
             node_num, slot_num,
             MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
@@ -1007,19 +1615,19 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb,
        }
 
        mlog(0, "calling journal_init_inode\n");
-       journal = journal_init_inode(inode);
+       journal = jbd2_journal_init_inode(inode);
        if (journal == NULL) {
                mlog(ML_ERROR, "Linux journal layer error\n");
                status = -EIO;
                goto done;
        }
 
-       status = journal_load(journal);
+       status = jbd2_journal_load(journal);
        if (status < 0) {
                mlog_errno(status);
                if (!igrab(inode))
                        BUG();
-               journal_destroy(journal);
+               jbd2_journal_destroy(journal);
                goto done;
        }
 
@@ -1027,9 +1635,9 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb,
 
        /* wipe the journal */
        mlog(0, "flushing the journal.\n");
-       journal_lock_updates(journal);
-       status = journal_flush(journal);
-       journal_unlock_updates(journal);
+       jbd2_journal_lock_updates(journal);
+       status = jbd2_journal_flush(journal);
+       jbd2_journal_unlock_updates(journal);
        if (status < 0)
                mlog_errno(status);
 
@@ -1038,14 +1646,20 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb,
        flags &= ~OCFS2_JOURNAL_DIRTY_FL;
        fe->id1.journal1.ij_flags = cpu_to_le32(flags);
 
-       status = ocfs2_write_block(osb, bh, inode);
+       /* Increment recovery generation to indicate successful recovery */
+       ocfs2_bump_recovery_generation(fe);
+       osb->slot_recovery_generations[slot_num] =
+                                       ocfs2_get_recovery_generation(fe);
+
+       ocfs2_compute_meta_ecc(osb->sb, bh->b_data, &fe->i_check);
+       status = ocfs2_write_block(osb, bh, INODE_CACHE(inode));
        if (status < 0)
                mlog_errno(status);
 
        if (!igrab(inode))
                BUG();
 
-       journal_destroy(journal);
+       jbd2_journal_destroy(journal);
 
 done:
        /* drop the lock on this nodes journal */
@@ -1055,8 +1669,7 @@ done:
        if (inode)
                iput(inode);
 
-       if (bh)
-               brelse(bh);
+       brelse(bh);
 
        mlog_exit(status);
        return status;
@@ -1075,34 +1688,28 @@ done:
  * far less concerning.
  */
 static int ocfs2_recover_node(struct ocfs2_super *osb,
-                             int node_num)
+                             int node_num, int slot_num)
 {
        int status = 0;
-       int slot_num;
-       struct ocfs2_slot_info *si = osb->slot_info;
        struct ocfs2_dinode *la_copy = NULL;
        struct ocfs2_dinode *tl_copy = NULL;
 
-       mlog_entry("(node_num=%d, osb->node_num = %d)\n",
-                  node_num, osb->node_num);
-
-       mlog(0, "checking node %d\n", node_num);
+       mlog_entry("(node_num=%d, slot_num=%d, osb->node_num = %d)\n",
+                  node_num, slot_num, osb->node_num);
 
        /* Should not ever be called to recover ourselves -- in that
         * case we should've called ocfs2_journal_load instead. */
        BUG_ON(osb->node_num == node_num);
 
-       slot_num = ocfs2_node_num_to_slot(si, node_num);
-       if (slot_num == OCFS2_INVALID_SLOT) {
-               status = 0;
-               mlog(0, "no slot for this node, so no recovery required.\n");
-               goto done;
-       }
-
-       mlog(0, "node %d was using slot %d\n", node_num, slot_num);
-
        status = ocfs2_replay_journal(osb, node_num, slot_num);
        if (status < 0) {
+               if (status == -EBUSY) {
+                       mlog(0, "Skipping recovery for slot %u (node %u) "
+                            "as another node has recovered it\n", slot_num,
+                            node_num);
+                       status = 0;
+                       goto done;
+               }
                mlog_errno(status);
                goto done;
        }
@@ -1129,7 +1736,7 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
 
        /* This will kfree the memory pointed to by la_copy and tl_copy */
        ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
-                                       tl_copy);
+                                       tl_copy, NULL);
 
        status = 0;
 done:
@@ -1183,23 +1790,49 @@ bail:
  * slot info struct has been updated from disk. */
 int ocfs2_mark_dead_nodes(struct ocfs2_super *osb)
 {
-       int status, i, node_num;
-       struct ocfs2_slot_info *si = osb->slot_info;
+       unsigned int node_num;
+       int status, i;
+       u32 gen;
+       struct buffer_head *bh = NULL;
+       struct ocfs2_dinode *di;
 
        /* This is called with the super block cluster lock, so we
         * know that the slot map can't change underneath us. */
 
-       spin_lock(&si->si_lock);
-       for(i = 0; i < si->si_num_slots; i++) {
-               if (i == osb->slot_num)
+       for (i = 0; i < osb->max_slots; i++) {
+               /* Read journal inode to get the recovery generation */
+               status = ocfs2_read_journal_inode(osb, i, &bh, NULL);
+               if (status) {
+                       mlog_errno(status);
+                       goto bail;
+               }
+               di = (struct ocfs2_dinode *)bh->b_data;
+               gen = ocfs2_get_recovery_generation(di);
+               brelse(bh);
+               bh = NULL;
+
+               spin_lock(&osb->osb_lock);
+               osb->slot_recovery_generations[i] = gen;
+
+               mlog(0, "Slot %u recovery generation is %u\n", i,
+                    osb->slot_recovery_generations[i]);
+
+               if (i == osb->slot_num) {
+                       spin_unlock(&osb->osb_lock);
                        continue;
-               if (ocfs2_is_empty_slot(si, i))
+               }
+
+               status = ocfs2_slot_to_node_num_locked(osb, i, &node_num);
+               if (status == -ENOENT) {
+                       spin_unlock(&osb->osb_lock);
                        continue;
+               }
 
-               node_num = si->si_global_node_nums[i];
-               if (ocfs2_node_map_test_bit(osb, &osb->recovery_map, node_num))
+               if (__ocfs2_recovery_map_test(osb, node_num)) {
+                       spin_unlock(&osb->osb_lock);
                        continue;
-               spin_unlock(&si->si_lock);
+               }
+               spin_unlock(&osb->osb_lock);
 
                /* Ok, we have a slot occupied by another node which
                 * is not in the recovery map. We trylock his journal
@@ -1214,10 +1847,7 @@ int ocfs2_mark_dead_nodes(struct ocfs2_super *osb)
                        mlog_errno(status);
                        goto bail;
                }
-
-               spin_lock(&si->si_lock);
        }
-       spin_unlock(&si->si_lock);
 
        status = 0;
 bail:
@@ -1225,6 +1855,134 @@ bail:
        return status;
 }
 
+/*
+ * Scan timer should get fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT. Add some
+ * randomness to the timeout to minimize multple nodes firing the timer at the
+ * same time.
+ */
+static inline unsigned long ocfs2_orphan_scan_timeout(void)
+{
+       unsigned long time;
+
+       get_random_bytes(&time, sizeof(time));
+       time = ORPHAN_SCAN_SCHEDULE_TIMEOUT + (time % 5000);
+       return msecs_to_jiffies(time);
+}
+
+/*
+ * ocfs2_queue_orphan_scan calls ocfs2_queue_recovery_completion for
+ * every slot, queuing a recovery of the slot on the ocfs2_wq thread. This
+ * is done to catch any orphans that are left over in orphan directories.
+ *
+ * ocfs2_queue_orphan_scan gets called every ORPHAN_SCAN_SCHEDULE_TIMEOUT
+ * seconds.  It gets an EX lock on os_lockres and checks sequence number
+ * stored in LVB. If the sequence number has changed, it means some other
+ * node has done the scan.  This node skips the scan and tracks the
+ * sequence number.  If the sequence number didn't change, it means a scan
+ * hasn't happened.  The node queues a scan and increments the
+ * sequence number in the LVB.
+ */
+void ocfs2_queue_orphan_scan(struct ocfs2_super *osb)
+{
+       struct ocfs2_orphan_scan *os;
+       int status, i;
+       u32 seqno = 0;
+
+       os = &osb->osb_orphan_scan;
+
+       if (atomic_read(&os->os_state) == ORPHAN_SCAN_INACTIVE)
+               goto out;
+
+       status = ocfs2_orphan_scan_lock(osb, &seqno);
+       if (status < 0) {
+               if (status != -EAGAIN)
+                       mlog_errno(status);
+               goto out;
+       }
+
+       /* Do no queue the tasks if the volume is being umounted */
+       if (atomic_read(&os->os_state) == ORPHAN_SCAN_INACTIVE)
+               goto unlock;
+
+       if (os->os_seqno != seqno) {
+               os->os_seqno = seqno;
+               goto unlock;
+       }
+
+       for (i = 0; i < osb->max_slots; i++)
+               ocfs2_queue_recovery_completion(osb->journal, i, NULL, NULL,
+                                               NULL);
+       /*
+        * We queued a recovery on orphan slots, increment the sequence
+        * number and update LVB so other node will skip the scan for a while
+        */
+       seqno++;
+       os->os_count++;
+       os->os_scantime = CURRENT_TIME;
+unlock:
+       ocfs2_orphan_scan_unlock(osb, seqno);
+out:
+       return;
+}
+
+/* Worker task that gets fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT millsec */
+void ocfs2_orphan_scan_work(struct work_struct *work)
+{
+       struct ocfs2_orphan_scan *os;
+       struct ocfs2_super *osb;
+
+       os = container_of(work, struct ocfs2_orphan_scan,
+                         os_orphan_scan_work.work);
+       osb = os->os_osb;
+
+       mutex_lock(&os->os_lock);
+       ocfs2_queue_orphan_scan(osb);
+       if (atomic_read(&os->os_state) == ORPHAN_SCAN_ACTIVE)
+               schedule_delayed_work(&os->os_orphan_scan_work,
+                                     ocfs2_orphan_scan_timeout());
+       mutex_unlock(&os->os_lock);
+}
+
+void ocfs2_orphan_scan_stop(struct ocfs2_super *osb)
+{
+       struct ocfs2_orphan_scan *os;
+
+       os = &osb->osb_orphan_scan;
+       if (atomic_read(&os->os_state) == ORPHAN_SCAN_ACTIVE) {
+               atomic_set(&os->os_state, ORPHAN_SCAN_INACTIVE);
+               mutex_lock(&os->os_lock);
+               cancel_delayed_work(&os->os_orphan_scan_work);
+               mutex_unlock(&os->os_lock);
+       }
+}
+
+void ocfs2_orphan_scan_init(struct ocfs2_super *osb)
+{
+       struct ocfs2_orphan_scan *os;
+
+       os = &osb->osb_orphan_scan;
+       os->os_osb = osb;
+       os->os_count = 0;
+       os->os_seqno = 0;
+       mutex_init(&os->os_lock);
+       INIT_DELAYED_WORK(&os->os_orphan_scan_work, ocfs2_orphan_scan_work);
+}
+
+void ocfs2_orphan_scan_start(struct ocfs2_super *osb)
+{
+       struct ocfs2_orphan_scan *os;
+
+       os = &osb->osb_orphan_scan;
+       os->os_scantime = CURRENT_TIME;
+       if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb))
+               atomic_set(&os->os_state, ORPHAN_SCAN_INACTIVE);
+       else {
+               atomic_set(&os->os_state, ORPHAN_SCAN_ACTIVE);
+               schedule_delayed_work(&os->os_orphan_scan_work,
+                                     ocfs2_orphan_scan_timeout());
+       }
+}
+
 struct ocfs2_orphan_filldir_priv {
        struct inode            *head;
        struct ocfs2_super      *osb;
@@ -1401,13 +2159,14 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
        return ret;
 }
 
-static int ocfs2_wait_on_mount(struct ocfs2_super *osb)
+static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota)
 {
        /* This check is good because ocfs2 will wait on our recovery
         * thread before changing it to something other than MOUNTED
         * or DISABLED. */
        wait_event(osb->osb_mount_event,
-                  atomic_read(&osb->vol_state) == VOLUME_MOUNTED ||
+                 (!quota && atomic_read(&osb->vol_state) == VOLUME_MOUNTED) ||
+                  atomic_read(&osb->vol_state) == VOLUME_MOUNTED_QUOTAS ||
                   atomic_read(&osb->vol_state) == VOLUME_DISABLED);
 
        /* If there's an error on mount, then we may never get to the
@@ -1453,49 +2212,41 @@ static int ocfs2_commit_thread(void *arg)
        return 0;
 }
 
-/* Look for a dirty journal without taking any cluster locks. Used for
- * hard readonly access to determine whether the file system journals
- * require recovery. */
+/* Reads all the journal inodes without taking any cluster locks. Used
+ * for hard readonly access to determine whether any journal requires
+ * recovery. Also used to refresh the recovery generation numbers after
+ * a journal has been recovered by another node.
+ */
 int ocfs2_check_journals_nolocks(struct ocfs2_super *osb)
 {
        int ret = 0;
        unsigned int slot;
-       struct buffer_head *di_bh;
+       struct buffer_head *di_bh = NULL;
        struct ocfs2_dinode *di;
-       struct inode *journal = NULL;
+       int journal_dirty = 0;
 
        for(slot = 0; slot < osb->max_slots; slot++) {
-               journal = ocfs2_get_system_file_inode(osb,
-                                                     JOURNAL_SYSTEM_INODE,
-                                                     slot);
-               if (!journal || is_bad_inode(journal)) {
-                       ret = -EACCES;
-                       mlog_errno(ret);
-                       goto out;
-               }
-
-               di_bh = NULL;
-               ret = ocfs2_read_block(osb, OCFS2_I(journal)->ip_blkno, &di_bh,
-                                      0, journal);
-               if (ret < 0) {
+               ret = ocfs2_read_journal_inode(osb, slot, &di_bh, NULL);
+               if (ret) {
                        mlog_errno(ret);
                        goto out;
                }
 
                di = (struct ocfs2_dinode *) di_bh->b_data;
 
+               osb->slot_recovery_generations[slot] =
+                                       ocfs2_get_recovery_generation(di);
+
                if (le32_to_cpu(di->id1.journal1.ij_flags) &
                    OCFS2_JOURNAL_DIRTY_FL)
-                       ret = -EROFS;
+                       journal_dirty = 1;
 
                brelse(di_bh);
-               if (ret)
-                       break;
+               di_bh = NULL;
        }
 
 out:
-       if (journal)
-               iput(journal);
-
+       if (journal_dirty)
+               ret = -EROFS;
        return ret;
 }