ceph: use ceph. prefix for virtual xattrs
[safe/jmp/linux-2.6] / fs / ceph / messenger.c
index 395ce32..fe75ec7 100644 (file)
@@ -340,6 +340,7 @@ static void reset_connection(struct ceph_connection *con)
                ceph_msg_put(con->out_msg);
                con->out_msg = NULL;
        }
+       con->out_keepalive_pending = false;
        con->in_seq = 0;
        con->in_seq_acked = 0;
 }
@@ -357,6 +358,7 @@ void ceph_con_close(struct ceph_connection *con)
        clear_bit(WRITE_PENDING, &con->state);
        mutex_lock(&con->mutex);
        reset_connection(con);
+       con->peer_global_seq = 0;
        cancel_delayed_work(&con->work);
        mutex_unlock(&con->mutex);
        queue_con(con);
@@ -661,7 +663,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
        dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
             con->connect_seq, global_seq, proto);
 
-       con->out_connect.features = CEPH_FEATURE_SUPPORTED;
+       con->out_connect.features = CEPH_FEATURE_SUPPORTED_CLIENT;
        con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
        con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
        con->out_connect.global_seq = cpu_to_le32(global_seq);
@@ -1124,8 +1126,8 @@ static void fail_protocol(struct ceph_connection *con)
 
 static int process_connect(struct ceph_connection *con)
 {
-       u64 sup_feat = CEPH_FEATURE_SUPPORTED;
-       u64 req_feat = CEPH_FEATURE_REQUIRED;
+       u64 sup_feat = CEPH_FEATURE_SUPPORTED_CLIENT;
+       u64 req_feat = CEPH_FEATURE_REQUIRED_CLIENT;
        u64 server_feat = le64_to_cpu(con->in_reply.features);
 
        dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
@@ -1512,14 +1514,14 @@ static void process_message(struct ceph_connection *con)
 
        /* if first message, set peer_name */
        if (con->peer_name.type == 0)
-               con->peer_name = msg->hdr.src.name;
+               con->peer_name = msg->hdr.src;
 
        con->in_seq++;
        mutex_unlock(&con->mutex);
 
        dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
             msg, le64_to_cpu(msg->hdr.seq),
-            ENTITY_NAME(msg->hdr.src.name),
+            ENTITY_NAME(msg->hdr.src),
             le16_to_cpu(msg->hdr.type),
             ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
             le32_to_cpu(msg->hdr.front_len),
@@ -1544,7 +1546,6 @@ static int try_write(struct ceph_connection *con)
        dout("try_write start %p state %lu nref %d\n", con, con->state,
             atomic_read(&con->nref));
 
-       mutex_lock(&con->mutex);
 more:
        dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
 
@@ -1637,7 +1638,6 @@ do_next:
 done:
        ret = 0;
 out:
-       mutex_unlock(&con->mutex);
        dout("try_write done on %p\n", con);
        return ret;
 }
@@ -1649,7 +1649,6 @@ out:
  */
 static int try_read(struct ceph_connection *con)
 {
-       struct ceph_messenger *msgr;
        int ret = -1;
 
        if (!con->sock)
@@ -1659,9 +1658,6 @@ static int try_read(struct ceph_connection *con)
                return 0;
 
        dout("try_read start on %p\n", con);
-       msgr = con->msgr;
-
-       mutex_lock(&con->mutex);
 
 more:
        dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
@@ -1756,7 +1752,6 @@ more:
 done:
        ret = 0;
 out:
-       mutex_unlock(&con->mutex);
        dout("try_read done on %p\n", con);
        return ret;
 
@@ -1828,6 +1823,8 @@ more:
        dout("con_work %p start, clearing QUEUED\n", con);
        clear_bit(QUEUED, &con->state);
 
+       mutex_lock(&con->mutex);
+
        if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
                dout("con_work CLOSED\n");
                con_close_socket(con);
@@ -1842,11 +1839,16 @@ more:
        if (test_and_clear_bit(SOCK_CLOSED, &con->state) ||
            try_read(con) < 0 ||
            try_write(con) < 0) {
+               mutex_unlock(&con->mutex);
                backoff = 1;
                ceph_fault(con);     /* error/fault path */
+               goto done_unlocked;
        }
 
 done:
+       mutex_unlock(&con->mutex);
+
+done_unlocked:
        clear_bit(BUSY, &con->state);
        dout("con->state=%lu\n", con->state);
        if (test_bit(QUEUED, &con->state)) {
@@ -1985,9 +1987,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
        }
 
        /* set src+dst */
-       msg->hdr.src.name = con->msgr->inst.name;
-       msg->hdr.src.addr = con->msgr->my_enc_addr;
-       msg->hdr.orig_src = msg->hdr.src;
+       msg->hdr.src = con->msgr->inst.name;
 
        BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));