nfsd: move most of nfsfh.h to fs/nfsd
[safe/jmp/linux-2.6] / fs / gfs2 / lops.c
index dd41863..9969ff0 100644 (file)
@@ -1,10 +1,10 @@
 /*
  * Copyright (C) Sistina Software, Inc.  1997-2003 All rights reserved.
- * Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
+ * Copyright (C) 2004-2006 Red Hat, Inc.  All rights reserved.
  *
  * This copyrighted material is made available to anyone wishing to use,
  * modify, copy, or redistribute it subject to the terms and conditions
- * of the GNU General Public License v.2.
+ * of the GNU General Public License version 2.
  */
 
 #include <linux/sched.h>
 #include <linux/spinlock.h>
 #include <linux/completion.h>
 #include <linux/buffer_head.h>
-#include <asm/semaphore.h>
+#include <linux/gfs2_ondisk.h>
+#include <linux/bio.h>
+#include <linux/fs.h>
 
 #include "gfs2.h"
+#include "incore.h"
+#include "inode.h"
 #include "glock.h"
 #include "log.h"
 #include "lops.h"
 #include "recovery.h"
 #include "rgrp.h"
 #include "trans.h"
+#include "util.h"
+#include "trace_gfs2.h"
 
-static void glock_lo_add(struct gfs2_sbd *sdp, struct gfs2_log_element *le)
+/**
+ * gfs2_pin - Pin a buffer in memory
+ * @sdp: The superblock
+ * @bh: The buffer to be pinned
+ *
+ * The log lock must be held when calling this function
+ */
+static void gfs2_pin(struct gfs2_sbd *sdp, struct buffer_head *bh)
 {
-       struct gfs2_glock *gl;
+       struct gfs2_bufdata *bd;
 
-       get_transaction->tr_touched = 1;
+       gfs2_assert_withdraw(sdp, test_bit(SDF_JOURNAL_LIVE, &sdp->sd_flags));
 
-       if (!list_empty(&le->le_list))
-               return;
+       clear_buffer_dirty(bh);
+       if (test_set_buffer_pinned(bh))
+               gfs2_assert_withdraw(sdp, 0);
+       if (!buffer_uptodate(bh))
+               gfs2_io_error_bh(sdp, bh);
+       bd = bh->b_private;
+       /* If this buffer is in the AIL and it has already been written
+        * to in-place disk block, remove it from the AIL.
+        */
+       if (bd->bd_ail)
+               list_move(&bd->bd_ail_st_list, &bd->bd_ail->ai_ail2_list);
+       get_bh(bh);
+       trace_gfs2_pin(bd, 1);
+}
 
-       gl = container_of(le, struct gfs2_glock, gl_le);
-       if (gfs2_assert_withdraw(sdp, gfs2_glock_is_held_excl(gl)))
-               return;
-       gfs2_glock_hold(gl);
-       set_bit(GLF_DIRTY, &gl->gl_flags);
+/**
+ * gfs2_unpin - Unpin a buffer
+ * @sdp: the filesystem the buffer belongs to
+ * @bh: The buffer to unpin
+ * @ai:
+ *
+ */
+
+static void gfs2_unpin(struct gfs2_sbd *sdp, struct buffer_head *bh,
+                      struct gfs2_ail *ai)
+{
+       struct gfs2_bufdata *bd = bh->b_private;
+
+       gfs2_assert_withdraw(sdp, buffer_uptodate(bh));
+
+       if (!buffer_pinned(bh))
+               gfs2_assert_withdraw(sdp, 0);
+
+       lock_buffer(bh);
+       mark_buffer_dirty(bh);
+       clear_buffer_pinned(bh);
 
        gfs2_log_lock(sdp);
-       sdp->sd_log_num_gl++;
-       list_add(&le->le_list, &sdp->sd_log_le_gl);
+       if (bd->bd_ail) {
+               list_del(&bd->bd_ail_st_list);
+               brelse(bh);
+       } else {
+               struct gfs2_glock *gl = bd->bd_gl;
+               list_add(&bd->bd_ail_gl_list, &gl->gl_ail_list);
+               atomic_inc(&gl->gl_ail_count);
+       }
+       bd->bd_ail = ai;
+       list_add(&bd->bd_ail_st_list, &ai->ai_ail1_list);
+       clear_bit(GLF_LFLUSH, &bd->bd_gl->gl_flags);
+       trace_gfs2_pin(bd, 0);
        gfs2_log_unlock(sdp);
+       unlock_buffer(bh);
 }
 
-static void glock_lo_after_commit(struct gfs2_sbd *sdp, struct gfs2_ail *ai)
+
+static inline struct gfs2_log_descriptor *bh_log_desc(struct buffer_head *bh)
 {
-       struct list_head *head = &sdp->sd_log_le_gl;
-       struct gfs2_glock *gl;
+       return (struct gfs2_log_descriptor *)bh->b_data;
+}
 
-       while (!list_empty(head)) {
-               gl = list_entry(head->next, struct gfs2_glock, gl_le.le_list);
-               list_del_init(&gl->gl_le.le_list);
-               sdp->sd_log_num_gl--;
+static inline __be64 *bh_log_ptr(struct buffer_head *bh)
+{
+       struct gfs2_log_descriptor *ld = bh_log_desc(bh);
+       return (__force __be64 *)(ld + 1);
+}
 
-               gfs2_assert_withdraw(sdp, gfs2_glock_is_held_excl(gl));
-               gfs2_glock_put(gl);
-       }
-       gfs2_assert_warn(sdp, !sdp->sd_log_num_gl);
+static inline __be64 *bh_ptr_end(struct buffer_head *bh)
+{
+       return (__force __be64 *)(bh->b_data + bh->b_size);
+}
+
+
+static struct buffer_head *gfs2_get_log_desc(struct gfs2_sbd *sdp, u32 ld_type)
+{
+       struct buffer_head *bh = gfs2_log_get_buf(sdp);
+       struct gfs2_log_descriptor *ld = bh_log_desc(bh);
+       ld->ld_header.mh_magic = cpu_to_be32(GFS2_MAGIC);
+       ld->ld_header.mh_type = cpu_to_be32(GFS2_METATYPE_LD);
+       ld->ld_header.mh_format = cpu_to_be32(GFS2_FORMAT_LD);
+       ld->ld_type = cpu_to_be32(ld_type);
+       ld->ld_length = 0;
+       ld->ld_data1 = 0;
+       ld->ld_data2 = 0;
+       memset(ld->ld_reserved, 0, sizeof(ld->ld_reserved));
+       return bh;
 }
 
 static void buf_lo_add(struct gfs2_sbd *sdp, struct gfs2_log_element *le)
@@ -65,41 +134,26 @@ static void buf_lo_add(struct gfs2_sbd *sdp, struct gfs2_log_element *le)
        struct gfs2_bufdata *bd = container_of(le, struct gfs2_bufdata, bd_le);
        struct gfs2_trans *tr;
 
+       lock_buffer(bd->bd_bh);
+       gfs2_log_lock(sdp);
        if (!list_empty(&bd->bd_list_tr))
-               return;
-
-       tr = get_transaction;
+               goto out;
+       tr = current->journal_info;
        tr->tr_touched = 1;
        tr->tr_num_buf++;
        list_add(&bd->bd_list_tr, &tr->tr_list_buf);
-
        if (!list_empty(&le->le_list))
-               return;
-
-       gfs2_trans_add_gl(bd->bd_gl);
-
+               goto out;
+       set_bit(GLF_LFLUSH, &bd->bd_gl->gl_flags);
+       set_bit(GLF_DIRTY, &bd->bd_gl->gl_flags);
        gfs2_meta_check(sdp, bd->bd_bh);
        gfs2_pin(sdp, bd->bd_bh);
-
-       gfs2_log_lock(sdp);
        sdp->sd_log_num_buf++;
        list_add(&le->le_list, &sdp->sd_log_le_buf);
-       gfs2_log_unlock(sdp);
-
        tr->tr_num_buf_new++;
-}
-
-static void buf_lo_incore_commit(struct gfs2_sbd *sdp, struct gfs2_trans *tr)
-{
-       struct list_head *head = &tr->tr_list_buf;
-       struct gfs2_bufdata *bd;
-
-       while (!list_empty(head)) {
-               bd = list_entry(head->next, struct gfs2_bufdata, bd_list_tr);
-               list_del_init(&bd->bd_list_tr);
-               tr->tr_num_buf--;
-       }
-       gfs2_assert_warn(sdp, !tr->tr_num_buf);
+out:
+       gfs2_log_unlock(sdp);
+       unlock_buffer(bd->bd_bh);
 }
 
 static void buf_lo_before_commit(struct gfs2_sbd *sdp)
@@ -107,56 +161,59 @@ static void buf_lo_before_commit(struct gfs2_sbd *sdp)
        struct buffer_head *bh;
        struct gfs2_log_descriptor *ld;
        struct gfs2_bufdata *bd1 = NULL, *bd2;
-       unsigned int total = sdp->sd_log_num_buf;
-       unsigned int offset = sizeof(struct gfs2_log_descriptor);
+       unsigned int total;
        unsigned int limit;
        unsigned int num;
        unsigned n;
        __be64 *ptr;
 
-       offset += (sizeof(__be64) - 1);
-       offset &= ~(sizeof(__be64) - 1);
-       limit = (sdp->sd_sb.sb_bsize - offset)/sizeof(__be64);
+       limit = buf_limit(sdp);
        /* for 4k blocks, limit = 503 */
 
+       gfs2_log_lock(sdp);
+       total = sdp->sd_log_num_buf;
        bd1 = bd2 = list_prepare_entry(bd1, &sdp->sd_log_le_buf, bd_le.le_list);
        while(total) {
                num = total;
                if (total > limit)
                        num = limit;
-               bh = gfs2_log_get_buf(sdp);
-               ld = (struct gfs2_log_descriptor *)bh->b_data;
-               ptr = (__be64 *)(bh->b_data + offset);
-               ld->ld_header.mh_magic = cpu_to_be32(GFS2_MAGIC);
-               ld->ld_header.mh_type = cpu_to_be16(GFS2_METATYPE_LD);
-               ld->ld_header.mh_format = cpu_to_be16(GFS2_FORMAT_LD);
-               ld->ld_type = cpu_to_be32(GFS2_LOG_DESC_METADATA);
+               gfs2_log_unlock(sdp);
+               bh = gfs2_get_log_desc(sdp, GFS2_LOG_DESC_METADATA);
+               gfs2_log_lock(sdp);
+               ld = bh_log_desc(bh);
+               ptr = bh_log_ptr(bh);
                ld->ld_length = cpu_to_be32(num + 1);
                ld->ld_data1 = cpu_to_be32(num);
-               ld->ld_data2 = cpu_to_be32(0);
-               memset(ld->ld_reserved, 0, sizeof(ld->ld_reserved));
 
                n = 0;
-               list_for_each_entry_continue(bd1, &sdp->sd_log_le_buf, bd_le.le_list) {
+               list_for_each_entry_continue(bd1, &sdp->sd_log_le_buf,
+                                            bd_le.le_list) {
                        *ptr++ = cpu_to_be64(bd1->bd_bh->b_blocknr);
                        if (++n >= num)
                                break;
                }
 
-               set_buffer_dirty(bh);
-               ll_rw_block(WRITE, 1, &bh);
+               gfs2_log_unlock(sdp);
+               submit_bh(WRITE_SYNC_PLUG, bh);
+               gfs2_log_lock(sdp);
 
                n = 0;
-               list_for_each_entry_continue(bd2, &sdp->sd_log_le_buf, bd_le.le_list) {
+               list_for_each_entry_continue(bd2, &sdp->sd_log_le_buf,
+                                            bd_le.le_list) {
+                       get_bh(bd2->bd_bh);
+                       gfs2_log_unlock(sdp);
+                       lock_buffer(bd2->bd_bh);
                        bh = gfs2_log_fake_buf(sdp, bd2->bd_bh);
-                       set_buffer_dirty(bh);
-                       ll_rw_block(WRITE, 1, &bh);
+                       submit_bh(WRITE_SYNC_PLUG, bh);
+                       gfs2_log_lock(sdp);
                        if (++n >= num)
                                break;
                }
 
+               BUG_ON(total < num);
                total -= num;
        }
+       gfs2_log_unlock(sdp);
 }
 
 static void buf_lo_after_commit(struct gfs2_sbd *sdp, struct gfs2_ail *ai)
@@ -175,9 +232,9 @@ static void buf_lo_after_commit(struct gfs2_sbd *sdp, struct gfs2_ail *ai)
 }
 
 static void buf_lo_before_scan(struct gfs2_jdesc *jd,
-                              struct gfs2_log_header *head, int pass)
+                              struct gfs2_log_header_host *head, int pass)
 {
-       struct gfs2_sbd *sdp = jd->jd_inode->i_sbd;
+       struct gfs2_sbd *sdp = GFS2_SB(jd->jd_inode);
 
        if (pass != 0)
                return;
@@ -190,11 +247,12 @@ static int buf_lo_scan_elements(struct gfs2_jdesc *jd, unsigned int start,
                                struct gfs2_log_descriptor *ld, __be64 *ptr,
                                int pass)
 {
-       struct gfs2_sbd *sdp = jd->jd_inode->i_sbd;
-       struct gfs2_glock *gl = jd->jd_inode->i_gl;
+       struct gfs2_inode *ip = GFS2_I(jd->jd_inode);
+       struct gfs2_sbd *sdp = GFS2_SB(jd->jd_inode);
+       struct gfs2_glock *gl = ip->i_gl;
        unsigned int blks = be32_to_cpu(ld->ld_data1);
        struct buffer_head *bh_log, *bh_ip;
-       uint64_t blkno;
+       u64 blkno;
        int error = 0;
 
        if (pass != 1 || be32_to_cpu(ld->ld_type) != GFS2_LOG_DESC_METADATA)
@@ -211,8 +269,8 @@ static int buf_lo_scan_elements(struct gfs2_jdesc *jd, unsigned int start,
                        continue;
 
                error = gfs2_replay_read_block(jd, start, &bh_log);
-                if (error)
-                        return error;
+               if (error)
+                       return error;
 
                bh_ip = gfs2_meta_new(gl, blkno);
                memcpy(bh_ip->b_data, bh_log->b_data, bh_log->b_size);
@@ -236,16 +294,17 @@ static int buf_lo_scan_elements(struct gfs2_jdesc *jd, unsigned int start,
 
 static void buf_lo_after_scan(struct gfs2_jdesc *jd, int error, int pass)
 {
-       struct gfs2_sbd *sdp = jd->jd_inode->i_sbd;
+       struct gfs2_inode *ip = GFS2_I(jd->jd_inode);
+       struct gfs2_sbd *sdp = GFS2_SB(jd->jd_inode);
 
        if (error) {
-               gfs2_meta_sync(jd->jd_inode->i_gl, DIO_START | DIO_WAIT);
+               gfs2_meta_sync(ip->i_gl);
                return;
        }
        if (pass != 1)
                return;
 
-       gfs2_meta_sync(jd->jd_inode->i_gl, DIO_START | DIO_WAIT);
+       gfs2_meta_sync(ip->i_gl);
 
        fs_info(sdp, "jid=%u: Replayed %u of %u blocks\n",
                jd->jd_jid, sdp->sd_replayed_blocks, sdp->sd_found_blocks);
@@ -255,14 +314,11 @@ static void revoke_lo_add(struct gfs2_sbd *sdp, struct gfs2_log_element *le)
 {
        struct gfs2_trans *tr;
 
-       tr = get_transaction;
+       tr = current->journal_info;
        tr->tr_touched = 1;
        tr->tr_num_revoke++;
-
-       gfs2_log_lock(sdp);
        sdp->sd_log_num_revoke++;
        list_add(&le->le_list, &sdp->sd_log_le_revoke);
-       gfs2_log_unlock(sdp);
 }
 
 static void revoke_lo_before_commit(struct gfs2_sbd *sdp)
@@ -272,55 +328,48 @@ static void revoke_lo_before_commit(struct gfs2_sbd *sdp)
        struct buffer_head *bh;
        unsigned int offset;
        struct list_head *head = &sdp->sd_log_le_revoke;
-       struct gfs2_revoke *rv;
+       struct gfs2_bufdata *bd;
 
        if (!sdp->sd_log_num_revoke)
                return;
 
-       bh = gfs2_log_get_buf(sdp);
-       ld = (struct gfs2_log_descriptor *)bh->b_data;
-       ld->ld_header.mh_magic = cpu_to_be32(GFS2_MAGIC);
-       ld->ld_header.mh_type = cpu_to_be16(GFS2_METATYPE_LD);
-       ld->ld_header.mh_format = cpu_to_be16(GFS2_FORMAT_LD);
-       ld->ld_type = cpu_to_be32(GFS2_LOG_DESC_REVOKE);
-       ld->ld_length = cpu_to_be32(gfs2_struct2blk(sdp, sdp->sd_log_num_revoke, sizeof(uint64_t)));
+       bh = gfs2_get_log_desc(sdp, GFS2_LOG_DESC_REVOKE);
+       ld = bh_log_desc(bh);
+       ld->ld_length = cpu_to_be32(gfs2_struct2blk(sdp, sdp->sd_log_num_revoke,
+                                                   sizeof(u64)));
        ld->ld_data1 = cpu_to_be32(sdp->sd_log_num_revoke);
-       ld->ld_data2 = cpu_to_be32(0);
-       memset(ld->ld_reserved, 0, sizeof(ld->ld_reserved));
        offset = sizeof(struct gfs2_log_descriptor);
 
        while (!list_empty(head)) {
-               rv = list_entry(head->next, struct gfs2_revoke, rv_le.le_list);
-               list_del(&rv->rv_le.le_list);
+               bd = list_entry(head->next, struct gfs2_bufdata, bd_le.le_list);
+               list_del_init(&bd->bd_le.le_list);
                sdp->sd_log_num_revoke--;
 
-               if (offset + sizeof(uint64_t) > sdp->sd_sb.sb_bsize) {
-                       set_buffer_dirty(bh);
-                       ll_rw_block(WRITE, 1, &bh);
+               if (offset + sizeof(u64) > sdp->sd_sb.sb_bsize) {
+                       submit_bh(WRITE_SYNC_PLUG, bh);
 
                        bh = gfs2_log_get_buf(sdp);
                        mh = (struct gfs2_meta_header *)bh->b_data;
                        mh->mh_magic = cpu_to_be32(GFS2_MAGIC);
-                       mh->mh_type = cpu_to_be16(GFS2_METATYPE_LB);
-                       mh->mh_format = cpu_to_be16(GFS2_FORMAT_LB);
+                       mh->mh_type = cpu_to_be32(GFS2_METATYPE_LB);
+                       mh->mh_format = cpu_to_be32(GFS2_FORMAT_LB);
                        offset = sizeof(struct gfs2_meta_header);
                }
 
-               *(__be64 *)(bh->b_data + offset) = cpu_to_be64(rv->rv_blkno);
-               kfree(rv);
+               *(__be64 *)(bh->b_data + offset) = cpu_to_be64(bd->bd_blkno);
+               kmem_cache_free(gfs2_bufdata_cachep, bd);
 
-               offset += sizeof(uint64_t);
+               offset += sizeof(u64);
        }
        gfs2_assert_withdraw(sdp, !sdp->sd_log_num_revoke);
 
-       set_buffer_dirty(bh);
-       ll_rw_block(WRITE, 1, &bh);
+       submit_bh(WRITE_SYNC_PLUG, bh);
 }
 
 static void revoke_lo_before_scan(struct gfs2_jdesc *jd,
-                                 struct gfs2_log_header *head, int pass)
+                                 struct gfs2_log_header_host *head, int pass)
 {
-       struct gfs2_sbd *sdp = jd->jd_inode->i_sbd;
+       struct gfs2_sbd *sdp = GFS2_SB(jd->jd_inode);
 
        if (pass != 0)
                return;
@@ -333,12 +382,12 @@ static int revoke_lo_scan_elements(struct gfs2_jdesc *jd, unsigned int start,
                                   struct gfs2_log_descriptor *ld, __be64 *ptr,
                                   int pass)
 {
-       struct gfs2_sbd *sdp = jd->jd_inode->i_sbd;
+       struct gfs2_sbd *sdp = GFS2_SB(jd->jd_inode);
        unsigned int blks = be32_to_cpu(ld->ld_length);
        unsigned int revokes = be32_to_cpu(ld->ld_data1);
        struct buffer_head *bh;
        unsigned int offset;
-       uint64_t blkno;
+       u64 blkno;
        int first = 1;
        int error;
 
@@ -355,18 +404,20 @@ static int revoke_lo_scan_elements(struct gfs2_jdesc *jd, unsigned int start,
                if (!first)
                        gfs2_metatype_check(sdp, bh, GFS2_METATYPE_LB);
 
-               while (offset + sizeof(uint64_t) <= sdp->sd_sb.sb_bsize) {
+               while (offset + sizeof(u64) <= sdp->sd_sb.sb_bsize) {
                        blkno = be64_to_cpu(*(__be64 *)(bh->b_data + offset));
 
                        error = gfs2_revoke_add(sdp, blkno, start);
-                       if (error < 0)
+                       if (error < 0) {
+                               brelse(bh);
                                return error;
+                       }
                        else if (error)
                                sdp->sd_found_revokes++;
 
                        if (!--revokes)
                                break;
-                       offset += sizeof(uint64_t);
+                       offset += sizeof(u64);
                }
 
                brelse(bh);
@@ -379,7 +430,7 @@ static int revoke_lo_scan_elements(struct gfs2_jdesc *jd, unsigned int start,
 
 static void revoke_lo_after_scan(struct gfs2_jdesc *jd, int error, int pass)
 {
-       struct gfs2_sbd *sdp = jd->jd_inode->i_sbd;
+       struct gfs2_sbd *sdp = GFS2_SB(jd->jd_inode);
 
        if (error) {
                gfs2_revoke_clean(sdp);
@@ -397,19 +448,21 @@ static void revoke_lo_after_scan(struct gfs2_jdesc *jd, int error, int pass)
 static void rg_lo_add(struct gfs2_sbd *sdp, struct gfs2_log_element *le)
 {
        struct gfs2_rgrpd *rgd;
+       struct gfs2_trans *tr = current->journal_info;
 
-       get_transaction->tr_touched = 1;
-
-       if (!list_empty(&le->le_list))
-               return;
+       tr->tr_touched = 1;
 
        rgd = container_of(le, struct gfs2_rgrpd, rd_le);
-       gfs2_rgrp_bh_hold(rgd);
 
        gfs2_log_lock(sdp);
+       if (!list_empty(&le->le_list)){
+               gfs2_log_unlock(sdp);
+               return;
+       }
+       gfs2_rgrp_bh_hold(rgd);
        sdp->sd_log_num_rg++;
        list_add(&le->le_list, &sdp->sd_log_le_rg);
-       gfs2_log_unlock(sdp);   
+       gfs2_log_unlock(sdp);
 }
 
 static void rg_lo_after_commit(struct gfs2_sbd *sdp, struct gfs2_ail *ai)
@@ -447,195 +500,156 @@ static void rg_lo_after_commit(struct gfs2_sbd *sdp, struct gfs2_ail *ai)
 static void databuf_lo_add(struct gfs2_sbd *sdp, struct gfs2_log_element *le)
 {
        struct gfs2_bufdata *bd = container_of(le, struct gfs2_bufdata, bd_le);
-       struct gfs2_trans *tr = get_transaction;
+       struct gfs2_trans *tr = current->journal_info;
        struct address_space *mapping = bd->bd_bh->b_page->mapping;
-       struct gfs2_inode *ip = get_v2ip(mapping->host);
+       struct gfs2_inode *ip = GFS2_I(mapping->host);
 
-       tr->tr_touched = 1;
-       if (!list_empty(&bd->bd_list_tr) &&
-           (ip->i_di.di_flags & GFS2_DIF_JDATA)) {
-               tr->tr_num_buf++;
-               gfs2_trans_add_gl(bd->bd_gl);
-               list_add(&bd->bd_list_tr, &tr->tr_list_buf);
+       lock_buffer(bd->bd_bh);
+       gfs2_log_lock(sdp);
+       if (tr) {
+               if (!list_empty(&bd->bd_list_tr))
+                       goto out;
+               tr->tr_touched = 1;
+               if (gfs2_is_jdata(ip)) {
+                       tr->tr_num_buf++;
+                       list_add(&bd->bd_list_tr, &tr->tr_list_buf);
+               }
+       }
+       if (!list_empty(&le->le_list))
+               goto out;
+
+       set_bit(GLF_LFLUSH, &bd->bd_gl->gl_flags);
+       set_bit(GLF_DIRTY, &bd->bd_gl->gl_flags);
+       if (gfs2_is_jdata(ip)) {
                gfs2_pin(sdp, bd->bd_bh);
+               tr->tr_num_databuf_new++;
+               sdp->sd_log_num_databuf++;
+               list_add(&le->le_list, &sdp->sd_log_le_databuf);
        } else {
-               clear_buffer_pinned(bd->bd_bh);
+               list_add(&le->le_list, &sdp->sd_log_le_ordered);
        }
-       gfs2_log_lock(sdp);
-       if (ip->i_di.di_flags & GFS2_DIF_JDATA)
-               sdp->sd_log_num_jdata++;
-       sdp->sd_log_num_databuf++;
-       list_add(&le->le_list, &sdp->sd_log_le_databuf);
+out:
        gfs2_log_unlock(sdp);
+       unlock_buffer(bd->bd_bh);
 }
 
-static int gfs2_check_magic(struct buffer_head *bh)
+static void gfs2_check_magic(struct buffer_head *bh)
 {
-       struct page *page = bh->b_page;
        void *kaddr;
        __be32 *ptr;
-       int rv = 0;
 
-       kaddr = kmap_atomic(page, KM_USER0);
+       clear_buffer_escaped(bh);
+       kaddr = kmap_atomic(bh->b_page, KM_USER0);
        ptr = kaddr + bh_offset(bh);
        if (*ptr == cpu_to_be32(GFS2_MAGIC))
-               rv = 1;
-       kunmap_atomic(page, KM_USER0);
+               set_buffer_escaped(bh);
+       kunmap_atomic(kaddr, KM_USER0);
+}
+
+static void gfs2_write_blocks(struct gfs2_sbd *sdp, struct buffer_head *bh,
+                             struct list_head *list, struct list_head *done,
+                             unsigned int n)
+{
+       struct buffer_head *bh1;
+       struct gfs2_log_descriptor *ld;
+       struct gfs2_bufdata *bd;
+       __be64 *ptr;
+
+       if (!bh)
+               return;
+
+       ld = bh_log_desc(bh);
+       ld->ld_length = cpu_to_be32(n + 1);
+       ld->ld_data1 = cpu_to_be32(n);
 
-       return rv;
+       ptr = bh_log_ptr(bh);
+       
+       get_bh(bh);
+       submit_bh(WRITE_SYNC_PLUG, bh);
+       gfs2_log_lock(sdp);
+       while(!list_empty(list)) {
+               bd = list_entry(list->next, struct gfs2_bufdata, bd_le.le_list);
+               list_move_tail(&bd->bd_le.le_list, done);
+               get_bh(bd->bd_bh);
+               while (be64_to_cpu(*ptr) != bd->bd_bh->b_blocknr) {
+                       gfs2_log_incr_head(sdp);
+                       ptr += 2;
+               }
+               gfs2_log_unlock(sdp);
+               lock_buffer(bd->bd_bh);
+               if (buffer_escaped(bd->bd_bh)) {
+                       void *kaddr;
+                       bh1 = gfs2_log_get_buf(sdp);
+                       kaddr = kmap_atomic(bd->bd_bh->b_page, KM_USER0);
+                       memcpy(bh1->b_data, kaddr + bh_offset(bd->bd_bh),
+                              bh1->b_size);
+                       kunmap_atomic(kaddr, KM_USER0);
+                       *(__be32 *)bh1->b_data = 0;
+                       clear_buffer_escaped(bd->bd_bh);
+                       unlock_buffer(bd->bd_bh);
+                       brelse(bd->bd_bh);
+               } else {
+                       bh1 = gfs2_log_fake_buf(sdp, bd->bd_bh);
+               }
+               submit_bh(WRITE_SYNC_PLUG, bh1);
+               gfs2_log_lock(sdp);
+               ptr += 2;
+       }
+       gfs2_log_unlock(sdp);
+       brelse(bh);
 }
 
 /**
  * databuf_lo_before_commit - Scan the data buffers, writing as we go
  *
- * Here we scan through the lists of buffers and make the assumption
- * that any buffer thats been pinned is being journaled, and that
- * any unpinned buffer is an ordered write data buffer and therefore
- * will be written back rather than journaled.
  */
+
 static void databuf_lo_before_commit(struct gfs2_sbd *sdp)
 {
-       LIST_HEAD(started);
-       struct gfs2_bufdata *bd1 = NULL, *bd2, *bdt;
+       struct gfs2_bufdata *bd = NULL;
        struct buffer_head *bh = NULL;
-       unsigned int offset = sizeof(struct gfs2_log_descriptor);
-       struct gfs2_log_descriptor *ld;
-       unsigned int limit;
-       unsigned int total_dbuf = sdp->sd_log_num_databuf;
-       unsigned int total_jdata = sdp->sd_log_num_jdata;
-       unsigned int num, n;
-       __be64 *ptr;
-
-       offset += (2*sizeof(__be64) - 1);
-       offset &= ~(2*sizeof(__be64) - 1);
-       limit = (sdp->sd_sb.sb_bsize - offset)/sizeof(__be64);
+       unsigned int n = 0;
+       __be64 *ptr = NULL, *end = NULL;
+       LIST_HEAD(processed);
+       LIST_HEAD(in_progress);
 
-       /* printk(KERN_INFO "totals: jdata=%u dbuf=%u\n", total_jdata, total_dbuf); */
-       /*
-        * Start writing ordered buffers, write journaled buffers
-        * into the log along with a header
-        */
-       bd2 = bd1 = list_prepare_entry(bd1, &sdp->sd_log_le_databuf, bd_le.le_list);
-       while(total_dbuf) {
-               num = total_jdata;
-               if (num > limit)
-                       num = limit;
-               n = 0;
-               list_for_each_entry_safe_continue(bd1, bdt, &sdp->sd_log_le_databuf, bd_le.le_list) {
+       gfs2_log_lock(sdp);
+       while (!list_empty(&sdp->sd_log_le_databuf)) {
+               if (ptr == end) {
+                       gfs2_log_unlock(sdp);
+                       gfs2_write_blocks(sdp, bh, &in_progress, &processed, n);
+                       n = 0;
+                       bh = gfs2_get_log_desc(sdp, GFS2_LOG_DESC_JDATA);
+                       ptr = bh_log_ptr(bh);
+                       end = bh_ptr_end(bh) - 1;
                        gfs2_log_lock(sdp);
-                       /* An ordered write buffer */
-                       if (bd1->bd_bh && !buffer_pinned(bd1->bd_bh)) {
-                               list_move(&bd1->bd_le.le_list, &started);
-                               if (bd1 == bd2) {
-                                       bd2 = NULL;
-                                       bd2 = list_prepare_entry(bd2, &sdp->sd_log_le_databuf, bd_le.le_list);
-                               }
-                               total_dbuf--;
-                               if (bd1->bd_bh) {
-                                       get_bh(bd1->bd_bh);
-                                       gfs2_log_unlock(sdp);
-                                       if (buffer_dirty(bd1->bd_bh)) {
-                                               wait_on_buffer(bd1->bd_bh);
-                                               ll_rw_block(WRITE, 1, &bd1->bd_bh);
-                                       }
-                                       brelse(bd1->bd_bh);
-                                       continue;
-                               }
-                               gfs2_log_unlock(sdp);
-                               continue;
-                       } else if (bd1->bd_bh) { /* A journaled buffer */
-                               int magic;
-                               gfs2_log_unlock(sdp);
-                               /* printk(KERN_INFO "journaled buffer\n"); */
-                               if (!bh) {
-                                       bh = gfs2_log_get_buf(sdp);
-                                       ld = (struct gfs2_log_descriptor *)bh->b_data;
-                                       ptr = (__be64 *)(bh->b_data + offset);
-                                       ld->ld_header.mh_magic = cpu_to_be32(GFS2_MAGIC);
-                                       ld->ld_header.mh_type = cpu_to_be16(GFS2_METATYPE_LD);
-                                       ld->ld_header.mh_format = cpu_to_be16(GFS2_FORMAT_LD);
-                                       ld->ld_type = cpu_to_be32(GFS2_LOG_DESC_JDATA);
-                                       ld->ld_length = cpu_to_be32(num + 1);
-                                       ld->ld_data1 = cpu_to_be32(num);
-                                       ld->ld_data2 = cpu_to_be32(0);
-                                       memset(ld->ld_reserved, 0, sizeof(ld->ld_reserved));
-                               }
-                               magic = gfs2_check_magic(bd1->bd_bh);
-                               *ptr++ = cpu_to_be64(bd1->bd_bh->b_blocknr);
-                               *ptr++ = cpu_to_be64((__u64)magic);
-                               clear_buffer_escaped(bd1->bd_bh);
-                               if (unlikely(magic != 0))
-                                       set_buffer_escaped(bd1->bd_bh);
-                               if (n++ > num)
-                                       break;
-                       }
-               }
-               if (bh) {
-                       set_buffer_dirty(bh);
-                       ll_rw_block(WRITE, 1, &bh);
-                       bh = NULL;
-               }
-               n = 0;
-               /* printk(KERN_INFO "totals2: jdata=%u dbuf=%u\n", total_jdata, total_dbuf); */
-               list_for_each_entry_continue(bd2, &sdp->sd_log_le_databuf, bd_le.le_list) {
-                       if (!bd2->bd_bh)
-                               continue;
-                       /* copy buffer if it needs escaping */
-                       if (unlikely(buffer_escaped(bd2->bd_bh))) {
-                               void *kaddr;
-                               struct page *page = bd2->bd_bh->b_page;
-                               bh = gfs2_log_get_buf(sdp);
-                               kaddr = kmap_atomic(page, KM_USER0);
-                               memcpy(bh->b_data, kaddr + bh_offset(bd2->bd_bh), sdp->sd_sb.sb_bsize);
-                               kunmap_atomic(page, KM_USER0);
-                               *(__be32 *)bh->b_data = 0;
-                       } else {
-                               bh = gfs2_log_fake_buf(sdp, bd2->bd_bh);
-                       }
-                       set_buffer_dirty(bh);
-                       ll_rw_block(WRITE, 1, &bh);
-                       if (++n >= num)
-                               break;
+                       continue;
                }
-               bh = NULL;
-               total_dbuf -= num;
-               total_jdata -= num;
+               bd = list_entry(sdp->sd_log_le_databuf.next, struct gfs2_bufdata, bd_le.le_list);
+               list_move_tail(&bd->bd_le.le_list, &in_progress);
+               gfs2_check_magic(bd->bd_bh);
+               *ptr++ = cpu_to_be64(bd->bd_bh->b_blocknr);
+               *ptr++ = cpu_to_be64(buffer_escaped(bh) ? 1 : 0);
+               n++;
        }
-       /* printk(KERN_INFO "wait on ordered data buffers\n"); */
-       /* Wait on all ordered buffers */
-       while (!list_empty(&started)) {
-               bd1 = list_entry(started.next, struct gfs2_bufdata, bd_le.le_list);
-               list_del(&bd1->bd_le.le_list);
-               sdp->sd_log_num_databuf--;
-
-               gfs2_log_lock(sdp);
-               bh = bd1->bd_bh;
-               if (bh) {
-                       set_v2bd(bh, NULL);
-                       gfs2_log_unlock(sdp);
-                       wait_on_buffer(bh);
-                       brelse(bh);
-               } else
-                       gfs2_log_unlock(sdp);
-
-               kfree(bd1);
-       }
-
-       /* printk(KERN_INFO "sd_log_num_databuf %u sd_log_num_jdata %u\n", sdp->sd_log_num_databuf, sdp->sd_log_num_jdata); */
-       /* We've removed all the ordered write bufs here, so only jdata left */
-       gfs2_assert_warn(sdp, sdp->sd_log_num_databuf == sdp->sd_log_num_jdata);
+       gfs2_log_unlock(sdp);
+       gfs2_write_blocks(sdp, bh, &in_progress, &processed, n);
+       gfs2_log_lock(sdp);
+       list_splice(&processed, &sdp->sd_log_le_databuf);
+       gfs2_log_unlock(sdp);
 }
 
 static int databuf_lo_scan_elements(struct gfs2_jdesc *jd, unsigned int start,
                                    struct gfs2_log_descriptor *ld,
                                    __be64 *ptr, int pass)
 {
-       struct gfs2_sbd *sdp = jd->jd_inode->i_sbd;
-       struct gfs2_glock *gl = jd->jd_inode->i_gl;
+       struct gfs2_inode *ip = GFS2_I(jd->jd_inode);
+       struct gfs2_sbd *sdp = GFS2_SB(jd->jd_inode);
+       struct gfs2_glock *gl = ip->i_gl;
        unsigned int blks = be32_to_cpu(ld->ld_data1);
        struct buffer_head *bh_log, *bh_ip;
-       uint64_t blkno;
-       uint64_t esc;
+       u64 blkno;
+       u64 esc;
        int error = 0;
 
        if (pass != 1 || be32_to_cpu(ld->ld_type) != GFS2_LOG_DESC_JDATA)
@@ -680,17 +694,18 @@ static int databuf_lo_scan_elements(struct gfs2_jdesc *jd, unsigned int start,
 
 static void databuf_lo_after_scan(struct gfs2_jdesc *jd, int error, int pass)
 {
-       struct gfs2_sbd *sdp = jd->jd_inode->i_sbd;
+       struct gfs2_inode *ip = GFS2_I(jd->jd_inode);
+       struct gfs2_sbd *sdp = GFS2_SB(jd->jd_inode);
 
        if (error) {
-               gfs2_meta_sync(jd->jd_inode->i_gl, DIO_START | DIO_WAIT);
+               gfs2_meta_sync(ip->i_gl);
                return;
        }
        if (pass != 1)
                return;
 
        /* data sync? */
-       gfs2_meta_sync(jd->jd_inode->i_gl, DIO_START | DIO_WAIT);
+       gfs2_meta_sync(ip->i_gl);
 
        fs_info(sdp, "jid=%u: Replayed %u of %u data blocks\n",
                jd->jd_jid, sdp->sd_replayed_blocks, sdp->sd_found_blocks);
@@ -705,64 +720,51 @@ static void databuf_lo_after_commit(struct gfs2_sbd *sdp, struct gfs2_ail *ai)
                bd = list_entry(head->next, struct gfs2_bufdata, bd_le.le_list);
                list_del_init(&bd->bd_le.le_list);
                sdp->sd_log_num_databuf--;
-               sdp->sd_log_num_jdata--;
                gfs2_unpin(sdp, bd->bd_bh, ai);
-               brelse(bd->bd_bh);
-               kfree(bd);
        }
        gfs2_assert_warn(sdp, !sdp->sd_log_num_databuf);
-       gfs2_assert_warn(sdp, !sdp->sd_log_num_jdata);
 }
 
 
-struct gfs2_log_operations gfs2_glock_lops = {
-       .lo_add = glock_lo_add,
-       .lo_after_commit = glock_lo_after_commit,
-       .lo_name = "glock"
-};
-
-struct gfs2_log_operations gfs2_buf_lops = {
+const struct gfs2_log_operations gfs2_buf_lops = {
        .lo_add = buf_lo_add,
-       .lo_incore_commit = buf_lo_incore_commit,
        .lo_before_commit = buf_lo_before_commit,
        .lo_after_commit = buf_lo_after_commit,
        .lo_before_scan = buf_lo_before_scan,
        .lo_scan_elements = buf_lo_scan_elements,
        .lo_after_scan = buf_lo_after_scan,
-       .lo_name = "buf"
+       .lo_name = "buf",
 };
 
-struct gfs2_log_operations gfs2_revoke_lops = {
+const struct gfs2_log_operations gfs2_revoke_lops = {
        .lo_add = revoke_lo_add,
        .lo_before_commit = revoke_lo_before_commit,
        .lo_before_scan = revoke_lo_before_scan,
        .lo_scan_elements = revoke_lo_scan_elements,
        .lo_after_scan = revoke_lo_after_scan,
-       .lo_name = "revoke"
+       .lo_name = "revoke",
 };
 
-struct gfs2_log_operations gfs2_rg_lops = {
+const struct gfs2_log_operations gfs2_rg_lops = {
        .lo_add = rg_lo_add,
        .lo_after_commit = rg_lo_after_commit,
-       .lo_name = "rg"
+       .lo_name = "rg",
 };
 
-struct gfs2_log_operations gfs2_databuf_lops = {
+const struct gfs2_log_operations gfs2_databuf_lops = {
        .lo_add = databuf_lo_add,
-       .lo_incore_commit = buf_lo_incore_commit,
        .lo_before_commit = databuf_lo_before_commit,
        .lo_after_commit = databuf_lo_after_commit,
        .lo_scan_elements = databuf_lo_scan_elements,
        .lo_after_scan = databuf_lo_after_scan,
-       .lo_name = "databuf"
+       .lo_name = "databuf",
 };
 
-struct gfs2_log_operations *gfs2_log_ops[] = {
-       &gfs2_glock_lops,
+const struct gfs2_log_operations *gfs2_log_ops[] = {
+       &gfs2_databuf_lops,
        &gfs2_buf_lops,
-       &gfs2_revoke_lops,
        &gfs2_rg_lops,
-       &gfs2_databuf_lops,
-       NULL
+       &gfs2_revoke_lops,
+       NULL,
 };