dlm: detect available userspace daemon
[safe/jmp/linux-2.6] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2008 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25 #include "requestqueue.h"
26 #include "user.h"
27
/* Count of lockspace creations; incremented by dlm_new_lockspace() and
   decremented by dlm_release_lockspace(), both while holding ls_lock. */
static int                      ls_count;
/* Held across dlm_new_lockspace() and dlm_release_lockspace(). */
static struct mutex             ls_lock;
/* List of lockspaces, linked through dlm_ls.ls_list. */
static struct list_head         lslist;
/* Taken by the lookup, insert and remove paths that touch lslist, and
   around updates of a lockspace's ls_count / ls_create_count. */
static spinlock_t               lslist_lock;
/* The dlm_scand kthread; set by dlm_scand_start(). */
static struct task_struct *     scand_task;
33
34
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37         ssize_t ret = len;
38         int n = simple_strtol(buf, NULL, 0);
39
40         ls = dlm_find_lockspace_local(ls->ls_local_handle);
41         if (!ls)
42                 return -EINVAL;
43
44         switch (n) {
45         case 0:
46                 dlm_ls_stop(ls);
47                 break;
48         case 1:
49                 dlm_ls_start(ls);
50                 break;
51         default:
52                 ret = -EINVAL;
53         }
54         dlm_put_lockspace(ls);
55         return ret;
56 }
57
58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59 {
60         ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62         wake_up(&ls->ls_uevent_wait);
63         return len;
64 }
65
/* sysfs "id" show handler: prints ls_global_id as an unsigned decimal
   followed by a newline; returns the snprintf() result. */
static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}
70
71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72 {
73         ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74         return len;
75 }
76
77 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
78 {
79         uint32_t status = dlm_recover_status(ls);
80         return snprintf(buf, PAGE_SIZE, "%x\n", status);
81 }
82
/* sysfs "recover_nodeid" show handler: prints ls_recover_nodeid as a signed
   decimal followed by a newline; returns the snprintf() result. */
static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}
87
88 struct dlm_attr {
89         struct attribute attr;
90         ssize_t (*show)(struct dlm_ls *, char *);
91         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
92 };
93
94 static struct dlm_attr dlm_attr_control = {
95         .attr  = {.name = "control", .mode = S_IWUSR},
96         .store = dlm_control_store
97 };
98
99 static struct dlm_attr dlm_attr_event = {
100         .attr  = {.name = "event_done", .mode = S_IWUSR},
101         .store = dlm_event_store
102 };
103
104 static struct dlm_attr dlm_attr_id = {
105         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
106         .show  = dlm_id_show,
107         .store = dlm_id_store
108 };
109
110 static struct dlm_attr dlm_attr_recover_status = {
111         .attr  = {.name = "recover_status", .mode = S_IRUGO},
112         .show  = dlm_recover_status_show
113 };
114
115 static struct dlm_attr dlm_attr_recover_nodeid = {
116         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
117         .show  = dlm_recover_nodeid_show
118 };
119
120 static struct attribute *dlm_attrs[] = {
121         &dlm_attr_control.attr,
122         &dlm_attr_event.attr,
123         &dlm_attr_id.attr,
124         &dlm_attr_recover_status.attr,
125         &dlm_attr_recover_nodeid.attr,
126         NULL,
127 };
128
129 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
130                              char *buf)
131 {
132         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
133         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
134         return a->show ? a->show(ls, buf) : 0;
135 }
136
137 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
138                               const char *buf, size_t len)
139 {
140         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
141         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
142         return a->store ? a->store(ls, buf, len) : len;
143 }
144
145 static void lockspace_kobj_release(struct kobject *k)
146 {
147         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
148         kfree(ls);
149 }
150
151 static struct sysfs_ops dlm_attr_ops = {
152         .show  = dlm_attr_show,
153         .store = dlm_attr_store,
154 };
155
156 static struct kobj_type dlm_ktype = {
157         .default_attrs = dlm_attrs,
158         .sysfs_ops     = &dlm_attr_ops,
159         .release       = lockspace_kobj_release,
160 };
161
162 static struct kset *dlm_kset;
163
164 static int do_uevent(struct dlm_ls *ls, int in)
165 {
166         int error;
167
168         if (in)
169                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
170         else
171                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
172
173         log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
174
175         /* dlm_controld will see the uevent, do the necessary group management
176            and then write to sysfs to wake us */
177
178         error = wait_event_interruptible(ls->ls_uevent_wait,
179                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
180
181         log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
182
183         if (error)
184                 goto out;
185
186         error = ls->ls_uevent_result;
187  out:
188         if (error)
189                 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
190                           error, ls->ls_uevent_result);
191         return error;
192 }
193
194
195 int __init dlm_lockspace_init(void)
196 {
197         ls_count = 0;
198         mutex_init(&ls_lock);
199         INIT_LIST_HEAD(&lslist);
200         spin_lock_init(&lslist_lock);
201
202         dlm_kset = kset_create_and_add("dlm", NULL, kernel_kobj);
203         if (!dlm_kset) {
204                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
205                 return -ENOMEM;
206         }
207         return 0;
208 }
209
/* Counterpart of dlm_lockspace_init(): unregisters the "dlm" kset. */
void dlm_lockspace_exit(void)
{
        kset_unregister(dlm_kset);
}
214
215 static int dlm_scand(void *data)
216 {
217         struct dlm_ls *ls;
218
219         while (!kthread_should_stop()) {
220                 list_for_each_entry(ls, &lslist, ls_list) {
221                         if (dlm_lock_recovery_try(ls)) {
222                                 dlm_scan_rsbs(ls);
223                                 dlm_scan_timeout(ls);
224                                 dlm_unlock_recovery(ls);
225                         }
226                 }
227                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
228         }
229         return 0;
230 }
231
232 static int dlm_scand_start(void)
233 {
234         struct task_struct *p;
235         int error = 0;
236
237         p = kthread_run(dlm_scand, NULL, "dlm_scand");
238         if (IS_ERR(p))
239                 error = PTR_ERR(p);
240         else
241                 scand_task = p;
242         return error;
243 }
244
/* Stops the kthread recorded in scand_task by dlm_scand_start(). */
static void dlm_scand_stop(void)
{
        kthread_stop(scand_task);
}
249
250 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
251 {
252         struct dlm_ls *ls;
253
254         spin_lock(&lslist_lock);
255
256         list_for_each_entry(ls, &lslist, ls_list) {
257                 if (ls->ls_global_id == id) {
258                         ls->ls_count++;
259                         goto out;
260                 }
261         }
262         ls = NULL;
263  out:
264         spin_unlock(&lslist_lock);
265         return ls;
266 }
267
268 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
269 {
270         struct dlm_ls *ls;
271
272         spin_lock(&lslist_lock);
273         list_for_each_entry(ls, &lslist, ls_list) {
274                 if (ls->ls_local_handle == lockspace) {
275                         ls->ls_count++;
276                         goto out;
277                 }
278         }
279         ls = NULL;
280  out:
281         spin_unlock(&lslist_lock);
282         return ls;
283 }
284
285 struct dlm_ls *dlm_find_lockspace_device(int minor)
286 {
287         struct dlm_ls *ls;
288
289         spin_lock(&lslist_lock);
290         list_for_each_entry(ls, &lslist, ls_list) {
291                 if (ls->ls_device.minor == minor) {
292                         ls->ls_count++;
293                         goto out;
294                 }
295         }
296         ls = NULL;
297  out:
298         spin_unlock(&lslist_lock);
299         return ls;
300 }
301
/* Drops a reference taken by one of the dlm_find_lockspace_*() helpers:
   decrements ls_count under lslist_lock.  remove_lockspace() waits for the
   count to reach zero. */
void dlm_put_lockspace(struct dlm_ls *ls)
{
        spin_lock(&lslist_lock);
        ls->ls_count--;
        spin_unlock(&lslist_lock);
}
308
309 static void remove_lockspace(struct dlm_ls *ls)
310 {
311         for (;;) {
312                 spin_lock(&lslist_lock);
313                 if (ls->ls_count == 0) {
314                         WARN_ON(ls->ls_create_count != 0);
315                         list_del(&ls->ls_list);
316                         spin_unlock(&lslist_lock);
317                         return;
318                 }
319                 spin_unlock(&lslist_lock);
320                 ssleep(1);
321         }
322 }
323
/*
 * Start the threads shared by all lockspaces, in order: dlm_astd, dlm_scand
 * and lowcomms.  If a later step fails, the earlier ones are stopped again
 * in reverse order.
 *
 * Returns 0 on success or the error from the step that failed.
 */
static int threads_start(void)
{
        int error;

        /* Thread which process lock requests for all lockspace's */
        error = dlm_astd_start();
        if (error) {
                log_print("cannot start dlm_astd thread %d", error);
                return error;
        }

        error = dlm_scand_start();
        if (error) {
                log_print("cannot start dlm_scand thread %d", error);
                dlm_astd_stop();
                return error;
        }

        /* Thread for sending/receiving messages for all lockspace's */
        error = dlm_lowcomms_start();
        if (error) {
                log_print("cannot start dlm lowcomms %d", error);
                dlm_scand_stop();
                dlm_astd_stop();
                return error;
        }

        return 0;
}
357
/* Stops the threads started by threads_start(): dlm_scand first, then
   lowcomms, then dlm_astd. */
static void threads_stop(void)
{
        dlm_scand_stop();
        dlm_lowcomms_stop();
        dlm_astd_stop();
}
364
365 static int new_lockspace(char *name, int namelen, void **lockspace,
366                          uint32_t flags, int lvblen)
367 {
368         struct dlm_ls *ls;
369         int i, size, error;
370         int do_unreg = 0;
371
372         if (namelen > DLM_LOCKSPACE_LEN)
373                 return -EINVAL;
374
375         if (!lvblen || (lvblen % 8))
376                 return -EINVAL;
377
378         if (!try_module_get(THIS_MODULE))
379                 return -EINVAL;
380
381         if (!dlm_user_daemon_available()) {
382                 module_put(THIS_MODULE);
383                 return -EUNATCH;
384         }
385
386         error = 0;
387
388         spin_lock(&lslist_lock);
389         list_for_each_entry(ls, &lslist, ls_list) {
390                 WARN_ON(ls->ls_create_count <= 0);
391                 if (ls->ls_namelen != namelen)
392                         continue;
393                 if (memcmp(ls->ls_name, name, namelen))
394                         continue;
395                 if (flags & DLM_LSFL_NEWEXCL) {
396                         error = -EEXIST;
397                         break;
398                 }
399                 ls->ls_create_count++;
400                 module_put(THIS_MODULE);
401                 error = 1; /* not an error, return 0 */
402                 break;
403         }
404         spin_unlock(&lslist_lock);
405
406         if (error < 0)
407                 goto out;
408         if (error)
409                 goto ret_zero;
410
411         error = -ENOMEM;
412
413         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
414         if (!ls)
415                 goto out;
416         memcpy(ls->ls_name, name, namelen);
417         ls->ls_namelen = namelen;
418         ls->ls_lvblen = lvblen;
419         ls->ls_count = 0;
420         ls->ls_flags = 0;
421
422         if (flags & DLM_LSFL_TIMEWARN)
423                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
424
425         if (flags & DLM_LSFL_FS)
426                 ls->ls_allocation = GFP_NOFS;
427         else
428                 ls->ls_allocation = GFP_KERNEL;
429
430         /* ls_exflags are forced to match among nodes, and we don't
431            need to require all nodes to have some flags set */
432         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
433                                     DLM_LSFL_NEWEXCL));
434
435         size = dlm_config.ci_rsbtbl_size;
436         ls->ls_rsbtbl_size = size;
437
438         ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
439         if (!ls->ls_rsbtbl)
440                 goto out_lsfree;
441         for (i = 0; i < size; i++) {
442                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
443                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
444                 rwlock_init(&ls->ls_rsbtbl[i].lock);
445         }
446
447         size = dlm_config.ci_lkbtbl_size;
448         ls->ls_lkbtbl_size = size;
449
450         ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
451         if (!ls->ls_lkbtbl)
452                 goto out_rsbfree;
453         for (i = 0; i < size; i++) {
454                 INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
455                 rwlock_init(&ls->ls_lkbtbl[i].lock);
456                 ls->ls_lkbtbl[i].counter = 1;
457         }
458
459         size = dlm_config.ci_dirtbl_size;
460         ls->ls_dirtbl_size = size;
461
462         ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
463         if (!ls->ls_dirtbl)
464                 goto out_lkbfree;
465         for (i = 0; i < size; i++) {
466                 INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
467                 rwlock_init(&ls->ls_dirtbl[i].lock);
468         }
469
470         INIT_LIST_HEAD(&ls->ls_waiters);
471         mutex_init(&ls->ls_waiters_mutex);
472         INIT_LIST_HEAD(&ls->ls_orphans);
473         mutex_init(&ls->ls_orphans_mutex);
474         INIT_LIST_HEAD(&ls->ls_timeout);
475         mutex_init(&ls->ls_timeout_mutex);
476
477         INIT_LIST_HEAD(&ls->ls_nodes);
478         INIT_LIST_HEAD(&ls->ls_nodes_gone);
479         ls->ls_num_nodes = 0;
480         ls->ls_low_nodeid = 0;
481         ls->ls_total_weight = 0;
482         ls->ls_node_array = NULL;
483
484         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
485         ls->ls_stub_rsb.res_ls = ls;
486
487         ls->ls_debug_rsb_dentry = NULL;
488         ls->ls_debug_waiters_dentry = NULL;
489
490         init_waitqueue_head(&ls->ls_uevent_wait);
491         ls->ls_uevent_result = 0;
492         init_completion(&ls->ls_members_done);
493         ls->ls_members_result = -1;
494
495         ls->ls_recoverd_task = NULL;
496         mutex_init(&ls->ls_recoverd_active);
497         spin_lock_init(&ls->ls_recover_lock);
498         spin_lock_init(&ls->ls_rcom_spin);
499         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
500         ls->ls_recover_status = 0;
501         ls->ls_recover_seq = 0;
502         ls->ls_recover_args = NULL;
503         init_rwsem(&ls->ls_in_recovery);
504         init_rwsem(&ls->ls_recv_active);
505         INIT_LIST_HEAD(&ls->ls_requestqueue);
506         mutex_init(&ls->ls_requestqueue_mutex);
507         mutex_init(&ls->ls_clear_proc_locks);
508
509         ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
510         if (!ls->ls_recover_buf)
511                 goto out_dirfree;
512
513         INIT_LIST_HEAD(&ls->ls_recover_list);
514         spin_lock_init(&ls->ls_recover_list_lock);
515         ls->ls_recover_list_count = 0;
516         ls->ls_local_handle = ls;
517         init_waitqueue_head(&ls->ls_wait_general);
518         INIT_LIST_HEAD(&ls->ls_root_list);
519         init_rwsem(&ls->ls_root_sem);
520
521         down_write(&ls->ls_in_recovery);
522
523         spin_lock(&lslist_lock);
524         ls->ls_create_count = 1;
525         list_add(&ls->ls_list, &lslist);
526         spin_unlock(&lslist_lock);
527
528         /* needs to find ls in lslist */
529         error = dlm_recoverd_start(ls);
530         if (error) {
531                 log_error(ls, "can't start dlm_recoverd %d", error);
532                 goto out_delist;
533         }
534
535         ls->ls_kobj.kset = dlm_kset;
536         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
537                                      "%s", ls->ls_name);
538         if (error)
539                 goto out_stop;
540         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
541
542         /* let kobject handle freeing of ls if there's an error */
543         do_unreg = 1;
544
545         /* This uevent triggers dlm_controld in userspace to add us to the
546            group of nodes that are members of this lockspace (managed by the
547            cluster infrastructure.)  Once it's done that, it tells us who the
548            current lockspace members are (via configfs) and then tells the
549            lockspace to start running (via sysfs) in dlm_ls_start(). */
550
551         error = do_uevent(ls, 1);
552         if (error)
553                 goto out_stop;
554
555         wait_for_completion(&ls->ls_members_done);
556         error = ls->ls_members_result;
557         if (error)
558                 goto out_members;
559
560         dlm_create_debug_file(ls);
561
562         log_debug(ls, "join complete");
563  ret_zero:
564         *lockspace = ls;
565         return 0;
566
567  out_members:
568         do_uevent(ls, 0);
569         dlm_clear_members(ls);
570         kfree(ls->ls_node_array);
571  out_stop:
572         dlm_recoverd_stop(ls);
573  out_delist:
574         spin_lock(&lslist_lock);
575         list_del(&ls->ls_list);
576         spin_unlock(&lslist_lock);
577         kfree(ls->ls_recover_buf);
578  out_dirfree:
579         kfree(ls->ls_dirtbl);
580  out_lkbfree:
581         kfree(ls->ls_lkbtbl);
582  out_rsbfree:
583         kfree(ls->ls_rsbtbl);
584  out_lsfree:
585         if (do_unreg)
586                 kobject_put(&ls->ls_kobj);
587         else
588                 kfree(ls);
589  out:
590         module_put(THIS_MODULE);
591         return error;
592 }
593
594 int dlm_new_lockspace(char *name, int namelen, void **lockspace,
595                       uint32_t flags, int lvblen)
596 {
597         int error = 0;
598
599         mutex_lock(&ls_lock);
600         if (!ls_count)
601                 error = threads_start();
602         if (error)
603                 goto out;
604
605         error = new_lockspace(name, namelen, lockspace, flags, lvblen);
606         if (!error)
607                 ls_count++;
608         else if (!ls_count)
609                 threads_stop();
610  out:
611         mutex_unlock(&ls_lock);
612         return error;
613 }
614
615 /* Return 1 if the lockspace still has active remote locks,
616  *        2 if the lockspace still has active local locks.
617  */
618 static int lockspace_busy(struct dlm_ls *ls)
619 {
620         int i, lkb_found = 0;
621         struct dlm_lkb *lkb;
622
623         /* NOTE: We check the lockidtbl here rather than the resource table.
624            This is because there may be LKBs queued as ASTs that have been
625            unlinked from their RSBs and are pending deletion once the AST has
626            been delivered */
627
628         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
629                 read_lock(&ls->ls_lkbtbl[i].lock);
630                 if (!list_empty(&ls->ls_lkbtbl[i].list)) {
631                         lkb_found = 1;
632                         list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
633                                             lkb_idtbl_list) {
634                                 if (!lkb->lkb_nodeid) {
635                                         read_unlock(&ls->ls_lkbtbl[i].lock);
636                                         return 2;
637                                 }
638                         }
639                 }
640                 read_unlock(&ls->ls_lkbtbl[i].lock);
641         }
642         return lkb_found;
643 }
644
/*
 * Drop one create-reference on a lockspace and tear it down if that was
 * the last one.
 *
 * Returns 0 if the lockspace was destroyed,
 *         a positive remaining ls_create_count if other creators remain,
 *         -EBUSY if it is the last reference but lockspace_busy() reports
 *                more than @force allows,
 *         -EINVAL if ls_create_count was not positive.
 */
static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_lkb *lkb;
        struct dlm_rsb *rsb;
        struct list_head *head;
        int i, busy, rv;

        /* 0 = no lkbs, 1 = lkbs but none local, 2 = local lkbs present */
        busy = lockspace_busy(ls);

        spin_lock(&lslist_lock);
        if (ls->ls_create_count == 1) {
                if (busy > force)
                        rv = -EBUSY;
                else {
                        /* remove_lockspace takes ls off lslist */
                        ls->ls_create_count = 0;
                        rv = 0;
                }
        } else if (ls->ls_create_count > 1) {
                /* other creators remain: just drop our reference */
                rv = --ls->ls_create_count;
        } else {
                rv = -EINVAL;
        }
        spin_unlock(&lslist_lock);

        if (rv) {
                log_debug(ls, "release_lockspace no remove %d", rv);
                return rv;
        }

        dlm_device_deregister(ls);

        /* announce the leave to userspace unless this is a forced shutdown
           (force 3) or no daemon is available; the result is ignored */
        if (force < 3 && dlm_user_daemon_available())
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        /* waits until ls_count drops to zero, then unlinks ls from lslist */
        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        /* astd stays suspended while the lkbs are freed below */
        dlm_astd_suspend();

        kfree(ls->ls_recover_buf);

        /*
         * Free direntry structs.
         */

        dlm_dir_clear(ls);
        kfree(ls->ls_dirtbl);

        /*
         * Free all lkb's on lkbtbl[] lists.
         */

        for (i = 0; i < ls->ls_lkbtbl_size; i++) {
                head = &ls->ls_lkbtbl[i].list;
                while (!list_empty(head)) {
                        lkb = list_entry(head->next, struct dlm_lkb,
                                         lkb_idtbl_list);

                        list_del(&lkb->lkb_idtbl_list);

                        dlm_del_ast(lkb);

                        /* the lvb is freed here only when the lkb is flagged
                           DLM_IFL_MSTCPY */
                        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
                                dlm_free_lvb(lkb->lkb_lvbptr);

                        dlm_free_lkb(lkb);
                }
        }
        dlm_astd_resume();

        kfree(ls->ls_lkbtbl);

        /*
         * Free all rsb's on rsbtbl[] lists
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                /* rsbs on the bucket's main list */
                head = &ls->ls_rsbtbl[i].list;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);

                        list_del(&rsb->res_hashchain);
                        dlm_free_rsb(rsb);
                }

                /* rsbs on the bucket's toss list */
                head = &ls->ls_rsbtbl[i].toss;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);
                        list_del(&rsb->res_hashchain);
                        dlm_free_rsb(rsb);
                }
        }

        kfree(ls->ls_rsbtbl);

        /*
         * Free structures on any other lists
         */

        dlm_purge_requestqueue(ls);
        kfree(ls->ls_recover_args);
        dlm_clear_free_entries(ls);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        log_debug(ls, "release_lockspace final free");
        kobject_put(&ls->ls_kobj);
        /* The ls structure will be freed when the kobject is done with */

        /* drops the module reference taken in new_lockspace() */
        module_put(THIS_MODULE);
        return 0;
}
763
764 /*
765  * Called when a system has released all its locks and is not going to use the
766  * lockspace any longer.  We free everything we're managing for this lockspace.
767  * Remaining nodes will go through the recovery process as if we'd died.  The
768  * lockspace must continue to function as usual, participating in recoveries,
769  * until this returns.
770  *
771  * Force has 4 possible values:
772  * 0 - don't destroy locksapce if it has any LKBs
773  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
774  * 2 - destroy lockspace regardless of LKBs
775  * 3 - destroy lockspace as part of a forced shutdown
776  */
777
778 int dlm_release_lockspace(void *lockspace, int force)
779 {
780         struct dlm_ls *ls;
781         int error;
782
783         ls = dlm_find_lockspace_local(lockspace);
784         if (!ls)
785                 return -EINVAL;
786         dlm_put_lockspace(ls);
787
788         mutex_lock(&ls_lock);
789         error = release_lockspace(ls, force);
790         if (!error)
791                 ls_count--;
792         else if (!ls_count)
793                 threads_stop();
794         mutex_unlock(&ls_lock);
795
796         return error;
797 }
798
799 void dlm_stop_lockspaces(void)
800 {
801         struct dlm_ls *ls;
802
803  restart:
804         spin_lock(&lslist_lock);
805         list_for_each_entry(ls, &lslist, ls_list) {
806                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
807                         continue;
808                 spin_unlock(&lslist_lock);
809                 log_error(ls, "no userland control daemon, stopping lockspace");
810                 dlm_ls_stop(ls);
811                 goto restart;
812         }
813         spin_unlock(&lslist_lock);
814 }
815