#include <linux/inet.h>
#include <linux/kthread.h>
#include <linux/net.h>
+#include <linux/slab.h>
#include <linux/socket.h>
#include <linux/string.h>
#include <net/tcp.h>
#include "super.h"
#include "messenger.h"
#include "decode.h"
+#include "pagelist.h"
/*
* Ceph uses the messenger to exchange ceph_msg messages with other
static char tag_ack = CEPH_MSGR_TAG_ACK;
static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
+#ifdef CONFIG_LOCKDEP
+static struct lock_class_key socket_class;
+#endif
+
static void queue_con(struct ceph_connection *con);
static void con_work(struct work_struct *);
con->sock = sock;
sock->sk->sk_allocation = GFP_NOFS;
+#ifdef CONFIG_LOCKDEP
+ lockdep_set_class(&sock->sk->sk_lock, &socket_class);
+#endif
+
set_sock_callbacks(sock, con);
dout("connect %s\n", pr_addr(&con->peer_addr.in_addr));
{
/* reset connection, out_queue, msg_ and connect_seq */
/* discard existing out_queue and msg_seq */
- mutex_lock(&con->out_mutex);
ceph_msg_remove_list(&con->out_queue);
ceph_msg_remove_list(&con->out_sent);
+ if (con->in_msg) {
+ ceph_msg_put(con->in_msg);
+ con->in_msg = NULL;
+ }
+
con->connect_seq = 0;
con->out_seq = 0;
- con->out_msg = NULL;
+ if (con->out_msg) {
+ ceph_msg_put(con->out_msg);
+ con->out_msg = NULL;
+ }
con->in_seq = 0;
- mutex_unlock(&con->out_mutex);
+ con->in_seq_acked = 0;
}
/*
dout("con_close %p peer %s\n", con, pr_addr(&con->peer_addr.in_addr));
set_bit(CLOSED, &con->state); /* in case there's queued work */
clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */
+ clear_bit(LOSSYTX, &con->state); /* so we retry next connect */
+ clear_bit(KEEPALIVE_PENDING, &con->state);
+ clear_bit(WRITE_PENDING, &con->state);
+ mutex_lock(&con->mutex);
reset_connection(con);
+ cancel_delayed_work(&con->work);
+ mutex_unlock(&con->mutex);
queue_con(con);
}
}
/*
+ * return true if this connection ever successfully opened
+ */
+bool ceph_con_opened(struct ceph_connection *con)
+{
+ return con->connect_seq > 0;
+}
+
+/*
* generic get/put
*/
struct ceph_connection *ceph_con_get(struct ceph_connection *con)
memset(con, 0, sizeof(*con));
atomic_set(&con->nref, 1);
con->msgr = msgr;
- mutex_init(&con->out_mutex);
+ mutex_init(&con->mutex);
INIT_LIST_HEAD(&con->out_queue);
INIT_LIST_HEAD(&con->out_sent);
INIT_DELAYED_WORK(&con->work, con_work);
con->out_kvec_bytes += sizeof(m->footer);
con->out_kvec_left++;
con->out_more = m->more_to_follow;
- con->out_msg = NULL; /* we're done with this one */
+ con->out_msg_done = true;
}
/*
con->out_kvec_bytes = 0;
con->out_kvec_is_msg = true;
+ con->out_msg_done = false;
/* Sneak an ack in there first? If we can get it into the same
* TCP packet that's a good thing. */
con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
}
- /* move message to sending/sent list */
m = list_first_entry(&con->out_queue,
struct ceph_msg, list_head);
- list_move_tail(&m->list_head, &con->out_sent);
- con->out_msg = m; /* we don't bother taking a reference here. */
+ con->out_msg = m;
+ if (test_bit(LOSSYTX, &con->state)) {
+ list_del_init(&m->list_head);
+ } else {
+ /* put message on sent list */
+ ceph_msg_get(m);
+ list_move_tail(&m->list_head, &con->out_sent);
+ }
- m->hdr.seq = cpu_to_le64(++con->out_seq);
+ /*
+ * only assign outgoing seq # if we haven't sent this message
+ * yet. if it is requeued, resend with it's original seq.
+ */
+ if (m->needs_out_seq) {
+ m->hdr.seq = cpu_to_le64(++con->out_seq);
+ m->needs_out_seq = false;
+ }
dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
m, con->out_seq, le16_to_cpu(m->hdr.type),
int auth_len = 0;
int auth_protocol = 0;
+ mutex_unlock(&con->mutex);
if (con->ops->get_authorizer)
con->ops->get_authorizer(con, &auth_buf, &auth_len,
&auth_protocol, &con->auth_reply_buf,
&con->auth_reply_buf_len,
con->auth_retry);
+ mutex_lock(&con->mutex);
con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
con->out_connect.authorizer_len = cpu_to_le32(auth_len);
dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
con->connect_seq, global_seq, proto);
+ con->out_connect.features = CEPH_FEATURE_SUPPORTED;
con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
con->out_connect.global_seq = cpu_to_le32(global_seq);
con->out_connect.protocol_version = cpu_to_le32(proto);
con->out_connect.flags = 0;
- if (test_bit(LOSSYTX, &con->state))
- con->out_connect.flags = CEPH_MSG_CONNECT_LOSSY;
if (!after_banner) {
con->out_kvec_left = 0;
page = msg->pages[con->out_msg_pos.page];
if (crc)
kaddr = kmap(page);
+ } else if (msg->pagelist) {
+ page = list_first_entry(&msg->pagelist->head,
+ struct page, lru);
+ if (crc)
+ kaddr = kmap(page);
} else {
page = con->msgr->zero_page;
if (crc)
MSG_DONTWAIT | MSG_NOSIGNAL |
MSG_MORE);
- if (crc && msg->pages)
+ if (crc && (msg->pages || msg->pagelist))
kunmap(page);
if (ret <= 0)
con->out_msg_pos.page_pos = 0;
con->out_msg_pos.page++;
con->out_msg_pos.did_page_crc = 0;
+ if (msg->pagelist)
+ list_move_tail(&page->lru,
+ &msg->pagelist->head);
}
}
con->in_base_pos = 0;
}
-static void prepare_read_connect_retry(struct ceph_connection *con)
-{
- dout("prepare_read_connect_retry %p\n", con);
- con->in_base_pos = strlen(CEPH_BANNER) + sizeof(con->actual_peer_addr)
- + sizeof(con->peer_addr_for_me);
-}
-
static void prepare_read_ack(struct ceph_connection *con)
{
dout("prepare_read_ack %p\n", con);
* end may not yet know their ip address, so if it's 0.0.0.0, give
* them the benefit of the doubt.
*/
- if (!ceph_entity_addr_is_local(&con->peer_addr,
- &con->actual_peer_addr) &&
+ if (memcmp(&con->peer_addr, &con->actual_peer_addr,
+ sizeof(con->peer_addr)) != 0 &&
!(addr_is_blank(&con->actual_peer_addr.in_addr) &&
con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
- pr_err("wrong peer, want %s/%d, "
- "got %s/%d, wtf\n",
- pr_addr(&con->peer_addr.in_addr),
- con->peer_addr.nonce,
- pr_addr(&con->actual_peer_addr.in_addr),
- con->actual_peer_addr.nonce);
- con->error_msg = "protocol error, wrong peer";
+ pr_warning("wrong peer, want %s/%lld, got %s/%lld\n",
+ pr_addr(&con->peer_addr.in_addr),
+ le64_to_cpu(con->peer_addr.nonce),
+ pr_addr(&con->actual_peer_addr.in_addr),
+ le64_to_cpu(con->actual_peer_addr.nonce));
+ con->error_msg = "wrong peer at address";
return -1;
}
return 0;
}
+static void fail_protocol(struct ceph_connection *con)
+{
+ reset_connection(con);
+ set_bit(CLOSED, &con->state); /* in case there's queued work */
+
+ mutex_unlock(&con->mutex);
+ if (con->ops->bad_proto)
+ con->ops->bad_proto(con);
+ mutex_lock(&con->mutex);
+}
+
static int process_connect(struct ceph_connection *con)
{
+ u64 sup_feat = CEPH_FEATURE_SUPPORTED;
+ u64 req_feat = CEPH_FEATURE_REQUIRED;
+ u64 server_feat = le64_to_cpu(con->in_reply.features);
+
dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
switch (con->in_reply.tag) {
+ case CEPH_MSGR_TAG_FEATURES:
+ pr_err("%s%lld %s feature set mismatch,"
+ " my %llx < server's %llx, missing %llx\n",
+ ENTITY_NAME(con->peer_name),
+ pr_addr(&con->peer_addr.in_addr),
+ sup_feat, server_feat, server_feat & ~sup_feat);
+ con->error_msg = "missing required protocol features";
+ fail_protocol(con);
+ return -1;
+
case CEPH_MSGR_TAG_BADPROTOVER:
- dout("process_connect got BADPROTOVER my %d != their %d\n",
- le32_to_cpu(con->out_connect.protocol_version),
- le32_to_cpu(con->in_reply.protocol_version));
pr_err("%s%lld %s protocol version mismatch,"
" my %d != server's %d\n",
ENTITY_NAME(con->peer_name),
le32_to_cpu(con->out_connect.protocol_version),
le32_to_cpu(con->in_reply.protocol_version));
con->error_msg = "protocol version mismatch";
- if (con->ops->bad_proto)
- con->ops->bad_proto(con);
- reset_connection(con);
- set_bit(CLOSED, &con->state); /* in case there's queued work */
+ fail_protocol(con);
return -1;
case CEPH_MSGR_TAG_BADAUTHORIZER:
}
con->auth_retry = 1;
prepare_write_connect(con->msgr, con, 0);
- prepare_read_connect_retry(con);
+ prepare_read_connect(con);
break;
case CEPH_MSGR_TAG_RESETSESSION:
prepare_read_connect(con);
/* Tell ceph about it. */
+ mutex_unlock(&con->mutex);
pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name));
if (con->ops->peer_reset)
con->ops->peer_reset(con);
+ mutex_lock(&con->mutex);
break;
case CEPH_MSGR_TAG_RETRY_SESSION:
break;
case CEPH_MSGR_TAG_READY:
+ if (req_feat & ~server_feat) {
+ pr_err("%s%lld %s protocol feature mismatch,"
+ " my required %llx > server's %llx, need %llx\n",
+ ENTITY_NAME(con->peer_name),
+ pr_addr(&con->peer_addr.in_addr),
+ req_feat, server_feat, req_feat & ~server_feat);
+ con->error_msg = "missing required protocol features";
+ fail_protocol(con);
+ return -1;
+ }
clear_bit(CONNECTING, &con->state);
con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
con->connect_seq++;
con->connect_seq);
WARN_ON(con->connect_seq !=
le32_to_cpu(con->in_reply.connect_seq));
+
+ if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
+ set_bit(LOSSYTX, &con->state);
+
prepare_read_tag(con);
break;
u64 ack = le64_to_cpu(con->in_temp_ack);
u64 seq;
- mutex_lock(&con->out_mutex);
while (!list_empty(&con->out_sent)) {
m = list_first_entry(&con->out_sent, struct ceph_msg,
list_head);
le16_to_cpu(m->hdr.type), m);
ceph_msg_remove(m);
}
- mutex_unlock(&con->out_mutex);
prepare_read_tag(con);
}
+static int read_partial_message_section(struct ceph_connection *con,
+ struct kvec *section, unsigned int sec_len,
+ u32 *crc)
+{
+ int left;
+ int ret;
+
+ BUG_ON(!section);
+
+ while (section->iov_len < sec_len) {
+ BUG_ON(section->iov_base == NULL);
+ left = sec_len - section->iov_len;
+ ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
+ section->iov_len, left);
+ if (ret <= 0)
+ return ret;
+ section->iov_len += ret;
+ if (section->iov_len == sec_len)
+ *crc = crc32c(0, section->iov_base,
+ section->iov_len);
+ }
+ return 1;
+}
+static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
+ struct ceph_msg_header *hdr,
+ int *skip);
/*
* read (part of) a message.
*/
struct ceph_msg *m = con->in_msg;
void *p;
int ret;
- int to, want, left;
+ int to, left;
unsigned front_len, middle_len, data_len, data_off;
int datacrc = con->msgr->nocrc;
+ int skip;
+ u64 seq;
dout("read_partial_message con %p msg %p\n", con, m);
}
}
}
-
front_len = le32_to_cpu(con->in_hdr.front_len);
if (front_len > CEPH_MSG_MAX_FRONT_LEN)
return -EIO;
data_len = le32_to_cpu(con->in_hdr.data_len);
if (data_len > CEPH_MSG_MAX_DATA_LEN)
return -EIO;
+ data_off = le16_to_cpu(con->in_hdr.data_off);
+
+ /* verify seq# */
+ seq = le64_to_cpu(con->in_hdr.seq);
+ if ((s64)seq - (s64)con->in_seq < 1) {
+ pr_info("skipping %s%lld %s seq %lld, expected %lld\n",
+ ENTITY_NAME(con->peer_name),
+ pr_addr(&con->peer_addr.in_addr),
+ seq, con->in_seq + 1);
+ con->in_base_pos = -front_len - middle_len - data_len -
+ sizeof(m->footer);
+ con->in_tag = CEPH_MSGR_TAG_READY;
+ con->in_seq++;
+ return 0;
+ } else if ((s64)seq - (s64)con->in_seq > 1) {
+ pr_err("read_partial_message bad seq %lld expected %lld\n",
+ seq, con->in_seq + 1);
+ con->error_msg = "bad message sequence # for incoming message";
+ return -EBADMSG;
+ }
/* allocate message? */
if (!con->in_msg) {
dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
con->in_hdr.front_len, con->in_hdr.data_len);
- con->in_msg = con->ops->alloc_msg(con, &con->in_hdr);
- if (!con->in_msg) {
+ con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip);
+ if (skip) {
/* skip this message */
dout("alloc_msg returned NULL, skipping message\n");
con->in_base_pos = -front_len - middle_len - data_len -
sizeof(m->footer);
con->in_tag = CEPH_MSGR_TAG_READY;
+ con->in_seq++;
return 0;
}
if (IS_ERR(con->in_msg)) {
ret = PTR_ERR(con->in_msg);
con->in_msg = NULL;
- con->error_msg = "out of memory for incoming message";
+ con->error_msg =
+ "error allocating memory for incoming message";
return ret;
}
m = con->in_msg;
m->front.iov_len = 0; /* haven't read it yet */
- memcpy(&m->hdr, &con->in_hdr, sizeof(con->in_hdr));
+ if (m->middle)
+ m->middle->vec.iov_len = 0;
+
+ con->in_msg_pos.page = 0;
+ con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+ con->in_msg_pos.data_pos = 0;
}
/* front */
- while (m->front.iov_len < front_len) {
- BUG_ON(m->front.iov_base == NULL);
- left = front_len - m->front.iov_len;
- ret = ceph_tcp_recvmsg(con->sock, (char *)m->front.iov_base +
- m->front.iov_len, left);
- if (ret <= 0)
- return ret;
- m->front.iov_len += ret;
- if (m->front.iov_len == front_len)
- con->in_front_crc = crc32c(0, m->front.iov_base,
- m->front.iov_len);
- }
+ ret = read_partial_message_section(con, &m->front, front_len,
+ &con->in_front_crc);
+ if (ret <= 0)
+ return ret;
/* middle */
- while (middle_len > 0 && (!m->middle ||
- m->middle->vec.iov_len < middle_len)) {
- if (m->middle == NULL) {
- ret = -EOPNOTSUPP;
- if (con->ops->alloc_middle)
- ret = con->ops->alloc_middle(con, m);
- if (ret < 0) {
- dout("alloc_middle failed, skipping payload\n");
- con->in_base_pos = -middle_len - data_len
- - sizeof(m->footer);
- ceph_msg_put(con->in_msg);
- con->in_msg = NULL;
- con->in_tag = CEPH_MSGR_TAG_READY;
- return 0;
- }
- m->middle->vec.iov_len = 0;
- }
- left = middle_len - m->middle->vec.iov_len;
- ret = ceph_tcp_recvmsg(con->sock,
- (char *)m->middle->vec.iov_base +
- m->middle->vec.iov_len, left);
+ if (m->middle) {
+ ret = read_partial_message_section(con, &m->middle->vec, middle_len,
+ &con->in_middle_crc);
if (ret <= 0)
return ret;
- m->middle->vec.iov_len += ret;
- if (m->middle->vec.iov_len == middle_len)
- con->in_middle_crc = crc32c(0, m->middle->vec.iov_base,
- m->middle->vec.iov_len);
}
/* (page) data */
- data_off = le16_to_cpu(m->hdr.data_off);
- if (data_len == 0)
- goto no_data;
-
- if (m->nr_pages == 0) {
- con->in_msg_pos.page = 0;
- con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
- con->in_msg_pos.data_pos = 0;
- /* find pages for data payload */
- want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
- ret = -1;
- if (con->ops->prepare_pages)
- ret = con->ops->prepare_pages(con, m, want);
- if (ret < 0) {
- dout("%p prepare_pages failed, skipping payload\n", m);
- con->in_base_pos = -data_len - sizeof(m->footer);
- ceph_msg_put(con->in_msg);
- con->in_msg = NULL;
- con->in_tag = CEPH_MSGR_TAG_READY;
- return 0;
- }
- BUG_ON(m->nr_pages < want);
- }
while (con->in_msg_pos.data_pos < data_len) {
left = min((int)(data_len - con->in_msg_pos.data_pos),
(int)(PAGE_SIZE - con->in_msg_pos.page_pos));
}
}
-no_data:
/* footer */
to = sizeof(m->hdr) + sizeof(m->footer);
while (con->in_base_pos < to) {
*/
static void process_message(struct ceph_connection *con)
{
- struct ceph_msg *msg = con->in_msg;
+ struct ceph_msg *msg;
+ msg = con->in_msg;
con->in_msg = NULL;
/* if first message, set peer_name */
if (con->peer_name.type == 0)
con->peer_name = msg->hdr.src.name;
- mutex_lock(&con->out_mutex);
con->in_seq++;
- mutex_unlock(&con->out_mutex);
+ mutex_unlock(&con->mutex);
dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
msg, le64_to_cpu(msg->hdr.seq),
le32_to_cpu(msg->hdr.data_len),
con->in_front_crc, con->in_middle_crc, con->in_data_crc);
con->ops->dispatch(con, msg);
+
+ mutex_lock(&con->mutex);
prepare_read_tag(con);
}
dout("try_write start %p state %lu nref %d\n", con, con->state,
atomic_read(&con->nref));
- mutex_lock(&con->out_mutex);
+ mutex_lock(&con->mutex);
more:
dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
set_bit(CONNECTING, &con->state);
clear_bit(NEGOTIATING, &con->state);
+ BUG_ON(con->in_msg);
con->in_tag = CEPH_MSGR_TAG_READY;
dout("try_write initiating connect on %p new state %lu\n",
con, con->state);
ret = write_partial_kvec(con);
if (ret <= 0)
goto done;
- if (ret < 0) {
- dout("try_write write_partial_kvec err %d\n", ret);
- goto done;
- }
}
/* msg pages? */
if (con->out_msg) {
+ if (con->out_msg_done) {
+ ceph_msg_put(con->out_msg);
+ con->out_msg = NULL; /* we're done with this one */
+ goto do_next;
+ }
+
ret = write_partial_msg_pages(con);
if (ret == 1)
goto more_kvec; /* we need to send the footer, too! */
}
}
+do_next:
if (!test_bit(CONNECTING, &con->state)) {
/* is anything else pending? */
if (!list_empty(&con->out_queue)) {
done:
ret = 0;
out:
- mutex_unlock(&con->out_mutex);
+ mutex_unlock(&con->mutex);
dout("try_write done on %p\n", con);
return ret;
}
dout("try_read start on %p\n", con);
msgr = con->msgr;
+ mutex_lock(&con->mutex);
+
more:
dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
con->in_base_pos);
done:
ret = 0;
out:
+ mutex_unlock(&con->mutex);
dout("try_read done on %p\n", con);
return ret;
clear_bit(BUSY, &con->state);
dout("con->state=%lu\n", con->state);
if (test_bit(QUEUED, &con->state)) {
- if (!backoff) {
+ if (!backoff || test_bit(OPENING, &con->state)) {
dout("con_work %p QUEUED reset, looping\n", con);
goto more;
}
goto out;
}
- clear_bit(BUSY, &con->state); /* to avoid an improbable race */
+ mutex_lock(&con->mutex);
+ if (test_bit(CLOSED, &con->state))
+ goto out_unlock;
con_close_socket(con);
- con->in_msg = NULL;
+
+ if (con->in_msg) {
+ ceph_msg_put(con->in_msg);
+ con->in_msg = NULL;
+ }
+
+ /* Requeue anything that hasn't been acked */
+ list_splice_init(&con->out_sent, &con->out_queue);
/* If there are no messages in the queue, place the connection
* in a STANDBY state (i.e., don't try to reconnect just yet). */
- mutex_lock(&con->out_mutex);
if (list_empty(&con->out_queue) && !con->out_keepalive_pending) {
dout("fault setting STANDBY\n");
set_bit(STANDBY, &con->state);
- mutex_unlock(&con->out_mutex);
- goto out;
+ } else {
+ /* retry after a delay. */
+ if (con->delay == 0)
+ con->delay = BASE_DELAY_INTERVAL;
+ else if (con->delay < MAX_DELAY_INTERVAL)
+ con->delay *= 2;
+ dout("fault queueing %p delay %lu\n", con, con->delay);
+ con->ops->get(con);
+ if (queue_delayed_work(ceph_msgr_wq, &con->work,
+ round_jiffies_relative(con->delay)) == 0)
+ con->ops->put(con);
+ }
+
+out_unlock:
+ mutex_unlock(&con->mutex);
+out:
+ /*
+ * in case we faulted due to authentication, invalidate our
+ * current tickets so that we can get new ones.
+ */
+ if (con->auth_retry && con->ops->invalidate_authorizer) {
+ dout("calling invalidate_authorizer()\n");
+ con->ops->invalidate_authorizer(con);
}
- /* Requeue anything that hasn't been acked, and retry after a
- * delay. */
- list_splice_init(&con->out_sent, &con->out_queue);
- mutex_unlock(&con->out_mutex);
-
- if (con->delay == 0)
- con->delay = BASE_DELAY_INTERVAL;
- else if (con->delay < MAX_DELAY_INTERVAL)
- con->delay *= 2;
-
- /* explicitly schedule work to try to reconnect again later. */
- dout("fault queueing %p delay %lu\n", con, con->delay);
- con->ops->get(con);
- if (queue_delayed_work(ceph_msgr_wq, &con->work,
- round_jiffies_relative(con->delay)) == 0)
- con->ops->put(con);
-
-out:
if (con->ops->fault)
con->ops->fault(con);
}
msgr->inst.addr = *myaddr;
/* select a random nonce */
- get_random_bytes(&msgr->inst.addr.nonce,
- sizeof(msgr->inst.addr.nonce));
+ msgr->inst.addr.type = 0;
+ get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
encode_my_addr(msgr);
dout("messenger_create %p\n", msgr);
msg->hdr.src.name = con->msgr->inst.name;
msg->hdr.src.addr = con->msgr->my_enc_addr;
msg->hdr.orig_src = msg->hdr.src;
- msg->hdr.dst_erank = con->peer_addr.erank;
+
+ BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
+
+ msg->needs_out_seq = true;
/* queue */
- mutex_lock(&con->out_mutex);
+ mutex_lock(&con->mutex);
BUG_ON(!list_empty(&msg->list_head));
list_add_tail(&msg->list_head, &con->out_queue);
dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
le32_to_cpu(msg->hdr.front_len),
le32_to_cpu(msg->hdr.middle_len),
le32_to_cpu(msg->hdr.data_len));
- mutex_unlock(&con->out_mutex);
+ mutex_unlock(&con->mutex);
/* if there wasn't anything waiting to send before, queue
* new work */
*/
void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
{
- mutex_lock(&con->out_mutex);
+ mutex_lock(&con->mutex);
if (!list_empty(&msg->list_head)) {
dout("con_revoke %p msg %p\n", con, msg);
list_del_init(&msg->list_head);
ceph_msg_put(msg);
msg->hdr.seq = 0;
- if (con->out_msg == msg)
+ if (con->out_msg == msg) {
+ ceph_msg_put(con->out_msg);
con->out_msg = NULL;
+ }
if (con->out_kvec_is_msg) {
con->out_skip = con->out_kvec_bytes;
con->out_kvec_is_msg = false;
} else {
dout("con_revoke %p msg %p - not queued (sent?)\n", con, msg);
}
- mutex_unlock(&con->out_mutex);
+ mutex_unlock(&con->mutex);
+}
+
+/*
+ * Revoke a message that we may be reading data into
+ */
+void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
+{
+ mutex_lock(&con->mutex);
+ if (con->in_msg && con->in_msg == msg) {
+ unsigned front_len = le32_to_cpu(con->in_hdr.front_len);
+ unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len);
+ unsigned data_len = le32_to_cpu(con->in_hdr.data_len);
+
+ /* skip rest of message */
+ dout("con_revoke_pages %p msg %p revoked\n", con, msg);
+ con->in_base_pos = con->in_base_pos -
+ sizeof(struct ceph_msg_header) -
+ front_len -
+ middle_len -
+ data_len -
+ sizeof(struct ceph_msg_footer);
+ ceph_msg_put(con->in_msg);
+ con->in_msg = NULL;
+ con->in_tag = CEPH_MSGR_TAG_READY;
+ con->in_seq++;
+ } else {
+ dout("con_revoke_pages %p msg %p pages %p no-op\n",
+ con, con->in_msg, msg);
+ }
+ mutex_unlock(&con->mutex);
}
/*
m = kmalloc(sizeof(*m), GFP_NOFS);
if (m == NULL)
goto out;
- atomic_set(&m->nref, 1);
+ kref_init(&m->kref);
INIT_LIST_HEAD(&m->list_head);
+ m->hdr.tid = 0;
m->hdr.type = cpu_to_le16(type);
+ m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
+ m->hdr.version = 0;
m->hdr.front_len = cpu_to_le32(front_len);
m->hdr.middle_len = 0;
m->hdr.data_len = cpu_to_le32(page_len);
m->hdr.data_off = cpu_to_le16(page_off);
- m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
+ m->hdr.reserved = 0;
m->footer.front_crc = 0;
m->footer.middle_crc = 0;
m->footer.data_crc = 0;
+ m->footer.flags = 0;
m->front_max = front_len;
m->front_is_vmalloc = false;
m->more_to_follow = false;
/* data */
m->nr_pages = calc_pages_for(page_off, page_len);
m->pages = pages;
+ m->pagelist = NULL;
dout("ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len,
m->nr_pages);
}
/*
- * Generic message allocator, for incoming messages.
- */
-struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
- struct ceph_msg_header *hdr)
-{
- int type = le16_to_cpu(hdr->type);
- int front_len = le32_to_cpu(hdr->front_len);
- struct ceph_msg *msg = ceph_msg_new(type, front_len, 0, 0, NULL);
-
- if (!msg) {
- pr_err("unable to allocate msg type %d len %d\n",
- type, front_len);
- return ERR_PTR(-ENOMEM);
- }
- return msg;
-}
-
-/*
* Allocate "middle" portion of a message, if it is needed and wasn't
* allocated by alloc_msg. This allows us to read a small fixed-size
* per-type header in the front and then gracefully fail (i.e.,
* propagate the error to the caller based on info in the front) when
* the middle is too large.
*/
-int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
+static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
{
int type = le16_to_cpu(msg->hdr.type);
int middle_len = le32_to_cpu(msg->hdr.middle_len);
return 0;
}
+/*
+ * Generic message allocator, for incoming messages.
+ */
+static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
+ struct ceph_msg_header *hdr,
+ int *skip)
+{
+ int type = le16_to_cpu(hdr->type);
+ int front_len = le32_to_cpu(hdr->front_len);
+ int middle_len = le32_to_cpu(hdr->middle_len);
+ struct ceph_msg *msg = NULL;
+ int ret;
+
+ if (con->ops->alloc_msg) {
+ mutex_unlock(&con->mutex);
+ msg = con->ops->alloc_msg(con, hdr, skip);
+ mutex_lock(&con->mutex);
+ if (IS_ERR(msg))
+ return msg;
+
+ if (*skip)
+ return NULL;
+ }
+ if (!msg) {
+ *skip = 0;
+ msg = ceph_msg_new(type, front_len, 0, 0, NULL);
+ if (!msg) {
+ pr_err("unable to allocate msg type %d len %d\n",
+ type, front_len);
+ return ERR_PTR(-ENOMEM);
+ }
+ }
+ memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
+
+ if (middle_len) {
+ ret = ceph_alloc_middle(con, msg);
+
+ if (ret < 0) {
+ ceph_msg_put(msg);
+ return msg;
+ }
+ }
+
+ return msg;
+}
+
/*
* Free a generically kmalloc'd message.
/*
* Drop a msg ref. Destroy as needed.
*/
-void ceph_msg_put(struct ceph_msg *m)
-{
- dout("ceph_msg_put %p %d -> %d\n", m, atomic_read(&m->nref),
- atomic_read(&m->nref)-1);
- if (atomic_read(&m->nref) <= 0) {
- pr_err("bad ceph_msg_put on %p %llu %d=%s %d+%d\n",
- m, le64_to_cpu(m->hdr.seq),
- le16_to_cpu(m->hdr.type),
- ceph_msg_type_name(le16_to_cpu(m->hdr.type)),
- le32_to_cpu(m->hdr.front_len),
- le32_to_cpu(m->hdr.data_len));
- WARN_ON(1);
- }
- if (atomic_dec_and_test(&m->nref)) {
- dout("ceph_msg_put last one on %p\n", m);
- WARN_ON(!list_empty(&m->list_head));
-
- /* drop middle, data, if any */
- if (m->middle) {
- ceph_buffer_put(m->middle);
- m->middle = NULL;
- }
- m->nr_pages = 0;
- m->pages = NULL;
+void ceph_msg_last_put(struct kref *kref)
+{
+ struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
+
+ dout("ceph_msg_put last one on %p\n", m);
+ WARN_ON(!list_empty(&m->list_head));
- if (m->pool)
- ceph_msgpool_put(m->pool, m);
- else
- ceph_msg_kfree(m);
+ /* drop middle, data, if any */
+ if (m->middle) {
+ ceph_buffer_put(m->middle);
+ m->middle = NULL;
}
+ m->nr_pages = 0;
+ m->pages = NULL;
+
+ if (m->pagelist) {
+ ceph_pagelist_release(m->pagelist);
+ kfree(m->pagelist);
+ m->pagelist = NULL;
+ }
+
+ if (m->pool)
+ ceph_msgpool_put(m->pool, m);
+ else
+ ceph_msg_kfree(m);
+}
+
+void ceph_msg_dump(struct ceph_msg *msg)
+{
+ pr_debug("msg_dump %p (front_max %d nr_pages %d)\n", msg,
+ msg->front_max, msg->nr_pages);
+ print_hex_dump(KERN_DEBUG, "header: ",
+ DUMP_PREFIX_OFFSET, 16, 1,
+ &msg->hdr, sizeof(msg->hdr), true);
+ print_hex_dump(KERN_DEBUG, " front: ",
+ DUMP_PREFIX_OFFSET, 16, 1,
+ msg->front.iov_base, msg->front.iov_len, true);
+ if (msg->middle)
+ print_hex_dump(KERN_DEBUG, "middle: ",
+ DUMP_PREFIX_OFFSET, 16, 1,
+ msg->middle->vec.iov_base,
+ msg->middle->vec.iov_len, true);
+ print_hex_dump(KERN_DEBUG, "footer: ",
+ DUMP_PREFIX_OFFSET, 16, 1,
+ &msg->footer, sizeof(msg->footer), true);
}