~aritra1911/mc_server

554b567039efe8798f14c488fa43443213a9bbca — Aritra Sarkar 1 year, 11 months ago 1f3a13b
Get rid of `poll_mutex`
4 files changed, 42 insertions(+), 74 deletions(-)

M events.c
M main.c
M session.c
M session.h
M events.c => events.c +34 -34
@@ 33,14 33,33 @@
#include "tpool.h"

typedef struct params {
    size_t i;
    size_t fd;
    session_t *sessionp;
} params_t;

static inline void get_err_str(int errnum, char *err_str);
static inline void *get_in_addr(struct sockaddr *sa);
static void accept_client(int tid, void *arg);
static void recv_and_send(int tid, void *arg);
static void accept_client(int tid, /* session_t */ void *arg);
static void recv_and_send(int tid, /* params_t */ void *arg);

void
process_events(tpool_t *tpoolp, session_t *sessionp)
{
    size_t i;
    params_t *paramsp;

    for (i = 0; sessionp->num_events && i < sessionp->fd_count; i++) {
        if ( !(sessionp->pfds[i].revents & POLLIN) ) continue;

        if ( i ) {
            paramsp = malloc(sizeof(params_t));
            paramsp->fd = sessionp->pfds[i].fd;
            paramsp->sessionp = sessionp;
            tpool_add_work(tpoolp, recv_and_send, paramsp);
        } else
            tpool_add_work(tpoolp, accept_client, sessionp);
    }
}

static inline void
get_err_str(int errnum, char *err_str)


@@ 58,24 77,6 @@ get_err_str(int errnum, char *err_str)
        sprintf(err_str, "Error number %i", errnum);
}

void
process_events(tpool_t *tpoolp, session_t *sessionp)
{
    size_t i = 0;

    for ( ; sessionp->num_events && i < sessionp->fd_count; i++ ) {
        if ( !(sessionp->pfds[i].revents & POLLIN) ) continue;

        if ( i ) {
            params_t *paramsp = malloc(sizeof(params_t));
            paramsp->i = i;
            paramsp->sessionp = sessionp;
            tpool_add_work(tpoolp, recv_and_send, paramsp);
        } else
            tpool_add_work(tpoolp, accept_client, sessionp);
    }
}

static inline void *
get_in_addr(struct sockaddr *sa)
{


@@ 107,7 108,7 @@ accept_client(int thread_id, void *arg)
    sessionp = arg;
    addrlen = sizeof remoteaddr;

    pthread_mutex_lock(&sessionp->poll_mutex);
    pthread_mutex_lock(&sessionp->session_mutex);

    listener = sessionp->pfds[0].fd;



@@ 124,7 125,7 @@ accept_client(int thread_id, void *arg)
    if ( !(--sessionp->num_events) )
        pthread_cond_signal(&sessionp->resume_polling);

    pthread_mutex_unlock(&sessionp->poll_mutex);
    pthread_mutex_unlock(&sessionp->session_mutex);

    /* Convert client'd IP Address to human readable text form */
    if ( !inet_ntop(remoteaddr.ss_family,


@@ 135,10 136,6 @@ accept_client(int thread_id, void *arg)
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : Failed to convert client's IP Address to "
                        "text form : `inet_ntop()`: %s\n", err_str);

        /* Okay we might not need to abort if we fail to convert
         * `remoteaddr` to text, but we're being strict here. */
        return;
    }

    printf("INFO : [THREAD #%02i] New connection from client %s on socket "


@@ 153,12 150,11 @@ recv_and_send(int thread_id, void *arg)
    char buf[BUFLEN];
    session_t *sessionp;

    i = ((params_t *) arg)->i;
    fd = ((params_t *) arg)->fd;
    sessionp = ((params_t *) arg)->sessionp;

    pthread_mutex_lock(&sessionp->poll_mutex);
    pthread_mutex_lock(&sessionp->session_mutex);

    fd = sessionp->pfds[i].fd;
    nbytes = recv(fd, buf, BUFLEN, 0);

    if ( nbytes ) {


@@ 169,15 165,19 @@ recv_and_send(int thread_id, void *arg)
               "     :              Going to ignore all of that...\n",
               thread_id, nbytes, fd);
    } else {
        remove_client(i, sessionp);
        printf("INFO : [THREAD #%02i] Socket %i disconnected\n",
               thread_id, fd);
        for (i = 0; i < sessionp->fd_count; i++) {
            if ( sessionp->pfds[i].fd == fd ) {
                remove_client(i, sessionp);
                printf("INFO : [THREAD #%02i] Socket %i disconnected\n",
                       thread_id, fd);
            }
        }
    }

    if ( !(--sessionp->num_events) )
        pthread_cond_signal(&sessionp->resume_polling);

    pthread_mutex_unlock(&sessionp->poll_mutex);
    pthread_mutex_unlock(&sessionp->session_mutex);

    free(arg);
}

M main.c => main.c +6 -20
@@ 38,7 38,7 @@
#include "events.h"

#define NUM_THREADS 16
#define MAX_QUEUE_SZ 32
#define MAX_QUEUE_SZ 4096
#define MAX_ARGS 4
#define BACKLOG 10



@@ 284,24 284,16 @@ int main(int argc, char **argv)
    printf("INFO : Listening for connections on port %s\n", prefs.port);

    /* Now we gotta `poll()` and take care of everything once `poll()` returns
     * and then go back to polling again.
     *
     * TODO: Threadify the `taking care of everything' part.
     */
     * and then go back to polling again. */

    if ( pthread_mutex_lock(&session.poll_mutex) ) {
    /* TODO: This should be a read lock actually */
    if ( pthread_mutex_lock(&session.session_mutex) ) {
        /* TODO: `strerror_r(errno)` */
        fprintf(stderr, "`pthread_mutex_lock()` failed\n");
        goto cleanup;
    }

    while ( 1 ) {
        if ( pthread_mutex_lock(&session.session_mutex) ) {
            /* TODO: `strerror_r(errno)` */
            fprintf(stderr, "`pthread_mutex_lock()` failed\n");
            goto cleanup;
        }

        if ( (session.num_events =
              poll(session.pfds, session.fd_count, -1)) == -1 ) {



@@ 317,17 309,11 @@ int main(int argc, char **argv)
            return EXIT_FAILURE;
        }

        if ( pthread_mutex_unlock(&session.session_mutex) ) {
            /* TODO: `strerror_r(errno)` */
            fprintf(stderr, "`pthread_mutex_unlock()` failed\n");
            goto cleanup;
        }

        process_events(&tpool, &session);

        while ( session.num_events ) {
            if ( pthread_cond_wait(&session.resume_polling,
                                   &session.poll_mutex) ) {
                                   &session.session_mutex) ) {
                fprintf(stderr, "`pthread_cond_wait()` failed!\n");
                goto cleanup;
            }


@@ 455,7 441,7 @@ int main(int argc, char **argv)
        }
    }

    if ( pthread_mutex_unlock(&session.poll_mutex) ) {
    if ( pthread_mutex_unlock(&session.session_mutex) ) {
        /* TODO: `strerror_r(errno)` */
        fprintf(stderr, "`pthread_mutex_unlock()` failed\n");
        goto cleanup;

M session.c => session.c +1 -18
@@ 60,7 60,6 @@ session_init(session_t *sessionp, int listener)

    sessionp->num_events = 0;
    pthread_mutex_init(&sessionp->session_mutex, NULL);
    pthread_mutex_init(&sessionp->poll_mutex, NULL);
    pthread_cond_init(&sessionp->resume_polling, NULL);

    return 0;


@@ 69,12 68,6 @@ session_init(session_t *sessionp, int listener)
int
add_client(int cfd, session_t *sessionp)
{
    if ( pthread_mutex_lock(&sessionp->session_mutex) ) {
        /* TODO: `strerror_r(errno)` */
        fprintf(stderr, "`pthread_mutex_lock()` failed\n");
        goto ret_err;
    }

    if ( sessionp->fd_count == sessionp->fd_size ) {
        /* We've run out of room, double `fd_size` and `realloc()` */
        sessionp->fd_size *= 2;


@@ 95,12 88,6 @@ add_client(int cfd, session_t *sessionp)
    sessionp->clients[sessionp->fd_count].nick[0] = '\0';
    sessionp->fd_count++;

    if ( pthread_mutex_unlock(&sessionp->session_mutex) ) {
        /* TODO: `strerror_r(errno)` */
        fprintf(stderr, "`pthread_mutex_unlock()` failed!\n");
        goto ret_err;
    }

    return 0;

realloc_failed:


@@ 112,8 99,6 @@ ret_err:
int
remove_client(size_t index, session_t *sessionp)
{
    pthread_mutex_lock(&sessionp->session_mutex);

    /* We remove a client by copying over details of the last client
     * in the queue over the client we want to remove, thus
     * overwriting the details of the client we want to remove.


@@ 123,7 108,7 @@ remove_client(size_t index, session_t *sessionp)
     * by one and it will be overwritten by the next connecting
     * client. */

    if ( --sessionp->fd_count == index ) {
    if ( --sessionp->fd_count != index ) {
        sessionp->pfds[index] = sessionp->pfds[sessionp->fd_count];

        /* Do the same for nick */


@@ 131,8 116,6 @@ remove_client(size_t index, session_t *sessionp)
               sessionp->clients[sessionp->fd_count].nick);
    }

    pthread_mutex_unlock(&sessionp->session_mutex);

    return 0;
}


M session.h => session.h +1 -2
@@ 14,9 14,8 @@ typedef struct session {
    size_t fd_size, fd_count;
    struct pollfd *pfds;
    client_t *clients;
    pthread_mutex_t session_mutex,  /* TODO: Shouldn't this need
    pthread_mutex_t session_mutex;  /* TODO: Shouldn't this need
                                     *       to be an rw lock */
                    poll_mutex;
    pthread_cond_t resume_polling;
    int num_events;
} session_t;