Skip to content

Commit

Permalink
fix(prometheus): remove httpz again and fix prometheus metrics (#555)
Browse files Browse the repository at this point in the history
* Revert "fix(prometheus): prometheus cannot connect to sig's metrics (#531)"

This reverts commit 3e0043e.

* prometheus: use HTTP 1.1 instead of 1.0
  • Loading branch information
Rexicon226 authored Feb 11, 2025
1 parent a98e4e0 commit 3502333
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 43 deletions.
8 changes: 0 additions & 8 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ pub fn build(b: *Build) void {
const zig_cli_dep = b.dependency("zig-cli", dep_opts);
const zig_cli_mod = zig_cli_dep.module("zig-cli");

const httpz_dep = b.dependency("httpz", dep_opts);
const httpz_mod = httpz_dep.module("httpz");

const zstd_dep = b.dependency("zstd", dep_opts);
const zstd_mod = zstd_dep.module("zstd");

Expand Down Expand Up @@ -80,7 +77,6 @@ pub fn build(b: *Build) void {
sig_mod.addImport("zig-network", zig_network_mod);
sig_mod.addImport("base58", base58_mod);
sig_mod.addImport("zig-cli", zig_cli_mod);
sig_mod.addImport("httpz", httpz_mod);
sig_mod.addImport("zstd", zstd_mod);
switch (blockstore_db) {
.rocksdb => sig_mod.addImport("rocksdb", rocksdb_mod),
Expand Down Expand Up @@ -108,7 +104,6 @@ pub fn build(b: *Build) void {

sig_exe.root_module.addImport("xev", xev_mod);
sig_exe.root_module.addImport("base58", base58_mod);
sig_exe.root_module.addImport("httpz", httpz_mod);
sig_exe.root_module.addImport("zig-cli", zig_cli_mod);
sig_exe.root_module.addImport("zig-network", zig_network_mod);
sig_exe.root_module.addImport("zstd", zstd_mod);
Expand Down Expand Up @@ -148,7 +143,6 @@ pub fn build(b: *Build) void {

unit_tests_exe.root_module.addImport("xev", xev_mod);
unit_tests_exe.root_module.addImport("base58", base58_mod);
unit_tests_exe.root_module.addImport("httpz", httpz_mod);
unit_tests_exe.root_module.addImport("zig-network", zig_network_mod);
unit_tests_exe.root_module.addImport("zstd", zstd_mod);
switch (blockstore_db) {
Expand Down Expand Up @@ -184,7 +178,6 @@ pub fn build(b: *Build) void {
fuzz_exe.root_module.addImport("xev", xev_mod);
fuzz_exe.root_module.addImport("base58", base58_mod);
fuzz_exe.root_module.addImport("zig-network", zig_network_mod);
fuzz_exe.root_module.addImport("httpz", httpz_mod);
fuzz_exe.root_module.addImport("zstd", zstd_mod);
switch (blockstore_db) {
.rocksdb => fuzz_exe.root_module.addImport("rocksdb", rocksdb_mod),
Expand Down Expand Up @@ -220,7 +213,6 @@ pub fn build(b: *Build) void {
benchmark_exe.root_module.addImport("xev", xev_mod);
benchmark_exe.root_module.addImport("base58", base58_mod);
benchmark_exe.root_module.addImport("zig-network", zig_network_mod);
benchmark_exe.root_module.addImport("httpz", httpz_mod);
benchmark_exe.root_module.addImport("zstd", zstd_mod);
benchmark_exe.root_module.addImport("prettytable", pretty_table_mod);
switch (blockstore_db) {
Expand Down
4 changes: 0 additions & 4 deletions build.zig.zon
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
.url = "https://github.com/sam701/zig-cli/archive/8c7a798c0f7fa0358d7ab41106fc872fca4cd995.tar.gz",
.hash = "1220c008492d9460c3be2b209600a948181e6efb3bf0d79a1633def499632e708f4b",
},
.httpz = .{
.url = "https://github.com/karlseguin/http.zig/archive/79dad0f0cc652830cd8e49bf3e73aa77155ad4b2.tar.gz",
.hash = "1220b8a918dfcee4fc8326ec337776e2ffd3029511c35f6b96d10aa7be98ca2faf99",
},
.zstd = .{
.url = "git+https://github.com/Syndica/zstd.zig#5095f011c1183aa67d696172795440d6a33732c9",
.hash = "122030ebe280b73693963a67ed656226a67b7f00a0a05665155da00c9fcdee90de88",
Expand Down
91 changes: 60 additions & 31 deletions src/prometheus/http.zig
Original file line number Diff line number Diff line change
@@ -1,48 +1,77 @@
const builtin = @import("builtin");
const std = @import("std");
const httpz = @import("httpz");

const Registry = @import("registry.zig").Registry;
const globalRegistry = @import("registry.zig").globalRegistry;
const DEFAULT_BUCKETS = @import("histogram.zig").DEFAULT_BUCKETS;

/// Initializes the global registry. Returns error if registry was already initialized.
/// Spawns a thread to serve the metrics over http on the given port.
pub fn spawnMetrics(
gpa_allocator: std.mem.Allocator,
port: u16,
) !std.Thread {
const registry = globalRegistry();
return std.Thread.spawn(.{}, servePrometheus, .{ gpa_allocator, registry, port });
}

pub fn servePrometheus(
allocator: std.mem.Allocator,
registry: *Registry(.{}),
port: u16,
) !void {
const endpoint = MetricsEndpoint{
.allocator = allocator,
.registry = registry,
};
var server = try httpz.ServerCtx(*const MetricsEndpoint, *const MetricsEndpoint).init(
allocator,
.{ .port = port, .address = "0.0.0.0" },
&endpoint,
);
var router = server.router();
router.get("/metrics", getMetrics);
return server.listen();
}
const our_ip = std.net.Address.initIp4(.{ 0, 0, 0, 0 }, port);
var tcp = try our_ip.listen(.{
.force_nonblocking = true,
.reuse_address = true,
});
defer tcp.deinit();

const MetricsEndpoint = struct {
allocator: std.mem.Allocator,
registry: *Registry(.{}),
};
while (true) {
const conn = tcp.accept() catch |err| switch (err) {
error.WouldBlock => continue,
else => |e| return e,
};

/// Initializes the global registry. Returns error if registry was already initialized.
/// Spawns a thread to serve the metrics over http on the given port.
pub fn spawnMetrics(gpa_allocator: std.mem.Allocator, port: u16) !std.Thread {
const registry = globalRegistry();
return std.Thread.spawn(.{}, servePrometheus, .{ gpa_allocator, registry, port });
}
// TODO: unify this with the code for the RPC server
if (comptime builtin.target.isDarwin()) set_flags: {
const FlagsInt = @typeInfo(std.posix.O).Struct.backing_integer.?;
var flags_int: FlagsInt =
@intCast(try std.posix.fcntl(conn.stream.handle, std.posix.F.GETFL, 0));
const flags: *std.posix.O =
std.mem.bytesAsValue(std.posix.O, std.mem.asBytes(&flags_int));
if (flags.NONBLOCK == false and flags.CLOEXEC == true) break :set_flags;
flags.NONBLOCK = false;
flags.CLOEXEC = true;
_ = try std.posix.fcntl(conn.stream.handle, std.posix.F.SETFL, flags_int);
}

pub fn getMetrics(
self: *const MetricsEndpoint,
_: *httpz.Request,
response: *httpz.Response,
) !void {
response.content_type = .TEXT; // expected by prometheus
try self.registry.write(self.allocator, response.writer());
var read_buffer: [4096]u8 = undefined;
var http_server = std.http.Server.init(conn, &read_buffer);
var request = http_server.receiveHead() catch continue;

if (request.head.method != .GET or
!std.mem.eql(u8, request.head.target, "/metrics") //
) {
try request.respond("", .{
.status = .not_found,
.keep_alive = false,
});
continue;
}

var send_buffer: [4096]u8 = undefined;
var response = request.respondStreaming(.{
.send_buffer = &send_buffer,
.respond_options = .{
.status = .ok,
.keep_alive = true,
.extra_headers = &.{.{ .name = "Content-Type", .value = "text/plain" }},
},
});
try registry.write(allocator, response.writer());
try response.end();
}
}

/// Runs a test prometheus endpoint with dummy data.
Expand Down

0 comments on commit 3502333

Please sign in to comment.