M src/core/ev.c => src/core/ev.c +106 -15
@@ 48,6 48,7 @@
#include <netinet/tcp.h>
#include <netdb.h>
#include <sys/socket.h>
+#include <sys/wait.h>
#ifdef JANET_EV_EPOLL
#include <sys/epoll.h>
#include <sys/timerfd.h>
@@ 147,6 148,7 @@ JANET_THREAD_LOCAL JanetRNG janet_vm_ev_rng;
JANET_THREAD_LOCAL JanetListenerState **janet_vm_listeners = NULL;
JANET_THREAD_LOCAL size_t janet_vm_listener_count = 0;
JANET_THREAD_LOCAL size_t janet_vm_listener_cap = 0;
+JANET_THREAD_LOCAL size_t janet_vm_extra_listeners = 0;
/* Get current timestamp (millisecond precision) */
static JanetTimestamp ts_now(void);
@@ 491,6 493,14 @@ void janet_addtimeout(double sec) {
add_timeout(to);
}
+void janet_ev_inc_refcount(void) {
+ janet_vm_extra_listeners++;
+}
+
+void janet_ev_dec_refcount(void) {
+ janet_vm_extra_listeners--;
+}
+
/* Channels */
typedef struct {
@@ 774,14 784,16 @@ void janet_loop1(void) {
}
}
}
+
/* Run scheduled fibers */
while (janet_vm_spawn.head != janet_vm_spawn.tail) {
JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK};
janet_q_pop(&janet_vm_spawn, &task, sizeof(task));
run_one(task.fiber, task.value, task.sig);
}
+
/* Poll for events */
- if (janet_vm_listener_count || janet_vm_tq_count) {
+ if (janet_vm_listener_count || janet_vm_tq_count || janet_vm_extra_listeners) {
JanetTimeout to;
memset(&to, 0, sizeof(to));
int has_timeout;
@@ 790,18 802,67 @@ void janet_loop1(void) {
pop_timeout(0);
}
/* Run polling implementation only if pending timeouts or pending events */
- if (janet_vm_tq_count || janet_vm_listener_count) {
+ if (janet_vm_tq_count || janet_vm_listener_count || janet_vm_extra_listeners) {
janet_loop1_impl(has_timeout, to.when);
}
}
}
void janet_loop(void) {
- while (janet_vm_listener_count || (janet_vm_spawn.head != janet_vm_spawn.tail) || janet_vm_tq_count) {
+ while (janet_vm_listener_count || (janet_vm_spawn.head != janet_vm_spawn.tail) || janet_vm_tq_count || janet_vm_extra_listeners) {
janet_loop1();
}
}
+/*
+ * Signal handling code.
+ */
+
+#ifdef JANET_WINDOWS
+
+#else
+
+JANET_THREAD_LOCAL int janet_vm_selfpipe[2];
+
+static void janet_ev_handle_signals(void) {
+ int sig = 0;
+ while (read(janet_vm_selfpipe[0], &sig, sizeof(sig)) > 0) {
+ switch (sig) {
+ default:
+ break;
+ case SIGCHLD:
+ {
+ int status = 0;
+ pid_t pid = waitpid(-1, &status, WNOHANG | WUNTRACED);
+ /* invalid pid on failure will do no harm */
+ janet_schedule_pid(pid, status);
+ }
+ break;
+ }
+ }
+}
+
+static void janet_sig_handler(int sig) {
+ int result = write(janet_vm_selfpipe[1], &sig, sizeof(sig));
+ if (result) {
+ /* Failed to handle signal. */
+ ;
+ }
+ signal(sig, janet_sig_handler);
+}
+
+static void janet_ev_setup_signals(void) {
+ if (-1 == pipe(janet_vm_selfpipe)) goto error;
+ if (fcntl(janet_vm_selfpipe[0], F_SETFL, O_NONBLOCK)) goto error;
+ if (fcntl(janet_vm_selfpipe[1], F_SETFL, O_NONBLOCK)) goto error;
+ signal(SIGCHLD, janet_sig_handler);
+ return;
+error:
+ JANET_EXIT("failed to initialize self pipe in event loop");
+}
+
+#endif
+
#ifdef JANET_WINDOWS
JANET_THREAD_LOCAL HANDLE janet_vm_iocp = NULL;
@@ 969,8 1030,14 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
/* Step state machines */
for (int i = 0; i < ready; i++) {
- JanetStream *stream = events[i].data.ptr;
- if (NULL != stream) { /* If NULL, is a timeout */
+ void *p = events[i].data.ptr;
+ if (&janet_vm_timerfd == p) {
+ /* Timer expired, ignore */;
+ } else if (janet_vm_selfpipe == p) {
+ /* Signal */
+ janet_ev_handle_signals();
+ } else {
+ JanetStream *stream = p;
int mask = events[i].events;
JanetListenerState *state = stream->state;
state->event = events + i;
@@ 1001,14 1068,18 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
void janet_ev_init(void) {
janet_ev_init_common();
+ janet_ev_setup_signals();
janet_vm_epoll = epoll_create1(EPOLL_CLOEXEC);
janet_vm_timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK);
janet_vm_timer_enabled = 0;
if (janet_vm_epoll == -1 || janet_vm_timerfd == -1) goto error;
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
- ev.data.ptr = NULL;
+ ev.data.ptr = &janet_vm_timerfd;
if (-1 == epoll_ctl(janet_vm_epoll, EPOLL_CTL_ADD, janet_vm_timerfd, &ev)) goto error;
+ ev.events = EPOLLIN | EPOLLET;
+ ev.data.ptr = janet_vm_selfpipe;
+ if (-1 == epoll_ctl(janet_vm_epoll, EPOLL_CTL_ADD, janet_vm_selfpipe[0], &ev)) goto error;
return;
error:
JANET_EXIT("failed to initialize event loop");
@@ 1054,7 1125,7 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
size_t newsize = janet_vm_listener_cap;
if (newsize > oldsize) {
- janet_vm_fds = realloc(janet_vm_fds, newsize * sizeof(struct pollfd));
+ janet_vm_fds = realloc(janet_vm_fds, (newsize + 1) * sizeof(struct pollfd));
if (NULL == janet_vm_fds) {
JANET_OUT_OF_MEMORY;
}
@@ 1063,12 1134,12 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
ev.fd = stream->handle;
ev.events = make_poll_events(state->stream->_mask);
ev.revents = 0;
- janet_vm_fds[state->_index] = ev;
+ janet_vm_fds[state->_index + 1] = ev;
return state;
}
static void janet_unlisten(JanetListenerState *state) {
- janet_vm_fds[state->_index] = janet_vm_fds[janet_vm_listener_count - 1];
+ janet_vm_fds[state->_index + 1] = janet_vm_fds[janet_vm_listener_count];
janet_unlisten_impl(state);
}
@@ 1081,19 1152,25 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
JanetTimestamp now = ts_now();
to = now > timeout ? 0 : (int)(timeout - now);
}
- ready = poll(janet_vm_fds, janet_vm_listener_count, to);
+ ready = poll(janet_vm_fds, janet_vm_listener_count + 1, to);
} while (ready == -1 && errno == EINTR);
if (ready == -1) {
JANET_EXIT("failed to poll events");
}
+ /* Check selfpipe */
+ if (janet_vm_fds[0].revents & POLLIN) {
+ janet_vm_fds[0].revents = 0;
+ janet_ev_handle_signals();
+ }
+
/* Step state machines */
for (size_t i = 0; i < janet_vm_listener_count; i++) {
- struct pollfd *pfd = janet_vm_fds + i;
+ struct pollfd *pfd = janet_vm_fds + i + 1;
/* Skip fds where nothing interesting happened */
JanetListenerState *state = janet_vm_listeners[i];
/* Normal event */
- int mask = janet_vm_fds[i].revents;
+ int mask = pfd->revents;
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE;
@@ 1118,12 1195,22 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
void janet_ev_init(void) {
janet_ev_init_common();
janet_vm_fds = NULL;
+ janet_ev_setup_signals();
+ janet_vm_fds = malloc(sizeof(struct pollfd));
+ if (NULL == janet_vm_fds) {
+ JANET_OUT_OF_MEMORY;
+ }
+ janet_vm_fds[0].fd = janet_vm_selfpipe[0];
+ janet_vm_fds[0].events = POLLIN;
+ janet_vm_fds[0].revents = 0;
return;
}
void janet_ev_deinit(void) {
janet_ev_deinit_common();
free(janet_vm_fds);
+ close(janet_vm_selfpipe[0]);
+ close(janet_vm_selfpipe[1]);
janet_vm_fds = NULL;
}
@@ 1691,9 1778,7 @@ static Janet cfun_ev_call(int32_t argc, Janet *argv) {
return janet_wrap_fiber(fiber);
}
-static Janet cfun_ev_sleep(int32_t argc, Janet *argv) {
- janet_fixarity(argc, 1);
- double sec = janet_getnumber(argv, 0);
+JANET_NO_RETURN void janet_sleep_await(double sec) {
JanetTimeout to;
to.when = ts_delta(ts_now(), sec);
to.fiber = janet_vm_root_fiber;
@@ 1703,6 1788,12 @@ static Janet cfun_ev_sleep(int32_t argc, Janet *argv) {
janet_await();
}
+static Janet cfun_ev_sleep(int32_t argc, Janet *argv) {
+ janet_fixarity(argc, 1);
+ double sec = janet_getnumber(argv, 0);
+ janet_sleep_await(sec);
+}
+
static Janet cfun_ev_cancel(int32_t argc, Janet *argv) {
janet_fixarity(argc, 2);
JanetFiber *fiber = janet_getfiber(argv, 0);
M src/core/os.c => src/core/os.c +138 -31
@@ 185,6 185,7 @@ static Janet os_exit(int32_t argc, Janet *argv) {
#ifndef JANET_REDUCED_OS
#ifndef JANET_NO_PROCESSES
+
/* Get env for os_execute */
static char **os_execute_env(int32_t argc, const Janet *argv) {
char **envp = NULL;
@@ 319,6 320,8 @@ static JanetBuffer *os_exec_escape(JanetView args) {
static const JanetAbstractType ProcAT;
#define JANET_PROC_CLOSED 1
#define JANET_PROC_WAITED 2
+#define JANET_PROC_WAITING 4
+#define JANET_PROC_ERROR_NONZERO 8
typedef struct {
int flags;
#ifdef JANET_WINDOWS
@@ 332,6 335,7 @@ typedef struct {
JanetStream *in;
JanetStream *out;
JanetStream *err;
+ JanetFiber *fiber;
#else
JanetFile *in;
JanetFile *out;
@@ 339,9 343,82 @@ typedef struct {
#endif
} JanetProc;
+#ifdef JANET_EV
+
+JANET_THREAD_LOCAL JanetProc **janet_vm_waiting_procs = NULL;
+JANET_THREAD_LOCAL size_t janet_vm_proc_count = 0;
+JANET_THREAD_LOCAL size_t janet_vm_proc_cap = 0;
+
+/* Map pids to JanetProc to allow for lookup after a call to
+ * waitpid. */
+static void janet_add_waiting_proc(JanetProc *proc) {
+ if (janet_vm_proc_count == janet_vm_proc_cap) {
+ size_t newcap = (janet_vm_proc_count + 1) * 2;
+ if (newcap < 16) newcap = 16;
+ JanetProc **newprocs = realloc(janet_vm_waiting_procs, sizeof(JanetProc *) * newcap);
+ if (NULL == newprocs) {
+ JANET_OUT_OF_MEMORY;
+ }
+ janet_vm_waiting_procs = newprocs;
+ janet_vm_proc_cap = newcap;
+ }
+ janet_vm_waiting_procs[janet_vm_proc_count++] = proc;
+ janet_gcroot(janet_wrap_abstract(proc));
+ janet_ev_inc_refcount();
+}
+
+static void janet_remove_waiting_proc(JanetProc *proc) {
+ for (size_t i = 0; i < janet_vm_proc_count; i++) {
+ if (janet_vm_waiting_procs[i] == proc) {
+ janet_vm_waiting_procs[i] = janet_vm_waiting_procs[--janet_vm_proc_count];
+ janet_gcunroot(janet_wrap_abstract(proc));
+ janet_ev_dec_refcount();
+ return;
+ }
+ }
+}
+
+static JanetProc *janet_lookup_proc(pid_t pid) {
+ for (size_t i = 0; i < janet_vm_proc_count; i++) {
+ if (janet_vm_waiting_procs[i]->pid == pid) {
+ return janet_vm_waiting_procs[i];
+ }
+ }
+ return NULL;
+}
+
+void janet_schedule_pid(pid_t pid, int status) {
+ /* Use POSIX shell semantics for interpreting signals */
+ if (WIFEXITED(status)) {
+ status = WEXITSTATUS(status);
+ } else if (WIFSTOPPED(status)) {
+ status = WSTOPSIG(status) + 128;
+ } else {
+ status = WTERMSIG(status) + 128;
+ }
+ JanetProc *proc = janet_lookup_proc(pid);
+ if (NULL == proc) return;
+ proc->return_code = (int32_t) status;
+ proc->flags |= JANET_PROC_WAITED;
+ proc->flags &= ~JANET_PROC_WAITING;
+ if ((status != 0) && (proc->flags & JANET_PROC_ERROR_NONZERO)) {
+ JanetString s = janet_formatc("command failed with non-zero exit code %d", status);
+ janet_cancel(proc->fiber, janet_wrap_string(s));
+ } else {
+ janet_schedule(proc->fiber, janet_wrap_integer(status));
+ }
+ janet_remove_waiting_proc(proc);
+}
+#endif
+
static int janet_proc_gc(void *p, size_t s) {
(void) s;
JanetProc *proc = (JanetProc *) p;
+#ifdef JANET_EV
+ if (proc->flags & JANET_PROC_WAITING) {
+ janet_remove_waiting_proc(proc);
+ }
+#endif
#ifdef JANET_WINDOWS
if (!(proc->flags & JANET_PROC_CLOSED)) {
CloseHandle(proc->pHandle);
@@ 364,13 441,27 @@ static int janet_proc_mark(void *p, size_t s) {
if (NULL != proc->in) janet_mark(janet_wrap_abstract(proc->in));
if (NULL != proc->out) janet_mark(janet_wrap_abstract(proc->out));
if (NULL != proc->err) janet_mark(janet_wrap_abstract(proc->err));
+#ifdef JANET_EV
+ if (NULL != proc->fiber) janet_mark(janet_wrap_fiber(proc->fiber));
+#endif
return 0;
}
+#ifdef JANET_EV
+JANET_NO_RETURN
+#endif
static Janet os_proc_wait_impl(JanetProc *proc) {
- if (proc->flags & JANET_PROC_WAITED) {
- janet_panicf("cannot wait on process that has already finished");
+ if (proc->flags & (JANET_PROC_WAITED | JANET_PROC_WAITING)) {
+ janet_panicf("cannot wait twice on a process");
}
+#ifdef JANET_EV
+ /* Event loop implementation */
+ proc->fiber = janet_root_fiber();
+ proc->flags |= JANET_PROC_WAITING;
+ janet_add_waiting_proc(proc);
+ janet_await();
+#else
+ /* Non evented implementation */
proc->flags |= JANET_PROC_WAITED;
int status = 0;
#ifdef JANET_WINDOWS
@@ 386,6 477,7 @@ static Janet os_proc_wait_impl(JanetProc *proc) {
#endif
proc->return_code = (int32_t) status;
return janet_wrap_integer(proc->return_code);
+#endif
}
static Janet os_proc_wait(int32_t argc, Janet *argv) {
@@ 575,7 667,7 @@ static JanetFile *get_stdio_for_handle(JanetHandle handle, void *orig, int iswri
}
#endif
-static Janet os_execute_impl(int32_t argc, Janet *argv, int is_async) {
+static Janet os_execute_impl(int32_t argc, Janet *argv, int is_spawn) {
janet_arity(argc, 1, 3);
/* Get flags */
@@ 713,7 805,7 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_async) {
tHandle = processInfo.hThread;
/* Wait and cleanup immedaitely */
- if (!is_async) {
+ if (!is_spawn) {
DWORD code;
WaitForSingleObject(pHandle, INFINITE);
GetExitCodeProcess(pHandle, &code);
@@ 781,45 873,42 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_async) {
if (status) {
os_execute_cleanup(envp, child_argv);
janet_panicf("%p: %s", argv[0], strerror(errno));
- } else if (is_async) {
+ } else if (is_spawn) {
/* Get process handle */
os_execute_cleanup(envp, child_argv);
} else {
/* Wait to complete */
- waitpid(pid, &status, 0);
os_execute_cleanup(envp, child_argv);
- /* Use POSIX shell semantics for interpreting signals */
- if (WIFEXITED(status)) {
- status = WEXITSTATUS(status);
- } else if (WIFSTOPPED(status)) {
- status = WSTOPSIG(status) + 128;
- } else {
- status = WTERMSIG(status) + 128;
- }
}
#endif
- if (is_async) {
- JanetProc *proc = janet_abstract(&ProcAT, sizeof(JanetProc));
- proc->return_code = -1;
+ JanetProc *proc = janet_abstract(&ProcAT, sizeof(JanetProc));
+ proc->return_code = -1;
#ifdef JANET_WINDOWS
- proc->pHandle = pHandle;
- proc->tHandle = tHandle;
+ proc->pHandle = pHandle;
+ proc->tHandle = tHandle;
#else
- proc->pid = pid;
-#endif
- proc->in = get_stdio_for_handle(new_in, orig_in, 0);
- proc->out = get_stdio_for_handle(new_out, orig_out, 1);
- proc->err = get_stdio_for_handle(new_err, orig_err, 1);
- proc->flags = 0;
- if (proc->in == NULL || proc->out == NULL || proc->err == NULL) {
- janet_panic("failed to construct proc");
- }
+ proc->pid = pid;
+#endif
+ proc->in = get_stdio_for_handle(new_in, orig_in, 0);
+ proc->out = get_stdio_for_handle(new_out, orig_out, 1);
+ proc->err = get_stdio_for_handle(new_err, orig_err, 1);
+ proc->flags = 0;
+ if (proc->in == NULL || proc->out == NULL || proc->err == NULL) {
+ janet_panic("failed to construct proc");
+ }
+ if (janet_flag_at(flags, 2)) {
+ proc->flags |= JANET_PROC_ERROR_NONZERO;
+ }
+
+ if (is_spawn) {
return janet_wrap_abstract(proc);
- } else if (janet_flag_at(flags, 2) && status) {
- janet_panicf("command failed with non-zero exit code %d", status);
} else {
- return janet_wrap_integer(status);
+#ifdef JANET_EV
+ os_proc_wait_impl(proc);
+#else
+ return os_proc_wait_impl(proc);
+#endif
}
}
@@ 2069,6 2158,17 @@ static const JanetReg os_cfuns[] = {
{NULL, NULL, NULL}
};
+void janet_os_deinit(void) {
+#ifndef JANET_NO_PROCESSES
+#ifdef JANET_EV
+ free(janet_vm_waiting_procs);
+ janet_vm_waiting_procs = NULL;
+ janet_vm_proc_count = 0;
+ janet_vm_proc_cap = 0;
+#endif
+#endif
+}
+
/* Module entry point */
void janet_lib_os(JanetTable *env) {
#if !defined(JANET_REDUCED_OS) && defined(JANET_WINDOWS) && defined(JANET_THREADS)
@@ 2079,5 2179,12 @@ void janet_lib_os(JanetTable *env) {
env_lock_initialized = 1;
}
#endif
+#ifndef JANET_NO_PROCESSES
+#ifdef JANET_EV
+ janet_vm_waiting_procs = NULL;
+ janet_vm_proc_count = 0;
+ janet_vm_proc_cap = 0;
+#endif
+#endif
janet_core_cfuns(env, NULL, os_cfuns);
}
M src/core/state.h => src/core/state.h +2 -0
@@ 111,4 111,6 @@ void janet_ev_init(void);
void janet_ev_deinit(void);
#endif
+void janet_os_deinit(void);
+
#endif /* JANET_STATE_H_defined */
M src/core/util.h => src/core/util.h +1 -0
@@ 146,6 146,7 @@ extern const JanetAbstractType janet_address_type;
void janet_lib_ev(JanetTable *env);
void janet_ev_mark(void);
int janet_make_pipe(JanetHandle handles[2]);
+void janet_schedule_pid(pid_t pid, int status);
#endif
#endif
M src/core/vm.c => src/core/vm.c +1 -1
@@ 1478,7 1478,6 @@ int janet_init(void) {
janet_vm_fiber = NULL;
janet_vm_root_fiber = NULL;
janet_vm_stackn = 0;
- /* Threads */
#ifdef JANET_THREADS
janet_threads_init();
#endif
@@ 1506,6 1505,7 @@ void janet_deinit(void) {
free(janet_vm_traversal_base);
janet_vm_fiber = NULL;
janet_vm_root_fiber = NULL;
+ janet_os_deinit();
#ifdef JANET_THREADS
janet_threads_deinit();
#endif
M src/include/janet.h => src/include/janet.h +3 -0
@@ 1279,10 1279,13 @@ JANET_API JanetListenerState *janet_listen(JanetStream *stream, JanetListener be
/* Shorthand for yielding to event loop in C */
JANET_NO_RETURN JANET_API void janet_await(void);
+JANET_NO_RETURN JANET_API void janet_sleep_await(double sec);
/* For use inside listeners - adds a timeout to the current fiber, such that
* it will be resumed after sec seconds if no other event schedules the current fiber. */
JANET_API void janet_addtimeout(double sec);
+JANET_API void janet_ev_inc_refcount(void);
+JANET_API void janet_ev_dec_refcount(void);
/* Get last error from a an IO operation */
JANET_API Janet janet_ev_lasterr(void);
M test/suite0009.janet => test/suite0009.janet +2 -1
@@ 40,7 40,7 @@
# or else the first read can fail. Might be a strange windows
# "bug", but needs further investigating. Otherwise, `build_win test`
# can sometimes fail on windows, leading to flaky testing.
-(ev/sleep 0.2)
+(ev/sleep 0.3)
(defn test-echo [msg]
(with [conn (net/connect "127.0.0.1" "8000")]
@@ 59,6 59,7 @@
(var pipe-counter 0)
(def chan (ev/chan 10))
(let [[reader writer] (os/pipe)]
+ (ev/sleep 0.3)
(ev/spawn
(while (ev/read reader 3)
(++ pipe-counter))