sgi-xp: use standard bitops macros and functions
[safe/jmp/linux-2.6] / drivers / misc / sgi-xp / xpc_channel.c
1 /*
2  * This file is subject to the terms and conditions of the GNU General Public
3  * License.  See the file "COPYING" in the main directory of this archive
4  * for more details.
5  *
6  * Copyright (c) 2004-2008 Silicon Graphics, Inc.  All Rights Reserved.
7  */
8
9 /*
10  * Cross Partition Communication (XPC) channel support.
11  *
12  *      This is the part of XPC that manages the channels and
13  *      sends/receives messages across them to/from other partitions.
14  *
15  */
16
17 #include <linux/kernel.h>
18 #include <linux/init.h>
19 #include <linux/sched.h>
20 #include <linux/cache.h>
21 #include <linux/interrupt.h>
22 #include <linux/mutex.h>
23 #include <linux/completion.h>
24 #include <asm/sn/sn_sal.h>
25 #include "xpc.h"
26
27 /*
28  * Process a connect message from a remote partition.
29  *
30  * Note: xpc_process_connect() is expecting to be called with the
31  * spin_lock_irqsave held and will leave it locked upon return.
32  */
33 static void
34 xpc_process_connect(struct xpc_channel *ch, unsigned long *irq_flags)
35 {
36         enum xp_retval ret;
37
38         DBUG_ON(!spin_is_locked(&ch->lock));
39
40         if (!(ch->flags & XPC_C_OPENREQUEST) ||
41             !(ch->flags & XPC_C_ROPENREQUEST)) {
42                 /* nothing more to do for now */
43                 return;
44         }
45         DBUG_ON(!(ch->flags & XPC_C_CONNECTING));
46
47         if (!(ch->flags & XPC_C_SETUP)) {
48                 spin_unlock_irqrestore(&ch->lock, *irq_flags);
49                 ret = xpc_allocate_msgqueues(ch);
50                 spin_lock_irqsave(&ch->lock, *irq_flags);
51
52                 if (ret != xpSuccess)
53                         XPC_DISCONNECT_CHANNEL(ch, ret, irq_flags);
54
55                 ch->flags |= XPC_C_SETUP;
56
57                 if (ch->flags & (XPC_C_CONNECTED | XPC_C_DISCONNECTING))
58                         return;
59
60                 DBUG_ON(ch->local_msgqueue == NULL);
61                 DBUG_ON(ch->remote_msgqueue == NULL);
62         }
63
64         if (!(ch->flags & XPC_C_OPENREPLY)) {
65                 ch->flags |= XPC_C_OPENREPLY;
66                 xpc_send_chctl_openreply(ch, irq_flags);
67         }
68
69         if (!(ch->flags & XPC_C_ROPENREPLY))
70                 return;
71
72         DBUG_ON(ch->remote_msgqueue_pa == 0);
73
74         ch->flags = (XPC_C_CONNECTED | XPC_C_SETUP);    /* clear all else */
75
76         dev_info(xpc_chan, "channel %d to partition %d connected\n",
77                  ch->number, ch->partid);
78
79         spin_unlock_irqrestore(&ch->lock, *irq_flags);
80         xpc_create_kthreads(ch, 1, 0);
81         spin_lock_irqsave(&ch->lock, *irq_flags);
82 }
83
/*
 * Complete the teardown of a disconnecting channel.  Nothing happens
 * until all local activity (kthreads, references) has settled and --
 * unless the whole partition is deactivating -- the remote side has run
 * the full close protocol (RCLOSEREQUEST and RCLOSEREPLY both seen).
 *
 * spin_lock_irqsave() is expected to be held on entry.
 */
static void
xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags)
{
	struct xpc_partition *part = &xpc_partitions[ch->partid];
	/* remember now; ch->flags is rewritten wholesale below */
	u32 channel_was_connected = (ch->flags & XPC_C_WASCONNECTED);

	DBUG_ON(!spin_is_locked(&ch->lock));

	if (!(ch->flags & XPC_C_DISCONNECTING))
		return;

	DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));

	/* make sure all activity has settled down first */

	if (atomic_read(&ch->kthreads_assigned) > 0 ||
	    atomic_read(&ch->references) > 0) {
		return;
	}
	DBUG_ON((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
		!(ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE));

	if (part->act_state == XPC_P_DEACTIVATING) {
		/* can't proceed until the other side disengages from us */
		if (xpc_partition_engaged(ch->partid))
			return;

	} else {

		/* as long as the other side is up do the full protocol */

		if (!(ch->flags & XPC_C_RCLOSEREQUEST))
			return;

		if (!(ch->flags & XPC_C_CLOSEREPLY)) {
			ch->flags |= XPC_C_CLOSEREPLY;
			xpc_send_chctl_closereply(ch, irq_flags);
		}

		if (!(ch->flags & XPC_C_RCLOSEREPLY))
			return;
	}

	/* wake those waiting for notify completion */
	if (atomic_read(&ch->n_to_notify) > 0) {
		/* we do callout while holding ch->lock, callout can't block */
		xpc_notify_senders_of_disconnect(ch);
	}

	/* both sides are disconnected now */

	if (ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE) {
		/* the registerer's callout may block, so drop the lock */
		spin_unlock_irqrestore(&ch->lock, *irq_flags);
		xpc_disconnect_callout(ch, xpDisconnected);
		spin_lock_irqsave(&ch->lock, *irq_flags);
	}

	/* it's now safe to free the channel's message queues */
	xpc_free_msgqueues(ch);

	/*
	 * Mark the channel disconnected and clear all other flags, including
	 * XPC_C_SETUP (because of call to xpc_free_msgqueues()) but not
	 * including XPC_C_WDISCONNECT (if it was set).
	 */
	ch->flags = (XPC_C_DISCONNECTED | (ch->flags & XPC_C_WDISCONNECT));

	atomic_dec(&part->nchannels_active);

	if (channel_was_connected) {
		dev_info(xpc_chan, "channel %d to partition %d disconnected, "
			 "reason=%d\n", ch->number, ch->partid, ch->reason);
	}

	if (ch->flags & XPC_C_WDISCONNECT) {
		/* we won't lose the CPU since we're holding ch->lock */
		complete(&ch->wdisconnect_wait);
	} else if (ch->delayed_chctl_flags) {
		if (part->act_state != XPC_P_DEACTIVATING) {
			/* time to take action on any delayed chctl flags */
			spin_lock(&part->chctl_lock);
			part->chctl.flags[ch->number] |=
			    ch->delayed_chctl_flags;
			spin_unlock(&part->chctl_lock);
		}
		ch->delayed_chctl_flags = 0;
	}
}
175
/*
 * Process a change in the channel's remote connection state.
 *
 * Handles, in order, any CLOSEREQUEST, CLOSEREPLY, OPENREQUEST and
 * OPENREPLY bits packed into chctl_flags for this channel.  Called with
 * no locks held; acquires and releases ch->lock internally.
 */
static void
xpc_process_openclose_chctl_flags(struct xpc_partition *part, int ch_number,
				  u8 chctl_flags)
{
	unsigned long irq_flags;
	struct xpc_openclose_args *args =
	    &part->remote_openclose_args[ch_number];
	struct xpc_channel *ch = &part->channels[ch_number];
	enum xp_retval reason;

	spin_lock_irqsave(&ch->lock, irq_flags);

again:

	if ((ch->flags & XPC_C_DISCONNECTED) &&
	    (ch->flags & XPC_C_WDISCONNECT)) {
		/*
		 * Delay processing chctl flags until thread waiting disconnect
		 * has had a chance to see that the channel is disconnected.
		 */
		ch->delayed_chctl_flags |= chctl_flags;
		spin_unlock_irqrestore(&ch->lock, irq_flags);
		return;
	}

	if (chctl_flags & XPC_CHCTL_CLOSEREQUEST) {

		dev_dbg(xpc_chan, "XPC_CHCTL_CLOSEREQUEST (reason=%d) received "
			"from partid=%d, channel=%d\n", args->reason,
			ch->partid, ch->number);

		/*
		 * If RCLOSEREQUEST is set, we're probably waiting for
		 * RCLOSEREPLY. We should find it and a ROPENREQUEST packed
		 * with this RCLOSEREQUEST in the chctl_flags.
		 */

		if (ch->flags & XPC_C_RCLOSEREQUEST) {
			DBUG_ON(!(ch->flags & XPC_C_DISCONNECTING));
			DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
			DBUG_ON(!(ch->flags & XPC_C_CLOSEREPLY));
			DBUG_ON(ch->flags & XPC_C_RCLOSEREPLY);

			DBUG_ON(!(chctl_flags & XPC_CHCTL_CLOSEREPLY));
			/* consume the CLOSEREPLY here rather than below */
			chctl_flags &= ~XPC_CHCTL_CLOSEREPLY;
			ch->flags |= XPC_C_RCLOSEREPLY;

			/* both sides have finished disconnecting */
			xpc_process_disconnect(ch, &irq_flags);
			DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED));
			/* re-evaluate remaining flags against the new state */
			goto again;
		}

		if (ch->flags & XPC_C_DISCONNECTED) {
			if (!(chctl_flags & XPC_CHCTL_OPENREQUEST)) {
				/*
				 * A lone CLOSEREQUEST raced ahead of an
				 * OPENREQUEST still pending in part->chctl;
				 * requeue it to be handled after the open.
				 */
				if (part->chctl.flags[ch_number] &
				    XPC_CHCTL_OPENREQUEST) {

					DBUG_ON(ch->delayed_chctl_flags != 0);
					spin_lock(&part->chctl_lock);
					part->chctl.flags[ch_number] |=
					    XPC_CHCTL_CLOSEREQUEST;
					spin_unlock(&part->chctl_lock);
				}
				spin_unlock_irqrestore(&ch->lock, irq_flags);
				return;
			}

			/* CLOSEREQUEST came packed with an OPENREQUEST */
			XPC_SET_REASON(ch, 0, 0);
			ch->flags &= ~XPC_C_DISCONNECTED;

			atomic_inc(&part->nchannels_active);
			ch->flags |= (XPC_C_CONNECTING | XPC_C_ROPENREQUEST);
		}

		/* the close supersedes any open flags still pending */
		chctl_flags &= ~(XPC_CHCTL_OPENREQUEST | XPC_CHCTL_OPENREPLY);

		/*
		 * The meaningful CLOSEREQUEST connection state fields are:
		 *      reason = reason connection is to be closed
		 */

		ch->flags |= XPC_C_RCLOSEREQUEST;

		if (!(ch->flags & XPC_C_DISCONNECTING)) {
			/* sanitize the remote-supplied reason code */
			reason = args->reason;
			if (reason <= xpSuccess || reason > xpUnknownReason)
				reason = xpUnknownReason;
			else if (reason == xpUnregistering)
				reason = xpOtherUnregistering;

			XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);

			DBUG_ON(chctl_flags & XPC_CHCTL_CLOSEREPLY);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		xpc_process_disconnect(ch, &irq_flags);
	}

	if (chctl_flags & XPC_CHCTL_CLOSEREPLY) {

		dev_dbg(xpc_chan, "XPC_CHCTL_CLOSEREPLY received from partid="
			"%d, channel=%d\n", ch->partid, ch->number);

		if (ch->flags & XPC_C_DISCONNECTED) {
			DBUG_ON(part->act_state != XPC_P_DEACTIVATING);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));

		if (!(ch->flags & XPC_C_RCLOSEREQUEST)) {
			/*
			 * CLOSEREPLY arrived before the matching
			 * CLOSEREQUEST was processed; requeue it if that
			 * CLOSEREQUEST is still pending in part->chctl.
			 */
			if (part->chctl.flags[ch_number] &
			    XPC_CHCTL_CLOSEREQUEST) {

				DBUG_ON(ch->delayed_chctl_flags != 0);
				spin_lock(&part->chctl_lock);
				part->chctl.flags[ch_number] |=
				    XPC_CHCTL_CLOSEREPLY;
				spin_unlock(&part->chctl_lock);
			}
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		ch->flags |= XPC_C_RCLOSEREPLY;

		if (ch->flags & XPC_C_CLOSEREPLY) {
			/* both sides have finished disconnecting */
			xpc_process_disconnect(ch, &irq_flags);
		}
	}

	if (chctl_flags & XPC_CHCTL_OPENREQUEST) {

		dev_dbg(xpc_chan, "XPC_CHCTL_OPENREQUEST (msg_size=%d, "
			"local_nentries=%d) received from partid=%d, "
			"channel=%d\n", args->msg_size, args->local_nentries,
			ch->partid, ch->number);

		if (part->act_state == XPC_P_DEACTIVATING ||
		    (ch->flags & XPC_C_ROPENREQUEST)) {
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_WDISCONNECT)) {
			/* defer the open until the disconnect completes */
			ch->delayed_chctl_flags |= XPC_CHCTL_OPENREQUEST;
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}
		DBUG_ON(!(ch->flags & (XPC_C_DISCONNECTED |
				       XPC_C_OPENREQUEST)));
		DBUG_ON(ch->flags & (XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
				     XPC_C_OPENREPLY | XPC_C_CONNECTED));

		/*
		 * The meaningful OPENREQUEST connection state fields are:
		 *      msg_size = size of channel's messages in bytes
		 *      local_nentries = remote partition's local_nentries
		 */
		if (args->msg_size == 0 || args->local_nentries == 0) {
			/* assume OPENREQUEST was delayed by mistake */
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		ch->flags |= (XPC_C_ROPENREQUEST | XPC_C_CONNECTING);
		ch->remote_nentries = args->local_nentries;

		if (ch->flags & XPC_C_OPENREQUEST) {
			/* both sides must agree on the message size */
			if (args->msg_size != ch->msg_size) {
				XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes,
						       &irq_flags);
				spin_unlock_irqrestore(&ch->lock, irq_flags);
				return;
			}
		} else {
			ch->msg_size = args->msg_size;

			XPC_SET_REASON(ch, 0, 0);
			ch->flags &= ~XPC_C_DISCONNECTED;

			atomic_inc(&part->nchannels_active);
		}

		xpc_process_connect(ch, &irq_flags);
	}

	if (chctl_flags & XPC_CHCTL_OPENREPLY) {

		dev_dbg(xpc_chan, "XPC_CHCTL_OPENREPLY (local_msgqueue_pa="
			"0x%lx, local_nentries=%d, remote_nentries=%d) "
			"received from partid=%d, channel=%d\n",
			args->local_msgqueue_pa, args->local_nentries,
			args->remote_nentries, ch->partid, ch->number);

		if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED)) {
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}
		if (!(ch->flags & XPC_C_OPENREQUEST)) {
			/* OPENREPLY without our OPENREQUEST is a protocol
			 * violation by the remote side */
			XPC_DISCONNECT_CHANNEL(ch, xpOpenCloseError,
					       &irq_flags);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		DBUG_ON(!(ch->flags & XPC_C_ROPENREQUEST));
		DBUG_ON(ch->flags & XPC_C_CONNECTED);

		/*
		 * The meaningful OPENREPLY connection state fields are:
		 *      local_msgqueue_pa = physical address of remote
		 *                          partition's local_msgqueue
		 *      local_nentries = remote partition's local_nentries
		 *      remote_nentries = remote partition's remote_nentries
		 */
		DBUG_ON(args->local_msgqueue_pa == 0);
		DBUG_ON(args->local_nentries == 0);
		DBUG_ON(args->remote_nentries == 0);

		ch->flags |= XPC_C_ROPENREPLY;
		ch->remote_msgqueue_pa = args->local_msgqueue_pa;

		/* each side uses the smaller of the two advertised sizes */
		if (args->local_nentries < ch->remote_nentries) {
			dev_dbg(xpc_chan, "XPC_CHCTL_OPENREPLY: new "
				"remote_nentries=%d, old remote_nentries=%d, "
				"partid=%d, channel=%d\n",
				args->local_nentries, ch->remote_nentries,
				ch->partid, ch->number);

			ch->remote_nentries = args->local_nentries;
		}
		if (args->remote_nentries < ch->local_nentries) {
			dev_dbg(xpc_chan, "XPC_CHCTL_OPENREPLY: new "
				"local_nentries=%d, old local_nentries=%d, "
				"partid=%d, channel=%d\n",
				args->remote_nentries, ch->local_nentries,
				ch->partid, ch->number);

			ch->local_nentries = args->remote_nentries;
		}

		xpc_process_connect(ch, &irq_flags);
	}

	spin_unlock_irqrestore(&ch->lock, irq_flags);
}
431
/*
 * Attempt to establish a channel connection to a remote partition.
 *
 * Copies the registerer's parameters into the channel under the
 * registration mutex, then kicks off the open handshake by sending an
 * OPENREQUEST.  Returns xpSuccess when the request was initiated,
 * xpRetry when the registration mutex couldn't be taken without
 * blocking, xpUnregistered when no registerer exists for this channel,
 * or a failure/disconnect reason otherwise.
 */
static enum xp_retval
xpc_connect_channel(struct xpc_channel *ch)
{
	unsigned long irq_flags;
	struct xpc_registration *registration = &xpc_registrations[ch->number];

	/* trylock: caller runs in the channel manager and must not block */
	if (mutex_trylock(&registration->mutex) == 0)
		return xpRetry;

	if (!XPC_CHANNEL_REGISTERED(ch->number)) {
		mutex_unlock(&registration->mutex);
		return xpUnregistered;
	}

	spin_lock_irqsave(&ch->lock, irq_flags);

	DBUG_ON(ch->flags & XPC_C_CONNECTED);
	DBUG_ON(ch->flags & XPC_C_OPENREQUEST);

	if (ch->flags & XPC_C_DISCONNECTING) {
		spin_unlock_irqrestore(&ch->lock, irq_flags);
		mutex_unlock(&registration->mutex);
		return ch->reason;
	}

	/* add info from the channel connect registration to the channel */

	ch->kthreads_assigned_limit = registration->assigned_limit;
	ch->kthreads_idle_limit = registration->idle_limit;
	DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0);
	DBUG_ON(atomic_read(&ch->kthreads_idle) != 0);
	DBUG_ON(atomic_read(&ch->kthreads_active) != 0);

	ch->func = registration->func;
	DBUG_ON(registration->func == NULL);
	ch->key = registration->key;

	ch->local_nentries = registration->nentries;

	if (ch->flags & XPC_C_ROPENREQUEST) {
		if (registration->msg_size != ch->msg_size) {
			/* the local and remote sides aren't the same */

			/*
			 * Because XPC_DISCONNECT_CHANNEL() can block we're
			 * forced to up the registration sema before we unlock
			 * the channel lock. But that's okay here because we're
			 * done with the part that required the registration
			 * sema. XPC_DISCONNECT_CHANNEL() requires that the
			 * channel lock be locked and will unlock and relock
			 * the channel lock as needed.
			 */
			mutex_unlock(&registration->mutex);
			XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes,
					       &irq_flags);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return xpUnequalMsgSizes;
		}
	} else {
		/* no remote OPENREQUEST yet; we set the message size */
		ch->msg_size = registration->msg_size;

		XPC_SET_REASON(ch, 0, 0);
		ch->flags &= ~XPC_C_DISCONNECTED;

		atomic_inc(&xpc_partitions[ch->partid].nchannels_active);
	}

	mutex_unlock(&registration->mutex);

	/* initiate the connection */

	ch->flags |= (XPC_C_OPENREQUEST | XPC_C_CONNECTING);
	xpc_send_chctl_openrequest(ch, &irq_flags);

	xpc_process_connect(ch, &irq_flags);

	spin_unlock_irqrestore(&ch->lock, irq_flags);

	return xpSuccess;
}
515
/*
 * Channel manager work loop: act on all channel control flags sent to us
 * by the remote partition, connecting, disconnecting, or delivering
 * messages on each channel as required.
 */
void
xpc_process_sent_chctl_flags(struct xpc_partition *part)
{
	unsigned long irq_flags;
	union xpc_channel_ctl_flags chctl;
	struct xpc_channel *ch;
	int ch_number;
	u32 ch_flags;

	/* atomically fetch-and-clear the pending chctl flags */
	chctl.all_flags = xpc_get_chctl_all_flags(part);

	/*
	 * Initiate channel connections for registered channels.
	 *
	 * For each connected channel that has pending messages activate idle
	 * kthreads and/or create new kthreads as needed.
	 */

	for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
		ch = &part->channels[ch_number];

		/*
		 * Process any open or close related chctl flags, and then deal
		 * with connecting or disconnecting the channel as required.
		 */

		if (chctl.flags[ch_number] & XPC_OPENCLOSE_CHCTL_FLAGS) {
			xpc_process_openclose_chctl_flags(part, ch_number,
							chctl.flags[ch_number]);
		}

		ch_flags = ch->flags;	/* need an atomic snapshot of flags */

		if (ch_flags & XPC_C_DISCONNECTING) {
			spin_lock_irqsave(&ch->lock, irq_flags);
			xpc_process_disconnect(ch, &irq_flags);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			continue;
		}

		if (part->act_state == XPC_P_DEACTIVATING)
			continue;

		if (!(ch_flags & XPC_C_CONNECTED)) {
			if (!(ch_flags & XPC_C_OPENREQUEST)) {
				DBUG_ON(ch_flags & XPC_C_SETUP);
				/* best effort; retried on the next pass */
				(void)xpc_connect_channel(ch);
			} else {
				spin_lock_irqsave(&ch->lock, irq_flags);
				xpc_process_connect(ch, &irq_flags);
				spin_unlock_irqrestore(&ch->lock, irq_flags);
			}
			continue;
		}

		/*
		 * Process any message related chctl flags, this may involve
		 * the activation of kthreads to deliver any pending messages
		 * sent from the other partition.
		 */

		if (chctl.flags[ch_number] & XPC_MSG_CHCTL_FLAGS)
			xpc_process_msg_chctl_flags(part, ch_number);
	}
}
581
582 /*
583  * XPC's heartbeat code calls this function to inform XPC that a partition is
584  * going down.  XPC responds by tearing down the XPartition Communication
585  * infrastructure used for the just downed partition.
586  *
587  * XPC's heartbeat code will never call this function and xpc_partition_up()
588  * at the same time. Nor will it ever make multiple calls to either function
589  * at the same time.
590  */
591 void
592 xpc_partition_going_down(struct xpc_partition *part, enum xp_retval reason)
593 {
594         unsigned long irq_flags;
595         int ch_number;
596         struct xpc_channel *ch;
597
598         dev_dbg(xpc_chan, "deactivating partition %d, reason=%d\n",
599                 XPC_PARTID(part), reason);
600
601         if (!xpc_part_ref(part)) {
602                 /* infrastructure for this partition isn't currently set up */
603                 return;
604         }
605
606         /* disconnect channels associated with the partition going down */
607
608         for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
609                 ch = &part->channels[ch_number];
610
611                 xpc_msgqueue_ref(ch);
612                 spin_lock_irqsave(&ch->lock, irq_flags);
613
614                 XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);
615
616                 spin_unlock_irqrestore(&ch->lock, irq_flags);
617                 xpc_msgqueue_deref(ch);
618         }
619
620         xpc_wakeup_channel_mgr(part);
621
622         xpc_part_deref(part);
623 }
624
625 /*
626  * Called by XP at the time of channel connection registration to cause
627  * XPC to establish connections to all currently active partitions.
628  */
629 void
630 xpc_initiate_connect(int ch_number)
631 {
632         short partid;
633         struct xpc_partition *part;
634         struct xpc_channel *ch;
635
636         DBUG_ON(ch_number < 0 || ch_number >= XPC_MAX_NCHANNELS);
637
638         for (partid = 0; partid < xp_max_npartitions; partid++) {
639                 part = &xpc_partitions[partid];
640
641                 if (xpc_part_ref(part)) {
642                         ch = &part->channels[ch_number];
643
644                         /*
645                          * Initiate the establishment of a connection on the
646                          * newly registered channel to the remote partition.
647                          */
648                         xpc_wakeup_channel_mgr(part);
649                         xpc_part_deref(part);
650                 }
651         }
652 }
653
654 void
655 xpc_connected_callout(struct xpc_channel *ch)
656 {
657         /* let the registerer know that a connection has been established */
658
659         if (ch->func != NULL) {
660                 dev_dbg(xpc_chan, "ch->func() called, reason=xpConnected, "
661                         "partid=%d, channel=%d\n", ch->partid, ch->number);
662
663                 ch->func(xpConnected, ch->partid, ch->number,
664                          (void *)(u64)ch->local_nentries, ch->key);
665
666                 dev_dbg(xpc_chan, "ch->func() returned, reason=xpConnected, "
667                         "partid=%d, channel=%d\n", ch->partid, ch->number);
668         }
669 }
670
671 /*
672  * Called by XP at the time of channel connection unregistration to cause
673  * XPC to teardown all current connections for the specified channel.
674  *
675  * Before returning xpc_initiate_disconnect() will wait until all connections
676  * on the specified channel have been closed/torndown. So the caller can be
677  * assured that they will not be receiving any more callouts from XPC to the
678  * function they registered via xpc_connect().
679  *
680  * Arguments:
681  *
682  *      ch_number - channel # to unregister.
683  */
684 void
685 xpc_initiate_disconnect(int ch_number)
686 {
687         unsigned long irq_flags;
688         short partid;
689         struct xpc_partition *part;
690         struct xpc_channel *ch;
691
692         DBUG_ON(ch_number < 0 || ch_number >= XPC_MAX_NCHANNELS);
693
694         /* initiate the channel disconnect for every active partition */
695         for (partid = 0; partid < xp_max_npartitions; partid++) {
696                 part = &xpc_partitions[partid];
697
698                 if (xpc_part_ref(part)) {
699                         ch = &part->channels[ch_number];
700                         xpc_msgqueue_ref(ch);
701
702                         spin_lock_irqsave(&ch->lock, irq_flags);
703
704                         if (!(ch->flags & XPC_C_DISCONNECTED)) {
705                                 ch->flags |= XPC_C_WDISCONNECT;
706
707                                 XPC_DISCONNECT_CHANNEL(ch, xpUnregistering,
708                                                        &irq_flags);
709                         }
710
711                         spin_unlock_irqrestore(&ch->lock, irq_flags);
712
713                         xpc_msgqueue_deref(ch);
714                         xpc_part_deref(part);
715                 }
716         }
717
718         xpc_disconnect_wait(ch_number);
719 }
720
/*
 * To disconnect a channel, and reflect it back to all who may be waiting.
 *
 * An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by
 * xpc_process_disconnect(), and if set, XPC_C_WDISCONNECT is cleared by
 * xpc_disconnect_wait().
 *
 * THE CHANNEL IS TO BE LOCKED BY THE CALLER AND WILL REMAIN LOCKED UPON RETURN.
 */
void
xpc_disconnect_channel(const int line, struct xpc_channel *ch,
		       enum xp_retval reason, unsigned long *irq_flags)
{
	/* capture before the flag is cleared below */
	u32 channel_was_connected = (ch->flags & XPC_C_CONNECTED);

	DBUG_ON(!spin_is_locked(&ch->lock));

	/* a disconnect is already in progress (or done); nothing to do */
	if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED))
		return;

	DBUG_ON(!(ch->flags & (XPC_C_CONNECTING | XPC_C_CONNECTED)));

	dev_dbg(xpc_chan, "reason=%d, line=%d, partid=%d, channel=%d\n",
		reason, line, ch->partid, ch->number);

	XPC_SET_REASON(ch, reason, line);

	ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING);
	/* some of these may not have been set */
	ch->flags &= ~(XPC_C_OPENREQUEST | XPC_C_OPENREPLY |
		       XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
		       XPC_C_CONNECTING | XPC_C_CONNECTED);

	xpc_send_chctl_closerequest(ch, irq_flags);

	if (channel_was_connected)
		ch->flags |= XPC_C_WASCONNECTED;

	/* the wakeups below may sleep/schedule, so drop the channel lock */
	spin_unlock_irqrestore(&ch->lock, *irq_flags);

	/* wake all idle kthreads so they can exit */
	if (atomic_read(&ch->kthreads_idle) > 0) {
		wake_up_all(&ch->idle_wq);

	} else if ((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
		   !(ch->flags & XPC_C_DISCONNECTINGCALLOUT)) {
		/* start a kthread that will do the xpDisconnecting callout */
		xpc_create_kthreads(ch, 1, 1);
	}

	/* wake those waiting to allocate an entry from the local msg queue */
	if (atomic_read(&ch->n_on_msg_allocate_wq) > 0)
		wake_up(&ch->msg_allocate_wq);

	spin_lock_irqsave(&ch->lock, *irq_flags);
}
777
778 void
779 xpc_disconnect_callout(struct xpc_channel *ch, enum xp_retval reason)
780 {
781         /*
782          * Let the channel's registerer know that the channel is being
783          * disconnected. We don't want to do this if the registerer was never
784          * informed of a connection being made.
785          */
786
787         if (ch->func != NULL) {
788                 dev_dbg(xpc_chan, "ch->func() called, reason=%d, partid=%d, "
789                         "channel=%d\n", reason, ch->partid, ch->number);
790
791                 ch->func(reason, ch->partid, ch->number, NULL, ch->key);
792
793                 dev_dbg(xpc_chan, "ch->func() returned, reason=%d, partid=%d, "
794                         "channel=%d\n", reason, ch->partid, ch->number);
795         }
796 }
797
/*
 * Wait for a message entry to become available for the specified channel,
 * but don't wait any longer than 1 jiffy.
 */
enum xp_retval
xpc_allocate_msg_wait(struct xpc_channel *ch)
{
	enum xp_retval ret;

	/* bail out immediately if the channel is already being torn down */
	if (ch->flags & XPC_C_DISCONNECTING) {
		/* an interrupted wait must never be the disconnect reason */
		DBUG_ON(ch->reason == xpInterrupted);
		return ch->reason;
	}

	/* advertise ourselves so xpc_disconnect_channel() knows to wake us */
	atomic_inc(&ch->n_on_msg_allocate_wq);
	/*
	 * NOTE(review): interruptible_sleep_on_timeout() is inherently racy
	 * (a wakeup arriving before the sleep starts is lost) and was later
	 * removed from the kernel; wait_event_interruptible_timeout() with
	 * an explicit condition would be preferable — needs the actual wait
	 * condition to convert safely.
	 */
	ret = interruptible_sleep_on_timeout(&ch->msg_allocate_wq, 1);
	atomic_dec(&ch->n_on_msg_allocate_wq);

	if (ch->flags & XPC_C_DISCONNECTING) {
		/* disconnect raced with our wait; report its reason */
		ret = ch->reason;
		DBUG_ON(ch->reason == xpInterrupted);
	} else if (ret == 0) {
		/* the 1-jiffy timeout expired without a wakeup */
		ret = xpTimeout;
	} else {
		/* woken early by a signal */
		ret = xpInterrupted;
	}

	return ret;
}
827
828 /*
829  * Send a message that contains the user's payload on the specified channel
830  * connected to the specified partition.
831  *
832  * NOTE that this routine can sleep waiting for a message entry to become
833  * available. To not sleep, pass in the XPC_NOWAIT flag.
834  *
835  * Once sent, this routine will not wait for the message to be received, nor
836  * will notification be given when it does happen.
837  *
838  * Arguments:
839  *
840  *      partid - ID of partition to which the channel is connected.
841  *      ch_number - channel # to send message on.
842  *      flags - see xp.h for valid flags.
843  *      payload - pointer to the payload which is to be sent.
844  *      payload_size - size of the payload in bytes.
845  */
846 enum xp_retval
847 xpc_initiate_send(short partid, int ch_number, u32 flags, void *payload,
848                   u16 payload_size)
849 {
850         struct xpc_partition *part = &xpc_partitions[partid];
851         enum xp_retval ret = xpUnknownReason;
852
853         dev_dbg(xpc_chan, "payload=0x%p, partid=%d, channel=%d\n", payload,
854                 partid, ch_number);
855
856         DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
857         DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
858         DBUG_ON(payload == NULL);
859
860         if (xpc_part_ref(part)) {
861                 ret = xpc_send_msg(&part->channels[ch_number], flags, payload,
862                                    payload_size, 0, NULL, NULL);
863                 xpc_part_deref(part);
864         }
865
866         return ret;
867 }
868
869 /*
870  * Send a message that contains the user's payload on the specified channel
871  * connected to the specified partition.
872  *
873  * NOTE that this routine can sleep waiting for a message entry to become
874  * available. To not sleep, pass in the XPC_NOWAIT flag.
875  *
876  * This routine will not wait for the message to be sent or received.
877  *
878  * Once the remote end of the channel has received the message, the function
879  * passed as an argument to xpc_initiate_send_notify() will be called. This
880  * allows the sender to free up or re-use any buffers referenced by the
881  * message, but does NOT mean the message has been processed at the remote
882  * end by a receiver.
883  *
884  * If this routine returns an error, the caller's function will NOT be called.
885  *
886  * Arguments:
887  *
888  *      partid - ID of partition to which the channel is connected.
889  *      ch_number - channel # to send message on.
890  *      flags - see xp.h for valid flags.
891  *      payload - pointer to the payload which is to be sent.
892  *      payload_size - size of the payload in bytes.
893  *      func - function to call with asynchronous notification of message
894  *                receipt. THIS FUNCTION MUST BE NON-BLOCKING.
895  *      key - user-defined key to be passed to the function when it's called.
896  */
897 enum xp_retval
898 xpc_initiate_send_notify(short partid, int ch_number, u32 flags, void *payload,
899                          u16 payload_size, xpc_notify_func func, void *key)
900 {
901         struct xpc_partition *part = &xpc_partitions[partid];
902         enum xp_retval ret = xpUnknownReason;
903
904         dev_dbg(xpc_chan, "payload=0x%p, partid=%d, channel=%d\n", payload,
905                 partid, ch_number);
906
907         DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
908         DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
909         DBUG_ON(payload == NULL);
910         DBUG_ON(func == NULL);
911
912         if (xpc_part_ref(part)) {
913                 ret = xpc_send_msg(&part->channels[ch_number], flags, payload,
914                                    payload_size, XPC_N_CALL, func, key);
915                 xpc_part_deref(part);
916         }
917         return ret;
918 }
919
920 /*
921  * Deliver a message to its intended recipient.
922  */
923 void
924 xpc_deliver_msg(struct xpc_channel *ch)
925 {
926         struct xpc_msg *msg;
927
928         msg = xpc_get_deliverable_msg(ch);
929         if (msg != NULL) {
930
931                 /*
932                  * This ref is taken to protect the payload itself from being
933                  * freed before the user is finished with it, which the user
934                  * indicates by calling xpc_initiate_received().
935                  */
936                 xpc_msgqueue_ref(ch);
937
938                 atomic_inc(&ch->kthreads_active);
939
940                 if (ch->func != NULL) {
941                         dev_dbg(xpc_chan, "ch->func() called, msg=0x%p, "
942                                 "msg_number=%ld, partid=%d, channel=%d\n",
943                                 (void *)msg, msg->number, ch->partid,
944                                 ch->number);
945
946                         /* deliver the message to its intended recipient */
947                         ch->func(xpMsgReceived, ch->partid, ch->number,
948                                  &msg->payload, ch->key);
949
950                         dev_dbg(xpc_chan, "ch->func() returned, msg=0x%p, "
951                                 "msg_number=%ld, partid=%d, channel=%d\n",
952                                 (void *)msg, msg->number, ch->partid,
953                                 ch->number);
954                 }
955
956                 atomic_dec(&ch->kthreads_active);
957         }
958 }
959
960 /*
961  * Acknowledge receipt of a delivered message.
962  *
963  * If a message has XPC_M_INTERRUPT set, send an interrupt to the partition
964  * that sent the message.
965  *
966  * This function, although called by users, does not call xpc_part_ref() to
967  * ensure that the partition infrastructure is in place. It relies on the
968  * fact that we called xpc_msgqueue_ref() in xpc_deliver_msg().
969  *
970  * Arguments:
971  *
972  *      partid - ID of partition to which the channel is connected.
973  *      ch_number - channel # message received on.
974  *      payload - pointer to the payload area allocated via
975  *                      xpc_initiate_send() or xpc_initiate_send_notify().
976  */
977 void
978 xpc_initiate_received(short partid, int ch_number, void *payload)
979 {
980         struct xpc_partition *part = &xpc_partitions[partid];
981         struct xpc_channel *ch;
982         struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
983
984         DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
985         DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
986
987         ch = &part->channels[ch_number];
988         xpc_received_msg(ch, msg);
989
990         /* the call to xpc_msgqueue_ref() was done by xpc_deliver_msg()  */
991         xpc_msgqueue_deref(ch);
992 }