#include "internal.h"

/*
 * Release the payload of a log entry through the context allocator:
 * entry->data->buf first, then entry->data, and clear entry->data.
 * Does nothing when entry->data is already NULL.
 */
static void libfloat_internal_snapshot_free_entry_data(libfloat_ctx_t *ctx, libfloat_log_entry_t *entry)
{
    if (entry->data == NULL)
        return;

    ctx->free(entry->data->buf);
    ctx->free(entry->data);
    entry->data = NULL;
}

/*
 * Leader-side fan-out: send append-entries to every node except ourselves.
 * When conf.avoid_congestion is set, nodes whose replicated_log is below
 * commit_index - 1 are skipped.
 */
static void libfloat_internal_snapshot_sync_peers(libfloat_ctx_t *ctx)
{
    libfloat_node_t *node = NULL;

    for_every_node(ctx, node, {
        /* Skip myself */
        if (ctx->me == node)
            continue;

        if (ctx->conf.avoid_congestion) {
            /**
             * We've been asked to avoid congestion if possible, let's check if
             * that node is up to date with us, before sending the new entry.
             * NOTE(review): if libfloat_entry_id_t is unsigned and commit_index
             * is 0, commit_index - 1 wraps around — confirm this cannot happen.
             */
            if (node->replicated_log < ctx->persistent.commit_index - 1)
                continue;
        }

        libfloat_send_append_entries(ctx, node, false);
    });
}

/*
 * Replay the entries that were accepted while a snapshot was running.
 *
 * Entries stored under temporary ids commit_index + 1 .. snapshot_to_commit
 * are written through ctx->write_log one by one and given final, contiguous
 * ids (commit_index + 1 at the time each one is written):
 *  - a refused entry gets its commit callback called with
 *    LIBFLOAT_ENTRY_REFUSED and is removed from the log table and freed;
 *  - an accepted entry advances the commit index; eventually-consistent
 *    entries are reported LIBFLOAT_ENTRY_COMMITTED immediately, and every
 *    entry is reported committed (and its payload released) when we are the
 *    only node;
 *  - entries still holding a commit callback are queued on ctx->logs.
 * When we are the leader, peers are then sent append-entries.
 * Does nothing when ctx->is_snapshotting is false.
 */
static void libfloat_internal_snapshot_apply_log(libfloat_ctx_t *ctx)
{
    libfloat_log_entry_t *entry = NULL;
    khint_t iterator;
    int absent;

    if (ctx->is_snapshotting == false)
        return;

    for (libfloat_entry_id_t id = ctx->persistent.commit_index + 1; id <= ctx->snapshot_to_commit; id++) {
        iterator = kh_get(libfloat_entry_id_t, ctx->persistent.log, id);
        if (iterator == kh_end(ctx->persistent.log))
            continue;

        entry = kh_value(ctx->persistent.log, iterator);
        /* Final id: directly after the last entry that was actually written */
        entry->id = ctx->persistent.commit_index + 1;

        /* Write the log in our database */
        if (!ctx->write_log(ctx, entry)) {
            if (entry->commit != NULL)
                entry->commit(entry->udata, LIBFLOAT_ENTRY_REFUSED);

            kh_del(libfloat_entry_id_t, ctx->persistent.log, iterator);
            /* Release data->buf as well as data, like the other release paths */
            libfloat_internal_snapshot_free_entry_data(ctx, entry);
            ctx->free(entry);
            ERROR(ctx, "libfloat_internal_snapshot_apply_log: Refusing write, callback returned false");
            continue;
        }

        DEBUG(ctx, "libfloat_internal_snapshot_apply_log: Successfully written log %u (type=%d)", entry->id, entry->data->type);

        if (ctx->state == RAFT_STATE_LEADER)
            libfloat_log_add_node_ack(ctx, entry, ctx->me->id);

        ctx->timeout_elapsed = 0;
        ctx->last_log = entry;
        libfloat_set_current_commit_index(ctx, entry->id);

        if (entry->commit_type == LIBFLOAT_EVENTUALLY_CONSISTENT) {
            /* We have written the log, and are about to send RPCs to all nodes of the cluster */
            if (entry->commit != NULL)
                entry->commit(entry->udata, LIBFLOAT_ENTRY_COMMITTED);
            entry->commit = NULL;
        }

        if (ctx->n_nodes == 1) {
            /* We're alone in the cluster, let's consider this entry committed */
            if (entry->commit_type > LIBFLOAT_EVENTUALLY_CONSISTENT) {
                /* Eventually-consistent entries already had their callback called above */
                if (entry->commit != NULL)
                    entry->commit(entry->udata, LIBFLOAT_ENTRY_COMMITTED);
                entry->commit = NULL;
            }
            libfloat_internal_snapshot_free_entry_data(ctx, entry);
        }

        /* Track timeout for submitted logs, only with commit set */
        if (entry->commit != NULL)
            libfloat_list_add_tail(&entry->next, &ctx->logs);

        if (entry->id != id) {
            /**
             * Index of log has changed during the duration of the snapshot,
             * let's re-order the hashtable; During a snapshot, we accept logs
             * without committing them straight away, assigning a temporary
             * id. If all is good, those ids will be final, but if one of
             * those logs is refused in the batch, we need to re-order the ids
             * after it in order to still have a contiguous sequence.
             */
            kh_del(libfloat_entry_id_t, ctx->persistent.log, iterator);
            iterator = kh_put(libfloat_entry_id_t, ctx->persistent.log, entry->id, &absent);
            kh_value(ctx->persistent.log, iterator) = entry;
        }
    }

    /* If we are the leader, let's sync our log with our peers */
    if (ctx->state == RAFT_STATE_LEADER)
        libfloat_internal_snapshot_sync_peers(ctx);
}

/*
 * Start a snapshot through the implementation's ctx->snapshot callback.
 *
 * Logs an error and returns if no ctx->snapshot callback is set. Otherwise
 * aborts (via ctx->abort_snapshot) any snapshot transfer in progress towards
 * other nodes. Returns without snapshotting when id == snapshot.index + 1 and
 * term == snapshot.term. Otherwise sets is_snapshotting (entries are then
 * held back until libfloat_snapshot_done), raises snapshot_to_commit to at
 * least commit_index, and calls ctx->snapshot.
 *
 * id and term are only used for the duplicate check above.
 * NOTE(review): in-progress transfers are aborted even when the duplicate
 * check then skips the snapshot — confirm this ordering is intended.
 */
void libfloat_internal_snapshot_begin(libfloat_ctx_t *ctx, libfloat_entry_id_t id, libfloat_term_t term)
{
    libfloat_node_t *node = NULL;

    if (ctx->snapshot == NULL) {
        /* Implementation is missing a snapshot logic, let's quit here */
        ERROR(ctx, "libfloat_internal_snapshot_begin: Wanted to snapshot but got nothing to call, aborting");
        ERROR(ctx, "If you do not want to have snapshots, please set conf.compact_every_n_log to 0, otherwise please fix your implementation.");
        return;
    }

    for_every_node(ctx, node, {
        if (ctx->me == node)
            continue;

        if (node->snapshot_in_progress) {
            ctx->abort_snapshot(ctx, node);
            node->snapshot_in_progress = false;
        }
    });

    if (id == ctx->persistent.snapshot.index + 1 && term == ctx->persistent.snapshot.term) {
        DEBUG(ctx, "libfloat_internal_snapshot_begin: A snapshot already exists with previous commit id, skip it");
        return;
    }

    /* First, let's block all writes and init variables */
    ctx->is_snapshotting = true;
    if (ctx->snapshot_to_commit < ctx->persistent.commit_index)
        ctx->snapshot_to_commit = ctx->persistent.commit_index;

    DEBUG(ctx, "Starting snapshot with index to commit after=%d", ctx->snapshot_to_commit);

    /* Start the snapshot */
    ctx->snapshot(ctx);
}

/*
 * Called by the implementation when the snapshot started by
 * libfloat_internal_snapshot_begin has finished.
 *
 * success == false: replay the entries held back during the snapshot and
 * clear is_snapshotting.
 *
 * success == true: every entry from snapshot.index + 1 to commit_index is
 * reported LIBFLOAT_ENTRY_COMMITTED (if it still has a callback), removed
 * from the log table and from its list, and freed together with its acks.
 * The snapshot is then persisted with write_current_snapshot, the in-memory
 * snapshot index/term are updated, held-back entries are replayed and
 * is_snapshotting is cleared.
 */
void libfloat_snapshot_done(libfloat_ctx_t *ctx, bool success)
{
    khint_t iterator;
    libfloat_log_entry_t *log;

    if (!success) {
        /* Snapshot has failed, let's stop here and catch back on missed log if needed */
        libfloat_internal_snapshot_apply_log(ctx);
        ctx->is_snapshotting = false;
        return;
    }

    /* Let's cleanup our internal logs that we don't need anymore */
    for (libfloat_entry_id_t id = ctx->persistent.snapshot.index + 1; id <= ctx->persistent.commit_index; id++) {
        iterator = kh_get(libfloat_entry_id_t, ctx->persistent.log, id);
        if (iterator == kh_end(ctx->persistent.log))
            continue;

        log = kh_value(ctx->persistent.log, iterator);
        if (log->commit != NULL) {
            /* This entry was in the snapshot, let's consider this committed */
            log->commit(log->udata, LIBFLOAT_ENTRY_COMMITTED);
        }

        libfloat_internal_snapshot_free_entry_data(ctx, log);
        kh_del(libfloat_entry_id_t, ctx->persistent.log, iterator);
        /* NOTE(review): assumes libfloat_list_del is safe on an entry that was
         * never linked into ctx->logs (entries are only linked when commit != NULL) — confirm */
        libfloat_list_del(&log->next);
        libfloat_log_free_acks(ctx, log);
        ctx->free(log);
    }

    /* Write the current snapshot in persistent storage.
     * NOTE(review): this records the current term, which may differ from the
     * term of the last entry included in the snapshot — confirm intended. */
    ctx->write_current_snapshot(ctx, ctx->persistent.commit_index, ctx->persistent.term);
    DEBUG(ctx, "Writing snapshot with index=%d and term=%d", ctx->persistent.commit_index, ctx->persistent.term);

    ctx->persistent.snapshot.index = ctx->persistent.commit_index;
    ctx->persistent.snapshot.term = ctx->persistent.term;

    /* All good */
    libfloat_internal_snapshot_apply_log(ctx);
    ctx->is_snapshotting = false;
}

/*
 * Record whether a snapshot transfer towards node `id` is in progress.
 * Unknown node ids are ignored.
 */
void libfloat_snapshot_status_update(libfloat_ctx_t *ctx, libfloat_node_id_t id, bool status)
{
    libfloat_node_t *node = NULL;
    khint_t iterator;

    iterator = kh_get(libfloat_node_id_t, ctx->nodes, id);
    if (iterator != kh_end(ctx->nodes)) {
        node = kh_val(ctx->nodes, iterator);
        node->snapshot_in_progress = status;
    }
}