return ret;
}
-struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool)
+struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, int front_len)
{
wait_queue_t wait;
struct ceph_msg *msg;
+ if (front_len && front_len > pool->front_len) {
+ pr_err("msgpool_get pool %p need front %d, pool size is %d\n",
+ pool, front_len, pool->front_len);
+ WARN_ON(1);
+
+ /* try to alloc a fresh message */
+ msg = ceph_msg_new(0, front_len, 0, 0, NULL);
+ if (!IS_ERR(msg))
+ return msg;
+ }
+
+ if (!front_len)
+ front_len = pool->front_len;
+
if (pool->blocking) {
/* mempool_t behavior; first try to alloc */
- msg = ceph_msg_new(0, pool->front_len, 0, 0, NULL);
+ msg = ceph_msg_new(0, front_len, 0, 0, NULL);
if (!IS_ERR(msg))
return msg;
}
return msg;
}
pr_err("msgpool_get %p now %d/%d, %s\n", pool, pool->num,
- pool->min, pool->blocking ? "waiting" : "failing");
+ pool->min, pool->blocking ? "waiting" : "may fail");
spin_unlock(&pool->lock);
if (!pool->blocking) {
WARN_ON(1);
/* maybe we can allocate it now? */
- msg = ceph_msg_new(0, pool->front_len, 0, 0, NULL);
+ msg = ceph_msg_new(0, front_len, 0, 0, NULL);
if (!IS_ERR(msg))
return msg;
+ pr_err("msgpool_get %p empty + alloc failed\n", pool);
return ERR_PTR(-ENOMEM);
}
{
spin_lock(&pool->lock);
if (pool->num < pool->min) {
- ceph_msg_get(msg); /* retake a single ref */
+ /* reset msg front_len; user may have changed it */
+ msg->front.iov_len = pool->front_len;
+ msg->hdr.front_len = cpu_to_le32(pool->front_len);
+
+ kref_set(&msg->kref, 1); /* retake a single ref */
list_add(&msg->list_head, &pool->msgs);
pool->num++;
dout("msgpool_put %p reclaim %p, now %d/%d\n", pool, msg,