cocky-punch/raft
An implementation of the Raft Consensus Algorithm in Zig.
Leader tracking
In-memory transport
Cluster-wide coordination
Basic TCP transport
gRPC transport
Client redirection
Election timeout logic
Client command submission
Log replication
Apply committed entries
Heartbeats / ticks
State machine plumbing
Client acknowledgments, request retries and confirmations
Persistent log
Log file rotation
Persistent snapshots
Proper handling of the TimeoutNow message - leadership transfer mechanism (Raft extensions)
Read-only GET message (Raft extensions)
Fetch the master branch or a release
zig fetch --save https://github.com/cocky-punch/raft/archive/refs/heads/master.tar.gz
# or by a release version
# zig fetch --save https://github.com/cocky-punch/raft/archive/refs/tags/[RELEASE_VERSION].tar.gz
This will save a reference to the library in build.zig.zon.
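For reference, the saved dependency entry in build.zig.zon looks roughly like this (the hash below is a placeholder; zig fetch computes and writes the real one):

.dependencies = .{
    .raft = .{
        .url = "https://github.com/cocky-punch/raft/archive/refs/heads/master.tar.gz",
        .hash = "1220...", // placeholder; filled in by `zig fetch --save`
    },
},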
Then add this to build.zig:
// after the "b.installArtifact(exe)" line
const raft = b.dependency("raft", .{
    .target = target,
    .optimize = optimize,
});
exe.root_module.addImport("raft", raft.module("raft"));
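For orientation, a minimal complete build.zig wired up this way might look like the sketch below. The executable name and source path are placeholders, and the exact std.Build API varies between Zig versions:

const std = @import("std");

pub fn build(b: *std.Build) void {
    const target = b.standardTargetOptions(.{});
    const optimize = b.standardOptimizeOption(.{});

    const exe = b.addExecutable(.{
        .name = "my-raft-node", // placeholder name
        .root_source_file = b.path("src/main.zig"),
        .target = target,
        .optimize = optimize,
    });
    b.installArtifact(exe);

    const raft = b.dependency("raft", .{
        .target = target,
        .optimize = optimize,
    });
    exe.root_module.addImport("raft", raft.module("raft"));
}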
MyStateMachine#apply(...)
const std = @import("std");
const raft = @import("raft");
const Allocator = std.mem.Allocator;
// Your own state machine implementation.
// This will apply committed commands (Set/Delete) to a file
const MyStateMachine = struct {
    db_file: std.fs.File,

    // Interface function; a custom implementation for a custom storage.
    pub fn apply(self: *MyStateMachine, entry: raft.LogEntry) void {
        switch (entry.command) {
            .Set => |cmd| {
                // Simulate a persistent insert by appending to the file
                self.db_file.writer().print("SET {s} {any}\n", .{ cmd.key, cmd.value }) catch {};
            },
            .Delete => |cmd| {
                self.db_file.writer().print("DELETE {s}\n", .{cmd.key}) catch {};
            },
            //TODO
            .Update => |cmd| {
                // Optional: skip the update if the key doesn't exist yet
                if (!self.db_file_contain_key(cmd.key)) return;
                self.db_file_update_key(cmd.key, cmd.value); // overwrite
            },
            //TODO
            .Get => |cmd| {
                // find a value in the file
                _ = cmd;
            },
        }
    }

    pub fn init() !MyStateMachine {
        const f = try std.fs.cwd().createFile("raft_db.txt", .{ .read = true, .truncate = false });
        return MyStateMachine{ .db_file = f };
    }

    pub fn deinit(self: *MyStateMachine) void {
        self.db_file.close();
    }

    // mock method
    inline fn db_file_contain_key(self: *MyStateMachine, k: []const u8) bool {
        _ = self;
        _ = k;
        return true;
    }

    // mock method
    inline fn db_file_update_key(self: *MyStateMachine, k: []const u8, v: []const u8) void {
        _ = self;
        _ = k;
        _ = v;
    }
};
pub fn main() !void {
    const allocator = std.heap.page_allocator;

    // Load config
    const config = try raft.loadConfig(allocator, "node1_raft.yaml");

    // Wrap the state machine logic
    var sm_impl = try MyStateMachine.init();
    defer sm_impl.deinit();
    const sm = raft.StateMachine(MyStateMachine){
        .ctx = &sm_impl,
    };

    const Node = raft.RaftNode(MyStateMachine);
    const ClusterT = raft.Cluster(MyStateMachine);
    var cluster = ClusterT.init(allocator);
    var node = try Node.init(allocator, config.self_id, sm);
    for (config.nodes) |peer| {
        try cluster.addNodeAddress(peer.id, peer.address);
    }

    // Start the TCP server in the background
    var server = raft.RaftTcpServer(MyStateMachine).init(
        allocator,
        &node,
        &cluster,
        50, // max clients
    );
    const self_port = blk: {
        for (config.nodes) |p| {
            if (p.id == config.self_id) break :blk p.address.port;
        }
        return error.SelfNotFound;
    };
    const t = try std.Thread.spawn(.{}, raft.RaftTcpServer(MyStateMachine).start, .{ &server, self_port });
    t.detach();

    // Main loop: Raft ticking (elections, heartbeats, etc.)
    while (true) {
        try cluster.tick();
        try server.checkCommittedAcks(); // sends acks to clients if commit_index advanced
        std.time.sleep(50 * std.time.ns_per_ms);
    }
}
Put the config file for each node (node1_raft.yaml in the example above) in the corresponding directory.
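The config schema is whatever raft.loadConfig expects; judging only from the fields the example reads (config.self_id, config.nodes with id and address.port), a hypothetical node1_raft.yaml might look like this:

self_id: 1
nodes:
  - id: 1
    address:
      host: 127.0.0.1
      port: 5001
  - id: 2
    address:
      host: 127.0.0.1
      port: 5002
  - id: 3
    address:
      host: 127.0.0.1
      port: 5003

To form a cluster, run one process per node, each loading its own config file.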