~aritra1911/mc_server

6daf59e514b7a8e8ad1d96b422f70da404f6c0f9 — Aritra Sarkar 1 year, 11 months ago 554b567
Implement reader writer lock
5 files changed, 225 insertions(+), 111 deletions(-)

M events.c
M events.h
M main.c
M session.c
M session.h
M events.c => events.c +204 -70
@@ 18,6 18,7 @@
 * https://www.gnu.org/licenses/gpl-3.0.txt
 */

#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>


@@ 32,33 33,141 @@
#include "session.h"
#include "tpool.h"

typedef struct params {
    size_t fd;
typedef struct accept_params {
    int newfd;
    struct sockaddr_storage remoteaddr;
    session_t *sessionp;
    size_t *writersp;
    pthread_mutex_t *writers_mutexp;
    pthread_cond_t *writers_donep;
} accept_params_t;

typedef struct disc_params {
    int fd;
    session_t *sessionp;
    size_t *writersp;
    pthread_mutex_t *writers_mutexp;
    pthread_cond_t *writers_donep;
} disc_params_t;

typedef struct send_params {
    int fd;
    ssize_t num_bytes;
    char buf[BUFLEN];
    session_t *sessionp;
} params_t;
} send_params_t;

static inline void get_err_str(int errnum, char *err_str);
static inline void *get_in_addr(struct sockaddr *sa);
static void accept_client(int tid, /* session_t */ void *arg);
static void recv_and_send(int tid, /* params_t */ void *arg);
static void client_accept(int tid, void *arg);
static void client_disconnect(int tid, void *arg);
static void client_send(int tid, void *arg);

void
process_events(tpool_t *tpoolp, session_t *sessionp)
process_events(int num_events, tpool_t *tpoolp, session_t *sessionp)
{
    size_t i;
    params_t *paramsp;

    for (i = 0; sessionp->num_events && i < sessionp->fd_count; i++) {
        if ( !(sessionp->pfds[i].revents & POLLIN) ) continue;

        if ( i ) {
            paramsp = malloc(sizeof(params_t));
            paramsp->fd = sessionp->pfds[i].fd;
            paramsp->sessionp = sessionp;
            tpool_add_work(tpoolp, recv_and_send, paramsp);
        } else
            tpool_add_work(tpoolp, accept_client, sessionp);
    size_t i, writers;
    ssize_t num_bytes;
    int *fds, *fdp, listener, newfd;
    struct sockaddr_storage remoteaddr;
    char buf[BUFLEN] = "";
    socklen_t addrlen;
    pthread_mutex_t writers_mutex;
    pthread_cond_t writers_done;
    accept_params_t *accept_paramsp;
    disc_params_t *disc_paramsp;
    send_params_t *send_paramsp;

    fds = calloc(sessionp->fd_count + 1, sizeof(int));
    listener = sessionp->pfds[0].fd;
    addrlen = sizeof remoteaddr;
    writers = 0;
    pthread_mutex_init(&writers_mutex, NULL);
    pthread_cond_init(&writers_done, NULL);

    /* Collect the `fds` that we need to `recv()` or `accept()` on so
     * that we can give a window of time where we do not impose a read
     * lock and let the worker threads mutating `session` a chance to
     * complete. */

    for (i = 0, fdp = fds; num_events && i < sessionp->fd_count; i++) {
        if ( sessionp->pfds[i].revents & POLLIN ) {
            *fdp++ = sessionp->pfds[i].fd;
            num_events--;
        }
    }

    pthread_rwlock_unlock(&sessionp->lock);

    /* for each `fd` in `fds`, we need to do a `recv()` unless it's the
     * `listener` for which we `accept()` a new client. If the thread
     * pool's work queue is full, `tpool_add_work()` shall block while
     * adding new work. Since we do not impose a read lock on `session`
     * at this window of time, even if every queued work require a
     * write lock on `session` can proceed and drain the queue and
     * allow new work to be added. */

    for (fdp = fds; *fdp; fdp++) {
        if ( *fdp == listener ) {
            newfd = accept(listener, (struct sockaddr *) &remoteaddr,
                                                         &addrlen);
            accept_paramsp = malloc(sizeof(accept_params_t));
            accept_paramsp->newfd = newfd;
            accept_paramsp->remoteaddr = remoteaddr;
            accept_paramsp->sessionp = sessionp;
            accept_paramsp->writersp = &writers;
            accept_paramsp->writers_mutexp = &writers_mutex;
            accept_paramsp->writers_donep = &writers_done;

            tpool_add_work(tpoolp, client_accept, accept_paramsp);

            pthread_mutex_lock(&writers_mutex);
            writers++;
            pthread_mutex_unlock(&writers_mutex);
        } else {
            num_bytes = recv(*fdp, buf, BUFLEN, 0);
            if ( !num_bytes ) {
                disc_paramsp = malloc(sizeof(disc_params_t));
                disc_paramsp->fd = *fdp;
                disc_paramsp->sessionp = sessionp;
                disc_paramsp->writersp = &writers;
                disc_paramsp->writers_mutexp = &writers_mutex;
                disc_paramsp->writers_donep = &writers_done;

                tpool_add_work(tpoolp, client_disconnect, disc_paramsp);

                pthread_mutex_lock(&writers_mutex);
                writers++;
                pthread_mutex_unlock(&writers_mutex);
            } else if ( num_bytes > 0 ) {
                send_paramsp = malloc(sizeof(send_params_t));
                send_paramsp->fd = *fdp;
                send_paramsp->num_bytes = num_bytes;
                strncpy(send_paramsp->buf, buf, num_bytes);
                send_paramsp->sessionp = sessionp;
                /* TODO: `/quit` should kill the client as well */
                tpool_add_work(tpoolp, client_send, send_paramsp);
            } else {
                /* TODO: Handle error */
            }
        }
    }

    free(fds);

    /* This felt like a hack during implementation, but with my current
     * knowledge I couldn't find a better way to wait for all the work
     * in the queue which mutate `session` to finish before we went
     * back to polling again after attaining a read lock on `session`.
     * Honestly, in my minimal testing, this worked like a charm. */
    pthread_mutex_lock(&writers_mutex);
    while ( writers )
        pthread_cond_wait(&writers_done, &writers_mutex);
    pthread_mutex_unlock(&writers_mutex);
    /* TODO: We might wanna package these three things into a struct
     *       someday. */

    pthread_rwlock_rdlock(&sessionp->lock);
}

static inline void


@@ 90,52 199,50 @@ get_in_addr(struct sockaddr *sa)
}

static void
accept_client(int thread_id, void *arg)
client_accept(int thread_id, void *arg)
{
    /* TODO: We need the thread id and append that to the beginning of
     *       every message we print here. */

    int listener, newfd;
    size_t *writersp;
    int newfd;
    char err_str[STRERROR_BUFLEN];
    socklen_t addrlen;
    struct sockaddr_storage remoteaddr;
    session_t *sessionp;
    pthread_mutex_t *writers_mutexp;
    pthread_cond_t *writers_donep;

    /* INET6_ADDRSTRLEN > INET_ADDRSTRLEN and hence big enough to fit
     * an IPv4 address as well. */
    char remoteIP[INET6_ADDRSTRLEN] = "";

    sessionp = arg;
    addrlen = sizeof remoteaddr;

    pthread_mutex_lock(&sessionp->session_mutex);
    newfd = ((accept_params_t *) arg)->newfd;
    remoteaddr = ((accept_params_t *) arg)->remoteaddr;
    sessionp = ((accept_params_t *) arg)->sessionp;
    writersp = ((accept_params_t *) arg)->writersp;
    writers_mutexp = ((accept_params_t *) arg)->writers_mutexp;
    writers_donep = ((accept_params_t *) arg)->writers_donep;

    listener = sessionp->pfds[0].fd;
    free(arg);

    if ( (newfd = accept(listener, (struct sockaddr *) &remoteaddr,
                                                       &addrlen)) == -1 ) {
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : Failed to `accept()` new connection : %s\n",
                        err_str);
    }
    pthread_rwlock_wrlock(&sessionp->lock);

    /* Add client to `pfds` */
    add_client(newfd, sessionp);

    if ( !(--sessionp->num_events) )
        pthread_cond_signal(&sessionp->resume_polling);
    pthread_mutex_lock(writers_mutexp);
    if ( !(--(*writersp)) )
        pthread_cond_signal(writers_donep);
    pthread_mutex_unlock(writers_mutexp);

    pthread_mutex_unlock(&sessionp->session_mutex);
    pthread_rwlock_unlock(&sessionp->lock);

    /* Convert client'd IP Address to human readable text form */
    if ( !inet_ntop(remoteaddr.ss_family,
                    get_in_addr((struct sockaddr *) &remoteaddr),
                    remoteIP, INET6_ADDRSTRLEN) ) {

        char err_str[STRERROR_BUFLEN];
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : Failed to convert client's IP Address to "
                        "text form : `inet_ntop()`: %s\n", err_str);
        fprintf(stderr, " ERR : [THREAD #%02i] Failed to convert client's IP "
                        "Address to text form : `inet_ntop()`: %s\n",
                thread_id, err_str);
    }

    printf("INFO : [THREAD #%02i] New connection from client %s on socket "


@@ 143,41 250,68 @@ accept_client(int thread_id, void *arg)
}

static void
recv_and_send(int thread_id, void *arg)
client_disconnect(int thread_id, void *arg)
{
    size_t i;
    int nbytes, fd;
    char buf[BUFLEN];
    size_t i, *writersp;
    int fd, flag;
    session_t *sessionp;
    pthread_mutex_t *writers_mutexp;
    pthread_cond_t *writers_donep;

    fd = ((params_t *) arg)->fd;
    sessionp = ((params_t *) arg)->sessionp;

    pthread_mutex_lock(&sessionp->session_mutex);

    nbytes = recv(fd, buf, BUFLEN, 0);

    if ( nbytes ) {
        printf("WARN : [THREAD #%02i] Messages are still unimplemented in this"
                                    " version.\n", thread_id);
        printf("INFO : [THREAD #%02i] Read something of %i bytes off of socket"
                                    " %i.\n"
               "     :              Going to ignore all of that...\n",
               thread_id, nbytes, fd);
    } else {
        for (i = 0; i < sessionp->fd_count; i++) {
            if ( sessionp->pfds[i].fd == fd ) {
                remove_client(i, sessionp);
                printf("INFO : [THREAD #%02i] Socket %i disconnected\n",
                       thread_id, fd);
            }
    fd = ((disc_params_t *) arg)->fd;
    sessionp = ((disc_params_t *) arg)->sessionp;
    writersp = ((disc_params_t *) arg)->writersp;
    writers_mutexp = ((disc_params_t *) arg)->writers_mutexp;
    writers_donep = ((disc_params_t *) arg)->writers_donep;

    free(arg);

    flag = 0;

    pthread_rwlock_wrlock(&sessionp->lock);
    for (i = 0; i < sessionp->fd_count; i++) {
        if ( sessionp->pfds[i].fd == fd ) {
            close(fd);
            remove_client(i, sessionp);
            flag = 1;
        }
    }

    if ( !(--sessionp->num_events) )
        pthread_cond_signal(&sessionp->resume_polling);
    pthread_mutex_lock(writers_mutexp);
    if ( !(--(*writersp)) )
        pthread_cond_signal(writers_donep);
    pthread_mutex_unlock(writers_mutexp);

    pthread_mutex_unlock(&sessionp->session_mutex);
    pthread_rwlock_unlock(&sessionp->lock);

    if ( flag )
        printf("INFO : [THREAD #%02i] Socket %i disconnected\n",
               thread_id, fd);
}

static void
client_send(int thread_id, void *arg)
{
    ssize_t num_bytes;
    int fd;
    char buf[BUFLEN] = "";
    session_t *sessionp;

    fd = ((send_params_t *) arg)->fd;
    num_bytes = ((send_params_t *) arg)->num_bytes;
    strncpy(buf, ((send_params_t *) arg)->buf, num_bytes);
    sessionp = ((send_params_t *) arg)->sessionp;

    free(arg);

    pthread_rwlock_rdlock(&sessionp->lock);
    /* TODO: We aren't reading shit, the read lock is useless. */

    printf("WARN : [THREAD #%02i] Messages are still unimplemented in this"
                                " version.\n", thread_id);
    printf("INFO : [THREAD #%02i] Read something of %lu bytes off of socket"
                                " %i.\n"
           "     :              Going to ignore all of that...\n",
           thread_id, num_bytes, fd);
    pthread_rwlock_unlock(&sessionp->lock);
}

M events.h => events.h +1 -1
@@ 7,6 7,6 @@
/* In glibc-2.7, the longest error message string is 50 characters */
# define STRERROR_BUFLEN 128

void process_events(tpool_t *, session_t *sessionp);
void process_events(int num_events, tpool_t *, session_t *sessionp);

#endif  /* __EVENTS_H_ */

M main.c => main.c +16 -31
@@ 37,8 37,8 @@
#include "tpool.h"
#include "events.h"

#define NUM_THREADS 16
#define MAX_QUEUE_SZ 4096
#define NUM_THREADS 2
#define MAX_QUEUE_SZ 4
#define MAX_ARGS 4
#define BACKLOG 10



@@ 236,7 236,7 @@ print_usage_ret_error:

int main(int argc, char **argv)
{
    int listener, newfd, args_parsed;
    int listener, newfd, args_parsed, num_events;
    char buf[BUFLEN], remoteIP[INET6_ADDRSTRLEN];
    struct sockaddr_storage remoteaddr;
    prefs_t prefs;


@@ 244,7 244,8 @@ int main(int argc, char **argv)
    session_t session;
    tpool_t tpool;

    /* Set up signal handler for ^C */
    /* Set up signal handler for ^C (SIGINT)
     * TODO: We should also handle ^\ (SIGQUIT) ? */
    struct sigaction sa = { 0 };
    sa.sa_handler = ctrl_c_handler;
    sigemptyset(&sa.sa_mask);  /* No need to block any other signals


@@ 283,20 284,13 @@ int main(int argc, char **argv)

    printf("INFO : Listening for connections on port %s\n", prefs.port);

    /* Now we gotta `poll()` and take care of everything once `poll()` returns
     * and then go back to polling again. */

    /* TODO: This should be a read lock actually */
    if ( pthread_mutex_lock(&session.session_mutex) ) {
        /* TODO: `strerror_r(errno)` */
        fprintf(stderr, "`pthread_mutex_lock()` failed\n");
        goto cleanup;
    }
    /* Now we gotta keep `poll()`ing and take care of everything
     * everytime `poll()` returns until the server receives a SIGINT.
     */

    pthread_rwlock_rdlock(&session.lock);
    while ( 1 ) {
        if ( (session.num_events =
              poll(session.pfds, session.fd_count, -1)) == -1 ) {

        if ( (num_events = poll(session.pfds, session.fd_count, -1)) == -1 ) {
            if ( errno == EINTR ) {
                /* We got Ctrl-C'ed, we need to gracefully release
                 * any allocated memory in heap and then exit */


@@ 309,15 303,11 @@ int main(int argc, char **argv)
            return EXIT_FAILURE;
        }

        process_events(&tpool, &session);

        while ( session.num_events ) {
            if ( pthread_cond_wait(&session.resume_polling,
                                   &session.session_mutex) ) {
                fprintf(stderr, "`pthread_cond_wait()` failed!\n");
                goto cleanup;
            }
        }
        printf("INFO : Handling events...\n");
        /* `process_events` lets go of the read lock briefly in order
         * to add work to queue and then locks it back again. */
        process_events(num_events, &tpool, &session);
        printf("INFO : All events handled!\n");

        continue;   /* Everything below this are progressively being
                     * shifted to `events.c`. */


@@ 441,11 431,7 @@ int main(int argc, char **argv)
        }
    }

    if ( pthread_mutex_unlock(&session.session_mutex) ) {
        /* TODO: `strerror_r(errno)` */
        fprintf(stderr, "`pthread_mutex_unlock()` failed\n");
        goto cleanup;
    }
    pthread_rwlock_unlock(&session.lock);

    /* ^C breaks `poll()` with EINTR which then we use to break the
     * outer infinite while(1) loop and hence we can finally reach this


@@ 453,7 439,6 @@ int main(int argc, char **argv)

    printf("INFO : Gracefully shutting down...\n");

cleanup:
    printf("INFO : Destroying thread pool...\n");
    tpool_destroy(&tpool, 1 /* wait until every worker thread joins */);


M session.c => session.c +2 -4
@@ 58,9 58,7 @@ session_init(session_t *sessionp, int listener)
    sessionp->pfds[0].events = POLLIN;  /* Detect incoming traffic */
    sessionp->fd_count++;

    sessionp->num_events = 0;
    pthread_mutex_init(&sessionp->session_mutex, NULL);
    pthread_cond_init(&sessionp->resume_polling, NULL);
    pthread_rwlock_init(&sessionp->lock, NULL);

    return 0;
}


@@ 92,7 90,6 @@ add_client(int cfd, session_t *sessionp)

realloc_failed:
    fprintf(stderr, " ERR : `realloc()` failed! Did we run out of memory?\n");
ret_err:
    return -1;
}



@@ 122,6 119,7 @@ remove_client(size_t index, session_t *sessionp)
void
session_destroy(session_t *sessionp)
{
    pthread_rwlock_destroy(&sessionp->lock);
    free(sessionp->pfds);
    free(sessionp->clients);
}

M session.h => session.h +2 -5
@@ 1,7 1,7 @@
#ifndef _SESSION_H
# define _SESSION_H

# include "pthread.h"
# include <pthread.h>

# include "common.h"



@@ 14,10 14,7 @@ typedef struct session {
    size_t fd_size, fd_count;
    struct pollfd *pfds;
    client_t *clients;
    pthread_mutex_t session_mutex;  /* TODO: Shouldn't this need
                                     *       to be an rw lock */
    pthread_cond_t resume_polling;
    int num_events;
    pthread_rwlock_t lock;
} session_t;

int session_init(session_t *, int listener);