ocfs2: move downconvert worker to lockres ops
[safe/jmp/linux-2.6] / fs / ocfs2 / dlmglue.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmglue.c
5  *
6  * Code which implements an OCFS2 specific interface to our DLM.
7  *
8  * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 021110-1307, USA.
24  */
25
26 #include <linux/types.h>
27 #include <linux/slab.h>
28 #include <linux/highmem.h>
29 #include <linux/mm.h>
30 #include <linux/smp_lock.h>
31 #include <linux/crc32.h>
32 #include <linux/kthread.h>
33 #include <linux/pagemap.h>
34 #include <linux/debugfs.h>
35 #include <linux/seq_file.h>
36
37 #include <cluster/heartbeat.h>
38 #include <cluster/nodemanager.h>
39 #include <cluster/tcp.h>
40
41 #include <dlm/dlmapi.h>
42
43 #define MLOG_MASK_PREFIX ML_DLM_GLUE
44 #include <cluster/masklog.h>
45
46 #include "ocfs2.h"
47
48 #include "alloc.h"
49 #include "dcache.h"
50 #include "dlmglue.h"
51 #include "extent_map.h"
52 #include "heartbeat.h"
53 #include "inode.h"
54 #include "journal.h"
55 #include "slot_map.h"
56 #include "super.h"
57 #include "uptodate.h"
58 #include "vote.h"
59
60 #include "buffer_head_io.h"
61
/*
 * A caller waiting for a lockres' l_flags to reach a specific state.
 * Queued on ocfs2_lock_res->l_mask_waiters; completed from
 * lockres_set_flags() once (l_flags & mw_mask) == mw_goal.
 */
struct ocfs2_mask_waiter {
	struct list_head	mw_item;	/* linkage on l_mask_waiters */
	int			mw_status;	/* status handed back to the waiter */
	struct completion	mw_complete;	/* fired when the goal state is reached */
	unsigned long		mw_mask;	/* l_flags bits to test */
	unsigned long		mw_goal;	/* required value of the masked bits */
};
69
70 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
71 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
72
73 /*
74  * Return value from ->downconvert_worker functions.
75  *
76  * These control the precise actions of ocfs2_generic_unblock_lock()
77  * and ocfs2_process_blocked_lock()
78  *
79  */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};

/*
 * Out-parameters of an ->unblock call: whether the lockres should be
 * requeued for another pass, and which unblock action to take.
 */
struct ocfs2_unblock_ctl {
	int requeue;
	enum ocfs2_unblock_action unblock_action;
};
92
93 static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
94                               struct ocfs2_unblock_ctl *ctl);
95 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
96                                         int new_level);
97 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);
98
99 static int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
100                               struct ocfs2_unblock_ctl *ctl);
101 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
102                                      int blocking);
103
104 static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
105                                     struct ocfs2_unblock_ctl *ctl);
106
107 static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
108                                      struct ocfs2_unblock_ctl *ctl);
109 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
110                                        int blocking);
111
112 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
113                                      struct ocfs2_lock_res *lockres);
114
115 static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
116                                   struct ocfs2_unblock_ctl *ctl);
117
118 /*
119  * OCFS2 Lock Resource Operations
120  *
121  * These fine tune the behavior of the generic dlmglue locking infrastructure.
122  */
struct ocfs2_lock_res_ops {
	/*
	 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
	 * this callback if ->l_priv is not an ocfs2_super pointer
	 */
	struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

	/*
	 * Process a lock with a pending blocking request. Fills in the
	 * ocfs2_unblock_ctl (requeue / unblock action) to tell the
	 * caller what to do next.
	 */
	int  (*unblock)(struct ocfs2_lock_res *, struct ocfs2_unblock_ctl *);

	/*
	 * Optional hook run after unlock processing when the unblock
	 * action requested it (see UNBLOCK_CONTINUE_POST /
	 * UNBLOCK_STOP_POST).
	 */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * Allow a lock type to add checks to determine whether it is
	 * safe to downconvert a lock. Return 0 to re-queue the
	 * downconvert at a later time, nonzero to continue.
	 *
	 * For most locks, the default checks that there are no
	 * incompatible holders are sufficient.
	 *
	 * Called with the lockres spinlock held.
	 */
	int (*check_downconvert)(struct ocfs2_lock_res *, int);

	/*
	 * Allows a lock type to populate the lock value block. This
	 * is called on downconvert, and when we drop a lock.
	 *
	 * Locks that want to use this should set LOCK_TYPE_USES_LVB
	 * in the flags field.
	 *
	 * Called with the lockres spinlock held.
	 */
	void (*set_lvb)(struct ocfs2_lock_res *);

	/*
	 * Called from the downconvert thread when it is determined
	 * that a lock will be downconverted. This is called without
	 * any locks held so the function can do work that might
	 * schedule (syncing out data, etc).
	 *
	 * This should return any one of the ocfs2_unblock_action
	 * values, depending on what it wants the thread to do.
	 */
	int (*downconvert_worker)(struct ocfs2_lock_res *, int);

	/*
	 * LOCK_TYPE_* flags which describe the specific requirements
	 * of a lock type. Descriptions of each individual flag follow.
	 */
	int flags;
};
172
173 /*
174  * Some locks want to "refresh" potentially stale data when a
175  * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
176  * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
177  * individual lockres l_flags member from the ast function. It is
178  * expected that the locking wrapper will clear the
179  * OCFS2_LOCK_NEEDS_REFRESH flag when done.
180  */
181 #define LOCK_TYPE_REQUIRES_REFRESH 0x1
182
183 /*
184  * Indicate that a lock type makes use of the lock value block. The
185  * ->set_lvb lock type callback must be defined.
186  */
187 #define LOCK_TYPE_USES_LVB              0x2
188
189 static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
190                                       struct ocfs2_lock_res *lockres,
191                                       struct ocfs2_unblock_ctl *ctl);
192
/*
 * Per-lock-type operation tables. A lockres is bound to exactly one
 * of these at init time; the generic dlmglue code consults it for osb
 * translation, unblock behavior and the downconvert hooks.
 */
static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.unblock	= ocfs2_unblock_inode_lock,
	.flags		= 0,
};

/* Metadata locks carry inode data in the LVB and refresh it on grab. */
static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.unblock	= ocfs2_unblock_meta,
	.check_downconvert = ocfs2_check_meta_downconvert,
	.set_lvb	= ocfs2_set_meta_lvb,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.unblock	= ocfs2_unblock_data,
	.downconvert_worker = ocfs2_data_convert_worker,
	.flags		= 0,
};

/* Super and rename locks keep the osb directly in ->l_priv (no get_osb). */
static struct ocfs2_lock_res_ops ocfs2_super_lops = {
	.unblock	= ocfs2_unblock_osb_lock,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
};

static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
	.unblock	= ocfs2_unblock_osb_lock,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
	.get_osb	= ocfs2_get_dentry_osb,
	.unblock	= ocfs2_unblock_dentry_lock,
	.post_unlock	= ocfs2_dentry_post_unlock,
	.downconvert_worker = ocfs2_dentry_convert_worker,
	.flags		= 0,
};
231
232 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
233 {
234         return lockres->l_type == OCFS2_LOCK_TYPE_META ||
235                 lockres->l_type == OCFS2_LOCK_TYPE_DATA ||
236                 lockres->l_type == OCFS2_LOCK_TYPE_RW;
237 }
238
/*
 * Return the inode backing an inode-related lockres (META, DATA or
 * RW); those lock types keep the inode in ->l_priv.
 */
static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!ocfs2_is_inode_lock(lockres));

	return (struct inode *) lockres->l_priv;
}
245
/* Return the dentry lock wrapping a DENTRY-type lockres (->l_priv). */
static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
{
	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);

	return (struct ocfs2_dentry_lock *)lockres->l_priv;
}
252
253 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
254 {
255         if (lockres->l_ops->get_osb)
256                 return lockres->l_ops->get_osb(lockres);
257
258         return (struct ocfs2_super *)lockres->l_priv;
259 }
260
261 static int ocfs2_lock_create(struct ocfs2_super *osb,
262                              struct ocfs2_lock_res *lockres,
263                              int level,
264                              int dlm_flags);
265 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
266                                                      int wanted);
267 static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
268                                  struct ocfs2_lock_res *lockres,
269                                  int level);
270 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
271 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
272 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
273 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
274 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
275                                         struct ocfs2_lock_res *lockres);
276 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
277                                                 int convert);
/* Report a dlm api failure with both the symbolic error name and the
 * human-readable message for the given lockres. */
#define ocfs2_log_dlm_error(_func, _stat, _lockres) do {        \
	mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "  \
		"resource %s: %s\n", dlm_errname(_stat), _func, \
		_lockres->l_name, dlm_errmsg(_stat));           \
} while (0)
283 static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
284                                  struct ocfs2_lock_res *lockres);
285 static int ocfs2_meta_lock_update(struct inode *inode,
286                                   struct buffer_head **bh);
287 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
288 static inline int ocfs2_highest_compat_lock_level(int level);
289
/*
 * Format a dlm lock name into 'name': one lock-type character, the
 * standard pad string, then the block number and generation in
 * fixed-width hex. The result must fill exactly
 * OCFS2_LOCK_ID_MAX_LEN - 1 characters (asserted below).
 */
static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
				  u64 blkno,
				  u32 generation,
				  char *name)
{
	int len;

	mlog_entry_void();

	BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);

	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
		       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
		       (long long)blkno, generation);

	BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));

	mlog(0, "built lock resource with name: %s\n", name);

	mlog_exit_void();
}
311
312 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);
313
/* Hook a lockres onto the global debug tracking list (debugfs). */
static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
				       struct ocfs2_dlm_debug *dlm_debug)
{
	mlog(0, "Add tracking for lockres %s\n", res->l_name);

	spin_lock(&ocfs2_dlm_tracking_lock);
	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}
323
/* Unhook a lockres from the tracking list. The list_empty() check
 * makes this safe to call more than once for the same lockres. */
static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
{
	spin_lock(&ocfs2_dlm_tracking_lock);
	if (!list_empty(&res->l_debug_list))
		list_del_init(&res->l_debug_list);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}
331
/*
 * Common lockres setup: bind the type, ops table and private data,
 * start with no lock held (IVMODE) and no action in flight, mark the
 * lockres initialized and register it for debug tracking.
 */
static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
				       struct ocfs2_lock_res *res,
				       enum ocfs2_lock_type type,
				       struct ocfs2_lock_res_ops *ops,
				       void *priv)
{
	res->l_type          = type;
	res->l_ops           = ops;
	res->l_priv          = priv;

	res->l_level         = LKM_IVMODE;
	res->l_requested     = LKM_IVMODE;
	res->l_blocking      = LKM_IVMODE;
	res->l_action        = OCFS2_AST_INVALID;
	res->l_unlock_action = OCFS2_UNLOCK_INVALID;

	res->l_flags         = OCFS2_LOCK_INITIALIZED;

	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
}
352
/*
 * One-time initialization of an embedded lockres: zero everything
 * and set up the spinlock, event waitqueue and list heads.
 */
void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
{
	/* This also clears out the lock status block */
	memset(res, 0, sizeof(struct ocfs2_lock_res));
	spin_lock_init(&res->l_lock);
	init_waitqueue_head(&res->l_event);
	INIT_LIST_HEAD(&res->l_blocked_list);
	INIT_LIST_HEAD(&res->l_mask_waiters);
}
362
363 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
364                                enum ocfs2_lock_type type,
365                                unsigned int generation,
366                                struct inode *inode)
367 {
368         struct ocfs2_lock_res_ops *ops;
369
370         switch(type) {
371                 case OCFS2_LOCK_TYPE_RW:
372                         ops = &ocfs2_inode_rw_lops;
373                         break;
374                 case OCFS2_LOCK_TYPE_META:
375                         ops = &ocfs2_inode_meta_lops;
376                         break;
377                 case OCFS2_LOCK_TYPE_DATA:
378                         ops = &ocfs2_inode_data_lops;
379                         break;
380                 default:
381                         mlog_bug_on_msg(1, "type: %d\n", type);
382                         ops = NULL; /* thanks, gcc */
383                         break;
384         };
385
386         ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
387                               generation, res->l_name);
388         ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
389 }
390
391 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
392 {
393         struct inode *inode = ocfs2_lock_res_inode(lockres);
394
395         return OCFS2_SB(inode->i_sb);
396 }
397
398 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
399 {
400         __be64 inode_blkno_be;
401
402         memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
403                sizeof(__be64));
404
405         return be64_to_cpu(inode_blkno_be);
406 }
407
/* ->get_osb for dentry locks: l_priv is an ocfs2_dentry_lock. */
static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_dentry_lock *dl = lockres->l_priv;

	return OCFS2_SB(dl->dl_inode->i_sb);
}
414
/*
 * Initialize the lockres embedded in a dentry lock. 'parent' is the
 * parent directory's block number; 'inode' is the dentry's target.
 * The name is a stringified prefix (type char + parent blkno)
 * followed by the target inode's block number in raw big-endian form.
 */
void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
				u64 parent, struct inode *inode)
{
	int len;
	u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
	__be64 inode_blkno_be = cpu_to_be64(inode_blkno);
	struct ocfs2_lock_res *lockres = &dl->dl_lockres;

	ocfs2_lock_res_init_once(lockres);

	/*
	 * Unfortunately, the standard lock naming scheme won't work
	 * here because we have two 16 byte values to use. Instead,
	 * we'll stuff the inode number as a binary value. We still
	 * want error prints to show something without garbling the
	 * display, so drop a null byte in there before the inode
	 * number. A future version of OCFS2 will likely use all
	 * binary lock names. The stringified names have been a
	 * tremendous aid in debugging, but now that the debugfs
	 * interface exists, we can mangle things there if need be.
	 *
	 * NOTE: We also drop the standard "pad" value (the total lock
	 * name size stays the same though - the last part is all
	 * zeros due to the memset in ocfs2_lock_res_init_once()
	 */
	len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
		       "%c%016llx",
		       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
		       (long long)parent);

	BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));

	memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
	       sizeof(__be64));

	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
				   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
				   dl);
}
454
/* Set up the superblock lockres, named after the super block's
 * well-known block number (generation 0). */
static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
				      struct ocfs2_super *osb)
{
	/* Superblock lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
			      0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
				   &ocfs2_super_lops, osb);
}
466
/* Set up the cluster-wide rename lockres (blkno and generation 0 —
 * there is exactly one of these per filesystem). */
static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
				       struct ocfs2_super *osb)
{
	/* Rename lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
				   &ocfs2_rename_lops, osb);
}
477
/*
 * Tear down a lockres that is going away: drop it from debug
 * tracking and sanity-check that nothing is still queued on, waiting
 * for, or holding it. A lockres that was never initialized is
 * silently ignored.
 */
void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
	mlog_entry_void();

	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
		return;

	ocfs2_remove_lockres_tracking(res);

	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
			"Lockres %s is on the blocked list\n",
			res->l_name);
	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
			"Lockres %s has mask waiters pending\n",
			res->l_name);
	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
			"Lockres %s is locked\n",
			res->l_name);
	mlog_bug_on_msg(res->l_ro_holders,
			"Lockres %s has %u ro holders\n",
			res->l_name, res->l_ro_holders);
	mlog_bug_on_msg(res->l_ex_holders,
			"Lockres %s has %u ex holders\n",
			res->l_name, res->l_ex_holders);

	/* Need to clear out the lock status block for the dlm */
	memset(&res->l_lksb, 0, sizeof(res->l_lksb));

	res->l_flags = 0UL;
	mlog_exit_void();
}
509
510 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
511                                      int level)
512 {
513         mlog_entry_void();
514
515         BUG_ON(!lockres);
516
517         switch(level) {
518         case LKM_EXMODE:
519                 lockres->l_ex_holders++;
520                 break;
521         case LKM_PRMODE:
522                 lockres->l_ro_holders++;
523                 break;
524         default:
525                 BUG();
526         }
527
528         mlog_exit_void();
529 }
530
531 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
532                                      int level)
533 {
534         mlog_entry_void();
535
536         BUG_ON(!lockres);
537
538         switch(level) {
539         case LKM_EXMODE:
540                 BUG_ON(!lockres->l_ex_holders);
541                 lockres->l_ex_holders--;
542                 break;
543         case LKM_PRMODE:
544                 BUG_ON(!lockres->l_ro_holders);
545                 lockres->l_ro_holders--;
546                 break;
547         default:
548                 BUG();
549         }
550         mlog_exit_void();
551 }
552
553 /* WARNING: This function lives in a world where the only three lock
554  * levels are EX, PR, and NL. It *will* have to be adjusted when more
555  * lock types are added. */
556 static inline int ocfs2_highest_compat_lock_level(int level)
557 {
558         int new_level = LKM_EXMODE;
559
560         if (level == LKM_EXMODE)
561                 new_level = LKM_NLMODE;
562         else if (level == LKM_PRMODE)
563                 new_level = LKM_PRMODE;
564         return new_level;
565 }
566
567 static void lockres_set_flags(struct ocfs2_lock_res *lockres,
568                               unsigned long newflags)
569 {
570         struct list_head *pos, *tmp;
571         struct ocfs2_mask_waiter *mw;
572
573         assert_spin_locked(&lockres->l_lock);
574
575         lockres->l_flags = newflags;
576
577         list_for_each_safe(pos, tmp, &lockres->l_mask_waiters) {
578                 mw = list_entry(pos, struct ocfs2_mask_waiter, mw_item);
579                 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
580                         continue;
581
582                 list_del_init(&mw->mw_item);
583                 mw->mw_status = 0;
584                 complete(&mw->mw_complete);
585         }
586 }
/* OR bits into l_flags; wakes any newly-satisfied mask waiters. */
static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
	lockres_set_flags(lockres, lockres->l_flags | or);
}
/* Clear bits from l_flags; wakes any newly-satisfied mask waiters. */
static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
				unsigned long clear)
{
	lockres_set_flags(lockres, lockres->l_flags & ~clear);
}
596
/*
 * Complete a downconvert whose ast has fired: adopt the requested
 * level, and if that level no longer conflicts with the level that
 * was blocking us, forget the blocking request and clear BLOCKED.
 * Called from ocfs2_locking_ast() with the lockres spinlock held.
 */
static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
	BUG_ON(lockres->l_blocking <= LKM_NLMODE);

	lockres->l_level = lockres->l_requested;
	if (lockres->l_level <=
	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
		lockres->l_blocking = LKM_NLMODE;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
	}
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}
616
/*
 * Complete an upconvert whose ast has fired: adopt the requested
 * level and clear BUSY. Called from ocfs2_locking_ast() with the
 * lockres spinlock held.
 */
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

	/* Convert from RO to EX doesn't really need anything as our
	 * information is already up to date. Convert from NL to
	 * *anything* however should mark ourselves as needing an
	 * update */
	if (lockres->l_level == LKM_NLMODE &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}
637
638 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
639 {
640         mlog_entry_void();
641
642         BUG_ON((!lockres->l_flags & OCFS2_LOCK_BUSY));
643         BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
644
645         if (lockres->l_requested > LKM_NLMODE &&
646             !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
647             lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
648                 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
649
650         lockres->l_level = lockres->l_requested;
651         lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
652         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
653
654         mlog_exit_void();
655 }
656
/*
 * Record that a remote node wants this lock at 'level'. Returns
 * nonzero when the caller should schedule a downconvert — i.e. when
 * this request is more restrictive than whatever is already recorded
 * in l_blocking. Caller must hold the lockres spinlock (asserted).
 */
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
				     int level)
{
	int needs_downconvert = 0;
	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);

	if (level > lockres->l_blocking) {
		/* only schedule a downconvert if we haven't already scheduled
		 * one that goes low enough to satisfy the level we're
		 * blocking.  this also catches the case where we get
		 * duplicate BASTs */
		if (ocfs2_highest_compat_lock_level(level) <
		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
			needs_downconvert = 1;

		lockres->l_blocking = level;
	}

	mlog_exit(needs_downconvert);
	return needs_downconvert;
}
682
/*
 * dlm blocking-ast callback: another node has requested a level
 * incompatible with ours. Mark the lockres blocked and, if a
 * downconvert is required, queue it and kick the vote thread to
 * process it.
 */
static void ocfs2_blocking_ast(void *opaque, int level)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	int needs_downconvert;
	unsigned long flags;

	BUG_ON(level <= LKM_NLMODE);

	mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
	     lockres->l_name, level, lockres->l_level,
	     ocfs2_lock_type_string(lockres->l_type));

	spin_lock_irqsave(&lockres->l_lock, flags);
	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
	if (needs_downconvert)
		ocfs2_schedule_blocked_lock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	ocfs2_kick_vote_thread(osb);
}
706
/*
 * dlm ast callback: one of our own requests (attach, convert or
 * downconvert) has completed. Dispatch on l_action to fold the
 * result into the lockres state, then wake anyone sleeping on
 * l_event.
 *
 * NOTE(review): on a non-DLM_NORMAL lksb status we log and return
 * with BUSY still set and l_action untouched — presumably recovery
 * is driven elsewhere; confirm against the unlock/convert paths.
 */
static void ocfs2_locking_ast(void *opaque)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct dlm_lockstatus *lksb = &lockres->l_lksb;
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);

	if (lksb->status != DLM_NORMAL) {
		mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
		     lockres->l_name, lksb->status);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
		     "lockres flags = 0x%lx, unlock action: %u\n",
		     lockres->l_name, lockres->l_action, lockres->l_flags,
		     lockres->l_unlock_action);
		BUG();
	}

	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;

	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}
748
/*
 * Roll back lockres state after a failed dlm call: clear BUSY and
 * invalidate the in-flight action ('convert' nonzero for lock/convert
 * requests, zero for unlock requests), then wake waiters so they can
 * re-evaluate.
 */
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	if (convert)
		lockres->l_action = OCFS2_AST_INVALID;
	else
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);
	mlog_exit_void();
}
766
767 /* Note: If we detect another process working on the lock (i.e.,
768  * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
769  * to do the right thing in that case.
770  */
771 static int ocfs2_lock_create(struct ocfs2_super *osb,
772                              struct ocfs2_lock_res *lockres,
773                              int level,
774                              int dlm_flags)
775 {
776         int ret = 0;
777         enum dlm_status status;
778         unsigned long flags;
779
780         mlog_entry_void();
781
782         mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level,
783              dlm_flags);
784
785         spin_lock_irqsave(&lockres->l_lock, flags);
786         if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
787             (lockres->l_flags & OCFS2_LOCK_BUSY)) {
788                 spin_unlock_irqrestore(&lockres->l_lock, flags);
789                 goto bail;
790         }
791
792         lockres->l_action = OCFS2_AST_ATTACH;
793         lockres->l_requested = level;
794         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
795         spin_unlock_irqrestore(&lockres->l_lock, flags);
796
797         status = dlmlock(osb->dlm,
798                          level,
799                          &lockres->l_lksb,
800                          dlm_flags,
801                          lockres->l_name,
802                          OCFS2_LOCK_ID_MAX_LEN - 1,
803                          ocfs2_locking_ast,
804                          lockres,
805                          ocfs2_blocking_ast);
806         if (status != DLM_NORMAL) {
807                 ocfs2_log_dlm_error("dlmlock", status, lockres);
808                 ret = -EINVAL;
809                 ocfs2_recover_from_dlm_error(lockres, 1);
810         }
811
812         mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name);
813
814 bail:
815         mlog_exit(ret);
816         return ret;
817 }
818
819 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
820                                         int flag)
821 {
822         unsigned long flags;
823         int ret;
824
825         spin_lock_irqsave(&lockres->l_lock, flags);
826         ret = lockres->l_flags & flag;
827         spin_unlock_irqrestore(&lockres->l_lock, flags);
828
829         return ret;
830 }
831
832 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
833
834 {
835         wait_event(lockres->l_event,
836                    !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
837 }
838
839 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
840
841 {
842         wait_event(lockres->l_event,
843                    !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
844 }
845
846 /* predict what lock level we'll be dropping down to on behalf
847  * of another node, and return true if the currently wanted
848  * level will be compatible with it. */
849 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
850                                                      int wanted)
851 {
852         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
853
854         return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
855 }
856
857 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
858 {
859         INIT_LIST_HEAD(&mw->mw_item);
860         init_completion(&mw->mw_complete);
861 }
862
863 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
864 {
865         wait_for_completion(&mw->mw_complete);
866         /* Re-arm the completion in case we want to wait on it again */
867         INIT_COMPLETION(mw->mw_complete);
868         return mw->mw_status;
869 }
870
871 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
872                                     struct ocfs2_mask_waiter *mw,
873                                     unsigned long mask,
874                                     unsigned long goal)
875 {
876         BUG_ON(!list_empty(&mw->mw_item));
877
878         assert_spin_locked(&lockres->l_lock);
879
880         list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
881         mw->mw_mask = mask;
882         mw->mw_goal = goal;
883 }
884
885 /* returns 0 if the mw that was removed was already satisfied, -EBUSY
886  * if the mask still hadn't reached its goal */
887 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
888                                       struct ocfs2_mask_waiter *mw)
889 {
890         unsigned long flags;
891         int ret = 0;
892
893         spin_lock_irqsave(&lockres->l_lock, flags);
894         if (!list_empty(&mw->mw_item)) {
895                 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
896                         ret = -EBUSY;
897
898                 list_del_init(&mw->mw_item);
899                 init_completion(&mw->mw_complete);
900         }
901         spin_unlock_irqrestore(&lockres->l_lock, flags);
902
903         return ret;
904
905 }
906
/*
 * Take a cluster lock on @lockres at @level, creating the dlm lock (at
 * LKM_NLMODE) first if it isn't attached yet.
 *
 * @lkm_flags are passed straight through to dlmlock(); LKM_VALBLK is
 *   added automatically for lock types that use the LVB.
 * @arg_flags: with OCFS2_LOCK_NONBLOCK we return -EAGAIN instead of
 *   sleeping when the lock can't be granted immediately.
 *
 * Returns 0 on success, -ERESTARTSYS if a signal arrived before we
 * committed to the dlm, -EAGAIN for NONBLOCK/NOQUEUE refusals, or
 * -EINVAL on a dlm error.
 */
static int ocfs2_cluster_lock(struct ocfs2_super *osb,
			      struct ocfs2_lock_res *lockres,
			      int level,
			      int lkm_flags,
			      int arg_flags)
{
	struct ocfs2_mask_waiter mw;
	enum dlm_status status;
	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
	unsigned long flags;

	mlog_entry_void();

	ocfs2_init_mask_waiter(&mw);

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
		lkm_flags |= LKM_VALBLK;

again:
	wait = 0;

	/* Only honor signals until we've actually gone to the dlm --
	 * catch_signals is cleared below once dlmlock succeeds. */
	if (catch_signals && signal_pending(current)) {
		ret = -ERESTARTSYS;
		goto out;
	}

	spin_lock_irqsave(&lockres->l_lock, flags);

	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
			"Cluster lock called on freeing lockres %s! flags "
			"0x%lx\n", lockres->l_name, lockres->l_flags);

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
	    level > lockres->l_level) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		wait = 1;
		goto unlock;
	}

	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
		/* lock has not been created yet. */
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
		if (ret < 0) {
			mlog_errno(ret);
			goto out;
		}
		goto again;
	}

	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
		/* is the lock is currently blocked on behalf of
		 * another node */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
		wait = 1;
		goto unlock;
	}

	if (level > lockres->l_level) {
		/* A pending action here would indicate a racing dlm
		 * call -- log it loudly. */
		if (lockres->l_action != OCFS2_AST_INVALID)
			mlog(ML_ERROR, "lockres %s has action %u pending\n",
			     lockres->l_name, lockres->l_action);

		lockres->l_action = OCFS2_AST_CONVERT;
		lockres->l_requested = level;
		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		BUG_ON(level == LKM_IVMODE);
		BUG_ON(level == LKM_NLMODE);

		mlog(0, "lock %s, convert from %d to level = %d\n",
		     lockres->l_name, lockres->l_level, level);

		/* call dlm_lock to upgrade lock now */
		status = dlmlock(osb->dlm,
				 level,
				 &lockres->l_lksb,
				 lkm_flags|LKM_CONVERT,
				 lockres->l_name,
				 OCFS2_LOCK_ID_MAX_LEN - 1,
				 ocfs2_locking_ast,
				 lockres,
				 ocfs2_blocking_ast);
		if (status != DLM_NORMAL) {
			if ((lkm_flags & LKM_NOQUEUE) &&
			    (status == DLM_NOTQUEUED))
				ret = -EAGAIN;
			else {
				ocfs2_log_dlm_error("dlmlock", status,
						    lockres);
				ret = -EINVAL;
			}
			ocfs2_recover_from_dlm_error(lockres, 1);
			goto out;
		}

		mlog(0, "lock %s, successfull return from dlmlock\n",
		     lockres->l_name);

		/* At this point we've gone inside the dlm and need to
		 * complete our work regardless. */
		catch_signals = 0;

		/* wait for busy to clear and carry on */
		goto again;
	}

	/* Ok, if we get here then we're good to go. */
	ocfs2_inc_holders(lockres, level);

	ret = 0;
unlock:
	spin_unlock_irqrestore(&lockres->l_lock, flags);
out:
	/*
	 * This is helping work around a lock inversion between the page lock
	 * and dlm locks.  One path holds the page lock while calling aops
	 * which block acquiring dlm locks.  The voting thread holds dlm
	 * locks while acquiring page locks while down converting data locks.
	 * This block is helping an aop path notice the inversion and back
	 * off to unlock its page lock before trying the dlm lock again.
	 */
	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
		wait = 0;
		if (lockres_remove_mask_waiter(lockres, &mw))
			ret = -EAGAIN;
		else
			goto again;
	}
	if (wait) {
		ret = ocfs2_wait_for_mask(&mw);
		if (ret == 0)
			goto again;
		mlog_errno(ret);
	}

	mlog_exit(ret);
	return ret;
}
1056
1057 static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
1058                                  struct ocfs2_lock_res *lockres,
1059                                  int level)
1060 {
1061         unsigned long flags;
1062
1063         mlog_entry_void();
1064         spin_lock_irqsave(&lockres->l_lock, flags);
1065         ocfs2_dec_holders(lockres, level);
1066         ocfs2_vote_on_unlock(osb, lockres);
1067         spin_unlock_irqrestore(&lockres->l_lock, flags);
1068         mlog_exit_void();
1069 }
1070
1071 int ocfs2_create_new_lock(struct ocfs2_super *osb,
1072                           struct ocfs2_lock_res *lockres,
1073                           int ex,
1074                           int local)
1075 {
1076         int level =  ex ? LKM_EXMODE : LKM_PRMODE;
1077         unsigned long flags;
1078         int lkm_flags = local ? LKM_LOCAL : 0;
1079
1080         spin_lock_irqsave(&lockres->l_lock, flags);
1081         BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1082         lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1083         spin_unlock_irqrestore(&lockres->l_lock, flags);
1084
1085         return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1086 }
1087
/* Grants us an EX lock on the data and metadata resources, skipping
 * the normal cluster directory lookup. Use this ONLY on newly created
 * inodes which other nodes can't possibly see, and which haven't been
 * hashed in the inode hash yet. This can give us a good performance
 * increase as it'll skip the network broadcast normally associated
 * with creating a new lock resource. */
int ocfs2_create_new_inode_locks(struct inode *inode)
{
	int ret;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);
	BUG_ON(!ocfs2_inode_is_new(inode));

	mlog_entry_void();

	mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);

	/* NOTE: That we don't increment any of the holder counts, nor
	 * do we add anything to a journal handle. Since this is
	 * supposed to be a new inode which the cluster doesn't know
	 * about yet, there is no need to.  As far as the LVB handling
	 * is concerned, this is basically like acquiring an EX lock
	 * on a resource which has an invalid one -- we'll set it
	 * valid when we release the EX. */

	/* RW lock: EX, and LKM_LOCAL since nobody else can race us. */
	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	/*
	 * We don't want to use LKM_LOCAL on a meta data lock as they
	 * don't use a generation in their lock names.
	 */
	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1, 0);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	/* Data lock: EX and LKM_LOCAL, like the RW lock above. */
	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1, 1);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

bail:
	mlog_exit(ret);
	return ret;
}
1140
1141 int ocfs2_rw_lock(struct inode *inode, int write)
1142 {
1143         int status, level;
1144         struct ocfs2_lock_res *lockres;
1145
1146         BUG_ON(!inode);
1147
1148         mlog_entry_void();
1149
1150         mlog(0, "inode %llu take %s RW lock\n",
1151              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1152              write ? "EXMODE" : "PRMODE");
1153
1154         lockres = &OCFS2_I(inode)->ip_rw_lockres;
1155
1156         level = write ? LKM_EXMODE : LKM_PRMODE;
1157
1158         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
1159                                     0);
1160         if (status < 0)
1161                 mlog_errno(status);
1162
1163         mlog_exit(status);
1164         return status;
1165 }
1166
1167 void ocfs2_rw_unlock(struct inode *inode, int write)
1168 {
1169         int level = write ? LKM_EXMODE : LKM_PRMODE;
1170         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1171
1172         mlog_entry_void();
1173
1174         mlog(0, "inode %llu drop %s RW lock\n",
1175              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1176              write ? "EXMODE" : "PRMODE");
1177
1178         ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1179
1180         mlog_exit_void();
1181 }
1182
1183 int ocfs2_data_lock_full(struct inode *inode,
1184                          int write,
1185                          int arg_flags)
1186 {
1187         int status = 0, level;
1188         struct ocfs2_lock_res *lockres;
1189
1190         BUG_ON(!inode);
1191
1192         mlog_entry_void();
1193
1194         mlog(0, "inode %llu take %s DATA lock\n",
1195              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1196              write ? "EXMODE" : "PRMODE");
1197
1198         /* We'll allow faking a readonly data lock for
1199          * rodevices. */
1200         if (ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) {
1201                 if (write) {
1202                         status = -EROFS;
1203                         mlog_errno(status);
1204                 }
1205                 goto out;
1206         }
1207
1208         lockres = &OCFS2_I(inode)->ip_data_lockres;
1209
1210         level = write ? LKM_EXMODE : LKM_PRMODE;
1211
1212         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level,
1213                                     0, arg_flags);
1214         if (status < 0 && status != -EAGAIN)
1215                 mlog_errno(status);
1216
1217 out:
1218         mlog_exit(status);
1219         return status;
1220 }
1221
1222 /* see ocfs2_meta_lock_with_page() */
1223 int ocfs2_data_lock_with_page(struct inode *inode,
1224                               int write,
1225                               struct page *page)
1226 {
1227         int ret;
1228
1229         ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK);
1230         if (ret == -EAGAIN) {
1231                 unlock_page(page);
1232                 if (ocfs2_data_lock(inode, write) == 0)
1233                         ocfs2_data_unlock(inode, write);
1234                 ret = AOP_TRUNCATED_PAGE;
1235         }
1236
1237         return ret;
1238 }
1239
1240 static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
1241                                  struct ocfs2_lock_res *lockres)
1242 {
1243         int kick = 0;
1244
1245         mlog_entry_void();
1246
1247         /* If we know that another node is waiting on our lock, kick
1248          * the vote thread * pre-emptively when we reach a release
1249          * condition. */
1250         if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
1251                 switch(lockres->l_blocking) {
1252                 case LKM_EXMODE:
1253                         if (!lockres->l_ex_holders && !lockres->l_ro_holders)
1254                                 kick = 1;
1255                         break;
1256                 case LKM_PRMODE:
1257                         if (!lockres->l_ex_holders)
1258                                 kick = 1;
1259                         break;
1260                 default:
1261                         BUG();
1262                 }
1263         }
1264
1265         if (kick)
1266                 ocfs2_kick_vote_thread(osb);
1267
1268         mlog_exit_void();
1269 }
1270
1271 void ocfs2_data_unlock(struct inode *inode,
1272                        int write)
1273 {
1274         int level = write ? LKM_EXMODE : LKM_PRMODE;
1275         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres;
1276
1277         mlog_entry_void();
1278
1279         mlog(0, "inode %llu drop %s DATA lock\n",
1280              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1281              write ? "EXMODE" : "PRMODE");
1282
1283         if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
1284                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1285
1286         mlog_exit_void();
1287 }
1288
1289 #define OCFS2_SEC_BITS   34
1290 #define OCFS2_SEC_SHIFT  (64 - 34)
1291 #define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
1292
1293 /* LVB only has room for 64 bits of time here so we pack it for
1294  * now. */
1295 static u64 ocfs2_pack_timespec(struct timespec *spec)
1296 {
1297         u64 res;
1298         u64 sec = spec->tv_sec;
1299         u32 nsec = spec->tv_nsec;
1300
1301         res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
1302
1303         return res;
1304 }
1305
/* Call this with the lockres locked. I am reasonably sure we don't
 * need ip_lock in this function as anyone who would be changing those
 * values is supposed to be blocked in ocfs2_meta_lock right now. */
static void __ocfs2_stuff_meta_lvb(struct inode *inode)
{
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
	struct ocfs2_meta_lvb *lvb;

	mlog_entry_void();

	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	/*
	 * Invalidate the LVB of a deleted inode - this way other
	 * nodes are forced to go to disk and discover the new inode
	 * status.
	 */
	if (oi->ip_flags & OCFS2_INODE_DELETED) {
		lvb->lvb_version = 0;
		goto out;
	}

	/* Pack the current inode metadata into the LVB, big-endian on
	 * the wire, so other nodes can refresh without a disk read. */
	lvb->lvb_version   = OCFS2_LVB_VERSION;
	lvb->lvb_isize     = cpu_to_be64(i_size_read(inode));
	lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
	lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
	lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
	lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
	lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
	/* Timestamps are packed into 64 bits each -- see
	 * ocfs2_pack_timespec(). */
	lvb->lvb_iatime_packed  =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
	lvb->lvb_ictime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
	lvb->lvb_imtime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
	lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
	lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);

out:
	mlog_meta_lvb(0, lockres);

	mlog_exit_void();
}
1350
1351 static void ocfs2_unpack_timespec(struct timespec *spec,
1352                                   u64 packed_time)
1353 {
1354         spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
1355         spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
1356 }
1357
/* Populate the in-memory inode from the metadata carried in the meta
 * lock's LVB, avoiding a disk read. Inverse of
 * __ocfs2_stuff_meta_lvb(). */
static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
{
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
	struct ocfs2_meta_lvb *lvb;

	mlog_entry_void();

	mlog_meta_lvb(0, lockres);

	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	/* We're safe here without the lockres lock... */
	spin_lock(&oi->ip_lock);
	oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
	i_size_write(inode, be64_to_cpu(lvb->lvb_isize));

	oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
	ocfs2_set_inode_flags(inode);

	/* fast-symlinks are a special case */
	if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
		inode->i_blocks = 0;
	else
		inode->i_blocks =
			ocfs2_align_bytes_to_sectors(i_size_read(inode));

	inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
	inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
	inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
	inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
	/* Timestamps were packed 64-bit -- see ocfs2_pack_timespec(). */
	ocfs2_unpack_timespec(&inode->i_atime,
			      be64_to_cpu(lvb->lvb_iatime_packed));
	ocfs2_unpack_timespec(&inode->i_mtime,
			      be64_to_cpu(lvb->lvb_imtime_packed));
	ocfs2_unpack_timespec(&inode->i_ctime,
			      be64_to_cpu(lvb->lvb_ictime_packed));
	spin_unlock(&oi->ip_lock);

	mlog_exit_void();
}
1399
1400 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
1401                                               struct ocfs2_lock_res *lockres)
1402 {
1403         struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1404
1405         if (lvb->lvb_version == OCFS2_LVB_VERSION
1406             && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
1407                 return 1;
1408         return 0;
1409 }
1410
1411 /* Determine whether a lock resource needs to be refreshed, and
1412  * arbitrate who gets to refresh it.
1413  *
1414  *   0 means no refresh needed.
1415  *
1416  *   > 0 means you need to refresh this and you MUST call
1417  *   ocfs2_complete_lock_res_refresh afterwards. */
1418 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
1419 {
1420         unsigned long flags;
1421         int status = 0;
1422
1423         mlog_entry_void();
1424
1425 refresh_check:
1426         spin_lock_irqsave(&lockres->l_lock, flags);
1427         if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
1428                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1429                 goto bail;
1430         }
1431
1432         if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
1433                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1434
1435                 ocfs2_wait_on_refreshing_lock(lockres);
1436                 goto refresh_check;
1437         }
1438
1439         /* Ok, I'll be the one to refresh this lock. */
1440         lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
1441         spin_unlock_irqrestore(&lockres->l_lock, flags);
1442
1443         status = 1;
1444 bail:
1445         mlog_exit(status);
1446         return status;
1447 }
1448
1449 /* If status is non zero, I'll mark it as not being in refresh
1450  * anymroe, but i won't clear the needs refresh flag. */
1451 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
1452                                                    int status)
1453 {
1454         unsigned long flags;
1455         mlog_entry_void();
1456
1457         spin_lock_irqsave(&lockres->l_lock, flags);
1458         lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
1459         if (!status)
1460                 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
1461         spin_unlock_irqrestore(&lockres->l_lock, flags);
1462
1463         wake_up(&lockres->l_event);
1464
1465         mlog_exit_void();
1466 }
1467
/* may or may not return a bh if it went to disk. */
/*
 * Refresh stale in-memory inode metadata after a meta lock was
 * (re)acquired. If we win the refresh arbitration, we either trust
 * the LVB or read the dinode from disk (in which case *bh is set and
 * the caller owns the reference).
 *
 * Returns 0 on success, -ENOENT if the orphaned inode was deleted
 * while we waited, or a negative error from the disk read.
 */
static int ocfs2_meta_lock_update(struct inode *inode,
				  struct buffer_head **bh)
{
	int status = 0;
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres;
	struct ocfs2_dinode *fe;

	mlog_entry_void();

	spin_lock(&oi->ip_lock);
	if (oi->ip_flags & OCFS2_INODE_DELETED) {
		mlog(0, "Orphaned inode %llu was deleted while we "
		     "were waiting on a lock. ip_flags = 0x%x\n",
		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
		spin_unlock(&oi->ip_lock);
		status = -ENOENT;
		goto bail;
	}
	spin_unlock(&oi->ip_lock);

	lockres = &oi->ip_meta_lockres;

	/* Only one process gets to do the refresh; everyone else sees
	 * 0 here and returns with already-fresh data. */
	if (!ocfs2_should_refresh_lock_res(lockres))
		goto bail;

	/* This will discard any caching information we might have had
	 * for the inode metadata. */
	ocfs2_metadata_cache_purge(inode);

	/* will do nothing for inode types that don't use the extent
	 * map (directories, bitmap files, etc) */
	ocfs2_extent_map_trunc(inode, 0);

	if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
		mlog(0, "Trusting LVB on inode %llu\n",
		     (unsigned long long)oi->ip_blkno);
		ocfs2_refresh_inode_from_lvb(inode);
	} else {
		/* Boo, we have to go to disk. */
		/* read bh, cast, ocfs2_refresh_inode */
		status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
					  bh, OCFS2_BH_CACHED, inode);
		if (status < 0) {
			mlog_errno(status);
			goto bail_refresh;
		}
		fe = (struct ocfs2_dinode *) (*bh)->b_data;

		/* This is a good chance to make sure we're not
		 * locking an invalid object.
		 *
		 * We bug on a stale inode here because we checked
		 * above whether it was wiped from disk. The wiping
		 * node provides a guarantee that we receive that
		 * message and can mark the inode before dropping any
		 * locks associated with it. */
		if (!OCFS2_IS_VALID_DINODE(fe)) {
			OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
			status = -EIO;
			goto bail_refresh;
		}
		mlog_bug_on_msg(inode->i_generation !=
				le32_to_cpu(fe->i_generation),
				"Invalid dinode %llu disk generation: %u "
				"inode->i_generation: %u\n",
				(unsigned long long)oi->ip_blkno,
				le32_to_cpu(fe->i_generation),
				inode->i_generation);
		mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
				!(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
				"Stale dinode %llu dtime: %llu flags: 0x%x\n",
				(unsigned long long)oi->ip_blkno,
				(unsigned long long)le64_to_cpu(fe->i_dtime),
				le32_to_cpu(fe->i_flags));

		ocfs2_refresh_inode(inode, fe);
	}

	status = 0;
bail_refresh:
	/* On error this leaves NEEDS_REFRESH set so the next locker
	 * retries the refresh. */
	ocfs2_complete_lock_res_refresh(lockres, status);
bail:
	mlog_exit(status);
	return status;
}
1555
1556 static int ocfs2_assign_bh(struct inode *inode,
1557                            struct buffer_head **ret_bh,
1558                            struct buffer_head *passed_bh)
1559 {
1560         int status;
1561
1562         if (passed_bh) {
1563                 /* Ok, the update went to disk for us, use the
1564                  * returned bh. */
1565                 *ret_bh = passed_bh;
1566                 get_bh(*ret_bh);
1567
1568                 return 0;
1569         }
1570
1571         status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
1572                                   OCFS2_I(inode)->ip_blkno,
1573                                   ret_bh,
1574                                   OCFS2_BH_CACHED,
1575                                   inode);
1576         if (status < 0)
1577                 mlog_errno(status);
1578
1579         return status;
1580 }
1581
1582 /*
1583  * returns < 0 error if the callback will never be called, otherwise
1584  * the result of the lock will be communicated via the callback.
1585  */
1586 int ocfs2_meta_lock_full(struct inode *inode,
1587                          struct ocfs2_journal_handle *handle,
1588                          struct buffer_head **ret_bh,
1589                          int ex,
1590                          int arg_flags)
1591 {
1592         int status, level, dlm_flags, acquired;
1593         struct ocfs2_lock_res *lockres;
1594         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1595         struct buffer_head *local_bh = NULL;
1596
1597         BUG_ON(!inode);
1598
1599         mlog_entry_void();
1600
1601         mlog(0, "inode %llu, take %s META lock\n",
1602              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1603              ex ? "EXMODE" : "PRMODE");
1604
1605         status = 0;
1606         acquired = 0;
1607         /* We'll allow faking a readonly metadata lock for
1608          * rodevices. */
1609         if (ocfs2_is_hard_readonly(osb)) {
1610                 if (ex)
1611                         status = -EROFS;
1612                 goto bail;
1613         }
1614
1615         if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1616                 wait_event(osb->recovery_event,
1617                            ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1618
1619         acquired = 0;
1620         lockres = &OCFS2_I(inode)->ip_meta_lockres;
1621         level = ex ? LKM_EXMODE : LKM_PRMODE;
1622         dlm_flags = 0;
1623         if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
1624                 dlm_flags |= LKM_NOQUEUE;
1625
1626         status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
1627         if (status < 0) {
1628                 if (status != -EAGAIN && status != -EIOCBRETRY)
1629                         mlog_errno(status);
1630                 goto bail;
1631         }
1632
1633         /* Notify the error cleanup path to drop the cluster lock. */
1634         acquired = 1;
1635
1636         /* We wait twice because a node may have died while we were in
1637          * the lower dlm layers. The second time though, we've
1638          * committed to owning this lock so we don't allow signals to
1639          * abort the operation. */
1640         if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1641                 wait_event(osb->recovery_event,
1642                            ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1643
1644         /*
1645          * We only see this flag if we're being called from
1646          * ocfs2_read_locked_inode(). It means we're locking an inode
1647          * which hasn't been populated yet, so clear the refresh flag
1648          * and let the caller handle it.
1649          */
1650         if (inode->i_state & I_NEW) {
1651                 status = 0;
1652                 ocfs2_complete_lock_res_refresh(lockres, 0);
1653                 goto bail;
1654         }
1655
1656         /* This is fun. The caller may want a bh back, or it may
1657          * not. ocfs2_meta_lock_update definitely wants one in, but
1658          * may or may not read one, depending on what's in the
1659          * LVB. The result of all of this is that we've *only* gone to
1660          * disk if we have to, so the complexity is worthwhile. */
1661         status = ocfs2_meta_lock_update(inode, &local_bh);
1662         if (status < 0) {
1663                 if (status != -ENOENT)
1664                         mlog_errno(status);
1665                 goto bail;
1666         }
1667
1668         if (ret_bh) {
1669                 status = ocfs2_assign_bh(inode, ret_bh, local_bh);
1670                 if (status < 0) {
1671                         mlog_errno(status);
1672                         goto bail;
1673                 }
1674         }
1675
1676         if (handle) {
1677                 status = ocfs2_handle_add_lock(handle, inode);
1678                 if (status < 0)
1679                         mlog_errno(status);
1680         }
1681
1682 bail:
1683         if (status < 0) {
1684                 if (ret_bh && (*ret_bh)) {
1685                         brelse(*ret_bh);
1686                         *ret_bh = NULL;
1687                 }
1688                 if (acquired)
1689                         ocfs2_meta_unlock(inode, ex);
1690         }
1691
1692         if (local_bh)
1693                 brelse(local_bh);
1694
1695         mlog_exit(status);
1696         return status;
1697 }
1698
1699 /*
1700  * This is working around a lock inversion between tasks acquiring DLM locks
1701  * while holding a page lock and the vote thread which blocks dlm lock acquiry
1702  * while acquiring page locks.
1703  *
1704  * ** These _with_page variants are only intended to be called from aop
1705  * methods that hold page locks and return a very specific *positive* error
1706  * code that aop methods pass up to the VFS -- test for errors with != 0. **
1707  *
1708  * The DLM is called such that it returns -EAGAIN if it would have blocked
1709  * waiting for the vote thread.  In that case we unlock our page so the vote
1710  * thread can make progress.  Once we've done this we have to return
1711  * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up
1712  * into the VFS who will then immediately retry the aop call.
1713  *
1714  * We do a blocking lock and immediate unlock before returning, though, so that
1715  * the lock has a great chance of being cached on this node by the time the VFS
1716  * calls back to retry the aop.    This has a potential to livelock as nodes
1717  * ping locks back and forth, but that's a risk we're willing to take to avoid
1718  * the lock inversion simply.
1719  */
1720 int ocfs2_meta_lock_with_page(struct inode *inode,
1721                               struct ocfs2_journal_handle *handle,
1722                               struct buffer_head **ret_bh,
1723                               int ex,
1724                               struct page *page)
1725 {
1726         int ret;
1727
1728         ret = ocfs2_meta_lock_full(inode, handle, ret_bh, ex,
1729                                    OCFS2_LOCK_NONBLOCK);
1730         if (ret == -EAGAIN) {
1731                 unlock_page(page);
1732                 if (ocfs2_meta_lock(inode, handle, ret_bh, ex) == 0)
1733                         ocfs2_meta_unlock(inode, ex);
1734                 ret = AOP_TRUNCATED_PAGE;
1735         }
1736
1737         return ret;
1738 }
1739
1740 void ocfs2_meta_unlock(struct inode *inode,
1741                        int ex)
1742 {
1743         int level = ex ? LKM_EXMODE : LKM_PRMODE;
1744         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
1745
1746         mlog_entry_void();
1747
1748         mlog(0, "inode %llu drop %s META lock\n",
1749              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1750              ex ? "EXMODE" : "PRMODE");
1751
1752         if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
1753                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1754
1755         mlog_exit_void();
1756 }
1757
/* Take the superblock cluster lock and, when this node is the first
 * to acquire it since it changed hands, refresh the slot map from
 * disk.
 *
 * Returns 0 on success, -EROFS on a hard-readonly mount, or a
 * negative error from the cluster lock or the slot map read.
 *
 * NOTE(review): on the error paths after ocfs2_cluster_lock()
 * succeeds, the cluster lock appears to remain held when we return —
 * confirm callers account for that. */
int ocfs2_super_lock(struct ocfs2_super *osb,
		     int ex)
{
	int status;
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
	struct buffer_head *bh;
	struct ocfs2_slot_info *si = osb->slot_info;

	mlog_entry_void();

	if (ocfs2_is_hard_readonly(osb))
		return -EROFS;

	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	/* The super block lock path is really in the best position to
	 * know when resources covered by the lock need to be
	 * refreshed, so we do it here. Of course, making sense of
	 * everything is up to the caller :) */
	status = ocfs2_should_refresh_lock_res(lockres);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}
	if (status) {
		/* status > 0: we won the right to do the refresh —
		 * re-read the slot map and publish the result. */
		bh = si->si_bh;
		status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0,
					  si->si_inode);
		if (status == 0)
			ocfs2_update_slot_info(si);

		/* Wake anyone waiting on the refresh, passing along
		 * any read error. */
		ocfs2_complete_lock_res_refresh(lockres, status);

		if (status < 0)
			mlog_errno(status);
	}
bail:
	mlog_exit(status);
	return status;
}
1803
1804 void ocfs2_super_unlock(struct ocfs2_super *osb,
1805                         int ex)
1806 {
1807         int level = ex ? LKM_EXMODE : LKM_PRMODE;
1808         struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
1809
1810         ocfs2_cluster_unlock(osb, lockres, level);
1811 }
1812
1813 int ocfs2_rename_lock(struct ocfs2_super *osb)
1814 {
1815         int status;
1816         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1817
1818         if (ocfs2_is_hard_readonly(osb))
1819                 return -EROFS;
1820
1821         status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
1822         if (status < 0)
1823                 mlog_errno(status);
1824
1825         return status;
1826 }
1827
1828 void ocfs2_rename_unlock(struct ocfs2_super *osb)
1829 {
1830         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1831
1832         ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
1833 }
1834
1835 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
1836 {
1837         int ret;
1838         int level = ex ? LKM_EXMODE : LKM_PRMODE;
1839         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1840         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1841
1842         BUG_ON(!dl);
1843
1844         if (ocfs2_is_hard_readonly(osb))
1845                 return -EROFS;
1846
1847         ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
1848         if (ret < 0)
1849                 mlog_errno(ret);
1850
1851         return ret;
1852 }
1853
1854 void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
1855 {
1856         int level = ex ? LKM_EXMODE : LKM_PRMODE;
1857         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1858         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1859
1860         ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
1861 }
1862
1863 /* Reference counting of the dlm debug structure. We want this because
1864  * open references on the debug inodes can live on after a mount, so
1865  * we can't rely on the ocfs2_super to always exist. */
1866 static void ocfs2_dlm_debug_free(struct kref *kref)
1867 {
1868         struct ocfs2_dlm_debug *dlm_debug;
1869
1870         dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
1871
1872         kfree(dlm_debug);
1873 }
1874
1875 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
1876 {
1877         if (dlm_debug)
1878                 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
1879 }
1880
/* Take a reference on the dlm debug state. */
static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
{
	kref_get(&debug->d_refcnt);
}
1885
1886 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
1887 {
1888         struct ocfs2_dlm_debug *dlm_debug;
1889
1890         dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
1891         if (!dlm_debug) {
1892                 mlog_errno(-ENOMEM);
1893                 goto out;
1894         }
1895
1896         kref_init(&dlm_debug->d_refcnt);
1897         INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
1898         dlm_debug->d_locking_state = NULL;
1899 out:
1900         return dlm_debug;
1901 }
1902
/* Access to this is arbitrated for us via seq_file->sem. */
struct ocfs2_dlm_seq_priv {
	struct ocfs2_dlm_debug *p_dlm_debug;	/* reference held for the file's lifetime */
	struct ocfs2_lock_res p_iter_res;	/* dummy iteration marker (l_ops == NULL) */
	struct ocfs2_lock_res p_tmp_res;	/* snapshot handed out to ->show */
};
1909
/* Walk forward from @start on the global tracking list and return the
 * next real lockres, skipping dummy iteration markers. Returns NULL
 * at the end of the list. Caller holds ocfs2_dlm_tracking_lock. */
static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
						 struct ocfs2_dlm_seq_priv *priv)
{
	struct ocfs2_lock_res *iter, *ret = NULL;
	struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;

	assert_spin_locked(&ocfs2_dlm_tracking_lock);

	list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
		/* discover the head of the list */
		if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
			mlog(0, "End of list found, %p\n", ret);
			break;
		}

		/* We track our "dummy" iteration lockres' by a NULL
		 * l_ops field. */
		if (iter->l_ops != NULL) {
			ret = iter;
			break;
		}
	}

	return ret;
}
1935
/* seq_file ->start: find the first real lockres after our iteration
 * marker and hand back a stable snapshot of it. */
static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
{
	struct ocfs2_dlm_seq_priv *priv = m->private;
	struct ocfs2_lock_res *iter;

	spin_lock(&ocfs2_dlm_tracking_lock);
	iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
	if (iter) {
		/* Since lockres' have the lifetime of their container
		 * (which can be inodes, ocfs2_supers, etc) we want to
		 * copy this out to a temporary lockres while still
		 * under the spinlock. Obviously after this we can't
		 * trust any pointers on the copy returned, but that's
		 * ok as the information we want isn't typically held
		 * in them. */
		priv->p_tmp_res = *iter;
		iter = &priv->p_tmp_res;
	}
	spin_unlock(&ocfs2_dlm_tracking_lock);

	return iter;
}
1958
/* seq_file ->stop: nothing to undo per-iteration; the dummy marker is
 * repositioned in ->next and removed at file release. */
static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
{
}
1962
/* seq_file ->next: advance to the next real lockres, re-anchor our
 * dummy marker after it, and hand back a snapshot (see ->start). */
static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
{
	struct ocfs2_dlm_seq_priv *priv = m->private;
	struct ocfs2_lock_res *iter = v;
	struct ocfs2_lock_res *dummy = &priv->p_iter_res;

	spin_lock(&ocfs2_dlm_tracking_lock);
	iter = ocfs2_dlm_next_res(iter, priv);
	/* Move the dummy marker so a later call resumes from the
	 * lockres we are about to show. */
	list_del_init(&dummy->l_debug_list);
	if (iter) {
		list_add(&dummy->l_debug_list, &iter->l_debug_list);
		/* Copy out under the spinlock; see ocfs2_dlm_seq_start(). */
		priv->p_tmp_res = *iter;
		iter = &priv->p_tmp_res;
	}
	spin_unlock(&ocfs2_dlm_tracking_lock);

	return iter;
}
1981
1982 /* So that debugfs.ocfs2 can determine which format is being used */
1983 #define OCFS2_DLM_DEBUG_STR_VERSION 1
1984 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
1985 {
1986         int i;
1987         char *lvb;
1988         struct ocfs2_lock_res *lockres = v;
1989
1990         if (!lockres)
1991                 return -EINVAL;
1992
1993         seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
1994
1995         if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
1996                 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
1997                            lockres->l_name,
1998                            (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
1999         else
2000                 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
2001
2002         seq_printf(m, "%d\t"
2003                    "0x%lx\t"
2004                    "0x%x\t"
2005                    "0x%x\t"
2006                    "%u\t"
2007                    "%u\t"
2008                    "%d\t"
2009                    "%d\t",
2010                    lockres->l_level,
2011                    lockres->l_flags,
2012                    lockres->l_action,
2013                    lockres->l_unlock_action,
2014                    lockres->l_ro_holders,
2015                    lockres->l_ex_holders,
2016                    lockres->l_requested,
2017                    lockres->l_blocking);
2018
2019         /* Dump the raw LVB */
2020         lvb = lockres->l_lksb.lvb;
2021         for(i = 0; i < DLM_LVB_LEN; i++)
2022                 seq_printf(m, "0x%x\t", lvb[i]);
2023
2024         /* End the line */
2025         seq_printf(m, "\n");
2026         return 0;
2027 }
2028
/* seq_file iterator over all tracked lockres' for the debugfs file. */
static struct seq_operations ocfs2_dlm_seq_ops = {
	.start =	ocfs2_dlm_seq_start,
	.stop =		ocfs2_dlm_seq_stop,
	.next =		ocfs2_dlm_seq_next,
	.show =		ocfs2_dlm_seq_show,
};
2035
2036 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2037 {
2038         struct seq_file *seq = (struct seq_file *) file->private_data;
2039         struct ocfs2_dlm_seq_priv *priv = seq->private;
2040         struct ocfs2_lock_res *res = &priv->p_iter_res;
2041
2042         ocfs2_remove_lockres_tracking(res);
2043         ocfs2_put_dlm_debug(priv->p_dlm_debug);
2044         return seq_release_private(inode, file);
2045 }
2046
2047 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2048 {
2049         int ret;
2050         struct ocfs2_dlm_seq_priv *priv;
2051         struct seq_file *seq;
2052         struct ocfs2_super *osb;
2053
2054         priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2055         if (!priv) {
2056                 ret = -ENOMEM;
2057                 mlog_errno(ret);
2058                 goto out;
2059         }
2060         osb = (struct ocfs2_super *) inode->u.generic_ip;
2061         ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2062         priv->p_dlm_debug = osb->osb_dlm_debug;
2063         INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2064
2065         ret = seq_open(file, &ocfs2_dlm_seq_ops);
2066         if (ret) {
2067                 kfree(priv);
2068                 mlog_errno(ret);
2069                 goto out;
2070         }
2071
2072         seq = (struct seq_file *) file->private_data;
2073         seq->private = priv;
2074
2075         ocfs2_add_lockres_tracking(&priv->p_iter_res,
2076                                    priv->p_dlm_debug);
2077
2078 out:
2079         return ret;
2080 }
2081
/* File operations for the debugfs "locking_state" file. */
static const struct file_operations ocfs2_dlm_debug_fops = {
	.open =		ocfs2_dlm_debug_open,
	.release =	ocfs2_dlm_debug_release,
	.read =		seq_read,
	.llseek =	seq_lseek,
};
2088
2089 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2090 {
2091         int ret = 0;
2092         struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2093
2094         dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2095                                                          S_IFREG|S_IRUSR,
2096                                                          osb->osb_debug_root,
2097                                                          osb,
2098                                                          &ocfs2_dlm_debug_fops);
2099         if (!dlm_debug->d_locking_state) {
2100                 ret = -EINVAL;
2101                 mlog(ML_ERROR,
2102                      "Unable to create locking state debugfs file.\n");
2103                 goto out;
2104         }
2105
2106         ocfs2_get_dlm_debug(dlm_debug);
2107 out:
2108         return ret;
2109 }
2110
2111 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2112 {
2113         struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2114
2115         if (dlm_debug) {
2116                 debugfs_remove(dlm_debug->d_locking_state);
2117                 ocfs2_put_dlm_debug(dlm_debug);
2118         }
2119 }
2120
/* Bring up everything this mount needs for cluster locking: debugfs
 * state, the vote thread, and the per-mount dlm domain. On failure,
 * tears down whatever it set up and returns a negative error. */
int ocfs2_dlm_init(struct ocfs2_super *osb)
{
	int status;
	u32 dlm_key;
	struct dlm_ctxt *dlm;

	mlog_entry_void();

	status = ocfs2_dlm_init_debug(osb);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	/* launch vote thread */
	osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote");
	if (IS_ERR(osb->vote_task)) {
		status = PTR_ERR(osb->vote_task);
		/* NULL tells the cleanup below there is no thread to stop */
		osb->vote_task = NULL;
		mlog_errno(status);
		goto bail;
	}

	/* used by the dlm code to make message headers unique, each
	 * node in this domain must agree on this. */
	dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));

	/* for now, uuid == domain */
	dlm = dlm_register_domain(osb->uuid_str, dlm_key);
	if (IS_ERR(dlm)) {
		status = PTR_ERR(dlm);
		mlog_errno(status);
		goto bail;
	}

	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);

	/* register for node-eviction callbacks from the dlm */
	dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);

	osb->dlm = dlm;

	status = 0;
bail:
	if (status < 0) {
		ocfs2_dlm_shutdown_debug(osb);
		if (osb->vote_task)
			kthread_stop(osb->vote_task);
	}

	mlog_exit(status);
	return status;
}
2174
/* Tear down cluster locking at unmount: the reverse of
 * ocfs2_dlm_init(), plus dropping the superblock-global lockres'. */
void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
{
	mlog_entry_void();

	dlm_unregister_eviction_cb(&osb->osb_eviction_cb);

	/* Drop the super and rename locks before stopping the vote
	 * thread, which may still be processing them. */
	ocfs2_drop_osb_locks(osb);

	if (osb->vote_task) {
		kthread_stop(osb->vote_task);
		osb->vote_task = NULL;
	}

	ocfs2_lock_res_free(&osb->osb_super_lockres);
	ocfs2_lock_res_free(&osb->osb_rename_lockres);

	dlm_unregister_domain(osb->dlm);
	osb->dlm = NULL;

	ocfs2_dlm_shutdown_debug(osb);

	mlog_exit_void();
}
2198
/* DLM completion callback for both full unlocks
 * (OCFS2_UNLOCK_DROP_LOCK) and convert cancels
 * (OCFS2_UNLOCK_CANCEL_CONVERT). Sorts out the lockres state for
 * whichever was in flight and wakes anyone sleeping on l_event. */
static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
{
	struct ocfs2_lock_res *lockres = opaque;
	unsigned long flags;

	mlog_entry_void();

	mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
	     lockres->l_unlock_action);

	spin_lock_irqsave(&lockres->l_lock, flags);
	/* We tried to cancel a convert request, but it was already
	 * granted. All we want to do here is clear our unlock
	 * state. The wake_up call done at the bottom is redundant
	 * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
	 * hurt anything anyway */
	if (status == DLM_CANCELGRANT &&
	    lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
		mlog(0, "Got cancelgrant for %s\n", lockres->l_name);

		/* We don't clear the busy flag in this case as it
		 * should have been cleared by the ast which the dlm
		 * has called. */
		goto complete_unlock;
	}

	if (status != DLM_NORMAL) {
		mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
		     "unlock_action %d\n", status, lockres->l_name,
		     lockres->l_unlock_action);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_unlock_action) {
	case OCFS2_UNLOCK_CANCEL_CONVERT:
		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
		/* the cancelled convert's AST will never fire */
		lockres->l_action = OCFS2_AST_INVALID;
		break;
	case OCFS2_UNLOCK_DROP_LOCK:
		/* fully unlocked - we no longer hold any level */
		lockres->l_level = LKM_IVMODE;
		break;
	default:
		BUG();
	}

	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
complete_unlock:
	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	mlog_exit_void();
}
2254
/* Callback invoked under the lockres spinlock in ocfs2_drop_lock()
 * just before the final dlmunlock, e.g. to stuff the LVB. */
typedef void (ocfs2_pre_drop_cb_t)(struct ocfs2_lock_res *, void *);

struct drop_lock_cb {
	ocfs2_pre_drop_cb_t	*drop_func;	/* called with drop_data as 2nd arg */
	void			*drop_data;
};
2261
2262 static int ocfs2_drop_lock(struct ocfs2_super *osb,
2263                            struct ocfs2_lock_res *lockres,
2264                            struct drop_lock_cb *dcb)
2265 {
2266         enum dlm_status status;
2267         unsigned long flags;
2268         int lkm_flags = 0;
2269
2270         /* We didn't get anywhere near actually using this lockres. */
2271         if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
2272                 goto out;
2273
2274         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
2275                 lkm_flags |= LKM_VALBLK;
2276
2277         spin_lock_irqsave(&lockres->l_lock, flags);
2278
2279         mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
2280                         "lockres %s, flags 0x%lx\n",
2281                         lockres->l_name, lockres->l_flags);
2282
2283         while (lockres->l_flags & OCFS2_LOCK_BUSY) {
2284                 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
2285                      "%u, unlock_action = %u\n",
2286                      lockres->l_name, lockres->l_flags, lockres->l_action,
2287                      lockres->l_unlock_action);
2288
2289                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2290
2291                 /* XXX: Today we just wait on any busy
2292                  * locks... Perhaps we need to cancel converts in the
2293                  * future? */
2294                 ocfs2_wait_on_busy_lock(lockres);
2295
2296                 spin_lock_irqsave(&lockres->l_lock, flags);
2297         }
2298
2299         if (dcb)
2300                 dcb->drop_func(lockres, dcb->drop_data);
2301
2302         if (lockres->l_flags & OCFS2_LOCK_BUSY)
2303                 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
2304                      lockres->l_name);
2305         if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
2306                 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
2307
2308         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
2309                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2310                 goto out;
2311         }
2312
2313         lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
2314
2315         /* make sure we never get here while waiting for an ast to
2316          * fire. */
2317         BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
2318
2319         /* is this necessary? */
2320         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2321         lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
2322         spin_unlock_irqrestore(&lockres->l_lock, flags);
2323
2324         mlog(0, "lock %s\n", lockres->l_name);
2325
2326         status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags,
2327                            ocfs2_unlock_ast, lockres);
2328         if (status != DLM_NORMAL) {
2329                 ocfs2_log_dlm_error("dlmunlock", status, lockres);
2330                 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
2331                 dlm_print_one_lock(lockres->l_lksb.lockid);
2332                 BUG();
2333         }
2334         mlog(0, "lock %s, successfull return from dlmunlock\n",
2335              lockres->l_name);
2336
2337         ocfs2_wait_on_busy_lock(lockres);
2338 out:
2339         mlog_exit(0);
2340         return 0;
2341 }
2342
2343 /* Mark the lockres as being dropped. It will no longer be
2344  * queued if blocking, but we still may have to wait on it
2345  * being dequeued from the vote thread before we can consider
2346  * it safe to drop. 
2347  *
2348  * You can *not* attempt to call cluster_lock on this lockres anymore. */
void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_mask_waiter mw;
	unsigned long flags;

	ocfs2_init_mask_waiter(&mw);

	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres->l_flags |= OCFS2_LOCK_FREEING;
	/* Wait until any queued work has been dequeued by the vote
	 * thread. QUEUED may be set again, hence the loop. */
	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
		/* wait for QUEUED (goal 0 means "flag cleared") */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		mlog(0, "Waiting on lockres %s\n", lockres->l_name);

		status = ocfs2_wait_for_mask(&mw);
		if (status)
			mlog_errno(status);

		spin_lock_irqsave(&lockres->l_lock, flags);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}
2373
2374 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
2375                                struct ocfs2_lock_res *lockres)
2376 {
2377         int ret;
2378
2379         ocfs2_mark_lockres_freeing(lockres);
2380         ret = ocfs2_drop_lock(osb, lockres, NULL);
2381         if (ret)
2382                 mlog_errno(ret);
2383 }
2384
/* Drop the superblock-global lockres' (super and rename) at
 * unmount. */
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
{
	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
}
2390
2391 static void ocfs2_meta_pre_drop(struct ocfs2_lock_res *lockres, void *data)
2392 {
2393         struct inode *inode = data;
2394
2395         /* the metadata lock requires a bit more work as we have an
2396          * LVB to worry about. */
2397         if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
2398             lockres->l_level == LKM_EXMODE &&
2399             !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2400                 __ocfs2_stuff_meta_lvb(inode);
2401 }
2402
2403 int ocfs2_drop_inode_locks(struct inode *inode)
2404 {
2405         int status, err;
2406         struct drop_lock_cb meta_dcb = { ocfs2_meta_pre_drop, inode, };
2407
2408         mlog_entry_void();
2409
2410         /* No need to call ocfs2_mark_lockres_freeing here -
2411          * ocfs2_clear_inode has done it for us. */
2412
2413         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2414                               &OCFS2_I(inode)->ip_data_lockres,
2415                               NULL);
2416         if (err < 0)
2417                 mlog_errno(err);
2418
2419         status = err;
2420
2421         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2422                               &OCFS2_I(inode)->ip_meta_lockres,
2423                               &meta_dcb);
2424         if (err < 0)
2425                 mlog_errno(err);
2426         if (err < 0 && !status)
2427                 status = err;
2428
2429         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2430                               &OCFS2_I(inode)->ip_rw_lockres,
2431                               NULL);
2432         if (err < 0)
2433                 mlog_errno(err);
2434         if (err < 0 && !status)
2435                 status = err;
2436
2437         mlog_exit(status);
2438         return status;
2439 }
2440
/* Set the lockres state for a downconvert to @new_level; the actual
 * dlmlock(LKM_CONVERT) follows in ocfs2_downconvert_lock(). Caller
 * holds l_lock. @new_level must be strictly lower than the currently
 * held level. */
static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
				      int new_level)
{
	assert_spin_locked(&lockres->l_lock);

	/* nobody should be blocked by an NL (or lower) lock */
	BUG_ON(lockres->l_blocking <= LKM_NLMODE);

	if (lockres->l_level <= new_level) {
		mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
		     lockres->l_level, new_level);
		BUG();
	}

	mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
	     lockres->l_name, new_level, lockres->l_blocking);

	lockres->l_action = OCFS2_AST_DOWNCONVERT;
	lockres->l_requested = new_level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
}
2461
2462 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
2463                                   struct ocfs2_lock_res *lockres,
2464                                   int new_level,
2465                                   int lvb)
2466 {
2467         int ret, dlm_flags = LKM_CONVERT;
2468         enum dlm_status status;
2469
2470         mlog_entry_void();
2471
2472         if (lvb)
2473                 dlm_flags |= LKM_VALBLK;
2474
2475         status = dlmlock(osb->dlm,
2476                          new_level,
2477                          &lockres->l_lksb,
2478                          dlm_flags,
2479                          lockres->l_name,
2480                          OCFS2_LOCK_ID_MAX_LEN - 1,
2481                          ocfs2_locking_ast,
2482                          lockres,
2483                          ocfs2_blocking_ast);
2484         if (status != DLM_NORMAL) {
2485                 ocfs2_log_dlm_error("dlmlock", status, lockres);
2486                 ret = -EINVAL;
2487                 ocfs2_recover_from_dlm_error(lockres, 1);
2488                 goto bail;
2489         }
2490
2491         ret = 0;
2492 bail:
2493         mlog_exit(ret);
2494         return ret;
2495 }
2496
/* returns 1 when the caller should unlock and call dlmunlock */
/* Prepare to cancel an in-flight convert. Caller holds l_lock.
 * Returns 0 if a cancel is already in flight (caller should just
 * requeue). */
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres)
{
	assert_spin_locked(&lockres->l_lock);

	mlog_entry_void();
	mlog(0, "lock %s\n", lockres->l_name);

	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
		/* If we're already trying to cancel a lock conversion
		 * then just drop the spinlock and allow the caller to
		 * requeue this lock. */

		mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
		return 0;
	}

	/* were we in a convert when we got the bast fire? */
	BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
	       lockres->l_action != OCFS2_AST_DOWNCONVERT);
	/* set things up for the unlockast to know to just
	 * clear out the ast_action and unset busy, etc. */
	lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;

	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
			"lock %s, invalid flags: 0x%lx\n",
			lockres->l_name, lockres->l_flags);

	return 1;
}
2528
2529 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
2530                                 struct ocfs2_lock_res *lockres)
2531 {
2532         int ret;
2533         enum dlm_status status;
2534
2535         mlog_entry_void();
2536         mlog(0, "lock %s\n", lockres->l_name);
2537
2538         ret = 0;
2539         status = dlmunlock(osb->dlm,
2540                            &lockres->l_lksb,
2541                            LKM_CANCEL,
2542                            ocfs2_unlock_ast,
2543                            lockres);
2544         if (status != DLM_NORMAL) {
2545                 ocfs2_log_dlm_error("dlmunlock", status, lockres);
2546                 ret = -EINVAL;
2547                 ocfs2_recover_from_dlm_error(lockres, 0);
2548         }
2549
2550         mlog(0, "lock %s return from dlmunlock\n", lockres->l_name);
2551
2552         mlog_exit(ret);
2553         return ret;
2554 }
2555
/*
 * Core downconvert state machine, run from the vote thread for any
 * lockres marked OCFS2_LOCK_BLOCKED. Decides between cancelling an
 * in-flight convert, requeueing for later, or downconverting now to the
 * highest level compatible with what's blocking us. Per-type behavior
 * is driven through lockres->l_ops (check_downconvert, downconvert_worker,
 * set_lvb). On return, ctl->requeue tells the caller whether to schedule
 * this lockres again, and ctl->unblock_action carries the worker verdict.
 */
static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
				      struct ocfs2_lock_res *lockres,
				      struct ocfs2_unblock_ctl *ctl)
{
	unsigned long flags;
	int blocking;
	int new_level;
	int ret = 0;
	int set_lvb = 0;

	mlog_entry_void();

	spin_lock_irqsave(&lockres->l_lock, flags);

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

recheck:
	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
		/* A convert is already in flight; try to cancel it so
		 * the blocked remote request can make progress, and
		 * requeue ourselves either way. */
		ctl->requeue = 1;
		ret = ocfs2_prepare_cancel_convert(osb, lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		if (ret) {
			ret = ocfs2_cancel_convert(osb, lockres);
			if (ret < 0)
				mlog_errno(ret);
		}
		goto leave;
	}

	/* if we're blocking an exclusive and we have *any* holders,
	 * then requeue. */
	if ((lockres->l_blocking == LKM_EXMODE)
	    && (lockres->l_ex_holders || lockres->l_ro_holders))
		goto leave_requeue;

	/* If it's a PR we're blocking, then only
	 * requeue if we've got any EX holders */
	if (lockres->l_blocking == LKM_PRMODE &&
	    lockres->l_ex_holders)
		goto leave_requeue;

	/*
	 * Can we get a lock in this state if the holder counts are
	 * zero? The meta data unblock code used to check this.
	 */
	if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
	    && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
		goto leave_requeue;

	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);

	/* Give the lock type a veto over the downconvert (e.g. the
	 * meta lock waits for journal checkpointing). */
	if (lockres->l_ops->check_downconvert
	    && !lockres->l_ops->check_downconvert(lockres, new_level))
		goto leave_requeue;

	/* If we get here, then we know that there are no more
	 * incompatible holders (and anyone asking for an incompatible
	 * lock is blocked). We can now downconvert the lock */
	if (!lockres->l_ops->downconvert_worker)
		goto downconvert;

	/* Some lockres types want to do a bit of work before
	 * downconverting a lock. Allow that here. The worker function
	 * may sleep, so we save off a copy of what we're blocking as
	 * it may change while we're not holding the spin lock. */
	blocking = lockres->l_blocking;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);

	if (ctl->unblock_action == UNBLOCK_STOP_POST)
		goto leave;

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (blocking != lockres->l_blocking) {
		/* If this changed underneath us, then we can't drop
		 * it just yet. */
		goto recheck;
	}

downconvert:
	ctl->requeue = 0;

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
		if (lockres->l_level == LKM_EXMODE)
			set_lvb = 1;

		/*
		 * We only set the lvb if the lock has been fully
		 * refreshed - otherwise we risk setting stale
		 * data. Otherwise, there's no need to actually clear
		 * out the lvb here as it's value is still valid.
		 */
		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
			lockres->l_ops->set_lvb(lockres);
	}

	ocfs2_prepare_downconvert(lockres, new_level);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
leave:
	mlog_exit(ret);
	return ret;

leave_requeue:
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ctl->requeue = 1;

	mlog_exit(0);
	return 0;
}
2667
2668 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
2669                                      int blocking)
2670 {
2671         struct inode *inode;
2672         struct address_space *mapping;
2673
2674         inode = ocfs2_lock_res_inode(lockres);
2675         mapping = inode->i_mapping;
2676
2677         if (filemap_fdatawrite(mapping)) {
2678                 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
2679                      (unsigned long long)OCFS2_I(inode)->ip_blkno);
2680         }
2681         sync_mapping_buffers(mapping);
2682         if (blocking == LKM_EXMODE) {
2683                 truncate_inode_pages(mapping, 0);
2684                 unmap_mapping_range(mapping, 0, 0, 0);
2685         } else {
2686                 /* We only need to wait on the I/O if we're not also
2687                  * truncating pages because truncate_inode_pages waits
2688                  * for us above. We don't truncate pages if we're
2689                  * blocking anything < EXMODE because we want to keep
2690                  * them around in that case. */
2691                 filemap_fdatawait(mapping);
2692         }
2693
2694         return UNBLOCK_CONTINUE;
2695 }
2696
2697 int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
2698                        struct ocfs2_unblock_ctl *ctl)
2699 {
2700         int status;
2701         struct inode *inode;
2702         struct ocfs2_super *osb;
2703
2704         mlog_entry_void();
2705
2706         inode = ocfs2_lock_res_inode(lockres);
2707         osb = OCFS2_SB(inode->i_sb);
2708
2709         mlog(0, "unblock inode %llu\n",
2710              (unsigned long long)OCFS2_I(inode)->ip_blkno);
2711
2712         status = ocfs2_generic_unblock_lock(osb, lockres, ctl);
2713         if (status < 0)
2714                 mlog_errno(status);
2715
2716         mlog(0, "inode %llu, requeue = %d\n",
2717              (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
2718
2719         mlog_exit(status);
2720         return status;
2721 }
2722
2723 static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
2724                                     struct ocfs2_unblock_ctl *ctl)
2725 {
2726         int status;
2727         struct inode *inode;
2728
2729         mlog_entry_void();
2730
2731         mlog(0, "Unblock lockres %s\n", lockres->l_name);
2732
2733         inode  = ocfs2_lock_res_inode(lockres);
2734
2735         status = ocfs2_generic_unblock_lock(OCFS2_SB(inode->i_sb),
2736                                             lockres, ctl);
2737         if (status < 0)
2738                 mlog_errno(status);
2739
2740         mlog_exit(status);
2741         return status;
2742 }
2743
2744 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
2745                                         int new_level)
2746 {
2747         struct inode *inode = ocfs2_lock_res_inode(lockres);
2748         int checkpointed = ocfs2_inode_fully_checkpointed(inode);
2749
2750         BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
2751         BUG_ON(lockres->l_level != LKM_EXMODE && !checkpointed);
2752
2753         if (checkpointed)
2754                 return 1;
2755
2756         ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb));
2757         return 0;
2758 }
2759
/* set_lvb hook for the meta lock: publish current inode metadata into
 * the lock value block before the EX lock is dropped. */
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
{
	__ocfs2_stuff_meta_lvb(ocfs2_lock_res_inode(lockres));
}
2766
2767 static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
2768                               struct ocfs2_unblock_ctl *ctl)
2769 {
2770         int status;
2771         struct inode *inode;
2772
2773         mlog_entry_void();
2774
2775         inode = ocfs2_lock_res_inode(lockres);
2776
2777         mlog(0, "unblock inode %llu\n",
2778              (unsigned long long)OCFS2_I(inode)->ip_blkno);
2779
2780         status = ocfs2_generic_unblock_lock(OCFS2_SB(inode->i_sb),
2781                                             lockres, ctl);
2782         if (status < 0)
2783                 mlog_errno(status);
2784
2785         mlog(0, "inode %llu, requeue = %d\n",
2786              (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
2787
2788         mlog_exit(status);
2789         return status;
2790 }
2791
/*
 * post_unlock hook for dentry locks: drops the final reference taken by
 * the convert worker. Right now this happens in the vote thread, but we
 * could choose to simplify the dlmglue API and push these off to the
 * ocfs2_wq in the future.
 */
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres)
{
	ocfs2_dentry_lock_put(osb, ocfs2_lock_res_dl(lockres));
}
2803
/*
 * d_delete() matching dentries before the lock downconvert.
 *
 * At this point, any process waiting to destroy the
 * dentry_lock due to last ref count is stopped by the
 * OCFS2_LOCK_QUEUED flag.
 *
 * We have two potential problems
 *
 * 1) If we do the last reference drop on our dentry_lock (via dput)
 *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
 *    the downconvert to finish. Instead we take an elevated
 *    reference and push the drop until after we've completed our
 *    unblock processing.
 *
 * 2) There might be another process with a final reference,
 *    waiting on us to finish processing. If this is the case, we
 *    detect it and exit out - there's no more dentries anyway.
 *
 * Returns one of the UNBLOCK_* verdicts consumed by
 * ocfs2_generic_unblock_lock() / ocfs2_process_blocked_lock().
 */
static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking)
{
	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
	struct dentry *dentry;
	unsigned long flags;
	int extra_ref = 0;	/* did we take the elevated dl_count ref? */

	/*
	 * This node is blocking another node from getting a read
	 * lock. This happens when we've renamed within a
	 * directory. We've forced the other nodes to d_delete(), but
	 * we never actually dropped our lock because it's still
	 * valid. The downconvert code will retain a PR for this node,
	 * so there's no further work to do.
	 */
	if (blocking == LKM_PRMODE)
		return UNBLOCK_CONTINUE;

	/*
	 * Mark this inode as potentially orphaned. The code in
	 * ocfs2_delete_inode() will figure out whether it actually
	 * needs to be freed or not.
	 */
	spin_lock(&oi->ip_lock);
	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
	spin_unlock(&oi->ip_lock);

	/*
	 * Yuck. We need to make sure however that the check of
	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
	 * respect to a reference decrement or the setting of that
	 * flag.
	 */
	spin_lock_irqsave(&lockres->l_lock, flags);
	spin_lock(&dentry_attach_lock);
	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
	    && dl->dl_count) {
		dl->dl_count++;
		extra_ref = 1;
	}
	spin_unlock(&dentry_attach_lock);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	mlog(0, "extra_ref = %d\n", extra_ref);

	/*
	 * We have a process waiting on us in ocfs2_dentry_iput(),
	 * which means we can't have any more outstanding
	 * aliases. There's no need to do any more work.
	 */
	if (!extra_ref)
		return UNBLOCK_CONTINUE;

	/* Walk and d_delete() every alias of the inode under this
	 * parent, dropping dentry_attach_lock around the dcache calls
	 * since they may sleep. */
	spin_lock(&dentry_attach_lock);
	while (1) {
		dentry = ocfs2_find_local_alias(dl->dl_inode,
						dl->dl_parent_blkno, 1);
		if (!dentry)
			break;
		spin_unlock(&dentry_attach_lock);

		mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
		     dentry->d_name.name);

		/*
		 * The following dcache calls may do an
		 * iput(). Normally we don't want that from the
		 * downconverting thread, but in this case it's ok
		 * because the requesting node already has an
		 * exclusive lock on the inode, so it can't be queued
		 * for a downconvert.
		 */
		d_delete(dentry);
		dput(dentry);

		spin_lock(&dentry_attach_lock);
	}
	spin_unlock(&dentry_attach_lock);

	/*
	 * If we are the last holder of this dentry lock, there is no
	 * reason to downconvert so skip straight to the unlock.
	 */
	if (dl->dl_count == 1)
		return UNBLOCK_STOP_POST;

	return UNBLOCK_CONTINUE_POST;
}
2913
2914 static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
2915                                      struct ocfs2_unblock_ctl *ctl)
2916 {
2917         int ret;
2918         struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2919         struct ocfs2_super *osb = OCFS2_SB(dl->dl_inode->i_sb);
2920
2921         mlog(0, "unblock dentry lock: %llu\n",
2922              (unsigned long long)OCFS2_I(dl->dl_inode)->ip_blkno);
2923
2924         ret = ocfs2_generic_unblock_lock(osb,
2925                                          lockres,
2926                                          ctl);
2927         if (ret < 0)
2928                 mlog_errno(ret);
2929
2930         mlog(0, "requeue = %d, post = %d\n", ctl->requeue, ctl->unblock_action);
2931
2932         return ret;
2933 }
2934
2935 /* Generic unblock function for any lockres whose private data is an
2936  * ocfs2_super pointer. */
2937 static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
2938                                   struct ocfs2_unblock_ctl *ctl)
2939 {
2940         int status;
2941         struct ocfs2_super *osb;
2942
2943         mlog_entry_void();
2944
2945         mlog(0, "Unblock lockres %s\n", lockres->l_name);
2946
2947         osb = ocfs2_get_lockres_osb(lockres);
2948
2949         status = ocfs2_generic_unblock_lock(osb,
2950                                             lockres,
2951                                             ctl);
2952         if (status < 0)
2953                 mlog_errno(status);
2954
2955         mlog_exit(status);
2956         return status;
2957 }
2958
/*
 * Process one lockres from the vote thread's blocked list: run its
 * per-type unblock handler, then either clear OCFS2_LOCK_QUEUED or
 * reschedule it depending on ctl.requeue. Finally, run the optional
 * post_unlock hook when the worker asked for post processing.
 */
void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_unblock_ctl ctl = {0, 0,};
	unsigned long flags;

	/* Our reference to the lockres in this function can be
	 * considered valid until we remove the OCFS2_LOCK_QUEUED
	 * flag. */

	mlog_entry_void();

	BUG_ON(!lockres);
	BUG_ON(!lockres->l_ops);
	BUG_ON(!lockres->l_ops->unblock);

	mlog(0, "lockres %s blocked.\n", lockres->l_name);

	/* Detect whether a lock has been marked as going away while
	 * the vote thread was processing other things. A lock can
	 * still be marked with OCFS2_LOCK_FREEING after this check,
	 * but short circuiting here will still save us some
	 * performance. */
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (lockres->l_flags & OCFS2_LOCK_FREEING)
		goto unqueue;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = lockres->l_ops->unblock(lockres, &ctl);
	if (status < 0)
		mlog_errno(status);

	spin_lock_irqsave(&lockres->l_lock, flags);
unqueue:
	/* NOTE: reached with l_lock held from either path above. */
	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
	} else
		ocfs2_schedule_blocked_lock(osb, lockres);

	mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
	     ctl.requeue ? "yes" : "no");
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	/* Let lock types with post-unlock work (e.g. the dentry lock
	 * reference drop) run it outside the spinlock. */
	if (ctl.unblock_action != UNBLOCK_CONTINUE
	    && lockres->l_ops->post_unlock)
		lockres->l_ops->post_unlock(osb, lockres);

	mlog_exit_void();
}
3009
3010 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
3011                                         struct ocfs2_lock_res *lockres)
3012 {
3013         mlog_entry_void();
3014
3015         assert_spin_locked(&lockres->l_lock);
3016
3017         if (lockres->l_flags & OCFS2_LOCK_FREEING) {
3018                 /* Do not schedule a lock for downconvert when it's on
3019                  * the way to destruction - any nodes wanting access
3020                  * to the resource will get it soon. */
3021                 mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
3022                      lockres->l_name, lockres->l_flags);
3023                 return;
3024         }
3025
3026         lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
3027
3028         spin_lock(&osb->vote_task_lock);
3029         if (list_empty(&lockres->l_blocked_list)) {
3030                 list_add_tail(&lockres->l_blocked_list,
3031                               &osb->blocked_lock_list);
3032                 osb->blocked_lock_count++;
3033         }
3034         spin_unlock(&osb->vote_task_lock);
3035
3036         mlog_exit_void();
3037 }
3038
3039 /* This aids in debugging situations where a bad LVB might be involved. */
3040 void ocfs2_dump_meta_lvb_info(u64 level,
3041                               const char *function,
3042                               unsigned int line,
3043                               struct ocfs2_lock_res *lockres)
3044 {
3045         struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
3046
3047         mlog(level, "LVB information for %s (called from %s:%u):\n",
3048              lockres->l_name, function, line);
3049         mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
3050              lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
3051              be32_to_cpu(lvb->lvb_igeneration));
3052         mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
3053              (unsigned long long)be64_to_cpu(lvb->lvb_isize),
3054              be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
3055              be16_to_cpu(lvb->lvb_imode));
3056         mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
3057              "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
3058              (long long)be64_to_cpu(lvb->lvb_iatime_packed),
3059              (long long)be64_to_cpu(lvb->lvb_ictime_packed),
3060              (long long)be64_to_cpu(lvb->lvb_imtime_packed),
3061              be32_to_cpu(lvb->lvb_iattr));
3062 }