-
Notifications
You must be signed in to change notification settings - Fork 41
/
Copy pathping_pong.zig
290 lines (248 loc) · 11 KB
/
ping_pong.zig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
const std = @import("std");
const sig = @import("../sig.zig");
const testing = std.testing;
const DefaultPrng = std.rand.DefaultPrng;
const KeyPair = std.crypto.sign.Ed25519.KeyPair;
const LruCache = sig.utils.lru.LruCache;
const Pubkey = sig.core.Pubkey;
const Hash = sig.core.Hash;
const Signature = sig.core.Signature;
const ThreadSafeContactInfo = sig.gossip.data.ThreadSafeContactInfo;
const SocketAddr = sig.net.SocketAddr;
const Duration = sig.time.Duration;
/// Length in bytes of the token carried by a `Ping`.
const PING_TOKEN_SIZE: usize = 32;
/// The ASCII bytes "SOLANA_PING_PONG". This prefix is concatenated in front of a
/// ping token before the result is hashed (see `Pong.init` and `PingCache.maybePing`).
const PING_PONG_HASH_PREFIX: [16]u8 = .{
'S', 'O', 'L', 'A', 'N', 'A', '_', 'P', 'I', 'N', 'G', '_', 'P', 'O', 'N', 'G',
};
/// Wrapper around a pair of `u128` values.
/// NOTE(review): not referenced anywhere in the visible part of this file — confirm it is still needed.
const U256 = struct { inner: struct { u128, u128 } };
/// A signed ping message. It carries a token that is signed by the sender; a
/// `Pong` answers it by hashing the token (see `Pong.init`).
pub const Ping = struct {
    /// Public key of the node that created and signed this ping.
    from: Pubkey,
    /// Token covered by `signature`.
    token: [PING_TOKEN_SIZE]u8,
    /// Signature over `token`, produced with the sender's keypair.
    signature: Signature,

    /// Builds a ping carrying `token`, signed with `keypair`.
    ///
    /// Returns whatever error `keypair.sign` reports if signing fails.
    pub fn init(token: [PING_TOKEN_SIZE]u8, keypair: *const KeyPair) !Ping {
        const signature = try keypair.sign(&token, null); // TODO: do we need noise?
        // Return a typed literal instead of coercing an untyped anonymous struct.
        return Ping{
            .from = Pubkey.fromPublicKey(&keypair.public_key),
            .token = token,
            .signature = .{ .data = signature.toBytes() },
        };
    }

    /// Builds a ping whose token is filled with bytes drawn from `random`.
    ///
    /// Signing failures are propagated to the caller rather than being
    /// declared `unreachable`, since `keypair.sign` is fallible.
    pub fn initRandom(random: std.rand.Random, keypair: *const KeyPair) !Ping {
        var token: [PING_TOKEN_SIZE]u8 = undefined;
        random.bytes(&token);
        return Ping.init(token, keypair);
    }

    /// Checks that `signature` is a valid signature of `token` by `from`.
    ///
    /// Returns `error.InvalidSignature` when verification reports a mismatch,
    /// or the error raised by the verification routine itself.
    pub fn verify(self: *const Ping) !void {
        if (!try self.signature.verify(self.from, &self.token)) {
            return error.InvalidSignature;
        }
    }
};
/// Response to a `Ping`: carries the hash of the prefixed ping token together
/// with a signature over that hash.
pub const Pong = struct {
    /// Public key of the node that produced the pong.
    from: Pubkey,
    /// Hash of received ping token.
    hash: Hash,
    /// Signature over `hash.data`, produced with the responder's keypair.
    signature: Signature,

    /// Answers `ping`: hashes `PING_PONG_HASH_PREFIX ++ ping.token` and signs
    /// the resulting hash with `keypair`.
    ///
    /// Any signing failure is reported as `error.SignatureError`.
    pub fn init(ping: *const Ping, keypair: *const KeyPair) !Pong {
        const preimage = PING_PONG_HASH_PREFIX ++ ping.token;
        const token_hash = Hash.generateSha256Hash(preimage[0..]);
        const signed = keypair.sign(&token_hash.data, null) catch {
            return error.SignatureError;
        };
        return Pong{
            .from = Pubkey.fromPublicKey(&keypair.public_key),
            .hash = token_hash,
            .signature = .{ .data = signed.toBytes() },
        };
    }

    /// Checks that `signature` is a valid signature of `hash.data` by `from`;
    /// returns `error.InvalidSignature` otherwise.
    pub fn verify(self: *const Pong) !void {
        const is_valid = try self.signature.verify(self.from, &self.hash.data);
        if (is_valid) return;
        return error.InvalidSignature;
    }

    /// Creates a random `Ping` with `random`/`keypair` and returns the pong
    /// answering it, signed with the same `keypair`.
    pub fn initRandom(random: std.rand.Random, keypair: *const KeyPair) !Pong {
        const random_ping = try Ping.initRandom(random, keypair);
        return Pong.init(&random_ping, keypair);
    }

    /// Byte-wise equality of `from`, `hash` and `signature`.
    pub fn eql(self: *const Pong, other: *const @This()) bool {
        if (!std.mem.eql(u8, &self.from.data, &other.from.data)) return false;
        if (!std.mem.eql(u8, &self.hash.data, &other.hash.data)) return false;
        return std.mem.eql(u8, &self.signature.data, &other.signature.data);
    }
};
/// Identifies a remote node by its public key together with a socket address.
/// This is a struct with named fields (`pubkey`, `socket_addr`), not a tuple.
/// `PingCache` uses it as the key of its `pings` and `pongs` caches and as the
/// value stored in its `pending_cache`.
pub const PubkeyAndSocketAddr = struct {
pubkey: Pubkey,
socket_addr: SocketAddr,
};
/// A `Ping` paired with a socket address. `PingCache.filterValidPeers` returns
/// these, setting `socket` to the gossip address of the peer the ping was generated for.
pub const PingAndSocketAddr = struct { ping: Ping, socket: SocketAddr };
/// Maintains records of remote nodes which have returned a valid response to a
/// ping message, and in-flight ping messages pending a pong response from the
/// remote node.
pub const PingCache = struct {
    /// Time-to-live of received pong messages.
    ttl: sig.time.Duration,
    /// Rate limit delay to generate pings for a given address.
    rate_limit_delay: sig.time.Duration,
    /// Timestamp of last ping message sent to a remote node.
    /// Used to rate limit pings to remote nodes.
    pings: LruCache(.non_locking, PubkeyAndSocketAddr, std.time.Instant),
    /// Verified pong responses from remote nodes.
    pongs: LruCache(.non_locking, PubkeyAndSocketAddr, std.time.Instant),
    /// Hash of ping tokens sent out to remote nodes,
    /// pending a pong response back.
    pending_cache: LruCache(.non_locking, Hash, PubkeyAndSocketAddr),
    /// Allocator that was passed to `init`.
    allocator: std.mem.Allocator,

    const Self = @This();

    /// Creates a cache whose three internal LRU caches each hold up to
    /// `cache_capacity` entries.
    ///
    /// Asserts that `rate_limit_delay` is at most half of `ttl`.
    /// Returns `error.OutOfMemory` if any of the caches cannot be allocated;
    /// caches that were already created are released in that case.
    pub fn init(
        allocator: std.mem.Allocator,
        ttl: sig.time.Duration,
        rate_limit_delay: sig.time.Duration,
        cache_capacity: usize,
    ) error{OutOfMemory}!Self {
        std.debug.assert(rate_limit_delay.asNanos() <= ttl.asNanos() / 2);

        // Build the caches one at a time so that a failure part-way through
        // does not leak the ones that were already allocated.
        var pings = try LruCache(.non_locking, PubkeyAndSocketAddr, std.time.Instant).init(allocator, cache_capacity);
        errdefer pings.deinit();
        var pongs = try LruCache(.non_locking, PubkeyAndSocketAddr, std.time.Instant).init(allocator, cache_capacity);
        errdefer pongs.deinit();
        const pending_cache = try LruCache(.non_locking, Hash, PubkeyAndSocketAddr).init(allocator, cache_capacity);

        return Self{
            .ttl = ttl,
            .rate_limit_delay = rate_limit_delay,
            .pings = pings,
            .pongs = pongs,
            .pending_cache = pending_cache,
            .allocator = allocator,
        };
    }

    /// Releases the three internal caches.
    pub fn deinit(self: *Self) void {
        self.pending_cache.deinit();
        self.pings.deinit();
        self.pongs.deinit();
    }

    /// Records a `Pong` if corresponding `Ping` exists in `pending_cache`.
    ///
    /// The pong is accepted only when the pending entry stored under
    /// `pong.hash` has the same public key as `pong.from` and the same address
    /// as `socket`. On acceptance the rate-limit entry and the pending entry
    /// are removed, `now` is stored as the pong time, and `true` is returned.
    /// Otherwise nothing is modified and `false` is returned.
    pub fn receviedPong(self: *Self, pong: *const Pong, socket: SocketAddr, now: std.time.Instant) bool {
        const peer_and_addr = PubkeyAndSocketAddr{ .pubkey = pong.from, .socket_addr = socket };
        if (self.pending_cache.peek(pong.hash)) |*pubkey_and_addr| {
            if (pubkey_and_addr.pubkey.equals(&pong.from) and pubkey_and_addr.socket_addr.eql(&socket)) {
                _ = self.pings.pop(peer_and_addr);
                _ = self.pongs.put(peer_and_addr, now);
                _ = self.pending_cache.pop(pong.hash);
                return true;
            }
        }
        return false;
    }

    /// Generates a new ping for `peer_and_addr` unless one was generated less
    /// than `rate_limit_delay` ago.
    ///
    /// On success the hash of the prefixed token is stored in `pending_cache`
    /// and `now` is stored as the last ping time. Returns `null` when rate
    /// limited or when the ping could not be signed.
    pub fn maybePing(
        self: *Self,
        now: std.time.Instant,
        peer_and_addr: PubkeyAndSocketAddr,
        keypair: *const KeyPair,
    ) ?Ping {
        if (self.pings.peek(peer_and_addr)) |earlier| {
            // to prevent integer overflow
            std.debug.assert(now.order(earlier) != .lt);
            const elapsed: u64 = now.since(earlier);
            if (elapsed < self.rate_limit_delay.asNanos()) {
                return null;
            }
        }

        // The token must differ between pings and must not be predictable:
        // a fixed-seed PRNG would yield the same token every call, so every
        // pending entry would share one hash key and overwrite the previous
        // one. Draw from the cryptographically secure generator instead.
        const ping = Ping.initRandom(std.crypto.random, keypair) catch return null;

        const token_with_prefix = PING_PONG_HASH_PREFIX ++ ping.token;
        const hash = Hash.generateSha256Hash(token_with_prefix[0..]);
        _ = self.pending_cache.put(hash, peer_and_addr);
        _ = self.pings.put(peer_and_addr, now);
        return ping;
    }

    /// Reports whether `peer_and_addr` has a recorded pong and, possibly, a
    /// new ping to send to it.
    ///
    /// - No recorded pong: `passes_ping_check` is `false` and a ping is
    ///   attempted via `maybePing`.
    /// - Recorded pong: `passes_ping_check` is `true`. If the pong is older
    ///   than `ttl` it is evicted (the current call still passes). If it is
    ///   older than `ttl / 8` a ping is attempted via `maybePing`.
    pub fn check(
        self: *Self,
        now: std.time.Instant,
        peer_and_addr: PubkeyAndSocketAddr,
        keypair: *const KeyPair,
    ) struct { passes_ping_check: bool, maybe_ping: ?Ping } {
        if (self.pongs.get(peer_and_addr)) |last_pong_time| {
            // to prevent integer overflow
            std.debug.assert(now.order(last_pong_time) != .lt);
            const age = now.since(last_pong_time);
            // if age is greater than time-to-live, remove pong
            if (age > self.ttl.asNanos()) {
                _ = self.pongs.pop(peer_and_addr);
            }
            // if age is greater than time-to-live divided by 8, we maybe ping again
            return .{
                .passes_ping_check = true,
                .maybe_ping = if (age > self.ttl.asNanos() / 8) self.maybePing(now, peer_and_addr, keypair) else null,
            };
        }
        return .{ .passes_ping_check = false, .maybe_ping = self.maybePing(now, peer_and_addr, keypair) };
    }

    /// Filters valid peers according to `PingCache` state and returns them along with any possible pings that need to be sent out.
    ///
    /// Peers without a gossip address are skipped. `valid_peers` holds indices
    /// into `peers`; `pings` holds each generated ping with the gossip address
    /// of the peer it was generated for.
    ///
    /// *Note*: caller is responsible for deinit `ArrayList`(s) returned!
    /// On `error.OutOfMemory` both lists are released before returning.
    pub fn filterValidPeers(
        self: *Self,
        allocator: std.mem.Allocator,
        our_keypair: KeyPair,
        peers: []ThreadSafeContactInfo,
    ) error{OutOfMemory}!struct { valid_peers: std.ArrayList(usize), pings: std.ArrayList(PingAndSocketAddr) } {
        const now = std.time.Instant.now() catch @panic("time not supported by OS!");

        var valid_peers = std.ArrayList(usize).init(allocator);
        errdefer valid_peers.deinit();
        var pings = std.ArrayList(PingAndSocketAddr).init(allocator);
        errdefer pings.deinit();

        for (peers, 0..) |*peer, i| {
            if (peer.gossip_addr) |gossip_addr| {
                const result = self.check(now, PubkeyAndSocketAddr{ .pubkey = peer.pubkey, .socket_addr = gossip_addr }, &our_keypair);
                if (result.passes_ping_check) {
                    try valid_peers.append(i);
                }
                if (result.maybe_ping) |ping| {
                    try pings.append(.{ .ping = ping, .socket = gossip_addr });
                }
            }
        }
        return .{ .valid_peers = valid_peers, .pings = pings };
    }

    /// Stores a pong for `peer`/`socket_addr` timestamped with the current instant.
    // only used in tests/benchmarks
    pub fn _setPong(self: *Self, peer: Pubkey, socket_addr: SocketAddr) void {
        _ = self.pongs.put(PubkeyAndSocketAddr{
            .pubkey = peer,
            .socket_addr = socket_addr,
        }, std.time.Instant.now() catch unreachable);
    }
};
test "PingCache works" {
    // ttl = 10_000 ns, rate limit = 1_000 ns, capacity = 1_024 entries.
    var cache = try PingCache.init(
        testing.allocator,
        Duration.fromNanos(10_000),
        Duration.fromNanos(1_000),
        1_024,
    );
    defer cache.deinit();

    var prng = std.rand.DefaultPrng.init(0);
    const peer = PubkeyAndSocketAddr{
        .pubkey = Pubkey.initRandom(prng.random()),
        .socket_addr = SocketAddr.UNSPECIFIED,
    };

    // First ping: nothing has been sent to `peer` yet.
    const first_instant = try std.time.Instant.now();
    var keypair = try KeyPair.create(null);
    const first_ping = cache.maybePing(first_instant, peer, &keypair);

    // No pong was recorded, so the peer must not pass the check; the test also
    // expects a fresh ping to be produced at the later instant.
    const second_instant = try std.time.Instant.now();
    const outcome = cache.check(second_instant, peer, &keypair);
    try testing.expect(!outcome.passes_ping_check);
    try testing.expect(outcome.maybe_ping != null);

    // Filtering an empty peer list must succeed and hand back lists we own.
    var filtered = try cache.filterValidPeers(testing.allocator, keypair, &[_]ThreadSafeContactInfo{});
    defer filtered.valid_peers.deinit();
    defer filtered.pings.deinit();

    try testing.expect(first_ping != null);
}
test "ping signatures match rust" {
    // Fixed secret key so the resulting signature can be compared against a
    // reference value.
    const secret_key = try std.crypto.sign.Ed25519.SecretKey.fromBytes([_]u8{
        125, 52,  162, 97,  231, 139, 58,  13,  185, 212, 57,  142, 136, 12,  21,  127, 228, 71,
        115, 126, 138, 52,  102, 69,  103, 185, 45,  255, 132, 222, 243, 138, 25,  117, 21,  11,
        61,  170, 38,  18,  67,  196, 242, 219, 50,  154, 4,   254, 79,  227, 253, 229, 188, 230,
        121, 12,  227, 248, 199, 156, 253, 144, 175, 67,
    });
    const keypair = try KeyPair.fromSecretKey(secret_key);

    // Propagate a signing failure as a test error instead of `unreachable`.
    const ping = try Ping.init([_]u8{0} ** PING_TOKEN_SIZE, &keypair);

    const rust_sig = [_]u8{ 52, 171, 91, 205, 183, 211, 38, 219, 53, 155, 163, 118, 202, 169, 15, 237, 147, 87, 209, 20, 6, 115, 24, 114, 196, 41, 217, 55, 123, 245, 35, 138, 126, 47, 233, 182, 90, 206, 13, 173, 212, 107, 94, 120, 167, 254, 14, 11, 253, 199, 158, 4, 203, 42, 173, 143, 214, 209, 132, 158, 223, 62, 214, 11 };

    // expectEqualSlices reports the differing bytes on mismatch.
    try testing.expectEqualSlices(u8, &rust_sig, &ping.signature.data);
    try ping.verify();
}