[DLM] show nodeid for recovery message
[safe/jmp/linux-2.6] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25
/*
 * Debugfs hooks: real implementations are provided elsewhere when
 * CONFIG_DLM_DEBUG is set; otherwise these no-op stubs let callers
 * avoid #ifdefs of their own.
 */
#ifdef CONFIG_DLM_DEBUG
int dlm_create_debug_file(struct dlm_ls *ls);
void dlm_delete_debug_file(struct dlm_ls *ls);
#else
static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; }
static inline void dlm_delete_debug_file(struct dlm_ls *ls) { }
#endif
33
/* Number of existing lockspaces; modified under ls_lock. */
static int                      ls_count;
/* Serializes lockspace create/release and shared thread start/stop. */
static struct mutex             ls_lock;
/* All lockspaces, linked via dlm_ls.ls_list; protected by lslist_lock. */
static struct list_head         lslist;
static spinlock_t               lslist_lock;
/* The dlm_scand kthread, started with the first lockspace. */
static struct task_struct *     scand_task;
40
41 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
42 {
43         ssize_t ret = len;
44         int n = simple_strtol(buf, NULL, 0);
45
46         switch (n) {
47         case 0:
48                 dlm_ls_stop(ls);
49                 break;
50         case 1:
51                 dlm_ls_start(ls);
52                 break;
53         default:
54                 ret = -EINVAL;
55         }
56         return ret;
57 }
58
/*
 * sysfs "event_done" attribute: userspace writes the outcome of a
 * previously emitted uevent here.  The result is stored before
 * LSFL_UEVENT_WAIT is set, so the waiter in do_uevent() (sleeping on
 * ls_uevent_wait) reads a consistent value once woken.
 */
static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
	wake_up(&ls->ls_uevent_wait);
	return len;
}
66
67 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
68 {
69         return sprintf(buf, "%u\n", ls->ls_global_id);
70 }
71
72 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
73 {
74         ls->ls_global_id = simple_strtoul(buf, NULL, 0);
75         return len;
76 }
77
78 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
79 {
80         uint32_t status = dlm_recover_status(ls);
81         return sprintf(buf, "%x\n", status);
82 }
83
84 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
85 {
86         return sprintf(buf, "%d\n", ls->ls_recover_nodeid);
87 }
88
/*
 * A sysfs attribute on a lockspace kobject: the generic attribute plus
 * lockspace-aware show/store callbacks.  Either callback may be NULL
 * for a write-only or read-only attribute.
 */
struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};
94
/* "control": write-only; 0 stops the lockspace, 1 starts it. */
static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

/* "event_done": write-only; userspace acks a uevent with its result. */
static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

/* "id": read/write global lockspace id. */
static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

/* "recover_status": read-only recovery status bits. */
static struct dlm_attr dlm_attr_recover_status = {
	.attr  = {.name = "recover_status", .mode = S_IRUGO},
	.show  = dlm_recover_status_show
};

/* "recover_nodeid": read-only nodeid being recovered. */
static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show  = dlm_recover_nodeid_show
};

/* Default attributes attached to every lockspace kobject. */
static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
129
130 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
131                              char *buf)
132 {
133         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
134         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
135         return a->show ? a->show(ls, buf) : 0;
136 }
137
138 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
139                               const char *buf, size_t len)
140 {
141         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
142         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
143         return a->store ? a->store(ls, buf, len) : len;
144 }
145
/* Wire the generic sysfs show/store entry points to our dispatchers. */
static struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

/* kobject type for a lockspace: default attributes plus sysfs ops. */
static struct kobj_type dlm_ktype = {
	.default_attrs = dlm_attrs,
	.sysfs_ops     = &dlm_attr_ops,
};

/* The "dlm" kset under the kernel subsystem; lockspace kobjects join it. */
static struct kset dlm_kset = {
	.subsys = &kernel_subsys,
	.kobj   = {.name = "dlm",},
	.ktype  = &dlm_ktype,
};
161
162 static int kobject_setup(struct dlm_ls *ls)
163 {
164         char lsname[DLM_LOCKSPACE_LEN];
165         int error;
166
167         memset(lsname, 0, DLM_LOCKSPACE_LEN);
168         snprintf(lsname, DLM_LOCKSPACE_LEN, "%s", ls->ls_name);
169
170         error = kobject_set_name(&ls->ls_kobj, "%s", lsname);
171         if (error)
172                 return error;
173
174         ls->ls_kobj.kset = &dlm_kset;
175         ls->ls_kobj.ktype = &dlm_ktype;
176         return 0;
177 }
178
/*
 * Emit an ONLINE (in != 0) or OFFLINE uevent for this lockspace and
 * block until userspace acknowledges via the "event_done" sysfs file
 * (dlm_event_store stores the result in ls_uevent_result and sets
 * LSFL_UEVENT_WAIT).  Returns the userspace-supplied result, or the
 * wait_event_interruptible error (-ERESTARTSYS on a signal).
 */
static int do_uevent(struct dlm_ls *ls, int in)
{
	int error;

	if (in)
		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
	else
		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

	error = wait_event_interruptible(ls->ls_uevent_wait,
			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
	if (error)
		goto out;

	error = ls->ls_uevent_result;
 out:
	return error;
}
197
198
199 int dlm_lockspace_init(void)
200 {
201         int error;
202
203         ls_count = 0;
204         mutex_init(&ls_lock);
205         INIT_LIST_HEAD(&lslist);
206         spin_lock_init(&lslist_lock);
207
208         error = kset_register(&dlm_kset);
209         if (error)
210                 printk("dlm_lockspace_init: cannot register kset %d\n", error);
211         return error;
212 }
213
/* Module teardown: unregister the "dlm" kset registered at init. */
void dlm_lockspace_exit(void)
{
	kset_unregister(&dlm_kset);
}
218
/*
 * Kthread that periodically walks every lockspace and scans its rsb
 * tables (dlm_scan_rsbs), sleeping dlm_config.scan_secs seconds
 * between passes, until kthread_stop() is called.
 *
 * NOTE(review): lslist is walked here without taking lslist_lock, so
 * this walk can race with list_del() in remove_lockspace() while other
 * lockspaces still exist — confirm whether something else makes this
 * safe, or whether the walk needs locking.
 */
static int dlm_scand(void *data)
{
	struct dlm_ls *ls;

	while (!kthread_should_stop()) {
		list_for_each_entry(ls, &lslist, ls_list)
			dlm_scan_rsbs(ls);
		schedule_timeout_interruptible(dlm_config.scan_secs * HZ);
	}
	return 0;
}
230
231 static int dlm_scand_start(void)
232 {
233         struct task_struct *p;
234         int error = 0;
235
236         p = kthread_run(dlm_scand, NULL, "dlm_scand");
237         if (IS_ERR(p))
238                 error = PTR_ERR(p);
239         else
240                 scand_task = p;
241         return error;
242 }
243
/* Stop the dlm_scand kthread; kthread_stop() waits for it to exit. */
static void dlm_scand_stop(void)
{
	kthread_stop(scand_task);
}
248
249 static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
250 {
251         struct dlm_ls *ls;
252
253         spin_lock(&lslist_lock);
254
255         list_for_each_entry(ls, &lslist, ls_list) {
256                 if (ls->ls_namelen == namelen &&
257                     memcmp(ls->ls_name, name, namelen) == 0)
258                         goto out;
259         }
260         ls = NULL;
261  out:
262         spin_unlock(&lslist_lock);
263         return ls;
264 }
265
266 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
267 {
268         struct dlm_ls *ls;
269
270         spin_lock(&lslist_lock);
271
272         list_for_each_entry(ls, &lslist, ls_list) {
273                 if (ls->ls_global_id == id) {
274                         ls->ls_count++;
275                         goto out;
276                 }
277         }
278         ls = NULL;
279  out:
280         spin_unlock(&lslist_lock);
281         return ls;
282 }
283
284 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
285 {
286         struct dlm_ls *ls;
287
288         spin_lock(&lslist_lock);
289         list_for_each_entry(ls, &lslist, ls_list) {
290                 if (ls->ls_local_handle == lockspace) {
291                         ls->ls_count++;
292                         goto out;
293                 }
294         }
295         ls = NULL;
296  out:
297         spin_unlock(&lslist_lock);
298         return ls;
299 }
300
301 struct dlm_ls *dlm_find_lockspace_device(int minor)
302 {
303         struct dlm_ls *ls;
304
305         spin_lock(&lslist_lock);
306         list_for_each_entry(ls, &lslist, ls_list) {
307                 if (ls->ls_device.minor == minor) {
308                         ls->ls_count++;
309                         goto out;
310                 }
311         }
312         ls = NULL;
313  out:
314         spin_unlock(&lslist_lock);
315         return ls;
316 }
317
/*
 * Drop a reference taken by one of the dlm_find_lockspace_* routines.
 * remove_lockspace() waits for ls_count to reach zero before freeing.
 */
void dlm_put_lockspace(struct dlm_ls *ls)
{
	spin_lock(&lslist_lock);
	ls->ls_count--;
	spin_unlock(&lslist_lock);
}
324
325 static void remove_lockspace(struct dlm_ls *ls)
326 {
327         for (;;) {
328                 spin_lock(&lslist_lock);
329                 if (ls->ls_count == 0) {
330                         list_del(&ls->ls_list);
331                         spin_unlock(&lslist_lock);
332                         return;
333                 }
334                 spin_unlock(&lslist_lock);
335                 ssleep(1);
336         }
337 }
338
/*
 * Start the threads shared by all lockspaces: dlm_astd (ast delivery),
 * dlm_scand (rsb scanning) and lowcomms (network send/receive).  On
 * failure the already-started threads are unwound in reverse order via
 * the goto chain.  Called with ls_lock held when the first lockspace
 * is created.
 */
static int threads_start(void)
{
	int error;

	/* Thread which process lock requests for all lockspace's */
	error = dlm_astd_start();
	if (error) {
		log_print("cannot start dlm_astd thread %d", error);
		goto fail;
	}

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		goto astd_fail;
	}

	/* Thread for sending/receiving messages for all lockspace's */
	error = dlm_lowcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		goto scand_fail;
	}

	return 0;

 scand_fail:
	dlm_scand_stop();
 astd_fail:
	dlm_astd_stop();
 fail:
	return error;
}
372
/*
 * Stop the shared threads once the last lockspace is released.
 * NOTE(review): the order (scand, lowcomms, astd) is not the exact
 * reverse of threads_start(); presumably lowcomms must stop before
 * astd so no new asts arrive — confirm before reordering.
 */
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
	dlm_astd_stop();
}
379
/*
 * Create and initialize a lockspace: allocate the dlm_ls plus its rsb,
 * lkb and dir hash tables, start dlm_recoverd, register a sysfs
 * kobject, and wait for userspace to complete the initial "online"
 * uevent.  Called with ls_lock held (see dlm_new_lockspace).
 *
 * Returns 0 with *lockspace set, -EEXIST if a lockspace of this name
 * already exists (also setting *lockspace), or a negative error after
 * unwinding everything allocated so far.
 */
static int new_lockspace(char *name, int namelen, void **lockspace,
			 uint32_t flags, int lvblen)
{
	struct dlm_ls *ls;
	int i, size, error = -ENOMEM;

	if (namelen > DLM_LOCKSPACE_LEN)
		return -EINVAL;

	/* the lvb length must be a non-zero multiple of 8 */
	if (!lvblen || (lvblen % 8))
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	ls = dlm_find_lockspace_name(name, namelen);
	if (ls) {
		*lockspace = ls;
		module_put(THIS_MODULE);
		return -EEXIST;
	}

	/* the name is copied into storage allocated past the struct */
	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_exflags = flags;
	ls->ls_lvblen = lvblen;
	ls->ls_count = 0;
	ls->ls_flags = 0;

	/* resource (rsb) hash table, with active and toss lists */
	size = dlm_config.rsbtbl_size;
	ls->ls_rsbtbl_size = size;

	ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
	if (!ls->ls_rsbtbl)
		goto out_lsfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
		rwlock_init(&ls->ls_rsbtbl[i].lock);
	}

	/* lock (lkb) id table */
	size = dlm_config.lkbtbl_size;
	ls->ls_lkbtbl_size = size;

	ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
	if (!ls->ls_lkbtbl)
		goto out_rsbfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
		rwlock_init(&ls->ls_lkbtbl[i].lock);
		ls->ls_lkbtbl[i].counter = 1;
	}

	/* resource directory table */
	size = dlm_config.dirtbl_size;
	ls->ls_dirtbl_size = size;

	ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
	if (!ls->ls_dirtbl)
		goto out_lkbfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
		rwlock_init(&ls->ls_dirtbl[i].lock);
	}

	INIT_LIST_HEAD(&ls->ls_waiters);
	mutex_init(&ls->ls_waiters_mutex);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_stub_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = 0;
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	mutex_init(&ls->ls_requestqueue_mutex);
	mutex_init(&ls->ls_clear_proc_locks);

	ls->ls_recover_buf = kmalloc(dlm_config.buffer_size, GFP_KERNEL);
	if (!ls->ls_recover_buf)
		goto out_dirfree;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	ls->ls_recover_list_count = 0;
	/* the handle returned to callers is the dlm_ls pointer itself;
	   dlm_find_lockspace_local matches on it */
	ls->ls_local_handle = ls;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_root_list);
	init_rwsem(&ls->ls_root_sem);

	/* hold ls_in_recovery so lock activity is blocked; presumably
	   released when the first recovery completes — see recoverd */
	down_write(&ls->ls_in_recovery);

	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_rcomfree;
	}

	spin_lock(&lslist_lock);
	list_add(&ls->ls_list, &lslist);
	spin_unlock(&lslist_lock);

	dlm_create_debug_file(ls);

	error = kobject_setup(ls);
	if (error)
		goto out_del;

	error = kobject_register(&ls->ls_kobj);
	if (error)
		goto out_del;

	/* tell userspace we are online and wait for its acknowledgment */
	error = do_uevent(ls, 1);
	if (error)
		goto out_unreg;

	*lockspace = ls;
	return 0;

 out_unreg:
	kobject_unregister(&ls->ls_kobj);
 out_del:
	dlm_delete_debug_file(ls);
	spin_lock(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
	dlm_recoverd_stop(ls);
 out_rcomfree:
	kfree(ls->ls_recover_buf);
 out_dirfree:
	kfree(ls->ls_dirtbl);
 out_lkbfree:
	kfree(ls->ls_lkbtbl);
 out_rsbfree:
	kfree(ls->ls_rsbtbl);
 out_lsfree:
	kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}
540
541 int dlm_new_lockspace(char *name, int namelen, void **lockspace,
542                       uint32_t flags, int lvblen)
543 {
544         int error = 0;
545
546         mutex_lock(&ls_lock);
547         if (!ls_count)
548                 error = threads_start();
549         if (error)
550                 goto out;
551
552         error = new_lockspace(name, namelen, lockspace, flags, lvblen);
553         if (!error)
554                 ls_count++;
555  out:
556         mutex_unlock(&ls_lock);
557         return error;
558 }
559
560 /* Return 1 if the lockspace still has active remote locks,
561  *        2 if the lockspace still has active local locks.
562  */
563 static int lockspace_busy(struct dlm_ls *ls)
564 {
565         int i, lkb_found = 0;
566         struct dlm_lkb *lkb;
567
568         /* NOTE: We check the lockidtbl here rather than the resource table.
569            This is because there may be LKBs queued as ASTs that have been
570            unlinked from their RSBs and are pending deletion once the AST has
571            been delivered */
572
573         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
574                 read_lock(&ls->ls_lkbtbl[i].lock);
575                 if (!list_empty(&ls->ls_lkbtbl[i].list)) {
576                         lkb_found = 1;
577                         list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
578                                             lkb_idtbl_list) {
579                                 if (!lkb->lkb_nodeid) {
580                                         read_unlock(&ls->ls_lkbtbl[i].lock);
581                                         return 2;
582                                 }
583                         }
584                 }
585                 read_unlock(&ls->ls_lkbtbl[i].lock);
586         }
587         return lkb_found;
588 }
589
/*
 * Tear down a lockspace and free everything it owns.  "force" controls
 * how busy a lockspace may be and still be destroyed (see the comment
 * above dlm_release_lockspace).  The last lockspace to go also stops
 * the shared dlm threads.
 */
static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *rsb;
	struct list_head *head;
	int i;
	int busy = lockspace_busy(ls);

	/* busy is 0 (idle), 1 (remote locks) or 2 (local locks);
	   destruction proceeds only when force is at least that level */
	if (busy > force)
		return -EBUSY;

	/* below a forced shutdown, tell userspace we go offline and wait */
	if (force < 3)
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	/* waits for all references to drop, then unlinks from lslist */
	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	/* park the ast thread so lkb's can be freed safely below */
	dlm_astd_suspend();

	kfree(ls->ls_recover_buf);

	/*
	 * Free direntry structs.
	 */

	dlm_dir_clear(ls);
	kfree(ls->ls_dirtbl);

	/*
	 * Free all lkb's on lkbtbl[] lists.
	 */

	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
		head = &ls->ls_lkbtbl[i].list;
		while (!list_empty(head)) {
			lkb = list_entry(head->next, struct dlm_lkb,
					 lkb_idtbl_list);

			list_del(&lkb->lkb_idtbl_list);

			dlm_del_ast(lkb);

			/* MSTCPY lkb's presumably own their lvb storage,
			   so it is freed with them — others share it */
			if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
				free_lvb(lkb->lkb_lvbptr);

			free_lkb(lkb);
		}
	}
	dlm_astd_resume();

	kfree(ls->ls_lkbtbl);

	/*
	 * Free all rsb's on rsbtbl[] lists
	 */

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		head = &ls->ls_rsbtbl[i].list;
		while (!list_empty(head)) {
			rsb = list_entry(head->next, struct dlm_rsb,
					 res_hashchain);

			list_del(&rsb->res_hashchain);
			free_rsb(rsb);
		}

		/* the toss list holds unused rsb's awaiting removal */
		head = &ls->ls_rsbtbl[i].toss;
		while (!list_empty(head)) {
			rsb = list_entry(head->next, struct dlm_rsb,
					 res_hashchain);
			list_del(&rsb->res_hashchain);
			free_rsb(rsb);
		}
	}

	kfree(ls->ls_rsbtbl);

	/*
	 * Free structures on any other lists
	 */

	kfree(ls->ls_recover_args);
	dlm_clear_free_entries(ls);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	kobject_unregister(&ls->ls_kobj);
	kfree(ls);

	/* last lockspace gone: stop the shared threads */
	mutex_lock(&ls_lock);
	ls_count--;
	if (!ls_count)
		threads_stop();
	mutex_unlock(&ls_lock);

	module_put(THIS_MODULE);
	return 0;
}
691
692 /*
693  * Called when a system has released all its locks and is not going to use the
694  * lockspace any longer.  We free everything we're managing for this lockspace.
695  * Remaining nodes will go through the recovery process as if we'd died.  The
696  * lockspace must continue to function as usual, participating in recoveries,
697  * until this returns.
698  *
699  * Force has 4 possible values:
700  * 0 - don't destroy lockspace if it has any LKBs
701  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
702  * 2 - destroy lockspace regardless of LKBs
703  * 3 - destroy lockspace as part of a forced shutdown
704  */
705
706 int dlm_release_lockspace(void *lockspace, int force)
707 {
708         struct dlm_ls *ls;
709
710         ls = dlm_find_lockspace_local(lockspace);
711         if (!ls)
712                 return -EINVAL;
713         dlm_put_lockspace(ls);
714         return release_lockspace(ls, force);
715 }
716