~aritra1911/mc_server

1b9ea615fcc257d84bf851c197ef1aa0b6a7d678 — Aritra Sarkar 2 years ago f781c00
Move helpers functions into their own file and beautify some error reporting and some clean ups
9 files changed, 245 insertions(+), 277 deletions(-)

M Makefile
M events.c
M events.h
A helpers.c
A helpers.h
M main.c
M session.c
M session.h
M tpool.c
M Makefile => Makefile +1 -1
@@ 5,7 5,7 @@ CFLAGS=	-Wall -Wextra -pedantic -Werror-implicit-function-declaration \
CPPFLAGS = -D_POSIX_C_SOURCE=200809L
LDFLAGS = $(LIBS)

SRCS = main.c session.c response.c tpool.c events.c
SRCS = main.c session.c response.c tpool.c events.c helpers.c
OBJS = $(SRCS:.c=.o)

.PHONY: all clean

M events.c => events.c +2 -31
@@ 29,6 29,7 @@
#include <netinet/in.h>

#include "common.h"
#include "helpers.h"
#include "events.h"
#include "session.h"
#include "tpool.h"


@@ 52,8 53,6 @@ typedef struct ras_params {
    writers_t *writersp;
} ras_params_t;

static void get_err_str(int errnum, char *err_str);
static void *get_in_addr(struct sockaddr *sa);
static int writers_init(writers_t *);
static int writers_increment(writers_t *);
static int writers_decrement(writers_t *);


@@ 102,7 101,7 @@ writers_wait(writers_t *writersp)
static int
writers_destroy(writers_t *writersp)
{
    writersp->count = 0xDEAD4BAD;
    writersp->count = 0xDEAD4BAD;   /* prime */
    pthread_mutex_destroy(&writersp->mutex);
    pthread_cond_destroy(&writersp->done);
    return 0;


@@ 182,34 181,6 @@ process_events(int num_events, tpool_t *tpoolp, session_t *sessionp)
}

static void
get_err_str(int errnum, char *err_str)
{
    /* Use `strerror_r()` as a(n) MT-Safe way of retrieving error
     * description corresponding to an `errno` */

    char buf[STRERROR_BUFLEN];
    if ( !strerror_r(errnum, buf, STRERROR_BUFLEN) )
        strcpy(err_str, buf);
    else
        /* In such rare unfortunate cases when the error reporting
         * tool itself causes an error, dump the error number and let
         * the users figure it out. */
        sprintf(err_str, "Error number %i", errnum);
}

static void *
get_in_addr(struct sockaddr *sa)
{
    if ( sa->sa_family == AF_INET ) {
        /* IPv4 */
        return &(((struct sockaddr_in *) sa)->sin_addr);
    }

    /* IPv6 */
    return &(((struct sockaddr_in6 *) sa)->sin6_addr);
}

static void
accept_client(int thread_id, void *arg)
{
    int listener, newfd;

M events.h => events.h +0 -3
@@ 4,9 4,6 @@
# include "session.h"
# include "tpool.h"

/* In glibc-2.7, the longest error message string is 50 characters */
# define STRERROR_BUFLEN 128

void process_events(int num_events, tpool_t *, session_t *sessionp);

#endif  /* __EVENTS_H_ */

A helpers.c => helpers.c +33 -0
@@ 0,0 1,33 @@
#include <stdio.h>
#include <string.h>
#include <netinet/in.h>

#include "helpers.h"

void *
get_in_addr(struct sockaddr *sa)
{
    if ( sa->sa_family == AF_INET ) {
        /* IPv4 */
        return &(((struct sockaddr_in *) sa)->sin_addr);
    }

    /* IPv6 */
    return &(((struct sockaddr_in6 *) sa)->sin6_addr);
}

void
get_err_str(int errnum, char *err_str)
{
    /* Use `strerror_r()` as a(n) MT-Safe way of retrieving error
     * description corresponding to an `errno` */

    char buf[STRERROR_BUFLEN];
    if ( !strerror_r(errnum, buf, STRERROR_BUFLEN) )
        strcpy(err_str, buf);
    else
        /* In such rare unfortunate cases when the error reporting
         * tool itself causes an error, dump the error number and let
         * the users figure it out. */
        sprintf(err_str, "Error number %i", errnum);
}

A helpers.h => helpers.h +12 -0
@@ 0,0 1,12 @@
#ifndef __MC_HELPERS_H_
# define __MC_HELPERS_H_

# include <netinet/in.h>

/* In glibc-2.7, the longest error message string is 50 characters */
# define STRERROR_BUFLEN 128

void *get_in_addr(struct sockaddr *);
void get_err_str(int errnum, char *err_str);

#endif  /* __MC_HELPERS_H_ */

M main.c => main.c +54 -161
@@ 32,6 32,7 @@
#include <signal.h>

#include "common.h"
#include "helpers.h"
#include "response.h"
#include "session.h"
#include "tpool.h"


@@ 47,26 48,14 @@ typedef struct {
    char port[8];
} prefs_t;

static void ctrl_c_handler(int);
static inline void *get_in_addr(struct sockaddr *sa);
static void signal_handler(int);
static int get_listener(prefs_t *);
static inline void set_defaults(prefs_t *);
static int parse_prefs(int, char **, prefs_t *);
static void set_defaults(prefs_t *);
static void print_usage(const char *program);
static int parse_prefs(int argc, char **argv, prefs_t *);

static void ctrl_c_handler(int signum) {
    (void) signum;
    printf("\nINFO : Received SIGINT\n");
}

static inline void *get_in_addr(struct sockaddr *sa)
{
    if ( sa->sa_family == AF_INET ) {
        /* IPv4 */
        return &(((struct sockaddr_in *) sa)->sin_addr);
    }

    /* IPv6 */
    return &(((struct sockaddr_in6 *) sa)->sin6_addr);
static void signal_handler(int signum) {
    printf("\nINFO : Received signal %i.\n", signum);
}

static int get_listener(prefs_t *prefs)


@@ 80,6 69,7 @@ static int get_listener(prefs_t *prefs)

    struct addrinfo hints, *res, *p;
    int status, listener, yes;
    char err_str[STRERROR_BUFLEN];

    /* `memset()` `hints` to zeroes, and set a few necessary fields. This way
       `getaddrinfo()` will be able to set the other necessary fields for us */


@@ 92,13 82,15 @@ static int get_listener(prefs_t *prefs)
    yes = 1;

    if ( (status = getaddrinfo(NULL, prefs->port, &hints, &res)) == -1 ) {
        fprintf(stderr, "`getaddrinfo()` failed : %s\n", strerror(errno));
        fprintf(stderr, " ERR : %s:%i => `getaddrinfo()`: %s\n",
                __FILE__, __LINE__ - 2, gai_strerror(errno));
        return -1;  /* We'd like to exit */
    }

    /* res should now point to a linked list of addresses. In case it's not: */
    if ( !res ) {
        fprintf(stderr, "`getaddrinfo()` returned a NULL list\n");
        fprintf(stderr, " ERR : %s:%i => `getaddrinfo()` returned an empty "
                                        "list\n", __FILE__, __LINE__ - 10);
        return -1;
    }



@@ 119,7 111,10 @@ static int get_listener(prefs_t *prefs)

        /* Reuse the `listener` sockid on restart. Without this the OS might
         * complain about "address already in use" upon restarting the server */
        setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes);
        if ( setsockopt(listener, SOL_SOCKET, SO_REUSEADDR,
                        &yes, sizeof yes) == -1 ) {
            continue;
        }

        /* We now have a listening socket id. Let try `bind()`ing to it */
        if ( bind(listener, p->ai_addr, p->ai_addrlen) == -1 ) {


@@ 128,7 123,7 @@ static int get_listener(prefs_t *prefs)
        }

        /* We've got bound. No need to look at rest of the addresses in the
           list */
         * list */
        break;
    }



@@ 137,27 132,31 @@ static int get_listener(prefs_t *prefs)

    /* Maybe `p` couldn't bind to any address and ended up walking upto NULL */
    if ( !p ) {
        fprintf(stderr, "Unable to `bind()` to any address : %s\n",
                        strerror(errno));
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : %s:%i => Couldn't `bind()` to anything: %s\n",
                __FILE__, __LINE__ - 17, err_str);
        return -1;
    }

    /* Now we'll start the listener and return the socket id */
    if ( listen(listener, BACKLOG) == -1 ) {
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : %s:%i => `listen()`: %s\n",
                __FILE__, __LINE__ - 3, err_str);
        return -1;
    }

    return listener;
}

static inline void set_defaults(prefs_t *prefs)
static void set_defaults(prefs_t *prefs)
{
    prefs->addr_family = AF_UNSPEC; /* Choose either IPv4 address or
                                     * IPv6 addres, we don't care! */
    strcpy(prefs->port, PORT);
}

static inline void print_usage(const char *program)
static void print_usage(const char *program)
{
    printf("Usage:  %s [options] [port]\n"
           "Options:\n"


@@ 236,22 235,21 @@ print_usage_ret_error:

int main(int argc, char **argv)
{
    int listener, newfd, args_parsed, num_events;
    char buf[BUFLEN], remoteIP[INET6_ADDRSTRLEN];
    struct sockaddr_storage remoteaddr;
    int listener, args_parsed, num_events, exit_code;
    char err_str[STRERROR_BUFLEN];
    prefs_t prefs;
    socklen_t addrlen;
    session_t session;
    tpool_t tpool;

    /* Set up signal handler for ^C (SIGINT)
     * TODO: We should also handle ^\ (SIGQUIT) ? */
    /* Set up signal handler for ^C (SIGINT) TODO: and ^\ (SIGQUIT) */
    struct sigaction sa = { 0 };
    sa.sa_handler = ctrl_c_handler;
    sa.sa_handler = signal_handler;
    sigemptyset(&sa.sa_mask);  /* No need to block any other signals
                                * while the signal handler runs */
    if ( sigaction(SIGINT, &sa, NULL) == -1 ) {
        fprintf(stderr, " ERR : `sigaction()` failed : %s\n", strerror(errno));
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : %s:%i => `sigaction()`: %s\n",
                __FILE__, __LINE__ - 3, err_str);
        return EXIT_FAILURE;
    }



@@ 276,7 274,7 @@ int main(int argc, char **argv)
    if ( !tpool_init(&tpool, NUM_THREADS  /* TODO: Accept as an arg */,
                             MAX_QUEUE_SZ, 0) ) {
        fprintf(stderr, " ERR : Failed to initialize thread pool\n");
        return EXIT_FAILURE;
        goto destroy_session_and_exit;
    }

    printf("INFO : Initialized thread pool with %lu worker threads.\n",


@@ 284,11 282,14 @@ int main(int argc, char **argv)

    printf("INFO : Listening for connections on port %s\n", prefs.port);

    exit_code = EXIT_SUCCESS;   /* Let's hope for the best */

    /* Now we gotta keep `poll()`ing and take care of everything
     * everytime `poll()` returns until the server receives a SIGINT.
     */

    pthread_rwlock_rdlock(&session.lock);

    while ( 1 ) {
        if ( (num_events = poll(session.pfds, session.fd_count, -1)) == -1 ) {
            if ( errno == EINTR ) {


@@ 299,8 300,11 @@ int main(int argc, char **argv)
                break;  /* Break the outer infinite while(1) loop */
            }

            fprintf(stderr, "`poll()` failed : %s\n", strerror(errno));
            return EXIT_FAILURE;
            get_err_str(errno, err_str);
            fprintf(stderr, " ERR : %s:%i => `poll()`: %s\n",
                    __FILE__, __LINE__ - 11, err_str);
            exit_code = EXIT_FAILURE;
            goto destroy_tpool_and_exit;
        }

        printf("DBUG : Handling events...\n");


@@ 308,127 312,6 @@ int main(int argc, char **argv)
         * to add work to queue and then locks it back again. */
        process_events(num_events, &tpool, &session);
        printf("DBUG : All events handled!\n");

        continue;   /* Everything below this are progressively being
                     * shifted to `events.c`. */

        /* We'll first check if the listener is ready to `accept()` a new
         * connection */
        if ( session.pfds[0].revents & POLLIN ) {
            /* Obtain client's socket id */
            addrlen = sizeof remoteaddr;
            if ( (newfd = accept(listener, (struct sockaddr *) &remoteaddr,
                                           &addrlen)) == -1 ) {
                fprintf(stderr, "Failed to `accept()` new connection : %s\n",
                                strerror(errno));

                /* Go back to `poll()`ing */
                continue;
            }

            /* Add client to `pfds` */
            add_client(newfd, &session);
            printf("New connection from client %s on socket %i\n",
                   inet_ntop(remoteaddr.ss_family,
                             get_in_addr((struct sockaddr *) &remoteaddr),
                             remoteIP, INET6_ADDRSTRLEN),
                   newfd);
            continue;
        }

        /*
         * Walk through existing connections looking for the one client with
         * unread data. Of course we start from 1, i.e. skipping the listener
         */
        for ( size_t i = 1; i < session.fd_count; i++ ) {
            /* Check if pfds[i] is ready to read */
            if ( !(session.pfds[i].revents & POLLIN) ) {
                /* Client not ready to read, keep moving */
                continue;
            }

            int nbytes = recv(session.pfds[i].fd, buf, BUFLEN, 0);

            if ( nbytes <= 0 ) {
                if ( nbytes == -1 ) {
                    fprintf(stderr, "`recv()` failed : %s\n",
                                    strerror(errno));

                } else {  /* Client closed the connection */
                    /*
                     * Announce disconnection to others
                     * iff their nick is known
                     */
                    if ( session.clients[i].nick[0] ) {
                        sprintf(buf + 1, "%s connection closed",
                                         session.clients[i].nick);
                        buf[0] = RES_QUIT;  /* the response code */
                        nbytes = 1 + strlen(buf + 1);

                        for ( size_t j = 1; j < session.fd_count; j++ ) {
                            if ( j == i ) {
                                /* skip sending to disconnected client */
                                continue;
                            }

                            if ( send(session.pfds[j].fd, buf, nbytes, 0) == -1 ) {
                                fprintf(stderr, "`send()`ing to socket %i "
                                                "failed : %s\n",
                                        session.pfds[j].fd, strerror(errno));
                            }
                        }
                    }

                    printf("socket %i hung up!\n", session.pfds[i].fd);
                }

                /* Close client's socket and get rid of client */
                close(session.pfds[i].fd);
                remove_client(i, &session);

            } else {
                /*
                 * We've got data from client, send it to everyone
                 * (including the sender), except the listener
                 */
                ResponseError err_code = 0;

                /* Form a proper response to send along with a response code */
                if ( respond(&session.clients[i],
                            session.clients, (size_t) session.fd_count,
                            buf, &nbytes, &err_code) == -1 ) {

                    buf[0] = RES_ERROR;  /* Response code */
                    buf[1] = err_code;
                    nbytes = 2;

                    /* Send back these 2 bytes to sender, the client is
                     * responsible for proper error reporting to the
                     * user. */
                    if ( send(session.pfds[i].fd, buf, nbytes, 0) == -1 ) {
                        fprintf(stderr, "`send()`ing to socket %i failed : "
                                "%s\n", session.pfds[i].fd, strerror(errno));
                    }

                    continue;
                }

                for (size_t j = 1; j < session.fd_count; j++) {
                    if ( send(session.pfds[j].fd, buf, nbytes, 0) == -1 ) {
                        fprintf(stderr, "`send()`ing to socket %i failed : "
                                "%s\n", session.pfds[j].fd, strerror(errno));
                    }
                }

                if ( (ResponseCode) buf[0] == RES_QUIT ) {
                    /* Client issued '/quit' command, hence disconnect client */
                    int fd = session.pfds[i].fd;
                    close(fd);
                    remove_client(i, &session);
                    printf("killed socket %i!\n", fd);
                }
            }
        }
    }

    pthread_rwlock_unlock(&session.lock);


@@ 439,11 322,21 @@ int main(int argc, char **argv)

    printf("INFO : Gracefully shutting down...\n");

destroy_tpool_and_exit:
    printf("INFO : Destroying thread pool...\n");
    tpool_destroy(&tpool, 1 /* wait until every worker thread joins */);
    if ( tpool_destroy(&tpool, 1) == -1 ) {
        fprintf(stderr, " ERR : Failed to destroy thread pool!\n"
                        "     : Watch out for those memory leaks!\n");
        exit_code = EXIT_FAILURE;
    }

destroy_session_and_exit:
    printf("INFO : Destroying session...\n");
    session_destroy(&session);
    if ( session_destroy(&session) == -1 ) {
        fprintf(stderr, " ERR : Failed to destroy session!\n"
                        "     : Watch out for those memory leaks!\n");
        exit_code = EXIT_FAILURE;
    }

    return EXIT_SUCCESS;
    return exit_code;
}

M session.c => session.c +2 -1
@@ 116,10 116,11 @@ remove_client(size_t index, session_t *sessionp)
    return 0;
}

void
int
session_destroy(session_t *sessionp)
{
    pthread_rwlock_destroy(&sessionp->lock);
    free(sessionp->pfds);
    free(sessionp->clients);
    return 0;
}

M session.h => session.h +1 -1
@@ 20,6 20,6 @@ typedef struct session {
int session_init(session_t *, int listener);
int add_client(int fd, session_t *);
int remove_client(size_t index, session_t *);
void session_destroy(session_t *);
int session_destroy(session_t *);

#endif

M tpool.c => tpool.c +140 -79
@@ 30,6 30,7 @@
#include <pthread.h>

#include "tpool.h"
#include "helpers.h"

static void *tpool_thread(void *tpoolp_vp);



@@ 40,6 41,7 @@ tpool_init(tpool_t  *tpoolp,
           int      do_not_block_when_full)
{
    size_t  i = 0;
    char    err_str[STRERROR_BUFLEN];

    /* initiate the fields */
    tpoolp->num_threads = num_worker_threads;


@@ 50,35 52,40 @@ tpool_init(tpool_t  *tpoolp,
    /* allocate memory for worker threads */
    if ( !(tpoolp->threads =
                malloc(num_worker_threads * sizeof(pthread_t))) ) {
        fprintf(stderr, "%s:%i => malloc: %s\n",
                __FILE__, __LINE__, strerror(errno));
        return NULL;
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : %s:%i => `malloc()`: %s\n",
                __FILE__, __LINE__ - 3, err_str);
        goto ret_err;
    }

    tpoolp->cur_queue_size = 0;
    tpoolp->queue_head = tpoolp->queue_tail = NULL;

    if ( pthread_mutex_init(&tpoolp->queue_lock, NULL) ) {
        fprintf(stderr, "%s:%i => pthread_mutex_init: %s\n",
                __FILE__, __LINE__, strerror(errno));
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : %s:%i => `pthread_mutex_init()`: %s\n",
                __FILE__, __LINE__ - 3, err_str);
        goto cleanup;
    }

    if ( pthread_cond_init(&tpoolp->queue_not_empty, NULL) ) {
        fprintf(stderr, "%s:%i => pthread_cond_init: %s\n",
                __FILE__, __LINE__, strerror(errno));
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : %s:%i => `pthread_cond_init()`: %s\n",
                __FILE__, __LINE__ - 3, err_str);
        goto destroy_queue_lock_mutex;
    }

    if ( pthread_cond_init(&tpoolp->queue_not_full, NULL) ) {
        fprintf(stderr, "%s:%i => pthread_cond_init: %s\n",
                __FILE__, __LINE__, strerror(errno));
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : %s:%i => `pthread_cond_init()`: %s\n",
                __FILE__, __LINE__ - 3, err_str);
        goto destroy_queue_not_empty_cond;
    }

    if ( pthread_cond_init(&tpoolp->queue_empty, NULL) ) {
        fprintf(stderr, "%s:%i => pthread_cond_init: %s\n",
                __FILE__, __LINE__, strerror(errno));
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : %s:%i => `pthread_cond_init()`: %s\n",
                __FILE__, __LINE__ - 3, err_str);
        goto destroy_queue_not_full_cond;
    }



@@ 88,8 95,9 @@ tpool_init(tpool_t  *tpoolp,
                            NULL,                   /* thread attrs */
                            tpool_thread,           /* routine to call */
                            (void *) tpoolp) ) {    /* args to routine */
            fprintf(stderr, "%s:%i => pthread_create: %s\n",
                    __FILE__, __LINE__, strerror(errno));
            get_err_str(errno, err_str);
            fprintf(stderr, " ERR : %s:%i => `pthread_create()`: %s\n",
                    __FILE__, __LINE__ - 6, err_str);
            goto destroy_queue_empty_cond;
        }
    }


@@ 97,34 105,42 @@ tpool_init(tpool_t  *tpoolp,
    return tpoolp;

destroy_queue_empty_cond:
    if ( pthread_cond_destroy(&tpoolp->queue_empty) )
        fprintf(stderr, "%s:%i => pthread_cond_destroy: %s\n",
                __FILE__, __LINE__, strerror(errno));
    if ( pthread_cond_destroy(&tpoolp->queue_empty) ) {
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : %s:%i => `pthread_cond_destroy()`: %s\n",
                __FILE__, __LINE__ - 3, err_str);
    }
destroy_queue_not_full_cond:
    if ( pthread_cond_destroy(&tpoolp->queue_not_full) )
        fprintf(stderr, "%s:%i => pthread_cond_destroy: %s\n",
                __FILE__, __LINE__, strerror(errno));
    if ( pthread_cond_destroy(&tpoolp->queue_not_full) ) {
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : %s:%i => `pthread_cond_destroy()`: %s\n",
                __FILE__, __LINE__ - 3, err_str);
    }
destroy_queue_not_empty_cond:
    if ( pthread_cond_destroy(&tpoolp->queue_not_empty) )
        fprintf(stderr, "%s:%i => pthread_cond_destroy: %s\n",
                __FILE__, __LINE__, strerror(errno));
    if ( pthread_cond_destroy(&tpoolp->queue_not_empty) ) {
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : %s:%i => `pthread_cond_destroy()`: %s\n",
                __FILE__, __LINE__ - 3, err_str);
    }
destroy_queue_lock_mutex:
    if ( pthread_mutex_destroy(&tpoolp->queue_lock) )
        fprintf(stderr, "%s:%i => pthread_mutex_destroy: %s\n",
                __FILE__, __LINE__, strerror(errno));
    if ( pthread_mutex_destroy(&tpoolp->queue_lock) ) {
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : %s:%i => `pthread_mutex_destroy()`: %s\n",
                __FILE__, __LINE__ - 3, err_str);
    }
cleanup:
    free(tpoolp->threads);
ret_err:
    return NULL;
}

/* TODO: `strerror()` is not thread-safe. Use `strerror_r()` */

static void *
tpool_thread(void *tpoolp_vp)
{
    tpool_t         *tpoolp = tpoolp_vp;
    tpool_work_t    *workp;
    int             thread_id, i;
    char            err_str[STRERROR_BUFLEN];

    thread_id = -1; /* Just to silence `-Wsometimes-uninitialized`.
                     * The for loop below will always succeed for one


@@ 132,23 148,34 @@ tpool_thread(void *tpoolp_vp)

    /* thread id is just the index of this thread in tpoolp->threads */
    for (i = 0; (size_t) i < tpoolp->num_threads; i++) {
        /* `pthread_equal()` and `pthread_self()` never fail. */
        if ( pthread_equal(pthread_self(), tpoolp->threads[i]) ) {
            thread_id = i;
            break;
        }
    }

    if ( thread_id == -1 ) {
        /* if this function was executed by a thread not in
         * `tpoolp->threads`, then it was called externally
         * and hence we have no reason to continue living. */
        printf(" ERR : `tpool_thread()` defined in %s:%i was called"
                     " external to a thread pool.\n"
               "     : Thread exiting...\n", __FILE__, __LINE__ - 26);
        return NULL;
    }

    printf("DBUG : [THREAD #%02i] Dispatched!\n", thread_id);

    while (1) {
    while ( 1 ) {

        /* fetch work from queue */

        if ( pthread_mutex_lock(&tpoolp->queue_lock) ) {
            /* TODO: Maybe append this thread's id to the error
             *       messages? */
            fprintf(stderr, "%s:%i => pthread_mutex_lock: %s\n",
                    __FILE__, __LINE__, strerror(errno));
            get_err_str(errno, err_str);
            fprintf(stderr, " ERR : [THREAD #%02i] "
                            "%s:%i => `pthread_mutex_lock()`: %s\n",
                    thread_id, __FILE__, __LINE__ - 4, err_str);
            goto ret;
        }



@@ 162,8 189,10 @@ tpool_thread(void *tpoolp_vp)

            if ( pthread_cond_wait(&tpoolp->queue_not_empty,
                                   &tpoolp->queue_lock) ) {
                fprintf(stderr, "%s:%i => pthread_cond_wait: %s\n",
                        __FILE__, __LINE__, strerror(errno));
                get_err_str(errno, err_str);
                fprintf(stderr, " ERR : [THREAD #%02i] "
                                "%s:%i => `pthread_cond_wait()`: %s\n",
                        thread_id, __FILE__, __LINE__ - 5, err_str);
                goto unlock_queue_and_ret;
            }
        }


@@ 192,8 221,10 @@ tpool_thread(void *tpoolp_vp)
        if ( !tpoolp->do_not_block_when_full &&
              tpoolp->cur_queue_size == tpoolp->max_queue_size - 1 ) {
            if ( pthread_cond_broadcast(&tpoolp->queue_not_full) ) {
                fprintf(stderr, "%s:%i => pthread_cond_broadcast: %s\n",
                        __FILE__, __LINE__, strerror(errno));
                get_err_str(errno, err_str);
                fprintf(stderr, " ERR : [THREAD #%02i] "
                                "%s:%i => `pthread_cond_broadcast()`: %s\n",
                        thread_id, __FILE__, __LINE__ - 4, err_str);
                goto unlock_queue_and_ret;
            }
        }


@@ 205,15 236,19 @@ tpool_thread(void *tpoolp_vp)
         * process. */
        if ( !tpoolp->cur_queue_size ) {
            if ( pthread_cond_signal(&tpoolp->queue_empty) ) {
                fprintf(stderr, "%s:%i => pthread_cond_signal: %s\n",
                        __FILE__, __LINE__, strerror(errno));
                get_err_str(errno, err_str);
                fprintf(stderr, " ERR : [THREAD #%02i] "
                                "%s:%i => `pthread_cond_signal()`: %s\n",
                        thread_id, __FILE__, __LINE__ - 4, err_str);
                goto unlock_queue_and_ret;
            }
        }

        if ( pthread_mutex_unlock(&tpoolp->queue_lock) ) {
            fprintf(stderr, "%s:%i => pthread_mutex_unlock: %s\n",
                    __FILE__, __LINE__, strerror(errno));
            get_err_str(errno, err_str);
            fprintf(stderr, " ERR : [THREAD #%02i] "
                            "%s:%i => `pthread_mutex_unlock()`: %s\n",
                    thread_id, __FILE__, __LINE__ - 4, err_str);
            goto ret;
        }



@@ 223,9 258,12 @@ tpool_thread(void *tpoolp_vp)
    }

unlock_queue_and_ret:
    if ( pthread_mutex_unlock(&tpoolp->queue_lock) )
        fprintf(stderr, "%s:%i => pthread_mutex_unlock: %s\n",
                __FILE__, __LINE__, strerror(errno));
    if ( pthread_mutex_unlock(&tpoolp->queue_lock) ) {
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : [THREAD #%02i] "
                        "%s:%i => `pthread_mutex_unlock()`: %s\n",
                thread_id, __FILE__, __LINE__ - 4, err_str);
    }
ret:
    printf("DBUG : [THREAD #%02i] Exiting...\n", thread_id);
    return NULL;


@@ 235,20 273,24 @@ int
tpool_add_work(tpool_t *tpoolp, void (*routine)(int, void *), void *arg)
{
    tpool_work_t *workp = NULL;
    char err_str[STRERROR_BUFLEN];

    /* allocate memory for new work */
    if ( !(workp = malloc(sizeof(tpool_work_t))) ) {
        fprintf(stderr, "%s:%i => malloc: %s\n",
                __FILE__, __LINE__, strerror(errno));
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : %s:%i => `malloc()`: %s\n",
                __FILE__, __LINE__ - 3, err_str);
        goto ret_err;
    }

    workp->routine = routine;
    workp->arg = arg;
    workp->next = NULL;

    if ( pthread_mutex_lock(&tpoolp->queue_lock) ) {
        fprintf(stderr, "%s:%i => pthread_mutex_lock: %s\n",
                __FILE__, __LINE__, strerror(errno));
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : %s:%i => `pthread_mutex_lock()`: %s\n",
                __FILE__, __LINE__ - 3, err_str);
        goto free_and_ret_err;
    }



@@ 256,7 298,8 @@ tpool_add_work(tpool_t *tpoolp, void (*routine)(int, void *), void *arg)
     * additions, we'll simply return with an error.
     *
     * TODO: We can have a different return code for this should we
     *       wish to distinguish this from regular errors. */
     *       wish to distinguish this from regular errors.
     */
    if ( tpoolp->cur_queue_size == tpoolp->max_queue_size &&
            tpoolp->do_not_block_when_full )
        goto unlock_queue_and_ret_err;


@@ 266,8 309,9 @@ tpool_add_work(tpool_t *tpoolp, void (*routine)(int, void *), void *arg)

        if ( pthread_cond_wait(&tpoolp->queue_not_full,
                               &tpoolp->queue_lock) ) {
            fprintf(stderr, "%s:%i => pthread_cond_wait: %s\n",
                    __FILE__, __LINE__, strerror(errno));
            get_err_str(errno, err_str);
            fprintf(stderr, " ERR : %s:%i => `pthread_cond_wait()`: %s\n",
                    __FILE__, __LINE__ - 4, err_str);
            goto unlock_queue_and_ret_err;
        }
    }


@@ 278,8 322,9 @@ tpool_add_work(tpool_t *tpoolp, void (*routine)(int, void *), void *arg)
    if ( !tpoolp->cur_queue_size ) {
        tpoolp->queue_tail = tpoolp->queue_head = workp;
        if ( pthread_cond_broadcast(&tpoolp->queue_not_empty) ) {
            fprintf(stderr, "%s:%i => pthread_cond_broadcast: %s\n",
                    __FILE__, __LINE__, strerror(errno));
            get_err_str(errno, err_str);
            fprintf(stderr, " ERR : %s:%i => `pthread_cond_broadcast()`: %s\n",
                    __FILE__, __LINE__ - 3, err_str);
            goto unlock_queue_and_ret_err;
        }
    } else {


@@ 288,16 333,20 @@ tpool_add_work(tpool_t *tpoolp, void (*routine)(int, void *), void *arg)
    }
    tpoolp->cur_queue_size++;

    if ( pthread_mutex_unlock(&tpoolp->queue_lock) )
        fprintf(stderr, "%s:%i => pthread_mutex_unlock: %s\n",
                __FILE__, __LINE__, strerror(errno));
    if ( pthread_mutex_unlock(&tpoolp->queue_lock) ) {
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : %s:%i => `pthread_mutex_unlock()`: %s\n",
                __FILE__, __LINE__ - 3, err_str);
    }

    return 0;

unlock_queue_and_ret_err:
    if ( pthread_mutex_unlock(&tpoolp->queue_lock) )
        fprintf(stderr, "%s:%i => pthread_mutex_unlock: %s\n",
                __FILE__, __LINE__, strerror(errno));
    if ( pthread_mutex_unlock(&tpoolp->queue_lock) ) {
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : %s:%i => `pthread_mutex_unlock()`: %s\n",
                __FILE__, __LINE__ - 3, err_str);
    }
free_and_ret_err:
    free(workp);
ret_err:


@@ 307,18 356,23 @@ ret_err:
int
tpool_destroy(tpool_t *tpoolp, int finish)
{
    size_t i;
    char err_str[STRERROR_BUFLEN];

    if ( pthread_mutex_lock(&tpoolp->queue_lock) ) {
        fprintf(stderr, "%s:%i => pthread_mutex_lock: %s\n",
                __FILE__, __LINE__, strerror(errno));
        return -1;
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : %s:%i => `pthread_mutex_lock()`: %s\n",
                __FILE__, __LINE__ - 3, err_str);
        goto ret_err;
    }

    /* is a shutdown already in progress? */
    if ( tpoolp->queue_closed || tpoolp->shutdown ) {
        if ( pthread_mutex_unlock(&tpoolp->queue_lock) ) {
            fprintf(stderr, "%s:%i => pthread_mutex_unlock: %s\n",
                    __FILE__, __LINE__, strerror(errno));
            return -1;
            get_err_str(errno, err_str);
            fprintf(stderr, " ERR : %s:%i => `pthread_mutex_unlock()`: %s\n",
                    __FILE__, __LINE__ - 3, err_str);
            goto ret_err;
        }
        return 0;
    }


@@ 333,8 387,9 @@ tpool_destroy(tpool_t *tpoolp, int finish)
        while ( tpoolp->cur_queue_size ) {
            if ( pthread_cond_wait(&tpoolp->queue_empty,
                                   &tpoolp->queue_lock) ) {
                fprintf(stderr, "%s:%i => pthread_cond_wait: %s\n",
                        __FILE__, __LINE__, strerror(errno));
                get_err_str(errno, err_str);
                fprintf(stderr, " ERR : %s:%i => `pthread_cond_wait()`: %s\n",
                        __FILE__, __LINE__ - 4, err_str);
                goto unlock_queue_and_ret_err;
            }
        }


@@ 343,30 398,33 @@ tpool_destroy(tpool_t *tpoolp, int finish)
    tpoolp->shutdown = 1;

    if ( pthread_mutex_unlock(&tpoolp->queue_lock) ) {
        fprintf(stderr, "%s:%i => pthread_mutex_unlock: %s\n",
                __FILE__, __LINE__, strerror(errno));
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : %s:%i => `pthread_mutex_unlock()`: %s\n",
                __FILE__, __LINE__ - 3, err_str);
        return -1;
    }

    /* wake up any workers so they recheck shutdown flag */
    if ( pthread_cond_broadcast(&tpoolp->queue_not_empty) ) {
        fprintf(stderr, "%s:%i => pthread_cond_broadcast: %s\n",
                __FILE__, __LINE__, strerror(errno));
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : %s:%i => `pthread_cond_broadcast()`: %s\n",
                __FILE__, __LINE__ - 3, err_str);
        return -1;
    }

    if ( pthread_cond_broadcast(&tpoolp->queue_not_full) ) {
        fprintf(stderr, "%s:%i => pthread_cond_broadcast: %s\n",
                __FILE__, __LINE__, strerror(errno));
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : %s:%i => `pthread_cond_broadcast()`: %s\n",
                __FILE__, __LINE__ - 3, err_str);
        return -1;
    }

    /* wait for workers to join */
    size_t i = 0;
    for ( ; i < tpoolp->num_threads; i++) {
    for (i = 0; i < tpoolp->num_threads; i++) {
        if ( pthread_join(tpoolp->threads[i], NULL) ) {
            fprintf(stderr, "%s:%i => pthread_cond_broadcast: %s\n",
                    __FILE__, __LINE__, strerror(errno));
            get_err_str(errno, err_str);
            fprintf(stderr, " ERR : %s:%i => `pthread_join()`: %s\n",
                    __FILE__, __LINE__ - 3, err_str);
            return -1;
        }
    }


@@ 382,8 440,11 @@ tpool_destroy(tpool_t *tpoolp, int finish)
    return 0;

unlock_queue_and_ret_err:
    if ( pthread_mutex_unlock(&tpoolp->queue_lock) )
        fprintf(stderr, "%s:%i => pthread_mutex_unlock: %s\n",
                __FILE__, __LINE__, strerror(errno));
    if ( pthread_mutex_unlock(&tpoolp->queue_lock) ) {
        get_err_str(errno, err_str);
        fprintf(stderr, " ERR : %s:%i => `pthread_mutex_unlock()`: %s\n",
                __FILE__, __LINE__ - 3, err_str);
    }
ret_err:
    return -1;
}