ocfs2-devel: remove redundant OCFS2_MOUNT_POSIX_ACL check in ocfs2_get_acl_nolock()
[safe/jmp/linux-2.6] / fs / ocfs2 / journal.c
index 4c8f355..54c16b6 100644 (file)
@@ -28,6 +28,8 @@
 #include <linux/slab.h>
 #include <linux/highmem.h>
 #include <linux/kthread.h>
+#include <linux/time.h>
+#include <linux/random.h>
 
 #define MLOG_MASK_PREFIX ML_JOURNAL
 #include <cluster/masklog.h>
 #include "slot_map.h"
 #include "super.h"
 #include "sysfile.h"
+#include "uptodate.h"
 #include "quota.h"
 
 #include "buffer_head_io.h"
 
 DEFINE_SPINLOCK(trans_inc_lock);
 
+#define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000
+
 static int ocfs2_force_read_journal(struct inode *inode);
 static int ocfs2_recover_node(struct ocfs2_super *osb,
                              int node_num, int slot_num);
@@ -65,6 +70,11 @@ static int ocfs2_trylock_journal(struct ocfs2_super *osb,
 static int ocfs2_recover_orphans(struct ocfs2_super *osb,
                                 int slot);
 static int ocfs2_commit_thread(void *arg);
+static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
+                                           int slot_num,
+                                           struct ocfs2_dinode *la_dinode,
+                                           struct ocfs2_dinode *tl_dinode,
+                                           struct ocfs2_quota_recovery *qrec);
 
 static inline int ocfs2_wait_on_mount(struct ocfs2_super *osb)
 {
@@ -76,6 +86,97 @@ static inline int ocfs2_wait_on_quotas(struct ocfs2_super *osb)
        return __ocfs2_wait_on_mount(osb, 1);
 }
 
+/*
+ * This replay_map is to track online/offline slots, so we could recover
+ * offline slots during recovery and mount
+ */
+
+enum ocfs2_replay_state {
+       REPLAY_UNNEEDED = 0,    /* Replay is not needed, so ignore this map */
+       REPLAY_NEEDED,          /* Replay slots marked in rm_replay_slots */
+       REPLAY_DONE             /* Replay was already queued */
+};
+
+struct ocfs2_replay_map {
+       unsigned int rm_slots;
+       enum ocfs2_replay_state rm_state;
+       unsigned char rm_replay_slots[0];
+};
+
+void ocfs2_replay_map_set_state(struct ocfs2_super *osb, int state)
+{
+       if (!osb->replay_map)
+               return;
+
+       /* If we've already queued the replay, we don't have any more to do */
+       if (osb->replay_map->rm_state == REPLAY_DONE)
+               return;
+
+       osb->replay_map->rm_state = state;
+}
+
+int ocfs2_compute_replay_slots(struct ocfs2_super *osb)
+{
+       struct ocfs2_replay_map *replay_map;
+       int i, node_num;
+
+       /* If replay map is already set, we don't do it again */
+       if (osb->replay_map)
+               return 0;
+
+       replay_map = kzalloc(sizeof(struct ocfs2_replay_map) +
+                            (osb->max_slots * sizeof(char)), GFP_KERNEL);
+
+       if (!replay_map) {
+               mlog_errno(-ENOMEM);
+               return -ENOMEM;
+       }
+
+       spin_lock(&osb->osb_lock);
+
+       replay_map->rm_slots = osb->max_slots;
+       replay_map->rm_state = REPLAY_UNNEEDED;
+
+       /* set rm_replay_slots for offline slot(s) */
+       for (i = 0; i < replay_map->rm_slots; i++) {
+               if (ocfs2_slot_to_node_num_locked(osb, i, &node_num) == -ENOENT)
+                       replay_map->rm_replay_slots[i] = 1;
+       }
+
+       osb->replay_map = replay_map;
+       spin_unlock(&osb->osb_lock);
+       return 0;
+}
+
+void ocfs2_queue_replay_slots(struct ocfs2_super *osb)
+{
+       struct ocfs2_replay_map *replay_map = osb->replay_map;
+       int i;
+
+       if (!replay_map)
+               return;
+
+       if (replay_map->rm_state != REPLAY_NEEDED)
+               return;
+
+       for (i = 0; i < replay_map->rm_slots; i++)
+               if (replay_map->rm_replay_slots[i])
+                       ocfs2_queue_recovery_completion(osb->journal, i, NULL,
+                                                       NULL, NULL);
+       replay_map->rm_state = REPLAY_DONE;
+}
+
+void ocfs2_free_replay_slots(struct ocfs2_super *osb)
+{
+       struct ocfs2_replay_map *replay_map = osb->replay_map;
+
+       if (!osb->replay_map)
+               return;
+
+       kfree(replay_map);
+       osb->replay_map = NULL;
+}
+
 int ocfs2_recovery_init(struct ocfs2_super *osb)
 {
        struct ocfs2_recovery_map *rm;
@@ -454,6 +555,14 @@ static struct ocfs2_triggers eb_triggers = {
        .ot_offset      = offsetof(struct ocfs2_extent_block, h_check),
 };
 
+static struct ocfs2_triggers rb_triggers = {
+       .ot_triggers = {
+               .t_commit = ocfs2_commit_trigger,
+               .t_abort = ocfs2_abort_trigger,
+       },
+       .ot_offset      = offsetof(struct ocfs2_refcount_block, rf_check),
+};
+
 static struct ocfs2_triggers gd_triggers = {
        .ot_triggers = {
                .t_commit = ocfs2_commit_trigger,
@@ -484,15 +593,33 @@ static struct ocfs2_triggers dq_triggers = {
        },
 };
 
+static struct ocfs2_triggers dr_triggers = {
+       .ot_triggers = {
+               .t_commit = ocfs2_commit_trigger,
+               .t_abort = ocfs2_abort_trigger,
+       },
+       .ot_offset      = offsetof(struct ocfs2_dx_root_block, dr_check),
+};
+
+static struct ocfs2_triggers dl_triggers = {
+       .ot_triggers = {
+               .t_commit = ocfs2_commit_trigger,
+               .t_abort = ocfs2_abort_trigger,
+       },
+       .ot_offset      = offsetof(struct ocfs2_dx_leaf, dl_check),
+};
+
 static int __ocfs2_journal_access(handle_t *handle,
-                                 struct inode *inode,
+                                 struct ocfs2_caching_info *ci,
                                  struct buffer_head *bh,
                                  struct ocfs2_triggers *triggers,
                                  int type)
 {
        int status;
+       struct ocfs2_super *osb =
+               OCFS2_SB(ocfs2_metadata_cache_get_super(ci));
 
-       BUG_ON(!inode);
+       BUG_ON(!ci || !ci->ci_ops);
        BUG_ON(!handle);
        BUG_ON(!bh);
 
@@ -511,15 +638,15 @@ static int __ocfs2_journal_access(handle_t *handle,
                BUG();
        }
 
-       /* Set the current transaction information on the inode so
+       /* Set the current transaction information on the ci so
         * that the locking code knows whether it can drop it's locks
-        * on this inode or not. We're protected from the commit
+        * on this ci or not. We're protected from the commit
         * thread updating the current transaction id until
         * ocfs2_commit_trans() because ocfs2_start_trans() took
         * j_trans_barrier for us. */
-       ocfs2_set_inode_lock_trans(OCFS2_SB(inode->i_sb)->journal, inode);
+       ocfs2_set_ci_lock_trans(osb->journal, ci);
 
-       mutex_lock(&OCFS2_I(inode)->ip_io_mutex);
+       ocfs2_metadata_cache_io_lock(ci);
        switch (type) {
        case OCFS2_JOURNAL_ACCESS_CREATE:
        case OCFS2_JOURNAL_ACCESS_WRITE:
@@ -534,9 +661,9 @@ static int __ocfs2_journal_access(handle_t *handle,
                status = -EINVAL;
                mlog(ML_ERROR, "Uknown access type!\n");
        }
-       if (!status && ocfs2_meta_ecc(OCFS2_SB(inode->i_sb)) && triggers)
+       if (!status && ocfs2_meta_ecc(osb) && triggers)
                jbd2_journal_set_triggers(bh, &triggers->ot_triggers);
-       mutex_unlock(&OCFS2_I(inode)->ip_io_mutex);
+       ocfs2_metadata_cache_io_unlock(ci);
 
        if (status < 0)
                mlog(ML_ERROR, "Error %d getting %d access to buffer!\n",
@@ -546,52 +673,65 @@ static int __ocfs2_journal_access(handle_t *handle,
        return status;
 }
 
-int ocfs2_journal_access_di(handle_t *handle, struct inode *inode,
-                              struct buffer_head *bh, int type)
+int ocfs2_journal_access_di(handle_t *handle, struct ocfs2_caching_info *ci,
+                           struct buffer_head *bh, int type)
 {
-       return __ocfs2_journal_access(handle, inode, bh, &di_triggers,
-                                     type);
+       return __ocfs2_journal_access(handle, ci, bh, &di_triggers, type);
 }
 
-int ocfs2_journal_access_eb(handle_t *handle, struct inode *inode,
+int ocfs2_journal_access_eb(handle_t *handle, struct ocfs2_caching_info *ci,
                            struct buffer_head *bh, int type)
 {
-       return __ocfs2_journal_access(handle, inode, bh, &eb_triggers,
-                                     type);
+       return __ocfs2_journal_access(handle, ci, bh, &eb_triggers, type);
 }
 
-int ocfs2_journal_access_gd(handle_t *handle, struct inode *inode,
+int ocfs2_journal_access_rb(handle_t *handle, struct ocfs2_caching_info *ci,
                            struct buffer_head *bh, int type)
 {
-       return __ocfs2_journal_access(handle, inode, bh, &gd_triggers,
+       return __ocfs2_journal_access(handle, ci, bh, &rb_triggers,
                                      type);
 }
 
-int ocfs2_journal_access_db(handle_t *handle, struct inode *inode,
+int ocfs2_journal_access_gd(handle_t *handle, struct ocfs2_caching_info *ci,
                            struct buffer_head *bh, int type)
 {
-       return __ocfs2_journal_access(handle, inode, bh, &db_triggers,
-                                     type);
+       return __ocfs2_journal_access(handle, ci, bh, &gd_triggers, type);
 }
 
-int ocfs2_journal_access_xb(handle_t *handle, struct inode *inode,
+int ocfs2_journal_access_db(handle_t *handle, struct ocfs2_caching_info *ci,
                            struct buffer_head *bh, int type)
 {
-       return __ocfs2_journal_access(handle, inode, bh, &xb_triggers,
-                                     type);
+       return __ocfs2_journal_access(handle, ci, bh, &db_triggers, type);
 }
 
-int ocfs2_journal_access_dq(handle_t *handle, struct inode *inode,
+int ocfs2_journal_access_xb(handle_t *handle, struct ocfs2_caching_info *ci,
                            struct buffer_head *bh, int type)
 {
-       return __ocfs2_journal_access(handle, inode, bh, &dq_triggers,
-                                     type);
+       return __ocfs2_journal_access(handle, ci, bh, &xb_triggers, type);
+}
+
+int ocfs2_journal_access_dq(handle_t *handle, struct ocfs2_caching_info *ci,
+                           struct buffer_head *bh, int type)
+{
+       return __ocfs2_journal_access(handle, ci, bh, &dq_triggers, type);
 }
 
-int ocfs2_journal_access(handle_t *handle, struct inode *inode,
+int ocfs2_journal_access_dr(handle_t *handle, struct ocfs2_caching_info *ci,
+                           struct buffer_head *bh, int type)
+{
+       return __ocfs2_journal_access(handle, ci, bh, &dr_triggers, type);
+}
+
+int ocfs2_journal_access_dl(handle_t *handle, struct ocfs2_caching_info *ci,
+                           struct buffer_head *bh, int type)
+{
+       return __ocfs2_journal_access(handle, ci, bh, &dl_triggers, type);
+}
+
+int ocfs2_journal_access(handle_t *handle, struct ocfs2_caching_info *ci,
                         struct buffer_head *bh, int type)
 {
-       return __ocfs2_journal_access(handle, inode, bh, NULL, type);
+       return __ocfs2_journal_access(handle, ci, bh, NULL, type);
 }
 
 int ocfs2_journal_dirty(handle_t *handle,
@@ -768,7 +908,7 @@ static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb,
                ocfs2_bump_recovery_generation(fe);
 
        ocfs2_compute_meta_ecc(osb->sb, bh->b_data, &fe->i_check);
-       status = ocfs2_write_block(osb, bh, journal->j_inode);
+       status = ocfs2_write_block(osb, bh, INODE_CACHE(journal->j_inode));
        if (status < 0)
                mlog_errno(status);
 
@@ -1164,24 +1304,24 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
 }
 
 /* Called by the mount code to queue recovery the last part of
- * recovery for it's own slot. */
+ * recovery for it's own and offline slot(s). */
 void ocfs2_complete_mount_recovery(struct ocfs2_super *osb)
 {
        struct ocfs2_journal *journal = osb->journal;
 
-       if (osb->dirty) {
-               /* No need to queue up our truncate_log as regular
-                * cleanup will catch that. */
-               ocfs2_queue_recovery_completion(journal,
-                                               osb->slot_num,
-                                               osb->local_alloc_copy,
-                                               NULL,
-                                               NULL);
-               ocfs2_schedule_truncate_log_flush(osb, 0);
+       /* No need to queue up our truncate_log as regular cleanup will catch
+        * that */
+       ocfs2_queue_recovery_completion(journal, osb->slot_num,
+                                       osb->local_alloc_copy, NULL, NULL);
+       ocfs2_schedule_truncate_log_flush(osb, 0);
 
-               osb->local_alloc_copy = NULL;
-               osb->dirty = 0;
-       }
+       osb->local_alloc_copy = NULL;
+       osb->dirty = 0;
+
+       /* queue to recover orphan slots for all offline slots */
+       ocfs2_replay_map_set_state(osb, REPLAY_NEEDED);
+       ocfs2_queue_replay_slots(osb);
+       ocfs2_free_replay_slots(osb);
 }
 
 void ocfs2_complete_quota_recovery(struct ocfs2_super *osb)
@@ -1224,6 +1364,14 @@ restart:
                goto bail;
        }
 
+       status = ocfs2_compute_replay_slots(osb);
+       if (status < 0)
+               mlog_errno(status);
+
+       /* queue recovery for our own slot */
+       ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL,
+                                       NULL, NULL);
+
        spin_lock(&osb->osb_lock);
        while (rm->rm_used) {
                /* It's always safe to remove entry zero, as we won't
@@ -1289,11 +1437,8 @@ skip_recovery:
 
        ocfs2_super_unlock(osb, 1);
 
-       /* We always run recovery on our own orphan dir - the dead
-        * node(s) may have disallowd a previos inode delete. Re-processing
-        * is therefore required. */
-       ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL,
-                                       NULL, NULL);
+       /* queue recovery for offline slots */
+       ocfs2_queue_replay_slots(osb);
 
 bail:
        mutex_lock(&osb->recovery_lock);
@@ -1302,6 +1447,7 @@ bail:
                goto restart;
        }
 
+       ocfs2_free_replay_slots(osb);
        osb->recovery_thread_task = NULL;
        mb(); /* sync with ocfs2_recovery_thread_running */
        wake_up(&osb->recovery_event);
@@ -1453,6 +1599,9 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb,
                goto done;
        }
 
+       /* we need to run complete recovery for offline orphan slots */
+       ocfs2_replay_map_set_state(osb, REPLAY_NEEDED);
+
        mlog(ML_NOTICE, "Recovering node %d from slot %d on device (%u,%u)\n",
             node_num, slot_num,
             MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
@@ -1503,7 +1652,7 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb,
                                        ocfs2_get_recovery_generation(fe);
 
        ocfs2_compute_meta_ecc(osb->sb, bh->b_data, &fe->i_check);
-       status = ocfs2_write_block(osb, bh, inode);
+       status = ocfs2_write_block(osb, bh, INODE_CACHE(inode));
        if (status < 0)
                mlog_errno(status);
 
@@ -1706,6 +1855,134 @@ bail:
        return status;
 }
 
+/*
+ * Scan timer should get fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT. Add some
+ * randomness to the timeout to minimize multple nodes firing the timer at the
+ * same time.
+ */
+static inline unsigned long ocfs2_orphan_scan_timeout(void)
+{
+       unsigned long time;
+
+       get_random_bytes(&time, sizeof(time));
+       time = ORPHAN_SCAN_SCHEDULE_TIMEOUT + (time % 5000);
+       return msecs_to_jiffies(time);
+}
+
+/*
+ * ocfs2_queue_orphan_scan calls ocfs2_queue_recovery_completion for
+ * every slot, queuing a recovery of the slot on the ocfs2_wq thread. This
+ * is done to catch any orphans that are left over in orphan directories.
+ *
+ * ocfs2_queue_orphan_scan gets called every ORPHAN_SCAN_SCHEDULE_TIMEOUT
+ * seconds.  It gets an EX lock on os_lockres and checks sequence number
+ * stored in LVB. If the sequence number has changed, it means some other
+ * node has done the scan.  This node skips the scan and tracks the
+ * sequence number.  If the sequence number didn't change, it means a scan
+ * hasn't happened.  The node queues a scan and increments the
+ * sequence number in the LVB.
+ */
+void ocfs2_queue_orphan_scan(struct ocfs2_super *osb)
+{
+       struct ocfs2_orphan_scan *os;
+       int status, i;
+       u32 seqno = 0;
+
+       os = &osb->osb_orphan_scan;
+
+       if (atomic_read(&os->os_state) == ORPHAN_SCAN_INACTIVE)
+               goto out;
+
+       status = ocfs2_orphan_scan_lock(osb, &seqno);
+       if (status < 0) {
+               if (status != -EAGAIN)
+                       mlog_errno(status);
+               goto out;
+       }
+
+       /* Do no queue the tasks if the volume is being umounted */
+       if (atomic_read(&os->os_state) == ORPHAN_SCAN_INACTIVE)
+               goto unlock;
+
+       if (os->os_seqno != seqno) {
+               os->os_seqno = seqno;
+               goto unlock;
+       }
+
+       for (i = 0; i < osb->max_slots; i++)
+               ocfs2_queue_recovery_completion(osb->journal, i, NULL, NULL,
+                                               NULL);
+       /*
+        * We queued a recovery on orphan slots, increment the sequence
+        * number and update LVB so other node will skip the scan for a while
+        */
+       seqno++;
+       os->os_count++;
+       os->os_scantime = CURRENT_TIME;
+unlock:
+       ocfs2_orphan_scan_unlock(osb, seqno);
+out:
+       return;
+}
+
+/* Worker task that gets fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT millsec */
+void ocfs2_orphan_scan_work(struct work_struct *work)
+{
+       struct ocfs2_orphan_scan *os;
+       struct ocfs2_super *osb;
+
+       os = container_of(work, struct ocfs2_orphan_scan,
+                         os_orphan_scan_work.work);
+       osb = os->os_osb;
+
+       mutex_lock(&os->os_lock);
+       ocfs2_queue_orphan_scan(osb);
+       if (atomic_read(&os->os_state) == ORPHAN_SCAN_ACTIVE)
+               schedule_delayed_work(&os->os_orphan_scan_work,
+                                     ocfs2_orphan_scan_timeout());
+       mutex_unlock(&os->os_lock);
+}
+
+void ocfs2_orphan_scan_stop(struct ocfs2_super *osb)
+{
+       struct ocfs2_orphan_scan *os;
+
+       os = &osb->osb_orphan_scan;
+       if (atomic_read(&os->os_state) == ORPHAN_SCAN_ACTIVE) {
+               atomic_set(&os->os_state, ORPHAN_SCAN_INACTIVE);
+               mutex_lock(&os->os_lock);
+               cancel_delayed_work(&os->os_orphan_scan_work);
+               mutex_unlock(&os->os_lock);
+       }
+}
+
+void ocfs2_orphan_scan_init(struct ocfs2_super *osb)
+{
+       struct ocfs2_orphan_scan *os;
+
+       os = &osb->osb_orphan_scan;
+       os->os_osb = osb;
+       os->os_count = 0;
+       os->os_seqno = 0;
+       mutex_init(&os->os_lock);
+       INIT_DELAYED_WORK(&os->os_orphan_scan_work, ocfs2_orphan_scan_work);
+}
+
+void ocfs2_orphan_scan_start(struct ocfs2_super *osb)
+{
+       struct ocfs2_orphan_scan *os;
+
+       os = &osb->osb_orphan_scan;
+       os->os_scantime = CURRENT_TIME;
+       if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb))
+               atomic_set(&os->os_state, ORPHAN_SCAN_INACTIVE);
+       else {
+               atomic_set(&os->os_state, ORPHAN_SCAN_ACTIVE);
+               schedule_delayed_work(&os->os_orphan_scan_work,
+                                     ocfs2_orphan_scan_timeout());
+       }
+}
+
 struct ocfs2_orphan_filldir_priv {
        struct inode            *head;
        struct ocfs2_super      *osb;