~emersion/basu

2b7565c667f5d9ce8e582abf3f951b49c1d62f03 — Kenny Levinsen 3 years ago a4ea535
sd-bus: Remove BUS_WATCH_BIND
5 files changed, 35 insertions(+), 588 deletions(-)

M src/libsystemd/sd-bus/bus-internal.h
M src/libsystemd/sd-bus/bus-socket.c
M src/libsystemd/sd-bus/bus-socket.h
M src/libsystemd/sd-bus/sd-bus.c
D src/libsystemd/sd-bus/test-bus-watch-bind.c
M src/libsystemd/sd-bus/bus-internal.h => src/libsystemd/sd-bus/bus-internal.h +0 -8
@@ 150,7 150,6 @@ struct sd_bus_slot {

enum bus_state {
        BUS_UNSET,
        BUS_WATCH_BIND,      /* waiting for the socket to appear via inotify */
        BUS_OPENING,         /* the kernel's connect() is still not ready */
        BUS_AUTHENTICATING,  /* we are currently in the "SASL" authorization phase of dbus */
        BUS_HELLO,           /* we are waiting for the Hello() response */


@@ 183,7 182,6 @@ struct sd_bus {

        enum bus_state state;
        int input_fd, output_fd;
        int inotify_fd;
        int message_version;
        int message_endian;



@@ 293,7 291,6 @@ struct sd_bus {
        sd_event_source *output_io_event_source;
        sd_event_source *time_event_source;
        sd_event_source *quit_event_source;
        sd_event_source *inotify_event_source;
        sd_event *event;
        int event_priority;



@@ 315,9 312,6 @@ struct sd_bus {
        LIST_HEAD(sd_bus_slot, slots);
        LIST_HEAD(sd_bus_track, tracks);

        int *inotify_watches;
        size_t n_inotify_watches;

        /* zero means use value specified by $SYSTEMD_BUS_TIMEOUT= environment variable or built-in default */
        usec_t method_call_timeout;
};


@@ 377,9 371,7 @@ bool bus_pid_changed(sd_bus *bus);
char *bus_address_escape(const char *v);

int bus_attach_io_events(sd_bus *b);
int bus_attach_inotify_event(sd_bus *b);

void bus_close_inotify_fd(sd_bus *b);
void bus_close_io_fds(sd_bus *b);

#define OBJECT_PATH_FOREACH_PREFIX(prefix, path)                        \

M src/libsystemd/sd-bus/bus-socket.c => src/libsystemd/sd-bus/bus-socket.c +4 -238
@@ 664,187 664,7 @@ int bus_socket_start_auth(sd_bus *b) {
                return bus_socket_start_auth_client(b);
}

static int bus_socket_inotify_setup(sd_bus *b) {
        _cleanup_free_ int *new_watches = NULL;
        _cleanup_free_ char *absolute = NULL;
        size_t n_allocated = 0, n = 0, done = 0, i;
        unsigned max_follow = 32;
        const char *p;
        int wd, r;

        assert(b);
        assert(b->watch_bind);
        assert(b->sockaddr.sa.sa_family == AF_UNIX);
        assert(b->sockaddr.un.sun_path[0] != 0);

        /* Sets up an inotify fd in case watch_bind is enabled: wait until the configured AF_UNIX file system socket
         * appears before connecting to it. The implemented is pretty simplistic: we just subscribe to relevant changes
         * to all prefix components of the path, and every time we get an event for that we try to reconnect again,
         * without actually caring what precisely the event we got told us. If we still can't connect we re-subscribe
         * to all relevant changes of anything in the path, so that our watches include any possibly newly created path
         * components. */

        if (b->inotify_fd < 0) {
                b->inotify_fd = inotify_init1(IN_NONBLOCK|IN_CLOEXEC);
                if (b->inotify_fd < 0)
                        return -errno;

                b->inotify_fd = fd_move_above_stdio(b->inotify_fd);
        }

        /* Make sure the path is NUL terminated */
        p = strndupa(b->sockaddr.un.sun_path, sizeof(b->sockaddr.un.sun_path));

        /* Make sure the path is absolute */
        r = path_make_absolute_cwd(p, &absolute);
        if (r < 0)
                goto fail;

        /* Watch all parent directories, and don't mind any prefix that doesn't exist yet. For the innermost directory
         * that exists we want to know when files are created or moved into it. For all parents of it we just care if
         * they are removed or renamed. */

        if (!GREEDY_REALLOC(new_watches, n_allocated, n + 1)) {
                r = -ENOMEM;
                goto fail;
        }

        /* Start with the top-level directory, which is a bit simpler than the rest, since it can't be a symlink, and
         * always exists */
        wd = inotify_add_watch(b->inotify_fd, "/", IN_CREATE|IN_MOVED_TO);
        if (wd < 0) {
                r = log_debug_errno(errno, "Failed to add inotify watch on /: %m");
                goto fail;
        } else
                new_watches[n++] = wd;

        for (;;) {
                _cleanup_free_ char *component = NULL, *prefix = NULL, *destination = NULL;
                size_t n_slashes, n_component;
                char *c = NULL;

                n_slashes = strspn(absolute + done, "/");
                n_component = n_slashes + strcspn(absolute + done + n_slashes, "/");

                if (n_component == 0) /* The end */
                        break;

                component = strndup(absolute + done, n_component);
                if (!component) {
                        r = -ENOMEM;
                        goto fail;
                }

                /* A trailing slash? That's a directory, and not a socket then */
                if (path_equal(component, "/")) {
                        r = -EISDIR;
                        goto fail;
                }

                /* A single dot? Let's eat this up */
                if (path_equal(component, "/.")) {
                        done += n_component;
                        continue;
                }

                prefix = strndup(absolute, done + n_component);
                if (!prefix) {
                        r = -ENOMEM;
                        goto fail;
                }

                if (!GREEDY_REALLOC(new_watches, n_allocated, n + 1)) {
                        r = -ENOMEM;
                        goto fail;
                }

                wd = inotify_add_watch(b->inotify_fd, prefix, IN_DELETE_SELF|IN_MOVE_SELF|IN_ATTRIB|IN_CREATE|IN_MOVED_TO|IN_DONT_FOLLOW);
                log_debug("Added inotify watch for %s on bus %s: %i", prefix, strna(b->description), wd);

                if (wd < 0) {
                        if (IN_SET(errno, ENOENT, ELOOP))
                                break; /* This component doesn't exist yet, or the path contains a cyclic symlink right now */

                        r = log_debug_errno(errno, "Failed to add inotify watch on %s: %m", empty_to_root(prefix));
                        goto fail;
                } else
                        new_watches[n++] = wd;

                /* Check if this is possibly a symlink. If so, let's follow it and watch it too. */
                r = readlink_malloc(prefix, &destination);
                if (r == -EINVAL) { /* not a symlink */
                        done += n_component;
                        continue;
                }
                if (r < 0)
                        goto fail;

                if (isempty(destination)) { /* Empty symlink target? Yuck! */
                        r = -EINVAL;
                        goto fail;
                }

                if (max_follow <= 0) { /* Let's make sure we don't follow symlinks forever */
                        r = -ELOOP;
                        goto fail;
                }

                if (path_is_absolute(destination)) {
                        /* For absolute symlinks we build the new path and start anew */
                        c = strjoin(destination, absolute + done + n_component);
                        done = 0;
                } else {
                        _cleanup_free_ char *t = NULL;

                        /* For relative symlinks we replace the last component, and try again */
                        t = strndup(absolute, done);
                        if (!t)
                                return -ENOMEM;

                        c = strjoin(t, "/", destination, absolute + done + n_component);
                }
                if (!c) {
                        r = -ENOMEM;
                        goto fail;
                }

                free(absolute);
                absolute = c;

                max_follow--;
        }

        /* And now, let's remove all watches from the previous iteration we don't need anymore */
        for (i = 0; i < b->n_inotify_watches; i++) {
                bool found = false;
                size_t j;

                for (j = 0; j < n; j++)
                        if (new_watches[j] == b->inotify_watches[i]) {
                                found = true;
                                break;
                        }

                if (found)
                        continue;

                (void) inotify_rm_watch(b->inotify_fd, b->inotify_watches[i]);
        }

        free_and_replace(b->inotify_watches, new_watches);
        b->n_inotify_watches = n;

        return 0;

fail:
        bus_close_inotify_fd(b);
        return r;
}

int bus_socket_connect(sd_bus *b) {
        bool inotify_done = false;
        int r;

        assert(b);

        for (;;) {


@@ 864,13 684,10 @@ int bus_socket_connect(sd_bus *b) {
                if (connect(b->input_fd, &b->sockaddr.sa, b->sockaddr_size) < 0) {
                        if (errno == EINPROGRESS) {

                                /* If we have any inotify watches open, close them now, we don't need them anymore, as
                                 * we have successfully initiated a connection */
                                bus_close_inotify_fd(b);

                                /* Note that very likely we are already in BUS_OPENING state here, as we enter it when
                                 * we start parsing the address string. The only reason we set the state explicitly
                                 * here, is to undo BUS_WATCH_BIND, in case we did the inotify magic. */
                                 * here, is to undo BUS_WATCH_BIND, in case we did the inotify magic.
                                 * basu note: We no longer have BUS_WATCH_BIND */
                                bus_set_state(b, BUS_OPENING);
                                return 1;
                        }


@@ 884,33 701,12 @@ int bus_socket_connect(sd_bus *b) {
                                 * fresh one when reconnecting. */
                                bus_close_io_fds(b);

                                if (inotify_done) {
                                        /* inotify set up already, don't do it again, just return now, and remember
                                         * that we are waiting for inotify events now. */
                                        bus_set_state(b, BUS_WATCH_BIND);
                                        return 1;
                                }

                                /* This is a file system socket, and the inotify logic is enabled. Let's create the necessary inotify fd. */
                                r = bus_socket_inotify_setup(b);
                                if (r < 0)
                                        return r;

                                /* Let's now try to connect a second time, because in theory there's otherwise a race
                                 * here: the socket might have been created in the time between our first connect() and
                                 * the time we set up the inotify logic. But let's remember that we set up inotify now,
                                 * so that we don't do the connect() more than twice. */
                                inotify_done = true;

                        } else
                                return -errno;
                        }
                        return -errno;
                } else
                        break;
        }

        /* Yay, established, we don't need no inotify anymore! */
        bus_close_inotify_fd(b);

        return bus_socket_start_auth(b);
}



@@ 1261,33 1057,3 @@ int bus_socket_process_authenticating(sd_bus *b) {
        return bus_socket_read_auth(b);
}

int bus_socket_process_watch_bind(sd_bus *b) {
        int r, q;

        assert(b);
        assert(b->state == BUS_WATCH_BIND);
        assert(b->inotify_fd >= 0);

        r = flush_fd(b->inotify_fd);
        if (r <= 0)
                return r;

        log_debug("Got inotify event on bus %s.", strna(b->description));

        /* We flushed events out of the inotify fd. In that case, maybe the socket is valid now? Let's try to connect
         * to it again */

        r = bus_socket_connect(b);
        if (r < 0)
                return r;

        q = bus_attach_io_events(b);
        if (q < 0)
                return q;

        q = bus_attach_inotify_event(b);
        if (q < 0)
                return q;

        return r;
}

M src/libsystemd/sd-bus/bus-socket.h => src/libsystemd/sd-bus/bus-socket.h +0 -1
@@ 15,6 15,5 @@ int bus_socket_read_message(sd_bus *bus);

int bus_socket_process_opening(sd_bus *b);
int bus_socket_process_authenticating(sd_bus *b);
int bus_socket_process_watch_bind(sd_bus *b);

bool bus_socket_auth_needs_write(sd_bus *b);

M src/libsystemd/sd-bus/sd-bus.c => src/libsystemd/sd-bus/sd-bus.c +31 -120
@@ 57,7 57,6 @@

static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
static void bus_detach_io_events(sd_bus *b);
static void bus_detach_inotify_event(sd_bus *b);

static thread_local sd_bus *default_system_bus = NULL;
static thread_local sd_bus *default_user_bus = NULL;


@@ 124,16 123,6 @@ void bus_close_io_fds(sd_bus *b) {
        b->output_fd = b->input_fd = safe_close(b->input_fd);
}

void bus_close_inotify_fd(sd_bus *b) {
        assert(b);

        bus_detach_inotify_event(b);

        b->inotify_fd = safe_close(b->inotify_fd);
        b->inotify_watches = mfree(b->inotify_watches);
        b->n_inotify_watches = 0;
}

static void bus_reset_queues(sd_bus *b) {
        assert(b);



@@ 177,7 166,6 @@ static sd_bus* bus_free(sd_bus *b) {
                *b->default_bus_ptr = NULL;

        bus_close_io_fds(b);
        bus_close_inotify_fd(b);

        free(b->label);
        free(b->groups);


@@ 232,7 220,6 @@ _public_ int sd_bus_new(sd_bus **ret) {
                .n_ref = REFCNT_INIT,
                .input_fd = -1,
                .output_fd = -1,
                .inotify_fd = -1,
                .message_version = 1,
                .creds_mask = SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME,
                .accept_fd = true,


@@ 496,7 483,6 @@ void bus_set_state(sd_bus *bus, enum bus_state state) {

        static const char * const table[_BUS_STATE_MAX] = {
                [BUS_UNSET] = "UNSET",
                [BUS_WATCH_BIND] = "WATCH_BIND",
                [BUS_OPENING] = "OPENING",
                [BUS_AUTHENTICATING] = "AUTHENTICATING",
                [BUS_HELLO] = "HELLO",


@@ 1079,7 1065,6 @@ static int bus_start_address(sd_bus *b) {

        for (;;) {
                bus_close_io_fds(b);
                bus_close_inotify_fd(b);

                bus_kill_exec(b);



@@ 1103,10 1088,6 @@ static int bus_start_address(sd_bus *b) {
                        if (q < 0)
                                return q;

                        q = bus_attach_inotify_event(b);
                        if (q < 0)
                                return q;

                        return r;
                }



@@ 1543,7 1524,6 @@ _public_ void sd_bus_close(sd_bus *bus) {
        bus_reset_queues(bus);

        bus_close_io_fds(bus);
        bus_close_inotify_fd(bus);
}

_public_ sd_bus* sd_bus_flush_close_unref(sd_bus *bus) {


@@ 1562,7 1542,7 @@ _public_ sd_bus* sd_bus_flush_close_unref(sd_bus *bus) {
void bus_enter_closing(sd_bus *bus) {
        assert(bus);

        if (!IN_SET(bus->state, BUS_WATCH_BIND, BUS_OPENING, BUS_AUTHENTICATING, BUS_HELLO, BUS_RUNNING))
        if (!IN_SET(bus->state, BUS_OPENING, BUS_AUTHENTICATING, BUS_HELLO, BUS_RUNNING))
                return;

        bus_set_state(bus, BUS_CLOSING);


@@ 1931,7 1911,7 @@ static usec_t calc_elapse(sd_bus *bus, uint64_t usec) {
         * with any connection setup states. Hence, if a method callback is started earlier than that we just store the
         * relative timestamp, and afterwards the absolute one. */

        if (IN_SET(bus->state, BUS_WATCH_BIND, BUS_OPENING, BUS_AUTHENTICATING))
        if (IN_SET(bus->state, BUS_OPENING, BUS_AUTHENTICATING))
                return usec;
        else
                return now(CLOCK_MONOTONIC) + usec;


@@ 2220,9 2200,6 @@ _public_ int sd_bus_get_fd(sd_bus *bus) {
        if (bus->state == BUS_CLOSED)
                return -ENOTCONN;

        if (bus->inotify_fd >= 0)
                return bus->inotify_fd;

        if (bus->input_fd >= 0)
                return bus->input_fd;



@@ 2242,10 2219,6 @@ _public_ int sd_bus_get_events(sd_bus *bus) {
        case BUS_CLOSED:
                return -ENOTCONN;

        case BUS_WATCH_BIND:
                flags |= POLLIN;
                break;

        case BUS_OPENING:
                flags |= POLLOUT;
                break;


@@ 2322,7 2295,6 @@ _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
                *timeout_usec = 0;
                return 1;

        case BUS_WATCH_BIND:
        case BUS_OPENING:
                *timeout_usec = (uint64_t) -1;
                return 0;


@@ 2929,10 2901,6 @@ static int bus_process_internal(sd_bus *bus, bool hint_priority, int64_t priorit
        case BUS_CLOSED:
                return -ECONNRESET;

        case BUS_WATCH_BIND:
                r = bus_socket_process_watch_bind(bus);
                break;

        case BUS_OPENING:
                r = bus_socket_process_opening(bus);
                break;


@@ 2979,7 2947,7 @@ _public_ int sd_bus_process_priority(sd_bus *bus, int64_t priority, sd_bus_messa

static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
        struct pollfd p[2] = {};
        int r, n;
        int r, n, e;
        struct timespec ts;
        usec_t m = USEC_INFINITY;



@@ 2991,46 2959,36 @@ static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
        if (!BUS_IS_OPEN(bus->state))
                return -ENOTCONN;

        if (bus->state == BUS_WATCH_BIND) {
                assert(bus->inotify_fd >= 0);
        e = sd_bus_get_events(bus);
        if (e < 0)
                return e;

        if (need_more)
                /* The caller really needs some more data, he doesn't
                 * care about what's already read, or any timeouts
                 * except its own. */
                e |= POLLIN;
        else {
                usec_t until;
                /* The caller wants to process if there's something to
                 * process, but doesn't care otherwise */

                r = sd_bus_get_timeout(bus, &until);
                if (r < 0)
                        return r;
                if (r > 0)
                        m = usec_sub_unsigned(until, now(CLOCK_MONOTONIC));
        }

                p[0].events = POLLIN;
                p[0].fd = bus->inotify_fd;
        p[0].fd = bus->input_fd;
        if (bus->output_fd == bus->input_fd) {
                p[0].events = e;
                n = 1;
        } else {
                int e;

                e = sd_bus_get_events(bus);
                if (e < 0)
                        return e;

                if (need_more)
                        /* The caller really needs some more data, he doesn't
                         * care about what's already read, or any timeouts
                         * except its own. */
                        e |= POLLIN;
                else {
                        usec_t until;
                        /* The caller wants to process if there's something to
                         * process, but doesn't care otherwise */

                        r = sd_bus_get_timeout(bus, &until);
                        if (r < 0)
                                return r;
                        if (r > 0)
                                m = usec_sub_unsigned(until, now(CLOCK_MONOTONIC));
                }

                p[0].fd = bus->input_fd;
                if (bus->output_fd == bus->input_fd) {
                        p[0].events = e;
                        n = 1;
                } else {
                        p[0].events = e & POLLIN;
                        p[1].fd = bus->output_fd;
                        p[1].events = e & POLLOUT;
                        n = 2;
                }
                p[0].events = e & POLLIN;
                p[1].fd = bus->output_fd;
                p[1].events = e & POLLOUT;
                n = 2;
        }

        if (timeout_usec != (uint64_t) -1 && (m == USEC_INFINITY || timeout_usec < m))


@@ 3074,10 3032,6 @@ _public_ int sd_bus_flush(sd_bus *bus) {
        if (!BUS_IS_OPEN(bus->state))
                return -ENOTCONN;

        /* We never were connected? Don't hang in inotify for good, as there's no timeout set for it */
        if (bus->state == BUS_WATCH_BIND)
                return -EUNATCH;

        r = bus_ensure_running(bus);
        if (r < 0)
                return r;


@@ 3319,7 3273,7 @@ static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userd

        assert(bus);

        /* Note that this is called both on input_fd, output_fd as well as inotify_fd events */
        /* Note that this is called both on input_fd and output_fd events */

        r = sd_bus_process(bus, NULL);
        if (r < 0) {


@@ 3478,44 3432,6 @@ static void bus_detach_io_events(sd_bus *bus) {
        }
}

int bus_attach_inotify_event(sd_bus *bus) {
        int r;

        assert(bus);

        if (bus->inotify_fd < 0)
                return 0;

        if (!bus->event)
                return 0;

        if (!bus->inotify_event_source) {
                r = sd_event_add_io(bus->event, &bus->inotify_event_source, bus->inotify_fd, EPOLLIN, io_callback, bus);
                if (r < 0)
                        return r;

                r = sd_event_source_set_priority(bus->inotify_event_source, bus->event_priority);
                if (r < 0)
                        return r;

                r = sd_event_source_set_description(bus->inotify_event_source, "bus-inotify");
        } else
                r = sd_event_source_set_io_fd(bus->inotify_event_source, bus->inotify_fd);
        if (r < 0)
                return r;

        return 0;
}

static void bus_detach_inotify_event(sd_bus *bus) {
        assert(bus);

        if (bus->inotify_event_source) {
                sd_event_source_set_enabled(bus->inotify_event_source, SD_EVENT_OFF);
                bus->inotify_event_source = sd_event_source_unref(bus->inotify_event_source);
        }
}

_public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
        int r;



@@ 3561,10 3477,6 @@ _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
        if (r < 0)
                goto fail;

        r = bus_attach_inotify_event(bus);
        if (r < 0)
                goto fail;

        return 0;

fail:


@@ 3580,7 3492,6 @@ _public_ int sd_bus_detach_event(sd_bus *bus) {
                return 0;

        bus_detach_io_events(bus);
        bus_detach_inotify_event(bus);

        if (bus->time_event_source) {
                sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF);

D src/libsystemd/sd-bus/test-bus-watch-bind.c => src/libsystemd/sd-bus/test-bus-watch-bind.c +0 -221
@@ 1,221 0,0 @@
/* SPDX-License-Identifier: LGPL-2.1+ */

#include <pthread.h>

#include "sd-bus.h"
#include "sd-event.h"
#include "sd-id128.h"

#include "alloc-util.h"
#include "fd-util.h"
#include "fileio.h"
#include "fs-util.h"
#include "mkdir.h"
#include "path-util.h"
#include "random-util.h"
#include "rm-rf.h"
#include "socket-util.h"
#include "string-util.h"

static int method_foobar(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) {
        log_info("Got Foobar() call.");

        assert_se(sd_event_exit(sd_bus_get_event(sd_bus_message_get_bus(m)), 0) >= 0);
        return sd_bus_reply_method_return(m, NULL);
}

static int method_exit(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) {
        log_info("Got Exit() call");
        assert_se(sd_event_exit(sd_bus_get_event(sd_bus_message_get_bus(m)), 1) >= 0);
        return sd_bus_reply_method_return(m, NULL);
}

static const sd_bus_vtable vtable[] = {
        SD_BUS_VTABLE_START(0),
        SD_BUS_METHOD("Foobar", NULL, NULL, method_foobar, SD_BUS_VTABLE_UNPRIVILEGED),
        SD_BUS_METHOD("Exit", NULL, NULL, method_exit, SD_BUS_VTABLE_UNPRIVILEGED),
        SD_BUS_VTABLE_END,
};

static void* thread_server(void *p) {
        _cleanup_free_ char *suffixed = NULL, *suffixed2 = NULL, *d = NULL;
        _cleanup_close_ int fd = -1;
        union sockaddr_union u = {};
        const char *path = p;
        int salen;

        log_debug("Initializing server");

        /* Let's play some games, by slowly creating the socket directory, and renaming it in the middle */
        (void) usleep(100 * USEC_PER_MSEC);

        assert_se(mkdir_parents(path, 0755) >= 0);
        (void) usleep(100 * USEC_PER_MSEC);

        d = dirname_malloc(path);
        assert_se(d);
        assert_se(asprintf(&suffixed, "%s.%" PRIx64, d, random_u64()) >= 0);
        assert_se(rename(d, suffixed) >= 0);
        (void) usleep(100 * USEC_PER_MSEC);

        assert_se(asprintf(&suffixed2, "%s.%" PRIx64, d, random_u64()) >= 0);
        assert_se(symlink(suffixed2, d) >= 0);
        (void) usleep(100 * USEC_PER_MSEC);

        assert_se(symlink(basename(suffixed), suffixed2) >= 0);
        (void) usleep(100 * USEC_PER_MSEC);

        salen = sockaddr_un_set_path(&u.un, path);
        assert_se(salen >= 0);

        fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
        assert_se(fd >= 0);

        assert_se(bind(fd, &u.sa, salen) >= 0);
        usleep(100 * USEC_PER_MSEC);

        assert_se(listen(fd, SOMAXCONN) >= 0);
        usleep(100 * USEC_PER_MSEC);

        assert_se(touch(path) >= 0);
        usleep(100 * USEC_PER_MSEC);

        log_debug("Initialized server");

        for (;;) {
                _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
                _cleanup_(sd_event_unrefp) sd_event *event = NULL;
                sd_id128_t id;
                int bus_fd, code;

                assert_se(sd_id128_randomize(&id) >= 0);

                assert_se(sd_event_new(&event) >= 0);

                bus_fd = accept4(fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
                assert_se(bus_fd >= 0);

                log_debug("Accepted server connection");

                assert_se(sd_bus_new(&bus) >= 0);
                assert_se(sd_bus_set_description(bus, "server") >= 0);
                assert_se(sd_bus_set_fd(bus, bus_fd, bus_fd) >= 0);
                assert_se(sd_bus_set_server(bus, true, id) >= 0);
                /* assert_se(sd_bus_set_anonymous(bus, true) >= 0); */

                assert_se(sd_bus_attach_event(bus, event, 0) >= 0);

                assert_se(sd_bus_add_object_vtable(bus, NULL, "/foo", "foo.TestInterface", vtable, NULL) >= 0);

                assert_se(sd_bus_start(bus) >= 0);

                assert_se(sd_event_loop(event) >= 0);

                assert_se(sd_event_get_exit_code(event, &code) >= 0);

                if (code > 0)
                        break;
        }

        log_debug("Server done");

        return NULL;
}

static void* thread_client1(void *p) {
        _cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL;
        _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
        const char *path = p, *t;
        int r;

        log_debug("Initializing client1");

        assert_se(sd_bus_new(&bus) >= 0);
        assert_se(sd_bus_set_description(bus, "client1") >= 0);

        t = strjoina("unix:path=", path);
        assert_se(sd_bus_set_address(bus, t) >= 0);
        assert_se(sd_bus_set_watch_bind(bus, true) >= 0);
        assert_se(sd_bus_start(bus) >= 0);

        r = sd_bus_call_method(bus, "foo.bar", "/foo", "foo.TestInterface", "Foobar", &error, NULL, NULL);
        assert_se(r >= 0);

        log_debug("Client1 done");

        return NULL;
}

static int client2_callback(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) {
        assert_se(sd_bus_message_is_method_error(m, NULL) == 0);
        assert_se(sd_event_exit(sd_bus_get_event(sd_bus_message_get_bus(m)), 0) >= 0);
        return 0;
}

static void* thread_client2(void *p) {
        _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
        _cleanup_(sd_event_unrefp) sd_event *event = NULL;
        const char *path = p, *t;

        log_debug("Initializing client2");

        assert_se(sd_event_new(&event) >= 0);
        assert_se(sd_bus_new(&bus) >= 0);
        assert_se(sd_bus_set_description(bus, "client2") >= 0);

        t = strjoina("unix:path=", path);
        assert_se(sd_bus_set_address(bus, t) >= 0);
        assert_se(sd_bus_set_watch_bind(bus, true) >= 0);
        assert_se(sd_bus_attach_event(bus, event, 0) >= 0);
        assert_se(sd_bus_start(bus) >= 0);

        assert_se(sd_bus_call_method_async(bus, NULL, "foo.bar", "/foo", "foo.TestInterface", "Foobar", client2_callback, NULL, NULL) >= 0);

        assert_se(sd_event_loop(event) >= 0);

        log_debug("Client2 done");

        return NULL;
}

static void request_exit(const char *path) {
        _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
        const char *t;

        assert_se(sd_bus_new(&bus) >= 0);

        t = strjoina("unix:path=", path);
        assert_se(sd_bus_set_address(bus, t) >= 0);
        assert_se(sd_bus_set_watch_bind(bus, true) >= 0);
        assert_se(sd_bus_set_description(bus, "request-exit") >= 0);
        assert_se(sd_bus_start(bus) >= 0);

        assert_se(sd_bus_call_method(bus, "foo.bar", "/foo", "foo.TestInterface", "Exit", NULL, NULL, NULL) >= 0);
}

int main(int argc, char *argv[]) {
        _cleanup_(rm_rf_physical_and_freep) char *d = NULL;
        pthread_t server, client1, client2;
        char *path;

        log_set_max_level(LOG_DEBUG);

        /* We use /dev/shm here rather than /tmp, since some weird distros might set up /tmp as some weird fs that
         * doesn't support inotify properly. */
        assert_se(mkdtemp_malloc("/dev/shm/systemd-watch-bind-XXXXXX", &d) >= 0);

        path = strjoina(d, "/this/is/a/socket");

        assert_se(pthread_create(&server, NULL, thread_server, path) == 0);
        assert_se(pthread_create(&client1, NULL, thread_client1, path) == 0);
        assert_se(pthread_create(&client2, NULL, thread_client2, path) == 0);

        assert_se(pthread_join(client1, NULL) == 0);
        assert_se(pthread_join(client2, NULL) == 0);

        request_exit(path);

        assert_se(pthread_join(server, NULL) == 0);

        return 0;
}