[DLM] fix userland unlock
[safe/jmp/linux-2.6] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
86                                     struct dlm_message *ms);
87 static int receive_extralen(struct dlm_message *ms);
88
89 #define FAKE_USER_AST (void*)0xff00ff00
90
91 /*
92  * Lock compatibilty matrix - thanks Steve
93  * UN = Unlocked state. Not really a state, used as a flag
94  * PD = Padding. Used to make the matrix a nice power of two in size
95  * Other states are the same as the VMS DLM.
96  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
97  */
98
99 static const int __dlm_compat_matrix[8][8] = {
100       /* UN NL CR CW PR PW EX PD */
101         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
102         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
103         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
104         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
105         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
106         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
107         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
108         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
109 };
110
111 /*
112  * This defines the direction of transfer of LVB data.
113  * Granted mode is the row; requested mode is the column.
114  * Usage: matrix[grmode+1][rqmode+1]
115  * 1 = LVB is returned to the caller
116  * 0 = LVB is written to the resource
117  * -1 = nothing happens to the LVB
118  */
119
120 const int dlm_lvb_operations[8][8] = {
121         /* UN   NL  CR  CW  PR  PW  EX  PD*/
122         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
123         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
124         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
125         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
126         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
127         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
128         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
129         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
130 };
131
132 #define modes_compat(gr, rq) \
133         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
134
135 int dlm_modes_compat(int mode1, int mode2)
136 {
137         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
138 }
139
140 /*
141  * Compatibility matrix for conversions with QUECVT set.
142  * Granted mode is the row; requested mode is the column.
143  * Usage: matrix[grmode+1][rqmode+1]
144  */
145
146 static const int __quecvt_compat_matrix[8][8] = {
147       /* UN NL CR CW PR PW EX PD */
148         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
149         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
150         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
151         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
152         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
153         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
154         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
155         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
156 };
157
158 void dlm_print_lkb(struct dlm_lkb *lkb)
159 {
160         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
161                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
162                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
163                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
164                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
165 }
166
167 void dlm_print_rsb(struct dlm_rsb *r)
168 {
169         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
170                r->res_nodeid, r->res_flags, r->res_first_lkid,
171                r->res_recover_locks_count, r->res_name);
172 }
173
174 /* Threads cannot use the lockspace while it's being recovered */
175
176 static inline void lock_recovery(struct dlm_ls *ls)
177 {
178         down_read(&ls->ls_in_recovery);
179 }
180
181 static inline void unlock_recovery(struct dlm_ls *ls)
182 {
183         up_read(&ls->ls_in_recovery);
184 }
185
186 static inline int lock_recovery_try(struct dlm_ls *ls)
187 {
188         return down_read_trylock(&ls->ls_in_recovery);
189 }
190
191 static inline int can_be_queued(struct dlm_lkb *lkb)
192 {
193         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
194 }
195
196 static inline int force_blocking_asts(struct dlm_lkb *lkb)
197 {
198         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
199 }
200
201 static inline int is_demoted(struct dlm_lkb *lkb)
202 {
203         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
204 }
205
206 static inline int is_remote(struct dlm_rsb *r)
207 {
208         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
209         return !!r->res_nodeid;
210 }
211
212 static inline int is_process_copy(struct dlm_lkb *lkb)
213 {
214         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
215 }
216
217 static inline int is_master_copy(struct dlm_lkb *lkb)
218 {
219         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
220                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
221         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
222 }
223
224 static inline int middle_conversion(struct dlm_lkb *lkb)
225 {
226         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
227             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
228                 return 1;
229         return 0;
230 }
231
232 static inline int down_conversion(struct dlm_lkb *lkb)
233 {
234         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
235 }
236
237 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
238 {
239         if (is_master_copy(lkb))
240                 return;
241
242         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
243
244         lkb->lkb_lksb->sb_status = rv;
245         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
246
247         dlm_add_ast(lkb, AST_COMP);
248 }
249
250 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
251 {
252         if (is_master_copy(lkb))
253                 send_bast(r, lkb, rqmode);
254         else {
255                 lkb->lkb_bastmode = rqmode;
256                 dlm_add_ast(lkb, AST_BAST);
257         }
258 }
259
260 /*
261  * Basic operations on rsb's and lkb's
262  */
263
264 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
265 {
266         struct dlm_rsb *r;
267
268         r = allocate_rsb(ls, len);
269         if (!r)
270                 return NULL;
271
272         r->res_ls = ls;
273         r->res_length = len;
274         memcpy(r->res_name, name, len);
275         mutex_init(&r->res_mutex);
276
277         INIT_LIST_HEAD(&r->res_lookup);
278         INIT_LIST_HEAD(&r->res_grantqueue);
279         INIT_LIST_HEAD(&r->res_convertqueue);
280         INIT_LIST_HEAD(&r->res_waitqueue);
281         INIT_LIST_HEAD(&r->res_root_list);
282         INIT_LIST_HEAD(&r->res_recover_list);
283
284         return r;
285 }
286
287 static int search_rsb_list(struct list_head *head, char *name, int len,
288                            unsigned int flags, struct dlm_rsb **r_ret)
289 {
290         struct dlm_rsb *r;
291         int error = 0;
292
293         list_for_each_entry(r, head, res_hashchain) {
294                 if (len == r->res_length && !memcmp(name, r->res_name, len))
295                         goto found;
296         }
297         return -EBADR;
298
299  found:
300         if (r->res_nodeid && (flags & R_MASTER))
301                 error = -ENOTBLK;
302         *r_ret = r;
303         return error;
304 }
305
306 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
307                        unsigned int flags, struct dlm_rsb **r_ret)
308 {
309         struct dlm_rsb *r;
310         int error;
311
312         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
313         if (!error) {
314                 kref_get(&r->res_ref);
315                 goto out;
316         }
317         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
318         if (error)
319                 goto out;
320
321         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
322
323         if (dlm_no_directory(ls))
324                 goto out;
325
326         if (r->res_nodeid == -1) {
327                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
328                 r->res_first_lkid = 0;
329         } else if (r->res_nodeid > 0) {
330                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
331                 r->res_first_lkid = 0;
332         } else {
333                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
334                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
335         }
336  out:
337         *r_ret = r;
338         return error;
339 }
340
341 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
342                       unsigned int flags, struct dlm_rsb **r_ret)
343 {
344         int error;
345         write_lock(&ls->ls_rsbtbl[b].lock);
346         error = _search_rsb(ls, name, len, b, flags, r_ret);
347         write_unlock(&ls->ls_rsbtbl[b].lock);
348         return error;
349 }
350
351 /*
352  * Find rsb in rsbtbl and potentially create/add one
353  *
354  * Delaying the release of rsb's has a similar benefit to applications keeping
355  * NL locks on an rsb, but without the guarantee that the cached master value
356  * will still be valid when the rsb is reused.  Apps aren't always smart enough
357  * to keep NL locks on an rsb that they may lock again shortly; this can lead
358  * to excessive master lookups and removals if we don't delay the release.
359  *
360  * Searching for an rsb means looking through both the normal list and toss
361  * list.  When found on the toss list the rsb is moved to the normal list with
362  * ref count of 1; when found on normal list the ref count is incremented.
363  */
364
365 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
366                     unsigned int flags, struct dlm_rsb **r_ret)
367 {
368         struct dlm_rsb *r, *tmp;
369         uint32_t hash, bucket;
370         int error = 0;
371
372         if (dlm_no_directory(ls))
373                 flags |= R_CREATE;
374
375         hash = jhash(name, namelen, 0);
376         bucket = hash & (ls->ls_rsbtbl_size - 1);
377
378         error = search_rsb(ls, name, namelen, bucket, flags, &r);
379         if (!error)
380                 goto out;
381
382         if (error == -EBADR && !(flags & R_CREATE))
383                 goto out;
384
385         /* the rsb was found but wasn't a master copy */
386         if (error == -ENOTBLK)
387                 goto out;
388
389         error = -ENOMEM;
390         r = create_rsb(ls, name, namelen);
391         if (!r)
392                 goto out;
393
394         r->res_hash = hash;
395         r->res_bucket = bucket;
396         r->res_nodeid = -1;
397         kref_init(&r->res_ref);
398
399         /* With no directory, the master can be set immediately */
400         if (dlm_no_directory(ls)) {
401                 int nodeid = dlm_dir_nodeid(r);
402                 if (nodeid == dlm_our_nodeid())
403                         nodeid = 0;
404                 r->res_nodeid = nodeid;
405         }
406
407         write_lock(&ls->ls_rsbtbl[bucket].lock);
408         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
409         if (!error) {
410                 write_unlock(&ls->ls_rsbtbl[bucket].lock);
411                 free_rsb(r);
412                 r = tmp;
413                 goto out;
414         }
415         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
416         write_unlock(&ls->ls_rsbtbl[bucket].lock);
417         error = 0;
418  out:
419         *r_ret = r;
420         return error;
421 }
422
423 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
424                  unsigned int flags, struct dlm_rsb **r_ret)
425 {
426         return find_rsb(ls, name, namelen, flags, r_ret);
427 }
428
429 /* This is only called to add a reference when the code already holds
430    a valid reference to the rsb, so there's no need for locking. */
431
432 static inline void hold_rsb(struct dlm_rsb *r)
433 {
434         kref_get(&r->res_ref);
435 }
436
437 void dlm_hold_rsb(struct dlm_rsb *r)
438 {
439         hold_rsb(r);
440 }
441
442 static void toss_rsb(struct kref *kref)
443 {
444         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
445         struct dlm_ls *ls = r->res_ls;
446
447         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
448         kref_init(&r->res_ref);
449         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
450         r->res_toss_time = jiffies;
451         if (r->res_lvbptr) {
452                 free_lvb(r->res_lvbptr);
453                 r->res_lvbptr = NULL;
454         }
455 }
456
457 /* When all references to the rsb are gone it's transfered to
458    the tossed list for later disposal. */
459
460 static void put_rsb(struct dlm_rsb *r)
461 {
462         struct dlm_ls *ls = r->res_ls;
463         uint32_t bucket = r->res_bucket;
464
465         write_lock(&ls->ls_rsbtbl[bucket].lock);
466         kref_put(&r->res_ref, toss_rsb);
467         write_unlock(&ls->ls_rsbtbl[bucket].lock);
468 }
469
470 void dlm_put_rsb(struct dlm_rsb *r)
471 {
472         put_rsb(r);
473 }
474
475 /* See comment for unhold_lkb */
476
477 static void unhold_rsb(struct dlm_rsb *r)
478 {
479         int rv;
480         rv = kref_put(&r->res_ref, toss_rsb);
481         DLM_ASSERT(!rv, dlm_print_rsb(r););
482 }
483
484 static void kill_rsb(struct kref *kref)
485 {
486         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
487
488         /* All work is done after the return from kref_put() so we
489            can release the write_lock before the remove and free. */
490
491         DLM_ASSERT(list_empty(&r->res_lookup),);
492         DLM_ASSERT(list_empty(&r->res_grantqueue),);
493         DLM_ASSERT(list_empty(&r->res_convertqueue),);
494         DLM_ASSERT(list_empty(&r->res_waitqueue),);
495         DLM_ASSERT(list_empty(&r->res_root_list),);
496         DLM_ASSERT(list_empty(&r->res_recover_list),);
497 }
498
499 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
500    The rsb must exist as long as any lkb's for it do. */
501
502 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
503 {
504         hold_rsb(r);
505         lkb->lkb_resource = r;
506 }
507
508 static void detach_lkb(struct dlm_lkb *lkb)
509 {
510         if (lkb->lkb_resource) {
511                 put_rsb(lkb->lkb_resource);
512                 lkb->lkb_resource = NULL;
513         }
514 }
515
516 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
517 {
518         struct dlm_lkb *lkb, *tmp;
519         uint32_t lkid = 0;
520         uint16_t bucket;
521
522         lkb = allocate_lkb(ls);
523         if (!lkb)
524                 return -ENOMEM;
525
526         lkb->lkb_nodeid = -1;
527         lkb->lkb_grmode = DLM_LOCK_IV;
528         kref_init(&lkb->lkb_ref);
529         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
530
531         get_random_bytes(&bucket, sizeof(bucket));
532         bucket &= (ls->ls_lkbtbl_size - 1);
533
534         write_lock(&ls->ls_lkbtbl[bucket].lock);
535
536         /* counter can roll over so we must verify lkid is not in use */
537
538         while (lkid == 0) {
539                 lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);
540
541                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
542                                     lkb_idtbl_list) {
543                         if (tmp->lkb_id != lkid)
544                                 continue;
545                         lkid = 0;
546                         break;
547                 }
548         }
549
550         lkb->lkb_id = lkid;
551         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
552         write_unlock(&ls->ls_lkbtbl[bucket].lock);
553
554         *lkb_ret = lkb;
555         return 0;
556 }
557
558 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
559 {
560         uint16_t bucket = lkid & 0xFFFF;
561         struct dlm_lkb *lkb;
562
563         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
564                 if (lkb->lkb_id == lkid)
565                         return lkb;
566         }
567         return NULL;
568 }
569
570 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
571 {
572         struct dlm_lkb *lkb;
573         uint16_t bucket = lkid & 0xFFFF;
574
575         if (bucket >= ls->ls_lkbtbl_size)
576                 return -EBADSLT;
577
578         read_lock(&ls->ls_lkbtbl[bucket].lock);
579         lkb = __find_lkb(ls, lkid);
580         if (lkb)
581                 kref_get(&lkb->lkb_ref);
582         read_unlock(&ls->ls_lkbtbl[bucket].lock);
583
584         *lkb_ret = lkb;
585         return lkb ? 0 : -ENOENT;
586 }
587
588 static void kill_lkb(struct kref *kref)
589 {
590         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
591
592         /* All work is done after the return from kref_put() so we
593            can release the write_lock before the detach_lkb */
594
595         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
596 }
597
598 /* __put_lkb() is used when an lkb may not have an rsb attached to
599    it so we need to provide the lockspace explicitly */
600
601 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
602 {
603         uint16_t bucket = lkb->lkb_id & 0xFFFF;
604
605         write_lock(&ls->ls_lkbtbl[bucket].lock);
606         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
607                 list_del(&lkb->lkb_idtbl_list);
608                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
609
610                 detach_lkb(lkb);
611
612                 /* for local/process lkbs, lvbptr points to caller's lksb */
613                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
614                         free_lvb(lkb->lkb_lvbptr);
615                 free_lkb(lkb);
616                 return 1;
617         } else {
618                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
619                 return 0;
620         }
621 }
622
623 int dlm_put_lkb(struct dlm_lkb *lkb)
624 {
625         struct dlm_ls *ls;
626
627         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
628         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
629
630         ls = lkb->lkb_resource->res_ls;
631         return __put_lkb(ls, lkb);
632 }
633
634 /* This is only called to add a reference when the code already holds
635    a valid reference to the lkb, so there's no need for locking. */
636
637 static inline void hold_lkb(struct dlm_lkb *lkb)
638 {
639         kref_get(&lkb->lkb_ref);
640 }
641
642 /* This is called when we need to remove a reference and are certain
643    it's not the last ref.  e.g. del_lkb is always called between a
644    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
645    put_lkb would work fine, but would involve unnecessary locking */
646
647 static inline void unhold_lkb(struct dlm_lkb *lkb)
648 {
649         int rv;
650         rv = kref_put(&lkb->lkb_ref, kill_lkb);
651         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
652 }
653
654 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
655                             int mode)
656 {
657         struct dlm_lkb *lkb = NULL;
658
659         list_for_each_entry(lkb, head, lkb_statequeue)
660                 if (lkb->lkb_rqmode < mode)
661                         break;
662
663         if (!lkb)
664                 list_add_tail(new, head);
665         else
666                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
667 }
668
669 /* add/remove lkb to rsb's grant/convert/wait queue */
670
671 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
672 {
673         kref_get(&lkb->lkb_ref);
674
675         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
676
677         lkb->lkb_status = status;
678
679         switch (status) {
680         case DLM_LKSTS_WAITING:
681                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
682                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
683                 else
684                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
685                 break;
686         case DLM_LKSTS_GRANTED:
687                 /* convention says granted locks kept in order of grmode */
688                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
689                                 lkb->lkb_grmode);
690                 break;
691         case DLM_LKSTS_CONVERT:
692                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
693                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
694                 else
695                         list_add_tail(&lkb->lkb_statequeue,
696                                       &r->res_convertqueue);
697                 break;
698         default:
699                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
700         }
701 }
702
703 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
704 {
705         lkb->lkb_status = 0;
706         list_del(&lkb->lkb_statequeue);
707         unhold_lkb(lkb);
708 }
709
710 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
711 {
712         hold_lkb(lkb);
713         del_lkb(r, lkb);
714         add_lkb(r, lkb, sts);
715         unhold_lkb(lkb);
716 }
717
718 /* add/remove lkb from global waiters list of lkb's waiting for
719    a reply from a remote node */
720
721 static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
722 {
723         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
724
725         mutex_lock(&ls->ls_waiters_mutex);
726         if (lkb->lkb_wait_type) {
727                 log_print("add_to_waiters error %d", lkb->lkb_wait_type);
728                 goto out;
729         }
730         lkb->lkb_wait_type = mstype;
731         kref_get(&lkb->lkb_ref);
732         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
733  out:
734         mutex_unlock(&ls->ls_waiters_mutex);
735 }
736
737 static int _remove_from_waiters(struct dlm_lkb *lkb)
738 {
739         int error = 0;
740
741         if (!lkb->lkb_wait_type) {
742                 log_print("remove_from_waiters error");
743                 error = -EINVAL;
744                 goto out;
745         }
746         lkb->lkb_wait_type = 0;
747         list_del(&lkb->lkb_wait_reply);
748         unhold_lkb(lkb);
749  out:
750         return error;
751 }
752
753 static int remove_from_waiters(struct dlm_lkb *lkb)
754 {
755         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
756         int error;
757
758         mutex_lock(&ls->ls_waiters_mutex);
759         error = _remove_from_waiters(lkb);
760         mutex_unlock(&ls->ls_waiters_mutex);
761         return error;
762 }
763
764 static void dir_remove(struct dlm_rsb *r)
765 {
766         int to_nodeid;
767
768         if (dlm_no_directory(r->res_ls))
769                 return;
770
771         to_nodeid = dlm_dir_nodeid(r);
772         if (to_nodeid != dlm_our_nodeid())
773                 send_remove(r);
774         else
775                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
776                                      r->res_name, r->res_length);
777 }
778
779 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
780    found since they are in order of newest to oldest? */
781
782 static int shrink_bucket(struct dlm_ls *ls, int b)
783 {
784         struct dlm_rsb *r;
785         int count = 0, found;
786
787         for (;;) {
788                 found = 0;
789                 write_lock(&ls->ls_rsbtbl[b].lock);
790                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
791                                             res_hashchain) {
792                         if (!time_after_eq(jiffies, r->res_toss_time +
793                                            dlm_config.toss_secs * HZ))
794                                 continue;
795                         found = 1;
796                         break;
797                 }
798
799                 if (!found) {
800                         write_unlock(&ls->ls_rsbtbl[b].lock);
801                         break;
802                 }
803
804                 if (kref_put(&r->res_ref, kill_rsb)) {
805                         list_del(&r->res_hashchain);
806                         write_unlock(&ls->ls_rsbtbl[b].lock);
807
808                         if (is_master(r))
809                                 dir_remove(r);
810                         free_rsb(r);
811                         count++;
812                 } else {
813                         write_unlock(&ls->ls_rsbtbl[b].lock);
814                         log_error(ls, "tossed rsb in use %s", r->res_name);
815                 }
816         }
817
818         return count;
819 }
820
821 void dlm_scan_rsbs(struct dlm_ls *ls)
822 {
823         int i;
824
825         if (dlm_locking_stopped(ls))
826                 return;
827
828         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
829                 shrink_bucket(ls, i);
830                 cond_resched();
831         }
832 }
833
834 /* lkb is master or local copy */
835
836 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
837 {
838         int b, len = r->res_ls->ls_lvblen;
839
840         /* b=1 lvb returned to caller
841            b=0 lvb written to rsb or invalidated
842            b=-1 do nothing */
843
844         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
845
846         if (b == 1) {
847                 if (!lkb->lkb_lvbptr)
848                         return;
849
850                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
851                         return;
852
853                 if (!r->res_lvbptr)
854                         return;
855
856                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
857                 lkb->lkb_lvbseq = r->res_lvbseq;
858
859         } else if (b == 0) {
860                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
861                         rsb_set_flag(r, RSB_VALNOTVALID);
862                         return;
863                 }
864
865                 if (!lkb->lkb_lvbptr)
866                         return;
867
868                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
869                         return;
870
871                 if (!r->res_lvbptr)
872                         r->res_lvbptr = allocate_lvb(r->res_ls);
873
874                 if (!r->res_lvbptr)
875                         return;
876
877                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
878                 r->res_lvbseq++;
879                 lkb->lkb_lvbseq = r->res_lvbseq;
880                 rsb_clear_flag(r, RSB_VALNOTVALID);
881         }
882
883         if (rsb_flag(r, RSB_VALNOTVALID))
884                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
885 }
886
887 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
888 {
889         if (lkb->lkb_grmode < DLM_LOCK_PW)
890                 return;
891
892         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
893                 rsb_set_flag(r, RSB_VALNOTVALID);
894                 return;
895         }
896
897         if (!lkb->lkb_lvbptr)
898                 return;
899
900         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
901                 return;
902
903         if (!r->res_lvbptr)
904                 r->res_lvbptr = allocate_lvb(r->res_ls);
905
906         if (!r->res_lvbptr)
907                 return;
908
909         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
910         r->res_lvbseq++;
911         rsb_clear_flag(r, RSB_VALNOTVALID);
912 }
913
914 /* lkb is process copy (pc) */
915
916 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
917                             struct dlm_message *ms)
918 {
919         int b;
920
921         if (!lkb->lkb_lvbptr)
922                 return;
923
924         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
925                 return;
926
927         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
928         if (b == 1) {
929                 int len = receive_extralen(ms);
930                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
931                 lkb->lkb_lvbseq = ms->m_lvbseq;
932         }
933 }
934
935 /* Manipulate lkb's on rsb's convert/granted/waiting queues
936    remove_lock -- used for unlock, removes lkb from granted
937    revert_lock -- used for cancel, moves lkb from convert to granted
938    grant_lock  -- used for request and convert, adds lkb to granted or
939                   moves lkb from convert or waiting to granted
940
941    Each of these is used for master or local copy lkb's.  There is
942    also a _pc() variation used to make the corresponding change on
943    a process copy (pc) lkb. */
944
945 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
946 {
947         del_lkb(r, lkb);
948         lkb->lkb_grmode = DLM_LOCK_IV;
949         /* this unhold undoes the original ref from create_lkb()
950            so this leads to the lkb being freed */
951         unhold_lkb(lkb);
952 }
953
954 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
955 {
956         set_lvb_unlock(r, lkb);
957         _remove_lock(r, lkb);
958 }
959
960 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
961 {
962         _remove_lock(r, lkb);
963 }
964
965 static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
966 {
967         lkb->lkb_rqmode = DLM_LOCK_IV;
968
969         switch (lkb->lkb_status) {
970         case DLM_LKSTS_GRANTED:
971                 break;
972         case DLM_LKSTS_CONVERT:
973                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
974                 break;
975         case DLM_LKSTS_WAITING:
976                 del_lkb(r, lkb);
977                 lkb->lkb_grmode = DLM_LOCK_IV;
978                 /* this unhold undoes the original ref from create_lkb()
979                    so this leads to the lkb being freed */
980                 unhold_lkb(lkb);
981                 break;
982         default:
983                 log_print("invalid status for revert %d", lkb->lkb_status);
984         }
985 }
986
987 static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
988 {
989         revert_lock(r, lkb);
990 }
991
992 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
993 {
994         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
995                 lkb->lkb_grmode = lkb->lkb_rqmode;
996                 if (lkb->lkb_status)
997                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
998                 else
999                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1000         }
1001
1002         lkb->lkb_rqmode = DLM_LOCK_IV;
1003 }
1004
1005 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1006 {
1007         set_lvb_lock(r, lkb);
1008         _grant_lock(r, lkb);
1009         lkb->lkb_highbast = 0;
1010 }
1011
1012 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1013                           struct dlm_message *ms)
1014 {
1015         set_lvb_lock_pc(r, lkb, ms);
1016         _grant_lock(r, lkb);
1017 }
1018
1019 /* called by grant_pending_locks() which means an async grant message must
1020    be sent to the requesting node in addition to granting the lock if the
1021    lkb belongs to a remote node. */
1022
1023 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1024 {
1025         grant_lock(r, lkb);
1026         if (is_master_copy(lkb))
1027                 send_grant(r, lkb);
1028         else
1029                 queue_cast(r, lkb, 0);
1030 }
1031
1032 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1033 {
1034         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1035                                            lkb_statequeue);
1036         if (lkb->lkb_id == first->lkb_id)
1037                 return 1;
1038
1039         return 0;
1040 }
1041
1042 /* Check if the given lkb conflicts with another lkb on the queue. */
1043
1044 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1045 {
1046         struct dlm_lkb *this;
1047
1048         list_for_each_entry(this, head, lkb_statequeue) {
1049                 if (this == lkb)
1050                         continue;
1051                 if (!modes_compat(this, lkb))
1052                         return 1;
1053         }
1054         return 0;
1055 }
1056
1057 /*
1058  * "A conversion deadlock arises with a pair of lock requests in the converting
1059  * queue for one resource.  The granted mode of each lock blocks the requested
1060  * mode of the other lock."
1061  *
1062  * Part 2: if the granted mode of lkb is preventing the first lkb in the
1063  * convert queue from being granted, then demote lkb (set grmode to NL).
1064  * This second form requires that we check for conv-deadlk even when
1065  * now == 0 in _can_be_granted().
1066  *
1067  * Example:
1068  * Granted Queue: empty
1069  * Convert Queue: NL->EX (first lock)
1070  *                PR->EX (second lock)
1071  *
1072  * The first lock can't be granted because of the granted mode of the second
1073  * lock and the second lock can't be granted because it's not first in the
1074  * list.  We demote the granted mode of the second lock (the lkb passed to this
1075  * function).
1076  *
1077  * After the resolution, the "grant pending" function needs to go back and try
1078  * to grant locks on the convert queue again since the first lock can now be
1079  * granted.
1080  */
1081
1082 static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1083 {
1084         struct dlm_lkb *this, *first = NULL, *self = NULL;
1085
1086         list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1087                 if (!first)
1088                         first = this;
1089                 if (this == lkb) {
1090                         self = lkb;
1091                         continue;
1092                 }
1093
1094                 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
1095                         return 1;
1096         }
1097
1098         /* if lkb is on the convert queue and is preventing the first
1099            from being granted, then there's deadlock and we demote lkb.
1100            multiple converting locks may need to do this before the first
1101            converting lock can be granted. */
1102
1103         if (self && self != first) {
1104                 if (!modes_compat(lkb, first) &&
1105                     !queue_conflict(&rsb->res_grantqueue, first))
1106                         return 1;
1107         }
1108
1109         return 0;
1110 }
1111
1112 /*
1113  * Return 1 if the lock can be granted, 0 otherwise.
1114  * Also detect and resolve conversion deadlocks.
1115  *
1116  * lkb is the lock to be granted
1117  *
1118  * now is 1 if the function is being called in the context of the
1119  * immediate request, it is 0 if called later, after the lock has been
1120  * queued.
1121  *
1122  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1123  */
1124
1125 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1126 {
1127         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1128
1129         /*
1130          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1131          * a new request for a NL mode lock being blocked.
1132          *
1133          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1134          * request, then it would be granted.  In essence, the use of this flag
1135          * tells the Lock Manager to expedite theis request by not considering
1136          * what may be in the CONVERTING or WAITING queues...  As of this
1137          * writing, the EXPEDITE flag can be used only with new requests for NL
1138          * mode locks.  This flag is not valid for conversion requests.
1139          *
1140          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1141          * conversion or used with a non-NL requested mode.  We also know an
1142          * EXPEDITE request is always granted immediately, so now must always
1143          * be 1.  The full condition to grant an expedite request: (now &&
1144          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1145          * therefore be shortened to just checking the flag.
1146          */
1147
1148         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1149                 return 1;
1150
1151         /*
1152          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1153          * added to the remaining conditions.
1154          */
1155
1156         if (queue_conflict(&r->res_grantqueue, lkb))
1157                 goto out;
1158
1159         /*
1160          * 6-3: By default, a conversion request is immediately granted if the
1161          * requested mode is compatible with the modes of all other granted
1162          * locks
1163          */
1164
1165         if (queue_conflict(&r->res_convertqueue, lkb))
1166                 goto out;
1167
1168         /*
1169          * 6-5: But the default algorithm for deciding whether to grant or
1170          * queue conversion requests does not by itself guarantee that such
1171          * requests are serviced on a "first come first serve" basis.  This, in
1172          * turn, can lead to a phenomenon known as "indefinate postponement".
1173          *
1174          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1175          * the system service employed to request a lock conversion.  This flag
1176          * forces certain conversion requests to be queued, even if they are
1177          * compatible with the granted modes of other locks on the same
1178          * resource.  Thus, the use of this flag results in conversion requests
1179          * being ordered on a "first come first servce" basis.
1180          *
1181          * DCT: This condition is all about new conversions being able to occur
1182          * "in place" while the lock remains on the granted queue (assuming
1183          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1184          * doesn't _have_ to go onto the convert queue where it's processed in
1185          * order.  The "now" variable is necessary to distinguish converts
1186          * being received and processed for the first time now, because once a
1187          * convert is moved to the conversion queue the condition below applies
1188          * requiring fifo granting.
1189          */
1190
1191         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1192                 return 1;
1193
1194         /*
1195          * The NOORDER flag is set to avoid the standard vms rules on grant
1196          * order.
1197          */
1198
1199         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1200                 return 1;
1201
1202         /*
1203          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1204          * granted until all other conversion requests ahead of it are granted
1205          * and/or canceled.
1206          */
1207
1208         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1209                 return 1;
1210
1211         /*
1212          * 6-4: By default, a new request is immediately granted only if all
1213          * three of the following conditions are satisfied when the request is
1214          * issued:
1215          * - The queue of ungranted conversion requests for the resource is
1216          *   empty.
1217          * - The queue of ungranted new requests for the resource is empty.
1218          * - The mode of the new request is compatible with the most
1219          *   restrictive mode of all granted locks on the resource.
1220          */
1221
1222         if (now && !conv && list_empty(&r->res_convertqueue) &&
1223             list_empty(&r->res_waitqueue))
1224                 return 1;
1225
1226         /*
1227          * 6-4: Once a lock request is in the queue of ungranted new requests,
1228          * it cannot be granted until the queue of ungranted conversion
1229          * requests is empty, all ungranted new requests ahead of it are
1230          * granted and/or canceled, and it is compatible with the granted mode
1231          * of the most restrictive lock granted on the resource.
1232          */
1233
1234         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1235             first_in_list(lkb, &r->res_waitqueue))
1236                 return 1;
1237
1238  out:
1239         /*
1240          * The following, enabled by CONVDEADLK, departs from VMS.
1241          */
1242
1243         if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1244             conversion_deadlock_detect(r, lkb)) {
1245                 lkb->lkb_grmode = DLM_LOCK_NL;
1246                 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1247         }
1248
1249         return 0;
1250 }
1251
1252 /*
1253  * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1254  * simple way to provide a big optimization to applications that can use them.
1255  */
1256
1257 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1258 {
1259         uint32_t flags = lkb->lkb_exflags;
1260         int rv;
1261         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1262
1263         rv = _can_be_granted(r, lkb, now);
1264         if (rv)
1265                 goto out;
1266
1267         if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1268                 goto out;
1269
1270         if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1271                 alt = DLM_LOCK_PR;
1272         else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1273                 alt = DLM_LOCK_CW;
1274
1275         if (alt) {
1276                 lkb->lkb_rqmode = alt;
1277                 rv = _can_be_granted(r, lkb, now);
1278                 if (rv)
1279                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1280                 else
1281                         lkb->lkb_rqmode = rqmode;
1282         }
1283  out:
1284         return rv;
1285 }
1286
1287 static int grant_pending_convert(struct dlm_rsb *r, int high)
1288 {
1289         struct dlm_lkb *lkb, *s;
1290         int hi, demoted, quit, grant_restart, demote_restart;
1291
1292         quit = 0;
1293  restart:
1294         grant_restart = 0;
1295         demote_restart = 0;
1296         hi = DLM_LOCK_IV;
1297
1298         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1299                 demoted = is_demoted(lkb);
1300                 if (can_be_granted(r, lkb, 0)) {
1301                         grant_lock_pending(r, lkb);
1302                         grant_restart = 1;
1303                 } else {
1304                         hi = max_t(int, lkb->lkb_rqmode, hi);
1305                         if (!demoted && is_demoted(lkb))
1306                                 demote_restart = 1;
1307                 }
1308         }
1309
1310         if (grant_restart)
1311                 goto restart;
1312         if (demote_restart && !quit) {
1313                 quit = 1;
1314                 goto restart;
1315         }
1316
1317         return max_t(int, high, hi);
1318 }
1319
1320 static int grant_pending_wait(struct dlm_rsb *r, int high)
1321 {
1322         struct dlm_lkb *lkb, *s;
1323
1324         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1325                 if (can_be_granted(r, lkb, 0))
1326                         grant_lock_pending(r, lkb);
1327                 else
1328                         high = max_t(int, lkb->lkb_rqmode, high);
1329         }
1330
1331         return high;
1332 }
1333
1334 static void grant_pending_locks(struct dlm_rsb *r)
1335 {
1336         struct dlm_lkb *lkb, *s;
1337         int high = DLM_LOCK_IV;
1338
1339         DLM_ASSERT(is_master(r), dlm_print_rsb(r););
1340
1341         high = grant_pending_convert(r, high);
1342         high = grant_pending_wait(r, high);
1343
1344         if (high == DLM_LOCK_IV)
1345                 return;
1346
1347         /*
1348          * If there are locks left on the wait/convert queue then send blocking
1349          * ASTs to granted locks based on the largest requested mode (high)
1350          * found above. FIXME: highbast < high comparison not valid for PR/CW.
1351          */
1352
1353         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1354                 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1355                     !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1356                         queue_bast(r, lkb, high);
1357                         lkb->lkb_highbast = high;
1358                 }
1359         }
1360 }
1361
1362 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1363                             struct dlm_lkb *lkb)
1364 {
1365         struct dlm_lkb *gr;
1366
1367         list_for_each_entry(gr, head, lkb_statequeue) {
1368                 if (gr->lkb_bastaddr &&
1369                     gr->lkb_highbast < lkb->lkb_rqmode &&
1370                     !modes_compat(gr, lkb)) {
1371                         queue_bast(r, gr, lkb->lkb_rqmode);
1372                         gr->lkb_highbast = lkb->lkb_rqmode;
1373                 }
1374         }
1375 }
1376
1377 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1378 {
1379         send_bast_queue(r, &r->res_grantqueue, lkb);
1380 }
1381
1382 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1383 {
1384         send_bast_queue(r, &r->res_grantqueue, lkb);
1385         send_bast_queue(r, &r->res_convertqueue, lkb);
1386 }
1387
1388 /* set_master(r, lkb) -- set the master nodeid of a resource
1389
1390    The purpose of this function is to set the nodeid field in the given
1391    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1392    known, it can just be copied to the lkb and the function will return
1393    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1394    before it can be copied to the lkb.
1395
1396    When the rsb nodeid is being looked up remotely, the initial lkb
1397    causing the lookup is kept on the ls_waiters list waiting for the
1398    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1399    on the rsb's res_lookup list until the master is verified.
1400
1401    Return values:
1402    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1403    1: the rsb master is not available and the lkb has been placed on
1404       a wait queue
1405 */
1406
1407 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1408 {
1409         struct dlm_ls *ls = r->res_ls;
1410         int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1411
1412         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1413                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1414                 r->res_first_lkid = lkb->lkb_id;
1415                 lkb->lkb_nodeid = r->res_nodeid;
1416                 return 0;
1417         }
1418
1419         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1420                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1421                 return 1;
1422         }
1423
1424         if (r->res_nodeid == 0) {
1425                 lkb->lkb_nodeid = 0;
1426                 return 0;
1427         }
1428
1429         if (r->res_nodeid > 0) {
1430                 lkb->lkb_nodeid = r->res_nodeid;
1431                 return 0;
1432         }
1433
1434         DLM_ASSERT(r->res_nodeid == -1, dlm_print_rsb(r););
1435
1436         dir_nodeid = dlm_dir_nodeid(r);
1437
1438         if (dir_nodeid != our_nodeid) {
1439                 r->res_first_lkid = lkb->lkb_id;
1440                 send_lookup(r, lkb);
1441                 return 1;
1442         }
1443
1444         for (;;) {
1445                 /* It's possible for dlm_scand to remove an old rsb for
1446                    this same resource from the toss list, us to create
1447                    a new one, look up the master locally, and find it
1448                    already exists just before dlm_scand does the
1449                    dir_remove() on the previous rsb. */
1450
1451                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1452                                        r->res_length, &ret_nodeid);
1453                 if (!error)
1454                         break;
1455                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1456                 schedule();
1457         }
1458
1459         if (ret_nodeid == our_nodeid) {
1460                 r->res_first_lkid = 0;
1461                 r->res_nodeid = 0;
1462                 lkb->lkb_nodeid = 0;
1463         } else {
1464                 r->res_first_lkid = lkb->lkb_id;
1465                 r->res_nodeid = ret_nodeid;
1466                 lkb->lkb_nodeid = ret_nodeid;
1467         }
1468         return 0;
1469 }
1470
1471 static void process_lookup_list(struct dlm_rsb *r)
1472 {
1473         struct dlm_lkb *lkb, *safe;
1474
1475         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1476                 list_del(&lkb->lkb_rsb_lookup);
1477                 _request_lock(r, lkb);
1478                 schedule();
1479         }
1480 }
1481
1482 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1483
1484 static void confirm_master(struct dlm_rsb *r, int error)
1485 {
1486         struct dlm_lkb *lkb;
1487
1488         if (!r->res_first_lkid)
1489                 return;
1490
1491         switch (error) {
1492         case 0:
1493         case -EINPROGRESS:
1494                 r->res_first_lkid = 0;
1495                 process_lookup_list(r);
1496                 break;
1497
1498         case -EAGAIN:
1499                 /* the remote master didn't queue our NOQUEUE request;
1500                    make a waiting lkb the first_lkid */
1501
1502                 r->res_first_lkid = 0;
1503
1504                 if (!list_empty(&r->res_lookup)) {
1505                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1506                                          lkb_rsb_lookup);
1507                         list_del(&lkb->lkb_rsb_lookup);
1508                         r->res_first_lkid = lkb->lkb_id;
1509                         _request_lock(r, lkb);
1510                 } else
1511                         r->res_nodeid = -1;
1512                 break;
1513
1514         default:
1515                 log_error(r->res_ls, "confirm_master unknown error %d", error);
1516         }
1517 }
1518
1519 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1520                          int namelen, uint32_t parent_lkid, void *ast,
1521                          void *astarg, void *bast, struct dlm_args *args)
1522 {
1523         int rv = -EINVAL;
1524
1525         /* check for invalid arg usage */
1526
1527         if (mode < 0 || mode > DLM_LOCK_EX)
1528                 goto out;
1529
1530         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1531                 goto out;
1532
1533         if (flags & DLM_LKF_CANCEL)
1534                 goto out;
1535
1536         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1537                 goto out;
1538
1539         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1540                 goto out;
1541
1542         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1543                 goto out;
1544
1545         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1546                 goto out;
1547
1548         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1549                 goto out;
1550
1551         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1552                 goto out;
1553
1554         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1555                 goto out;
1556
1557         if (!ast || !lksb)
1558                 goto out;
1559
1560         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1561                 goto out;
1562
1563         /* parent/child locks not yet supported */
1564         if (parent_lkid)
1565                 goto out;
1566
1567         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1568                 goto out;
1569
1570         /* these args will be copied to the lkb in validate_lock_args,
1571            it cannot be done now because when converting locks, fields in
1572            an active lkb cannot be modified before locking the rsb */
1573
1574         args->flags = flags;
1575         args->astaddr = ast;
1576         args->astparam = (long) astarg;
1577         args->bastaddr = bast;
1578         args->mode = mode;
1579         args->lksb = lksb;
1580         rv = 0;
1581  out:
1582         return rv;
1583 }
1584
1585 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1586 {
1587         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1588                       DLM_LKF_FORCEUNLOCK))
1589                 return -EINVAL;
1590
1591         args->flags = flags;
1592         args->astparam = (long) astarg;
1593         return 0;
1594 }
1595
1596 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1597                               struct dlm_args *args)
1598 {
1599         int rv = -EINVAL;
1600
1601         if (args->flags & DLM_LKF_CONVERT) {
1602                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1603                         goto out;
1604
1605                 if (args->flags & DLM_LKF_QUECVT &&
1606                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1607                         goto out;
1608
1609                 rv = -EBUSY;
1610                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1611                         goto out;
1612
1613                 if (lkb->lkb_wait_type)
1614                         goto out;
1615         }
1616
1617         lkb->lkb_exflags = args->flags;
1618         lkb->lkb_sbflags = 0;
1619         lkb->lkb_astaddr = args->astaddr;
1620         lkb->lkb_astparam = args->astparam;
1621         lkb->lkb_bastaddr = args->bastaddr;
1622         lkb->lkb_rqmode = args->mode;
1623         lkb->lkb_lksb = args->lksb;
1624         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1625         lkb->lkb_ownpid = (int) current->pid;
1626         rv = 0;
1627  out:
1628         return rv;
1629 }
1630
1631 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1632 {
1633         int rv = -EINVAL;
1634
1635         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1636                 goto out;
1637
1638         if (args->flags & DLM_LKF_FORCEUNLOCK)
1639                 goto out_ok;
1640
1641         if (args->flags & DLM_LKF_CANCEL &&
1642             lkb->lkb_status == DLM_LKSTS_GRANTED)
1643                 goto out;
1644
1645         if (!(args->flags & DLM_LKF_CANCEL) &&
1646             lkb->lkb_status != DLM_LKSTS_GRANTED)
1647                 goto out;
1648
1649         rv = -EBUSY;
1650         if (lkb->lkb_wait_type)
1651                 goto out;
1652
1653  out_ok:
1654         lkb->lkb_exflags = args->flags;
1655         lkb->lkb_sbflags = 0;
1656         lkb->lkb_astparam = args->astparam;
1657
1658         rv = 0;
1659  out:
1660         return rv;
1661 }
1662
1663 /*
1664  * Four stage 4 varieties:
1665  * do_request(), do_convert(), do_unlock(), do_cancel()
1666  * These are called on the master node for the given lock and
1667  * from the central locking logic.
1668  */
1669
1670 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1671 {
1672         int error = 0;
1673
1674         if (can_be_granted(r, lkb, 1)) {
1675                 grant_lock(r, lkb);
1676                 queue_cast(r, lkb, 0);
1677                 goto out;
1678         }
1679
1680         if (can_be_queued(lkb)) {
1681                 error = -EINPROGRESS;
1682                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1683                 send_blocking_asts(r, lkb);
1684                 goto out;
1685         }
1686
1687         error = -EAGAIN;
1688         if (force_blocking_asts(lkb))
1689                 send_blocking_asts_all(r, lkb);
1690         queue_cast(r, lkb, -EAGAIN);
1691
1692  out:
1693         return error;
1694 }
1695
1696 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1697 {
1698         int error = 0;
1699
1700         /* changing an existing lock may allow others to be granted */
1701
1702         if (can_be_granted(r, lkb, 1)) {
1703                 grant_lock(r, lkb);
1704                 queue_cast(r, lkb, 0);
1705                 grant_pending_locks(r);
1706                 goto out;
1707         }
1708
1709         if (can_be_queued(lkb)) {
1710                 if (is_demoted(lkb))
1711                         grant_pending_locks(r);
1712                 error = -EINPROGRESS;
1713                 del_lkb(r, lkb);
1714                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
1715                 send_blocking_asts(r, lkb);
1716                 goto out;
1717         }
1718
1719         error = -EAGAIN;
1720         if (force_blocking_asts(lkb))
1721                 send_blocking_asts_all(r, lkb);
1722         queue_cast(r, lkb, -EAGAIN);
1723
1724  out:
1725         return error;
1726 }
1727
1728 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1729 {
1730         remove_lock(r, lkb);
1731         queue_cast(r, lkb, -DLM_EUNLOCK);
1732         grant_pending_locks(r);
1733         return -DLM_EUNLOCK;
1734 }
1735
1736 /* FIXME: if revert_lock() finds that the lkb is granted, we should
1737    skip the queue_cast(ECANCEL).  It indicates that the request/convert
1738    completed (and queued a normal ast) just before the cancel; we don't
1739    want to clobber the sb_result for the normal ast with ECANCEL. */
1740    
1741 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1742 {
1743         revert_lock(r, lkb);
1744         queue_cast(r, lkb, -DLM_ECANCEL);
1745         grant_pending_locks(r);
1746         return -DLM_ECANCEL;
1747 }
1748
1749 /*
1750  * Four stage 3 varieties:
1751  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
1752  */
1753
1754 /* add a new lkb to a possibly new rsb, called by requesting process */
1755
1756 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1757 {
1758         int error;
1759
1760         /* set_master: sets lkb nodeid from r */
1761
1762         error = set_master(r, lkb);
1763         if (error < 0)
1764                 goto out;
1765         if (error) {
1766                 error = 0;
1767                 goto out;
1768         }
1769
1770         if (is_remote(r))
1771                 /* receive_request() calls do_request() on remote node */
1772                 error = send_request(r, lkb);
1773         else
1774                 error = do_request(r, lkb);
1775  out:
1776         return error;
1777 }
1778
1779 /* change some property of an existing lkb, e.g. mode */
1780
1781 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1782 {
1783         int error;
1784
1785         if (is_remote(r))
1786                 /* receive_convert() calls do_convert() on remote node */
1787                 error = send_convert(r, lkb);
1788         else
1789                 error = do_convert(r, lkb);
1790
1791         return error;
1792 }
1793
1794 /* remove an existing lkb from the granted queue */
1795
1796 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1797 {
1798         int error;
1799
1800         if (is_remote(r))
1801                 /* receive_unlock() calls do_unlock() on remote node */
1802                 error = send_unlock(r, lkb);
1803         else
1804                 error = do_unlock(r, lkb);
1805
1806         return error;
1807 }
1808
1809 /* remove an existing lkb from the convert or wait queue */
1810
1811 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1812 {
1813         int error;
1814
1815         if (is_remote(r))
1816                 /* receive_cancel() calls do_cancel() on remote node */
1817                 error = send_cancel(r, lkb);
1818         else
1819                 error = do_cancel(r, lkb);
1820
1821         return error;
1822 }
1823
1824 /*
1825  * Four stage 2 varieties:
1826  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
1827  */
1828
1829 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
1830                         int len, struct dlm_args *args)
1831 {
1832         struct dlm_rsb *r;
1833         int error;
1834
1835         error = validate_lock_args(ls, lkb, args);
1836         if (error)
1837                 goto out;
1838
1839         error = find_rsb(ls, name, len, R_CREATE, &r);
1840         if (error)
1841                 goto out;
1842
1843         lock_rsb(r);
1844
1845         attach_lkb(r, lkb);
1846         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
1847
1848         error = _request_lock(r, lkb);
1849
1850         unlock_rsb(r);
1851         put_rsb(r);
1852
1853  out:
1854         return error;
1855 }
1856
1857 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1858                         struct dlm_args *args)
1859 {
1860         struct dlm_rsb *r;
1861         int error;
1862
1863         r = lkb->lkb_resource;
1864
1865         hold_rsb(r);
1866         lock_rsb(r);
1867
1868         error = validate_lock_args(ls, lkb, args);
1869         if (error)
1870                 goto out;
1871
1872         error = _convert_lock(r, lkb);
1873  out:
1874         unlock_rsb(r);
1875         put_rsb(r);
1876         return error;
1877 }
1878
1879 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1880                        struct dlm_args *args)
1881 {
1882         struct dlm_rsb *r;
1883         int error;
1884
1885         r = lkb->lkb_resource;
1886
1887         hold_rsb(r);
1888         lock_rsb(r);
1889
1890         error = validate_unlock_args(lkb, args);
1891         if (error)
1892                 goto out;
1893
1894         error = _unlock_lock(r, lkb);
1895  out:
1896         unlock_rsb(r);
1897         put_rsb(r);
1898         return error;
1899 }
1900
1901 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1902                        struct dlm_args *args)
1903 {
1904         struct dlm_rsb *r;
1905         int error;
1906
1907         r = lkb->lkb_resource;
1908
1909         hold_rsb(r);
1910         lock_rsb(r);
1911
1912         error = validate_unlock_args(lkb, args);
1913         if (error)
1914                 goto out;
1915
1916         error = _cancel_lock(r, lkb);
1917  out:
1918         unlock_rsb(r);
1919         put_rsb(r);
1920         return error;
1921 }
1922
1923 /*
1924  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
1925  */
1926
1927 int dlm_lock(dlm_lockspace_t *lockspace,
1928              int mode,
1929              struct dlm_lksb *lksb,
1930              uint32_t flags,
1931              void *name,
1932              unsigned int namelen,
1933              uint32_t parent_lkid,
1934              void (*ast) (void *astarg),
1935              void *astarg,
1936              void (*bast) (void *astarg, int mode))
1937 {
1938         struct dlm_ls *ls;
1939         struct dlm_lkb *lkb;
1940         struct dlm_args args;
1941         int error, convert = flags & DLM_LKF_CONVERT;
1942
1943         ls = dlm_find_lockspace_local(lockspace);
1944         if (!ls)
1945                 return -EINVAL;
1946
1947         lock_recovery(ls);
1948
1949         if (convert)
1950                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
1951         else
1952                 error = create_lkb(ls, &lkb);
1953
1954         if (error)
1955                 goto out;
1956
1957         error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
1958                               astarg, bast, &args);
1959         if (error)
1960                 goto out_put;
1961
1962         if (convert)
1963                 error = convert_lock(ls, lkb, &args);
1964         else
1965                 error = request_lock(ls, lkb, name, namelen, &args);
1966
1967         if (error == -EINPROGRESS)
1968                 error = 0;
1969  out_put:
1970         if (convert || error)
1971                 __put_lkb(ls, lkb);
1972         if (error == -EAGAIN)
1973                 error = 0;
1974  out:
1975         unlock_recovery(ls);
1976         dlm_put_lockspace(ls);
1977         return error;
1978 }
1979
1980 int dlm_unlock(dlm_lockspace_t *lockspace,
1981                uint32_t lkid,
1982                uint32_t flags,
1983                struct dlm_lksb *lksb,
1984                void *astarg)
1985 {
1986         struct dlm_ls *ls;
1987         struct dlm_lkb *lkb;
1988         struct dlm_args args;
1989         int error;
1990
1991         ls = dlm_find_lockspace_local(lockspace);
1992         if (!ls)
1993                 return -EINVAL;
1994
1995         lock_recovery(ls);
1996
1997         error = find_lkb(ls, lkid, &lkb);
1998         if (error)
1999                 goto out;
2000
2001         error = set_unlock_args(flags, astarg, &args);
2002         if (error)
2003                 goto out_put;
2004
2005         if (flags & DLM_LKF_CANCEL)
2006                 error = cancel_lock(ls, lkb, &args);
2007         else
2008                 error = unlock_lock(ls, lkb, &args);
2009
2010         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2011                 error = 0;
2012  out_put:
2013         dlm_put_lkb(lkb);
2014  out:
2015         unlock_recovery(ls);
2016         dlm_put_lockspace(ls);
2017         return error;
2018 }
2019
2020 /*
2021  * send/receive routines for remote operations and replies
2022  *
2023  * send_args
2024  * send_common
2025  * send_request                 receive_request
2026  * send_convert                 receive_convert
2027  * send_unlock                  receive_unlock
2028  * send_cancel                  receive_cancel
2029  * send_grant                   receive_grant
2030  * send_bast                    receive_bast
2031  * send_lookup                  receive_lookup
2032  * send_remove                  receive_remove
2033  *
2034  *                              send_common_reply
2035  * receive_request_reply        send_request_reply
2036  * receive_convert_reply        send_convert_reply
2037  * receive_unlock_reply         send_unlock_reply
2038  * receive_cancel_reply         send_cancel_reply
2039  * receive_lookup_reply         send_lookup_reply
2040  */
2041
2042 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2043                           int to_nodeid, int mstype,
2044                           struct dlm_message **ms_ret,
2045                           struct dlm_mhandle **mh_ret)
2046 {
2047         struct dlm_message *ms;
2048         struct dlm_mhandle *mh;
2049         char *mb;
2050         int mb_len = sizeof(struct dlm_message);
2051
2052         switch (mstype) {
2053         case DLM_MSG_REQUEST:
2054         case DLM_MSG_LOOKUP:
2055         case DLM_MSG_REMOVE:
2056                 mb_len += r->res_length;
2057                 break;
2058         case DLM_MSG_CONVERT:
2059         case DLM_MSG_UNLOCK:
2060         case DLM_MSG_REQUEST_REPLY:
2061         case DLM_MSG_CONVERT_REPLY:
2062         case DLM_MSG_GRANT:
2063                 if (lkb && lkb->lkb_lvbptr)
2064                         mb_len += r->res_ls->ls_lvblen;
2065                 break;
2066         }
2067
2068         /* get_buffer gives us a message handle (mh) that we need to
2069            pass into lowcomms_commit and a message buffer (mb) that we
2070            write our data into */
2071
2072         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2073         if (!mh)
2074                 return -ENOBUFS;
2075
2076         memset(mb, 0, mb_len);
2077
2078         ms = (struct dlm_message *) mb;
2079
2080         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2081         ms->m_header.h_lockspace = r->res_ls->ls_global_id;
2082         ms->m_header.h_nodeid = dlm_our_nodeid();
2083         ms->m_header.h_length = mb_len;
2084         ms->m_header.h_cmd = DLM_MSG;
2085
2086         ms->m_type = mstype;
2087
2088         *mh_ret = mh;
2089         *ms_ret = ms;
2090         return 0;
2091 }
2092
2093 /* further lowcomms enhancements or alternate implementations may make
2094    the return value from this function useful at some point */
2095
2096 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2097 {
2098         dlm_message_out(ms);
2099         dlm_lowcomms_commit_buffer(mh);
2100         return 0;
2101 }
2102
2103 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2104                       struct dlm_message *ms)
2105 {
2106         ms->m_nodeid   = lkb->lkb_nodeid;
2107         ms->m_pid      = lkb->lkb_ownpid;
2108         ms->m_lkid     = lkb->lkb_id;
2109         ms->m_remid    = lkb->lkb_remid;
2110         ms->m_exflags  = lkb->lkb_exflags;
2111         ms->m_sbflags  = lkb->lkb_sbflags;
2112         ms->m_flags    = lkb->lkb_flags;
2113         ms->m_lvbseq   = lkb->lkb_lvbseq;
2114         ms->m_status   = lkb->lkb_status;
2115         ms->m_grmode   = lkb->lkb_grmode;
2116         ms->m_rqmode   = lkb->lkb_rqmode;
2117         ms->m_hash     = r->res_hash;
2118
2119         /* m_result and m_bastmode are set from function args,
2120            not from lkb fields */
2121
2122         if (lkb->lkb_bastaddr)
2123                 ms->m_asts |= AST_BAST;
2124         if (lkb->lkb_astaddr)
2125                 ms->m_asts |= AST_COMP;
2126
2127         if (ms->m_type == DLM_MSG_REQUEST || ms->m_type == DLM_MSG_LOOKUP)
2128                 memcpy(ms->m_extra, r->res_name, r->res_length);
2129
2130         else if (lkb->lkb_lvbptr)
2131                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2132
2133 }
2134
2135 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2136 {
2137         struct dlm_message *ms;
2138         struct dlm_mhandle *mh;
2139         int to_nodeid, error;
2140
2141         add_to_waiters(lkb, mstype);
2142
2143         to_nodeid = r->res_nodeid;
2144
2145         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2146         if (error)
2147                 goto fail;
2148
2149         send_args(r, lkb, ms);
2150
2151         error = send_message(mh, ms);
2152         if (error)
2153                 goto fail;
2154         return 0;
2155
2156  fail:
2157         remove_from_waiters(lkb);
2158         return error;
2159 }
2160
2161 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2162 {
2163         return send_common(r, lkb, DLM_MSG_REQUEST);
2164 }
2165
2166 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2167 {
2168         int error;
2169
2170         error = send_common(r, lkb, DLM_MSG_CONVERT);
2171
2172         /* down conversions go without a reply from the master */
2173         if (!error && down_conversion(lkb)) {
2174                 remove_from_waiters(lkb);
2175                 r->res_ls->ls_stub_ms.m_result = 0;
2176                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2177         }
2178
2179         return error;
2180 }
2181
2182 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2183    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2184    that the master is still correct. */
2185
2186 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2187 {
2188         return send_common(r, lkb, DLM_MSG_UNLOCK);
2189 }
2190
2191 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2192 {
2193         return send_common(r, lkb, DLM_MSG_CANCEL);
2194 }
2195
2196 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2197 {
2198         struct dlm_message *ms;
2199         struct dlm_mhandle *mh;
2200         int to_nodeid, error;
2201
2202         to_nodeid = lkb->lkb_nodeid;
2203
2204         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2205         if (error)
2206                 goto out;
2207
2208         send_args(r, lkb, ms);
2209
2210         ms->m_result = 0;
2211
2212         error = send_message(mh, ms);
2213  out:
2214         return error;
2215 }
2216
2217 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2218 {
2219         struct dlm_message *ms;
2220         struct dlm_mhandle *mh;
2221         int to_nodeid, error;
2222
2223         to_nodeid = lkb->lkb_nodeid;
2224
2225         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2226         if (error)
2227                 goto out;
2228
2229         send_args(r, lkb, ms);
2230
2231         ms->m_bastmode = mode;
2232
2233         error = send_message(mh, ms);
2234  out:
2235         return error;
2236 }
2237
2238 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2239 {
2240         struct dlm_message *ms;
2241         struct dlm_mhandle *mh;
2242         int to_nodeid, error;
2243
2244         add_to_waiters(lkb, DLM_MSG_LOOKUP);
2245
2246         to_nodeid = dlm_dir_nodeid(r);
2247
2248         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2249         if (error)
2250                 goto fail;
2251
2252         send_args(r, lkb, ms);
2253
2254         error = send_message(mh, ms);
2255         if (error)
2256                 goto fail;
2257         return 0;
2258
2259  fail:
2260         remove_from_waiters(lkb);
2261         return error;
2262 }
2263
2264 static int send_remove(struct dlm_rsb *r)
2265 {
2266         struct dlm_message *ms;
2267         struct dlm_mhandle *mh;
2268         int to_nodeid, error;
2269
2270         to_nodeid = dlm_dir_nodeid(r);
2271
2272         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2273         if (error)
2274                 goto out;
2275
2276         memcpy(ms->m_extra, r->res_name, r->res_length);
2277         ms->m_hash = r->res_hash;
2278
2279         error = send_message(mh, ms);
2280  out:
2281         return error;
2282 }
2283
2284 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2285                              int mstype, int rv)
2286 {
2287         struct dlm_message *ms;
2288         struct dlm_mhandle *mh;
2289         int to_nodeid, error;
2290
2291         to_nodeid = lkb->lkb_nodeid;
2292
2293         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2294         if (error)
2295                 goto out;
2296
2297         send_args(r, lkb, ms);
2298
2299         ms->m_result = rv;
2300
2301         error = send_message(mh, ms);
2302  out:
2303         return error;
2304 }
2305
2306 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2307 {
2308         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2309 }
2310
2311 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2312 {
2313         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2314 }
2315
2316 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2317 {
2318         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2319 }
2320
2321 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2322 {
2323         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2324 }
2325
2326 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2327                              int ret_nodeid, int rv)
2328 {
2329         struct dlm_rsb *r = &ls->ls_stub_rsb;
2330         struct dlm_message *ms;
2331         struct dlm_mhandle *mh;
2332         int error, nodeid = ms_in->m_header.h_nodeid;
2333
2334         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2335         if (error)
2336                 goto out;
2337
2338         ms->m_lkid = ms_in->m_lkid;
2339         ms->m_result = rv;
2340         ms->m_nodeid = ret_nodeid;
2341
2342         error = send_message(mh, ms);
2343  out:
2344         return error;
2345 }
2346
2347 /* which args we save from a received message depends heavily on the type
2348    of message, unlike the send side where we can safely send everything about
2349    the lkb for any type of message */
2350
2351 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2352 {
2353         lkb->lkb_exflags = ms->m_exflags;
2354         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2355                          (ms->m_flags & 0x0000FFFF);
2356 }
2357
2358 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2359 {
2360         lkb->lkb_sbflags = ms->m_sbflags;
2361         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2362                          (ms->m_flags & 0x0000FFFF);
2363 }
2364
2365 static int receive_extralen(struct dlm_message *ms)
2366 {
2367         return (ms->m_header.h_length - sizeof(struct dlm_message));
2368 }
2369
2370 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2371                        struct dlm_message *ms)
2372 {
2373         int len;
2374
2375         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2376                 if (!lkb->lkb_lvbptr)
2377                         lkb->lkb_lvbptr = allocate_lvb(ls);
2378                 if (!lkb->lkb_lvbptr)
2379                         return -ENOMEM;
2380                 len = receive_extralen(ms);
2381                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2382         }
2383         return 0;
2384 }
2385
2386 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2387                                 struct dlm_message *ms)
2388 {
2389         lkb->lkb_nodeid = ms->m_header.h_nodeid;
2390         lkb->lkb_ownpid = ms->m_pid;
2391         lkb->lkb_remid = ms->m_lkid;
2392         lkb->lkb_grmode = DLM_LOCK_IV;
2393         lkb->lkb_rqmode = ms->m_rqmode;
2394         lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2395         lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2396
2397         DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2398
2399         if (receive_lvb(ls, lkb, ms))
2400                 return -ENOMEM;
2401
2402         return 0;
2403 }
2404
2405 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2406                                 struct dlm_message *ms)
2407 {
2408         if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2409                 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2410                           lkb->lkb_nodeid, ms->m_header.h_nodeid,
2411                           lkb->lkb_id, lkb->lkb_remid);
2412                 return -EINVAL;
2413         }
2414
2415         if (!is_master_copy(lkb))
2416                 return -EINVAL;
2417
2418         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2419                 return -EBUSY;
2420
2421         if (receive_lvb(ls, lkb, ms))
2422                 return -ENOMEM;
2423
2424         lkb->lkb_rqmode = ms->m_rqmode;
2425         lkb->lkb_lvbseq = ms->m_lvbseq;
2426
2427         return 0;
2428 }
2429
2430 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2431                                struct dlm_message *ms)
2432 {
2433         if (!is_master_copy(lkb))
2434                 return -EINVAL;
2435         if (receive_lvb(ls, lkb, ms))
2436                 return -ENOMEM;
2437         return 0;
2438 }
2439
2440 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2441    uses to send a reply and that the remote end uses to process the reply. */
2442
2443 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2444 {
2445         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2446         lkb->lkb_nodeid = ms->m_header.h_nodeid;
2447         lkb->lkb_remid = ms->m_lkid;
2448 }
2449
2450 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2451 {
2452         struct dlm_lkb *lkb;
2453         struct dlm_rsb *r;
2454         int error, namelen;
2455
2456         error = create_lkb(ls, &lkb);
2457         if (error)
2458                 goto fail;
2459
2460         receive_flags(lkb, ms);
2461         lkb->lkb_flags |= DLM_IFL_MSTCPY;
2462         error = receive_request_args(ls, lkb, ms);
2463         if (error) {
2464                 __put_lkb(ls, lkb);
2465                 goto fail;
2466         }
2467
2468         namelen = receive_extralen(ms);
2469
2470         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2471         if (error) {
2472                 __put_lkb(ls, lkb);
2473                 goto fail;
2474         }
2475
2476         lock_rsb(r);
2477
2478         attach_lkb(r, lkb);
2479         error = do_request(r, lkb);
2480         send_request_reply(r, lkb, error);
2481
2482         unlock_rsb(r);
2483         put_rsb(r);
2484
2485         if (error == -EINPROGRESS)
2486                 error = 0;
2487         if (error)
2488                 dlm_put_lkb(lkb);
2489         return;
2490
2491  fail:
2492         setup_stub_lkb(ls, ms);
2493         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2494 }
2495
2496 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2497 {
2498         struct dlm_lkb *lkb;
2499         struct dlm_rsb *r;
2500         int error, reply = 1;
2501
2502         error = find_lkb(ls, ms->m_remid, &lkb);
2503         if (error)
2504                 goto fail;
2505
2506         r = lkb->lkb_resource;
2507
2508         hold_rsb(r);
2509         lock_rsb(r);
2510
2511         receive_flags(lkb, ms);
2512         error = receive_convert_args(ls, lkb, ms);
2513         if (error)
2514                 goto out;
2515         reply = !down_conversion(lkb);
2516
2517         error = do_convert(r, lkb);
2518  out:
2519         if (reply)
2520                 send_convert_reply(r, lkb, error);
2521
2522         unlock_rsb(r);
2523         put_rsb(r);
2524         dlm_put_lkb(lkb);
2525         return;
2526
2527  fail:
2528         setup_stub_lkb(ls, ms);
2529         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2530 }
2531
2532 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2533 {
2534         struct dlm_lkb *lkb;
2535         struct dlm_rsb *r;
2536         int error;
2537
2538         error = find_lkb(ls, ms->m_remid, &lkb);
2539         if (error)
2540                 goto fail;
2541
2542         r = lkb->lkb_resource;
2543
2544         hold_rsb(r);
2545         lock_rsb(r);
2546
2547         receive_flags(lkb, ms);
2548         error = receive_unlock_args(ls, lkb, ms);
2549         if (error)
2550                 goto out;
2551
2552         error = do_unlock(r, lkb);
2553  out:
2554         send_unlock_reply(r, lkb, error);
2555
2556         unlock_rsb(r);
2557         put_rsb(r);
2558         dlm_put_lkb(lkb);
2559         return;
2560
2561  fail:
2562         setup_stub_lkb(ls, ms);
2563         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2564 }
2565
2566 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2567 {
2568         struct dlm_lkb *lkb;
2569         struct dlm_rsb *r;
2570         int error;
2571
2572         error = find_lkb(ls, ms->m_remid, &lkb);
2573         if (error)
2574                 goto fail;
2575
2576         receive_flags(lkb, ms);
2577
2578         r = lkb->lkb_resource;
2579
2580         hold_rsb(r);
2581         lock_rsb(r);
2582
2583         error = do_cancel(r, lkb);
2584         send_cancel_reply(r, lkb, error);
2585
2586         unlock_rsb(r);
2587         put_rsb(r);
2588         dlm_put_lkb(lkb);
2589         return;
2590
2591  fail:
2592         setup_stub_lkb(ls, ms);
2593         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2594 }
2595
2596 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2597 {
2598         struct dlm_lkb *lkb;
2599         struct dlm_rsb *r;
2600         int error;
2601
2602         error = find_lkb(ls, ms->m_remid, &lkb);
2603         if (error) {
2604                 log_error(ls, "receive_grant no lkb");
2605                 return;
2606         }
2607         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2608
2609         r = lkb->lkb_resource;
2610
2611         hold_rsb(r);
2612         lock_rsb(r);
2613
2614         receive_flags_reply(lkb, ms);
2615         grant_lock_pc(r, lkb, ms);
2616         queue_cast(r, lkb, 0);
2617
2618         unlock_rsb(r);
2619         put_rsb(r);
2620         dlm_put_lkb(lkb);
2621 }
2622
2623 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2624 {
2625         struct dlm_lkb *lkb;
2626         struct dlm_rsb *r;
2627         int error;
2628
2629         error = find_lkb(ls, ms->m_remid, &lkb);
2630         if (error) {
2631                 log_error(ls, "receive_bast no lkb");
2632                 return;
2633         }
2634         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2635
2636         r = lkb->lkb_resource;
2637
2638         hold_rsb(r);
2639         lock_rsb(r);
2640
2641         queue_bast(r, lkb, ms->m_bastmode);
2642
2643         unlock_rsb(r);
2644         put_rsb(r);
2645         dlm_put_lkb(lkb);
2646 }
2647
2648 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
2649 {
2650         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
2651
2652         from_nodeid = ms->m_header.h_nodeid;
2653         our_nodeid = dlm_our_nodeid();
2654
2655         len = receive_extralen(ms);
2656
2657         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2658         if (dir_nodeid != our_nodeid) {
2659                 log_error(ls, "lookup dir_nodeid %d from %d",
2660                           dir_nodeid, from_nodeid);
2661                 error = -EINVAL;
2662                 ret_nodeid = -1;
2663                 goto out;
2664         }
2665
2666         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
2667
2668         /* Optimization: we're master so treat lookup as a request */
2669         if (!error && ret_nodeid == our_nodeid) {
2670                 receive_request(ls, ms);
2671                 return;
2672         }
2673  out:
2674         send_lookup_reply(ls, ms, ret_nodeid, error);
2675 }
2676
2677 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2678 {
2679         int len, dir_nodeid, from_nodeid;
2680
2681         from_nodeid = ms->m_header.h_nodeid;
2682
2683         len = receive_extralen(ms);
2684
2685         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2686         if (dir_nodeid != dlm_our_nodeid()) {
2687                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
2688                           dir_nodeid, from_nodeid);
2689                 return;
2690         }
2691
2692         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2693 }
2694
2695 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2696 {
2697         struct dlm_lkb *lkb;
2698         struct dlm_rsb *r;
2699         int error, mstype;
2700
2701         error = find_lkb(ls, ms->m_remid, &lkb);
2702         if (error) {
2703                 log_error(ls, "receive_request_reply no lkb");
2704                 return;
2705         }
2706         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2707
2708         mstype = lkb->lkb_wait_type;
2709         error = remove_from_waiters(lkb);
2710         if (error) {
2711                 log_error(ls, "receive_request_reply not on waiters");
2712                 goto out;
2713         }
2714
2715         /* this is the value returned from do_request() on the master */
2716         error = ms->m_result;
2717
2718         r = lkb->lkb_resource;
2719         hold_rsb(r);
2720         lock_rsb(r);
2721
2722         /* Optimization: the dir node was also the master, so it took our
2723            lookup as a request and sent request reply instead of lookup reply */
2724         if (mstype == DLM_MSG_LOOKUP) {
2725                 r->res_nodeid = ms->m_header.h_nodeid;
2726                 lkb->lkb_nodeid = r->res_nodeid;
2727         }
2728
2729         switch (error) {
2730         case -EAGAIN:
2731                 /* request would block (be queued) on remote master;
2732                    the unhold undoes the original ref from create_lkb()
2733                    so it leads to the lkb being freed */
2734                 queue_cast(r, lkb, -EAGAIN);
2735                 confirm_master(r, -EAGAIN);
2736                 unhold_lkb(lkb);
2737                 break;
2738
2739         case -EINPROGRESS:
2740         case 0:
2741                 /* request was queued or granted on remote master */
2742                 receive_flags_reply(lkb, ms);
2743                 lkb->lkb_remid = ms->m_lkid;
2744                 if (error)
2745                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
2746                 else {
2747                         grant_lock_pc(r, lkb, ms);
2748                         queue_cast(r, lkb, 0);
2749                 }
2750                 confirm_master(r, error);
2751                 break;
2752
2753         case -EBADR:
2754         case -ENOTBLK:
2755                 /* find_rsb failed to find rsb or rsb wasn't master */
2756                 r->res_nodeid = -1;
2757                 lkb->lkb_nodeid = -1;
2758                 _request_lock(r, lkb);
2759                 break;
2760
2761         default:
2762                 log_error(ls, "receive_request_reply error %d", error);
2763         }
2764
2765         unlock_rsb(r);
2766         put_rsb(r);
2767  out:
2768         dlm_put_lkb(lkb);
2769 }
2770
2771 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2772                                     struct dlm_message *ms)
2773 {
2774         int error = ms->m_result;
2775
2776         /* this is the value returned from do_convert() on the master */
2777
2778         switch (error) {
2779         case -EAGAIN:
2780                 /* convert would block (be queued) on remote master */
2781                 queue_cast(r, lkb, -EAGAIN);
2782                 break;
2783
2784         case -EINPROGRESS:
2785                 /* convert was queued on remote master */
2786                 del_lkb(r, lkb);
2787                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2788                 break;
2789
2790         case 0:
2791                 /* convert was granted on remote master */
2792                 receive_flags_reply(lkb, ms);
2793                 grant_lock_pc(r, lkb, ms);
2794                 queue_cast(r, lkb, 0);
2795                 break;
2796
2797         default:
2798                 log_error(r->res_ls, "receive_convert_reply error %d", error);
2799         }
2800 }
2801
2802 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2803 {
2804         struct dlm_rsb *r = lkb->lkb_resource;
2805
2806         hold_rsb(r);
2807         lock_rsb(r);
2808
2809         __receive_convert_reply(r, lkb, ms);
2810
2811         unlock_rsb(r);
2812         put_rsb(r);
2813 }
2814
2815 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
2816 {
2817         struct dlm_lkb *lkb;
2818         int error;
2819
2820         error = find_lkb(ls, ms->m_remid, &lkb);
2821         if (error) {
2822                 log_error(ls, "receive_convert_reply no lkb");
2823                 return;
2824         }
2825         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2826
2827         error = remove_from_waiters(lkb);
2828         if (error) {
2829                 log_error(ls, "receive_convert_reply not on waiters");
2830                 goto out;
2831         }
2832
2833         _receive_convert_reply(lkb, ms);
2834  out:
2835         dlm_put_lkb(lkb);
2836 }
2837
2838 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2839 {
2840         struct dlm_rsb *r = lkb->lkb_resource;
2841         int error = ms->m_result;
2842
2843         hold_rsb(r);
2844         lock_rsb(r);
2845
2846         /* this is the value returned from do_unlock() on the master */
2847
2848         switch (error) {
2849         case -DLM_EUNLOCK:
2850                 receive_flags_reply(lkb, ms);
2851                 remove_lock_pc(r, lkb);
2852                 queue_cast(r, lkb, -DLM_EUNLOCK);
2853                 break;
2854         default:
2855                 log_error(r->res_ls, "receive_unlock_reply error %d", error);
2856         }
2857
2858         unlock_rsb(r);
2859         put_rsb(r);
2860 }
2861
2862 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
2863 {
2864         struct dlm_lkb *lkb;
2865         int error;
2866
2867         error = find_lkb(ls, ms->m_remid, &lkb);
2868         if (error) {
2869                 log_error(ls, "receive_unlock_reply no lkb");
2870                 return;
2871         }
2872         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2873
2874         error = remove_from_waiters(lkb);
2875         if (error) {
2876                 log_error(ls, "receive_unlock_reply not on waiters");
2877                 goto out;
2878         }
2879
2880         _receive_unlock_reply(lkb, ms);
2881  out:
2882         dlm_put_lkb(lkb);
2883 }
2884
2885 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2886 {
2887         struct dlm_rsb *r = lkb->lkb_resource;
2888         int error = ms->m_result;
2889
2890         hold_rsb(r);
2891         lock_rsb(r);
2892
2893         /* this is the value returned from do_cancel() on the master */
2894
2895         switch (error) {
2896         case -DLM_ECANCEL:
2897                 receive_flags_reply(lkb, ms);
2898                 revert_lock_pc(r, lkb);
2899                 queue_cast(r, lkb, -DLM_ECANCEL);
2900                 break;
2901         default:
2902                 log_error(r->res_ls, "receive_cancel_reply error %d", error);
2903         }
2904
2905         unlock_rsb(r);
2906         put_rsb(r);
2907 }
2908
2909 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
2910 {
2911         struct dlm_lkb *lkb;
2912         int error;
2913
2914         error = find_lkb(ls, ms->m_remid, &lkb);
2915         if (error) {
2916                 log_error(ls, "receive_cancel_reply no lkb");
2917                 return;
2918         }
2919         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2920
2921         error = remove_from_waiters(lkb);
2922         if (error) {
2923                 log_error(ls, "receive_cancel_reply not on waiters");
2924                 goto out;
2925         }
2926
2927         _receive_cancel_reply(lkb, ms);
2928  out:
2929         dlm_put_lkb(lkb);
2930 }
2931
2932 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
2933 {
2934         struct dlm_lkb *lkb;
2935         struct dlm_rsb *r;
2936         int error, ret_nodeid;
2937
2938         error = find_lkb(ls, ms->m_lkid, &lkb);
2939         if (error) {
2940                 log_error(ls, "receive_lookup_reply no lkb");
2941                 return;
2942         }
2943
2944         error = remove_from_waiters(lkb);
2945         if (error) {
2946                 log_error(ls, "receive_lookup_reply not on waiters");
2947                 goto out;
2948         }
2949
2950         /* this is the value returned by dlm_dir_lookup on dir node
2951            FIXME: will a non-zero error ever be returned? */
2952         error = ms->m_result;
2953
2954         r = lkb->lkb_resource;
2955         hold_rsb(r);
2956         lock_rsb(r);
2957
2958         ret_nodeid = ms->m_nodeid;
2959         if (ret_nodeid == dlm_our_nodeid()) {
2960                 r->res_nodeid = 0;
2961                 ret_nodeid = 0;
2962                 r->res_first_lkid = 0;
2963         } else {
2964                 /* set_master() will copy res_nodeid to lkb_nodeid */
2965                 r->res_nodeid = ret_nodeid;
2966         }
2967
2968         _request_lock(r, lkb);
2969
2970         if (!ret_nodeid)
2971                 process_lookup_list(r);
2972
2973         unlock_rsb(r);
2974         put_rsb(r);
2975  out:
2976         dlm_put_lkb(lkb);
2977 }
2978
2979 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
2980 {
2981         struct dlm_message *ms = (struct dlm_message *) hd;
2982         struct dlm_ls *ls;
2983         int error;
2984
2985         if (!recovery)
2986                 dlm_message_in(ms);
2987
2988         ls = dlm_find_lockspace_global(hd->h_lockspace);
2989         if (!ls) {
2990                 log_print("drop message %d from %d for unknown lockspace %d",
2991                           ms->m_type, nodeid, hd->h_lockspace);
2992                 return -EINVAL;
2993         }
2994
2995         /* recovery may have just ended leaving a bunch of backed-up requests
2996            in the requestqueue; wait while dlm_recoverd clears them */
2997
2998         if (!recovery)
2999                 dlm_wait_requestqueue(ls);
3000
3001         /* recovery may have just started while there were a bunch of
3002            in-flight requests -- save them in requestqueue to be processed
3003            after recovery.  we can't let dlm_recvd block on the recovery
3004            lock.  if dlm_recoverd is calling this function to clear the
3005            requestqueue, it needs to be interrupted (-EINTR) if another
3006            recovery operation is starting. */
3007
3008         while (1) {
3009                 if (dlm_locking_stopped(ls)) {
3010                         if (!recovery)
3011                                 dlm_add_requestqueue(ls, nodeid, hd);
3012                         error = -EINTR;
3013                         goto out;
3014                 }
3015
3016                 if (lock_recovery_try(ls))
3017                         break;
3018                 schedule();
3019         }
3020
3021         switch (ms->m_type) {
3022
3023         /* messages sent to a master node */
3024
3025         case DLM_MSG_REQUEST:
3026                 receive_request(ls, ms);
3027                 break;
3028
3029         case DLM_MSG_CONVERT:
3030                 receive_convert(ls, ms);
3031                 break;
3032
3033         case DLM_MSG_UNLOCK:
3034                 receive_unlock(ls, ms);
3035                 break;
3036
3037         case DLM_MSG_CANCEL:
3038                 receive_cancel(ls, ms);
3039                 break;
3040
3041         /* messages sent from a master node (replies to above) */
3042
3043         case DLM_MSG_REQUEST_REPLY:
3044                 receive_request_reply(ls, ms);
3045                 break;
3046
3047         case DLM_MSG_CONVERT_REPLY:
3048                 receive_convert_reply(ls, ms);
3049                 break;
3050
3051         case DLM_MSG_UNLOCK_REPLY:
3052                 receive_unlock_reply(ls, ms);
3053                 break;
3054
3055         case DLM_MSG_CANCEL_REPLY:
3056                 receive_cancel_reply(ls, ms);
3057                 break;
3058
3059         /* messages sent from a master node (only two types of async msg) */
3060
3061         case DLM_MSG_GRANT:
3062                 receive_grant(ls, ms);
3063                 break;
3064
3065         case DLM_MSG_BAST:
3066                 receive_bast(ls, ms);
3067                 break;
3068
3069         /* messages sent to a dir node */
3070
3071         case DLM_MSG_LOOKUP:
3072                 receive_lookup(ls, ms);
3073                 break;
3074
3075         case DLM_MSG_REMOVE:
3076                 receive_remove(ls, ms);
3077                 break;
3078
3079         /* messages sent from a dir node (remove has no reply) */
3080
3081         case DLM_MSG_LOOKUP_REPLY:
3082                 receive_lookup_reply(ls, ms);
3083                 break;
3084
3085         default:
3086                 log_error(ls, "unknown message type %d", ms->m_type);
3087         }
3088
3089         unlock_recovery(ls);
3090  out:
3091         dlm_put_lockspace(ls);
3092         dlm_astd_wake();
3093         return 0;
3094 }
3095
3096
3097 /*
3098  * Recovery related
3099  */
3100
3101 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3102 {
3103         if (middle_conversion(lkb)) {
3104                 hold_lkb(lkb);
3105                 ls->ls_stub_ms.m_result = -EINPROGRESS;
3106                 _remove_from_waiters(lkb);
3107                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3108
3109                 /* Same special case as in receive_rcom_lock_args() */
3110                 lkb->lkb_grmode = DLM_LOCK_IV;
3111                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3112                 unhold_lkb(lkb);
3113
3114         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3115                 lkb->lkb_flags |= DLM_IFL_RESEND;
3116         }
3117
3118         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3119            conversions are async; there's no reply from the remote master */
3120 }
3121
3122 /* A waiting lkb needs recovery if the master node has failed, or
3123    the master node is changing (only when no directory is used) */
3124
3125 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3126 {
3127         if (dlm_is_removed(ls, lkb->lkb_nodeid))
3128                 return 1;
3129
3130         if (!dlm_no_directory(ls))
3131                 return 0;
3132
3133         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3134                 return 1;
3135
3136         return 0;
3137 }
3138
3139 /* Recovery for locks that are waiting for replies from nodes that are now
3140    gone.  We can just complete unlocks and cancels by faking a reply from the
3141    dead node.  Requests and up-conversions we flag to be resent after
3142    recovery.  Down-conversions can just be completed with a fake reply like
3143    unlocks.  Conversions between PR and CW need special attention. */
3144
3145 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3146 {
3147         struct dlm_lkb *lkb, *safe;
3148
3149         mutex_lock(&ls->ls_waiters_mutex);
3150
3151         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3152                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3153                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3154
3155                 /* all outstanding lookups, regardless of destination  will be
3156                    resent after recovery is done */
3157
3158                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3159                         lkb->lkb_flags |= DLM_IFL_RESEND;
3160                         continue;
3161                 }
3162
3163                 if (!waiter_needs_recovery(ls, lkb))
3164                         continue;
3165
3166                 switch (lkb->lkb_wait_type) {
3167
3168                 case DLM_MSG_REQUEST:
3169                         lkb->lkb_flags |= DLM_IFL_RESEND;
3170                         break;
3171
3172                 case DLM_MSG_CONVERT:
3173                         recover_convert_waiter(ls, lkb);
3174                         break;
3175
3176                 case DLM_MSG_UNLOCK:
3177                         hold_lkb(lkb);
3178                         ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3179                         _remove_from_waiters(lkb);
3180                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3181                         dlm_put_lkb(lkb);
3182                         break;
3183
3184                 case DLM_MSG_CANCEL:
3185                         hold_lkb(lkb);
3186                         ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3187                         _remove_from_waiters(lkb);
3188                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3189                         dlm_put_lkb(lkb);
3190                         break;
3191
3192                 default:
3193                         log_error(ls, "invalid lkb wait_type %d",
3194                                   lkb->lkb_wait_type);
3195                 }
3196                 schedule();
3197         }
3198         mutex_unlock(&ls->ls_waiters_mutex);
3199 }
3200
3201 static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
3202 {
3203         struct dlm_lkb *lkb;
3204         int rv = 0;
3205
3206         mutex_lock(&ls->ls_waiters_mutex);
3207         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3208                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3209                         rv = lkb->lkb_wait_type;
3210                         _remove_from_waiters(lkb);
3211                         lkb->lkb_flags &= ~DLM_IFL_RESEND;
3212                         break;
3213                 }
3214         }
3215         mutex_unlock(&ls->ls_waiters_mutex);
3216
3217         if (!rv)
3218                 lkb = NULL;
3219         *lkb_ret = lkb;
3220         return rv;
3221 }
3222
3223 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
3224    master or dir-node for r.  Processing the lkb may result in it being placed
3225    back on waiters. */
3226
3227 int dlm_recover_waiters_post(struct dlm_ls *ls)
3228 {
3229         struct dlm_lkb *lkb;
3230         struct dlm_rsb *r;
3231         int error = 0, mstype;
3232
3233         while (1) {
3234                 if (dlm_locking_stopped(ls)) {
3235                         log_debug(ls, "recover_waiters_post aborted");
3236                         error = -EINTR;
3237                         break;
3238                 }
3239
3240                 mstype = remove_resend_waiter(ls, &lkb);
3241                 if (!mstype)
3242                         break;
3243
3244                 r = lkb->lkb_resource;
3245
3246                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3247                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3248
3249                 switch (mstype) {
3250
3251                 case DLM_MSG_LOOKUP:
3252                         hold_rsb(r);
3253                         lock_rsb(r);
3254                         _request_lock(r, lkb);
3255                         if (is_master(r))
3256                                 confirm_master(r, 0);
3257                         unlock_rsb(r);
3258                         put_rsb(r);
3259                         break;
3260
3261                 case DLM_MSG_REQUEST:
3262                         hold_rsb(r);
3263                         lock_rsb(r);
3264                         _request_lock(r, lkb);
3265                         unlock_rsb(r);
3266                         put_rsb(r);
3267                         break;
3268
3269                 case DLM_MSG_CONVERT:
3270                         hold_rsb(r);
3271                         lock_rsb(r);
3272                         _convert_lock(r, lkb);
3273                         unlock_rsb(r);
3274                         put_rsb(r);
3275                         break;
3276
3277                 default:
3278                         log_error(ls, "recover_waiters_post type %d", mstype);
3279                 }
3280         }
3281
3282         return error;
3283 }
3284
3285 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3286                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3287 {
3288         struct dlm_ls *ls = r->res_ls;
3289         struct dlm_lkb *lkb, *safe;
3290
3291         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3292                 if (test(ls, lkb)) {
3293                         rsb_set_flag(r, RSB_LOCKS_PURGED);
3294                         del_lkb(r, lkb);
3295                         /* this put should free the lkb */
3296                         if (!dlm_put_lkb(lkb))
3297                                 log_error(ls, "purged lkb not released");
3298                 }
3299         }
3300 }
3301
3302 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3303 {
3304         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3305 }
3306
3307 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3308 {
3309         return is_master_copy(lkb);
3310 }
3311
3312 static void purge_dead_locks(struct dlm_rsb *r)
3313 {
3314         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3315         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3316         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3317 }
3318
3319 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3320 {
3321         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3322         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3323         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3324 }
3325
3326 /* Get rid of locks held by nodes that are gone. */
3327
3328 int dlm_purge_locks(struct dlm_ls *ls)
3329 {
3330         struct dlm_rsb *r;
3331
3332         log_debug(ls, "dlm_purge_locks");
3333
3334         down_write(&ls->ls_root_sem);
3335         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3336                 hold_rsb(r);
3337                 lock_rsb(r);
3338                 if (is_master(r))
3339                         purge_dead_locks(r);
3340                 unlock_rsb(r);
3341                 unhold_rsb(r);
3342
3343                 schedule();
3344         }
3345         up_write(&ls->ls_root_sem);
3346
3347         return 0;
3348 }
3349
3350 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3351 {
3352         struct dlm_rsb *r, *r_ret = NULL;
3353
3354         read_lock(&ls->ls_rsbtbl[bucket].lock);
3355         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3356                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
3357                         continue;
3358                 hold_rsb(r);
3359                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
3360                 r_ret = r;
3361                 break;
3362         }
3363         read_unlock(&ls->ls_rsbtbl[bucket].lock);
3364         return r_ret;
3365 }
3366
3367 void dlm_grant_after_purge(struct dlm_ls *ls)
3368 {
3369         struct dlm_rsb *r;
3370         int bucket = 0;
3371
3372         while (1) {
3373                 r = find_purged_rsb(ls, bucket);
3374                 if (!r) {
3375                         if (bucket == ls->ls_rsbtbl_size - 1)
3376                                 break;
3377                         bucket++;
3378                         continue;
3379                 }
3380                 lock_rsb(r);
3381                 if (is_master(r)) {
3382                         grant_pending_locks(r);
3383                         confirm_master(r, 0);
3384                 }
3385                 unlock_rsb(r);
3386                 put_rsb(r);
3387                 schedule();
3388         }
3389 }
3390
3391 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3392                                          uint32_t remid)
3393 {
3394         struct dlm_lkb *lkb;
3395
3396         list_for_each_entry(lkb, head, lkb_statequeue) {
3397                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3398                         return lkb;
3399         }
3400         return NULL;
3401 }
3402
3403 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3404                                     uint32_t remid)
3405 {
3406         struct dlm_lkb *lkb;
3407
3408         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3409         if (lkb)
3410                 return lkb;
3411         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3412         if (lkb)
3413                 return lkb;
3414         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3415         if (lkb)
3416                 return lkb;
3417         return NULL;
3418 }
3419
3420 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3421                                   struct dlm_rsb *r, struct dlm_rcom *rc)
3422 {
3423         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3424         int lvblen;
3425
3426         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3427         lkb->lkb_ownpid = rl->rl_ownpid;
3428         lkb->lkb_remid = rl->rl_lkid;
3429         lkb->lkb_exflags = rl->rl_exflags;
3430         lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3431         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3432         lkb->lkb_lvbseq = rl->rl_lvbseq;
3433         lkb->lkb_rqmode = rl->rl_rqmode;
3434         lkb->lkb_grmode = rl->rl_grmode;
3435         /* don't set lkb_status because add_lkb wants to itself */
3436
3437         lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3438         lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3439
3440         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3441                 lkb->lkb_lvbptr = allocate_lvb(ls);
3442                 if (!lkb->lkb_lvbptr)
3443                         return -ENOMEM;
3444                 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3445                          sizeof(struct rcom_lock);
3446                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3447         }
3448
3449         /* Conversions between PR and CW (middle modes) need special handling.
3450            The real granted mode of these converting locks cannot be determined
3451            until all locks have been rebuilt on the rsb (recover_conversion) */
3452
3453         if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3454                 rl->rl_status = DLM_LKSTS_CONVERT;
3455                 lkb->lkb_grmode = DLM_LOCK_IV;
3456                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
3457         }
3458
3459         return 0;
3460 }
3461
3462 /* This lkb may have been recovered in a previous aborted recovery so we need
3463    to check if the rsb already has an lkb with the given remote nodeid/lkid.
3464    If so we just send back a standard reply.  If not, we create a new lkb with
3465    the given values and send back our lkid.  We send back our lkid by sending
3466    back the rcom_lock struct we got but with the remid field filled in. */
3467
3468 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3469 {
3470         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3471         struct dlm_rsb *r;
3472         struct dlm_lkb *lkb;
3473         int error;
3474
3475         if (rl->rl_parent_lkid) {
3476                 error = -EOPNOTSUPP;
3477                 goto out;
3478         }
3479
3480         error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3481         if (error)
3482                 goto out;
3483
3484         lock_rsb(r);
3485
3486         lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3487         if (lkb) {
3488                 error = -EEXIST;
3489                 goto out_remid;
3490         }
3491
3492         error = create_lkb(ls, &lkb);
3493         if (error)
3494                 goto out_unlock;
3495
3496         error = receive_rcom_lock_args(ls, lkb, r, rc);
3497         if (error) {
3498                 __put_lkb(ls, lkb);
3499                 goto out_unlock;
3500         }
3501
3502         attach_lkb(r, lkb);
3503         add_lkb(r, lkb, rl->rl_status);
3504         error = 0;
3505
3506  out_remid:
3507         /* this is the new value returned to the lock holder for
3508            saving in its process-copy lkb */
3509         rl->rl_remid = lkb->lkb_id;
3510
3511  out_unlock:
3512         unlock_rsb(r);
3513         put_rsb(r);
3514  out:
3515         if (error)
3516                 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3517         rl->rl_result = error;
3518         return error;
3519 }
3520
3521 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3522 {
3523         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3524         struct dlm_rsb *r;
3525         struct dlm_lkb *lkb;
3526         int error;
3527
3528         error = find_lkb(ls, rl->rl_lkid, &lkb);
3529         if (error) {
3530                 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3531                 return error;
3532         }
3533
3534         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3535
3536         error = rl->rl_result;
3537
3538         r = lkb->lkb_resource;
3539         hold_rsb(r);
3540         lock_rsb(r);
3541
3542         switch (error) {
3543         case -EEXIST:
3544                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
3545                 /* fall through */
3546         case 0:
3547                 lkb->lkb_remid = rl->rl_remid;
3548                 break;
3549         default:
3550                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
3551                           error, lkb->lkb_id);
3552         }
3553
3554         /* an ack for dlm_recover_locks() which waits for replies from
3555            all the locks it sends to new masters */
3556         dlm_recovered_lock(r);
3557
3558         unlock_rsb(r);
3559         put_rsb(r);
3560         dlm_put_lkb(lkb);
3561
3562         return 0;
3563 }
3564
3565 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
3566                      int mode, uint32_t flags, void *name, unsigned int namelen,
3567                      uint32_t parent_lkid)
3568 {
3569         struct dlm_lkb *lkb;
3570         struct dlm_args args;
3571         int error;
3572
3573         lock_recovery(ls);
3574
3575         error = create_lkb(ls, &lkb);
3576         if (error) {
3577                 kfree(ua);
3578                 goto out;
3579         }
3580
3581         if (flags & DLM_LKF_VALBLK) {
3582                 ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3583                 if (!ua->lksb.sb_lvbptr) {
3584                         kfree(ua);
3585                         __put_lkb(ls, lkb);
3586                         error = -ENOMEM;
3587                         goto out;
3588                 }
3589         }
3590
3591         /* After ua is attached to lkb it will be freed by free_lkb().
3592            When DLM_IFL_USER is set, the dlm knows that this is a userspace
3593            lock and that lkb_astparam is the dlm_user_args structure. */
3594
3595         error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
3596                               FAKE_USER_AST, ua, FAKE_USER_AST, &args);
3597         lkb->lkb_flags |= DLM_IFL_USER;
3598         ua->old_mode = DLM_LOCK_IV;
3599
3600         if (error) {
3601                 __put_lkb(ls, lkb);
3602                 goto out;
3603         }
3604
3605         error = request_lock(ls, lkb, name, namelen, &args);
3606
3607         switch (error) {
3608         case 0:
3609                 break;
3610         case -EINPROGRESS:
3611                 error = 0;
3612                 break;
3613         case -EAGAIN:
3614                 error = 0;
3615                 /* fall through */
3616         default:
3617                 __put_lkb(ls, lkb);
3618                 goto out;
3619         }
3620
3621         /* add this new lkb to the per-process list of locks */
3622         spin_lock(&ua->proc->locks_spin);
3623         kref_get(&lkb->lkb_ref);
3624         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
3625         spin_unlock(&ua->proc->locks_spin);
3626  out:
3627         unlock_recovery(ls);
3628         return error;
3629 }
3630
3631 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3632                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
3633 {
3634         struct dlm_lkb *lkb;
3635         struct dlm_args args;
3636         struct dlm_user_args *ua;
3637         int error;
3638
3639         lock_recovery(ls);
3640
3641         error = find_lkb(ls, lkid, &lkb);
3642         if (error)
3643                 goto out;
3644
3645         /* user can change the params on its lock when it converts it, or
3646            add an lvb that didn't exist before */
3647
3648         ua = (struct dlm_user_args *)lkb->lkb_astparam;
3649
3650         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
3651                 ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3652                 if (!ua->lksb.sb_lvbptr) {
3653                         error = -ENOMEM;
3654                         goto out_put;
3655                 }
3656         }
3657         if (lvb_in && ua->lksb.sb_lvbptr)
3658                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3659
3660         ua->castparam = ua_tmp->castparam;
3661         ua->castaddr = ua_tmp->castaddr;
3662         ua->bastparam = ua_tmp->bastparam;
3663         ua->bastaddr = ua_tmp->bastaddr;
3664         ua->old_mode = lkb->lkb_grmode;
3665
3666         error = set_lock_args(mode, &ua->lksb, flags, 0, 0, FAKE_USER_AST, ua,
3667                               FAKE_USER_AST, &args);
3668         if (error)
3669                 goto out_put;
3670
3671         error = convert_lock(ls, lkb, &args);
3672
3673         if (error == -EINPROGRESS || error == -EAGAIN)
3674                 error = 0;
3675  out_put:
3676         dlm_put_lkb(lkb);
3677  out:
3678         unlock_recovery(ls);
3679         kfree(ua_tmp);
3680         return error;
3681 }
3682
3683 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3684                     uint32_t flags, uint32_t lkid, char *lvb_in)
3685 {
3686         struct dlm_lkb *lkb;
3687         struct dlm_args args;
3688         struct dlm_user_args *ua;
3689         int error;
3690
3691         lock_recovery(ls);
3692
3693         error = find_lkb(ls, lkid, &lkb);
3694         if (error)
3695                 goto out;
3696
3697         ua = (struct dlm_user_args *)lkb->lkb_astparam;
3698
3699         if (lvb_in && ua->lksb.sb_lvbptr)
3700                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3701         ua->castparam = ua_tmp->castparam;
3702         ua->user_lksb = ua_tmp->user_lksb;
3703
3704         error = set_unlock_args(flags, ua, &args);
3705         if (error)
3706                 goto out_put;
3707
3708         error = unlock_lock(ls, lkb, &args);
3709
3710         if (error == -DLM_EUNLOCK)
3711                 error = 0;
3712         if (error)
3713                 goto out_put;
3714
3715         spin_lock(&ua->proc->locks_spin);
3716         list_del_init(&lkb->lkb_ownqueue);
3717         spin_unlock(&ua->proc->locks_spin);
3718
3719         /* this removes the reference for the proc->locks list added by
3720            dlm_user_request */
3721         unhold_lkb(lkb);
3722  out_put:
3723         dlm_put_lkb(lkb);
3724  out:
3725         unlock_recovery(ls);
3726         return error;
3727 }
3728
3729 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3730                     uint32_t flags, uint32_t lkid)
3731 {
3732         struct dlm_lkb *lkb;
3733         struct dlm_args args;
3734         struct dlm_user_args *ua;
3735         int error;
3736
3737         lock_recovery(ls);
3738
3739         error = find_lkb(ls, lkid, &lkb);
3740         if (error)
3741                 goto out;
3742
3743         ua = (struct dlm_user_args *)lkb->lkb_astparam;
3744         ua->castparam = ua_tmp->castparam;
3745
3746         error = set_unlock_args(flags, ua, &args);
3747         if (error)
3748                 goto out_put;
3749
3750         error = cancel_lock(ls, lkb, &args);
3751
3752         if (error == -DLM_ECANCEL)
3753                 error = 0;
3754         if (error)
3755                 goto out_put;
3756
3757         /* this lkb was removed from the WAITING queue */
3758         if (lkb->lkb_grmode == DLM_LOCK_IV) {
3759                 spin_lock(&ua->proc->locks_spin);
3760                 list_del_init(&lkb->lkb_ownqueue);
3761                 spin_unlock(&ua->proc->locks_spin);
3762                 unhold_lkb(lkb);
3763         }
3764  out_put:
3765         dlm_put_lkb(lkb);
3766  out:
3767         unlock_recovery(ls);
3768         return error;
3769 }
3770
3771 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3772 {
3773         struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3774
3775         if (ua->lksb.sb_lvbptr)
3776                 kfree(ua->lksb.sb_lvbptr);
3777         kfree(ua);
3778         lkb->lkb_astparam = (long)NULL;
3779
3780         /* TODO: propogate to master if needed */
3781         return 0;
3782 }
3783
3784 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
3785    Regardless of what rsb queue the lock is on, it's removed and freed. */
3786
3787 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3788 {
3789         struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3790         struct dlm_args args;
3791         int error;
3792
3793         /* FIXME: we need to handle the case where the lkb is in limbo
3794            while the rsb is being looked up, currently we assert in
3795            _unlock_lock/is_remote because rsb nodeid is -1. */
3796
3797         set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
3798
3799         error = unlock_lock(ls, lkb, &args);
3800         if (error == -DLM_EUNLOCK)
3801                 error = 0;
3802         return error;
3803 }
3804
3805 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
3806    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
3807    which we clear here. */
3808
3809 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
3810    list, and no more device_writes should add lkb's to proc->locks list; so we
3811    shouldn't need to take asts_spin or locks_spin here.  this assumes that
3812    device reads/writes/closes are serialized -- FIXME: we may need to serialize
3813    them ourself. */
3814
3815 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
3816 {
3817         struct dlm_lkb *lkb, *safe;
3818
3819         lock_recovery(ls);
3820         mutex_lock(&ls->ls_clear_proc_locks);
3821
3822         list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
3823                 if (lkb->lkb_ast_type) {
3824                         list_del(&lkb->lkb_astqueue);
3825                         unhold_lkb(lkb);
3826                 }
3827
3828                 list_del_init(&lkb->lkb_ownqueue);
3829
3830                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
3831                         lkb->lkb_flags |= DLM_IFL_ORPHAN;
3832                         orphan_proc_lock(ls, lkb);
3833                 } else {
3834                         lkb->lkb_flags |= DLM_IFL_DEAD;
3835                         unlock_proc_lock(ls, lkb);
3836                 }
3837
3838                 /* this removes the reference for the proc->locks list
3839                    added by dlm_user_request, it may result in the lkb
3840                    being freed */
3841
3842                 dlm_put_lkb(lkb);
3843         }
3844         mutex_unlock(&ls->ls_clear_proc_locks);
3845         unlock_recovery(ls);
3846 }