aboutsummaryrefslogtreecommitdiffstats
path: root/src/connection.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/connection.c')
-rw-r--r--src/connection.c341
1 files changed, 270 insertions, 71 deletions
diff --git a/src/connection.c b/src/connection.c
index a58b7a7..8870fd2 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -26,6 +26,7 @@
#define _GNU_SOURCE
+#include <assert.h>
#include <math.h>
#include <stdlib.h>
#include <stdint.h>
@@ -55,12 +56,12 @@ div_roundup(uint32_t n, size_t a)
}
struct wl_ring_buffer {
- char data[4096];
- uint32_t head, tail;
+ char *data;
+ size_t head, tail;
+ uint32_t size_bits;
+ uint32_t max_size_bits; /* 0 for unlimited */
};
-#define MASK(i) ((i) & 4095)
-
#define MAX_FDS_OUT 28
#define CLEN (CMSG_LEN(MAX_FDS_OUT * sizeof(int32_t)))
@@ -71,26 +72,38 @@ struct wl_connection {
int want_flush;
};
+static inline size_t
+size_pot(uint32_t size_bits)
+{
+ assert(size_bits < 8 * sizeof(size_t));
+
+ return ((size_t)1) << size_bits;
+}
+
+static size_t
+ring_buffer_capacity(const struct wl_ring_buffer *b) {
+ return size_pot(b->size_bits);
+}
+
+static size_t
+ring_buffer_mask(const struct wl_ring_buffer *b, size_t i) {
+ size_t m = ring_buffer_capacity(b) - 1;
+ return i & m;
+}
+
static int
ring_buffer_put(struct wl_ring_buffer *b, const void *data, size_t count)
{
- uint32_t head, size;
-
- if (count > sizeof(b->data)) {
- wl_log("Data too big for buffer (%d > %d).\n",
- count, sizeof(b->data));
- errno = E2BIG;
- return -1;
- }
+ size_t head, size;
if (count == 0)
return 0;
- head = MASK(b->head);
- if (head + count <= sizeof b->data) {
+ head = ring_buffer_mask(b, b->head);
+ if (head + count <= ring_buffer_capacity(b)) {
memcpy(b->data + head, data, count);
} else {
- size = sizeof b->data - head;
+ size = ring_buffer_capacity(b) - head;
memcpy(b->data + head, data, size);
memcpy(b->data, (const char *) data + size, count - size);
}
@@ -103,21 +116,21 @@ ring_buffer_put(struct wl_ring_buffer *b, const void *data, size_t count)
static void
ring_buffer_put_iov(struct wl_ring_buffer *b, struct iovec *iov, int *count)
{
- uint32_t head, tail;
+ size_t head, tail;
- head = MASK(b->head);
- tail = MASK(b->tail);
+ head = ring_buffer_mask(b, b->head);
+ tail = ring_buffer_mask(b, b->tail);
if (head < tail) {
iov[0].iov_base = b->data + head;
iov[0].iov_len = tail - head;
*count = 1;
} else if (tail == 0) {
iov[0].iov_base = b->data + head;
- iov[0].iov_len = sizeof b->data - head;
+ iov[0].iov_len = ring_buffer_capacity(b) - head;
*count = 1;
} else {
iov[0].iov_base = b->data + head;
- iov[0].iov_len = sizeof b->data - head;
+ iov[0].iov_len = ring_buffer_capacity(b) - head;
iov[1].iov_base = b->data;
iov[1].iov_len = tail;
*count = 2;
@@ -127,21 +140,21 @@ ring_buffer_put_iov(struct wl_ring_buffer *b, struct iovec *iov, int *count)
static void
ring_buffer_get_iov(struct wl_ring_buffer *b, struct iovec *iov, int *count)
{
- uint32_t head, tail;
+ size_t head, tail;
- head = MASK(b->head);
- tail = MASK(b->tail);
+ head = ring_buffer_mask(b, b->head);
+ tail = ring_buffer_mask(b, b->tail);
if (tail < head) {
iov[0].iov_base = b->data + tail;
iov[0].iov_len = head - tail;
*count = 1;
} else if (head == 0) {
iov[0].iov_base = b->data + tail;
- iov[0].iov_len = sizeof b->data - tail;
+ iov[0].iov_len = ring_buffer_capacity(b) - tail;
*count = 1;
} else {
iov[0].iov_base = b->data + tail;
- iov[0].iov_len = sizeof b->data - tail;
+ iov[0].iov_len = ring_buffer_capacity(b) - tail;
iov[1].iov_base = b->data;
iov[1].iov_len = head;
*count = 2;
@@ -151,29 +164,158 @@ ring_buffer_get_iov(struct wl_ring_buffer *b, struct iovec *iov, int *count)
static void
ring_buffer_copy(struct wl_ring_buffer *b, void *data, size_t count)
{
- uint32_t tail, size;
+ size_t tail, size;
if (count == 0)
return;
- tail = MASK(b->tail);
- if (tail + count <= sizeof b->data) {
+ tail = ring_buffer_mask(b, b->tail);
+ if (tail + count <= ring_buffer_capacity(b)) {
memcpy(data, b->data + tail, count);
} else {
- size = sizeof b->data - tail;
+ size = ring_buffer_capacity(b) - tail;
memcpy(data, b->data + tail, size);
memcpy((char *) data + size, b->data, count - size);
}
}
-static uint32_t
+static size_t
ring_buffer_size(struct wl_ring_buffer *b)
{
return b->head - b->tail;
}
+static char *
+ring_buffer_tail(const struct wl_ring_buffer *b)
+{
+ return b->data + ring_buffer_mask(b, b->tail);
+}
+
+static uint32_t
+get_max_size_bits_for_size(size_t buffer_size)
+{
+ uint32_t max_size_bits = WL_BUFFER_DEFAULT_SIZE_POT;
+
+ /* buffer_size == 0 means unbound buffer size */
+ if (buffer_size == 0)
+ return 0;
+
+ while (max_size_bits < 8 * sizeof(size_t) && size_pot(max_size_bits) < buffer_size)
+ max_size_bits++;
+
+ return max_size_bits;
+}
+
+static int
+ring_buffer_allocate(struct wl_ring_buffer *b, size_t size_bits)
+{
+ char *new_data;
+
+ new_data = calloc(size_pot(size_bits), 1);
+ if (!new_data)
+ return -1;
+
+ ring_buffer_copy(b, new_data, ring_buffer_size(b));
+ free(b->data);
+ b->data = new_data;
+ b->size_bits = size_bits;
+ b->head = ring_buffer_size(b);
+ b->tail = 0;
+
+ return 0;
+}
+
+static size_t
+ring_buffer_get_bits_for_size(struct wl_ring_buffer *b, size_t net_size)
+{
+ size_t max_size_bits = get_max_size_bits_for_size(net_size);
+
+ if (max_size_bits < WL_BUFFER_DEFAULT_SIZE_POT)
+ max_size_bits = WL_BUFFER_DEFAULT_SIZE_POT;
+
+ if (b->max_size_bits > 0 && max_size_bits > b->max_size_bits)
+ max_size_bits = b->max_size_bits;
+
+ return max_size_bits;
+}
+
+static bool
+ring_buffer_is_max_size_reached(struct wl_ring_buffer *b)
+{
+ size_t net_size = ring_buffer_size(b) + 1;
+ size_t size_bits = ring_buffer_get_bits_for_size(b, net_size);
+
+ return net_size >= size_pot(size_bits);
+}
+
+static int
+ring_buffer_ensure_space(struct wl_ring_buffer *b, size_t count)
+{
+ size_t net_size = ring_buffer_size(b) + count;
+ size_t size_bits = ring_buffer_get_bits_for_size(b, net_size);
+
+ /* The 'size_bits' value represents the required size (in POT) to store
+ * 'net_size', which depending whether the buffers are bounded or not
+ * might not be sufficient (i.e. we might have reached the maximum size
+ * allowed).
+ */
+ if (net_size > size_pot(size_bits)) {
+ wl_log("Data too big for buffer (%d + %zd > %zd).\n",
+ ring_buffer_size(b), count, size_pot(size_bits));
+ errno = E2BIG;
+ return -1;
+ }
+
+ /* The following test here is a short-cut to avoid reallocating a buffer
+ * of the same size.
+ */
+ if (size_bits == b->size_bits)
+ return 0;
+
+ /* Otherwise, we (re)allocate the buffer to match the required size */
+ return ring_buffer_allocate(b, size_bits);
+}
+
+static void
+ring_buffer_close_fds(struct wl_ring_buffer *buffer, int32_t count)
+{
+ int32_t i, *p;
+ size_t size, tail;
+
+ size = ring_buffer_capacity(buffer);
+ tail = ring_buffer_mask(buffer, buffer->tail);
+ p = (int32_t *) (buffer->data + tail);
+
+ for (i = 0; i < count; i++) {
+ if (p >= (int32_t *) (buffer->data + size))
+ p = (int32_t *) buffer->data;
+ close(*p++);
+ }
+}
+
+void
+wl_connection_set_max_buffer_size(struct wl_connection *connection,
+ size_t max_buffer_size)
+{
+ uint32_t max_size_bits;
+
+ max_size_bits = get_max_size_bits_for_size(max_buffer_size);
+
+ connection->fds_in.max_size_bits = max_size_bits;
+ ring_buffer_ensure_space(&connection->fds_in, 0);
+
+ connection->fds_out.max_size_bits = max_size_bits;
+ ring_buffer_ensure_space(&connection->fds_out, 0);
+
+ connection->in.max_size_bits = max_size_bits;
+ ring_buffer_ensure_space(&connection->in, 0);
+
+ connection->out.max_size_bits = max_size_bits;
+ ring_buffer_ensure_space(&connection->out, 0);
+}
+
struct wl_connection *
-wl_connection_create(int fd)
+wl_connection_create(int fd, size_t max_buffer_size)
{
struct wl_connection *connection;
@@ -181,6 +323,8 @@ wl_connection_create(int fd)
if (connection == NULL)
return NULL;
+ wl_connection_set_max_buffer_size(connection, max_buffer_size);
+
connection->fd = fd;
return connection;
@@ -189,20 +333,20 @@ wl_connection_create(int fd)
static void
close_fds(struct wl_ring_buffer *buffer, int max)
{
- int32_t fds[sizeof(buffer->data) / sizeof(int32_t)], i, count;
size_t size;
+ int32_t count;
size = ring_buffer_size(buffer);
if (size == 0)
return;
- ring_buffer_copy(buffer, fds, size);
- count = size / sizeof fds[0];
+ count = size / sizeof(int32_t);
if (max > 0 && max < count)
count = max;
- size = count * sizeof fds[0];
- for (i = 0; i < count; i++)
- close(fds[i]);
+
+ ring_buffer_close_fds(buffer, count);
+
+ size = count * sizeof(int32_t);
buffer->tail += size;
}
@@ -218,7 +362,13 @@ wl_connection_destroy(struct wl_connection *connection)
int fd = connection->fd;
close_fds(&connection->fds_out, -1);
+ free(connection->fds_out.data);
+ free(connection->out.data);
+
close_fds(&connection->fds_in, -1);
+ free(connection->fds_in.data);
+ free(connection->in.data);
+
free(connection);
return fd;
@@ -262,7 +412,7 @@ static int
decode_cmsg(struct wl_ring_buffer *buffer, struct msghdr *msg)
{
struct cmsghdr *cmsg;
- size_t size, max, i;
+ size_t size, i;
int overflow = 0;
for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL;
@@ -272,8 +422,8 @@ decode_cmsg(struct wl_ring_buffer *buffer, struct msghdr *msg)
continue;
size = cmsg->cmsg_len - CMSG_LEN(0);
- max = sizeof(buffer->data) - ring_buffer_size(buffer);
- if (size > max || overflow) {
+
+ if (ring_buffer_ensure_space(buffer, size) < 0 || overflow) {
overflow = 1;
size /= sizeof(int32_t);
for (i = 0; i < size; i++)
@@ -299,17 +449,40 @@ wl_connection_flush(struct wl_connection *connection)
char cmsg[CLEN];
int len = 0, count;
size_t clen;
- uint32_t tail;
+ size_t tail;
if (!connection->want_flush)
return 0;
tail = connection->out.tail;
- while (connection->out.head - connection->out.tail > 0) {
- ring_buffer_get_iov(&connection->out, iov, &count);
-
+ while (ring_buffer_size(&connection->out) > 0) {
build_cmsg(&connection->fds_out, cmsg, &clen);
+ if (clen >= CLEN) {
+ /* UNIX domain sockets allows to send file descriptors
+ * using ancillary data.
+ *
+ * As per the UNIX domain sockets man page (man 7 unix),
+ * "at least one byte of real data should be sent when
+ * sending ancillary data".
+ *
+ * This is why we send only a single byte here, to ensure
+ * all file descriptors are sent before the bytes are
+ * cleared out.
+ *
+ * Otherwise This can fail to clear the file descriptors
+ * first if individual messages are allowed to have 224
+ * (8 bytes * MAX_FDS_OUT = 224) file descriptors .
+ */
+ iov[0].iov_base = ring_buffer_tail(&connection->out);
+ iov[0].iov_len = 1;
+ count = 1;
+ } else {
+ ring_buffer_get_iov(&connection->out, iov, &count);
+ }
+
+ msg.msg_name = NULL;
+ msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = count;
msg.msg_control = (clen > 0) ? cmsg : NULL;
@@ -347,35 +520,48 @@ wl_connection_read(struct wl_connection *connection)
char cmsg[CLEN];
int len, count, ret;
- if (ring_buffer_size(&connection->in) >= sizeof(connection->in.data)) {
- errno = EOVERFLOW;
- return -1;
- }
+ while (1) {
+ int data_size = ring_buffer_size(&connection->in);
- ring_buffer_put_iov(&connection->in, iov, &count);
+ /* Stop once we've read the max buffer size. */
+ if (ring_buffer_is_max_size_reached(&connection->in))
+ return data_size;
- msg.msg_name = NULL;
- msg.msg_namelen = 0;
- msg.msg_iov = iov;
- msg.msg_iovlen = count;
- msg.msg_control = cmsg;
- msg.msg_controllen = sizeof cmsg;
- msg.msg_flags = 0;
+ if (ring_buffer_ensure_space(&connection->in, 1) < 0)
+ return -1;
- do {
- len = wl_os_recvmsg_cloexec(connection->fd, &msg, MSG_DONTWAIT);
- } while (len < 0 && errno == EINTR);
+ ring_buffer_put_iov(&connection->in, iov, &count);
- if (len <= 0)
- return len;
+ msg.msg_name = NULL;
+ msg.msg_namelen = 0;
+ msg.msg_iov = iov;
+ msg.msg_iovlen = count;
+ msg.msg_control = cmsg;
+ msg.msg_controllen = sizeof cmsg;
+ msg.msg_flags = 0;
- ret = decode_cmsg(&connection->fds_in, &msg);
- if (ret)
- return -1;
+ do {
+ len = wl_os_recvmsg_cloexec(connection->fd, &msg, MSG_DONTWAIT);
+ } while (len < 0 && errno == EINTR);
+
+ if (len == 0) {
+ /* EOF, return previously read data first */
+ return data_size;
+ }
+ if (len < 0) {
+ if (errno == EAGAIN && data_size > 0) {
+ /* nothing new read, return previously read data */
+ return data_size;
+ }
+ return len;
+ }
- connection->in.head += len;
+ ret = decode_cmsg(&connection->fds_in, &msg);
+ if (ret)
+ return -1;
- return wl_connection_pending_input(connection);
+ connection->in.head += len;
+ }
}
int
@@ -394,13 +580,23 @@ int
wl_connection_queue(struct wl_connection *connection,
const void *data, size_t count)
{
- if (connection->out.head - connection->out.tail +
- count > ARRAY_LENGTH(connection->out.data)) {
+ /* We want to try to flush when the buffer reaches the default maximum
+ * size even if the buffer has been previously expanded.
+ *
+ * Otherwise the larger buffer will cause us to flush less frequently,
+ * which could increase lag.
+ *
+ * We'd like to flush often and get the buffer size back down if possible.
+ */
+ if (ring_buffer_size(&connection->out) + count > WL_BUFFER_DEFAULT_MAX_SIZE) {
connection->want_flush = 1;
- if (wl_connection_flush(connection) < 0)
+ if (wl_connection_flush(connection) < 0 && errno != EAGAIN)
return -1;
}
+ if (ring_buffer_ensure_space(&connection->out, count) < 0)
+ return -1;
+
return ring_buffer_put(&connection->out, data, count);
}
@@ -426,12 +622,15 @@ wl_connection_get_fd(struct wl_connection *connection)
static int
wl_connection_put_fd(struct wl_connection *connection, int32_t fd)
{
- if (ring_buffer_size(&connection->fds_out) == MAX_FDS_OUT * sizeof fd) {
+ if (ring_buffer_size(&connection->fds_out) >= MAX_FDS_OUT * sizeof fd) {
connection->want_flush = 1;
- if (wl_connection_flush(connection) < 0)
+ if (wl_connection_flush(connection) < 0 && errno != EAGAIN)
return -1;
}
+ if (ring_buffer_ensure_space(&connection->fds_out, sizeof fd) < 0)
+ return -1;
+
return ring_buffer_put(&connection->fds_out, &fd, sizeof fd);
}