diff options
| author | Kristian Høgsberg <krh@bitplanet.net> | 2012-10-04 16:54:22 -0400 |
|---|---|---|
| committer | Kristian Høgsberg <krh@bitplanet.net> | 2012-10-10 20:59:00 -0400 |
| commit | 53d24713a31d59d9534c1c1a84a7ad46f44ee95f (patch) | |
| tree | 0e550fa29c2e5b9ccc757b307dfed026143c3a24 /src/connection.c | |
| parent | Ensure cursor_data.c is included in distribution tarballs (diff) | |
| download | wayland-53d24713a31d59d9534c1c1a84a7ad46f44ee95f.tar wayland-53d24713a31d59d9534c1c1a84a7ad46f44ee95f.tar.gz wayland-53d24713a31d59d9534c1c1a84a7ad46f44ee95f.tar.bz2 wayland-53d24713a31d59d9534c1c1a84a7ad46f44ee95f.tar.lz wayland-53d24713a31d59d9534c1c1a84a7ad46f44ee95f.tar.xz wayland-53d24713a31d59d9534c1c1a84a7ad46f44ee95f.tar.zst wayland-53d24713a31d59d9534c1c1a84a7ad46f44ee95f.zip | |
Change filedescriptor API to be thread safe
The update callback for the file descriptors was always a bit awkward and
un-intuitive. The idea was that whenever the protocol code needed to
write data to the fd it would call the 'update' function. This function
would adjust the mainloop so that it polls for POLLOUT on the fd so we
can eventually flush the data to the socket.
The problem is that in multi-threaded applications, any thread can issue
a request, which writes data to the output buffer and thus triggers the
update callback. Thus, we'll be calling out with the display mutex
held and may call from any thread.
The solution is to eliminate the udpate callback and just require that
the application or server flushes all connection buffers before blocking.
This turns out to be a simpler API, although we now require clients to
deal with EAGAIN and non-blocking writes. It also saves a few syscalls,
since the socket will be writable most of the time and most writes will
complete, so we avoid changing epoll to poll for POLLOUT, then write and
then change it back for each write.
Diffstat (limited to 'src/connection.c')
| -rw-r--r-- | src/connection.c | 121 |
1 files changed, 53 insertions, 68 deletions
diff --git a/src/connection.c b/src/connection.c index 8f4b44c..ce6fac6 100644 --- a/src/connection.c +++ b/src/connection.c @@ -56,9 +56,7 @@ struct wl_connection { struct wl_buffer in, out; struct wl_buffer fds_in, fds_out; int fd; - void *data; - wl_connection_update_func_t update; - int write_signalled; + int want_flush; }; union wl_value { @@ -156,9 +154,7 @@ wl_buffer_size(struct wl_buffer *b) } struct wl_connection * -wl_connection_create(int fd, - wl_connection_update_func_t update, - void *data) +wl_connection_create(int fd) { struct wl_connection *connection; @@ -167,12 +163,6 @@ wl_connection_create(int fd, return NULL; memset(connection, 0, sizeof *connection); connection->fd = fd; - connection->update = update; - connection->data = data; - - connection->update(connection, - WL_CONNECTION_READABLE, - connection->data); return connection; } @@ -249,14 +239,19 @@ decode_cmsg(struct wl_buffer *buffer, struct msghdr *msg) } int -wl_connection_data(struct wl_connection *connection, uint32_t mask) +wl_connection_flush(struct wl_connection *connection) { struct iovec iov[2]; struct msghdr msg; char cmsg[CLEN]; - int len, count, clen; + int len = 0, count, clen; + uint32_t tail; + + if (!connection->want_flush) + return 0; - if (mask & WL_CONNECTION_WRITABLE) { + tail = connection->out.tail; + while (connection->out.head - connection->out.tail > 0) { wl_buffer_get_iov(&connection->out, iov, &count); build_cmsg(&connection->fds_out, cmsg, &clen); @@ -272,58 +267,49 @@ wl_connection_data(struct wl_connection *connection, uint32_t mask) do { len = sendmsg(connection->fd, &msg, MSG_NOSIGNAL | MSG_DONTWAIT); - } while (len < 0 && errno == EINTR); + } while (len == -1 && errno == EINTR); - if (len == -1 && errno == EPIPE) { + if (len == -1) return -1; - } else if (len < 0) { - fprintf(stderr, - "write error for connection %p, fd %d: %m\n", - connection, connection->fd); - return -1; - } close_fds(&connection->fds_out); connection->out.tail += len; - if (connection->out.tail == connection->out.head && - connection->write_signalled) { - connection->update(connection, - WL_CONNECTION_READABLE, - connection->data); - connection->write_signalled = 0; - } } - if (mask & WL_CONNECTION_READABLE) { - wl_buffer_put_iov(&connection->in, iov, &count); + connection->want_flush = 0; - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = iov; - msg.msg_iovlen = count; - msg.msg_control = cmsg; - msg.msg_controllen = sizeof cmsg; - msg.msg_flags = 0; + return connection->out.head - tail; +} - do { - len = wl_os_recvmsg_cloexec(connection->fd, &msg, 0); - } while (len < 0 && errno == EINTR); +int +wl_connection_read(struct wl_connection *connection) +{ + struct iovec iov[2]; + struct msghdr msg; + char cmsg[CLEN]; + int len, count; - if (len < 0) { - fprintf(stderr, - "read error from connection %p: %m (%d)\n", - connection, errno); - return -1; - } else if (len == 0) { - /* FIXME: Handle this better? */ - return -1; - } + wl_buffer_put_iov(&connection->in, iov, &count); + + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = iov; + msg.msg_iovlen = count; + msg.msg_control = cmsg; + msg.msg_controllen = sizeof cmsg; + msg.msg_flags = 0; - decode_cmsg(&connection->fds_in, &msg); + do { + len = wl_os_recvmsg_cloexec(connection->fd, &msg, 0); + } while (len < 0 && errno == EINTR); - connection->in.head += len; - } + if (len <= 0) + return len; + + decode_cmsg(&connection->fds_in, &msg); + + connection->in.head += len; return connection->in.head - connection->in.tail; } @@ -333,19 +319,14 @@ wl_connection_write(struct wl_connection *connection, const void *data, size_t count) { if (connection->out.head - connection->out.tail + - count > ARRAY_LENGTH(connection->out.data)) - if (wl_connection_data(connection, WL_CONNECTION_WRITABLE)) + count > ARRAY_LENGTH(connection->out.data)) { + connection->want_flush = 1; + if (wl_connection_flush(connection) < 0) return -1; + } wl_buffer_put(&connection->out, data, count); - - if (!connection->write_signalled) { - connection->update(connection, - WL_CONNECTION_READABLE | - WL_CONNECTION_WRITABLE, - connection->data); - connection->write_signalled = 1; - } + connection->want_flush = 1; return 0; } @@ -355,9 +336,11 @@ wl_connection_queue(struct wl_connection *connection, const void *data, size_t count) { if (connection->out.head - connection->out.tail + - count > ARRAY_LENGTH(connection->out.data)) - if (wl_connection_data(connection, WL_CONNECTION_WRITABLE)) + count > ARRAY_LENGTH(connection->out.data)) { + connection->want_flush = 1; + if (wl_connection_flush(connection) < 0) return -1; + } wl_buffer_put(&connection->out, data, count); @@ -394,9 +377,11 @@ wl_message_size_extra(const struct wl_message *message) static int wl_connection_put_fd(struct wl_connection *connection, int32_t fd) { - if (wl_buffer_size(&connection->fds_out) == MAX_FDS_OUT * sizeof fd) - if (wl_connection_data(connection, WL_CONNECTION_WRITABLE)) + if (wl_buffer_size(&connection->fds_out) == MAX_FDS_OUT * sizeof fd) { + connection->want_flush = 1; + if (wl_connection_flush(connection) < 0) return -1; + } wl_buffer_put(&connection->fds_out, &fd, sizeof fd); |
