~ne02ptzero/libfloat

ref: c93ce23cec30a46ea6bd54d341f22cb6d582bbd9 libfloat/libfloat.h -rw-r--r-- 16.9 KiB
c93ce23c — Louis Solofrizzo log: Add helpers for human readable consistency types 2 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
#ifndef LIBFLOAT_H
#define LIBFLOAT_H

#include "externals/khash.h"
#include "externals/list.h"

#include <stdbool.h>
#include <stdint.h>
#include <stdarg.h>
#include <time.h>

/* Opaque handle to a cluster context; the full definition follows below. */
typedef struct libfloat_ctx_s libfloat_ctx_t;

/* RAFT term number. */
typedef uint32_t libfloat_term_t;
/* Index/ID of an entry in the replicated log. */
typedef uint32_t libfloat_entry_id_t;
/* Identifier of a node in the cluster. */
typedef uint32_t libfloat_node_id_t;

#include "log.h"
#include "rpc.h"
#include "node.h"

/*
 * Instantiate the integer-keyed khash maps embedded in libfloat_ctx_t.
 * The first macro argument is the khash instance name (used as
 * khash_t(name) in the struct below); here it deliberately reuses the
 * typedef names of the key types:
 *  - khash_t(libfloat_entry_id_t): log entry ID -> libfloat_log_entry_t * (persistent.log)
 *  - khash_t(libfloat_node_id_t):  node ID      -> libfloat_node_t *      (nodes)
 *
 * NOTE(review): in the upstream klib khash.h these macros expand to static
 * inline function definitions, so each translation unit including this
 * header gets its own copy. Confirm externals/khash.h matches upstream.
 */
KHASH_MAP_INIT_INT(libfloat_entry_id_t, libfloat_log_entry_t *);
KHASH_MAP_INIT_INT(libfloat_node_id_t, libfloat_node_t *);

/*!
 * \brief RAFT role of the local node (stored in libfloat_ctx_t::state)
 *
 * Values are spelled out explicitly so the numeric encoding is visible.
 */
typedef enum {
    RAFT_STATE_NONE      = 0,   /*!< No role assigned (presumably before joining / initialization — TODO confirm) */
    RAFT_STATE_FOLLOWER  = 1,   /*!< Following a leader */
    RAFT_STATE_CANDIDATE = 2,   /*!< Running for election */
    RAFT_STATE_LEADER    = 3    /*!< Leading the cluster */
} libfloat_raft_state_t;

/*
 * NOTE(review): presumably the value identifying a snapshot marker log
 * entry; confirm against log.h. Kept as a macro (not an enum constant)
 * in case it is used in preprocessor conditionals.
 */
#define LIBFLOAT_SNAPSHOT_LOG 0

/*!
 * \brief libfloat cluster context
 *
 * Groups the RAFT persistent state, statistics, configuration, runtime
 * state and the user-provided callbacks the library invokes. Created by
 * libfloat_ctx_new() and released with libfloat_ctx_del().
 */
struct libfloat_ctx_s {
    struct {
        libfloat_term_t                 term;           /*!< Current term */
        libfloat_node_id_t              voted_for;      /*!< Last Node ID we voted for (0 if no current vote) */
        libfloat_entry_id_t             commit_index;   /*!< Last log we applied */
        khash_t(libfloat_entry_id_t)    *log;           /*!< Hashtable of current log */

        struct {
            libfloat_entry_id_t         index;          /*!< Last index of the snapshot */
            libfloat_term_t             term;           /*!< Last term of the snapshot */
        } snapshot;
    } persistent;

    struct {
        uint64_t        leader_election;                /*!< Count of leader elections for this cluster */
    } stat;

    struct {
        uint32_t                election_timeout;       /*!< Timeout for RAFT election (ms) */
        uint32_t                log_commit_timeout;     /*!< Timeout for log application (s) */
        uint32_t                compact_every_n_log;    /*!< Snapshot every N logs */
        bool                    avoid_congestion;       /*!< Avoid spamming AE to nodes already behind */
        uint32_t                sanity_timeout;         /*!< Timeout before a leader steps down if it can't reach any other nodes (s) */
        bool                    optimistic_replication; /*!< Do we assume that nodes are going to replicate the log or not */
        uint32_t                max_logs_per_ae;        /*!< Max number of logs to be sent per AE request */
        bool                    do_revert;              /*!< Whether or not to revert logs */
    } conf;

    libfloat_raft_state_t       state;                  /*!< Current state of the node */
    size_t                      n_nodes;                /*!< Number of nodes in the cluster */
    khash_t(libfloat_node_id_t) *nodes;                 /*!< Hashtable of nodes */
    libfloat_node_t             *leader;                /*!< Current Leader (can be NULL) */
    libfloat_node_t             *me;                    /*!< My node in the cluster */
    libfloat_log_entry_t        *last_log;              /*!< Last log committed */

    uint32_t                    timeout_elapsed;        /*!< Current time elapsed between two heartbeats */
    uint32_t                    election_timeout_rand;  /*!< Randomized election time */
    uint32_t                    request_timeout;        /*!< Timeout for AE */
    uint32_t                    logs_check;             /*!< Last check counter of stuck logs */
    bool                        stepping_down;          /*!< Is the node stepping down from leadership */

    bool                        is_snapshotting;        /*!< Is the cluster currently snapshotting */
    libfloat_entry_id_t         snapshot_to_commit;     /*!< Last log ID that is to be written after the snapshot is done */

    libfloat_list_t             logs;                   /*!< Logs whose commit is still not acked */

    void                        *udata;                 /*!< User data */

    /*
     * NOTE(review): the bool-returning callbacks below do not document their
     * return value; presumably true means success — confirm against the
     * library implementation before relying on it.
     */

    /*!
     * \brief Send a Vote Request to a node
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context
     * \param[in] libfloat_node_t * - Node to send the request to
     * \param[in] libfloat_rpc_request_vote_t * - Vote Request
     */
    bool (*request_vote)(struct libfloat_ctx_s *, libfloat_node_t *, libfloat_rpc_request_vote_t *);

    /*!
     * \brief Callback when becoming leader of a cluster
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context
     */
    void (*become_leader_cb)(struct libfloat_ctx_s *);

    /*!
     * \brief Send an AppendEntries request to a node
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context
     * \param[in] libfloat_node_t * - Node to send the request to
     * \param[in] libfloat_rpc_append_entries_t * - AppendEntries Request
     */
    bool (*append_entries)(struct libfloat_ctx_s *, libfloat_node_t *, libfloat_rpc_append_entries_t *);

    /*!
     * \brief Send a Snapshot to a specific node
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context
     * \param[in] libfloat_node_t * - Node to send the snapshot to
     */
    bool (*send_snapshot)(struct libfloat_ctx_s *, libfloat_node_t *);

    /*!
     * \brief Write (and apply) a log to persistent storage
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context
     * \param[in] libfloat_log_entry_t * - Log to write
     */
    bool (*write_log)(struct libfloat_ctx_s *, libfloat_log_entry_t *);

    /*!
     * \brief Delete (and revert) a log from persistent storage
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context
     * \param[in] libfloat_log_entry_t * - Log to delete
     */
    bool (*delete_log)(struct libfloat_ctx_s *, libfloat_log_entry_t *);

    /*!
     * \brief Get a log from persistent storage
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context
     * \param[in] libfloat_entry_id_t - ID of the log to get
     * \param[out] term - Term of the log
     * \param[out] data - Data of the log
     *
     * \note The data argument can be NULL. In this case, the intended behavior is to only get the log term
     * \note data->buf must be allocated by the implementation of this callback; the library takes ownership of it
     */
    bool (*get_log)(struct libfloat_ctx_s *, libfloat_entry_id_t, libfloat_term_t *, libfloat_log_data_t *data);

    /*!
     * \brief Snapshot a cluster to persistent storage
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context
     *
     * \note Snapshot can happen asynchronously, caller must call libfloat_snapshot_done when it's over
     * \sa libfloat_snapshot_done
     */
    void (*snapshot)(struct libfloat_ctx_s *);

    /*!
     * \brief Write the current term to persistent storage
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context
     * \param[in] libfloat_term_t - Term to write
     */
    void (*write_current_term)(struct libfloat_ctx_s *, libfloat_term_t);

    /*!
     * \brief Write the current vote to persistent storage
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context
     * \param[in] libfloat_node_id_t - Node ID to write
     */
    void (*write_vote)(struct libfloat_ctx_s *, libfloat_node_id_t);

    /*!
     * \brief Write the current commit index to persistent storage
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context
     * \param[in] libfloat_entry_id_t - Log ID to write
     */
    void (*write_commit_index)(struct libfloat_ctx_s *, libfloat_entry_id_t);

    /*!
     * \brief Write the current snapshot information to persistent storage
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context
     * \param[in] libfloat_entry_id_t - Last log ID of the snapshot
     * \param[in] libfloat_term_t - Last log term of the snapshot
     */
    void (*write_current_snapshot)(struct libfloat_ctx_s *, libfloat_entry_id_t, libfloat_term_t);

    /*!
     * \brief Append a snapshot log to the log, thus beginning a snapshot
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context
     * \param[in] libfloat_entry_id_t - Current commit index
     * \param[in] libfloat_term_t - Current term
     */
    void (*append_snapshot_log)(struct libfloat_ctx_s *, libfloat_entry_id_t, libfloat_term_t);

    /*!
     * \brief Get the current term from persistent storage
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context
     */
    libfloat_term_t (*get_current_term)(struct libfloat_ctx_s *);

    /*!
     * \brief Get the current vote from persistent storage
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context
     */
    libfloat_node_id_t (*get_vote)(struct libfloat_ctx_s *);

    /*!
     * \brief Get the current commit index from persistent storage
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context
     */
    libfloat_entry_id_t (*get_commit_index)(struct libfloat_ctx_s *);

    /*!
     * \brief Get the current snapshot information from persistent storage
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context
     * \param[out] libfloat_entry_id_t * - Last log ID of the snapshot
     * \param[out] libfloat_term_t * - Last term of the snapshot
     */
    bool (*get_last_snapshot)(struct libfloat_ctx_s *, libfloat_entry_id_t *, libfloat_term_t *);

    /*!
     * \brief Send a leader resign request to a node. Leader will be stopped shortly after
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context
     * \param[in] libfloat_node_t * - Node to send the request to
     */
    void (*resign)(struct libfloat_ctx_s *, libfloat_node_t *node);

    /*!
     * \brief Log an error
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context
     * \param[in] const char * - String to log
     */
    void (*error)(struct libfloat_ctx_s *, const char *);

    /*!
     * \brief Log a debug information
     *
     * \param[in] struct libfloat_ctx_s * - Cluster context
     * \param[in] const char * - String to log
     *
     * \note It is up to the caller to log (or not) on this callback.
     */
    void (*debug)(struct libfloat_ctx_s *, const char *);

    /**
     * The following functions are expected to behave like their counterparts in man(3)
     *
     * \note `time` is declared with `long int` instead of `time_t`. Since time_t
     *       is an implementation-defined type, the standard time() can only be
     *       assigned directly where time_t is `long int`; elsewhere, provide a
     *       small wrapper with this exact signature.
     */
    void        *(*malloc)(size_t);
    void        *(*calloc)(size_t, size_t);
    void        (*free)(void *);
    int         (*rand)(void);
    long int    (*time)(long int *);
    int         (*vsnprintf)(char *, size_t, const char *, va_list);
};

/*!
 * \brief Create a new libfloat context
 *
 * \param[in] malloc Allocation function (malloc(3)-compatible signature)
 * \param[in] calloc Allocation and bzero function (calloc(3)-compatible signature)
 * \param[in] free Free function (free(3)-compatible signature)
 *
 * \return A freshly allocated context on success, NULL on failure
 *
 * \note Release the returned context with libfloat_ctx_del().
 */
libfloat_ctx_t *libfloat_ctx_new(void *(*malloc)(size_t), void *(*calloc)(size_t, size_t), void (*free)(void *));

/*!
 * \brief Free and delete a libfloat context
 *
 * \param[in] ctx Context to free, as returned by libfloat_ctx_new()
 */
void libfloat_ctx_del(libfloat_ctx_t *ctx);

/*!
 * \brief Add a node to a cluster
 *
 * \param[in] ctx Libfloat context
 * \param[in] id ID of this new node
 * \param[in] is_me Is this node myself?
 * \param[in] udata User Data attached to the node (see libfloat_get_node_udata / libfloat_set_node_udata)
 *
 * \note The caller can safely call this function with the same node id over and over
 */
void libfloat_add_node(libfloat_ctx_t *ctx, libfloat_node_id_t id, bool is_me, void *udata);

/*!
 * \brief Delete a node from the cluster
 *
 * \param[in] ctx Libfloat context
 * \param[in] id ID of the node to delete
 *
 * \note NOP if the ID is unknown
 */
void libfloat_del_node(libfloat_ctx_t *ctx, libfloat_node_id_t id);

/*!
 * \brief Periodic libfloat callback, to be invoked regularly by the host application
 *
 * \param[in] ctx Libfloat context
 * \param[in] time Time elapsed between this call and the previous, in milliseconds
 *
 * \note NOTE(review): presumably this is what advances the election/heartbeat
 *       timers (compare libfloat_ctx_t::conf.election_timeout, in ms) — confirm.
 */
void libfloat_periodic(libfloat_ctx_t *ctx, uint32_t time);

/*!
 * \brief Append a log to the cluster
 *
 * \param[in] ctx Libfloat context
 * \param[in] commit_type Wanted consistency for this log
 * \param[in] log Log to append
 * \param[in] commit Callback when consistency is achieved, or timeout is reached;
 *                   receives a void * user pointer (presumably \p udata) and the commit status
 * \param[in] udata User data for callback
 *
 * \return true on success, false on failure
 *
 * \sa libfloat_commit_type_t
 * \sa libfloat_get_error_str for a description of a failure
 * \note The caller must allocate the log, but the library will take ownership of it.
 */
bool libfloat_add_log(libfloat_ctx_t *ctx, libfloat_commit_type_t commit_type, libfloat_log_data_t *log, void (*commit)(void *, libfloat_commit_status_t), void *udata);

/*!
 * \brief Reload the cluster state from persistent storage
 *
 * \param[in] ctx Libfloat context
 *
 * \note NOTE(review): presumably reads back through the get_current_term,
 *       get_vote, get_commit_index and get_last_snapshot callbacks of \p ctx,
 *       which should therefore be set beforehand — confirm.
 */
void libfloat_reload_state(libfloat_ctx_t *ctx);

/*!
 * \brief Function to call when an AppendEntries request is received
 *
 * \param[in] ctx Libfloat context
 * \param[in] req Append Entries Request
 * \param[out] resp Append Entries response, filled by the library for the caller to send back
 */
void libfloat_append_entries_receive(libfloat_ctx_t *ctx, libfloat_rpc_append_entries_t *req, libfloat_rpc_append_entries_response_t *resp);

/*!
 * \brief Function to call when an AppendEntries response is received
 *
 * \param[in] ctx Libfloat context
 * \param[in] resp Append Entries response
 *
 * \note Presumably the answer to a request sent through the append_entries callback — TODO confirm
 */
void libfloat_append_entries_response(libfloat_ctx_t *ctx, libfloat_rpc_append_entries_response_t *resp);

/*!
 * \brief Function to call when a RequestVote request is received
 *
 * \param[in] ctx Libfloat context
 * \param[in] req RequestVote request
 * \param[out] resp RequestVote response, filled by the library for the caller to send back
 */
void libfloat_request_vote_receive(libfloat_ctx_t *ctx, libfloat_rpc_request_vote_t *req, libfloat_rpc_response_vote_t *resp);

/*!
 * \brief Function to call when a RequestVote response is received
 *
 * \param[in] ctx libfloat context
 * \param[in] resp RequestVote response
 *
 * \note Presumably the answer to a request sent through the request_vote callback — TODO confirm
 */
void libfloat_request_vote_response(libfloat_ctx_t *ctx, libfloat_rpc_response_vote_t *resp);

/*!
 * \brief Callback to call when a snapshot is done
 *
 * Completes a snapshot started through the libfloat_ctx_t::snapshot callback,
 * which may run asynchronously.
 *
 * \param[in] ctx libfloat context
 * \param[in] success Whether or not the snapshot succeeded
 */
void libfloat_snapshot_done(libfloat_ctx_t *ctx, bool success);

/*!
 * \brief Set user data for a node
 *
 * \param[in] ctx libfloat context
 * \param[in] id Node ID
 * \param[in] udata Pointer to set
 */
void libfloat_set_node_udata(libfloat_ctx_t *ctx, libfloat_node_id_t id, void *udata);

/*!
 * \brief Get a node object by its ID
 *
 * \param[in] ctx libfloat context
 * \param[in] id Node ID
 *
 * \return The node object on success, NULL on not found
 */
libfloat_node_t *libfloat_get_node(libfloat_ctx_t *ctx, libfloat_node_id_t id);

/*!
 * \brief Get the count of nodes in the cluster
 *
 * \param[in] ctx libfloat context
 *
 * \return The count of nodes in the cluster
 *
 * \note NOTE(review): returns uint32_t while libfloat_ctx_t::n_nodes is size_t.
 */
uint32_t libfloat_get_num_nodes(libfloat_ctx_t *ctx);

/*!
 * \brief Get a cluster node from a defined index (0-N)
 *
 * \param[in] ctx libfloat context
 * \param[in] i Defined index (presumably 0 to libfloat_get_num_nodes() - 1 — TODO confirm)
 *
 * \return The node object on success, NULL on failure / not found
 */
libfloat_node_t *libfloat_get_node_from_index(libfloat_ctx_t *ctx, size_t i);

/*!
 * \brief Get the user data for a node
 *
 * \param[in] ctx libfloat context
 * \param[in] id Node ID
 *
 * \return The user data of this node. NULL if it was never set
 */
void *libfloat_get_node_udata(libfloat_ctx_t *ctx, libfloat_node_id_t id);

/*!
 * \brief Become the leader of a cluster
 *
 * \param[in] ctx libfloat context
 *
 * \note This function is *dangerous*. Do not call it if you don't know what you're doing
 */
void libfloat_become_leader(libfloat_ctx_t *ctx);

/*!
 * \brief Become follower in a cluster
 *
 * \param[in] ctx libfloat context
 */
void libfloat_become_follower(libfloat_ctx_t *ctx);

/*!
 * \brief Leave cluster
 *
 * \param[in] ctx libfloat context
 *
 * \note NOTE(review): libfloat_ctx_t::stepping_down is documented as "stepping
 *       down from leadership"; confirm whether this leaves the cluster or only
 *       relinquishes leadership.
 */
void libfloat_step_down(libfloat_ctx_t *ctx);

/*!
 * \brief Returns whether or not I am the current leader of a cluster
 *
 * \param[in] ctx libfloat context
 *
 * \return true if the local node is the leader, false otherwise
 */
bool libfloat_am_i_leader(libfloat_ctx_t *ctx);

/*!
 * \brief Return the cluster node state as a human readable string
 *
 * \param[in] ctx libfloat context
 *
 * \return A string describing the local node's state
 */
const char *libfloat_cluster_state_to_str(libfloat_ctx_t *ctx);

/*!
 * \brief Returns whether or not we are up to date with the leader (that we know of)
 *
 * \param[in] ctx libfloat context
 *
 * \return true if up to date, false otherwise
 */
bool libfloat_am_i_up_to_date(libfloat_ctx_t *ctx);

/*!
 * \brief Resign as leader and send a Resign request to restart an election for other followers
 *
 * \param[in] ctx libfloat context
 *
 * \sa libfloat_ctx_t::resign callback used to send the request
 */
void libfloat_election_resign(libfloat_ctx_t *ctx);

/*!
 * \brief Receive a Resign request from leader
 *
 * \param[in] ctx libfloat context
 */
void libfloat_election_resign_receive(libfloat_ctx_t *ctx);

/*!
 * \brief Get a string explaining an error that has happened
 *
 * \return A human readable description of the last error
 *
 * \note This is only useful for libfloat_add_log for now
 * \note NOTE(review): ownership/lifetime of the returned string and
 *       thread-safety are not specified here — confirm before caching it.
 */
const char *libfloat_get_error_str(void);

/*!
 * \brief Retrieve the ID and term of the last log
 *
 * \param[in] ctx libfloat context
 * \param[out] id last commit from log
 * \param[out] term last term from log
 *
 * \return true on success, false if the log is missing
 */
bool libfloat_get_last_term(libfloat_ctx_t *ctx, libfloat_entry_id_t *id, libfloat_term_t *term);

/*!
 * \brief Transform a commit-type to a human readable string
 *
 * \param[in] t Commit type
 * \return A string constant of the type
 *
 * \sa libfloat_str_to_commit_type for the reverse conversion
 */
const char *libfloat_commit_type_to_str(libfloat_commit_type_t t);

/*!
 * \brief Transform a string into a libfloat commit type
 *
 * \param[in] t String to read
 * \param[out] out Output type
 *
 * \return true on success, false on failure
 *
 * \sa libfloat_commit_type_to_str for the reverse conversion
 * \note NOTE(review): whether *out is left untouched on failure is not specified here — confirm.
 */
bool libfloat_str_to_commit_type(const char *t, libfloat_commit_type_t *out);

#endif /* LIBFLOAT_H */