deckarep/zigrdkafka
This is librdkafka, hijacked and under the command and control of Zig, an easy Kafka client to tame your consumer and producer cluster.
All your codebase are belong to us.
This project requires Zig 0.13 and is currently developed on macos-aarch64.
I am building this Zig-flavored wrapper to librdkafka organically and as a discovery project. Therefore, I'm skipping proper unit testing for a while, as the API is likely to change drastically. Additionally, I haven't yet put a lot of thought into how I want to handle errors. Obviously, I will use Zig error sets, but I need to figure out how best to map the librdkafka C error codes to those error sets.
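To give a flavor of one possible direction (nothing is settled), mapping raw rd_kafka_resp_err_t values onto a Zig error set might look roughly like the sketch below. The KafkaError set and the errorFromCode helper are placeholders, not the project's actual API; the numeric codes in the comments come from rdkafka.h and would normally be referenced through the generated C bindings rather than hard-coded.

// Hypothetical error set; the real mapping is still an open design question.
pub const KafkaError = error{
    Timeout,
    Transport,
    UnknownTopicOrPartition,
    Unexpected,
};

// Translate a raw rd_kafka_resp_err_t value into success or a Zig error.
pub fn errorFromCode(code: i32) KafkaError!void {
    return switch (code) {
        0 => {}, // RD_KAFKA_RESP_ERR_NO_ERROR
        -185 => KafkaError.Timeout, // RD_KAFKA_RESP_ERR__TIMED_OUT
        -195 => KafkaError.Transport, // RD_KAFKA_RESP_ERR__TRANSPORT
        3 => KafkaError.UnknownTopicOrPartition, // RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART
        else => KafkaError.Unexpected,
    };
}

With that caveat in mind, here is what a basic consumer looks like today: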
const std = @import("std");
const zrdk = @import("zigrdkafka");

pub fn main() !void {
    // Create a fresh consumer configuration.
    const conf = try zrdk.Conf.init();
    try conf.set("bootstrap.servers", "localhost:9092");
    try conf.set("group.id", "zig-cli-consumer");

    // Create a new consumer.
    const consumer = try zrdk.Consumer.init(conf);
    defer consumer.deinit();
    defer consumer.close();

    // Define topics of interest to consume.
    const topics = [_][]const u8{ "topic.foo", "topic.bar" };
    try consumer.subscribe(&topics);

    // Consume a handful of messages, then stop.
    var count: usize = 0;
    while (count < 10) {
        const msg = consumer.poll(100);
        defer msg.deinit();

        if (!msg.isOK()) {
            std.log.warn("either message was empty or it had an error...", .{});
            // Deal with it here.
            continue;
        }

        // Log the message.
        std.log.info("message => {s}\ntopic => {s}\npartition => {d}\noffset => {d}\n", .{
            msg.payloadAsString(),
            msg.topic().name(),
            msg.partition(),
            msg.offset(),
        });

        count += 1;
    }

    std.log.info("Consumer loop ended!", .{});
    std.log.info("Yep, it's really that easy!", .{});
}
Proper error-handling (not started) ⚠️
Conf + TopicConf ✅
Uuid ✅
Topic ✅
TopicPartition ✅
TopicPartitionList (in-progress)
Headers collection (in-progress)
Message (in-progress)
Raw Consumer (in-progress, but works!)
Raw Producer (in-progress, but works!)
Full support for librdkafka callbacks (in-progress; see the callback sketch after this list)
logging callback
delivery report messages callback
rebalance callback
offset commits callback
stats callback
consume callback
background event callback
throttle callback
set socket callback
set connect callback
close socket callback
open callback
various others
Admin client (not-started)
Variations on Consumers/Producers (such as high-level consumer) (not-started)
etc., as there's a lot more to librdkafka than meets the eye (just like Transformers)!
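Callbacks are where most of the remaining C ugliness lives. The sketch below shows one way a wrapper could keep callconv(.C) out of user code: register a single C-ABI trampoline with librdkafka and forward into a plain Zig function. The extern declaration mirrors rd_kafka_conf_set_log_cb from rdkafka.h; onLog and setLogCallback are hypothetical names, not the project's actual API.

const std = @import("std");

// Opaque handles for the librdkafka types this sketch touches.
const RdKafka = opaque {};
const RdKafkaConf = opaque {};

// Mirrors rd_kafka_conf_set_log_cb() from rdkafka.h.
extern fn rd_kafka_conf_set_log_cb(
    conf: *RdKafkaConf,
    log_cb: ?*const fn (?*const RdKafka, c_int, [*:0]const u8, [*:0]const u8) callconv(.C) void,
) void;

// What a user of the wrapper would write: no C types, no callconv(.C).
fn onLog(level: i32, facility: []const u8, message: []const u8) void {
    std.log.info("rdkafka [{d}] {s}: {s}", .{ level, facility, message });
}

// The C-ABI trampoline the wrapper keeps internal.
fn logTrampoline(_: ?*const RdKafka, level: c_int, fac: [*:0]const u8, buf: [*:0]const u8) callconv(.C) void {
    onLog(@intCast(level), std.mem.span(fac), std.mem.span(buf));
}

// Hypothetical registration helper the wrapper might expose.
pub fn setLogCallback(conf: *RdKafkaConf) void {
    rd_kafka_conf_set_log_cb(conf, &logTrampoline);
}

To build the bundled examples and try the consumer and producer out: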
# 1. Clone this repo and pull down any submodules.
git clone <this repo>
git submodule update --init --recursive
# 2. Install deps for your operating system.
# For macOS
brew install openssl curl zlib zstd cyrus-sasl
# 3. Run cmake configuration for librdkafka
# This generates a config.h header file.
cd lib/librdkafka
./configure
cd ../../
# 4. Build all example projects.
zig build
# 5. Each of the binaries is now located in: zig-out/bin
# Example:
# a. Prestep: Ensure your Kafka cluster is running.
# b. Navigate to the freshly built binaries.
cd zig-out/bin
# c. Run the producer in one window.
./producer
# d. Run the consumer in another window.
./consumer
Provide a 1st class Zig-flavored wrapper around librdkafka
Consider auto-generation down the road, at least for some things.
Design choice: As it stands, all Zig structs are value types and immutable; this may change for all or some structs.
All C ugliness should be hidden away (see the sketch below):
c.rd_kafka_conf_set => z.Conf.set (something like this)
[]const u8, or null-terminated when required: [:0]const u8
no [*c] tags anywhere (in-progress), internal is ok.
new or create => init for Zig.
_destroy() => .deinit() for Zig.
no callconv(.C) in the public API.
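To make that concrete, here is a minimal sketch of the wrapping pattern these rules describe, not the project's actual source. The extern declarations mirror librdkafka's C API (rd_kafka_conf_new, rd_kafka_conf_destroy, rd_kafka_conf_set); the Conf struct, error names, and everything else are illustrative assumptions.

// Opaque handle and the few C functions this sketch needs, bound by hand.
const RdKafkaConf = opaque {};
extern fn rd_kafka_conf_new() ?*RdKafkaConf;
extern fn rd_kafka_conf_destroy(conf: *RdKafkaConf) void;
extern fn rd_kafka_conf_set(
    conf: *RdKafkaConf,
    name: [*:0]const u8,
    value: [*:0]const u8,
    errstr: [*]u8,
    errstr_size: usize,
) c_int;

pub const Conf = struct {
    handle: *RdKafkaConf,

    // rd_kafka_conf_new => Conf.init
    pub fn init() error{ConfCreateFailed}!Conf {
        const h = rd_kafka_conf_new() orelse return error.ConfCreateFailed;
        return .{ .handle = h };
    }

    // rd_kafka_conf_destroy => Conf.deinit
    pub fn deinit(self: Conf) void {
        rd_kafka_conf_destroy(self.handle);
    }

    // Takes null-terminated Zig slices; no [*c] pointers in the public signature.
    pub fn set(self: Conf, name: [:0]const u8, value: [:0]const u8) error{ConfSetFailed}!void {
        var errstr: [512]u8 = undefined;
        // RD_KAFKA_CONF_OK is 0 in rdkafka.h.
        if (rd_kafka_conf_set(self.handle, name.ptr, value.ptr, &errstr, errstr.len) != 0) {
            return error.ConfSetFailed;
        }
    }
};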