[DLM] Fix kref_put oops
[safe/jmp/linux-2.6] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25
26 #ifdef CONFIG_DLM_DEBUG
27 int dlm_create_debug_file(struct dlm_ls *ls);
28 void dlm_delete_debug_file(struct dlm_ls *ls);
29 #else
30 static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; }
31 static inline void dlm_delete_debug_file(struct dlm_ls *ls) { }
32 #endif
33
34 static int                      ls_count;
35 static struct mutex             ls_lock;
36 static struct list_head         lslist;
37 static spinlock_t               lslist_lock;
38 static struct task_struct *     scand_task;
39
40
41 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
42 {
43         ssize_t ret = len;
44         int n = simple_strtol(buf, NULL, 0);
45
46         switch (n) {
47         case 0:
48                 dlm_ls_stop(ls);
49                 break;
50         case 1:
51                 dlm_ls_start(ls);
52                 break;
53         default:
54                 ret = -EINVAL;
55         }
56         return ret;
57 }
58
59 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
60 {
61         ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
62         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
63         wake_up(&ls->ls_uevent_wait);
64         return len;
65 }
66
67 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
68 {
69         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
70 }
71
72 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
73 {
74         ls->ls_global_id = simple_strtoul(buf, NULL, 0);
75         return len;
76 }
77
78 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
79 {
80         uint32_t status = dlm_recover_status(ls);
81         return snprintf(buf, PAGE_SIZE, "%x\n", status);
82 }
83
84 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
85 {
86         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
87 }
88
89 struct dlm_attr {
90         struct attribute attr;
91         ssize_t (*show)(struct dlm_ls *, char *);
92         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
93 };
94
95 static struct dlm_attr dlm_attr_control = {
96         .attr  = {.name = "control", .mode = S_IWUSR},
97         .store = dlm_control_store
98 };
99
100 static struct dlm_attr dlm_attr_event = {
101         .attr  = {.name = "event_done", .mode = S_IWUSR},
102         .store = dlm_event_store
103 };
104
105 static struct dlm_attr dlm_attr_id = {
106         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
107         .show  = dlm_id_show,
108         .store = dlm_id_store
109 };
110
111 static struct dlm_attr dlm_attr_recover_status = {
112         .attr  = {.name = "recover_status", .mode = S_IRUGO},
113         .show  = dlm_recover_status_show
114 };
115
116 static struct dlm_attr dlm_attr_recover_nodeid = {
117         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
118         .show  = dlm_recover_nodeid_show
119 };
120
121 static struct attribute *dlm_attrs[] = {
122         &dlm_attr_control.attr,
123         &dlm_attr_event.attr,
124         &dlm_attr_id.attr,
125         &dlm_attr_recover_status.attr,
126         &dlm_attr_recover_nodeid.attr,
127         NULL,
128 };
129
130 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
131                              char *buf)
132 {
133         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
134         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
135         return a->show ? a->show(ls, buf) : 0;
136 }
137
138 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
139                               const char *buf, size_t len)
140 {
141         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
142         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
143         return a->store ? a->store(ls, buf, len) : len;
144 }
145
146 static void lockspace_kobj_release(struct kobject *k)
147 {
148         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
149         kfree(ls);
150 }
151
152 static struct sysfs_ops dlm_attr_ops = {
153         .show  = dlm_attr_show,
154         .store = dlm_attr_store,
155 };
156
157 static struct kobj_type dlm_ktype = {
158         .default_attrs = dlm_attrs,
159         .sysfs_ops     = &dlm_attr_ops,
160         .release       = lockspace_kobj_release,
161 };
162
163 static struct kset dlm_kset = {
164         .subsys = &kernel_subsys,
165         .kobj   = {.name = "dlm",},
166         .ktype  = &dlm_ktype,
167 };
168
169 static int kobject_setup(struct dlm_ls *ls)
170 {
171         char lsname[DLM_LOCKSPACE_LEN];
172         int error;
173
174         memset(lsname, 0, DLM_LOCKSPACE_LEN);
175         snprintf(lsname, DLM_LOCKSPACE_LEN, "%s", ls->ls_name);
176
177         error = kobject_set_name(&ls->ls_kobj, "%s", lsname);
178         if (error)
179                 return error;
180
181         ls->ls_kobj.kset = &dlm_kset;
182         ls->ls_kobj.ktype = &dlm_ktype;
183         return 0;
184 }
185
186 static int do_uevent(struct dlm_ls *ls, int in)
187 {
188         int error;
189
190         if (in)
191                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
192         else
193                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
194
195         error = wait_event_interruptible(ls->ls_uevent_wait,
196                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
197         if (error)
198                 goto out;
199
200         error = ls->ls_uevent_result;
201  out:
202         return error;
203 }
204
205
206 int dlm_lockspace_init(void)
207 {
208         int error;
209
210         ls_count = 0;
211         mutex_init(&ls_lock);
212         INIT_LIST_HEAD(&lslist);
213         spin_lock_init(&lslist_lock);
214
215         error = kset_register(&dlm_kset);
216         if (error)
217                 printk("dlm_lockspace_init: cannot register kset %d\n", error);
218         return error;
219 }
220
221 void dlm_lockspace_exit(void)
222 {
223         kset_unregister(&dlm_kset);
224 }
225
226 static int dlm_scand(void *data)
227 {
228         struct dlm_ls *ls;
229
230         while (!kthread_should_stop()) {
231                 list_for_each_entry(ls, &lslist, ls_list)
232                         dlm_scan_rsbs(ls);
233                 schedule_timeout_interruptible(dlm_config.scan_secs * HZ);
234         }
235         return 0;
236 }
237
238 static int dlm_scand_start(void)
239 {
240         struct task_struct *p;
241         int error = 0;
242
243         p = kthread_run(dlm_scand, NULL, "dlm_scand");
244         if (IS_ERR(p))
245                 error = PTR_ERR(p);
246         else
247                 scand_task = p;
248         return error;
249 }
250
251 static void dlm_scand_stop(void)
252 {
253         kthread_stop(scand_task);
254 }
255
256 static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
257 {
258         struct dlm_ls *ls;
259
260         spin_lock(&lslist_lock);
261
262         list_for_each_entry(ls, &lslist, ls_list) {
263                 if (ls->ls_namelen == namelen &&
264                     memcmp(ls->ls_name, name, namelen) == 0)
265                         goto out;
266         }
267         ls = NULL;
268  out:
269         spin_unlock(&lslist_lock);
270         return ls;
271 }
272
273 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
274 {
275         struct dlm_ls *ls;
276
277         spin_lock(&lslist_lock);
278
279         list_for_each_entry(ls, &lslist, ls_list) {
280                 if (ls->ls_global_id == id) {
281                         ls->ls_count++;
282                         goto out;
283                 }
284         }
285         ls = NULL;
286  out:
287         spin_unlock(&lslist_lock);
288         return ls;
289 }
290
291 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
292 {
293         struct dlm_ls *ls;
294
295         spin_lock(&lslist_lock);
296         list_for_each_entry(ls, &lslist, ls_list) {
297                 if (ls->ls_local_handle == lockspace) {
298                         ls->ls_count++;
299                         goto out;
300                 }
301         }
302         ls = NULL;
303  out:
304         spin_unlock(&lslist_lock);
305         return ls;
306 }
307
308 struct dlm_ls *dlm_find_lockspace_device(int minor)
309 {
310         struct dlm_ls *ls;
311
312         spin_lock(&lslist_lock);
313         list_for_each_entry(ls, &lslist, ls_list) {
314                 if (ls->ls_device.minor == minor) {
315                         ls->ls_count++;
316                         goto out;
317                 }
318         }
319         ls = NULL;
320  out:
321         spin_unlock(&lslist_lock);
322         return ls;
323 }
324
325 void dlm_put_lockspace(struct dlm_ls *ls)
326 {
327         spin_lock(&lslist_lock);
328         ls->ls_count--;
329         spin_unlock(&lslist_lock);
330 }
331
332 static void remove_lockspace(struct dlm_ls *ls)
333 {
334         for (;;) {
335                 spin_lock(&lslist_lock);
336                 if (ls->ls_count == 0) {
337                         list_del(&ls->ls_list);
338                         spin_unlock(&lslist_lock);
339                         return;
340                 }
341                 spin_unlock(&lslist_lock);
342                 ssleep(1);
343         }
344 }
345
346 static int threads_start(void)
347 {
348         int error;
349
350         /* Thread which process lock requests for all lockspace's */
351         error = dlm_astd_start();
352         if (error) {
353                 log_print("cannot start dlm_astd thread %d", error);
354                 goto fail;
355         }
356
357         error = dlm_scand_start();
358         if (error) {
359                 log_print("cannot start dlm_scand thread %d", error);
360                 goto astd_fail;
361         }
362
363         /* Thread for sending/receiving messages for all lockspace's */
364         error = dlm_lowcomms_start();
365         if (error) {
366                 log_print("cannot start dlm lowcomms %d", error);
367                 goto scand_fail;
368         }
369
370         return 0;
371
372  scand_fail:
373         dlm_scand_stop();
374  astd_fail:
375         dlm_astd_stop();
376  fail:
377         return error;
378 }
379
380 static void threads_stop(void)
381 {
382         dlm_scand_stop();
383         dlm_lowcomms_stop();
384         dlm_astd_stop();
385 }
386
387 static int new_lockspace(char *name, int namelen, void **lockspace,
388                          uint32_t flags, int lvblen)
389 {
390         struct dlm_ls *ls;
391         int i, size, error = -ENOMEM;
392
393         if (namelen > DLM_LOCKSPACE_LEN)
394                 return -EINVAL;
395
396         if (!lvblen || (lvblen % 8))
397                 return -EINVAL;
398
399         if (!try_module_get(THIS_MODULE))
400                 return -EINVAL;
401
402         ls = dlm_find_lockspace_name(name, namelen);
403         if (ls) {
404                 *lockspace = ls;
405                 module_put(THIS_MODULE);
406                 return -EEXIST;
407         }
408
409         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
410         if (!ls)
411                 goto out;
412         memcpy(ls->ls_name, name, namelen);
413         ls->ls_namelen = namelen;
414         ls->ls_exflags = flags;
415         ls->ls_lvblen = lvblen;
416         ls->ls_count = 0;
417         ls->ls_flags = 0;
418
419         size = dlm_config.rsbtbl_size;
420         ls->ls_rsbtbl_size = size;
421
422         ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
423         if (!ls->ls_rsbtbl)
424                 goto out_lsfree;
425         for (i = 0; i < size; i++) {
426                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
427                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
428                 rwlock_init(&ls->ls_rsbtbl[i].lock);
429         }
430
431         size = dlm_config.lkbtbl_size;
432         ls->ls_lkbtbl_size = size;
433
434         ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
435         if (!ls->ls_lkbtbl)
436                 goto out_rsbfree;
437         for (i = 0; i < size; i++) {
438                 INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
439                 rwlock_init(&ls->ls_lkbtbl[i].lock);
440                 ls->ls_lkbtbl[i].counter = 1;
441         }
442
443         size = dlm_config.dirtbl_size;
444         ls->ls_dirtbl_size = size;
445
446         ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
447         if (!ls->ls_dirtbl)
448                 goto out_lkbfree;
449         for (i = 0; i < size; i++) {
450                 INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
451                 rwlock_init(&ls->ls_dirtbl[i].lock);
452         }
453
454         INIT_LIST_HEAD(&ls->ls_waiters);
455         mutex_init(&ls->ls_waiters_mutex);
456
457         INIT_LIST_HEAD(&ls->ls_nodes);
458         INIT_LIST_HEAD(&ls->ls_nodes_gone);
459         ls->ls_num_nodes = 0;
460         ls->ls_low_nodeid = 0;
461         ls->ls_total_weight = 0;
462         ls->ls_node_array = NULL;
463
464         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
465         ls->ls_stub_rsb.res_ls = ls;
466
467         ls->ls_debug_rsb_dentry = NULL;
468         ls->ls_debug_waiters_dentry = NULL;
469
470         init_waitqueue_head(&ls->ls_uevent_wait);
471         ls->ls_uevent_result = 0;
472
473         ls->ls_recoverd_task = NULL;
474         mutex_init(&ls->ls_recoverd_active);
475         spin_lock_init(&ls->ls_recover_lock);
476         ls->ls_recover_status = 0;
477         ls->ls_recover_seq = 0;
478         ls->ls_recover_args = NULL;
479         init_rwsem(&ls->ls_in_recovery);
480         INIT_LIST_HEAD(&ls->ls_requestqueue);
481         mutex_init(&ls->ls_requestqueue_mutex);
482         mutex_init(&ls->ls_clear_proc_locks);
483
484         ls->ls_recover_buf = kmalloc(dlm_config.buffer_size, GFP_KERNEL);
485         if (!ls->ls_recover_buf)
486                 goto out_dirfree;
487
488         INIT_LIST_HEAD(&ls->ls_recover_list);
489         spin_lock_init(&ls->ls_recover_list_lock);
490         ls->ls_recover_list_count = 0;
491         ls->ls_local_handle = ls;
492         init_waitqueue_head(&ls->ls_wait_general);
493         INIT_LIST_HEAD(&ls->ls_root_list);
494         init_rwsem(&ls->ls_root_sem);
495
496         down_write(&ls->ls_in_recovery);
497
498         spin_lock(&lslist_lock);
499         list_add(&ls->ls_list, &lslist);
500         spin_unlock(&lslist_lock);
501
502         /* needs to find ls in lslist */
503         error = dlm_recoverd_start(ls);
504         if (error) {
505                 log_error(ls, "can't start dlm_recoverd %d", error);
506                 goto out_rcomfree;
507         }
508
509         dlm_create_debug_file(ls);
510
511         error = kobject_setup(ls);
512         if (error)
513                 goto out_del;
514
515         error = kobject_register(&ls->ls_kobj);
516         if (error)
517                 goto out_del;
518
519         error = do_uevent(ls, 1);
520         if (error)
521                 goto out_unreg;
522
523         *lockspace = ls;
524         return 0;
525
526  out_unreg:
527         kobject_unregister(&ls->ls_kobj);
528  out_del:
529         dlm_delete_debug_file(ls);
530         dlm_recoverd_stop(ls);
531  out_rcomfree:
532         spin_lock(&lslist_lock);
533         list_del(&ls->ls_list);
534         spin_unlock(&lslist_lock);
535         kfree(ls->ls_recover_buf);
536  out_dirfree:
537         kfree(ls->ls_dirtbl);
538  out_lkbfree:
539         kfree(ls->ls_lkbtbl);
540  out_rsbfree:
541         kfree(ls->ls_rsbtbl);
542  out_lsfree:
543         kfree(ls);
544  out:
545         module_put(THIS_MODULE);
546         return error;
547 }
548
549 int dlm_new_lockspace(char *name, int namelen, void **lockspace,
550                       uint32_t flags, int lvblen)
551 {
552         int error = 0;
553
554         mutex_lock(&ls_lock);
555         if (!ls_count)
556                 error = threads_start();
557         if (error)
558                 goto out;
559
560         error = new_lockspace(name, namelen, lockspace, flags, lvblen);
561         if (!error)
562                 ls_count++;
563  out:
564         mutex_unlock(&ls_lock);
565         return error;
566 }
567
568 /* Return 1 if the lockspace still has active remote locks,
569  *        2 if the lockspace still has active local locks.
570  */
571 static int lockspace_busy(struct dlm_ls *ls)
572 {
573         int i, lkb_found = 0;
574         struct dlm_lkb *lkb;
575
576         /* NOTE: We check the lockidtbl here rather than the resource table.
577            This is because there may be LKBs queued as ASTs that have been
578            unlinked from their RSBs and are pending deletion once the AST has
579            been delivered */
580
581         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
582                 read_lock(&ls->ls_lkbtbl[i].lock);
583                 if (!list_empty(&ls->ls_lkbtbl[i].list)) {
584                         lkb_found = 1;
585                         list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
586                                             lkb_idtbl_list) {
587                                 if (!lkb->lkb_nodeid) {
588                                         read_unlock(&ls->ls_lkbtbl[i].lock);
589                                         return 2;
590                                 }
591                         }
592                 }
593                 read_unlock(&ls->ls_lkbtbl[i].lock);
594         }
595         return lkb_found;
596 }
597
598 static int release_lockspace(struct dlm_ls *ls, int force)
599 {
600         struct dlm_lkb *lkb;
601         struct dlm_rsb *rsb;
602         struct list_head *head;
603         int i;
604         int busy = lockspace_busy(ls);
605
606         if (busy > force)
607                 return -EBUSY;
608
609         if (force < 3)
610                 do_uevent(ls, 0);
611
612         dlm_recoverd_stop(ls);
613
614         remove_lockspace(ls);
615
616         dlm_delete_debug_file(ls);
617
618         dlm_astd_suspend();
619
620         kfree(ls->ls_recover_buf);
621
622         /*
623          * Free direntry structs.
624          */
625
626         dlm_dir_clear(ls);
627         kfree(ls->ls_dirtbl);
628
629         /*
630          * Free all lkb's on lkbtbl[] lists.
631          */
632
633         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
634                 head = &ls->ls_lkbtbl[i].list;
635                 while (!list_empty(head)) {
636                         lkb = list_entry(head->next, struct dlm_lkb,
637                                          lkb_idtbl_list);
638
639                         list_del(&lkb->lkb_idtbl_list);
640
641                         dlm_del_ast(lkb);
642
643                         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
644                                 free_lvb(lkb->lkb_lvbptr);
645
646                         free_lkb(lkb);
647                 }
648         }
649         dlm_astd_resume();
650
651         kfree(ls->ls_lkbtbl);
652
653         /*
654          * Free all rsb's on rsbtbl[] lists
655          */
656
657         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
658                 head = &ls->ls_rsbtbl[i].list;
659                 while (!list_empty(head)) {
660                         rsb = list_entry(head->next, struct dlm_rsb,
661                                          res_hashchain);
662
663                         list_del(&rsb->res_hashchain);
664                         free_rsb(rsb);
665                 }
666
667                 head = &ls->ls_rsbtbl[i].toss;
668                 while (!list_empty(head)) {
669                         rsb = list_entry(head->next, struct dlm_rsb,
670                                          res_hashchain);
671                         list_del(&rsb->res_hashchain);
672                         free_rsb(rsb);
673                 }
674         }
675
676         kfree(ls->ls_rsbtbl);
677
678         /*
679          * Free structures on any other lists
680          */
681
682         kfree(ls->ls_recover_args);
683         dlm_clear_free_entries(ls);
684         dlm_clear_members(ls);
685         dlm_clear_members_gone(ls);
686         kfree(ls->ls_node_array);
687         kobject_unregister(&ls->ls_kobj);
688         /* The ls structure will be freed when the kobject is done with */
689
690         mutex_lock(&ls_lock);
691         ls_count--;
692         if (!ls_count)
693                 threads_stop();
694         mutex_unlock(&ls_lock);
695
696         module_put(THIS_MODULE);
697         return 0;
698 }
699
700 /*
701  * Called when a system has released all its locks and is not going to use the
702  * lockspace any longer.  We free everything we're managing for this lockspace.
703  * Remaining nodes will go through the recovery process as if we'd died.  The
704  * lockspace must continue to function as usual, participating in recoveries,
705  * until this returns.
706  *
707  * Force has 4 possible values:
708  * 0 - don't destroy locksapce if it has any LKBs
709  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
710  * 2 - destroy lockspace regardless of LKBs
711  * 3 - destroy lockspace as part of a forced shutdown
712  */
713
714 int dlm_release_lockspace(void *lockspace, int force)
715 {
716         struct dlm_ls *ls;
717
718         ls = dlm_find_lockspace_local(lockspace);
719         if (!ls)
720                 return -EINVAL;
721         dlm_put_lockspace(ls);
722         return release_lockspace(ls, force);
723 }
724