~ne02ptzero/libfloat

cf75af54e200fdd220d272e4150fc8a959a83c3a — Michael Bonfils 3 months ago 3468b1f
Few improvmements

- ACK manage logs
- Cleanup logs memory in reload_state
- Avoid to reload all logs on startup

Signed-off-by: Michael Bonfils <mbonfils@scaleway.com>
Ack-by       : Louis Solofrizzo <lsolofrizzo@scaleway.com>
8 files changed, 199 insertions(+), 43 deletions(-)

M election.c
M internal.h
M libfloat.c
M libfloat.h
M log.c
M log.h
M raft.c
M snapshot.c
M election.c => election.c +11 -5
@@ 21,6 21,8 @@ static int libfloat_get_number_of_votes_for_me(libfloat_ctx_t *ctx)
void libfloat_election_start(libfloat_ctx_t *ctx)
{
    libfloat_node_t     *node;
    libfloat_term_t     last_term = 0;
    libfloat_entry_id_t last_id = 0;

    DEBUG(ctx, "Election starting!");
    /* First, reset the vote of everyone */


@@ 43,6 45,8 @@ void libfloat_election_start(libfloat_ctx_t *ctx)
    ctx->election_timeout_rand = ctx->conf.election_timeout + ctx->rand() % ctx->conf.election_timeout;
    ctx->timeout_elapsed = 0;

    libfloat_get_last_term(ctx, &last_id, &last_term);

    /* Send a vote request to each node of the cluster */
    for_every_node(ctx, node, {
        libfloat_rpc_request_vote_t     vote = { 0 };


@@ 56,8 60,8 @@ void libfloat_election_start(libfloat_ctx_t *ctx)
        /* Set informations and send the request to the node */
        vote.term = ctx->persistent.term;
        vote.candidate_id = ctx->me->id;
        vote.last_log_index = ctx->last_log->id;
        vote.last_log_term = ctx->last_log->term;
        vote.last_log_index = last_id;
        vote.last_log_term = last_term;

        ctx->request_vote(ctx, node, &vote);
    });


@@ 65,6 69,8 @@ void libfloat_election_start(libfloat_ctx_t *ctx)

static bool libfloat_can_i_grant_vote(libfloat_ctx_t *ctx, libfloat_rpc_request_vote_t *req)
{
    libfloat_term_t     last_term;

    /* Let's check if I have already voted for someone */
    if (ctx->persistent.voted_for != 0)
    {


@@ 73,14 79,14 @@ static bool libfloat_can_i_grant_vote(libfloat_ctx_t *ctx, libfloat_rpc_request_
        return false;
    }

    if (ctx->last_log == NULL)
    if (libfloat_get_last_term(ctx, NULL, &last_term) == false)
    {
        /* Our log is empty, so let's assume we are _not_ up to date */
        /* We have failed to retrieve last_ferm from log */
        return false;
    }

    /* Check term match */
    if (ctx->last_log->term > req->last_log_term && req->last_log_index <= ctx->persistent.commit_index)
    if (last_term > req->last_log_term && req->last_log_index <= ctx->persistent.commit_index)
    {
        /* We have a superior term and a superior log, we can't grant our vote */
        return false;

M internal.h => internal.h +29 -1
@@ 63,7 63,7 @@ void libfloat_send_append_entries(libfloat_ctx_t *ctx, libfloat_node_t *node, bo
 * \brief Send AE requests to every node
 *
 * \param[in] ctx libfloat context
 */ 
 */
void libfloat_send_append_entries_to_all(libfloat_ctx_t *ctx);

void libfloat_internal_snapshot_begin(libfloat_ctx_t *ctx);


@@ 87,4 87,32 @@ void libfloat_error(libfloat_ctx_t *ctx, const char *fmt, ...) __attribute__((fo
 */
void libfloat_set_error_str(const char *str);

/*!
 * \brief Count a node towards an acknowledge-replication list
 *
 * \param[in] ctx libfloat context
 * \param[in,out] log Entry to update
 * \param[in] id ID of the node that acked this log
 *
 * \note This function is replay-safe, eg; If a node tells us twice that a log
 * X is replicated, we only count it once.
 */
void libfloat_log_add_node_ack(libfloat_ctx_t *ctx, libfloat_log_entry_t *log, libfloat_node_id_t id);

/*!
 * \brief Free the list of acknowledges of a log
 *
 * \param[in] ctx libfloat context
 * \param[in,out] log Log to free
 */
void libfloat_log_free_acks(libfloat_ctx_t *ctx, libfloat_log_entry_t *log);

/*!
 * \brief Free all log from memory
 *
 * \param[in] ctx libfloat context
 */
void libfloat_log_memory_cleanup(libfloat_ctx_t *ctx);


#endif /* LIBFLOAT_INTERNAL_H */

M libfloat.c => libfloat.c +1 -1
@@ 50,8 50,8 @@ void libfloat_ctx_del(libfloat_ctx_t *ctx)
            ctx->free(log->data);
        }

        libfloat_log_free_acks(ctx, log);
        ctx->free(log);

    });

    kh_foreach_value(ctx->nodes, node, {

M libfloat.h => libfloat.h +11 -0
@@ 485,4 485,15 @@ void libfloat_election_resign_receive(libfloat_ctx_t *ctx);
 */
const char *libfloat_get_error_str(void);

/*!
 * \brief Retrieve term of last log
 *
 * \param[in] ctx libfloat context
 * \param[out] id last commit from log
 * \param[out] term last term from log
 *
 * \note return false if log is missing
 */
bool libfloat_get_last_term(libfloat_ctx_t *ctx, libfloat_entry_id_t *id, libfloat_term_t *term);

#endif /* LIBFLOAT_H */

M log.c => log.c +122 -16
@@ 1,5 1,41 @@
#include "internal.h"

void libfloat_log_add_node_ack(libfloat_ctx_t *ctx, libfloat_log_entry_t *log, libfloat_node_id_t id)
{
    bool                found = false;
    libfloat_node_ack_t *ptr, *tmp;

    libfloat_list_for_each_entry_safe(ptr, tmp, &log->node_acks, next)
    {
        if (ptr->id == id)
        {
            found = true;
            break;
        }
    }

    if (found == false)
    {
        ptr = ctx->calloc(1, sizeof(*ptr));
        ptr->id = id;
        libfloat_list_add_tail(&ptr->next, &log->node_acks);
    }
}

void libfloat_log_free_acks(libfloat_ctx_t *ctx, libfloat_log_entry_t *log)
{
    libfloat_node_ack_t *ptr, *tmp;

    libfloat_list_for_each_entry_safe(ptr, tmp, &log->node_acks, next)
    {
        ctx->free(ptr);
    }

    /* Reset the list head */
    log->node_acks.prev = &log->node_acks;
    log->node_acks.next = &log->node_acks;
}

bool libfloat_add_log(libfloat_ctx_t *ctx, libfloat_commit_type_t commit_type, libfloat_log_data_t *log,
    void (*commit)(void *, libfloat_commit_status_t), void *udata)
{


@@ 50,6 86,9 @@ bool libfloat_add_log(libfloat_ctx_t *ctx, libfloat_commit_type_t commit_type, l
    entry->commit_type = commit_type;
    entry->started = ctx->time(NULL);

    entry->node_acks.prev = &entry->node_acks;
    entry->node_acks.next = &entry->node_acks;

    iterator = kh_put(libfloat_entry_id_t, ctx->persistent.log, entry->id, &absent);
    kh_value(ctx->persistent.log, iterator) = entry;



@@ 83,7 122,7 @@ bool libfloat_add_log(libfloat_ctx_t *ctx, libfloat_commit_type_t commit_type, l

    DEBUG(ctx, "libfloat_add_log: Successfully written log %d", entry->id);

    entry->node_count++;
    libfloat_log_add_node_ack(ctx, entry, ctx->me->id);
    ctx->timeout_elapsed = 0;
    ctx->last_log = entry;
    libfloat_set_current_commit_index(ctx, entry->id);


@@ 183,6 222,10 @@ static bool libfloat_get_log_from_db(libfloat_ctx_t *ctx, size_t id, libfloat_lo
        khint_t                     iterator;
        int                         absent;

        /* Reset the list head */
        entry->node_acks.prev = &entry->node_acks;
        entry->node_acks.next = &entry->node_acks;

        iterator = kh_put(libfloat_entry_id_t, ctx->persistent.log, entry->id, &absent);
        kh_value(ctx->persistent.log, iterator) = entry;
        *out = entry;


@@ 224,11 267,14 @@ void libfloat_send_append_entries(libfloat_ctx_t *ctx, libfloat_node_t *node, bo
        if (node->snapshot_count > 5)
        {
            /* Node has missed too many logs since the snapshot, let's send the snapshot */
            ERROR(ctx, "I AM SUPPOSED TO SEND A SNAPSHOT (next log node->next_log_to_send=%d, snapshot=%d)!",
            DEBUG(ctx, "Send snapshot (next_log_to_send=%u, snapshot=%u)!",
                node->next_log_to_send,
                ctx->persistent.snapshot.index
            );
            node->snapshot_count = 0;
            ctx->send_snapshot(ctx, node);
            /* check that there is not already a snapshot in progress */
            return;
        }
        else
            node->snapshot_count++;


@@ 404,15 450,6 @@ void libfloat_append_entries_receive(libfloat_ctx_t *ctx, libfloat_rpc_append_en
                    /* We don't have a log anymore apparently, let's inform the leader */
                    resp->current_index = 0;
                }
                else
                {
                    if (req->prev_log_index >= ctx->persistent.commit_index)
                    {
                        /* Decrease our log by one */
                        ctx->persistent.commit_index = ctx->persistent.commit_index - 1;
                        ctx->write_commit_index(ctx, ctx->persistent.commit_index);
                    }
                }
                goto end;
            }
        }


@@ 479,6 516,8 @@ response:
        log = ctx->calloc(1, sizeof(*log));
        log->id = req->entries[i]->id;
        log->term = req->entries[i]->term;
        log->node_acks.prev = &log->node_acks;
        log->node_acks.next = &log->node_acks;
        /* Don't allocate memory for data, we don't need to keep it in RAM for now */

        iterator = kh_put(libfloat_entry_id_t, ctx->persistent.log, log->id, &absent);


@@ 503,6 542,7 @@ response:
            if (!ctx->write_log(ctx, req->entries[i]))
            {
                kh_del(libfloat_entry_id_t, ctx->persistent.log, iterator);
                libfloat_log_free_acks(ctx, log);
                ctx->free(log);
                ERROR(ctx, "libfloat_append_entries_receive: Cannot write log id=%u", req->entries[i]->id);
                goto end;


@@ 535,7 575,7 @@ end:
            resp->current_index = ctx->persistent.commit_index;
        else
        {
            if (!ctx->is_snapshotting)
            if (!ctx->is_snapshotting && ctx->last_log)
                libfloat_set_current_commit_index(ctx, ctx->last_log->id);
        }
    }


@@ 654,10 694,10 @@ void libfloat_append_entries_response(libfloat_ctx_t *ctx, libfloat_rpc_append_e

            log = kh_value(ctx->persistent.log, iterator);

            /* XXX: Can be replays, need to have a unique way to count acks */
            log->node_count++;

            if (log->node_count >= ctx->n_nodes)
            libfloat_log_add_node_ack(ctx, log, node->id);

            if (libfloat_list_count(&log->node_acks) >= ctx->n_nodes)
            {
                if (log->commit_type == LIBFLOAT_ABSOLUTELY_CONSISTENT)
                {


@@ 674,10 714,11 @@ void libfloat_append_entries_response(libfloat_ctx_t *ctx, libfloat_rpc_append_e
                {
                    ctx->free(log->data->buf);
                    ctx->free(log->data);
                    libfloat_log_free_acks(ctx, log);
                    log->data = NULL;
                }
            }
            else if (log->node_count >= ctx->n_nodes / 2 + 1)
            else if (libfloat_list_count(&log->node_acks) >= ctx->n_nodes / 2 + 1)
            {
                if (log->commit_type <= LIBFLOAT_STRONGLY_CONSISTENT)
                {


@@ 705,3 746,68 @@ void libfloat_append_entries_response(libfloat_ctx_t *ctx, libfloat_rpc_append_e
    else
        node->is_up_to_date = true;
}

bool libfloat_get_last_term(libfloat_ctx_t *ctx, libfloat_entry_id_t *id, libfloat_term_t *term)
{
    libfloat_entry_id_t                 i = ctx->persistent.commit_index;
    khint_t                             iterator;
    libfloat_log_entry_t                *log = NULL;

    iterator = kh_get(libfloat_entry_id_t, ctx->persistent.log, i);
    if (iterator == kh_end(ctx->persistent.log))
    {
        /* NOT FOUND, NEED TO LOAD LOG */
        if (!libfloat_get_log_from_db(ctx, i, &log))
        {
            ERROR(ctx, "Log %u not found while retrieving last term", i);
            return false;
        }
    }
    else
    {
        log = kh_value(ctx->persistent.log, iterator);
    }

    if (log->data == NULL)
    {
        /* Log is not in memory, let's get it from db */
        log->data = ctx->malloc(sizeof(*log->data));
        ctx->get_log(ctx, log->id, &log->term, log->data);
    }

    if (term != NULL)
        *term = log->data->term;
    if (id != NULL)
        *id = log->id;

    return true;
}

void libfloat_log_memory_cleanup(libfloat_ctx_t *ctx)
{
    khiter_t                            iterator;
    libfloat_log_entry_t                *log = NULL;

    for (iterator = kh_begin(ctx->persistent.log); iterator != kh_end(ctx->persistent.log); iterator++)
    {
        if (kh_exist(ctx->persistent.log, iterator))
        {
            log = kh_value(ctx->persistent.log, iterator);

            if (log->data != NULL)
            {
                ctx->free(log->data->buf);
                ctx->free(log->data);
            }

            if (log->commit != NULL)
            {
                log->commit(log->udata, LIBFLOAT_ENTRY_REFUSED);
                libfloat_list_del(&log->next);
            }

            kh_del(libfloat_entry_id_t, ctx->persistent.log, iterator);
            ctx->free(log);
        }
    }
}

M log.h => log.h +9 -1
@@ 21,9 21,17 @@ typedef struct {
    uint32_t            type;   /*!< Type of log. Implementation defined. 0 is reserved */
    size_t              len;    /*!< Length of the buffer */
    uint8_t             *buf;   /*!< Log data */

    /* */
    libfloat_term_t     term;   /*!< Log Term */
} libfloat_log_data_t;

typedef struct {
    libfloat_node_id_t  id;
    libfloat_list_t     next;
} libfloat_node_ack_t;

typedef struct {
    libfloat_entry_id_t         id;     /*!< Log ID */
    libfloat_term_t             term;   /*!< Log Term */
    libfloat_log_data_t         *data;  /*!< Log data (can be NULL) */


@@ 33,7 41,7 @@ typedef struct {
    libfloat_commit_type_t      commit_type;                                    /*!< Commit type */
    void                        *udata;                                         /*!< User data for callback */
    time_t                      started;                                        /*!< Age of the log */
    uint32_t                    node_count;                                     /*!< Number of nodes that have replicated this log */
    libfloat_list_t             node_acks;                                      /*!< List of nodes that have replicated this log */

    libfloat_list_t             next;
} libfloat_log_entry_t;

M raft.c => raft.c +11 -17
@@ 104,6 104,13 @@ void libfloat_reload_state(libfloat_ctx_t *ctx)
        ctx->persistent.snapshot.term = 0;
    }

    /* when reloading a snapshot, ctx may have older logs and we want to drop them */
    if (kh_size(ctx->persistent.log) > 0)
    {
        libfloat_log_memory_cleanup(ctx);
    }

    /* XXX some part of code try to load missing log but other not */
    for (size_t i = ctx->persistent.snapshot.index + 1; i <= ctx->persistent.commit_index; i++)
    {
        libfloat_log_entry_t    *log;


@@ 112,25 119,12 @@ void libfloat_reload_state(libfloat_ctx_t *ctx)

        log = ctx->calloc(1, sizeof(*log));
        log->id = i;
        log->data = NULL;

        if (ctx->get_log(ctx, i, &log->term, NULL))
        {
            iterator = kh_put(libfloat_entry_id_t, ctx->persistent.log, log->id, &absent);
            kh_value(ctx->persistent.log, iterator) = log;

            ctx->last_log = log;
        }
        else
        {
            ERROR(ctx, "Consistent log is not the same as the expected commit index (%u vs %u)",
                ctx->last_log->id, ctx->persistent.commit_index);

            ctx->free(log);
        log->node_acks.prev = &log->node_acks;
        log->node_acks.next = &log->node_acks;

            ctx->persistent.commit_index = ctx->last_log->id;
            ctx->write_commit_index(ctx, ctx->persistent.commit_index);
        }
        iterator = kh_put(libfloat_entry_id_t, ctx->persistent.log, log->id, &absent);
        kh_value(ctx->persistent.log, iterator) = log;
    }
}


M snapshot.c => snapshot.c +5 -2
@@ 32,8 32,10 @@ static void libfloat_internal_snapshot_apply_log(libfloat_ctx_t *ctx)
            continue;
        }

        ERROR(ctx, "libfloat_internal_snapshot_apply_log: Successfully written log %d (type=%d)", entry->id, entry->data->type);
        entry->node_count++;
        DEBUG(ctx, "libfloat_internal_snapshot_apply_log: Successfully written log %u (type=%d)", entry->id, entry->data->type);
        if (ctx->state == RAFT_STATE_LEADER)
            libfloat_log_add_node_ack(ctx, entry, ctx->me->id);

        ctx->timeout_elapsed = 0;
        ctx->last_log = entry;
        libfloat_set_current_commit_index(ctx, entry->id);


@@ 161,6 163,7 @@ void libfloat_snapshot_done(libfloat_ctx_t *ctx, bool success)

            kh_del(libfloat_entry_id_t, ctx->persistent.log, iterator);
            libfloat_list_del(&log->next);
            libfloat_log_free_acks(ctx, log);
            ctx->free(log);
        }
    }