~ne02ptzero/libfloat

0a170e94a6d19b9027a52113a2ef3324b45a4dae — Michael Bonfils 25 days ago c93ce23 work/michael/error_on_snapshot
Add callback to abort snapshot synchronization

It will be done when a leader become a follower or when
a new snapshot should be done.

Signed-off-by: Michael Bonfils <mbonfils@scaleway.com>
5 files changed, 60 insertions(+), 0 deletions(-)

M libfloat.h
M log.c
M node.h
M raft.c
M snapshot.c
M libfloat.h => libfloat.h +17 -0
@@ 112,6 112,14 @@ struct libfloat_ctx_s {
    bool (*send_snapshot)(struct libfloat_ctx_s *, libfloat_node_t *);

    /*!
     * \brief Abort a Snapshot currently send to a specific node
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context
     * \param[in] libfloat_node_t * - Node to send the abort snapshot request to
     */
    bool (*abort_snapshot)(struct libfloat_ctx_s *, libfloat_node_t *);

    /*!
     * \brief Write (and apply) a log to persistent storage
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context


@@ 373,6 381,15 @@ void libfloat_request_vote_response(libfloat_ctx_t *ctx, libfloat_rpc_response_v
void libfloat_snapshot_done(libfloat_ctx_t *ctx, bool success);

/*!
 * \brief Callback to update sending snapshot state
 *
 * \param[in] ctx libfloat context
 * \param[in] node node ID
 * \param[in] status true or false
 */
void libfloat_snapshot_status_update(libfloat_ctx_t *ctx, libfloat_node_id_t id, bool status);

/*!
 * \brief Set user data for a node
 *
 * \param[in] ctx libfloat context

M log.c => log.c +4 -0
@@ 272,6 272,7 @@ void libfloat_send_append_entries(libfloat_ctx_t *ctx, libfloat_node_t *node, bo
                ctx->persistent.snapshot.index
            );
            node->snapshot_count = 0;
            node->snapshot_in_progress = true;
            ctx->send_snapshot(ctx, node);
            /* check that there is not already a snapshot in progress */
            return;


@@ 282,6 283,9 @@ void libfloat_send_append_entries(libfloat_ctx_t *ctx, libfloat_node_t *node, bo
        goto end;
    }

    /* reset snapshot flag */
    node->snapshot_in_progress = false;

    if (node->next_log_to_send <= ctx->persistent.commit_index)
    {


M node.h => node.h +1 -0
@@ 9,6 9,7 @@ typedef struct {

    uint8_t             has_voted_for_me        : 1;
    uint8_t             is_up_to_date           : 1;
    uint8_t             snapshot_in_progress    : 1;

    void                *udata;                 /*!< User data */
    time_t              last_update;            /*!< Time of the last AE response (If I am the leader) */

M raft.c => raft.c +12 -0
@@ 34,7 34,19 @@ void libfloat_become_candidate(libfloat_ctx_t *ctx)

void libfloat_become_follower(libfloat_ctx_t *ctx)
{
    libfloat_node_t             *node = NULL;

    DEBUG(ctx, "Becoming follower");
    for_every_node(ctx, node, {
        if (ctx->me == node)
            continue;

        if (node->snapshot_in_progress)
        {
            ctx->abort_snapshot(ctx, node);
            node->snapshot_in_progress = false;
        }
    });
    ctx->state = RAFT_STATE_FOLLOWER;
    ctx->election_timeout_rand = ctx->conf.election_timeout + ctx->rand() % ctx->conf.election_timeout;
    libfloat_vote_for(ctx, 0);

M snapshot.c => snapshot.c +26 -0
@@ 110,6 110,8 @@ static void libfloat_internal_snapshot_apply_log(libfloat_ctx_t *ctx)

void libfloat_internal_snapshot_begin(libfloat_ctx_t *ctx, libfloat_entry_id_t id, libfloat_term_t term)
{
    libfloat_node_t             *node = NULL;

    if (ctx->snapshot == NULL)
    {
        /* Implementation is missing a snapshot logic, let's quit here */


@@ 118,6 120,17 @@ void libfloat_internal_snapshot_begin(libfloat_ctx_t *ctx, libfloat_entry_id_t i
        return;
    }

    for_every_node(ctx, node, {
        if (ctx->me == node)
            continue;

        if (node->snapshot_in_progress)
        {
            ctx->abort_snapshot(ctx, node);
            node->snapshot_in_progress = false;
        }
    });

    if (id == ctx->persistent.snapshot.index + 1 && term == ctx->persistent.snapshot.term)
    {
        DEBUG(ctx, "libfloat_internal_snapshot_begin: A snapshot already exists with previous commit id, skip it");


@@ 184,3 197,16 @@ void libfloat_snapshot_done(libfloat_ctx_t *ctx, bool success)
    libfloat_internal_snapshot_apply_log(ctx);
    ctx->is_snapshotting = false;
}

void libfloat_snapshot_status_update(libfloat_ctx_t *ctx, libfloat_node_id_t id, bool status)
{
    libfloat_node_t     *node = NULL;
    khint_t             iterator;

    iterator = kh_get(libfloat_node_id_t, ctx->nodes, id);
    if (iterator != kh_end(ctx->nodes))
    {
        node = kh_val(ctx->nodes, iterator);
        node->snapshot_in_progress = status;
    }
}