~ne02ptzero/libfloat

6cfcf0e6635218123454cf617dcb9506f3a42ac0 — Louis Solofrizzo a month ago 3a7f658
log: Add real optimisic replication on log burst

This patch adds some code to try not to duplicate log sending when we
can avoid it. On log-write burst, the last known sent log is never
overrided by the node AE response, in order to avoid replay. When
heartbeating, the value _is_ overidded to allow replication on a
out-of-date node.

The main trade-off is that an out-of-date not will take longer to be
up-to-date again (hearbeat time, best case), but less data will be sent
on the wire.

Patch: https://lists.sr.ht/~ne02ptzero/libfloat/patches/27200

Signed-off-by : Louis Solofrizzo <lsolofrizzo@scaleway.com>
Acked-by      : Patrik Cyvoct <pcyvoct@scaleway.com>
Acked-by      : Michael Bonfils <mbonfils@scaleway.com>

 _______________________________________
/ The trouble is, there is an endless   \
| supply of White Men, but there has    |
| always been a limited number of Human |
\ Beings. -- Little Big Man             /
 ---------------------------------------
        \   ^__^
         \  (oo)\_______
            (__)\       )\/\
                ||----w |
                ||     ||
5 files changed, 22 insertions(+), 5 deletions(-)

M internal.h
M log.c
M node.h
M periodic.c
M raft.c
M internal.h => internal.h +2 -1
@@ 63,8 63,9 @@ void libfloat_send_append_entries(libfloat_ctx_t *ctx, libfloat_node_t *node, bo
 * \brief Send AE requests to every node
 *
 * \param[in] ctx libfloat context
 * \param[in] hearbeating Whether or not this is a hearbeat
 */
void libfloat_send_append_entries_to_all(libfloat_ctx_t *ctx);
void libfloat_send_append_entries_to_all(libfloat_ctx_t *ctx, bool hearbeating);

/*!
 * \brief Start a snapshot

M log.c => log.c +12 -2
@@ 624,12 624,22 @@ void libfloat_append_entries_response(libfloat_ctx_t *ctx, libfloat_rpc_append_e

    /* Set the next index to sent to the node */
    node->last_update = ctx->time(NULL);
    if (node->next_log_to_send <= resp->current_index)

    if (node->replicated_log < resp->current_index)
    {
        node->next_log_to_send = resp->current_index + 1;
        node->replicated_log = resp->current_index;
    }

    if (node->next_log_to_send <= resp->current_index && node->hearbeating)
    {
        node->next_log_to_send = resp->current_index + 1;
    }

    if (node->hearbeating)
    {
        node->hearbeating = false;
    }

    if (ctx->persistent.term < resp->term)
    {
        libfloat_set_current_term(ctx, resp->term);

M node.h => node.h +1 -0
@@ 13,6 13,7 @@ typedef struct {
    void                *udata;                 /*!< User data */
    time_t              last_update;            /*!< Time of the last AE response (If I am the leader) */
    int                 snapshot_count;         /*!< Count of the times we are supposed to send a snapshot */
    bool                hearbeating;
} libfloat_node_t;

#endif /* LIBFLOAT_NODE_H */

M periodic.c => periodic.c +1 -1
@@ 34,7 34,7 @@ void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time)
            }

            /* Timeout has expired! Time to send some heartbeats or some entries */
            libfloat_send_append_entries_to_all(ctx);
            libfloat_send_append_entries_to_all(ctx, true);
            ctx->timeout_elapsed = 0;
        }


M raft.c => raft.c +6 -1
@@ 51,13 51,18 @@ void libfloat_step_down(libfloat_ctx_t *ctx)
    ctx->state = RAFT_STATE_NONE;
}

void libfloat_send_append_entries_to_all(libfloat_ctx_t *ctx)
void libfloat_send_append_entries_to_all(libfloat_ctx_t *ctx, bool heartbeat)
{
    libfloat_node_t     *node;

    for_every_node(ctx, node, {
        if (node != ctx->me)
        {
            if (heartbeat)
                node->hearbeating = true;

            libfloat_send_append_entries(ctx, node, false);
        }
    });
}