g41797/nats
Zig client for NATS Core and JetStream
afa8c15099937aa559cd8639f7ab85bf3cbc1fc8
d9142c73aedc5698beba58b3fbf2bcfe69864778
Let's try to program the prototype of the system shown in the picture above.
Key points:
Flow:
Acknowledgements:
You can find more details on NATS Example page
const DefaultConnectOpts: protocol.ConnectOpts = .{};
const STREAM: []const u8 = "ORDERS";
var js: JetStream = try JetStream.CONNECT(std.testing.allocator, DefaultConnectOpts);
defer js.DISCONNECT();
var CONF: protocol.StreamConfig = .{ .name = STREAM, .subjects = &.{ "ORDERS.*" } };
try js.CREATE(&CONF);
var submitter: JetStream = try JetStream.CONNECT(std.testing.allocator, DefaultConnectOpts);
defer submitter.DISCONNECT();
try submitter.PUBLISH("ORDERS.received", null, "order-1 ...");
const DeleteConsumer: bool = true;
const conf: ConsumerConfig = .{
.durable_name = "NEW",
.filter_subject = "ORDERS.received",
};
var NEW: Consumer = try Consumer.START(std.testing.allocator, DefaultConnectOpts, STREAM, &conf);
defer NEW.STOP(DeleteConsumer);
var order = try NEW.CONSUME(protocol.SECNS * 2);
.... process order ....
try NEW.ACK(order.?, true);
try NEW.PUBLISH("ORDERS.processed", null, "...");
const DeleteConsumer: bool = true;
const conf = .{
.durable_name = "DISPATCH",
.filter_subject = "ORDERS.processed",
};
var DISPATCH: Consumer = try Consumer.START(std.testing.allocator, DefaultConnectOpts, STREAM, &conf);
defer DISPATCH.STOP(DeleteConsumer);
order = try DISPATCH.CONSUME(protocol.SECNS * 2);
.... process order ....
try DISPATCH.ACK(order.?, false);
try DISPATCH.PUBLISH("ORDERS.completed", null, "...");
var subscriber: Subscriber = try Subscriber.SUBSCRIBE(std.testing.allocator, DefaultConnectOpts, STREAM, "ORDERS.*");
defer subscriber.UNSUBSCRIBE();
while (true) {
const pmsg = try subscriber.NEXT(protocol.SECNS * 5);
... pushes info to monitor ...
subscriber.REUSE(pmsg);
}
Add nats to build.zig.zon:
zig fetch --save=nats git+https://github.com/g41797/nats
Add nats to build.zig:
const nats = b.dependency("nats", .{
.target = target,
.optimize = optimize,
});
const lib = b.addStaticLibrary(..);
lib.root_module.addImport("nats", nats.module("nats"));
const lib_unit_tests = b.addTest(...);
lib_unit_tests.root_module.addImport("nats", nats.module("nats"));
Import nats:
const nats = @import("nats");
The project is largely inspired by the very existence of Nats client for php.