a818fde244763aef7f0db52a135822435883f2b1
[safe/jmp/linux-2.6] / fs / ocfs2 / dlm / dlmdomain.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmdomain.c
5  *
6  * defines domain join / leave apis
7  *
8  * Copyright (C) 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
24  *
25  */
26
27 #include <linux/module.h>
28 #include <linux/types.h>
29 #include <linux/slab.h>
30 #include <linux/highmem.h>
31 #include <linux/utsname.h>
32 #include <linux/init.h>
33 #include <linux/spinlock.h>
34 #include <linux/delay.h>
35 #include <linux/err.h>
36
37 #include "cluster/heartbeat.h"
38 #include "cluster/nodemanager.h"
39 #include "cluster/tcp.h"
40
41 #include "dlmapi.h"
42 #include "dlmcommon.h"
43
44 #include "dlmdebug.h"
45 #include "dlmdomain.h"
46
47 #include "dlmver.h"
48
49 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
50 #include "cluster/masklog.h"
51
52 /*
53  *
54  * spinlock lock ordering: if multiple locks are needed, obey this ordering:
55  *    dlm_domain_lock
56  *    struct dlm_ctxt->spinlock
57  *    struct dlm_lock_resource->spinlock
58  *    struct dlm_ctxt->master_lock
59  *    struct dlm_ctxt->ast_lock
60  *    dlm_master_list_entry->spinlock
61  *    dlm_lock->spinlock
62  *
63  */
64
65 spinlock_t dlm_domain_lock = SPIN_LOCK_UNLOCKED;
66 LIST_HEAD(dlm_domains);
67 static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
68
69 #define DLM_DOMAIN_BACKOFF_MS 200
70
71 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data);
72 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data);
73 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data);
74 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data);
75
76 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
77
78 void __dlm_unhash_lockres(struct dlm_lock_resource *lockres)
79 {
80         hlist_del_init(&lockres->hash_node);
81         dlm_lockres_put(lockres);
82 }
83
84 void __dlm_insert_lockres(struct dlm_ctxt *dlm,
85                        struct dlm_lock_resource *res)
86 {
87         struct hlist_head *bucket;
88         struct qstr *q;
89
90         assert_spin_locked(&dlm->spinlock);
91
92         q = &res->lockname;
93         bucket = &(dlm->lockres_hash[q->hash % DLM_HASH_BUCKETS]);
94
95         /* get a reference for our hashtable */
96         dlm_lockres_get(res);
97
98         hlist_add_head(&res->hash_node, bucket);
99 }
100
101 struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
102                                                 const char *name,
103                                                 unsigned int len,
104                                                 unsigned int hash)
105 {
106         struct hlist_node *iter;
107         struct dlm_lock_resource *tmpres=NULL;
108         struct hlist_head *bucket;
109
110         mlog_entry("%.*s\n", len, name);
111
112         assert_spin_locked(&dlm->spinlock);
113
114         bucket = &(dlm->lockres_hash[hash % DLM_HASH_BUCKETS]);
115
116         /* check for pre-existing lock */
117         hlist_for_each(iter, bucket) {
118                 tmpres = hlist_entry(iter, struct dlm_lock_resource, hash_node);
119                 if (tmpres->lockname.len == len &&
120                     memcmp(tmpres->lockname.name, name, len) == 0) {
121                         dlm_lockres_get(tmpres);
122                         break;
123                 }
124
125                 tmpres = NULL;
126         }
127         return tmpres;
128 }
129
130 struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
131                                     const char *name,
132                                     unsigned int len)
133 {
134         struct dlm_lock_resource *res;
135         unsigned int hash = dlm_lockid_hash(name, len);
136
137         spin_lock(&dlm->spinlock);
138         res = __dlm_lookup_lockres(dlm, name, len, hash);
139         spin_unlock(&dlm->spinlock);
140         return res;
141 }
142
143 static struct dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len)
144 {
145         struct dlm_ctxt *tmp = NULL;
146         struct list_head *iter;
147
148         assert_spin_locked(&dlm_domain_lock);
149
150         /* tmp->name here is always NULL terminated,
151          * but domain may not be! */
152         list_for_each(iter, &dlm_domains) {
153                 tmp = list_entry (iter, struct dlm_ctxt, list);
154                 if (strlen(tmp->name) == len &&
155                     memcmp(tmp->name, domain, len)==0)
156                         break;
157                 tmp = NULL;
158         }
159
160         return tmp;
161 }
162
163 /* For null terminated domain strings ONLY */
164 static struct dlm_ctxt * __dlm_lookup_domain(const char *domain)
165 {
166         assert_spin_locked(&dlm_domain_lock);
167
168         return __dlm_lookup_domain_full(domain, strlen(domain));
169 }
170
171
172 /* returns true on one of two conditions:
173  * 1) the domain does not exist
174  * 2) the domain exists and it's state is "joined" */
175 static int dlm_wait_on_domain_helper(const char *domain)
176 {
177         int ret = 0;
178         struct dlm_ctxt *tmp = NULL;
179
180         spin_lock(&dlm_domain_lock);
181
182         tmp = __dlm_lookup_domain(domain);
183         if (!tmp)
184                 ret = 1;
185         else if (tmp->dlm_state == DLM_CTXT_JOINED)
186                 ret = 1;
187
188         spin_unlock(&dlm_domain_lock);
189         return ret;
190 }
191
192 static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
193 {
194         if (dlm->lockres_hash)
195                 free_page((unsigned long) dlm->lockres_hash);
196
197         if (dlm->name)
198                 kfree(dlm->name);
199
200         kfree(dlm);
201 }
202
203 /* A little strange - this function will be called while holding
204  * dlm_domain_lock and is expected to be holding it on the way out. We
205  * will however drop and reacquire it multiple times */
206 static void dlm_ctxt_release(struct kref *kref)
207 {
208         struct dlm_ctxt *dlm;
209
210         dlm = container_of(kref, struct dlm_ctxt, dlm_refs);
211
212         BUG_ON(dlm->num_joins);
213         BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED);
214
215         /* we may still be in the list if we hit an error during join. */
216         list_del_init(&dlm->list);
217
218         spin_unlock(&dlm_domain_lock);
219
220         mlog(0, "freeing memory from domain %s\n", dlm->name);
221
222         wake_up(&dlm_domain_events);
223
224         dlm_free_ctxt_mem(dlm);
225
226         spin_lock(&dlm_domain_lock);
227 }
228
229 void dlm_put(struct dlm_ctxt *dlm)
230 {
231         spin_lock(&dlm_domain_lock);
232         kref_put(&dlm->dlm_refs, dlm_ctxt_release);
233         spin_unlock(&dlm_domain_lock);
234 }
235
236 static void __dlm_get(struct dlm_ctxt *dlm)
237 {
238         kref_get(&dlm->dlm_refs);
239 }
240
241 /* given a questionable reference to a dlm object, gets a reference if
242  * it can find it in the list, otherwise returns NULL in which case
243  * you shouldn't trust your pointer. */
244 struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm)
245 {
246         struct list_head *iter;
247         struct dlm_ctxt *target = NULL;
248
249         spin_lock(&dlm_domain_lock);
250
251         list_for_each(iter, &dlm_domains) {
252                 target = list_entry (iter, struct dlm_ctxt, list);
253
254                 if (target == dlm) {
255                         __dlm_get(target);
256                         break;
257                 }
258
259                 target = NULL;
260         }
261
262         spin_unlock(&dlm_domain_lock);
263
264         return target;
265 }
266
267 int dlm_domain_fully_joined(struct dlm_ctxt *dlm)
268 {
269         int ret;
270
271         spin_lock(&dlm_domain_lock);
272         ret = (dlm->dlm_state == DLM_CTXT_JOINED) ||
273                 (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN);
274         spin_unlock(&dlm_domain_lock);
275
276         return ret;
277 }
278
279 static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
280 {
281         dlm_unregister_domain_handlers(dlm);
282         dlm_complete_thread(dlm);
283         dlm_complete_recovery_thread(dlm);
284
285         /* We've left the domain. Now we can take ourselves out of the
286          * list and allow the kref stuff to help us free the
287          * memory. */
288         spin_lock(&dlm_domain_lock);
289         list_del_init(&dlm->list);
290         spin_unlock(&dlm_domain_lock);
291
292         /* Wake up anyone waiting for us to remove this domain */
293         wake_up(&dlm_domain_events);
294 }
295
296 static void dlm_migrate_all_locks(struct dlm_ctxt *dlm)
297 {
298         int i;
299         struct dlm_lock_resource *res;
300
301         mlog(0, "Migrating locks from domain %s\n", dlm->name);
302 restart:
303         spin_lock(&dlm->spinlock);
304         for (i = 0; i < DLM_HASH_BUCKETS; i++) {
305                 while (!hlist_empty(&dlm->lockres_hash[i])) {
306                         res = hlist_entry(dlm->lockres_hash[i].first,
307                                           struct dlm_lock_resource, hash_node);
308                         /* need reference when manually grabbing lockres */
309                         dlm_lockres_get(res);
310                         /* this should unhash the lockres
311                          * and exit with dlm->spinlock */
312                         mlog(0, "purging res=%p\n", res);
313                         if (dlm_lockres_is_dirty(dlm, res)) {
314                                 /* HACK!  this should absolutely go.
315                                  * need to figure out why some empty
316                                  * lockreses are still marked dirty */
317                                 mlog(ML_ERROR, "lockres %.*s dirty!\n",
318                                      res->lockname.len, res->lockname.name);
319
320                                 spin_unlock(&dlm->spinlock);
321                                 dlm_kick_thread(dlm, res);
322                                 wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
323                                 dlm_lockres_put(res);
324                                 goto restart;
325                         }
326                         dlm_purge_lockres(dlm, res);
327                         dlm_lockres_put(res);
328                 }
329         }
330         spin_unlock(&dlm->spinlock);
331
332         mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
333 }
334
335 static int dlm_no_joining_node(struct dlm_ctxt *dlm)
336 {
337         int ret;
338
339         spin_lock(&dlm->spinlock);
340         ret = dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN;
341         spin_unlock(&dlm->spinlock);
342
343         return ret;
344 }
345
346 static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm)
347 {
348         /* Yikes, a double spinlock! I need domain_lock for the dlm
349          * state and the dlm spinlock for join state... Sorry! */
350 again:
351         spin_lock(&dlm_domain_lock);
352         spin_lock(&dlm->spinlock);
353
354         if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
355                 mlog(0, "Node %d is joining, we wait on it.\n",
356                           dlm->joining_node);
357                 spin_unlock(&dlm->spinlock);
358                 spin_unlock(&dlm_domain_lock);
359
360                 wait_event(dlm->dlm_join_events, dlm_no_joining_node(dlm));
361                 goto again;
362         }
363
364         dlm->dlm_state = DLM_CTXT_LEAVING;
365         spin_unlock(&dlm->spinlock);
366         spin_unlock(&dlm_domain_lock);
367 }
368
369 static void __dlm_print_nodes(struct dlm_ctxt *dlm)
370 {
371         int node = -1;
372
373         assert_spin_locked(&dlm->spinlock);
374
375         mlog(ML_NOTICE, "Nodes in my domain (\"%s\"):\n", dlm->name);
376
377         while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
378                                      node + 1)) < O2NM_MAX_NODES) {
379                 mlog(ML_NOTICE, " node %d\n", node);
380         }
381 }
382
383 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data)
384 {
385         struct dlm_ctxt *dlm = data;
386         unsigned int node;
387         struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;
388
389         mlog_entry("%p %u %p", msg, len, data);
390
391         if (!dlm_grab(dlm))
392                 return 0;
393
394         node = exit_msg->node_idx;
395
396         mlog(0, "Node %u leaves domain %s\n", node, dlm->name);
397
398         spin_lock(&dlm->spinlock);
399         clear_bit(node, dlm->domain_map);
400         __dlm_print_nodes(dlm);
401
402         /* notify anything attached to the heartbeat events */
403         dlm_hb_event_notify_attached(dlm, node, 0);
404
405         spin_unlock(&dlm->spinlock);
406
407         dlm_put(dlm);
408
409         return 0;
410 }
411
412 static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm,
413                                     unsigned int node)
414 {
415         int status;
416         struct dlm_exit_domain leave_msg;
417
418         mlog(0, "Asking node %u if we can leave the domain %s me = %u\n",
419                   node, dlm->name, dlm->node_num);
420
421         memset(&leave_msg, 0, sizeof(leave_msg));
422         leave_msg.node_idx = dlm->node_num;
423
424         status = o2net_send_message(DLM_EXIT_DOMAIN_MSG, dlm->key,
425                                     &leave_msg, sizeof(leave_msg), node,
426                                     NULL);
427
428         mlog(0, "status return %d from o2net_send_message\n", status);
429
430         return status;
431 }
432
433
434 static void dlm_leave_domain(struct dlm_ctxt *dlm)
435 {
436         int node, clear_node, status;
437
438         /* At this point we've migrated away all our locks and won't
439          * accept mastership of new ones. The dlm is responsible for
440          * almost nothing now. We make sure not to confuse any joining
441          * nodes and then commence shutdown procedure. */
442
443         spin_lock(&dlm->spinlock);
444         /* Clear ourselves from the domain map */
445         clear_bit(dlm->node_num, dlm->domain_map);
446         while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
447                                      0)) < O2NM_MAX_NODES) {
448                 /* Drop the dlm spinlock. This is safe wrt the domain_map.
449                  * -nodes cannot be added now as the
450                  *   query_join_handlers knows to respond with OK_NO_MAP
451                  * -we catch the right network errors if a node is
452                  *   removed from the map while we're sending him the
453                  *   exit message. */
454                 spin_unlock(&dlm->spinlock);
455
456                 clear_node = 1;
457
458                 status = dlm_send_one_domain_exit(dlm, node);
459                 if (status < 0 &&
460                     status != -ENOPROTOOPT &&
461                     status != -ENOTCONN) {
462                         mlog(ML_NOTICE, "Error %d sending domain exit message "
463                              "to node %d\n", status, node);
464
465                         /* Not sure what to do here but lets sleep for
466                          * a bit in case this was a transient
467                          * error... */
468                         msleep(DLM_DOMAIN_BACKOFF_MS);
469                         clear_node = 0;
470                 }
471
472                 spin_lock(&dlm->spinlock);
473                 /* If we're not clearing the node bit then we intend
474                  * to loop back around to try again. */
475                 if (clear_node)
476                         clear_bit(node, dlm->domain_map);
477         }
478         spin_unlock(&dlm->spinlock);
479 }
480
481 int dlm_joined(struct dlm_ctxt *dlm)
482 {
483         int ret = 0;
484
485         spin_lock(&dlm_domain_lock);
486
487         if (dlm->dlm_state == DLM_CTXT_JOINED)
488                 ret = 1;
489
490         spin_unlock(&dlm_domain_lock);
491
492         return ret;
493 }
494
495 int dlm_shutting_down(struct dlm_ctxt *dlm)
496 {
497         int ret = 0;
498
499         spin_lock(&dlm_domain_lock);
500
501         if (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN)
502                 ret = 1;
503
504         spin_unlock(&dlm_domain_lock);
505
506         return ret;
507 }
508
509 void dlm_unregister_domain(struct dlm_ctxt *dlm)
510 {
511         int leave = 0;
512
513         spin_lock(&dlm_domain_lock);
514         BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED);
515         BUG_ON(!dlm->num_joins);
516
517         dlm->num_joins--;
518         if (!dlm->num_joins) {
519                 /* We mark it "in shutdown" now so new register
520                  * requests wait until we've completely left the
521                  * domain. Don't use DLM_CTXT_LEAVING yet as we still
522                  * want new domain joins to communicate with us at
523                  * least until we've completed migration of our
524                  * resources. */
525                 dlm->dlm_state = DLM_CTXT_IN_SHUTDOWN;
526                 leave = 1;
527         }
528         spin_unlock(&dlm_domain_lock);
529
530         if (leave) {
531                 mlog(0, "shutting down domain %s\n", dlm->name);
532
533                 /* We changed dlm state, notify the thread */
534                 dlm_kick_thread(dlm, NULL);
535
536                 dlm_migrate_all_locks(dlm);
537                 dlm_mark_domain_leaving(dlm);
538                 dlm_leave_domain(dlm);
539                 dlm_complete_dlm_shutdown(dlm);
540         }
541         dlm_put(dlm);
542 }
543 EXPORT_SYMBOL_GPL(dlm_unregister_domain);
544
545 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
546 {
547         struct dlm_query_join_request *query;
548         enum dlm_query_join_response response;
549         struct dlm_ctxt *dlm = NULL;
550
551         query = (struct dlm_query_join_request *) msg->buf;
552
553         mlog(0, "node %u wants to join domain %s\n", query->node_idx,
554                   query->domain);
555
556         /*
557          * If heartbeat doesn't consider the node live, tell it
558          * to back off and try again.  This gives heartbeat a chance
559          * to catch up.
560          */
561         if (!o2hb_check_node_heartbeating(query->node_idx)) {
562                 mlog(0, "node %u is not in our live map yet\n",
563                      query->node_idx);
564
565                 response = JOIN_DISALLOW;
566                 goto respond;
567         }
568
569         response = JOIN_OK_NO_MAP;
570
571         spin_lock(&dlm_domain_lock);
572         dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
573         /* Once the dlm ctxt is marked as leaving then we don't want
574          * to be put in someone's domain map. 
575          * Also, explicitly disallow joining at certain troublesome
576          * times (ie. during recovery). */
577         if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) {
578                 int bit = query->node_idx;
579                 spin_lock(&dlm->spinlock);
580
581                 if (dlm->dlm_state == DLM_CTXT_NEW &&
582                     dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN) {
583                         /*If this is a brand new context and we
584                          * haven't started our join process yet, then
585                          * the other node won the race. */
586                         response = JOIN_OK_NO_MAP;
587                 } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
588                         /* Disallow parallel joins. */
589                         response = JOIN_DISALLOW;
590                 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
591                         mlog(ML_NOTICE, "node %u trying to join, but recovery "
592                              "is ongoing.\n", bit);
593                         response = JOIN_DISALLOW;
594                 } else if (test_bit(bit, dlm->recovery_map)) {
595                         mlog(ML_NOTICE, "node %u trying to join, but it "
596                              "still needs recovery.\n", bit);
597                         response = JOIN_DISALLOW;
598                 } else if (test_bit(bit, dlm->domain_map)) {
599                         mlog(ML_NOTICE, "node %u trying to join, but it "
600                              "is still in the domain! needs recovery?\n",
601                              bit);
602                         response = JOIN_DISALLOW;
603                 } else {
604                         /* Alright we're fully a part of this domain
605                          * so we keep some state as to who's joining
606                          * and indicate to him that needs to be fixed
607                          * up. */
608                         response = JOIN_OK;
609                         __dlm_set_joining_node(dlm, query->node_idx);
610                 }
611
612                 spin_unlock(&dlm->spinlock);
613         }
614         spin_unlock(&dlm_domain_lock);
615
616 respond:
617         mlog(0, "We respond with %u\n", response);
618
619         return response;
620 }
621
622 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data)
623 {
624         struct dlm_assert_joined *assert;
625         struct dlm_ctxt *dlm = NULL;
626
627         assert = (struct dlm_assert_joined *) msg->buf;
628
629         mlog(0, "node %u asserts join on domain %s\n", assert->node_idx,
630                   assert->domain);
631
632         spin_lock(&dlm_domain_lock);
633         dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len);
634         /* XXX should we consider no dlm ctxt an error? */
635         if (dlm) {
636                 spin_lock(&dlm->spinlock);
637
638                 /* Alright, this node has officially joined our
639                  * domain. Set him in the map and clean up our
640                  * leftover join state. */
641                 BUG_ON(dlm->joining_node != assert->node_idx);
642                 set_bit(assert->node_idx, dlm->domain_map);
643                 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
644
645                 __dlm_print_nodes(dlm);
646
647                 /* notify anything attached to the heartbeat events */
648                 dlm_hb_event_notify_attached(dlm, assert->node_idx, 1);
649
650                 spin_unlock(&dlm->spinlock);
651         }
652         spin_unlock(&dlm_domain_lock);
653
654         return 0;
655 }
656
657 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data)
658 {
659         struct dlm_cancel_join *cancel;
660         struct dlm_ctxt *dlm = NULL;
661
662         cancel = (struct dlm_cancel_join *) msg->buf;
663
664         mlog(0, "node %u cancels join on domain %s\n", cancel->node_idx,
665                   cancel->domain);
666
667         spin_lock(&dlm_domain_lock);
668         dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len);
669
670         if (dlm) {
671                 spin_lock(&dlm->spinlock);
672
673                 /* Yikes, this guy wants to cancel his join. No
674                  * problem, we simply cleanup our join state. */
675                 BUG_ON(dlm->joining_node != cancel->node_idx);
676                 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
677
678                 spin_unlock(&dlm->spinlock);
679         }
680         spin_unlock(&dlm_domain_lock);
681
682         return 0;
683 }
684
685 static int dlm_send_one_join_cancel(struct dlm_ctxt *dlm,
686                                     unsigned int node)
687 {
688         int status;
689         struct dlm_cancel_join cancel_msg;
690
691         memset(&cancel_msg, 0, sizeof(cancel_msg));
692         cancel_msg.node_idx = dlm->node_num;
693         cancel_msg.name_len = strlen(dlm->name);
694         memcpy(cancel_msg.domain, dlm->name, cancel_msg.name_len);
695
696         status = o2net_send_message(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
697                                     &cancel_msg, sizeof(cancel_msg), node,
698                                     NULL);
699         if (status < 0) {
700                 mlog_errno(status);
701                 goto bail;
702         }
703
704 bail:
705         return status;
706 }
707
708 /* map_size should be in bytes. */
709 static int dlm_send_join_cancels(struct dlm_ctxt *dlm,
710                                  unsigned long *node_map,
711                                  unsigned int map_size)
712 {
713         int status, tmpstat;
714         unsigned int node;
715
716         if (map_size != (BITS_TO_LONGS(O2NM_MAX_NODES) *
717                          sizeof(unsigned long))) {
718                 mlog(ML_ERROR,
719                      "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n",
720                      map_size, BITS_TO_LONGS(O2NM_MAX_NODES));
721                 return -EINVAL;
722         }
723
724         status = 0;
725         node = -1;
726         while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
727                                      node + 1)) < O2NM_MAX_NODES) {
728                 if (node == dlm->node_num)
729                         continue;
730
731                 tmpstat = dlm_send_one_join_cancel(dlm, node);
732                 if (tmpstat) {
733                         mlog(ML_ERROR, "Error return %d cancelling join on "
734                              "node %d\n", tmpstat, node);
735                         if (!status)
736                                 status = tmpstat;
737                 }
738         }
739
740         if (status)
741                 mlog_errno(status);
742         return status;
743 }
744
745 static int dlm_request_join(struct dlm_ctxt *dlm,
746                             int node,
747                             enum dlm_query_join_response *response)
748 {
749         int status, retval;
750         struct dlm_query_join_request join_msg;
751
752         mlog(0, "querying node %d\n", node);
753
754         memset(&join_msg, 0, sizeof(join_msg));
755         join_msg.node_idx = dlm->node_num;
756         join_msg.name_len = strlen(dlm->name);
757         memcpy(join_msg.domain, dlm->name, join_msg.name_len);
758
759         status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
760                                     sizeof(join_msg), node, &retval);
761         if (status < 0 && status != -ENOPROTOOPT) {
762                 mlog_errno(status);
763                 goto bail;
764         }
765
766         /* -ENOPROTOOPT from the net code means the other side isn't
767             listening for our message type -- that's fine, it means
768             his dlm isn't up, so we can consider him a 'yes' but not
769             joined into the domain.  */
770         if (status == -ENOPROTOOPT) {
771                 status = 0;
772                 *response = JOIN_OK_NO_MAP;
773         } else if (retval == JOIN_DISALLOW ||
774                    retval == JOIN_OK ||
775                    retval == JOIN_OK_NO_MAP) {
776                 *response = retval;
777         } else {
778                 status = -EINVAL;
779                 mlog(ML_ERROR, "invalid response %d from node %u\n", retval,
780                      node);
781         }
782
783         mlog(0, "status %d, node %d response is %d\n", status, node,
784                   *response);
785
786 bail:
787         return status;
788 }
789
790 static int dlm_send_one_join_assert(struct dlm_ctxt *dlm,
791                                     unsigned int node)
792 {
793         int status;
794         struct dlm_assert_joined assert_msg;
795
796         mlog(0, "Sending join assert to node %u\n", node);
797
798         memset(&assert_msg, 0, sizeof(assert_msg));
799         assert_msg.node_idx = dlm->node_num;
800         assert_msg.name_len = strlen(dlm->name);
801         memcpy(assert_msg.domain, dlm->name, assert_msg.name_len);
802
803         status = o2net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
804                                     &assert_msg, sizeof(assert_msg), node,
805                                     NULL);
806         if (status < 0)
807                 mlog_errno(status);
808
809         return status;
810 }
811
812 static void dlm_send_join_asserts(struct dlm_ctxt *dlm,
813                                   unsigned long *node_map)
814 {
815         int status, node, live;
816
817         status = 0;
818         node = -1;
819         while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
820                                      node + 1)) < O2NM_MAX_NODES) {
821                 if (node == dlm->node_num)
822                         continue;
823
824                 do {
825                         /* It is very important that this message be
826                          * received so we spin until either the node
827                          * has died or it gets the message. */
828                         status = dlm_send_one_join_assert(dlm, node);
829
830                         spin_lock(&dlm->spinlock);
831                         live = test_bit(node, dlm->live_nodes_map);
832                         spin_unlock(&dlm->spinlock);
833
834                         if (status) {
835                                 mlog(ML_ERROR, "Error return %d asserting "
836                                      "join on node %d\n", status, node);
837
838                                 /* give us some time between errors... */
839                                 if (live)
840                                         msleep(DLM_DOMAIN_BACKOFF_MS);
841                         }
842                 } while (status && live);
843         }
844 }
845
/*
 * Scratch state for one attempt at joining a domain.  Both members are
 * node bitmaps sized for O2NM_MAX_NODES bits.
 */
struct domain_join_ctxt {
        /* copy of dlm->live_nodes_map taken when the attempt started;
         * compared against the current map to detect changes */
        unsigned long live_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
        /* nodes that answered the join query with JOIN_OK */
        unsigned long yes_resp_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
};
850
851 static int dlm_should_restart_join(struct dlm_ctxt *dlm,
852                                    struct domain_join_ctxt *ctxt,
853                                    enum dlm_query_join_response response)
854 {
855         int ret;
856
857         if (response == JOIN_DISALLOW) {
858                 mlog(0, "Latest response of disallow -- should restart\n");
859                 return 1;
860         }
861
862         spin_lock(&dlm->spinlock);
863         /* For now, we restart the process if the node maps have
864          * changed at all */
865         ret = memcmp(ctxt->live_map, dlm->live_nodes_map,
866                      sizeof(dlm->live_nodes_map));
867         spin_unlock(&dlm->spinlock);
868
869         if (ret)
870                 mlog(0, "Node maps changed -- should restart\n");
871
872         return ret;
873 }
874
875 static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
876 {
877         int status = 0, tmpstat, node;
878         struct domain_join_ctxt *ctxt;
879         enum dlm_query_join_response response;
880
881         mlog_entry("%p", dlm);
882
883         ctxt = kcalloc(1, sizeof(*ctxt), GFP_KERNEL);
884         if (!ctxt) {
885                 status = -ENOMEM;
886                 mlog_errno(status);
887                 goto bail;
888         }
889
890         /* group sem locking should work for us here -- we're already
891          * registered for heartbeat events so filling this should be
892          * atomic wrt getting those handlers called. */
893         o2hb_fill_node_map(dlm->live_nodes_map, sizeof(dlm->live_nodes_map));
894
895         spin_lock(&dlm->spinlock);
896         memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map));
897
898         __dlm_set_joining_node(dlm, dlm->node_num);
899
900         spin_unlock(&dlm->spinlock);
901
902         node = -1;
903         while ((node = find_next_bit(ctxt->live_map, O2NM_MAX_NODES,
904                                      node + 1)) < O2NM_MAX_NODES) {
905                 if (node == dlm->node_num)
906                         continue;
907
908                 status = dlm_request_join(dlm, node, &response);
909                 if (status < 0) {
910                         mlog_errno(status);
911                         goto bail;
912                 }
913
914                 /* Ok, either we got a response or the node doesn't have a
915                  * dlm up. */
916                 if (response == JOIN_OK)
917                         set_bit(node, ctxt->yes_resp_map);
918
919                 if (dlm_should_restart_join(dlm, ctxt, response)) {
920                         status = -EAGAIN;
921                         goto bail;
922                 }
923         }
924
925         mlog(0, "Yay, done querying nodes!\n");
926
927         /* Yay, everyone agree's we can join the domain. My domain is
928          * comprised of all nodes who were put in the
929          * yes_resp_map. Copy that into our domain map and send a join
930          * assert message to clean up everyone elses state. */
931         spin_lock(&dlm->spinlock);
932         memcpy(dlm->domain_map, ctxt->yes_resp_map,
933                sizeof(ctxt->yes_resp_map));
934         set_bit(dlm->node_num, dlm->domain_map);
935         spin_unlock(&dlm->spinlock);
936
937         dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
938
939         /* Joined state *must* be set before the joining node
940          * information, otherwise the query_join handler may read no
941          * current joiner but a state of NEW and tell joining nodes
942          * we're not in the domain. */
943         spin_lock(&dlm_domain_lock);
944         dlm->dlm_state = DLM_CTXT_JOINED;
945         dlm->num_joins++;
946         spin_unlock(&dlm_domain_lock);
947
948 bail:
949         spin_lock(&dlm->spinlock);
950         __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
951         if (!status)
952                 __dlm_print_nodes(dlm);
953         spin_unlock(&dlm->spinlock);
954
955         if (ctxt) {
956                 /* Do we need to send a cancel message to any nodes? */
957                 if (status < 0) {
958                         tmpstat = dlm_send_join_cancels(dlm,
959                                                         ctxt->yes_resp_map,
960                                                         sizeof(ctxt->yes_resp_map));
961                         if (tmpstat < 0)
962                                 mlog_errno(tmpstat);
963                 }
964                 kfree(ctxt);
965         }
966
967         mlog(0, "returning %d\n", status);
968         return status;
969 }
970
971 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm)
972 {
973         o2hb_unregister_callback(&dlm->dlm_hb_up);
974         o2hb_unregister_callback(&dlm->dlm_hb_down);
975         o2net_unregister_handler_list(&dlm->dlm_domain_handlers);
976 }
977
978 static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
979 {
980         int status;
981
982         mlog(0, "registering handlers.\n");
983
984         o2hb_setup_callback(&dlm->dlm_hb_down, O2HB_NODE_DOWN_CB,
985                             dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI);
986         status = o2hb_register_callback(&dlm->dlm_hb_down);
987         if (status)
988                 goto bail;
989
990         o2hb_setup_callback(&dlm->dlm_hb_up, O2HB_NODE_UP_CB,
991                             dlm_hb_node_up_cb, dlm, DLM_HB_NODE_UP_PRI);
992         status = o2hb_register_callback(&dlm->dlm_hb_up);
993         if (status)
994                 goto bail;
995
996         status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
997                                         sizeof(struct dlm_master_request),
998                                         dlm_master_request_handler,
999                                         dlm, &dlm->dlm_domain_handlers);
1000         if (status)
1001                 goto bail;
1002
1003         status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
1004                                         sizeof(struct dlm_assert_master),
1005                                         dlm_assert_master_handler,
1006                                         dlm, &dlm->dlm_domain_handlers);
1007         if (status)
1008                 goto bail;
1009
1010         status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
1011                                         sizeof(struct dlm_create_lock),
1012                                         dlm_create_lock_handler,
1013                                         dlm, &dlm->dlm_domain_handlers);
1014         if (status)
1015                 goto bail;
1016
1017         status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key,
1018                                         DLM_CONVERT_LOCK_MAX_LEN,
1019                                         dlm_convert_lock_handler,
1020                                         dlm, &dlm->dlm_domain_handlers);
1021         if (status)
1022                 goto bail;
1023
1024         status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key,
1025                                         DLM_UNLOCK_LOCK_MAX_LEN,
1026                                         dlm_unlock_lock_handler,
1027                                         dlm, &dlm->dlm_domain_handlers);
1028         if (status)
1029                 goto bail;
1030
1031         status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key,
1032                                         DLM_PROXY_AST_MAX_LEN,
1033                                         dlm_proxy_ast_handler,
1034                                         dlm, &dlm->dlm_domain_handlers);
1035         if (status)
1036                 goto bail;
1037
1038         status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key,
1039                                         sizeof(struct dlm_exit_domain),
1040                                         dlm_exit_domain_handler,
1041                                         dlm, &dlm->dlm_domain_handlers);
1042         if (status)
1043                 goto bail;
1044
1045         status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
1046                                         sizeof(struct dlm_migrate_request),
1047                                         dlm_migrate_request_handler,
1048                                         dlm, &dlm->dlm_domain_handlers);
1049         if (status)
1050                 goto bail;
1051
1052         status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key,
1053                                         DLM_MIG_LOCKRES_MAX_LEN,
1054                                         dlm_mig_lockres_handler,
1055                                         dlm, &dlm->dlm_domain_handlers);
1056         if (status)
1057                 goto bail;
1058
1059         status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
1060                                         sizeof(struct dlm_master_requery),
1061                                         dlm_master_requery_handler,
1062                                         dlm, &dlm->dlm_domain_handlers);
1063         if (status)
1064                 goto bail;
1065
1066         status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
1067                                         sizeof(struct dlm_lock_request),
1068                                         dlm_request_all_locks_handler,
1069                                         dlm, &dlm->dlm_domain_handlers);
1070         if (status)
1071                 goto bail;
1072
1073         status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
1074                                         sizeof(struct dlm_reco_data_done),
1075                                         dlm_reco_data_done_handler,
1076                                         dlm, &dlm->dlm_domain_handlers);
1077         if (status)
1078                 goto bail;
1079
1080         status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
1081                                         sizeof(struct dlm_begin_reco),
1082                                         dlm_begin_reco_handler,
1083                                         dlm, &dlm->dlm_domain_handlers);
1084         if (status)
1085                 goto bail;
1086
1087         status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
1088                                         sizeof(struct dlm_finalize_reco),
1089                                         dlm_finalize_reco_handler,
1090                                         dlm, &dlm->dlm_domain_handlers);
1091         if (status)
1092                 goto bail;
1093
1094 bail:
1095         if (status)
1096                 dlm_unregister_domain_handlers(dlm);
1097
1098         return status;
1099 }
1100
1101 static int dlm_join_domain(struct dlm_ctxt *dlm)
1102 {
1103         int status;
1104
1105         BUG_ON(!dlm);
1106
1107         mlog(0, "Join domain %s\n", dlm->name);
1108
1109         status = dlm_register_domain_handlers(dlm);
1110         if (status) {
1111                 mlog_errno(status);
1112                 goto bail;
1113         }
1114
1115         status = dlm_launch_thread(dlm);
1116         if (status < 0) {
1117                 mlog_errno(status);
1118                 goto bail;
1119         }
1120
1121         status = dlm_launch_recovery_thread(dlm);
1122         if (status < 0) {
1123                 mlog_errno(status);
1124                 goto bail;
1125         }
1126
1127         do {
1128                 unsigned int backoff;
1129                 status = dlm_try_to_join_domain(dlm);
1130
1131                 /* If we're racing another node to the join, then we
1132                  * need to back off temporarily and let them
1133                  * complete. */
1134                 if (status == -EAGAIN) {
1135                         if (signal_pending(current)) {
1136                                 status = -ERESTARTSYS;
1137                                 goto bail;
1138                         }
1139
1140                         /*
1141                          * <chip> After you!
1142                          * <dale> No, after you!
1143                          * <chip> I insist!
1144                          * <dale> But you first!
1145                          * ...
1146                          */
1147                         backoff = (unsigned int)(jiffies & 0x3);
1148                         backoff *= DLM_DOMAIN_BACKOFF_MS;
1149                         mlog(0, "backoff %d\n", backoff);
1150                         msleep(backoff);
1151                 }
1152         } while (status == -EAGAIN);
1153
1154         if (status < 0) {
1155                 mlog_errno(status);
1156                 goto bail;
1157         }
1158
1159         status = 0;
1160 bail:
1161         wake_up(&dlm_domain_events);
1162
1163         if (status) {
1164                 dlm_unregister_domain_handlers(dlm);
1165                 dlm_complete_thread(dlm);
1166                 dlm_complete_recovery_thread(dlm);
1167         }
1168
1169         return status;
1170 }
1171
1172 static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
1173                                 u32 key)
1174 {
1175         int i;
1176         struct dlm_ctxt *dlm = NULL;
1177
1178         dlm = kcalloc(1, sizeof(*dlm), GFP_KERNEL);
1179         if (!dlm) {
1180                 mlog_errno(-ENOMEM);
1181                 goto leave;
1182         }
1183
1184         dlm->name = kmalloc(strlen(domain) + 1, GFP_KERNEL);
1185         if (dlm->name == NULL) {
1186                 mlog_errno(-ENOMEM);
1187                 kfree(dlm);
1188                 dlm = NULL;
1189                 goto leave;
1190         }
1191
1192         dlm->lockres_hash = (struct hlist_head *) __get_free_page(GFP_KERNEL);
1193         if (!dlm->lockres_hash) {
1194                 mlog_errno(-ENOMEM);
1195                 kfree(dlm->name);
1196                 kfree(dlm);
1197                 dlm = NULL;
1198                 goto leave;
1199         }
1200
1201         for (i=0; i<DLM_HASH_BUCKETS; i++)
1202                 INIT_HLIST_HEAD(&dlm->lockres_hash[i]);
1203
1204         strcpy(dlm->name, domain);
1205         dlm->key = key;
1206         dlm->node_num = o2nm_this_node();
1207
1208         spin_lock_init(&dlm->spinlock);
1209         spin_lock_init(&dlm->master_lock);
1210         spin_lock_init(&dlm->ast_lock);
1211         INIT_LIST_HEAD(&dlm->list);
1212         INIT_LIST_HEAD(&dlm->dirty_list);
1213         INIT_LIST_HEAD(&dlm->reco.resources);
1214         INIT_LIST_HEAD(&dlm->reco.received);
1215         INIT_LIST_HEAD(&dlm->reco.node_data);
1216         INIT_LIST_HEAD(&dlm->purge_list);
1217         INIT_LIST_HEAD(&dlm->dlm_domain_handlers);
1218         dlm->reco.state = 0;
1219
1220         INIT_LIST_HEAD(&dlm->pending_asts);
1221         INIT_LIST_HEAD(&dlm->pending_basts);
1222
1223         mlog(0, "dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n",
1224                   dlm->recovery_map, &(dlm->recovery_map[0]));
1225
1226         memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map));
1227         memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map));
1228         memset(dlm->domain_map, 0, sizeof(dlm->domain_map));
1229
1230         dlm->dlm_thread_task = NULL;
1231         dlm->dlm_reco_thread_task = NULL;
1232         init_waitqueue_head(&dlm->dlm_thread_wq);
1233         init_waitqueue_head(&dlm->dlm_reco_thread_wq);
1234         init_waitqueue_head(&dlm->reco.event);
1235         init_waitqueue_head(&dlm->ast_wq);
1236         init_waitqueue_head(&dlm->migration_wq);
1237         INIT_LIST_HEAD(&dlm->master_list);
1238         INIT_LIST_HEAD(&dlm->mle_hb_events);
1239
1240         dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN;
1241         init_waitqueue_head(&dlm->dlm_join_events);
1242
1243         dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
1244         dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
1245         atomic_set(&dlm->local_resources, 0);
1246         atomic_set(&dlm->remote_resources, 0);
1247         atomic_set(&dlm->unknown_resources, 0);
1248
1249         spin_lock_init(&dlm->work_lock);
1250         INIT_LIST_HEAD(&dlm->work_list);
1251         INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work, dlm);
1252
1253         kref_init(&dlm->dlm_refs);
1254         dlm->dlm_state = DLM_CTXT_NEW;
1255
1256         INIT_LIST_HEAD(&dlm->dlm_eviction_callbacks);
1257
1258         mlog(0, "context init: refcount %u\n",
1259                   atomic_read(&dlm->dlm_refs.refcount));
1260
1261 leave:
1262         return dlm;
1263 }
1264
1265 /*
1266  * dlm_register_domain: one-time setup per "domain"
1267  */
1268 struct dlm_ctxt * dlm_register_domain(const char *domain,
1269                                u32 key)
1270 {
1271         int ret;
1272         struct dlm_ctxt *dlm = NULL;
1273         struct dlm_ctxt *new_ctxt = NULL;
1274
1275         if (strlen(domain) > O2NM_MAX_NAME_LEN) {
1276                 ret = -ENAMETOOLONG;
1277                 mlog(ML_ERROR, "domain name length too long\n");
1278                 goto leave;
1279         }
1280
1281         if (!o2hb_check_local_node_heartbeating()) {
1282                 mlog(ML_ERROR, "the local node has not been configured, or is "
1283                      "not heartbeating\n");
1284                 ret = -EPROTO;
1285                 goto leave;
1286         }
1287
1288         mlog(0, "register called for domain \"%s\"\n", domain);
1289
1290 retry:
1291         dlm = NULL;
1292         if (signal_pending(current)) {
1293                 ret = -ERESTARTSYS;
1294                 mlog_errno(ret);
1295                 goto leave;
1296         }
1297
1298         spin_lock(&dlm_domain_lock);
1299
1300         dlm = __dlm_lookup_domain(domain);
1301         if (dlm) {
1302                 if (dlm->dlm_state != DLM_CTXT_JOINED) {
1303                         spin_unlock(&dlm_domain_lock);
1304
1305                         mlog(0, "This ctxt is not joined yet!\n");
1306                         wait_event_interruptible(dlm_domain_events,
1307                                                  dlm_wait_on_domain_helper(
1308                                                          domain));
1309                         goto retry;
1310                 }
1311
1312                 __dlm_get(dlm);
1313                 dlm->num_joins++;
1314
1315                 spin_unlock(&dlm_domain_lock);
1316
1317                 ret = 0;
1318                 goto leave;
1319         }
1320
1321         /* doesn't exist */
1322         if (!new_ctxt) {
1323                 spin_unlock(&dlm_domain_lock);
1324
1325                 new_ctxt = dlm_alloc_ctxt(domain, key);
1326                 if (new_ctxt)
1327                         goto retry;
1328
1329                 ret = -ENOMEM;
1330                 mlog_errno(ret);
1331                 goto leave;
1332         }
1333
1334         /* a little variable switch-a-roo here... */
1335         dlm = new_ctxt;
1336         new_ctxt = NULL;
1337
1338         /* add the new domain */
1339         list_add_tail(&dlm->list, &dlm_domains);
1340         spin_unlock(&dlm_domain_lock);
1341
1342         ret = dlm_join_domain(dlm);
1343         if (ret) {
1344                 mlog_errno(ret);
1345                 dlm_put(dlm);
1346                 goto leave;
1347         }
1348
1349         ret = 0;
1350 leave:
1351         if (new_ctxt)
1352                 dlm_free_ctxt_mem(new_ctxt);
1353
1354         if (ret < 0)
1355                 dlm = ERR_PTR(ret);
1356
1357         return dlm;
1358 }
1359 EXPORT_SYMBOL_GPL(dlm_register_domain);
1360
1361 static LIST_HEAD(dlm_join_handlers);
1362
1363 static void dlm_unregister_net_handlers(void)
1364 {
1365         o2net_unregister_handler_list(&dlm_join_handlers);
1366 }
1367
1368 static int dlm_register_net_handlers(void)
1369 {
1370         int status = 0;
1371
1372         status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
1373                                         sizeof(struct dlm_query_join_request),
1374                                         dlm_query_join_handler,
1375                                         NULL, &dlm_join_handlers);
1376         if (status)
1377                 goto bail;
1378
1379         status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1380                                         sizeof(struct dlm_assert_joined),
1381                                         dlm_assert_joined_handler,
1382                                         NULL, &dlm_join_handlers);
1383         if (status)
1384                 goto bail;
1385
1386         status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1387                                         sizeof(struct dlm_cancel_join),
1388                                         dlm_cancel_join_handler,
1389                                         NULL, &dlm_join_handlers);
1390
1391 bail:
1392         if (status < 0)
1393                 dlm_unregister_net_handlers();
1394
1395         return status;
1396 }
1397
/* Domain eviction callback handling.
 *
 * The file system requires notification of node death *before* the
 * dlm completes its recovery work, otherwise it may be able to
 * acquire locks on resources requiring recovery. Since the dlm can
 * evict a node from its domain *before* heartbeat fires, a similar
 * mechanism is required. */
1405
1406 /* Eviction is not expected to happen often, so a per-domain lock is
1407  * not necessary. Eviction callbacks are allowed to sleep for short
1408  * periods of time. */
1409 static DECLARE_RWSEM(dlm_callback_sem);
1410
1411 void dlm_fire_domain_eviction_callbacks(struct dlm_ctxt *dlm,
1412                                         int node_num)
1413 {
1414         struct list_head *iter;
1415         struct dlm_eviction_cb *cb;
1416
1417         down_read(&dlm_callback_sem);
1418         list_for_each(iter, &dlm->dlm_eviction_callbacks) {
1419                 cb = list_entry(iter, struct dlm_eviction_cb, ec_item);
1420
1421                 cb->ec_func(node_num, cb->ec_data);
1422         }
1423         up_read(&dlm_callback_sem);
1424 }
1425
1426 void dlm_setup_eviction_cb(struct dlm_eviction_cb *cb,
1427                            dlm_eviction_func *f,
1428                            void *data)
1429 {
1430         INIT_LIST_HEAD(&cb->ec_item);
1431         cb->ec_func = f;
1432         cb->ec_data = data;
1433 }
1434 EXPORT_SYMBOL_GPL(dlm_setup_eviction_cb);
1435
1436 void dlm_register_eviction_cb(struct dlm_ctxt *dlm,
1437                               struct dlm_eviction_cb *cb)
1438 {
1439         down_write(&dlm_callback_sem);
1440         list_add_tail(&cb->ec_item, &dlm->dlm_eviction_callbacks);
1441         up_write(&dlm_callback_sem);
1442 }
1443 EXPORT_SYMBOL_GPL(dlm_register_eviction_cb);
1444
1445 void dlm_unregister_eviction_cb(struct dlm_eviction_cb *cb)
1446 {
1447         down_write(&dlm_callback_sem);
1448         list_del_init(&cb->ec_item);
1449         up_write(&dlm_callback_sem);
1450 }
1451 EXPORT_SYMBOL_GPL(dlm_unregister_eviction_cb);
1452
1453 static int __init dlm_init(void)
1454 {
1455         int status;
1456
1457         dlm_print_version();
1458
1459         status = dlm_init_mle_cache();
1460         if (status)
1461                 return -1;
1462
1463         status = dlm_register_net_handlers();
1464         if (status) {
1465                 dlm_destroy_mle_cache();
1466                 return -1;
1467         }
1468
1469         return 0;
1470 }
1471
1472 static void __exit dlm_exit (void)
1473 {
1474         dlm_unregister_net_handlers();
1475         dlm_destroy_mle_cache();
1476 }
1477
1478 MODULE_AUTHOR("Oracle");
1479 MODULE_LICENSE("GPL");
1480
1481 module_init(dlm_init);
1482 module_exit(dlm_exit);