mmc: s3c6410: enable ADMA feature in 6410 sdhci controller
[safe/jmp/linux-2.6] / net / tipc / subscr.c
index dde23f1..ab6eab4 100644 (file)
@@ -1,8 +1,8 @@
 /*
- * net/tipc/subscr.c: TIPC subscription service
+ * net/tipc/subscr.c: TIPC network topology service
  *
  * Copyright (c) 2000-2006, Ericsson AB
- * Copyright (c) 2005, Wind River Systems
+ * Copyright (c) 2005-2007, Wind River Systems
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
 
 #include "core.h"
 #include "dbg.h"
-#include "subscr.h"
 #include "name_table.h"
+#include "port.h"
 #include "ref.h"
+#include "subscr.h"
 
 /**
  * struct subscriber - TIPC network topology subscriber
- * @ref: object reference to subscriber object itself
- * @lock: pointer to spinlock controlling access to subscriber object
+ * @port_ref: object reference to server port connecting to subscriber
+ * @lock: pointer to spinlock controlling access to subscriber's server port
  * @subscriber_list: adjacent subscribers in top. server's list of subscribers
  * @subscription_list: list of subscription objects for this subscriber
- * @port_ref: object reference to port used to communicate with subscriber
  */
 
 struct subscriber {
-       u32 ref;
+       u32 port_ref;
        spinlock_t *lock;
        struct list_head subscriber_list;
        struct list_head subscription_list;
-       u32 port_ref;
 };
 
 /**
@@ -77,20 +76,10 @@ struct top_srv {
 static struct top_srv topsrv = { 0 };
 
 /**
- * htohl - convert value to endianness used by destination
- * @in: value to convert
- * @swap: non-zero if endianness must be reversed
- *
- * Returns converted value
- */
-
-static u32 htohl(u32 in, int swap)
-{
-       return swap ? (u32)___constant_swab32(in) : in;
-}
-
-/**
  * subscr_send_event - send a message containing a tipc_event to the subscriber
+ *
+ * Note: Must not hold subscriber's server port lock, since tipc_send() will
+ *       try to take the lock if the message is rejected and returned!
  */
 
 static void subscr_send_event(struct subscription *sub,
@@ -105,12 +94,12 @@ static void subscr_send_event(struct subscription *sub,
        msg_sect.iov_base = (void *)&sub->evt;
        msg_sect.iov_len = sizeof(struct tipc_event);
 
-       sub->evt.event = htohl(event, sub->swap);
-       sub->evt.found_lower = htohl(found_lower, sub->swap);
-       sub->evt.found_upper = htohl(found_upper, sub->swap);
-       sub->evt.port.ref = htohl(port_ref, sub->swap);
-       sub->evt.port.node = htohl(node, sub->swap);
-       tipc_send(sub->owner->port_ref, 1, &msg_sect);
+       sub->evt.event = htonl(event);
+       sub->evt.found_lower = htonl(found_lower);
+       sub->evt.found_upper = htonl(found_upper);
+       sub->evt.port.ref = htonl(port_ref);
+       sub->evt.port.node = htonl(node);
+       tipc_send(sub->server_ref, 1, &msg_sect);
 }
 
 /**
@@ -147,8 +136,6 @@ void tipc_subscr_report_overlap(struct subscription *sub,
                                u32 node,
                                int must)
 {
-       dbg("Rep overlap %u:%u,%u<->%u,%u\n", sub->seq.type, sub->seq.lower,
-           sub->seq.upper, found_lower, found_upper);
        if (!tipc_subscr_overlap(sub, found_lower, found_upper))
                return;
        if (!must && !(sub->filter & TIPC_SUB_PORTS))
@@ -163,20 +150,18 @@ void tipc_subscr_report_overlap(struct subscription *sub,
 
 static void subscr_timeout(struct subscription *sub)
 {
-       struct subscriber *subscriber;
-       u32 subscriber_ref;
+       struct port *server_port;
 
-       /* Validate subscriber reference (in case subscriber is terminating) */
+       /* Validate server port reference (in case subscriber is terminating) */
 
-       subscriber_ref = sub->owner->ref;
-       subscriber = (struct subscriber *)tipc_ref_lock(subscriber_ref);
-       if (subscriber == NULL)
+       server_port = tipc_port_lock(sub->server_ref);
+       if (server_port == NULL)
                return;
 
        /* Validate timeout (in case subscription is being cancelled) */
 
        if (sub->timeout == TIPC_WAIT_FOREVER) {
-               tipc_ref_unlock(subscriber_ref);
+               tipc_port_unlock(server_port);
                return;
        }
 
@@ -184,19 +169,21 @@ static void subscr_timeout(struct subscription *sub)
 
        tipc_nametbl_unsubscribe(sub);
 
-       /* Notify subscriber of timeout, then unlink subscription */
+       /* Unlink subscription from subscriber */
 
-       subscr_send_event(sub,
-                         sub->evt.s.seq.lower,
-                         sub->evt.s.seq.upper,
-                         TIPC_SUBSCR_TIMEOUT,
-                         0,
-                         0);
        list_del(&sub->subscription_list);
 
+       /* Release subscriber's server port */
+
+       tipc_port_unlock(server_port);
+
+       /* Notify subscriber of timeout */
+
+       subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
+                         TIPC_SUBSCR_TIMEOUT, 0, 0);
+
        /* Now destroy subscription */
 
-       tipc_ref_unlock(subscriber_ref);
        k_term_timer(&sub->timer);
        kfree(sub);
        atomic_dec(&topsrv.subscription_count);
@@ -205,7 +192,7 @@ static void subscr_timeout(struct subscription *sub)
 /**
  * subscr_del - delete a subscription within a subscription list
  *
- * Called with subscriber locked.
+ * Called with subscriber port locked.
  */
 
 static void subscr_del(struct subscription *sub)
@@ -219,7 +206,7 @@ static void subscr_del(struct subscription *sub)
 /**
  * subscr_terminate - terminate communication with a subscriber
  *
- * Called with subscriber locked.  Routine must temporarily release this lock
+ * Called with subscriber port locked.  Routine must temporarily release lock
  * to enable subscription timeout routine(s) to finish without deadlocking;
  * the lock is then reclaimed to allow caller to release it upon return.
  * (This should work even in the unlikely event some other thread creates
@@ -229,14 +216,21 @@ static void subscr_del(struct subscription *sub)
 
 static void subscr_terminate(struct subscriber *subscriber)
 {
+       u32 port_ref;
        struct subscription *sub;
        struct subscription *sub_temp;
 
        /* Invalidate subscriber reference */
 
-       tipc_ref_discard(subscriber->ref);
+       port_ref = subscriber->port_ref;
+       subscriber->port_ref = 0;
        spin_unlock_bh(subscriber->lock);
 
+       /* Sever connection to subscriber */
+
+       tipc_shutdown(port_ref);
+       tipc_deleteport(port_ref);
+
        /* Destroy any existing subscriptions for subscriber */
 
        list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
@@ -250,27 +244,25 @@ static void subscr_terminate(struct subscriber *subscriber)
                subscr_del(sub);
        }
 
-       /* Sever connection to subscriber */
-
-       tipc_shutdown(subscriber->port_ref);
-       tipc_deleteport(subscriber->port_ref);
-
        /* Remove subscriber from topology server's subscriber list */
 
        spin_lock_bh(&topsrv.lock);
        list_del(&subscriber->subscriber_list);
        spin_unlock_bh(&topsrv.lock);
 
-       /* Now destroy subscriber */
+       /* Reclaim subscriber lock */
 
        spin_lock_bh(subscriber->lock);
+
+       /* Now destroy subscriber */
+
        kfree(subscriber);
 }
 
 /**
  * subscr_cancel - handle subscription cancellation request
  *
- * Called with subscriber locked.  Routine must temporarily release this lock
+ * Called with subscriber port locked.  Routine must temporarily release lock
  * to enable the subscription timeout routine to finish without deadlocking;
  * the lock is then reclaimed to allow caller to release it upon return.
  *
@@ -282,16 +274,29 @@ static void subscr_cancel(struct tipc_subscr *s,
 {
        struct subscription *sub;
        struct subscription *sub_temp;
+       __u32 type, lower, upper, timeout, filter;
        int found = 0;
 
        /* Find first matching subscription, exit if not found */
 
+       type = ntohl(s->seq.type);
+       lower = ntohl(s->seq.lower);
+       upper = ntohl(s->seq.upper);
+       timeout = ntohl(s->timeout);
+       filter = ntohl(s->filter) & ~TIPC_SUB_CANCEL;
+
        list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
                                 subscription_list) {
-               if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
-                       found = 1;
-                       break;
-               }
+                       if ((type == sub->seq.type) &&
+                           (lower == sub->seq.lower) &&
+                           (upper == sub->seq.upper) &&
+                           (timeout == sub->timeout) &&
+                            (filter == sub->filter) &&
+                             !memcmp(s->usr_handle,sub->evt.s.usr_handle,
+                                    sizeof(s->usr_handle)) ){
+                               found = 1;
+                               break;
+                       }
        }
        if (!found)
                return;
@@ -305,7 +310,7 @@ static void subscr_cancel(struct tipc_subscr *s,
                k_term_timer(&sub->timer);
                spin_lock_bh(subscriber->lock);
        }
-       dbg("Cancel: removing sub %u,%u,%u from subscriber %x list\n",
+       dbg("Cancel: removing sub %u,%u,%u from subscriber %p list\n",
            sub->seq.type, sub->seq.lower, sub->seq.upper, subscriber);
        subscr_del(sub);
 }
@@ -313,25 +318,19 @@ static void subscr_cancel(struct tipc_subscr *s,
 /**
  * subscr_subscribe - create subscription for subscriber
  *
- * Called with subscriber locked
+ * Called with subscriber port locked.
  */
 
-static void subscr_subscribe(struct tipc_subscr *s,
-                            struct subscriber *subscriber)
+static struct subscription *subscr_subscribe(struct tipc_subscr *s,
+                                            struct subscriber *subscriber)
 {
        struct subscription *sub;
-       int swap;
-
-       /* Determine subscriber's endianness */
-
-       swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE));
 
        /* Detect & process a subscription cancellation request */
 
-       if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
-               s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
+       if (ntohl(s->filter) & TIPC_SUB_CANCEL) {
                subscr_cancel(s, subscriber);
-               return;
+               return NULL;
        }
 
        /* Refuse subscription if global limit exceeded */
@@ -340,65 +339,64 @@ static void subscr_subscribe(struct tipc_subscr *s,
                warn("Subscription rejected, subscription limit reached (%u)\n",
                     tipc_max_subscriptions);
                subscr_terminate(subscriber);
-               return;
+               return NULL;
        }
 
        /* Allocate subscription object */
 
-       sub = kzalloc(sizeof(*sub), GFP_ATOMIC);
+       sub = kmalloc(sizeof(*sub), GFP_ATOMIC);
        if (!sub) {
                warn("Subscription rejected, no memory\n");
                subscr_terminate(subscriber);
-               return;
+               return NULL;
        }
 
        /* Initialize subscription object */
 
-       sub->seq.type = htohl(s->seq.type, swap);
-       sub->seq.lower = htohl(s->seq.lower, swap);
-       sub->seq.upper = htohl(s->seq.upper, swap);
-       sub->timeout = htohl(s->timeout, swap);
-       sub->filter = htohl(s->filter, swap);
-       if ((!(sub->filter & TIPC_SUB_PORTS)
-            == !(sub->filter & TIPC_SUB_SERVICE))
-           || (sub->seq.lower > sub->seq.upper)) {
+       sub->seq.type = ntohl(s->seq.type);
+       sub->seq.lower = ntohl(s->seq.lower);
+       sub->seq.upper = ntohl(s->seq.upper);
+       sub->timeout = ntohl(s->timeout);
+       sub->filter = ntohl(s->filter);
+       if ((sub->filter && (sub->filter != TIPC_SUB_PORTS)) ||
+           (sub->seq.lower > sub->seq.upper)) {
                warn("Subscription rejected, illegal request\n");
                kfree(sub);
                subscr_terminate(subscriber);
-               return;
+               return NULL;
        }
        sub->event_cb = subscr_send_event;
-       memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr));
-       INIT_LIST_HEAD(&sub->subscription_list);
        INIT_LIST_HEAD(&sub->nameseq_list);
        list_add(&sub->subscription_list, &subscriber->subscription_list);
-       sub->swap = swap;
+       sub->server_ref = subscriber->port_ref;
+       memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr));
        atomic_inc(&topsrv.subscription_count);
        if (sub->timeout != TIPC_WAIT_FOREVER) {
                k_init_timer(&sub->timer,
                             (Handler)subscr_timeout, (unsigned long)sub);
                k_start_timer(&sub->timer, sub->timeout);
        }
-       sub->owner = subscriber;
-       tipc_nametbl_subscribe(sub);
+
+       return sub;
 }
 
 /**
  * subscr_conn_shutdown_event - handle termination request from subscriber
+ *
+ * Called with subscriber's server port unlocked.
  */
 
 static void subscr_conn_shutdown_event(void *usr_handle,
-                                      u32 portref,
+                                      u32 port_ref,
                                       struct sk_buff **buf,
                                       unsigned char const *data,
                                       unsigned int size,
                                       int reason)
 {
-       struct subscriber *subscriber;
+       struct subscriber *subscriber = usr_handle;
        spinlock_t *subscriber_lock;
 
-       subscriber = tipc_ref_lock((u32)(unsigned long)usr_handle);
-       if (subscriber == NULL)
+       if (tipc_port_lock(port_ref) == NULL)
                return;
 
        subscriber_lock = subscriber->lock;
@@ -408,6 +406,8 @@ static void subscr_conn_shutdown_event(void *usr_handle,
 
 /**
  * subscr_conn_msg_event - handle new subscription request from subscriber
+ *
+ * Called with subscriber's server port unlocked.
  */
 
 static void subscr_conn_msg_event(void *usr_handle,
@@ -416,20 +416,46 @@ static void subscr_conn_msg_event(void *usr_handle,
                                  const unchar *data,
                                  u32 size)
 {
-       struct subscriber *subscriber;
+       struct subscriber *subscriber = usr_handle;
        spinlock_t *subscriber_lock;
+       struct subscription *sub;
+
+       /*
+        * Lock subscriber's server port (& make a local copy of lock pointer,
+        * in case subscriber is deleted while processing subscription request)
+        */
 
-       subscriber = tipc_ref_lock((u32)(unsigned long)usr_handle);
-       if (subscriber == NULL)
+       if (tipc_port_lock(port_ref) == NULL)
                return;
 
        subscriber_lock = subscriber->lock;
-       if (size != sizeof(struct tipc_subscr))
-               subscr_terminate(subscriber);
-       else
-               subscr_subscribe((struct tipc_subscr *)data, subscriber);
 
-       spin_unlock_bh(subscriber_lock);
+       if (size != sizeof(struct tipc_subscr)) {
+               subscr_terminate(subscriber);
+               spin_unlock_bh(subscriber_lock);
+       } else {
+               sub = subscr_subscribe((struct tipc_subscr *)data, subscriber);
+               spin_unlock_bh(subscriber_lock);
+               if (sub != NULL) {
+
+                       /*
+                        * We must release the server port lock before adding a
+                        * subscription to the name table since TIPC needs to be
+                        * able to (re)acquire the port lock if an event message
+                        * issued by the subscription process is rejected and
+                        * returned.  The subscription cannot be deleted while
+                        * it is being added to the name table because:
+                        * a) the single-threading of the native API port code
+                        *    ensures the subscription cannot be cancelled and
+                        *    the subscriber connection cannot be broken, and
+                        * b) the name table lock ensures the subscription
+                        *    timeout code cannot delete the subscription,
+                        * so the subscription object is still protected.
+                        */
+
+                       tipc_nametbl_subscribe(sub);
+               }
+       }
 }
 
 /**
@@ -445,16 +471,10 @@ static void subscr_named_msg_event(void *usr_handle,
                                   struct tipc_portid const *orig,
                                   struct tipc_name_seq const *dest)
 {
-       struct subscriber *subscriber;
-       struct iovec msg_sect = {NULL, 0};
-       spinlock_t *subscriber_lock;
+       static struct iovec msg_sect = {NULL, 0};
 
-       dbg("subscr_named_msg_event: orig = %x own = %x,\n",
-           orig->node, tipc_own_addr);
-       if (size && (size != sizeof(struct tipc_subscr))) {
-               warn("Subscriber rejected, invalid subscription size\n");
-               return;
-       }
+       struct subscriber *subscriber;
+       u32 server_port_ref;
 
        /* Create subscriber object */
 
@@ -465,18 +485,11 @@ static void subscr_named_msg_event(void *usr_handle,
        }
        INIT_LIST_HEAD(&subscriber->subscription_list);
        INIT_LIST_HEAD(&subscriber->subscriber_list);
-       subscriber->ref = tipc_ref_acquire(subscriber, &subscriber->lock);
-       if (subscriber->ref == 0) {
-               warn("Subscriber rejected, reference table exhausted\n");
-               kfree(subscriber);
-               return;
-       }
-       spin_unlock_bh(subscriber->lock);
 
-       /* Establish a connection to subscriber */
+       /* Create server port & establish connection to subscriber */
 
        tipc_createport(topsrv.user_ref,
-                       (void *)(unsigned long)subscriber->ref,
+                       subscriber,
                        importance,
                        NULL,
                        NULL,
@@ -488,32 +501,36 @@ static void subscr_named_msg_event(void *usr_handle,
                        &subscriber->port_ref);
        if (subscriber->port_ref == 0) {
                warn("Subscriber rejected, unable to create port\n");
-               tipc_ref_discard(subscriber->ref);
                kfree(subscriber);
                return;
        }
        tipc_connect2port(subscriber->port_ref, orig);
 
+       /* Lock server port (& save lock address for future use) */
+
+       subscriber->lock = tipc_port_lock(subscriber->port_ref)->publ.lock;
 
        /* Add subscriber to topology server's subscriber list */
 
-       tipc_ref_lock(subscriber->ref);
        spin_lock_bh(&topsrv.lock);
        list_add(&subscriber->subscriber_list, &topsrv.subscriber_list);
        spin_unlock_bh(&topsrv.lock);
 
-       /*
-        * Subscribe now if message contains a subscription,
-        * otherwise send an empty response to complete connection handshaking
-        */
+       /* Unlock server port */
 
-       subscriber_lock = subscriber->lock;
-       if (size)
-               subscr_subscribe((struct tipc_subscr *)data, subscriber);
-       else
-               tipc_send(subscriber->port_ref, 1, &msg_sect);
+       server_port_ref = subscriber->port_ref;
+       spin_unlock_bh(subscriber->lock);
 
-       spin_unlock_bh(subscriber_lock);
+       /* Send an ACK- to complete connection handshaking */
+
+       tipc_send(server_port_ref, 1, &msg_sect);
+
+       /* Handle optional subscription request */
+
+       if (size != 0) {
+               subscr_conn_msg_event(subscriber, server_port_ref,
+                                     buf, data, size);
+       }
 }
 
 int tipc_subscr_start(void)
@@ -572,8 +589,8 @@ void tipc_subscr_stop(void)
                list_for_each_entry_safe(subscriber, subscriber_temp,
                                         &topsrv.subscriber_list,
                                         subscriber_list) {
-                       tipc_ref_lock(subscriber->ref);
                        subscriber_lock = subscriber->lock;
+                       spin_lock_bh(subscriber_lock);
                        subscr_terminate(subscriber);
                        spin_unlock_bh(subscriber_lock);
                }