dlm: verify that places expecting rcom_lock have packet long enough
[safe/jmp/linux-2.6] / fs / dlm / lock.c
index 43ca2a3..6c605fc 100644 (file)
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -489,12 +489,6 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
        return error;
 }
 
-int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
-                unsigned int flags, struct dlm_rsb **r_ret)
-{
-       return find_rsb(ls, name, namelen, flags, r_ret);
-}
-
 /* This is only called to add a reference when the code already holds
    a valid reference to the rsb, so there's no need for locking. */
 
@@ -1851,7 +1845,7 @@ static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
        struct dlm_ls *ls = r->res_ls;
-       int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
+       int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
 
        if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
                rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
@@ -1885,7 +1879,7 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
                return 1;
        }
 
-       for (;;) {
+       for (i = 0; i < 2; i++) {
                /* It's possible for dlm_scand to remove an old rsb for
                   this same resource from the toss list, us to create
                   a new one, look up the master locally, and find it
@@ -1899,6 +1893,8 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
                log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
                schedule();
        }
+       if (error && error != -EEXIST)
+               return error;
 
        if (ret_nodeid == our_nodeid) {
                r->res_first_lkid = 0;
@@ -1940,8 +1936,11 @@ static void confirm_master(struct dlm_rsb *r, int error)
                break;
 
        case -EAGAIN:
-               /* the remote master didn't queue our NOQUEUE request;
-                  make a waiting lkb the first_lkid */
+       case -EBADR:
+       case -ENOTBLK:
+               /* the remote request failed and won't be retried (it was
+                  a NOQUEUE, or has been canceled/unlocked); make a waiting
+                  lkb the first_lkid */
 
                r->res_first_lkid = 0;
 
@@ -2107,17 +2106,18 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
        /* an lkb may be waiting for an rsb lookup to complete where the
           lookup was initiated by another lock */
 
-       if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
-               if (!list_empty(&lkb->lkb_rsb_lookup)) {
+       if (!list_empty(&lkb->lkb_rsb_lookup)) {
+               if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
                        log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
                        list_del_init(&lkb->lkb_rsb_lookup);
                        queue_cast(lkb->lkb_resource, lkb,
                                   args->flags & DLM_LKF_CANCEL ?
                                   -DLM_ECANCEL : -DLM_EUNLOCK);
                        unhold_lkb(lkb); /* undoes create_lkb() */
-                       rv = -EBUSY;
-                       goto out;
                }
+               /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
+               rv = -EBUSY;
+               goto out;
        }
 
        /* cancel not allowed with another cancel/unlock in progress */
@@ -3005,8 +3005,6 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
        lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
        lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
 
-       DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
-
        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
                /* lkb was just created so there won't be an lvb yet */
                lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
@@ -3020,16 +3018,6 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
                                struct dlm_message *ms)
 {
-       if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
-               log_error(ls, "convert_args nodeid %d %d lkid %x %x",
-                         lkb->lkb_nodeid, ms->m_header.h_nodeid,
-                         lkb->lkb_id, lkb->lkb_remid);
-               return -EINVAL;
-       }
-
-       if (!is_master_copy(lkb))
-               return -EINVAL;
-
        if (lkb->lkb_status != DLM_LKSTS_GRANTED)
                return -EBUSY;
 
@@ -3045,8 +3033,6 @@ static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
                               struct dlm_message *ms)
 {
-       if (!is_master_copy(lkb))
-               return -EINVAL;
        if (receive_lvb(ls, lkb, ms))
                return -ENOMEM;
        return 0;
@@ -3062,6 +3048,50 @@ static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
        lkb->lkb_remid = ms->m_lkid;
 }
 
+/* This is called after the rsb is locked so that we can safely inspect
+   fields in the lkb. */
+
+static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
+{
+       int from = ms->m_header.h_nodeid;
+       int error = 0;
+
+       switch (ms->m_type) {
+       case DLM_MSG_CONVERT:
+       case DLM_MSG_UNLOCK:
+       case DLM_MSG_CANCEL:
+               if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
+                       error = -EINVAL;
+               break;
+
+       case DLM_MSG_CONVERT_REPLY:
+       case DLM_MSG_UNLOCK_REPLY:
+       case DLM_MSG_CANCEL_REPLY:
+       case DLM_MSG_GRANT:
+       case DLM_MSG_BAST:
+               if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
+                       error = -EINVAL;
+               break;
+
+       case DLM_MSG_REQUEST_REPLY:
+               if (!is_process_copy(lkb))
+                       error = -EINVAL;
+               else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
+                       error = -EINVAL;
+               break;
+
+       default:
+               error = -EINVAL;
+       }
+
+       if (error)
+               log_error(lkb->lkb_resource->res_ls,
+                         "ignore invalid message %d from %d %x %x %x %d",
+                         ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
+                         lkb->lkb_flags, lkb->lkb_nodeid);
+       return error;
+}
+
 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
 {
        struct dlm_lkb *lkb;
@@ -3123,17 +3153,21 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
        hold_rsb(r);
        lock_rsb(r);
 
+       error = validate_message(lkb, ms);
+       if (error)
+               goto out;
+
        receive_flags(lkb, ms);
        error = receive_convert_args(ls, lkb, ms);
        if (error)
-               goto out;
+               goto out_reply;
        reply = !down_conversion(lkb);
 
        error = do_convert(r, lkb);
- out:
+ out_reply:
        if (reply)
                send_convert_reply(r, lkb, error);
-
+ out:
        unlock_rsb(r);
        put_rsb(r);
        dlm_put_lkb(lkb);
@@ -3159,15 +3193,19 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
        hold_rsb(r);
        lock_rsb(r);
 
+       error = validate_message(lkb, ms);
+       if (error)
+               goto out;
+
        receive_flags(lkb, ms);
        error = receive_unlock_args(ls, lkb, ms);
        if (error)
-               goto out;
+               goto out_reply;
 
        error = do_unlock(r, lkb);
- out:
+ out_reply:
        send_unlock_reply(r, lkb, error);
-
+ out:
        unlock_rsb(r);
        put_rsb(r);
        dlm_put_lkb(lkb);
@@ -3195,9 +3233,13 @@ static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
        hold_rsb(r);
        lock_rsb(r);
 
+       error = validate_message(lkb, ms);
+       if (error)
+               goto out;
+
        error = do_cancel(r, lkb);
        send_cancel_reply(r, lkb, error);
-
+ out:
        unlock_rsb(r);
        put_rsb(r);
        dlm_put_lkb(lkb);
@@ -3216,22 +3258,26 @@ static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
 
        error = find_lkb(ls, ms->m_remid, &lkb);
        if (error) {
-               log_error(ls, "receive_grant no lkb");
+               log_debug(ls, "receive_grant from %d no lkb %x",
+                         ms->m_header.h_nodeid, ms->m_remid);
                return;
        }
-       DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
        r = lkb->lkb_resource;
 
        hold_rsb(r);
        lock_rsb(r);
 
+       error = validate_message(lkb, ms);
+       if (error)
+               goto out;
+
        receive_flags_reply(lkb, ms);
        if (is_altmode(lkb))
                munge_altmode(lkb, ms);
        grant_lock_pc(r, lkb, ms);
        queue_cast(r, lkb, 0);
-
+ out:
        unlock_rsb(r);
        put_rsb(r);
        dlm_put_lkb(lkb);
@@ -3245,18 +3291,22 @@ static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
 
        error = find_lkb(ls, ms->m_remid, &lkb);
        if (error) {
-               log_error(ls, "receive_bast no lkb");
+               log_debug(ls, "receive_bast from %d no lkb %x",
+                         ms->m_header.h_nodeid, ms->m_remid);
                return;
        }
-       DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
        r = lkb->lkb_resource;
 
        hold_rsb(r);
        lock_rsb(r);
 
-       queue_bast(r, lkb, ms->m_bastmode);
+       error = validate_message(lkb, ms);
+       if (error)
+               goto out;
 
+       queue_bast(r, lkb, ms->m_bastmode);
+ out:
        unlock_rsb(r);
        put_rsb(r);
        dlm_put_lkb(lkb);
@@ -3322,15 +3372,19 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
 
        error = find_lkb(ls, ms->m_remid, &lkb);
        if (error) {
-               log_error(ls, "receive_request_reply no lkb");
+               log_debug(ls, "receive_request_reply from %d no lkb %x",
+                         ms->m_header.h_nodeid, ms->m_remid);
                return;
        }
-       DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
        r = lkb->lkb_resource;
        hold_rsb(r);
        lock_rsb(r);
 
+       error = validate_message(lkb, ms);
+       if (error)
+               goto out;
+
        mstype = lkb->lkb_wait_type;
        error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
        if (error)
@@ -3382,6 +3436,7 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
                if (is_overlap(lkb)) {
                        /* we'll ignore error in cancel/unlock reply */
                        queue_cast_overlap(r, lkb);
+                       confirm_master(r, result);
                        unhold_lkb(lkb); /* undoes create_lkb() */
                } else
                        _request_lock(r, lkb);
@@ -3462,6 +3517,10 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
        hold_rsb(r);
        lock_rsb(r);
 
+       error = validate_message(lkb, ms);
+       if (error)
+               goto out;
+
        /* stub reply can happen with waiters_mutex held */
        error = remove_from_waiters_ms(lkb, ms);
        if (error)
@@ -3480,10 +3539,10 @@ static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
 
        error = find_lkb(ls, ms->m_remid, &lkb);
        if (error) {
-               log_error(ls, "receive_convert_reply no lkb");
+               log_debug(ls, "receive_convert_reply from %d no lkb %x",
+                         ms->m_header.h_nodeid, ms->m_remid);
                return;
        }
-       DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
        _receive_convert_reply(lkb, ms);
        dlm_put_lkb(lkb);
@@ -3497,6 +3556,10 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
        hold_rsb(r);
        lock_rsb(r);
 
+       error = validate_message(lkb, ms);
+       if (error)
+               goto out;
+
        /* stub reply can happen with waiters_mutex held */
        error = remove_from_waiters_ms(lkb, ms);
        if (error)
@@ -3528,10 +3591,10 @@ static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
 
        error = find_lkb(ls, ms->m_remid, &lkb);
        if (error) {
-               log_error(ls, "receive_unlock_reply no lkb");
+               log_debug(ls, "receive_unlock_reply from %d no lkb %x",
+                         ms->m_header.h_nodeid, ms->m_remid);
                return;
        }
-       DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
        _receive_unlock_reply(lkb, ms);
        dlm_put_lkb(lkb);
@@ -3545,6 +3608,10 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
        hold_rsb(r);
        lock_rsb(r);
 
+       error = validate_message(lkb, ms);
+       if (error)
+               goto out;
+
        /* stub reply can happen with waiters_mutex held */
        error = remove_from_waiters_ms(lkb, ms);
        if (error)
@@ -3576,10 +3643,10 @@ static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
 
        error = find_lkb(ls, ms->m_remid, &lkb);
        if (error) {
-               log_error(ls, "receive_cancel_reply no lkb");
+               log_debug(ls, "receive_cancel_reply from %d no lkb %x",
+                         ms->m_header.h_nodeid, ms->m_remid);
                return;
        }
-       DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
        _receive_cancel_reply(lkb, ms);
        dlm_put_lkb(lkb);
@@ -3639,6 +3706,13 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
 
 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
 {
+       if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
+               log_debug(ls, "ignore non-member message %d from %d %x %x %d",
+                         ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
+                         ms->m_remid, ms->m_result);
+               return;
+       }
+
        switch (ms->m_type) {
 
        /* messages sent to a master node */
@@ -3728,7 +3802,7 @@ static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
                                int nodeid)
 {
        if (dlm_locking_stopped(ls)) {
-               dlm_add_requestqueue(ls, nodeid, (struct dlm_header *) ms);
+               dlm_add_requestqueue(ls, nodeid, ms);
        } else {
                dlm_wait_requestqueue(ls);
                _receive_message(ls, ms);
@@ -3748,21 +3822,20 @@ void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
    standard locking activity) or an RCOM (recovery message sent as part of
    lockspace recovery). */
 
-void dlm_receive_buffer(struct dlm_header *hd, int nodeid)
+void dlm_receive_buffer(union dlm_packet *p, int nodeid)
 {
-       struct dlm_message *ms = (struct dlm_message *) hd;
-       struct dlm_rcom *rc = (struct dlm_rcom *) hd;
+       struct dlm_header *hd = &p->header;
        struct dlm_ls *ls;
        int type = 0;
 
        switch (hd->h_cmd) {
        case DLM_MSG:
-               dlm_message_in(ms);
-               type = ms->m_type;
+               dlm_message_in(&p->message);
+               type = p->message.m_type;
                break;
        case DLM_RCOM:
-               dlm_rcom_in(rc);
-               type = rc->rc_type;
+               dlm_rcom_in(&p->rcom);
+               type = p->rcom.rc_type;
                break;
        default:
                log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
@@ -3777,11 +3850,12 @@ void dlm_receive_buffer(struct dlm_header *hd, int nodeid)
 
        ls = dlm_find_lockspace_global(hd->h_lockspace);
        if (!ls) {
-               log_print("invalid h_lockspace %x from %d cmd %d type %d",
-                         hd->h_lockspace, nodeid, hd->h_cmd, type);
+               if (dlm_config.ci_log_debug)
+                       log_print("invalid lockspace %x from %d cmd %d type %d",
+                                 hd->h_lockspace, nodeid, hd->h_cmd, type);
 
                if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
-                       dlm_send_ls_not_ready(nodeid, rc);
+                       dlm_send_ls_not_ready(nodeid, &p->rcom);
                return;
        }
 
@@ -3790,9 +3864,9 @@ void dlm_receive_buffer(struct dlm_header *hd, int nodeid)
 
        down_read(&ls->ls_recv_active);
        if (hd->h_cmd == DLM_MSG)
-               dlm_receive_message(ls, ms, nodeid);
+               dlm_receive_message(ls, &p->message, nodeid);
        else
-               dlm_receive_rcom(ls, rc, nodeid);
+               dlm_receive_rcom(ls, &p->rcom, nodeid);
        up_read(&ls->ls_recv_active);
 
        dlm_put_lockspace(ls);
@@ -3805,6 +3879,7 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
                ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
                ls->ls_stub_ms.m_result = -EINPROGRESS;
                ls->ls_stub_ms.m_flags = lkb->lkb_flags;
+               ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
                _receive_convert_reply(lkb, &ls->ls_stub_ms);
 
                /* Same special case as in receive_rcom_lock_args() */
@@ -3846,6 +3921,7 @@ static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
 void dlm_recover_waiters_pre(struct dlm_ls *ls)
 {
        struct dlm_lkb *lkb, *safe;
+       int wait_type, stub_unlock_result, stub_cancel_result;
 
        mutex_lock(&ls->ls_waiters_mutex);
 
@@ -3864,7 +3940,33 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
                if (!waiter_needs_recovery(ls, lkb))
                        continue;
 
-               switch (lkb->lkb_wait_type) {
+               wait_type = lkb->lkb_wait_type;
+               stub_unlock_result = -DLM_EUNLOCK;
+               stub_cancel_result = -DLM_ECANCEL;
+
+               /* Main reply may have been received leaving a zero wait_type,
+                  but a reply for the overlapping op may not have been
+                  received.  In that case we need to fake the appropriate
+                  reply for the overlap op. */
+
+               if (!wait_type) {
+                       if (is_overlap_cancel(lkb)) {
+                               wait_type = DLM_MSG_CANCEL;
+                               if (lkb->lkb_grmode == DLM_LOCK_IV)
+                                       stub_cancel_result = 0;
+                       }
+                       if (is_overlap_unlock(lkb)) {
+                               wait_type = DLM_MSG_UNLOCK;
+                               if (lkb->lkb_grmode == DLM_LOCK_IV)
+                                       stub_unlock_result = -ENOENT;
+                       }
+
+                       log_debug(ls, "rwpre overlap %x %x %d %d %d",
+                                 lkb->lkb_id, lkb->lkb_flags, wait_type,
+                                 stub_cancel_result, stub_unlock_result);
+               }
+
+               switch (wait_type) {
 
                case DLM_MSG_REQUEST:
                        lkb->lkb_flags |= DLM_IFL_RESEND;
@@ -3877,8 +3979,9 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
                case DLM_MSG_UNLOCK:
                        hold_lkb(lkb);
                        ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
-                       ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
+                       ls->ls_stub_ms.m_result = stub_unlock_result;
                        ls->ls_stub_ms.m_flags = lkb->lkb_flags;
+                       ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
                        _receive_unlock_reply(lkb, &ls->ls_stub_ms);
                        dlm_put_lkb(lkb);
                        break;
@@ -3886,15 +3989,16 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
                case DLM_MSG_CANCEL:
                        hold_lkb(lkb);
                        ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
-                       ls->ls_stub_ms.m_result = -DLM_ECANCEL;
+                       ls->ls_stub_ms.m_result = stub_cancel_result;
                        ls->ls_stub_ms.m_flags = lkb->lkb_flags;
+                       ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
                        _receive_cancel_reply(lkb, &ls->ls_stub_ms);
                        dlm_put_lkb(lkb);
                        break;
 
                default:
-                       log_error(ls, "invalid lkb wait_type %d",
-                                 lkb->lkb_wait_type);
+                       log_error(ls, "invalid lkb wait_type %d %d",
+                                 lkb->lkb_wait_type, wait_type);
                }
                schedule();
        }
@@ -4162,6 +4266,7 @@ static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
        return NULL;
 }
 
+/* needs at least dlm_rcom + rcom_lock */
 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
                                  struct dlm_rsb *r, struct dlm_rcom *rc)
 {
@@ -4169,12 +4274,12 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
        int lvblen;
 
        lkb->lkb_nodeid = rc->rc_header.h_nodeid;
-       lkb->lkb_ownpid = rl->rl_ownpid;
-       lkb->lkb_remid = rl->rl_lkid;
-       lkb->lkb_exflags = rl->rl_exflags;
-       lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
+       lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
+       lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
+       lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
+       lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
        lkb->lkb_flags |= DLM_IFL_MSTCPY;
-       lkb->lkb_lvbseq = rl->rl_lvbseq;
+       lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
        lkb->lkb_rqmode = rl->rl_rqmode;
        lkb->lkb_grmode = rl->rl_grmode;
        /* don't set lkb_status because add_lkb wants to itself */
@@ -4195,7 +4300,8 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
           The real granted mode of these converting locks cannot be determined
           until all locks have been rebuilt on the rsb (recover_conversion) */
 
-       if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
+       if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
+           middle_conversion(lkb)) {
                rl->rl_status = DLM_LKSTS_CONVERT;
                lkb->lkb_grmode = DLM_LOCK_IV;
                rsb_set_flag(r, RSB_RECOVER_CONVERT);
@@ -4210,6 +4316,7 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
    the given values and send back our lkid.  We send back our lkid by sending
    back the rcom_lock struct we got but with the remid field filled in. */
 
+/* needs at least dlm_rcom + rcom_lock */
 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
 {
        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
@@ -4222,13 +4329,14 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
                goto out;
        }
 
-       error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
+       error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
+                        R_MASTER, &r);
        if (error)
                goto out;
 
        lock_rsb(r);
 
-       lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
+       lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
        if (lkb) {
                error = -EEXIST;
                goto out_remid;
@@ -4251,18 +4359,20 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
  out_remid:
        /* this is the new value returned to the lock holder for
           saving in its process-copy lkb */
-       rl->rl_remid = lkb->lkb_id;
+       rl->rl_remid = cpu_to_le32(lkb->lkb_id);
 
  out_unlock:
        unlock_rsb(r);
        put_rsb(r);
  out:
        if (error)
-               log_debug(ls, "recover_master_copy %d %x", error, rl->rl_lkid);
-       rl->rl_result = error;
+               log_debug(ls, "recover_master_copy %d %x", error,
+                         le32_to_cpu(rl->rl_lkid));
+       rl->rl_result = cpu_to_le32(error);
        return error;
 }
 
+/* needs at least dlm_rcom + rcom_lock */
 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
 {
        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
@@ -4270,15 +4380,16 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
        struct dlm_lkb *lkb;
        int error;
 
-       error = find_lkb(ls, rl->rl_lkid, &lkb);
+       error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
        if (error) {
-               log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
+               log_error(ls, "recover_process_copy no lkid %x",
+                               le32_to_cpu(rl->rl_lkid));
                return error;
        }
 
        DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
-       error = rl->rl_result;
+       error = le32_to_cpu(rl->rl_result);
 
        r = lkb->lkb_resource;
        hold_rsb(r);
@@ -4297,7 +4408,7 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
                log_debug(ls, "master copy exists %x", lkb->lkb_id);
                /* fall through */
        case 0:
-               lkb->lkb_remid = rl->rl_remid;
+               lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
                break;
        default:
                log_error(ls, "dlm_recover_process_copy unknown error %d %x",