[DLM] block dlm_recv in recovery transition
[safe/jmp/linux-2.6] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25 #include "requestqueue.h"
26
/*
 * Per-lockspace debug file hooks.  With CONFIG_DLM_DEBUG only the
 * prototypes are given here (the implementations live outside this file);
 * without it both calls collapse to no-op stubs so that callers need no
 * #ifdefs of their own.
 */
#ifdef CONFIG_DLM_DEBUG
int dlm_create_debug_file(struct dlm_ls *ls);
void dlm_delete_debug_file(struct dlm_ls *ls);
#else
static inline int dlm_create_debug_file(struct dlm_ls *ls)
{
        return 0;
}

static inline void dlm_delete_debug_file(struct dlm_ls *ls)
{
}
#endif
34
35 static int                      ls_count;
36 static struct mutex             ls_lock;
37 static struct list_head         lslist;
38 static spinlock_t               lslist_lock;
39 static struct task_struct *     scand_task;
40
41
42 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
43 {
44         ssize_t ret = len;
45         int n = simple_strtol(buf, NULL, 0);
46
47         ls = dlm_find_lockspace_local(ls->ls_local_handle);
48         if (!ls)
49                 return -EINVAL;
50
51         switch (n) {
52         case 0:
53                 dlm_ls_stop(ls);
54                 break;
55         case 1:
56                 dlm_ls_start(ls);
57                 break;
58         default:
59                 ret = -EINVAL;
60         }
61         dlm_put_lockspace(ls);
62         return ret;
63 }
64
65 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
66 {
67         ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
68         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
69         wake_up(&ls->ls_uevent_wait);
70         return len;
71 }
72
73 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
74 {
75         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
76 }
77
78 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
79 {
80         ls->ls_global_id = simple_strtoul(buf, NULL, 0);
81         return len;
82 }
83
84 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
85 {
86         uint32_t status = dlm_recover_status(ls);
87         return snprintf(buf, PAGE_SIZE, "%x\n", status);
88 }
89
90 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
91 {
92         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
93 }
94
95 struct dlm_attr {
96         struct attribute attr;
97         ssize_t (*show)(struct dlm_ls *, char *);
98         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
99 };
100
101 static struct dlm_attr dlm_attr_control = {
102         .attr  = {.name = "control", .mode = S_IWUSR},
103         .store = dlm_control_store
104 };
105
106 static struct dlm_attr dlm_attr_event = {
107         .attr  = {.name = "event_done", .mode = S_IWUSR},
108         .store = dlm_event_store
109 };
110
111 static struct dlm_attr dlm_attr_id = {
112         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
113         .show  = dlm_id_show,
114         .store = dlm_id_store
115 };
116
117 static struct dlm_attr dlm_attr_recover_status = {
118         .attr  = {.name = "recover_status", .mode = S_IRUGO},
119         .show  = dlm_recover_status_show
120 };
121
122 static struct dlm_attr dlm_attr_recover_nodeid = {
123         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
124         .show  = dlm_recover_nodeid_show
125 };
126
127 static struct attribute *dlm_attrs[] = {
128         &dlm_attr_control.attr,
129         &dlm_attr_event.attr,
130         &dlm_attr_id.attr,
131         &dlm_attr_recover_status.attr,
132         &dlm_attr_recover_nodeid.attr,
133         NULL,
134 };
135
136 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
137                              char *buf)
138 {
139         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
140         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
141         return a->show ? a->show(ls, buf) : 0;
142 }
143
144 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
145                               const char *buf, size_t len)
146 {
147         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
148         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
149         return a->store ? a->store(ls, buf, len) : len;
150 }
151
152 static void lockspace_kobj_release(struct kobject *k)
153 {
154         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
155         kfree(ls);
156 }
157
158 static struct sysfs_ops dlm_attr_ops = {
159         .show  = dlm_attr_show,
160         .store = dlm_attr_store,
161 };
162
163 static struct kobj_type dlm_ktype = {
164         .default_attrs = dlm_attrs,
165         .sysfs_ops     = &dlm_attr_ops,
166         .release       = lockspace_kobj_release,
167 };
168
169 static struct kset dlm_kset = {
170         .kobj   = {.name = "dlm",},
171         .ktype  = &dlm_ktype,
172 };
173
174 static int kobject_setup(struct dlm_ls *ls)
175 {
176         char lsname[DLM_LOCKSPACE_LEN];
177         int error;
178
179         memset(lsname, 0, DLM_LOCKSPACE_LEN);
180         snprintf(lsname, DLM_LOCKSPACE_LEN, "%s", ls->ls_name);
181
182         error = kobject_set_name(&ls->ls_kobj, "%s", lsname);
183         if (error)
184                 return error;
185
186         ls->ls_kobj.kset = &dlm_kset;
187         ls->ls_kobj.ktype = &dlm_ktype;
188         return 0;
189 }
190
191 static int do_uevent(struct dlm_ls *ls, int in)
192 {
193         int error;
194
195         if (in)
196                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
197         else
198                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
199
200         log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
201
202         /* dlm_controld will see the uevent, do the necessary group management
203            and then write to sysfs to wake us */
204
205         error = wait_event_interruptible(ls->ls_uevent_wait,
206                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
207
208         log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
209
210         if (error)
211                 goto out;
212
213         error = ls->ls_uevent_result;
214  out:
215         if (error)
216                 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
217                           error, ls->ls_uevent_result);
218         return error;
219 }
220
221
222 int dlm_lockspace_init(void)
223 {
224         int error;
225
226         ls_count = 0;
227         mutex_init(&ls_lock);
228         INIT_LIST_HEAD(&lslist);
229         spin_lock_init(&lslist_lock);
230
231         kobj_set_kset_s(&dlm_kset, kernel_subsys);
232         error = kset_register(&dlm_kset);
233         if (error)
234                 printk("dlm_lockspace_init: cannot register kset %d\n", error);
235         return error;
236 }
237
238 void dlm_lockspace_exit(void)
239 {
240         kset_unregister(&dlm_kset);
241 }
242
243 static int dlm_scand(void *data)
244 {
245         struct dlm_ls *ls;
246
247         while (!kthread_should_stop()) {
248                 list_for_each_entry(ls, &lslist, ls_list) {
249                         if (dlm_lock_recovery_try(ls)) {
250                                 dlm_scan_rsbs(ls);
251                                 dlm_scan_timeout(ls);
252                                 dlm_unlock_recovery(ls);
253                         }
254                 }
255                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
256         }
257         return 0;
258 }
259
260 static int dlm_scand_start(void)
261 {
262         struct task_struct *p;
263         int error = 0;
264
265         p = kthread_run(dlm_scand, NULL, "dlm_scand");
266         if (IS_ERR(p))
267                 error = PTR_ERR(p);
268         else
269                 scand_task = p;
270         return error;
271 }
272
273 static void dlm_scand_stop(void)
274 {
275         kthread_stop(scand_task);
276 }
277
278 static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
279 {
280         struct dlm_ls *ls;
281
282         spin_lock(&lslist_lock);
283
284         list_for_each_entry(ls, &lslist, ls_list) {
285                 if (ls->ls_namelen == namelen &&
286                     memcmp(ls->ls_name, name, namelen) == 0)
287                         goto out;
288         }
289         ls = NULL;
290  out:
291         spin_unlock(&lslist_lock);
292         return ls;
293 }
294
295 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
296 {
297         struct dlm_ls *ls;
298
299         spin_lock(&lslist_lock);
300
301         list_for_each_entry(ls, &lslist, ls_list) {
302                 if (ls->ls_global_id == id) {
303                         ls->ls_count++;
304                         goto out;
305                 }
306         }
307         ls = NULL;
308  out:
309         spin_unlock(&lslist_lock);
310         return ls;
311 }
312
313 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
314 {
315         struct dlm_ls *ls;
316
317         spin_lock(&lslist_lock);
318         list_for_each_entry(ls, &lslist, ls_list) {
319                 if (ls->ls_local_handle == lockspace) {
320                         ls->ls_count++;
321                         goto out;
322                 }
323         }
324         ls = NULL;
325  out:
326         spin_unlock(&lslist_lock);
327         return ls;
328 }
329
330 struct dlm_ls *dlm_find_lockspace_device(int minor)
331 {
332         struct dlm_ls *ls;
333
334         spin_lock(&lslist_lock);
335         list_for_each_entry(ls, &lslist, ls_list) {
336                 if (ls->ls_device.minor == minor) {
337                         ls->ls_count++;
338                         goto out;
339                 }
340         }
341         ls = NULL;
342  out:
343         spin_unlock(&lslist_lock);
344         return ls;
345 }
346
347 void dlm_put_lockspace(struct dlm_ls *ls)
348 {
349         spin_lock(&lslist_lock);
350         ls->ls_count--;
351         spin_unlock(&lslist_lock);
352 }
353
354 static void remove_lockspace(struct dlm_ls *ls)
355 {
356         for (;;) {
357                 spin_lock(&lslist_lock);
358                 if (ls->ls_count == 0) {
359                         list_del(&ls->ls_list);
360                         spin_unlock(&lslist_lock);
361                         return;
362                 }
363                 spin_unlock(&lslist_lock);
364                 ssleep(1);
365         }
366 }
367
/*
 * Start the services shared by all lockspaces, in order: dlm_astd,
 * dlm_scand, then lowcomms.  If a later step fails, the steps that already
 * succeeded are stopped again in reverse order.
 *
 * Returns 0 on success or the error of the step that failed.
 */
static int threads_start(void)
{
        int rv;

        /* Thread which process lock requests for all lockspace's */
        rv = dlm_astd_start();
        if (rv) {
                log_print("cannot start dlm_astd thread %d", rv);
                return rv;
        }

        rv = dlm_scand_start();
        if (rv) {
                log_print("cannot start dlm_scand thread %d", rv);
                dlm_astd_stop();
                return rv;
        }

        /* Thread for sending/receiving messages for all lockspace's */
        rv = dlm_lowcomms_start();
        if (rv) {
                log_print("cannot start dlm lowcomms %d", rv);
                dlm_scand_stop();
                dlm_astd_stop();
                return rv;
        }

        return 0;
}
401
/*
 * Stop everything threads_start() started: dlm_scand first, then lowcomms,
 * then dlm_astd.
 */
static void threads_stop(void)
{
        dlm_scand_stop();
        dlm_lowcomms_stop();
        dlm_astd_stop();
}
408
/*
 * Allocate, initialise and join a new lockspace.
 *
 * name/namelen: lockspace name; namelen may not exceed DLM_LOCKSPACE_LEN
 * lockspace:    out parameter, set to the new lockspace on success and to
 *               the already existing one when -EEXIST is returned
 * flags:        DLM_LSFL_* flags; TIMEWARN and FS are handled locally, the
 *               remaining bits are stored in ls_exflags
 * lvblen:       must be non-zero and a multiple of 8
 *
 * Returns 0 on success, -EINVAL for bad arguments or when no module
 * reference can be taken, -EEXIST if a lockspace with this name is already
 * on the list, -ENOMEM on allocation failure, or the error reported by
 * dlm_recoverd_start(), the kobject setup/registration, the join uevent or
 * ls_members_result.
 *
 * A module reference is held for the lifetime of a successfully created
 * lockspace and is dropped on every failure path.
 *
 * NOTE(review): a negative namelen is not rejected before it is used as an
 * allocation/copy size -- confirm callers never pass one.
 */
static int new_lockspace(char *name, int namelen, void **lockspace,
                         uint32_t flags, int lvblen)
{
        struct dlm_ls *ls;
        int i, size, error = -ENOMEM;
        int do_unreg = 0;

        if (namelen > DLM_LOCKSPACE_LEN)
                return -EINVAL;

        if (!lvblen || (lvblen % 8))
                return -EINVAL;

        if (!try_module_get(THIS_MODULE))
                return -EINVAL;

        /* an existing lockspace of the same name is handed back to the
           caller together with -EEXIST */
        ls = dlm_find_lockspace_name(name, namelen);
        if (ls) {
                *lockspace = ls;
                module_put(THIS_MODULE);
                return -EEXIST;
        }

        /* the name is stored inline after the structure; kzalloc leaves
           everything not explicitly set below zeroed */
        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
        if (!ls)
                goto out;
        memcpy(ls->ls_name, name, namelen);
        ls->ls_namelen = namelen;
        ls->ls_lvblen = lvblen;
        ls->ls_count = 0;
        ls->ls_flags = 0;

        if (flags & DLM_LSFL_TIMEWARN)
                set_bit(LSFL_TIMEWARN, &ls->ls_flags);

        if (flags & DLM_LSFL_FS)
                ls->ls_allocation = GFP_NOFS;
        else
                ls->ls_allocation = GFP_KERNEL;

        /* ls_exflags are forced to match among nodes, and we don't
           need to require all nodes to have TIMEWARN or FS set */
        ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS));

        /* rsb hash table, sized from the configuration */
        size = dlm_config.ci_rsbtbl_size;
        ls->ls_rsbtbl_size = size;

        ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
        if (!ls->ls_rsbtbl)
                goto out_lsfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
                INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
                rwlock_init(&ls->ls_rsbtbl[i].lock);
        }

        /* lkb table, sized from the configuration; each bucket's counter
           starts at 1 */
        size = dlm_config.ci_lkbtbl_size;
        ls->ls_lkbtbl_size = size;

        ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
        if (!ls->ls_lkbtbl)
                goto out_rsbfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
                rwlock_init(&ls->ls_lkbtbl[i].lock);
                ls->ls_lkbtbl[i].counter = 1;
        }

        /* directory table, sized from the configuration */
        size = dlm_config.ci_dirtbl_size;
        ls->ls_dirtbl_size = size;

        ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
        if (!ls->ls_dirtbl)
                goto out_lkbfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
                rwlock_init(&ls->ls_dirtbl[i].lock);
        }

        INIT_LIST_HEAD(&ls->ls_waiters);
        mutex_init(&ls->ls_waiters_mutex);
        INIT_LIST_HEAD(&ls->ls_orphans);
        mutex_init(&ls->ls_orphans_mutex);
        INIT_LIST_HEAD(&ls->ls_timeout);
        mutex_init(&ls->ls_timeout_mutex);

        INIT_LIST_HEAD(&ls->ls_nodes);
        INIT_LIST_HEAD(&ls->ls_nodes_gone);
        ls->ls_num_nodes = 0;
        ls->ls_low_nodeid = 0;
        ls->ls_total_weight = 0;
        ls->ls_node_array = NULL;

        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
        ls->ls_stub_rsb.res_ls = ls;

        ls->ls_debug_rsb_dentry = NULL;
        ls->ls_debug_waiters_dentry = NULL;

        init_waitqueue_head(&ls->ls_uevent_wait);
        ls->ls_uevent_result = 0;
        init_completion(&ls->ls_members_done);
        /* stays -1 until something else stores a result; it is read after
           ls_members_done completes below */
        ls->ls_members_result = -1;

        ls->ls_recoverd_task = NULL;
        mutex_init(&ls->ls_recoverd_active);
        spin_lock_init(&ls->ls_recover_lock);
        spin_lock_init(&ls->ls_rcom_spin);
        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
        ls->ls_recover_status = 0;
        ls->ls_recover_seq = 0;
        ls->ls_recover_args = NULL;
        init_rwsem(&ls->ls_in_recovery);
        init_rwsem(&ls->ls_recv_active);
        INIT_LIST_HEAD(&ls->ls_requestqueue);
        mutex_init(&ls->ls_requestqueue_mutex);
        mutex_init(&ls->ls_clear_proc_locks);

        ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
        if (!ls->ls_recover_buf)
                goto out_dirfree;

        INIT_LIST_HEAD(&ls->ls_recover_list);
        spin_lock_init(&ls->ls_recover_list_lock);
        ls->ls_recover_list_count = 0;
        /* the local handle is simply the lockspace's own address */
        ls->ls_local_handle = ls;
        init_waitqueue_head(&ls->ls_wait_general);
        INIT_LIST_HEAD(&ls->ls_root_list);
        init_rwsem(&ls->ls_root_sem);

        /* the new lockspace starts out with ls_in_recovery write-held */
        down_write(&ls->ls_in_recovery);

        spin_lock(&lslist_lock);
        list_add(&ls->ls_list, &lslist);
        spin_unlock(&lslist_lock);

        /* needs to find ls in lslist */
        error = dlm_recoverd_start(ls);
        if (error) {
                log_error(ls, "can't start dlm_recoverd %d", error);
                goto out_delist;
        }

        error = kobject_setup(ls);
        if (error)
                goto out_stop;

        error = kobject_register(&ls->ls_kobj);
        if (error)
                goto out_stop;

        /* let kobject handle freeing of ls if there's an error */
        do_unreg = 1;

        /* This uevent triggers dlm_controld in userspace to add us to the
           group of nodes that are members of this lockspace (managed by the
           cluster infrastructure.)  Once it's done that, it tells us who the
           current lockspace members are (via configfs) and then tells the
           lockspace to start running (via sysfs) in dlm_ls_start(). */

        error = do_uevent(ls, 1);
        if (error)
                goto out_stop;

        wait_for_completion(&ls->ls_members_done);
        error = ls->ls_members_result;
        if (error)
                goto out_members;

        dlm_create_debug_file(ls);

        log_debug(ls, "join complete");

        *lockspace = ls;
        return 0;

        /* error unwinding: each label undoes the steps that had completed
           when the failure occurred, falling through to the earlier ones */
 out_members:
        do_uevent(ls, 0);
        dlm_clear_members(ls);
        kfree(ls->ls_node_array);
 out_stop:
        dlm_recoverd_stop(ls);
 out_delist:
        spin_lock(&lslist_lock);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
        kfree(ls->ls_recover_buf);
 out_dirfree:
        kfree(ls->ls_dirtbl);
 out_lkbfree:
        kfree(ls->ls_lkbtbl);
 out_rsbfree:
        kfree(ls->ls_rsbtbl);
 out_lsfree:
        /* once the kobject is registered, ls is freed by its release
           callback rather than directly */
        if (do_unreg)
                kobject_unregister(&ls->ls_kobj);
        else
                kfree(ls);
 out:
        module_put(THIS_MODULE);
        return error;
}
611
612 int dlm_new_lockspace(char *name, int namelen, void **lockspace,
613                       uint32_t flags, int lvblen)
614 {
615         int error = 0;
616
617         mutex_lock(&ls_lock);
618         if (!ls_count)
619                 error = threads_start();
620         if (error)
621                 goto out;
622
623         error = new_lockspace(name, namelen, lockspace, flags, lvblen);
624         if (!error)
625                 ls_count++;
626         else if (!ls_count)
627                 threads_stop();
628  out:
629         mutex_unlock(&ls_lock);
630         return error;
631 }
632
633 /* Return 1 if the lockspace still has active remote locks,
634  *        2 if the lockspace still has active local locks.
635  */
636 static int lockspace_busy(struct dlm_ls *ls)
637 {
638         int i, lkb_found = 0;
639         struct dlm_lkb *lkb;
640
641         /* NOTE: We check the lockidtbl here rather than the resource table.
642            This is because there may be LKBs queued as ASTs that have been
643            unlinked from their RSBs and are pending deletion once the AST has
644            been delivered */
645
646         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
647                 read_lock(&ls->ls_lkbtbl[i].lock);
648                 if (!list_empty(&ls->ls_lkbtbl[i].list)) {
649                         lkb_found = 1;
650                         list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
651                                             lkb_idtbl_list) {
652                                 if (!lkb->lkb_nodeid) {
653                                         read_unlock(&ls->ls_lkbtbl[i].lock);
654                                         return 2;
655                                 }
656                         }
657                 }
658                 read_unlock(&ls->ls_lkbtbl[i].lock);
659         }
660         return lkb_found;
661 }
662
/*
 * Tear down a lockspace and free everything it owns.
 *
 * force is compared against the result of lockspace_busy() (0 = no LKBs,
 * 1 = only remote LKBs, 2 = local LKBs): the release is refused with
 * -EBUSY when the lockspace is busier than force allows.  With force < 3
 * a "leave" uevent is sent first.
 *
 * Returns 0 on success or -EBUSY.
 */
static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_lkb *lkb;
        struct dlm_rsb *rsb;
        struct list_head *head;
        int i;
        int busy = lockspace_busy(ls);

        if (busy > force)
                return -EBUSY;

        /* the result of the leave uevent is not checked */
        if (force < 3)
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        /* waits until ls_count drops to zero, then unlinks ls from lslist */
        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        /* astd stays suspended while the lkbs are freed below */
        dlm_astd_suspend();

        kfree(ls->ls_recover_buf);

        /*
         * Free direntry structs.
         */

        dlm_dir_clear(ls);
        kfree(ls->ls_dirtbl);

        /*
         * Free all lkb's on lkbtbl[] lists.
         */

        for (i = 0; i < ls->ls_lkbtbl_size; i++) {
                head = &ls->ls_lkbtbl[i].list;
                while (!list_empty(head)) {
                        lkb = list_entry(head->next, struct dlm_lkb,
                                         lkb_idtbl_list);

                        list_del(&lkb->lkb_idtbl_list);

                        dlm_del_ast(lkb);

                        /* the lvb is only freed here for lkbs flagged
                           DLM_IFL_MSTCPY */
                        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
                                free_lvb(lkb->lkb_lvbptr);

                        free_lkb(lkb);
                }
        }
        dlm_astd_resume();

        kfree(ls->ls_lkbtbl);

        /*
         * Free all rsb's on rsbtbl[] lists
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                head = &ls->ls_rsbtbl[i].list;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);

                        list_del(&rsb->res_hashchain);
                        free_rsb(rsb);
                }

                /* the toss list of the same bucket is emptied the same way */
                head = &ls->ls_rsbtbl[i].toss;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);
                        list_del(&rsb->res_hashchain);
                        free_rsb(rsb);
                }
        }

        kfree(ls->ls_rsbtbl);

        /*
         * Free structures on any other lists
         */

        dlm_purge_requestqueue(ls);
        kfree(ls->ls_recover_args);
        dlm_clear_free_entries(ls);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        kobject_unregister(&ls->ls_kobj);
        /* The ls structure will be freed when the kobject is done with */

        /* last lockspace gone: stop the shared threads */
        mutex_lock(&ls_lock);
        ls_count--;
        if (!ls_count)
                threads_stop();
        mutex_unlock(&ls_lock);

        /* drop the module reference taken in new_lockspace() */
        module_put(THIS_MODULE);
        return 0;
}
765
766 /*
767  * Called when a system has released all its locks and is not going to use the
768  * lockspace any longer.  We free everything we're managing for this lockspace.
769  * Remaining nodes will go through the recovery process as if we'd died.  The
770  * lockspace must continue to function as usual, participating in recoveries,
771  * until this returns.
772  *
773  * Force has 4 possible values:
774  * 0 - don't destroy locksapce if it has any LKBs
775  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
776  * 2 - destroy lockspace regardless of LKBs
777  * 3 - destroy lockspace as part of a forced shutdown
778  */
779
780 int dlm_release_lockspace(void *lockspace, int force)
781 {
782         struct dlm_ls *ls;
783
784         ls = dlm_find_lockspace_local(lockspace);
785         if (!ls)
786                 return -EINVAL;
787         dlm_put_lockspace(ls);
788         return release_lockspace(ls, force);
789 }
790