WorkStruct: make allyesconfig
[safe/jmp/linux-2.6] / fs / ocfs2 / journal.c
index 303c8d9..d95ee27 100644 (file)
@@ -49,7 +49,7 @@
 
 #include "buffer_head_io.h"
 
-spinlock_t trans_inc_lock = SPIN_LOCK_UNLOCKED;
+DEFINE_SPINLOCK(trans_inc_lock);
 
 static int ocfs2_force_read_journal(struct inode *inode);
 static int ocfs2_recover_node(struct ocfs2_super *osb,
@@ -117,7 +117,7 @@ struct ocfs2_journal_handle *ocfs2_alloc_handle(struct ocfs2_super *osb)
 {
        struct ocfs2_journal_handle *retval = NULL;
 
-       retval = kcalloc(1, sizeof(*retval), GFP_KERNEL);
+       retval = kcalloc(1, sizeof(*retval), GFP_NOFS);
        if (!retval) {
                mlog(ML_ERROR, "Failed to allocate memory for journal "
                     "handle!\n");
@@ -147,8 +147,7 @@ struct ocfs2_journal_handle *ocfs2_start_trans(struct ocfs2_super *osb,
 
        mlog_entry("(max_buffs = %d)\n", max_buffs);
 
-       if (!osb || !osb->journal->j_journal)
-               BUG();
+       BUG_ON(!osb || !osb->journal->j_journal);
 
        if (ocfs2_is_hard_readonly(osb)) {
                ret = -EROFS;
@@ -223,8 +222,7 @@ void ocfs2_handle_add_inode(struct ocfs2_journal_handle *handle,
        BUG_ON(!list_empty(&OCFS2_I(inode)->ip_handle_list));
 
        OCFS2_I(inode)->ip_handle = handle;
-       list_del(&(OCFS2_I(inode)->ip_handle_list));
-       list_add_tail(&(OCFS2_I(inode)->ip_handle_list), &(handle->inode_list));
+       list_move_tail(&(OCFS2_I(inode)->ip_handle_list), &(handle->inode_list));
 }
 
 static void ocfs2_handle_unlock_inodes(struct ocfs2_journal_handle *handle)
@@ -378,7 +376,7 @@ int ocfs2_journal_access(struct ocfs2_journal_handle *handle,
        BUG_ON(!bh);
        BUG_ON(!(handle->flags & OCFS2_HANDLE_STARTED));
 
-       mlog_entry("bh->b_blocknr=%llu, type=%d (\"%s\"), bh->b_size = %hu\n",
+       mlog_entry("bh->b_blocknr=%llu, type=%d (\"%s\"), bh->b_size = %zu\n",
                   (unsigned long long)bh->b_blocknr, type,
                   (type == OCFS2_JOURNAL_ACCESS_CREATE) ?
                   "OCFS2_JOURNAL_ACCESS_CREATE" :
@@ -401,7 +399,7 @@ int ocfs2_journal_access(struct ocfs2_journal_handle *handle,
         * j_trans_barrier for us. */
        ocfs2_set_inode_lock_trans(OCFS2_SB(inode->i_sb)->journal, inode);
 
-       down(&OCFS2_I(inode)->ip_io_sem);
+       mutex_lock(&OCFS2_I(inode)->ip_io_mutex);
        switch (type) {
        case OCFS2_JOURNAL_ACCESS_CREATE:
        case OCFS2_JOURNAL_ACCESS_WRITE:
@@ -416,7 +414,7 @@ int ocfs2_journal_access(struct ocfs2_journal_handle *handle,
                status = -EINVAL;
                mlog(ML_ERROR, "Uknown access type!\n");
        }
-       up(&OCFS2_I(inode)->ip_io_sem);
+       mutex_unlock(&OCFS2_I(inode)->ip_io_mutex);
 
        if (status < 0)
                mlog(ML_ERROR, "Error %d getting %d access to buffer!\n",
@@ -504,8 +502,8 @@ static void ocfs2_handle_cleanup_locks(struct ocfs2_journal *journal,
                ocfs2_meta_unlock(inode, 1);
                if (atomic_read(&inode->i_count) == 1)
                        mlog(ML_ERROR,
-                            "Inode %"MLFu64", I'm doing a last iput for!",
-                            OCFS2_I(inode)->ip_blkno);
+                            "Inode %llu, I'm doing a last iput for!",
+                            (unsigned long long)OCFS2_I(inode)->ip_blkno);
                iput(inode);
                kmem_cache_free(ocfs2_lock_cache, lock);
        }
@@ -561,7 +559,11 @@ int ocfs2_journal_init(struct ocfs2_journal *journal, int *dirty)
        SET_INODE_JOURNAL(inode);
        OCFS2_I(inode)->ip_open_count++;
 
-       status = ocfs2_meta_lock(inode, NULL, &bh, 1);
+       /* Skip recovery waits here - journal inode metadata never
+        * changes in a live cluster so it can be considered an
+        * exception to the rule. */
+       status = ocfs2_meta_lock_full(inode, NULL, &bh, 1,
+                                     OCFS2_META_LOCK_RECOVERY);
        if (status < 0) {
                if (status != -ERESTARTSYS)
                        mlog(ML_ERROR, "Could not get lock on journal!\n");
@@ -579,7 +581,8 @@ int ocfs2_journal_init(struct ocfs2_journal *journal, int *dirty)
        }
 
        mlog(0, "inode->i_size = %lld\n", inode->i_size);
-       mlog(0, "inode->i_blocks = %lu\n", inode->i_blocks);
+       mlog(0, "inode->i_blocks = %llu\n",
+                       (unsigned long long)inode->i_blocks);
        mlog(0, "inode->ip_clusters = %u\n", OCFS2_I(inode)->ip_clusters);
 
        /* call the kernels journal init function now */
@@ -637,8 +640,9 @@ static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb,
                /* This is called from startup/shutdown which will
                 * handle the errors in a specific manner, so no need
                 * to call ocfs2_error() here. */
-               mlog(ML_ERROR, "Journal dinode %"MLFu64"  has invalid "
-                    "signature: %.*s", fe->i_blkno, 7, fe->i_signature);
+               mlog(ML_ERROR, "Journal dinode %llu  has invalid "
+                    "signature: %.*s", (unsigned long long)fe->i_blkno, 7,
+                    fe->i_signature);
                status = -EIO;
                goto out;
        }
@@ -672,8 +676,7 @@ void ocfs2_journal_shutdown(struct ocfs2_super *osb)
 
        mlog_entry_void();
 
-       if (!osb)
-               BUG();
+       BUG_ON(!osb);
 
        journal = osb->journal;
        if (!journal)
@@ -781,8 +784,7 @@ int ocfs2_journal_load(struct ocfs2_journal *journal)
        }
 
        /* Launch the commit thread */
-       osb->commit_task = kthread_run(ocfs2_commit_thread, osb, "ocfs2cmt-%d",
-                                      osb->osb_id);
+       osb->commit_task = kthread_run(ocfs2_commit_thread, osb, "ocfs2cmt");
        if (IS_ERR(osb->commit_task)) {
                status = PTR_ERR(osb->commit_task);
                osb->commit_task = NULL;
@@ -805,8 +807,7 @@ int ocfs2_journal_wipe(struct ocfs2_journal *journal, int full)
 
        mlog_entry_void();
 
-       if (!journal)
-               BUG();
+       BUG_ON(!journal);
 
        status = journal_wipe(journal->j_journal, full);
        if (status < 0) {
@@ -848,8 +849,9 @@ static int ocfs2_force_read_journal(struct inode *inode)
 
        memset(bhs, 0, sizeof(struct buffer_head *) * CONCURRENT_JOURNAL_FILL);
 
-       mlog(0, "Force reading %lu blocks\n",
-            (inode->i_blocks >> (inode->i_sb->s_blocksize_bits - 9)));
+       mlog(0, "Force reading %llu blocks\n",
+               (unsigned long long)(inode->i_blocks >>
+                       (inode->i_sb->s_blocksize_bits - 9)));
 
        v_blkno = 0;
        while (v_blkno <
@@ -866,9 +868,11 @@ static int ocfs2_force_read_journal(struct inode *inode)
                if (p_blocks > CONCURRENT_JOURNAL_FILL)
                        p_blocks = CONCURRENT_JOURNAL_FILL;
 
+               /* We are reading journal data which should not
+                * be put in the uptodate cache */
                status = ocfs2_read_blocks(OCFS2_SB(inode->i_sb),
                                           p_blkno, p_blocks, bhs, 0,
-                                          inode);
+                                          NULL);
                if (status < 0) {
                        mlog_errno(status);
                        goto bail;
@@ -907,11 +911,12 @@ struct ocfs2_la_recovery_item {
  * NOTE: This function can and will sleep on recovery of other nodes
  * during cluster locking, just like any other ocfs2 process.
  */
-void ocfs2_complete_recovery(void *data)
+void ocfs2_complete_recovery(struct work_struct *work)
 {
        int ret;
-       struct ocfs2_super *osb = data;
-       struct ocfs2_journal *journal = osb->journal;
+       struct ocfs2_journal *journal =
+               container_of(work, struct ocfs2_journal, j_recovery_work);
+       struct ocfs2_super *osb = journal->j_osb;
        struct ocfs2_dinode *la_dinode, *tl_dinode;
        struct ocfs2_la_recovery_item *item;
        struct list_head *p, *n;
@@ -933,8 +938,8 @@ void ocfs2_complete_recovery(void *data)
 
                la_dinode = item->lri_la_dinode;
                if (la_dinode) {
-                       mlog(0, "Clean up local alloc %"MLFu64"\n",
-                            la_dinode->i_blkno);
+                       mlog(0, "Clean up local alloc %llu\n",
+                            (unsigned long long)la_dinode->i_blkno);
 
                        ret = ocfs2_complete_local_alloc_recovery(osb,
                                                                  la_dinode);
@@ -946,8 +951,8 @@ void ocfs2_complete_recovery(void *data)
 
                tl_dinode = item->lri_tl_dinode;
                if (tl_dinode) {
-                       mlog(0, "Clean up truncate log %"MLFu64"\n",
-                            tl_dinode->i_blkno);
+                       mlog(0, "Clean up truncate log %llu\n",
+                            (unsigned long long)tl_dinode->i_blkno);
 
                        ret = ocfs2_complete_truncate_log_recovery(osb,
                                                                   tl_dinode);
@@ -978,7 +983,7 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
 {
        struct ocfs2_la_recovery_item *item;
 
-       item = kmalloc(sizeof(struct ocfs2_la_recovery_item), GFP_KERNEL);
+       item = kmalloc(sizeof(struct ocfs2_la_recovery_item), GFP_NOFS);
        if (!item) {
                /* Though we wish to avoid it, we are in fact safe in
                 * skipping local alloc cleanup as fsck.ocfs2 is more
@@ -1072,10 +1077,10 @@ restart:
                                        NULL);
 
 bail:
-       down(&osb->recovery_lock);
+       mutex_lock(&osb->recovery_lock);
        if (!status &&
            !ocfs2_node_map_is_empty(osb, &osb->recovery_map)) {
-               up(&osb->recovery_lock);
+               mutex_unlock(&osb->recovery_lock);
                goto restart;
        }
 
@@ -1083,7 +1088,7 @@ bail:
        mb(); /* sync with ocfs2_recovery_thread_running */
        wake_up(&osb->recovery_event);
 
-       up(&osb->recovery_lock);
+       mutex_unlock(&osb->recovery_lock);
 
        mlog_exit(status);
        /* no one is callint kthread_stop() for us so the kthread() api
@@ -1098,7 +1103,7 @@ void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num)
        mlog_entry("(node_num=%d, osb->node_num = %d)\n",
                   node_num, osb->node_num);
 
-       down(&osb->recovery_lock);
+       mutex_lock(&osb->recovery_lock);
        if (osb->disable_recovery)
                goto out;
 
@@ -1113,14 +1118,14 @@ void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num)
                goto out;
 
        osb->recovery_thread_task =  kthread_run(__ocfs2_recovery_thread, osb,
-                                                "ocfs2rec-%d", osb->osb_id);
+                                                "ocfs2rec");
        if (IS_ERR(osb->recovery_thread_task)) {
                mlog_errno((int)PTR_ERR(osb->recovery_thread_task));
                osb->recovery_thread_task = NULL;
        }
 
 out:
-       up(&osb->recovery_lock);
+       mutex_unlock(&osb->recovery_lock);
        wake_up(&osb->recovery_event);
 
        mlog_exit_void();
@@ -1271,8 +1276,7 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
 
        /* Should not ever be called to recover ourselves -- in that
         * case we should've called ocfs2_journal_load instead. */
-       if (osb->node_num == node_num)
-               BUG();
+       BUG_ON(osb->node_num == node_num);
 
        slot_num = ocfs2_node_num_to_slot(si, node_num);
        if (slot_num == OCFS2_INVALID_SLOT) {
@@ -1408,21 +1412,17 @@ bail:
        return status;
 }
 
-static int ocfs2_recover_orphans(struct ocfs2_super *osb,
-                                int slot)
+static int ocfs2_queue_orphans(struct ocfs2_super *osb,
+                              int slot,
+                              struct inode **head)
 {
-       int status = 0;
-       int have_disk_lock = 0;
-       struct inode *inode = NULL;
-       struct inode *iter;
+       int status;
        struct inode *orphan_dir_inode = NULL;
+       struct inode *iter;
        unsigned long offset, blk, local;
        struct buffer_head *bh = NULL;
        struct ocfs2_dir_entry *de;
        struct super_block *sb = osb->sb;
-       struct ocfs2_inode_info *oi;
-
-       mlog(0, "Recover inodes from orphan dir in slot %d\n", slot);
 
        orphan_dir_inode = ocfs2_get_system_file_inode(osb,
                                                       ORPHAN_DIR_SYSTEM_INODE,
@@ -1430,17 +1430,15 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
        if  (!orphan_dir_inode) {
                status = -ENOENT;
                mlog_errno(status);
-               goto out;
-       }
+               return status;
+       }       
 
        mutex_lock(&orphan_dir_inode->i_mutex);
        status = ocfs2_meta_lock(orphan_dir_inode, NULL, NULL, 0);
        if (status < 0) {
-               mutex_unlock(&orphan_dir_inode->i_mutex);
                mlog_errno(status);
                goto out;
        }
-       have_disk_lock = 1;
 
        offset = 0;
        iter = NULL;
@@ -1451,11 +1449,10 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
                if (!bh)
                        status = -EINVAL;
                if (status < 0) {
-                       mutex_unlock(&orphan_dir_inode->i_mutex);
                        if (bh)
                                brelse(bh);
                        mlog_errno(status);
-                       goto out;
+                       goto out_unlock;
                }
 
                local = 0;
@@ -1465,11 +1462,10 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
 
                        if (!ocfs2_check_dir_entry(orphan_dir_inode,
                                                  de, bh, local)) {
-                               mutex_unlock(&orphan_dir_inode->i_mutex);
                                status = -EINVAL;
                                mlog_errno(status);
                                brelse(bh);
-                               goto out;
+                               goto out_unlock;
                        }
 
                        local += le16_to_cpu(de->rec_len);
@@ -1481,11 +1477,11 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
                        if (de->file_type > OCFS2_FT_MAX) {
                                mlog(ML_ERROR,
                                     "block %llu contains invalid de: "
-                                    "inode = %"MLFu64", rec_len = %u, "
+                                    "inode = %llu, rec_len = %u, "
                                     "name_len = %u, file_type = %u, "
                                     "name='%.*s'\n",
                                     (unsigned long long)bh->b_blocknr,
-                                    le64_to_cpu(de->inode),
+                                    (unsigned long long)le64_to_cpu(de->inode),
                                     le16_to_cpu(de->rec_len),
                                     de->name_len,
                                     de->file_type,
@@ -1498,28 +1494,106 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
                        if (de->name_len == 2 && !strncmp("..", de->name, 2))
                                continue;
 
-                       iter = ocfs2_iget(osb, le64_to_cpu(de->inode));
+                       iter = ocfs2_iget(osb, le64_to_cpu(de->inode),
+                                         OCFS2_FI_FLAG_NOLOCK);
                        if (IS_ERR(iter))
                                continue;
 
-                       mlog(0, "queue orphan %"MLFu64"\n",
-                            OCFS2_I(iter)->ip_blkno);
-                       OCFS2_I(iter)->ip_next_orphan = inode;
-                       inode = iter;
+                       mlog(0, "queue orphan %llu\n",
+                            (unsigned long long)OCFS2_I(iter)->ip_blkno);
+                       /* No locking is required for the next_orphan
+                        * queue as there is only ever a single
+                        * process doing orphan recovery. */
+                       OCFS2_I(iter)->ip_next_orphan = *head;
+                       *head = iter;
                }
                brelse(bh);
        }
-       mutex_unlock(&orphan_dir_inode->i_mutex);
 
+out_unlock:
        ocfs2_meta_unlock(orphan_dir_inode, 0);
-       have_disk_lock = 0;
-
+out:
+       mutex_unlock(&orphan_dir_inode->i_mutex);
        iput(orphan_dir_inode);
-       orphan_dir_inode = NULL;
+       return status;
+}
+
+static int ocfs2_orphan_recovery_can_continue(struct ocfs2_super *osb,
+                                             int slot)
+{
+       int ret;
+
+       spin_lock(&osb->osb_lock);
+       ret = !osb->osb_orphan_wipes[slot];
+       spin_unlock(&osb->osb_lock);
+       return ret;
+}
+
+static void ocfs2_mark_recovering_orphan_dir(struct ocfs2_super *osb,
+                                            int slot)
+{
+       spin_lock(&osb->osb_lock);
+       /* Mark ourselves such that new processes in delete_inode()
+        * know to quit early. */
+       ocfs2_node_map_set_bit(osb, &osb->osb_recovering_orphan_dirs, slot);
+       while (osb->osb_orphan_wipes[slot]) {
+               /* If any processes are already in the middle of an
+                * orphan wipe on this dir, then we need to wait for
+                * them. */
+               spin_unlock(&osb->osb_lock);
+               wait_event_interruptible(osb->osb_wipe_event,
+                                        ocfs2_orphan_recovery_can_continue(osb, slot));
+               spin_lock(&osb->osb_lock);
+       }
+       spin_unlock(&osb->osb_lock);
+}
+
+static void ocfs2_clear_recovering_orphan_dir(struct ocfs2_super *osb,
+                                             int slot)
+{
+       ocfs2_node_map_clear_bit(osb, &osb->osb_recovering_orphan_dirs, slot);
+}
+
+/*
+ * Orphan recovery. Each mounted node has it's own orphan dir which we
+ * must run during recovery. Our strategy here is to build a list of
+ * the inodes in the orphan dir and iget/iput them. The VFS does
+ * (most) of the rest of the work.
+ *
+ * Orphan recovery can happen at any time, not just mount so we have a
+ * couple of extra considerations.
+ *
+ * - We grab as many inodes as we can under the orphan dir lock -
+ *   doing iget() outside the orphan dir risks getting a reference on
+ *   an invalid inode.
+ * - We must be sure not to deadlock with other processes on the
+ *   system wanting to run delete_inode(). This can happen when they go
+ *   to lock the orphan dir and the orphan recovery process attempts to
+ *   iget() inside the orphan dir lock. This can be avoided by
+ *   advertising our state to ocfs2_delete_inode().
+ */
+static int ocfs2_recover_orphans(struct ocfs2_super *osb,
+                                int slot)
+{
+       int ret = 0;
+       struct inode *inode = NULL;
+       struct inode *iter;
+       struct ocfs2_inode_info *oi;
+
+       mlog(0, "Recover inodes from orphan dir in slot %d\n", slot);
+
+       ocfs2_mark_recovering_orphan_dir(osb, slot);
+       ret = ocfs2_queue_orphans(osb, slot, &inode);
+       ocfs2_clear_recovering_orphan_dir(osb, slot);
+
+       /* Error here should be noted, but we want to continue with as
+        * many queued inodes as we've got. */
+       if (ret)
+               mlog_errno(ret);
 
        while (inode) {
                oi = OCFS2_I(inode);
-               mlog(0, "iput orphan %"MLFu64"\n", oi->ip_blkno);
+               mlog(0, "iput orphan %llu\n", (unsigned long long)oi->ip_blkno);
 
                iter = oi->ip_next_orphan;
 
@@ -1541,14 +1615,7 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
                inode = iter;
        }
 
-out:
-       if (have_disk_lock)
-               ocfs2_meta_unlock(orphan_dir_inode, 0);
-
-       if (orphan_dir_inode)
-               iput(orphan_dir_inode);
-
-       return status;
+       return ret;
 }
 
 static int ocfs2_wait_on_mount(struct ocfs2_super *osb)
@@ -1584,10 +1651,9 @@ static int ocfs2_commit_thread(void *arg)
        while (!(kthread_should_stop() &&
                 atomic_read(&journal->j_num_trans) == 0)) {
 
-               wait_event_interruptible_timeout(osb->checkpoint_event,
-                                                atomic_read(&journal->j_num_trans)
-                                                || kthread_should_stop(),
-                                                OCFS2_CHECKPOINT_INTERVAL);
+               wait_event_interruptible(osb->checkpoint_event,
+                                        atomic_read(&journal->j_num_trans)
+                                        || kthread_should_stop());
 
                status = ocfs2_commit_cache(osb);
                if (status < 0)