ocfs2: combine inode and generic blocking AST functions
[safe/jmp/linux-2.6] / fs / ocfs2 / dlmglue.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmglue.c
5  *
6  * Code which implements an OCFS2 specific interface to our DLM.
7  *
8  * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 021110-1307, USA.
24  */
25
26 #include <linux/types.h>
27 #include <linux/slab.h>
28 #include <linux/highmem.h>
29 #include <linux/mm.h>
30 #include <linux/smp_lock.h>
31 #include <linux/crc32.h>
32 #include <linux/kthread.h>
33 #include <linux/pagemap.h>
34 #include <linux/debugfs.h>
35 #include <linux/seq_file.h>
36
37 #include <cluster/heartbeat.h>
38 #include <cluster/nodemanager.h>
39 #include <cluster/tcp.h>
40
41 #include <dlm/dlmapi.h>
42
43 #define MLOG_MASK_PREFIX ML_DLM_GLUE
44 #include <cluster/masklog.h>
45
46 #include "ocfs2.h"
47
48 #include "alloc.h"
49 #include "dcache.h"
50 #include "dlmglue.h"
51 #include "extent_map.h"
52 #include "heartbeat.h"
53 #include "inode.h"
54 #include "journal.h"
55 #include "slot_map.h"
56 #include "super.h"
57 #include "uptodate.h"
58 #include "vote.h"
59
60 #include "buffer_head_io.h"
61
/*
 * A mask waiter sleeps until the lockres flags, masked by mw_mask,
 * equal mw_goal.  Waiters are queued on ocfs2_lock_res->l_mask_waiters
 * and completed from lockres_set_flags().
 */
struct ocfs2_mask_waiter {
	struct list_head	mw_item;	/* entry on l_mask_waiters */
	int			mw_status;	/* result handed back to the waiter */
	struct completion	mw_complete;	/* signalled once the goal is met */
	unsigned long		mw_mask;	/* which l_flags bits we watch */
	unsigned long		mw_goal;	/* required value of the masked bits */
};
69
70 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
71 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
72
73 /*
74  * Return value from ocfs2_convert_worker_t functions.
75  *
76  * These control the precise actions of ocfs2_generic_unblock_lock()
77  * and ocfs2_process_blocked_lock()
78  *
79  */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};

/* Result of one unblock pass, handed back to the caller. */
struct ocfs2_unblock_ctl {
	int requeue;	/* nonzero: requeue the lockres and retry later */
	enum ocfs2_unblock_action unblock_action;
};
92
93 static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
94                               struct ocfs2_unblock_ctl *ctl);
95 static int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
96                               struct ocfs2_unblock_ctl *ctl);
97 static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
98                                     struct ocfs2_unblock_ctl *ctl);
99 static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
100                                      struct ocfs2_unblock_ctl *ctl);
101 static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
102                                   struct ocfs2_unblock_ctl *ctl);
103
104 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
105                                      struct ocfs2_lock_res *lockres);
106
107 /*
108  * OCFS2 Lock Resource Operations
109  *
110  * These fine tune the behavior of the generic dlmglue locking infrastructure.
111  */
struct ocfs2_lock_res_ops {
	/*
	 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
	 * this callback if ->l_priv is not an ocfs2_super pointer
	 */
	struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);
	/* Type-specific downconvert handler, run when another node is
	 * blocked on this lock; fills in the unblock_ctl result. */
	int  (*unblock)(struct ocfs2_lock_res *, struct ocfs2_unblock_ctl *);
	/* Optional hook fired after an unblock when the handler asked
	 * for it via enum ocfs2_unblock_action. */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * LOCK_TYPE_* flags which describe the specific requirements
	 * of a lock type. Descriptions of each individual flag follow.
	 */
	int flags;
};

/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function. It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1
137
138 typedef int (ocfs2_convert_worker_t)(struct ocfs2_lock_res *, int);
139 static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
140                                       struct ocfs2_lock_res *lockres,
141                                       struct ocfs2_unblock_ctl *ctl,
142                                       ocfs2_convert_worker_t *worker);
143
/* Per-inode rw lock. */
static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.unblock	= ocfs2_unblock_inode_lock,
	.flags		= 0,
};

/* Per-inode metadata lock; cached state is refreshed on first grant. */
static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.unblock	= ocfs2_unblock_meta,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
};

/* Per-inode data lock. */
static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.unblock	= ocfs2_unblock_data,
	.flags		= 0,
};

/* Superblock lock; no get_osb because l_priv is the osb itself. */
static struct ocfs2_lock_res_ops ocfs2_super_lops = {
	.unblock	= ocfs2_unblock_osb_lock,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
};

/* Global rename lock; l_priv is the osb itself. */
static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
	.unblock	= ocfs2_unblock_osb_lock,
	.flags		= 0,
};

/* Dentry lock; the only type with post-downconvert work to do. */
static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
	.get_osb	= ocfs2_get_dentry_osb,
	.unblock	= ocfs2_unblock_dentry_lock,
	.post_unlock	= ocfs2_dentry_post_unlock,
	.flags		= 0,
};
178
179 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
180 {
181         return lockres->l_type == OCFS2_LOCK_TYPE_META ||
182                 lockres->l_type == OCFS2_LOCK_TYPE_DATA ||
183                 lockres->l_type == OCFS2_LOCK_TYPE_RW;
184 }
185
186 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
187 {
188         BUG_ON(!ocfs2_is_inode_lock(lockres));
189
190         return (struct inode *) lockres->l_priv;
191 }
192
193 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
194 {
195         BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
196
197         return (struct ocfs2_dentry_lock *)lockres->l_priv;
198 }
199
200 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
201 {
202         if (lockres->l_ops->get_osb)
203                 return lockres->l_ops->get_osb(lockres);
204
205         return (struct ocfs2_super *)lockres->l_priv;
206 }
207
208 static int ocfs2_lock_create(struct ocfs2_super *osb,
209                              struct ocfs2_lock_res *lockres,
210                              int level,
211                              int dlm_flags);
212 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
213                                                      int wanted);
214 static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
215                                  struct ocfs2_lock_res *lockres,
216                                  int level);
217 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
218 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
219 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
220 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
221 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
222                                         struct ocfs2_lock_res *lockres);
223 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
224                                                 int convert);
/* Log a dlm failure uniformly: api name, status and lock name. */
#define ocfs2_log_dlm_error(_func, _stat, _lockres) do {	\
	mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "	\
		"resource %s: %s\n", dlm_errname(_stat), _func,	\
		_lockres->l_name, dlm_errmsg(_stat));		\
} while (0)
230 static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
231                                  struct ocfs2_lock_res *lockres);
232 static int ocfs2_meta_lock_update(struct inode *inode,
233                                   struct buffer_head **bh);
234 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
235 static inline int ocfs2_highest_compat_lock_level(int level);
236 static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
237                                                   struct ocfs2_lock_res *lockres,
238                                                   int new_level);
239
240 static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
241                                   u64 blkno,
242                                   u32 generation,
243                                   char *name)
244 {
245         int len;
246
247         mlog_entry_void();
248
249         BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
250
251         len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
252                        ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
253                        (long long)blkno, generation);
254
255         BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
256
257         mlog(0, "built lock resource with name: %s\n", name);
258
259         mlog_exit_void();
260 }
261
262 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);
263
/*
 * Add @res to the global list of lock resources exported through
 * debugfs.  @dlm_debug holds the list head; the list is serialized by
 * ocfs2_dlm_tracking_lock.
 */
static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
				       struct ocfs2_dlm_debug *dlm_debug)
{
	mlog(0, "Add tracking for lockres %s\n", res->l_name);

	spin_lock(&ocfs2_dlm_tracking_lock);
	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}
273
/*
 * Unhook @res from the debugfs tracking list.  Safe to call on a
 * lockres that was never (or already) removed: the list_empty check
 * makes it idempotent.
 */
static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
{
	spin_lock(&ocfs2_dlm_tracking_lock);
	if (!list_empty(&res->l_debug_list))
		list_del_init(&res->l_debug_list);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}
281
/*
 * Second-stage lockres initialization: record type, ops and private
 * data, reset all dlm state to invalid, mark the lockres initialized
 * and hook it onto the debugfs tracking list.  The first stage
 * (ocfs2_lock_res_init_once) must already have run on @res.
 */
static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
				       struct ocfs2_lock_res *res,
				       enum ocfs2_lock_type type,
				       struct ocfs2_lock_res_ops *ops,
				       void *priv)
{
	res->l_type          = type;
	res->l_ops           = ops;
	res->l_priv          = priv;

	/* No lock held, none requested, nobody blocking us yet. */
	res->l_level         = LKM_IVMODE;
	res->l_requested     = LKM_IVMODE;
	res->l_blocking      = LKM_IVMODE;
	res->l_action        = OCFS2_AST_INVALID;
	res->l_unlock_action = OCFS2_UNLOCK_INVALID;

	res->l_flags         = OCFS2_LOCK_INITIALIZED;

	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
}
302
303 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
304 {
305         /* This also clears out the lock status block */
306         memset(res, 0, sizeof(struct ocfs2_lock_res));
307         spin_lock_init(&res->l_lock);
308         init_waitqueue_head(&res->l_event);
309         INIT_LIST_HEAD(&res->l_blocked_list);
310         INIT_LIST_HEAD(&res->l_mask_waiters);
311 }
312
313 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
314                                enum ocfs2_lock_type type,
315                                unsigned int generation,
316                                struct inode *inode)
317 {
318         struct ocfs2_lock_res_ops *ops;
319
320         switch(type) {
321                 case OCFS2_LOCK_TYPE_RW:
322                         ops = &ocfs2_inode_rw_lops;
323                         break;
324                 case OCFS2_LOCK_TYPE_META:
325                         ops = &ocfs2_inode_meta_lops;
326                         break;
327                 case OCFS2_LOCK_TYPE_DATA:
328                         ops = &ocfs2_inode_data_lops;
329                         break;
330                 default:
331                         mlog_bug_on_msg(1, "type: %d\n", type);
332                         ops = NULL; /* thanks, gcc */
333                         break;
334         };
335
336         ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
337                               generation, res->l_name);
338         ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
339 }
340
341 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
342 {
343         struct inode *inode = ocfs2_lock_res_inode(lockres);
344
345         return OCFS2_SB(inode->i_sb);
346 }
347
348 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
349 {
350         __be64 inode_blkno_be;
351
352         memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
353                sizeof(__be64));
354
355         return be64_to_cpu(inode_blkno_be);
356 }
357
358 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
359 {
360         struct ocfs2_dentry_lock *dl = lockres->l_priv;
361
362         return OCFS2_SB(dl->dl_inode->i_sb);
363 }
364
/*
 * Initialize the lockres embedded in @dl.  @parent is the parent
 * directory's block number; the child inode's block number comes from
 * @inode.  See the comment below for the special naming scheme.
 */
void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
				u64 parent, struct inode *inode)
{
	int len;
	u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
	__be64 inode_blkno_be = cpu_to_be64(inode_blkno);
	struct ocfs2_lock_res *lockres = &dl->dl_lockres;

	ocfs2_lock_res_init_once(lockres);

	/*
	 * Unfortunately, the standard lock naming scheme won't work
	 * here because we have two 16 byte values to use. Instead,
	 * we'll stuff the inode number as a binary value. We still
	 * want error prints to show something without garbling the
	 * display, so drop a null byte in there before the inode
	 * number. A future version of OCFS2 will likely use all
	 * binary lock names. The stringified names have been a
	 * tremendous aid in debugging, but now that the debugfs
	 * interface exists, we can mangle things there if need be.
	 *
	 * NOTE: We also drop the standard "pad" value (the total lock
	 * name size stays the same though - the last part is all
	 * zeros due to the memset in ocfs2_lock_res_init_once()
	 */
	len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
		       "%c%016llx",
		       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
		       (long long)parent);

	BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));

	/* Append the raw big-endian inode block number after the NUL
	 * that snprintf just wrote. */
	memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
	       sizeof(__be64));

	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
				   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
				   dl);
}
404
405 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
406                                       struct ocfs2_super *osb)
407 {
408         /* Superblock lockres doesn't come from a slab so we call init
409          * once on it manually.  */
410         ocfs2_lock_res_init_once(res);
411         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
412                               0, res->l_name);
413         ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
414                                    &ocfs2_super_lops, osb);
415 }
416
417 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
418                                        struct ocfs2_super *osb)
419 {
420         /* Rename lockres doesn't come from a slab so we call init
421          * once on it manually.  */
422         ocfs2_lock_res_init_once(res);
423         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
424         ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
425                                    &ocfs2_rename_lops, osb);
426 }
427
/*
 * Tear down a lockres before its memory is reused.  Safe on a
 * never-initialized lockres (returns early).  Screams via
 * mlog_bug_on_msg if the lock is still in use in any way, then wipes
 * the lksb for the dlm and drops the INITIALIZED flag.
 */
void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
	mlog_entry_void();

	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
		return;

	ocfs2_remove_lockres_tracking(res);

	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
			"Lockres %s is on the blocked list\n",
			res->l_name);
	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
			"Lockres %s has mask waiters pending\n",
			res->l_name);
	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
			"Lockres %s is locked\n",
			res->l_name);
	mlog_bug_on_msg(res->l_ro_holders,
			"Lockres %s has %u ro holders\n",
			res->l_name, res->l_ro_holders);
	mlog_bug_on_msg(res->l_ex_holders,
			"Lockres %s has %u ex holders\n",
			res->l_name, res->l_ex_holders);

	/* Need to clear out the lock status block for the dlm */
	memset(&res->l_lksb, 0, sizeof(res->l_lksb));

	res->l_flags = 0UL;
	mlog_exit_void();
}
459
460 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
461                                      int level)
462 {
463         mlog_entry_void();
464
465         BUG_ON(!lockres);
466
467         switch(level) {
468         case LKM_EXMODE:
469                 lockres->l_ex_holders++;
470                 break;
471         case LKM_PRMODE:
472                 lockres->l_ro_holders++;
473                 break;
474         default:
475                 BUG();
476         }
477
478         mlog_exit_void();
479 }
480
481 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
482                                      int level)
483 {
484         mlog_entry_void();
485
486         BUG_ON(!lockres);
487
488         switch(level) {
489         case LKM_EXMODE:
490                 BUG_ON(!lockres->l_ex_holders);
491                 lockres->l_ex_holders--;
492                 break;
493         case LKM_PRMODE:
494                 BUG_ON(!lockres->l_ro_holders);
495                 lockres->l_ro_holders--;
496                 break;
497         default:
498                 BUG();
499         }
500         mlog_exit_void();
501 }
502
503 /* WARNING: This function lives in a world where the only three lock
504  * levels are EX, PR, and NL. It *will* have to be adjusted when more
505  * lock types are added. */
506 static inline int ocfs2_highest_compat_lock_level(int level)
507 {
508         int new_level = LKM_EXMODE;
509
510         if (level == LKM_EXMODE)
511                 new_level = LKM_NLMODE;
512         else if (level == LKM_PRMODE)
513                 new_level = LKM_PRMODE;
514         return new_level;
515 }
516
517 static void lockres_set_flags(struct ocfs2_lock_res *lockres,
518                               unsigned long newflags)
519 {
520         struct list_head *pos, *tmp;
521         struct ocfs2_mask_waiter *mw;
522
523         assert_spin_locked(&lockres->l_lock);
524
525         lockres->l_flags = newflags;
526
527         list_for_each_safe(pos, tmp, &lockres->l_mask_waiters) {
528                 mw = list_entry(pos, struct ocfs2_mask_waiter, mw_item);
529                 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
530                         continue;
531
532                 list_del_init(&mw->mw_item);
533                 mw->mw_status = 0;
534                 complete(&mw->mw_complete);
535         }
536 }
537 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
538 {
539         lockres_set_flags(lockres, lockres->l_flags | or);
540 }
541 static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
542                                 unsigned long clear)
543 {
544         lockres_set_flags(lockres, lockres->l_flags & ~clear);
545 }
546
/*
 * A downconvert AST completed: the dlm has dropped us to l_requested.
 * If the new level no longer conflicts with the level others are
 * blocked on, clear BLOCKED; either way the lock is no longer busy.
 * Caller must hold l_lock.
 */
static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
	BUG_ON(lockres->l_blocking <= LKM_NLMODE);

	lockres->l_level = lockres->l_requested;
	/* Downconvert satisfied the blocker? Then we're not blocked. */
	if (lockres->l_level <=
	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
		lockres->l_blocking = LKM_NLMODE;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
	}
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}
566
/*
 * A convert AST completed: record the granted (l_requested) level and
 * clear the busy state.  Caller must hold l_lock.
 */
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

	/* Convert from RO to EX doesn't really need anything as our
	 * information is already up to date. Convert from NL to
	 * *anything* however should mark ourselves as needing an
	 * update */
	if (lockres->l_level == LKM_NLMODE &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}
587
588 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
589 {
590         mlog_entry_void();
591
592         BUG_ON((!lockres->l_flags & OCFS2_LOCK_BUSY));
593         BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
594
595         if (lockres->l_requested > LKM_NLMODE &&
596             !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
597             lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
598                 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
599
600         lockres->l_level = lockres->l_requested;
601         lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
602         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
603
604         mlog_exit_void();
605 }
606
/*
 * Record that another node is blocked on us at @level.  Returns
 * nonzero when a (further) downconvert must be scheduled; duplicate
 * BASTs and already-covered levels return 0.  Caller holds l_lock.
 */
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
				     int level)
{
	int needs_downconvert = 0;
	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);

	if (level > lockres->l_blocking) {
		/* only schedule a downconvert if we haven't already scheduled
		 * one that goes low enough to satisfy the level we're
		 * blocking.  this also catches the case where we get
		 * duplicate BASTs */
		if (ocfs2_highest_compat_lock_level(level) <
		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
			needs_downconvert = 1;

		lockres->l_blocking = level;
	}

	mlog_exit(needs_downconvert);
	return needs_downconvert;
}
632
/*
 * dlm blocking AST: another node wants @level, which conflicts with
 * what we hold on @opaque (a lockres).  Record the blocked state and,
 * if this BAST requires a downconvert not already scheduled, queue
 * the lockres and kick the vote thread to process it.
 */
static void ocfs2_blocking_ast(void *opaque, int level)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	int needs_downconvert;
	unsigned long flags;

	BUG_ON(level <= LKM_NLMODE);

	mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
	     lockres->l_name, level, lockres->l_level,
	     ocfs2_lock_type_string(lockres->l_type));

	spin_lock_irqsave(&lockres->l_lock, flags);
	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
	if (needs_downconvert)
		ocfs2_schedule_blocked_lock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	ocfs2_kick_vote_thread(osb);
}
656
/*
 * dlm locking AST: a lock/convert request we issued on @opaque (a
 * lockres) has completed.  Dispatch on l_action to fold the result
 * into our cached state, then wake anyone sleeping on l_event.
 */
static void ocfs2_locking_ast(void *opaque)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct dlm_lockstatus *lksb = &lockres->l_lksb;
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);

	/* Bail without touching our state if the dlm reports failure. */
	if (lksb->status != DLM_NORMAL) {
		mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
		     lockres->l_name, lksb->status);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		/* The locally-created lock has now reached the dlm. */
		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
		     "lockres flags = 0x%lx, unlock action: %u\n",
		     lockres->l_name, lockres->l_action, lockres->l_flags,
		     lockres->l_unlock_action);
		BUG();
	}

	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;

	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}
698
/*
 * Undo the BUSY state after a failed dlm call.  @convert nonzero
 * means a failed lock/convert (reset l_action); zero means a failed
 * unlock (reset l_unlock_action).
 */
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	if (convert)
		lockres->l_action = OCFS2_AST_INVALID;
	else
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);
	mlog_exit_void();
}
716
717 /* Note: If we detect another process working on the lock (i.e.,
718  * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
719  * to do the right thing in that case.
720  */
721 static int ocfs2_lock_create(struct ocfs2_super *osb,
722                              struct ocfs2_lock_res *lockres,
723                              int level,
724                              int dlm_flags)
725 {
726         int ret = 0;
727         enum dlm_status status;
728         unsigned long flags;
729
730         mlog_entry_void();
731
732         mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level,
733              dlm_flags);
734
735         spin_lock_irqsave(&lockres->l_lock, flags);
736         if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
737             (lockres->l_flags & OCFS2_LOCK_BUSY)) {
738                 spin_unlock_irqrestore(&lockres->l_lock, flags);
739                 goto bail;
740         }
741
742         lockres->l_action = OCFS2_AST_ATTACH;
743         lockres->l_requested = level;
744         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
745         spin_unlock_irqrestore(&lockres->l_lock, flags);
746
747         status = dlmlock(osb->dlm,
748                          level,
749                          &lockres->l_lksb,
750                          dlm_flags,
751                          lockres->l_name,
752                          OCFS2_LOCK_ID_MAX_LEN - 1,
753                          ocfs2_locking_ast,
754                          lockres,
755                          ocfs2_blocking_ast);
756         if (status != DLM_NORMAL) {
757                 ocfs2_log_dlm_error("dlmlock", status, lockres);
758                 ret = -EINVAL;
759                 ocfs2_recover_from_dlm_error(lockres, 1);
760         }
761
762         mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name);
763
764 bail:
765         mlog_exit(ret);
766         return ret;
767 }
768
769 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
770                                         int flag)
771 {
772         unsigned long flags;
773         int ret;
774
775         spin_lock_irqsave(&lockres->l_lock, flags);
776         ret = lockres->l_flags & flag;
777         spin_unlock_irqrestore(&lockres->l_lock, flags);
778
779         return ret;
780 }
781
/* Sleep until the in-flight dlm operation finishes (BUSY clears). */
static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)

{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
}
788
/* Sleep until another holder finishes refreshing the lock's data. */
static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)

{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
}
795
796 /* predict what lock level we'll be dropping down to on behalf
797  * of another node, and return true if the currently wanted
798  * level will be compatible with it. */
799 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
800                                                      int wanted)
801 {
802         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
803
804         return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
805 }
806
807 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
808 {
809         INIT_LIST_HEAD(&mw->mw_item);
810         init_completion(&mw->mw_complete);
811 }
812
/*
 * Block until the waiter is completed (by lockres_set_flags()) and
 * return the status it was completed with.
 */
static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
{
	wait_for_completion(&mw->mw_complete);
	/* Re-arm the completion in case we want to wait on it again */
	INIT_COMPLETION(mw->mw_complete);
	return mw->mw_status;
}
820
/* Queue @mw on the lockres' waiter list; it will be completed once
 * (l_flags & mask) == goal. The waiter must be freshly initialized
 * (empty mw_item) and the caller must hold l_lock. */
static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
				    struct ocfs2_mask_waiter *mw,
				    unsigned long mask,
				    unsigned long goal)
{
	BUG_ON(!list_empty(&mw->mw_item));

	assert_spin_locked(&lockres->l_lock);

	list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
	mw->mw_mask = mask;
	mw->mw_goal = goal;
}
834
/* returns 0 if the mw that was removed was already satisfied, -EBUSY
 * if the mask still hadn't reached its goal */
static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
				      struct ocfs2_mask_waiter *mw)
{
	unsigned long flags;
	int ret = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	/* An empty mw_item means someone already dequeued (and
	 * completed) us -- nothing to undo in that case. */
	if (!list_empty(&mw->mw_item)) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			ret = -EBUSY;

		list_del_init(&mw->mw_item);
		/* Reset the completion so the waiter can be reused. */
		init_completion(&mw->mw_complete);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;

}
856
857 static int ocfs2_cluster_lock(struct ocfs2_super *osb,
858                               struct ocfs2_lock_res *lockres,
859                               int level,
860                               int lkm_flags,
861                               int arg_flags)
862 {
863         struct ocfs2_mask_waiter mw;
864         enum dlm_status status;
865         int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
866         int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
867         unsigned long flags;
868
869         mlog_entry_void();
870
871         ocfs2_init_mask_waiter(&mw);
872
873 again:
874         wait = 0;
875
876         if (catch_signals && signal_pending(current)) {
877                 ret = -ERESTARTSYS;
878                 goto out;
879         }
880
881         spin_lock_irqsave(&lockres->l_lock, flags);
882
883         mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
884                         "Cluster lock called on freeing lockres %s! flags "
885                         "0x%lx\n", lockres->l_name, lockres->l_flags);
886
887         /* We only compare against the currently granted level
888          * here. If the lock is blocked waiting on a downconvert,
889          * we'll get caught below. */
890         if (lockres->l_flags & OCFS2_LOCK_BUSY &&
891             level > lockres->l_level) {
892                 /* is someone sitting in dlm_lock? If so, wait on
893                  * them. */
894                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
895                 wait = 1;
896                 goto unlock;
897         }
898
899         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
900                 /* lock has not been created yet. */
901                 spin_unlock_irqrestore(&lockres->l_lock, flags);
902
903                 ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
904                 if (ret < 0) {
905                         mlog_errno(ret);
906                         goto out;
907                 }
908                 goto again;
909         }
910
911         if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
912             !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
913                 /* is the lock is currently blocked on behalf of
914                  * another node */
915                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
916                 wait = 1;
917                 goto unlock;
918         }
919
920         if (level > lockres->l_level) {
921                 if (lockres->l_action != OCFS2_AST_INVALID)
922                         mlog(ML_ERROR, "lockres %s has action %u pending\n",
923                              lockres->l_name, lockres->l_action);
924
925                 lockres->l_action = OCFS2_AST_CONVERT;
926                 lockres->l_requested = level;
927                 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
928                 spin_unlock_irqrestore(&lockres->l_lock, flags);
929
930                 BUG_ON(level == LKM_IVMODE);
931                 BUG_ON(level == LKM_NLMODE);
932
933                 mlog(0, "lock %s, convert from %d to level = %d\n",
934                      lockres->l_name, lockres->l_level, level);
935
936                 /* call dlm_lock to upgrade lock now */
937                 status = dlmlock(osb->dlm,
938                                  level,
939                                  &lockres->l_lksb,
940                                  lkm_flags|LKM_CONVERT|LKM_VALBLK,
941                                  lockres->l_name,
942                                  OCFS2_LOCK_ID_MAX_LEN - 1,
943                                  ocfs2_locking_ast,
944                                  lockres,
945                                  ocfs2_blocking_ast);
946                 if (status != DLM_NORMAL) {
947                         if ((lkm_flags & LKM_NOQUEUE) &&
948                             (status == DLM_NOTQUEUED))
949                                 ret = -EAGAIN;
950                         else {
951                                 ocfs2_log_dlm_error("dlmlock", status,
952                                                     lockres);
953                                 ret = -EINVAL;
954                         }
955                         ocfs2_recover_from_dlm_error(lockres, 1);
956                         goto out;
957                 }
958
959                 mlog(0, "lock %s, successfull return from dlmlock\n",
960                      lockres->l_name);
961
962                 /* At this point we've gone inside the dlm and need to
963                  * complete our work regardless. */
964                 catch_signals = 0;
965
966                 /* wait for busy to clear and carry on */
967                 goto again;
968         }
969
970         /* Ok, if we get here then we're good to go. */
971         ocfs2_inc_holders(lockres, level);
972
973         ret = 0;
974 unlock:
975         spin_unlock_irqrestore(&lockres->l_lock, flags);
976 out:
977         /*
978          * This is helping work around a lock inversion between the page lock
979          * and dlm locks.  One path holds the page lock while calling aops
980          * which block acquiring dlm locks.  The voting thread holds dlm
981          * locks while acquiring page locks while down converting data locks.
982          * This block is helping an aop path notice the inversion and back
983          * off to unlock its page lock before trying the dlm lock again.
984          */
985         if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
986             mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
987                 wait = 0;
988                 if (lockres_remove_mask_waiter(lockres, &mw))
989                         ret = -EAGAIN;
990                 else
991                         goto again;
992         }
993         if (wait) {
994                 ret = ocfs2_wait_for_mask(&mw);
995                 if (ret == 0)
996                         goto again;
997                 mlog_errno(ret);
998         }
999
1000         mlog_exit(ret);
1001         return ret;
1002 }
1003
/* Release one holder reference at @level. While still holding l_lock,
 * let ocfs2_vote_on_unlock() decide whether the vote thread should be
 * kicked on behalf of a blocked remote node. */
static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int level)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	ocfs2_dec_holders(lockres, level);
	ocfs2_vote_on_unlock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	mlog_exit_void();
}
1017
/* Create a brand-new DLM lock directly at EX or PR on a lockres that
 * must not already be attached. OCFS2_LOCK_LOCAL is set on the
 * lockres before the create (NOTE(review): presumably consumed by the
 * AST/LVB handling paths -- confirm against ocfs2_locking_ast). When
 * @local is set, LKM_LOCAL skips the cluster-wide resource lookup. */
int ocfs2_create_new_lock(struct ocfs2_super *osb,
			  struct ocfs2_lock_res *lockres,
			  int ex,
			  int local)
{
	int level =  ex ? LKM_EXMODE : LKM_PRMODE;
	unsigned long flags;
	int lkm_flags = local ? LKM_LOCAL : 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ocfs2_lock_create(osb, lockres, level, lkm_flags);
}
1034
/* Grants us an EX lock on the data and metadata resources, skipping
 * the normal cluster directory lookup. Use this ONLY on newly created
 * inodes which other nodes can't possibly see, and which haven't been
 * hashed in the inode hash yet. This can give us a good performance
 * increase as it'll skip the network broadcast normally associated
 * with creating a new lock resource. */
int ocfs2_create_new_inode_locks(struct inode *inode)
{
	int ret;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);
	BUG_ON(!ocfs2_inode_is_new(inode));

	mlog_entry_void();

	mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);

	/* NOTE: That we don't increment any of the holder counts, nor
	 * do we add anything to a journal handle. Since this is
	 * supposed to be a new inode which the cluster doesn't know
	 * about yet, there is no need to.  As far as the LVB handling
	 * is concerned, this is basically like acquiring an EX lock
	 * on a resource which has an invalid one -- we'll set it
	 * valid when we release the EX. */

	/* rw lock: safe to use LKM_LOCAL. */
	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	/*
	 * We don't want to use LKM_LOCAL on a meta data lock as they
	 * don't use a generation in their lock names.
	 */
	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1, 0);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	/* data lock: LKM_LOCAL again. */
	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1, 1);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

bail:
	mlog_exit(ret);
	return ret;
}
1087
1088 int ocfs2_rw_lock(struct inode *inode, int write)
1089 {
1090         int status, level;
1091         struct ocfs2_lock_res *lockres;
1092
1093         BUG_ON(!inode);
1094
1095         mlog_entry_void();
1096
1097         mlog(0, "inode %llu take %s RW lock\n",
1098              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1099              write ? "EXMODE" : "PRMODE");
1100
1101         lockres = &OCFS2_I(inode)->ip_rw_lockres;
1102
1103         level = write ? LKM_EXMODE : LKM_PRMODE;
1104
1105         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
1106                                     0);
1107         if (status < 0)
1108                 mlog_errno(status);
1109
1110         mlog_exit(status);
1111         return status;
1112 }
1113
/* Drop the per-inode RW cluster lock taken by ocfs2_rw_lock(); @write
 * must match the level that was acquired (EX vs PR). */
void ocfs2_rw_unlock(struct inode *inode, int write)
{
	int level = write ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;

	mlog_entry_void();

	mlog(0, "inode %llu drop %s RW lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);

	mlog_exit_void();
}
1129
/* Take the per-inode DATA cluster lock: EX for writers, PR for
 * readers. @arg_flags is passed through to ocfs2_cluster_lock()
 * (e.g. OCFS2_LOCK_NONBLOCK). Returns 0 on success, -EROFS for a
 * write lock on a hard-readonly device, or a negative error. */
int ocfs2_data_lock_full(struct inode *inode,
			 int write,
			 int arg_flags)
{
	int status = 0, level;
	struct ocfs2_lock_res *lockres;

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu take %s DATA lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	/* We'll allow faking a readonly data lock for
	 * rodevices. */
	if (ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) {
		if (write) {
			status = -EROFS;
			mlog_errno(status);
		}
		goto out;
	}

	lockres = &OCFS2_I(inode)->ip_data_lockres;

	level = write ? LKM_EXMODE : LKM_PRMODE;

	/* -EAGAIN is an expected outcome with OCFS2_LOCK_NONBLOCK, so
	 * don't log it as an error. */
	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level,
				    0, arg_flags);
	if (status < 0 && status != -EAGAIN)
		mlog_errno(status);

out:
	mlog_exit(status);
	return status;
}
1168
/* see ocfs2_meta_lock_with_page() */
int ocfs2_data_lock_with_page(struct inode *inode,
			      int write,
			      struct page *page)
{
	int ret;

	ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK);
	if (ret == -EAGAIN) {
		/* Back out of the page lock so the vote thread can make
		 * progress, warm the dlm lock with a blocking take/drop,
		 * then let the VFS retry the whole aop. */
		unlock_page(page);
		if (ocfs2_data_lock(inode, write) == 0)
			ocfs2_data_unlock(inode, write);
		ret = AOP_TRUNCATED_PAGE;
	}

	return ret;
}
1186
/* Called (with l_lock held) when a holder reference is dropped; kicks
 * the vote thread early if a remote node is blocked on this lock and
 * the remaining holder counts no longer conflict with what it wants. */
static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres)
{
	int kick = 0;

	mlog_entry_void();

	/* If we know that another node is waiting on our lock, kick
	 * the vote thread * pre-emptively when we reach a release
	 * condition. */
	if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
		switch(lockres->l_blocking) {
		case LKM_EXMODE:
			/* A remote EX conflicts with any holder. */
			if (!lockres->l_ex_holders && !lockres->l_ro_holders)
				kick = 1;
			break;
		case LKM_PRMODE:
			/* A remote PR only conflicts with local EX. */
			if (!lockres->l_ex_holders)
				kick = 1;
			break;
		default:
			BUG();
		}
	}

	if (kick)
		ocfs2_kick_vote_thread(osb);

	mlog_exit_void();
}
1217
/* Drop the per-inode DATA cluster lock; @write must match the level
 * acquired. On hard-readonly devices no real lock was taken (see
 * ocfs2_data_lock_full), so there is nothing to drop. */
void ocfs2_data_unlock(struct inode *inode,
		       int write)
{
	int level = write ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres;

	mlog_entry_void();

	mlog(0, "inode %llu drop %s DATA lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);

	mlog_exit_void();
}
1235
1236 #define OCFS2_SEC_BITS   34
1237 #define OCFS2_SEC_SHIFT  (64 - 34)
1238 #define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
1239
1240 /* LVB only has room for 64 bits of time here so we pack it for
1241  * now. */
1242 static u64 ocfs2_pack_timespec(struct timespec *spec)
1243 {
1244         u64 res;
1245         u64 sec = spec->tv_sec;
1246         u32 nsec = spec->tv_nsec;
1247
1248         res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
1249
1250         return res;
1251 }
1252
/* Call this with the lockres locked. I am reasonably sure we don't
 * need ip_lock in this function as anyone who would be changing those
 * values is supposed to be blocked in ocfs2_meta_lock right now. */
static void __ocfs2_stuff_meta_lvb(struct inode *inode)
{
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
	struct ocfs2_meta_lvb *lvb;

	mlog_entry_void();

	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	/*
	 * Invalidate the LVB of a deleted inode - this way other
	 * nodes are forced to go to disk and discover the new inode
	 * status.
	 */
	if (oi->ip_flags & OCFS2_INODE_DELETED) {
		lvb->lvb_version = 0;
		goto out;
	}

	/* All multi-byte fields are stored big-endian in the LVB so
	 * every node in the cluster reads them the same way. */
	lvb->lvb_version   = OCFS2_LVB_VERSION;
	lvb->lvb_isize     = cpu_to_be64(i_size_read(inode));
	lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
	lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
	lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
	lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
	lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
	lvb->lvb_iatime_packed  =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
	lvb->lvb_ictime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
	lvb->lvb_imtime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
	lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
	lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);

out:
	mlog_meta_lvb(0, lockres);

	mlog_exit_void();
}
1297
/* Inverse of ocfs2_pack_timespec(): split a packed 64-bit LVB time
 * back into seconds (high OCFS2_SEC_BITS) and nanoseconds. */
static void ocfs2_unpack_timespec(struct timespec *spec,
				  u64 packed_time)
{
	spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
	spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
}
1304
/* Populate the in-memory inode from the (trusted) meta LVB, avoiding a
 * disk read. Counterpart of __ocfs2_stuff_meta_lvb(); all LVB fields
 * are big-endian on the wire. */
static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
{
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
	struct ocfs2_meta_lvb *lvb;

	mlog_entry_void();

	mlog_meta_lvb(0, lockres);

	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	/* We're safe here without the lockres lock... */
	spin_lock(&oi->ip_lock);
	oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
	i_size_write(inode, be64_to_cpu(lvb->lvb_isize));

	oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
	ocfs2_set_inode_flags(inode);

	/* fast-symlinks are a special case */
	if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
		inode->i_blocks = 0;
	else
		inode->i_blocks =
			ocfs2_align_bytes_to_sectors(i_size_read(inode));

	inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
	inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
	inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
	inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
	ocfs2_unpack_timespec(&inode->i_atime,
			      be64_to_cpu(lvb->lvb_iatime_packed));
	ocfs2_unpack_timespec(&inode->i_mtime,
			      be64_to_cpu(lvb->lvb_imtime_packed));
	ocfs2_unpack_timespec(&inode->i_ctime,
			      be64_to_cpu(lvb->lvb_ictime_packed));
	spin_unlock(&oi->ip_lock);

	mlog_exit_void();
}
1346
1347 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
1348                                               struct ocfs2_lock_res *lockres)
1349 {
1350         struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1351
1352         if (lvb->lvb_version == OCFS2_LVB_VERSION
1353             && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
1354                 return 1;
1355         return 0;
1356 }
1357
/* Determine whether a lock resource needs to be refreshed, and
 * arbitrate who gets to refresh it.
 *
 *   0 means no refresh needed.
 *
 *   > 0 means you need to refresh this and you MUST call
 *   ocfs2_complete_lock_res_refresh afterwards. */
static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
{
	unsigned long flags;
	int status = 0;

	mlog_entry_void();

refresh_check:
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto bail;
	}

	/* Someone else is refreshing: wait for them and then re-check
	 * from the top, since they may fail and leave NEEDS_REFRESH set. */
	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		ocfs2_wait_on_refreshing_lock(lockres);
		goto refresh_check;
	}

	/* Ok, I'll be the one to refresh this lock. */
	lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = 1;
bail:
	mlog_exit(status);
	return status;
}
1395
/* If status is non zero, I'll mark it as not being in refresh
 * anymore, but I won't clear the needs refresh flag -- that way the
 * next ocfs2_should_refresh_lock_res() caller gets to retry. */
static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
						   int status)
{
	unsigned long flags;
	mlog_entry_void();

	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
	if (!status)
		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	/* Wake anyone parked in ocfs2_wait_on_refreshing_lock(). */
	wake_up(&lockres->l_event);

	mlog_exit_void();
}
1414
/* may or may not return a bh if it went to disk. */
/* Refresh the cached inode metadata after winning the refresh
 * arbitration: purge stale caches, then repopulate either from the
 * LVB (cheap) or from the on-disk dinode (slow path, fills *bh).
 * Returns 0 on success, -ENOENT if the inode was deleted while we
 * waited on the lock, or a negative I/O error. */
static int ocfs2_meta_lock_update(struct inode *inode,
				  struct buffer_head **bh)
{
	int status = 0;
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres;
	struct ocfs2_dinode *fe;

	mlog_entry_void();

	spin_lock(&oi->ip_lock);
	if (oi->ip_flags & OCFS2_INODE_DELETED) {
		mlog(0, "Orphaned inode %llu was deleted while we "
		     "were waiting on a lock. ip_flags = 0x%x\n",
		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
		spin_unlock(&oi->ip_lock);
		status = -ENOENT;
		goto bail;
	}
	spin_unlock(&oi->ip_lock);

	lockres = &oi->ip_meta_lockres;

	/* Nothing to do unless we won the refresh arbitration. */
	if (!ocfs2_should_refresh_lock_res(lockres))
		goto bail;

	/* This will discard any caching information we might have had
	 * for the inode metadata. */
	ocfs2_metadata_cache_purge(inode);

	/* will do nothing for inode types that don't use the extent
	 * map (directories, bitmap files, etc) */
	ocfs2_extent_map_trunc(inode, 0);

	if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
		mlog(0, "Trusting LVB on inode %llu\n",
		     (unsigned long long)oi->ip_blkno);
		ocfs2_refresh_inode_from_lvb(inode);
	} else {
		/* Boo, we have to go to disk. */
		/* read bh, cast, ocfs2_refresh_inode */
		status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
					  bh, OCFS2_BH_CACHED, inode);
		if (status < 0) {
			mlog_errno(status);
			goto bail_refresh;
		}
		fe = (struct ocfs2_dinode *) (*bh)->b_data;

		/* This is a good chance to make sure we're not
		 * locking an invalid object.
		 *
		 * We bug on a stale inode here because we checked
		 * above whether it was wiped from disk. The wiping
		 * node provides a guarantee that we receive that
		 * message and can mark the inode before dropping any
		 * locks associated with it. */
		if (!OCFS2_IS_VALID_DINODE(fe)) {
			OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
			status = -EIO;
			goto bail_refresh;
		}
		mlog_bug_on_msg(inode->i_generation !=
				le32_to_cpu(fe->i_generation),
				"Invalid dinode %llu disk generation: %u "
				"inode->i_generation: %u\n",
				(unsigned long long)oi->ip_blkno,
				le32_to_cpu(fe->i_generation),
				inode->i_generation);
		mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
				!(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
				"Stale dinode %llu dtime: %llu flags: 0x%x\n",
				(unsigned long long)oi->ip_blkno,
				(unsigned long long)le64_to_cpu(fe->i_dtime),
				le32_to_cpu(fe->i_flags));

		ocfs2_refresh_inode(inode, fe);
	}

	status = 0;
bail_refresh:
	/* We own the refresh (should_refresh returned > 0); release it,
	 * keeping NEEDS_REFRESH set if we failed. */
	ocfs2_complete_lock_res_refresh(lockres, status);
bail:
	mlog_exit(status);
	return status;
}
1502
/* Hand the caller a buffer_head for the inode's dinode block: reuse
 * @passed_bh (taking an extra reference) when the lock update already
 * read it, otherwise read the block (possibly from cache). The caller
 * owns the reference in *ret_bh either way. */
static int ocfs2_assign_bh(struct inode *inode,
			   struct buffer_head **ret_bh,
			   struct buffer_head *passed_bh)
{
	int status;

	if (passed_bh) {
		/* Ok, the update went to disk for us, use the
		 * returned bh. */
		*ret_bh = passed_bh;
		get_bh(*ret_bh);

		return 0;
	}

	status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
				  OCFS2_I(inode)->ip_blkno,
				  ret_bh,
				  OCFS2_BH_CACHED,
				  inode);
	if (status < 0)
		mlog_errno(status);

	return status;
}
1528
1529 /*
1530  * returns < 0 error if the callback will never be called, otherwise
1531  * the result of the lock will be communicated via the callback.
1532  */
1533 int ocfs2_meta_lock_full(struct inode *inode,
1534                          struct ocfs2_journal_handle *handle,
1535                          struct buffer_head **ret_bh,
1536                          int ex,
1537                          int arg_flags)
1538 {
1539         int status, level, dlm_flags, acquired;
1540         struct ocfs2_lock_res *lockres;
1541         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1542         struct buffer_head *local_bh = NULL;
1543
1544         BUG_ON(!inode);
1545
1546         mlog_entry_void();
1547
1548         mlog(0, "inode %llu, take %s META lock\n",
1549              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1550              ex ? "EXMODE" : "PRMODE");
1551
1552         status = 0;
1553         acquired = 0;
1554         /* We'll allow faking a readonly metadata lock for
1555          * rodevices. */
1556         if (ocfs2_is_hard_readonly(osb)) {
1557                 if (ex)
1558                         status = -EROFS;
1559                 goto bail;
1560         }
1561
1562         if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1563                 wait_event(osb->recovery_event,
1564                            ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1565
1566         acquired = 0;
1567         lockres = &OCFS2_I(inode)->ip_meta_lockres;
1568         level = ex ? LKM_EXMODE : LKM_PRMODE;
1569         dlm_flags = 0;
1570         if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
1571                 dlm_flags |= LKM_NOQUEUE;
1572
1573         status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
1574         if (status < 0) {
1575                 if (status != -EAGAIN && status != -EIOCBRETRY)
1576                         mlog_errno(status);
1577                 goto bail;
1578         }
1579
1580         /* Notify the error cleanup path to drop the cluster lock. */
1581         acquired = 1;
1582
1583         /* We wait twice because a node may have died while we were in
1584          * the lower dlm layers. The second time though, we've
1585          * committed to owning this lock so we don't allow signals to
1586          * abort the operation. */
1587         if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1588                 wait_event(osb->recovery_event,
1589                            ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1590
1591         /*
1592          * We only see this flag if we're being called from
1593          * ocfs2_read_locked_inode(). It means we're locking an inode
1594          * which hasn't been populated yet, so clear the refresh flag
1595          * and let the caller handle it.
1596          */
1597         if (inode->i_state & I_NEW) {
1598                 status = 0;
1599                 ocfs2_complete_lock_res_refresh(lockres, 0);
1600                 goto bail;
1601         }
1602
1603         /* This is fun. The caller may want a bh back, or it may
1604          * not. ocfs2_meta_lock_update definitely wants one in, but
1605          * may or may not read one, depending on what's in the
1606          * LVB. The result of all of this is that we've *only* gone to
1607          * disk if we have to, so the complexity is worthwhile. */
1608         status = ocfs2_meta_lock_update(inode, &local_bh);
1609         if (status < 0) {
1610                 if (status != -ENOENT)
1611                         mlog_errno(status);
1612                 goto bail;
1613         }
1614
1615         if (ret_bh) {
1616                 status = ocfs2_assign_bh(inode, ret_bh, local_bh);
1617                 if (status < 0) {
1618                         mlog_errno(status);
1619                         goto bail;
1620                 }
1621         }
1622
1623         if (handle) {
1624                 status = ocfs2_handle_add_lock(handle, inode);
1625                 if (status < 0)
1626                         mlog_errno(status);
1627         }
1628
1629 bail:
1630         if (status < 0) {
1631                 if (ret_bh && (*ret_bh)) {
1632                         brelse(*ret_bh);
1633                         *ret_bh = NULL;
1634                 }
1635                 if (acquired)
1636                         ocfs2_meta_unlock(inode, ex);
1637         }
1638
1639         if (local_bh)
1640                 brelse(local_bh);
1641
1642         mlog_exit(status);
1643         return status;
1644 }
1645
/*
 * This is working around a lock inversion between tasks acquiring DLM locks
 * while holding a page lock and the vote thread which blocks dlm lock acquiry
 * while acquiring page locks.
 *
 * ** These _with_page variants are only intended to be called from aop
 * methods that hold page locks and return a very specific *positive* error
 * code that aop methods pass up to the VFS -- test for errors with != 0. **
 *
 * The DLM is called such that it returns -EAGAIN if it would have blocked
 * waiting for the vote thread.  In that case we unlock our page so the vote
 * thread can make progress.  Once we've done this we have to return
 * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up
 * into the VFS who will then immediately retry the aop call.
 *
 * We do a blocking lock and immediate unlock before returning, though, so that
 * the lock has a great chance of being cached on this node by the time the VFS
 * calls back to retry the aop.  This has the potential to livelock as nodes
 * ping locks back and forth, but that's a risk we're willing to take to avoid
 * the lock inversion simply.
 */
1667 int ocfs2_meta_lock_with_page(struct inode *inode,
1668                               struct ocfs2_journal_handle *handle,
1669                               struct buffer_head **ret_bh,
1670                               int ex,
1671                               struct page *page)
1672 {
1673         int ret;
1674
1675         ret = ocfs2_meta_lock_full(inode, handle, ret_bh, ex,
1676                                    OCFS2_LOCK_NONBLOCK);
1677         if (ret == -EAGAIN) {
1678                 unlock_page(page);
1679                 if (ocfs2_meta_lock(inode, handle, ret_bh, ex) == 0)
1680                         ocfs2_meta_unlock(inode, ex);
1681                 ret = AOP_TRUNCATED_PAGE;
1682         }
1683
1684         return ret;
1685 }
1686
1687 void ocfs2_meta_unlock(struct inode *inode,
1688                        int ex)
1689 {
1690         int level = ex ? LKM_EXMODE : LKM_PRMODE;
1691         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
1692
1693         mlog_entry_void();
1694
1695         mlog(0, "inode %llu drop %s META lock\n",
1696              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1697              ex ? "EXMODE" : "PRMODE");
1698
1699         if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
1700                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1701
1702         mlog_exit_void();
1703 }
1704
/*
 * Take the superblock cluster lock (EX or PR), refreshing the slot map
 * if the DLM tells us our cached view is stale.
 *
 * Returns 0 on success, -EROFS on a hard-readonly mount, or a negative
 * error from the cluster lock / slot map read.
 */
int ocfs2_super_lock(struct ocfs2_super *osb,
		     int ex)
{
	int status;
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
	struct buffer_head *bh;
	struct ocfs2_slot_info *si = osb->slot_info;

	mlog_entry_void();

	if (ocfs2_is_hard_readonly(osb))
		return -EROFS;

	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	/* The super block lock path is really in the best position to
	 * know when resources covered by the lock need to be
	 * refreshed, so we do it here. Of course, making sense of
	 * everything is up to the caller :) */
	status = ocfs2_should_refresh_lock_res(lockres);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}
	if (status) {
		/* Positive return means we won the right to refresh:
		 * re-read the slot map block and republish it.
		 * NOTE(review): bh aliases si->si_bh -- presumably
		 * ocfs2_read_block refreshes the cached buffer in
		 * place; confirm against its contract. */
		bh = si->si_bh;
		status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0,
					  si->si_inode);
		if (status == 0)
			ocfs2_update_slot_info(si);

		/* Wake anyone waiting on the refresh; passes our error
		 * (if any) so they can re-arm it. */
		ocfs2_complete_lock_res_refresh(lockres, status);

		if (status < 0)
			mlog_errno(status);
	}
bail:
	mlog_exit(status);
	return status;
}
1750
1751 void ocfs2_super_unlock(struct ocfs2_super *osb,
1752                         int ex)
1753 {
1754         int level = ex ? LKM_EXMODE : LKM_PRMODE;
1755         struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
1756
1757         ocfs2_cluster_unlock(osb, lockres, level);
1758 }
1759
1760 int ocfs2_rename_lock(struct ocfs2_super *osb)
1761 {
1762         int status;
1763         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1764
1765         if (ocfs2_is_hard_readonly(osb))
1766                 return -EROFS;
1767
1768         status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
1769         if (status < 0)
1770                 mlog_errno(status);
1771
1772         return status;
1773 }
1774
1775 void ocfs2_rename_unlock(struct ocfs2_super *osb)
1776 {
1777         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1778
1779         ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
1780 }
1781
1782 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
1783 {
1784         int ret;
1785         int level = ex ? LKM_EXMODE : LKM_PRMODE;
1786         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1787         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1788
1789         BUG_ON(!dl);
1790
1791         if (ocfs2_is_hard_readonly(osb))
1792                 return -EROFS;
1793
1794         ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
1795         if (ret < 0)
1796                 mlog_errno(ret);
1797
1798         return ret;
1799 }
1800
1801 void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
1802 {
1803         int level = ex ? LKM_EXMODE : LKM_PRMODE;
1804         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1805         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1806
1807         ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
1808 }
1809
1810 /* Reference counting of the dlm debug structure. We want this because
1811  * open references on the debug inodes can live on after a mount, so
1812  * we can't rely on the ocfs2_super to always exist. */
1813 static void ocfs2_dlm_debug_free(struct kref *kref)
1814 {
1815         struct ocfs2_dlm_debug *dlm_debug;
1816
1817         dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
1818
1819         kfree(dlm_debug);
1820 }
1821
1822 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
1823 {
1824         if (dlm_debug)
1825                 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
1826 }
1827
/* Take an additional reference on the debug structure. */
static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
{
	kref_get(&debug->d_refcnt);
}
1832
1833 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
1834 {
1835         struct ocfs2_dlm_debug *dlm_debug;
1836
1837         dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
1838         if (!dlm_debug) {
1839                 mlog_errno(-ENOMEM);
1840                 goto out;
1841         }
1842
1843         kref_init(&dlm_debug->d_refcnt);
1844         INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
1845         dlm_debug->d_locking_state = NULL;
1846 out:
1847         return dlm_debug;
1848 }
1849
/* Access to this is arbitrated for us via seq_file->sem. */
struct ocfs2_dlm_seq_priv {
	struct ocfs2_dlm_debug *p_dlm_debug;	/* ref held for file lifetime */
	struct ocfs2_lock_res p_iter_res;	/* dummy cursor on tracking list */
	struct ocfs2_lock_res p_tmp_res;	/* snapshot handed to ->show */
};
1856
/* Return the next real lockres after @start on the global tracking
 * list, or NULL on reaching the list head.  Dummy cursor entries
 * (NULL l_ops) are skipped.  Caller holds ocfs2_dlm_tracking_lock. */
static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
						 struct ocfs2_dlm_seq_priv *priv)
{
	struct ocfs2_lock_res *iter, *ret = NULL;
	struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;

	assert_spin_locked(&ocfs2_dlm_tracking_lock);

	list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
		/* discover the head of the list */
		if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
			mlog(0, "End of list found, %p\n", ret);
			break;
		}

		/* We track our "dummy" iteration lockres' by a NULL
		 * l_ops field. */
		if (iter->l_ops != NULL) {
			ret = iter;
			break;
		}
	}

	return ret;
}
1882
/* seq_file ->start: resume from wherever our dummy cursor was left by
 * ->next.  NOTE(review): *pos is ignored; the position is carried by
 * p_iter_res instead -- confirm this stays compatible with seq_file. */
static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
{
	struct ocfs2_dlm_seq_priv *priv = m->private;
	struct ocfs2_lock_res *iter;

	spin_lock(&ocfs2_dlm_tracking_lock);
	iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
	if (iter) {
		/* Since lockres' have the lifetime of their container
		 * (which can be inodes, ocfs2_supers, etc) we want to
		 * copy this out to a temporary lockres while still
		 * under the spinlock. Obviously after this we can't
		 * trust any pointers on the copy returned, but that's
		 * ok as the information we want isn't typically held
		 * in them. */
		priv->p_tmp_res = *iter;
		iter = &priv->p_tmp_res;
	}
	spin_unlock(&ocfs2_dlm_tracking_lock);

	return iter;
}
1905
/* seq_file ->stop: nothing to release; cursor cleanup happens in
 * ocfs2_dlm_debug_release(). */
static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
{
}
1909
1910 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
1911 {
1912         struct ocfs2_dlm_seq_priv *priv = m->private;
1913         struct ocfs2_lock_res *iter = v;
1914         struct ocfs2_lock_res *dummy = &priv->p_iter_res;
1915
1916         spin_lock(&ocfs2_dlm_tracking_lock);
1917         iter = ocfs2_dlm_next_res(iter, priv);
1918         list_del_init(&dummy->l_debug_list);
1919         if (iter) {
1920                 list_add(&dummy->l_debug_list, &iter->l_debug_list);
1921                 priv->p_tmp_res = *iter;
1922                 iter = &priv->p_tmp_res;
1923         }
1924         spin_unlock(&ocfs2_dlm_tracking_lock);
1925
1926         return iter;
1927 }
1928
1929 /* So that debugfs.ocfs2 can determine which format is being used */
1930 #define OCFS2_DLM_DEBUG_STR_VERSION 1
1931 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
1932 {
1933         int i;
1934         char *lvb;
1935         struct ocfs2_lock_res *lockres = v;
1936
1937         if (!lockres)
1938                 return -EINVAL;
1939
1940         seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
1941
1942         if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
1943                 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
1944                            lockres->l_name,
1945                            (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
1946         else
1947                 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
1948
1949         seq_printf(m, "%d\t"
1950                    "0x%lx\t"
1951                    "0x%x\t"
1952                    "0x%x\t"
1953                    "%u\t"
1954                    "%u\t"
1955                    "%d\t"
1956                    "%d\t",
1957                    lockres->l_level,
1958                    lockres->l_flags,
1959                    lockres->l_action,
1960                    lockres->l_unlock_action,
1961                    lockres->l_ro_holders,
1962                    lockres->l_ex_holders,
1963                    lockres->l_requested,
1964                    lockres->l_blocking);
1965
1966         /* Dump the raw LVB */
1967         lvb = lockres->l_lksb.lvb;
1968         for(i = 0; i < DLM_LVB_LEN; i++)
1969                 seq_printf(m, "0x%x\t", lvb[i]);
1970
1971         /* End the line */
1972         seq_printf(m, "\n");
1973         return 0;
1974 }
1975
/* Iterator ops for the debugfs "locking_state" file. */
static struct seq_operations ocfs2_dlm_seq_ops = {
	.start =        ocfs2_dlm_seq_start,
	.stop =         ocfs2_dlm_seq_stop,
	.next =         ocfs2_dlm_seq_next,
	.show =         ocfs2_dlm_seq_show,
};
1982
1983 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
1984 {
1985         struct seq_file *seq = (struct seq_file *) file->private_data;
1986         struct ocfs2_dlm_seq_priv *priv = seq->private;
1987         struct ocfs2_lock_res *res = &priv->p_iter_res;
1988
1989         ocfs2_remove_lockres_tracking(res);
1990         ocfs2_put_dlm_debug(priv->p_dlm_debug);
1991         return seq_release_private(inode, file);
1992 }
1993
1994 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
1995 {
1996         int ret;
1997         struct ocfs2_dlm_seq_priv *priv;
1998         struct seq_file *seq;
1999         struct ocfs2_super *osb;
2000
2001         priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2002         if (!priv) {
2003                 ret = -ENOMEM;
2004                 mlog_errno(ret);
2005                 goto out;
2006         }
2007         osb = (struct ocfs2_super *) inode->u.generic_ip;
2008         ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2009         priv->p_dlm_debug = osb->osb_dlm_debug;
2010         INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2011
2012         ret = seq_open(file, &ocfs2_dlm_seq_ops);
2013         if (ret) {
2014                 kfree(priv);
2015                 mlog_errno(ret);
2016                 goto out;
2017         }
2018
2019         seq = (struct seq_file *) file->private_data;
2020         seq->private = priv;
2021
2022         ocfs2_add_lockres_tracking(&priv->p_iter_res,
2023                                    priv->p_dlm_debug);
2024
2025 out:
2026         return ret;
2027 }
2028
/* File ops for the debugfs "locking_state" file. */
static const struct file_operations ocfs2_dlm_debug_fops = {
	.open =         ocfs2_dlm_debug_open,
	.release =      ocfs2_dlm_debug_release,
	.read =         seq_read,
	.llseek =       seq_lseek,
};
2035
/* Create the per-mount "locking_state" debugfs file and take a
 * reference on the debug structure for it.  Undone by
 * ocfs2_dlm_shutdown_debug(). */
static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
{
	int ret = 0;
	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;

	dlm_debug->d_locking_state = debugfs_create_file("locking_state",
							 S_IFREG|S_IRUSR,
							 osb->osb_debug_root,
							 osb,
							 &ocfs2_dlm_debug_fops);
	if (!dlm_debug->d_locking_state) {
		ret = -EINVAL;
		mlog(ML_ERROR,
		     "Unable to create locking state debugfs file.\n");
		goto out;
	}

	/* Reference held on behalf of the debugfs file. */
	ocfs2_get_dlm_debug(dlm_debug);
out:
	return ret;
}
2057
2058 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2059 {
2060         struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2061
2062         if (dlm_debug) {
2063                 debugfs_remove(dlm_debug->d_locking_state);
2064                 ocfs2_put_dlm_debug(dlm_debug);
2065         }
2066 }
2067
/*
 * Bring up this mount's DLM machinery: debugfs state, the vote
 * thread, and registration with the DLM domain (domain name == fs
 * UUID).  On failure, tears down whatever was already started.
 */
int ocfs2_dlm_init(struct ocfs2_super *osb)
{
	int status;
	u32 dlm_key;
	struct dlm_ctxt *dlm;

	mlog_entry_void();

	status = ocfs2_dlm_init_debug(osb);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	/* launch vote thread */
	osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote");
	if (IS_ERR(osb->vote_task)) {
		status = PTR_ERR(osb->vote_task);
		/* NULL so the error path below knows there is nothing
		 * to kthread_stop(). */
		osb->vote_task = NULL;
		mlog_errno(status);
		goto bail;
	}

	/* used by the dlm code to make message headers unique, each
	 * node in this domain must agree on this. */
	dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));

	/* for now, uuid == domain */
	dlm = dlm_register_domain(osb->uuid_str, dlm_key);
	if (IS_ERR(dlm)) {
		status = PTR_ERR(dlm);
		mlog_errno(status);
		goto bail;
	}

	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);

	dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);

	osb->dlm = dlm;

	status = 0;
bail:
	/* Unwind on any failure; kthread_stop is safe because
	 * vote_task is NULL unless the thread actually started. */
	if (status < 0) {
		ocfs2_dlm_shutdown_debug(osb);
		if (osb->vote_task)
			kthread_stop(osb->vote_task);
	}

	mlog_exit(status);
	return status;
}
2121
/*
 * Tear down this mount's DLM machinery in reverse of ocfs2_dlm_init():
 * stop eviction callbacks, drop and free the osb-level locks, stop the
 * vote thread, leave the domain, then remove the debugfs state.
 */
void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
{
	mlog_entry_void();

	dlm_unregister_eviction_cb(&osb->osb_eviction_cb);

	ocfs2_drop_osb_locks(osb);

	if (osb->vote_task) {
		kthread_stop(osb->vote_task);
		osb->vote_task = NULL;
	}

	ocfs2_lock_res_free(&osb->osb_super_lockres);
	ocfs2_lock_res_free(&osb->osb_rename_lockres);

	dlm_unregister_domain(osb->dlm);
	osb->dlm = NULL;

	ocfs2_dlm_shutdown_debug(osb);

	mlog_exit_void();
}
2145
/*
 * AST fired by the DLM when an unlock or cancel-convert completes.
 * Clears the lockres unlock state according to l_unlock_action and
 * wakes waiters sleeping on l_event.
 */
static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
{
	struct ocfs2_lock_res *lockres = opaque;
	unsigned long flags;

	mlog_entry_void();

	mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
	     lockres->l_unlock_action);

	spin_lock_irqsave(&lockres->l_lock, flags);
	/* We tried to cancel a convert request, but it was already
	 * granted. All we want to do here is clear our unlock
	 * state. The wake_up call done at the bottom is redundant
	 * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
	 * hurt anything anyway */
	if (status == DLM_CANCELGRANT &&
	    lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
		mlog(0, "Got cancelgrant for %s\n", lockres->l_name);

		/* We don't clear the busy flag in this case as it
		 * should have been cleared by the ast which the dlm
		 * has called. */
		goto complete_unlock;
	}

	/* Any other non-normal status is unexpected; leave the lockres
	 * state untouched so the problem is visible. */
	if (status != DLM_NORMAL) {
		mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
		     "unlock_action %d\n", status, lockres->l_name,
		     lockres->l_unlock_action);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_unlock_action) {
	case OCFS2_UNLOCK_CANCEL_CONVERT:
		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
		lockres->l_action = OCFS2_AST_INVALID;
		break;
	case OCFS2_UNLOCK_DROP_LOCK:
		/* Fully unlocked -- record that we hold nothing. */
		lockres->l_level = LKM_IVMODE;
		break;
	default:
		BUG();
	}

	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
complete_unlock:
	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	mlog_exit_void();
}
2201
/* Callback run by ocfs2_drop_lock() just before the final unlock,
 * while the lockres spinlock is held (e.g. to stuff the LVB). */
typedef void (ocfs2_pre_drop_cb_t)(struct ocfs2_lock_res *, void *);

struct drop_lock_cb {
	ocfs2_pre_drop_cb_t     *drop_func;	/* pre-drop hook */
	void                    *drop_data;	/* opaque arg for drop_func */
};
2208
/*
 * Fully release @lockres back to the DLM.  Waits out any in-flight
 * operation (BUSY), runs the optional pre-drop callback @dcb, then
 * issues the final dlmunlock and waits for its AST.  The lockres must
 * already be marked FREEING.  Always returns 0.
 */
static int ocfs2_drop_lock(struct ocfs2_super *osb,
			   struct ocfs2_lock_res *lockres,
			   struct drop_lock_cb *dcb)
{
	enum dlm_status status;
	unsigned long flags;

	/* We didn't get anywhere near actually using this lockres. */
	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
		goto out;

	spin_lock_irqsave(&lockres->l_lock, flags);

	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
			"lockres %s, flags 0x%lx\n",
			lockres->l_name, lockres->l_flags);

	/* BUSY may be re-set between wakeups, hence the loop; the
	 * spinlock is dropped while we sleep. */
	while (lockres->l_flags & OCFS2_LOCK_BUSY) {
		mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
		     "%u, unlock_action = %u\n",
		     lockres->l_name, lockres->l_flags, lockres->l_action,
		     lockres->l_unlock_action);

		spin_unlock_irqrestore(&lockres->l_lock, flags);

		/* XXX: Today we just wait on any busy
		 * locks... Perhaps we need to cancel converts in the
		 * future? */
		ocfs2_wait_on_busy_lock(lockres);

		spin_lock_irqsave(&lockres->l_lock, flags);
	}

	/* Last chance for the caller to touch the lock (e.g. stuff the
	 * LVB) before it goes away; runs under the spinlock. */
	if (dcb)
		dcb->drop_func(lockres, dcb->drop_data);

	if (lockres->l_flags & OCFS2_LOCK_BUSY)
		mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
		     lockres->l_name);
	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
		mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);

	/* Nothing attached at the DLM level -- no unlock needed. */
	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto out;
	}

	lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);

	/* make sure we never get here while waiting for an ast to
	 * fire. */
	BUG_ON(lockres->l_action != OCFS2_AST_INVALID);

	/* is this necessary? */
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
	lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	mlog(0, "lock %s\n", lockres->l_name);

	/* LKM_VALBLK so the LVB (possibly just stuffed by dcb) is
	 * written back to the DLM on the way out. */
	status = dlmunlock(osb->dlm, &lockres->l_lksb, LKM_VALBLK,
			   ocfs2_unlock_ast, lockres);
	if (status != DLM_NORMAL) {
		ocfs2_log_dlm_error("dlmunlock", status, lockres);
		mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
		dlm_print_one_lock(lockres->l_lksb.lockid);
		BUG();
	}
	mlog(0, "lock %s, successfull return from dlmunlock\n",
	     lockres->l_name);

	/* Wait for ocfs2_unlock_ast() to clear BUSY. */
	ocfs2_wait_on_busy_lock(lockres);
out:
	mlog_exit(0);
	return 0;
}
2285
/* Mark the lockres as being dropped. It will no longer be
 * queued if blocking, but we still may have to wait on it
 * being dequeued from the vote thread before we can consider
 * it safe to drop.
 *
 * You can *not* attempt to call cluster_lock on this lockres anymore. */
void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_mask_waiter mw;
	unsigned long flags;

	ocfs2_init_mask_waiter(&mw);

	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres->l_flags |= OCFS2_LOCK_FREEING;
	/* Loop: QUEUED can be re-set before we re-acquire the
	 * spinlock, so re-check after every wait. */
	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
		/* Arm a waiter that fires when QUEUED clears (goal 0). */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		mlog(0, "Waiting on lockres %s\n", lockres->l_name);

		status = ocfs2_wait_for_mask(&mw);
		if (status)
			mlog_errno(status);

		spin_lock_irqsave(&lockres->l_lock, flags);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}
2316
2317 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
2318                                struct ocfs2_lock_res *lockres)
2319 {
2320         int ret;
2321
2322         ocfs2_mark_lockres_freeing(lockres);
2323         ret = ocfs2_drop_lock(osb, lockres, NULL);
2324         if (ret)
2325                 mlog_errno(ret);
2326 }
2327
/* Drop the two per-mount (osb-level) cluster locks. */
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
{
	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
}
2333
2334 static void ocfs2_meta_pre_drop(struct ocfs2_lock_res *lockres, void *data)
2335 {
2336         struct inode *inode = data;
2337
2338         /* the metadata lock requires a bit more work as we have an
2339          * LVB to worry about. */
2340         if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
2341             lockres->l_level == LKM_EXMODE &&
2342             !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2343                 __ocfs2_stuff_meta_lvb(inode);
2344 }
2345
/*
 * Drop all three per-inode cluster locks (data, meta, rw).  All three
 * drops are attempted even if one fails; the first error encountered
 * is returned.  The meta lock gets a pre-drop callback so its LVB is
 * stuffed before release.
 */
int ocfs2_drop_inode_locks(struct inode *inode)
{
	int status, err;
	struct drop_lock_cb meta_dcb = { ocfs2_meta_pre_drop, inode, };

	mlog_entry_void();

	/* No need to call ocfs2_mark_lockres_freeing here -
	 * ocfs2_clear_inode has done it for us. */

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_data_lockres,
			      NULL);
	if (err < 0)
		mlog_errno(err);

	status = err;

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_meta_lockres,
			      &meta_dcb);
	if (err < 0)
		mlog_errno(err);
	if (err < 0 && !status)
		status = err;

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_rw_lockres,
			      NULL);
	if (err < 0)
		mlog_errno(err);
	if (err < 0 && !status)
		status = err;

	mlog_exit(status);
	return status;
}
2383
/* Record the intent to downconvert @lockres to @new_level: set the
 * DOWNCONVERT action, the requested level, and BUSY.  Caller holds
 * l_lock and must actually issue the convert afterwards. */
static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
				      int new_level)
{
	assert_spin_locked(&lockres->l_lock);

	/* A blocking request below PR makes no sense here. */
	BUG_ON(lockres->l_blocking <= LKM_NLMODE);

	/* Downconvert must strictly lower the level. */
	if (lockres->l_level <= new_level) {
		mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
		     lockres->l_level, new_level);
		BUG();
	}

	mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
	     lockres->l_name, new_level, lockres->l_blocking);

	lockres->l_action = OCFS2_AST_DOWNCONVERT;
	lockres->l_requested = new_level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
}
2404
/* Issue the actual DLM convert to @new_level prepared by
 * ocfs2_prepare_downconvert().  @lvb asks the DLM to write our LVB
 * back as part of the convert.  Returns 0 or -EINVAL on DLM error
 * (after rolling back the prepared state). */
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
				  struct ocfs2_lock_res *lockres,
				  int new_level,
				  int lvb)
{
	int ret, dlm_flags = LKM_CONVERT;
	enum dlm_status status;

	mlog_entry_void();

	if (lvb)
		dlm_flags |= LKM_VALBLK;

	status = dlmlock(osb->dlm,
			 new_level,
			 &lockres->l_lksb,
			 dlm_flags,
			 lockres->l_name,
			 OCFS2_LOCK_ID_MAX_LEN - 1,
			 ocfs2_locking_ast,
			 lockres,
			 ocfs2_blocking_ast);
	if (status != DLM_NORMAL) {
		ocfs2_log_dlm_error("dlmlock", status, lockres);
		ret = -EINVAL;
		/* "1" -- this was a convert, not a new lock. */
		ocfs2_recover_from_dlm_error(lockres, 1);
		goto bail;
	}

	ret = 0;
bail:
	mlog_exit(ret);
	return ret;
}
2439
/* returns 1 when the caller should unlock and call dlmunlock */
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres)
{
	assert_spin_locked(&lockres->l_lock);

	mlog_entry_void();
	mlog(0, "lock %s\n", lockres->l_name);

	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
		/* If we're already trying to cancel a lock conversion
		 * then just drop the spinlock and allow the caller to
		 * requeue this lock. */

		mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
		return 0;
	}

	/* were we in a convert when we got the bast fire? */
	BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
	       lockres->l_action != OCFS2_AST_DOWNCONVERT);
	/* set things up for the unlockast to know to just
	 * clear out the ast_action and unset busy, etc. */
	lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;

	/* A convert in flight must have left BUSY set. */
	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
			"lock %s, invalid flags: 0x%lx\n",
			lockres->l_name, lockres->l_flags);

	return 1;
}
2471
/* Ask the DLM to cancel an in-flight convert on @lockres (after
 * ocfs2_prepare_cancel_convert() returned 1).  Completion arrives via
 * ocfs2_unlock_ast().  Returns 0 or -EINVAL on DLM error. */
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres)
{
	int ret;
	enum dlm_status status;

	mlog_entry_void();
	mlog(0, "lock %s\n", lockres->l_name);

	ret = 0;
	status = dlmunlock(osb->dlm,
			   &lockres->l_lksb,
			   LKM_CANCEL,
			   ocfs2_unlock_ast,
			   lockres);
	if (status != DLM_NORMAL) {
		ocfs2_log_dlm_error("dlmunlock", status, lockres);
		ret = -EINVAL;
		/* "0" -- this was a cancel, not a convert. */
		ocfs2_recover_from_dlm_error(lockres, 0);
	}

	mlog(0, "lock %s return from dlmunlock\n", lockres->l_name);

	mlog_exit(ret);
	return ret;
}
2498
/* Decide whether the meta lock may be downconverted to @new_level
 * right now: no conflicting local holders, the journal fully
 * checkpointed for this inode, and no LVB refresh in progress.
 * Returns nonzero when the downconvert is allowed. */
static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
						  struct ocfs2_lock_res *lockres,
						  int new_level)
{
	int ret;

	mlog_entry_void();

	BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);

	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
		ret = 0;
		mlog(0, "lockres %s currently being refreshed -- backing "
		     "off!\n", lockres->l_name);
	} else if (new_level == LKM_PRMODE)
		/* PR is still compatible with local read holders. */
		ret = !lockres->l_ex_holders &&
			ocfs2_inode_fully_checkpointed(inode);
	else /* Must be NLMODE we're converting to. */
		ret = !lockres->l_ro_holders && !lockres->l_ex_holders &&
			ocfs2_inode_fully_checkpointed(inode);

	mlog_exit(ret);
	return ret;
}
2523
/*
 * Attempt to resolve a blocked meta (inode metadata) lock.
 *
 * Unlike the generic unblock path, a meta downconvert must first make
 * sure the inode's journaled metadata is checkpointed, and may need to
 * stuff current inode values into the LVB before dropping from EX.
 *
 * On return, *requeue is set when the lockres should be revisited
 * later. Returns 0 on success, negative error code otherwise.
 */
static int ocfs2_do_unblock_meta(struct inode *inode,
				 int *requeue)
{
	int new_level;
	int set_lvb = 0;
	int ret = 0;
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
	unsigned long flags;

	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	spin_lock_irqsave(&lockres->l_lock, flags);

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

	mlog(0, "l_level=%d, l_blocking=%d\n", lockres->l_level,
	     lockres->l_blocking);

	/* Only an EX or PR hold can block another node; anything else
	 * here indicates corrupted lockres state. */
	BUG_ON(lockres->l_level != LKM_EXMODE &&
	       lockres->l_level != LKM_PRMODE);

	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
		/* A convert is in flight - try to cancel it and ask to
		 * be requeued so the blocked request is re-evaluated
		 * once the cancel completes. */
		*requeue = 1;
		ret = ocfs2_prepare_cancel_convert(osb, lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		if (ret) {
			ret = ocfs2_cancel_convert(osb, lockres);
			if (ret < 0)
				mlog_errno(ret);
		}
		goto leave;
	}

	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);

	mlog(0, "l_level=%d, l_blocking=%d, new_level=%d\n",
	     lockres->l_level, lockres->l_blocking, new_level);

	if (ocfs2_can_downconvert_meta_lock(inode, lockres, new_level)) {
		/* Dropping from EX means other nodes may read our LVB,
		 * so it has to carry current inode values. */
		if (lockres->l_level == LKM_EXMODE)
			set_lvb = 1;

		/* If the lock hasn't been refreshed yet (rare), then
		 * our memory inode values are old and we skip
		 * stuffing the lvb. There's no need to actually clear
		 * out the lvb here as its value is still valid. */
		if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
			if (set_lvb)
				__ocfs2_stuff_meta_lvb(inode);
		} else
			mlog(0, "lockres %s: downconverting stale lock!\n",
			     lockres->l_name);

		mlog(0, "calling ocfs2_downconvert_lock with l_level=%d, "
		     "l_blocking=%d, new_level=%d\n",
		     lockres->l_level, lockres->l_blocking, new_level);

		ocfs2_prepare_downconvert(lockres, new_level);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
		goto leave;
	}
	/* Can't downconvert yet - kick off a checkpoint so that a
	 * later attempt can succeed, and ask to be requeued. */
	if (!ocfs2_inode_fully_checkpointed(inode))
		ocfs2_start_checkpoint(osb);

	*requeue = 1;
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ret = 0;
leave:
	mlog_exit(ret);
	return ret;
}
2598
/*
 * Core logic for resolving a blocked lock: cancel any in-flight
 * convert, requeue while incompatible local holders remain, optionally
 * run a per-lock-type @worker, then downconvert to the highest level
 * compatible with what the other node is requesting.
 *
 * On return, ctl->requeue tells the caller whether to schedule the
 * lockres again, and ctl->unblock_action carries the worker's verdict.
 * Returns 0 or a negative error from the dlm calls.
 */
static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
				      struct ocfs2_lock_res *lockres,
				      struct ocfs2_unblock_ctl *ctl,
				      ocfs2_convert_worker_t *worker)
{
	unsigned long flags;
	int blocking;
	int new_level;
	int ret = 0;

	mlog_entry_void();

	spin_lock_irqsave(&lockres->l_lock, flags);

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

recheck:
	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
		/* A convert is already in flight; try to cancel it and
		 * come back to this lockres later. */
		ctl->requeue = 1;
		ret = ocfs2_prepare_cancel_convert(osb, lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		if (ret) {
			ret = ocfs2_cancel_convert(osb, lockres);
			if (ret < 0)
				mlog_errno(ret);
		}
		goto leave;
	}

	/* if we're blocking an exclusive and we have *any* holders,
	 * then requeue. */
	if ((lockres->l_blocking == LKM_EXMODE)
	    && (lockres->l_ex_holders || lockres->l_ro_holders)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		ctl->requeue = 1;
		ret = 0;
		goto leave;
	}

	/* If it's a PR we're blocking, then only
	 * requeue if we've got any EX holders */
	if (lockres->l_blocking == LKM_PRMODE &&
	    lockres->l_ex_holders) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		ctl->requeue = 1;
		ret = 0;
		goto leave;
	}

	/* If we get here, then we know that there are no more
	 * incompatible holders (and anyone asking for an incompatible
	 * lock is blocked). We can now downconvert the lock */
	if (!worker)
		goto downconvert;

	/* Some lockres types want to do a bit of work before
	 * downconverting a lock. Allow that here. The worker function
	 * may sleep, so we save off a copy of what we're blocking as
	 * it may change while we're not holding the spin lock. */
	blocking = lockres->l_blocking;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ctl->unblock_action = worker(lockres, blocking);

	/* The worker decided the downconvert should be skipped
	 * entirely; the post_unlock callback may still run. */
	if (ctl->unblock_action == UNBLOCK_STOP_POST)
		goto leave;

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (blocking != lockres->l_blocking) {
		/* If this changed underneath us, then we can't drop
		 * it just yet. */
		goto recheck;
	}

downconvert:
	ctl->requeue = 0;
	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);

	ocfs2_prepare_downconvert(lockres, new_level);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ret = ocfs2_downconvert_lock(osb, lockres, new_level, 0);
leave:
	mlog_exit(ret);
	return ret;
}
2684
2685 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
2686                                      int blocking)
2687 {
2688         struct inode *inode;
2689         struct address_space *mapping;
2690
2691         inode = ocfs2_lock_res_inode(lockres);
2692         mapping = inode->i_mapping;
2693
2694         if (filemap_fdatawrite(mapping)) {
2695                 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
2696                      (unsigned long long)OCFS2_I(inode)->ip_blkno);
2697         }
2698         sync_mapping_buffers(mapping);
2699         if (blocking == LKM_EXMODE) {
2700                 truncate_inode_pages(mapping, 0);
2701                 unmap_mapping_range(mapping, 0, 0, 0);
2702         } else {
2703                 /* We only need to wait on the I/O if we're not also
2704                  * truncating pages because truncate_inode_pages waits
2705                  * for us above. We don't truncate pages if we're
2706                  * blocking anything < EXMODE because we want to keep
2707                  * them around in that case. */
2708                 filemap_fdatawait(mapping);
2709         }
2710
2711         return UNBLOCK_CONTINUE;
2712 }
2713
2714 int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
2715                        struct ocfs2_unblock_ctl *ctl)
2716 {
2717         int status;
2718         struct inode *inode;
2719         struct ocfs2_super *osb;
2720
2721         mlog_entry_void();
2722
2723         inode = ocfs2_lock_res_inode(lockres);
2724         osb = OCFS2_SB(inode->i_sb);
2725
2726         mlog(0, "unblock inode %llu\n",
2727              (unsigned long long)OCFS2_I(inode)->ip_blkno);
2728
2729         status = ocfs2_generic_unblock_lock(osb, lockres, ctl,
2730                                             ocfs2_data_convert_worker);
2731         if (status < 0)
2732                 mlog_errno(status);
2733
2734         mlog(0, "inode %llu, requeue = %d\n",
2735              (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
2736
2737         mlog_exit(status);
2738         return status;
2739 }
2740
2741 static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
2742                                     struct ocfs2_unblock_ctl *ctl)
2743 {
2744         int status;
2745         struct inode *inode;
2746
2747         mlog_entry_void();
2748
2749         mlog(0, "Unblock lockres %s\n", lockres->l_name);
2750
2751         inode  = ocfs2_lock_res_inode(lockres);
2752
2753         status = ocfs2_generic_unblock_lock(OCFS2_SB(inode->i_sb),
2754                                             lockres, ctl, NULL);
2755         if (status < 0)
2756                 mlog_errno(status);
2757
2758         mlog_exit(status);
2759         return status;
2760 }
2761
2762 static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
2763                               struct ocfs2_unblock_ctl *ctl)
2764 {
2765         int status;
2766         struct inode *inode;
2767
2768         mlog_entry_void();
2769
2770         inode = ocfs2_lock_res_inode(lockres);
2771
2772         mlog(0, "unblock inode %llu\n",
2773              (unsigned long long)OCFS2_I(inode)->ip_blkno);
2774
2775         status = ocfs2_do_unblock_meta(inode, &ctl->requeue);
2776         if (status < 0)
2777                 mlog_errno(status);
2778
2779         mlog(0, "inode %llu, requeue = %d\n",
2780              (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
2781
2782         mlog_exit(status);
2783         return status;
2784 }
2785
2786 /*
2787  * Does the final reference drop on our dentry lock. Right now this
2788  * happens in the vote thread, but we could choose to simplify the
2789  * dlmglue API and push these off to the ocfs2_wq in the future.
2790  */
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres)
{
	/* Release the dentry lock reference held across unblock
	 * processing; this may be the final put. */
	ocfs2_dentry_lock_put(osb, ocfs2_lock_res_dl(lockres));
}
2797
2798 /*
2799  * d_delete() matching dentries before the lock downconvert.
2800  *
2801  * At this point, any process waiting to destroy the
2802  * dentry_lock due to last ref count is stopped by the
2803  * OCFS2_LOCK_QUEUED flag.
2804  *
2805  * We have two potential problems
2806  *
2807  * 1) If we do the last reference drop on our dentry_lock (via dput)
2808  *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
2809  *    the downconvert to finish. Instead we take an elevated
2810  *    reference and push the drop until after we've completed our
2811  *    unblock processing.
2812  *
 * 2) There might be another process with a final reference,
 *    waiting on us to finish processing. If this is the case, we
 *    detect it and exit out - there are no more dentries anyway.
2816  */
static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking)
{
	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
	struct dentry *dentry;
	unsigned long flags;
	int extra_ref = 0;

	/*
	 * This node is blocking another node from getting a read
	 * lock. This happens when we've renamed within a
	 * directory. We've forced the other nodes to d_delete(), but
	 * we never actually dropped our lock because it's still
	 * valid. The downconvert code will retain a PR for this node,
	 * so there's no further work to do.
	 */
	if (blocking == LKM_PRMODE)
		return UNBLOCK_CONTINUE;

	/*
	 * Mark this inode as potentially orphaned. The code in
	 * ocfs2_delete_inode() will figure out whether it actually
	 * needs to be freed or not.
	 */
	spin_lock(&oi->ip_lock);
	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
	spin_unlock(&oi->ip_lock);

	/*
	 * Yuck. We need to make sure however that the check of
	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
	 * respect to a reference decrement or the setting of that
	 * flag.
	 */
	spin_lock_irqsave(&lockres->l_lock, flags);
	spin_lock(&dentry_attach_lock);
	/* Only take the extra reference while the lockres is alive and
	 * at least one reference remains (dl_count == 0 means a final
	 * drop is already underway). */
	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
	    && dl->dl_count) {
		dl->dl_count++;
		extra_ref = 1;
	}
	spin_unlock(&dentry_attach_lock);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	mlog(0, "extra_ref = %d\n", extra_ref);

	/*
	 * We have a process waiting on us in ocfs2_dentry_iput(),
	 * which means we can't have any more outstanding
	 * aliases. There's no need to do any more work.
	 */
	if (!extra_ref)
		return UNBLOCK_CONTINUE;

	/* d_delete() every alias of this inode under dl_parent_blkno.
	 * dentry_attach_lock is dropped around the dcache calls since
	 * they may sleep; the loop re-finds aliases each pass. */
	spin_lock(&dentry_attach_lock);
	while (1) {
		dentry = ocfs2_find_local_alias(dl->dl_inode,
						dl->dl_parent_blkno, 1);
		if (!dentry)
			break;
		spin_unlock(&dentry_attach_lock);

		mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
		     dentry->d_name.name);

		/*
		 * The following dcache calls may do an
		 * iput(). Normally we don't want that from the
		 * downconverting thread, but in this case it's ok
		 * because the requesting node already has an
		 * exclusive lock on the inode, so it can't be queued
		 * for a downconvert.
		 */
		d_delete(dentry);
		dput(dentry);

		spin_lock(&dentry_attach_lock);
	}
	spin_unlock(&dentry_attach_lock);

	/*
	 * If we are the last holder of this dentry lock, there is no
	 * reason to downconvert so skip straight to the unlock.
	 */
	if (dl->dl_count == 1)
		return UNBLOCK_STOP_POST;

	return UNBLOCK_CONTINUE_POST;
}
2907
2908 static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
2909                                      struct ocfs2_unblock_ctl *ctl)
2910 {
2911         int ret;
2912         struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2913         struct ocfs2_super *osb = OCFS2_SB(dl->dl_inode->i_sb);
2914
2915         mlog(0, "unblock dentry lock: %llu\n",
2916              (unsigned long long)OCFS2_I(dl->dl_inode)->ip_blkno);
2917
2918         ret = ocfs2_generic_unblock_lock(osb,
2919                                          lockres,
2920                                          ctl,
2921                                          ocfs2_dentry_convert_worker);
2922         if (ret < 0)
2923                 mlog_errno(ret);
2924
2925         mlog(0, "requeue = %d, post = %d\n", ctl->requeue, ctl->unblock_action);
2926
2927         return ret;
2928 }
2929
2930 /* Generic unblock function for any lockres whose private data is an
2931  * ocfs2_super pointer. */
2932 static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
2933                                   struct ocfs2_unblock_ctl *ctl)
2934 {
2935         int status;
2936         struct ocfs2_super *osb;
2937
2938         mlog_entry_void();
2939
2940         mlog(0, "Unblock lockres %s\n", lockres->l_name);
2941
2942         osb = ocfs2_get_lockres_osb(lockres);
2943
2944         status = ocfs2_generic_unblock_lock(osb,
2945                                             lockres,
2946                                             ctl,
2947                                             NULL);
2948         if (status < 0)
2949                 mlog_errno(status);
2950
2951         mlog_exit(status);
2952         return status;
2953 }
2954
/*
 * Resolve a single blocked lockres via its l_ops->unblock callback.
 * If the lock couldn't be fully resolved (ctl.requeue), it is put back
 * on the blocked list; otherwise OCFS2_LOCK_QUEUED is cleared. Lock
 * types with post-downconvert work get their post_unlock callback
 * invoked at the end.
 */
void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_unblock_ctl ctl = {0, 0,};
	unsigned long flags;

	/* Our reference to the lockres in this function can be
	 * considered valid until we remove the OCFS2_LOCK_QUEUED
	 * flag. */

	mlog_entry_void();

	BUG_ON(!lockres);
	BUG_ON(!lockres->l_ops);
	BUG_ON(!lockres->l_ops->unblock);

	mlog(0, "lockres %s blocked.\n", lockres->l_name);

	/* Detect whether a lock has been marked as going away while
	 * the vote thread was processing other things. A lock can
	 * still be marked with OCFS2_LOCK_FREEING after this check,
	 * but short circuiting here will still save us some
	 * performance. */
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (lockres->l_flags & OCFS2_LOCK_FREEING)
		goto unqueue;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = lockres->l_ops->unblock(lockres, &ctl);
	if (status < 0)
		mlog_errno(status);

	spin_lock_irqsave(&lockres->l_lock, flags);
unqueue:
	/* A lock marked for freeing is never requeued - whoever is
	 * tearing it down will handle the remote request. */
	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
	} else
		ocfs2_schedule_blocked_lock(osb, lockres);

	mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
	     ctl.requeue ? "yes" : "no");
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	/* Give the lock type a chance to do cleanup which had to wait
	 * until after the downconvert (e.g. ocfs2_dentry_post_unlock's
	 * reference drop). */
	if (ctl.unblock_action != UNBLOCK_CONTINUE
	    && lockres->l_ops->post_unlock)
		lockres->l_ops->post_unlock(osb, lockres);

	mlog_exit_void();
}
3005
/*
 * Queue @lockres on the superblock's blocked lock list so the vote
 * thread will process it. Caller must hold lockres->l_lock.
 */
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
		/* Do not schedule a lock for downconvert when it's on
		 * the way to destruction - any nodes wanting access
		 * to the resource will get it soon. */
		mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
		     lockres->l_name, lockres->l_flags);
		return;
	}

	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);

	spin_lock(&osb->vote_task_lock);
	/* Only add the lockres once - it may already be queued. */
	if (list_empty(&lockres->l_blocked_list)) {
		list_add_tail(&lockres->l_blocked_list,
			      &osb->blocked_lock_list);
		osb->blocked_lock_count++;
	}
	spin_unlock(&osb->vote_task_lock);

	mlog_exit_void();
}
3034
3035 /* This aids in debugging situations where a bad LVB might be involved. */
3036 void ocfs2_dump_meta_lvb_info(u64 level,
3037                               const char *function,
3038                               unsigned int line,
3039                               struct ocfs2_lock_res *lockres)
3040 {
3041         struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
3042
3043         mlog(level, "LVB information for %s (called from %s:%u):\n",
3044              lockres->l_name, function, line);
3045         mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
3046              lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
3047              be32_to_cpu(lvb->lvb_igeneration));
3048         mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
3049              (unsigned long long)be64_to_cpu(lvb->lvb_isize),
3050              be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
3051              be16_to_cpu(lvb->lvb_imode));
3052         mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
3053              "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
3054              (long long)be64_to_cpu(lvb->lvb_iatime_packed),
3055              (long long)be64_to_cpu(lvb->lvb_ictime_packed),
3056              (long long)be64_to_cpu(lvb->lvb_imtime_packed),
3057              be32_to_cpu(lvb->lvb_iattr));
3058 }