461d9872d4d395a92e4475fc76cdb16c058fb252
[safe/jmp/linux-2.6] / drivers / block / drbd / drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/smp_lock.h>
40 #include <linux/pkt_sched.h>
41 #define __KERNEL_SYSCALLS__
42 #include <linux/unistd.h>
43 #include <linux/vmalloc.h>
44 #include <linux/random.h>
45 #include <linux/mm.h>
46 #include <linux/string.h>
47 #include <linux/scatterlist.h>
48 #include "drbd_int.h"
49 #include "drbd_req.h"
50
51 #include "drbd_vli.h"
52
/* A generic drbd_work item bundled with a pointer to a drbd_epoch.
 * NOTE(review): presumably queued to flush the referenced epoch; no user of
 * this struct is visible in this part of the file -- confirm. */
struct flush_work {
        struct drbd_work w;             /* embedded generic work item */
        struct drbd_epoch *epoch;       /* epoch this work item refers to */
};
57
/* Result type of drbd_may_finish_epoch().
 * NOTE(review): the precise meaning of each value is defined by that
 * function, whose body is not visible here; the descriptions below are
 * inferred from the names only -- confirm. */
enum finish_epoch {
        FE_STILL_LIVE,  /* presumably: the epoch is still in use */
        FE_DESTROYED,   /* presumably: the epoch was freed */
        FE_RECYCLED,    /* presumably: the epoch object was reused */
};
63
/* Forward declarations of file-local functions that are referenced before
 * their definitions. */
static int drbd_do_handshake(struct drbd_conf *mdev);
static int drbd_do_auth(struct drbd_conf *mdev);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
69
70 static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
71 {
72         struct drbd_epoch *prev;
73         spin_lock(&mdev->epoch_lock);
74         prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
75         if (prev == epoch || prev == mdev->current_epoch)
76                 prev = NULL;
77         spin_unlock(&mdev->epoch_lock);
78         return prev;
79 }
80
/* Allocation flags for opportunistic page allocation: highmem pages are
 * acceptable (__GFP_HIGHMEM) and allocation failures are not logged
 * (__GFP_NOWARN). */
#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
82
/*
 * Some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */
87
88 /* If at least n pages are linked at head, get n pages off.
89  * Otherwise, don't modify head, and return NULL.
90  * Locking is the responsibility of the caller.
91  */
92 static struct page *page_chain_del(struct page **head, int n)
93 {
94         struct page *page;
95         struct page *tmp;
96
97         BUG_ON(!n);
98         BUG_ON(!head);
99
100         page = *head;
101         while (page) {
102                 tmp = page_chain_next(page);
103                 if (--n == 0)
104                         break; /* found sufficient pages */
105                 if (tmp == NULL)
106                         /* insufficient pages, don't use any of them. */
107                         return NULL;
108                 page = tmp;
109         }
110
111         /* add end of list marker for the returned list */
112         set_page_private(page, 0);
113         /* actual return value, and adjustment of head */
114         page = *head;
115         *head = tmp;
116         return page;
117 }
118
119 /* may be used outside of locks to find the tail of a (usually short)
120  * "private" page chain, before adding it back to a global chain head
121  * with page_chain_add() under a spinlock. */
122 static struct page *page_chain_tail(struct page *page, int *len)
123 {
124         struct page *tmp;
125         int i = 1;
126         while ((tmp = page_chain_next(page)))
127                 ++i, page = tmp;
128         if (len)
129                 *len = i;
130         return page;
131 }
132
133 static int page_chain_free(struct page *page)
134 {
135         struct page *tmp;
136         int i = 0;
137         page_chain_for_each_safe(page, tmp) {
138                 put_page(page);
139                 ++i;
140         }
141         return i;
142 }
143
144 static void page_chain_add(struct page **head,
145                 struct page *chain_first, struct page *chain_last)
146 {
147 #if 1
148         struct page *tmp;
149         tmp = page_chain_tail(chain_first, NULL);
150         BUG_ON(tmp != chain_last);
151 #endif
152
153         /* add chain to head */
154         set_page_private(chain_last, (unsigned long)*head);
155         *head = chain_first;
156 }
157
158 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
159 {
160         struct page *page = NULL;
161         struct page *tmp = NULL;
162         int i = 0;
163
164         /* Yes, testing drbd_pp_vacant outside the lock is racy.
165          * So what. It saves a spin_lock. */
166         if (drbd_pp_vacant >= number) {
167                 spin_lock(&drbd_pp_lock);
168                 page = page_chain_del(&drbd_pp_pool, number);
169                 if (page)
170                         drbd_pp_vacant -= number;
171                 spin_unlock(&drbd_pp_lock);
172                 if (page)
173                         return page;
174         }
175
176         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
177          * "criss-cross" setup, that might cause write-out on some other DRBD,
178          * which in turn might block on the other node at this very place.  */
179         for (i = 0; i < number; i++) {
180                 tmp = alloc_page(GFP_TRY);
181                 if (!tmp)
182                         break;
183                 set_page_private(tmp, (unsigned long)page);
184                 page = tmp;
185         }
186
187         if (i == number)
188                 return page;
189
190         /* Not enough pages immediately available this time.
191          * No need to jump around here, drbd_pp_alloc will retry this
192          * function "soon". */
193         if (page) {
194                 tmp = page_chain_tail(page, NULL);
195                 spin_lock(&drbd_pp_lock);
196                 page_chain_add(&drbd_pp_pool, page, tmp);
197                 drbd_pp_vacant += i;
198                 spin_unlock(&drbd_pp_lock);
199         }
200         return NULL;
201 }
202
203 /* kick lower level device, if we have more than (arbitrary number)
204  * reference counts on it, which typically are locally submitted io
205  * requests.  don't use unacked_cnt, so we speed up proto A and B, too. */
206 static void maybe_kick_lo(struct drbd_conf *mdev)
207 {
208         if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
209                 drbd_kick_lo(mdev);
210 }
211
212 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
213 {
214         struct drbd_epoch_entry *e;
215         struct list_head *le, *tle;
216
217         /* The EEs are always appended to the end of the list. Since
218            they are sent in order over the wire, they have to finish
219            in order. As soon as we see the first not finished we can
220            stop to examine the list... */
221
222         list_for_each_safe(le, tle, &mdev->net_ee) {
223                 e = list_entry(le, struct drbd_epoch_entry, w.list);
224                 if (drbd_ee_has_active_page(e))
225                         break;
226                 list_move(le, to_be_freed);
227         }
228 }
229
230 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
231 {
232         LIST_HEAD(reclaimed);
233         struct drbd_epoch_entry *e, *t;
234
235         maybe_kick_lo(mdev);
236         spin_lock_irq(&mdev->req_lock);
237         reclaim_net_ee(mdev, &reclaimed);
238         spin_unlock_irq(&mdev->req_lock);
239
240         list_for_each_entry_safe(e, t, &reclaimed, w.list)
241                 drbd_free_ee(mdev, e);
242 }
243
244 /**
245  * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
246  * @mdev:       DRBD device.
247  * @number:     number of pages requested
248  * @retry:      whether to retry, if not enough pages are available right now
249  *
250  * Tries to allocate number pages, first from our own page pool, then from
251  * the kernel, unless this allocation would exceed the max_buffers setting.
252  * Possibly retry until DRBD frees sufficient pages somewhere else.
253  *
254  * Returns a page chain linked via page->private.
255  */
256 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
257 {
258         struct page *page = NULL;
259         DEFINE_WAIT(wait);
260
261         /* Yes, we may run up to @number over max_buffers. If we
262          * follow it strictly, the admin will get it wrong anyways. */
263         if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
264                 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
265
266         while (page == NULL) {
267                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
268
269                 drbd_kick_lo_and_reclaim_net(mdev);
270
271                 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
272                         page = drbd_pp_first_pages_or_try_alloc(mdev, number);
273                         if (page)
274                                 break;
275                 }
276
277                 if (!retry)
278                         break;
279
280                 if (signal_pending(current)) {
281                         dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
282                         break;
283                 }
284
285                 schedule();
286         }
287         finish_wait(&drbd_pp_wait, &wait);
288
289         if (page)
290                 atomic_add(number, &mdev->pp_in_use);
291         return page;
292 }
293
294 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
295  * Is also used from inside an other spin_lock_irq(&mdev->req_lock);
296  * Either links the page chain back to the global pool,
297  * or returns all pages to the system. */
298 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
299 {
300         int i;
301         if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
302                 i = page_chain_free(page);
303         else {
304                 struct page *tmp;
305                 tmp = page_chain_tail(page, &i);
306                 spin_lock(&drbd_pp_lock);
307                 page_chain_add(&drbd_pp_pool, page, tmp);
308                 drbd_pp_vacant += i;
309                 spin_unlock(&drbd_pp_lock);
310         }
311         atomic_sub(i, &mdev->pp_in_use);
312         i = atomic_read(&mdev->pp_in_use);
313         if (i < 0)
314                 dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
315         wake_up(&drbd_pp_wait);
316 }
317
318 /*
319 You need to hold the req_lock:
320  _drbd_wait_ee_list_empty()
321
322 You must not have the req_lock:
323  drbd_free_ee()
324  drbd_alloc_ee()
325  drbd_init_ee()
326  drbd_release_ee()
327  drbd_ee_fix_bhs()
328  drbd_process_done_ee()
329  drbd_clear_done_ee()
330  drbd_wait_ee_list_empty()
331 */
332
333 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
334                                      u64 id,
335                                      sector_t sector,
336                                      unsigned int data_size,
337                                      gfp_t gfp_mask) __must_hold(local)
338 {
339         struct drbd_epoch_entry *e;
340         struct page *page;
341         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
342
343         if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
344                 return NULL;
345
346         e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
347         if (!e) {
348                 if (!(gfp_mask & __GFP_NOWARN))
349                         dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
350                 return NULL;
351         }
352
353         page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
354         if (!page)
355                 goto fail;
356
357         INIT_HLIST_NODE(&e->colision);
358         e->epoch = NULL;
359         e->mdev = mdev;
360         e->pages = page;
361         atomic_set(&e->pending_bios, 0);
362         e->size = data_size;
363         e->flags = 0;
364         e->sector = sector;
365         e->sector = sector;
366         e->block_id = id;
367
368         return e;
369
370  fail:
371         mempool_free(e, drbd_ee_mempool);
372         return NULL;
373 }
374
375 void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
376 {
377         drbd_pp_free(mdev, e->pages);
378         D_ASSERT(atomic_read(&e->pending_bios) == 0);
379         D_ASSERT(hlist_unhashed(&e->colision));
380         mempool_free(e, drbd_ee_mempool);
381 }
382
383 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
384 {
385         LIST_HEAD(work_list);
386         struct drbd_epoch_entry *e, *t;
387         int count = 0;
388
389         spin_lock_irq(&mdev->req_lock);
390         list_splice_init(list, &work_list);
391         spin_unlock_irq(&mdev->req_lock);
392
393         list_for_each_entry_safe(e, t, &work_list, w.list) {
394                 drbd_free_ee(mdev, e);
395                 count++;
396         }
397         return count;
398 }
399
400
401 /*
402  * This function is called from _asender only_
403  * but see also comments in _req_mod(,barrier_acked)
404  * and receive_Barrier.
405  *
406  * Move entries from net_ee to done_ee, if ready.
407  * Grab done_ee, call all callbacks, free the entries.
408  * The callbacks typically send out ACKs.
409  */
410 static int drbd_process_done_ee(struct drbd_conf *mdev)
411 {
412         LIST_HEAD(work_list);
413         LIST_HEAD(reclaimed);
414         struct drbd_epoch_entry *e, *t;
415         int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
416
417         spin_lock_irq(&mdev->req_lock);
418         reclaim_net_ee(mdev, &reclaimed);
419         list_splice_init(&mdev->done_ee, &work_list);
420         spin_unlock_irq(&mdev->req_lock);
421
422         list_for_each_entry_safe(e, t, &reclaimed, w.list)
423                 drbd_free_ee(mdev, e);
424
425         /* possible callbacks here:
426          * e_end_block, and e_end_resync_block, e_send_discard_ack.
427          * all ignore the last argument.
428          */
429         list_for_each_entry_safe(e, t, &work_list, w.list) {
430                 /* list_del not necessary, next/prev members not touched */
431                 ok = e->w.cb(mdev, &e->w, !ok) && ok;
432                 drbd_free_ee(mdev, e);
433         }
434         wake_up(&mdev->ee_wait);
435
436         return ok;
437 }
438
439 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
440 {
441         DEFINE_WAIT(wait);
442
443         /* avoids spin_lock/unlock
444          * and calling prepare_to_wait in the fast path */
445         while (!list_empty(head)) {
446                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
447                 spin_unlock_irq(&mdev->req_lock);
448                 drbd_kick_lo(mdev);
449                 schedule();
450                 finish_wait(&mdev->ee_wait, &wait);
451                 spin_lock_irq(&mdev->req_lock);
452         }
453 }
454
455 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
456 {
457         spin_lock_irq(&mdev->req_lock);
458         _drbd_wait_ee_list_empty(mdev, head);
459         spin_unlock_irq(&mdev->req_lock);
460 }
461
462 /* see also kernel_accept; which is only present since 2.6.18.
463  * also we want to log which part of it failed, exactly */
464 static int drbd_accept(struct drbd_conf *mdev, const char **what,
465                 struct socket *sock, struct socket **newsock)
466 {
467         struct sock *sk = sock->sk;
468         int err = 0;
469
470         *what = "listen";
471         err = sock->ops->listen(sock, 5);
472         if (err < 0)
473                 goto out;
474
475         *what = "sock_create_lite";
476         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
477                                newsock);
478         if (err < 0)
479                 goto out;
480
481         *what = "accept";
482         err = sock->ops->accept(sock, *newsock, 0);
483         if (err < 0) {
484                 sock_release(*newsock);
485                 *newsock = NULL;
486                 goto out;
487         }
488         (*newsock)->ops  = sock->ops;
489
490 out:
491         return err;
492 }
493
494 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
495                     void *buf, size_t size, int flags)
496 {
497         mm_segment_t oldfs;
498         struct kvec iov = {
499                 .iov_base = buf,
500                 .iov_len = size,
501         };
502         struct msghdr msg = {
503                 .msg_iovlen = 1,
504                 .msg_iov = (struct iovec *)&iov,
505                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
506         };
507         int rv;
508
509         oldfs = get_fs();
510         set_fs(KERNEL_DS);
511         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
512         set_fs(oldfs);
513
514         return rv;
515 }
516
517 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
518 {
519         mm_segment_t oldfs;
520         struct kvec iov = {
521                 .iov_base = buf,
522                 .iov_len = size,
523         };
524         struct msghdr msg = {
525                 .msg_iovlen = 1,
526                 .msg_iov = (struct iovec *)&iov,
527                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
528         };
529         int rv;
530
531         oldfs = get_fs();
532         set_fs(KERNEL_DS);
533
534         for (;;) {
535                 rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
536                 if (rv == size)
537                         break;
538
539                 /* Note:
540                  * ECONNRESET   other side closed the connection
541                  * ERESTARTSYS  (on  sock) we got a signal
542                  */
543
544                 if (rv < 0) {
545                         if (rv == -ECONNRESET)
546                                 dev_info(DEV, "sock was reset by peer\n");
547                         else if (rv != -ERESTARTSYS)
548                                 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
549                         break;
550                 } else if (rv == 0) {
551                         dev_info(DEV, "sock was shut down by peer\n");
552                         break;
553                 } else  {
554                         /* signal came in, or peer/link went down,
555                          * after we read a partial message
556                          */
557                         /* D_ASSERT(signal_pending(current)); */
558                         break;
559                 }
560         };
561
562         set_fs(oldfs);
563
564         if (rv != size)
565                 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
566
567         return rv;
568 }
569
/* Make one attempt to actively connect to the peer.
 *
 * Creates a TCP socket, binds it to the configured local address (with the
 * port set to 0) and connects to the configured peer address, using
 * try_connect_int seconds as send and receive timeout.
 *
 * Failures of the transient kind listed in the switch below are silent.
 * Other failures are logged; if they happen before the connect step, the
 * connection is additionally forced to C_DISCONNECTING.
 *
 * Returns the connected socket, or NULL. */
static struct socket *drbd_try_connect(struct drbd_conf *mdev)
{
        struct sockaddr_in6 local_addr;
        struct socket *sock;
        const char *step;
        int fatal = 1;  /* whether an unexpected error forces C_DISCONNECTING */
        int err;

        if (!get_net_conf(mdev))
                return NULL;

        step = "sock_create_kern";
        err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
                               SOCK_STREAM, IPPROTO_TCP, &sock);
        if (err < 0) {
                sock = NULL;
                goto out;
        }

        sock->sk->sk_sndtimeo = mdev->net_conf->try_connect_int*HZ;
        sock->sk->sk_rcvtimeo = mdev->net_conf->try_connect_int*HZ;

       /* explicitly bind to the configured IP as source IP
        *  for the outgoing connections.
        *  This is needed for multihomed hosts and to be
        *  able to use lo: interfaces for drbd.
        * Make sure to use 0 as port number, so linux selects
        *  a free one dynamically.
        */
        memcpy(&local_addr, mdev->net_conf->my_addr,
               min_t(int, mdev->net_conf->my_addr_len, sizeof(local_addr)));
        if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family != AF_INET6)
                ((struct sockaddr_in *)&local_addr)->sin_port = 0; /* AF_INET & AF_SCI */
        else
                local_addr.sin6_port = 0;

        step = "bind before connect";
        err = sock->ops->bind(sock,
                              (struct sockaddr *) &local_addr,
                              mdev->net_conf->my_addr_len);
        if (err < 0)
                goto out;

        /* connect may fail, peer not yet available.
         * stay C_WF_CONNECTION, don't go Disconnecting! */
        fatal = 0;
        step = "connect";
        err = sock->ops->connect(sock,
                                 (struct sockaddr *)mdev->net_conf->peer_addr,
                                 mdev->net_conf->peer_addr_len, 0);

out:
        if (err < 0) {
                if (sock) {
                        sock_release(sock);
                        sock = NULL;
                }
                switch (-err) {
                        /* timeout, busy, signal pending */
                case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
                case EINTR: case ERESTARTSYS:
                        /* peer not (yet) available, network problem */
                case ECONNREFUSED: case ENETUNREACH:
                case EHOSTDOWN:    case EHOSTUNREACH:
                        fatal = 0;
                        break;
                default:
                        dev_err(DEV, "%s failed, err = %d\n", step, err);
                }
                if (fatal)
                        drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
        }
        put_net_conf(mdev);

        return sock;
}
645
646 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
647 {
648         int timeo, err;
649         struct socket *s_estab = NULL, *s_listen;
650         const char *what;
651
652         if (!get_net_conf(mdev))
653                 return NULL;
654
655         what = "sock_create_kern";
656         err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
657                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
658         if (err) {
659                 s_listen = NULL;
660                 goto out;
661         }
662
663         timeo = mdev->net_conf->try_connect_int * HZ;
664         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
665
666         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
667         s_listen->sk->sk_rcvtimeo = timeo;
668         s_listen->sk->sk_sndtimeo = timeo;
669
670         what = "bind before listen";
671         err = s_listen->ops->bind(s_listen,
672                               (struct sockaddr *) mdev->net_conf->my_addr,
673                               mdev->net_conf->my_addr_len);
674         if (err < 0)
675                 goto out;
676
677         err = drbd_accept(mdev, &what, s_listen, &s_estab);
678
679 out:
680         if (s_listen)
681                 sock_release(s_listen);
682         if (err < 0) {
683                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
684                         dev_err(DEV, "%s failed, err = %d\n", what, err);
685                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
686                 }
687         }
688         put_net_conf(mdev);
689
690         return s_estab;
691 }
692
693 static int drbd_send_fp(struct drbd_conf *mdev,
694         struct socket *sock, enum drbd_packets cmd)
695 {
696         struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
697
698         return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
699 }
700
701 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
702 {
703         struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
704         int rr;
705
706         rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
707
708         if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
709                 return be16_to_cpu(h->command);
710
711         return 0xffff;
712 }
713
714 /**
715  * drbd_socket_okay() - Free the socket if its connection is not okay
716  * @mdev:       DRBD device.
717  * @sock:       pointer to the pointer to the socket.
718  */
719 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
720 {
721         int rr;
722         char tb[4];
723
724         if (!*sock)
725                 return FALSE;
726
727         rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
728
729         if (rr > 0 || rr == -EAGAIN) {
730                 return TRUE;
731         } else {
732                 sock_release(*sock);
733                 *sock = NULL;
734                 return FALSE;
735         }
736 }
737
738 /*
739  * return values:
740  *   1 yes, we have a valid connection
741  *   0 oops, did not work out, please try again
742  *  -1 peer talks different language,
743  *     no point in trying again, please go standalone.
744  *  -2 We do not have a network config...
745  */
746 static int drbd_connect(struct drbd_conf *mdev)
747 {
748         struct socket *s, *sock, *msock;
749         int try, h, ok;
750
751         D_ASSERT(!mdev->data.socket);
752
753         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
754                 dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
755
756         if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
757                 return -2;
758
759         clear_bit(DISCARD_CONCURRENT, &mdev->flags);
760
761         sock  = NULL;
762         msock = NULL;
763
764         do {
765                 for (try = 0;;) {
766                         /* 3 tries, this should take less than a second! */
767                         s = drbd_try_connect(mdev);
768                         if (s || ++try >= 3)
769                                 break;
770                         /* give the other side time to call bind() & listen() */
771                         __set_current_state(TASK_INTERRUPTIBLE);
772                         schedule_timeout(HZ / 10);
773                 }
774
775                 if (s) {
776                         if (!sock) {
777                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
778                                 sock = s;
779                                 s = NULL;
780                         } else if (!msock) {
781                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
782                                 msock = s;
783                                 s = NULL;
784                         } else {
785                                 dev_err(DEV, "Logic error in drbd_connect()\n");
786                                 goto out_release_sockets;
787                         }
788                 }
789
790                 if (sock && msock) {
791                         __set_current_state(TASK_INTERRUPTIBLE);
792                         schedule_timeout(HZ / 10);
793                         ok = drbd_socket_okay(mdev, &sock);
794                         ok = drbd_socket_okay(mdev, &msock) && ok;
795                         if (ok)
796                                 break;
797                 }
798
799 retry:
800                 s = drbd_wait_for_connect(mdev);
801                 if (s) {
802                         try = drbd_recv_fp(mdev, s);
803                         drbd_socket_okay(mdev, &sock);
804                         drbd_socket_okay(mdev, &msock);
805                         switch (try) {
806                         case P_HAND_SHAKE_S:
807                                 if (sock) {
808                                         dev_warn(DEV, "initial packet S crossed\n");
809                                         sock_release(sock);
810                                 }
811                                 sock = s;
812                                 break;
813                         case P_HAND_SHAKE_M:
814                                 if (msock) {
815                                         dev_warn(DEV, "initial packet M crossed\n");
816                                         sock_release(msock);
817                                 }
818                                 msock = s;
819                                 set_bit(DISCARD_CONCURRENT, &mdev->flags);
820                                 break;
821                         default:
822                                 dev_warn(DEV, "Error receiving initial packet\n");
823                                 sock_release(s);
824                                 if (random32() & 1)
825                                         goto retry;
826                         }
827                 }
828
829                 if (mdev->state.conn <= C_DISCONNECTING)
830                         goto out_release_sockets;
831                 if (signal_pending(current)) {
832                         flush_signals(current);
833                         smp_rmb();
834                         if (get_t_state(&mdev->receiver) == Exiting)
835                                 goto out_release_sockets;
836                 }
837
838                 if (sock && msock) {
839                         ok = drbd_socket_okay(mdev, &sock);
840                         ok = drbd_socket_okay(mdev, &msock) && ok;
841                         if (ok)
842                                 break;
843                 }
844         } while (1);
845
846         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
847         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
848
849         sock->sk->sk_allocation = GFP_NOIO;
850         msock->sk->sk_allocation = GFP_NOIO;
851
852         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
853         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
854
855         if (mdev->net_conf->sndbuf_size) {
856                 sock->sk->sk_sndbuf = mdev->net_conf->sndbuf_size;
857                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
858         }
859
860         if (mdev->net_conf->rcvbuf_size) {
861                 sock->sk->sk_rcvbuf = mdev->net_conf->rcvbuf_size;
862                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
863         }
864
865         /* NOT YET ...
866          * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
867          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
868          * first set it to the P_HAND_SHAKE timeout,
869          * which we set to 4x the configured ping_timeout. */
870         sock->sk->sk_sndtimeo =
871         sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
872
873         msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
874         msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
875
876         /* we don't want delays.
877          * we use TCP_CORK where apropriate, though */
878         drbd_tcp_nodelay(sock);
879         drbd_tcp_nodelay(msock);
880
881         mdev->data.socket = sock;
882         mdev->meta.socket = msock;
883         mdev->last_received = jiffies;
884
885         D_ASSERT(mdev->asender.task == NULL);
886
887         h = drbd_do_handshake(mdev);
888         if (h <= 0)
889                 return h;
890
891         if (mdev->cram_hmac_tfm) {
892                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
893                 switch (drbd_do_auth(mdev)) {
894                 case -1:
895                         dev_err(DEV, "Authentication of peer failed\n");
896                         return -1;
897                 case 0:
898                         dev_err(DEV, "Authentication of peer failed, trying again.\n");
899                         return 0;
900                 }
901         }
902
903         if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
904                 return 0;
905
906         sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
907         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
908
909         atomic_set(&mdev->packet_seq, 0);
910         mdev->peer_seq = 0;
911
912         drbd_thread_start(&mdev->asender);
913
914         if (!drbd_send_protocol(mdev))
915                 return -1;
916         drbd_send_sync_param(mdev, &mdev->sync_conf);
917         drbd_send_sizes(mdev, 0, 0);
918         drbd_send_uuids(mdev);
919         drbd_send_state(mdev);
920         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
921         clear_bit(RESIZE_PENDING, &mdev->flags);
922
923         return 1;
924
925 out_release_sockets:
926         if (sock)
927                 sock_release(sock);
928         if (msock)
929                 sock_release(msock);
930         return -1;
931 }
932
933 static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
934 {
935         int r;
936
937         r = drbd_recv(mdev, h, sizeof(*h));
938
939         if (unlikely(r != sizeof(*h))) {
940                 dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
941                 return FALSE;
942         };
943         h->command = be16_to_cpu(h->command);
944         h->length  = be16_to_cpu(h->length);
945         if (unlikely(h->magic != BE_DRBD_MAGIC)) {
946                 dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
947                     (long)be32_to_cpu(h->magic),
948                     h->command, h->length);
949                 return FALSE;
950         }
951         mdev->last_received = jiffies;
952
953         return TRUE;
954 }
955
956 static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
957 {
958         int rv;
959
960         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
961                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
962                                         NULL, BLKDEV_IFL_WAIT);
963                 if (rv) {
964                         dev_err(DEV, "local disk flush failed with status %d\n", rv);
965                         /* would rather check on EOPNOTSUPP, but that is not reliable.
966                          * don't try again for ANY return value != 0
967                          * if (rv == -EOPNOTSUPP) */
968                         drbd_bump_write_ordering(mdev, WO_drain_io);
969                 }
970                 put_ldev(mdev);
971         }
972
973         return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
974 }
975
976 static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
977 {
978         struct flush_work *fw = (struct flush_work *)w;
979         struct drbd_epoch *epoch = fw->epoch;
980
981         kfree(w);
982
983         if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
984                 drbd_flush_after_epoch(mdev, epoch);
985
986         drbd_may_finish_epoch(mdev, epoch, EV_PUT |
987                               (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
988
989         return 1;
990 }
991
992 /**
993  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
994  * @mdev:       DRBD device.
995  * @epoch:      Epoch object.
996  * @ev:         Epoch event.
997  */
998 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
999                                                struct drbd_epoch *epoch,
1000                                                enum epoch_event ev)
1001 {
1002         int finish, epoch_size;
1003         struct drbd_epoch *next_epoch;
1004         int schedule_flush = 0;
1005         enum finish_epoch rv = FE_STILL_LIVE;
1006
1007         spin_lock(&mdev->epoch_lock);
1008         do {
1009                 next_epoch = NULL;
1010                 finish = 0;
1011
1012                 epoch_size = atomic_read(&epoch->epoch_size);
1013
1014                 switch (ev & ~EV_CLEANUP) {
1015                 case EV_PUT:
1016                         atomic_dec(&epoch->active);
1017                         break;
1018                 case EV_GOT_BARRIER_NR:
1019                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1020
1021                         /* Special case: If we just switched from WO_bio_barrier to
1022                            WO_bdev_flush we should not finish the current epoch */
1023                         if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
1024                             mdev->write_ordering != WO_bio_barrier &&
1025                             epoch == mdev->current_epoch)
1026                                 clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
1027                         break;
1028                 case EV_BARRIER_DONE:
1029                         set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
1030                         break;
1031                 case EV_BECAME_LAST:
1032                         /* nothing to do*/
1033                         break;
1034                 }
1035
1036                 if (epoch_size != 0 &&
1037                     atomic_read(&epoch->active) == 0 &&
1038                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
1039                     epoch->list.prev == &mdev->current_epoch->list &&
1040                     !test_bit(DE_IS_FINISHING, &epoch->flags)) {
1041                         /* Nearly all conditions are met to finish that epoch... */
1042                         if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
1043                             mdev->write_ordering == WO_none ||
1044                             (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
1045                             ev & EV_CLEANUP) {
1046                                 finish = 1;
1047                                 set_bit(DE_IS_FINISHING, &epoch->flags);
1048                         } else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
1049                                  mdev->write_ordering == WO_bio_barrier) {
1050                                 atomic_inc(&epoch->active);
1051                                 schedule_flush = 1;
1052                         }
1053                 }
1054                 if (finish) {
1055                         if (!(ev & EV_CLEANUP)) {
1056                                 spin_unlock(&mdev->epoch_lock);
1057                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1058                                 spin_lock(&mdev->epoch_lock);
1059                         }
1060                         dec_unacked(mdev);
1061
1062                         if (mdev->current_epoch != epoch) {
1063                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1064                                 list_del(&epoch->list);
1065                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1066                                 mdev->epochs--;
1067                                 kfree(epoch);
1068
1069                                 if (rv == FE_STILL_LIVE)
1070                                         rv = FE_DESTROYED;
1071                         } else {
1072                                 epoch->flags = 0;
1073                                 atomic_set(&epoch->epoch_size, 0);
1074                                 /* atomic_set(&epoch->active, 0); is alrady zero */
1075                                 if (rv == FE_STILL_LIVE)
1076                                         rv = FE_RECYCLED;
1077                         }
1078                 }
1079
1080                 if (!next_epoch)
1081                         break;
1082
1083                 epoch = next_epoch;
1084         } while (1);
1085
1086         spin_unlock(&mdev->epoch_lock);
1087
1088         if (schedule_flush) {
1089                 struct flush_work *fw;
1090                 fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
1091                 if (fw) {
1092                         fw->w.cb = w_flush;
1093                         fw->epoch = epoch;
1094                         drbd_queue_work(&mdev->data.work, &fw->w);
1095                 } else {
1096                         dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
1097                         set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1098                         /* That is not a recursion, only one level */
1099                         drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1100                         drbd_may_finish_epoch(mdev, epoch, EV_PUT);
1101                 }
1102         }
1103
1104         return rv;
1105 }
1106
1107 /**
1108  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1109  * @mdev:       DRBD device.
1110  * @wo:         Write ordering method to try.
1111  */
1112 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1113 {
1114         enum write_ordering_e pwo;
1115         static char *write_ordering_str[] = {
1116                 [WO_none] = "none",
1117                 [WO_drain_io] = "drain",
1118                 [WO_bdev_flush] = "flush",
1119                 [WO_bio_barrier] = "barrier",
1120         };
1121
1122         pwo = mdev->write_ordering;
1123         wo = min(pwo, wo);
1124         if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1125                 wo = WO_bdev_flush;
1126         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1127                 wo = WO_drain_io;
1128         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1129                 wo = WO_none;
1130         mdev->write_ordering = wo;
1131         if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1132                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1133 }
1134
1135 /**
1136  * drbd_submit_ee()
1137  * @mdev:       DRBD device.
1138  * @e:          epoch entry
1139  * @rw:         flag field, see bio->bi_rw
1140  */
1141 /* TODO allocate from our own bio_set. */
1142 int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1143                 const unsigned rw, const int fault_type)
1144 {
1145         struct bio *bios = NULL;
1146         struct bio *bio;
1147         struct page *page = e->pages;
1148         sector_t sector = e->sector;
1149         unsigned ds = e->size;
1150         unsigned n_bios = 0;
1151         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1152
1153         if (atomic_read(&mdev->new_c_uuid)) {
1154                 if (atomic_add_unless(&mdev->new_c_uuid, -1, 1)) {
1155                         drbd_uuid_new_current(mdev);
1156                         drbd_md_sync(mdev);
1157
1158                         atomic_dec(&mdev->new_c_uuid);
1159                         wake_up(&mdev->misc_wait);
1160                 }
1161                 wait_event(mdev->misc_wait, !atomic_read(&mdev->new_c_uuid));
1162         }
1163
1164         /* In most cases, we will only need one bio.  But in case the lower
1165          * level restrictions happen to be different at this offset on this
1166          * side than those of the sending peer, we may need to submit the
1167          * request in more than one bio. */
1168 next_bio:
1169         bio = bio_alloc(GFP_NOIO, nr_pages);
1170         if (!bio) {
1171                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1172                 goto fail;
1173         }
1174         /* > e->sector, unless this is the first bio */
1175         bio->bi_sector = sector;
1176         bio->bi_bdev = mdev->ldev->backing_bdev;
1177         /* we special case some flags in the multi-bio case, see below
1178          * (BIO_RW_UNPLUG, BIO_RW_BARRIER) */
1179         bio->bi_rw = rw;
1180         bio->bi_private = e;
1181         bio->bi_end_io = drbd_endio_sec;
1182
1183         bio->bi_next = bios;
1184         bios = bio;
1185         ++n_bios;
1186
1187         page_chain_for_each(page) {
1188                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1189                 if (!bio_add_page(bio, page, len, 0)) {
1190                         /* a single page must always be possible! */
1191                         BUG_ON(bio->bi_vcnt == 0);
1192                         goto next_bio;
1193                 }
1194                 ds -= len;
1195                 sector += len >> 9;
1196                 --nr_pages;
1197         }
1198         D_ASSERT(page == NULL);
1199         D_ASSERT(ds == 0);
1200
1201         atomic_set(&e->pending_bios, n_bios);
1202         do {
1203                 bio = bios;
1204                 bios = bios->bi_next;
1205                 bio->bi_next = NULL;
1206
1207                 /* strip off BIO_RW_UNPLUG unless it is the last bio */
1208                 if (bios)
1209                         bio->bi_rw &= ~(1<<BIO_RW_UNPLUG);
1210
1211                 drbd_generic_make_request(mdev, fault_type, bio);
1212
1213                 /* strip off BIO_RW_BARRIER,
1214                  * unless it is the first or last bio */
1215                 if (bios && bios->bi_next)
1216                         bios->bi_rw &= ~(1<<BIO_RW_BARRIER);
1217         } while (bios);
1218         maybe_kick_lo(mdev);
1219         return 0;
1220
1221 fail:
1222         while (bios) {
1223                 bio = bios;
1224                 bios = bios->bi_next;
1225                 bio_put(bio);
1226         }
1227         return -ENOMEM;
1228 }
1229
1230 /**
1231  * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
1232  * @mdev:       DRBD device.
1233  * @w:          work object.
1234  * @cancel:     The connection will be closed anyways (unused in this callback)
1235  */
1236 int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1237 {
1238         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1239         /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1240            (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1241            so that we can finish that epoch in drbd_may_finish_epoch().
1242            That is necessary if we already have a long chain of Epochs, before
1243            we realize that BIO_RW_BARRIER is actually not supported */
1244
1245         /* As long as the -ENOTSUPP on the barrier is reported immediately
1246            that will never trigger. If it is reported late, we will just
1247            print that warning and continue correctly for all future requests
1248            with WO_bdev_flush */
1249         if (previous_epoch(mdev, e->epoch))
1250                 dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1251
1252         /* we still have a local reference,
1253          * get_ldev was done in receive_Data. */
1254
1255         e->w.cb = e_end_block;
1256         if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
1257                 /* drbd_submit_ee fails for one reason only:
1258                  * if was not able to allocate sufficient bios.
1259                  * requeue, try again later. */
1260                 e->w.cb = w_e_reissue;
1261                 drbd_queue_work(&mdev->data.work, &e->w);
1262         }
1263         return 1;
1264 }
1265
1266 static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
1267 {
1268         int rv, issue_flush;
1269         struct p_barrier *p = (struct p_barrier *)h;
1270         struct drbd_epoch *epoch;
1271
1272         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
1273
1274         rv = drbd_recv(mdev, h->payload, h->length);
1275         ERR_IF(rv != h->length) return FALSE;
1276
1277         inc_unacked(mdev);
1278
1279         if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1280                 drbd_kick_lo(mdev);
1281
1282         mdev->current_epoch->barrier_nr = p->barrier;
1283         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1284
1285         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1286          * the activity log, which means it would not be resynced in case the
1287          * R_PRIMARY crashes now.
1288          * Therefore we must send the barrier_ack after the barrier request was
1289          * completed. */
1290         switch (mdev->write_ordering) {
1291         case WO_bio_barrier:
1292         case WO_none:
1293                 if (rv == FE_RECYCLED)
1294                         return TRUE;
1295                 break;
1296
1297         case WO_bdev_flush:
1298         case WO_drain_io:
1299                 if (rv == FE_STILL_LIVE) {
1300                         set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1301                         drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1302                         rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1303                 }
1304                 if (rv == FE_RECYCLED)
1305                         return TRUE;
1306
1307                 /* The asender will send all the ACKs and barrier ACKs out, since
1308                    all EEs moved from the active_ee to the done_ee. We need to
1309                    provide a new epoch object for the EEs that come in soon */
1310                 break;
1311         }
1312
1313         /* receiver context, in the writeout path of the other node.
1314          * avoid potential distributed deadlock */
1315         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1316         if (!epoch) {
1317                 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1318                 issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1319                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1320                 if (issue_flush) {
1321                         rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1322                         if (rv == FE_RECYCLED)
1323                                 return TRUE;
1324                 }
1325
1326                 drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1327
1328                 return TRUE;
1329         }
1330
1331         epoch->flags = 0;
1332         atomic_set(&epoch->epoch_size, 0);
1333         atomic_set(&epoch->active, 0);
1334
1335         spin_lock(&mdev->epoch_lock);
1336         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1337                 list_add(&epoch->list, &mdev->current_epoch->list);
1338                 mdev->current_epoch = epoch;
1339                 mdev->epochs++;
1340         } else {
1341                 /* The current_epoch got recycled while we allocated this one... */
1342                 kfree(epoch);
1343         }
1344         spin_unlock(&mdev->epoch_lock);
1345
1346         return TRUE;
1347 }
1348
1349 /* used from receive_RSDataReply (recv_resync_read)
1350  * and from receive_Data */
1351 static struct drbd_epoch_entry *
1352 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1353 {
1354         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1355         struct drbd_epoch_entry *e;
1356         struct page *page;
1357         int dgs, ds, rr;
1358         void *dig_in = mdev->int_dig_in;
1359         void *dig_vv = mdev->int_dig_vv;
1360         unsigned long *data;
1361
1362         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1363                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1364
1365         if (dgs) {
1366                 rr = drbd_recv(mdev, dig_in, dgs);
1367                 if (rr != dgs) {
1368                         dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1369                              rr, dgs);
1370                         return NULL;
1371                 }
1372         }
1373
1374         data_size -= dgs;
1375
1376         ERR_IF(data_size &  0x1ff) return NULL;
1377         ERR_IF(data_size >  DRBD_MAX_SEGMENT_SIZE) return NULL;
1378
1379         /* even though we trust out peer,
1380          * we sometimes have to double check. */
1381         if (sector + (data_size>>9) > capacity) {
1382                 dev_err(DEV, "capacity: %llus < sector: %llus + size: %u\n",
1383                         (unsigned long long)capacity,
1384                         (unsigned long long)sector, data_size);
1385                 return NULL;
1386         }
1387
1388         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1389          * "criss-cross" setup, that might cause write-out on some other DRBD,
1390          * which in turn might block on the other node at this very place.  */
1391         e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1392         if (!e)
1393                 return NULL;
1394
1395         ds = data_size;
1396         page = e->pages;
1397         page_chain_for_each(page) {
1398                 unsigned len = min_t(int, ds, PAGE_SIZE);
1399                 data = kmap(page);
1400                 rr = drbd_recv(mdev, data, len);
1401                 if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
1402                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1403                         data[0] = data[0] ^ (unsigned long)-1;
1404                 }
1405                 kunmap(page);
1406                 if (rr != len) {
1407                         drbd_free_ee(mdev, e);
1408                         dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1409                              rr, len);
1410                         return NULL;
1411                 }
1412                 ds -= rr;
1413         }
1414
1415         if (dgs) {
1416                 drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1417                 if (memcmp(dig_in, dig_vv, dgs)) {
1418                         dev_err(DEV, "Digest integrity check FAILED.\n");
1419                         drbd_bcast_ee(mdev, "digest failed",
1420                                         dgs, dig_in, dig_vv, e);
1421                         drbd_free_ee(mdev, e);
1422                         return NULL;
1423                 }
1424         }
1425         mdev->recv_cnt += data_size>>9;
1426         return e;
1427 }
1428
1429 /* drbd_drain_block() just takes a data block
1430  * out of the socket input buffer, and discards it.
1431  */
1432 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1433 {
1434         struct page *page;
1435         int rr, rv = 1;
1436         void *data;
1437
1438         if (!data_size)
1439                 return TRUE;
1440
1441         page = drbd_pp_alloc(mdev, 1, 1);
1442
1443         data = kmap(page);
1444         while (data_size) {
1445                 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1446                 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1447                         rv = 0;
1448                         dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1449                              rr, min_t(int, data_size, PAGE_SIZE));
1450                         break;
1451                 }
1452                 data_size -= rr;
1453         }
1454         kunmap(page);
1455         drbd_pp_free(mdev, page);
1456         return rv;
1457 }
1458
1459 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1460                            sector_t sector, int data_size)
1461 {
1462         struct bio_vec *bvec;
1463         struct bio *bio;
1464         int dgs, rr, i, expect;
1465         void *dig_in = mdev->int_dig_in;
1466         void *dig_vv = mdev->int_dig_vv;
1467
1468         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1469                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1470
1471         if (dgs) {
1472                 rr = drbd_recv(mdev, dig_in, dgs);
1473                 if (rr != dgs) {
1474                         dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1475                              rr, dgs);
1476                         return 0;
1477                 }
1478         }
1479
1480         data_size -= dgs;
1481
1482         /* optimistically update recv_cnt.  if receiving fails below,
1483          * we disconnect anyways, and counters will be reset. */
1484         mdev->recv_cnt += data_size>>9;
1485
1486         bio = req->master_bio;
1487         D_ASSERT(sector == bio->bi_sector);
1488
1489         bio_for_each_segment(bvec, bio, i) {
1490                 expect = min_t(int, data_size, bvec->bv_len);
1491                 rr = drbd_recv(mdev,
1492                              kmap(bvec->bv_page)+bvec->bv_offset,
1493                              expect);
1494                 kunmap(bvec->bv_page);
1495                 if (rr != expect) {
1496                         dev_warn(DEV, "short read receiving data reply: "
1497                              "read %d expected %d\n",
1498                              rr, expect);
1499                         return 0;
1500                 }
1501                 data_size -= rr;
1502         }
1503
1504         if (dgs) {
1505                 drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1506                 if (memcmp(dig_in, dig_vv, dgs)) {
1507                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1508                         return 0;
1509                 }
1510         }
1511
1512         D_ASSERT(data_size == 0);
1513         return 1;
1514 }
1515
1516 /* e_end_resync_block() is called via
1517  * drbd_process_done_ee() by asender only */
1518 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1519 {
1520         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1521         sector_t sector = e->sector;
1522         int ok;
1523
1524         D_ASSERT(hlist_unhashed(&e->colision));
1525
1526         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1527                 drbd_set_in_sync(mdev, sector, e->size);
1528                 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1529         } else {
1530                 /* Record failure to sync */
1531                 drbd_rs_failed_io(mdev, sector, e->size);
1532
1533                 ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1534         }
1535         dec_unacked(mdev);
1536
1537         return ok;
1538 }
1539
1540 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1541 {
1542         struct drbd_epoch_entry *e;
1543
1544         e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1545         if (!e)
1546                 goto fail;
1547
1548         dec_rs_pending(mdev);
1549
1550         inc_unacked(mdev);
1551         /* corresponding dec_unacked() in e_end_resync_block()
1552          * respective _drbd_clear_done_ee */
1553
1554         e->w.cb = e_end_resync_block;
1555
1556         spin_lock_irq(&mdev->req_lock);
1557         list_add(&e->w.list, &mdev->sync_ee);
1558         spin_unlock_irq(&mdev->req_lock);
1559
1560         if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1561                 return TRUE;
1562
1563         drbd_free_ee(mdev, e);
1564 fail:
1565         put_ldev(mdev);
1566         return FALSE;
1567 }
1568
1569 static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1570 {
1571         struct drbd_request *req;
1572         sector_t sector;
1573         unsigned int header_size, data_size;
1574         int ok;
1575         struct p_data *p = (struct p_data *)h;
1576
1577         header_size = sizeof(*p) - sizeof(*h);
1578         data_size   = h->length  - header_size;
1579
1580         ERR_IF(data_size == 0) return FALSE;
1581
1582         if (drbd_recv(mdev, h->payload, header_size) != header_size)
1583                 return FALSE;
1584
1585         sector = be64_to_cpu(p->sector);
1586
1587         spin_lock_irq(&mdev->req_lock);
1588         req = _ar_id_to_req(mdev, p->block_id, sector);
1589         spin_unlock_irq(&mdev->req_lock);
1590         if (unlikely(!req)) {
1591                 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1592                 return FALSE;
1593         }
1594
1595         /* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1596          * special casing it there for the various failure cases.
1597          * still no race with drbd_fail_pending_reads */
1598         ok = recv_dless_read(mdev, req, sector, data_size);
1599
1600         if (ok)
1601                 req_mod(req, data_received);
1602         /* else: nothing. handled from drbd_disconnect...
1603          * I don't think we may complete this just yet
1604          * in case we are "on-disconnect: freeze" */
1605
1606         return ok;
1607 }
1608
1609 static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1610 {
1611         sector_t sector;
1612         unsigned int header_size, data_size;
1613         int ok;
1614         struct p_data *p = (struct p_data *)h;
1615
1616         header_size = sizeof(*p) - sizeof(*h);
1617         data_size   = h->length  - header_size;
1618
1619         ERR_IF(data_size == 0) return FALSE;
1620
1621         if (drbd_recv(mdev, h->payload, header_size) != header_size)
1622                 return FALSE;
1623
1624         sector = be64_to_cpu(p->sector);
1625         D_ASSERT(p->block_id == ID_SYNCER);
1626
1627         if (get_ldev(mdev)) {
1628                 /* data is submitted to disk within recv_resync_read.
1629                  * corresponding put_ldev done below on error,
1630                  * or in drbd_endio_write_sec. */
1631                 ok = recv_resync_read(mdev, sector, data_size);
1632         } else {
1633                 if (__ratelimit(&drbd_ratelimit_state))
1634                         dev_err(DEV, "Can not write resync data to local disk.\n");
1635
1636                 ok = drbd_drain_block(mdev, data_size);
1637
1638                 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1639         }
1640
1641         return ok;
1642 }
1643
1644 /* e_end_block() is called via drbd_process_done_ee().
1645  * this means this function only runs in the asender thread
1646  */
1647 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1648 {
1649         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1650         sector_t sector = e->sector;
1651         struct drbd_epoch *epoch;
1652         int ok = 1, pcmd;
1653
1654         if (e->flags & EE_IS_BARRIER) {
1655                 epoch = previous_epoch(mdev, e->epoch);
1656                 if (epoch)
1657                         drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1658         }
1659
1660         if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1661                 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1662                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1663                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1664                                 e->flags & EE_MAY_SET_IN_SYNC) ?
1665                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1666                         ok &= drbd_send_ack(mdev, pcmd, e);
1667                         if (pcmd == P_RS_WRITE_ACK)
1668                                 drbd_set_in_sync(mdev, sector, e->size);
1669                 } else {
1670                         ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1671                         /* we expect it to be marked out of sync anyways...
1672                          * maybe assert this?  */
1673                 }
1674                 dec_unacked(mdev);
1675         }
1676         /* we delete from the conflict detection hash _after_ we sent out the
1677          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1678         if (mdev->net_conf->two_primaries) {
1679                 spin_lock_irq(&mdev->req_lock);
1680                 D_ASSERT(!hlist_unhashed(&e->colision));
1681                 hlist_del_init(&e->colision);
1682                 spin_unlock_irq(&mdev->req_lock);
1683         } else {
1684                 D_ASSERT(hlist_unhashed(&e->colision));
1685         }
1686
1687         drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1688
1689         return ok;
1690 }
1691
1692 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1693 {
1694         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1695         int ok = 1;
1696
1697         D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1698         ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1699
1700         spin_lock_irq(&mdev->req_lock);
1701         D_ASSERT(!hlist_unhashed(&e->colision));
1702         hlist_del_init(&e->colision);
1703         spin_unlock_irq(&mdev->req_lock);
1704
1705         dec_unacked(mdev);
1706
1707         return ok;
1708 }
1709
1710 /* Called from receive_Data.
1711  * Synchronize packets on sock with packets on msock.
1712  *
1713  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1714  * packet traveling on msock, they are still processed in the order they have
1715  * been sent.
1716  *
1717  * Note: we don't care for Ack packets overtaking P_DATA packets.
1718  *
1719  * In case packet_seq is larger than mdev->peer_seq number, there are
1720  * outstanding packets on the msock. We wait for them to arrive.
1721  * In case we are the logically next packet, we update mdev->peer_seq
1722  * ourselves. Correctly handles 32bit wrap around.
1723  *
1724  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1725  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1726  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1727  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1728  *
1729  * returns 0 if we may process the packet,
1730  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1731 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1732 {
1733         DEFINE_WAIT(wait);
1734         unsigned int p_seq;
1735         long timeout;
1736         int ret = 0;
1737         spin_lock(&mdev->peer_seq_lock);
1738         for (;;) {
1739                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1740                 if (seq_le(packet_seq, mdev->peer_seq+1))
1741                         break;
1742                 if (signal_pending(current)) {
1743                         ret = -ERESTARTSYS;
1744                         break;
1745                 }
1746                 p_seq = mdev->peer_seq;
1747                 spin_unlock(&mdev->peer_seq_lock);
1748                 timeout = schedule_timeout(30*HZ);
1749                 spin_lock(&mdev->peer_seq_lock);
1750                 if (timeout == 0 && p_seq == mdev->peer_seq) {
1751                         ret = -ETIMEDOUT;
1752                         dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1753                         break;
1754                 }
1755         }
1756         finish_wait(&mdev->seq_wait, &wait);
1757         if (mdev->peer_seq+1 == packet_seq)
1758                 mdev->peer_seq++;
1759         spin_unlock(&mdev->peer_seq_lock);
1760         return ret;
1761 }
1762
1763 /* mirrored write */
1764 static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1765 {
1766         sector_t sector;
1767         struct drbd_epoch_entry *e;
1768         struct p_data *p = (struct p_data *)h;
1769         int header_size, data_size;
1770         int rw = WRITE;
1771         u32 dp_flags;
1772
1773         header_size = sizeof(*p) - sizeof(*h);
1774         data_size   = h->length  - header_size;
1775
1776         ERR_IF(data_size == 0) return FALSE;
1777
1778         if (drbd_recv(mdev, h->payload, header_size) != header_size)
1779                 return FALSE;
1780
1781         if (!get_ldev(mdev)) {
1782                 if (__ratelimit(&drbd_ratelimit_state))
1783                         dev_err(DEV, "Can not write mirrored data block "
1784                             "to local disk.\n");
1785                 spin_lock(&mdev->peer_seq_lock);
1786                 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1787                         mdev->peer_seq++;
1788                 spin_unlock(&mdev->peer_seq_lock);
1789
1790                 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1791                 atomic_inc(&mdev->current_epoch->epoch_size);
1792                 return drbd_drain_block(mdev, data_size);
1793         }
1794
1795         /* get_ldev(mdev) successful.
1796          * Corresponding put_ldev done either below (on various errors),
1797          * or in drbd_endio_write_sec, if we successfully submit the data at
1798          * the end of this function. */
1799
1800         sector = be64_to_cpu(p->sector);
1801         e = read_in_block(mdev, p->block_id, sector, data_size);
1802         if (!e) {
1803                 put_ldev(mdev);
1804                 return FALSE;
1805         }
1806
1807         e->w.cb = e_end_block;
1808
1809         spin_lock(&mdev->epoch_lock);
1810         e->epoch = mdev->current_epoch;
1811         atomic_inc(&e->epoch->epoch_size);
1812         atomic_inc(&e->epoch->active);
1813
1814         if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1815                 struct drbd_epoch *epoch;
1816                 /* Issue a barrier if we start a new epoch, and the previous epoch
1817                    was not a epoch containing a single request which already was
1818                    a Barrier. */
1819                 epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1820                 if (epoch == e->epoch) {
1821                         set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1822                         rw |= (1<<BIO_RW_BARRIER);
1823                         e->flags |= EE_IS_BARRIER;
1824                 } else {
1825                         if (atomic_read(&epoch->epoch_size) > 1 ||
1826                             !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1827                                 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1828                                 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1829                                 rw |= (1<<BIO_RW_BARRIER);
1830                                 e->flags |= EE_IS_BARRIER;
1831                         }
1832                 }
1833         }
1834         spin_unlock(&mdev->epoch_lock);
1835
1836         dp_flags = be32_to_cpu(p->dp_flags);
1837         if (dp_flags & DP_HARDBARRIER) {
1838                 dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1839                 /* rw |= (1<<BIO_RW_BARRIER); */
1840         }
1841         if (dp_flags & DP_RW_SYNC)
1842                 rw |= (1<<BIO_RW_SYNCIO) | (1<<BIO_RW_UNPLUG);
1843         if (dp_flags & DP_MAY_SET_IN_SYNC)
1844                 e->flags |= EE_MAY_SET_IN_SYNC;
1845
1846         /* I'm the receiver, I do hold a net_cnt reference. */
1847         if (!mdev->net_conf->two_primaries) {
1848                 spin_lock_irq(&mdev->req_lock);
1849         } else {
1850                 /* don't get the req_lock yet,
1851                  * we may sleep in drbd_wait_peer_seq */
1852                 const int size = e->size;
1853                 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1854                 DEFINE_WAIT(wait);
1855                 struct drbd_request *i;
1856                 struct hlist_node *n;
1857                 struct hlist_head *slot;
1858                 int first;
1859
1860                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1861                 BUG_ON(mdev->ee_hash == NULL);
1862                 BUG_ON(mdev->tl_hash == NULL);
1863
1864                 /* conflict detection and handling:
1865                  * 1. wait on the sequence number,
1866                  *    in case this data packet overtook ACK packets.
1867                  * 2. check our hash tables for conflicting requests.
1868                  *    we only need to walk the tl_hash, since an ee can not
1869                  *    have a conflict with an other ee: on the submitting
1870                  *    node, the corresponding req had already been conflicting,
1871                  *    and a conflicting req is never sent.
1872                  *
1873                  * Note: for two_primaries, we are protocol C,
1874                  * so there cannot be any request that is DONE
1875                  * but still on the transfer log.
1876                  *
1877                  * unconditionally add to the ee_hash.
1878                  *
1879                  * if no conflicting request is found:
1880                  *    submit.
1881                  *
1882                  * if any conflicting request is found
1883                  * that has not yet been acked,
1884                  * AND I have the "discard concurrent writes" flag:
1885                  *       queue (via done_ee) the P_DISCARD_ACK; OUT.
1886                  *
1887                  * if any conflicting request is found:
1888                  *       block the receiver, waiting on misc_wait
1889                  *       until no more conflicting requests are there,
1890                  *       or we get interrupted (disconnect).
1891                  *
1892                  *       we do not just write after local io completion of those
1893                  *       requests, but only after req is done completely, i.e.
1894                  *       we wait for the P_DISCARD_ACK to arrive!
1895                  *
1896                  *       then proceed normally, i.e. submit.
1897                  */
1898                 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1899                         goto out_interrupted;
1900
1901                 spin_lock_irq(&mdev->req_lock);
1902
1903                 hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1904
1905 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1906                 slot = tl_hash_slot(mdev, sector);
1907                 first = 1;
1908                 for (;;) {
1909                         int have_unacked = 0;
1910                         int have_conflict = 0;
1911                         prepare_to_wait(&mdev->misc_wait, &wait,
1912                                 TASK_INTERRUPTIBLE);
1913                         hlist_for_each_entry(i, n, slot, colision) {
1914                                 if (OVERLAPS) {
1915                                         /* only ALERT on first iteration,
1916                                          * we may be woken up early... */
1917                                         if (first)
1918                                                 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1919                                                       " new: %llus +%u; pending: %llus +%u\n",
1920                                                       current->comm, current->pid,
1921                                                       (unsigned long long)sector, size,
1922                                                       (unsigned long long)i->sector, i->size);
1923                                         if (i->rq_state & RQ_NET_PENDING)
1924                                                 ++have_unacked;
1925                                         ++have_conflict;
1926                                 }
1927                         }
1928 #undef OVERLAPS
1929                         if (!have_conflict)
1930                                 break;
1931
1932                         /* Discard Ack only for the _first_ iteration */
1933                         if (first && discard && have_unacked) {
1934                                 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1935                                      (unsigned long long)sector);
1936                                 inc_unacked(mdev);
1937                                 e->w.cb = e_send_discard_ack;
1938                                 list_add_tail(&e->w.list, &mdev->done_ee);
1939
1940                                 spin_unlock_irq(&mdev->req_lock);
1941
1942                                 /* we could probably send that P_DISCARD_ACK ourselves,
1943                                  * but I don't like the receiver using the msock */
1944
1945                                 put_ldev(mdev);
1946                                 wake_asender(mdev);
1947                                 finish_wait(&mdev->misc_wait, &wait);
1948                                 return TRUE;
1949                         }
1950
1951                         if (signal_pending(current)) {
1952                                 hlist_del_init(&e->colision);
1953
1954                                 spin_unlock_irq(&mdev->req_lock);
1955
1956                                 finish_wait(&mdev->misc_wait, &wait);
1957                                 goto out_interrupted;
1958                         }
1959
1960                         spin_unlock_irq(&mdev->req_lock);
1961                         if (first) {
1962                                 first = 0;
1963                                 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1964                                      "sec=%llus\n", (unsigned long long)sector);
1965                         } else if (discard) {
1966                                 /* we had none on the first iteration.
1967                                  * there must be none now. */
1968                                 D_ASSERT(have_unacked == 0);
1969                         }
1970                         schedule();
1971                         spin_lock_irq(&mdev->req_lock);
1972                 }
1973                 finish_wait(&mdev->misc_wait, &wait);
1974         }
1975
1976         list_add(&e->w.list, &mdev->active_ee);
1977         spin_unlock_irq(&mdev->req_lock);
1978
1979         switch (mdev->net_conf->wire_protocol) {
1980         case DRBD_PROT_C:
1981                 inc_unacked(mdev);
1982                 /* corresponding dec_unacked() in e_end_block()
1983                  * respective _drbd_clear_done_ee */
1984                 break;
1985         case DRBD_PROT_B:
1986                 /* I really don't like it that the receiver thread
1987                  * sends on the msock, but anyways */
1988                 drbd_send_ack(mdev, P_RECV_ACK, e);
1989                 break;
1990         case DRBD_PROT_A:
1991                 /* nothing to do */
1992                 break;
1993         }
1994
1995         if (mdev->state.pdsk == D_DISKLESS) {
1996                 /* In case we have the only disk of the cluster, */
1997                 drbd_set_out_of_sync(mdev, e->sector, e->size);
1998                 e->flags |= EE_CALL_AL_COMPLETE_IO;
1999                 drbd_al_begin_io(mdev, e->sector);
2000         }
2001
2002         if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
2003                 return TRUE;
2004
2005 out_interrupted:
2006         /* yes, the epoch_size now is imbalanced.
2007          * but we drop the connection anyways, so we don't have a chance to
2008          * receive a barrier... atomic_inc(&mdev->epoch_size); */
2009         put_ldev(mdev);
2010         drbd_free_ee(mdev, e);
2011         return FALSE;
2012 }
2013
2014 static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
2015 {
2016         sector_t sector;
2017         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
2018         struct drbd_epoch_entry *e;
2019         struct digest_info *di = NULL;
2020         int size, digest_size;
2021         unsigned int fault_type;
2022         struct p_block_req *p =
2023                 (struct p_block_req *)h;
2024         const int brps = sizeof(*p)-sizeof(*h);
2025
2026         if (drbd_recv(mdev, h->payload, brps) != brps)
2027                 return FALSE;
2028
2029         sector = be64_to_cpu(p->sector);
2030         size   = be32_to_cpu(p->blksize);
2031
2032         if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
2033                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2034                                 (unsigned long long)sector, size);
2035                 return FALSE;
2036         }
2037         if (sector + (size>>9) > capacity) {
2038                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2039                                 (unsigned long long)sector, size);
2040                 return FALSE;
2041         }
2042
2043         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2044                 if (__ratelimit(&drbd_ratelimit_state))
2045                         dev_err(DEV, "Can not satisfy peer's read request, "
2046                             "no local data.\n");
2047                 drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
2048                                  P_NEG_RS_DREPLY , p);
2049                 return drbd_drain_block(mdev, h->length - brps);
2050         }
2051
2052         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2053          * "criss-cross" setup, that might cause write-out on some other DRBD,
2054          * which in turn might block on the other node at this very place.  */
2055         e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2056         if (!e) {
2057                 put_ldev(mdev);
2058                 return FALSE;
2059         }
2060
2061         switch (h->command) {
2062         case P_DATA_REQUEST:
2063                 e->w.cb = w_e_end_data_req;
2064                 fault_type = DRBD_FAULT_DT_RD;
2065                 break;
2066         case P_RS_DATA_REQUEST:
2067                 e->w.cb = w_e_end_rsdata_req;
2068                 fault_type = DRBD_FAULT_RS_RD;
2069                 /* Eventually this should become asynchronously. Currently it
2070                  * blocks the whole receiver just to delay the reading of a
2071                  * resync data block.
2072                  * the drbd_work_queue mechanism is made for this...
2073                  */
2074                 if (!drbd_rs_begin_io(mdev, sector)) {
2075                         /* we have been interrupted,
2076                          * probably connection lost! */
2077                         D_ASSERT(signal_pending(current));
2078                         goto out_free_e;
2079                 }
2080                 break;
2081
2082         case P_OV_REPLY:
2083         case P_CSUM_RS_REQUEST:
2084                 fault_type = DRBD_FAULT_RS_RD;
2085                 digest_size = h->length - brps ;
2086                 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2087                 if (!di)
2088                         goto out_free_e;
2089
2090                 di->digest_size = digest_size;
2091                 di->digest = (((char *)di)+sizeof(struct digest_info));
2092
2093                 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2094                         goto out_free_e;
2095
2096                 e->block_id = (u64)(unsigned long)di;
2097                 if (h->command == P_CSUM_RS_REQUEST) {
2098                         D_ASSERT(mdev->agreed_pro_version >= 89);
2099                         e->w.cb = w_e_end_csum_rs_req;
2100                 } else if (h->command == P_OV_REPLY) {
2101                         e->w.cb = w_e_end_ov_reply;
2102                         dec_rs_pending(mdev);
2103                         break;
2104                 }
2105
2106                 if (!drbd_rs_begin_io(mdev, sector)) {
2107                         /* we have been interrupted, probably connection lost! */
2108                         D_ASSERT(signal_pending(current));
2109                         goto out_free_e;
2110                 }
2111                 break;
2112
2113         case P_OV_REQUEST:
2114                 if (mdev->state.conn >= C_CONNECTED &&
2115                     mdev->state.conn != C_VERIFY_T)
2116                         dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2117                                 drbd_conn_str(mdev->state.conn));
2118                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2119                     mdev->agreed_pro_version >= 90) {
2120                         mdev->ov_start_sector = sector;
2121                         mdev->ov_position = sector;
2122                         mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2123                         dev_info(DEV, "Online Verify start sector: %llu\n",
2124                                         (unsigned long long)sector);
2125                 }
2126                 e->w.cb = w_e_end_ov_req;
2127                 fault_type = DRBD_FAULT_RS_RD;
2128                 /* Eventually this should become asynchronous. Currently it
2129                  * blocks the whole receiver just to delay the reading of a
2130                  * resync data block.
2131                  * the drbd_work_queue mechanism is made for this...
2132                  */
2133                 if (!drbd_rs_begin_io(mdev, sector)) {
2134                         /* we have been interrupted,
2135                          * probably connection lost! */
2136                         D_ASSERT(signal_pending(current));
2137                         goto out_free_e;
2138                 }
2139                 break;
2140
2141
2142         default:
2143                 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2144                     cmdname(h->command));
2145                 fault_type = DRBD_FAULT_MAX;
2146         }
2147
2148         spin_lock_irq(&mdev->req_lock);
2149         list_add(&e->w.list, &mdev->read_ee);
2150         spin_unlock_irq(&mdev->req_lock);
2151
2152         inc_unacked(mdev);
2153
2154         if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2155                 return TRUE;
2156
2157 out_free_e:
2158         kfree(di);
2159         put_ldev(mdev);
2160         drbd_free_ee(mdev, e);
2161         return FALSE;
2162 }
2163
2164 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2165 {
2166         int self, peer, rv = -100;
2167         unsigned long ch_self, ch_peer;
2168
2169         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2170         peer = mdev->p_uuid[UI_BITMAP] & 1;
2171
2172         ch_peer = mdev->p_uuid[UI_SIZE];
2173         ch_self = mdev->comm_bm_set;
2174
2175         switch (mdev->net_conf->after_sb_0p) {
2176         case ASB_CONSENSUS:
2177         case ASB_DISCARD_SECONDARY:
2178         case ASB_CALL_HELPER:
2179                 dev_err(DEV, "Configuration error.\n");
2180                 break;
2181         case ASB_DISCONNECT:
2182                 break;
2183         case ASB_DISCARD_YOUNGER_PRI:
2184                 if (self == 0 && peer == 1) {
2185                         rv = -1;
2186                         break;
2187                 }
2188                 if (self == 1 && peer == 0) {
2189                         rv =  1;
2190                         break;
2191                 }
2192                 /* Else fall through to one of the other strategies... */
2193         case ASB_DISCARD_OLDER_PRI:
2194                 if (self == 0 && peer == 1) {
2195                         rv = 1;
2196                         break;
2197                 }
2198                 if (self == 1 && peer == 0) {
2199                         rv = -1;
2200                         break;
2201                 }
2202                 /* Else fall through to one of the other strategies... */
2203                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2204                      "Using discard-least-changes instead\n");
2205         case ASB_DISCARD_ZERO_CHG:
2206                 if (ch_peer == 0 && ch_self == 0) {
2207                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2208                                 ? -1 : 1;
2209                         break;
2210                 } else {
2211                         if (ch_peer == 0) { rv =  1; break; }
2212                         if (ch_self == 0) { rv = -1; break; }
2213                 }
2214                 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2215                         break;
2216         case ASB_DISCARD_LEAST_CHG:
2217                 if      (ch_self < ch_peer)
2218                         rv = -1;
2219                 else if (ch_self > ch_peer)
2220                         rv =  1;
2221                 else /* ( ch_self == ch_peer ) */
2222                      /* Well, then use something else. */
2223                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2224                                 ? -1 : 1;
2225                 break;
2226         case ASB_DISCARD_LOCAL:
2227                 rv = -1;
2228                 break;
2229         case ASB_DISCARD_REMOTE:
2230                 rv =  1;
2231         }
2232
2233         return rv;
2234 }
2235
2236 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2237 {
2238         int self, peer, hg, rv = -100;
2239
2240         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2241         peer = mdev->p_uuid[UI_BITMAP] & 1;
2242
2243         switch (mdev->net_conf->after_sb_1p) {
2244         case ASB_DISCARD_YOUNGER_PRI:
2245         case ASB_DISCARD_OLDER_PRI:
2246         case ASB_DISCARD_LEAST_CHG:
2247         case ASB_DISCARD_LOCAL:
2248         case ASB_DISCARD_REMOTE:
2249                 dev_err(DEV, "Configuration error.\n");
2250                 break;
2251         case ASB_DISCONNECT:
2252                 break;
2253         case ASB_CONSENSUS:
2254                 hg = drbd_asb_recover_0p(mdev);
2255                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2256                         rv = hg;
2257                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2258                         rv = hg;
2259                 break;
2260         case ASB_VIOLENTLY:
2261                 rv = drbd_asb_recover_0p(mdev);
2262                 break;
2263         case ASB_DISCARD_SECONDARY:
2264                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2265         case ASB_CALL_HELPER:
2266                 hg = drbd_asb_recover_0p(mdev);
2267                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2268                         self = drbd_set_role(mdev, R_SECONDARY, 0);
2269                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2270                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2271                           * we do not need to wait for the after state change work either. */
2272                         self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2273                         if (self != SS_SUCCESS) {
2274                                 drbd_khelper(mdev, "pri-lost-after-sb");
2275                         } else {
2276                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2277                                 rv = hg;
2278                         }
2279                 } else
2280                         rv = hg;
2281         }
2282
2283         return rv;
2284 }
2285
2286 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2287 {
2288         int self, peer, hg, rv = -100;
2289
2290         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2291         peer = mdev->p_uuid[UI_BITMAP] & 1;
2292
2293         switch (mdev->net_conf->after_sb_2p) {
2294         case ASB_DISCARD_YOUNGER_PRI:
2295         case ASB_DISCARD_OLDER_PRI:
2296         case ASB_DISCARD_LEAST_CHG:
2297         case ASB_DISCARD_LOCAL:
2298         case ASB_DISCARD_REMOTE:
2299         case ASB_CONSENSUS:
2300         case ASB_DISCARD_SECONDARY:
2301                 dev_err(DEV, "Configuration error.\n");
2302                 break;
2303         case ASB_VIOLENTLY:
2304                 rv = drbd_asb_recover_0p(mdev);
2305                 break;
2306         case ASB_DISCONNECT:
2307                 break;
2308         case ASB_CALL_HELPER:
2309                 hg = drbd_asb_recover_0p(mdev);
2310                 if (hg == -1) {
2311                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2312                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2313                           * we do not need to wait for the after state change work either. */
2314                         self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2315                         if (self != SS_SUCCESS) {
2316                                 drbd_khelper(mdev, "pri-lost-after-sb");
2317                         } else {
2318                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2319                                 rv = hg;
2320                         }
2321                 } else
2322                         rv = hg;
2323         }
2324
2325         return rv;
2326 }
2327
2328 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2329                            u64 bits, u64 flags)
2330 {
2331         if (!uuid) {
2332                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2333                 return;
2334         }
2335         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2336              text,
2337              (unsigned long long)uuid[UI_CURRENT],
2338              (unsigned long long)uuid[UI_BITMAP],
2339              (unsigned long long)uuid[UI_HISTORY_START],
2340              (unsigned long long)uuid[UI_HISTORY_END],
2341              (unsigned long long)bits,
2342              (unsigned long long)flags);
2343 }
2344
2345 /*
2346   100   after split brain try auto recover
2347     2   C_SYNC_SOURCE set BitMap
2348     1   C_SYNC_SOURCE use BitMap
2349     0   no Sync
2350    -1   C_SYNC_TARGET use BitMap
2351    -2   C_SYNC_TARGET set BitMap
2352  -100   after split brain, disconnect
2353 -1000   unrelated data
2354  */
2355 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2356 {
2357         u64 self, peer;
2358         int i, j;
2359
2360         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2361         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2362
2363         *rule_nr = 10;
2364         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2365                 return 0;
2366
2367         *rule_nr = 20;
2368         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2369              peer != UUID_JUST_CREATED)
2370                 return -2;
2371
2372         *rule_nr = 30;
2373         if (self != UUID_JUST_CREATED &&
2374             (peer == UUID_JUST_CREATED || peer == (u64)0))
2375                 return 2;
2376
2377         if (self == peer) {
2378                 int rct, dc; /* roles at crash time */
2379
2380                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2381
2382                         if (mdev->agreed_pro_version < 91)
2383                                 return -1001;
2384
2385                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2386                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2387                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2388                                 drbd_uuid_set_bm(mdev, 0UL);
2389
2390                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2391                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2392                                 *rule_nr = 34;
2393                         } else {
2394                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2395                                 *rule_nr = 36;
2396                         }
2397
2398                         return 1;
2399                 }
2400
2401                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2402
2403                         if (mdev->agreed_pro_version < 91)
2404                                 return -1001;
2405
2406                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2407                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2408                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2409
2410                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2411                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2412                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2413
2414                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2415                                 *rule_nr = 35;
2416                         } else {
2417                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2418                                 *rule_nr = 37;
2419                         }
2420
2421                         return -1;
2422                 }
2423
2424                 /* Common power [off|failure] */
2425                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2426                         (mdev->p_uuid[UI_FLAGS] & 2);
2427                 /* lowest bit is set when we were primary,
2428                  * next bit (weight 2) is set when peer was primary */
2429                 *rule_nr = 40;
2430
2431                 switch (rct) {
2432                 case 0: /* !self_pri && !peer_pri */ return 0;
2433                 case 1: /*  self_pri && !peer_pri */ return 1;
2434                 case 2: /* !self_pri &&  peer_pri */ return -1;
2435                 case 3: /*  self_pri &&  peer_pri */
2436                         dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2437                         return dc ? -1 : 1;
2438                 }
2439         }
2440
2441         *rule_nr = 50;
2442         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2443         if (self == peer)
2444                 return -1;
2445
2446         *rule_nr = 51;
2447         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2448         if (self == peer) {
2449                 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2450                 peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2451                 if (self == peer) {
2452                         /* The last P_SYNC_UUID did not get though. Undo the last start of
2453                            resync as sync source modifications of the peer's UUIDs. */
2454
2455                         if (mdev->agreed_pro_version < 91)
2456                                 return -1001;
2457
2458                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2459                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2460                         return -1;
2461                 }
2462         }
2463
2464         *rule_nr = 60;
2465         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2466         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2467                 peer = mdev->p_uuid[i] & ~((u64)1);
2468                 if (self == peer)
2469                         return -2;
2470         }
2471
2472         *rule_nr = 70;
2473         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2474         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2475         if (self == peer)
2476                 return 1;
2477
2478         *rule_nr = 71;
2479         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2480         if (self == peer) {
2481                 self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2482                 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2483                 if (self == peer) {
2484                         /* The last P_SYNC_UUID did not get though. Undo the last start of
2485                            resync as sync source modifications of our UUIDs. */
2486
2487                         if (mdev->agreed_pro_version < 91)
2488                                 return -1001;
2489
2490                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2491                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2492
2493                         dev_info(DEV, "Undid last start of resync:\n");
2494
2495                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2496                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2497
2498                         return 1;
2499                 }
2500         }
2501
2502
2503         *rule_nr = 80;
2504         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2505         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2506                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2507                 if (self == peer)
2508                         return 2;
2509         }
2510
2511         *rule_nr = 90;
2512         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2513         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2514         if (self == peer && self != ((u64)0))
2515                 return 100;
2516
2517         *rule_nr = 100;
2518         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2519                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2520                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2521                         peer = mdev->p_uuid[j] & ~((u64)1);
2522                         if (self == peer)
2523                                 return -100;
2524                 }
2525         }
2526
2527         return -1000;
2528 }
2529
2530 /* drbd_sync_handshake() returns the new conn state on success, or
2531    CONN_MASK (-1) on failure.
2532  */
2533 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2534                                            enum drbd_disk_state peer_disk) __must_hold(local)
2535 {
2536         int hg, rule_nr;
2537         enum drbd_conns rv = C_MASK;
2538         enum drbd_disk_state mydisk;
2539
2540         mydisk = mdev->state.disk;
2541         if (mydisk == D_NEGOTIATING)
2542                 mydisk = mdev->new_state_tmp.disk;
2543
2544         dev_info(DEV, "drbd_sync_handshake:\n");
2545         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2546         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2547                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2548
2549         hg = drbd_uuid_compare(mdev, &rule_nr);
2550
2551         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2552
2553         if (hg == -1000) {
2554                 dev_alert(DEV, "Unrelated data, aborting!\n");
2555                 return C_MASK;
2556         }
2557         if (hg == -1001) {
2558                 dev_alert(DEV, "To resolve this both sides have to support at least protocol\n");
2559                 return C_MASK;
2560         }
2561
2562         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2563             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2564                 int f = (hg == -100) || abs(hg) == 2;
2565                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2566                 if (f)
2567                         hg = hg*2;
2568                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2569                      hg > 0 ? "source" : "target");
2570         }
2571
2572         if (abs(hg) == 100)
2573                 drbd_khelper(mdev, "initial-split-brain");
2574
2575         if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2576                 int pcount = (mdev->state.role == R_PRIMARY)
2577                            + (peer_role == R_PRIMARY);
2578                 int forced = (hg == -100);
2579
2580                 switch (pcount) {
2581                 case 0:
2582                         hg = drbd_asb_recover_0p(mdev);
2583                         break;
2584                 case 1:
2585                         hg = drbd_asb_recover_1p(mdev);
2586                         break;
2587                 case 2:
2588                         hg = drbd_asb_recover_2p(mdev);
2589                         break;
2590                 }
2591                 if (abs(hg) < 100) {
2592                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2593                              "automatically solved. Sync from %s node\n",
2594                              pcount, (hg < 0) ? "peer" : "this");
2595                         if (forced) {
2596                                 dev_warn(DEV, "Doing a full sync, since"
2597                                      " UUIDs where ambiguous.\n");
2598                                 hg = hg*2;
2599                         }
2600                 }
2601         }
2602
2603         if (hg == -100) {
2604                 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2605                         hg = -1;
2606                 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2607                         hg = 1;
2608
2609                 if (abs(hg) < 100)
2610                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2611                              "Sync from %s node\n",
2612                              (hg < 0) ? "peer" : "this");
2613         }
2614
2615         if (hg == -100) {
2616                 /* FIXME this log message is not correct if we end up here
2617                  * after an attempted attach on a diskless node.
2618                  * We just refuse to attach -- well, we drop the "connection"
2619                  * to that disk, in a way... */
2620                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2621                 drbd_khelper(mdev, "split-brain");
2622                 return C_MASK;
2623         }
2624
2625         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2626                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2627                 return C_MASK;
2628         }
2629
2630         if (hg < 0 && /* by intention we do not use mydisk here. */
2631             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2632                 switch (mdev->net_conf->rr_conflict) {
2633                 case ASB_CALL_HELPER:
2634                         drbd_khelper(mdev, "pri-lost");
2635                         /* fall through */
2636                 case ASB_DISCONNECT:
2637                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2638                         return C_MASK;
2639                 case ASB_VIOLENTLY:
2640                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2641                              "assumption\n");
2642                 }
2643         }
2644
2645         if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2646                 if (hg == 0)
2647                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2648                 else
2649                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2650                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2651                                  abs(hg) >= 2 ? "full" : "bit-map based");
2652                 return C_MASK;
2653         }
2654
2655         if (abs(hg) >= 2) {
2656                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2657                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2658                         return C_MASK;
2659         }
2660
2661         if (hg > 0) { /* become sync source. */
2662                 rv = C_WF_BITMAP_S;
2663         } else if (hg < 0) { /* become sync target */
2664                 rv = C_WF_BITMAP_T;
2665         } else {
2666                 rv = C_CONNECTED;
2667                 if (drbd_bm_total_weight(mdev)) {
2668                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2669                              drbd_bm_total_weight(mdev));
2670                 }
2671         }
2672
2673         return rv;
2674 }
2675
2676 /* returns 1 if invalid */
2677 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2678 {
2679         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2680         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2681             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2682                 return 0;
2683
2684         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2685         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2686             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2687                 return 1;
2688
2689         /* everything else is valid if they are equal on both sides. */
2690         if (peer == self)
2691                 return 0;
2692
2693         /* everything es is invalid. */
2694         return 1;
2695 }
2696
2697 static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2698 {
2699         struct p_protocol *p = (struct p_protocol *)h;
2700         int header_size, data_size;
2701         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2702         int p_want_lose, p_two_primaries, cf;
2703         char p_integrity_alg[SHARED_SECRET_MAX] = "";
2704
2705         header_size = sizeof(*p) - sizeof(*h);
2706         data_size   = h->length  - header_size;
2707
2708         if (drbd_recv(mdev, h->payload, header_size) != header_size)
2709                 return FALSE;
2710
2711         p_proto         = be32_to_cpu(p->protocol);
2712         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2713         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2714         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2715         p_two_primaries = be32_to_cpu(p->two_primaries);
2716         cf              = be32_to_cpu(p->conn_flags);
2717         p_want_lose = cf & CF_WANT_LOSE;
2718
2719         clear_bit(CONN_DRY_RUN, &mdev->flags);
2720
2721         if (cf & CF_DRY_RUN)
2722                 set_bit(CONN_DRY_RUN, &mdev->flags);
2723
2724         if (p_proto != mdev->net_conf->wire_protocol) {
2725                 dev_err(DEV, "incompatible communication protocols\n");
2726                 goto disconnect;
2727         }
2728
2729         if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2730                 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2731                 goto disconnect;
2732         }
2733
2734         if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2735                 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2736                 goto disconnect;
2737         }
2738
2739         if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2740                 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2741                 goto disconnect;
2742         }
2743
2744         if (p_want_lose && mdev->net_conf->want_lose) {
2745                 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2746                 goto disconnect;
2747         }
2748
2749         if (p_two_primaries != mdev->net_conf->two_primaries) {
2750                 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2751                 goto disconnect;
2752         }
2753
2754         if (mdev->agreed_pro_version >= 87) {
2755                 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2756
2757                 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2758                         return FALSE;
2759
2760                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2761                 if (strcmp(p_integrity_alg, my_alg)) {
2762                         dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2763                         goto disconnect;
2764                 }
2765                 dev_info(DEV, "data-integrity-alg: %s\n",
2766                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2767         }
2768
2769         return TRUE;
2770
2771 disconnect:
2772         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2773         return FALSE;
2774 }
2775
2776 /* helper function
2777  * input: alg name, feature name
2778  * return: NULL (alg name was "")
2779  *         ERR_PTR(error) if something goes wrong
2780  *         or the crypto hash ptr, if it worked out ok. */
2781 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2782                 const char *alg, const char *name)
2783 {
2784         struct crypto_hash *tfm;
2785
2786         if (!alg[0])
2787                 return NULL;
2788
2789         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2790         if (IS_ERR(tfm)) {
2791                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2792                         alg, name, PTR_ERR(tfm));
2793                 return tfm;
2794         }
2795         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2796                 crypto_free_hash(tfm);
2797                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2798                 return ERR_PTR(-EINVAL);
2799         }
2800         return tfm;
2801 }
2802
2803 static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2804 {
2805         int ok = TRUE;
2806         struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
2807         unsigned int header_size, data_size, exp_max_sz;
2808         struct crypto_hash *verify_tfm = NULL;
2809         struct crypto_hash *csums_tfm = NULL;
2810         const int apv = mdev->agreed_pro_version;
2811
2812         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2813                     : apv == 88 ? sizeof(struct p_rs_param)
2814                                         + SHARED_SECRET_MAX
2815                     : /* 89 */    sizeof(struct p_rs_param_89);
2816
2817         if (h->length > exp_max_sz) {
2818                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2819                     h->length, exp_max_sz);
2820                 return FALSE;
2821         }
2822
2823         if (apv <= 88) {
2824                 header_size = sizeof(struct p_rs_param) - sizeof(*h);
2825                 data_size   = h->length  - header_size;
2826         } else /* apv >= 89 */ {
2827                 header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2828                 data_size   = h->length  - header_size;
2829                 D_ASSERT(data_size == 0);
2830         }
2831
2832         /* initialize verify_alg and csums_alg */
2833         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2834
2835         if (drbd_recv(mdev, h->payload, header_size) != header_size)
2836                 return FALSE;
2837
2838         mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2839
2840         if (apv >= 88) {
2841                 if (apv == 88) {
2842                         if (data_size > SHARED_SECRET_MAX) {
2843                                 dev_err(DEV, "verify-alg too long, "
2844                                     "peer wants %u, accepting only %u byte\n",
2845                                                 data_size, SHARED_SECRET_MAX);
2846                                 return FALSE;
2847                         }
2848
2849                         if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2850                                 return FALSE;
2851
2852                         /* we expect NUL terminated string */
2853                         /* but just in case someone tries to be evil */
2854                         D_ASSERT(p->verify_alg[data_size-1] == 0);
2855                         p->verify_alg[data_size-1] = 0;
2856
2857                 } else /* apv >= 89 */ {
2858                         /* we still expect NUL terminated strings */
2859                         /* but just in case someone tries to be evil */
2860                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2861                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2862                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2863                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2864                 }
2865
2866                 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2867                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2868                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2869                                     mdev->sync_conf.verify_alg, p->verify_alg);
2870                                 goto disconnect;
2871                         }
2872                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2873                                         p->verify_alg, "verify-alg");
2874                         if (IS_ERR(verify_tfm)) {
2875                                 verify_tfm = NULL;
2876                                 goto disconnect;
2877                         }
2878                 }
2879
2880                 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2881                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2882                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2883                                     mdev->sync_conf.csums_alg, p->csums_alg);
2884                                 goto disconnect;
2885                         }
2886                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2887                                         p->csums_alg, "csums-alg");
2888                         if (IS_ERR(csums_tfm)) {
2889                                 csums_tfm = NULL;
2890                                 goto disconnect;
2891                         }
2892                 }
2893
2894
2895                 spin_lock(&mdev->peer_seq_lock);
2896                 /* lock against drbd_nl_syncer_conf() */
2897                 if (verify_tfm) {
2898                         strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2899                         mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2900                         crypto_free_hash(mdev->verify_tfm);
2901                         mdev->verify_tfm = verify_tfm;
2902                         dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2903                 }
2904                 if (csums_tfm) {
2905                         strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2906                         mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2907                         crypto_free_hash(mdev->csums_tfm);
2908                         mdev->csums_tfm = csums_tfm;
2909                         dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2910                 }
2911                 spin_unlock(&mdev->peer_seq_lock);
2912         }
2913
2914         return ok;
2915 disconnect:
2916         /* just for completeness: actually not needed,
2917          * as this is not reached if csums_tfm was ok. */
2918         crypto_free_hash(csums_tfm);
2919         /* but free the verify_tfm again, if csums_tfm did not work out */
2920         crypto_free_hash(verify_tfm);
2921         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2922         return FALSE;
2923 }
2924
/* Intentionally a no-op; both arguments are ignored. */
static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
{
        /* sorry, we currently have no working implementation
         * of distributed TCQ */
}
2930
2931 /* warn if the arguments differ by more than 12.5% */
2932 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2933         const char *s, sector_t a, sector_t b)
2934 {
2935         sector_t d;
2936         if (a == 0 || b == 0)
2937                 return;
2938         d = (a > b) ? (a - b) : (b - a);
2939         if (d > (a>>3) || d > (b>>3))
2940                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2941                      (unsigned long long)a, (unsigned long long)b);
2942 }
2943
2944 static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2945 {
2946         struct p_sizes *p = (struct p_sizes *)h;
2947         enum determine_dev_size dd = unchanged;
2948         unsigned int max_seg_s;
2949         sector_t p_size, p_usize, my_usize;
2950         int ldsc = 0; /* local disk size changed */
2951         enum dds_flags ddsf;
2952
2953         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2954         if (drbd_recv(mdev, h->payload, h->length) != h->length)
2955                 return FALSE;
2956
2957         p_size = be64_to_cpu(p->d_size);
2958         p_usize = be64_to_cpu(p->u_size);
2959
2960         if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2961                 dev_err(DEV, "some backing storage is needed\n");
2962                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2963                 return FALSE;
2964         }
2965
2966         /* just store the peer's disk size for now.
2967          * we still need to figure out whether we accept that. */
2968         mdev->p_size = p_size;
2969
2970 #define min_not_zero(l, r) (l == 0) ? r : ((r == 0) ? l : min(l, r))
2971         if (get_ldev(mdev)) {
2972                 warn_if_differ_considerably(mdev, "lower level device sizes",
2973                            p_size, drbd_get_max_capacity(mdev->ldev));
2974                 warn_if_differ_considerably(mdev, "user requested size",
2975                                             p_usize, mdev->ldev->dc.disk_size);
2976
2977                 /* if this is the first connect, or an otherwise expected
2978                  * param exchange, choose the minimum */
2979                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2980                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2981                                              p_usize);
2982
2983                 my_usize = mdev->ldev->dc.disk_size;
2984
2985                 if (mdev->ldev->dc.disk_size != p_usize) {
2986                         mdev->ldev->dc.disk_size = p_usize;
2987                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2988                              (unsigned long)mdev->ldev->dc.disk_size);
2989                 }
2990
2991                 /* Never shrink a device with usable data during connect.
2992                    But allow online shrinking if we are connected. */
2993                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
2994                    drbd_get_capacity(mdev->this_bdev) &&
2995                    mdev->state.disk >= D_OUTDATED &&
2996                    mdev->state.conn < C_CONNECTED) {
2997                         dev_err(DEV, "The peer's disk size is too small!\n");
2998                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2999                         mdev->ldev->dc.disk_size = my_usize;
3000                         put_ldev(mdev);
3001                         return FALSE;
3002                 }
3003                 put_ldev(mdev);
3004         }
3005 #undef min_not_zero
3006
3007         ddsf = be16_to_cpu(p->dds_flags);
3008         if (get_ldev(mdev)) {
3009                 dd = drbd_determin_dev_size(mdev, ddsf);
3010                 put_ldev(mdev);
3011                 if (dd == dev_size_error)
3012                         return FALSE;
3013                 drbd_md_sync(mdev);
3014         } else {
3015                 /* I am diskless, need to accept the peer's size. */
3016                 drbd_set_my_capacity(mdev, p_size);
3017         }
3018
3019         if (get_ldev(mdev)) {
3020                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3021                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3022                         ldsc = 1;
3023                 }
3024
3025                 if (mdev->agreed_pro_version < 94)
3026                         max_seg_s = be32_to_cpu(p->max_segment_size);
3027                 else /* drbd 8.3.8 onwards */
3028                         max_seg_s = DRBD_MAX_SEGMENT_SIZE;
3029
3030                 if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
3031                         drbd_setup_queue_param(mdev, max_seg_s);
3032
3033                 drbd_setup_order_type(mdev, be16_to_cpu(p->queue_order_type));
3034                 put_ldev(mdev);
3035         }
3036
3037         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3038                 if (be64_to_cpu(p->c_size) !=
3039                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3040                         /* we have different sizes, probably peer
3041                          * needs to know my new size... */
3042                         drbd_send_sizes(mdev, 0, ddsf);
3043                 }
3044                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3045                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3046                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3047                             mdev->state.disk >= D_INCONSISTENT) {
3048                                 if (ddsf & DDSF_NO_RESYNC)
3049                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3050                                 else
3051                                         resync_after_online_grow(mdev);
3052                         } else
3053                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3054                 }
3055         }
3056
3057         return TRUE;
3058 }
3059
3060 static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
3061 {
3062         struct p_uuids *p = (struct p_uuids *)h;
3063         u64 *p_uuid;
3064         int i;
3065
3066         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3067         if (drbd_recv(mdev, h->payload, h->length) != h->length)
3068                 return FALSE;
3069
3070         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3071
3072         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3073                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3074
3075         kfree(mdev->p_uuid);
3076         mdev->p_uuid = p_uuid;
3077
3078         if (mdev->state.conn < C_CONNECTED &&
3079             mdev->state.disk < D_INCONSISTENT &&
3080             mdev->state.role == R_PRIMARY &&
3081             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3082                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3083                     (unsigned long long)mdev->ed_uuid);
3084                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3085                 return FALSE;
3086         }
3087
3088         if (get_ldev(mdev)) {
3089                 int skip_initial_sync =
3090                         mdev->state.conn == C_CONNECTED &&
3091                         mdev->agreed_pro_version >= 90 &&
3092                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3093                         (p_uuid[UI_FLAGS] & 8);
3094                 if (skip_initial_sync) {
3095                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3096                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3097                                         "clear_n_write from receive_uuids");
3098                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3099                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3100                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3101                                         CS_VERBOSE, NULL);
3102                         drbd_md_sync(mdev);
3103                 }
3104                 put_ldev(mdev);
3105         }
3106
3107         /* Before we test for the disk state, we should wait until an eventually
3108            ongoing cluster wide state change is finished. That is important if
3109            we are primary and are detaching from our disk. We need to see the
3110            new disk state... */
3111         wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3112         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3113                 drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3114
3115         return TRUE;
3116 }
3117
3118 /**
3119  * convert_state() - Converts the peer's view of the cluster state to our point of view
3120  * @ps:         The state as seen by the peer.
3121  */
3122 static union drbd_state convert_state(union drbd_state ps)
3123 {
3124         union drbd_state ms;
3125
3126         static enum drbd_conns c_tab[] = {
3127                 [C_CONNECTED] = C_CONNECTED,
3128
3129                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3130                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3131                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3132                 [C_VERIFY_S]       = C_VERIFY_T,
3133                 [C_MASK]   = C_MASK,
3134         };
3135
3136         ms.i = ps.i;
3137
3138         ms.conn = c_tab[ps.conn];
3139         ms.peer = ps.role;
3140         ms.role = ps.peer;
3141         ms.pdsk = ps.disk;
3142         ms.disk = ps.pdsk;
3143         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3144
3145         return ms;
3146 }
3147
3148 static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3149 {
3150         struct p_req_state *p = (struct p_req_state *)h;
3151         union drbd_state mask, val;
3152         int rv;
3153
3154         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3155         if (drbd_recv(mdev, h->payload, h->length) != h->length)
3156                 return FALSE;
3157
3158         mask.i = be32_to_cpu(p->mask);
3159         val.i = be32_to_cpu(p->val);
3160
3161         if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3162             test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3163                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3164                 return TRUE;
3165         }
3166
3167         mask = convert_state(mask);
3168         val = convert_state(val);
3169
3170         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3171
3172         drbd_send_sr_reply(mdev, rv);
3173         drbd_md_sync(mdev);
3174
3175         return TRUE;
3176 }
3177
3178 static int receive_state(struct drbd_conf *mdev, struct p_header *h)
3179 {
3180         struct p_state *p = (struct p_state *)h;
3181         enum drbd_conns nconn, oconn;
3182         union drbd_state ns, peer_state;
3183         enum drbd_disk_state real_peer_disk;
3184         int rv;
3185
3186         ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
3187                 return FALSE;
3188
3189         if (drbd_recv(mdev, h->payload, h->length) != h->length)
3190                 return FALSE;
3191
3192         peer_state.i = be32_to_cpu(p->state);
3193
3194         real_peer_disk = peer_state.disk;
3195         if (peer_state.disk == D_NEGOTIATING) {
3196                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3197                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3198         }
3199
3200         spin_lock_irq(&mdev->req_lock);
3201  retry:
3202         oconn = nconn = mdev->state.conn;
3203         spin_unlock_irq(&mdev->req_lock);
3204
3205         if (nconn == C_WF_REPORT_PARAMS)
3206                 nconn = C_CONNECTED;
3207
3208         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3209             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3210                 int cr; /* consider resync */
3211
3212                 /* if we established a new connection */
3213                 cr  = (oconn < C_CONNECTED);
3214                 /* if we had an established connection
3215                  * and one of the nodes newly attaches a disk */
3216                 cr |= (oconn == C_CONNECTED &&
3217                        (peer_state.disk == D_NEGOTIATING ||
3218                         mdev->state.disk == D_NEGOTIATING));
3219                 /* if we have both been inconsistent, and the peer has been
3220                  * forced to be UpToDate with --overwrite-data */
3221                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3222                 /* if we had been plain connected, and the admin requested to
3223                  * start a sync by "invalidate" or "invalidate-remote" */
3224                 cr |= (oconn == C_CONNECTED &&
3225                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3226                                  peer_state.conn <= C_WF_BITMAP_T));
3227
3228                 if (cr)
3229                         nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3230
3231                 put_ldev(mdev);
3232                 if (nconn == C_MASK) {
3233                         nconn = C_CONNECTED;
3234                         if (mdev->state.disk == D_NEGOTIATING) {
3235                                 drbd_force_state(mdev, NS(disk, D_DISKLESS));
3236                         } else if (peer_state.disk == D_NEGOTIATING) {
3237                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3238                                 peer_state.disk = D_DISKLESS;
3239                                 real_peer_disk = D_DISKLESS;
3240                         } else {
3241                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3242                                         return FALSE;
3243                                 D_ASSERT(oconn == C_WF_REPORT_PARAMS);
3244                                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3245                                 return FALSE;
3246                         }
3247                 }
3248         }
3249
3250         spin_lock_irq(&mdev->req_lock);
3251         if (mdev->state.conn != oconn)
3252                 goto retry;
3253         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3254         ns.i = mdev->state.i;
3255         ns.conn = nconn;
3256         ns.peer = peer_state.role;
3257         ns.pdsk = real_peer_disk;
3258         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3259         if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3260                 ns.disk = mdev->new_state_tmp.disk;
3261
3262         rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
3263         ns = mdev->state;
3264         spin_unlock_irq(&mdev->req_lock);
3265
3266         if (rv < SS_SUCCESS) {
3267                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3268                 return FALSE;
3269         }
3270
3271         if (oconn > C_WF_REPORT_PARAMS) {
3272                 if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3273                     peer_state.disk != D_NEGOTIATING ) {
3274                         /* we want resync, peer has not yet decided to sync... */
3275                         /* Nowadays only used when forcing a node into primary role and
3276                            setting its disk to UpToDate with that */
3277                         drbd_send_uuids(mdev);
3278                         drbd_send_state(mdev);
3279                 }
3280         }
3281
3282         mdev->net_conf->want_lose = 0;
3283
3284         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3285
3286         return TRUE;
3287 }
3288
3289 static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3290 {
3291         struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3292
3293         wait_event(mdev->misc_wait,
3294                    mdev->state.conn == C_WF_SYNC_UUID ||
3295                    mdev->state.conn < C_CONNECTED ||
3296                    mdev->state.disk < D_NEGOTIATING);
3297
3298         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3299
3300         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3301         if (drbd_recv(mdev, h->payload, h->length) != h->length)
3302                 return FALSE;
3303
3304         /* Here the _drbd_uuid_ functions are right, current should
3305            _not_ be rotated into the history */
3306         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3307                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3308                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3309
3310                 drbd_start_resync(mdev, C_SYNC_TARGET);
3311
3312                 put_ldev(mdev);
3313         } else
3314                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3315
3316         return TRUE;
3317 }
3318
/* Result of processing one bitmap packet, as used by receive_bitmap(). */
enum receive_bitmap_ret {
	OK,	/* packet processed, keep receiving bitmap packets */
	DONE,	/* stop receiving, no error */
	FAILED,	/* stop receiving, error */
};
3320
3321 static enum receive_bitmap_ret
3322 receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3323         unsigned long *buffer, struct bm_xfer_ctx *c)
3324 {
3325         unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3326         unsigned want = num_words * sizeof(long);
3327
3328         if (want != h->length) {
3329                 dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3330                 return FAILED;
3331         }
3332         if (want == 0)
3333                 return DONE;
3334         if (drbd_recv(mdev, buffer, want) != want)
3335                 return FAILED;
3336
3337         drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3338
3339         c->word_offset += num_words;
3340         c->bit_offset = c->word_offset * BITS_PER_LONG;
3341         if (c->bit_offset > c->bm_bits)
3342                 c->bit_offset = c->bm_bits;
3343
3344         return OK;
3345 }
3346
3347 static enum receive_bitmap_ret
3348 recv_bm_rle_bits(struct drbd_conf *mdev,
3349                 struct p_compressed_bm *p,
3350                 struct bm_xfer_ctx *c)
3351 {
3352         struct bitstream bs;
3353         u64 look_ahead;
3354         u64 rl;
3355         u64 tmp;
3356         unsigned long s = c->bit_offset;
3357         unsigned long e;
3358         int len = p->head.length - (sizeof(*p) - sizeof(p->head));
3359         int toggle = DCBP_get_start(p);
3360         int have;
3361         int bits;
3362
3363         bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3364
3365         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3366         if (bits < 0)
3367                 return FAILED;
3368
3369         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3370                 bits = vli_decode_bits(&rl, look_ahead);
3371                 if (bits <= 0)
3372                         return FAILED;
3373
3374                 if (toggle) {
3375                         e = s + rl -1;
3376                         if (e >= c->bm_bits) {
3377                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3378                                 return FAILED;
3379                         }
3380                         _drbd_bm_set_bits(mdev, s, e);
3381                 }
3382
3383                 if (have < bits) {
3384                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3385                                 have, bits, look_ahead,
3386                                 (unsigned int)(bs.cur.b - p->code),
3387                                 (unsigned int)bs.buf_len);
3388                         return FAILED;
3389                 }
3390                 look_ahead >>= bits;
3391                 have -= bits;
3392
3393                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3394                 if (bits < 0)
3395                         return FAILED;
3396                 look_ahead |= tmp << have;
3397                 have += bits;
3398         }
3399
3400         c->bit_offset = s;
3401         bm_xfer_ctx_bit_to_word_offset(c);
3402
3403         return (s == c->bm_bits) ? DONE : OK;
3404 }
3405
3406 static enum receive_bitmap_ret
3407 decode_bitmap_c(struct drbd_conf *mdev,
3408                 struct p_compressed_bm *p,
3409                 struct bm_xfer_ctx *c)
3410 {
3411         if (DCBP_get_code(p) == RLE_VLI_Bits)
3412                 return recv_bm_rle_bits(mdev, p, c);
3413
3414         /* other variants had been implemented for evaluation,
3415          * but have been dropped as this one turned out to be "best"
3416          * during all our tests. */
3417
3418         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3419         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3420         return FAILED;
3421 }
3422
3423 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3424                 const char *direction, struct bm_xfer_ctx *c)
3425 {
3426         /* what would it take to transfer it "plaintext" */
3427         unsigned plain = sizeof(struct p_header) *
3428                 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3429                 + c->bm_words * sizeof(long);
3430         unsigned total = c->bytes[0] + c->bytes[1];
3431         unsigned r;
3432
3433         /* total can not be zero. but just in case: */
3434         if (total == 0)
3435                 return;
3436
3437         /* don't report if not compressed */
3438         if (total >= plain)
3439                 return;
3440
3441         /* total < plain. check for overflow, still */
3442         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3443                                     : (1000 * total / plain);
3444
3445         if (r > 1000)
3446                 r = 1000;
3447
3448         r = 1000 - r;
3449         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3450              "total %u; compression: %u.%u%%\n",
3451                         direction,
3452                         c->bytes[1], c->packets[1],
3453                         c->bytes[0], c->packets[0],
3454                         total, r/10, r % 10);
3455 }
3456
3457 /* Since we are processing the bitfield from lower addresses to higher,
3458    it does not matter if the process it in 32 bit chunks or 64 bit
3459    chunks as long as it is little endian. (Understand it as byte stream,
3460    beginning with the lowest byte...) If we would use big endian
3461    we would need to process it from the highest address to the lowest,
3462    in order to be agnostic to the 32 vs 64 bits issue.
3463
3464    returns 0 on failure, 1 if we successfully received it. */
3465 static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3466 {
3467         struct bm_xfer_ctx c;
3468         void *buffer;
3469         enum receive_bitmap_ret ret;
3470         int ok = FALSE;
3471
3472         wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3473
3474         drbd_bm_lock(mdev, "receive bitmap");
3475
3476         /* maybe we should use some per thread scratch page,
3477          * and allocate that during initial device creation? */
3478         buffer   = (unsigned long *) __get_free_page(GFP_NOIO);
3479         if (!buffer) {
3480                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3481                 goto out;
3482         }
3483
3484         c = (struct bm_xfer_ctx) {
3485                 .bm_bits = drbd_bm_bits(mdev),
3486                 .bm_words = drbd_bm_words(mdev),
3487         };
3488
3489         do {
3490                 if (h->command == P_BITMAP) {
3491                         ret = receive_bitmap_plain(mdev, h, buffer, &c);
3492                 } else if (h->command == P_COMPRESSED_BITMAP) {
3493                         /* MAYBE: sanity check that we speak proto >= 90,
3494                          * and the feature is enabled! */
3495                         struct p_compressed_bm *p;
3496
3497                         if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3498                                 dev_err(DEV, "ReportCBitmap packet too large\n");
3499                                 goto out;
3500                         }
3501                         /* use the page buff */
3502                         p = buffer;
3503                         memcpy(p, h, sizeof(*h));
3504                         if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3505                                 goto out;
3506                         if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
3507                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
3508                                 return FAILED;
3509                         }
3510                         ret = decode_bitmap_c(mdev, p, &c);
3511                 } else {
3512                         dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)", h->command);
3513                         goto out;
3514                 }
3515
3516                 c.packets[h->command == P_BITMAP]++;
3517                 c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
3518
3519                 if (ret != OK)
3520                         break;
3521
3522                 if (!drbd_recv_header(mdev, h))
3523                         goto out;
3524         } while (ret == OK);
3525         if (ret == FAILED)
3526                 goto out;
3527
3528         INFO_bm_xfer_stats(mdev, "receive", &c);
3529
3530         if (mdev->state.conn == C_WF_BITMAP_T) {
3531                 ok = !drbd_send_bitmap(mdev);
3532                 if (!ok)
3533                         goto out;
3534                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3535                 ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3536                 D_ASSERT(ok == SS_SUCCESS);
3537         } else if (mdev->state.conn != C_WF_BITMAP_S) {
3538                 /* admin may have requested C_DISCONNECTING,
3539                  * other threads may have noticed network errors */
3540                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3541                     drbd_conn_str(mdev->state.conn));
3542         }
3543
3544         ok = TRUE;
3545  out:
3546         drbd_bm_unlock(mdev);
3547         if (ok && mdev->state.conn == C_WF_BITMAP_S)
3548                 drbd_start_resync(mdev, C_SYNC_SOURCE);
3549         free_page((unsigned long) buffer);
3550         return ok;
3551 }
3552
3553 static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
3554 {
3555         /* TODO zero copy sink :) */
3556         static char sink[128];
3557         int size, want, r;
3558
3559         dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3560              h->command, h->length);
3561
3562         size = h->length;
3563         while (size > 0) {
3564                 want = min_t(int, size, sizeof(sink));
3565                 r = drbd_recv(mdev, sink, want);
3566                 ERR_IF(r <= 0) break;
3567                 size -= r;
3568         }
3569         return size == 0;
3570 }
3571
3572 static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
3573 {
3574         if (mdev->state.disk >= D_INCONSISTENT)
3575                 drbd_kick_lo(mdev);
3576
3577         /* Make sure we've acked all the TCP data associated
3578          * with the data requests being unplugged */
3579         drbd_tcp_quickack(mdev->data.socket);
3580
3581         return TRUE;
3582 }
3583
/**
 * timeval_sub_us() - Subtract a number of microseconds from a timeval
 * @tv:	Time value to modify in place.
 * @us:	Microseconds to subtract.
 *
 * Whole seconds contained in @us are taken off tv_sec, the remainder is
 * taken off tv_usec, borrowing one second if tv_usec is too small.
 * Given a tv_usec within [0, 1000000) on entry, it stays within that range.
 */
static void timeval_sub_us(struct timeval* tv, unsigned int us)
{
	tv->tv_sec -= us / 1000000;
	us = us % 1000000;
	/* borrow a second if the remaining microseconds do not fit */
	if (tv->tv_usec < (long)us) {
		tv->tv_usec += 1000000;
		tv->tv_sec--;
	}
	tv->tv_usec -= us;
}
3594
3595 static void got_delay_probe(struct drbd_conf *mdev, int from, struct p_delay_probe *p)
3596 {
3597         struct delay_probe *dp;
3598         struct list_head *le;
3599         struct timeval now;
3600         int seq_num;
3601         int offset;
3602         int data_delay;
3603
3604         seq_num = be32_to_cpu(p->seq_num);
3605         offset  = be32_to_cpu(p->offset);
3606
3607         spin_lock(&mdev->peer_seq_lock);
3608         if (!list_empty(&mdev->delay_probes)) {
3609                 if (from == USE_DATA_SOCKET)
3610                         le = mdev->delay_probes.next;
3611                 else
3612                         le = mdev->delay_probes.prev;
3613
3614                 dp = list_entry(le, struct delay_probe, list);
3615
3616                 if (dp->seq_num == seq_num) {
3617                         list_del(le);
3618                         spin_unlock(&mdev->peer_seq_lock);
3619                         do_gettimeofday(&now);
3620                         timeval_sub_us(&now, offset);
3621                         data_delay =
3622                                 now.tv_usec - dp->time.tv_usec +
3623                                 (now.tv_sec - dp->time.tv_sec) * 1000000;
3624
3625                         if (data_delay > 0)
3626                                 mdev->data_delay = data_delay;
3627
3628                         kfree(dp);
3629                         return;
3630                 }
3631
3632                 if (dp->seq_num > seq_num) {
3633                         spin_unlock(&mdev->peer_seq_lock);
3634                         dev_warn(DEV, "Previous allocation failure of struct delay_probe?\n");
3635                         return; /* Do not alloca a struct delay_probe.... */
3636                 }
3637         }
3638         spin_unlock(&mdev->peer_seq_lock);
3639
3640         dp = kmalloc(sizeof(struct delay_probe), GFP_NOIO);
3641         if (!dp) {
3642                 dev_warn(DEV, "Failed to allocate a struct delay_probe, do not worry.\n");
3643                 return;
3644         }
3645
3646         dp->seq_num = seq_num;
3647         do_gettimeofday(&dp->time);
3648         timeval_sub_us(&dp->time, offset);
3649
3650         spin_lock(&mdev->peer_seq_lock);
3651         if (from == USE_DATA_SOCKET)
3652                 list_add(&dp->list, &mdev->delay_probes);
3653         else
3654                 list_add_tail(&dp->list, &mdev->delay_probes);
3655         spin_unlock(&mdev->peer_seq_lock);
3656 }
3657
3658 static int receive_delay_probe(struct drbd_conf *mdev, struct p_header *h)
3659 {
3660         struct p_delay_probe *p = (struct p_delay_probe *)h;
3661
3662         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3663         if (drbd_recv(mdev, h->payload, h->length) != h->length)
3664                 return FALSE;
3665
3666         got_delay_probe(mdev, USE_DATA_SOCKET, p);
3667         return TRUE;
3668 }
3669
3670 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
3671
3672 static drbd_cmd_handler_f drbd_default_handler[] = {
3673         [P_DATA]            = receive_Data,
3674         [P_DATA_REPLY]      = receive_DataReply,
3675         [P_RS_DATA_REPLY]   = receive_RSDataReply,
3676         [P_BARRIER]         = receive_Barrier,
3677         [P_BITMAP]          = receive_bitmap,
3678         [P_COMPRESSED_BITMAP]    = receive_bitmap,
3679         [P_UNPLUG_REMOTE]   = receive_UnplugRemote,
3680         [P_DATA_REQUEST]    = receive_DataRequest,
3681         [P_RS_DATA_REQUEST] = receive_DataRequest,
3682         [P_SYNC_PARAM]      = receive_SyncParam,
3683         [P_SYNC_PARAM89]           = receive_SyncParam,
3684         [P_PROTOCOL]        = receive_protocol,
3685         [P_UUIDS]           = receive_uuids,
3686         [P_SIZES]           = receive_sizes,
3687         [P_STATE]           = receive_state,
3688         [P_STATE_CHG_REQ]   = receive_req_state,
3689         [P_SYNC_UUID]       = receive_sync_uuid,
3690         [P_OV_REQUEST]      = receive_DataRequest,
3691         [P_OV_REPLY]        = receive_DataRequest,
3692         [P_CSUM_RS_REQUEST]    = receive_DataRequest,
3693         [P_DELAY_PROBE]     = receive_delay_probe,
3694         /* anything missing from this table is in
3695          * the asender_tbl, see get_asender_cmd */
3696         [P_MAX_CMD]         = NULL,
3697 };
3698
3699 static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
3700 static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3701
3702 static void drbdd(struct drbd_conf *mdev)
3703 {
3704         drbd_cmd_handler_f handler;
3705         struct p_header *header = &mdev->data.rbuf.header;
3706
3707         while (get_t_state(&mdev->receiver) == Running) {
3708                 drbd_thread_current_set_cpu(mdev);
3709                 if (!drbd_recv_header(mdev, header)) {
3710                         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3711                         break;
3712                 }
3713
3714                 if (header->command < P_MAX_CMD)
3715                         handler = drbd_cmd_handler[header->command];
3716                 else if (P_MAY_IGNORE < header->command
3717                      && header->command < P_MAX_OPT_CMD)
3718                         handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
3719                 else if (header->command > P_MAX_OPT_CMD)
3720                         handler = receive_skip;
3721                 else
3722                         handler = NULL;
3723
3724                 if (unlikely(!handler)) {
3725                         dev_err(DEV, "unknown packet type %d, l: %d!\n",
3726                             header->command, header->length);
3727                         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3728                         break;
3729                 }
3730                 if (unlikely(!handler(mdev, header))) {
3731                         dev_err(DEV, "error receiving %s, l: %d!\n",
3732                             cmdname(header->command), header->length);
3733                         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3734                         break;
3735                 }
3736         }
3737 }
3738
3739 static void drbd_fail_pending_reads(struct drbd_conf *mdev)
3740 {
3741         struct hlist_head *slot;
3742         struct hlist_node *pos;
3743         struct hlist_node *tmp;
3744         struct drbd_request *req;
3745         int i;
3746
3747         /*
3748          * Application READ requests
3749          */
3750         spin_lock_irq(&mdev->req_lock);
3751         for (i = 0; i < APP_R_HSIZE; i++) {
3752                 slot = mdev->app_reads_hash+i;
3753                 hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
3754                         /* it may (but should not any longer!)
3755                          * be on the work queue; if that assert triggers,
3756                          * we need to also grab the
3757                          * spin_lock_irq(&mdev->data.work.q_lock);
3758                          * and list_del_init here. */
3759                         D_ASSERT(list_empty(&req->w.list));
3760                         /* It would be nice to complete outside of spinlock.
3761                          * But this is easier for now. */
3762                         _req_mod(req, connection_lost_while_pending);
3763                 }
3764         }
3765         for (i = 0; i < APP_R_HSIZE; i++)
3766                 if (!hlist_empty(mdev->app_reads_hash+i))
3767                         dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
3768                                 "%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
3769
3770         memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
3771         spin_unlock_irq(&mdev->req_lock);
3772 }
3773
3774 void drbd_flush_workqueue(struct drbd_conf *mdev)
3775 {
3776         struct drbd_wq_barrier barr;
3777
3778         barr.w.cb = w_prev_work_done;
3779         init_completion(&barr.done);
3780         drbd_queue_work(&mdev->data.work, &barr.w);
3781         wait_for_completion(&barr.done);
3782 }
3783
3784 static void drbd_disconnect(struct drbd_conf *mdev)
3785 {
3786         enum drbd_fencing_p fp;
3787         union drbd_state os, ns;
3788         int rv = SS_UNKNOWN_ERROR;
3789         unsigned int i;
3790
3791         if (mdev->state.conn == C_STANDALONE)
3792                 return;
3793         if (mdev->state.conn >= C_WF_CONNECTION)
3794                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3795                                 drbd_conn_str(mdev->state.conn));
3796
3797         /* asender does not clean up anything. it must not interfere, either */
3798         drbd_thread_stop(&mdev->asender);
3799         drbd_free_sock(mdev);
3800
3801         spin_lock_irq(&mdev->req_lock);
3802         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3803         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3804         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3805         spin_unlock_irq(&mdev->req_lock);
3806
3807         /* We do not have data structures that would allow us to
3808          * get the rs_pending_cnt down to 0 again.
3809          *  * On C_SYNC_TARGET we do not have any data structures describing
3810          *    the pending RSDataRequest's we have sent.
3811          *  * On C_SYNC_SOURCE there is no data structure that tracks
3812          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3813          *  And no, it is not the sum of the reference counts in the
3814          *  resync_LRU. The resync_LRU tracks the whole operation including
3815          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3816          *  on the fly. */
3817         drbd_rs_cancel_all(mdev);
3818         mdev->rs_total = 0;
3819         mdev->rs_failed = 0;
3820         atomic_set(&mdev->rs_pending_cnt, 0);
3821         wake_up(&mdev->misc_wait);
3822
3823         /* make sure syncer is stopped and w_resume_next_sg queued */
3824         del_timer_sync(&mdev->resync_timer);
3825         set_bit(STOP_SYNC_TIMER, &mdev->flags);
3826         resync_timer_fn((unsigned long)mdev);
3827
3828         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3829          * w_make_resync_request etc. which may still be on the worker queue
3830          * to be "canceled" */
3831         drbd_flush_workqueue(mdev);
3832
3833         /* This also does reclaim_net_ee().  If we do this too early, we might
3834          * miss some resync ee and pages.*/
3835         drbd_process_done_ee(mdev);
3836
3837         kfree(mdev->p_uuid);
3838         mdev->p_uuid = NULL;
3839
3840         if (!mdev->state.susp)
3841                 tl_clear(mdev);
3842
3843         drbd_fail_pending_reads(mdev);
3844
3845         dev_info(DEV, "Connection closed\n");
3846
3847         drbd_md_sync(mdev);
3848
3849         fp = FP_DONT_CARE;
3850         if (get_ldev(mdev)) {
3851                 fp = mdev->ldev->dc.fencing;
3852                 put_ldev(mdev);
3853         }
3854
3855         if (mdev->state.role == R_PRIMARY) {
3856                 if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
3857                         enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
3858                         drbd_request_state(mdev, NS(pdsk, nps));
3859                 }
3860         }
3861
3862         spin_lock_irq(&mdev->req_lock);
3863         os = mdev->state;
3864         if (os.conn >= C_UNCONNECTED) {
3865                 /* Do not restart in case we are C_DISCONNECTING */
3866                 ns = os;
3867                 ns.conn = C_UNCONNECTED;
3868                 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3869         }
3870         spin_unlock_irq(&mdev->req_lock);
3871
3872         if (os.conn == C_DISCONNECTING) {
3873                 struct hlist_head *h;
3874                 wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
3875
3876                 /* we must not free the tl_hash
3877                  * while application io is still on the fly */
3878                 wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
3879
3880                 spin_lock_irq(&mdev->req_lock);
3881                 /* paranoia code */
3882                 for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3883                         if (h->first)
3884                                 dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3885                                                 (int)(h - mdev->ee_hash), h->first);
3886                 kfree(mdev->ee_hash);
3887                 mdev->ee_hash = NULL;
3888                 mdev->ee_hash_s = 0;
3889
3890                 /* paranoia code */
3891                 for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3892                         if (h->first)
3893                                 dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3894                                                 (int)(h - mdev->tl_hash), h->first);
3895                 kfree(mdev->tl_hash);
3896                 mdev->tl_hash = NULL;
3897                 mdev->tl_hash_s = 0;
3898                 spin_unlock_irq(&mdev->req_lock);
3899
3900                 crypto_free_hash(mdev->cram_hmac_tfm);
3901                 mdev->cram_hmac_tfm = NULL;
3902
3903                 kfree(mdev->net_conf);
3904                 mdev->net_conf = NULL;
3905                 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3906         }
3907
3908         /* tcp_close and release of sendpage pages can be deferred.  I don't
3909          * want to use SO_LINGER, because apparently it can be deferred for
3910          * more than 20 seconds (longest time I checked).
3911          *
3912          * Actually we don't care for exactly when the network stack does its
3913          * put_page(), but release our reference on these pages right here.
3914          */
3915         i = drbd_release_ee(mdev, &mdev->net_ee);
3916         if (i)
3917                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3918         i = atomic_read(&mdev->pp_in_use);
3919         if (i)
3920                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3921
3922         D_ASSERT(list_empty(&mdev->read_ee));
3923         D_ASSERT(list_empty(&mdev->active_ee));
3924         D_ASSERT(list_empty(&mdev->sync_ee));
3925         D_ASSERT(list_empty(&mdev->done_ee));
3926
3927         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3928         atomic_set(&mdev->current_epoch->epoch_size, 0);
3929         D_ASSERT(list_empty(&mdev->current_epoch->list));
3930 }
3931
3932 /*
3933  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3934  * we can agree on is stored in agreed_pro_version.
3935  *
3936  * feature flags and the reserved array should be enough room for future
3937  * enhancements of the handshake protocol, and possible plugins...
3938  *
3939  * for now, they are expected to be zero, but ignored.
3940  */
3941 static int drbd_send_handshake(struct drbd_conf *mdev)
3942 {
3943         /* ASSERT current == mdev->receiver ... */
3944         struct p_handshake *p = &mdev->data.sbuf.handshake;
3945         int ok;
3946
3947         if (mutex_lock_interruptible(&mdev->data.mutex)) {
3948                 dev_err(DEV, "interrupted during initial handshake\n");
3949                 return 0; /* interrupted. not ok. */
3950         }
3951
3952         if (mdev->data.socket == NULL) {
3953                 mutex_unlock(&mdev->data.mutex);
3954                 return 0;
3955         }
3956
3957         memset(p, 0, sizeof(*p));
3958         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3959         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3960         ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3961                              (struct p_header *)p, sizeof(*p), 0 );
3962         mutex_unlock(&mdev->data.mutex);
3963         return ok;
3964 }
3965
3966 /*
3967  * return values:
3968  *   1 yes, we have a valid connection
3969  *   0 oops, did not work out, please try again
3970  *  -1 peer talks different language,
3971  *     no point in trying again, please go standalone.
3972  */
3973 static int drbd_do_handshake(struct drbd_conf *mdev)
3974 {
3975         /* ASSERT current == mdev->receiver ... */
3976         struct p_handshake *p = &mdev->data.rbuf.handshake;
3977         const int expect = sizeof(struct p_handshake)
3978                           -sizeof(struct p_header);
3979         int rv;
3980
3981         rv = drbd_send_handshake(mdev);
3982         if (!rv)
3983                 return 0;
3984
3985         rv = drbd_recv_header(mdev, &p->head);
3986         if (!rv)
3987                 return 0;
3988
3989         if (p->head.command != P_HAND_SHAKE) {
3990                 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3991                      cmdname(p->head.command), p->head.command);
3992                 return -1;
3993         }
3994
3995         if (p->head.length != expect) {
3996                 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3997                      expect, p->head.length);
3998                 return -1;
3999         }
4000
4001         rv = drbd_recv(mdev, &p->head.payload, expect);
4002
4003         if (rv != expect) {
4004                 dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
4005                 return 0;
4006         }
4007
4008         p->protocol_min = be32_to_cpu(p->protocol_min);
4009         p->protocol_max = be32_to_cpu(p->protocol_max);
4010         if (p->protocol_max == 0)
4011                 p->protocol_max = p->protocol_min;
4012
4013         if (PRO_VERSION_MAX < p->protocol_min ||
4014             PRO_VERSION_MIN > p->protocol_max)
4015                 goto incompat;
4016
4017         mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4018
4019         dev_info(DEV, "Handshake successful: "
4020              "Agreed network protocol version %d\n", mdev->agreed_pro_version);
4021
4022         return 1;
4023
4024  incompat:
4025         dev_err(DEV, "incompatible DRBD dialects: "
4026             "I support %d-%d, peer supports %d-%d\n",
4027             PRO_VERSION_MIN, PRO_VERSION_MAX,
4028             p->protocol_min, p->protocol_max);
4029         return -1;
4030 }
4031
4032 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4033 static int drbd_do_auth(struct drbd_conf *mdev)
4034 {
4035         dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4036         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4037         return -1;
4038 }
4039 #else
4040 #define CHALLENGE_LEN 64
4041
4042 /* Return value:
4043         1 - auth succeeded,
4044         0 - failed, try again (network error),
4045         -1 - auth failed, don't try again.
4046 */
4047
4048 static int drbd_do_auth(struct drbd_conf *mdev)
4049 {
4050         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4051         struct scatterlist sg;
4052         char *response = NULL;
4053         char *right_response = NULL;
4054         char *peers_ch = NULL;
4055         struct p_header p;
4056         unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4057         unsigned int resp_size;
4058         struct hash_desc desc;
4059         int rv;
4060
4061         desc.tfm = mdev->cram_hmac_tfm;
4062         desc.flags = 0;
4063
4064         rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4065                                 (u8 *)mdev->net_conf->shared_secret, key_len);
4066         if (rv) {
4067                 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4068                 rv = -1;
4069                 goto fail;
4070         }
4071
4072         get_random_bytes(my_challenge, CHALLENGE_LEN);
4073
4074         rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4075         if (!rv)
4076                 goto fail;
4077
4078         rv = drbd_recv_header(mdev, &p);
4079         if (!rv)
4080                 goto fail;
4081
4082         if (p.command != P_AUTH_CHALLENGE) {
4083                 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4084                     cmdname(p.command), p.command);
4085                 rv = 0;
4086                 goto fail;
4087         }
4088
4089         if (p.length > CHALLENGE_LEN*2) {
4090                 dev_err(DEV, "expected AuthChallenge payload too big.\n");
4091                 rv = -1;
4092                 goto fail;
4093         }
4094
4095         peers_ch = kmalloc(p.length, GFP_NOIO);
4096         if (peers_ch == NULL) {
4097                 dev_err(DEV, "kmalloc of peers_ch failed\n");
4098                 rv = -1;
4099                 goto fail;
4100         }
4101
4102         rv = drbd_recv(mdev, peers_ch, p.length);
4103
4104         if (rv != p.length) {
4105                 dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
4106                 rv = 0;
4107                 goto fail;
4108         }
4109
4110         resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4111         response = kmalloc(resp_size, GFP_NOIO);
4112         if (response == NULL) {
4113                 dev_err(DEV, "kmalloc of response failed\n");
4114                 rv = -1;
4115                 goto fail;
4116         }
4117
4118         sg_init_table(&sg, 1);
4119         sg_set_buf(&sg, peers_ch, p.length);
4120
4121         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4122         if (rv) {
4123                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4124                 rv = -1;
4125                 goto fail;
4126         }
4127
4128         rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4129         if (!rv)
4130                 goto fail;
4131
4132         rv = drbd_recv_header(mdev, &p);
4133         if (!rv)
4134                 goto fail;
4135
4136         if (p.command != P_AUTH_RESPONSE) {
4137                 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4138                     cmdname(p.command), p.command);
4139                 rv = 0;
4140                 goto fail;
4141         }
4142
4143         if (p.length != resp_size) {
4144                 dev_err(DEV, "expected AuthResponse payload of wrong size\n");
4145                 rv = 0;
4146                 goto fail;
4147         }
4148
4149         rv = drbd_recv(mdev, response , resp_size);
4150
4151         if (rv != resp_size) {
4152                 dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4153                 rv = 0;
4154                 goto fail;
4155         }
4156
4157         right_response = kmalloc(resp_size, GFP_NOIO);
4158         if (right_response == NULL) {
4159                 dev_err(DEV, "kmalloc of right_response failed\n");
4160                 rv = -1;
4161                 goto fail;
4162         }
4163
4164         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4165
4166         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4167         if (rv) {
4168                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4169                 rv = -1;
4170                 goto fail;
4171         }
4172
4173         rv = !memcmp(response, right_response, resp_size);
4174
4175         if (rv)
4176                 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4177                      resp_size, mdev->net_conf->cram_hmac_alg);
4178         else
4179                 rv = -1;
4180
4181  fail:
4182         kfree(peers_ch);
4183         kfree(response);
4184         kfree(right_response);
4185
4186         return rv;
4187 }
4188 #endif
4189
4190 int drbdd_init(struct drbd_thread *thi)
4191 {
4192         struct drbd_conf *mdev = thi->mdev;
4193         unsigned int minor = mdev_to_minor(mdev);
4194         int h;
4195
4196         sprintf(current->comm, "drbd%d_receiver", minor);
4197
4198         dev_info(DEV, "receiver (re)started\n");
4199
4200         do {
4201                 h = drbd_connect(mdev);
4202                 if (h == 0) {
4203                         drbd_disconnect(mdev);
4204                         __set_current_state(TASK_INTERRUPTIBLE);
4205                         schedule_timeout(HZ);
4206                 }
4207                 if (h == -1) {
4208                         dev_warn(DEV, "Discarding network configuration.\n");
4209                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4210                 }
4211         } while (h == 0);
4212
4213         if (h > 0) {
4214                 if (get_net_conf(mdev)) {
4215                         drbdd(mdev);
4216                         put_net_conf(mdev);
4217                 }
4218         }
4219
4220         drbd_disconnect(mdev);
4221
4222         dev_info(DEV, "receiver terminated\n");
4223         return 0;
4224 }
4225
4226 /* ********* acknowledge sender ******** */
4227
4228 static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4229 {
4230         struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4231
4232         int retcode = be32_to_cpu(p->retcode);
4233
4234         if (retcode >= SS_SUCCESS) {
4235                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4236         } else {
4237                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4238                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4239                     drbd_set_st_err_str(retcode), retcode);
4240         }
4241         wake_up(&mdev->state_wait);
4242
4243         return TRUE;
4244 }
4245
/* A ping from the peer is answered with a ping ack; the send result is
 * passed back to the caller. */
static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
{
        int ok = drbd_send_ping_ack(mdev);

        return ok;
}
4251
4252 static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4253 {
4254         /* restore idle timeout */
4255         mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4256         if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4257                 wake_up(&mdev->misc_wait);
4258
4259         return TRUE;
4260 }
4261
4262 static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4263 {
4264         struct p_block_ack *p = (struct p_block_ack *)h;
4265         sector_t sector = be64_to_cpu(p->sector);
4266         int blksize = be32_to_cpu(p->blksize);
4267
4268         D_ASSERT(mdev->agreed_pro_version >= 89);
4269
4270         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4271
4272         drbd_rs_complete_io(mdev, sector);
4273         drbd_set_in_sync(mdev, sector, blksize);
4274         /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4275         mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4276         dec_rs_pending(mdev);
4277
4278         return TRUE;
4279 }
4280
4281 /* when we receive the ACK for a write request,
4282  * verify that we actually know about it */
4283 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4284         u64 id, sector_t sector)
4285 {
4286         struct hlist_head *slot = tl_hash_slot(mdev, sector);
4287         struct hlist_node *n;
4288         struct drbd_request *req;
4289
4290         hlist_for_each_entry(req, n, slot, colision) {
4291                 if ((unsigned long)req == (unsigned long)id) {
4292                         if (req->sector != sector) {
4293                                 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4294                                     "wrong sector (%llus versus %llus)\n", req,
4295                                     (unsigned long long)req->sector,
4296                                     (unsigned long long)sector);
4297                                 break;
4298                         }
4299                         return req;
4300                 }
4301         }
4302         dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4303                 (void *)(unsigned long)id, (unsigned long long)sector);
4304         return NULL;
4305 }
4306
4307 typedef struct drbd_request *(req_validator_fn)
4308         (struct drbd_conf *mdev, u64 id, sector_t sector);
4309
4310 static int validate_req_change_req_state(struct drbd_conf *mdev,
4311         u64 id, sector_t sector, req_validator_fn validator,
4312         const char *func, enum drbd_req_event what)
4313 {
4314         struct drbd_request *req;
4315         struct bio_and_error m;
4316
4317         spin_lock_irq(&mdev->req_lock);
4318         req = validator(mdev, id, sector);
4319         if (unlikely(!req)) {
4320                 spin_unlock_irq(&mdev->req_lock);
4321                 dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4322                 return FALSE;
4323         }
4324         __req_mod(req, what, &m);
4325         spin_unlock_irq(&mdev->req_lock);
4326
4327         if (m.bio)
4328                 complete_master_bio(mdev, &m);
4329         return TRUE;
4330 }
4331
4332 static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4333 {
4334         struct p_block_ack *p = (struct p_block_ack *)h;
4335         sector_t sector = be64_to_cpu(p->sector);
4336         int blksize = be32_to_cpu(p->blksize);
4337         enum drbd_req_event what;
4338
4339         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4340
4341         if (is_syncer_block_id(p->block_id)) {
4342                 drbd_set_in_sync(mdev, sector, blksize);
4343                 dec_rs_pending(mdev);
4344                 return TRUE;
4345         }
4346         switch (be16_to_cpu(h->command)) {
4347         case P_RS_WRITE_ACK:
4348                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4349                 what = write_acked_by_peer_and_sis;
4350                 break;
4351         case P_WRITE_ACK:
4352                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4353                 what = write_acked_by_peer;
4354                 break;
4355         case P_RECV_ACK:
4356                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4357                 what = recv_acked_by_peer;
4358                 break;
4359         case P_DISCARD_ACK:
4360                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4361                 what = conflict_discarded_by_peer;
4362                 break;
4363         default:
4364                 D_ASSERT(0);
4365                 return FALSE;
4366         }
4367
4368         return validate_req_change_req_state(mdev, p->block_id, sector,
4369                 _ack_id_to_req, __func__ , what);
4370 }
4371
4372 static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4373 {
4374         struct p_block_ack *p = (struct p_block_ack *)h;
4375         sector_t sector = be64_to_cpu(p->sector);
4376
4377         if (__ratelimit(&drbd_ratelimit_state))
4378                 dev_warn(DEV, "Got NegAck packet. Peer is in troubles?\n");
4379
4380         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4381
4382         if (is_syncer_block_id(p->block_id)) {
4383                 int size = be32_to_cpu(p->blksize);
4384                 dec_rs_pending(mdev);
4385                 drbd_rs_failed_io(mdev, sector, size);
4386                 return TRUE;
4387         }
4388         return validate_req_change_req_state(mdev, p->block_id, sector,
4389                 _ack_id_to_req, __func__ , neg_acked);
4390 }
4391
4392 static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4393 {
4394         struct p_block_ack *p = (struct p_block_ack *)h;
4395         sector_t sector = be64_to_cpu(p->sector);
4396
4397         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4398         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4399             (unsigned long long)sector, be32_to_cpu(p->blksize));
4400
4401         return validate_req_change_req_state(mdev, p->block_id, sector,
4402                 _ar_id_to_req, __func__ , neg_acked);
4403 }
4404
4405 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4406 {
4407         sector_t sector;
4408         int size;
4409         struct p_block_ack *p = (struct p_block_ack *)h;
4410
4411         sector = be64_to_cpu(p->sector);
4412         size = be32_to_cpu(p->blksize);
4413
4414         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4415
4416         dec_rs_pending(mdev);
4417
4418         if (get_ldev_if_state(mdev, D_FAILED)) {
4419                 drbd_rs_complete_io(mdev, sector);
4420                 drbd_rs_failed_io(mdev, sector, size);
4421                 put_ldev(mdev);
4422         }
4423
4424         return TRUE;
4425 }
4426
4427 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4428 {
4429         struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4430
4431         tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4432
4433         return TRUE;
4434 }
4435
4436 static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4437 {
4438         struct p_block_ack *p = (struct p_block_ack *)h;
4439         struct drbd_work *w;
4440         sector_t sector;
4441         int size;
4442
4443         sector = be64_to_cpu(p->sector);
4444         size = be32_to_cpu(p->blksize);
4445
4446         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4447
4448         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4449                 drbd_ov_oos_found(mdev, sector, size);
4450         else
4451                 ov_oos_print(mdev);
4452
4453         drbd_rs_complete_io(mdev, sector);
4454         dec_rs_pending(mdev);
4455
4456         if (--mdev->ov_left == 0) {
4457                 w = kmalloc(sizeof(*w), GFP_NOIO);
4458                 if (w) {
4459                         w->cb = w_ov_finished;
4460                         drbd_queue_work_front(&mdev->data.work, w);
4461                 } else {
4462                         dev_err(DEV, "kmalloc(w) failed.");
4463                         ov_oos_print(mdev);
4464                         drbd_resync_finished(mdev);
4465                 }
4466         }
4467         return TRUE;
4468 }
4469
4470 static int got_delay_probe_m(struct drbd_conf *mdev, struct p_header *h)
4471 {
4472         struct p_delay_probe *p = (struct p_delay_probe *)h;
4473
4474         got_delay_probe(mdev, USE_META_SOCKET, p);
4475         return TRUE;
4476 }
4477
/* One entry of the asender dispatch table: how large a packet of a given
 * command is, and which function handles it. */
struct asender_cmd {
        /* total packet size in bytes, header included */
        size_t pkt_size;
        /* handler; called with the fully received packet in @h */
        int (*process)(struct drbd_conf *mdev, struct p_header *h);
};
4482
4483 static struct asender_cmd *get_asender_cmd(int cmd)
4484 {
4485         static struct asender_cmd asender_tbl[] = {
4486                 /* anything missing from this table is in
4487                  * the drbd_cmd_handler (drbd_default_handler) table,
4488                  * see the beginning of drbdd() */
4489         [P_PING]            = { sizeof(struct p_header), got_Ping },
4490         [P_PING_ACK]        = { sizeof(struct p_header), got_PingAck },
4491         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4492         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4493         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4494         [P_DISCARD_ACK]     = { sizeof(struct p_block_ack), got_BlockAck },
4495         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4496         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4497         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4498         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4499         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4500         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4501         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4502         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe), got_delay_probe_m },
4503         [P_MAX_CMD]         = { 0, NULL },
4504         };
4505         if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4506                 return NULL;
4507         return &asender_tbl[cmd];
4508 }
4509
4510 int drbd_asender(struct drbd_thread *thi)
4511 {
4512         struct drbd_conf *mdev = thi->mdev;
4513         struct p_header *h = &mdev->meta.rbuf.header;
4514         struct asender_cmd *cmd = NULL;
4515
4516         int rv, len;
4517         void *buf    = h;
4518         int received = 0;
4519         int expect   = sizeof(struct p_header);
4520         int empty;
4521
4522         sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4523
4524         current->policy = SCHED_RR;  /* Make this a realtime task! */
4525         current->rt_priority = 2;    /* more important than all other tasks */
4526
4527         while (get_t_state(thi) == Running) {
4528                 drbd_thread_current_set_cpu(mdev);
4529                 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4530                         ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4531                         mdev->meta.socket->sk->sk_rcvtimeo =
4532                                 mdev->net_conf->ping_timeo*HZ/10;
4533                 }
4534
4535                 /* conditionally cork;
4536                  * it may hurt latency if we cork without much to send */
4537                 if (!mdev->net_conf->no_cork &&
4538                         3 < atomic_read(&mdev->unacked_cnt))
4539                         drbd_tcp_cork(mdev->meta.socket);
4540                 while (1) {
4541                         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4542                         flush_signals(current);
4543                         if (!drbd_process_done_ee(mdev)) {
4544                                 dev_err(DEV, "process_done_ee() = NOT_OK\n");
4545                                 goto reconnect;
4546                         }
4547                         /* to avoid race with newly queued ACKs */
4548                         set_bit(SIGNAL_ASENDER, &mdev->flags);
4549                         spin_lock_irq(&mdev->req_lock);
4550                         empty = list_empty(&mdev->done_ee);
4551                         spin_unlock_irq(&mdev->req_lock);
4552                         /* new ack may have been queued right here,
4553                          * but then there is also a signal pending,
4554                          * and we start over... */
4555                         if (empty)
4556                                 break;
4557                 }
4558                 /* but unconditionally uncork unless disabled */
4559                 if (!mdev->net_conf->no_cork)
4560                         drbd_tcp_uncork(mdev->meta.socket);
4561
4562                 /* short circuit, recv_msg would return EINTR anyways. */
4563                 if (signal_pending(current))
4564                         continue;
4565
4566                 rv = drbd_recv_short(mdev, mdev->meta.socket,
4567                                      buf, expect-received, 0);
4568                 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4569
4570                 flush_signals(current);
4571
4572                 /* Note:
4573                  * -EINTR        (on meta) we got a signal
4574                  * -EAGAIN       (on meta) rcvtimeo expired
4575                  * -ECONNRESET   other side closed the connection
4576                  * -ERESTARTSYS  (on data) we got a signal
4577                  * rv <  0       other than above: unexpected error!
4578                  * rv == expected: full header or command
4579                  * rv <  expected: "woken" by signal during receive
4580                  * rv == 0       : "connection shut down by peer"
4581                  */
4582                 if (likely(rv > 0)) {
4583                         received += rv;
4584                         buf      += rv;
4585                 } else if (rv == 0) {
4586                         dev_err(DEV, "meta connection shut down by peer.\n");
4587                         goto reconnect;
4588                 } else if (rv == -EAGAIN) {
4589                         if (mdev->meta.socket->sk->sk_rcvtimeo ==
4590                             mdev->net_conf->ping_timeo*HZ/10) {
4591                                 dev_err(DEV, "PingAck did not arrive in time.\n");
4592                                 goto reconnect;
4593                         }
4594                         set_bit(SEND_PING, &mdev->flags);
4595                         continue;
4596                 } else if (rv == -EINTR) {
4597                         continue;
4598                 } else {
4599                         dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4600                         goto reconnect;
4601                 }
4602
4603                 if (received == expect && cmd == NULL) {
4604                         if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4605                                 dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
4606                                     (long)be32_to_cpu(h->magic),
4607                                     h->command, h->length);
4608                                 goto reconnect;
4609                         }
4610                         cmd = get_asender_cmd(be16_to_cpu(h->command));
4611                         len = be16_to_cpu(h->length);
4612                         if (unlikely(cmd == NULL)) {
4613                                 dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
4614                                     (long)be32_to_cpu(h->magic),
4615                                     h->command, h->length);
4616                                 goto disconnect;
4617                         }
4618                         expect = cmd->pkt_size;
4619                         ERR_IF(len != expect-sizeof(struct p_header))
4620                                 goto reconnect;
4621                 }
4622                 if (received == expect) {
4623                         D_ASSERT(cmd != NULL);
4624                         if (!cmd->process(mdev, h))
4625                                 goto reconnect;
4626
4627                         buf      = h;
4628                         received = 0;
4629                         expect   = sizeof(struct p_header);
4630                         cmd      = NULL;
4631                 }
4632         }
4633
4634         if (0) {
4635 reconnect:
4636                 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4637         }
4638         if (0) {
4639 disconnect:
4640                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4641         }
4642         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4643
4644         D_ASSERT(mdev->state.conn < C_CONNECTED);
4645         dev_info(DEV, "asender terminated\n");
4646
4647         return 0;
4648 }