[TIPC] Initial merge
[safe/jmp/linux-2.6] / net / tipc / name_table.c
1 /*
2  * net/tipc/name_table.c: TIPC name table code
3  * 
4  * Copyright (c) 2003-2005, Ericsson Research Canada
5  * Copyright (c) 2004-2005, Wind River Systems
6  * Copyright (c) 2005-2006, Ericsson AB
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without 
10  * modification, are permitted provided that the following conditions are met:
11  *
12  * Redistributions of source code must retain the above copyright notice, this 
13  * list of conditions and the following disclaimer.
14  * Redistributions in binary form must reproduce the above copyright notice, 
15  * this list of conditions and the following disclaimer in the documentation 
16  * and/or other materials provided with the distribution.
17  * Neither the names of the copyright holders nor the names of its 
18  * contributors may be used to endorse or promote products derived from this 
19  * software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
22  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
23  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
24  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
25  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
26  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
27  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
28  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
29  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
30  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
31  * POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include "core.h"
35 #include "config.h"
36 #include "dbg.h"
37 #include "name_table.h"
38 #include "name_distr.h"
39 #include "addr.h"
40 #include "node_subscr.h"
41 #include "subscr.h"
42 #include "port.h"
43 #include "cluster.h"
44 #include "bcast.h"
45
46 int tipc_nametbl_size = 1024;           /* must be a power of 2 */
47
48 /**
49  * struct sub_seq - container for all published instances of a name sequence
50  * @lower: name sequence lower bound
51  * @upper: name sequence upper bound
52  * @node_list: circular list of matching publications with >= node scope
53  * @cluster_list: circular list of matching publications with >= cluster scope
54  * @zone_list: circular list of matching publications with >= zone scope
55  */
56
57 struct sub_seq {
58         u32 lower;
59         u32 upper;
60         struct publication *node_list;
61         struct publication *cluster_list;
62         struct publication *zone_list;
63 };
64
65 /** 
66  * struct name_seq - container for all published instances of a name type
67  * @type: 32 bit 'type' value for name sequence
68  * @sseq: pointer to dynamically-sized array of sub-sequences of this 'type';
69  *        sub-sequences are sorted in ascending order
70  * @alloc: number of sub-sequences currently in array
71  * @first_free: upper bound of highest sub-sequence + 1
72  * @ns_list: links to adjacent name sequences in hash chain
73  * @subscriptions: list of subscriptions for this 'type'
74  * @lock: spinlock controlling access to name sequence structure
75  */
76
77 struct name_seq {
78         u32 type;
79         struct sub_seq *sseqs;
80         u32 alloc;
81         u32 first_free;
82         struct hlist_node ns_list;
83         struct list_head subscriptions;
84         spinlock_t lock;
85 };
86
87 /**
88  * struct name_table - table containing all existing port name publications
89  * @types: pointer to fixed-sized array of name sequence lists, 
90  *         accessed via hashing on 'type'; name sequence lists are *not* sorted
91  * @local_publ_count: number of publications issued by this node
92  */
93
94 struct name_table {
95         struct hlist_head *types;
96         u32 local_publ_count;
97 };
98
99 struct name_table table = { NULL } ;
100 static atomic_t rsv_publ_ok = ATOMIC_INIT(0);
101 rwlock_t nametbl_lock = RW_LOCK_UNLOCKED;
102
103
104 static inline int hash(int x)
105 {
106         return(x & (tipc_nametbl_size - 1));
107 }
108
109 /**
110  * publ_create - create a publication structure
111  */
112
113 static struct publication *publ_create(u32 type, u32 lower, u32 upper, 
114                                        u32 scope, u32 node, u32 port_ref,   
115                                        u32 key)
116 {
117         struct publication *publ =
118                 (struct publication *)kmalloc(sizeof(*publ), GFP_ATOMIC);
119         if (publ == NULL) {
120                 warn("Memory squeeze; failed to create publication\n");
121                 return 0;
122         }
123
124         memset(publ, 0, sizeof(*publ));
125         publ->type = type;
126         publ->lower = lower;
127         publ->upper = upper;
128         publ->scope = scope;
129         publ->node = node;
130         publ->ref = port_ref;
131         publ->key = key;
132         INIT_LIST_HEAD(&publ->local_list);
133         INIT_LIST_HEAD(&publ->pport_list);
134         INIT_LIST_HEAD(&publ->subscr.nodesub_list);
135         return publ;
136 }
137
138 /**
139  * subseq_alloc - allocate a specified number of sub-sequence structures
140  */
141
142 struct sub_seq *subseq_alloc(u32 cnt)
143 {
144         u32 sz = cnt * sizeof(struct sub_seq);
145         struct sub_seq *sseq = (struct sub_seq *)kmalloc(sz, GFP_ATOMIC);
146
147         if (sseq)
148                 memset(sseq, 0, sz);
149         return sseq;
150 }
151
152 /**
153  * nameseq_create - create a name sequence structure for the specified 'type'
154  * 
155  * Allocates a single sub-sequence structure and sets it to all 0's.
156  */
157
158 struct name_seq *nameseq_create(u32 type, struct hlist_head *seq_head)
159 {
160         struct name_seq *nseq = 
161                 (struct name_seq *)kmalloc(sizeof(*nseq), GFP_ATOMIC);
162         struct sub_seq *sseq = subseq_alloc(1);
163
164         if (!nseq || !sseq) {
165                 warn("Memory squeeze; failed to create name sequence\n");
166                 kfree(nseq);
167                 kfree(sseq);
168                 return 0;
169         }
170
171         memset(nseq, 0, sizeof(*nseq));
172         nseq->lock = SPIN_LOCK_UNLOCKED;
173         nseq->type = type;
174         nseq->sseqs = sseq;
175         dbg("nameseq_create() nseq = %x type %u, ssseqs %x, ff: %u\n",
176             nseq, type, nseq->sseqs, nseq->first_free);
177         nseq->alloc = 1;
178         INIT_HLIST_NODE(&nseq->ns_list);
179         INIT_LIST_HEAD(&nseq->subscriptions);
180         hlist_add_head(&nseq->ns_list, seq_head);
181         return nseq;
182 }
183
184 /**
185  * nameseq_find_subseq - find sub-sequence (if any) matching a name instance
186  *  
187  * Very time-critical, so binary searches through sub-sequence array.
188  */
189
190 static inline struct sub_seq *nameseq_find_subseq(struct name_seq *nseq, 
191                                                   u32 instance)
192 {
193         struct sub_seq *sseqs = nseq->sseqs;
194         int low = 0;
195         int high = nseq->first_free - 1;
196         int mid;
197
198         while (low <= high) {
199                 mid = (low + high) / 2;
200                 if (instance < sseqs[mid].lower)
201                         high = mid - 1;
202                 else if (instance > sseqs[mid].upper)
203                         low = mid + 1;
204                 else
205                         return &sseqs[mid];
206         }
207         return 0;
208 }
209
210 /**
211  * nameseq_locate_subseq - determine position of name instance in sub-sequence
212  * 
213  * Returns index in sub-sequence array of the entry that contains the specified
214  * instance value; if no entry contains that value, returns the position
215  * where a new entry for it would be inserted in the array.
216  *
217  * Note: Similar to binary search code for locating a sub-sequence.
218  */
219
220 static u32 nameseq_locate_subseq(struct name_seq *nseq, u32 instance)
221 {
222         struct sub_seq *sseqs = nseq->sseqs;
223         int low = 0;
224         int high = nseq->first_free - 1;
225         int mid;
226
227         while (low <= high) {
228                 mid = (low + high) / 2;
229                 if (instance < sseqs[mid].lower)
230                         high = mid - 1;
231                 else if (instance > sseqs[mid].upper)
232                         low = mid + 1;
233                 else
234                         return mid;
235         }
236         return low;
237 }
238
239 /**
240  * nameseq_insert_publ - 
241  */
242
243 struct publication *nameseq_insert_publ(struct name_seq *nseq,
244                                         u32 type, u32 lower, u32 upper,
245                                         u32 scope, u32 node, u32 port, u32 key)
246 {
247         struct subscription *s;
248         struct subscription *st;
249         struct publication *publ;
250         struct sub_seq *sseq;
251         int created_subseq = 0;
252
253         assert(nseq->first_free <= nseq->alloc);
254         sseq = nameseq_find_subseq(nseq, lower);
255         dbg("nameseq_ins: for seq %x,<%u,%u>, found sseq %x\n",
256             nseq, type, lower, sseq);
257         if (sseq) {
258
259                 /* Lower end overlaps existing entry => need an exact match */
260
261                 if ((sseq->lower != lower) || (sseq->upper != upper)) {
262                         warn("Overlapping publ <%u,%u,%u>\n", type, lower, upper);
263                         return 0;
264                 }
265         } else {
266                 u32 inspos;
267                 struct sub_seq *freesseq;
268
269                 /* Find where lower end should be inserted */
270
271                 inspos = nameseq_locate_subseq(nseq, lower);
272
273                 /* Fail if upper end overlaps into an existing entry */
274
275                 if ((inspos < nseq->first_free) &&
276                     (upper >= nseq->sseqs[inspos].lower)) {
277                         warn("Overlapping publ <%u,%u,%u>\n", type, lower, upper);
278                         return 0;
279                 }
280
281                 /* Ensure there is space for new sub-sequence */
282
283                 if (nseq->first_free == nseq->alloc) {
284                         struct sub_seq *sseqs = nseq->sseqs;
285                         nseq->sseqs = subseq_alloc(nseq->alloc * 2);
286                         if (nseq->sseqs != NULL) {
287                                 memcpy(nseq->sseqs, sseqs,
288                                        nseq->alloc * sizeof (struct sub_seq));
289                                 kfree(sseqs);
290                                 dbg("Allocated %u sseqs\n", nseq->alloc);
291                                 nseq->alloc *= 2;
292                         } else {
293                                 warn("Memory squeeze; failed to create sub-sequence\n");
294                                 return 0;
295                         }
296                 }
297                 dbg("Have %u sseqs for type %u\n", nseq->alloc, type);
298
299                 /* Insert new sub-sequence */
300
301                 dbg("ins in pos %u, ff = %u\n", inspos, nseq->first_free);
302                 sseq = &nseq->sseqs[inspos];
303                 freesseq = &nseq->sseqs[nseq->first_free];
304                 memmove(sseq + 1, sseq, (freesseq - sseq) * sizeof (*sseq));
305                 memset(sseq, 0, sizeof (*sseq));
306                 nseq->first_free++;
307                 sseq->lower = lower;
308                 sseq->upper = upper;
309                 created_subseq = 1;
310         }
311         dbg("inserting (%u %u %u) from %x:%u into sseq %x(%u,%u) of seq %x\n",
312             type, lower, upper, node, port, sseq,
313             sseq->lower, sseq->upper, nseq);
314
315         /* Insert a publication: */
316
317         publ = publ_create(type, lower, upper, scope, node, port, key);
318         if (!publ)
319                 return 0;
320         dbg("inserting publ %x, node=%x publ->node=%x, subscr->node=%x\n",
321             publ, node, publ->node, publ->subscr.node);
322
323         if (!sseq->zone_list)
324                 sseq->zone_list = publ->zone_list_next = publ;
325         else {
326                 publ->zone_list_next = sseq->zone_list->zone_list_next;
327                 sseq->zone_list->zone_list_next = publ;
328         }
329
330         if (in_own_cluster(node)) {
331                 if (!sseq->cluster_list)
332                         sseq->cluster_list = publ->cluster_list_next = publ;
333                 else {
334                         publ->cluster_list_next =
335                         sseq->cluster_list->cluster_list_next;
336                         sseq->cluster_list->cluster_list_next = publ;
337                 }
338         }
339
340         if (node == tipc_own_addr) {
341                 if (!sseq->node_list)
342                         sseq->node_list = publ->node_list_next = publ;
343                 else {
344                         publ->node_list_next = sseq->node_list->node_list_next;
345                         sseq->node_list->node_list_next = publ;
346                 }
347         }
348
349         /* 
350          * Any subscriptions waiting for notification? 
351          */
352         list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
353                 dbg("calling report_overlap()\n");
354                 subscr_report_overlap(s,
355                                       publ->lower,
356                                       publ->upper,
357                                       TIPC_PUBLISHED,
358                                       publ->ref, 
359                                       publ->node,
360                                       created_subseq);
361         }
362         return publ;
363 }
364
365 /**
366  * nameseq_remove_publ -
367  */
368
369 struct publication *nameseq_remove_publ(struct name_seq *nseq, u32 inst,
370                                         u32 node, u32 ref, u32 key)
371 {
372         struct publication *publ;
373         struct publication *prev;
374         struct sub_seq *sseq = nameseq_find_subseq(nseq, inst);
375         struct sub_seq *free;
376         struct subscription *s, *st;
377         int removed_subseq = 0;
378
379         assert(nseq);
380
381         if (!sseq) {
382                 int i;
383
384                 warn("Withdraw unknown <%u,%u>?\n", nseq->type, inst);
385                 assert(nseq->sseqs);
386                 dbg("Dumping subseqs %x for %x, alloc = %u,ff=%u\n",
387                     nseq->sseqs, nseq, nseq->alloc, 
388                     nseq->first_free);
389                 for (i = 0; i < nseq->first_free; i++) {
390                         dbg("Subseq %u(%x): lower = %u,upper = %u\n",
391                             i, &nseq->sseqs[i], nseq->sseqs[i].lower,
392                             nseq->sseqs[i].upper);
393                 }
394                 return 0;
395         }
396         dbg("nameseq_remove: seq: %x, sseq %x, <%u,%u> key %u\n",
397             nseq, sseq, nseq->type, inst, key);
398
399         prev = sseq->zone_list;
400         publ = sseq->zone_list->zone_list_next;
401         while ((publ->key != key) || (publ->ref != ref) || 
402                (publ->node && (publ->node != node))) {
403                 prev = publ;
404                 publ = publ->zone_list_next;
405                 assert(prev != sseq->zone_list);
406         }
407         if (publ != sseq->zone_list)
408                 prev->zone_list_next = publ->zone_list_next;
409         else if (publ->zone_list_next != publ) {
410                 prev->zone_list_next = publ->zone_list_next;
411                 sseq->zone_list = publ->zone_list_next;
412         } else {
413                 sseq->zone_list = 0;
414         }
415
416         if (in_own_cluster(node)) {
417                 prev = sseq->cluster_list;
418                 publ = sseq->cluster_list->cluster_list_next;
419                 while ((publ->key != key) || (publ->ref != ref) || 
420                        (publ->node && (publ->node != node))) {
421                         prev = publ;
422                         publ = publ->cluster_list_next;
423                         assert(prev != sseq->cluster_list);
424                 }
425                 if (publ != sseq->cluster_list)
426                         prev->cluster_list_next = publ->cluster_list_next;
427                 else if (publ->cluster_list_next != publ) {
428                         prev->cluster_list_next = publ->cluster_list_next;
429                         sseq->cluster_list = publ->cluster_list_next;
430                 } else {
431                         sseq->cluster_list = 0;
432                 }
433         }
434
435         if (node == tipc_own_addr) {
436                 prev = sseq->node_list;
437                 publ = sseq->node_list->node_list_next;
438                 while ((publ->key != key) || (publ->ref != ref) || 
439                        (publ->node && (publ->node != node))) {
440                         prev = publ;
441                         publ = publ->node_list_next;
442                         assert(prev != sseq->node_list);
443                 }
444                 if (publ != sseq->node_list)
445                         prev->node_list_next = publ->node_list_next;
446                 else if (publ->node_list_next != publ) {
447                         prev->node_list_next = publ->node_list_next;
448                         sseq->node_list = publ->node_list_next;
449                 } else {
450                         sseq->node_list = 0;
451                 }
452         }
453         assert(!publ->node || (publ->node == node));
454         assert(publ->ref == ref);
455         assert(publ->key == key);
456
457         /* 
458          * Contract subseq list if no more publications:
459          */
460         if (!sseq->node_list && !sseq->cluster_list && !sseq->zone_list) {
461                 free = &nseq->sseqs[nseq->first_free--];
462                 memmove(sseq, sseq + 1, (free - (sseq + 1)) * sizeof (*sseq));
463                 removed_subseq = 1;
464         }
465
466         /* 
467          * Any subscriptions waiting ? 
468          */
469         list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
470                 subscr_report_overlap(s,
471                                       publ->lower,
472                                       publ->upper,
473                                       TIPC_WITHDRAWN, 
474                                       publ->ref, 
475                                       publ->node,
476                                       removed_subseq);
477         }
478         return publ;
479 }
480
481 /**
482  * nameseq_subscribe: attach a subscription, and issue
483  * the prescribed number of events if there is any sub-
484  * sequence overlapping with the requested sequence
485  */
486
487 void nameseq_subscribe(struct name_seq *nseq, struct subscription *s)
488 {
489         struct sub_seq *sseq = nseq->sseqs;
490
491         list_add(&s->nameseq_list, &nseq->subscriptions);
492
493         if (!sseq)
494                 return;
495
496         while (sseq != &nseq->sseqs[nseq->first_free]) {
497                 struct publication *zl = sseq->zone_list;
498                 if (zl && subscr_overlap(s,sseq->lower,sseq->upper)) {
499                         struct publication *crs = zl;
500                         int must_report = 1;
501
502                         do {
503                                 subscr_report_overlap(s, 
504                                                        sseq->lower, 
505                                                        sseq->upper,
506                                                        TIPC_PUBLISHED,
507                                                        crs->ref,
508                                                        crs->node,
509                                                        must_report);
510                                 must_report = 0;
511                                 crs = crs->zone_list_next;
512                         } while (crs != zl);
513                 }
514                 sseq++;
515         }
516 }
517
518 static struct name_seq *nametbl_find_seq(u32 type)
519 {
520         struct hlist_head *seq_head;
521         struct hlist_node *seq_node;
522         struct name_seq *ns;
523
524         dbg("find_seq %u,(%u,0x%x) table = %p, hash[type] = %u\n",
525             type, ntohl(type), type, table.types, hash(type));
526
527         seq_head = &table.types[hash(type)];
528         hlist_for_each_entry(ns, seq_node, seq_head, ns_list) {
529                 if (ns->type == type) {
530                         dbg("found %x\n", ns);
531                         return ns;
532                 }
533         }
534
535         return 0;
536 };
537
538 struct publication *nametbl_insert_publ(u32 type, u32 lower, u32 upper,
539                     u32 scope, u32 node, u32 port, u32 key)
540 {
541         struct name_seq *seq = nametbl_find_seq(type);
542
543         dbg("ins_publ: <%u,%x,%x> found %x\n", type, lower, upper, seq);
544         if (lower > upper) {
545                 warn("Failed to publish illegal <%u,%u,%u>\n",
546                      type, lower, upper);
547                 return 0;
548         }
549
550         dbg("Publishing <%u,%u,%u> from %x\n", type, lower, upper, node);
551         if (!seq) {
552                 seq = nameseq_create(type, &table.types[hash(type)]);
553                 dbg("nametbl_insert_publ: created %x\n", seq);
554         }
555         if (!seq)
556                 return 0;
557
558         assert(seq->type == type);
559         return nameseq_insert_publ(seq, type, lower, upper,
560                                    scope, node, port, key);
561 }
562
563 struct publication *nametbl_remove_publ(u32 type, u32 lower, 
564                                         u32 node, u32 ref, u32 key)
565 {
566         struct publication *publ;
567         struct name_seq *seq = nametbl_find_seq(type);
568
569         if (!seq)
570                 return 0;
571
572         dbg("Withdrawing <%u,%u> from %x\n", type, lower, node);
573         publ = nameseq_remove_publ(seq, lower, node, ref, key);
574
575         if (!seq->first_free && list_empty(&seq->subscriptions)) {
576                 hlist_del_init(&seq->ns_list);
577                 kfree(seq->sseqs);
578                 kfree(seq);
579         }
580         return publ;
581 }
582
583 /*
584  * nametbl_translate(): Translate tipc_name -> tipc_portid.
585  *                      Very time-critical.
586  *
587  * Note: on entry 'destnode' is the search domain used during translation;
588  *       on exit it passes back the node address of the matching port (if any)
589  */
590
591 u32 nametbl_translate(u32 type, u32 instance, u32 *destnode)
592 {
593         struct sub_seq *sseq;
594         struct publication *publ = 0;
595         struct name_seq *seq;
596         u32 ref;
597
598         if (!in_scope(*destnode, tipc_own_addr))
599                 return 0;
600
601         read_lock_bh(&nametbl_lock);
602         seq = nametbl_find_seq(type);
603         if (unlikely(!seq))
604                 goto not_found;
605         sseq = nameseq_find_subseq(seq, instance);
606         if (unlikely(!sseq))
607                 goto not_found;
608         spin_lock_bh(&seq->lock);
609
610         /* Closest-First Algorithm: */
611         if (likely(!*destnode)) {
612                 publ = sseq->node_list;
613                 if (publ) {
614                         sseq->node_list = publ->node_list_next;
615 found:
616                         ref = publ->ref;
617                         *destnode = publ->node;
618                         spin_unlock_bh(&seq->lock);
619                         read_unlock_bh(&nametbl_lock);
620                         return ref;
621                 }
622                 publ = sseq->cluster_list;
623                 if (publ) {
624                         sseq->cluster_list = publ->cluster_list_next;
625                         goto found;
626                 }
627                 publ = sseq->zone_list;
628                 if (publ) {
629                         sseq->zone_list = publ->zone_list_next;
630                         goto found;
631                 }
632         }
633
634         /* Round-Robin Algorithm: */
635         else if (*destnode == tipc_own_addr) {
636                 publ = sseq->node_list;
637                 if (publ) {
638                         sseq->node_list = publ->node_list_next;
639                         goto found;
640                 }
641         } else if (in_own_cluster(*destnode)) {
642                 publ = sseq->cluster_list;
643                 if (publ) {
644                         sseq->cluster_list = publ->cluster_list_next;
645                         goto found;
646                 }
647         } else {
648                 publ = sseq->zone_list;
649                 if (publ) {
650                         sseq->zone_list = publ->zone_list_next;
651                         goto found;
652                 }
653         }
654         spin_unlock_bh(&seq->lock);
655 not_found:
656         *destnode = 0;
657         read_unlock_bh(&nametbl_lock);
658         return 0;
659 }
660
661 /**
662  * nametbl_mc_translate - find multicast destinations
663  * 
664  * Creates list of all local ports that overlap the given multicast address;
665  * also determines if any off-node ports overlap.
666  *
667  * Note: Publications with a scope narrower than 'limit' are ignored.
668  * (i.e. local node-scope publications mustn't receive messages arriving
669  * from another node, even if the multcast link brought it here)
670  * 
671  * Returns non-zero if any off-node ports overlap
672  */
673
674 int nametbl_mc_translate(u32 type, u32 lower, u32 upper, u32 limit,
675                          struct port_list *dports)
676 {
677         struct name_seq *seq;
678         struct sub_seq *sseq;
679         struct sub_seq *sseq_stop;
680         int res = 0;
681
682         read_lock_bh(&nametbl_lock);
683         seq = nametbl_find_seq(type);
684         if (!seq)
685                 goto exit;
686
687         spin_lock_bh(&seq->lock);
688
689         sseq = seq->sseqs + nameseq_locate_subseq(seq, lower);
690         sseq_stop = seq->sseqs + seq->first_free;
691         for (; sseq != sseq_stop; sseq++) {
692                 struct publication *publ;
693
694                 if (sseq->lower > upper)
695                         break;
696                 publ = sseq->cluster_list;
697                 if (publ && (publ->scope <= limit))
698                         do {
699                                 if (publ->node == tipc_own_addr)
700                                         port_list_add(dports, publ->ref);
701                                 else
702                                         res = 1;
703                                 publ = publ->cluster_list_next;
704                         } while (publ != sseq->cluster_list);
705         }
706
707         spin_unlock_bh(&seq->lock);
708 exit:
709         read_unlock_bh(&nametbl_lock);
710         return res;
711 }
712
713 /**
714  * nametbl_publish_rsv - publish port name using a reserved name type
715  */
716
717 int nametbl_publish_rsv(u32 ref, unsigned int scope, 
718                         struct tipc_name_seq const *seq)
719 {
720         int res;
721
722         atomic_inc(&rsv_publ_ok);
723         res = tipc_publish(ref, scope, seq);
724         atomic_dec(&rsv_publ_ok);
725         return res;
726 }
727
728 /**
729  * nametbl_publish - add name publication to network name tables
730  */
731
732 struct publication *nametbl_publish(u32 type, u32 lower, u32 upper, 
733                                     u32 scope, u32 port_ref, u32 key)
734 {
735         struct publication *publ;
736
737         if (table.local_publ_count >= tipc_max_publications) {
738                 warn("Failed publish: max %u local publication\n", 
739                      tipc_max_publications);
740                 return 0;
741         }
742         if ((type < TIPC_RESERVED_TYPES) && !atomic_read(&rsv_publ_ok)) {
743                 warn("Failed to publish reserved name <%u,%u,%u>\n",
744                      type, lower, upper);
745                 return 0;
746         }
747
748         write_lock_bh(&nametbl_lock);
749         table.local_publ_count++;
750         publ = nametbl_insert_publ(type, lower, upper, scope,
751                                    tipc_own_addr, port_ref, key);
752         if (publ && (scope != TIPC_NODE_SCOPE)) {
753                 named_publish(publ);
754         }
755         write_unlock_bh(&nametbl_lock);
756         return publ;
757 }
758
759 /**
760  * nametbl_withdraw - withdraw name publication from network name tables
761  */
762
763 int nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key)
764 {
765         struct publication *publ;
766
767         dbg("nametbl_withdraw:<%d,%d,%d>\n", type, lower, key);
768         write_lock_bh(&nametbl_lock);
769         publ = nametbl_remove_publ(type, lower, tipc_own_addr, ref, key);
770         if (publ) {
771                 table.local_publ_count--;
772                 if (publ->scope != TIPC_NODE_SCOPE)
773                         named_withdraw(publ);
774                 write_unlock_bh(&nametbl_lock);
775                 list_del_init(&publ->pport_list);
776                 kfree(publ);
777                 return 1;
778         }
779         write_unlock_bh(&nametbl_lock);
780         return 0;
781 }
782
783 /**
784  * nametbl_subscribe - add a subscription object to the name table
785  */
786
787 void
788 nametbl_subscribe(struct subscription *s)
789 {
790         u32 type = s->seq.type;
791         struct name_seq *seq;
792
793         write_lock_bh(&nametbl_lock);
794         seq = nametbl_find_seq(type);
795         if (!seq) {
796                 seq = nameseq_create(type, &table.types[hash(type)]);
797         }
798         if (seq){
799                 spin_lock_bh(&seq->lock);
800                 dbg("nametbl_subscribe:found %x for <%u,%u,%u>\n",
801                     seq, type, s->seq.lower, s->seq.upper);
802                 assert(seq->type == type);
803                 nameseq_subscribe(seq, s);
804                 spin_unlock_bh(&seq->lock);
805         }
806         write_unlock_bh(&nametbl_lock);
807 }
808
809 /**
810  * nametbl_unsubscribe - remove a subscription object from name table
811  */
812
813 void
814 nametbl_unsubscribe(struct subscription *s)
815 {
816         struct name_seq *seq;
817
818         write_lock_bh(&nametbl_lock);
819         seq = nametbl_find_seq(s->seq.type);
820         if (seq != NULL){
821                 spin_lock_bh(&seq->lock);
822                 list_del_init(&s->nameseq_list);
823                 spin_unlock_bh(&seq->lock);
824                 if ((seq->first_free == 0) && list_empty(&seq->subscriptions)) {
825                         hlist_del_init(&seq->ns_list);
826                         kfree(seq->sseqs);
827                         kfree(seq);
828                 }
829         }
830         write_unlock_bh(&nametbl_lock);
831 }
832
833
834 /**
835  * subseq_list: print specified sub-sequence contents into the given buffer
836  */
837
838 static void subseq_list(struct sub_seq *sseq, struct print_buf *buf, u32 depth,
839                         u32 index)
840 {
841         char portIdStr[27];
842         char *scopeStr;
843         struct publication *publ = sseq->zone_list;
844
845         tipc_printf(buf, "%-10u %-10u ", sseq->lower, sseq->upper);
846
847         if (depth == 2 || !publ) {
848                 tipc_printf(buf, "\n");
849                 return;
850         }
851
852         do {
853                 sprintf (portIdStr, "<%u.%u.%u:%u>",
854                          tipc_zone(publ->node), tipc_cluster(publ->node),
855                          tipc_node(publ->node), publ->ref);
856                 tipc_printf(buf, "%-26s ", portIdStr);
857                 if (depth > 3) {
858                         if (publ->node != tipc_own_addr)
859                                 scopeStr = "";
860                         else if (publ->scope == TIPC_NODE_SCOPE)
861                                 scopeStr = "node";
862                         else if (publ->scope == TIPC_CLUSTER_SCOPE)
863                                 scopeStr = "cluster";
864                         else
865                                 scopeStr = "zone";
866                         tipc_printf(buf, "%-10u %s", publ->key, scopeStr);
867                 }
868
869                 publ = publ->zone_list_next;
870                 if (publ == sseq->zone_list)
871                         break;
872
873                 tipc_printf(buf, "\n%33s", " ");
874         } while (1);
875
876         tipc_printf(buf, "\n");
877 }
878
879 /**
880  * nameseq_list: print specified name sequence contents into the given buffer
881  */
882
883 static void nameseq_list(struct name_seq *seq, struct print_buf *buf, u32 depth,
884                          u32 type, u32 lowbound, u32 upbound, u32 index)
885 {
886         struct sub_seq *sseq;
887         char typearea[11];
888
889         sprintf(typearea, "%-10u", seq->type);
890
891         if (depth == 1) {
892                 tipc_printf(buf, "%s\n", typearea);
893                 return;
894         }
895
896         for (sseq = seq->sseqs; sseq != &seq->sseqs[seq->first_free]; sseq++) {
897                 if ((lowbound <= sseq->upper) && (upbound >= sseq->lower)) {
898                         tipc_printf(buf, "%s ", typearea);
899                         subseq_list(sseq, buf, depth, index);
900                         sprintf(typearea, "%10s", " ");
901                 }
902         }
903 }
904
905 /**
906  * nametbl_header - print name table header into the given buffer
907  */
908
909 static void nametbl_header(struct print_buf *buf, u32 depth)
910 {
911         tipc_printf(buf, "Type       ");
912
913         if (depth > 1)
914                 tipc_printf(buf, "Lower      Upper      ");
915         if (depth > 2)
916                 tipc_printf(buf, "Port Identity              ");
917         if (depth > 3)
918                 tipc_printf(buf, "Publication");
919
920         tipc_printf(buf, "\n-----------");
921
922         if (depth > 1)
923                 tipc_printf(buf, "--------------------- ");
924         if (depth > 2)
925                 tipc_printf(buf, "-------------------------- ");
926         if (depth > 3)
927                 tipc_printf(buf, "------------------");
928
929         tipc_printf(buf, "\n");
930 }
931
932 /**
933  * nametbl_list - print specified name table contents into the given buffer
934  */
935
936 static void nametbl_list(struct print_buf *buf, u32 depth_info, 
937                          u32 type, u32 lowbound, u32 upbound)
938 {
939         struct hlist_head *seq_head;
940         struct hlist_node *seq_node;
941         struct name_seq *seq;
942         int all_types;
943         u32 depth;
944         u32 i;
945
946         all_types = (depth_info & TIPC_NTQ_ALLTYPES);
947         depth = (depth_info & ~TIPC_NTQ_ALLTYPES);
948
949         if (depth == 0)
950                 return;
951
952         if (all_types) {
953                 /* display all entries in name table to specified depth */
954                 nametbl_header(buf, depth);
955                 lowbound = 0;
956                 upbound = ~0;
957                 for (i = 0; i < tipc_nametbl_size; i++) {
958                         seq_head = &table.types[i];
959                         hlist_for_each_entry(seq, seq_node, seq_head, ns_list) {
960                                 nameseq_list(seq, buf, depth, seq->type, 
961                                              lowbound, upbound, i);
962                         }
963                 }
964         } else {
965                 /* display only the sequence that matches the specified type */
966                 if (upbound < lowbound) {
967                         tipc_printf(buf, "invalid name sequence specified\n");
968                         return;
969                 }
970                 nametbl_header(buf, depth);
971                 i = hash(type);
972                 seq_head = &table.types[i];
973                 hlist_for_each_entry(seq, seq_node, seq_head, ns_list) {
974                         if (seq->type == type) {
975                                 nameseq_list(seq, buf, depth, type, 
976                                              lowbound, upbound, i);
977                                 break;
978                         }
979                 }
980         }
981 }
982
983 void nametbl_print(struct print_buf *buf, const char *str)
984 {
985         tipc_printf(buf, str);
986         read_lock_bh(&nametbl_lock);
987         nametbl_list(buf, 0, 0, 0, 0);
988         read_unlock_bh(&nametbl_lock);
989 }
990
991 #define MAX_NAME_TBL_QUERY 32768
992
993 struct sk_buff *nametbl_get(const void *req_tlv_area, int req_tlv_space)
994 {
995         struct sk_buff *buf;
996         struct tipc_name_table_query *argv;
997         struct tlv_desc *rep_tlv;
998         struct print_buf b;
999         int str_len;
1000
1001         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_NAME_TBL_QUERY))
1002                 return cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
1003
1004         buf = cfg_reply_alloc(TLV_SPACE(MAX_NAME_TBL_QUERY));
1005         if (!buf)
1006                 return NULL;
1007
1008         rep_tlv = (struct tlv_desc *)buf->data;
1009         printbuf_init(&b, TLV_DATA(rep_tlv), MAX_NAME_TBL_QUERY);
1010         argv = (struct tipc_name_table_query *)TLV_DATA(req_tlv_area);
1011         read_lock_bh(&nametbl_lock);
1012         nametbl_list(&b, ntohl(argv->depth), ntohl(argv->type), 
1013                      ntohl(argv->lowbound), ntohl(argv->upbound));
1014         read_unlock_bh(&nametbl_lock);
1015         str_len = printbuf_validate(&b);
1016
1017         skb_put(buf, TLV_SPACE(str_len));
1018         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
1019
1020         return buf;
1021 }
1022
1023 void nametbl_dump(void)
1024 {
1025         nametbl_list(CONS, 0, 0, 0, 0);
1026 }
1027
1028 int nametbl_init(void)
1029 {
1030         int array_size = sizeof(struct hlist_head) * tipc_nametbl_size;
1031
1032         table.types = (struct hlist_head *)kmalloc(array_size, GFP_ATOMIC);
1033         if (!table.types)
1034                 return -ENOMEM;
1035
1036         write_lock_bh(&nametbl_lock);
1037         memset(table.types, 0, array_size);
1038         table.local_publ_count = 0;
1039         write_unlock_bh(&nametbl_lock);
1040         return 0;
1041 }
1042
1043 void nametbl_stop(void)
1044 {
1045         struct hlist_head *seq_head;
1046         struct hlist_node *seq_node;
1047         struct hlist_node *tmp;
1048         struct name_seq *seq;
1049         u32 i;
1050
1051         if (!table.types)
1052                 return;
1053
1054         write_lock_bh(&nametbl_lock);
1055         for (i = 0; i < tipc_nametbl_size; i++) {
1056                 seq_head = &table.types[i];
1057                 hlist_for_each_entry_safe(seq, seq_node, tmp, seq_head, ns_list) {
1058                         struct sub_seq *sseq = seq->sseqs;
1059
1060                         for (; sseq != &seq->sseqs[seq->first_free]; sseq++) {
1061                                 struct publication *publ = sseq->zone_list;
1062                                 assert(publ);
1063                                 do {
1064                                         struct publication *next =
1065                                                 publ->zone_list_next;
1066                                         kfree(publ);
1067                                         publ = next;
1068                                 }
1069                                 while (publ != sseq->zone_list);
1070                         }
1071                 }
1072         }
1073         kfree(table.types);
1074         table.types = NULL;
1075         write_unlock_bh(&nametbl_lock);
1076 }