~ne02ptzero/libfloat

ref: ec48c42c981c18cb67e4c2722dfd870d4424ff84 libfloat/election.c -rw-r--r-- 5.9 KiB
ec48c42c — Michael Bonfils Don't try to retrieve term if log is 0 3 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
#include "internal.h"

void libfloat_vote_for(libfloat_ctx_t *ctx, libfloat_node_id_t id)
{
    ctx->persistent.voted_for = id;
    ctx->write_vote(ctx, id);
}

static int libfloat_get_number_of_votes_for_me(libfloat_ctx_t *ctx)
{
    int                 ret = 0;
    libfloat_node_t     *node;

    for_every_node(ctx, node, {
        if (node->has_voted_for_me)
            ret++;
    });
    return ret;
}

void libfloat_election_start(libfloat_ctx_t *ctx)
{
    libfloat_node_t     *node;
    libfloat_term_t     last_term = 0;
    libfloat_entry_id_t last_id = 0;

    DEBUG(ctx, "Election starting!");
    /* First, reset the vote of everyone */
    for_every_node(ctx, node, {
        node->has_voted_for_me = 0;
    });

    /* Increment term */
    libfloat_set_current_term(ctx, ctx->persistent.term  + 1);

    /* Then, vote for myself (Democracy!) */
    libfloat_vote_for(ctx, ctx->me->id);
    ctx->me->has_voted_for_me = 1;

    /* Discard the current leader and become candidate */
    ctx->leader = NULL;
    libfloat_become_candidate(ctx);

    /* Randomize election timeout, and reset counter */
    ctx->election_timeout_rand = ctx->conf.election_timeout + ctx->rand() % ctx->conf.election_timeout;
    ctx->timeout_elapsed = 0;

    libfloat_get_last_term(ctx, &last_id, &last_term);

    /* Send a vote request to each node of the cluster */
    for_every_node(ctx, node, {
        libfloat_rpc_request_vote_t     vote = { 0 };

        if (node == ctx->me)
        {
            /* Not talking to myself */
            continue;
        }

        /* Set informations and send the request to the node */
        vote.term = ctx->persistent.term;
        vote.candidate_id = ctx->me->id;
        vote.last_log_index = last_id;
        vote.last_log_term = last_term;

        ctx->request_vote(ctx, node, &vote);
    });
}

static bool libfloat_can_i_grant_vote(libfloat_ctx_t *ctx, libfloat_rpc_request_vote_t *req)
{
    libfloat_term_t     last_term;

    /* Let's check if I have already voted for someone */
    if (ctx->persistent.voted_for != 0)
    {
        if (ctx->persistent.voted_for == req->candidate_id)
            return true;
        return false;
    }

    if (ctx->persistent.commit_index > 0 && libfloat_get_last_term(ctx, NULL, &last_term) == false)
    {
        /* We have failed to retrieve last_ferm from log */
        return false;
    }

    /* Check term match */
    if (last_term > req->last_log_term && req->last_log_index <= ctx->persistent.commit_index)
    {
        /* We have a superior term and a superior log, we can't grant our vote */
        return false;
    }

    /* Check commit index match */
    if (ctx->persistent.commit_index > req->last_log_index)
    {
        /* We have a superior log, we can't grant our vote */

        /* There's an election in progress, and we have a superior log, let's try to become leader */
        libfloat_election_start(ctx);
        return false;
    }

    /* All good! */
    return true;
}

void libfloat_request_vote_receive(libfloat_ctx_t *ctx, libfloat_rpc_request_vote_t *req, libfloat_rpc_response_vote_t *resp)
{
    libfloat_node_t                     *node = libfloat_get_node(ctx, req->candidate_id);

    resp->node = ctx->me->id;
    if (node == NULL)
    {
        /* I don't know you, go away */
        return;
    }

    if (ctx->leader != NULL && ctx->leader != node && (ctx->timeout_elapsed < ctx->election_timeout_rand) && ctx->persistent.term >= req->term)
    {
        /* We already have a leader, and it's not the node that we're currently talking to */
        resp->vote_granted = false;
        goto end;
    }

    if (ctx->persistent.term < req->term)
    {
        libfloat_set_current_term(ctx, req->term);

        libfloat_become_follower(ctx);
        ctx->timeout_elapsed = 0;
    }

    if (libfloat_can_i_grant_vote(ctx, req))
    {
        DEBUG(ctx, "Election: Granted vote to node %d", node->id);
        libfloat_vote_for(ctx, node->id);
        resp->vote_granted = true;

        ctx->leader = NULL;
        ctx->timeout_elapsed = 0;
    }
    else
    {
        resp->vote_granted = false;
    }

end:
    resp->term = ctx->persistent.term;
}

void libfloat_request_vote_response(libfloat_ctx_t *ctx, libfloat_rpc_response_vote_t *resp)
{
    libfloat_node_t     *node = libfloat_get_node(ctx, resp->node);
    int                 votes;

    if (node == NULL)
    {
        /* I don't know you, go away */
        return;
    }

    if (ctx->state != RAFT_STATE_CANDIDATE)
    {
        /* I am not a candidate, I don't care about this */
        return;
    }

    if (ctx->persistent.term < resp->term)
    {
        libfloat_set_current_term(ctx, resp->term);

        libfloat_become_follower(ctx);
        ctx->leader = NULL;
        return;
    }

    if (ctx->persistent.term != resp->term)
    {
        /* The node who voted for us would have obtained our term */
        return;
    }

    if (resp->vote_granted == false)
        return;

    node->has_voted_for_me = 1;
    votes = libfloat_get_number_of_votes_for_me(ctx);

    if (votes >= ctx->n_nodes / 2 + 1)
    {
        /* We have a majority! */
        if (ctx->leader == NULL)
        {
            DEBUG(ctx, "I have a majority of votes! Assuming leadership");
            libfloat_become_leader(ctx);
            ctx->stat.leader_election++;
        }
    }
}

void libfloat_election_resign(libfloat_ctx_t *ctx)
{
    libfloat_node_t             *node;

    if (ctx->state != RAFT_STATE_LEADER)
    {
        ERROR(ctx ,"libfloat_election_resign: not a leader, I can't resign");
        return;
    }

    libfloat_step_down(ctx);

    if (ctx->resign == NULL)
        return;

    for_every_node(ctx, node, {
        /* Skip myself */
        if (ctx->me == node)
            continue;

        ctx->resign(ctx, node);
    });
}

void libfloat_election_resign_receive(libfloat_ctx_t *ctx)
{
    ERROR(ctx, "Restart election");
    libfloat_election_start(ctx);
}