1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/smp_lock.h>
40 #include <linux/pkt_sched.h>
41 #define __KERNEL_SYSCALLS__
42 #include <linux/unistd.h>
43 #include <linux/vmalloc.h>
44 #include <linux/random.h>
45 #include <linux/mm.h>
46 #include <linux/string.h>
47 #include <linux/scatterlist.h>
48 #include "drbd_int.h"
49 #include "drbd_req.h"
50
51 #include "drbd_vli.h"
52
53 struct flush_work {
54         struct drbd_work w;
55         struct drbd_epoch *epoch;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_handshake(struct drbd_conf *mdev);
65 static int drbd_do_auth(struct drbd_conf *mdev);
66
67 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
68 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
69
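/* Return the epoch preceding @epoch in the epoch ring, or NULL if there is
 * none (i.e. @epoch is the only one, or its predecessor is the current epoch). */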
70 static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
71 {
72         struct drbd_epoch *prev;
73         spin_lock(&mdev->epoch_lock);
74         prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
75         if (prev == epoch || prev == mdev->current_epoch)
76                 prev = NULL;
77         spin_unlock(&mdev->epoch_lock);
78         return prev;
79 }
80
81 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
82
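/* Take a page from the global drbd_pp_pool if one is vacant, otherwise try to
 * allocate a fresh one with GFP_TRY; on success the page is accounted in
 * mdev->pp_in_use.  May return NULL. */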
83 static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev)
84 {
85         struct page *page = NULL;
86
87         /* Yes, testing drbd_pp_vacant outside the lock is racy.
88          * So what. It saves a spin_lock. */
89         if (drbd_pp_vacant > 0) {
90                 spin_lock(&drbd_pp_lock);
91                 page = drbd_pp_pool;
92                 if (page) {
93                         drbd_pp_pool = (struct page *)page_private(page);
94                         set_page_private(page, 0); /* just to be polite */
95                         drbd_pp_vacant--;
96                 }
97                 spin_unlock(&drbd_pp_lock);
98         }
99         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
100          * "criss-cross" setup, that might cause write-out on some other DRBD,
101          * which in turn might block on the other node at this very place.  */
102         if (!page)
103                 page = alloc_page(GFP_TRY);
104         if (page)
105                 atomic_inc(&mdev->pp_in_use);
106         return page;
107 }
108
109 /* kick the lower level device if we have more than an (arbitrary) number of
110  * reference counts on it, which typically are locally submitted io
111  * requests.  Don't use unacked_cnt, so we speed up proto A and B, too. */
112 static void maybe_kick_lo(struct drbd_conf *mdev)
113 {
114         if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
115                 drbd_kick_lo(mdev);
116 }
117
118 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
119 {
120         struct drbd_epoch_entry *e;
121         struct list_head *le, *tle;
122
123         /* The EEs are always appended to the end of the list. Since
124            they are sent in order over the wire, they have to finish
125            in order. As soon as we see the first unfinished one we can
126            stop examining the list... */
127
128         list_for_each_safe(le, tle, &mdev->net_ee) {
129                 e = list_entry(le, struct drbd_epoch_entry, w.list);
130                 if (drbd_bio_has_active_page(e->private_bio))
131                         break;
132                 list_move(le, to_be_freed);
133         }
134 }
135
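/* Kick the lower level device if necessary, then free those net_ee entries
 * whose bios no longer reference any active page.  Takes req_lock itself, so
 * it must be called without that lock held. */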
136 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
137 {
138         LIST_HEAD(reclaimed);
139         struct drbd_epoch_entry *e, *t;
140
141         maybe_kick_lo(mdev);
142         spin_lock_irq(&mdev->req_lock);
143         reclaim_net_ee(mdev, &reclaimed);
144         spin_unlock_irq(&mdev->req_lock);
145
146         list_for_each_entry_safe(e, t, &reclaimed, w.list)
147                 drbd_free_ee(mdev, e);
148 }
149
150 /**
151  * drbd_pp_alloc() - Returns a page; if @retry is set, fails only if a signal comes in
152  * @mdev:       DRBD device.
153  * @retry:      whether or not to retry allocation forever (or until signalled)
154  *
155  * Tries to allocate a page, first from our own page pool, then from the
156  * kernel, unless this allocation would exceed the max_buffers setting.
157  * If @retry is non-zero, retry until DRBD frees a page somewhere else.
158  */
159 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
160 {
161         struct page *page = NULL;
162         DEFINE_WAIT(wait);
163
164         if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
165                 page = drbd_pp_first_page_or_try_alloc(mdev);
166                 if (page)
167                         return page;
168         }
169
170         for (;;) {
171                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
172
173                 drbd_kick_lo_and_reclaim_net(mdev);
174
175                 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
176                         page = drbd_pp_first_page_or_try_alloc(mdev);
177                         if (page)
178                                 break;
179                 }
180
181                 if (!retry)
182                         break;
183
184                 if (signal_pending(current)) {
185                         dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
186                         break;
187                 }
188
189                 schedule();
190         }
191         finish_wait(&drbd_pp_wait, &wait);
192
193         return page;
194 }
195
196 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
197  * Is also used from inside another spin_lock_irq(&mdev->req_lock) section */
198 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
199 {
200         int free_it;
201
202         spin_lock(&drbd_pp_lock);
203         if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
204                 free_it = 1;
205         } else {
206                 set_page_private(page, (unsigned long)drbd_pp_pool);
207                 drbd_pp_pool = page;
208                 drbd_pp_vacant++;
209                 free_it = 0;
210         }
211         spin_unlock(&drbd_pp_lock);
212
213         atomic_dec(&mdev->pp_in_use);
214
215         if (free_it)
216                 __free_page(page);
217
218         wake_up(&drbd_pp_wait);
219 }
220
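/* Give the pages of @bio back to drbd_pp_pool, or to the page allocator once
 * the pool holds enough vacant pages; adjusts pp_in_use and wakes up anyone
 * waiting in drbd_pp_alloc(). */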
221 static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio)
222 {
223         struct page *p_to_be_freed = NULL;
224         struct page *page;
225         struct bio_vec *bvec;
226         int i;
227
228         spin_lock(&drbd_pp_lock);
229         __bio_for_each_segment(bvec, bio, i, 0) {
230                 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
231                         set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed);
232                         p_to_be_freed = bvec->bv_page;
233                 } else {
234                         set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool);
235                         drbd_pp_pool = bvec->bv_page;
236                         drbd_pp_vacant++;
237                 }
238         }
239         spin_unlock(&drbd_pp_lock);
240         atomic_sub(bio->bi_vcnt, &mdev->pp_in_use);
241
242         while (p_to_be_freed) {
243                 page = p_to_be_freed;
244                 p_to_be_freed = (struct page *)page_private(page);
245                 set_page_private(page, 0); /* just to be polite */
246                 put_page(page);
247         }
248
249         wake_up(&drbd_pp_wait);
250 }
251
252 /*
253 You need to hold the req_lock:
254  _drbd_wait_ee_list_empty()
255
256 You must not have the req_lock:
257  drbd_free_ee()
258  drbd_alloc_ee()
259  drbd_init_ee()
260  drbd_release_ee()
261  drbd_ee_fix_bhs()
262  drbd_process_done_ee()
263  drbd_clear_done_ee()
264  drbd_wait_ee_list_empty()
265 */
266
267 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
268                                      u64 id,
269                                      sector_t sector,
270                                      unsigned int data_size,
271                                      gfp_t gfp_mask) __must_hold(local)
272 {
273         struct request_queue *q;
274         struct drbd_epoch_entry *e;
275         struct page *page;
276         struct bio *bio;
277         unsigned int ds;
278
279         if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
280                 return NULL;
281
282         e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
283         if (!e) {
284                 if (!(gfp_mask & __GFP_NOWARN))
285                         dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
286                 return NULL;
287         }
288
289         bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE));
290         if (!bio) {
291                 if (!(gfp_mask & __GFP_NOWARN))
292                         dev_err(DEV, "alloc_ee: Allocation of a bio failed\n");
293                 goto fail1;
294         }
295
296         bio->bi_bdev = mdev->ldev->backing_bdev;
297         bio->bi_sector = sector;
298
299         ds = data_size;
300         while (ds) {
301                 page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT));
302                 if (!page) {
303                         if (!(gfp_mask & __GFP_NOWARN))
304                                 dev_err(DEV, "alloc_ee: Allocation of a page failed\n");
305                         goto fail2;
306                 }
307                 if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) {
308                         drbd_pp_free(mdev, page);
309                         dev_err(DEV, "alloc_ee: bio_add_page(s=%llu,"
310                             "data_size=%u,ds=%u) failed\n",
311                             (unsigned long long)sector, data_size, ds);
312
313                         q = bdev_get_queue(bio->bi_bdev);
314                         if (q->merge_bvec_fn) {
315                                 struct bvec_merge_data bvm = {
316                                         .bi_bdev = bio->bi_bdev,
317                                         .bi_sector = bio->bi_sector,
318                                         .bi_size = bio->bi_size,
319                                         .bi_rw = bio->bi_rw,
320                                 };
321                                 int l = q->merge_bvec_fn(q, &bvm,
322                                                 &bio->bi_io_vec[bio->bi_vcnt]);
323                                 dev_err(DEV, "merge_bvec_fn() = %d\n", l);
324                         }
325
326                         /* dump more of the bio. */
327                         dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs);
328                         dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt);
329                         dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size);
330                         dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments);
331
332                         goto fail2;
333                         break;
334                 }
335                 ds -= min_t(int, ds, PAGE_SIZE);
336         }
337
338         D_ASSERT(data_size == bio->bi_size);
339
340         bio->bi_private = e;
341         e->mdev = mdev;
342         e->sector = sector;
343         e->size = bio->bi_size;
344
345         e->private_bio = bio;
346         e->block_id = id;
347         INIT_HLIST_NODE(&e->colision);
348         e->epoch = NULL;
349         e->flags = 0;
350
351         return e;
352
353  fail2:
354         drbd_pp_free_bio_pages(mdev, bio);
355         bio_put(bio);
356  fail1:
357         mempool_free(e, drbd_ee_mempool);
358
359         return NULL;
360 }
361
362 void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
363 {
364         struct bio *bio = e->private_bio;
365         drbd_pp_free_bio_pages(mdev, bio);
366         bio_put(bio);
367         D_ASSERT(hlist_unhashed(&e->colision));
368         mempool_free(e, drbd_ee_mempool);
369 }
370
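/* Move all epoch entries off @list (under req_lock) and free them.
 * Returns the number of entries freed. */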
371 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
372 {
373         LIST_HEAD(work_list);
374         struct drbd_epoch_entry *e, *t;
375         int count = 0;
376
377         spin_lock_irq(&mdev->req_lock);
378         list_splice_init(list, &work_list);
379         spin_unlock_irq(&mdev->req_lock);
380
381         list_for_each_entry_safe(e, t, &work_list, w.list) {
382                 drbd_free_ee(mdev, e);
383                 count++;
384         }
385         return count;
386 }
387
388
389 /*
390  * This function is called from _asender only_
391  * but see also comments in _req_mod(,barrier_acked)
392  * and receive_Barrier.
393  *
394  * Move entries from net_ee to done_ee, if ready.
395  * Grab done_ee, call all callbacks, free the entries.
396  * The callbacks typically send out ACKs.
397  */
398 static int drbd_process_done_ee(struct drbd_conf *mdev)
399 {
400         LIST_HEAD(work_list);
401         LIST_HEAD(reclaimed);
402         struct drbd_epoch_entry *e, *t;
403         int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
404
405         spin_lock_irq(&mdev->req_lock);
406         reclaim_net_ee(mdev, &reclaimed);
407         list_splice_init(&mdev->done_ee, &work_list);
408         spin_unlock_irq(&mdev->req_lock);
409
410         list_for_each_entry_safe(e, t, &reclaimed, w.list)
411                 drbd_free_ee(mdev, e);
412
413         /* possible callbacks here:
414          * e_end_block, and e_end_resync_block, e_send_discard_ack.
415          * all ignore the last argument.
416          */
417         list_for_each_entry_safe(e, t, &work_list, w.list) {
418                 /* list_del not necessary, next/prev members not touched */
419                 ok = e->w.cb(mdev, &e->w, !ok) && ok;
420                 drbd_free_ee(mdev, e);
421         }
422         wake_up(&mdev->ee_wait);
423
424         return ok;
425 }
426
427 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
428 {
429         DEFINE_WAIT(wait);
430
431         /* avoids spin_lock/unlock
432          * and calling prepare_to_wait in the fast path */
433         while (!list_empty(head)) {
434                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
435                 spin_unlock_irq(&mdev->req_lock);
436                 drbd_kick_lo(mdev);
437                 schedule();
438                 finish_wait(&mdev->ee_wait, &wait);
439                 spin_lock_irq(&mdev->req_lock);
440         }
441 }
442
443 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
444 {
445         spin_lock_irq(&mdev->req_lock);
446         _drbd_wait_ee_list_empty(mdev, head);
447         spin_unlock_irq(&mdev->req_lock);
448 }
449
450 /* see also kernel_accept, which is only present since 2.6.18.
451  * Also, we want to log exactly which part of it failed. */
452 static int drbd_accept(struct drbd_conf *mdev, const char **what,
453                 struct socket *sock, struct socket **newsock)
454 {
455         struct sock *sk = sock->sk;
456         int err = 0;
457
458         *what = "listen";
459         err = sock->ops->listen(sock, 5);
460         if (err < 0)
461                 goto out;
462
463         *what = "sock_create_lite";
464         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
465                                newsock);
466         if (err < 0)
467                 goto out;
468
469         *what = "accept";
470         err = sock->ops->accept(sock, *newsock, 0);
471         if (err < 0) {
472                 sock_release(*newsock);
473                 *newsock = NULL;
474                 goto out;
475         }
476         (*newsock)->ops  = sock->ops;
477
478 out:
479         return err;
480 }
481
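/* Receive up to @size bytes from @sock into @buf.  Without explicit @flags
 * this waits for the full amount (MSG_WAITALL); returns the sock_recvmsg()
 * result. */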
482 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
483                     void *buf, size_t size, int flags)
484 {
485         mm_segment_t oldfs;
486         struct kvec iov = {
487                 .iov_base = buf,
488                 .iov_len = size,
489         };
490         struct msghdr msg = {
491                 .msg_iovlen = 1,
492                 .msg_iov = (struct iovec *)&iov,
493                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
494         };
495         int rv;
496
497         oldfs = get_fs();
498         set_fs(KERNEL_DS);
499         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
500         set_fs(oldfs);
501
502         return rv;
503 }
504
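/* Receive exactly @size bytes from the data socket into @buf.  On error, EOF
 * or a short read the connection is forced to C_BROKEN_PIPE.  Returns the
 * number of bytes received, 0 on EOF, or a negative error code. */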
505 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
506 {
507         mm_segment_t oldfs;
508         struct kvec iov = {
509                 .iov_base = buf,
510                 .iov_len = size,
511         };
512         struct msghdr msg = {
513                 .msg_iovlen = 1,
514                 .msg_iov = (struct iovec *)&iov,
515                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
516         };
517         int rv;
518
519         oldfs = get_fs();
520         set_fs(KERNEL_DS);
521
522         for (;;) {
523                 rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
524                 if (rv == size)
525                         break;
526
527                 /* Note:
528                  * ECONNRESET   other side closed the connection
529                  * ERESTARTSYS  (on  sock) we got a signal
530                  */
531
532                 if (rv < 0) {
533                         if (rv == -ECONNRESET)
534                                 dev_info(DEV, "sock was reset by peer\n");
535                         else if (rv != -ERESTARTSYS)
536                                 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
537                         break;
538                 } else if (rv == 0) {
539                         dev_info(DEV, "sock was shut down by peer\n");
540                         break;
541                 } else  {
542                         /* signal came in, or peer/link went down,
543                          * after we read a partial message
544                          */
545                         /* D_ASSERT(signal_pending(current)); */
546                         break;
547                 }
548         };
549
550         set_fs(oldfs);
551
552         if (rv != size)
553                 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
554
555         return rv;
556 }
557
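/* Actively establish the outgoing TCP connection to the peer, bound to the
 * locally configured address (with an ephemeral port).  Returns the socket or
 * NULL; "peer not (yet) reachable" type errors do not change the connection
 * state. */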
558 static struct socket *drbd_try_connect(struct drbd_conf *mdev)
559 {
560         const char *what;
561         struct socket *sock;
562         struct sockaddr_in6 src_in6;
563         int err;
564         int disconnect_on_error = 1;
565
566         if (!get_net_conf(mdev))
567                 return NULL;
568
569         what = "sock_create_kern";
570         err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
571                 SOCK_STREAM, IPPROTO_TCP, &sock);
572         if (err < 0) {
573                 sock = NULL;
574                 goto out;
575         }
576
577         sock->sk->sk_rcvtimeo =
578         sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
579
580        /* explicitly bind to the configured IP as source IP
581         *  for the outgoing connections.
582         *  This is needed for multihomed hosts and to be
583         *  able to use lo: interfaces for drbd.
584         * Make sure to use 0 as port number, so linux selects
585         *  a free one dynamically.
586         */
587         memcpy(&src_in6, mdev->net_conf->my_addr,
588                min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
589         if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
590                 src_in6.sin6_port = 0;
591         else
592                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
593
594         what = "bind before connect";
595         err = sock->ops->bind(sock,
596                               (struct sockaddr *) &src_in6,
597                               mdev->net_conf->my_addr_len);
598         if (err < 0)
599                 goto out;
600
601         /* connect may fail, peer not yet available.
602          * stay C_WF_CONNECTION, don't go Disconnecting! */
603         disconnect_on_error = 0;
604         what = "connect";
605         err = sock->ops->connect(sock,
606                                  (struct sockaddr *)mdev->net_conf->peer_addr,
607                                  mdev->net_conf->peer_addr_len, 0);
608
609 out:
610         if (err < 0) {
611                 if (sock) {
612                         sock_release(sock);
613                         sock = NULL;
614                 }
615                 switch (-err) {
616                         /* timeout, busy, signal pending */
617                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
618                 case EINTR: case ERESTARTSYS:
619                         /* peer not (yet) available, network problem */
620                 case ECONNREFUSED: case ENETUNREACH:
621                 case EHOSTDOWN:    case EHOSTUNREACH:
622                         disconnect_on_error = 0;
623                         break;
624                 default:
625                         dev_err(DEV, "%s failed, err = %d\n", what, err);
626                 }
627                 if (disconnect_on_error)
628                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
629         }
630         put_net_conf(mdev);
631         return sock;
632 }
633
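/* Passively wait for an incoming connection from the peer on the locally
 * configured address, with a (jittered) try_connect_int timeout.
 * Returns the accepted socket or NULL. */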
634 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
635 {
636         int timeo, err;
637         struct socket *s_estab = NULL, *s_listen;
638         const char *what;
639
640         if (!get_net_conf(mdev))
641                 return NULL;
642
643         what = "sock_create_kern";
644         err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
645                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
646         if (err) {
647                 s_listen = NULL;
648                 goto out;
649         }
650
651         timeo = mdev->net_conf->try_connect_int * HZ;
652         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
653
654         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
655         s_listen->sk->sk_rcvtimeo = timeo;
656         s_listen->sk->sk_sndtimeo = timeo;
657
658         what = "bind before listen";
659         err = s_listen->ops->bind(s_listen,
660                               (struct sockaddr *) mdev->net_conf->my_addr,
661                               mdev->net_conf->my_addr_len);
662         if (err < 0)
663                 goto out;
664
665         err = drbd_accept(mdev, &what, s_listen, &s_estab);
666
667 out:
668         if (s_listen)
669                 sock_release(s_listen);
670         if (err < 0) {
671                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
672                         dev_err(DEV, "%s failed, err = %d\n", what, err);
673                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
674                 }
675         }
676         put_net_conf(mdev);
677
678         return s_estab;
679 }
680
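/* Send the initial packet on a freshly established socket: a bare header
 * carrying P_HAND_SHAKE_S or P_HAND_SHAKE_M, so the peer can tell the data
 * socket apart from the meta-data socket. */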
681 static int drbd_send_fp(struct drbd_conf *mdev,
682         struct socket *sock, enum drbd_packets cmd)
683 {
684         struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
685
686         return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
687 }
688
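/* Receive the peer's initial packet and return its command, or 0xffff if it
 * could not be read completely or the magic did not match. */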
689 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
690 {
691         struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
692         int rr;
693
694         rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
695
696         if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
697                 return be16_to_cpu(h->command);
698
699         return 0xffff;
700 }
701
702 /**
703  * drbd_socket_okay() - Free the socket if its connection is not okay
704  * @mdev:       DRBD device.
705  * @sock:       pointer to the pointer to the socket.
706  */
707 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
708 {
709         int rr;
710         char tb[4];
711
712         if (!*sock)
713                 return FALSE;
714
715         rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
716
717         if (rr > 0 || rr == -EAGAIN) {
718                 return TRUE;
719         } else {
720                 sock_release(*sock);
721                 *sock = NULL;
722                 return FALSE;
723         }
724 }
725
726 /*
727  * return values:
728  *   1 yes, we have a valid connection
729  *   0 oops, did not work out, please try again
730  *  -1 peer talks different language,
731  *     no point in trying again, please go standalone.
732  *  -2 We do not have a network config...
733  */
734 static int drbd_connect(struct drbd_conf *mdev)
735 {
736         struct socket *s, *sock, *msock;
737         int try, h, ok;
738
739         D_ASSERT(!mdev->data.socket);
740
741         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
742                 dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
743
744         if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
745                 return -2;
746
747         clear_bit(DISCARD_CONCURRENT, &mdev->flags);
748
749         sock  = NULL;
750         msock = NULL;
751
752         do {
753                 for (try = 0;;) {
754                         /* 3 tries, this should take less than a second! */
755                         s = drbd_try_connect(mdev);
756                         if (s || ++try >= 3)
757                                 break;
758                         /* give the other side time to call bind() & listen() */
759                         __set_current_state(TASK_INTERRUPTIBLE);
760                         schedule_timeout(HZ / 10);
761                 }
762
763                 if (s) {
764                         if (!sock) {
765                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
766                                 sock = s;
767                                 s = NULL;
768                         } else if (!msock) {
769                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
770                                 msock = s;
771                                 s = NULL;
772                         } else {
773                                 dev_err(DEV, "Logic error in drbd_connect()\n");
774                                 goto out_release_sockets;
775                         }
776                 }
777
778                 if (sock && msock) {
779                         __set_current_state(TASK_INTERRUPTIBLE);
780                         schedule_timeout(HZ / 10);
781                         ok = drbd_socket_okay(mdev, &sock);
782                         ok = drbd_socket_okay(mdev, &msock) && ok;
783                         if (ok)
784                                 break;
785                 }
786
787 retry:
788                 s = drbd_wait_for_connect(mdev);
789                 if (s) {
790                         try = drbd_recv_fp(mdev, s);
791                         drbd_socket_okay(mdev, &sock);
792                         drbd_socket_okay(mdev, &msock);
793                         switch (try) {
794                         case P_HAND_SHAKE_S:
795                                 if (sock) {
796                                         dev_warn(DEV, "initial packet S crossed\n");
797                                         sock_release(sock);
798                                 }
799                                 sock = s;
800                                 break;
801                         case P_HAND_SHAKE_M:
802                                 if (msock) {
803                                         dev_warn(DEV, "initial packet M crossed\n");
804                                         sock_release(msock);
805                                 }
806                                 msock = s;
807                                 set_bit(DISCARD_CONCURRENT, &mdev->flags);
808                                 break;
809                         default:
810                                 dev_warn(DEV, "Error receiving initial packet\n");
811                                 sock_release(s);
812                                 if (random32() & 1)
813                                         goto retry;
814                         }
815                 }
816
817                 if (mdev->state.conn <= C_DISCONNECTING)
818                         goto out_release_sockets;
819                 if (signal_pending(current)) {
820                         flush_signals(current);
821                         smp_rmb();
822                         if (get_t_state(&mdev->receiver) == Exiting)
823                                 goto out_release_sockets;
824                 }
825
826                 if (sock && msock) {
827                         ok = drbd_socket_okay(mdev, &sock);
828                         ok = drbd_socket_okay(mdev, &msock) && ok;
829                         if (ok)
830                                 break;
831                 }
832         } while (1);
833
834         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
835         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
836
837         sock->sk->sk_allocation = GFP_NOIO;
838         msock->sk->sk_allocation = GFP_NOIO;
839
840         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
841         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
842
843         if (mdev->net_conf->sndbuf_size) {
844                 sock->sk->sk_sndbuf = mdev->net_conf->sndbuf_size;
845                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
846         }
847
848         if (mdev->net_conf->rcvbuf_size) {
849                 sock->sk->sk_rcvbuf = mdev->net_conf->rcvbuf_size;
850                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
851         }
852
853         /* NOT YET ...
854          * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
855          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
856          * first set it to the P_HAND_SHAKE timeout,
857          * which we set to 4x the configured ping_timeout. */
858         sock->sk->sk_sndtimeo =
859         sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
860
861         msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
862         msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
863
864         /* we don't want delays.
865          * we use TCP_CORK where appropriate, though */
866         drbd_tcp_nodelay(sock);
867         drbd_tcp_nodelay(msock);
868
869         mdev->data.socket = sock;
870         mdev->meta.socket = msock;
871         mdev->last_received = jiffies;
872
873         D_ASSERT(mdev->asender.task == NULL);
874
875         h = drbd_do_handshake(mdev);
876         if (h <= 0)
877                 return h;
878
879         if (mdev->cram_hmac_tfm) {
880                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
881                 switch (drbd_do_auth(mdev)) {
882                 case -1:
883                         dev_err(DEV, "Authentication of peer failed\n");
884                         return -1;
885                 case 0:
886                         dev_err(DEV, "Authentication of peer failed, trying again.\n");
887                         return 0;
888                 }
889         }
890
891         if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
892                 return 0;
893
894         sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
895         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
896
897         atomic_set(&mdev->packet_seq, 0);
898         mdev->peer_seq = 0;
899
900         drbd_thread_start(&mdev->asender);
901
902         if (!drbd_send_protocol(mdev))
903                 return -1;
904         drbd_send_sync_param(mdev, &mdev->sync_conf);
905         drbd_send_sizes(mdev, 0);
906         drbd_send_uuids(mdev);
907         drbd_send_state(mdev);
908         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
909         clear_bit(RESIZE_PENDING, &mdev->flags);
910
911         return 1;
912
913 out_release_sockets:
914         if (sock)
915                 sock_release(sock);
916         if (msock)
917                 sock_release(msock);
918         return -1;
919 }
920
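/* Receive a packet header from the data socket, convert command and length to
 * host byte order and verify the magic.  Returns TRUE on success. */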
921 static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
922 {
923         int r;
924
925         r = drbd_recv(mdev, h, sizeof(*h));
926
927         if (unlikely(r != sizeof(*h))) {
928                 dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
929                 return FALSE;
930         };
931         h->command = be16_to_cpu(h->command);
932         h->length  = be16_to_cpu(h->length);
933         if (unlikely(h->magic != BE_DRBD_MAGIC)) {
934                 dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
935                     (long)be32_to_cpu(h->magic),
936                     h->command, h->length);
937                 return FALSE;
938         }
939         mdev->last_received = jiffies;
940
941         return TRUE;
942 }
943
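/* If the current write ordering method requires it, flush the backing device;
 * should the flush fail, fall back to WO_drain_io.  Finally apply
 * EV_BARRIER_DONE to @epoch. */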
944 static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
945 {
946         int rv;
947
948         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
949                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
950                                         NULL, BLKDEV_IFL_WAIT);
951                 if (rv) {
952                         dev_err(DEV, "local disk flush failed with status %d\n", rv);
953                         /* would rather check on EOPNOTSUPP, but that is not reliable.
954                          * don't try again for ANY return value != 0
955                          * if (rv == -EOPNOTSUPP) */
956                         drbd_bump_write_ordering(mdev, WO_drain_io);
957                 }
958                 put_ldev(mdev);
959         }
960
961         return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
962 }
963
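/* Worker callback: issue the flush for @epoch unless someone else already did,
 * then drop the active reference that was taken when this work was queued. */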
964 static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
965 {
966         struct flush_work *fw = (struct flush_work *)w;
967         struct drbd_epoch *epoch = fw->epoch;
968
969         kfree(w);
970
971         if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
972                 drbd_flush_after_epoch(mdev, epoch);
973
974         drbd_may_finish_epoch(mdev, epoch, EV_PUT |
975                               (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
976
977         return 1;
978 }
979
980 /**
981  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
982  * @mdev:       DRBD device.
983  * @epoch:      Epoch object.
984  * @ev:         Epoch event.
985  */
986 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
987                                                struct drbd_epoch *epoch,
988                                                enum epoch_event ev)
989 {
990         int finish, epoch_size;
991         struct drbd_epoch *next_epoch;
992         int schedule_flush = 0;
993         enum finish_epoch rv = FE_STILL_LIVE;
994
995         spin_lock(&mdev->epoch_lock);
996         do {
997                 next_epoch = NULL;
998                 finish = 0;
999
1000                 epoch_size = atomic_read(&epoch->epoch_size);
1001
1002                 switch (ev & ~EV_CLEANUP) {
1003                 case EV_PUT:
1004                         atomic_dec(&epoch->active);
1005                         break;
1006                 case EV_GOT_BARRIER_NR:
1007                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1008
1009                         /* Special case: If we just switched from WO_bio_barrier to
1010                            WO_bdev_flush we should not finish the current epoch */
1011                         if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
1012                             mdev->write_ordering != WO_bio_barrier &&
1013                             epoch == mdev->current_epoch)
1014                                 clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
1015                         break;
1016                 case EV_BARRIER_DONE:
1017                         set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
1018                         break;
1019                 case EV_BECAME_LAST:
1020                         /* nothing to do */
1021                         break;
1022                 }
1023
1024                 if (epoch_size != 0 &&
1025                     atomic_read(&epoch->active) == 0 &&
1026                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
1027                     epoch->list.prev == &mdev->current_epoch->list &&
1028                     !test_bit(DE_IS_FINISHING, &epoch->flags)) {
1029                         /* Nearly all conditions are met to finish that epoch... */
1030                         if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
1031                             mdev->write_ordering == WO_none ||
1032                             (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
1033                             ev & EV_CLEANUP) {
1034                                 finish = 1;
1035                                 set_bit(DE_IS_FINISHING, &epoch->flags);
1036                         } else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
1037                                  mdev->write_ordering == WO_bio_barrier) {
1038                                 atomic_inc(&epoch->active);
1039                                 schedule_flush = 1;
1040                         }
1041                 }
1042                 if (finish) {
1043                         if (!(ev & EV_CLEANUP)) {
1044                                 spin_unlock(&mdev->epoch_lock);
1045                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1046                                 spin_lock(&mdev->epoch_lock);
1047                         }
1048                         dec_unacked(mdev);
1049
1050                         if (mdev->current_epoch != epoch) {
1051                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1052                                 list_del(&epoch->list);
1053                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1054                                 mdev->epochs--;
1055                                 kfree(epoch);
1056
1057                                 if (rv == FE_STILL_LIVE)
1058                                         rv = FE_DESTROYED;
1059                         } else {
1060                                 epoch->flags = 0;
1061                                 atomic_set(&epoch->epoch_size, 0);
1062                                 /* atomic_set(&epoch->active, 0); is already zero */
1063                                 if (rv == FE_STILL_LIVE)
1064                                         rv = FE_RECYCLED;
1065                         }
1066                 }
1067
1068                 if (!next_epoch)
1069                         break;
1070
1071                 epoch = next_epoch;
1072         } while (1);
1073
1074         spin_unlock(&mdev->epoch_lock);
1075
1076         if (schedule_flush) {
1077                 struct flush_work *fw;
1078                 fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
1079                 if (fw) {
1080                         fw->w.cb = w_flush;
1081                         fw->epoch = epoch;
1082                         drbd_queue_work(&mdev->data.work, &fw->w);
1083                 } else {
1084                         dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
1085                         set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1086                         /* That is not a recursion, only one level */
1087                         drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1088                         drbd_may_finish_epoch(mdev, epoch, EV_PUT);
1089                 }
1090         }
1091
1092         return rv;
1093 }
1094
1095 /**
1096  * drbd_bump_write_ordering() - Fall back to another write ordering method
1097  * @mdev:       DRBD device.
1098  * @wo:         Write ordering method to try.
1099  */
1100 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1101 {
1102         enum write_ordering_e pwo;
1103         static char *write_ordering_str[] = {
1104                 [WO_none] = "none",
1105                 [WO_drain_io] = "drain",
1106                 [WO_bdev_flush] = "flush",
1107                 [WO_bio_barrier] = "barrier",
1108         };
1109
1110         pwo = mdev->write_ordering;
1111         wo = min(pwo, wo);
1112         if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1113                 wo = WO_bdev_flush;
1114         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1115                 wo = WO_drain_io;
1116         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1117                 wo = WO_none;
1118         mdev->write_ordering = wo;
1119         if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1120                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1121 }
1122
1123 /**
1124  * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
1125  * @mdev:       DRBD device.
1126  * @w:          work object.
1127  * @cancel:     The connection will be closed anyway (unused in this callback)
1128  */
1129 int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1130 {
1131         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1132         struct bio *bio = e->private_bio;
1133
1134         /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1135            (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1136            so that we can finish that epoch in drbd_may_finish_epoch().
1137            That is necessary if we already have a long chain of Epochs, before
1138            we realize that BIO_RW_BARRIER is actually not supported */
1139
1140         /* As long as the -ENOTSUPP on the barrier is reported immediately
1141            that will never trigger. If it is reported late, we will just
1142            print that warning and continue correctly for all future requests
1143            with WO_bdev_flush */
1144         if (previous_epoch(mdev, e->epoch))
1145                 dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1146
1147         /* prepare bio for re-submit,
1148          * re-init volatile members */
1149         /* we still have a local reference,
1150          * get_ldev was done in receive_Data. */
1151         bio->bi_bdev = mdev->ldev->backing_bdev;
1152         bio->bi_sector = e->sector;
1153         bio->bi_size = e->size;
1154         bio->bi_idx = 0;
1155
1156         bio->bi_flags &= ~(BIO_POOL_MASK - 1);
1157         bio->bi_flags |= 1 << BIO_UPTODATE;
1158
1159         /* don't know whether this is necessary: */
1160         bio->bi_phys_segments = 0;
1161         bio->bi_next = NULL;
1162
1163         /* these should be unchanged: */
1164         /* bio->bi_end_io = drbd_endio_write_sec; */
1165         /* bio->bi_vcnt = whatever; */
1166
1167         e->w.cb = e_end_block;
1168
1169         /* This is no longer a barrier request. */
1170         bio->bi_rw &= ~(1UL << BIO_RW_BARRIER);
1171
1172         drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio);
1173
1174         return 1;
1175 }
1176
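/* Handle a P_BARRIER packet: record the barrier number in the current epoch,
 * and depending on the write ordering method wait for and flush outstanding
 * writes, then install a fresh epoch object for the writes that follow. */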
1177 static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
1178 {
1179         int rv, issue_flush;
1180         struct p_barrier *p = (struct p_barrier *)h;
1181         struct drbd_epoch *epoch;
1182
1183         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
1184
1185         rv = drbd_recv(mdev, h->payload, h->length);
1186         ERR_IF(rv != h->length) return FALSE;
1187
1188         inc_unacked(mdev);
1189
1190         if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1191                 drbd_kick_lo(mdev);
1192
1193         mdev->current_epoch->barrier_nr = p->barrier;
1194         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1195
1196         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1197          * the activity log, which means it would not be resynced in case the
1198          * R_PRIMARY crashes now.
1199          * Therefore we must send the barrier_ack after the barrier request was
1200          * completed. */
1201         switch (mdev->write_ordering) {
1202         case WO_bio_barrier:
1203         case WO_none:
1204                 if (rv == FE_RECYCLED)
1205                         return TRUE;
1206                 break;
1207
1208         case WO_bdev_flush:
1209         case WO_drain_io:
1210                 if (rv == FE_STILL_LIVE) {
1211                         set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1212                         drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1213                         rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1214                 }
1215                 if (rv == FE_RECYCLED)
1216                         return TRUE;
1217
1218                 /* The asender will send all the ACKs and barrier ACKs out, since
1219                    all EEs moved from the active_ee to the done_ee. We need to
1220                    provide a new epoch object for the EEs that come in soon */
1221                 break;
1222         }
1223
1224         /* receiver context, in the writeout path of the other node.
1225          * avoid potential distributed deadlock */
1226         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1227         if (!epoch) {
1228                 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1229                 issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1230                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1231                 if (issue_flush) {
1232                         rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1233                         if (rv == FE_RECYCLED)
1234                                 return TRUE;
1235                 }
1236
1237                 drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1238
1239                 return TRUE;
1240         }
1241
1242         epoch->flags = 0;
1243         atomic_set(&epoch->epoch_size, 0);
1244         atomic_set(&epoch->active, 0);
1245
1246         spin_lock(&mdev->epoch_lock);
1247         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1248                 list_add(&epoch->list, &mdev->current_epoch->list);
1249                 mdev->current_epoch = epoch;
1250                 mdev->epochs++;
1251         } else {
1252                 /* The current_epoch got recycled while we allocated this one... */
1253                 kfree(epoch);
1254         }
1255         spin_unlock(&mdev->epoch_lock);
1256
1257         return TRUE;
1258 }
1259
1260 /* used from receive_RSDataReply (recv_resync_read)
1261  * and from receive_Data */
1262 static struct drbd_epoch_entry *
1263 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1264 {
1265         struct drbd_epoch_entry *e;
1266         struct bio_vec *bvec;
1267         struct page *page;
1268         struct bio *bio;
1269         int dgs, ds, i, rr;
1270         void *dig_in = mdev->int_dig_in;
1271         void *dig_vv = mdev->int_dig_vv;
1272
1273         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1274                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1275
1276         if (dgs) {
1277                 rr = drbd_recv(mdev, dig_in, dgs);
1278                 if (rr != dgs) {
1279                         dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1280                              rr, dgs);
1281                         return NULL;
1282                 }
1283         }
1284
1285         data_size -= dgs;
1286
1287         ERR_IF(data_size &  0x1ff) return NULL;
1288         ERR_IF(data_size >  DRBD_MAX_SEGMENT_SIZE) return NULL;
1289
1290         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1291          * "criss-cross" setup, that might cause write-out on some other DRBD,
1292          * which in turn might block on the other node at this very place.  */
1293         e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1294         if (!e)
1295                 return NULL;
1296         bio = e->private_bio;
1297         ds = data_size;
1298         bio_for_each_segment(bvec, bio, i) {
1299                 page = bvec->bv_page;
1300                 rr = drbd_recv(mdev, kmap(page), min_t(int, ds, PAGE_SIZE));
1301                 kunmap(page);
1302                 if (rr != min_t(int, ds, PAGE_SIZE)) {
1303                         drbd_free_ee(mdev, e);
1304                         dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1305                              rr, min_t(int, ds, PAGE_SIZE));
1306                         return NULL;
1307                 }
1308                 ds -= rr;
1309         }
1310
1311         if (dgs) {
1312                 drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1313                 if (memcmp(dig_in, dig_vv, dgs)) {
1314                         dev_err(DEV, "Digest integrity check FAILED.\n");
1315                         drbd_bcast_ee(mdev, "digest failed",
1316                                         dgs, dig_in, dig_vv, e);
1317                         drbd_free_ee(mdev, e);
1318                         return NULL;
1319                 }
1320         }
1321         mdev->recv_cnt += data_size>>9;
1322         return e;
1323 }
1324
1325 /* drbd_drain_block() just takes a data block
1326  * out of the socket input buffer, and discards it.
1327  */
1328 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1329 {
1330         struct page *page;
1331         int rr, rv = 1;
1332         void *data;
1333
1334         page = drbd_pp_alloc(mdev, 1);
1335
1336         data = kmap(page);
1337         while (data_size) {
1338                 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1339                 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1340                         rv = 0;
1341                         dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1342                              rr, min_t(int, data_size, PAGE_SIZE));
1343                         break;
1344                 }
1345                 data_size -= rr;
1346         }
1347         kunmap(page);
1348         drbd_pp_free(mdev, page);
1349         return rv;
1350 }
1351
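/* "Diskless" read: receive the data of a P_DATA_REPLY packet directly into
 * the pages of the original request's master bio, verifying the optional
 * data digest. */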
1352 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1353                            sector_t sector, int data_size)
1354 {
1355         struct bio_vec *bvec;
1356         struct bio *bio;
1357         int dgs, rr, i, expect;
1358         void *dig_in = mdev->int_dig_in;
1359         void *dig_vv = mdev->int_dig_vv;
1360
1361         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1362                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1363
1364         if (dgs) {
1365                 rr = drbd_recv(mdev, dig_in, dgs);
1366                 if (rr != dgs) {
1367                         dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1368                              rr, dgs);
1369                         return 0;
1370                 }
1371         }
1372
1373         data_size -= dgs;
1374
1375         /* optimistically update recv_cnt.  if receiving fails below,
1376          * we disconnect anyway, and counters will be reset. */
1377         mdev->recv_cnt += data_size>>9;
1378
1379         bio = req->master_bio;
1380         D_ASSERT(sector == bio->bi_sector);
1381
1382         bio_for_each_segment(bvec, bio, i) {
1383                 expect = min_t(int, data_size, bvec->bv_len);
1384                 rr = drbd_recv(mdev,
1385                              kmap(bvec->bv_page)+bvec->bv_offset,
1386                              expect);
1387                 kunmap(bvec->bv_page);
1388                 if (rr != expect) {
1389                         dev_warn(DEV, "short read receiving data reply: "
1390                              "read %d expected %d\n",
1391                              rr, expect);
1392                         return 0;
1393                 }
1394                 data_size -= rr;
1395         }
1396
1397         if (dgs) {
1398                 drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1399                 if (memcmp(dig_in, dig_vv, dgs)) {
1400                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1401                         return 0;
1402                 }
1403         }
1404
1405         D_ASSERT(data_size == 0);
1406         return 1;
1407 }
1408
1409 /* e_end_resync_block() is called via
1410  * drbd_process_done_ee() by asender only */
1411 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1412 {
1413         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1414         sector_t sector = e->sector;
1415         int ok;
1416
1417         D_ASSERT(hlist_unhashed(&e->colision));
1418
1419         if (likely(drbd_bio_uptodate(e->private_bio))) {
1420                 drbd_set_in_sync(mdev, sector, e->size);
1421                 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1422         } else {
1423                 /* Record failure to sync */
1424                 drbd_rs_failed_io(mdev, sector, e->size);
1425
1426                 ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1427         }
1428         dec_unacked(mdev);
1429
1430         return ok;
1431 }
1432
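/* Read a resync data block into a freshly allocated epoch entry and submit it
 * to the local disk; the ACK is sent later from e_end_resync_block(). */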
1433 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1434 {
1435         struct drbd_epoch_entry *e;
1436
1437         e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1438         if (!e) {
1439                 put_ldev(mdev);
1440                 return FALSE;
1441         }
1442
1443         dec_rs_pending(mdev);
1444
1445         e->private_bio->bi_end_io = drbd_endio_write_sec;
1446         e->private_bio->bi_rw = WRITE;
1447         e->w.cb = e_end_resync_block;
1448
1449         inc_unacked(mdev);
1450         /* corresponding dec_unacked() in e_end_resync_block()
1451          * respective _drbd_clear_done_ee */
1452
1453         spin_lock_irq(&mdev->req_lock);
1454         list_add(&e->w.list, &mdev->sync_ee);
1455         spin_unlock_irq(&mdev->req_lock);
1456
1457         drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio);
1458         /* accounting done in endio */
1459
1460         maybe_kick_lo(mdev);
1461         return TRUE;
1462 }
1463
1464 static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1465 {
1466         struct drbd_request *req;
1467         sector_t sector;
1468         unsigned int header_size, data_size;
1469         int ok;
1470         struct p_data *p = (struct p_data *)h;
1471
1472         header_size = sizeof(*p) - sizeof(*h);
1473         data_size   = h->length  - header_size;
1474
1475         ERR_IF(data_size == 0) return FALSE;
1476
1477         if (drbd_recv(mdev, h->payload, header_size) != header_size)
1478                 return FALSE;
1479
1480         sector = be64_to_cpu(p->sector);
1481
1482         spin_lock_irq(&mdev->req_lock);
1483         req = _ar_id_to_req(mdev, p->block_id, sector);
1484         spin_unlock_irq(&mdev->req_lock);
1485         if (unlikely(!req)) {
1486                 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1487                 return FALSE;
1488         }
1489
1490         /* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1491          * special casing it there for the various failure cases.
1492          * still no race with drbd_fail_pending_reads */
1493         ok = recv_dless_read(mdev, req, sector, data_size);
1494
1495         if (ok)
1496                 req_mod(req, data_received);
1497         /* else: nothing. handled from drbd_disconnect...
1498          * I don't think we may complete this just yet
1499          * in case we are "on-disconnect: freeze" */
1500
1501         return ok;
1502 }
1503
1504 static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1505 {
1506         sector_t sector;
1507         unsigned int header_size, data_size;
1508         int ok;
1509         struct p_data *p = (struct p_data *)h;
1510
1511         header_size = sizeof(*p) - sizeof(*h);
1512         data_size   = h->length  - header_size;
1513
1514         ERR_IF(data_size == 0) return FALSE;
1515
1516         if (drbd_recv(mdev, h->payload, header_size) != header_size)
1517                 return FALSE;
1518
1519         sector = be64_to_cpu(p->sector);
1520         D_ASSERT(p->block_id == ID_SYNCER);
1521
1522         if (get_ldev(mdev)) {
1523                 /* data is submitted to disk within recv_resync_read.
1524                  * corresponding put_ldev done below on error,
1525                  * or in drbd_endio_write_sec. */
1526                 ok = recv_resync_read(mdev, sector, data_size);
1527         } else {
1528                 if (__ratelimit(&drbd_ratelimit_state))
1529                         dev_err(DEV, "Can not write resync data to local disk.\n");
1530
1531                 ok = drbd_drain_block(mdev, data_size);
1532
1533                 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1534         }
1535
1536         return ok;
1537 }
1538
1539 /* e_end_block() is called via drbd_process_done_ee().
1540  * this means this function only runs in the asender thread
1541  */
1542 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1543 {
1544         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1545         sector_t sector = e->sector;
1546         struct drbd_epoch *epoch;
1547         int ok = 1, pcmd;
1548
1549         if (e->flags & EE_IS_BARRIER) {
1550                 epoch = previous_epoch(mdev, e->epoch);
1551                 if (epoch)
1552                         drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1553         }
1554
1555         if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1556                 if (likely(drbd_bio_uptodate(e->private_bio))) {
1557                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1558                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1559                                 e->flags & EE_MAY_SET_IN_SYNC) ?
1560                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1561                         ok &= drbd_send_ack(mdev, pcmd, e);
1562                         if (pcmd == P_RS_WRITE_ACK)
1563                                 drbd_set_in_sync(mdev, sector, e->size);
1564                 } else {
1565                         ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1566                         /* we expect it to be marked out of sync anyways...
1567                          * maybe assert this?  */
1568                 }
1569                 dec_unacked(mdev);
1570         }
1571         /* we delete from the conflict detection hash _after_ we sent out the
1572          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1573         if (mdev->net_conf->two_primaries) {
1574                 spin_lock_irq(&mdev->req_lock);
1575                 D_ASSERT(!hlist_unhashed(&e->colision));
1576                 hlist_del_init(&e->colision);
1577                 spin_unlock_irq(&mdev->req_lock);
1578         } else {
1579                 D_ASSERT(hlist_unhashed(&e->colision));
1580         }
1581
1582         drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1583
1584         return ok;
1585 }
1586
1587 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1588 {
1589         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1590         int ok = 1;
1591
1592         D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1593         ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1594
1595         spin_lock_irq(&mdev->req_lock);
1596         D_ASSERT(!hlist_unhashed(&e->colision));
1597         hlist_del_init(&e->colision);
1598         spin_unlock_irq(&mdev->req_lock);
1599
1600         dec_unacked(mdev);
1601
1602         return ok;
1603 }
1604
1605 /* Called from receive_Data.
1606  * Synchronize packets on sock with packets on msock.
1607  *
1608  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1609  * packet traveling on msock, they are still processed in the order they have
1610  * been sent.
1611  *
1612  * Note: we don't care for Ack packets overtaking P_DATA packets.
1613  *
1614  * In case packet_seq is larger than mdev->peer_seq, there are
1615  * outstanding packets on the msock; we wait for them to arrive.
1616  * In case this is the logically next packet, we update mdev->peer_seq
1617  * ourselves. Correctly handles 32bit wrap around.
1618  *
1619  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1620  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1621  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1622  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1623  *
1624  * returns 0 if we may process the packet,
1625  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1626 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1627 {
1628         DEFINE_WAIT(wait);
1629         unsigned int p_seq;
1630         long timeout;
1631         int ret = 0;
1632         spin_lock(&mdev->peer_seq_lock);
1633         for (;;) {
1634                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1635                 if (seq_le(packet_seq, mdev->peer_seq+1))
1636                         break;
1637                 if (signal_pending(current)) {
1638                         ret = -ERESTARTSYS;
1639                         break;
1640                 }
1641                 p_seq = mdev->peer_seq;
1642                 spin_unlock(&mdev->peer_seq_lock);
1643                 timeout = schedule_timeout(30*HZ);
1644                 spin_lock(&mdev->peer_seq_lock);
1645                 if (timeout == 0 && p_seq == mdev->peer_seq) {
1646                         ret = -ETIMEDOUT;
1647                         dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1648                         break;
1649                 }
1650         }
1651         finish_wait(&mdev->seq_wait, &wait);
1652         if (mdev->peer_seq+1 == packet_seq)
1653                 mdev->peer_seq++;
1654         spin_unlock(&mdev->peer_seq_lock);
1655         return ret;
1656 }
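
/* The wrap handling above relies on seq_le() from drbd_int.h.  A wrap-safe
 * "a <= b" on u32 sequence numbers is commonly done via the signed
 * difference; a minimal sketch of that convention (illustrative only, the
 * real helper may differ in detail):
 *
 *	static inline int seq_le_sketch(u32 a, u32 b)
 *	{
 *		return (s32)(a - b) <= 0;	/* true while b is at most 2^31 ahead */
 *	}
 *
 * e.g. seq_le_sketch(0xfffffffeU, 1U) is true across the 32bit wrap. */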
1657
1658 /* mirrored write */
1659 static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1660 {
1661         sector_t sector;
1662         struct drbd_epoch_entry *e;
1663         struct p_data *p = (struct p_data *)h;
1664         int header_size, data_size;
1665         int rw = WRITE;
1666         u32 dp_flags;
1667
1668         header_size = sizeof(*p) - sizeof(*h);
1669         data_size   = h->length  - header_size;
1670
1671         ERR_IF(data_size == 0) return FALSE;
1672
1673         if (drbd_recv(mdev, h->payload, header_size) != header_size)
1674                 return FALSE;
1675
1676         if (!get_ldev(mdev)) {
1677                 if (__ratelimit(&drbd_ratelimit_state))
1678                         dev_err(DEV, "Can not write mirrored data block "
1679                             "to local disk.\n");
1680                 spin_lock(&mdev->peer_seq_lock);
1681                 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1682                         mdev->peer_seq++;
1683                 spin_unlock(&mdev->peer_seq_lock);
1684
1685                 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1686                 atomic_inc(&mdev->current_epoch->epoch_size);
1687                 return drbd_drain_block(mdev, data_size);
1688         }
1689
1690         /* get_ldev(mdev) successful.
1691          * Corresponding put_ldev done either below (on various errors),
1692          * or in drbd_endio_write_sec, if we successfully submit the data at
1693          * the end of this function. */
1694
1695         sector = be64_to_cpu(p->sector);
1696         e = read_in_block(mdev, p->block_id, sector, data_size);
1697         if (!e) {
1698                 put_ldev(mdev);
1699                 return FALSE;
1700         }
1701
1702         e->private_bio->bi_end_io = drbd_endio_write_sec;
1703         e->w.cb = e_end_block;
1704
1705         spin_lock(&mdev->epoch_lock);
1706         e->epoch = mdev->current_epoch;
1707         atomic_inc(&e->epoch->epoch_size);
1708         atomic_inc(&e->epoch->active);
1709
1710         if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1711                 struct drbd_epoch *epoch;
1712                 /* Issue a barrier if we start a new epoch, and the previous epoch
1713                    was not an epoch containing a single request which already was
1714                    a Barrier. */
1715                 epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1716                 if (epoch == e->epoch) {
1717                         set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1718                         rw |= (1<<BIO_RW_BARRIER);
1719                         e->flags |= EE_IS_BARRIER;
1720                 } else {
1721                         if (atomic_read(&epoch->epoch_size) > 1 ||
1722                             !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1723                                 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1724                                 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1725                                 rw |= (1<<BIO_RW_BARRIER);
1726                                 e->flags |= EE_IS_BARRIER;
1727                         }
1728                 }
1729         }
1730         spin_unlock(&mdev->epoch_lock);
1731
1732         dp_flags = be32_to_cpu(p->dp_flags);
1733         if (dp_flags & DP_HARDBARRIER) {
1734                 dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1735                 /* rw |= (1<<BIO_RW_BARRIER); */
1736         }
1737         if (dp_flags & DP_RW_SYNC)
1738                 rw |= (1<<BIO_RW_SYNCIO) | (1<<BIO_RW_UNPLUG);
1739         if (dp_flags & DP_MAY_SET_IN_SYNC)
1740                 e->flags |= EE_MAY_SET_IN_SYNC;
1741
1742         /* I'm the receiver, I do hold a net_cnt reference. */
1743         if (!mdev->net_conf->two_primaries) {
1744                 spin_lock_irq(&mdev->req_lock);
1745         } else {
1746                 /* don't get the req_lock yet,
1747                  * we may sleep in drbd_wait_peer_seq */
1748                 const int size = e->size;
1749                 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1750                 DEFINE_WAIT(wait);
1751                 struct drbd_request *i;
1752                 struct hlist_node *n;
1753                 struct hlist_head *slot;
1754                 int first;
1755
1756                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1757                 BUG_ON(mdev->ee_hash == NULL);
1758                 BUG_ON(mdev->tl_hash == NULL);
1759
1760                 /* conflict detection and handling:
1761                  * 1. wait on the sequence number,
1762                  *    in case this data packet overtook ACK packets.
1763                  * 2. check our hash tables for conflicting requests.
1764                  *    we only need to walk the tl_hash, since an ee can not
1765                  *    have a conflict with another ee: on the submitting
1766                  *    node, the corresponding req had already been conflicting,
1767                  *    and a conflicting req is never sent.
1768                  *
1769                  * Note: for two_primaries, we are protocol C,
1770                  * so there cannot be any request that is DONE
1771                  * but still on the transfer log.
1772                  *
1773                  * unconditionally add to the ee_hash.
1774                  *
1775                  * if no conflicting request is found:
1776                  *    submit.
1777                  *
1778                  * if any conflicting request is found
1779                  * that has not yet been acked,
1780                  * AND I have the "discard concurrent writes" flag:
1781                  *       queue (via done_ee) the P_DISCARD_ACK; OUT.
1782                  *
1783                  * if any conflicting request is found:
1784                  *       block the receiver, waiting on misc_wait
1785                  *       until no more conflicting requests are there,
1786                  *       or we get interrupted (disconnect).
1787                  *
1788                  *       we do not just write after local io completion of those
1789                  *       requests, but only after req is done completely, i.e.
1790                  *       we wait for the P_DISCARD_ACK to arrive!
1791                  *
1792                  *       then proceed normally, i.e. submit.
1793                  */
1794                 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1795                         goto out_interrupted;
1796
1797                 spin_lock_irq(&mdev->req_lock);
1798
1799                 hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1800
1801 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1802                 slot = tl_hash_slot(mdev, sector);
1803                 first = 1;
1804                 for (;;) {
1805                         int have_unacked = 0;
1806                         int have_conflict = 0;
1807                         prepare_to_wait(&mdev->misc_wait, &wait,
1808                                 TASK_INTERRUPTIBLE);
1809                         hlist_for_each_entry(i, n, slot, colision) {
1810                                 if (OVERLAPS) {
1811                                         /* only ALERT on first iteration,
1812                                          * we may be woken up early... */
1813                                         if (first)
1814                                                 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1815                                                       " new: %llus +%u; pending: %llus +%u\n",
1816                                                       current->comm, current->pid,
1817                                                       (unsigned long long)sector, size,
1818                                                       (unsigned long long)i->sector, i->size);
1819                                         if (i->rq_state & RQ_NET_PENDING)
1820                                                 ++have_unacked;
1821                                         ++have_conflict;
1822                                 }
1823                         }
1824 #undef OVERLAPS
1825                         if (!have_conflict)
1826                                 break;
1827
1828                         /* Discard Ack only for the _first_ iteration */
1829                         if (first && discard && have_unacked) {
1830                                 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1831                                      (unsigned long long)sector);
1832                                 inc_unacked(mdev);
1833                                 e->w.cb = e_send_discard_ack;
1834                                 list_add_tail(&e->w.list, &mdev->done_ee);
1835
1836                                 spin_unlock_irq(&mdev->req_lock);
1837
1838                                 /* we could probably send that P_DISCARD_ACK ourselves,
1839                                  * but I don't like the receiver using the msock */
1840
1841                                 put_ldev(mdev);
1842                                 wake_asender(mdev);
1843                                 finish_wait(&mdev->misc_wait, &wait);
1844                                 return TRUE;
1845                         }
1846
1847                         if (signal_pending(current)) {
1848                                 hlist_del_init(&e->colision);
1849
1850                                 spin_unlock_irq(&mdev->req_lock);
1851
1852                                 finish_wait(&mdev->misc_wait, &wait);
1853                                 goto out_interrupted;
1854                         }
1855
1856                         spin_unlock_irq(&mdev->req_lock);
1857                         if (first) {
1858                                 first = 0;
1859                                 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1860                                      "sec=%llus\n", (unsigned long long)sector);
1861                         } else if (discard) {
1862                                 /* we had none on the first iteration.
1863                                  * there must be none now. */
1864                                 D_ASSERT(have_unacked == 0);
1865                         }
1866                         schedule();
1867                         spin_lock_irq(&mdev->req_lock);
1868                 }
1869                 finish_wait(&mdev->misc_wait, &wait);
1870         }
1871
1872         list_add(&e->w.list, &mdev->active_ee);
1873         spin_unlock_irq(&mdev->req_lock);
1874
1875         switch (mdev->net_conf->wire_protocol) {
1876         case DRBD_PROT_C:
1877                 inc_unacked(mdev);
1878                 /* corresponding dec_unacked() in e_end_block(),
1879                  * or in _drbd_clear_done_ee */
1880                 break;
1881         case DRBD_PROT_B:
1882                 /* I really don't like it that the receiver thread
1883                  * sends on the msock, but anyways */
1884                 drbd_send_ack(mdev, P_RECV_ACK, e);
1885                 break;
1886         case DRBD_PROT_A:
1887                 /* nothing to do */
1888                 break;
1889         }
1890
1891         if (mdev->state.pdsk == D_DISKLESS) {
1892                 /* In case we have the only disk of the cluster, mark this block out of sync */
1893                 drbd_set_out_of_sync(mdev, e->sector, e->size);
1894                 e->flags |= EE_CALL_AL_COMPLETE_IO;
1895                 drbd_al_begin_io(mdev, e->sector);
1896         }
1897
1898         e->private_bio->bi_rw = rw;
1899         drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio);
1900         /* accounting done in endio */
1901
1902         maybe_kick_lo(mdev);
1903         return TRUE;
1904
1905 out_interrupted:
1906         /* yes, the epoch_size now is imbalanced.
1907          * but we drop the connection anyways, so we don't have a chance to
1908          * receive a barrier... atomic_inc(&mdev->epoch_size); */
1909         put_ldev(mdev);
1910         drbd_free_ee(mdev, e);
1911         return FALSE;
1912 }
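
/* The OVERLAPS test in the conflict detection loop above is plain interval
 * intersection on sector ranges.  An illustrative sketch, assuming sizes in
 * bytes and 512 byte sectors (the real overlaps() helper lives in
 * drbd_int.h and may differ in detail):
 *
 *	static inline int sectors_overlap_sketch(sector_t s1, int l1,
 *						 sector_t s2, int l2)
 *	{
 *		/* [s1, s1 + l1>>9) and [s2, s2 + l2>>9) intersect */
 *		return s1 < s2 + (l2 >> 9) && s2 < s1 + (l1 >> 9);
 *	}
 */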
1913
1914 static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
1915 {
1916         sector_t sector;
1917         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1918         struct drbd_epoch_entry *e;
1919         struct digest_info *di = NULL;
1920         int size, digest_size;
1921         unsigned int fault_type;
1922         struct p_block_req *p =
1923                 (struct p_block_req *)h;
1924         const int brps = sizeof(*p)-sizeof(*h);
1925
1926         if (drbd_recv(mdev, h->payload, brps) != brps)
1927                 return FALSE;
1928
1929         sector = be64_to_cpu(p->sector);
1930         size   = be32_to_cpu(p->blksize);
1931
1932         if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
1933                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1934                                 (unsigned long long)sector, size);
1935                 return FALSE;
1936         }
1937         if (sector + (size>>9) > capacity) {
1938                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1939                                 (unsigned long long)sector, size);
1940                 return FALSE;
1941         }
1942
1943         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1944                 if (__ratelimit(&drbd_ratelimit_state))
1945                         dev_err(DEV, "Can not satisfy peer's read request, "
1946                             "no local data.\n");
1947                 drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
1948                                  P_NEG_RS_DREPLY , p);
1949                 return TRUE;
1950         }
1951
1952         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1953          * "criss-cross" setup, that might cause write-out on some other DRBD,
1954          * which in turn might block on the other node at this very place.  */
1955         e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
1956         if (!e) {
1957                 put_ldev(mdev);
1958                 return FALSE;
1959         }
1960
1961         e->private_bio->bi_rw = READ;
1962         e->private_bio->bi_end_io = drbd_endio_read_sec;
1963
1964         switch (h->command) {
1965         case P_DATA_REQUEST:
1966                 e->w.cb = w_e_end_data_req;
1967                 fault_type = DRBD_FAULT_DT_RD;
1968                 break;
1969         case P_RS_DATA_REQUEST:
1970                 e->w.cb = w_e_end_rsdata_req;
1971                 fault_type = DRBD_FAULT_RS_RD;
1972                 /* Eventually this should become asynchronous. Currently it
1973                  * blocks the whole receiver just to delay the reading of a
1974                  * resync data block.
1975                  * the drbd_work_queue mechanism is made for this...
1976                  */
1977                 if (!drbd_rs_begin_io(mdev, sector)) {
1978                         /* we have been interrupted,
1979                          * probably connection lost! */
1980                         D_ASSERT(signal_pending(current));
1981                         goto out_free_e;
1982                 }
1983                 break;
1984
1985         case P_OV_REPLY:
1986         case P_CSUM_RS_REQUEST:
1987                 fault_type = DRBD_FAULT_RS_RD;
1988                 digest_size = h->length - brps;
1989                 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
1990                 if (!di)
1991                         goto out_free_e;
1992
1993                 di->digest_size = digest_size;
1994                 di->digest = (((char *)di)+sizeof(struct digest_info));
1995
1996                 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
1997                         goto out_free_e;
1998
1999                 e->block_id = (u64)(unsigned long)di;
2000                 if (h->command == P_CSUM_RS_REQUEST) {
2001                         D_ASSERT(mdev->agreed_pro_version >= 89);
2002                         e->w.cb = w_e_end_csum_rs_req;
2003                 } else if (h->command == P_OV_REPLY) {
2004                         e->w.cb = w_e_end_ov_reply;
2005                         dec_rs_pending(mdev);
2006                         break;
2007                 }
2008
2009                 if (!drbd_rs_begin_io(mdev, sector)) {
2010                         /* we have been interrupted, probably connection lost! */
2011                         D_ASSERT(signal_pending(current));
2012                         goto out_free_e;
2013                 }
2014                 break;
2015
2016         case P_OV_REQUEST:
2017                 if (mdev->state.conn >= C_CONNECTED &&
2018                     mdev->state.conn != C_VERIFY_T)
2019                         dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2020                                 drbd_conn_str(mdev->state.conn));
2021                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2022                     mdev->agreed_pro_version >= 90) {
2023                         mdev->ov_start_sector = sector;
2024                         mdev->ov_position = sector;
2025                         mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2026                         dev_info(DEV, "Online Verify start sector: %llu\n",
2027                                         (unsigned long long)sector);
2028                 }
2029                 e->w.cb = w_e_end_ov_req;
2030                 fault_type = DRBD_FAULT_RS_RD;
2031                 /* Eventually this should become asynchronous. Currently it
2032                  * blocks the whole receiver just to delay the reading of a
2033                  * resync data block.
2034                  * the drbd_work_queue mechanism is made for this...
2035                  */
2036                 if (!drbd_rs_begin_io(mdev, sector)) {
2037                         /* we have been interrupted,
2038                          * probably connection lost! */
2039                         D_ASSERT(signal_pending(current));
2040                         goto out_free_e;
2041                 }
2042                 break;
2043
2044
2045         default:
2046                 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2047                     cmdname(h->command));
2048                 fault_type = DRBD_FAULT_MAX;
2049         }
2050
2051         spin_lock_irq(&mdev->req_lock);
2052         list_add(&e->w.list, &mdev->read_ee);
2053         spin_unlock_irq(&mdev->req_lock);
2054
2055         inc_unacked(mdev);
2056
2057         drbd_generic_make_request(mdev, fault_type, e->private_bio);
2058         maybe_kick_lo(mdev);
2059
2060         return TRUE;
2061
2062 out_free_e:
2063         kfree(di);
2064         put_ldev(mdev);
2065         drbd_free_ee(mdev, e);
2066         return FALSE;
2067 }
2068
2069 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2070 {
2071         int self, peer, rv = -100;
2072         unsigned long ch_self, ch_peer;
2073
2074         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2075         peer = mdev->p_uuid[UI_BITMAP] & 1;
2076
2077         ch_peer = mdev->p_uuid[UI_SIZE];
2078         ch_self = mdev->comm_bm_set;
2079
2080         switch (mdev->net_conf->after_sb_0p) {
2081         case ASB_CONSENSUS:
2082         case ASB_DISCARD_SECONDARY:
2083         case ASB_CALL_HELPER:
2084                 dev_err(DEV, "Configuration error.\n");
2085                 break;
2086         case ASB_DISCONNECT:
2087                 break;
2088         case ASB_DISCARD_YOUNGER_PRI:
2089                 if (self == 0 && peer == 1) {
2090                         rv = -1;
2091                         break;
2092                 }
2093                 if (self == 1 && peer == 0) {
2094                         rv =  1;
2095                         break;
2096                 }
2097                 /* Else fall through to one of the other strategies... */
2098         case ASB_DISCARD_OLDER_PRI:
2099                 if (self == 0 && peer == 1) {
2100                         rv = 1;
2101                         break;
2102                 }
2103                 if (self == 1 && peer == 0) {
2104                         rv = -1;
2105                         break;
2106                 }
2107                 /* Else fall through to one of the other strategies... */
2108                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2109                      "Using discard-least-changes instead\n");
2110         case ASB_DISCARD_ZERO_CHG:
2111                 if (ch_peer == 0 && ch_self == 0) {
2112                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2113                                 ? -1 : 1;
2114                         break;
2115                 } else {
2116                         if (ch_peer == 0) { rv =  1; break; }
2117                         if (ch_self == 0) { rv = -1; break; }
2118                 }
2119                 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2120                         break;
2121         case ASB_DISCARD_LEAST_CHG:
2122                 if      (ch_self < ch_peer)
2123                         rv = -1;
2124                 else if (ch_self > ch_peer)
2125                         rv =  1;
2126                 else /* ( ch_self == ch_peer ) */
2127                      /* Well, then use something else. */
2128                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2129                                 ? -1 : 1;
2130                 break;
2131         case ASB_DISCARD_LOCAL:
2132                 rv = -1;
2133                 break;
2134         case ASB_DISCARD_REMOTE:
2135                 rv =  1;
2136         }
2137
2138         return rv;
2139 }
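
/* Worked example for the strategies above: with after-sb-0pri set to
 * discard-least-changes, ch_self = 10 out-of-sync bits vs. ch_peer = 200
 * yields rv = -1, i.e. this node gives up its changes and becomes sync
 * target; only on a tie (ch_self == ch_peer) does the DISCARD_CONCURRENT
 * flag break the symmetry. */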
2140
2141 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2142 {
2143         int self, peer, hg, rv = -100;
2144
2145         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2146         peer = mdev->p_uuid[UI_BITMAP] & 1;
2147
2148         switch (mdev->net_conf->after_sb_1p) {
2149         case ASB_DISCARD_YOUNGER_PRI:
2150         case ASB_DISCARD_OLDER_PRI:
2151         case ASB_DISCARD_LEAST_CHG:
2152         case ASB_DISCARD_LOCAL:
2153         case ASB_DISCARD_REMOTE:
2154                 dev_err(DEV, "Configuration error.\n");
2155                 break;
2156         case ASB_DISCONNECT:
2157                 break;
2158         case ASB_CONSENSUS:
2159                 hg = drbd_asb_recover_0p(mdev);
2160                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2161                         rv = hg;
2162                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2163                         rv = hg;
2164                 break;
2165         case ASB_VIOLENTLY:
2166                 rv = drbd_asb_recover_0p(mdev);
2167                 break;
2168         case ASB_DISCARD_SECONDARY:
2169                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2170         case ASB_CALL_HELPER:
2171                 hg = drbd_asb_recover_0p(mdev);
2172                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2173                         self = drbd_set_role(mdev, R_SECONDARY, 0);
2174                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2175                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2176                           * we do not need to wait for the after state change work either. */
2177                         self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2178                         if (self != SS_SUCCESS) {
2179                                 drbd_khelper(mdev, "pri-lost-after-sb");
2180                         } else {
2181                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2182                                 rv = hg;
2183                         }
2184                 } else
2185                         rv = hg;
2186         }
2187
2188         return rv;
2189 }
2190
2191 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2192 {
2193         int self, peer, hg, rv = -100;
2194
2195         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2196         peer = mdev->p_uuid[UI_BITMAP] & 1;
2197
2198         switch (mdev->net_conf->after_sb_2p) {
2199         case ASB_DISCARD_YOUNGER_PRI:
2200         case ASB_DISCARD_OLDER_PRI:
2201         case ASB_DISCARD_LEAST_CHG:
2202         case ASB_DISCARD_LOCAL:
2203         case ASB_DISCARD_REMOTE:
2204         case ASB_CONSENSUS:
2205         case ASB_DISCARD_SECONDARY:
2206                 dev_err(DEV, "Configuration error.\n");
2207                 break;
2208         case ASB_VIOLENTLY:
2209                 rv = drbd_asb_recover_0p(mdev);
2210                 break;
2211         case ASB_DISCONNECT:
2212                 break;
2213         case ASB_CALL_HELPER:
2214                 hg = drbd_asb_recover_0p(mdev);
2215                 if (hg == -1) {
2216                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2217                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2218                           * we do not need to wait for the after state change work either. */
2219                         self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2220                         if (self != SS_SUCCESS) {
2221                                 drbd_khelper(mdev, "pri-lost-after-sb");
2222                         } else {
2223                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2224                                 rv = hg;
2225                         }
2226                 } else
2227                         rv = hg;
2228         }
2229
2230         return rv;
2231 }
2232
2233 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2234                            u64 bits, u64 flags)
2235 {
2236         if (!uuid) {
2237                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2238                 return;
2239         }
2240         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2241              text,
2242              (unsigned long long)uuid[UI_CURRENT],
2243              (unsigned long long)uuid[UI_BITMAP],
2244              (unsigned long long)uuid[UI_HISTORY_START],
2245              (unsigned long long)uuid[UI_HISTORY_END],
2246              (unsigned long long)bits,
2247              (unsigned long long)flags);
2248 }
2249
2250 /*
2251   100   after split brain try auto recover
2252     2   C_SYNC_SOURCE set BitMap
2253     1   C_SYNC_SOURCE use BitMap
2254     0   no Sync
2255    -1   C_SYNC_TARGET use BitMap
2256    -2   C_SYNC_TARGET set BitMap
2257  -100   after split brain, disconnect
2258 -1000   unrelated data
2259  */
2260 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2261 {
2262         u64 self, peer;
2263         int i, j;
2264
2265         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2266         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2267
2268         *rule_nr = 10;
2269         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2270                 return 0;
2271
2272         *rule_nr = 20;
2273         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2274              peer != UUID_JUST_CREATED)
2275                 return -2;
2276
2277         *rule_nr = 30;
2278         if (self != UUID_JUST_CREATED &&
2279             (peer == UUID_JUST_CREATED || peer == (u64)0))
2280                 return 2;
2281
2282         if (self == peer) {
2283                 int rct, dc; /* roles at crash time */
2284
2285                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2286
2287                         if (mdev->agreed_pro_version < 91)
2288                                 return -1001;
2289
2290                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2291                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2292                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2293                                 drbd_uuid_set_bm(mdev, 0UL);
2294
2295                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2296                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2297                                 *rule_nr = 34;
2298                         } else {
2299                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2300                                 *rule_nr = 36;
2301                         }
2302
2303                         return 1;
2304                 }
2305
2306                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2307
2308                         if (mdev->agreed_pro_version < 91)
2309                                 return -1001;
2310
2311                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2312                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2313                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2314
2315                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2316                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2317                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2318
2319                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2320                                 *rule_nr = 35;
2321                         } else {
2322                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2323                                 *rule_nr = 37;
2324                         }
2325
2326                         return -1;
2327                 }
2328
2329                 /* Common power [off|failure] */
2330                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2331                         (mdev->p_uuid[UI_FLAGS] & 2);
2332                 /* lowest bit is set when we were primary,
2333                  * next bit (weight 2) is set when peer was primary */
2334                 *rule_nr = 40;
2335
2336                 switch (rct) {
2337                 case 0: /* !self_pri && !peer_pri */ return 0;
2338                 case 1: /*  self_pri && !peer_pri */ return 1;
2339                 case 2: /* !self_pri &&  peer_pri */ return -1;
2340                 case 3: /*  self_pri &&  peer_pri */
2341                         dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2342                         return dc ? -1 : 1;
2343                 }
2344         }
2345
2346         *rule_nr = 50;
2347         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2348         if (self == peer)
2349                 return -1;
2350
2351         *rule_nr = 51;
2352         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2353         if (self == peer) {
2354                 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2355                 peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2356                 if (self == peer) {
2357                         /* The last P_SYNC_UUID did not get through. Undo the
2358                            peer's UUID modifications from its last start of resync as sync source. */
2359
2360                         if (mdev->agreed_pro_version < 91)
2361                                 return -1001;
2362
2363                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2364                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2365                         return -1;
2366                 }
2367         }
2368
2369         *rule_nr = 60;
2370         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2371         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2372                 peer = mdev->p_uuid[i] & ~((u64)1);
2373                 if (self == peer)
2374                         return -2;
2375         }
2376
2377         *rule_nr = 70;
2378         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2379         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2380         if (self == peer)
2381                 return 1;
2382
2383         *rule_nr = 71;
2384         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2385         if (self == peer) {
2386                 self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2387                 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2388                 if (self == peer) {
2389                         /* The last P_SYNC_UUID did not get through. Undo our
2390                            UUID modifications from the last start of resync as sync source. */
2391
2392                         if (mdev->agreed_pro_version < 91)
2393                                 return -1001;
2394
2395                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2396                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2397
2398                         dev_info(DEV, "Undid last start of resync:\n");
2399
2400                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2401                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2402
2403                         return 1;
2404                 }
2405         }
2406
2407
2408         *rule_nr = 80;
2409         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2410         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2411                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2412                 if (self == peer)
2413                         return 2;
2414         }
2415
2416         *rule_nr = 90;
2417         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2418         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2419         if (self == peer && self != ((u64)0))
2420                 return 100;
2421
2422         *rule_nr = 100;
2423         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2424                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2425                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2426                         peer = mdev->p_uuid[j] & ~((u64)1);
2427                         if (self == peer)
2428                                 return -100;
2429                 }
2430         }
2431
2432         return -1000;
2433 }
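
/* Example of the rule evaluation above: if both nodes still agree on the
 * current UUID (rule 40) and both this node's CRASHED_PRIMARY flag and the
 * peer's "was primary" bit are set (rct == 3), the tie is broken by
 * DISCARD_CONCURRENT: the node that has the flag set returns -1 and becomes
 * sync target, the other one becomes sync source. */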
2434
2435 /* drbd_sync_handshake() returns the new conn state on success, or
2436    CONN_MASK (-1) on failure.
2437  */
2438 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2439                                            enum drbd_disk_state peer_disk) __must_hold(local)
2440 {
2441         int hg, rule_nr;
2442         enum drbd_conns rv = C_MASK;
2443         enum drbd_disk_state mydisk;
2444
2445         mydisk = mdev->state.disk;
2446         if (mydisk == D_NEGOTIATING)
2447                 mydisk = mdev->new_state_tmp.disk;
2448
2449         dev_info(DEV, "drbd_sync_handshake:\n");
2450         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2451         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2452                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2453
2454         hg = drbd_uuid_compare(mdev, &rule_nr);
2455
2456         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2457
2458         if (hg == -1000) {
2459                 dev_alert(DEV, "Unrelated data, aborting!\n");
2460                 return C_MASK;
2461         }
2462         if (hg == -1001) {
2463                 dev_alert(DEV, "To resolve this both sides have to support at least protocol 91\n");
2464                 return C_MASK;
2465         }
2466
2467         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2468             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2469                 int f = (hg == -100) || abs(hg) == 2;
2470                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2471                 if (f)
2472                         hg = hg*2;
2473                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2474                      hg > 0 ? "source" : "target");
2475         }
2476
2477         if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2478                 int pcount = (mdev->state.role == R_PRIMARY)
2479                            + (peer_role == R_PRIMARY);
2480                 int forced = (hg == -100);
2481
2482                 switch (pcount) {
2483                 case 0:
2484                         hg = drbd_asb_recover_0p(mdev);
2485                         break;
2486                 case 1:
2487                         hg = drbd_asb_recover_1p(mdev);
2488                         break;
2489                 case 2:
2490                         hg = drbd_asb_recover_2p(mdev);
2491                         break;
2492                 }
2493                 if (abs(hg) < 100) {
2494                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2495                              "automatically solved. Sync from %s node\n",
2496                              pcount, (hg < 0) ? "peer" : "this");
2497                         if (forced) {
2498                                 dev_warn(DEV, "Doing a full sync, since"
2499                                      " UUIDs were ambiguous.\n");
2500                                 hg = hg*2;
2501                         }
2502                 }
2503         }
2504
2505         if (hg == -100) {
2506                 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2507                         hg = -1;
2508                 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2509                         hg = 1;
2510
2511                 if (abs(hg) < 100)
2512                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2513                              "Sync from %s node\n",
2514                              (hg < 0) ? "peer" : "this");
2515         }
2516
2517         if (hg == -100) {
2518                 /* FIXME this log message is not correct if we end up here
2519                  * after an attempted attach on a diskless node.
2520                  * We just refuse to attach -- well, we drop the "connection"
2521                  * to that disk, in a way... */
2522                 dev_alert(DEV, "Split-Brain detected, dropping connection!\n");
2523                 drbd_khelper(mdev, "split-brain");
2524                 return C_MASK;
2525         }
2526
2527         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2528                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2529                 return C_MASK;
2530         }
2531
2532         if (hg < 0 && /* by intention we do not use mydisk here. */
2533             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2534                 switch (mdev->net_conf->rr_conflict) {
2535                 case ASB_CALL_HELPER:
2536                         drbd_khelper(mdev, "pri-lost");
2537                         /* fall through */
2538                 case ASB_DISCONNECT:
2539                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2540                         return C_MASK;
2541                 case ASB_VIOLENTLY:
2542                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data "
2543                              "assumption\n");
2544                 }
2545         }
2546
2547         if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2548                 if (hg == 0)
2549                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2550                 else
2551                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2552                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2553                                  abs(hg) >= 2 ? "full" : "bit-map based");
2554                 return C_MASK;
2555         }
2556
2557         if (abs(hg) >= 2) {
2558                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2559                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2560                         return C_MASK;
2561         }
2562
2563         if (hg > 0) { /* become sync source. */
2564                 rv = C_WF_BITMAP_S;
2565         } else if (hg < 0) { /* become sync target */
2566                 rv = C_WF_BITMAP_T;
2567         } else {
2568                 rv = C_CONNECTED;
2569                 if (drbd_bm_total_weight(mdev)) {
2570                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2571                              drbd_bm_total_weight(mdev));
2572                 }
2573         }
2574
2575         return rv;
2576 }
2577
2578 /* returns 1 if invalid */
2579 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2580 {
2581         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2582         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2583             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2584                 return 0;
2585
2586         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2587         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2588             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2589                 return 1;
2590
2591         /* everything else is valid if they are equal on both sides. */
2592         if (peer == self)
2593                 return 0;
2594
2595         /* everything else is invalid. */
2596         return 1;
2597 }
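
/* For illustration: cmp_after_sb(ASB_DISCARD_REMOTE, ASB_DISCARD_LOCAL)
 * returns 0, the two settings complement each other; a symmetric pair such
 * as cmp_after_sb(ASB_DISCARD_REMOTE, ASB_DISCARD_REMOTE) returns 1, since
 * both sides would resolve a split brain in their own favour, and
 * receive_protocol() below drops the connection on such a mismatch. */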
2598
2599 static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2600 {
2601         struct p_protocol *p = (struct p_protocol *)h;
2602         int header_size, data_size;
2603         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2604         int p_want_lose, p_two_primaries, cf;
2605         char p_integrity_alg[SHARED_SECRET_MAX] = "";
2606
2607         header_size = sizeof(*p) - sizeof(*h);
2608         data_size   = h->length  - header_size;
2609
2610         if (drbd_recv(mdev, h->payload, header_size) != header_size)
2611                 return FALSE;
2612
2613         p_proto         = be32_to_cpu(p->protocol);
2614         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2615         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2616         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2617         p_two_primaries = be32_to_cpu(p->two_primaries);
2618         cf              = be32_to_cpu(p->conn_flags);
2619         p_want_lose = cf & CF_WANT_LOSE;
2620
2621         clear_bit(CONN_DRY_RUN, &mdev->flags);
2622
2623         if (cf & CF_DRY_RUN)
2624                 set_bit(CONN_DRY_RUN, &mdev->flags);
2625
2626         if (p_proto != mdev->net_conf->wire_protocol) {
2627                 dev_err(DEV, "incompatible communication protocols\n");
2628                 goto disconnect;
2629         }
2630
2631         if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2632                 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2633                 goto disconnect;
2634         }
2635
2636         if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2637                 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2638                 goto disconnect;
2639         }
2640
2641         if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2642                 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2643                 goto disconnect;
2644         }
2645
2646         if (p_want_lose && mdev->net_conf->want_lose) {
2647                 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2648                 goto disconnect;
2649         }
2650
2651         if (p_two_primaries != mdev->net_conf->two_primaries) {
2652                 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2653                 goto disconnect;
2654         }
2655
2656         if (mdev->agreed_pro_version >= 87) {
2657                 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2658
2659                 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2660                         return FALSE;
2661
2662                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2663                 if (strcmp(p_integrity_alg, my_alg)) {
2664                         dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2665                         goto disconnect;
2666                 }
2667                 dev_info(DEV, "data-integrity-alg: %s\n",
2668                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2669         }
2670
2671         return TRUE;
2672
2673 disconnect:
2674         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2675         return FALSE;
2676 }
2677
2678 /* helper function
2679  * input: alg name, feature name
2680  * return: NULL (alg name was "")
2681  *         ERR_PTR(error) if something goes wrong
2682  *         or the crypto hash ptr, if it worked out ok. */
2683 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2684                 const char *alg, const char *name)
2685 {
2686         struct crypto_hash *tfm;
2687
2688         if (!alg[0])
2689                 return NULL;
2690
2691         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2692         if (IS_ERR(tfm)) {
2693                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2694                         alg, name, PTR_ERR(tfm));
2695                 return tfm;
2696         }
2697         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2698                 crypto_free_hash(tfm);
2699                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2700                 return ERR_PTR(-EINVAL);
2701         }
2702         return tfm;
2703 }
2704
2705 static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2706 {
2707         int ok = TRUE;
2708         struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
2709         unsigned int header_size, data_size, exp_max_sz;
2710         struct crypto_hash *verify_tfm = NULL;
2711         struct crypto_hash *csums_tfm = NULL;
2712         const int apv = mdev->agreed_pro_version;
2713
2714         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2715                     : apv == 88 ? sizeof(struct p_rs_param)
2716                                         + SHARED_SECRET_MAX
2717                     : /* 89 */    sizeof(struct p_rs_param_89);
2718
2719         if (h->length > exp_max_sz) {
2720                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2721                     h->length, exp_max_sz);
2722                 return FALSE;
2723         }
2724
2725         if (apv <= 88) {
2726                 header_size = sizeof(struct p_rs_param) - sizeof(*h);
2727                 data_size   = h->length  - header_size;
2728         } else /* apv >= 89 */ {
2729                 header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2730                 data_size   = h->length  - header_size;
2731                 D_ASSERT(data_size == 0);
2732         }
2733
2734         /* initialize verify_alg and csums_alg */
2735         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2736
2737         if (drbd_recv(mdev, h->payload, header_size) != header_size)
2738                 return FALSE;
2739
2740         mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2741
2742         if (apv >= 88) {
2743                 if (apv == 88) {
2744                         if (data_size > SHARED_SECRET_MAX) {
2745                                 dev_err(DEV, "verify-alg too long, "
2746                                     "peer wants %u, accepting only %u bytes\n",
2747                                                 data_size, SHARED_SECRET_MAX);
2748                                 return FALSE;
2749                         }
2750
2751                         if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2752                                 return FALSE;
2753
2754                         /* we expect NUL terminated string */
2755                         /* but just in case someone tries to be evil */
2756                         D_ASSERT(p->verify_alg[data_size-1] == 0);
2757                         p->verify_alg[data_size-1] = 0;
2758
2759                 } else /* apv >= 89 */ {
2760                         /* we still expect NUL terminated strings */
2761                         /* but just in case someone tries to be evil */
2762                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2763                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2764                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2765                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2766                 }
2767
2768                 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2769                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2770                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2771                                     mdev->sync_conf.verify_alg, p->verify_alg);
2772                                 goto disconnect;
2773                         }
2774                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2775                                         p->verify_alg, "verify-alg");
2776                         if (IS_ERR(verify_tfm)) {
2777                                 verify_tfm = NULL;
2778                                 goto disconnect;
2779                         }
2780                 }
2781
2782                 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2783                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2784                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2785                                     mdev->sync_conf.csums_alg, p->csums_alg);
2786                                 goto disconnect;
2787                         }
2788                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2789                                         p->csums_alg, "csums-alg");
2790                         if (IS_ERR(csums_tfm)) {
2791                                 csums_tfm = NULL;
2792                                 goto disconnect;
2793                         }
2794                 }
2795
2796
2797                 spin_lock(&mdev->peer_seq_lock);
2798                 /* lock against drbd_nl_syncer_conf() */
2799                 if (verify_tfm) {
2800                         strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2801                         mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2802                         crypto_free_hash(mdev->verify_tfm);
2803                         mdev->verify_tfm = verify_tfm;
2804                         dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2805                 }
2806                 if (csums_tfm) {
2807                         strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2808                         mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2809                         crypto_free_hash(mdev->csums_tfm);
2810                         mdev->csums_tfm = csums_tfm;
2811                         dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2812                 }
2813                 spin_unlock(&mdev->peer_seq_lock);
2814         }
2815
2816         return ok;
2817 disconnect:
2818         /* just for completeness: actually not needed,
2819          * as this is not reached if csums_tfm was ok. */
2820         crypto_free_hash(csums_tfm);
2821         /* but free the verify_tfm again, if csums_tfm did not work out */
2822         crypto_free_hash(verify_tfm);
2823         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2824         return FALSE;
2825 }
2826
2827 static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2828 {
2829         /* sorry, we currently have no working implementation
2830          * of distributed TCQ */
2831 }
2832
2833 /* warn if the arguments differ by more than 12.5% */
2834 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2835         const char *s, sector_t a, sector_t b)
2836 {
2837         sector_t d;
2838         if (a == 0 || b == 0)
2839                 return;
2840         d = (a > b) ? (a - b) : (b - a);
2841         if (d > (a>>3) || d > (b>>3))
2842                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2843                      (unsigned long long)a, (unsigned long long)b);
2844 }
2845
2846 static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2847 {
2848         struct p_sizes *p = (struct p_sizes *)h;
2849         enum determine_dev_size dd = unchanged;
2850         unsigned int max_seg_s;
2851         sector_t p_size, p_usize, my_usize;
2852         int ldsc = 0; /* local disk size changed */
2853         enum drbd_conns nconn;
2854
2855         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2856         if (drbd_recv(mdev, h->payload, h->length) != h->length)
2857                 return FALSE;
2858
2859         p_size = be64_to_cpu(p->d_size);
2860         p_usize = be64_to_cpu(p->u_size);
2861
2862         if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2863                 dev_err(DEV, "some backing storage is needed\n");
2864                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2865                 return FALSE;
2866         }
2867
2868         /* just store the peer's disk size for now.
2869          * we still need to figure out whether we accept that. */
2870         mdev->p_size = p_size;
2871
2872 #define min_not_zero(l, r) ((l) == 0 ? (r) : ((r) == 0 ? (l) : min(l, r)))
2873         if (get_ldev(mdev)) {
2874                 warn_if_differ_considerably(mdev, "lower level device sizes",
2875                            p_size, drbd_get_max_capacity(mdev->ldev));
2876                 warn_if_differ_considerably(mdev, "user requested size",
2877                                             p_usize, mdev->ldev->dc.disk_size);
2878
2879                 /* if this is the first connect, or an otherwise expected
2880                  * param exchange, choose the minimum */
2881                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2882                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2883                                              p_usize);
2884
2885                 my_usize = mdev->ldev->dc.disk_size;
2886
2887                 if (mdev->ldev->dc.disk_size != p_usize) {
2888                         mdev->ldev->dc.disk_size = p_usize;
2889                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2890                              (unsigned long)mdev->ldev->dc.disk_size);
2891                 }
2892
2893                 /* Never shrink a device with usable data during connect.
2894                    But allow online shrinking if we are connected. */
2895                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
2896                    drbd_get_capacity(mdev->this_bdev) &&
2897                    mdev->state.disk >= D_OUTDATED &&
2898                    mdev->state.conn < C_CONNECTED) {
2899                         dev_err(DEV, "The peer's disk size is too small!\n");
2900                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2901                         mdev->ldev->dc.disk_size = my_usize;
2902                         put_ldev(mdev);
2903                         return FALSE;
2904                 }
2905                 put_ldev(mdev);
2906         }
2907 #undef min_not_zero
2908
2909         if (get_ldev(mdev)) {
2910                 dd = drbd_determin_dev_size(mdev, 0);
2911                 put_ldev(mdev);
2912                 if (dd == dev_size_error)
2913                         return FALSE;
2914                 drbd_md_sync(mdev);
2915         } else {
2916                 /* I am diskless, need to accept the peer's size. */
2917                 drbd_set_my_capacity(mdev, p_size);
2918         }
2919
2920         if (mdev->p_uuid && mdev->state.conn <= C_CONNECTED && get_ldev(mdev)) {
2921                 nconn = drbd_sync_handshake(mdev,
2922                                 mdev->state.peer, mdev->state.pdsk);
2923                 put_ldev(mdev);
2924
2925                 if (nconn == C_MASK) {
2926                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2927                         return FALSE;
2928                 }
2929
2930                 if (drbd_request_state(mdev, NS(conn, nconn)) < SS_SUCCESS) {
2931                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2932                         return FALSE;
2933                 }
2934         }
2935
2936         if (get_ldev(mdev)) {
2937                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
2938                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
2939                         ldsc = 1;
2940                 }
2941
2942                 max_seg_s = be32_to_cpu(p->max_segment_size);
2943                 if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
2944                         drbd_setup_queue_param(mdev, max_seg_s);
2945
2946                 drbd_setup_order_type(mdev, be32_to_cpu(p->queue_order_type));
2947                 put_ldev(mdev);
2948         }
2949
2950         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
2951                 if (be64_to_cpu(p->c_size) !=
2952                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
2953                         /* we have different sizes, probably peer
2954                          * needs to know my new size... */
2955                         drbd_send_sizes(mdev, 0);
2956                 }
2957                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
2958                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
2959                         if (mdev->state.pdsk >= D_INCONSISTENT &&
2960                             mdev->state.disk >= D_INCONSISTENT)
2961                                 resync_after_online_grow(mdev);
2962                         else
2963                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
2964                 }
2965         }
2966
2967         return TRUE;
2968 }
2969
2970 static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
2971 {
2972         struct p_uuids *p = (struct p_uuids *)h;
2973         u64 *p_uuid;
2974         int i;
2975
2976         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2977         if (drbd_recv(mdev, h->payload, h->length) != h->length)
2978                 return FALSE;
2979
2980         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
        if (!p_uuid) {
                dev_err(DEV, "kmalloc of p_uuid failed\n");
                return FALSE;
        }
2981
2982         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
2983                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
2984
2985         kfree(mdev->p_uuid);
2986         mdev->p_uuid = p_uuid;
2987
2988         if (mdev->state.conn < C_CONNECTED &&
2989             mdev->state.disk < D_INCONSISTENT &&
2990             mdev->state.role == R_PRIMARY &&
2991             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
2992                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
2993                     (unsigned long long)mdev->ed_uuid);
2994                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2995                 return FALSE;
2996         }
2997
2998         if (get_ldev(mdev)) {
2999                 int skip_initial_sync =
3000                         mdev->state.conn == C_CONNECTED &&
3001                         mdev->agreed_pro_version >= 90 &&
3002                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3003                         (p_uuid[UI_FLAGS] & 8);
3004                 if (skip_initial_sync) {
3005                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3006                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3007                                         "clear_n_write from receive_uuids");
3008                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3009                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3010                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3011                                         CS_VERBOSE, NULL);
3012                         drbd_md_sync(mdev);
3013                 }
3014                 put_ldev(mdev);
3015         }
3016
3017         /* Before we test the disk state, we should wait until a possibly
3018            ongoing cluster-wide state change has finished. That is important if
3019            we are primary and are detaching from our disk: we need to see the
3020            new disk state... */
3021         wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3022         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3023                 drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3024
3025         return TRUE;
3026 }
3027
3028 /**
3029  * convert_state() - Converts the peer's view of the cluster state to our point of view
3030  * @ps:         The state as seen by the peer.
3031  */
3032 static union drbd_state convert_state(union drbd_state ps)
3033 {
3034         union drbd_state ms;
3035
3036         static enum drbd_conns c_tab[] = {
3037                 [C_CONNECTED] = C_CONNECTED,
3038
3039                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3040                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3041                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3042                 [C_VERIFY_S]       = C_VERIFY_T,
3043                 [C_MASK]   = C_MASK,
3044         };
3045
3046         ms.i = ps.i;
3047
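        /* mirror the peer's view: its role is our peer, its disk is our pdsk,
         * and vice versa; the connection state is translated through c_tab */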
3048         ms.conn = c_tab[ps.conn];
3049         ms.peer = ps.role;
3050         ms.role = ps.peer;
3051         ms.pdsk = ps.disk;
3052         ms.disk = ps.pdsk;
3053         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3054
3055         return ms;
3056 }
3057
3058 static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3059 {
3060         struct p_req_state *p = (struct p_req_state *)h;
3061         union drbd_state mask, val;
3062         int rv;
3063
3064         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3065         if (drbd_recv(mdev, h->payload, h->length) != h->length)
3066                 return FALSE;
3067
3068         mask.i = be32_to_cpu(p->mask);
3069         val.i = be32_to_cpu(p->val);
3070
3071         if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3072             test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3073                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3074                 return TRUE;
3075         }
3076
3077         mask = convert_state(mask);
3078         val = convert_state(val);
3079
3080         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3081
3082         drbd_send_sr_reply(mdev, rv);
3083         drbd_md_sync(mdev);
3084
3085         return TRUE;
3086 }
3087
3088 static int receive_state(struct drbd_conf *mdev, struct p_header *h)
3089 {
3090         struct p_state *p = (struct p_state *)h;
3091         enum drbd_conns nconn, oconn;
3092         union drbd_state ns, peer_state;
3093         enum drbd_disk_state real_peer_disk;
3094         int rv;
3095
3096         ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
3097                 return FALSE;
3098
3099         if (drbd_recv(mdev, h->payload, h->length) != h->length)
3100                 return FALSE;
3101
3102         peer_state.i = be32_to_cpu(p->state);
3103
3104         real_peer_disk = peer_state.disk;
3105         if (peer_state.disk == D_NEGOTIATING) {
3106                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3107                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3108         }
3109
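        /* sample the connection state under req_lock; if it changes while we
         * work without the lock below, we jump back here and re-evaluate */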
3110         spin_lock_irq(&mdev->req_lock);
3111  retry:
3112         oconn = nconn = mdev->state.conn;
3113         spin_unlock_irq(&mdev->req_lock);
3114
3115         if (nconn == C_WF_REPORT_PARAMS)
3116                 nconn = C_CONNECTED;
3117
3118         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3119             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3120                 int cr; /* consider resync */
3121
3122                 /* if we established a new connection */
3123                 cr  = (oconn < C_CONNECTED);
3124                 /* if we had an established connection
3125                  * and one of the nodes newly attaches a disk */
3126                 cr |= (oconn == C_CONNECTED &&
3127                        (peer_state.disk == D_NEGOTIATING ||
3128                         mdev->state.disk == D_NEGOTIATING));
3129                 /* if we have both been inconsistent, and the peer has been
3130                  * forced to be UpToDate with --overwrite-data */
3131                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3132                 /* if we had been plain connected, and the admin requested to
3133                  * start a sync by "invalidate" or "invalidate-remote" */
3134                 cr |= (oconn == C_CONNECTED &&
3135                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3136                                  peer_state.conn <= C_WF_BITMAP_T));
3137
3138                 if (cr)
3139                         nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3140
3141                 put_ldev(mdev);
3142                 if (nconn == C_MASK) {
3143                         nconn = C_CONNECTED;
3144                         if (mdev->state.disk == D_NEGOTIATING) {
3145                                 drbd_force_state(mdev, NS(disk, D_DISKLESS));
3146                         } else if (peer_state.disk == D_NEGOTIATING) {
3147                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3148                                 peer_state.disk = D_DISKLESS;
3149                                 real_peer_disk = D_DISKLESS;
3150                         } else {
3151                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3152                                         return FALSE;
3153                                 D_ASSERT(oconn == C_WF_REPORT_PARAMS);
3154                                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3155                                 return FALSE;
3156                         }
3157                 }
3158         }
3159
3160         spin_lock_irq(&mdev->req_lock);
3161         if (mdev->state.conn != oconn)
3162                 goto retry;
3163         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3164         ns.i = mdev->state.i;
3165         ns.conn = nconn;
3166         ns.peer = peer_state.role;
3167         ns.pdsk = real_peer_disk;
3168         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3169         if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3170                 ns.disk = mdev->new_state_tmp.disk;
3171
3172         rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
3173         ns = mdev->state;
3174         spin_unlock_irq(&mdev->req_lock);
3175
3176         if (rv < SS_SUCCESS) {
3177                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3178                 return FALSE;
3179         }
3180
3181         if (oconn > C_WF_REPORT_PARAMS) {
3182                 if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3183                     peer_state.disk != D_NEGOTIATING) {
3184                         /* we want resync, peer has not yet decided to sync... */
3185                         /* Nowadays only used when forcing a node into primary role and
3186                            setting its disk to UpToDate with that */
3187                         drbd_send_uuids(mdev);
3188                         drbd_send_state(mdev);
3189                 }
3190         }
3191
3192         mdev->net_conf->want_lose = 0;
3193
3194         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3195
3196         return TRUE;
3197 }
3198
3199 static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3200 {
3201         struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3202
3203         wait_event(mdev->misc_wait,
3204                    mdev->state.conn == C_WF_SYNC_UUID ||
3205                    mdev->state.conn < C_CONNECTED ||
3206                    mdev->state.disk < D_NEGOTIATING);
3207
3208         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3209
3210         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3211         if (drbd_recv(mdev, h->payload, h->length) != h->length)
3212                 return FALSE;
3213
3214         /* Here the _drbd_uuid_ functions are right, current should
3215            _not_ be rotated into the history */
3216         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3217                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3218                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3219
3220                 drbd_start_resync(mdev, C_SYNC_TARGET);
3221
3222                 put_ldev(mdev);
3223         } else
3224                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3225
3226         return TRUE;
3227 }
3228
3229 enum receive_bitmap_ret { OK, DONE, FAILED };
3230
3231 static enum receive_bitmap_ret
3232 receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3233         unsigned long *buffer, struct bm_xfer_ctx *c)
3234 {
3235         unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3236         unsigned want = num_words * sizeof(long);
3237
3238         if (want != h->length) {
3239                 dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3240                 return FAILED;
3241         }
3242         if (want == 0)
3243                 return DONE;
3244         if (drbd_recv(mdev, buffer, want) != want)
3245                 return FAILED;
3246
3247         drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3248
3249         c->word_offset += num_words;
3250         c->bit_offset = c->word_offset * BITS_PER_LONG;
3251         if (c->bit_offset > c->bm_bits)
3252                 c->bit_offset = c->bm_bits;
3253
3254         return OK;
3255 }
3256
3257 static enum receive_bitmap_ret
3258 recv_bm_rle_bits(struct drbd_conf *mdev,
3259                 struct p_compressed_bm *p,
3260                 struct bm_xfer_ctx *c)
3261 {
3262         struct bitstream bs;
3263         u64 look_ahead;
3264         u64 rl;
3265         u64 tmp;
3266         unsigned long s = c->bit_offset;
3267         unsigned long e;
3268         int len = p->head.length - (sizeof(*p) - sizeof(p->head));
3269         int toggle = DCBP_get_start(p);
3270         int have;
3271         int bits;
3272
3273         bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3274
3275         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3276         if (bits < 0)
3277                 return FAILED;
3278
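        /* look_ahead buffers up to 64 bits of encoded input, 'have' of which
         * are valid.  Each pass decodes one run length; runs alternate between
         * clear and set bits (toggle), set runs are applied to the bitmap, and
         * the look-ahead is then refilled from the bitstream. */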
3279         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3280                 bits = vli_decode_bits(&rl, look_ahead);
3281                 if (bits <= 0)
3282                         return FAILED;
3283
3284                 if (toggle) {
3285                         e = s + rl -1;
3286                         if (e >= c->bm_bits) {
3287                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3288                                 return FAILED;
3289                         }
3290                         _drbd_bm_set_bits(mdev, s, e);
3291                 }
3292
3293                 if (have < bits) {
3294                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3295                                 have, bits, look_ahead,
3296                                 (unsigned int)(bs.cur.b - p->code),
3297                                 (unsigned int)bs.buf_len);
3298                         return FAILED;
3299                 }
3300                 look_ahead >>= bits;
3301                 have -= bits;
3302
3303                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3304                 if (bits < 0)
3305                         return FAILED;
3306                 look_ahead |= tmp << have;
3307                 have += bits;
3308         }
3309
3310         c->bit_offset = s;
3311         bm_xfer_ctx_bit_to_word_offset(c);
3312
3313         return (s == c->bm_bits) ? DONE : OK;
3314 }
3315
3316 static enum receive_bitmap_ret
3317 decode_bitmap_c(struct drbd_conf *mdev,
3318                 struct p_compressed_bm *p,
3319                 struct bm_xfer_ctx *c)
3320 {
3321         if (DCBP_get_code(p) == RLE_VLI_Bits)
3322                 return recv_bm_rle_bits(mdev, p, c);
3323
3324         /* other variants had been implemented for evaluation,
3325          * but have been dropped as this one turned out to be "best"
3326          * during all our tests. */
3327
3328         dev_err(DEV, "decode_bitmap_c: unknown encoding %u\n", p->encoding);
3329         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3330         return FAILED;
3331 }
3332
3333 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3334                 const char *direction, struct bm_xfer_ctx *c)
3335 {
3336         /* what would it take to transfer it "plaintext" */
3337         unsigned plain = sizeof(struct p_header) *
3338                 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3339                 + c->bm_words * sizeof(long);
3340         unsigned total = c->bytes[0] + c->bytes[1];
3341         unsigned r;
3342
3343         /* total can not be zero. but just in case: */
3344         if (total == 0)
3345                 return;
3346
3347         /* don't report if not compressed */
3348         if (total >= plain)
3349                 return;
3350
3351         /* total < plain. check for overflow, still */
3352         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3353                                     : (1000 * total / plain);
3354
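        /* r is now total/plain in per-mille; clamp and invert it below so
         * that we report the space saved by compression */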
3355         if (r > 1000)
3356                 r = 1000;
3357
3358         r = 1000 - r;
3359         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3360              "total %u; compression: %u.%u%%\n",
3361                         direction,
3362                         c->bytes[1], c->packets[1],
3363                         c->bytes[0], c->packets[0],
3364                         total, r/10, r % 10);
3365 }
3366
3367 /* Since we are processing the bitfield from lower addresses to higher,
3368    it does not matter whether we process it in 32 bit or 64 bit chunks,
3369    as long as it is little endian. (Understand it as a byte stream,
3370    beginning with the lowest byte...) If we used big endian instead,
3371    we would need to process it from the highest address to the lowest
3372    in order to stay agnostic to the 32 vs 64 bit issue.
3373
3374    returns 0 on failure, 1 if we successfully received it. */
3375 static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3376 {
3377         struct bm_xfer_ctx c;
3378         void *buffer;
3379         enum receive_bitmap_ret ret;
3380         int ok = FALSE;
3381
3382         wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3383
3384         drbd_bm_lock(mdev, "receive bitmap");
3385
3386         /* maybe we should use some per thread scratch page,
3387          * and allocate that during initial device creation? */
3388         buffer   = (unsigned long *) __get_free_page(GFP_NOIO);
3389         if (!buffer) {
3390                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3391                 goto out;
3392         }
3393
3394         c = (struct bm_xfer_ctx) {
3395                 .bm_bits = drbd_bm_bits(mdev),
3396                 .bm_words = drbd_bm_words(mdev),
3397         };
3398
3399         do {
3400                 if (h->command == P_BITMAP) {
3401                         ret = receive_bitmap_plain(mdev, h, buffer, &c);
3402                 } else if (h->command == P_COMPRESSED_BITMAP) {
3403                         /* MAYBE: sanity check that we speak proto >= 90,
3404                          * and the feature is enabled! */
3405                         struct p_compressed_bm *p;
3406
3407                         if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3408                                 dev_err(DEV, "ReportCBitmap packet too large\n");
3409                                 goto out;
3410                         }
3411                         /* use the page buff */
3412                         p = buffer;
3413                         memcpy(p, h, sizeof(*h));
3414                         if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3415                                 goto out;
3416                         if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
3417                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
3418                                 goto out; /* must free the buffer and unlock the bitmap */
3419                         }
3420                         ret = decode_bitmap_c(mdev, p, &c);
3421                 } else {
3422                         dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", h->command);
3423                         goto out;
3424                 }
3425
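                /* account the packet: index 1 counts plain bitmap packets,
                 * index 0 counts compressed ones (see INFO_bm_xfer_stats) */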
3426                 c.packets[h->command == P_BITMAP]++;
3427                 c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
3428
3429                 if (ret != OK)
3430                         break;
3431
3432                 if (!drbd_recv_header(mdev, h))
3433                         goto out;
3434         } while (ret == OK);
3435         if (ret == FAILED)
3436                 goto out;
3437
3438         INFO_bm_xfer_stats(mdev, "receive", &c);
3439
3440         if (mdev->state.conn == C_WF_BITMAP_T) {
3441                 ok = !drbd_send_bitmap(mdev);
3442                 if (!ok)
3443                         goto out;
3444                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3445                 ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3446                 D_ASSERT(ok == SS_SUCCESS);
3447         } else if (mdev->state.conn != C_WF_BITMAP_S) {
3448                 /* admin may have requested C_DISCONNECTING,
3449                  * other threads may have noticed network errors */
3450                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3451                     drbd_conn_str(mdev->state.conn));
3452         }
3453
3454         ok = TRUE;
3455  out:
3456         drbd_bm_unlock(mdev);
3457         if (ok && mdev->state.conn == C_WF_BITMAP_S)
3458                 drbd_start_resync(mdev, C_SYNC_SOURCE);
3459         free_page((unsigned long) buffer);
3460         return ok;
3461 }
3462
3463 static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
3464 {
3465         /* TODO zero copy sink :) */
3466         static char sink[128];
3467         int size, want, r;
3468
3469         dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3470              h->command, h->length);
3471
3472         size = h->length;
3473         while (size > 0) {
3474                 want = min_t(int, size, sizeof(sink));
3475                 r = drbd_recv(mdev, sink, want);
3476                 ERR_IF(r <= 0) break;
3477                 size -= r;
3478         }
3479         return size == 0;
3480 }
3481
3482 static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
3483 {
3484         if (mdev->state.disk >= D_INCONSISTENT)
3485                 drbd_kick_lo(mdev);
3486
3487         /* Make sure we've acked all the TCP data associated
3488          * with the data requests being unplugged */
3489         drbd_tcp_quickack(mdev->data.socket);
3490
3491         return TRUE;
3492 }
3493
3494 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
3495
3496 static drbd_cmd_handler_f drbd_default_handler[] = {
3497         [P_DATA]            = receive_Data,
3498         [P_DATA_REPLY]      = receive_DataReply,
3499         [P_RS_DATA_REPLY]   = receive_RSDataReply,
3500         [P_BARRIER]         = receive_Barrier,
3501         [P_BITMAP]          = receive_bitmap,
3502         [P_COMPRESSED_BITMAP]    = receive_bitmap,
3503         [P_UNPLUG_REMOTE]   = receive_UnplugRemote,
3504         [P_DATA_REQUEST]    = receive_DataRequest,
3505         [P_RS_DATA_REQUEST] = receive_DataRequest,
3506         [P_SYNC_PARAM]      = receive_SyncParam,
3507         [P_SYNC_PARAM89]           = receive_SyncParam,
3508         [P_PROTOCOL]        = receive_protocol,
3509         [P_UUIDS]           = receive_uuids,
3510         [P_SIZES]           = receive_sizes,
3511         [P_STATE]           = receive_state,
3512         [P_STATE_CHG_REQ]   = receive_req_state,
3513         [P_SYNC_UUID]       = receive_sync_uuid,
3514         [P_OV_REQUEST]      = receive_DataRequest,
3515         [P_OV_REPLY]        = receive_DataRequest,
3516         [P_CSUM_RS_REQUEST]    = receive_DataRequest,
3517         /* anything missing from this table is in
3518          * the asender_tbl, see get_asender_cmd */
3519         [P_MAX_CMD]         = NULL,
3520 };
3521
3522 static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
3523 static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3524
3525 static void drbdd(struct drbd_conf *mdev)
3526 {
3527         drbd_cmd_handler_f handler;
3528         struct p_header *header = &mdev->data.rbuf.header;
3529
3530         while (get_t_state(&mdev->receiver) == Running) {
3531                 drbd_thread_current_set_cpu(mdev);
3532                 if (!drbd_recv_header(mdev, header)) {
3533                         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3534                         break;
3535                 }
3536
3537                 if (header->command < P_MAX_CMD)
3538                         handler = drbd_cmd_handler[header->command];
3539                 else if (P_MAY_IGNORE < header->command
3540                      && header->command < P_MAX_OPT_CMD)
3541                         handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
3542                 else if (header->command > P_MAX_OPT_CMD)
3543                         handler = receive_skip;
3544                 else
3545                         handler = NULL;
3546
3547                 if (unlikely(!handler)) {
3548                         dev_err(DEV, "unknown packet type %d, l: %d!\n",
3549                             header->command, header->length);
3550                         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3551                         break;
3552                 }
3553                 if (unlikely(!handler(mdev, header))) {
3554                         dev_err(DEV, "error receiving %s, l: %d!\n",
3555                             cmdname(header->command), header->length);
3556                         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3557                         break;
3558                 }
3559         }
3560 }
3561
3562 static void drbd_fail_pending_reads(struct drbd_conf *mdev)
3563 {
3564         struct hlist_head *slot;
3565         struct hlist_node *pos;
3566         struct hlist_node *tmp;
3567         struct drbd_request *req;
3568         int i;
3569
3570         /*
3571          * Application READ requests
3572          */
3573         spin_lock_irq(&mdev->req_lock);
3574         for (i = 0; i < APP_R_HSIZE; i++) {
3575                 slot = mdev->app_reads_hash+i;
3576                 hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
3577                         /* it may (but should not any longer!)
3578                          * be on the work queue; if that assert triggers,
3579                          * we need to also grab the
3580                          * spin_lock_irq(&mdev->data.work.q_lock);
3581                          * and list_del_init here. */
3582                         D_ASSERT(list_empty(&req->w.list));
3583                         /* It would be nice to complete outside of spinlock.
3584                          * But this is easier for now. */
3585                         _req_mod(req, connection_lost_while_pending);
3586                 }
3587         }
3588         for (i = 0; i < APP_R_HSIZE; i++)
3589                 if (!hlist_empty(mdev->app_reads_hash+i))
3590                         dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
3591                                 "%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
3592
3593         memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
3594         spin_unlock_irq(&mdev->req_lock);
3595 }
3596
3597 void drbd_flush_workqueue(struct drbd_conf *mdev)
3598 {
3599         struct drbd_wq_barrier barr;
3600
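        /* queue a barrier work item and wait until the worker has processed
         * everything that was queued before it */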
3601         barr.w.cb = w_prev_work_done;
3602         init_completion(&barr.done);
3603         drbd_queue_work(&mdev->data.work, &barr.w);
3604         wait_for_completion(&barr.done);
3605 }
3606
3607 static void drbd_disconnect(struct drbd_conf *mdev)
3608 {
3609         enum drbd_fencing_p fp;
3610         union drbd_state os, ns;
3611         int rv = SS_UNKNOWN_ERROR;
3612         unsigned int i;
3613
3614         if (mdev->state.conn == C_STANDALONE)
3615                 return;
3616         if (mdev->state.conn >= C_WF_CONNECTION)
3617                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3618                                 drbd_conn_str(mdev->state.conn));
3619
3620         /* asender does not clean up anything. it must not interfere, either */
3621         drbd_thread_stop(&mdev->asender);
3622         drbd_free_sock(mdev);
3623
3624         spin_lock_irq(&mdev->req_lock);
3625         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3626         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3627         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3628         spin_unlock_irq(&mdev->req_lock);
3629
3630         /* We do not have data structures that would allow us to
3631          * get the rs_pending_cnt down to 0 again.
3632          *  * On C_SYNC_TARGET we do not have any data structures describing
3633          *    the pending RSDataRequest's we have sent.
3634          *  * On C_SYNC_SOURCE there is no data structure that tracks
3635          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3636          *  And no, it is not the sum of the reference counts in the
3637          *  resync_LRU. The resync_LRU tracks the whole operation including
3638          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3639          *  on the fly. */
3640         drbd_rs_cancel_all(mdev);
3641         mdev->rs_total = 0;
3642         mdev->rs_failed = 0;
3643         atomic_set(&mdev->rs_pending_cnt, 0);
3644         wake_up(&mdev->misc_wait);
3645
3646         /* make sure syncer is stopped and w_resume_next_sg queued */
3647         del_timer_sync(&mdev->resync_timer);
3648         set_bit(STOP_SYNC_TIMER, &mdev->flags);
3649         resync_timer_fn((unsigned long)mdev);
3650
3651         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3652          * w_make_resync_request etc. which may still be on the worker queue
3653          * to be "canceled" */
3654         drbd_flush_workqueue(mdev);
3655
3656         /* This also does reclaim_net_ee().  If we do this too early, we might
3657          * miss some resync ee and pages. */
3658         drbd_process_done_ee(mdev);
3659
3660         kfree(mdev->p_uuid);
3661         mdev->p_uuid = NULL;
3662
3663         if (!mdev->state.susp)
3664                 tl_clear(mdev);
3665
3666         drbd_fail_pending_reads(mdev);
3667
3668         dev_info(DEV, "Connection closed\n");
3669
3670         drbd_md_sync(mdev);
3671
3672         fp = FP_DONT_CARE;
3673         if (get_ldev(mdev)) {
3674                 fp = mdev->ldev->dc.fencing;
3675                 put_ldev(mdev);
3676         }
3677
3678         if (mdev->state.role == R_PRIMARY) {
3679                 if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
3680                         enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
3681                         drbd_request_state(mdev, NS(pdsk, nps));
3682                 }
3683         }
3684
3685         spin_lock_irq(&mdev->req_lock);
3686         os = mdev->state;
3687         if (os.conn >= C_UNCONNECTED) {
3688                 /* Do not restart in case we are C_DISCONNECTING */
3689                 ns = os;
3690                 ns.conn = C_UNCONNECTED;
3691                 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3692         }
3693         spin_unlock_irq(&mdev->req_lock);
3694
3695         if (os.conn == C_DISCONNECTING) {
3696                 struct hlist_head *h;
3697                 wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
3698
3699                 /* we must not free the tl_hash
3700                  * while application io is still on the fly */
3701                 wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
3702
3703                 spin_lock_irq(&mdev->req_lock);
3704                 /* paranoia code */
3705                 for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3706                         if (h->first)
3707                                 dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3708                                                 (int)(h - mdev->ee_hash), h->first);
3709                 kfree(mdev->ee_hash);
3710                 mdev->ee_hash = NULL;
3711                 mdev->ee_hash_s = 0;
3712
3713                 /* paranoia code */
3714                 for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3715                         if (h->first)
3716                                 dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3717                                                 (int)(h - mdev->tl_hash), h->first);
3718                 kfree(mdev->tl_hash);
3719                 mdev->tl_hash = NULL;
3720                 mdev->tl_hash_s = 0;
3721                 spin_unlock_irq(&mdev->req_lock);
3722
3723                 crypto_free_hash(mdev->cram_hmac_tfm);
3724                 mdev->cram_hmac_tfm = NULL;
3725
3726                 kfree(mdev->net_conf);
3727                 mdev->net_conf = NULL;
3728                 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3729         }
3730
3731         /* tcp_close and release of sendpage pages can be deferred.  I don't
3732          * want to use SO_LINGER, because apparently it can be deferred for
3733          * more than 20 seconds (longest time I checked).
3734          *
3735          * Actually we don't care for exactly when the network stack does its
3736          * put_page(), but release our reference on these pages right here.
3737          */
3738         i = drbd_release_ee(mdev, &mdev->net_ee);
3739         if (i)
3740                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3741         i = atomic_read(&mdev->pp_in_use);
3742         if (i)
3743                 dev_info(DEV, "pp_in_use = %u, expected 0\n", i);
3744
3745         D_ASSERT(list_empty(&mdev->read_ee));
3746         D_ASSERT(list_empty(&mdev->active_ee));
3747         D_ASSERT(list_empty(&mdev->sync_ee));
3748         D_ASSERT(list_empty(&mdev->done_ee));
3749
3750         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3751         atomic_set(&mdev->current_epoch->epoch_size, 0);
3752         D_ASSERT(list_empty(&mdev->current_epoch->list));
3753 }
3754
3755 /*
3756  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3757  * we can agree on is stored in agreed_pro_version.
3758  *
3759  * feature flags and the reserved array should be enough room for future
3760  * enhancements of the handshake protocol, and possible plugins...
3761  *
3762  * for now, they are expected to be zero, but ignored.
3763  */
3764 static int drbd_send_handshake(struct drbd_conf *mdev)
3765 {
3766         /* ASSERT current == mdev->receiver ... */
3767         struct p_handshake *p = &mdev->data.sbuf.handshake;
3768         int ok;
3769
3770         if (mutex_lock_interruptible(&mdev->data.mutex)) {
3771                 dev_err(DEV, "interrupted during initial handshake\n");
3772                 return 0; /* interrupted. not ok. */
3773         }
3774
3775         if (mdev->data.socket == NULL) {
3776                 mutex_unlock(&mdev->data.mutex);
3777                 return 0;
3778         }
3779
3780         memset(p, 0, sizeof(*p));
3781         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3782         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3783         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_HAND_SHAKE,
3784                             (struct p_header *)p, sizeof(*p), 0);
3785         mutex_unlock(&mdev->data.mutex);
3786         return ok;
3787 }
3788
3789 /*
3790  * return values:
3791  *   1 yes, we have a valid connection
3792  *   0 oops, did not work out, please try again
3793  *  -1 peer talks different language,
3794  *     no point in trying again, please go standalone.
3795  */
3796 static int drbd_do_handshake(struct drbd_conf *mdev)
3797 {
3798         /* ASSERT current == mdev->receiver ... */
3799         struct p_handshake *p = &mdev->data.rbuf.handshake;
3800         const int expect = sizeof(struct p_handshake)
3801                           -sizeof(struct p_header);
3802         int rv;
3803
3804         rv = drbd_send_handshake(mdev);
3805         if (!rv)
3806                 return 0;
3807
3808         rv = drbd_recv_header(mdev, &p->head);
3809         if (!rv)
3810                 return 0;
3811
3812         if (p->head.command != P_HAND_SHAKE) {
3813                 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3814                      cmdname(p->head.command), p->head.command);
3815                 return -1;
3816         }
3817
3818         if (p->head.length != expect) {
3819                 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3820                      expect, p->head.length);
3821                 return -1;
3822         }
3823
3824         rv = drbd_recv(mdev, &p->head.payload, expect);
3825
3826         if (rv != expect) {
3827                 dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
3828                 return 0;
3829         }
3830
3831         p->protocol_min = be32_to_cpu(p->protocol_min);
3832         p->protocol_max = be32_to_cpu(p->protocol_max);
3833         if (p->protocol_max == 0)
3834                 p->protocol_max = p->protocol_min;
3835
3836         if (PRO_VERSION_MAX < p->protocol_min ||
3837             PRO_VERSION_MIN > p->protocol_max)
3838                 goto incompat;
3839
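        /* agree on the highest protocol version supported by both sides */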
3840         mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
3841
3842         dev_info(DEV, "Handshake successful: "
3843              "Agreed network protocol version %d\n", mdev->agreed_pro_version);
3844
3845         return 1;
3846
3847  incompat:
3848         dev_err(DEV, "incompatible DRBD dialects: "
3849             "I support %d-%d, peer supports %d-%d\n",
3850             PRO_VERSION_MIN, PRO_VERSION_MAX,
3851             p->protocol_min, p->protocol_max);
3852         return -1;
3853 }
3854
3855 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
3856 static int drbd_do_auth(struct drbd_conf *mdev)
3857 {
3858         dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
3859         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
3860         return -1;
3861 }
3862 #else
3863 #define CHALLENGE_LEN 64
3864
3865 /* Return value:
3866         1 - auth succeeded,
3867         0 - failed, try again (network error),
3868         -1 - auth failed, don't try again.
3869 */
3870
3871 static int drbd_do_auth(struct drbd_conf *mdev)
3872 {
3873         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
3874         struct scatterlist sg;
3875         char *response = NULL;
3876         char *right_response = NULL;
3877         char *peers_ch = NULL;
3878         struct p_header p;
3879         unsigned int key_len = strlen(mdev->net_conf->shared_secret);
3880         unsigned int resp_size;
3881         struct hash_desc desc;
3882         int rv;
3883
3884         desc.tfm = mdev->cram_hmac_tfm;
3885         desc.flags = 0;
3886
3887         rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
3888                                 (u8 *)mdev->net_conf->shared_secret, key_len);
3889         if (rv) {
3890                 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
3891                 rv = -1;
3892                 goto fail;
3893         }
3894
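        /* send our own random challenge to the peer */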
3895         get_random_bytes(my_challenge, CHALLENGE_LEN);
3896
3897         rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
3898         if (!rv)
3899                 goto fail;
3900
3901         rv = drbd_recv_header(mdev, &p);
3902         if (!rv)
3903                 goto fail;
3904
3905         if (p.command != P_AUTH_CHALLENGE) {
3906                 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
3907                     cmdname(p.command), p.command);
3908                 rv = 0;
3909                 goto fail;
3910         }
3911
3912         if (p.length > CHALLENGE_LEN*2) {
3913                 dev_err(DEV, "AuthChallenge payload too big.\n");
3914                 rv = -1;
3915                 goto fail;
3916         }
3917
3918         peers_ch = kmalloc(p.length, GFP_NOIO);
3919         if (peers_ch == NULL) {
3920                 dev_err(DEV, "kmalloc of peers_ch failed\n");
3921                 rv = -1;
3922                 goto fail;
3923         }
3924
3925         rv = drbd_recv(mdev, peers_ch, p.length);
3926
3927         if (rv != p.length) {
3928                 dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
3929                 rv = 0;
3930                 goto fail;
3931         }
3932
3933         resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
3934         response = kmalloc(resp_size, GFP_NOIO);
3935         if (response == NULL) {
3936                 dev_err(DEV, "kmalloc of response failed\n");
3937                 rv = -1;
3938                 goto fail;
3939         }
3940
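        /* our response is the keyed hash over the peer's challenge */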
3941         sg_init_table(&sg, 1);
3942         sg_set_buf(&sg, peers_ch, p.length);
3943
3944         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
3945         if (rv) {
3946                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
3947                 rv = -1;
3948                 goto fail;
3949         }
3950
3951         rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
3952         if (!rv)
3953                 goto fail;
3954
3955         rv = drbd_recv_header(mdev, &p);
3956         if (!rv)
3957                 goto fail;
3958
3959         if (p.command != P_AUTH_RESPONSE) {
3960                 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
3961                     cmdname(p.command), p.command);
3962                 rv = 0;
3963                 goto fail;
3964         }
3965
3966         if (p.length != resp_size) {
3967                 dev_err(DEV, "AuthResponse payload has wrong size\n");
3968                 rv = 0;
3969                 goto fail;
3970         }
3971
3972         rv = drbd_recv(mdev, response, resp_size);
3973
3974         if (rv != resp_size) {
3975                 dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
3976                 rv = 0;
3977                 goto fail;
3978         }
3979
3980         right_response = kmalloc(resp_size, GFP_NOIO);
3981         if (right_response == NULL) {
3982                 dev_err(DEV, "kmalloc of right_response failed\n");
3983                 rv = -1;
3984                 goto fail;
3985         }
3986
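        /* compute what the peer's response to our own challenge must be */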
3987         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
3988
3989         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
3990         if (rv) {
3991                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
3992                 rv = -1;
3993                 goto fail;
3994         }
3995
3996         rv = !memcmp(response, right_response, resp_size);
3997
3998         if (rv)
3999                 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4000                      resp_size, mdev->net_conf->cram_hmac_alg);
4001         else
4002                 rv = -1;
4003
4004  fail:
4005         kfree(peers_ch);
4006         kfree(response);
4007         kfree(right_response);
4008
4009         return rv;
4010 }
4011 #endif
4012
4013 int drbdd_init(struct drbd_thread *thi)
4014 {
4015         struct drbd_conf *mdev = thi->mdev;
4016         unsigned int minor = mdev_to_minor(mdev);
4017         int h;
4018
4019         sprintf(current->comm, "drbd%d_receiver", minor);
4020
4021         dev_info(DEV, "receiver (re)started\n");
4022
4023         do {
4024                 h = drbd_connect(mdev);
4025                 if (h == 0) {
4026                         drbd_disconnect(mdev);
4027                         __set_current_state(TASK_INTERRUPTIBLE);
4028                         schedule_timeout(HZ);
4029                 }
4030                 if (h == -1) {
4031                         dev_warn(DEV, "Discarding network configuration.\n");
4032                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4033                 }
4034         } while (h == 0);
4035
4036         if (h > 0) {
4037                 if (get_net_conf(mdev)) {
4038                         drbdd(mdev);
4039                         put_net_conf(mdev);
4040                 }
4041         }
4042
4043         drbd_disconnect(mdev);
4044
4045         dev_info(DEV, "receiver terminated\n");
4046         return 0;
4047 }
4048
4049 /* ********* acknowledge sender ******** */
4050
4051 static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4052 {
4053         struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4054
4055         int retcode = be32_to_cpu(p->retcode);
4056
4057         if (retcode >= SS_SUCCESS) {
4058                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4059         } else {
4060                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4061                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4062                     drbd_set_st_err_str(retcode), retcode);
4063         }
4064         wake_up(&mdev->state_wait);
4065
4066         return TRUE;
4067 }
4068
4069 static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
4070 {
4071         return drbd_send_ping_ack(mdev);
4072
4073 }
4074
4075 static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4076 {
4077         /* restore idle timeout */
4078         mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4079         if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4080                 wake_up(&mdev->misc_wait);
4081
4082         return TRUE;
4083 }
4084
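/* P_RS_IS_IN_SYNC: during checksum based resync the peer found the block
 * unchanged; mark it in sync locally and count it as a same-checksum block. */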
4085 static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4086 {
4087         struct p_block_ack *p = (struct p_block_ack *)h;
4088         sector_t sector = be64_to_cpu(p->sector);
4089         int blksize = be32_to_cpu(p->blksize);
4090
4091         D_ASSERT(mdev->agreed_pro_version >= 89);
4092
4093         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4094
4095         drbd_rs_complete_io(mdev, sector);
4096         drbd_set_in_sync(mdev, sector, blksize);
4097         /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4098         mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4099         dec_rs_pending(mdev);
4100
4101         return TRUE;
4102 }
4103
4104 /* when we receive the ACK for a write request,
4105  * verify that we actually know about it */
4106 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4107         u64 id, sector_t sector)
4108 {
4109         struct hlist_head *slot = tl_hash_slot(mdev, sector);
4110         struct hlist_node *n;
4111         struct drbd_request *req;
4112
4113         hlist_for_each_entry(req, n, slot, colision) {
4114                 if ((unsigned long)req == (unsigned long)id) {
4115                         if (req->sector != sector) {
4116                                 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4117                                     "wrong sector (%llus versus %llus)\n", req,
4118                                     (unsigned long long)req->sector,
4119                                     (unsigned long long)sector);
4120                                 break;
4121                         }
4122                         return req;
4123                 }
4124         }
4125         dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4126                 (void *)(unsigned long)id, (unsigned long long)sector);
4127         return NULL;
4128 }
4129
4130 typedef struct drbd_request *(req_validator_fn)
4131         (struct drbd_conf *mdev, u64 id, sector_t sector);
4132
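/* Look up the request an ack refers to (under req_lock) and apply
 * the requested state transition to it. */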
4133 static int validate_req_change_req_state(struct drbd_conf *mdev,
4134         u64 id, sector_t sector, req_validator_fn validator,
4135         const char *func, enum drbd_req_event what)
4136 {
4137         struct drbd_request *req;
4138         struct bio_and_error m;
4139
4140         spin_lock_irq(&mdev->req_lock);
4141         req = validator(mdev, id, sector);
4142         if (unlikely(!req)) {
4143                 spin_unlock_irq(&mdev->req_lock);
4144                 dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4145                 return FALSE;
4146         }
4147         __req_mod(req, what, &m);
4148         spin_unlock_irq(&mdev->req_lock);
4149
4150         if (m.bio)
4151                 complete_master_bio(mdev, &m);
4152         return TRUE;
4153 }
4154
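/* Positive write acknowledgement: for syncer requests just update the
 * bitmap; otherwise advance the matching request's state machine
 * according to the ack type. */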
4155 static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4156 {
4157         struct p_block_ack *p = (struct p_block_ack *)h;
4158         sector_t sector = be64_to_cpu(p->sector);
4159         int blksize = be32_to_cpu(p->blksize);
4160         enum drbd_req_event what;
4161
4162         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4163
4164         if (is_syncer_block_id(p->block_id)) {
4165                 drbd_set_in_sync(mdev, sector, blksize);
4166                 dec_rs_pending(mdev);
4167                 return TRUE;
4168         }
4169         switch (be16_to_cpu(h->command)) {
4170         case P_RS_WRITE_ACK:
4171                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4172                 what = write_acked_by_peer_and_sis;
4173                 break;
4174         case P_WRITE_ACK:
4175                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4176                 what = write_acked_by_peer;
4177                 break;
4178         case P_RECV_ACK:
4179                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4180                 what = recv_acked_by_peer;
4181                 break;
4182         case P_DISCARD_ACK:
4183                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4184                 what = conflict_discarded_by_peer;
4185                 break;
4186         default:
4187                 D_ASSERT(0);
4188                 return FALSE;
4189         }
4190
4191         return validate_req_change_req_state(mdev, p->block_id, sector,
4192                 _ack_id_to_req, __func__ , what);
4193 }
4194
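/* The peer failed to write the block: mark a failed resync request in the
 * bitmap, or negatively complete the original write request. */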
4195 static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4196 {
4197         struct p_block_ack *p = (struct p_block_ack *)h;
4198         sector_t sector = be64_to_cpu(p->sector);
4199
4200         if (__ratelimit(&drbd_ratelimit_state))
4201                 dev_warn(DEV, "Got NegAck packet. Peer is in trouble?\n");
4202
4203         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4204
4205         if (is_syncer_block_id(p->block_id)) {
4206                 int size = be32_to_cpu(p->blksize);
4207                 dec_rs_pending(mdev);
4208                 drbd_rs_failed_io(mdev, sector, size);
4209                 return TRUE;
4210         }
4211         return validate_req_change_req_state(mdev, p->block_id, sector,
4212                 _ack_id_to_req, __func__ , neg_acked);
4213 }
4214
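/* The peer could not serve our data read request: fail the original request. */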
4215 static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4216 {
4217         struct p_block_ack *p = (struct p_block_ack *)h;
4218         sector_t sector = be64_to_cpu(p->sector);
4219
4220         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4221         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4222             (unsigned long long)sector, be32_to_cpu(p->blksize));
4223
4224         return validate_req_change_req_state(mdev, p->block_id, sector,
4225                 _ar_id_to_req, __func__ , neg_acked);
4226 }
4227
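/* The peer could not serve a resync read request: account the failed
 * range in the local resync bookkeeping. */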
4228 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4229 {
4230         sector_t sector;
4231         int size;
4232         struct p_block_ack *p = (struct p_block_ack *)h;
4233
4234         sector = be64_to_cpu(p->sector);
4235         size = be32_to_cpu(p->blksize);
4236         D_ASSERT(p->block_id == ID_SYNCER);
4237
4238         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4239
4240         dec_rs_pending(mdev);
4241
4242         if (get_ldev_if_state(mdev, D_FAILED)) {
4243                 drbd_rs_complete_io(mdev, sector);
4244                 drbd_rs_failed_io(mdev, sector, size);
4245                 put_ldev(mdev);
4246         }
4247
4248         return TRUE;
4249 }
4250
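/* The peer acknowledged a write barrier: release the corresponding
 * epoch from the transfer log. */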
4251 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4252 {
4253         struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4254
4255         tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4256
4257         return TRUE;
4258 }
4259
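/* Online verify reply: record (or report) out-of-sync sectors and,
 * once the last reply is in, queue the verify-finished work. */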
4260 static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4261 {
4262         struct p_block_ack *p = (struct p_block_ack *)h;
4263         struct drbd_work *w;
4264         sector_t sector;
4265         int size;
4266
4267         sector = be64_to_cpu(p->sector);
4268         size = be32_to_cpu(p->blksize);
4269
4270         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4271
4272         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4273                 drbd_ov_oos_found(mdev, sector, size);
4274         else
4275                 ov_oos_print(mdev);
4276
4277         drbd_rs_complete_io(mdev, sector);
4278         dec_rs_pending(mdev);
4279
4280         if (--mdev->ov_left == 0) {
4281                 w = kmalloc(sizeof(*w), GFP_NOIO);
4282                 if (w) {
4283                         w->cb = w_ov_finished;
4284                         drbd_queue_work_front(&mdev->data.work, w);
4285                 } else {
4286                         dev_err(DEV, "kmalloc(w) failed.\n");
4287                         ov_oos_print(mdev);
4288                         drbd_resync_finished(mdev);
4289                 }
4290         }
4291         return TRUE;
4292 }
4293
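/* Expected packet size and handler for packets processed by the asender. */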
4294 struct asender_cmd {
4295         size_t pkt_size;
4296         int (*process)(struct drbd_conf *mdev, struct p_header *h);
4297 };
4298
4299 static struct asender_cmd *get_asender_cmd(int cmd)
4300 {
4301         static struct asender_cmd asender_tbl[] = {
4302                 /* anything missing from this table is in
4303                  * the drbd_cmd_handler (drbd_default_handler) table,
4304                  * see the beginning of drbdd() */
4305         [P_PING]            = { sizeof(struct p_header), got_Ping },
4306         [P_PING_ACK]        = { sizeof(struct p_header), got_PingAck },
4307         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4308         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4309         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4310         [P_DISCARD_ACK]     = { sizeof(struct p_block_ack), got_BlockAck },
4311         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4312         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4313         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4314         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4315         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4316         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4317         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4318         [P_MAX_CMD]         = { 0, NULL },
4319         };
4320         if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4321                 return NULL;
4322         return &asender_tbl[cmd];
4323 }
4324
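/* Entry point of the asender thread ("drbdX_asender"): send pings and the
 * acks for completed epoch entries, and process the small control packets
 * received on the meta socket, dispatching via get_asender_cmd(). */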
4325 int drbd_asender(struct drbd_thread *thi)
4326 {
4327         struct drbd_conf *mdev = thi->mdev;
4328         struct p_header *h = &mdev->meta.rbuf.header;
4329         struct asender_cmd *cmd = NULL;
4330
4331         int rv, len;
4332         void *buf    = h;
4333         int received = 0;
4334         int expect   = sizeof(struct p_header);
4335         int empty;
4336
4337         sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4338
4339         current->policy = SCHED_RR;  /* Make this a realtime task! */
4340         current->rt_priority = 2;    /* more important than all other tasks */
4341
4342         while (get_t_state(thi) == Running) {
4343                 drbd_thread_current_set_cpu(mdev);
4344                 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4345                         ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4346                         mdev->meta.socket->sk->sk_rcvtimeo =
4347                                 mdev->net_conf->ping_timeo*HZ/10;
4348                 }
4349
4350                 /* conditionally cork;
4351                  * it may hurt latency if we cork without much to send */
4352                 if (!mdev->net_conf->no_cork &&
4353                         3 < atomic_read(&mdev->unacked_cnt))
4354                         drbd_tcp_cork(mdev->meta.socket);
4355                 while (1) {
4356                         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4357                         flush_signals(current);
4358                         if (!drbd_process_done_ee(mdev)) {
4359                                 dev_err(DEV, "process_done_ee() = NOT_OK\n");
4360                                 goto reconnect;
4361                         }
4362                         /* to avoid race with newly queued ACKs */
4363                         set_bit(SIGNAL_ASENDER, &mdev->flags);
4364                         spin_lock_irq(&mdev->req_lock);
4365                         empty = list_empty(&mdev->done_ee);
4366                         spin_unlock_irq(&mdev->req_lock);
4367                         /* new ack may have been queued right here,
4368                          * but then there is also a signal pending,
4369                          * and we start over... */
4370                         if (empty)
4371                                 break;
4372                 }
4373                 /* but unconditionally uncork unless disabled */
4374                 if (!mdev->net_conf->no_cork)
4375                         drbd_tcp_uncork(mdev->meta.socket);
4376
4377                 /* short circuit, recv_msg would return EINTR anyway. */
4378                 if (signal_pending(current))
4379                         continue;
4380
4381                 rv = drbd_recv_short(mdev, mdev->meta.socket,
4382                                      buf, expect-received, 0);
4383                 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4384
4385                 flush_signals(current);
4386
4387                 /* Note:
4388                  * -EINTR        (on meta) we got a signal
4389                  * -EAGAIN       (on meta) rcvtimeo expired
4390                  * -ECONNRESET   other side closed the connection
4391                  * -ERESTARTSYS  (on data) we got a signal
4392                  * rv <  0       other than above: unexpected error!
4393                  * rv == expected: full header or command
4394                  * rv <  expected: "woken" by signal during receive
4395                  * rv == 0       : "connection shut down by peer"
4396                  */
4397                 if (likely(rv > 0)) {
4398                         received += rv;
4399                         buf      += rv;
4400                 } else if (rv == 0) {
4401                         dev_err(DEV, "meta connection shut down by peer.\n");
4402                         goto reconnect;
4403                 } else if (rv == -EAGAIN) {
4404                         if (mdev->meta.socket->sk->sk_rcvtimeo ==
4405                             mdev->net_conf->ping_timeo*HZ/10) {
4406                                 dev_err(DEV, "PingAck did not arrive in time.\n");
4407                                 goto reconnect;
4408                         }
4409                         set_bit(SEND_PING, &mdev->flags);
4410                         continue;
4411                 } else if (rv == -EINTR) {
4412                         continue;
4413                 } else {
4414                         dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4415                         goto reconnect;
4416                 }
4417
4418                 if (received == expect && cmd == NULL) {
4419                         if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4420                                 dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
4421                                     (long)be32_to_cpu(h->magic),
4422                                     be16_to_cpu(h->command), be16_to_cpu(h->length));
4423                                 goto reconnect;
4424                         }
4425                         cmd = get_asender_cmd(be16_to_cpu(h->command));
4426                         len = be16_to_cpu(h->length);
4427                         if (unlikely(cmd == NULL)) {
4428                                 dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
4429                                     (long)be32_to_cpu(h->magic),
4430                                     be16_to_cpu(h->command), be16_to_cpu(h->length));
4431                                 goto disconnect;
4432                         }
4433                         expect = cmd->pkt_size;
4434                         ERR_IF(len != expect-sizeof(struct p_header))
4435                                 goto reconnect;
4436                 }
4437                 if (received == expect) {
4438                         D_ASSERT(cmd != NULL);
4439                         if (!cmd->process(mdev, h))
4440                                 goto reconnect;
4441
4442                         buf      = h;
4443                         received = 0;
4444                         expect   = sizeof(struct p_header);
4445                         cmd      = NULL;
4446                 }
4447         }
4448
4449         if (0) {
4450 reconnect:
4451                 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4452         }
4453         if (0) {
4454 disconnect:
4455                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4456         }
4457         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4458
4459         D_ASSERT(mdev->state.conn < C_CONNECTED);
4460         dev_info(DEV, "asender terminated\n");
4461
4462         return 0;
4463 }