[DLM] dlm: user locks
[safe/jmp/linux-2.6] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25
/* Debugfs hooks: real implementations live in the debug code when
   CONFIG_DLM_DEBUG is set; otherwise compile to no-ops so callers
   need no #ifdefs. */
#ifdef CONFIG_DLM_DEBUG
int dlm_create_debug_file(struct dlm_ls *ls);
void dlm_delete_debug_file(struct dlm_ls *ls);
#else
static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; }
static inline void dlm_delete_debug_file(struct dlm_ls *ls) { }
#endif
33
/* Module-global lockspace state.  ls_count (under ls_lock) counts live
   lockspaces; the first one starts the shared helper threads and the
   last one stops them.  lslist (under lslist_lock) links every
   lockspace; the dlm_find_lockspace_* helpers take per-ls references
   (ls->ls_count) under the same spinlock. */
static int                      ls_count;
static struct mutex             ls_lock;
static struct list_head         lslist;
static spinlock_t               lslist_lock;
static struct task_struct *     scand_task;     /* dlm_scand kthread */
39
40
41 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
42 {
43         ssize_t ret = len;
44         int n = simple_strtol(buf, NULL, 0);
45
46         switch (n) {
47         case 0:
48                 dlm_ls_stop(ls);
49                 break;
50         case 1:
51                 dlm_ls_start(ls);
52                 break;
53         default:
54                 ret = -EINVAL;
55         }
56         return ret;
57 }
58
/*
 * sysfs "event_done" write handler: userspace reports the result of a
 * uevent here.  The result must be stored before LSFL_UEVENT_WAIT is
 * set and the waiter woken, so do_uevent() reads a valid value.
 */
static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
        wake_up(&ls->ls_uevent_wait);
        return len;
}
66
/* sysfs "id" read handler: print the lockspace's global id in decimal. */
static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
        return sprintf(buf, "%u\n", ls->ls_global_id);
}
71
72 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
73 {
74         ls->ls_global_id = simple_strtoul(buf, NULL, 0);
75         return len;
76 }
77
78 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
79 {
80         uint32_t status = dlm_recover_status(ls);
81         return sprintf(buf, "%x\n", status);
82 }
83
/* One sysfs attribute of a lockspace kobject; show/store may be NULL
   for write-only/read-only attributes respectively. */
struct dlm_attr {
        struct attribute attr;
        ssize_t (*show)(struct dlm_ls *, char *);
        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};
89
/* "control": write-only, start/stop the lockspace. */
static struct dlm_attr dlm_attr_control = {
        .attr  = {.name = "control", .mode = S_IWUSR},
        .store = dlm_control_store
};

/* "event_done": write-only, userspace acks a uevent with its result. */
static struct dlm_attr dlm_attr_event = {
        .attr  = {.name = "event_done", .mode = S_IWUSR},
        .store = dlm_event_store
};

/* "id": read/write global lockspace id. */
static struct dlm_attr dlm_attr_id = {
        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_id_show,
        .store = dlm_id_store
};

/* "recover_status": read-only recovery status bits. */
static struct dlm_attr dlm_attr_recover_status = {
        .attr  = {.name = "recover_status", .mode = S_IRUGO},
        .show  = dlm_recover_status_show
};

/* Default attributes installed on every lockspace kobject. */
static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
        &dlm_attr_recover_status.attr,
        NULL,
};
118
119 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
120                              char *buf)
121 {
122         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
123         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
124         return a->show ? a->show(ls, buf) : 0;
125 }
126
127 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
128                               const char *buf, size_t len)
129 {
130         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
131         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
132         return a->store ? a->store(ls, buf, len) : len;
133 }
134
/* sysfs plumbing: every attribute access funnels through the two
   dispatchers above. */
static struct sysfs_ops dlm_attr_ops = {
        .show  = dlm_attr_show,
        .store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
        .default_attrs = dlm_attrs,
        .sysfs_ops     = &dlm_attr_ops,
};

/* /sys/kernel/dlm: parent kset for all lockspace kobjects. */
static struct kset dlm_kset = {
        .subsys = &kernel_subsys,
        .kobj   = {.name = "dlm",},
        .ktype  = &dlm_ktype,
};
150
/*
 * Prepare ls->ls_kobj (name, kset, ktype) prior to kobject_register().
 * Returns 0 or a negative errno from kobject_set_name().
 *
 * NOTE(review): snprintf() into a DLM_LOCKSPACE_LEN buffer truncates a
 * name of exactly DLM_LOCKSPACE_LEN bytes by one character (new_lockspace
 * accepts namelen == DLM_LOCKSPACE_LEN) — confirm whether that is intended.
 */
static int kobject_setup(struct dlm_ls *ls)
{
        char lsname[DLM_LOCKSPACE_LEN];
        int error;

        memset(lsname, 0, DLM_LOCKSPACE_LEN);
        snprintf(lsname, DLM_LOCKSPACE_LEN, "%s", ls->ls_name);

        error = kobject_set_name(&ls->ls_kobj, "%s", lsname);
        if (error)
                return error;

        ls->ls_kobj.kset = &dlm_kset;
        ls->ls_kobj.ktype = &dlm_ktype;
        return 0;
}
167
/*
 * Emit an ONLINE (in != 0) or OFFLINE uevent for the lockspace and
 * block until userspace acknowledges it by writing to the "event_done"
 * sysfs file (which sets LSFL_UEVENT_WAIT and wakes us).
 *
 * Returns the result userspace wrote, 0 meaning success, or -ERESTARTSYS
 * if the wait was interrupted by a signal.
 */
static int do_uevent(struct dlm_ls *ls, int in)
{
        int error;

        if (in)
                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
        else
                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

        error = wait_event_interruptible(ls->ls_uevent_wait,
                        test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
        if (error)
                goto out;

        error = ls->ls_uevent_result;
 out:
        return error;
}
186
187
/*
 * Module init for the lockspace layer: reset the global bookkeeping and
 * register the "dlm" kset under /sys/kernel.  Returns 0 or the
 * kset_register() error.
 */
int dlm_lockspace_init(void)
{
        int error;

        ls_count = 0;
        mutex_init(&ls_lock);
        INIT_LIST_HEAD(&lslist);
        spin_lock_init(&lslist_lock);

        error = kset_register(&dlm_kset);
        if (error)
                printk("dlm_lockspace_init: cannot register kset %d\n", error);
        return error;
}
202
/* Module exit: tear down the "dlm" kset registered in dlm_lockspace_init(). */
void dlm_lockspace_exit(void)
{
        kset_unregister(&dlm_kset);
}
207
/*
 * Kthread body: periodically scan every lockspace's rsb toss lists
 * (dlm_scan_rsbs) every dlm_config.scan_secs seconds until
 * kthread_stop() is called.
 *
 * NOTE(review): lslist is walked here without taking lslist_lock, so
 * this races with lockspace add/remove (list_add/list_del elsewhere in
 * this file are done under the lock).  dlm_scan_rsbs() presumably may
 * sleep, which would forbid holding the spinlock across the walk — the
 * proper fix needs per-entry referencing; flagging rather than changing.
 */
static int dlm_scand(void *data)
{
        struct dlm_ls *ls;

        while (!kthread_should_stop()) {
                list_for_each_entry(ls, &lslist, ls_list)
                        dlm_scan_rsbs(ls);
                schedule_timeout_interruptible(dlm_config.scan_secs * HZ);
        }
        return 0;
}
219
220 static int dlm_scand_start(void)
221 {
222         struct task_struct *p;
223         int error = 0;
224
225         p = kthread_run(dlm_scand, NULL, "dlm_scand");
226         if (IS_ERR(p))
227                 error = PTR_ERR(p);
228         else
229                 scand_task = p;
230         return error;
231 }
232
/* Stop the scan kthread; kthread_stop() blocks until dlm_scand returns. */
static void dlm_scand_stop(void)
{
        kthread_stop(scand_task);
}
237
238 static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
239 {
240         struct dlm_ls *ls;
241
242         spin_lock(&lslist_lock);
243
244         list_for_each_entry(ls, &lslist, ls_list) {
245                 if (ls->ls_namelen == namelen &&
246                     memcmp(ls->ls_name, name, namelen) == 0)
247                         goto out;
248         }
249         ls = NULL;
250  out:
251         spin_unlock(&lslist_lock);
252         return ls;
253 }
254
255 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
256 {
257         struct dlm_ls *ls;
258
259         spin_lock(&lslist_lock);
260
261         list_for_each_entry(ls, &lslist, ls_list) {
262                 if (ls->ls_global_id == id) {
263                         ls->ls_count++;
264                         goto out;
265                 }
266         }
267         ls = NULL;
268  out:
269         spin_unlock(&lslist_lock);
270         return ls;
271 }
272
273 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
274 {
275         struct dlm_ls *ls;
276
277         spin_lock(&lslist_lock);
278         list_for_each_entry(ls, &lslist, ls_list) {
279                 if (ls->ls_local_handle == lockspace) {
280                         ls->ls_count++;
281                         goto out;
282                 }
283         }
284         ls = NULL;
285  out:
286         spin_unlock(&lslist_lock);
287         return ls;
288 }
289
290 struct dlm_ls *dlm_find_lockspace_device(int minor)
291 {
292         struct dlm_ls *ls;
293
294         spin_lock(&lslist_lock);
295         list_for_each_entry(ls, &lslist, ls_list) {
296                 if (ls->ls_device.minor == minor) {
297                         ls->ls_count++;
298                         goto out;
299                 }
300         }
301         ls = NULL;
302  out:
303         spin_unlock(&lslist_lock);
304         return ls;
305 }
306
/* Drop a reference taken by one of the dlm_find_lockspace_* helpers.
   remove_lockspace() polls ls_count to know when teardown may proceed. */
void dlm_put_lockspace(struct dlm_ls *ls)
{
        spin_lock(&lslist_lock);
        ls->ls_count--;
        spin_unlock(&lslist_lock);
}
313
314 static void remove_lockspace(struct dlm_ls *ls)
315 {
316         for (;;) {
317                 spin_lock(&lslist_lock);
318                 if (ls->ls_count == 0) {
319                         list_del(&ls->ls_list);
320                         spin_unlock(&lslist_lock);
321                         return;
322                 }
323                 spin_unlock(&lslist_lock);
324                 ssleep(1);
325         }
326 }
327
/*
 * Start the three shared daemons needed by any lockspace: astd (lock
 * request/AST delivery), scand (rsb scanning), and lowcomms (messaging).
 * On failure, unwind in reverse order via the goto chain and return the
 * error; on success return 0.
 */
static int threads_start(void)
{
        int error;

        /* Thread which process lock requests for all lockspace's */
        error = dlm_astd_start();
        if (error) {
                log_print("cannot start dlm_astd thread %d", error);
                goto fail;
        }

        error = dlm_scand_start();
        if (error) {
                log_print("cannot start dlm_scand thread %d", error);
                goto astd_fail;
        }

        /* Thread for sending/receiving messages for all lockspace's */
        error = dlm_lowcomms_start();
        if (error) {
                log_print("cannot start dlm lowcomms %d", error);
                goto scand_fail;
        }

        return 0;

 scand_fail:
        dlm_scand_stop();
 astd_fail:
        dlm_astd_stop();
 fail:
        return error;
}
361
/* Stop the shared daemons started by threads_start().  scand is stopped
   first so no new scans run while lowcomms/astd shut down. */
static void threads_stop(void)
{
        dlm_scand_stop();
        dlm_lowcomms_stop();
        dlm_astd_stop();
}
368
/*
 * Allocate and initialize a lockspace, register it with sysfs, start
 * its recovery daemon, and wait for userspace to complete the join
 * uevent.  On success *lockspace holds the new ls and 0 is returned.
 *
 * Errors unwind through the goto chain in strict reverse order of
 * construction.  A module reference is held for the lifetime of the
 * lockspace (released here on every failure path).
 *
 * NOTE(review): on the -EEXIST path the existing ls is returned in
 * *lockspace without bumping ls->ls_count — confirm callers do not
 * treat that as a counted reference.
 */
static int new_lockspace(char *name, int namelen, void **lockspace,
                         uint32_t flags, int lvblen)
{
        struct dlm_ls *ls;
        int i, size, error = -ENOMEM;

        if (namelen > DLM_LOCKSPACE_LEN)
                return -EINVAL;

        /* lvb length must be a non-zero multiple of 8 */
        if (!lvblen || (lvblen % 8))
                return -EINVAL;

        if (!try_module_get(THIS_MODULE))
                return -EINVAL;

        /* joining an existing lockspace: hand it back as -EEXIST */
        ls = dlm_find_lockspace_name(name, namelen);
        if (ls) {
                *lockspace = ls;
                module_put(THIS_MODULE);
                return -EEXIST;
        }

        /* ls_name is a trailing array; namelen extra bytes hold the name */
        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
        if (!ls)
                goto out;
        memcpy(ls->ls_name, name, namelen);
        ls->ls_namelen = namelen;
        ls->ls_exflags = flags;
        ls->ls_lvblen = lvblen;
        ls->ls_count = 0;
        ls->ls_flags = 0;

        /* resource table: hash buckets of rsb's, each with active and
           toss lists */
        size = dlm_config.rsbtbl_size;
        ls->ls_rsbtbl_size = size;

        ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
        if (!ls->ls_rsbtbl)
                goto out_lsfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
                INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
                rwlock_init(&ls->ls_rsbtbl[i].lock);
        }

        /* lkb id table; counter seeds lock id allocation per bucket */
        size = dlm_config.lkbtbl_size;
        ls->ls_lkbtbl_size = size;

        ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
        if (!ls->ls_lkbtbl)
                goto out_rsbfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
                rwlock_init(&ls->ls_lkbtbl[i].lock);
                ls->ls_lkbtbl[i].counter = 1;
        }

        /* resource directory table */
        size = dlm_config.dirtbl_size;
        ls->ls_dirtbl_size = size;

        ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
        if (!ls->ls_dirtbl)
                goto out_lkbfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
                rwlock_init(&ls->ls_dirtbl[i].lock);
        }

        INIT_LIST_HEAD(&ls->ls_waiters);
        mutex_init(&ls->ls_waiters_mutex);

        INIT_LIST_HEAD(&ls->ls_nodes);
        INIT_LIST_HEAD(&ls->ls_nodes_gone);
        ls->ls_num_nodes = 0;
        ls->ls_low_nodeid = 0;
        ls->ls_total_weight = 0;
        ls->ls_node_array = NULL;

        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
        ls->ls_stub_rsb.res_ls = ls;

        ls->ls_debug_dentry = NULL;

        init_waitqueue_head(&ls->ls_uevent_wait);
        ls->ls_uevent_result = 0;

        ls->ls_recoverd_task = NULL;
        mutex_init(&ls->ls_recoverd_active);
        spin_lock_init(&ls->ls_recover_lock);
        ls->ls_recover_status = 0;
        ls->ls_recover_seq = 0;
        ls->ls_recover_args = NULL;
        init_rwsem(&ls->ls_in_recovery);
        INIT_LIST_HEAD(&ls->ls_requestqueue);
        mutex_init(&ls->ls_requestqueue_mutex);
        mutex_init(&ls->ls_clear_proc_locks);

        ls->ls_recover_buf = kmalloc(dlm_config.buffer_size, GFP_KERNEL);
        if (!ls->ls_recover_buf)
                goto out_dirfree;

        INIT_LIST_HEAD(&ls->ls_recover_list);
        spin_lock_init(&ls->ls_recover_list_lock);
        ls->ls_recover_list_count = 0;
        /* the user-visible handle is simply the ls pointer */
        ls->ls_local_handle = ls;
        init_waitqueue_head(&ls->ls_wait_general);
        INIT_LIST_HEAD(&ls->ls_root_list);
        init_rwsem(&ls->ls_root_sem);

        /* hold the lockspace "in recovery" until the first recovery
           sequence completes */
        down_write(&ls->ls_in_recovery);

        error = dlm_recoverd_start(ls);
        if (error) {
                log_error(ls, "can't start dlm_recoverd %d", error);
                goto out_rcomfree;
        }

        spin_lock(&lslist_lock);
        list_add(&ls->ls_list, &lslist);
        spin_unlock(&lslist_lock);

        dlm_create_debug_file(ls);

        error = kobject_setup(ls);
        if (error)
                goto out_del;

        error = kobject_register(&ls->ls_kobj);
        if (error)
                goto out_del;

        /* wait for userspace to drive the join via uevent/event_done */
        error = do_uevent(ls, 1);
        if (error)
                goto out_unreg;

        *lockspace = ls;
        return 0;

 out_unreg:
        kobject_unregister(&ls->ls_kobj);
 out_del:
        dlm_delete_debug_file(ls);
        spin_lock(&lslist_lock);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
        dlm_recoverd_stop(ls);
 out_rcomfree:
        kfree(ls->ls_recover_buf);
 out_dirfree:
        kfree(ls->ls_dirtbl);
 out_lkbfree:
        kfree(ls->ls_lkbtbl);
 out_rsbfree:
        kfree(ls->ls_rsbtbl);
 out_lsfree:
        kfree(ls);
 out:
        module_put(THIS_MODULE);
        return error;
}
528
529 int dlm_new_lockspace(char *name, int namelen, void **lockspace,
530                       uint32_t flags, int lvblen)
531 {
532         int error = 0;
533
534         mutex_lock(&ls_lock);
535         if (!ls_count)
536                 error = threads_start();
537         if (error)
538                 goto out;
539
540         error = new_lockspace(name, namelen, lockspace, flags, lvblen);
541         if (!error)
542                 ls_count++;
543  out:
544         mutex_unlock(&ls_lock);
545         return error;
546 }
547
/* Return 0 if the lockspace has no active locks,
 *        1 if it still has active remote locks,
 *        2 if it still has active local locks (lkb_nodeid == 0).
 */
static int lockspace_busy(struct dlm_ls *ls)
{
        int i, lkb_found = 0;
        struct dlm_lkb *lkb;

        /* NOTE: We check the lockidtbl here rather than the resource table.
           This is because there may be LKBs queued as ASTs that have been
           unlinked from their RSBs and are pending deletion once the AST has
           been delivered */

        for (i = 0; i < ls->ls_lkbtbl_size; i++) {
                read_lock(&ls->ls_lkbtbl[i].lock);
                if (!list_empty(&ls->ls_lkbtbl[i].list)) {
                        lkb_found = 1;
                        /* a local lock (nodeid 0) is the strongest form of
                           "busy"; report it immediately */
                        list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
                                            lkb_idtbl_list) {
                                if (!lkb->lkb_nodeid) {
                                        read_unlock(&ls->ls_lkbtbl[i].lock);
                                        return 2;
                                }
                        }
                }
                read_unlock(&ls->ls_lkbtbl[i].lock);
        }
        return lkb_found;
}
577
/*
 * Tear down a lockspace.  "force" (0-3) controls how busy a lockspace
 * may be and still be destroyed: any busy level above force fails with
 * -EBUSY (see lockspace_busy for the 0/1/2 levels; force 3 skips even
 * the leave uevent).  The teardown order matters: stop recovery, wait
 * out references, suspend astd before freeing lkbs it might touch,
 * then free tables in reverse order of construction.
 */
static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_lkb *lkb;
        struct dlm_rsb *rsb;
        struct list_head *head;
        int i;
        int busy = lockspace_busy(ls);

        if (busy > force)
                return -EBUSY;

        /* tell userspace we are leaving, unless forced shutdown */
        if (force < 3)
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        /* blocks until all dlm_find_lockspace_* references are dropped */
        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        /* keep astd from delivering ASTs while we free the lkbs below */
        dlm_astd_suspend();

        kfree(ls->ls_recover_buf);

        /*
         * Free direntry structs.
         */

        dlm_dir_clear(ls);
        kfree(ls->ls_dirtbl);

        /*
         * Free all lkb's on lkbtbl[] lists.
         */

        for (i = 0; i < ls->ls_lkbtbl_size; i++) {
                head = &ls->ls_lkbtbl[i].list;
                while (!list_empty(head)) {
                        lkb = list_entry(head->next, struct dlm_lkb,
                                         lkb_idtbl_list);

                        list_del(&lkb->lkb_idtbl_list);

                        dlm_del_ast(lkb);

                        /* master-copy lkbs own their lvb buffer */
                        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
                                free_lvb(lkb->lkb_lvbptr);

                        free_lkb(lkb);
                }
        }
        dlm_astd_resume();

        kfree(ls->ls_lkbtbl);

        /*
         * Free all rsb's on rsbtbl[] lists
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                head = &ls->ls_rsbtbl[i].list;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);

                        list_del(&rsb->res_hashchain);
                        free_rsb(rsb);
                }

                head = &ls->ls_rsbtbl[i].toss;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);
                        list_del(&rsb->res_hashchain);
                        free_rsb(rsb);
                }
        }

        kfree(ls->ls_rsbtbl);

        /*
         * Free structures on any other lists
         */

        kfree(ls->ls_recover_args);
        dlm_clear_free_entries(ls);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        kobject_unregister(&ls->ls_kobj);
        kfree(ls);

        /* last lockspace out stops the shared daemons */
        mutex_lock(&ls_lock);
        ls_count--;
        if (!ls_count)
                threads_stop();
        mutex_unlock(&ls_lock);

        module_put(THIS_MODULE);
        return 0;
}
679
680 /*
681  * Called when a system has released all its locks and is not going to use the
682  * lockspace any longer.  We free everything we're managing for this lockspace.
683  * Remaining nodes will go through the recovery process as if we'd died.  The
684  * lockspace must continue to function as usual, participating in recoveries,
685  * until this returns.
686  *
687  * Force has 4 possible values:
688  * 0 - don't destroy locksapce if it has any LKBs
689  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
690  * 2 - destroy lockspace regardless of LKBs
691  * 3 - destroy lockspace as part of a forced shutdown
692  */
693
694 int dlm_release_lockspace(void *lockspace, int force)
695 {
696         struct dlm_ls *ls;
697
698         ls = dlm_find_lockspace_local(lockspace);
699         if (!ls)
700                 return -EINVAL;
701         dlm_put_lockspace(ls);
702         return release_lockspace(ls, force);
703 }
704