hwmon: add TI ads7871 a/d converter driver
[safe/jmp/linux-2.6] / fs / dlm / lock.c
index 0593dd8..031dbe3 100644 (file)
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-**  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -56,6 +56,7 @@
    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
 */
 #include <linux/types.h>
+#include <linux/slab.h>
 #include "dlm_internal.h"
 #include <linux/dlm_device.h>
 #include "memory.h"
@@ -165,7 +166,7 @@ void dlm_print_lkb(struct dlm_lkb *lkb)
               lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
 }
 
-void dlm_print_rsb(struct dlm_rsb *r)
+static void dlm_print_rsb(struct dlm_rsb *r)
 {
        printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
               r->res_nodeid, r->res_flags, r->res_first_lkid,
@@ -307,7 +308,7 @@ static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
        lkb->lkb_lksb->sb_status = rv;
        lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
 
-       dlm_add_ast(lkb, AST_COMP);
+       dlm_add_ast(lkb, AST_COMP, lkb->lkb_grmode);
 }
 
 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
@@ -318,11 +319,13 @@ static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
 
 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
 {
-       if (is_master_copy(lkb))
+       lkb->lkb_time_bast = ktime_get();
+
+       if (is_master_copy(lkb)) {
+               lkb->lkb_bastmode = rqmode; /* printed by debugfs */
                send_bast(r, lkb, rqmode);
-       else {
-               lkb->lkb_bastmode = rqmode;
-               dlm_add_ast(lkb, AST_BAST);
+       } else {
+               dlm_add_ast(lkb, AST_BAST, rqmode);
        }
 }
 
@@ -363,6 +366,7 @@ static int search_rsb_list(struct list_head *head, char *name, int len,
                if (len == r->res_length && !memcmp(name, r->res_name, len))
                        goto found;
        }
+       *r_ret = NULL;
        return -EBADR;
 
  found:
@@ -411,9 +415,9 @@ static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
                      unsigned int flags, struct dlm_rsb **r_ret)
 {
        int error;
-       write_lock(&ls->ls_rsbtbl[b].lock);
+       spin_lock(&ls->ls_rsbtbl[b].lock);
        error = _search_rsb(ls, name, len, b, flags, r_ret);
-       write_unlock(&ls->ls_rsbtbl[b].lock);
+       spin_unlock(&ls->ls_rsbtbl[b].lock);
        return error;
 }
 
@@ -434,13 +438,17 @@ static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
                    unsigned int flags, struct dlm_rsb **r_ret)
 {
-       struct dlm_rsb *r, *tmp;
+       struct dlm_rsb *r = NULL, *tmp;
        uint32_t hash, bucket;
-       int error = 0;
+       int error = -EINVAL;
+
+       if (namelen > DLM_RESNAME_MAXLEN)
+               goto out;
 
        if (dlm_no_directory(ls))
                flags |= R_CREATE;
 
+       error = 0;
        hash = jhash(name, namelen, 0);
        bucket = hash & (ls->ls_rsbtbl_size - 1);
 
@@ -473,16 +481,16 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
                r->res_nodeid = nodeid;
        }
 
-       write_lock(&ls->ls_rsbtbl[bucket].lock);
+       spin_lock(&ls->ls_rsbtbl[bucket].lock);
        error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
        if (!error) {
-               write_unlock(&ls->ls_rsbtbl[bucket].lock);
+               spin_unlock(&ls->ls_rsbtbl[bucket].lock);
                dlm_free_rsb(r);
                r = tmp;
                goto out;
        }
        list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
-       write_unlock(&ls->ls_rsbtbl[bucket].lock);
+       spin_unlock(&ls->ls_rsbtbl[bucket].lock);
        error = 0;
  out:
        *r_ret = r;
@@ -525,9 +533,9 @@ static void put_rsb(struct dlm_rsb *r)
        struct dlm_ls *ls = r->res_ls;
        uint32_t bucket = r->res_bucket;
 
-       write_lock(&ls->ls_rsbtbl[bucket].lock);
+       spin_lock(&ls->ls_rsbtbl[bucket].lock);
        kref_put(&r->res_ref, toss_rsb);
-       write_unlock(&ls->ls_rsbtbl[bucket].lock);
+       spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 }
 
 void dlm_put_rsb(struct dlm_rsb *r)
@@ -725,10 +733,7 @@ static void lkb_add_ordered(struct list_head *new, struct list_head *head,
                if (lkb->lkb_rqmode < mode)
                        break;
 
-       if (!lkb)
-               list_add_tail(new, head);
-       else
-               __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
+       __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
 }
 
 /* add/remove lkb to rsb's grant/convert/wait queue */
@@ -739,6 +744,8 @@ static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
 
        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
 
+       lkb->lkb_timestamp = ktime_get();
+
        lkb->lkb_status = status;
 
        switch (status) {
@@ -828,7 +835,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
                lkb->lkb_wait_count++;
                hold_lkb(lkb);
 
-               log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
+               log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
                          lkb->lkb_id, lkb->lkb_wait_type, mstype,
                          lkb->lkb_wait_count, lkb->lkb_flags);
                goto out;
@@ -844,7 +851,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
        list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
  out:
        if (error)
-               log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
+               log_error(ls, "addwait error %x %d flags %x %d %d %s",
                          lkb->lkb_id, error, lkb->lkb_flags, mstype,
                          lkb->lkb_wait_type, lkb->lkb_resource->res_name);
        mutex_unlock(&ls->ls_waiters_mutex);
@@ -856,23 +863,55 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
    request reply on the requestqueue) between dlm_recover_waiters_pre() which
    set RESEND and dlm_recover_waiters_post() */
 
-static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
+static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
+                               struct dlm_message *ms)
 {
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int overlap_done = 0;
 
        if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
+               log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
                overlap_done = 1;
                goto out_del;
        }
 
        if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
+               log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
                overlap_done = 1;
                goto out_del;
        }
 
+       /* Cancel state was preemptively cleared by a successful convert,
+          see next comment, nothing to do. */
+
+       if ((mstype == DLM_MSG_CANCEL_REPLY) &&
+           (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
+               log_debug(ls, "remwait %x cancel_reply wait_type %d",
+                         lkb->lkb_id, lkb->lkb_wait_type);
+               return -1;
+       }
+
+       /* Remove for the convert reply, and premptively remove for the
+          cancel reply.  A convert has been granted while there's still
+          an outstanding cancel on it (the cancel is moot and the result
+          in the cancel reply should be 0).  We preempt the cancel reply
+          because the app gets the convert result and then can follow up
+          with another op, like convert.  This subsequent op would see the
+          lingering state of the cancel and fail with -EBUSY. */
+
+       if ((mstype == DLM_MSG_CONVERT_REPLY) &&
+           (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
+           is_overlap_cancel(lkb) && ms && !ms->m_result) {
+               log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
+                         lkb->lkb_id);
+               lkb->lkb_wait_type = 0;
+               lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
+               lkb->lkb_wait_count--;
+               goto out_del;
+       }
+
        /* N.B. type of reply may not always correspond to type of original
           msg due to lookup->request optimization, verify others? */
 
@@ -881,8 +920,8 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
                goto out_del;
        }
 
-       log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
-                 lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
+       log_error(ls, "remwait error %x reply %d flags %x no wait_type",
+                 lkb->lkb_id, mstype, lkb->lkb_flags);
        return -1;
 
  out_del:
@@ -892,7 +931,7 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
           this would happen */
 
        if (overlap_done && lkb->lkb_wait_type) {
-               log_error(ls, "remove_from_waiters %x reply %d give up on %d",
+               log_error(ls, "remwait error %x reply %d wait_type %d overlap",
                          lkb->lkb_id, mstype, lkb->lkb_wait_type);
                lkb->lkb_wait_count--;
                lkb->lkb_wait_type = 0;
@@ -914,7 +953,7 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
        int error;
 
        mutex_lock(&ls->ls_waiters_mutex);
-       error = _remove_from_waiters(lkb, mstype);
+       error = _remove_from_waiters(lkb, mstype, NULL);
        mutex_unlock(&ls->ls_waiters_mutex);
        return error;
 }
@@ -929,7 +968,7 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
 
        if (ms != &ls->ls_stub_ms)
                mutex_lock(&ls->ls_waiters_mutex);
-       error = _remove_from_waiters(lkb, ms->m_type);
+       error = _remove_from_waiters(lkb, ms->m_type, ms);
        if (ms != &ls->ls_stub_ms)
                mutex_unlock(&ls->ls_waiters_mutex);
        return error;
@@ -960,7 +999,7 @@ static int shrink_bucket(struct dlm_ls *ls, int b)
 
        for (;;) {
                found = 0;
-               write_lock(&ls->ls_rsbtbl[b].lock);
+               spin_lock(&ls->ls_rsbtbl[b].lock);
                list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
                                            res_hashchain) {
                        if (!time_after_eq(jiffies, r->res_toss_time +
@@ -971,20 +1010,20 @@ static int shrink_bucket(struct dlm_ls *ls, int b)
                }
 
                if (!found) {
-                       write_unlock(&ls->ls_rsbtbl[b].lock);
+                       spin_unlock(&ls->ls_rsbtbl[b].lock);
                        break;
                }
 
                if (kref_put(&r->res_ref, kill_rsb)) {
                        list_del(&r->res_hashchain);
-                       write_unlock(&ls->ls_rsbtbl[b].lock);
+                       spin_unlock(&ls->ls_rsbtbl[b].lock);
 
                        if (is_master(r))
                                dir_remove(r);
                        dlm_free_rsb(r);
                        count++;
                } else {
-                       write_unlock(&ls->ls_rsbtbl[b].lock);
+                       spin_unlock(&ls->ls_rsbtbl[b].lock);
                        log_error(ls, "tossed rsb in use %s", r->res_name);
                }
        }
@@ -1008,10 +1047,8 @@ static void add_timeout(struct dlm_lkb *lkb)
 {
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 
-       if (is_master_copy(lkb)) {
-               lkb->lkb_timestamp = jiffies;
+       if (is_master_copy(lkb))
                return;
-       }
 
        if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
            !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
@@ -1026,7 +1063,6 @@ static void add_timeout(struct dlm_lkb *lkb)
        DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
        mutex_lock(&ls->ls_timeout_mutex);
        hold_lkb(lkb);
-       lkb->lkb_timestamp = jiffies;
        list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
        mutex_unlock(&ls->ls_timeout_mutex);
 }
@@ -1054,6 +1090,7 @@ void dlm_scan_timeout(struct dlm_ls *ls)
        struct dlm_rsb *r;
        struct dlm_lkb *lkb;
        int do_cancel, do_warn;
+       s64 wait_us;
 
        for (;;) {
                if (dlm_locking_stopped(ls))
@@ -1064,14 +1101,15 @@ void dlm_scan_timeout(struct dlm_ls *ls)
                mutex_lock(&ls->ls_timeout_mutex);
                list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
 
+                       wait_us = ktime_to_us(ktime_sub(ktime_get(),
+                                                       lkb->lkb_timestamp));
+
                        if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
-                           time_after_eq(jiffies, lkb->lkb_timestamp +
-                                         lkb->lkb_timeout_cs * HZ/100))
+                           wait_us >= (lkb->lkb_timeout_cs * 10000))
                                do_cancel = 1;
 
                        if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
-                           time_after_eq(jiffies, lkb->lkb_timestamp +
-                                          dlm_config.ci_timewarn_cs * HZ/100))
+                           wait_us >= dlm_config.ci_timewarn_cs * 10000)
                                do_warn = 1;
 
                        if (!do_cancel && !do_warn)
@@ -1117,12 +1155,12 @@ void dlm_scan_timeout(struct dlm_ls *ls)
 void dlm_adjust_timeouts(struct dlm_ls *ls)
 {
        struct dlm_lkb *lkb;
-       long adj = jiffies - ls->ls_recover_begin;
+       u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
 
        ls->ls_recover_begin = 0;
        mutex_lock(&ls->ls_timeout_mutex);
        list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
-               lkb->lkb_timestamp += adj;
+               lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
        mutex_unlock(&ls->ls_timeout_mutex);
 }
 
@@ -1222,6 +1260,8 @@ static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
        b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
        if (b == 1) {
                int len = receive_extralen(ms);
+               if (len > DLM_RESNAME_MAXLEN)
+                       len = DLM_RESNAME_MAXLEN;
                memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
                lkb->lkb_lvbseq = ms->m_lvbseq;
        }
@@ -1775,8 +1815,9 @@ static void grant_pending_locks(struct dlm_rsb *r)
         */
 
        list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
-               if (lkb->lkb_bastaddr && lock_requires_bast(lkb, high, cw)) {
-                       if (cw && high == DLM_LOCK_PR)
+               if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
+                       if (cw && high == DLM_LOCK_PR &&
+                           lkb->lkb_grmode == DLM_LOCK_PR)
                                queue_bast(r, lkb, DLM_LOCK_CW);
                        else
                                queue_bast(r, lkb, high);
@@ -1805,7 +1846,7 @@ static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
        struct dlm_lkb *gr;
 
        list_for_each_entry(gr, head, lkb_statequeue) {
-               if (gr->lkb_bastaddr && modes_require_bast(gr, lkb)) {
+               if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
                        queue_bast(r, gr, lkb->lkb_rqmode);
                        gr->lkb_highbast = lkb->lkb_rqmode;
                }
@@ -1950,8 +1991,7 @@ static void confirm_master(struct dlm_rsb *r, int error)
                        list_del_init(&lkb->lkb_rsb_lookup);
                        r->res_first_lkid = lkb->lkb_id;
                        _request_lock(r, lkb);
-               } else
-                       r->res_nodeid = -1;
+               }
                break;
 
        default:
@@ -1960,8 +2000,11 @@ static void confirm_master(struct dlm_rsb *r, int error)
 }
 
 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
-                        int namelen, unsigned long timeout_cs, void *ast,
-                        void *astarg, void *bast, struct dlm_args *args)
+                        int namelen, unsigned long timeout_cs,
+                        void (*ast) (void *astparam),
+                        void *astparam,
+                        void (*bast) (void *astparam, int mode),
+                        struct dlm_args *args)
 {
        int rv = -EINVAL;
 
@@ -2011,9 +2054,9 @@ static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
           an active lkb cannot be modified before locking the rsb */
 
        args->flags = flags;
-       args->astaddr = ast;
-       args->astparam = (long) astarg;
-       args->bastaddr = bast;
+       args->astfn = ast;
+       args->astparam = astparam;
+       args->bastfn = bast;
        args->timeout = timeout_cs;
        args->mode = mode;
        args->lksb = lksb;
@@ -2032,7 +2075,7 @@ static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
                return -EINVAL;
 
        args->flags = flags;
-       args->astparam = (long) astarg;
+       args->astparam = astarg;
        return 0;
 }
 
@@ -2062,9 +2105,9 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 
        lkb->lkb_exflags = args->flags;
        lkb->lkb_sbflags = 0;
-       lkb->lkb_astaddr = args->astaddr;
+       lkb->lkb_astfn = args->astfn;
        lkb->lkb_astparam = args->astparam;
-       lkb->lkb_bastaddr = args->bastaddr;
+       lkb->lkb_bastfn = args->bastfn;
        lkb->lkb_rqmode = args->mode;
        lkb->lkb_lksb = args->lksb;
        lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
@@ -2072,6 +2115,11 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
        lkb->lkb_timeout_cs = args->timeout;
        rv = 0;
  out:
+       if (rv)
+               log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
+                         rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
+                         lkb->lkb_status, lkb->lkb_wait_type,
+                         lkb->lkb_resource->res_name);
        return rv;
 }
 
@@ -2138,6 +2186,13 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
                        goto out;
                }
 
+               /* there's nothing to cancel */
+               if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
+                   !lkb->lkb_wait_type) {
+                       rv = -EBUSY;
+                       goto out;
+               }
+
                switch (lkb->lkb_wait_type) {
                case DLM_MSG_LOOKUP:
                case DLM_MSG_REQUEST:
@@ -2225,20 +2280,30 @@ static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
        if (can_be_queued(lkb)) {
                error = -EINPROGRESS;
                add_lkb(r, lkb, DLM_LKSTS_WAITING);
-               send_blocking_asts(r, lkb);
                add_timeout(lkb);
                goto out;
        }
 
        error = -EAGAIN;
-       if (force_blocking_asts(lkb))
-               send_blocking_asts_all(r, lkb);
        queue_cast(r, lkb, -EAGAIN);
-
  out:
        return error;
 }
 
+static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
+                              int error)
+{
+       switch (error) {
+       case -EAGAIN:
+               if (force_blocking_asts(lkb))
+                       send_blocking_asts_all(r, lkb);
+               break;
+       case -EINPROGRESS:
+               send_blocking_asts(r, lkb);
+               break;
+       }
+}
+
 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
        int error = 0;
@@ -2249,7 +2314,6 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
        if (can_be_granted(r, lkb, 1, &deadlk)) {
                grant_lock(r, lkb);
                queue_cast(r, lkb, 0);
-               grant_pending_locks(r);
                goto out;
        }
 
@@ -2279,7 +2343,6 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
                if (_can_be_granted(r, lkb, 1)) {
                        grant_lock(r, lkb);
                        queue_cast(r, lkb, 0);
-                       grant_pending_locks(r);
                        goto out;
                }
                /* else fall through and move to convert queue */
@@ -2289,28 +2352,47 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
                error = -EINPROGRESS;
                del_lkb(r, lkb);
                add_lkb(r, lkb, DLM_LKSTS_CONVERT);
-               send_blocking_asts(r, lkb);
                add_timeout(lkb);
                goto out;
        }
 
        error = -EAGAIN;
-       if (force_blocking_asts(lkb))
-               send_blocking_asts_all(r, lkb);
        queue_cast(r, lkb, -EAGAIN);
-
  out:
        return error;
 }
 
+static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
+                              int error)
+{
+       switch (error) {
+       case 0:
+               grant_pending_locks(r);
+               /* grant_pending_locks also sends basts */
+               break;
+       case -EAGAIN:
+               if (force_blocking_asts(lkb))
+                       send_blocking_asts_all(r, lkb);
+               break;
+       case -EINPROGRESS:
+               send_blocking_asts(r, lkb);
+               break;
+       }
+}
+
 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
        remove_lock(r, lkb);
        queue_cast(r, lkb, -DLM_EUNLOCK);
-       grant_pending_locks(r);
        return -DLM_EUNLOCK;
 }
 
+static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
+                             int error)
+{
+       grant_pending_locks(r);
+}
+
 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
  
 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
@@ -2320,12 +2402,18 @@ static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
        error = revert_lock(r, lkb);
        if (error) {
                queue_cast(r, lkb, -DLM_ECANCEL);
-               grant_pending_locks(r);
                return -DLM_ECANCEL;
        }
        return 0;
 }
 
+static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
+                             int error)
+{
+       if (error)
+               grant_pending_locks(r);
+}
+
 /*
  * Four stage 3 varieties:
  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
@@ -2347,11 +2435,15 @@ static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
                goto out;
        }
 
-       if (is_remote(r))
+       if (is_remote(r)) {
                /* receive_request() calls do_request() on remote node */
                error = send_request(r, lkb);
-       else
+       } else {
                error = do_request(r, lkb);
+               /* for remote locks the request_reply is sent
+                  between do_request and do_request_effects */
+               do_request_effects(r, lkb, error);
+       }
  out:
        return error;
 }
@@ -2362,11 +2454,15 @@ static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
        int error;
 
-       if (is_remote(r))
+       if (is_remote(r)) {
                /* receive_convert() calls do_convert() on remote node */
                error = send_convert(r, lkb);
-       else
+       } else {
                error = do_convert(r, lkb);
+               /* for remote locks the convert_reply is sent
+                  between do_convert and do_convert_effects */
+               do_convert_effects(r, lkb, error);
+       }
 
        return error;
 }
@@ -2377,11 +2473,15 @@ static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
        int error;
 
-       if (is_remote(r))
+       if (is_remote(r)) {
                /* receive_unlock() calls do_unlock() on remote node */
                error = send_unlock(r, lkb);
-       else
+       } else {
                error = do_unlock(r, lkb);
+               /* for remote locks the unlock_reply is sent
+                  between do_unlock and do_unlock_effects */
+               do_unlock_effects(r, lkb, error);
+       }
 
        return error;
 }
@@ -2392,11 +2492,15 @@ static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
        int error;
 
-       if (is_remote(r))
+       if (is_remote(r)) {
                /* receive_cancel() calls do_cancel() on remote node */
                error = send_cancel(r, lkb);
-       else
+       } else {
                error = do_cancel(r, lkb);
+               /* for remote locks the cancel_reply is sent
+                  between do_cancel and do_cancel_effects */
+               do_cancel_effects(r, lkb, error);
+       }
 
        return error;
 }
@@ -2634,7 +2738,7 @@ static int _create_message(struct dlm_ls *ls, int mb_len,
           pass into lowcomms_commit and a message buffer (mb) that we
           write our data into */
 
-       mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb);
+       mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
        if (!mh)
                return -ENOBUFS;
 
@@ -2711,9 +2815,9 @@ static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
        /* m_result and m_bastmode are set from function args,
           not from lkb fields */
 
-       if (lkb->lkb_bastaddr)
+       if (lkb->lkb_bastfn)
                ms->m_asts |= AST_BAST;
-       if (lkb->lkb_astaddr)
+       if (lkb->lkb_astfn)
                ms->m_asts |= AST_COMP;
 
        /* compare with switch in create_message; send_remove() doesn't
@@ -2989,11 +3093,23 @@ static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
                if (!lkb->lkb_lvbptr)
                        return -ENOMEM;
                len = receive_extralen(ms);
+               if (len > DLM_RESNAME_MAXLEN)
+                       len = DLM_RESNAME_MAXLEN;
                memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
        }
        return 0;
 }
 
+static void fake_bastfn(void *astparam, int mode)
+{
+       log_print("fake_bastfn should not be called");
+}
+
+static void fake_astfn(void *astparam)
+{
+       log_print("fake_astfn should not be called");
+}
+
 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
                                struct dlm_message *ms)
 {
@@ -3002,8 +3118,9 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
        lkb->lkb_remid = ms->m_lkid;
        lkb->lkb_grmode = DLM_LOCK_IV;
        lkb->lkb_rqmode = ms->m_rqmode;
-       lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
-       lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
+
+       lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
+       lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;
 
        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
                /* lkb was just created so there won't be an lvb yet */
@@ -3123,6 +3240,7 @@ static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
        attach_lkb(r, lkb);
        error = do_request(r, lkb);
        send_request_reply(r, lkb, error);
+       do_request_effects(r, lkb, error);
 
        unlock_rsb(r);
        put_rsb(r);
@@ -3158,15 +3276,19 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
                goto out;
 
        receive_flags(lkb, ms);
+
        error = receive_convert_args(ls, lkb, ms);
-       if (error)
-               goto out_reply;
+       if (error) {
+               send_convert_reply(r, lkb, error);
+               goto out;
+       }
+
        reply = !down_conversion(lkb);
 
        error = do_convert(r, lkb);
- out_reply:
        if (reply)
                send_convert_reply(r, lkb, error);
+       do_convert_effects(r, lkb, error);
  out:
        unlock_rsb(r);
        put_rsb(r);
@@ -3198,13 +3320,16 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
                goto out;
 
        receive_flags(lkb, ms);
+
        error = receive_unlock_args(ls, lkb, ms);
-       if (error)
-               goto out_reply;
+       if (error) {
+               send_unlock_reply(r, lkb, error);
+               goto out;
+       }
 
        error = do_unlock(r, lkb);
- out_reply:
        send_unlock_reply(r, lkb, error);
+       do_unlock_effects(r, lkb, error);
  out:
        unlock_rsb(r);
        put_rsb(r);
@@ -3239,6 +3364,7 @@ static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
 
        error = do_cancel(r, lkb);
        send_cancel_reply(r, lkb, error);
+       do_cancel_effects(r, lkb, error);
  out:
        unlock_rsb(r);
        put_rsb(r);
@@ -4200,7 +4326,7 @@ static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
 {
        struct dlm_rsb *r, *r_ret = NULL;
 
-       read_lock(&ls->ls_rsbtbl[bucket].lock);
+       spin_lock(&ls->ls_rsbtbl[bucket].lock);
        list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
                if (!rsb_flag(r, RSB_LOCKS_PURGED))
                        continue;
@@ -4209,7 +4335,7 @@ static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
                r_ret = r;
                break;
        }
-       read_unlock(&ls->ls_rsbtbl[bucket].lock);
+       spin_unlock(&ls->ls_rsbtbl[bucket].lock);
        return r_ret;
 }
 
@@ -4283,8 +4409,8 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
        lkb->lkb_grmode = rl->rl_grmode;
        /* don't set lkb_status because add_lkb wants to itself */
 
-       lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
-       lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
+       lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL;
+       lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL;
 
        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
                int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
@@ -4444,7 +4570,7 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
        }
 
        if (flags & DLM_LKF_VALBLK) {
-               ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
+               ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
                if (!ua->lksb.sb_lvbptr) {
                        kfree(ua);
                        __put_lkb(ls, lkb);
@@ -4458,7 +4584,7 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
           lock and that lkb_astparam is the dlm_user_args structure. */
 
        error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
-                             DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
+                             fake_astfn, ua, fake_bastfn, &args);
        lkb->lkb_flags |= DLM_IFL_USER;
        ua->old_mode = DLM_LOCK_IV;
 
@@ -4511,10 +4637,10 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
        /* user can change the params on its lock when it converts it, or
           add an lvb that didn't exist before */
 
-       ua = (struct dlm_user_args *)lkb->lkb_astparam;
+       ua = lkb->lkb_ua;
 
        if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
-               ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
+               ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
                if (!ua->lksb.sb_lvbptr) {
                        error = -ENOMEM;
                        goto out_put;
@@ -4532,7 +4658,7 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
        ua->old_mode = lkb->lkb_grmode;
 
        error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
-                             DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
+                             fake_astfn, ua, fake_bastfn, &args);
        if (error)
                goto out_put;
 
@@ -4562,7 +4688,7 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
        if (error)
                goto out;
 
-       ua = (struct dlm_user_args *)lkb->lkb_astparam;
+       ua = lkb->lkb_ua;
 
        if (lvb_in && ua->lksb.sb_lvbptr)
                memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
@@ -4611,7 +4737,7 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
        if (error)
                goto out;
 
-       ua = (struct dlm_user_args *)lkb->lkb_astparam;
+       ua = lkb->lkb_ua;
        if (ua_tmp->castparam)
                ua->castparam = ua_tmp->castparam;
        ua->user_lksb = ua_tmp->user_lksb;
@@ -4649,7 +4775,7 @@ int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
        if (error)
                goto out;
 
-       ua = (struct dlm_user_args *)lkb->lkb_astparam;
+       ua = lkb->lkb_ua;
 
        error = set_unlock_args(flags, ua, &args);
        if (error)
@@ -4688,7 +4814,6 @@ int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
 
 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
 {
-       struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
        struct dlm_args args;
        int error;
 
@@ -4697,7 +4822,7 @@ static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
        list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
        mutex_unlock(&ls->ls_orphans_mutex);
 
-       set_unlock_args(0, ua, &args);
+       set_unlock_args(0, lkb->lkb_ua, &args);
 
        error = cancel_lock(ls, lkb, &args);
        if (error == -DLM_ECANCEL)
@@ -4710,11 +4835,10 @@ static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
 
 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
 {
-       struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
        struct dlm_args args;
        int error;
 
-       set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
+       set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
 
        error = unlock_lock(ls, lkb, &args);
        if (error == -DLM_EUNLOCK)