Revert "nfsd4: fix error return when pseudoroot missing"
[safe/jmp/linux-2.6] / fs / dlm / lock.c
index 2a28048..eb507c4 100644 (file)
@@ -165,7 +165,7 @@ void dlm_print_lkb(struct dlm_lkb *lkb)
               lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
 }
 
-void dlm_print_rsb(struct dlm_rsb *r)
+static void dlm_print_rsb(struct dlm_rsb *r)
 {
        printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
               r->res_nodeid, r->res_flags, r->res_first_lkid,
@@ -307,7 +307,7 @@ static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
        lkb->lkb_lksb->sb_status = rv;
        lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
 
-       dlm_add_ast(lkb, AST_COMP);
+       dlm_add_ast(lkb, AST_COMP, 0);
 }
 
 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
@@ -318,12 +318,12 @@ static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
 
 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
 {
+       lkb->lkb_time_bast = ktime_get();
+
        if (is_master_copy(lkb))
                send_bast(r, lkb, rqmode);
-       else {
-               lkb->lkb_bastmode = rqmode;
-               dlm_add_ast(lkb, AST_BAST);
-       }
+       else
+               dlm_add_ast(lkb, AST_BAST, rqmode);
 }
 
 /*
@@ -363,6 +363,7 @@ static int search_rsb_list(struct list_head *head, char *name, int len,
                if (len == r->res_length && !memcmp(name, r->res_name, len))
                        goto found;
        }
+       *r_ret = NULL;
        return -EBADR;
 
  found:
@@ -411,9 +412,9 @@ static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
                      unsigned int flags, struct dlm_rsb **r_ret)
 {
        int error;
-       write_lock(&ls->ls_rsbtbl[b].lock);
+       spin_lock(&ls->ls_rsbtbl[b].lock);
        error = _search_rsb(ls, name, len, b, flags, r_ret);
-       write_unlock(&ls->ls_rsbtbl[b].lock);
+       spin_unlock(&ls->ls_rsbtbl[b].lock);
        return error;
 }
 
@@ -434,13 +435,17 @@ static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
                    unsigned int flags, struct dlm_rsb **r_ret)
 {
-       struct dlm_rsb *r, *tmp;
+       struct dlm_rsb *r = NULL, *tmp;
        uint32_t hash, bucket;
-       int error = 0;
+       int error = -EINVAL;
+
+       if (namelen > DLM_RESNAME_MAXLEN)
+               goto out;
 
        if (dlm_no_directory(ls))
                flags |= R_CREATE;
 
+       error = 0;
        hash = jhash(name, namelen, 0);
        bucket = hash & (ls->ls_rsbtbl_size - 1);
 
@@ -473,16 +478,16 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
                r->res_nodeid = nodeid;
        }
 
-       write_lock(&ls->ls_rsbtbl[bucket].lock);
+       spin_lock(&ls->ls_rsbtbl[bucket].lock);
        error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
        if (!error) {
-               write_unlock(&ls->ls_rsbtbl[bucket].lock);
+               spin_unlock(&ls->ls_rsbtbl[bucket].lock);
                dlm_free_rsb(r);
                r = tmp;
                goto out;
        }
        list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
-       write_unlock(&ls->ls_rsbtbl[bucket].lock);
+       spin_unlock(&ls->ls_rsbtbl[bucket].lock);
        error = 0;
  out:
        *r_ret = r;
@@ -525,9 +530,9 @@ static void put_rsb(struct dlm_rsb *r)
        struct dlm_ls *ls = r->res_ls;
        uint32_t bucket = r->res_bucket;
 
-       write_lock(&ls->ls_rsbtbl[bucket].lock);
+       spin_lock(&ls->ls_rsbtbl[bucket].lock);
        kref_put(&r->res_ref, toss_rsb);
-       write_unlock(&ls->ls_rsbtbl[bucket].lock);
+       spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 }
 
 void dlm_put_rsb(struct dlm_rsb *r)
@@ -739,6 +744,8 @@ static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
 
        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
 
+       lkb->lkb_timestamp = ktime_get();
+
        lkb->lkb_status = status;
 
        switch (status) {
@@ -828,7 +835,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
                lkb->lkb_wait_count++;
                hold_lkb(lkb);
 
-               log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
+               log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
                          lkb->lkb_id, lkb->lkb_wait_type, mstype,
                          lkb->lkb_wait_count, lkb->lkb_flags);
                goto out;
@@ -844,7 +851,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
        list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
  out:
        if (error)
-               log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
+               log_error(ls, "addwait error %x %d flags %x %d %d %s",
                          lkb->lkb_id, error, lkb->lkb_flags, mstype,
                          lkb->lkb_wait_type, lkb->lkb_resource->res_name);
        mutex_unlock(&ls->ls_waiters_mutex);
@@ -856,23 +863,55 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
    request reply on the requestqueue) between dlm_recover_waiters_pre() which
    set RESEND and dlm_recover_waiters_post() */
 
-static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
+static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
+                               struct dlm_message *ms)
 {
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int overlap_done = 0;
 
        if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
+               log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
                overlap_done = 1;
                goto out_del;
        }
 
        if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
+               log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
                overlap_done = 1;
                goto out_del;
        }
 
+       /* Cancel state was preemptively cleared by a successful convert,
+          see next comment, nothing to do. */
+
+       if ((mstype == DLM_MSG_CANCEL_REPLY) &&
+           (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
+               log_debug(ls, "remwait %x cancel_reply wait_type %d",
+                         lkb->lkb_id, lkb->lkb_wait_type);
+               return -1;
+       }
+
+       /* Remove for the convert reply, and premptively remove for the
+          cancel reply.  A convert has been granted while there's still
+          an outstanding cancel on it (the cancel is moot and the result
+          in the cancel reply should be 0).  We preempt the cancel reply
+          because the app gets the convert result and then can follow up
+          with another op, like convert.  This subsequent op would see the
+          lingering state of the cancel and fail with -EBUSY. */
+
+       if ((mstype == DLM_MSG_CONVERT_REPLY) &&
+           (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
+           is_overlap_cancel(lkb) && ms && !ms->m_result) {
+               log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
+                         lkb->lkb_id);
+               lkb->lkb_wait_type = 0;
+               lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
+               lkb->lkb_wait_count--;
+               goto out_del;
+       }
+
        /* N.B. type of reply may not always correspond to type of original
           msg due to lookup->request optimization, verify others? */
 
@@ -881,8 +920,8 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
                goto out_del;
        }
 
-       log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
-                 lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
+       log_error(ls, "remwait error %x reply %d flags %x no wait_type",
+                 lkb->lkb_id, mstype, lkb->lkb_flags);
        return -1;
 
  out_del:
@@ -892,7 +931,7 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
           this would happen */
 
        if (overlap_done && lkb->lkb_wait_type) {
-               log_error(ls, "remove_from_waiters %x reply %d give up on %d",
+               log_error(ls, "remwait error %x reply %d wait_type %d overlap",
                          lkb->lkb_id, mstype, lkb->lkb_wait_type);
                lkb->lkb_wait_count--;
                lkb->lkb_wait_type = 0;
@@ -914,7 +953,7 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
        int error;
 
        mutex_lock(&ls->ls_waiters_mutex);
-       error = _remove_from_waiters(lkb, mstype);
+       error = _remove_from_waiters(lkb, mstype, NULL);
        mutex_unlock(&ls->ls_waiters_mutex);
        return error;
 }
@@ -929,7 +968,7 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
 
        if (ms != &ls->ls_stub_ms)
                mutex_lock(&ls->ls_waiters_mutex);
-       error = _remove_from_waiters(lkb, ms->m_type);
+       error = _remove_from_waiters(lkb, ms->m_type, ms);
        if (ms != &ls->ls_stub_ms)
                mutex_unlock(&ls->ls_waiters_mutex);
        return error;
@@ -960,7 +999,7 @@ static int shrink_bucket(struct dlm_ls *ls, int b)
 
        for (;;) {
                found = 0;
-               write_lock(&ls->ls_rsbtbl[b].lock);
+               spin_lock(&ls->ls_rsbtbl[b].lock);
                list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
                                            res_hashchain) {
                        if (!time_after_eq(jiffies, r->res_toss_time +
@@ -971,20 +1010,20 @@ static int shrink_bucket(struct dlm_ls *ls, int b)
                }
 
                if (!found) {
-                       write_unlock(&ls->ls_rsbtbl[b].lock);
+                       spin_unlock(&ls->ls_rsbtbl[b].lock);
                        break;
                }
 
                if (kref_put(&r->res_ref, kill_rsb)) {
                        list_del(&r->res_hashchain);
-                       write_unlock(&ls->ls_rsbtbl[b].lock);
+                       spin_unlock(&ls->ls_rsbtbl[b].lock);
 
                        if (is_master(r))
                                dir_remove(r);
                        dlm_free_rsb(r);
                        count++;
                } else {
-                       write_unlock(&ls->ls_rsbtbl[b].lock);
+                       spin_unlock(&ls->ls_rsbtbl[b].lock);
                        log_error(ls, "tossed rsb in use %s", r->res_name);
                }
        }
@@ -1008,10 +1047,8 @@ static void add_timeout(struct dlm_lkb *lkb)
 {
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 
-       if (is_master_copy(lkb)) {
-               lkb->lkb_timestamp = jiffies;
+       if (is_master_copy(lkb))
                return;
-       }
 
        if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
            !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
@@ -1026,7 +1063,6 @@ static void add_timeout(struct dlm_lkb *lkb)
        DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
        mutex_lock(&ls->ls_timeout_mutex);
        hold_lkb(lkb);
-       lkb->lkb_timestamp = jiffies;
        list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
        mutex_unlock(&ls->ls_timeout_mutex);
 }
@@ -1054,6 +1090,7 @@ void dlm_scan_timeout(struct dlm_ls *ls)
        struct dlm_rsb *r;
        struct dlm_lkb *lkb;
        int do_cancel, do_warn;
+       s64 wait_us;
 
        for (;;) {
                if (dlm_locking_stopped(ls))
@@ -1064,14 +1101,15 @@ void dlm_scan_timeout(struct dlm_ls *ls)
                mutex_lock(&ls->ls_timeout_mutex);
                list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
 
+                       wait_us = ktime_to_us(ktime_sub(ktime_get(),
+                                                       lkb->lkb_timestamp));
+
                        if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
-                           time_after_eq(jiffies, lkb->lkb_timestamp +
-                                         lkb->lkb_timeout_cs * HZ/100))
+                           wait_us >= (lkb->lkb_timeout_cs * 10000))
                                do_cancel = 1;
 
                        if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
-                           time_after_eq(jiffies, lkb->lkb_timestamp +
-                                          dlm_config.ci_timewarn_cs * HZ/100))
+                           wait_us >= dlm_config.ci_timewarn_cs * 10000)
                                do_warn = 1;
 
                        if (!do_cancel && !do_warn)
@@ -1117,12 +1155,12 @@ void dlm_scan_timeout(struct dlm_ls *ls)
 void dlm_adjust_timeouts(struct dlm_ls *ls)
 {
        struct dlm_lkb *lkb;
-       long adj = jiffies - ls->ls_recover_begin;
+       u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
 
        ls->ls_recover_begin = 0;
        mutex_lock(&ls->ls_timeout_mutex);
        list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
-               lkb->lkb_timestamp += adj;
+               lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
        mutex_unlock(&ls->ls_timeout_mutex);
 }
 
@@ -1222,6 +1260,8 @@ static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
        b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
        if (b == 1) {
                int len = receive_extralen(ms);
+               if (len > DLM_RESNAME_MAXLEN)
+                       len = DLM_RESNAME_MAXLEN;
                memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
                lkb->lkb_lvbseq = ms->m_lvbseq;
        }
@@ -1775,8 +1815,9 @@ static void grant_pending_locks(struct dlm_rsb *r)
         */
 
        list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
-               if (lkb->lkb_bastaddr && lock_requires_bast(lkb, high, cw)) {
-                       if (cw && high == DLM_LOCK_PR)
+               if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
+                       if (cw && high == DLM_LOCK_PR &&
+                           lkb->lkb_grmode == DLM_LOCK_PR)
                                queue_bast(r, lkb, DLM_LOCK_CW);
                        else
                                queue_bast(r, lkb, high);
@@ -1805,7 +1846,7 @@ static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
        struct dlm_lkb *gr;
 
        list_for_each_entry(gr, head, lkb_statequeue) {
-               if (gr->lkb_bastaddr && modes_require_bast(gr, lkb)) {
+               if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
                        queue_bast(r, gr, lkb->lkb_rqmode);
                        gr->lkb_highbast = lkb->lkb_rqmode;
                }
@@ -1950,8 +1991,7 @@ static void confirm_master(struct dlm_rsb *r, int error)
                        list_del_init(&lkb->lkb_rsb_lookup);
                        r->res_first_lkid = lkb->lkb_id;
                        _request_lock(r, lkb);
-               } else
-                       r->res_nodeid = -1;
+               }
                break;
 
        default:
@@ -1960,8 +2000,11 @@ static void confirm_master(struct dlm_rsb *r, int error)
 }
 
 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
-                        int namelen, unsigned long timeout_cs, void *ast,
-                        void *astarg, void *bast, struct dlm_args *args)
+                        int namelen, unsigned long timeout_cs,
+                        void (*ast) (void *astparam),
+                        void *astparam,
+                        void (*bast) (void *astparam, int mode),
+                        struct dlm_args *args)
 {
        int rv = -EINVAL;
 
@@ -2011,9 +2054,9 @@ static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
           an active lkb cannot be modified before locking the rsb */
 
        args->flags = flags;
-       args->astaddr = ast;
-       args->astparam = (long) astarg;
-       args->bastaddr = bast;
+       args->astfn = ast;
+       args->astparam = astparam;
+       args->bastfn = bast;
        args->timeout = timeout_cs;
        args->mode = mode;
        args->lksb = lksb;
@@ -2032,7 +2075,7 @@ static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
                return -EINVAL;
 
        args->flags = flags;
-       args->astparam = (long) astarg;
+       args->astparam = astarg;
        return 0;
 }
 
@@ -2062,9 +2105,9 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 
        lkb->lkb_exflags = args->flags;
        lkb->lkb_sbflags = 0;
-       lkb->lkb_astaddr = args->astaddr;
+       lkb->lkb_astfn = args->astfn;
        lkb->lkb_astparam = args->astparam;
-       lkb->lkb_bastaddr = args->bastaddr;
+       lkb->lkb_bastfn = args->bastfn;
        lkb->lkb_rqmode = args->mode;
        lkb->lkb_lksb = args->lksb;
        lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
@@ -2072,6 +2115,11 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
        lkb->lkb_timeout_cs = args->timeout;
        rv = 0;
  out:
+       if (rv)
+               log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
+                         rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
+                         lkb->lkb_status, lkb->lkb_wait_type,
+                         lkb->lkb_resource->res_name);
        return rv;
 }
 
@@ -2138,6 +2186,13 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
                        goto out;
                }
 
+               /* there's nothing to cancel */
+               if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
+                   !lkb->lkb_wait_type) {
+                       rv = -EBUSY;
+                       goto out;
+               }
+
                switch (lkb->lkb_wait_type) {
                case DLM_MSG_LOOKUP:
                case DLM_MSG_REQUEST:
@@ -2711,9 +2766,9 @@ static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
        /* m_result and m_bastmode are set from function args,
           not from lkb fields */
 
-       if (lkb->lkb_bastaddr)
+       if (lkb->lkb_bastfn)
                ms->m_asts |= AST_BAST;
-       if (lkb->lkb_astaddr)
+       if (lkb->lkb_astfn)
                ms->m_asts |= AST_COMP;
 
        /* compare with switch in create_message; send_remove() doesn't
@@ -2989,11 +3044,23 @@ static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
                if (!lkb->lkb_lvbptr)
                        return -ENOMEM;
                len = receive_extralen(ms);
+               if (len > DLM_RESNAME_MAXLEN)
+                       len = DLM_RESNAME_MAXLEN;
                memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
        }
        return 0;
 }
 
+static void fake_bastfn(void *astparam, int mode)
+{
+       log_print("fake_bastfn should not be called");
+}
+
+static void fake_astfn(void *astparam)
+{
+       log_print("fake_astfn should not be called");
+}
+
 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
                                struct dlm_message *ms)
 {
@@ -3002,8 +3069,9 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
        lkb->lkb_remid = ms->m_lkid;
        lkb->lkb_grmode = DLM_LOCK_IV;
        lkb->lkb_rqmode = ms->m_rqmode;
-       lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
-       lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
+
+       lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
+       lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;
 
        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
                /* lkb was just created so there won't be an lvb yet */
@@ -4200,7 +4268,7 @@ static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
 {
        struct dlm_rsb *r, *r_ret = NULL;
 
-       read_lock(&ls->ls_rsbtbl[bucket].lock);
+       spin_lock(&ls->ls_rsbtbl[bucket].lock);
        list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
                if (!rsb_flag(r, RSB_LOCKS_PURGED))
                        continue;
@@ -4209,7 +4277,7 @@ static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
                r_ret = r;
                break;
        }
-       read_unlock(&ls->ls_rsbtbl[bucket].lock);
+       spin_unlock(&ls->ls_rsbtbl[bucket].lock);
        return r_ret;
 }
 
@@ -4266,32 +4334,34 @@ static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
        return NULL;
 }
 
+/* needs at least dlm_rcom + rcom_lock */
 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
                                  struct dlm_rsb *r, struct dlm_rcom *rc)
 {
        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
-       int lvblen;
 
        lkb->lkb_nodeid = rc->rc_header.h_nodeid;
-       lkb->lkb_ownpid = rl->rl_ownpid;
-       lkb->lkb_remid = rl->rl_lkid;
-       lkb->lkb_exflags = rl->rl_exflags;
-       lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
+       lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
+       lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
+       lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
+       lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
        lkb->lkb_flags |= DLM_IFL_MSTCPY;
-       lkb->lkb_lvbseq = rl->rl_lvbseq;
+       lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
        lkb->lkb_rqmode = rl->rl_rqmode;
        lkb->lkb_grmode = rl->rl_grmode;
        /* don't set lkb_status because add_lkb wants to itself */
 
-       lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
-       lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
+       lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL;
+       lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL;
 
        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
+               int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
+                        sizeof(struct rcom_lock);
+               if (lvblen > ls->ls_lvblen)
+                       return -EINVAL;
                lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
                if (!lkb->lkb_lvbptr)
                        return -ENOMEM;
-               lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
-                        sizeof(struct rcom_lock);
                memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
        }
 
@@ -4299,7 +4369,8 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
           The real granted mode of these converting locks cannot be determined
           until all locks have been rebuilt on the rsb (recover_conversion) */
 
-       if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
+       if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
+           middle_conversion(lkb)) {
                rl->rl_status = DLM_LKSTS_CONVERT;
                lkb->lkb_grmode = DLM_LOCK_IV;
                rsb_set_flag(r, RSB_RECOVER_CONVERT);
@@ -4314,6 +4385,7 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
    the given values and send back our lkid.  We send back our lkid by sending
    back the rcom_lock struct we got but with the remid field filled in. */
 
+/* needs at least dlm_rcom + rcom_lock */
 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
 {
        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
@@ -4326,13 +4398,14 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
                goto out;
        }
 
-       error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
+       error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
+                        R_MASTER, &r);
        if (error)
                goto out;
 
        lock_rsb(r);
 
-       lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
+       lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
        if (lkb) {
                error = -EEXIST;
                goto out_remid;
@@ -4355,18 +4428,20 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
  out_remid:
        /* this is the new value returned to the lock holder for
           saving in its process-copy lkb */
-       rl->rl_remid = lkb->lkb_id;
+       rl->rl_remid = cpu_to_le32(lkb->lkb_id);
 
  out_unlock:
        unlock_rsb(r);
        put_rsb(r);
  out:
        if (error)
-               log_debug(ls, "recover_master_copy %d %x", error, rl->rl_lkid);
-       rl->rl_result = error;
+               log_debug(ls, "recover_master_copy %d %x", error,
+                         le32_to_cpu(rl->rl_lkid));
+       rl->rl_result = cpu_to_le32(error);
        return error;
 }
 
+/* needs at least dlm_rcom + rcom_lock */
 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
 {
        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
@@ -4374,15 +4449,16 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
        struct dlm_lkb *lkb;
        int error;
 
-       error = find_lkb(ls, rl->rl_lkid, &lkb);
+       error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
        if (error) {
-               log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
+               log_error(ls, "recover_process_copy no lkid %x",
+                               le32_to_cpu(rl->rl_lkid));
                return error;
        }
 
        DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
-       error = rl->rl_result;
+       error = le32_to_cpu(rl->rl_result);
 
        r = lkb->lkb_resource;
        hold_rsb(r);
@@ -4401,7 +4477,7 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
                log_debug(ls, "master copy exists %x", lkb->lkb_id);
                /* fall through */
        case 0:
-               lkb->lkb_remid = rl->rl_remid;
+               lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
                break;
        default:
                log_error(ls, "dlm_recover_process_copy unknown error %d %x",
@@ -4450,7 +4526,7 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
           lock and that lkb_astparam is the dlm_user_args structure. */
 
        error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
-                             DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
+                             fake_astfn, ua, fake_bastfn, &args);
        lkb->lkb_flags |= DLM_IFL_USER;
        ua->old_mode = DLM_LOCK_IV;
 
@@ -4503,7 +4579,7 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
        /* user can change the params on its lock when it converts it, or
           add an lvb that didn't exist before */
 
-       ua = (struct dlm_user_args *)lkb->lkb_astparam;
+       ua = lkb->lkb_ua;
 
        if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
                ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
@@ -4524,7 +4600,7 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
        ua->old_mode = lkb->lkb_grmode;
 
        error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
-                             DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
+                             fake_astfn, ua, fake_bastfn, &args);
        if (error)
                goto out_put;
 
@@ -4554,7 +4630,7 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
        if (error)
                goto out;
 
-       ua = (struct dlm_user_args *)lkb->lkb_astparam;
+       ua = lkb->lkb_ua;
 
        if (lvb_in && ua->lksb.sb_lvbptr)
                memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
@@ -4603,7 +4679,7 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
        if (error)
                goto out;
 
-       ua = (struct dlm_user_args *)lkb->lkb_astparam;
+       ua = lkb->lkb_ua;
        if (ua_tmp->castparam)
                ua->castparam = ua_tmp->castparam;
        ua->user_lksb = ua_tmp->user_lksb;
@@ -4641,7 +4717,7 @@ int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
        if (error)
                goto out;
 
-       ua = (struct dlm_user_args *)lkb->lkb_astparam;
+       ua = lkb->lkb_ua;
 
        error = set_unlock_args(flags, ua, &args);
        if (error)
@@ -4680,7 +4756,6 @@ int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
 
 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
 {
-       struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
        struct dlm_args args;
        int error;
 
@@ -4689,7 +4764,7 @@ static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
        list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
        mutex_unlock(&ls->ls_orphans_mutex);
 
-       set_unlock_args(0, ua, &args);
+       set_unlock_args(0, lkb->lkb_ua, &args);
 
        error = cancel_lock(ls, lkb, &args);
        if (error == -DLM_ECANCEL)
@@ -4702,11 +4777,10 @@ static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
 
 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
 {
-       struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
        struct dlm_args args;
        int error;
 
-       set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
+       set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
 
        error = unlock_lock(ls, lkb, &args);
        if (error == -DLM_EUNLOCK)