~aritra1911/mc_server

f781c00e95a4cf91608d0fd5e3c74363817193ad — Aritra Sarkar 1 year, 11 months ago 479f443
Another day, another design which might actually work
3 files changed, 156 insertions(+), 205 deletions(-)

M events.c
M main.c
M response.c
M events.c => events.c +153 -201
@@ 34,60 34,92 @@
#include "tpool.h"
#include "response.h"

typedef struct writers {
    size_t count;
    pthread_mutex_t mutex;
    pthread_cond_t done;
} writers_t;

typedef struct accept_params {
    int newfd;
    struct sockaddr_storage remoteaddr;
    session_t *sessionp;
    size_t *writersp;
    pthread_mutex_t *writers_mutexp;
    pthread_cond_t *writers_donep;
    writers_t *writersp;
} accept_params_t;

typedef struct disc_params {
/* Receive And Send (RAS), not to be confused by RAS Syndrome */
typedef struct ras_params {
    int fd;
    session_t *sessionp;
    size_t *writersp;
    pthread_mutex_t *writers_mutexp;
    pthread_cond_t *writers_donep;
} disc_params_t;
    writers_t *writersp;
} ras_params_t;

static void get_err_str(int errnum, char *err_str);
static void *get_in_addr(struct sockaddr *sa);
static int writers_init(writers_t *);
static int writers_increment(writers_t *);
static int writers_decrement(writers_t *);
static int writers_destroy(writers_t *);
static void accept_client(int tid, void *arg);
static void recv_and_send(int tid, void *arg);

static int
writers_init(writers_t *writersp)
{
    writersp->count = 0;
    pthread_mutex_init(&writersp->mutex, NULL);
    pthread_cond_init(&writersp->done, NULL);
    return 0;
}

typedef struct send_params {
    int fd;
    ssize_t num_bytes;
    size_t *writersp;
    char buf[BUFLEN];
    session_t *sessionp;
    tpool_t *tpoolp;
    pthread_mutex_t *writers_mutexp;
    pthread_cond_t *writers_donep;
} send_params_t;
static int
writers_increment(writers_t *writersp)
{
    pthread_mutex_lock(&writersp->mutex);
    writersp->count++;
    pthread_mutex_unlock(&writersp->mutex);
    return 0;
}

static inline void get_err_str(int errnum, char *err_str);
static inline void *get_in_addr(struct sockaddr *sa);
static void client_accept(int tid, void *arg);
static void client_disconnect(int tid, void *arg);
static void client_send(int tid, void *arg);
static int
writers_decrement(writers_t *writersp)
{
    pthread_mutex_lock(&writersp->mutex);
    if ( !(--writersp->count) )
        pthread_cond_signal(&writersp->done);
    pthread_mutex_unlock(&writersp->mutex);
    return 0;
}

static int
writers_wait(writers_t *writersp)
{
    pthread_mutex_lock(&writersp->mutex);
    while ( writersp->count )
        pthread_cond_wait(&writersp->done, &writersp->mutex);
    pthread_mutex_unlock(&writersp->mutex);
    return 0;
}

static int
writers_destroy(writers_t *writersp)
{
    writersp->count = 0xDEAD4BAD;
    pthread_mutex_destroy(&writersp->mutex);
    pthread_cond_destroy(&writersp->done);
    return 0;
}

void
process_events(int num_events, tpool_t *tpoolp, session_t *sessionp)
{
    size_t i, writers;
    ssize_t num_bytes;
    int *fds, *fdp, listener, newfd;
    struct sockaddr_storage remoteaddr;
    char buf[BUFLEN] = "";
    socklen_t addrlen;
    pthread_mutex_t writers_mutex;
    pthread_cond_t writers_done;
    size_t i;
    int *fds, *fdp, listener;
    accept_params_t *accept_paramsp;
    send_params_t *send_paramsp;
    ras_params_t *ras_paramsp;
    writers_t writers;

    fds = calloc(sessionp->fd_count + 1, sizeof(int));
    listener = sessionp->pfds[0].fd;
    addrlen = sizeof remoteaddr;
    writers = 0;
    pthread_mutex_init(&writers_mutex, NULL);
    pthread_cond_init(&writers_done, NULL);
    writers_init(&writers);

    /* Collect the `fds` that we need to `recv()` or `accept()` on so
     * that we can give a window of time where we do not impose a read


@@ 114,60 146,42 @@ process_events(int num_events, tpool_t *tpoolp, session_t *sessionp)
        if ( *fdp == listener ) {
            /* TODO: `accept()` can be called inside of
             *       `client_accept()`. */
            newfd = accept(listener, (struct sockaddr *) &remoteaddr,
                                                         &addrlen);
            accept_paramsp = malloc(sizeof(accept_params_t));
            accept_paramsp->newfd = newfd;
            accept_paramsp->remoteaddr = remoteaddr;
            accept_paramsp->sessionp = sessionp;
            accept_paramsp->writersp = &writers;
            accept_paramsp->writers_mutexp = &writers_mutex;
            accept_paramsp->writers_donep = &writers_done;

            pthread_mutex_lock(&writers_mutex);
            writers++;
            pthread_mutex_unlock(&writers_mutex);

            tpool_add_work(tpoolp, client_accept, accept_paramsp);
            tpool_add_work(tpoolp, accept_client, accept_paramsp);
            writers_increment(&writers);
        } else {
            num_bytes = recv(*fdp, buf, BUFLEN, 0);
            if ( num_bytes >= 0 ) {
                send_paramsp = malloc(sizeof(send_params_t));
                send_paramsp->fd = *fdp;
                send_paramsp->num_bytes = num_bytes;
                memcpy(send_paramsp->buf, buf, num_bytes);
                send_paramsp->sessionp = sessionp;
                send_paramsp->tpoolp = tpoolp;
                send_paramsp->writersp = &writers;
                send_paramsp->writers_mutexp = &writers_mutex;
                send_paramsp->writers_donep = &writers_done;
                tpool_add_work(tpoolp, client_send, send_paramsp);
            } else {
                /* TODO: Handle error */
            }
            ras_paramsp = malloc(sizeof(ras_params_t));
            ras_paramsp->fd = *fdp;
            ras_paramsp->sessionp = sessionp;
            ras_paramsp->writersp = &writers;

            /* This potentially holds a write lock on `session` in
             * case session data needs to be modified like for eg. a
             * nick change or client disconnect. Once it figures out
             * that `session` doesn't need any modification or such a
             * modification has already been performed, it releases
             * the write lock and switches to a read lock for
             * distribution or announcement to all the other clients.
             */
            tpool_add_work(tpoolp, recv_and_send, ras_paramsp);
            writers_increment(&writers);
        }
    }

    free(fds);

    /* This felt like a hack during implementation, but with my current
     * knowledge I couldn't find a better way to wait for all the work
     * in the queue which mutate `session` to finish before we went
     * back to polling again after attaining a read lock on `session`.
     * Honestly, in my minimal testing, this worked like a charm. */

    pthread_mutex_lock(&writers_mutex);
    while ( writers )
        pthread_cond_wait(&writers_done, &writers_mutex);
    pthread_mutex_unlock(&writers_mutex);
    /* Wait for all writer threads to release the lock before
     * read-locking `session` for `poll()`ing again. */
    writers_wait(&writers);

    /* TODO: We might wanna package these three things into a struct
     *       someday. */
    writers_destroy(&writers);

    pthread_rwlock_rdlock(&sessionp->lock);
}

static inline void
static void
get_err_str(int errnum, char *err_str)
{
    /* Use `strerror_r()` as a(n) MT-Safe way of retrieving error


@@ 183,7 197,7 @@ get_err_str(int errnum, char *err_str)
        sprintf(err_str, "Error number %i", errnum);
}

static inline void *
static void *
get_in_addr(struct sockaddr *sa)
{
    if ( sa->sa_family == AF_INET ) {


@@ 196,38 210,36 @@ get_in_addr(struct sockaddr *sa)
}

static void
client_accept(int thread_id, void *arg)
accept_client(int thread_id, void *arg)
{
    int newfd;
    size_t *writersp;
    int listener, newfd;
    char err_str[STRERROR_BUFLEN];
    struct sockaddr_storage remoteaddr;
    socklen_t addrlen;
    session_t *sessionp;
    pthread_mutex_t *writers_mutexp;
    pthread_cond_t *writers_donep;
    writers_t *writersp;

    /* INET6_ADDRSTRLEN > INET_ADDRSTRLEN and hence big enough to fit
     * an IPv4 address as well. */
    char remoteIP[INET6_ADDRSTRLEN] = "";

    newfd = ((accept_params_t *) arg)->newfd;
    remoteaddr = ((accept_params_t *) arg)->remoteaddr;
    sessionp = ((accept_params_t *) arg)->sessionp;
    writersp = ((accept_params_t *) arg)->writersp;
    writers_mutexp = ((accept_params_t *) arg)->writers_mutexp;
    writers_donep = ((accept_params_t *) arg)->writers_donep;

    free(arg);

    listener = sessionp->pfds[0].fd;
    addrlen = sizeof remoteaddr;

    newfd = accept(listener, (struct sockaddr *) &remoteaddr,
                                                 &addrlen);

    pthread_rwlock_wrlock(&sessionp->lock);

    /* Add client to `pfds` */
    add_client(newfd, sessionp);

    pthread_mutex_lock(writers_mutexp);
    if ( !(--(*writersp)) )
        pthread_cond_signal(writers_donep);
    pthread_mutex_unlock(writers_mutexp);
    writers_decrement(writersp);

    pthread_rwlock_unlock(&sessionp->lock);



@@ 247,137 259,77 @@ client_accept(int thread_id, void *arg)
}

static void
client_disconnect(int thread_id, void *arg)
recv_and_send(int thread_id, void *arg)
{
    size_t i, *writersp;
    int fd, flag;
    int fd, announce;
    ssize_t num_bytes;
    size_t i;
    char buf[BUFLEN] = "";
    session_t *sessionp;
    pthread_mutex_t *writers_mutexp;
    pthread_cond_t *writers_donep;
    writers_t *writersp;
    ResponseError err_code;

    fd = ((disc_params_t *) arg)->fd;
    sessionp = ((disc_params_t *) arg)->sessionp;
    writersp = ((disc_params_t *) arg)->writersp;
    writers_mutexp = ((disc_params_t *) arg)->writers_mutexp;
    writers_donep = ((disc_params_t *) arg)->writers_donep;
    fd       = ((ras_params_t *) arg)->fd;
    sessionp = ((ras_params_t *) arg)->sessionp;
    writersp = ((ras_params_t *) arg)->writersp;

    free(arg);

    flag = 0;
    announce = 0;

    pthread_rwlock_wrlock(&sessionp->lock);
    for (i = 0; i < sessionp->fd_count; i++) {
        if ( sessionp->pfds[i].fd == fd ) {
            close(fd);
            remove_client(i, sessionp);
            flag = 1;
        }
    if ( (num_bytes = recv(fd, buf, BUFLEN, 0)) == -1 ) {
        /* TODO: Handle Error */
    }

    pthread_mutex_lock(writers_mutexp);
    if ( !(--(*writersp)) )
        pthread_cond_signal(writers_donep);
    pthread_mutex_unlock(writers_mutexp);

    pthread_rwlock_unlock(&sessionp->lock);

    if ( flag )
        printf("INFO : [THREAD #%02i] Socket %i disconnected\n",
               thread_id, fd);
}

static void
client_send(int thread_id, void *arg)
{
    (void) thread_id;

    int fd, *fds, *fdp;
    ssize_t num_bytes;
    size_t idx, i, *writersp;
    char buf[BUFLEN] = "", nick[NICKLEN] = "";
    session_t *sessionp;
    tpool_t *tpoolp;
    pthread_mutex_t *writers_mutexp;
    pthread_cond_t *writers_donep;
    disc_params_t *disc_paramsp;
    ResponseError err_code;

    fd = ((send_params_t *) arg)->fd;
    num_bytes = ((send_params_t *) arg)->num_bytes;
    memcpy(buf, ((send_params_t *) arg)->buf, num_bytes);
    sessionp = ((send_params_t *) arg)->sessionp;
    tpoolp = ((send_params_t *) arg)->tpoolp;
    writersp = ((send_params_t *) arg)->writersp;
    writers_mutexp = ((send_params_t *) arg)->writers_mutexp;
    writers_donep = ((send_params_t *) arg)->writers_donep;
    pthread_rwlock_wrlock(&sessionp->lock);

    free(arg);
    /* Determine index from fd */
    for (i = 0; i < sessionp->fd_count; i++)
        if ( sessionp->pfds[i].fd == fd )
            break;

    fds = calloc(sessionp->fd_count + 1, sizeof(int));
    for (i = 1 /* avoid listener */, fdp = fds; i < sessionp->fd_count; i++) {
        *fdp++ = sessionp->pfds[i].fd;
        if ( sessionp->pfds[i].fd == fd ) {
            strcpy(nick, sessionp->clients[i].nick);
            idx = i;
    if ( !num_bytes ) {
        if ( *sessionp->clients[i].nick ) {
            sprintf(buf + 1, "%s connection closed",
                    sessionp->clients[i].nick);
            buf[0] = RES_QUIT;
            num_bytes = 1 + strlen(buf + 1);
            announce = 1;
        }
    }

    /* TODO: Fix the write lock, it shouldn't alway be getting a write lock, but
     *       only in case of a disconnect, /quit or /setnick. Also we should be
     *       able to handle this decision in process_events() */
        goto disconnect;
    } else {
        announce = 1;
        if ( respond(&sessionp->clients[i],
                     sessionp->clients, (size_t) sessionp->fd_count,
                     buf, &num_bytes, &err_code) == -1 ) {

    //pthread_rwlock_wrlock(&sessionp->lock);
    pthread_rwlock_rdlock(&sessionp->lock);
    /* TODO: `respond()` musn't mutate client nick */
    if ( num_bytes ) {
        if ( respond(&sessionp->clients[idx], sessionp->clients,
                     sessionp->fd_count, buf, &num_bytes,
                     &err_code) == -1 ) {
            buf[0] = RES_ERROR;
            buf[1] = err_code;
            num_bytes = 2;
        }
    } else {
        if ( *nick ) {
            sprintf(buf + 1, "%s connection closed", nick);
            buf[0] = RES_QUIT;
            num_bytes = 1 + strlen(buf + 1);

        if ( buf[0] == RES_QUIT ) {
disconnect:
            remove_client(i, sessionp);
            close(fd);
            printf("DBUG : [THREAD #%02i] Socket %i disconnected\n", thread_id, fd);
        }
    }
    pthread_rwlock_unlock(&sessionp->lock);

    /* send confirmation earlier since it's going to be killed soon */
    if ( *nick && buf[0] == RES_QUIT )
        send(fd, buf, num_bytes, 0);

    if ( !num_bytes || buf[0] == RES_QUIT ) {
        disc_paramsp = malloc(sizeof(disc_params_t));

        /* Send off a worker to actually call `remove_client()` while
         * we release the read lock and announce the disconnection to
         * others. */

        disc_paramsp->fd = fd;
        disc_paramsp->sessionp = sessionp;
        disc_paramsp->writersp = writersp;
        disc_paramsp->writers_mutexp = writers_mutexp;
        disc_paramsp->writers_donep = writers_donep;

        pthread_mutex_lock(writers_mutexp);
        (*writersp)++;
        pthread_mutex_unlock(writers_mutexp);

        tpool_add_work(tpoolp, client_disconnect, disc_paramsp);
    }
    writers_decrement(writersp);
    pthread_rwlock_unlock(&sessionp->lock);

    if ( buf[0] == RES_ERROR )
        send(fd, buf, num_bytes, 0);
    else {
        for (fdp = fds; *fdp; fdp++) {
            if ( !(buf[0] == RES_QUIT && *fdp == fd) )
                send(*fdp, buf, num_bytes, 0);
    if ( announce ) {
        printf("DBUG : [THREAD #%02i] Announcing...\n", thread_id);
        if ( buf[0] == RES_ERROR )
            send(fd, buf, num_bytes, 0);
        else {
            pthread_rwlock_rdlock(&sessionp->lock);
            for (i = 1 /* avoid listener */; i < sessionp->fd_count; i++) {
                send(sessionp->pfds[i].fd, buf, num_bytes, 0);
            }
            pthread_rwlock_unlock(&sessionp->lock);
        }
    }

    free(fds);
}

M main.c => main.c +2 -2
@@ 303,11 303,11 @@ int main(int argc, char **argv)
            return EXIT_FAILURE;
        }

        printf("INFO : Handling events...\n");
        printf("DBUG : Handling events...\n");
        /* `process_events` lets go of the read lock briefly in order
         * to add work to queue and then locks it back again. */
        process_events(num_events, &tpool, &session);
        printf("INFO : All events handled!\n");
        printf("DBUG : All events handled!\n");

        continue;   /* Everything below this are progressively being
                     * shifted to `events.c`. */

M response.c => response.c +1 -2
@@ 67,7 67,6 @@ static inline int set_nick(SendCode snd_code, client_t *client,
                           const client_t *clients_list, size_t list_size,
                           char *buf, ssize_t *len, ResponseError *err_code)
{

    char nick[NICKLEN] = "";
    strncpy(nick, buf + 1, (NICKLEN < *len ? NICKLEN : *len) - 1);



@@ 85,7 84,7 @@ static inline int set_nick(SendCode snd_code, client_t *client,
        return -1;
    }

    buf[0] = snd_code == SND_JOIN ? (char) RES_JOIN : (char) RES_SETNICK;
    buf[0] = snd_code == SND_JOIN ? RES_JOIN : RES_SETNICK;

    if ( snd_code == SND_SETNICK ) {
        sprintf(buf + 1, "%s %s", client->nick, nick);