~ne02ptzero/libfloat

ref: 0a170e94a6d19b9027a52113a2ef3324b45a4dae libfloat/snapshot.c -rw-r--r-- 7.5 KiB
0a170e94 — Michael Bonfils Add callback to abort snapshot synchronization 2 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
#include "internal.h"

/**
 * Apply the log entries that were accepted while a snapshot was running.
 *
 * Does nothing unless ctx->is_snapshotting is set.  Otherwise every id from
 * the commit index (at the time of the call) + 1 up to ctx->snapshot_to_commit
 * is looked up in ctx->persistent.log; each entry found is renumbered to
 * "current commit index + 1", handed to ctx->write_log() and, on success,
 * becomes the new commit index.  Entries refused by ctx->write_log() are
 * reported to their commit callback as LIBFLOAT_ENTRY_REFUSED and released.
 *
 * When this node is the leader, an append-entries RPC is then sent to every
 * other node (subject to conf.avoid_congestion).
 *
 * The caller is responsible for clearing ctx->is_snapshotting afterwards.
 */
static void libfloat_internal_snapshot_apply_log(libfloat_ctx_t *ctx)
{
    libfloat_log_entry_t        *entry = NULL;
    libfloat_node_t             *node = NULL;
    khint_t                     iterator;
    int                         absent;

    if (ctx->is_snapshotting == false)
        return;

    for (libfloat_entry_id_t id = ctx->persistent.commit_index + 1; id <= ctx->snapshot_to_commit; id++)
    {
        iterator = kh_get(libfloat_entry_id_t, ctx->persistent.log, id);
        if (iterator == kh_end(ctx->persistent.log))
            continue;

        /* The final id is the next free commit slot, which may differ from the temporary one */
        entry = kh_value(ctx->persistent.log, iterator);
        entry->id = ctx->persistent.commit_index + 1;

        /* Write the log in our database */
        if (!ctx->write_log(ctx, entry))
        {
            if (entry->commit != NULL)
                entry->commit(entry->udata, LIBFLOAT_ENTRY_REFUSED);

            kh_del(libfloat_entry_id_t, ctx->persistent.log, iterator);

            /* Release the payload buffer as well as its container, like every other release site */
            if (entry->data != NULL)
            {
                ctx->free(entry->data->buf);
                ctx->free(entry->data);
            }
            ctx->free(entry);
            ERROR(ctx, "libfloat_internal_snapshot_apply_log: Refusing write, callback returned false");
            continue;
        }

        DEBUG(ctx, "libfloat_internal_snapshot_apply_log: Successfully written log %u (type=%d)", entry->id, entry->data->type);
        if (ctx->state == RAFT_STATE_LEADER)
            libfloat_log_add_node_ack(ctx, entry, ctx->me->id);

        ctx->timeout_elapsed = 0;
        ctx->last_log = entry;
        libfloat_set_current_commit_index(ctx, entry->id);

        if (entry->commit_type == LIBFLOAT_EVENTUALLY_CONSISTENT)
        {
            /* We have written the log, and are about to send RPCs to all nodes of the cluster */
            if (entry->commit != NULL)
                entry->commit(entry->udata, LIBFLOAT_ENTRY_COMMITTED);
            entry->commit = NULL;
        }

        if (ctx->n_nodes == 1)
        {
            /* We're alone in the cluster, let's consider this entry committed */
            if (entry->commit_type > LIBFLOAT_EVENTUALLY_CONSISTENT)
            {
                /* Eventually-consistent entries already had their callback called above */
                if (entry->commit != NULL)
                    entry->commit(entry->udata, LIBFLOAT_ENTRY_COMMITTED);
                entry->commit = NULL;
            }

            ctx->free(entry->data->buf);
            ctx->free(entry->data);
            entry->data = NULL;
        }

        /* Track timeout for submitted logs, only with commit set */
        if (entry->commit != NULL)
            libfloat_list_add_tail(&entry->next, &ctx->logs);

        if (entry->id != id)
        {
            /**
             * The index of the log has changed during the snapshot, so the
             * hashtable must be re-keyed.  During a snapshot, we accept logs
             * without committing them straight away, and assign them a
             * temporary id.  If all is good, those ids will be final, but if
             * one of the logs in the batch is refused, the ids after it must
             * be shifted in order to keep a contiguous sequence.
             */
            kh_del(libfloat_entry_id_t, ctx->persistent.log, iterator);
            iterator = kh_put(libfloat_entry_id_t, ctx->persistent.log, entry->id, &absent);
            kh_value(ctx->persistent.log, iterator) = entry;
        }
    }

    /* If we are the leader, let's sync our log with our peers */
    if (ctx->state == RAFT_STATE_LEADER)
    {
        /* Let's sync the log with other nodes in the cluster */
        for_every_node(ctx, node, {
            /* Skip myself */
            if (ctx->me == node)
                continue;

            if (ctx->conf.avoid_congestion)
            {
                /**
                * We've been asked to avoid congestion if possible, let's check if
                * that node is up to date with us, before sending the new entry
                */
                if (node->replicated_log < ctx->persistent.commit_index - 1)
                    continue;
            }

            libfloat_send_append_entries(ctx, node, false);
        });
    }
}

/**
 * Start a local snapshot through the ctx->snapshot() callback.
 *
 * Steps, in order:
 *  - bail out (with an error log) when no ctx->snapshot callback is set;
 *  - for every other node flagged snapshot_in_progress, call
 *    ctx->abort_snapshot() and clear the flag;
 *  - skip the snapshot when id/term match the snapshot already recorded in
 *    ctx->persistent.snapshot (index + 1 and term);
 *  - set ctx->is_snapshotting, raise ctx->snapshot_to_commit to at least the
 *    current commit index, and call ctx->snapshot().
 *
 * @param ctx   libfloat context
 * @param id    entry id compared against persistent.snapshot.index + 1
 * @param term  term compared against persistent.snapshot.term
 */
void libfloat_internal_snapshot_begin(libfloat_ctx_t *ctx, libfloat_entry_id_t id, libfloat_term_t term)
{
    libfloat_node_t             *node = NULL;

    if (ctx->snapshot == NULL)
    {
        /* Implementation is missing a snapshot logic, let's quit here */
        ERROR(ctx, "libfloat_internal_snapshot_begin: Wanted to snapshot but got nothing to call, aborting");
        ERROR(ctx, "If you do not want to have snapshots, please set conf.compact_every_n_log to 0, otherwise please fix your implementation.");
        return;
    }

    for_every_node(ctx, node, {
        if (ctx->me == node)
            continue;

        /**
         * Without an abort callback nothing can actually be aborted: do not
         * dereference a NULL function pointer, and leave the flag untouched
         * since it was not acted upon.
         */
        if (node->snapshot_in_progress && ctx->abort_snapshot != NULL)
        {
            ctx->abort_snapshot(ctx, node);
            node->snapshot_in_progress = false;
        }
    });

    if (id == ctx->persistent.snapshot.index + 1 && term == ctx->persistent.snapshot.term)
    {
        DEBUG(ctx, "libfloat_internal_snapshot_begin: A snapshot already exists with previous commit id, skip it");
        return;
    }

    /* First, let's block all writes and init variables */
    ctx->is_snapshotting = true;
    if (ctx->snapshot_to_commit < ctx->persistent.commit_index)
        ctx->snapshot_to_commit = ctx->persistent.commit_index;

    DEBUG(ctx, "Starting snapshot with index to commit after=%d", ctx->snapshot_to_commit);

    /* Start the snapshot */
    ctx->snapshot(ctx);
}

/**
 * Finish a snapshot: clean up the log it covers and apply pending entries.
 *
 * On failure (success == false) the entries accepted during the snapshot are
 * applied and ctx->is_snapshotting is cleared; nothing else changes.
 *
 * On success, every entry stored in ctx->persistent.log with an id in
 * (persistent.snapshot.index, persistent.commit_index] is reported as
 * LIBFLOAT_ENTRY_COMMITTED to its commit callback (if any), removed from the
 * hashtable and from its list, and freed.  The snapshot position
 * (commit index / term) is then handed to ctx->write_current_snapshot() and
 * recorded in ctx->persistent.snapshot, after which the entries accepted
 * during the snapshot are applied and ctx->is_snapshotting is cleared.
 *
 * @param ctx      libfloat context
 * @param success  whether the snapshot succeeded
 */
void libfloat_snapshot_done(libfloat_ctx_t *ctx, bool success)
{
    khint_t                             iterator;
    libfloat_log_entry_t                *log;

    if (!success)
    {
        /* Snapshot has failed, let's stop here and catch up on missed logs if needed */
        libfloat_internal_snapshot_apply_log(ctx);
        ctx->is_snapshotting = false;
        return;
    }

    /* Let's cleanup our internal logs that we don't need anymore */
    for (libfloat_entry_id_t id = ctx->persistent.snapshot.index + 1; id <= ctx->persistent.commit_index; id++)
    {
        iterator = kh_get(libfloat_entry_id_t, ctx->persistent.log, id);
        if (iterator != kh_end(ctx->persistent.log))
        {
            log = kh_value(ctx->persistent.log, iterator);
            if (log->commit != NULL)
            {
                /* This entry was in the snapshot, let's consider this committed */
                log->commit(log->udata, LIBFLOAT_ENTRY_COMMITTED);
            }

            /* data may already have been released (and set to NULL) when the entry was applied */
            if (log->data != NULL)
            {
                ctx->free(log->data->buf);
                ctx->free(log->data);
            }

            kh_del(libfloat_entry_id_t, ctx->persistent.log, iterator);
            libfloat_list_del(&log->next);
            libfloat_log_free_acks(ctx, log);
            ctx->free(log);
        }
    }

    /* Write the current snapshot in persistent storage */
    ctx->write_current_snapshot(ctx, ctx->persistent.commit_index, ctx->persistent.term);
    /* Routine progress message: logged at debug level, not as an error */
    DEBUG(ctx, "Writing snapshot with index=%d and term=%d", ctx->persistent.commit_index, ctx->persistent.term);
    ctx->persistent.snapshot.index = ctx->persistent.commit_index;
    ctx->persistent.snapshot.term = ctx->persistent.term;

    /* All good */
    libfloat_internal_snapshot_apply_log(ctx);
    ctx->is_snapshotting = false;
}

/**
 * Set the snapshot_in_progress flag of the node registered under `id`.
 *
 * Ids that are not present in ctx->nodes are silently ignored.
 *
 * @param ctx     libfloat context
 * @param id      id of the node to update
 * @param status  new value of the node's snapshot_in_progress flag
 */
void libfloat_snapshot_status_update(libfloat_ctx_t *ctx, libfloat_node_id_t id, bool status)
{
    libfloat_node_t     *target = NULL;
    khint_t             slot = kh_get(libfloat_node_id_t, ctx->nodes, id);

    /* Unknown node: nothing to update */
    if (slot == kh_end(ctx->nodes))
        return;

    target = kh_val(ctx->nodes, slot);
    target->snapshot_in_progress = status;
}