Fix configfs leak
[safe/jmp/linux-2.6] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2008 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25 #include "requestqueue.h"
26 #include "user.h"
27
28 static int                      ls_count;
29 static struct mutex             ls_lock;
30 static struct list_head         lslist;
31 static spinlock_t               lslist_lock;
32 static struct task_struct *     scand_task;
33
34
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37         ssize_t ret = len;
38         int n = simple_strtol(buf, NULL, 0);
39
40         ls = dlm_find_lockspace_local(ls->ls_local_handle);
41         if (!ls)
42                 return -EINVAL;
43
44         switch (n) {
45         case 0:
46                 dlm_ls_stop(ls);
47                 break;
48         case 1:
49                 dlm_ls_start(ls);
50                 break;
51         default:
52                 ret = -EINVAL;
53         }
54         dlm_put_lockspace(ls);
55         return ret;
56 }
57
58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59 {
60         ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62         wake_up(&ls->ls_uevent_wait);
63         return len;
64 }
65
66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
67 {
68         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
69 }
70
71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72 {
73         ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74         return len;
75 }
76
77 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
78 {
79         uint32_t status = dlm_recover_status(ls);
80         return snprintf(buf, PAGE_SIZE, "%x\n", status);
81 }
82
83 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
84 {
85         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
86 }
87
88 struct dlm_attr {
89         struct attribute attr;
90         ssize_t (*show)(struct dlm_ls *, char *);
91         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
92 };
93
94 static struct dlm_attr dlm_attr_control = {
95         .attr  = {.name = "control", .mode = S_IWUSR},
96         .store = dlm_control_store
97 };
98
99 static struct dlm_attr dlm_attr_event = {
100         .attr  = {.name = "event_done", .mode = S_IWUSR},
101         .store = dlm_event_store
102 };
103
104 static struct dlm_attr dlm_attr_id = {
105         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
106         .show  = dlm_id_show,
107         .store = dlm_id_store
108 };
109
110 static struct dlm_attr dlm_attr_recover_status = {
111         .attr  = {.name = "recover_status", .mode = S_IRUGO},
112         .show  = dlm_recover_status_show
113 };
114
115 static struct dlm_attr dlm_attr_recover_nodeid = {
116         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
117         .show  = dlm_recover_nodeid_show
118 };
119
120 static struct attribute *dlm_attrs[] = {
121         &dlm_attr_control.attr,
122         &dlm_attr_event.attr,
123         &dlm_attr_id.attr,
124         &dlm_attr_recover_status.attr,
125         &dlm_attr_recover_nodeid.attr,
126         NULL,
127 };
128
129 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
130                              char *buf)
131 {
132         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
133         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
134         return a->show ? a->show(ls, buf) : 0;
135 }
136
137 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
138                               const char *buf, size_t len)
139 {
140         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
141         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
142         return a->store ? a->store(ls, buf, len) : len;
143 }
144
145 static void lockspace_kobj_release(struct kobject *k)
146 {
147         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
148         kfree(ls);
149 }
150
151 static struct sysfs_ops dlm_attr_ops = {
152         .show  = dlm_attr_show,
153         .store = dlm_attr_store,
154 };
155
156 static struct kobj_type dlm_ktype = {
157         .default_attrs = dlm_attrs,
158         .sysfs_ops     = &dlm_attr_ops,
159         .release       = lockspace_kobj_release,
160 };
161
162 static struct kset *dlm_kset;
163
164 static int do_uevent(struct dlm_ls *ls, int in)
165 {
166         int error;
167
168         if (in)
169                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
170         else
171                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
172
173         log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
174
175         /* dlm_controld will see the uevent, do the necessary group management
176            and then write to sysfs to wake us */
177
178         error = wait_event_interruptible(ls->ls_uevent_wait,
179                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
180
181         log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
182
183         if (error)
184                 goto out;
185
186         error = ls->ls_uevent_result;
187  out:
188         if (error)
189                 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
190                           error, ls->ls_uevent_result);
191         return error;
192 }
193
194
195 int __init dlm_lockspace_init(void)
196 {
197         ls_count = 0;
198         mutex_init(&ls_lock);
199         INIT_LIST_HEAD(&lslist);
200         spin_lock_init(&lslist_lock);
201
202         dlm_kset = kset_create_and_add("dlm", NULL, kernel_kobj);
203         if (!dlm_kset) {
204                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
205                 return -ENOMEM;
206         }
207         return 0;
208 }
209
210 void dlm_lockspace_exit(void)
211 {
212         kset_unregister(dlm_kset);
213 }
214
215 static struct dlm_ls *find_ls_to_scan(void)
216 {
217         struct dlm_ls *ls;
218
219         spin_lock(&lslist_lock);
220         list_for_each_entry(ls, &lslist, ls_list) {
221                 if (time_after_eq(jiffies, ls->ls_scan_time +
222                                             dlm_config.ci_scan_secs * HZ)) {
223                         spin_unlock(&lslist_lock);
224                         return ls;
225                 }
226         }
227         spin_unlock(&lslist_lock);
228         return NULL;
229 }
230
231 static int dlm_scand(void *data)
232 {
233         struct dlm_ls *ls;
234         int timeout_jiffies = dlm_config.ci_scan_secs * HZ;
235
236         while (!kthread_should_stop()) {
237                 ls = find_ls_to_scan();
238                 if (ls) {
239                         if (dlm_lock_recovery_try(ls)) {
240                                 ls->ls_scan_time = jiffies;
241                                 dlm_scan_rsbs(ls);
242                                 dlm_scan_timeout(ls);
243                                 dlm_unlock_recovery(ls);
244                         } else {
245                                 ls->ls_scan_time += HZ;
246                         }
247                 } else {
248                         schedule_timeout_interruptible(timeout_jiffies);
249                 }
250         }
251         return 0;
252 }
253
254 static int dlm_scand_start(void)
255 {
256         struct task_struct *p;
257         int error = 0;
258
259         p = kthread_run(dlm_scand, NULL, "dlm_scand");
260         if (IS_ERR(p))
261                 error = PTR_ERR(p);
262         else
263                 scand_task = p;
264         return error;
265 }
266
267 static void dlm_scand_stop(void)
268 {
269         kthread_stop(scand_task);
270 }
271
272 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
273 {
274         struct dlm_ls *ls;
275
276         spin_lock(&lslist_lock);
277
278         list_for_each_entry(ls, &lslist, ls_list) {
279                 if (ls->ls_global_id == id) {
280                         ls->ls_count++;
281                         goto out;
282                 }
283         }
284         ls = NULL;
285  out:
286         spin_unlock(&lslist_lock);
287         return ls;
288 }
289
290 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
291 {
292         struct dlm_ls *ls;
293
294         spin_lock(&lslist_lock);
295         list_for_each_entry(ls, &lslist, ls_list) {
296                 if (ls->ls_local_handle == lockspace) {
297                         ls->ls_count++;
298                         goto out;
299                 }
300         }
301         ls = NULL;
302  out:
303         spin_unlock(&lslist_lock);
304         return ls;
305 }
306
307 struct dlm_ls *dlm_find_lockspace_device(int minor)
308 {
309         struct dlm_ls *ls;
310
311         spin_lock(&lslist_lock);
312         list_for_each_entry(ls, &lslist, ls_list) {
313                 if (ls->ls_device.minor == minor) {
314                         ls->ls_count++;
315                         goto out;
316                 }
317         }
318         ls = NULL;
319  out:
320         spin_unlock(&lslist_lock);
321         return ls;
322 }
323
324 void dlm_put_lockspace(struct dlm_ls *ls)
325 {
326         spin_lock(&lslist_lock);
327         ls->ls_count--;
328         spin_unlock(&lslist_lock);
329 }
330
331 static void remove_lockspace(struct dlm_ls *ls)
332 {
333         for (;;) {
334                 spin_lock(&lslist_lock);
335                 if (ls->ls_count == 0) {
336                         WARN_ON(ls->ls_create_count != 0);
337                         list_del(&ls->ls_list);
338                         spin_unlock(&lslist_lock);
339                         return;
340                 }
341                 spin_unlock(&lslist_lock);
342                 ssleep(1);
343         }
344 }
345
/*
 * Start the threads shared by all lockspaces, in order: dlm_astd, dlm_scand,
 * lowcomms.  If a later step fails the earlier ones are stopped again.
 * Returns 0 on success or the error of the step that failed.
 */
static int threads_start(void)
{
        int rv;

        /* Thread which process lock requests for all lockspace's */
        rv = dlm_astd_start();
        if (rv) {
                log_print("cannot start dlm_astd thread %d", rv);
                return rv;
        }

        rv = dlm_scand_start();
        if (rv) {
                log_print("cannot start dlm_scand thread %d", rv);
                goto stop_astd;
        }

        /* Thread for sending/receiving messages for all lockspace's */
        rv = dlm_lowcomms_start();
        if (rv) {
                log_print("cannot start dlm lowcomms %d", rv);
                goto stop_scand;
        }

        return 0;

 stop_scand:
        dlm_scand_stop();
 stop_astd:
        dlm_astd_stop();
        return rv;
}
379
/* Stop the shared threads: dlm_scand first, then lowcomms, then dlm_astd. */
static void threads_stop(void)
{
        dlm_scand_stop();
        dlm_lowcomms_stop();
        dlm_astd_stop();
}
386
387 static int new_lockspace(const char *name, int namelen, void **lockspace,
388                          uint32_t flags, int lvblen)
389 {
390         struct dlm_ls *ls;
391         int i, size, error;
392         int do_unreg = 0;
393
394         if (namelen > DLM_LOCKSPACE_LEN)
395                 return -EINVAL;
396
397         if (!lvblen || (lvblen % 8))
398                 return -EINVAL;
399
400         if (!try_module_get(THIS_MODULE))
401                 return -EINVAL;
402
403         if (!dlm_user_daemon_available()) {
404                 module_put(THIS_MODULE);
405                 return -EUNATCH;
406         }
407
408         error = 0;
409
410         spin_lock(&lslist_lock);
411         list_for_each_entry(ls, &lslist, ls_list) {
412                 WARN_ON(ls->ls_create_count <= 0);
413                 if (ls->ls_namelen != namelen)
414                         continue;
415                 if (memcmp(ls->ls_name, name, namelen))
416                         continue;
417                 if (flags & DLM_LSFL_NEWEXCL) {
418                         error = -EEXIST;
419                         break;
420                 }
421                 ls->ls_create_count++;
422                 *lockspace = ls;
423                 error = 1;
424                 break;
425         }
426         spin_unlock(&lslist_lock);
427
428         if (error)
429                 goto out;
430
431         error = -ENOMEM;
432
433         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
434         if (!ls)
435                 goto out;
436         memcpy(ls->ls_name, name, namelen);
437         ls->ls_namelen = namelen;
438         ls->ls_lvblen = lvblen;
439         ls->ls_count = 0;
440         ls->ls_flags = 0;
441         ls->ls_scan_time = jiffies;
442
443         if (flags & DLM_LSFL_TIMEWARN)
444                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
445
446         /* ls_exflags are forced to match among nodes, and we don't
447            need to require all nodes to have some flags set */
448         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
449                                     DLM_LSFL_NEWEXCL));
450
451         size = dlm_config.ci_rsbtbl_size;
452         ls->ls_rsbtbl_size = size;
453
454         ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_NOFS);
455         if (!ls->ls_rsbtbl)
456                 goto out_lsfree;
457         for (i = 0; i < size; i++) {
458                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
459                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
460                 spin_lock_init(&ls->ls_rsbtbl[i].lock);
461         }
462
463         size = dlm_config.ci_lkbtbl_size;
464         ls->ls_lkbtbl_size = size;
465
466         ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_NOFS);
467         if (!ls->ls_lkbtbl)
468                 goto out_rsbfree;
469         for (i = 0; i < size; i++) {
470                 INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
471                 rwlock_init(&ls->ls_lkbtbl[i].lock);
472                 ls->ls_lkbtbl[i].counter = 1;
473         }
474
475         size = dlm_config.ci_dirtbl_size;
476         ls->ls_dirtbl_size = size;
477
478         ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_NOFS);
479         if (!ls->ls_dirtbl)
480                 goto out_lkbfree;
481         for (i = 0; i < size; i++) {
482                 INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
483                 spin_lock_init(&ls->ls_dirtbl[i].lock);
484         }
485
486         INIT_LIST_HEAD(&ls->ls_waiters);
487         mutex_init(&ls->ls_waiters_mutex);
488         INIT_LIST_HEAD(&ls->ls_orphans);
489         mutex_init(&ls->ls_orphans_mutex);
490         INIT_LIST_HEAD(&ls->ls_timeout);
491         mutex_init(&ls->ls_timeout_mutex);
492
493         INIT_LIST_HEAD(&ls->ls_nodes);
494         INIT_LIST_HEAD(&ls->ls_nodes_gone);
495         ls->ls_num_nodes = 0;
496         ls->ls_low_nodeid = 0;
497         ls->ls_total_weight = 0;
498         ls->ls_node_array = NULL;
499
500         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
501         ls->ls_stub_rsb.res_ls = ls;
502
503         ls->ls_debug_rsb_dentry = NULL;
504         ls->ls_debug_waiters_dentry = NULL;
505
506         init_waitqueue_head(&ls->ls_uevent_wait);
507         ls->ls_uevent_result = 0;
508         init_completion(&ls->ls_members_done);
509         ls->ls_members_result = -1;
510
511         ls->ls_recoverd_task = NULL;
512         mutex_init(&ls->ls_recoverd_active);
513         spin_lock_init(&ls->ls_recover_lock);
514         spin_lock_init(&ls->ls_rcom_spin);
515         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
516         ls->ls_recover_status = 0;
517         ls->ls_recover_seq = 0;
518         ls->ls_recover_args = NULL;
519         init_rwsem(&ls->ls_in_recovery);
520         init_rwsem(&ls->ls_recv_active);
521         INIT_LIST_HEAD(&ls->ls_requestqueue);
522         mutex_init(&ls->ls_requestqueue_mutex);
523         mutex_init(&ls->ls_clear_proc_locks);
524
525         ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
526         if (!ls->ls_recover_buf)
527                 goto out_dirfree;
528
529         INIT_LIST_HEAD(&ls->ls_recover_list);
530         spin_lock_init(&ls->ls_recover_list_lock);
531         ls->ls_recover_list_count = 0;
532         ls->ls_local_handle = ls;
533         init_waitqueue_head(&ls->ls_wait_general);
534         INIT_LIST_HEAD(&ls->ls_root_list);
535         init_rwsem(&ls->ls_root_sem);
536
537         down_write(&ls->ls_in_recovery);
538
539         spin_lock(&lslist_lock);
540         ls->ls_create_count = 1;
541         list_add(&ls->ls_list, &lslist);
542         spin_unlock(&lslist_lock);
543
544         /* needs to find ls in lslist */
545         error = dlm_recoverd_start(ls);
546         if (error) {
547                 log_error(ls, "can't start dlm_recoverd %d", error);
548                 goto out_delist;
549         }
550
551         ls->ls_kobj.kset = dlm_kset;
552         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
553                                      "%s", ls->ls_name);
554         if (error)
555                 goto out_stop;
556         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
557
558         /* let kobject handle freeing of ls if there's an error */
559         do_unreg = 1;
560
561         /* This uevent triggers dlm_controld in userspace to add us to the
562            group of nodes that are members of this lockspace (managed by the
563            cluster infrastructure.)  Once it's done that, it tells us who the
564            current lockspace members are (via configfs) and then tells the
565            lockspace to start running (via sysfs) in dlm_ls_start(). */
566
567         error = do_uevent(ls, 1);
568         if (error)
569                 goto out_stop;
570
571         wait_for_completion(&ls->ls_members_done);
572         error = ls->ls_members_result;
573         if (error)
574                 goto out_members;
575
576         dlm_create_debug_file(ls);
577
578         log_debug(ls, "join complete");
579         *lockspace = ls;
580         return 0;
581
582  out_members:
583         do_uevent(ls, 0);
584         dlm_clear_members(ls);
585         kfree(ls->ls_node_array);
586  out_stop:
587         dlm_recoverd_stop(ls);
588  out_delist:
589         spin_lock(&lslist_lock);
590         list_del(&ls->ls_list);
591         spin_unlock(&lslist_lock);
592         kfree(ls->ls_recover_buf);
593  out_dirfree:
594         kfree(ls->ls_dirtbl);
595  out_lkbfree:
596         kfree(ls->ls_lkbtbl);
597  out_rsbfree:
598         kfree(ls->ls_rsbtbl);
599  out_lsfree:
600         if (do_unreg)
601                 kobject_put(&ls->ls_kobj);
602         else
603                 kfree(ls);
604  out:
605         module_put(THIS_MODULE);
606         return error;
607 }
608
609 int dlm_new_lockspace(const char *name, int namelen, void **lockspace,
610                       uint32_t flags, int lvblen)
611 {
612         int error = 0;
613
614         mutex_lock(&ls_lock);
615         if (!ls_count)
616                 error = threads_start();
617         if (error)
618                 goto out;
619
620         error = new_lockspace(name, namelen, lockspace, flags, lvblen);
621         if (!error)
622                 ls_count++;
623         if (error > 0)
624                 error = 0;
625         if (!ls_count)
626                 threads_stop();
627  out:
628         mutex_unlock(&ls_lock);
629         return error;
630 }
631
632 /* Return 1 if the lockspace still has active remote locks,
633  *        2 if the lockspace still has active local locks.
634  */
635 static int lockspace_busy(struct dlm_ls *ls)
636 {
637         int i, lkb_found = 0;
638         struct dlm_lkb *lkb;
639
640         /* NOTE: We check the lockidtbl here rather than the resource table.
641            This is because there may be LKBs queued as ASTs that have been
642            unlinked from their RSBs and are pending deletion once the AST has
643            been delivered */
644
645         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
646                 read_lock(&ls->ls_lkbtbl[i].lock);
647                 if (!list_empty(&ls->ls_lkbtbl[i].list)) {
648                         lkb_found = 1;
649                         list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
650                                             lkb_idtbl_list) {
651                                 if (!lkb->lkb_nodeid) {
652                                         read_unlock(&ls->ls_lkbtbl[i].lock);
653                                         return 2;
654                                 }
655                         }
656                 }
657                 read_unlock(&ls->ls_lkbtbl[i].lock);
658         }
659         return lkb_found;
660 }
661
662 static int release_lockspace(struct dlm_ls *ls, int force)
663 {
664         struct dlm_lkb *lkb;
665         struct dlm_rsb *rsb;
666         struct list_head *head;
667         int i, busy, rv;
668
669         busy = lockspace_busy(ls);
670
671         spin_lock(&lslist_lock);
672         if (ls->ls_create_count == 1) {
673                 if (busy > force)
674                         rv = -EBUSY;
675                 else {
676                         /* remove_lockspace takes ls off lslist */
677                         ls->ls_create_count = 0;
678                         rv = 0;
679                 }
680         } else if (ls->ls_create_count > 1) {
681                 rv = --ls->ls_create_count;
682         } else {
683                 rv = -EINVAL;
684         }
685         spin_unlock(&lslist_lock);
686
687         if (rv) {
688                 log_debug(ls, "release_lockspace no remove %d", rv);
689                 return rv;
690         }
691
692         dlm_device_deregister(ls);
693
694         if (force < 3 && dlm_user_daemon_available())
695                 do_uevent(ls, 0);
696
697         dlm_recoverd_stop(ls);
698
699         remove_lockspace(ls);
700
701         dlm_delete_debug_file(ls);
702
703         dlm_astd_suspend();
704
705         kfree(ls->ls_recover_buf);
706
707         /*
708          * Free direntry structs.
709          */
710
711         dlm_dir_clear(ls);
712         kfree(ls->ls_dirtbl);
713
714         /*
715          * Free all lkb's on lkbtbl[] lists.
716          */
717
718         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
719                 head = &ls->ls_lkbtbl[i].list;
720                 while (!list_empty(head)) {
721                         lkb = list_entry(head->next, struct dlm_lkb,
722                                          lkb_idtbl_list);
723
724                         list_del(&lkb->lkb_idtbl_list);
725
726                         dlm_del_ast(lkb);
727
728                         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
729                                 dlm_free_lvb(lkb->lkb_lvbptr);
730
731                         dlm_free_lkb(lkb);
732                 }
733         }
734         dlm_astd_resume();
735
736         kfree(ls->ls_lkbtbl);
737
738         /*
739          * Free all rsb's on rsbtbl[] lists
740          */
741
742         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
743                 head = &ls->ls_rsbtbl[i].list;
744                 while (!list_empty(head)) {
745                         rsb = list_entry(head->next, struct dlm_rsb,
746                                          res_hashchain);
747
748                         list_del(&rsb->res_hashchain);
749                         dlm_free_rsb(rsb);
750                 }
751
752                 head = &ls->ls_rsbtbl[i].toss;
753                 while (!list_empty(head)) {
754                         rsb = list_entry(head->next, struct dlm_rsb,
755                                          res_hashchain);
756                         list_del(&rsb->res_hashchain);
757                         dlm_free_rsb(rsb);
758                 }
759         }
760
761         kfree(ls->ls_rsbtbl);
762
763         /*
764          * Free structures on any other lists
765          */
766
767         dlm_purge_requestqueue(ls);
768         kfree(ls->ls_recover_args);
769         dlm_clear_free_entries(ls);
770         dlm_clear_members(ls);
771         dlm_clear_members_gone(ls);
772         kfree(ls->ls_node_array);
773         log_debug(ls, "release_lockspace final free");
774         kobject_put(&ls->ls_kobj);
775         /* The ls structure will be freed when the kobject is done with */
776
777         module_put(THIS_MODULE);
778         return 0;
779 }
780
781 /*
782  * Called when a system has released all its locks and is not going to use the
783  * lockspace any longer.  We free everything we're managing for this lockspace.
784  * Remaining nodes will go through the recovery process as if we'd died.  The
785  * lockspace must continue to function as usual, participating in recoveries,
786  * until this returns.
787  *
788  * Force has 4 possible values:
789  * 0 - don't destroy locksapce if it has any LKBs
790  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
791  * 2 - destroy lockspace regardless of LKBs
792  * 3 - destroy lockspace as part of a forced shutdown
793  */
794
795 int dlm_release_lockspace(void *lockspace, int force)
796 {
797         struct dlm_ls *ls;
798         int error;
799
800         ls = dlm_find_lockspace_local(lockspace);
801         if (!ls)
802                 return -EINVAL;
803         dlm_put_lockspace(ls);
804
805         mutex_lock(&ls_lock);
806         error = release_lockspace(ls, force);
807         if (!error)
808                 ls_count--;
809         if (!ls_count)
810                 threads_stop();
811         mutex_unlock(&ls_lock);
812
813         return error;
814 }
815
816 void dlm_stop_lockspaces(void)
817 {
818         struct dlm_ls *ls;
819
820  restart:
821         spin_lock(&lslist_lock);
822         list_for_each_entry(ls, &lslist, ls_list) {
823                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
824                         continue;
825                 spin_unlock(&lslist_lock);
826                 log_error(ls, "no userland control daemon, stopping lockspace");
827                 dlm_ls_stop(ls);
828                 goto restart;
829         }
830         spin_unlock(&lslist_lock);
831 }
832