~aritra1911/mc_server

479f44349505d30f0a0b7e4ce681c1c99f4a5ce7 — Aritra Sarkar 1 year, 11 months ago 6daf59e
Somewhat broken implementation of client_send

We need to redesign everything again it seems
4 files changed, 107 insertions(+), 40 deletions(-)

M events.c
M main.c
M response.c
M response.h
M events.c => events.c +100 -34
@@ 32,6 32,7 @@
#include "events.h"
#include "session.h"
#include "tpool.h"
#include "response.h"

typedef struct accept_params {
    int newfd;


@@ 53,8 54,12 @@ typedef struct disc_params {
typedef struct send_params {
    int fd;
    ssize_t num_bytes;
    size_t *writersp;
    char buf[BUFLEN];
    session_t *sessionp;
    tpool_t *tpoolp;
    pthread_mutex_t *writers_mutexp;
    pthread_cond_t *writers_donep;
} send_params_t;

static inline void get_err_str(int errnum, char *err_str);


@@ 75,7 80,6 @@ process_events(int num_events, tpool_t *tpoolp, session_t *sessionp)
    pthread_mutex_t writers_mutex;
    pthread_cond_t writers_done;
    accept_params_t *accept_paramsp;
    disc_params_t *disc_paramsp;
    send_params_t *send_paramsp;

    fds = calloc(sessionp->fd_count + 1, sizeof(int));


@@ 87,9 91,8 @@ process_events(int num_events, tpool_t *tpoolp, session_t *sessionp)

    /* Collect the `fds` that we need to `recv()` or `accept()` on so
     * that we can give a window of time where we do not impose a read
     * lock and let the worker threads mutating `session` a chance to
     * complete. */

     * lock on `session` and let the worker threads mutating `session`
     * a chance to finish. */
    for (i = 0, fdp = fds; num_events && i < sessionp->fd_count; i++) {
        if ( sessionp->pfds[i].revents & POLLIN ) {
            *fdp++ = sessionp->pfds[i].fd;


@@ 109,6 112,8 @@ process_events(int num_events, tpool_t *tpoolp, session_t *sessionp)

    for (fdp = fds; *fdp; fdp++) {
        if ( *fdp == listener ) {
            /* TODO: `accept()` can be called inside of
             *       `client_accept()`. */
            newfd = accept(listener, (struct sockaddr *) &remoteaddr,
                                                         &addrlen);
            accept_paramsp = malloc(sizeof(accept_params_t));


@@ 119,33 124,23 @@ process_events(int num_events, tpool_t *tpoolp, session_t *sessionp)
            accept_paramsp->writers_mutexp = &writers_mutex;
            accept_paramsp->writers_donep = &writers_done;

            tpool_add_work(tpoolp, client_accept, accept_paramsp);

            pthread_mutex_lock(&writers_mutex);
            writers++;
            pthread_mutex_unlock(&writers_mutex);

            tpool_add_work(tpoolp, client_accept, accept_paramsp);
        } else {
            num_bytes = recv(*fdp, buf, BUFLEN, 0);
            if ( !num_bytes ) {
                disc_paramsp = malloc(sizeof(disc_params_t));
                disc_paramsp->fd = *fdp;
                disc_paramsp->sessionp = sessionp;
                disc_paramsp->writersp = &writers;
                disc_paramsp->writers_mutexp = &writers_mutex;
                disc_paramsp->writers_donep = &writers_done;

                tpool_add_work(tpoolp, client_disconnect, disc_paramsp);

                pthread_mutex_lock(&writers_mutex);
                writers++;
                pthread_mutex_unlock(&writers_mutex);
            } else if ( num_bytes > 0 ) {
            if ( num_bytes >= 0 ) {
                send_paramsp = malloc(sizeof(send_params_t));
                send_paramsp->fd = *fdp;
                send_paramsp->num_bytes = num_bytes;
                strncpy(send_paramsp->buf, buf, num_bytes);
                memcpy(send_paramsp->buf, buf, num_bytes);
                send_paramsp->sessionp = sessionp;
                /* TODO: `/quit` should kill the client as well */
                send_paramsp->tpoolp = tpoolp;
                send_paramsp->writersp = &writers;
                send_paramsp->writers_mutexp = &writers_mutex;
                send_paramsp->writers_donep = &writers_done;
                tpool_add_work(tpoolp, client_send, send_paramsp);
            } else {
                /* TODO: Handle error */


@@ 160,10 155,12 @@ process_events(int num_events, tpool_t *tpoolp, session_t *sessionp)
     * in the queue which mutate `session` to finish before we went
     * back to polling again after attaining a read lock on `session`.
     * Honestly, in my minimal testing, this worked like a charm. */

    pthread_mutex_lock(&writers_mutex);
    while ( writers )
        pthread_cond_wait(&writers_done, &writers_mutex);
    pthread_mutex_unlock(&writers_mutex);

    /* TODO: We might wanna package these three things into a struct
     *       someday. */



@@ 201,8 198,8 @@ get_in_addr(struct sockaddr *sa)
static void
client_accept(int thread_id, void *arg)
{
    size_t *writersp;
    int newfd;
    size_t *writersp;
    char err_str[STRERROR_BUFLEN];
    struct sockaddr_storage remoteaddr;
    session_t *sessionp;


@@ 292,26 289,95 @@ client_disconnect(int thread_id, void *arg)
static void
client_send(int thread_id, void *arg)
{
    (void) thread_id;

    int fd, *fds, *fdp;
    ssize_t num_bytes;
    int fd;
    char buf[BUFLEN] = "";
    size_t idx, i, *writersp;
    char buf[BUFLEN] = "", nick[NICKLEN] = "";
    session_t *sessionp;
    tpool_t *tpoolp;
    pthread_mutex_t *writers_mutexp;
    pthread_cond_t *writers_donep;
    disc_params_t *disc_paramsp;
    ResponseError err_code;

    fd = ((send_params_t *) arg)->fd;
    num_bytes = ((send_params_t *) arg)->num_bytes;
    strncpy(buf, ((send_params_t *) arg)->buf, num_bytes);
    memcpy(buf, ((send_params_t *) arg)->buf, num_bytes);
    sessionp = ((send_params_t *) arg)->sessionp;
    tpoolp = ((send_params_t *) arg)->tpoolp;
    writersp = ((send_params_t *) arg)->writersp;
    writers_mutexp = ((send_params_t *) arg)->writers_mutexp;
    writers_donep = ((send_params_t *) arg)->writers_donep;

    free(arg);

    fds = calloc(sessionp->fd_count + 1, sizeof(int));
    for (i = 1 /* avoid listener */, fdp = fds; i < sessionp->fd_count; i++) {
        *fdp++ = sessionp->pfds[i].fd;
        if ( sessionp->pfds[i].fd == fd ) {
            strcpy(nick, sessionp->clients[i].nick);
            idx = i;
        }
    }

    /* TODO: Fix the write lock, it shouldn't alway be getting a write lock, but
     *       only in case of a disconnect, /quit or /setnick. Also we should be
     *       able to handle this decision in process_events() */

    //pthread_rwlock_wrlock(&sessionp->lock);
    pthread_rwlock_rdlock(&sessionp->lock);
    /* TODO: We aren't reading shit, the read lock is useless. */

    printf("WARN : [THREAD #%02i] Messages are still unimplemented in this"
                                " version.\n", thread_id);
    printf("INFO : [THREAD #%02i] Read something of %lu bytes off of socket"
                                " %i.\n"
           "     :              Going to ignore all of that...\n",
           thread_id, num_bytes, fd);
    /* TODO: `respond()` musn't mutate client nick */
    if ( num_bytes ) {
        if ( respond(&sessionp->clients[idx], sessionp->clients,
                     sessionp->fd_count, buf, &num_bytes,
                     &err_code) == -1 ) {
            buf[0] = RES_ERROR;
            buf[1] = err_code;
            num_bytes = 2;
        }
    } else {
        if ( *nick ) {
            sprintf(buf + 1, "%s connection closed", nick);
            buf[0] = RES_QUIT;
            num_bytes = 1 + strlen(buf + 1);
        }
    }
    pthread_rwlock_unlock(&sessionp->lock);

    /* send confirmation earlier since it's going to be killed soon */
    if ( *nick && buf[0] == RES_QUIT )
        send(fd, buf, num_bytes, 0);

    if ( !num_bytes || buf[0] == RES_QUIT ) {
        disc_paramsp = malloc(sizeof(disc_params_t));

        /* Send off a worker to actually call `remove_client()` while
         * we release the read lock and announce the disconnection to
         * others. */

        disc_paramsp->fd = fd;
        disc_paramsp->sessionp = sessionp;
        disc_paramsp->writersp = writersp;
        disc_paramsp->writers_mutexp = writers_mutexp;
        disc_paramsp->writers_donep = writers_donep;

        pthread_mutex_lock(writers_mutexp);
        (*writersp)++;
        pthread_mutex_unlock(writers_mutexp);

        tpool_add_work(tpoolp, client_disconnect, disc_paramsp);
    }

    if ( buf[0] == RES_ERROR )
        send(fd, buf, num_bytes, 0);
    else {
        for (fdp = fds; *fdp; fdp++) {
            if ( !(buf[0] == RES_QUIT && *fdp == fd) )
                send(*fdp, buf, num_bytes, 0);
        }
    }

    free(fds);
}

M main.c => main.c +2 -2
@@ 37,8 37,8 @@
#include "tpool.h"
#include "events.h"

#define NUM_THREADS 2
#define MAX_QUEUE_SZ 4
#define NUM_THREADS 16
#define MAX_QUEUE_SZ 32
#define MAX_ARGS 4
#define BACKLOG 10


M response.c => response.c +3 -3
@@ 27,7 27,7 @@
#include "session.h"

static inline int append_nick(ResponseCode res_code, const char *nick,
                              char *buf, int *len, ResponseError *err_code)
                              char *buf, ssize_t *len, ResponseError *err_code)
{
    if ( !*nick ) {
        if ( err_code )


@@ 65,7 65,7 @@ static inline int nick_exists(const char *nick, const client_t *client,

static inline int set_nick(SendCode snd_code, client_t *client,
                           const client_t *clients_list, size_t list_size,
                           char *buf, int *len, ResponseError *err_code)
                           char *buf, ssize_t *len, ResponseError *err_code)
{

    char nick[NICKLEN] = "";


@@ 99,7 99,7 @@ static inline int set_nick(SendCode snd_code, client_t *client,
}

int respond(client_t *client, const client_t *clients_list, size_t list_size,
            char *buf, int *len, ResponseError *err_code)
            char *buf, ssize_t *len, ResponseError *err_code)
{
    /* Extract the first byte as the send code */
    SendCode snd_code = buf[0];

M response.h => response.h +2 -1
@@ 4,6 4,7 @@
# include "common.h"
# include "session.h"

int respond(client_t *, const client_t *, size_t, char *, int *, ResponseError *);
int respond(client_t *client, const client_t *clients_list,
            size_t list_size, char *buf, ssize_t *len, ResponseError *);

#endif  /* _RESPONSE_H */