ocfs2: Separate out dlm lock functions.
[safe/jmp/linux-2.6] / fs / ocfs2 / dlmglue.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmglue.c
5  *
6  * Code which implements an OCFS2 specific interface to our DLM.
7  *
8  * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 021110-1307, USA.
24  */
25
26 #include <linux/types.h>
27 #include <linux/slab.h>
28 #include <linux/highmem.h>
29 #include <linux/mm.h>
30 #include <linux/crc32.h>
31 #include <linux/kthread.h>
32 #include <linux/pagemap.h>
33 #include <linux/debugfs.h>
34 #include <linux/seq_file.h>
35
36 #include <cluster/heartbeat.h>
37 #include <cluster/nodemanager.h>
38 #include <cluster/tcp.h>
39
40 #include <dlm/dlmapi.h>
41
42 #define MLOG_MASK_PREFIX ML_DLM_GLUE
43 #include <cluster/masklog.h>
44
45 #include "ocfs2.h"
46 #include "ocfs2_lockingver.h"
47
48 #include "alloc.h"
49 #include "dcache.h"
50 #include "dlmglue.h"
51 #include "extent_map.h"
52 #include "file.h"
53 #include "heartbeat.h"
54 #include "inode.h"
55 #include "journal.h"
56 #include "stackglue.h"
57 #include "slot_map.h"
58 #include "super.h"
59 #include "uptodate.h"
60
61 #include "buffer_head_io.h"
62
63 struct ocfs2_mask_waiter {
64         struct list_head        mw_item;
65         int                     mw_status;
66         struct completion       mw_complete;
67         unsigned long           mw_mask;
68         unsigned long           mw_goal;
69 };
70
71 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
72 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
73 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
74
75 /*
76  * Return value from ->downconvert_worker functions.
77  *
78  * These control the precise actions of ocfs2_unblock_lock()
79  * and ocfs2_process_blocked_lock()
80  *
81  */
82 enum ocfs2_unblock_action {
83         UNBLOCK_CONTINUE        = 0, /* Continue downconvert */
84         UNBLOCK_CONTINUE_POST   = 1, /* Continue downconvert, fire
85                                       * ->post_unlock callback */
86         UNBLOCK_STOP_POST       = 2, /* Do not downconvert, fire
87                                       * ->post_unlock() callback. */
88 };
89
90 struct ocfs2_unblock_ctl {
91         int requeue;
92         enum ocfs2_unblock_action unblock_action;
93 };
94
95 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
96                                         int new_level);
97 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);
98
99 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
100                                      int blocking);
101
102 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
103                                        int blocking);
104
105 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
106                                      struct ocfs2_lock_res *lockres);
107
108
109 #define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)
110
111 /* This aids in debugging situations where a bad LVB might be involved. */
112 static void ocfs2_dump_meta_lvb_info(u64 level,
113                                      const char *function,
114                                      unsigned int line,
115                                      struct ocfs2_lock_res *lockres)
116 {
117         struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
118
119         mlog(level, "LVB information for %s (called from %s:%u):\n",
120              lockres->l_name, function, line);
121         mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
122              lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
123              be32_to_cpu(lvb->lvb_igeneration));
124         mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
125              (unsigned long long)be64_to_cpu(lvb->lvb_isize),
126              be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
127              be16_to_cpu(lvb->lvb_imode));
128         mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
129              "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
130              (long long)be64_to_cpu(lvb->lvb_iatime_packed),
131              (long long)be64_to_cpu(lvb->lvb_ictime_packed),
132              (long long)be64_to_cpu(lvb->lvb_imtime_packed),
133              be32_to_cpu(lvb->lvb_iattr));
134 }
135
136
137 /*
138  * OCFS2 Lock Resource Operations
139  *
140  * These fine tune the behavior of the generic dlmglue locking infrastructure.
141  *
142  * The most basic of lock types can point ->l_priv to their respective
143  * struct ocfs2_super and allow the default actions to manage things.
144  *
145  * Right now, each lock type also needs to implement an init function,
146  * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
147  * should be called when the lock is no longer needed (i.e., object
148  * destruction time).
149  */
150 struct ocfs2_lock_res_ops {
151         /*
152          * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
153          * this callback if ->l_priv is not an ocfs2_super pointer
154          */
155         struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);
156
157         /*
158          * Optionally called in the downconvert thread after a
159          * successful downconvert. The lockres will not be referenced
160          * after this callback is called, so it is safe to free
161          * memory, etc.
162          *
163          * The exact semantics of when this is called are controlled
164          * by ->downconvert_worker()
165          */
166         void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);
167
168         /*
169          * Allow a lock type to add checks to determine whether it is
170          * safe to downconvert a lock. Return 0 to re-queue the
171          * downconvert at a later time, nonzero to continue.
172          *
173          * For most locks, the default checks that there are no
174          * incompatible holders are sufficient.
175          *
176          * Called with the lockres spinlock held.
177          */
178         int (*check_downconvert)(struct ocfs2_lock_res *, int);
179
180         /*
181          * Allows a lock type to populate the lock value block. This
182          * is called on downconvert, and when we drop a lock.
183          *
184          * Locks that want to use this should set LOCK_TYPE_USES_LVB
185          * in the flags field.
186          *
187          * Called with the lockres spinlock held.
188          */
189         void (*set_lvb)(struct ocfs2_lock_res *);
190
191         /*
192          * Called from the downconvert thread when it is determined
193          * that a lock will be downconverted. This is called without
194          * any locks held so the function can do work that might
195          * schedule (syncing out data, etc).
196          *
197          * This should return any one of the ocfs2_unblock_action
198          * values, depending on what it wants the thread to do.
199          */
200         int (*downconvert_worker)(struct ocfs2_lock_res *, int);
201
202         /*
203          * LOCK_TYPE_* flags which describe the specific requirements
204          * of a lock type. Descriptions of each individual flag follow.
205          */
206         int flags;
207 };
208
209 /*
210  * Some locks want to "refresh" potentially stale data when a
211  * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
212  * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
213  * individual lockres l_flags member from the ast function. It is
214  * expected that the locking wrapper will clear the
215  * OCFS2_LOCK_NEEDS_REFRESH flag when done.
216  */
217 #define LOCK_TYPE_REQUIRES_REFRESH 0x1
218
219 /*
220  * Indicate that a lock type makes use of the lock value block. The
221  * ->set_lvb lock type callback must be defined.
222  */
223 #define LOCK_TYPE_USES_LVB              0x2
224
225 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
226         .get_osb        = ocfs2_get_inode_osb,
227         .flags          = 0,
228 };
229
230 static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
231         .get_osb        = ocfs2_get_inode_osb,
232         .check_downconvert = ocfs2_check_meta_downconvert,
233         .set_lvb        = ocfs2_set_meta_lvb,
234         .downconvert_worker = ocfs2_data_convert_worker,
235         .flags          = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
236 };
237
238 static struct ocfs2_lock_res_ops ocfs2_super_lops = {
239         .flags          = LOCK_TYPE_REQUIRES_REFRESH,
240 };
241
242 static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
243         .flags          = 0,
244 };
245
246 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
247         .get_osb        = ocfs2_get_dentry_osb,
248         .post_unlock    = ocfs2_dentry_post_unlock,
249         .downconvert_worker = ocfs2_dentry_convert_worker,
250         .flags          = 0,
251 };
252
253 static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
254         .get_osb        = ocfs2_get_inode_osb,
255         .flags          = 0,
256 };
257
258 static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
259         .get_osb        = ocfs2_get_file_osb,
260         .flags          = 0,
261 };
262
263 /*
264  * This is the filesystem locking protocol version.
265  *
266  * Whenever the filesystem does new things with locks (adds or removes a
267  * lock, orders them differently, does different things underneath a lock),
268  * the version must be changed.  The protocol is negotiated when joining
269  * the dlm domain.  A node may join the domain if its major version is
270  * identical to all other nodes and its minor version is greater than
271  * or equal to all other nodes.  When its minor version is greater than
272  * the other nodes, it will run at the minor version specified by the
273  * other nodes.
274  *
275  * If a locking change is made that will not be compatible with older
276  * versions, the major number must be increased and the minor version set
277  * to zero.  If a change merely adds a behavior that can be disabled when
278  * speaking to older versions, the minor version must be increased.  If a
279  * change adds a fully backwards compatible change (eg, LVB changes that
280  * are just ignored by older versions), the version does not need to be
281  * updated.
282  */
283 const struct dlm_protocol_version ocfs2_locking_protocol = {
284         .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
285         .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
286 };
287
288 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
289 {
290         return lockres->l_type == OCFS2_LOCK_TYPE_META ||
291                 lockres->l_type == OCFS2_LOCK_TYPE_RW ||
292                 lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
293 }
294
295 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
296 {
297         BUG_ON(!ocfs2_is_inode_lock(lockres));
298
299         return (struct inode *) lockres->l_priv;
300 }
301
302 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
303 {
304         BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
305
306         return (struct ocfs2_dentry_lock *)lockres->l_priv;
307 }
308
309 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
310 {
311         if (lockres->l_ops->get_osb)
312                 return lockres->l_ops->get_osb(lockres);
313
314         return (struct ocfs2_super *)lockres->l_priv;
315 }
316
317 static int ocfs2_lock_create(struct ocfs2_super *osb,
318                              struct ocfs2_lock_res *lockres,
319                              int level,
320                              int dlm_flags);
321 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
322                                                      int wanted);
323 static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
324                                  struct ocfs2_lock_res *lockres,
325                                  int level);
326 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
327 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
328 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
329 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
330 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
331                                         struct ocfs2_lock_res *lockres);
332 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
333                                                 int convert);
334 #define ocfs2_log_dlm_error(_func, _stat, _lockres) do {        \
335         mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "  \
336                 "resource %s: %s\n", dlm_errname(_stat), _func, \
337                 _lockres->l_name, dlm_errmsg(_stat));           \
338 } while (0)
339 static int ocfs2_downconvert_thread(void *arg);
340 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
341                                         struct ocfs2_lock_res *lockres);
342 static int ocfs2_inode_lock_update(struct inode *inode,
343                                   struct buffer_head **bh);
344 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
345 static inline int ocfs2_highest_compat_lock_level(int level);
346 static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
347                                       int new_level);
348 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
349                                   struct ocfs2_lock_res *lockres,
350                                   int new_level,
351                                   int lvb);
352 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
353                                         struct ocfs2_lock_res *lockres);
354 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
355                                 struct ocfs2_lock_res *lockres);
356
357
358 static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
359                                   u64 blkno,
360                                   u32 generation,
361                                   char *name)
362 {
363         int len;
364
365         mlog_entry_void();
366
367         BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
368
369         len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
370                        ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
371                        (long long)blkno, generation);
372
373         BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
374
375         mlog(0, "built lock resource with name: %s\n", name);
376
377         mlog_exit_void();
378 }
379
380 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);
381
382 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
383                                        struct ocfs2_dlm_debug *dlm_debug)
384 {
385         mlog(0, "Add tracking for lockres %s\n", res->l_name);
386
387         spin_lock(&ocfs2_dlm_tracking_lock);
388         list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
389         spin_unlock(&ocfs2_dlm_tracking_lock);
390 }
391
392 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
393 {
394         spin_lock(&ocfs2_dlm_tracking_lock);
395         if (!list_empty(&res->l_debug_list))
396                 list_del_init(&res->l_debug_list);
397         spin_unlock(&ocfs2_dlm_tracking_lock);
398 }
399
400 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
401                                        struct ocfs2_lock_res *res,
402                                        enum ocfs2_lock_type type,
403                                        struct ocfs2_lock_res_ops *ops,
404                                        void *priv)
405 {
406         res->l_type          = type;
407         res->l_ops           = ops;
408         res->l_priv          = priv;
409
410         res->l_level         = LKM_IVMODE;
411         res->l_requested     = LKM_IVMODE;
412         res->l_blocking      = LKM_IVMODE;
413         res->l_action        = OCFS2_AST_INVALID;
414         res->l_unlock_action = OCFS2_UNLOCK_INVALID;
415
416         res->l_flags         = OCFS2_LOCK_INITIALIZED;
417
418         ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
419 }
420
421 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
422 {
423         /* This also clears out the lock status block */
424         memset(res, 0, sizeof(struct ocfs2_lock_res));
425         spin_lock_init(&res->l_lock);
426         init_waitqueue_head(&res->l_event);
427         INIT_LIST_HEAD(&res->l_blocked_list);
428         INIT_LIST_HEAD(&res->l_mask_waiters);
429 }
430
431 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
432                                enum ocfs2_lock_type type,
433                                unsigned int generation,
434                                struct inode *inode)
435 {
436         struct ocfs2_lock_res_ops *ops;
437
438         switch(type) {
439                 case OCFS2_LOCK_TYPE_RW:
440                         ops = &ocfs2_inode_rw_lops;
441                         break;
442                 case OCFS2_LOCK_TYPE_META:
443                         ops = &ocfs2_inode_inode_lops;
444                         break;
445                 case OCFS2_LOCK_TYPE_OPEN:
446                         ops = &ocfs2_inode_open_lops;
447                         break;
448                 default:
449                         mlog_bug_on_msg(1, "type: %d\n", type);
450                         ops = NULL; /* thanks, gcc */
451                         break;
452         };
453
454         ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
455                               generation, res->l_name);
456         ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
457 }
458
459 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
460 {
461         struct inode *inode = ocfs2_lock_res_inode(lockres);
462
463         return OCFS2_SB(inode->i_sb);
464 }
465
466 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
467 {
468         struct ocfs2_file_private *fp = lockres->l_priv;
469
470         return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
471 }
472
473 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
474 {
475         __be64 inode_blkno_be;
476
477         memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
478                sizeof(__be64));
479
480         return be64_to_cpu(inode_blkno_be);
481 }
482
483 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
484 {
485         struct ocfs2_dentry_lock *dl = lockres->l_priv;
486
487         return OCFS2_SB(dl->dl_inode->i_sb);
488 }
489
490 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
491                                 u64 parent, struct inode *inode)
492 {
493         int len;
494         u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
495         __be64 inode_blkno_be = cpu_to_be64(inode_blkno);
496         struct ocfs2_lock_res *lockres = &dl->dl_lockres;
497
498         ocfs2_lock_res_init_once(lockres);
499
500         /*
501          * Unfortunately, the standard lock naming scheme won't work
502          * here because we have two 16 byte values to use. Instead,
503          * we'll stuff the inode number as a binary value. We still
504          * want error prints to show something without garbling the
505          * display, so drop a null byte in there before the inode
506          * number. A future version of OCFS2 will likely use all
507          * binary lock names. The stringified names have been a
508          * tremendous aid in debugging, but now that the debugfs
509          * interface exists, we can mangle things there if need be.
510          *
511          * NOTE: We also drop the standard "pad" value (the total lock
512          * name size stays the same though - the last part is all
513          * zeros due to the memset in ocfs2_lock_res_init_once()
514          */
515         len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
516                        "%c%016llx",
517                        ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
518                        (long long)parent);
519
520         BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
521
522         memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
523                sizeof(__be64));
524
525         ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
526                                    OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
527                                    dl);
528 }
529
530 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
531                                       struct ocfs2_super *osb)
532 {
533         /* Superblock lockres doesn't come from a slab so we call init
534          * once on it manually.  */
535         ocfs2_lock_res_init_once(res);
536         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
537                               0, res->l_name);
538         ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
539                                    &ocfs2_super_lops, osb);
540 }
541
542 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
543                                        struct ocfs2_super *osb)
544 {
545         /* Rename lockres doesn't come from a slab so we call init
546          * once on it manually.  */
547         ocfs2_lock_res_init_once(res);
548         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
549         ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
550                                    &ocfs2_rename_lops, osb);
551 }
552
553 void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
554                               struct ocfs2_file_private *fp)
555 {
556         struct inode *inode = fp->fp_file->f_mapping->host;
557         struct ocfs2_inode_info *oi = OCFS2_I(inode);
558
559         ocfs2_lock_res_init_once(lockres);
560         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
561                               inode->i_generation, lockres->l_name);
562         ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
563                                    OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
564                                    fp);
565         lockres->l_flags |= OCFS2_LOCK_NOCACHE;
566 }
567
568 void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
569 {
570         mlog_entry_void();
571
572         if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
573                 return;
574
575         ocfs2_remove_lockres_tracking(res);
576
577         mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
578                         "Lockres %s is on the blocked list\n",
579                         res->l_name);
580         mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
581                         "Lockres %s has mask waiters pending\n",
582                         res->l_name);
583         mlog_bug_on_msg(spin_is_locked(&res->l_lock),
584                         "Lockres %s is locked\n",
585                         res->l_name);
586         mlog_bug_on_msg(res->l_ro_holders,
587                         "Lockres %s has %u ro holders\n",
588                         res->l_name, res->l_ro_holders);
589         mlog_bug_on_msg(res->l_ex_holders,
590                         "Lockres %s has %u ex holders\n",
591                         res->l_name, res->l_ex_holders);
592
593         /* Need to clear out the lock status block for the dlm */
594         memset(&res->l_lksb, 0, sizeof(res->l_lksb));
595
596         res->l_flags = 0UL;
597         mlog_exit_void();
598 }
599
600 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
601                                      int level)
602 {
603         mlog_entry_void();
604
605         BUG_ON(!lockres);
606
607         switch(level) {
608         case LKM_EXMODE:
609                 lockres->l_ex_holders++;
610                 break;
611         case LKM_PRMODE:
612                 lockres->l_ro_holders++;
613                 break;
614         default:
615                 BUG();
616         }
617
618         mlog_exit_void();
619 }
620
621 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
622                                      int level)
623 {
624         mlog_entry_void();
625
626         BUG_ON(!lockres);
627
628         switch(level) {
629         case LKM_EXMODE:
630                 BUG_ON(!lockres->l_ex_holders);
631                 lockres->l_ex_holders--;
632                 break;
633         case LKM_PRMODE:
634                 BUG_ON(!lockres->l_ro_holders);
635                 lockres->l_ro_holders--;
636                 break;
637         default:
638                 BUG();
639         }
640         mlog_exit_void();
641 }
642
643 /* WARNING: This function lives in a world where the only three lock
644  * levels are EX, PR, and NL. It *will* have to be adjusted when more
645  * lock types are added. */
646 static inline int ocfs2_highest_compat_lock_level(int level)
647 {
648         int new_level = LKM_EXMODE;
649
650         if (level == LKM_EXMODE)
651                 new_level = LKM_NLMODE;
652         else if (level == LKM_PRMODE)
653                 new_level = LKM_PRMODE;
654         return new_level;
655 }
656
657 static void lockres_set_flags(struct ocfs2_lock_res *lockres,
658                               unsigned long newflags)
659 {
660         struct ocfs2_mask_waiter *mw, *tmp;
661
662         assert_spin_locked(&lockres->l_lock);
663
664         lockres->l_flags = newflags;
665
666         list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
667                 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
668                         continue;
669
670                 list_del_init(&mw->mw_item);
671                 mw->mw_status = 0;
672                 complete(&mw->mw_complete);
673         }
674 }
675 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
676 {
677         lockres_set_flags(lockres, lockres->l_flags | or);
678 }
679 static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
680                                 unsigned long clear)
681 {
682         lockres_set_flags(lockres, lockres->l_flags & ~clear);
683 }
684
685 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
686 {
687         mlog_entry_void();
688
689         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
690         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
691         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
692         BUG_ON(lockres->l_blocking <= LKM_NLMODE);
693
694         lockres->l_level = lockres->l_requested;
695         if (lockres->l_level <=
696             ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
697                 lockres->l_blocking = LKM_NLMODE;
698                 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
699         }
700         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
701
702         mlog_exit_void();
703 }
704
705 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
706 {
707         mlog_entry_void();
708
709         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
710         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
711
712         /* Convert from RO to EX doesn't really need anything as our
713          * information is already up to data. Convert from NL to
714          * *anything* however should mark ourselves as needing an
715          * update */
716         if (lockres->l_level == LKM_NLMODE &&
717             lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
718                 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
719
720         lockres->l_level = lockres->l_requested;
721         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
722
723         mlog_exit_void();
724 }
725
726 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
727 {
728         mlog_entry_void();
729
730         BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
731         BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
732
733         if (lockres->l_requested > LKM_NLMODE &&
734             !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
735             lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
736                 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
737
738         lockres->l_level = lockres->l_requested;
739         lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
740         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
741
742         mlog_exit_void();
743 }
744
745 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
746                                      int level)
747 {
748         int needs_downconvert = 0;
749         mlog_entry_void();
750
751         assert_spin_locked(&lockres->l_lock);
752
753         lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
754
755         if (level > lockres->l_blocking) {
756                 /* only schedule a downconvert if we haven't already scheduled
757                  * one that goes low enough to satisfy the level we're
758                  * blocking.  this also catches the case where we get
759                  * duplicate BASTs */
760                 if (ocfs2_highest_compat_lock_level(level) <
761                     ocfs2_highest_compat_lock_level(lockres->l_blocking))
762                         needs_downconvert = 1;
763
764                 lockres->l_blocking = level;
765         }
766
767         mlog_exit(needs_downconvert);
768         return needs_downconvert;
769 }
770
771 static void ocfs2_blocking_ast(void *opaque, int level)
772 {
773         struct ocfs2_lock_res *lockres = opaque;
774         struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
775         int needs_downconvert;
776         unsigned long flags;
777
778         BUG_ON(level <= LKM_NLMODE);
779
780         mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
781              lockres->l_name, level, lockres->l_level,
782              ocfs2_lock_type_string(lockres->l_type));
783
784         /*
785          * We can skip the bast for locks which don't enable caching -
786          * they'll be dropped at the earliest possible time anyway.
787          */
788         if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
789                 return;
790
791         spin_lock_irqsave(&lockres->l_lock, flags);
792         needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
793         if (needs_downconvert)
794                 ocfs2_schedule_blocked_lock(osb, lockres);
795         spin_unlock_irqrestore(&lockres->l_lock, flags);
796
797         wake_up(&lockres->l_event);
798
799         ocfs2_wake_downconvert_thread(osb);
800 }
801
802 static void ocfs2_locking_ast(void *opaque)
803 {
804         struct ocfs2_lock_res *lockres = opaque;
805         struct dlm_lockstatus *lksb = &lockres->l_lksb;
806         unsigned long flags;
807
808         spin_lock_irqsave(&lockres->l_lock, flags);
809
810         if (lksb->status != DLM_NORMAL) {
811                 mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
812                      lockres->l_name, lksb->status);
813                 spin_unlock_irqrestore(&lockres->l_lock, flags);
814                 return;
815         }
816
817         switch(lockres->l_action) {
818         case OCFS2_AST_ATTACH:
819                 ocfs2_generic_handle_attach_action(lockres);
820                 lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
821                 break;
822         case OCFS2_AST_CONVERT:
823                 ocfs2_generic_handle_convert_action(lockres);
824                 break;
825         case OCFS2_AST_DOWNCONVERT:
826                 ocfs2_generic_handle_downconvert_action(lockres);
827                 break;
828         default:
829                 mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
830                      "lockres flags = 0x%lx, unlock action: %u\n",
831                      lockres->l_name, lockres->l_action, lockres->l_flags,
832                      lockres->l_unlock_action);
833                 BUG();
834         }
835
836         /* set it to something invalid so if we get called again we
837          * can catch it. */
838         lockres->l_action = OCFS2_AST_INVALID;
839
840         wake_up(&lockres->l_event);
841         spin_unlock_irqrestore(&lockres->l_lock, flags);
842 }
843
844 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
845                                                 int convert)
846 {
847         unsigned long flags;
848
849         mlog_entry_void();
850         spin_lock_irqsave(&lockres->l_lock, flags);
851         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
852         if (convert)
853                 lockres->l_action = OCFS2_AST_INVALID;
854         else
855                 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
856         spin_unlock_irqrestore(&lockres->l_lock, flags);
857
858         wake_up(&lockres->l_event);
859         mlog_exit_void();
860 }
861
862 /* Note: If we detect another process working on the lock (i.e.,
863  * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
864  * to do the right thing in that case.
865  */
866 static int ocfs2_lock_create(struct ocfs2_super *osb,
867                              struct ocfs2_lock_res *lockres,
868                              int level,
869                              int dlm_flags)
870 {
871         int ret = 0;
872         enum dlm_status status = DLM_NORMAL;
873         unsigned long flags;
874
875         mlog_entry_void();
876
877         mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level,
878              dlm_flags);
879
880         spin_lock_irqsave(&lockres->l_lock, flags);
881         if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
882             (lockres->l_flags & OCFS2_LOCK_BUSY)) {
883                 spin_unlock_irqrestore(&lockres->l_lock, flags);
884                 goto bail;
885         }
886
887         lockres->l_action = OCFS2_AST_ATTACH;
888         lockres->l_requested = level;
889         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
890         spin_unlock_irqrestore(&lockres->l_lock, flags);
891
892         status = ocfs2_dlm_lock(osb->dlm,
893                                 level,
894                                 &lockres->l_lksb,
895                                 dlm_flags,
896                                 lockres->l_name,
897                                 OCFS2_LOCK_ID_MAX_LEN - 1,
898                                 lockres);
899         if (status != DLM_NORMAL) {
900                 ocfs2_log_dlm_error("ocfs2_dlm_lock", status, lockres);
901                 ret = -EINVAL;
902                 ocfs2_recover_from_dlm_error(lockres, 1);
903         }
904
905         mlog(0, "lock %s, successfull return from ocfs2_dlm_lock\n",
906              lockres->l_name);
907
908 bail:
909         mlog_exit(ret);
910         return ret;
911 }
912
913 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
914                                         int flag)
915 {
916         unsigned long flags;
917         int ret;
918
919         spin_lock_irqsave(&lockres->l_lock, flags);
920         ret = lockres->l_flags & flag;
921         spin_unlock_irqrestore(&lockres->l_lock, flags);
922
923         return ret;
924 }
925
926 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
927
928 {
929         wait_event(lockres->l_event,
930                    !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
931 }
932
933 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
934
935 {
936         wait_event(lockres->l_event,
937                    !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
938 }
939
940 /* predict what lock level we'll be dropping down to on behalf
941  * of another node, and return true if the currently wanted
942  * level will be compatible with it. */
943 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
944                                                      int wanted)
945 {
946         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
947
948         return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
949 }
950
951 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
952 {
953         INIT_LIST_HEAD(&mw->mw_item);
954         init_completion(&mw->mw_complete);
955 }
956
957 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
958 {
959         wait_for_completion(&mw->mw_complete);
960         /* Re-arm the completion in case we want to wait on it again */
961         INIT_COMPLETION(mw->mw_complete);
962         return mw->mw_status;
963 }
964
965 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
966                                     struct ocfs2_mask_waiter *mw,
967                                     unsigned long mask,
968                                     unsigned long goal)
969 {
970         BUG_ON(!list_empty(&mw->mw_item));
971
972         assert_spin_locked(&lockres->l_lock);
973
974         list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
975         mw->mw_mask = mask;
976         mw->mw_goal = goal;
977 }
978
979 /* returns 0 if the mw that was removed was already satisfied, -EBUSY
980  * if the mask still hadn't reached its goal */
981 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
982                                       struct ocfs2_mask_waiter *mw)
983 {
984         unsigned long flags;
985         int ret = 0;
986
987         spin_lock_irqsave(&lockres->l_lock, flags);
988         if (!list_empty(&mw->mw_item)) {
989                 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
990                         ret = -EBUSY;
991
992                 list_del_init(&mw->mw_item);
993                 init_completion(&mw->mw_complete);
994         }
995         spin_unlock_irqrestore(&lockres->l_lock, flags);
996
997         return ret;
998
999 }
1000
1001 static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
1002                                              struct ocfs2_lock_res *lockres)
1003 {
1004         int ret;
1005
1006         ret = wait_for_completion_interruptible(&mw->mw_complete);
1007         if (ret)
1008                 lockres_remove_mask_waiter(lockres, mw);
1009         else
1010                 ret = mw->mw_status;
1011         /* Re-arm the completion in case we want to wait on it again */
1012         INIT_COMPLETION(mw->mw_complete);
1013         return ret;
1014 }
1015
1016 static int ocfs2_cluster_lock(struct ocfs2_super *osb,
1017                               struct ocfs2_lock_res *lockres,
1018                               int level,
1019                               int lkm_flags,
1020                               int arg_flags)
1021 {
1022         struct ocfs2_mask_waiter mw;
1023         enum dlm_status status;
1024         int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
1025         int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
1026         unsigned long flags;
1027
1028         mlog_entry_void();
1029
1030         ocfs2_init_mask_waiter(&mw);
1031
1032         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
1033                 lkm_flags |= LKM_VALBLK;
1034
1035 again:
1036         wait = 0;
1037
1038         if (catch_signals && signal_pending(current)) {
1039                 ret = -ERESTARTSYS;
1040                 goto out;
1041         }
1042
1043         spin_lock_irqsave(&lockres->l_lock, flags);
1044
1045         mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
1046                         "Cluster lock called on freeing lockres %s! flags "
1047                         "0x%lx\n", lockres->l_name, lockres->l_flags);
1048
1049         /* We only compare against the currently granted level
1050          * here. If the lock is blocked waiting on a downconvert,
1051          * we'll get caught below. */
1052         if (lockres->l_flags & OCFS2_LOCK_BUSY &&
1053             level > lockres->l_level) {
1054                 /* is someone sitting in dlm_lock? If so, wait on
1055                  * them. */
1056                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1057                 wait = 1;
1058                 goto unlock;
1059         }
1060
1061         if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
1062             !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
1063                 /* is the lock is currently blocked on behalf of
1064                  * another node */
1065                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
1066                 wait = 1;
1067                 goto unlock;
1068         }
1069
1070         if (level > lockres->l_level) {
1071                 if (lockres->l_action != OCFS2_AST_INVALID)
1072                         mlog(ML_ERROR, "lockres %s has action %u pending\n",
1073                              lockres->l_name, lockres->l_action);
1074
1075                 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1076                         lockres->l_action = OCFS2_AST_ATTACH;
1077                         lkm_flags &= ~LKM_CONVERT;
1078                 } else {
1079                         lockres->l_action = OCFS2_AST_CONVERT;
1080                         lkm_flags |= LKM_CONVERT;
1081                 }
1082
1083                 lockres->l_requested = level;
1084                 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1085                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1086
1087                 BUG_ON(level == LKM_IVMODE);
1088                 BUG_ON(level == LKM_NLMODE);
1089
1090                 mlog(0, "lock %s, convert from %d to level = %d\n",
1091                      lockres->l_name, lockres->l_level, level);
1092
1093                 /* call dlm_lock to upgrade lock now */
1094                 status = ocfs2_dlm_lock(osb->dlm,
1095                                         level,
1096                                         &lockres->l_lksb,
1097                                         lkm_flags,
1098                                         lockres->l_name,
1099                                         OCFS2_LOCK_ID_MAX_LEN - 1,
1100                                         lockres);
1101                 if (status != DLM_NORMAL) {
1102                         if ((lkm_flags & LKM_NOQUEUE) &&
1103                             (status == DLM_NOTQUEUED))
1104                                 ret = -EAGAIN;
1105                         else {
1106                                 ocfs2_log_dlm_error("ocfs2_dlm_lock",
1107                                                     status, lockres);
1108                                 ret = -EINVAL;
1109                         }
1110                         ocfs2_recover_from_dlm_error(lockres, 1);
1111                         goto out;
1112                 }
1113
1114                 mlog(0, "lock %s, successfull return from ocfs2_dlm_lock\n",
1115                      lockres->l_name);
1116
1117                 /* At this point we've gone inside the dlm and need to
1118                  * complete our work regardless. */
1119                 catch_signals = 0;
1120
1121                 /* wait for busy to clear and carry on */
1122                 goto again;
1123         }
1124
1125         /* Ok, if we get here then we're good to go. */
1126         ocfs2_inc_holders(lockres, level);
1127
1128         ret = 0;
1129 unlock:
1130         spin_unlock_irqrestore(&lockres->l_lock, flags);
1131 out:
1132         /*
1133          * This is helping work around a lock inversion between the page lock
1134          * and dlm locks.  One path holds the page lock while calling aops
1135          * which block acquiring dlm locks.  The voting thread holds dlm
1136          * locks while acquiring page locks while down converting data locks.
1137          * This block is helping an aop path notice the inversion and back
1138          * off to unlock its page lock before trying the dlm lock again.
1139          */
1140         if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
1141             mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
1142                 wait = 0;
1143                 if (lockres_remove_mask_waiter(lockres, &mw))
1144                         ret = -EAGAIN;
1145                 else
1146                         goto again;
1147         }
1148         if (wait) {
1149                 ret = ocfs2_wait_for_mask(&mw);
1150                 if (ret == 0)
1151                         goto again;
1152                 mlog_errno(ret);
1153         }
1154
1155         mlog_exit(ret);
1156         return ret;
1157 }
1158
1159 static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
1160                                  struct ocfs2_lock_res *lockres,
1161                                  int level)
1162 {
1163         unsigned long flags;
1164
1165         mlog_entry_void();
1166         spin_lock_irqsave(&lockres->l_lock, flags);
1167         ocfs2_dec_holders(lockres, level);
1168         ocfs2_downconvert_on_unlock(osb, lockres);
1169         spin_unlock_irqrestore(&lockres->l_lock, flags);
1170         mlog_exit_void();
1171 }
1172
1173 static int ocfs2_create_new_lock(struct ocfs2_super *osb,
1174                                  struct ocfs2_lock_res *lockres,
1175                                  int ex,
1176                                  int local)
1177 {
1178         int level =  ex ? LKM_EXMODE : LKM_PRMODE;
1179         unsigned long flags;
1180         int lkm_flags = local ? LKM_LOCAL : 0;
1181
1182         spin_lock_irqsave(&lockres->l_lock, flags);
1183         BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1184         lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1185         spin_unlock_irqrestore(&lockres->l_lock, flags);
1186
1187         return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1188 }
1189
1190 /* Grants us an EX lock on the data and metadata resources, skipping
1191  * the normal cluster directory lookup. Use this ONLY on newly created
1192  * inodes which other nodes can't possibly see, and which haven't been
1193  * hashed in the inode hash yet. This can give us a good performance
1194  * increase as it'll skip the network broadcast normally associated
1195  * with creating a new lock resource. */
1196 int ocfs2_create_new_inode_locks(struct inode *inode)
1197 {
1198         int ret;
1199         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1200
1201         BUG_ON(!inode);
1202         BUG_ON(!ocfs2_inode_is_new(inode));
1203
1204         mlog_entry_void();
1205
1206         mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
1207
1208         /* NOTE: That we don't increment any of the holder counts, nor
1209          * do we add anything to a journal handle. Since this is
1210          * supposed to be a new inode which the cluster doesn't know
1211          * about yet, there is no need to.  As far as the LVB handling
1212          * is concerned, this is basically like acquiring an EX lock
1213          * on a resource which has an invalid one -- we'll set it
1214          * valid when we release the EX. */
1215
1216         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
1217         if (ret) {
1218                 mlog_errno(ret);
1219                 goto bail;
1220         }
1221
1222         /*
1223          * We don't want to use LKM_LOCAL on a meta data lock as they
1224          * don't use a generation in their lock names.
1225          */
1226         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
1227         if (ret) {
1228                 mlog_errno(ret);
1229                 goto bail;
1230         }
1231
1232         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
1233         if (ret) {
1234                 mlog_errno(ret);
1235                 goto bail;
1236         }
1237
1238 bail:
1239         mlog_exit(ret);
1240         return ret;
1241 }
1242
1243 int ocfs2_rw_lock(struct inode *inode, int write)
1244 {
1245         int status, level;
1246         struct ocfs2_lock_res *lockres;
1247         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1248
1249         BUG_ON(!inode);
1250
1251         mlog_entry_void();
1252
1253         mlog(0, "inode %llu take %s RW lock\n",
1254              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1255              write ? "EXMODE" : "PRMODE");
1256
1257         if (ocfs2_mount_local(osb))
1258                 return 0;
1259
1260         lockres = &OCFS2_I(inode)->ip_rw_lockres;
1261
1262         level = write ? LKM_EXMODE : LKM_PRMODE;
1263
1264         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
1265                                     0);
1266         if (status < 0)
1267                 mlog_errno(status);
1268
1269         mlog_exit(status);
1270         return status;
1271 }
1272
1273 void ocfs2_rw_unlock(struct inode *inode, int write)
1274 {
1275         int level = write ? LKM_EXMODE : LKM_PRMODE;
1276         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1277         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1278
1279         mlog_entry_void();
1280
1281         mlog(0, "inode %llu drop %s RW lock\n",
1282              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1283              write ? "EXMODE" : "PRMODE");
1284
1285         if (!ocfs2_mount_local(osb))
1286                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1287
1288         mlog_exit_void();
1289 }
1290
1291 /*
1292  * ocfs2_open_lock always get PR mode lock.
1293  */
1294 int ocfs2_open_lock(struct inode *inode)
1295 {
1296         int status = 0;
1297         struct ocfs2_lock_res *lockres;
1298         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1299
1300         BUG_ON(!inode);
1301
1302         mlog_entry_void();
1303
1304         mlog(0, "inode %llu take PRMODE open lock\n",
1305              (unsigned long long)OCFS2_I(inode)->ip_blkno);
1306
1307         if (ocfs2_mount_local(osb))
1308                 goto out;
1309
1310         lockres = &OCFS2_I(inode)->ip_open_lockres;
1311
1312         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1313                                     LKM_PRMODE, 0, 0);
1314         if (status < 0)
1315                 mlog_errno(status);
1316
1317 out:
1318         mlog_exit(status);
1319         return status;
1320 }
1321
1322 int ocfs2_try_open_lock(struct inode *inode, int write)
1323 {
1324         int status = 0, level;
1325         struct ocfs2_lock_res *lockres;
1326         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1327
1328         BUG_ON(!inode);
1329
1330         mlog_entry_void();
1331
1332         mlog(0, "inode %llu try to take %s open lock\n",
1333              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1334              write ? "EXMODE" : "PRMODE");
1335
1336         if (ocfs2_mount_local(osb))
1337                 goto out;
1338
1339         lockres = &OCFS2_I(inode)->ip_open_lockres;
1340
1341         level = write ? LKM_EXMODE : LKM_PRMODE;
1342
1343         /*
1344          * The file system may already holding a PRMODE/EXMODE open lock.
1345          * Since we pass LKM_NOQUEUE, the request won't block waiting on
1346          * other nodes and the -EAGAIN will indicate to the caller that
1347          * this inode is still in use.
1348          */
1349         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1350                                     level, LKM_NOQUEUE, 0);
1351
1352 out:
1353         mlog_exit(status);
1354         return status;
1355 }
1356
1357 /*
1358  * ocfs2_open_unlock unlock PR and EX mode open locks.
1359  */
1360 void ocfs2_open_unlock(struct inode *inode)
1361 {
1362         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
1363         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1364
1365         mlog_entry_void();
1366
1367         mlog(0, "inode %llu drop open lock\n",
1368              (unsigned long long)OCFS2_I(inode)->ip_blkno);
1369
1370         if (ocfs2_mount_local(osb))
1371                 goto out;
1372
1373         if(lockres->l_ro_holders)
1374                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
1375                                      LKM_PRMODE);
1376         if(lockres->l_ex_holders)
1377                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
1378                                      LKM_EXMODE);
1379
1380 out:
1381         mlog_exit_void();
1382 }
1383
1384 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
1385                                      int level)
1386 {
1387         int ret;
1388         struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1389         unsigned long flags;
1390         struct ocfs2_mask_waiter mw;
1391
1392         ocfs2_init_mask_waiter(&mw);
1393
1394 retry_cancel:
1395         spin_lock_irqsave(&lockres->l_lock, flags);
1396         if (lockres->l_flags & OCFS2_LOCK_BUSY) {
1397                 ret = ocfs2_prepare_cancel_convert(osb, lockres);
1398                 if (ret) {
1399                         spin_unlock_irqrestore(&lockres->l_lock, flags);
1400                         ret = ocfs2_cancel_convert(osb, lockres);
1401                         if (ret < 0) {
1402                                 mlog_errno(ret);
1403                                 goto out;
1404                         }
1405                         goto retry_cancel;
1406                 }
1407                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1408                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1409
1410                 ocfs2_wait_for_mask(&mw);
1411                 goto retry_cancel;
1412         }
1413
1414         ret = -ERESTARTSYS;
1415         /*
1416          * We may still have gotten the lock, in which case there's no
1417          * point to restarting the syscall.
1418          */
1419         if (lockres->l_level == level)
1420                 ret = 0;
1421
1422         mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
1423              lockres->l_flags, lockres->l_level, lockres->l_action);
1424
1425         spin_unlock_irqrestore(&lockres->l_lock, flags);
1426
1427 out:
1428         return ret;
1429 }
1430
1431 /*
1432  * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
1433  * flock() calls. The locking approach this requires is sufficiently
1434  * different from all other cluster lock types that we implement a
1435  * seperate path to the "low-level" dlm calls. In particular:
1436  *
1437  * - No optimization of lock levels is done - we take at exactly
1438  *   what's been requested.
1439  *
1440  * - No lock caching is employed. We immediately downconvert to
1441  *   no-lock at unlock time. This also means flock locks never go on
1442  *   the blocking list).
1443  *
1444  * - Since userspace can trivially deadlock itself with flock, we make
1445  *   sure to allow cancellation of a misbehaving applications flock()
1446  *   request.
1447  *
1448  * - Access to any flock lockres doesn't require concurrency, so we
1449  *   can simplify the code by requiring the caller to guarantee
1450  *   serialization of dlmglue flock calls.
1451  */
1452 int ocfs2_file_lock(struct file *file, int ex, int trylock)
1453 {
1454         int ret, level = ex ? LKM_EXMODE : LKM_PRMODE;
1455         unsigned int lkm_flags = trylock ? LKM_NOQUEUE : 0;
1456         unsigned long flags;
1457         struct ocfs2_file_private *fp = file->private_data;
1458         struct ocfs2_lock_res *lockres = &fp->fp_flock;
1459         struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1460         struct ocfs2_mask_waiter mw;
1461
1462         ocfs2_init_mask_waiter(&mw);
1463
1464         if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
1465             (lockres->l_level > LKM_NLMODE)) {
1466                 mlog(ML_ERROR,
1467                      "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
1468                      "level: %u\n", lockres->l_name, lockres->l_flags,
1469                      lockres->l_level);
1470                 return -EINVAL;
1471         }
1472
1473         spin_lock_irqsave(&lockres->l_lock, flags);
1474         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1475                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1476                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1477
1478                 /*
1479                  * Get the lock at NLMODE to start - that way we
1480                  * can cancel the upconvert request if need be.
1481                  */
1482                 ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
1483                 if (ret < 0) {
1484                         mlog_errno(ret);
1485                         goto out;
1486                 }
1487
1488                 ret = ocfs2_wait_for_mask(&mw);
1489                 if (ret) {
1490                         mlog_errno(ret);
1491                         goto out;
1492                 }
1493                 spin_lock_irqsave(&lockres->l_lock, flags);
1494         }
1495
1496         lockres->l_action = OCFS2_AST_CONVERT;
1497         lkm_flags |= LKM_CONVERT;
1498         lockres->l_requested = level;
1499         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1500
1501         lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1502         spin_unlock_irqrestore(&lockres->l_lock, flags);
1503
1504         ret = ocfs2_dlm_lock(osb->dlm, level, &lockres->l_lksb, lkm_flags,
1505                              lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
1506                              lockres);
1507         if (ret != DLM_NORMAL) {
1508                 if (trylock && ret == DLM_NOTQUEUED)
1509                         ret = -EAGAIN;
1510                 else {
1511                         ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
1512                         ret = -EINVAL;
1513                 }
1514
1515                 ocfs2_recover_from_dlm_error(lockres, 1);
1516                 lockres_remove_mask_waiter(lockres, &mw);
1517                 goto out;
1518         }
1519
1520         ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
1521         if (ret == -ERESTARTSYS) {
1522                 /*
1523                  * Userspace can cause deadlock itself with
1524                  * flock(). Current behavior locally is to allow the
1525                  * deadlock, but abort the system call if a signal is
1526                  * received. We follow this example, otherwise a
1527                  * poorly written program could sit in kernel until
1528                  * reboot.
1529                  *
1530                  * Handling this is a bit more complicated for Ocfs2
1531                  * though. We can't exit this function with an
1532                  * outstanding lock request, so a cancel convert is
1533                  * required. We intentionally overwrite 'ret' - if the
1534                  * cancel fails and the lock was granted, it's easier
1535                  * to just bubble sucess back up to the user.
1536                  */
1537                 ret = ocfs2_flock_handle_signal(lockres, level);
1538         }
1539
1540 out:
1541
1542         mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
1543              lockres->l_name, ex, trylock, ret);
1544         return ret;
1545 }
1546
1547 void ocfs2_file_unlock(struct file *file)
1548 {
1549         int ret;
1550         unsigned long flags;
1551         struct ocfs2_file_private *fp = file->private_data;
1552         struct ocfs2_lock_res *lockres = &fp->fp_flock;
1553         struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1554         struct ocfs2_mask_waiter mw;
1555
1556         ocfs2_init_mask_waiter(&mw);
1557
1558         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
1559                 return;
1560
1561         if (lockres->l_level == LKM_NLMODE)
1562                 return;
1563
1564         mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
1565              lockres->l_name, lockres->l_flags, lockres->l_level,
1566              lockres->l_action);
1567
1568         spin_lock_irqsave(&lockres->l_lock, flags);
1569         /*
1570          * Fake a blocking ast for the downconvert code.
1571          */
1572         lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
1573         lockres->l_blocking = LKM_EXMODE;
1574
1575         ocfs2_prepare_downconvert(lockres, LKM_NLMODE);
1576         lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1577         spin_unlock_irqrestore(&lockres->l_lock, flags);
1578
1579         ret = ocfs2_downconvert_lock(osb, lockres, LKM_NLMODE, 0);
1580         if (ret) {
1581                 mlog_errno(ret);
1582                 return;
1583         }
1584
1585         ret = ocfs2_wait_for_mask(&mw);
1586         if (ret)
1587                 mlog_errno(ret);
1588 }
1589
1590 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
1591                                         struct ocfs2_lock_res *lockres)
1592 {
1593         int kick = 0;
1594
1595         mlog_entry_void();
1596
1597         /* If we know that another node is waiting on our lock, kick
1598          * the downconvert thread * pre-emptively when we reach a release
1599          * condition. */
1600         if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
1601                 switch(lockres->l_blocking) {
1602                 case LKM_EXMODE:
1603                         if (!lockres->l_ex_holders && !lockres->l_ro_holders)
1604                                 kick = 1;
1605                         break;
1606                 case LKM_PRMODE:
1607                         if (!lockres->l_ex_holders)
1608                                 kick = 1;
1609                         break;
1610                 default:
1611                         BUG();
1612                 }
1613         }
1614
1615         if (kick)
1616                 ocfs2_wake_downconvert_thread(osb);
1617
1618         mlog_exit_void();
1619 }
1620
1621 #define OCFS2_SEC_BITS   34
1622 #define OCFS2_SEC_SHIFT  (64 - 34)
1623 #define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
1624
1625 /* LVB only has room for 64 bits of time here so we pack it for
1626  * now. */
1627 static u64 ocfs2_pack_timespec(struct timespec *spec)
1628 {
1629         u64 res;
1630         u64 sec = spec->tv_sec;
1631         u32 nsec = spec->tv_nsec;
1632
1633         res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
1634
1635         return res;
1636 }
1637
1638 /* Call this with the lockres locked. I am reasonably sure we don't
1639  * need ip_lock in this function as anyone who would be changing those
1640  * values is supposed to be blocked in ocfs2_inode_lock right now. */
1641 static void __ocfs2_stuff_meta_lvb(struct inode *inode)
1642 {
1643         struct ocfs2_inode_info *oi = OCFS2_I(inode);
1644         struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
1645         struct ocfs2_meta_lvb *lvb;
1646
1647         mlog_entry_void();
1648
1649         lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1650
1651         /*
1652          * Invalidate the LVB of a deleted inode - this way other
1653          * nodes are forced to go to disk and discover the new inode
1654          * status.
1655          */
1656         if (oi->ip_flags & OCFS2_INODE_DELETED) {
1657                 lvb->lvb_version = 0;
1658                 goto out;
1659         }
1660
1661         lvb->lvb_version   = OCFS2_LVB_VERSION;
1662         lvb->lvb_isize     = cpu_to_be64(i_size_read(inode));
1663         lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
1664         lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
1665         lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
1666         lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
1667         lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
1668         lvb->lvb_iatime_packed  =
1669                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
1670         lvb->lvb_ictime_packed =
1671                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
1672         lvb->lvb_imtime_packed =
1673                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
1674         lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
1675         lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features);
1676         lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
1677
1678 out:
1679         mlog_meta_lvb(0, lockres);
1680
1681         mlog_exit_void();
1682 }
1683
1684 static void ocfs2_unpack_timespec(struct timespec *spec,
1685                                   u64 packed_time)
1686 {
1687         spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
1688         spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
1689 }
1690
1691 static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
1692 {
1693         struct ocfs2_inode_info *oi = OCFS2_I(inode);
1694         struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
1695         struct ocfs2_meta_lvb *lvb;
1696
1697         mlog_entry_void();
1698
1699         mlog_meta_lvb(0, lockres);
1700
1701         lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1702
1703         /* We're safe here without the lockres lock... */
1704         spin_lock(&oi->ip_lock);
1705         oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
1706         i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
1707
1708         oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
1709         oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures);
1710         ocfs2_set_inode_flags(inode);
1711
1712         /* fast-symlinks are a special case */
1713         if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
1714                 inode->i_blocks = 0;
1715         else
1716                 inode->i_blocks = ocfs2_inode_sector_count(inode);
1717
1718         inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
1719         inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
1720         inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
1721         inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
1722         ocfs2_unpack_timespec(&inode->i_atime,
1723                               be64_to_cpu(lvb->lvb_iatime_packed));
1724         ocfs2_unpack_timespec(&inode->i_mtime,
1725                               be64_to_cpu(lvb->lvb_imtime_packed));
1726         ocfs2_unpack_timespec(&inode->i_ctime,
1727                               be64_to_cpu(lvb->lvb_ictime_packed));
1728         spin_unlock(&oi->ip_lock);
1729
1730         mlog_exit_void();
1731 }
1732
1733 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
1734                                               struct ocfs2_lock_res *lockres)
1735 {
1736         struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1737
1738         if (lvb->lvb_version == OCFS2_LVB_VERSION
1739             && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
1740                 return 1;
1741         return 0;
1742 }
1743
1744 /* Determine whether a lock resource needs to be refreshed, and
1745  * arbitrate who gets to refresh it.
1746  *
1747  *   0 means no refresh needed.
1748  *
1749  *   > 0 means you need to refresh this and you MUST call
1750  *   ocfs2_complete_lock_res_refresh afterwards. */
1751 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
1752 {
1753         unsigned long flags;
1754         int status = 0;
1755
1756         mlog_entry_void();
1757
1758 refresh_check:
1759         spin_lock_irqsave(&lockres->l_lock, flags);
1760         if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
1761                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1762                 goto bail;
1763         }
1764
1765         if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
1766                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1767
1768                 ocfs2_wait_on_refreshing_lock(lockres);
1769                 goto refresh_check;
1770         }
1771
1772         /* Ok, I'll be the one to refresh this lock. */
1773         lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
1774         spin_unlock_irqrestore(&lockres->l_lock, flags);
1775
1776         status = 1;
1777 bail:
1778         mlog_exit(status);
1779         return status;
1780 }
1781
1782 /* If status is non zero, I'll mark it as not being in refresh
1783  * anymroe, but i won't clear the needs refresh flag. */
1784 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
1785                                                    int status)
1786 {
1787         unsigned long flags;
1788         mlog_entry_void();
1789
1790         spin_lock_irqsave(&lockres->l_lock, flags);
1791         lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
1792         if (!status)
1793                 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
1794         spin_unlock_irqrestore(&lockres->l_lock, flags);
1795
1796         wake_up(&lockres->l_event);
1797
1798         mlog_exit_void();
1799 }
1800
1801 /* may or may not return a bh if it went to disk. */
1802 static int ocfs2_inode_lock_update(struct inode *inode,
1803                                   struct buffer_head **bh)
1804 {
1805         int status = 0;
1806         struct ocfs2_inode_info *oi = OCFS2_I(inode);
1807         struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
1808         struct ocfs2_dinode *fe;
1809         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1810
1811         mlog_entry_void();
1812
1813         if (ocfs2_mount_local(osb))
1814                 goto bail;
1815
1816         spin_lock(&oi->ip_lock);
1817         if (oi->ip_flags & OCFS2_INODE_DELETED) {
1818                 mlog(0, "Orphaned inode %llu was deleted while we "
1819                      "were waiting on a lock. ip_flags = 0x%x\n",
1820                      (unsigned long long)oi->ip_blkno, oi->ip_flags);
1821                 spin_unlock(&oi->ip_lock);
1822                 status = -ENOENT;
1823                 goto bail;
1824         }
1825         spin_unlock(&oi->ip_lock);
1826
1827         if (!ocfs2_should_refresh_lock_res(lockres))
1828                 goto bail;
1829
1830         /* This will discard any caching information we might have had
1831          * for the inode metadata. */
1832         ocfs2_metadata_cache_purge(inode);
1833
1834         ocfs2_extent_map_trunc(inode, 0);
1835
1836         if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
1837                 mlog(0, "Trusting LVB on inode %llu\n",
1838                      (unsigned long long)oi->ip_blkno);
1839                 ocfs2_refresh_inode_from_lvb(inode);
1840         } else {
1841                 /* Boo, we have to go to disk. */
1842                 /* read bh, cast, ocfs2_refresh_inode */
1843                 status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
1844                                           bh, OCFS2_BH_CACHED, inode);
1845                 if (status < 0) {
1846                         mlog_errno(status);
1847                         goto bail_refresh;
1848                 }
1849                 fe = (struct ocfs2_dinode *) (*bh)->b_data;
1850
1851                 /* This is a good chance to make sure we're not
1852                  * locking an invalid object.
1853                  *
1854                  * We bug on a stale inode here because we checked
1855                  * above whether it was wiped from disk. The wiping
1856                  * node provides a guarantee that we receive that
1857                  * message and can mark the inode before dropping any
1858                  * locks associated with it. */
1859                 if (!OCFS2_IS_VALID_DINODE(fe)) {
1860                         OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
1861                         status = -EIO;
1862                         goto bail_refresh;
1863                 }
1864                 mlog_bug_on_msg(inode->i_generation !=
1865                                 le32_to_cpu(fe->i_generation),
1866                                 "Invalid dinode %llu disk generation: %u "
1867                                 "inode->i_generation: %u\n",
1868                                 (unsigned long long)oi->ip_blkno,
1869                                 le32_to_cpu(fe->i_generation),
1870                                 inode->i_generation);
1871                 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
1872                                 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
1873                                 "Stale dinode %llu dtime: %llu flags: 0x%x\n",
1874                                 (unsigned long long)oi->ip_blkno,
1875                                 (unsigned long long)le64_to_cpu(fe->i_dtime),
1876                                 le32_to_cpu(fe->i_flags));
1877
1878                 ocfs2_refresh_inode(inode, fe);
1879         }
1880
1881         status = 0;
1882 bail_refresh:
1883         ocfs2_complete_lock_res_refresh(lockres, status);
1884 bail:
1885         mlog_exit(status);
1886         return status;
1887 }
1888
1889 static int ocfs2_assign_bh(struct inode *inode,
1890                            struct buffer_head **ret_bh,
1891                            struct buffer_head *passed_bh)
1892 {
1893         int status;
1894
1895         if (passed_bh) {
1896                 /* Ok, the update went to disk for us, use the
1897                  * returned bh. */
1898                 *ret_bh = passed_bh;
1899                 get_bh(*ret_bh);
1900
1901                 return 0;
1902         }
1903
1904         status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
1905                                   OCFS2_I(inode)->ip_blkno,
1906                                   ret_bh,
1907                                   OCFS2_BH_CACHED,
1908                                   inode);
1909         if (status < 0)
1910                 mlog_errno(status);
1911
1912         return status;
1913 }
1914
1915 /*
1916  * returns < 0 error if the callback will never be called, otherwise
1917  * the result of the lock will be communicated via the callback.
1918  */
1919 int ocfs2_inode_lock_full(struct inode *inode,
1920                          struct buffer_head **ret_bh,
1921                          int ex,
1922                          int arg_flags)
1923 {
1924         int status, level, dlm_flags, acquired;
1925         struct ocfs2_lock_res *lockres = NULL;
1926         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1927         struct buffer_head *local_bh = NULL;
1928
1929         BUG_ON(!inode);
1930
1931         mlog_entry_void();
1932
1933         mlog(0, "inode %llu, take %s META lock\n",
1934              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1935              ex ? "EXMODE" : "PRMODE");
1936
1937         status = 0;
1938         acquired = 0;
1939         /* We'll allow faking a readonly metadata lock for
1940          * rodevices. */
1941         if (ocfs2_is_hard_readonly(osb)) {
1942                 if (ex)
1943                         status = -EROFS;
1944                 goto bail;
1945         }
1946
1947         if (ocfs2_mount_local(osb))
1948                 goto local;
1949
1950         if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1951                 ocfs2_wait_for_recovery(osb);
1952
1953         lockres = &OCFS2_I(inode)->ip_inode_lockres;
1954         level = ex ? LKM_EXMODE : LKM_PRMODE;
1955         dlm_flags = 0;
1956         if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
1957                 dlm_flags |= LKM_NOQUEUE;
1958
1959         status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
1960         if (status < 0) {
1961                 if (status != -EAGAIN && status != -EIOCBRETRY)
1962                         mlog_errno(status);
1963                 goto bail;
1964         }
1965
1966         /* Notify the error cleanup path to drop the cluster lock. */
1967         acquired = 1;
1968
1969         /* We wait twice because a node may have died while we were in
1970          * the lower dlm layers. The second time though, we've
1971          * committed to owning this lock so we don't allow signals to
1972          * abort the operation. */
1973         if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1974                 ocfs2_wait_for_recovery(osb);
1975
1976 local:
1977         /*
1978          * We only see this flag if we're being called from
1979          * ocfs2_read_locked_inode(). It means we're locking an inode
1980          * which hasn't been populated yet, so clear the refresh flag
1981          * and let the caller handle it.
1982          */
1983         if (inode->i_state & I_NEW) {
1984                 status = 0;
1985                 if (lockres)
1986                         ocfs2_complete_lock_res_refresh(lockres, 0);
1987                 goto bail;
1988         }
1989
1990         /* This is fun. The caller may want a bh back, or it may
1991          * not. ocfs2_inode_lock_update definitely wants one in, but
1992          * may or may not read one, depending on what's in the
1993          * LVB. The result of all of this is that we've *only* gone to
1994          * disk if we have to, so the complexity is worthwhile. */
1995         status = ocfs2_inode_lock_update(inode, &local_bh);
1996         if (status < 0) {
1997                 if (status != -ENOENT)
1998                         mlog_errno(status);
1999                 goto bail;
2000         }
2001
2002         if (ret_bh) {
2003                 status = ocfs2_assign_bh(inode, ret_bh, local_bh);
2004                 if (status < 0) {
2005                         mlog_errno(status);
2006                         goto bail;
2007                 }
2008         }
2009
2010 bail:
2011         if (status < 0) {
2012                 if (ret_bh && (*ret_bh)) {
2013                         brelse(*ret_bh);
2014                         *ret_bh = NULL;
2015                 }
2016                 if (acquired)
2017                         ocfs2_inode_unlock(inode, ex);
2018         }
2019
2020         if (local_bh)
2021                 brelse(local_bh);
2022
2023         mlog_exit(status);
2024         return status;
2025 }
2026
2027 /*
2028  * This is working around a lock inversion between tasks acquiring DLM
2029  * locks while holding a page lock and the downconvert thread which
2030  * blocks dlm lock acquiry while acquiring page locks.
2031  *
2032  * ** These _with_page variantes are only intended to be called from aop
2033  * methods that hold page locks and return a very specific *positive* error
2034  * code that aop methods pass up to the VFS -- test for errors with != 0. **
2035  *
2036  * The DLM is called such that it returns -EAGAIN if it would have
2037  * blocked waiting for the downconvert thread.  In that case we unlock
2038  * our page so the downconvert thread can make progress.  Once we've
2039  * done this we have to return AOP_TRUNCATED_PAGE so the aop method
2040  * that called us can bubble that back up into the VFS who will then
2041  * immediately retry the aop call.
2042  *
2043  * We do a blocking lock and immediate unlock before returning, though, so that
2044  * the lock has a great chance of being cached on this node by the time the VFS
2045  * calls back to retry the aop.    This has a potential to livelock as nodes
2046  * ping locks back and forth, but that's a risk we're willing to take to avoid
2047  * the lock inversion simply.
2048  */
2049 int ocfs2_inode_lock_with_page(struct inode *inode,
2050                               struct buffer_head **ret_bh,
2051                               int ex,
2052                               struct page *page)
2053 {
2054         int ret;
2055
2056         ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
2057         if (ret == -EAGAIN) {
2058                 unlock_page(page);
2059                 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
2060                         ocfs2_inode_unlock(inode, ex);
2061                 ret = AOP_TRUNCATED_PAGE;
2062         }
2063
2064         return ret;
2065 }
2066
2067 int ocfs2_inode_lock_atime(struct inode *inode,
2068                           struct vfsmount *vfsmnt,
2069                           int *level)
2070 {
2071         int ret;
2072
2073         mlog_entry_void();
2074         ret = ocfs2_inode_lock(inode, NULL, 0);
2075         if (ret < 0) {
2076                 mlog_errno(ret);
2077                 return ret;
2078         }
2079
2080         /*
2081          * If we should update atime, we will get EX lock,
2082          * otherwise we just get PR lock.
2083          */
2084         if (ocfs2_should_update_atime(inode, vfsmnt)) {
2085                 struct buffer_head *bh = NULL;
2086
2087                 ocfs2_inode_unlock(inode, 0);
2088                 ret = ocfs2_inode_lock(inode, &bh, 1);
2089                 if (ret < 0) {
2090                         mlog_errno(ret);
2091                         return ret;
2092                 }
2093                 *level = 1;
2094                 if (ocfs2_should_update_atime(inode, vfsmnt))
2095                         ocfs2_update_inode_atime(inode, bh);
2096                 if (bh)
2097                         brelse(bh);
2098         } else
2099                 *level = 0;
2100
2101         mlog_exit(ret);
2102         return ret;
2103 }
2104
2105 void ocfs2_inode_unlock(struct inode *inode,
2106                        int ex)
2107 {
2108         int level = ex ? LKM_EXMODE : LKM_PRMODE;
2109         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
2110         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2111
2112         mlog_entry_void();
2113
2114         mlog(0, "inode %llu drop %s META lock\n",
2115              (unsigned long long)OCFS2_I(inode)->ip_blkno,
2116              ex ? "EXMODE" : "PRMODE");
2117
2118         if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
2119             !ocfs2_mount_local(osb))
2120                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
2121
2122         mlog_exit_void();
2123 }
2124
2125 int ocfs2_super_lock(struct ocfs2_super *osb,
2126                      int ex)
2127 {
2128         int status = 0;
2129         int level = ex ? LKM_EXMODE : LKM_PRMODE;
2130         struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2131
2132         mlog_entry_void();
2133
2134         if (ocfs2_is_hard_readonly(osb))
2135                 return -EROFS;
2136
2137         if (ocfs2_mount_local(osb))
2138                 goto bail;
2139
2140         status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
2141         if (status < 0) {
2142                 mlog_errno(status);
2143                 goto bail;
2144         }
2145
2146         /* The super block lock path is really in the best position to
2147          * know when resources covered by the lock need to be
2148          * refreshed, so we do it here. Of course, making sense of
2149          * everything is up to the caller :) */
2150         status = ocfs2_should_refresh_lock_res(lockres);
2151         if (status < 0) {
2152                 mlog_errno(status);
2153                 goto bail;
2154         }
2155         if (status) {
2156                 status = ocfs2_refresh_slot_info(osb);
2157
2158                 ocfs2_complete_lock_res_refresh(lockres, status);
2159
2160                 if (status < 0)
2161                         mlog_errno(status);
2162         }
2163 bail:
2164         mlog_exit(status);
2165         return status;
2166 }
2167
2168 void ocfs2_super_unlock(struct ocfs2_super *osb,
2169                         int ex)
2170 {
2171         int level = ex ? LKM_EXMODE : LKM_PRMODE;
2172         struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2173
2174         if (!ocfs2_mount_local(osb))
2175                 ocfs2_cluster_unlock(osb, lockres, level);
2176 }
2177
2178 int ocfs2_rename_lock(struct ocfs2_super *osb)
2179 {
2180         int status;
2181         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2182
2183         if (ocfs2_is_hard_readonly(osb))
2184                 return -EROFS;
2185
2186         if (ocfs2_mount_local(osb))
2187                 return 0;
2188
2189         status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
2190         if (status < 0)
2191                 mlog_errno(status);
2192
2193         return status;
2194 }
2195
2196 void ocfs2_rename_unlock(struct ocfs2_super *osb)
2197 {
2198         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2199
2200         if (!ocfs2_mount_local(osb))
2201                 ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
2202 }
2203
2204 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
2205 {
2206         int ret;
2207         int level = ex ? LKM_EXMODE : LKM_PRMODE;
2208         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2209         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2210
2211         BUG_ON(!dl);
2212
2213         if (ocfs2_is_hard_readonly(osb))
2214                 return -EROFS;
2215
2216         if (ocfs2_mount_local(osb))
2217                 return 0;
2218
2219         ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
2220         if (ret < 0)
2221                 mlog_errno(ret);
2222
2223         return ret;
2224 }
2225
2226 void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
2227 {
2228         int level = ex ? LKM_EXMODE : LKM_PRMODE;
2229         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2230         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2231
2232         if (!ocfs2_mount_local(osb))
2233                 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
2234 }
2235
2236 /* Reference counting of the dlm debug structure. We want this because
2237  * open references on the debug inodes can live on after a mount, so
2238  * we can't rely on the ocfs2_super to always exist. */
2239 static void ocfs2_dlm_debug_free(struct kref *kref)
2240 {
2241         struct ocfs2_dlm_debug *dlm_debug;
2242
2243         dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
2244
2245         kfree(dlm_debug);
2246 }
2247
2248 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
2249 {
2250         if (dlm_debug)
2251                 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
2252 }
2253
2254 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
2255 {
2256         kref_get(&debug->d_refcnt);
2257 }
2258
2259 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
2260 {
2261         struct ocfs2_dlm_debug *dlm_debug;
2262
2263         dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
2264         if (!dlm_debug) {
2265                 mlog_errno(-ENOMEM);
2266                 goto out;
2267         }
2268
2269         kref_init(&dlm_debug->d_refcnt);
2270         INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
2271         dlm_debug->d_locking_state = NULL;
2272 out:
2273         return dlm_debug;
2274 }
2275
2276 /* Access to this is arbitrated for us via seq_file->sem. */
2277 struct ocfs2_dlm_seq_priv {
2278         struct ocfs2_dlm_debug *p_dlm_debug;
2279         struct ocfs2_lock_res p_iter_res;
2280         struct ocfs2_lock_res p_tmp_res;
2281 };
2282
2283 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
2284                                                  struct ocfs2_dlm_seq_priv *priv)
2285 {
2286         struct ocfs2_lock_res *iter, *ret = NULL;
2287         struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
2288
2289         assert_spin_locked(&ocfs2_dlm_tracking_lock);
2290
2291         list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
2292                 /* discover the head of the list */
2293                 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
2294                         mlog(0, "End of list found, %p\n", ret);
2295                         break;
2296                 }
2297
2298                 /* We track our "dummy" iteration lockres' by a NULL
2299                  * l_ops field. */
2300                 if (iter->l_ops != NULL) {
2301                         ret = iter;
2302                         break;
2303                 }
2304         }
2305
2306         return ret;
2307 }
2308
2309 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
2310 {
2311         struct ocfs2_dlm_seq_priv *priv = m->private;
2312         struct ocfs2_lock_res *iter;
2313
2314         spin_lock(&ocfs2_dlm_tracking_lock);
2315         iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
2316         if (iter) {
2317                 /* Since lockres' have the lifetime of their container
2318                  * (which can be inodes, ocfs2_supers, etc) we want to
2319                  * copy this out to a temporary lockres while still
2320                  * under the spinlock. Obviously after this we can't
2321                  * trust any pointers on the copy returned, but that's
2322                  * ok as the information we want isn't typically held
2323                  * in them. */
2324                 priv->p_tmp_res = *iter;
2325                 iter = &priv->p_tmp_res;
2326         }
2327         spin_unlock(&ocfs2_dlm_tracking_lock);
2328
2329         return iter;
2330 }
2331
2332 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
2333 {
2334 }
2335
2336 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
2337 {
2338         struct ocfs2_dlm_seq_priv *priv = m->private;
2339         struct ocfs2_lock_res *iter = v;
2340         struct ocfs2_lock_res *dummy = &priv->p_iter_res;
2341
2342         spin_lock(&ocfs2_dlm_tracking_lock);
2343         iter = ocfs2_dlm_next_res(iter, priv);
2344         list_del_init(&dummy->l_debug_list);
2345         if (iter) {
2346                 list_add(&dummy->l_debug_list, &iter->l_debug_list);
2347                 priv->p_tmp_res = *iter;
2348                 iter = &priv->p_tmp_res;
2349         }
2350         spin_unlock(&ocfs2_dlm_tracking_lock);
2351
2352         return iter;
2353 }
2354
2355 /* So that debugfs.ocfs2 can determine which format is being used */
2356 #define OCFS2_DLM_DEBUG_STR_VERSION 1
2357 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
2358 {
2359         int i;
2360         char *lvb;
2361         struct ocfs2_lock_res *lockres = v;
2362
2363         if (!lockres)
2364                 return -EINVAL;
2365
2366         seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
2367
2368         if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
2369                 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
2370                            lockres->l_name,
2371                            (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
2372         else
2373                 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
2374
2375         seq_printf(m, "%d\t"
2376                    "0x%lx\t"
2377                    "0x%x\t"
2378                    "0x%x\t"
2379                    "%u\t"
2380                    "%u\t"
2381                    "%d\t"
2382                    "%d\t",
2383                    lockres->l_level,
2384                    lockres->l_flags,
2385                    lockres->l_action,
2386                    lockres->l_unlock_action,
2387                    lockres->l_ro_holders,
2388                    lockres->l_ex_holders,
2389                    lockres->l_requested,
2390                    lockres->l_blocking);
2391
2392         /* Dump the raw LVB */
2393         lvb = lockres->l_lksb.lvb;
2394         for(i = 0; i < DLM_LVB_LEN; i++)
2395                 seq_printf(m, "0x%x\t", lvb[i]);
2396
2397         /* End the line */
2398         seq_printf(m, "\n");
2399         return 0;
2400 }
2401
2402 static const struct seq_operations ocfs2_dlm_seq_ops = {
2403         .start =        ocfs2_dlm_seq_start,
2404         .stop =         ocfs2_dlm_seq_stop,
2405         .next =         ocfs2_dlm_seq_next,
2406         .show =         ocfs2_dlm_seq_show,
2407 };
2408
2409 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2410 {
2411         struct seq_file *seq = (struct seq_file *) file->private_data;
2412         struct ocfs2_dlm_seq_priv *priv = seq->private;
2413         struct ocfs2_lock_res *res = &priv->p_iter_res;
2414
2415         ocfs2_remove_lockres_tracking(res);
2416         ocfs2_put_dlm_debug(priv->p_dlm_debug);
2417         return seq_release_private(inode, file);
2418 }
2419
2420 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2421 {
2422         int ret;
2423         struct ocfs2_dlm_seq_priv *priv;
2424         struct seq_file *seq;
2425         struct ocfs2_super *osb;
2426
2427         priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2428         if (!priv) {
2429                 ret = -ENOMEM;
2430                 mlog_errno(ret);
2431                 goto out;
2432         }
2433         osb = inode->i_private;
2434         ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2435         priv->p_dlm_debug = osb->osb_dlm_debug;
2436         INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2437
2438         ret = seq_open(file, &ocfs2_dlm_seq_ops);
2439         if (ret) {
2440                 kfree(priv);
2441                 mlog_errno(ret);
2442                 goto out;
2443         }
2444
2445         seq = (struct seq_file *) file->private_data;
2446         seq->private = priv;
2447
2448         ocfs2_add_lockres_tracking(&priv->p_iter_res,
2449                                    priv->p_dlm_debug);
2450
2451 out:
2452         return ret;
2453 }
2454
2455 static const struct file_operations ocfs2_dlm_debug_fops = {
2456         .open =         ocfs2_dlm_debug_open,
2457         .release =      ocfs2_dlm_debug_release,
2458         .read =         seq_read,
2459         .llseek =       seq_lseek,
2460 };
2461
2462 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2463 {
2464         int ret = 0;
2465         struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2466
2467         dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2468                                                          S_IFREG|S_IRUSR,
2469                                                          osb->osb_debug_root,
2470                                                          osb,
2471                                                          &ocfs2_dlm_debug_fops);
2472         if (!dlm_debug->d_locking_state) {
2473                 ret = -EINVAL;
2474                 mlog(ML_ERROR,
2475                      "Unable to create locking state debugfs file.\n");
2476                 goto out;
2477         }
2478
2479         ocfs2_get_dlm_debug(dlm_debug);
2480 out:
2481         return ret;
2482 }
2483
2484 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2485 {
2486         struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2487
2488         if (dlm_debug) {
2489                 debugfs_remove(dlm_debug->d_locking_state);
2490                 ocfs2_put_dlm_debug(dlm_debug);
2491         }
2492 }
2493
2494 int ocfs2_dlm_init(struct ocfs2_super *osb)
2495 {
2496         int status = 0;
2497         u32 dlm_key;
2498         struct dlm_ctxt *dlm = NULL;
2499
2500         mlog_entry_void();
2501
2502         if (ocfs2_mount_local(osb))
2503                 goto local;
2504
2505         status = ocfs2_dlm_init_debug(osb);
2506         if (status < 0) {
2507                 mlog_errno(status);
2508                 goto bail;
2509         }
2510
2511         /* launch downconvert thread */
2512         osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc");
2513         if (IS_ERR(osb->dc_task)) {
2514                 status = PTR_ERR(osb->dc_task);
2515                 osb->dc_task = NULL;
2516                 mlog_errno(status);
2517                 goto bail;
2518         }
2519
2520         /* used by the dlm code to make message headers unique, each
2521          * node in this domain must agree on this. */
2522         dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));
2523
2524         /* for now, uuid == domain */
2525         dlm = dlm_register_domain(osb->uuid_str, dlm_key,
2526                                   &osb->osb_locking_proto);
2527         if (IS_ERR(dlm)) {
2528                 status = PTR_ERR(dlm);
2529                 mlog_errno(status);
2530                 goto bail;
2531         }
2532
2533         dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);
2534
2535 local:
2536         ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
2537         ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
2538
2539         osb->dlm = dlm;
2540
2541         status = 0;
2542 bail:
2543         if (status < 0) {
2544                 ocfs2_dlm_shutdown_debug(osb);
2545                 if (osb->dc_task)
2546                         kthread_stop(osb->dc_task);
2547         }
2548
2549         mlog_exit(status);
2550         return status;
2551 }
2552
2553 void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
2554 {
2555         mlog_entry_void();
2556
2557         dlm_unregister_eviction_cb(&osb->osb_eviction_cb);
2558
2559         ocfs2_drop_osb_locks(osb);
2560
2561         if (osb->dc_task) {
2562                 kthread_stop(osb->dc_task);
2563                 osb->dc_task = NULL;
2564         }
2565
2566         ocfs2_lock_res_free(&osb->osb_super_lockres);
2567         ocfs2_lock_res_free(&osb->osb_rename_lockres);
2568
2569         dlm_unregister_domain(osb->dlm);
2570         osb->dlm = NULL;
2571
2572         ocfs2_dlm_shutdown_debug(osb);
2573
2574         mlog_exit_void();
2575 }
2576
2577 static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
2578 {
2579         struct ocfs2_lock_res *lockres = opaque;
2580         unsigned long flags;
2581
2582         mlog_entry_void();
2583
2584         mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
2585              lockres->l_unlock_action);
2586
2587         spin_lock_irqsave(&lockres->l_lock, flags);
2588         /* We tried to cancel a convert request, but it was already
2589          * granted. All we want to do here is clear our unlock
2590          * state. The wake_up call done at the bottom is redundant
2591          * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
2592          * hurt anything anyway */
2593         if (status == DLM_CANCELGRANT &&
2594             lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2595                 mlog(0, "Got cancelgrant for %s\n", lockres->l_name);
2596
2597                 /* We don't clear the busy flag in this case as it
2598                  * should have been cleared by the ast which the dlm
2599                  * has called. */
2600                 goto complete_unlock;
2601         }
2602
2603         if (status != DLM_NORMAL) {
2604                 mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
2605                      "unlock_action %d\n", status, lockres->l_name,
2606                      lockres->l_unlock_action);
2607                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2608                 return;
2609         }
2610
2611         switch(lockres->l_unlock_action) {
2612         case OCFS2_UNLOCK_CANCEL_CONVERT:
2613                 mlog(0, "Cancel convert success for %s\n", lockres->l_name);
2614                 lockres->l_action = OCFS2_AST_INVALID;
2615                 break;
2616         case OCFS2_UNLOCK_DROP_LOCK:
2617                 lockres->l_level = LKM_IVMODE;
2618                 break;
2619         default:
2620                 BUG();
2621         }
2622
2623         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
2624 complete_unlock:
2625         lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
2626         spin_unlock_irqrestore(&lockres->l_lock, flags);
2627
2628         wake_up(&lockres->l_event);
2629
2630         mlog_exit_void();
2631 }
2632
2633 static int ocfs2_drop_lock(struct ocfs2_super *osb,
2634                            struct ocfs2_lock_res *lockres)
2635 {
2636         enum dlm_status status;
2637         unsigned long flags;
2638         int lkm_flags = 0;
2639
2640         /* We didn't get anywhere near actually using this lockres. */
2641         if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
2642                 goto out;
2643
2644         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
2645                 lkm_flags |= LKM_VALBLK;
2646
2647         spin_lock_irqsave(&lockres->l_lock, flags);
2648
2649         mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
2650                         "lockres %s, flags 0x%lx\n",
2651                         lockres->l_name, lockres->l_flags);
2652
2653         while (lockres->l_flags & OCFS2_LOCK_BUSY) {
2654                 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
2655                      "%u, unlock_action = %u\n",
2656                      lockres->l_name, lockres->l_flags, lockres->l_action,
2657                      lockres->l_unlock_action);
2658
2659                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2660
2661                 /* XXX: Today we just wait on any busy
2662                  * locks... Perhaps we need to cancel converts in the
2663                  * future? */
2664                 ocfs2_wait_on_busy_lock(lockres);
2665
2666                 spin_lock_irqsave(&lockres->l_lock, flags);
2667         }
2668
2669         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
2670                 if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
2671                     lockres->l_level == LKM_EXMODE &&
2672                     !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2673                         lockres->l_ops->set_lvb(lockres);
2674         }
2675
2676         if (lockres->l_flags & OCFS2_LOCK_BUSY)
2677                 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
2678                      lockres->l_name);
2679         if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
2680                 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
2681
2682         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
2683                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2684                 goto out;
2685         }
2686
2687         lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
2688
2689         /* make sure we never get here while waiting for an ast to
2690          * fire. */
2691         BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
2692
2693         /* is this necessary? */
2694         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2695         lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
2696         spin_unlock_irqrestore(&lockres->l_lock, flags);
2697
2698         mlog(0, "lock %s\n", lockres->l_name);
2699
2700         status = ocfs2_dlm_unlock(osb->dlm, &lockres->l_lksb, lkm_flags,
2701                                   lockres);
2702         if (status != DLM_NORMAL) {
2703                 ocfs2_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
2704                 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
2705                 dlm_print_one_lock(lockres->l_lksb.lockid);
2706                 BUG();
2707         }
2708         mlog(0, "lock %s, successfull return from ocfs2_dlm_unlock\n",
2709              lockres->l_name);
2710
2711         ocfs2_wait_on_busy_lock(lockres);
2712 out:
2713         mlog_exit(0);
2714         return 0;
2715 }
2716
2717 /* Mark the lockres as being dropped. It will no longer be
2718  * queued if blocking, but we still may have to wait on it
2719  * being dequeued from the downconvert thread before we can consider
2720  * it safe to drop. 
2721  *
2722  * You can *not* attempt to call cluster_lock on this lockres anymore. */
2723 void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
2724 {
2725         int status;
2726         struct ocfs2_mask_waiter mw;
2727         unsigned long flags;
2728
2729         ocfs2_init_mask_waiter(&mw);
2730
2731         spin_lock_irqsave(&lockres->l_lock, flags);
2732         lockres->l_flags |= OCFS2_LOCK_FREEING;
2733         while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
2734                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
2735                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2736
2737                 mlog(0, "Waiting on lockres %s\n", lockres->l_name);
2738
2739                 status = ocfs2_wait_for_mask(&mw);
2740                 if (status)
2741                         mlog_errno(status);
2742
2743                 spin_lock_irqsave(&lockres->l_lock, flags);
2744         }
2745         spin_unlock_irqrestore(&lockres->l_lock, flags);
2746 }
2747
2748 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
2749                                struct ocfs2_lock_res *lockres)
2750 {
2751         int ret;
2752
2753         ocfs2_mark_lockres_freeing(lockres);
2754         ret = ocfs2_drop_lock(osb, lockres);
2755         if (ret)
2756                 mlog_errno(ret);
2757 }
2758
2759 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
2760 {
2761         ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
2762         ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
2763 }
2764
2765 int ocfs2_drop_inode_locks(struct inode *inode)
2766 {
2767         int status, err;
2768
2769         mlog_entry_void();
2770
2771         /* No need to call ocfs2_mark_lockres_freeing here -
2772          * ocfs2_clear_inode has done it for us. */
2773
2774         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2775                               &OCFS2_I(inode)->ip_open_lockres);
2776         if (err < 0)
2777                 mlog_errno(err);
2778
2779         status = err;
2780
2781         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2782                               &OCFS2_I(inode)->ip_inode_lockres);
2783         if (err < 0)
2784                 mlog_errno(err);
2785         if (err < 0 && !status)
2786                 status = err;
2787
2788         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2789                               &OCFS2_I(inode)->ip_rw_lockres);
2790         if (err < 0)
2791                 mlog_errno(err);
2792         if (err < 0 && !status)
2793                 status = err;
2794
2795         mlog_exit(status);
2796         return status;
2797 }
2798
2799 static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
2800                                       int new_level)
2801 {
2802         assert_spin_locked(&lockres->l_lock);
2803
2804         BUG_ON(lockres->l_blocking <= LKM_NLMODE);
2805
2806         if (lockres->l_level <= new_level) {
2807                 mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
2808                      lockres->l_level, new_level);
2809                 BUG();
2810         }
2811
2812         mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
2813              lockres->l_name, new_level, lockres->l_blocking);
2814
2815         lockres->l_action = OCFS2_AST_DOWNCONVERT;
2816         lockres->l_requested = new_level;
2817         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2818 }
2819
2820 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
2821                                   struct ocfs2_lock_res *lockres,
2822                                   int new_level,
2823                                   int lvb)
2824 {
2825         int ret, dlm_flags = LKM_CONVERT;
2826         enum dlm_status status;
2827
2828         mlog_entry_void();
2829
2830         if (lvb)
2831                 dlm_flags |= LKM_VALBLK;
2832
2833         status = ocfs2_dlm_lock(osb->dlm,
2834                                 new_level,
2835                                 &lockres->l_lksb,
2836                                 dlm_flags,
2837                                 lockres->l_name,
2838                                 OCFS2_LOCK_ID_MAX_LEN - 1,
2839                                 lockres);
2840         if (status != DLM_NORMAL) {
2841                 ocfs2_log_dlm_error("ocfs2_dlm_lock", status, lockres);
2842                 ret = -EINVAL;
2843                 ocfs2_recover_from_dlm_error(lockres, 1);
2844                 goto bail;
2845         }
2846
2847         ret = 0;
2848 bail:
2849         mlog_exit(ret);
2850         return ret;
2851 }
2852
2853 /* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */
2854 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
2855                                         struct ocfs2_lock_res *lockres)
2856 {
2857         assert_spin_locked(&lockres->l_lock);
2858
2859         mlog_entry_void();
2860         mlog(0, "lock %s\n", lockres->l_name);
2861
2862         if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2863                 /* If we're already trying to cancel a lock conversion
2864                  * then just drop the spinlock and allow the caller to
2865                  * requeue this lock. */
2866
2867                 mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
2868                 return 0;
2869         }
2870
2871         /* were we in a convert when we got the bast fire? */
2872         BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
2873                lockres->l_action != OCFS2_AST_DOWNCONVERT);
2874         /* set things up for the unlockast to know to just
2875          * clear out the ast_action and unset busy, etc. */
2876         lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
2877
2878         mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
2879                         "lock %s, invalid flags: 0x%lx\n",
2880                         lockres->l_name, lockres->l_flags);
2881
2882         return 1;
2883 }
2884
2885 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
2886                                 struct ocfs2_lock_res *lockres)
2887 {
2888         int ret;
2889         enum dlm_status status;
2890
2891         mlog_entry_void();
2892         mlog(0, "lock %s\n", lockres->l_name);
2893
2894         ret = 0;
2895         status = ocfs2_dlm_unlock(osb->dlm,
2896                                   &lockres->l_lksb,
2897                                   LKM_CANCEL,
2898                                   lockres);
2899         if (status != DLM_NORMAL) {
2900                 ocfs2_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
2901                 ret = -EINVAL;
2902                 ocfs2_recover_from_dlm_error(lockres, 0);
2903         }
2904
2905         mlog(0, "lock %s return from ocfs2_dlm_unlock\n", lockres->l_name);
2906
2907         mlog_exit(ret);
2908         return ret;
2909 }
2910
2911 static int ocfs2_unblock_lock(struct ocfs2_super *osb,
2912                               struct ocfs2_lock_res *lockres,
2913                               struct ocfs2_unblock_ctl *ctl)
2914 {
2915         unsigned long flags;
2916         int blocking;
2917         int new_level;
2918         int ret = 0;
2919         int set_lvb = 0;
2920
2921         mlog_entry_void();
2922
2923         spin_lock_irqsave(&lockres->l_lock, flags);
2924
2925         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
2926
2927 recheck:
2928         if (lockres->l_flags & OCFS2_LOCK_BUSY) {
2929                 ctl->requeue = 1;
2930                 ret = ocfs2_prepare_cancel_convert(osb, lockres);
2931                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2932                 if (ret) {
2933                         ret = ocfs2_cancel_convert(osb, lockres);
2934                         if (ret < 0)
2935                                 mlog_errno(ret);
2936                 }
2937                 goto leave;
2938         }
2939
2940         /* if we're blocking an exclusive and we have *any* holders,
2941          * then requeue. */
2942         if ((lockres->l_blocking == LKM_EXMODE)
2943             && (lockres->l_ex_holders || lockres->l_ro_holders))
2944                 goto leave_requeue;
2945
2946         /* If it's a PR we're blocking, then only
2947          * requeue if we've got any EX holders */
2948         if (lockres->l_blocking == LKM_PRMODE &&
2949             lockres->l_ex_holders)
2950                 goto leave_requeue;
2951
2952         /*
2953          * Can we get a lock in this state if the holder counts are
2954          * zero? The meta data unblock code used to check this.
2955          */
2956         if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
2957             && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
2958                 goto leave_requeue;
2959
2960         new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
2961
2962         if (lockres->l_ops->check_downconvert
2963             && !lockres->l_ops->check_downconvert(lockres, new_level))
2964                 goto leave_requeue;
2965
2966         /* If we get here, then we know that there are no more
2967          * incompatible holders (and anyone asking for an incompatible
2968          * lock is blocked). We can now downconvert the lock */
2969         if (!lockres->l_ops->downconvert_worker)
2970                 goto downconvert;
2971
2972         /* Some lockres types want to do a bit of work before
2973          * downconverting a lock. Allow that here. The worker function
2974          * may sleep, so we save off a copy of what we're blocking as
2975          * it may change while we're not holding the spin lock. */
2976         blocking = lockres->l_blocking;
2977         spin_unlock_irqrestore(&lockres->l_lock, flags);
2978
2979         ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
2980
2981         if (ctl->unblock_action == UNBLOCK_STOP_POST)
2982                 goto leave;
2983
2984         spin_lock_irqsave(&lockres->l_lock, flags);
2985         if (blocking != lockres->l_blocking) {
2986                 /* If this changed underneath us, then we can't drop
2987                  * it just yet. */
2988                 goto recheck;
2989         }
2990
2991 downconvert:
2992         ctl->requeue = 0;
2993
2994         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
2995                 if (lockres->l_level == LKM_EXMODE)
2996                         set_lvb = 1;
2997
2998                 /*
2999                  * We only set the lvb if the lock has been fully
3000                  * refreshed - otherwise we risk setting stale
3001                  * data. Otherwise, there's no need to actually clear
3002                  * out the lvb here as it's value is still valid.
3003                  */
3004                 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3005                         lockres->l_ops->set_lvb(lockres);
3006         }
3007
3008         ocfs2_prepare_downconvert(lockres, new_level);
3009         spin_unlock_irqrestore(&lockres->l_lock, flags);
3010         ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
3011 leave:
3012         mlog_exit(ret);
3013         return ret;
3014
3015 leave_requeue:
3016         spin_unlock_irqrestore(&lockres->l_lock, flags);
3017         ctl->requeue = 1;
3018
3019         mlog_exit(0);
3020         return 0;
3021 }
3022
3023 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
3024                                      int blocking)
3025 {
3026         struct inode *inode;
3027         struct address_space *mapping;
3028
3029         inode = ocfs2_lock_res_inode(lockres);
3030         mapping = inode->i_mapping;
3031
3032         if (!S_ISREG(inode->i_mode))
3033                 goto out;
3034
3035         /*
3036          * We need this before the filemap_fdatawrite() so that it can
3037          * transfer the dirty bit from the PTE to the
3038          * page. Unfortunately this means that even for EX->PR
3039          * downconverts, we'll lose our mappings and have to build
3040          * them up again.
3041          */
3042         unmap_mapping_range(mapping, 0, 0, 0);
3043
3044         if (filemap_fdatawrite(mapping)) {
3045                 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
3046                      (unsigned long long)OCFS2_I(inode)->ip_blkno);
3047         }
3048         sync_mapping_buffers(mapping);
3049         if (blocking == LKM_EXMODE) {
3050                 truncate_inode_pages(mapping, 0);
3051         } else {
3052                 /* We only need to wait on the I/O if we're not also
3053                  * truncating pages because truncate_inode_pages waits
3054                  * for us above. We don't truncate pages if we're
3055                  * blocking anything < EXMODE because we want to keep
3056                  * them around in that case. */
3057                 filemap_fdatawait(mapping);
3058         }
3059
3060 out:
3061         return UNBLOCK_CONTINUE;
3062 }
3063
3064 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
3065                                         int new_level)
3066 {
3067         struct inode *inode = ocfs2_lock_res_inode(lockres);
3068         int checkpointed = ocfs2_inode_fully_checkpointed(inode);
3069
3070         BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
3071         BUG_ON(lockres->l_level != LKM_EXMODE && !checkpointed);
3072
3073         if (checkpointed)
3074                 return 1;
3075
3076         ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb));
3077         return 0;
3078 }
3079
3080 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
3081 {
3082         struct inode *inode = ocfs2_lock_res_inode(lockres);
3083
3084         __ocfs2_stuff_meta_lvb(inode);
3085 }
3086
3087 /*
3088  * Does the final reference drop on our dentry lock. Right now this
3089  * happens in the downconvert thread, but we could choose to simplify the
3090  * dlmglue API and push these off to the ocfs2_wq in the future.
3091  */
3092 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
3093                                      struct ocfs2_lock_res *lockres)
3094 {
3095         struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3096         ocfs2_dentry_lock_put(osb, dl);
3097 }
3098
3099 /*
3100  * d_delete() matching dentries before the lock downconvert.
3101  *
3102  * At this point, any process waiting to destroy the
3103  * dentry_lock due to last ref count is stopped by the
3104  * OCFS2_LOCK_QUEUED flag.
3105  *
3106  * We have two potential problems
3107  *
3108  * 1) If we do the last reference drop on our dentry_lock (via dput)
3109  *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
3110  *    the downconvert to finish. Instead we take an elevated
3111  *    reference and push the drop until after we've completed our
3112  *    unblock processing.
3113  *
3114  * 2) There might be another process with a final reference,
3115  *    waiting on us to finish processing. If this is the case, we
3116  *    detect it and exit out - there's no more dentries anyway.
3117  */
3118 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
3119                                        int blocking)
3120 {
3121         struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3122         struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
3123         struct dentry *dentry;
3124         unsigned long flags;
3125         int extra_ref = 0;
3126
3127         /*
3128          * This node is blocking another node from getting a read
3129          * lock. This happens when we've renamed within a
3130          * directory. We've forced the other nodes to d_delete(), but
3131          * we never actually dropped our lock because it's still
3132          * valid. The downconvert code will retain a PR for this node,
3133          * so there's no further work to do.
3134          */
3135         if (blocking == LKM_PRMODE)
3136                 return UNBLOCK_CONTINUE;
3137
3138         /*
3139          * Mark this inode as potentially orphaned. The code in
3140          * ocfs2_delete_inode() will figure out whether it actually
3141          * needs to be freed or not.
3142          */
3143         spin_lock(&oi->ip_lock);
3144         oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
3145         spin_unlock(&oi->ip_lock);
3146
3147         /*
3148          * Yuck. We need to make sure however that the check of
3149          * OCFS2_LOCK_FREEING and the extra reference are atomic with
3150          * respect to a reference decrement or the setting of that
3151          * flag.
3152          */
3153         spin_lock_irqsave(&lockres->l_lock, flags);
3154         spin_lock(&dentry_attach_lock);
3155         if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
3156             && dl->dl_count) {
3157                 dl->dl_count++;
3158                 extra_ref = 1;
3159         }
3160         spin_unlock(&dentry_attach_lock);
3161         spin_unlock_irqrestore(&lockres->l_lock, flags);
3162
3163         mlog(0, "extra_ref = %d\n", extra_ref);
3164
3165         /*
3166          * We have a process waiting on us in ocfs2_dentry_iput(),
3167          * which means we can't have any more outstanding
3168          * aliases. There's no need to do any more work.
3169          */
3170         if (!extra_ref)
3171                 return UNBLOCK_CONTINUE;
3172
3173         spin_lock(&dentry_attach_lock);
3174         while (1) {
3175                 dentry = ocfs2_find_local_alias(dl->dl_inode,
3176                                                 dl->dl_parent_blkno, 1);
3177                 if (!dentry)
3178                         break;
3179                 spin_unlock(&dentry_attach_lock);
3180
3181                 mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
3182                      dentry->d_name.name);
3183
3184                 /*
3185                  * The following dcache calls may do an
3186                  * iput(). Normally we don't want that from the
3187                  * downconverting thread, but in this case it's ok
3188                  * because the requesting node already has an
3189                  * exclusive lock on the inode, so it can't be queued
3190                  * for a downconvert.
3191                  */
3192                 d_delete(dentry);
3193                 dput(dentry);
3194
3195                 spin_lock(&dentry_attach_lock);
3196         }
3197         spin_unlock(&dentry_attach_lock);
3198
3199         /*
3200          * If we are the last holder of this dentry lock, there is no
3201          * reason to downconvert so skip straight to the unlock.
3202          */
3203         if (dl->dl_count == 1)
3204                 return UNBLOCK_STOP_POST;
3205
3206         return UNBLOCK_CONTINUE_POST;
3207 }
3208
3209 static struct ocfs2_locking_protocol lproto = {
3210         .lp_lock_ast            = ocfs2_locking_ast,
3211         .lp_blocking_ast        = ocfs2_blocking_ast,
3212         .lp_unlock_ast          = ocfs2_unlock_ast,
3213 };
3214
3215 /* This interface isn't the final one, hence the less-than-perfect names */
3216 void dlmglue_init_stack(void)
3217 {
3218         o2cb_get_stack(&lproto);
3219 }
3220
3221 void dlmglue_exit_stack(void)
3222 {
3223         o2cb_put_stack();
3224 }
3225
3226 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3227                                        struct ocfs2_lock_res *lockres)
3228 {
3229         int status;
3230         struct ocfs2_unblock_ctl ctl = {0, 0,};
3231         unsigned long flags;
3232
3233         /* Our reference to the lockres in this function can be
3234          * considered valid until we remove the OCFS2_LOCK_QUEUED
3235          * flag. */
3236
3237         mlog_entry_void();
3238
3239         BUG_ON(!lockres);
3240         BUG_ON(!lockres->l_ops);
3241
3242         mlog(0, "lockres %s blocked.\n", lockres->l_name);
3243
3244         /* Detect whether a lock has been marked as going away while
3245          * the downconvert thread was processing other things. A lock can
3246          * still be marked with OCFS2_LOCK_FREEING after this check,
3247          * but short circuiting here will still save us some
3248          * performance. */
3249         spin_lock_irqsave(&lockres->l_lock, flags);
3250         if (lockres->l_flags & OCFS2_LOCK_FREEING)
3251                 goto unqueue;
3252         spin_unlock_irqrestore(&lockres->l_lock, flags);
3253
3254         status = ocfs2_unblock_lock(osb, lockres, &ctl);
3255         if (status < 0)
3256                 mlog_errno(status);
3257
3258         spin_lock_irqsave(&lockres->l_lock, flags);
3259 unqueue:
3260         if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
3261                 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
3262         } else
3263                 ocfs2_schedule_blocked_lock(osb, lockres);
3264
3265         mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
3266              ctl.requeue ? "yes" : "no");
3267         spin_unlock_irqrestore(&lockres->l_lock, flags);
3268
3269         if (ctl.unblock_action != UNBLOCK_CONTINUE
3270             && lockres->l_ops->post_unlock)
3271                 lockres->l_ops->post_unlock(osb, lockres);
3272
3273         mlog_exit_void();
3274 }
3275
3276 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
3277                                         struct ocfs2_lock_res *lockres)
3278 {
3279         mlog_entry_void();
3280
3281         assert_spin_locked(&lockres->l_lock);
3282
3283         if (lockres->l_flags & OCFS2_LOCK_FREEING) {
3284                 /* Do not schedule a lock for downconvert when it's on
3285                  * the way to destruction - any nodes wanting access
3286                  * to the resource will get it soon. */
3287                 mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
3288                      lockres->l_name, lockres->l_flags);
3289                 return;
3290         }
3291
3292         lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
3293
3294         spin_lock(&osb->dc_task_lock);
3295         if (list_empty(&lockres->l_blocked_list)) {
3296                 list_add_tail(&lockres->l_blocked_list,
3297                               &osb->blocked_lock_list);
3298                 osb->blocked_lock_count++;
3299         }
3300         spin_unlock(&osb->dc_task_lock);
3301
3302         mlog_exit_void();
3303 }
3304
3305 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
3306 {
3307         unsigned long processed;
3308         struct ocfs2_lock_res *lockres;
3309
3310         mlog_entry_void();
3311
3312         spin_lock(&osb->dc_task_lock);
3313         /* grab this early so we know to try again if a state change and
3314          * wake happens part-way through our work  */
3315         osb->dc_work_sequence = osb->dc_wake_sequence;
3316
3317         processed = osb->blocked_lock_count;
3318         while (processed) {
3319                 BUG_ON(list_empty(&osb->blocked_lock_list));
3320
3321                 lockres = list_entry(osb->blocked_lock_list.next,
3322                                      struct ocfs2_lock_res, l_blocked_list);
3323                 list_del_init(&lockres->l_blocked_list);
3324                 osb->blocked_lock_count--;
3325                 spin_unlock(&osb->dc_task_lock);
3326
3327                 BUG_ON(!processed);
3328                 processed--;
3329
3330                 ocfs2_process_blocked_lock(osb, lockres);
3331
3332                 spin_lock(&osb->dc_task_lock);
3333         }
3334         spin_unlock(&osb->dc_task_lock);
3335
3336         mlog_exit_void();
3337 }
3338
3339 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
3340 {
3341         int empty = 0;
3342
3343         spin_lock(&osb->dc_task_lock);
3344         if (list_empty(&osb->blocked_lock_list))
3345                 empty = 1;
3346
3347         spin_unlock(&osb->dc_task_lock);
3348         return empty;
3349 }
3350
3351 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
3352 {
3353         int should_wake = 0;
3354
3355         spin_lock(&osb->dc_task_lock);
3356         if (osb->dc_work_sequence != osb->dc_wake_sequence)
3357                 should_wake = 1;
3358         spin_unlock(&osb->dc_task_lock);
3359
3360         return should_wake;
3361 }
3362
3363 static int ocfs2_downconvert_thread(void *arg)
3364 {
3365         int status = 0;
3366         struct ocfs2_super *osb = arg;
3367
3368         /* only quit once we've been asked to stop and there is no more
3369          * work available */
3370         while (!(kthread_should_stop() &&
3371                 ocfs2_downconvert_thread_lists_empty(osb))) {
3372
3373                 wait_event_interruptible(osb->dc_event,
3374                                          ocfs2_downconvert_thread_should_wake(osb) ||
3375                                          kthread_should_stop());
3376
3377                 mlog(0, "downconvert_thread: awoken\n");
3378
3379                 ocfs2_downconvert_thread_do_work(osb);
3380         }
3381
3382         osb->dc_task = NULL;
3383         return status;
3384 }
3385
3386 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
3387 {
3388         spin_lock(&osb->dc_task_lock);
3389         /* make sure the voting thread gets a swipe at whatever changes
3390          * the caller may have made to the voting state */
3391         osb->dc_wake_sequence++;
3392         spin_unlock(&osb->dc_task_lock);
3393         wake_up(&osb->dc_event);
3394 }