ocfs2: remove unused handle argument from ocfs2_meta_lock_full()
[safe/jmp/linux-2.6] / fs / ocfs2 / journal.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * journal.c
5  *
6  * Defines functions of journalling api
7  *
8  * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 021110-1307, USA.
24  */
25
26 #include <linux/fs.h>
27 #include <linux/types.h>
28 #include <linux/slab.h>
29 #include <linux/highmem.h>
30 #include <linux/kthread.h>
31
32 #define MLOG_MASK_PREFIX ML_JOURNAL
33 #include <cluster/masklog.h>
34
35 #include "ocfs2.h"
36
37 #include "alloc.h"
38 #include "dlmglue.h"
39 #include "extent_map.h"
40 #include "heartbeat.h"
41 #include "inode.h"
42 #include "journal.h"
43 #include "localalloc.h"
44 #include "namei.h"
45 #include "slot_map.h"
46 #include "super.h"
47 #include "vote.h"
48 #include "sysfile.h"
49
50 #include "buffer_head_io.h"
51
52 DEFINE_SPINLOCK(trans_inc_lock);
53
54 static int ocfs2_force_read_journal(struct inode *inode);
55 static int ocfs2_recover_node(struct ocfs2_super *osb,
56                               int node_num);
57 static int __ocfs2_recovery_thread(void *arg);
58 static int ocfs2_commit_cache(struct ocfs2_super *osb);
59 static int ocfs2_wait_on_mount(struct ocfs2_super *osb);
60 static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb,
61                                       int dirty);
62 static int ocfs2_trylock_journal(struct ocfs2_super *osb,
63                                  int slot_num);
64 static int ocfs2_recover_orphans(struct ocfs2_super *osb,
65                                  int slot);
66 static int ocfs2_commit_thread(void *arg);
67
68 static int ocfs2_commit_cache(struct ocfs2_super *osb)
69 {
70         int status = 0;
71         unsigned int flushed;
72         unsigned long old_id;
73         struct ocfs2_journal *journal = NULL;
74
75         mlog_entry_void();
76
77         journal = osb->journal;
78
79         /* Flush all pending commits and checkpoint the journal. */
80         down_write(&journal->j_trans_barrier);
81
82         if (atomic_read(&journal->j_num_trans) == 0) {
83                 up_write(&journal->j_trans_barrier);
84                 mlog(0, "No transactions for me to flush!\n");
85                 goto finally;
86         }
87
88         journal_lock_updates(journal->j_journal);
89         status = journal_flush(journal->j_journal);
90         journal_unlock_updates(journal->j_journal);
91         if (status < 0) {
92                 up_write(&journal->j_trans_barrier);
93                 mlog_errno(status);
94                 goto finally;
95         }
96
97         old_id = ocfs2_inc_trans_id(journal);
98
99         flushed = atomic_read(&journal->j_num_trans);
100         atomic_set(&journal->j_num_trans, 0);
101         up_write(&journal->j_trans_barrier);
102
103         mlog(0, "commit_thread: flushed transaction %lu (%u handles)\n",
104              journal->j_trans_id, flushed);
105
106         ocfs2_kick_vote_thread(osb);
107         wake_up(&journal->j_checkpointed);
108 finally:
109         mlog_exit(status);
110         return status;
111 }
112
113 static struct ocfs2_journal_handle *ocfs2_alloc_handle(struct ocfs2_super *osb)
114 {
115         struct ocfs2_journal_handle *retval = NULL;
116
117         retval = kcalloc(1, sizeof(*retval), GFP_NOFS);
118         if (!retval) {
119                 mlog(ML_ERROR, "Failed to allocate memory for journal "
120                      "handle!\n");
121                 return NULL;
122         }
123         retval->k_handle = NULL;
124
125         retval->journal = osb->journal;
126
127         return retval;
128 }
129
130 /* pass it NULL and it will allocate a new handle object for you.  If
131  * you pass it a handle however, it may still return error, in which
132  * case it has free'd the passed handle for you. */
133 struct ocfs2_journal_handle *ocfs2_start_trans(struct ocfs2_super *osb,
134                                                struct ocfs2_journal_handle *handle,
135                                                int max_buffs)
136 {
137         int ret;
138         journal_t *journal = osb->journal->j_journal;
139
140         mlog_entry("(max_buffs = %d)\n", max_buffs);
141
142         BUG_ON(!osb || !osb->journal->j_journal);
143
144         if (ocfs2_is_hard_readonly(osb)) {
145                 ret = -EROFS;
146                 goto done_free;
147         }
148
149         BUG_ON(osb->journal->j_state == OCFS2_JOURNAL_FREE);
150         BUG_ON(max_buffs <= 0);
151
152         /* JBD might support this, but our journalling code doesn't yet. */
153         if (journal_current_handle()) {
154                 mlog(ML_ERROR, "Recursive transaction attempted!\n");
155                 BUG();
156         }
157
158         if (!handle)
159                 handle = ocfs2_alloc_handle(osb);
160         if (!handle) {
161                 ret = -ENOMEM;
162                 mlog(ML_ERROR, "Failed to allocate memory for journal "
163                      "handle!\n");
164                 goto done_free;
165         }
166
167         down_read(&osb->journal->j_trans_barrier);
168
169         /* actually start the transaction now */
170         handle->k_handle = journal_start(journal, max_buffs);
171         if (IS_ERR(handle->k_handle)) {
172                 up_read(&osb->journal->j_trans_barrier);
173
174                 ret = PTR_ERR(handle->k_handle);
175                 handle->k_handle = NULL;
176                 mlog_errno(ret);
177
178                 if (is_journal_aborted(journal)) {
179                         ocfs2_abort(osb->sb, "Detected aborted journal");
180                         ret = -EROFS;
181                 }
182                 goto done_free;
183         }
184
185         atomic_inc(&(osb->journal->j_num_trans));
186
187         mlog_exit_ptr(handle);
188         return handle;
189
190 done_free:
191         if (handle)
192                 kfree(handle);
193
194         mlog_exit(ret);
195         return ERR_PTR(ret);
196 }
197
198 void ocfs2_commit_trans(struct ocfs2_journal_handle *handle)
199 {
200         handle_t *jbd_handle;
201         int retval;
202         struct ocfs2_journal *journal = handle->journal;
203
204         mlog_entry_void();
205
206         BUG_ON(!handle);
207
208         if (!handle->k_handle) {
209                 kfree(handle);
210                 mlog_exit_void();
211                 return;
212         }
213
214         /* ocfs2_extend_trans may have had to call journal_restart
215          * which will always commit the transaction, but may return
216          * error for any number of reasons. If this is the case, we
217          * clear k_handle as it's not valid any more. */
218         if (handle->k_handle) {
219                 jbd_handle = handle->k_handle;
220
221                 /* actually stop the transaction. if we've set h_sync,
222                  * it'll have been committed when we return */
223                 retval = journal_stop(jbd_handle);
224                 if (retval < 0) {
225                         mlog_errno(retval);
226                         mlog(ML_ERROR, "Could not commit transaction\n");
227                         BUG();
228                 }
229
230                 handle->k_handle = NULL; /* it's been free'd in journal_stop */
231         }
232
233         up_read(&journal->j_trans_barrier);
234
235         kfree(handle);
236         mlog_exit_void();
237 }
238
239 /*
240  * 'nblocks' is what you want to add to the current
241  * transaction. extend_trans will either extend the current handle by
242  * nblocks, or commit it and start a new one with nblocks credits.
243  *
244  * WARNING: This will not release any semaphores or disk locks taken
245  * during the transaction, so make sure they were taken *before*
246  * start_trans or we'll have ordering deadlocks.
247  *
248  * WARNING2: Note that we do *not* drop j_trans_barrier here. This is
249  * good because transaction ids haven't yet been recorded on the
250  * cluster locks associated with this handle.
251  */
252 int ocfs2_extend_trans(handle_t *handle, int nblocks)
253 {
254         int status;
255
256         BUG_ON(!handle);
257         BUG_ON(!nblocks);
258
259         mlog_entry_void();
260
261         mlog(0, "Trying to extend transaction by %d blocks\n", nblocks);
262
263         status = journal_extend(handle, nblocks);
264         if (status < 0) {
265                 mlog_errno(status);
266                 goto bail;
267         }
268
269         if (status > 0) {
270                 mlog(0, "journal_extend failed, trying journal_restart\n");
271                 status = journal_restart(handle, nblocks);
272                 if (status < 0) {
273                         mlog_errno(status);
274                         goto bail;
275                 }
276         }
277
278         status = 0;
279 bail:
280
281         mlog_exit(status);
282         return status;
283 }
284
285 int ocfs2_journal_access(struct ocfs2_journal_handle *handle,
286                          struct inode *inode,
287                          struct buffer_head *bh,
288                          int type)
289 {
290         int status;
291
292         BUG_ON(!inode);
293         BUG_ON(!handle);
294         BUG_ON(!bh);
295
296         mlog_entry("bh->b_blocknr=%llu, type=%d (\"%s\"), bh->b_size = %zu\n",
297                    (unsigned long long)bh->b_blocknr, type,
298                    (type == OCFS2_JOURNAL_ACCESS_CREATE) ?
299                    "OCFS2_JOURNAL_ACCESS_CREATE" :
300                    "OCFS2_JOURNAL_ACCESS_WRITE",
301                    bh->b_size);
302
303         /* we can safely remove this assertion after testing. */
304         if (!buffer_uptodate(bh)) {
305                 mlog(ML_ERROR, "giving me a buffer that's not uptodate!\n");
306                 mlog(ML_ERROR, "b_blocknr=%llu\n",
307                      (unsigned long long)bh->b_blocknr);
308                 BUG();
309         }
310
311         /* Set the current transaction information on the inode so
312          * that the locking code knows whether it can drop it's locks
313          * on this inode or not. We're protected from the commit
314          * thread updating the current transaction id until
315          * ocfs2_commit_trans() because ocfs2_start_trans() took
316          * j_trans_barrier for us. */
317         ocfs2_set_inode_lock_trans(OCFS2_SB(inode->i_sb)->journal, inode);
318
319         mutex_lock(&OCFS2_I(inode)->ip_io_mutex);
320         switch (type) {
321         case OCFS2_JOURNAL_ACCESS_CREATE:
322         case OCFS2_JOURNAL_ACCESS_WRITE:
323                 status = journal_get_write_access(handle->k_handle, bh);
324                 break;
325
326         case OCFS2_JOURNAL_ACCESS_UNDO:
327                 status = journal_get_undo_access(handle->k_handle, bh);
328                 break;
329
330         default:
331                 status = -EINVAL;
332                 mlog(ML_ERROR, "Uknown access type!\n");
333         }
334         mutex_unlock(&OCFS2_I(inode)->ip_io_mutex);
335
336         if (status < 0)
337                 mlog(ML_ERROR, "Error %d getting %d access to buffer!\n",
338                      status, type);
339
340         mlog_exit(status);
341         return status;
342 }
343
344 int ocfs2_journal_dirty(struct ocfs2_journal_handle *handle,
345                         struct buffer_head *bh)
346 {
347         int status;
348
349         mlog_entry("(bh->b_blocknr=%llu)\n",
350                    (unsigned long long)bh->b_blocknr);
351
352         status = journal_dirty_metadata(handle->k_handle, bh);
353         if (status < 0)
354                 mlog(ML_ERROR, "Could not dirty metadata buffer. "
355                      "(bh->b_blocknr=%llu)\n",
356                      (unsigned long long)bh->b_blocknr);
357
358         mlog_exit(status);
359         return status;
360 }
361
362 int ocfs2_journal_dirty_data(handle_t *handle,
363                              struct buffer_head *bh)
364 {
365         int err = journal_dirty_data(handle, bh);
366         if (err)
367                 mlog_errno(err);
368         /* TODO: When we can handle it, abort the handle and go RO on
369          * error here. */
370
371         return err;
372 }
373
374 #define OCFS2_DEFAULT_COMMIT_INTERVAL   (HZ * 5)
375
376 void ocfs2_set_journal_params(struct ocfs2_super *osb)
377 {
378         journal_t *journal = osb->journal->j_journal;
379
380         spin_lock(&journal->j_state_lock);
381         journal->j_commit_interval = OCFS2_DEFAULT_COMMIT_INTERVAL;
382         if (osb->s_mount_opt & OCFS2_MOUNT_BARRIER)
383                 journal->j_flags |= JFS_BARRIER;
384         else
385                 journal->j_flags &= ~JFS_BARRIER;
386         spin_unlock(&journal->j_state_lock);
387 }
388
389 int ocfs2_journal_init(struct ocfs2_journal *journal, int *dirty)
390 {
391         int status = -1;
392         struct inode *inode = NULL; /* the journal inode */
393         journal_t *j_journal = NULL;
394         struct ocfs2_dinode *di = NULL;
395         struct buffer_head *bh = NULL;
396         struct ocfs2_super *osb;
397         int meta_lock = 0;
398
399         mlog_entry_void();
400
401         BUG_ON(!journal);
402
403         osb = journal->j_osb;
404
405         /* already have the inode for our journal */
406         inode = ocfs2_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE,
407                                             osb->slot_num);
408         if (inode == NULL) {
409                 status = -EACCES;
410                 mlog_errno(status);
411                 goto done;
412         }
413         if (is_bad_inode(inode)) {
414                 mlog(ML_ERROR, "access error (bad inode)\n");
415                 iput(inode);
416                 inode = NULL;
417                 status = -EACCES;
418                 goto done;
419         }
420
421         SET_INODE_JOURNAL(inode);
422         OCFS2_I(inode)->ip_open_count++;
423
424         /* Skip recovery waits here - journal inode metadata never
425          * changes in a live cluster so it can be considered an
426          * exception to the rule. */
427         status = ocfs2_meta_lock_full(inode, &bh, 1, OCFS2_META_LOCK_RECOVERY);
428         if (status < 0) {
429                 if (status != -ERESTARTSYS)
430                         mlog(ML_ERROR, "Could not get lock on journal!\n");
431                 goto done;
432         }
433
434         meta_lock = 1;
435         di = (struct ocfs2_dinode *)bh->b_data;
436
437         if (inode->i_size <  OCFS2_MIN_JOURNAL_SIZE) {
438                 mlog(ML_ERROR, "Journal file size (%lld) is too small!\n",
439                      inode->i_size);
440                 status = -EINVAL;
441                 goto done;
442         }
443
444         mlog(0, "inode->i_size = %lld\n", inode->i_size);
445         mlog(0, "inode->i_blocks = %llu\n",
446                         (unsigned long long)inode->i_blocks);
447         mlog(0, "inode->ip_clusters = %u\n", OCFS2_I(inode)->ip_clusters);
448
449         /* call the kernels journal init function now */
450         j_journal = journal_init_inode(inode);
451         if (j_journal == NULL) {
452                 mlog(ML_ERROR, "Linux journal layer error\n");
453                 status = -EINVAL;
454                 goto done;
455         }
456
457         mlog(0, "Returned from journal_init_inode\n");
458         mlog(0, "j_journal->j_maxlen = %u\n", j_journal->j_maxlen);
459
460         *dirty = (le32_to_cpu(di->id1.journal1.ij_flags) &
461                   OCFS2_JOURNAL_DIRTY_FL);
462
463         journal->j_journal = j_journal;
464         journal->j_inode = inode;
465         journal->j_bh = bh;
466
467         ocfs2_set_journal_params(osb);
468
469         journal->j_state = OCFS2_JOURNAL_LOADED;
470
471         status = 0;
472 done:
473         if (status < 0) {
474                 if (meta_lock)
475                         ocfs2_meta_unlock(inode, 1);
476                 if (bh != NULL)
477                         brelse(bh);
478                 if (inode) {
479                         OCFS2_I(inode)->ip_open_count--;
480                         iput(inode);
481                 }
482         }
483
484         mlog_exit(status);
485         return status;
486 }
487
488 static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb,
489                                       int dirty)
490 {
491         int status;
492         unsigned int flags;
493         struct ocfs2_journal *journal = osb->journal;
494         struct buffer_head *bh = journal->j_bh;
495         struct ocfs2_dinode *fe;
496
497         mlog_entry_void();
498
499         fe = (struct ocfs2_dinode *)bh->b_data;
500         if (!OCFS2_IS_VALID_DINODE(fe)) {
501                 /* This is called from startup/shutdown which will
502                  * handle the errors in a specific manner, so no need
503                  * to call ocfs2_error() here. */
504                 mlog(ML_ERROR, "Journal dinode %llu  has invalid "
505                      "signature: %.*s", (unsigned long long)fe->i_blkno, 7,
506                      fe->i_signature);
507                 status = -EIO;
508                 goto out;
509         }
510
511         flags = le32_to_cpu(fe->id1.journal1.ij_flags);
512         if (dirty)
513                 flags |= OCFS2_JOURNAL_DIRTY_FL;
514         else
515                 flags &= ~OCFS2_JOURNAL_DIRTY_FL;
516         fe->id1.journal1.ij_flags = cpu_to_le32(flags);
517
518         status = ocfs2_write_block(osb, bh, journal->j_inode);
519         if (status < 0)
520                 mlog_errno(status);
521
522 out:
523         mlog_exit(status);
524         return status;
525 }
526
527 /*
528  * If the journal has been kmalloc'd it needs to be freed after this
529  * call.
530  */
531 void ocfs2_journal_shutdown(struct ocfs2_super *osb)
532 {
533         struct ocfs2_journal *journal = NULL;
534         int status = 0;
535         struct inode *inode = NULL;
536         int num_running_trans = 0;
537
538         mlog_entry_void();
539
540         BUG_ON(!osb);
541
542         journal = osb->journal;
543         if (!journal)
544                 goto done;
545
546         inode = journal->j_inode;
547
548         if (journal->j_state != OCFS2_JOURNAL_LOADED)
549                 goto done;
550
551         /* need to inc inode use count as journal_destroy will iput. */
552         if (!igrab(inode))
553                 BUG();
554
555         num_running_trans = atomic_read(&(osb->journal->j_num_trans));
556         if (num_running_trans > 0)
557                 mlog(0, "Shutting down journal: must wait on %d "
558                      "running transactions!\n",
559                      num_running_trans);
560
561         /* Do a commit_cache here. It will flush our journal, *and*
562          * release any locks that are still held.
563          * set the SHUTDOWN flag and release the trans lock.
564          * the commit thread will take the trans lock for us below. */
565         journal->j_state = OCFS2_JOURNAL_IN_SHUTDOWN;
566
567         /* The OCFS2_JOURNAL_IN_SHUTDOWN will signal to commit_cache to not
568          * drop the trans_lock (which we want to hold until we
569          * completely destroy the journal. */
570         if (osb->commit_task) {
571                 /* Wait for the commit thread */
572                 mlog(0, "Waiting for ocfs2commit to exit....\n");
573                 kthread_stop(osb->commit_task);
574                 osb->commit_task = NULL;
575         }
576
577         BUG_ON(atomic_read(&(osb->journal->j_num_trans)) != 0);
578
579         status = ocfs2_journal_toggle_dirty(osb, 0);
580         if (status < 0)
581                 mlog_errno(status);
582
583         /* Shutdown the kernel journal system */
584         journal_destroy(journal->j_journal);
585
586         OCFS2_I(inode)->ip_open_count--;
587
588         /* unlock our journal */
589         ocfs2_meta_unlock(inode, 1);
590
591         brelse(journal->j_bh);
592         journal->j_bh = NULL;
593
594         journal->j_state = OCFS2_JOURNAL_FREE;
595
596 //      up_write(&journal->j_trans_barrier);
597 done:
598         if (inode)
599                 iput(inode);
600         mlog_exit_void();
601 }
602
603 static void ocfs2_clear_journal_error(struct super_block *sb,
604                                       journal_t *journal,
605                                       int slot)
606 {
607         int olderr;
608
609         olderr = journal_errno(journal);
610         if (olderr) {
611                 mlog(ML_ERROR, "File system error %d recorded in "
612                      "journal %u.\n", olderr, slot);
613                 mlog(ML_ERROR, "File system on device %s needs checking.\n",
614                      sb->s_id);
615
616                 journal_ack_err(journal);
617                 journal_clear_err(journal);
618         }
619 }
620
621 int ocfs2_journal_load(struct ocfs2_journal *journal)
622 {
623         int status = 0;
624         struct ocfs2_super *osb;
625
626         mlog_entry_void();
627
628         if (!journal)
629                 BUG();
630
631         osb = journal->j_osb;
632
633         status = journal_load(journal->j_journal);
634         if (status < 0) {
635                 mlog(ML_ERROR, "Failed to load journal!\n");
636                 goto done;
637         }
638
639         ocfs2_clear_journal_error(osb->sb, journal->j_journal, osb->slot_num);
640
641         status = ocfs2_journal_toggle_dirty(osb, 1);
642         if (status < 0) {
643                 mlog_errno(status);
644                 goto done;
645         }
646
647         /* Launch the commit thread */
648         osb->commit_task = kthread_run(ocfs2_commit_thread, osb, "ocfs2cmt");
649         if (IS_ERR(osb->commit_task)) {
650                 status = PTR_ERR(osb->commit_task);
651                 osb->commit_task = NULL;
652                 mlog(ML_ERROR, "unable to launch ocfs2commit thread, error=%d",
653                      status);
654                 goto done;
655         }
656
657 done:
658         mlog_exit(status);
659         return status;
660 }
661
662
663 /* 'full' flag tells us whether we clear out all blocks or if we just
664  * mark the journal clean */
665 int ocfs2_journal_wipe(struct ocfs2_journal *journal, int full)
666 {
667         int status;
668
669         mlog_entry_void();
670
671         BUG_ON(!journal);
672
673         status = journal_wipe(journal->j_journal, full);
674         if (status < 0) {
675                 mlog_errno(status);
676                 goto bail;
677         }
678
679         status = ocfs2_journal_toggle_dirty(journal->j_osb, 0);
680         if (status < 0)
681                 mlog_errno(status);
682
683 bail:
684         mlog_exit(status);
685         return status;
686 }
687
688 /*
689  * JBD Might read a cached version of another nodes journal file. We
690  * don't want this as this file changes often and we get no
691  * notification on those changes. The only way to be sure that we've
692  * got the most up to date version of those blocks then is to force
693  * read them off disk. Just searching through the buffer cache won't
694  * work as there may be pages backing this file which are still marked
695  * up to date. We know things can't change on this file underneath us
696  * as we have the lock by now :)
697  */
698 static int ocfs2_force_read_journal(struct inode *inode)
699 {
700         int status = 0;
701         int i, p_blocks;
702         u64 v_blkno, p_blkno;
703 #define CONCURRENT_JOURNAL_FILL 32
704         struct buffer_head *bhs[CONCURRENT_JOURNAL_FILL];
705
706         mlog_entry_void();
707
708         BUG_ON(inode->i_blocks !=
709                      ocfs2_align_bytes_to_sectors(i_size_read(inode)));
710
711         memset(bhs, 0, sizeof(struct buffer_head *) * CONCURRENT_JOURNAL_FILL);
712
713         mlog(0, "Force reading %llu blocks\n",
714                 (unsigned long long)(inode->i_blocks >>
715                         (inode->i_sb->s_blocksize_bits - 9)));
716
717         v_blkno = 0;
718         while (v_blkno <
719                (inode->i_blocks >> (inode->i_sb->s_blocksize_bits - 9))) {
720
721                 status = ocfs2_extent_map_get_blocks(inode, v_blkno,
722                                                      1, &p_blkno,
723                                                      &p_blocks);
724                 if (status < 0) {
725                         mlog_errno(status);
726                         goto bail;
727                 }
728
729                 if (p_blocks > CONCURRENT_JOURNAL_FILL)
730                         p_blocks = CONCURRENT_JOURNAL_FILL;
731
732                 /* We are reading journal data which should not
733                  * be put in the uptodate cache */
734                 status = ocfs2_read_blocks(OCFS2_SB(inode->i_sb),
735                                            p_blkno, p_blocks, bhs, 0,
736                                            NULL);
737                 if (status < 0) {
738                         mlog_errno(status);
739                         goto bail;
740                 }
741
742                 for(i = 0; i < p_blocks; i++) {
743                         brelse(bhs[i]);
744                         bhs[i] = NULL;
745                 }
746
747                 v_blkno += p_blocks;
748         }
749
750 bail:
751         for(i = 0; i < CONCURRENT_JOURNAL_FILL; i++)
752                 if (bhs[i])
753                         brelse(bhs[i]);
754         mlog_exit(status);
755         return status;
756 }
757
758 struct ocfs2_la_recovery_item {
759         struct list_head        lri_list;
760         int                     lri_slot;
761         struct ocfs2_dinode     *lri_la_dinode;
762         struct ocfs2_dinode     *lri_tl_dinode;
763 };
764
765 /* Does the second half of the recovery process. By this point, the
766  * node is marked clean and can actually be considered recovered,
767  * hence it's no longer in the recovery map, but there's still some
768  * cleanup we can do which shouldn't happen within the recovery thread
769  * as locking in that context becomes very difficult if we are to take
770  * recovering nodes into account.
771  *
772  * NOTE: This function can and will sleep on recovery of other nodes
773  * during cluster locking, just like any other ocfs2 process.
774  */
775 void ocfs2_complete_recovery(void *data)
776 {
777         int ret;
778         struct ocfs2_super *osb = data;
779         struct ocfs2_journal *journal = osb->journal;
780         struct ocfs2_dinode *la_dinode, *tl_dinode;
781         struct ocfs2_la_recovery_item *item;
782         struct list_head *p, *n;
783         LIST_HEAD(tmp_la_list);
784
785         mlog_entry_void();
786
787         mlog(0, "completing recovery from keventd\n");
788
789         spin_lock(&journal->j_lock);
790         list_splice_init(&journal->j_la_cleanups, &tmp_la_list);
791         spin_unlock(&journal->j_lock);
792
793         list_for_each_safe(p, n, &tmp_la_list) {
794                 item = list_entry(p, struct ocfs2_la_recovery_item, lri_list);
795                 list_del_init(&item->lri_list);
796
797                 mlog(0, "Complete recovery for slot %d\n", item->lri_slot);
798
799                 la_dinode = item->lri_la_dinode;
800                 if (la_dinode) {
801                         mlog(0, "Clean up local alloc %llu\n",
802                              (unsigned long long)la_dinode->i_blkno);
803
804                         ret = ocfs2_complete_local_alloc_recovery(osb,
805                                                                   la_dinode);
806                         if (ret < 0)
807                                 mlog_errno(ret);
808
809                         kfree(la_dinode);
810                 }
811
812                 tl_dinode = item->lri_tl_dinode;
813                 if (tl_dinode) {
814                         mlog(0, "Clean up truncate log %llu\n",
815                              (unsigned long long)tl_dinode->i_blkno);
816
817                         ret = ocfs2_complete_truncate_log_recovery(osb,
818                                                                    tl_dinode);
819                         if (ret < 0)
820                                 mlog_errno(ret);
821
822                         kfree(tl_dinode);
823                 }
824
825                 ret = ocfs2_recover_orphans(osb, item->lri_slot);
826                 if (ret < 0)
827                         mlog_errno(ret);
828
829                 kfree(item);
830         }
831
832         mlog(0, "Recovery completion\n");
833         mlog_exit_void();
834 }
835
836 /* NOTE: This function always eats your references to la_dinode and
837  * tl_dinode, either manually on error, or by passing them to
838  * ocfs2_complete_recovery */
839 static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
840                                             int slot_num,
841                                             struct ocfs2_dinode *la_dinode,
842                                             struct ocfs2_dinode *tl_dinode)
843 {
844         struct ocfs2_la_recovery_item *item;
845
846         item = kmalloc(sizeof(struct ocfs2_la_recovery_item), GFP_NOFS);
847         if (!item) {
848                 /* Though we wish to avoid it, we are in fact safe in
849                  * skipping local alloc cleanup as fsck.ocfs2 is more
850                  * than capable of reclaiming unused space. */
851                 if (la_dinode)
852                         kfree(la_dinode);
853
854                 if (tl_dinode)
855                         kfree(tl_dinode);
856
857                 mlog_errno(-ENOMEM);
858                 return;
859         }
860
861         INIT_LIST_HEAD(&item->lri_list);
862         item->lri_la_dinode = la_dinode;
863         item->lri_slot = slot_num;
864         item->lri_tl_dinode = tl_dinode;
865
866         spin_lock(&journal->j_lock);
867         list_add_tail(&item->lri_list, &journal->j_la_cleanups);
868         queue_work(ocfs2_wq, &journal->j_recovery_work);
869         spin_unlock(&journal->j_lock);
870 }
871
872 /* Called by the mount code to queue recovery the last part of
873  * recovery for it's own slot. */
874 void ocfs2_complete_mount_recovery(struct ocfs2_super *osb)
875 {
876         struct ocfs2_journal *journal = osb->journal;
877
878         if (osb->dirty) {
879                 /* No need to queue up our truncate_log as regular
880                  * cleanup will catch that. */
881                 ocfs2_queue_recovery_completion(journal,
882                                                 osb->slot_num,
883                                                 osb->local_alloc_copy,
884                                                 NULL);
885                 ocfs2_schedule_truncate_log_flush(osb, 0);
886
887                 osb->local_alloc_copy = NULL;
888                 osb->dirty = 0;
889         }
890 }
891
892 static int __ocfs2_recovery_thread(void *arg)
893 {
894         int status, node_num;
895         struct ocfs2_super *osb = arg;
896
897         mlog_entry_void();
898
899         status = ocfs2_wait_on_mount(osb);
900         if (status < 0) {
901                 goto bail;
902         }
903
904 restart:
905         status = ocfs2_super_lock(osb, 1);
906         if (status < 0) {
907                 mlog_errno(status);
908                 goto bail;
909         }
910
911         while(!ocfs2_node_map_is_empty(osb, &osb->recovery_map)) {
912                 node_num = ocfs2_node_map_first_set_bit(osb,
913                                                         &osb->recovery_map);
914                 if (node_num == O2NM_INVALID_NODE_NUM) {
915                         mlog(0, "Out of nodes to recover.\n");
916                         break;
917                 }
918
919                 status = ocfs2_recover_node(osb, node_num);
920                 if (status < 0) {
921                         mlog(ML_ERROR,
922                              "Error %d recovering node %d on device (%u,%u)!\n",
923                              status, node_num,
924                              MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
925                         mlog(ML_ERROR, "Volume requires unmount.\n");
926                         continue;
927                 }
928
929                 ocfs2_recovery_map_clear(osb, node_num);
930         }
931         ocfs2_super_unlock(osb, 1);
932
933         /* We always run recovery on our own orphan dir - the dead
934          * node(s) may have voted "no" on an inode delete earlier. A
935          * revote is therefore required. */
936         ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL,
937                                         NULL);
938
939 bail:
940         mutex_lock(&osb->recovery_lock);
941         if (!status &&
942             !ocfs2_node_map_is_empty(osb, &osb->recovery_map)) {
943                 mutex_unlock(&osb->recovery_lock);
944                 goto restart;
945         }
946
947         osb->recovery_thread_task = NULL;
948         mb(); /* sync with ocfs2_recovery_thread_running */
949         wake_up(&osb->recovery_event);
950
951         mutex_unlock(&osb->recovery_lock);
952
953         mlog_exit(status);
954         /* no one is callint kthread_stop() for us so the kthread() api
955          * requires that we call do_exit().  And it isn't exported, but
956          * complete_and_exit() seems to be a minimal wrapper around it. */
957         complete_and_exit(NULL, status);
958         return status;
959 }
960
961 void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num)
962 {
963         mlog_entry("(node_num=%d, osb->node_num = %d)\n",
964                    node_num, osb->node_num);
965
966         mutex_lock(&osb->recovery_lock);
967         if (osb->disable_recovery)
968                 goto out;
969
970         /* People waiting on recovery will wait on
971          * the recovery map to empty. */
972         if (!ocfs2_recovery_map_set(osb, node_num))
973                 mlog(0, "node %d already be in recovery.\n", node_num);
974
975         mlog(0, "starting recovery thread...\n");
976
977         if (osb->recovery_thread_task)
978                 goto out;
979
980         osb->recovery_thread_task =  kthread_run(__ocfs2_recovery_thread, osb,
981                                                  "ocfs2rec");
982         if (IS_ERR(osb->recovery_thread_task)) {
983                 mlog_errno((int)PTR_ERR(osb->recovery_thread_task));
984                 osb->recovery_thread_task = NULL;
985         }
986
987 out:
988         mutex_unlock(&osb->recovery_lock);
989         wake_up(&osb->recovery_event);
990
991         mlog_exit_void();
992 }
993
994 /* Does the actual journal replay and marks the journal inode as
995  * clean. Will only replay if the journal inode is marked dirty. */
996 static int ocfs2_replay_journal(struct ocfs2_super *osb,
997                                 int node_num,
998                                 int slot_num)
999 {
1000         int status;
1001         int got_lock = 0;
1002         unsigned int flags;
1003         struct inode *inode = NULL;
1004         struct ocfs2_dinode *fe;
1005         journal_t *journal = NULL;
1006         struct buffer_head *bh = NULL;
1007
1008         inode = ocfs2_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE,
1009                                             slot_num);
1010         if (inode == NULL) {
1011                 status = -EACCES;
1012                 mlog_errno(status);
1013                 goto done;
1014         }
1015         if (is_bad_inode(inode)) {
1016                 status = -EACCES;
1017                 iput(inode);
1018                 inode = NULL;
1019                 mlog_errno(status);
1020                 goto done;
1021         }
1022         SET_INODE_JOURNAL(inode);
1023
1024         status = ocfs2_meta_lock_full(inode, &bh, 1, OCFS2_META_LOCK_RECOVERY);
1025         if (status < 0) {
1026                 mlog(0, "status returned from ocfs2_meta_lock=%d\n", status);
1027                 if (status != -ERESTARTSYS)
1028                         mlog(ML_ERROR, "Could not lock journal!\n");
1029                 goto done;
1030         }
1031         got_lock = 1;
1032
1033         fe = (struct ocfs2_dinode *) bh->b_data;
1034
1035         flags = le32_to_cpu(fe->id1.journal1.ij_flags);
1036
1037         if (!(flags & OCFS2_JOURNAL_DIRTY_FL)) {
1038                 mlog(0, "No recovery required for node %d\n", node_num);
1039                 goto done;
1040         }
1041
1042         mlog(ML_NOTICE, "Recovering node %d from slot %d on device (%u,%u)\n",
1043              node_num, slot_num,
1044              MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
1045
1046         OCFS2_I(inode)->ip_clusters = le32_to_cpu(fe->i_clusters);
1047
1048         status = ocfs2_force_read_journal(inode);
1049         if (status < 0) {
1050                 mlog_errno(status);
1051                 goto done;
1052         }
1053
1054         mlog(0, "calling journal_init_inode\n");
1055         journal = journal_init_inode(inode);
1056         if (journal == NULL) {
1057                 mlog(ML_ERROR, "Linux journal layer error\n");
1058                 status = -EIO;
1059                 goto done;
1060         }
1061
1062         status = journal_load(journal);
1063         if (status < 0) {
1064                 mlog_errno(status);
1065                 if (!igrab(inode))
1066                         BUG();
1067                 journal_destroy(journal);
1068                 goto done;
1069         }
1070
1071         ocfs2_clear_journal_error(osb->sb, journal, slot_num);
1072
1073         /* wipe the journal */
1074         mlog(0, "flushing the journal.\n");
1075         journal_lock_updates(journal);
1076         status = journal_flush(journal);
1077         journal_unlock_updates(journal);
1078         if (status < 0)
1079                 mlog_errno(status);
1080
1081         /* This will mark the node clean */
1082         flags = le32_to_cpu(fe->id1.journal1.ij_flags);
1083         flags &= ~OCFS2_JOURNAL_DIRTY_FL;
1084         fe->id1.journal1.ij_flags = cpu_to_le32(flags);
1085
1086         status = ocfs2_write_block(osb, bh, inode);
1087         if (status < 0)
1088                 mlog_errno(status);
1089
1090         if (!igrab(inode))
1091                 BUG();
1092
1093         journal_destroy(journal);
1094
1095 done:
1096         /* drop the lock on this nodes journal */
1097         if (got_lock)
1098                 ocfs2_meta_unlock(inode, 1);
1099
1100         if (inode)
1101                 iput(inode);
1102
1103         if (bh)
1104                 brelse(bh);
1105
1106         mlog_exit(status);
1107         return status;
1108 }
1109
1110 /*
1111  * Do the most important parts of node recovery:
1112  *  - Replay it's journal
1113  *  - Stamp a clean local allocator file
1114  *  - Stamp a clean truncate log
1115  *  - Mark the node clean
1116  *
1117  * If this function completes without error, a node in OCFS2 can be
1118  * said to have been safely recovered. As a result, failure during the
1119  * second part of a nodes recovery process (local alloc recovery) is
1120  * far less concerning.
1121  */
1122 static int ocfs2_recover_node(struct ocfs2_super *osb,
1123                               int node_num)
1124 {
1125         int status = 0;
1126         int slot_num;
1127         struct ocfs2_slot_info *si = osb->slot_info;
1128         struct ocfs2_dinode *la_copy = NULL;
1129         struct ocfs2_dinode *tl_copy = NULL;
1130
1131         mlog_entry("(node_num=%d, osb->node_num = %d)\n",
1132                    node_num, osb->node_num);
1133
1134         mlog(0, "checking node %d\n", node_num);
1135
1136         /* Should not ever be called to recover ourselves -- in that
1137          * case we should've called ocfs2_journal_load instead. */
1138         BUG_ON(osb->node_num == node_num);
1139
1140         slot_num = ocfs2_node_num_to_slot(si, node_num);
1141         if (slot_num == OCFS2_INVALID_SLOT) {
1142                 status = 0;
1143                 mlog(0, "no slot for this node, so no recovery required.\n");
1144                 goto done;
1145         }
1146
1147         mlog(0, "node %d was using slot %d\n", node_num, slot_num);
1148
1149         status = ocfs2_replay_journal(osb, node_num, slot_num);
1150         if (status < 0) {
1151                 mlog_errno(status);
1152                 goto done;
1153         }
1154
1155         /* Stamp a clean local alloc file AFTER recovering the journal... */
1156         status = ocfs2_begin_local_alloc_recovery(osb, slot_num, &la_copy);
1157         if (status < 0) {
1158                 mlog_errno(status);
1159                 goto done;
1160         }
1161
1162         /* An error from begin_truncate_log_recovery is not
1163          * serious enough to warrant halting the rest of
1164          * recovery. */
1165         status = ocfs2_begin_truncate_log_recovery(osb, slot_num, &tl_copy);
1166         if (status < 0)
1167                 mlog_errno(status);
1168
1169         /* Likewise, this would be a strange but ultimately not so
1170          * harmful place to get an error... */
1171         ocfs2_clear_slot(si, slot_num);
1172         status = ocfs2_update_disk_slots(osb, si);
1173         if (status < 0)
1174                 mlog_errno(status);
1175
1176         /* This will kfree the memory pointed to by la_copy and tl_copy */
1177         ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
1178                                         tl_copy);
1179
1180         status = 0;
1181 done:
1182
1183         mlog_exit(status);
1184         return status;
1185 }
1186
1187 /* Test node liveness by trylocking his journal. If we get the lock,
1188  * we drop it here. Return 0 if we got the lock, -EAGAIN if node is
1189  * still alive (we couldn't get the lock) and < 0 on error. */
1190 static int ocfs2_trylock_journal(struct ocfs2_super *osb,
1191                                  int slot_num)
1192 {
1193         int status, flags;
1194         struct inode *inode = NULL;
1195
1196         inode = ocfs2_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE,
1197                                             slot_num);
1198         if (inode == NULL) {
1199                 mlog(ML_ERROR, "access error\n");
1200                 status = -EACCES;
1201                 goto bail;
1202         }
1203         if (is_bad_inode(inode)) {
1204                 mlog(ML_ERROR, "access error (bad inode)\n");
1205                 iput(inode);
1206                 inode = NULL;
1207                 status = -EACCES;
1208                 goto bail;
1209         }
1210         SET_INODE_JOURNAL(inode);
1211
1212         flags = OCFS2_META_LOCK_RECOVERY | OCFS2_META_LOCK_NOQUEUE;
1213         status = ocfs2_meta_lock_full(inode, NULL, 1, flags);
1214         if (status < 0) {
1215                 if (status != -EAGAIN)
1216                         mlog_errno(status);
1217                 goto bail;
1218         }
1219
1220         ocfs2_meta_unlock(inode, 1);
1221 bail:
1222         if (inode)
1223                 iput(inode);
1224
1225         return status;
1226 }
1227
1228 /* Call this underneath ocfs2_super_lock. It also assumes that the
1229  * slot info struct has been updated from disk. */
1230 int ocfs2_mark_dead_nodes(struct ocfs2_super *osb)
1231 {
1232         int status, i, node_num;
1233         struct ocfs2_slot_info *si = osb->slot_info;
1234
1235         /* This is called with the super block cluster lock, so we
1236          * know that the slot map can't change underneath us. */
1237
1238         spin_lock(&si->si_lock);
1239         for(i = 0; i < si->si_num_slots; i++) {
1240                 if (i == osb->slot_num)
1241                         continue;
1242                 if (ocfs2_is_empty_slot(si, i))
1243                         continue;
1244
1245                 node_num = si->si_global_node_nums[i];
1246                 if (ocfs2_node_map_test_bit(osb, &osb->recovery_map, node_num))
1247                         continue;
1248                 spin_unlock(&si->si_lock);
1249
1250                 /* Ok, we have a slot occupied by another node which
1251                  * is not in the recovery map. We trylock his journal
1252                  * file here to test if he's alive. */
1253                 status = ocfs2_trylock_journal(osb, i);
1254                 if (!status) {
1255                         /* Since we're called from mount, we know that
1256                          * the recovery thread can't race us on
1257                          * setting / checking the recovery bits. */
1258                         ocfs2_recovery_thread(osb, node_num);
1259                 } else if ((status < 0) && (status != -EAGAIN)) {
1260                         mlog_errno(status);
1261                         goto bail;
1262                 }
1263
1264                 spin_lock(&si->si_lock);
1265         }
1266         spin_unlock(&si->si_lock);
1267
1268         status = 0;
1269 bail:
1270         mlog_exit(status);
1271         return status;
1272 }
1273
1274 static int ocfs2_queue_orphans(struct ocfs2_super *osb,
1275                                int slot,
1276                                struct inode **head)
1277 {
1278         int status;
1279         struct inode *orphan_dir_inode = NULL;
1280         struct inode *iter;
1281         unsigned long offset, blk, local;
1282         struct buffer_head *bh = NULL;
1283         struct ocfs2_dir_entry *de;
1284         struct super_block *sb = osb->sb;
1285
1286         orphan_dir_inode = ocfs2_get_system_file_inode(osb,
1287                                                        ORPHAN_DIR_SYSTEM_INODE,
1288                                                        slot);
1289         if  (!orphan_dir_inode) {
1290                 status = -ENOENT;
1291                 mlog_errno(status);
1292                 return status;
1293         }       
1294
1295         mutex_lock(&orphan_dir_inode->i_mutex);
1296         status = ocfs2_meta_lock(orphan_dir_inode, NULL, 0);
1297         if (status < 0) {
1298                 mlog_errno(status);
1299                 goto out;
1300         }
1301
1302         offset = 0;
1303         iter = NULL;
1304         while(offset < i_size_read(orphan_dir_inode)) {
1305                 blk = offset >> sb->s_blocksize_bits;
1306
1307                 bh = ocfs2_bread(orphan_dir_inode, blk, &status, 0);
1308                 if (!bh)
1309                         status = -EINVAL;
1310                 if (status < 0) {
1311                         if (bh)
1312                                 brelse(bh);
1313                         mlog_errno(status);
1314                         goto out_unlock;
1315                 }
1316
1317                 local = 0;
1318                 while(offset < i_size_read(orphan_dir_inode)
1319                       && local < sb->s_blocksize) {
1320                         de = (struct ocfs2_dir_entry *) (bh->b_data + local);
1321
1322                         if (!ocfs2_check_dir_entry(orphan_dir_inode,
1323                                                   de, bh, local)) {
1324                                 status = -EINVAL;
1325                                 mlog_errno(status);
1326                                 brelse(bh);
1327                                 goto out_unlock;
1328                         }
1329
1330                         local += le16_to_cpu(de->rec_len);
1331                         offset += le16_to_cpu(de->rec_len);
1332
1333                         /* I guess we silently fail on no inode? */
1334                         if (!le64_to_cpu(de->inode))
1335                                 continue;
1336                         if (de->file_type > OCFS2_FT_MAX) {
1337                                 mlog(ML_ERROR,
1338                                      "block %llu contains invalid de: "
1339                                      "inode = %llu, rec_len = %u, "
1340                                      "name_len = %u, file_type = %u, "
1341                                      "name='%.*s'\n",
1342                                      (unsigned long long)bh->b_blocknr,
1343                                      (unsigned long long)le64_to_cpu(de->inode),
1344                                      le16_to_cpu(de->rec_len),
1345                                      de->name_len,
1346                                      de->file_type,
1347                                      de->name_len,
1348                                      de->name);
1349                                 continue;
1350                         }
1351                         if (de->name_len == 1 && !strncmp(".", de->name, 1))
1352                                 continue;
1353                         if (de->name_len == 2 && !strncmp("..", de->name, 2))
1354                                 continue;
1355
1356                         iter = ocfs2_iget(osb, le64_to_cpu(de->inode),
1357                                           OCFS2_FI_FLAG_NOLOCK);
1358                         if (IS_ERR(iter))
1359                                 continue;
1360
1361                         mlog(0, "queue orphan %llu\n",
1362                              (unsigned long long)OCFS2_I(iter)->ip_blkno);
1363                         /* No locking is required for the next_orphan
1364                          * queue as there is only ever a single
1365                          * process doing orphan recovery. */
1366                         OCFS2_I(iter)->ip_next_orphan = *head;
1367                         *head = iter;
1368                 }
1369                 brelse(bh);
1370         }
1371
1372 out_unlock:
1373         ocfs2_meta_unlock(orphan_dir_inode, 0);
1374 out:
1375         mutex_unlock(&orphan_dir_inode->i_mutex);
1376         iput(orphan_dir_inode);
1377         return status;
1378 }
1379
1380 static int ocfs2_orphan_recovery_can_continue(struct ocfs2_super *osb,
1381                                               int slot)
1382 {
1383         int ret;
1384
1385         spin_lock(&osb->osb_lock);
1386         ret = !osb->osb_orphan_wipes[slot];
1387         spin_unlock(&osb->osb_lock);
1388         return ret;
1389 }
1390
1391 static void ocfs2_mark_recovering_orphan_dir(struct ocfs2_super *osb,
1392                                              int slot)
1393 {
1394         spin_lock(&osb->osb_lock);
1395         /* Mark ourselves such that new processes in delete_inode()
1396          * know to quit early. */
1397         ocfs2_node_map_set_bit(osb, &osb->osb_recovering_orphan_dirs, slot);
1398         while (osb->osb_orphan_wipes[slot]) {
1399                 /* If any processes are already in the middle of an
1400                  * orphan wipe on this dir, then we need to wait for
1401                  * them. */
1402                 spin_unlock(&osb->osb_lock);
1403                 wait_event_interruptible(osb->osb_wipe_event,
1404                                          ocfs2_orphan_recovery_can_continue(osb, slot));
1405                 spin_lock(&osb->osb_lock);
1406         }
1407         spin_unlock(&osb->osb_lock);
1408 }
1409
1410 static void ocfs2_clear_recovering_orphan_dir(struct ocfs2_super *osb,
1411                                               int slot)
1412 {
1413         ocfs2_node_map_clear_bit(osb, &osb->osb_recovering_orphan_dirs, slot);
1414 }
1415
1416 /*
1417  * Orphan recovery. Each mounted node has it's own orphan dir which we
1418  * must run during recovery. Our strategy here is to build a list of
1419  * the inodes in the orphan dir and iget/iput them. The VFS does
1420  * (most) of the rest of the work.
1421  *
1422  * Orphan recovery can happen at any time, not just mount so we have a
1423  * couple of extra considerations.
1424  *
1425  * - We grab as many inodes as we can under the orphan dir lock -
1426  *   doing iget() outside the orphan dir risks getting a reference on
1427  *   an invalid inode.
1428  * - We must be sure not to deadlock with other processes on the
1429  *   system wanting to run delete_inode(). This can happen when they go
1430  *   to lock the orphan dir and the orphan recovery process attempts to
1431  *   iget() inside the orphan dir lock. This can be avoided by
1432  *   advertising our state to ocfs2_delete_inode().
1433  */
1434 static int ocfs2_recover_orphans(struct ocfs2_super *osb,
1435                                  int slot)
1436 {
1437         int ret = 0;
1438         struct inode *inode = NULL;
1439         struct inode *iter;
1440         struct ocfs2_inode_info *oi;
1441
1442         mlog(0, "Recover inodes from orphan dir in slot %d\n", slot);
1443
1444         ocfs2_mark_recovering_orphan_dir(osb, slot);
1445         ret = ocfs2_queue_orphans(osb, slot, &inode);
1446         ocfs2_clear_recovering_orphan_dir(osb, slot);
1447
1448         /* Error here should be noted, but we want to continue with as
1449          * many queued inodes as we've got. */
1450         if (ret)
1451                 mlog_errno(ret);
1452
1453         while (inode) {
1454                 oi = OCFS2_I(inode);
1455                 mlog(0, "iput orphan %llu\n", (unsigned long long)oi->ip_blkno);
1456
1457                 iter = oi->ip_next_orphan;
1458
1459                 spin_lock(&oi->ip_lock);
1460                 /* Delete voting may have set these on the assumption
1461                  * that the other node would wipe them successfully.
1462                  * If they are still in the node's orphan dir, we need
1463                  * to reset that state. */
1464                 oi->ip_flags &= ~(OCFS2_INODE_DELETED|OCFS2_INODE_SKIP_DELETE);
1465
1466                 /* Set the proper information to get us going into
1467                  * ocfs2_delete_inode. */
1468                 oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
1469                 oi->ip_orphaned_slot = slot;
1470                 spin_unlock(&oi->ip_lock);
1471
1472                 iput(inode);
1473
1474                 inode = iter;
1475         }
1476
1477         return ret;
1478 }
1479
1480 static int ocfs2_wait_on_mount(struct ocfs2_super *osb)
1481 {
1482         /* This check is good because ocfs2 will wait on our recovery
1483          * thread before changing it to something other than MOUNTED
1484          * or DISABLED. */
1485         wait_event(osb->osb_mount_event,
1486                    atomic_read(&osb->vol_state) == VOLUME_MOUNTED ||
1487                    atomic_read(&osb->vol_state) == VOLUME_DISABLED);
1488
1489         /* If there's an error on mount, then we may never get to the
1490          * MOUNTED flag, but this is set right before
1491          * dismount_volume() so we can trust it. */
1492         if (atomic_read(&osb->vol_state) == VOLUME_DISABLED) {
1493                 mlog(0, "mount error, exiting!\n");
1494                 return -EBUSY;
1495         }
1496
1497         return 0;
1498 }
1499
1500 static int ocfs2_commit_thread(void *arg)
1501 {
1502         int status;
1503         struct ocfs2_super *osb = arg;
1504         struct ocfs2_journal *journal = osb->journal;
1505
1506         /* we can trust j_num_trans here because _should_stop() is only set in
1507          * shutdown and nobody other than ourselves should be able to start
1508          * transactions.  committing on shutdown might take a few iterations
1509          * as final transactions put deleted inodes on the list */
1510         while (!(kthread_should_stop() &&
1511                  atomic_read(&journal->j_num_trans) == 0)) {
1512
1513                 wait_event_interruptible(osb->checkpoint_event,
1514                                          atomic_read(&journal->j_num_trans)
1515                                          || kthread_should_stop());
1516
1517                 status = ocfs2_commit_cache(osb);
1518                 if (status < 0)
1519                         mlog_errno(status);
1520
1521                 if (kthread_should_stop() && atomic_read(&journal->j_num_trans)){
1522                         mlog(ML_KTHREAD,
1523                              "commit_thread: %u transactions pending on "
1524                              "shutdown\n",
1525                              atomic_read(&journal->j_num_trans));
1526                 }
1527         }
1528
1529         return 0;
1530 }
1531
1532 /* Look for a dirty journal without taking any cluster locks. Used for
1533  * hard readonly access to determine whether the file system journals
1534  * require recovery. */
1535 int ocfs2_check_journals_nolocks(struct ocfs2_super *osb)
1536 {
1537         int ret = 0;
1538         unsigned int slot;
1539         struct buffer_head *di_bh;
1540         struct ocfs2_dinode *di;
1541         struct inode *journal = NULL;
1542
1543         for(slot = 0; slot < osb->max_slots; slot++) {
1544                 journal = ocfs2_get_system_file_inode(osb,
1545                                                       JOURNAL_SYSTEM_INODE,
1546                                                       slot);
1547                 if (!journal || is_bad_inode(journal)) {
1548                         ret = -EACCES;
1549                         mlog_errno(ret);
1550                         goto out;
1551                 }
1552
1553                 di_bh = NULL;
1554                 ret = ocfs2_read_block(osb, OCFS2_I(journal)->ip_blkno, &di_bh,
1555                                        0, journal);
1556                 if (ret < 0) {
1557                         mlog_errno(ret);
1558                         goto out;
1559                 }
1560
1561                 di = (struct ocfs2_dinode *) di_bh->b_data;
1562
1563                 if (le32_to_cpu(di->id1.journal1.ij_flags) &
1564                     OCFS2_JOURNAL_DIRTY_FL)
1565                         ret = -EROFS;
1566
1567                 brelse(di_bh);
1568                 if (ret)
1569                         break;
1570         }
1571
1572 out:
1573         if (journal)
1574                 iput(journal);
1575
1576         return ret;
1577 }