~bakpakin/janet

c9f33bbde03b804e8a62d0d90e4f56307347124f — Calvin Rose 2 years ago 9c9f9d4 sync-primitives
Add rwlocks.
5 files changed, 193 insertions(+), 5 deletions(-)

A examples/evlocks.janet
M src/core/abstract.c
M src/core/ev.c
M src/core/features.h
M src/include/janet.h
A examples/evlocks.janet => examples/evlocks.janet +45 -0
@@ 0,0 1,45 @@
(defn sleep
  "Sleep the entire thread, not just a single fiber."
  [n]
  (os/sleep (* 0.1 n)))

(defn work [lock n]
  (ev/acquire-lock lock)
  (print "working " n "...")
  (sleep n)
  (print "done working...")
  (ev/release-lock lock))

(defn reader
  [rwlock n]
  (ev/acquire-rlock rwlock)
  (print "reading " n "...")
  (sleep n)
  (print "done reading " n "...")
  (ev/release-rlock rwlock))

(defn writer
  [rwlock n]
  (ev/acquire-wlock rwlock)
  (print "writing " n "...")
  (sleep n)
  (print "done writing...")
  (ev/release-wlock rwlock))

(defn test-lock
  []
  (def lock (ev/lock))
  (for i 3 7
    (ev/spawn-thread
      (work lock i))))

(defn test-rwlock
  []
  (def rwlock (ev/rwlock))
  (for i 0 20
    (if (> 0.1 (math/random))
      (ev/spawn-thread (writer rwlock i))
      (ev/spawn-thread (reader rwlock i)))))

(test-rwlock)
(test-lock)

M src/core/abstract.c => src/core/abstract.c +49 -0
@@ 110,6 110,31 @@ void janet_os_mutex_unlock(JanetOSMutex *mutex) {
    LeaveCriticalSection((CRITICAL_SECTION *) mutex);
}

void janet_os_rwlock_init(JanetOSRWLock *rwlock) {
    InitializeSRWLock((PSRWLOCK) rwlock);
}

void janet_os_rwlock_deinit(JanetOSRWLock *rwlock) {
    /* no op? */
    (void) rwlock;
}

void janet_os_rwlock_rlock(JanetOSRWLock *rwlock) {
    AcquireSRWLockShared((PSRWLOCK) rwlock);
}

void janet_os_rwlock_wlock(JanetOSRWLock *rwlock) {
    AcquireSRWLockExclusive((PSRWLOCK) rwlock);
}

void janet_os_rwlock_runlock(JanetOSRWLock *rwlock) {
    ReleaseSRWLockShared((PSRWLOCK) rwlock);
}

void janet_os_rwlock_wunlock(JanetOSRWLock *rwlock) {
    ReleaseSRWLockExclusive((PSRWLOCK) rwlock);
}

#else

static int32_t janet_incref(JanetAbstractHead *ab) {


@@ 140,6 165,30 @@ void janet_os_mutex_unlock(JanetOSMutex *mutex) {
    if (ret) janet_panic("cannot release lock");
}

void janet_os_rwlock_init(JanetOSRWLock *rwlock) {
    pthread_rwlock_init(rwlock, NULL);
}

void janet_os_rwlock_deinit(JanetOSRWLock *rwlock) {
    pthread_rwlock_destroy(rwlock);
}

void janet_os_rwlock_rlock(JanetOSRWLock *rwlock) {
    pthread_rwlock_rdlock(rwlock);
}

void janet_os_rwlock_wlock(JanetOSRWLock *rwlock) {
    pthread_rwlock_wrlock(rwlock);
}

void janet_os_rwlock_runlock(JanetOSRWLock *rwlock) {
    pthread_rwlock_unlock(rwlock);
}

void janet_os_rwlock_wunlock(JanetOSRWLock *rwlock) {
    pthread_rwlock_unlock(rwlock);
}

#endif

int32_t janet_abstract_incref(void *abst) {

M src/core/ev.c => src/core/ev.c +69 -1
@@ 3015,7 3015,6 @@ JANET_CORE_FN(janet_cfun_stream_write,

typedef struct {
    JanetOSMutex mutex;
    int destroyed;
} JanetAbstractMutex;

static int mutexgc(void *p, size_t size) {


@@ 3061,6 3060,69 @@ JANET_CORE_FN(janet_cfun_mutex_release,
    return argv[0];
}

typedef struct {
    JanetOSRWLock rwlock;
} JanetAbstractRWLock;

static int rwlockgc(void *p, size_t size) {
    JanetAbstractRWLock *rwlock = (JanetAbstractRWLock *) p;
    (void) size;
    janet_os_rwlock_deinit(&rwlock->rwlock);
    return 0;
}

const JanetAbstractType janet_rwlock_type = {
    "core/rwlock",
    rwlockgc,
    JANET_ATEND_GC
};

JANET_CORE_FN(janet_cfun_rwlock,
        "(ev/rwlock)",
        "Create a new read-write lock to coordinate threads.") {
    janet_fixarity(argc, 0);
    (void) argv;
    JanetAbstractRWLock *rwlock = janet_abstract_threaded(&janet_rwlock_type, sizeof(JanetAbstractRWLock));
    janet_os_rwlock_init(&rwlock->rwlock);
    return janet_wrap_abstract(rwlock);
}

JANET_CORE_FN(janet_cfun_rwlock_read_lock,
        "(ev/acquire-rlock rwlock)",
        "Acquire a read lock an a read-write lock.") {
    janet_fixarity(argc, 1);
    JanetAbstractRWLock *rwlock = janet_getabstract(argv, 0, &janet_rwlock_type);
    janet_os_rwlock_rlock(&rwlock->rwlock);
    return argv[0];
}

JANET_CORE_FN(janet_cfun_rwlock_write_lock,
        "(ev/acquire-wlock rwlock)",
        "Acquire a write lock on a read-write lock.") {
    janet_fixarity(argc, 1);
    JanetAbstractRWLock *rwlock = janet_getabstract(argv, 0, &janet_rwlock_type);
    janet_os_rwlock_wlock(&rwlock->rwlock);
    return argv[0];
}

JANET_CORE_FN(janet_cfun_rwlock_read_release,
        "(ev/release-rlock rwlock)",
        "Release a read lock on a read-write lock") {
    janet_fixarity(argc, 1);
    JanetAbstractRWLock *rwlock = janet_getabstract(argv, 0, &janet_rwlock_type);
    janet_os_rwlock_runlock(&rwlock->rwlock);
    return argv[0];
}

JANET_CORE_FN(janet_cfun_rwlock_write_release,
        "(ev/release-wlock rwlock)",
        "Release a write lock on a read-write lock") {
    janet_fixarity(argc, 1);
    JanetAbstractRWLock *rwlock = janet_getabstract(argv, 0, &janet_rwlock_type);
    janet_os_rwlock_wunlock(&rwlock->rwlock);
    return argv[0];
}

void janet_lib_ev(JanetTable *env) {
    JanetRegExt ev_cfuns_ext[] = {
        JANET_CORE_REG("ev/give", cfun_channel_push),


@@ 3086,6 3148,11 @@ void janet_lib_ev(JanetTable *env) {
        JANET_CORE_REG("ev/lock", janet_cfun_mutex),
        JANET_CORE_REG("ev/acquire-lock", janet_cfun_mutex_acquire),
        JANET_CORE_REG("ev/release-lock", janet_cfun_mutex_release),
        JANET_CORE_REG("ev/rwlock", janet_cfun_rwlock),
        JANET_CORE_REG("ev/acquire-rlock", janet_cfun_rwlock_read_lock),
        JANET_CORE_REG("ev/acquire-wlock", janet_cfun_rwlock_write_lock),
        JANET_CORE_REG("ev/release-rlock", janet_cfun_rwlock_read_release),
        JANET_CORE_REG("ev/release-wlock", janet_cfun_rwlock_write_release),
        JANET_REG_END
    };



@@ 3093,6 3160,7 @@ void janet_lib_ev(JanetTable *env) {
    janet_register_abstract_type(&janet_stream_type);
    janet_register_abstract_type(&janet_channel_type);
    janet_register_abstract_type(&janet_mutex_type);
    janet_register_abstract_type(&janet_rwlock_type);
}

#endif

M src/core/features.h => src/core/features.h +7 -3
@@ 45,9 45,13 @@
#define WIN32_LEAN_AND_MEAN
#endif

/* Needed for realpath on linux */
#if !defined(_XOPEN_SOURCE) && (defined(__linux__) || defined(__EMSCRIPTEN__))
#define _XOPEN_SOURCE 500
/* Needed for realpath on linux, as well as pthread rwlocks. */
#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 600
#endif
#if _XOPEN_SOURCE < 600
#undef _XOPEN_SOURCE
#define _XOPEN_SOURCE 600
#endif

/* Needed for timegm and other extensions when building with -std=c99.

M src/include/janet.h => src/include/janet.h +23 -1
@@ 299,6 299,18 @@ typedef struct {
    JANET_CURRENT_CONFIG_BITS })
#endif

/* Feature include for pthreads. Most feature detection code should go in
 * features.h instead. */
#ifndef JANET_WINDOWS
#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 600
#endif
#if _XOPEN_SOURCE < 600
#undef _XOPEN_SOURCE
#define _XOPEN_SOURCE 600
#endif
#endif

/* What to do when out of memory */
#ifndef JANET_OUT_OF_MEMORY
#include <stdio.h>


@@ 335,9 347,13 @@ typedef struct JanetDudCriticalSection {
    void *lock_semaphore;
    unsigned long spin_count;
} JanetOSMutex;
typedef struct JanetDudRWLock {
    void *ptr;
} JanetOSRWLock;
#else
#include <pthread.h>
typedef pthread_mutex_t JanetOSMutex;
typedef pthread_rwlock_t JanetOSRWLock;
#endif
#endif



@@ 1368,11 1384,17 @@ JANET_API void *janet_abstract_threaded(const JanetAbstractType *atype, size_t s
JANET_API int32_t janet_abstract_incref(void *abst);
JANET_API int32_t janet_abstract_decref(void *abst);

/* Expose some OS sync primitives to make portable abstract types easier to implement */
/* Expose some OS sync primitives */
JANET_API void janet_os_mutex_init(JanetOSMutex *mutex);
JANET_API void janet_os_mutex_deinit(JanetOSMutex *mutex);
JANET_API void janet_os_mutex_lock(JanetOSMutex *mutex);
JANET_API void janet_os_mutex_unlock(JanetOSMutex *mutex);
JANET_API void janet_os_rwlock_init(JanetOSRWLock *rwlock);
JANET_API void janet_os_rwlock_deinit(JanetOSRWLock *rwlock);
JANET_API void janet_os_rwlock_rlock(JanetOSRWLock *rwlock);
JANET_API void janet_os_rwlock_wlock(JanetOSRWLock *rwlock);
JANET_API void janet_os_rwlock_runlock(JanetOSRWLock *rwlock);
JANET_API void janet_os_rwlock_wunlock(JanetOSRWLock *rwlock);

/* Get last error from an IO operation */
JANET_API Janet janet_ev_lasterr(void);