diff --git a/Cargo.lock b/Cargo.lock index e12551cd..e34fc68f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -62,9 +62,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "1.0.5" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c378d78423fdad8089616f827526ee33c19f2fddbd5de1629152c9593ba4783" +checksum = "ea5d730647d4fadd988536d06fecce94b7b4f2a7efdae548f1cf4b63205518ab" dependencies = [ "memchr", ] @@ -80,9 +80,9 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b84bf0a05bbb2a83e5eb6fa36bb6e87baa08193c35ff52bbf6b38d8af2890e46" +checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" [[package]] name = "anyhow" @@ -121,7 +121,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -132,7 +132,7 @@ checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -216,7 +216,7 @@ checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#983596a457e7b7d8a1850c5bf70312b2537c57c8" +source = "git+https://github.com/helium/proto?branch=master#d94ed4b4046263eb78003d484d94ad3cbff7a55f" dependencies = [ "base64", "byteorder", @@ -226,7 +226,7 @@ dependencies = [ "rand_chacha", "rust_decimal", "serde", - "sha2 0.10.7", + "sha2 0.10.8", "thiserror", ] @@ -249,7 +249,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.33", + "syn 2.0.38", "which", ] @@ -352,7 +352,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f5353f36341f7451062466f0b755b96ac3a9547e4d7f6b70d603fc721a7d7896" dependencies = [ - "sha2 0.10.7", + "sha2 0.10.8", "tinyvec", ] @@ -380,9 +380,9 @@ dependencies = [ [[package]] name = "byteorder" -version = "1.4.3" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" @@ -416,9 +416,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.30" +version = "0.4.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "defd4e7873dbddba6c7c91e199c7fcb946abc4a6a4ac3195400bcfb01b5de877" +checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" dependencies = [ "num-traits", ] @@ -436,9 +436,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.3" +version = "4.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84ed82781cea27b43c9b106a979fe450a13a31aab0500595fb3fc06616de08e6" +checksum = "d04704f56c2cde07f43e8e2c154b43f216dc5c92fc98ada720177362f953b956" dependencies = [ "clap_builder", "clap_derive", @@ -446,9 +446,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.2" +version = "4.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bb9faaa7c2ef94b2743a21f5a29e6f0010dff4caa69ac8e9d6cf8b6fa74da08" +checksum = "0e231faeaca65ebd1ea3c737966bf858971cd38c3849107aa3ea7de90a804e45" dependencies = [ "anstyle", "clap_lex", @@ -463,7 +463,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -617,7 +617,7 @@ dependencies = [ "serde", "serde_derive", "serialport", - "sha2 0.10.7", + "sha2 0.10.8", "thiserror", ] @@ -685,9 +685,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "136526188508e25c6fef639d7927dfb3e0e3084488bf202267829cf7fc23dbdd" +checksum = "add4f07d43996f76ef320709726a556a9d4f965d9410d8d0271132d2f8293480" dependencies = [ "errno-dragonfly", "libc", @@ -814,7 +814,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -872,7 +872,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "sha2 0.9.9", + "sha2 0.10.8", "signature", "thiserror", "time", @@ -968,9 +968,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.14.0" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" +checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" [[package]] name = "heck" @@ -998,7 +998,7 @@ dependencies = [ "rand_core", "rsa", "serde", - "sha2 0.10.7", + "sha2 0.10.8", "signature", "thiserror", "tss2", @@ -1007,7 +1007,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#983596a457e7b7d8a1850c5bf70312b2537c57c8" +source = "git+https://github.com/helium/proto?branch=master#d94ed4b4046263eb78003d484d94ad3cbff7a55f" dependencies = [ "bytes", "prost", @@ -1020,9 +1020,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" +checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" [[package]] name = "hmac" @@ -1148,12 +1148,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.0.0" +version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d" +checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" dependencies = [ "equivalent", - "hashbrown 0.14.0", + "hashbrown 0.14.1", ] [[package]] @@ -1221,9 +1221,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.148" +version = "0.2.149" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b" +checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" [[package]] name = "libloading" @@ -1237,9 +1237,9 @@ dependencies = [ [[package]] name = "libm" -version = "0.2.7" +version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7012b1bbb0719e1097c47611d3898568c546d597c2e74d66f6087edd5233ff4" +checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" [[package]] name = "linux-raw-sys" @@ -1288,9 +1288,9 @@ dependencies = [ [[package]] name = "matchit" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed1202b2a6f884ae56f04cff409ab315c5ce26b5e58d7412e484f01fd52f52ef" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" [[package]] name = "md5" @@ -1300,9 +1300,9 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" [[package]] name = "memchr" -version = "2.6.3" +version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" +checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" [[package]] name = "mime" @@ -1414,9 +1414,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2" +checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" dependencies = [ "autocfg", "libm", @@ -1440,7 +1440,7 @@ dependencies = [ "proc-macro-crate 1.3.1", "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -1496,9 +1496,9 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" [[package]] name = "pest" -version = "2.7.3" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7a4d085fd991ac8d5b05a147b437791b4260b76326baf0fc60cf7c9c27ecd33" +checksum = "c022f1e7b65d6a24c0dbbd5fb344c66881bc01f3e5ae74a1c8100f2f985d98a4" dependencies = [ "memchr", "thiserror", @@ -1512,7 +1512,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" dependencies = [ "fixedbitset", - "indexmap 2.0.0", + "indexmap 2.0.2", ] [[package]] @@ -1532,7 +1532,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -1560,7 +1560,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" dependencies = [ "proc-macro2", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -1584,18 +1584,18 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.67" +version = "1.0.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328" +checksum = "5b1106fec09662ec6dd98ccac0f81cef56984d0b49f75c92d8cbad76e20c005c" dependencies = [ "unicode-ident", ] [[package]] name = "prost" -version = "0.12.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa8473a65b88506c106c28ae905ca4a2b83a2993640467a41bb3080627ddfd2c" +checksum = "f4fdd22f3b9c31b53c060df4a0613a1c7f062d4115a2b984dd15b1858f7e340d" dependencies = [ "bytes", "prost-derive", @@ -1603,9 +1603,9 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.12.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30d3e647e9eb04ddfef78dfee2d5b3fefdf94821c84b710a3d8ebc89ede8b164" +checksum = "8bdf592881d821b83d471f8af290226c8d51402259e9bb5be7f9f8bdebbb11ac" dependencies = [ "bytes", "heck", @@ -1618,29 +1618,29 @@ dependencies = [ "prost", "prost-types", "regex", - "syn 2.0.33", + "syn 2.0.38", "tempfile", "which", ] [[package]] name = "prost-derive" -version = "0.12.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56075c27b20ae524d00f247b8a4dc333e5784f889fe63099f8e626bc8d73486c" +checksum = "265baba7fabd416cf5078179f7d2cbeca4ce7a9041111900675ea7c4cb8a4c32" dependencies = [ "anyhow", "itertools", "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] name = "prost-types" -version = "0.12.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cebe0a918c97f86c217b0f76fd754e966f8b9f41595095cf7d74cb4e59d730f6" +checksum = "e081b29f63d83a4bc75cfc9f3fe424f9156cf92d8a4f0c9407cce9a1b67327cf" dependencies = [ "prost", ] @@ -1721,9 +1721,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.5" +version = "1.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" +checksum = "ebee201405406dbf528b8b672104ae6d6d63e6d118cb10e4d51abbc7b58044ff" dependencies = [ "aho-corasick", "memchr", @@ -1733,9 +1733,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" +checksum = "59b23e92ee4318893fa3fe3e6fb365258efbfe6ac6ab30f090cdcbb7aa37efa9" dependencies = [ "aho-corasick", "memchr", @@ -1750,9 +1750,9 @@ checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" [[package]] name = "rend" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "581008d2099240d37fb08d77ad713bcaec2c4d89d50b5b21a8bb1996bbab68ab" +checksum = "a2571463863a6bd50c32f94402933f03457a3fbaf697a707c5be741e459f08fd" dependencies = [ "bytecheck", ] @@ -1854,9 +1854,9 @@ checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" [[package]] name = "rustix" -version = "0.37.23" +version = "0.37.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d69718bf81c6127a49dc64e44a742e8bb9213c0ff8869a22c308f84c1d4ab06" +checksum = "4279d76516df406a8bd37e7dff53fd37d1a093f997a3c34a5c21658c126db06d" dependencies = [ "bitflags 1.3.2", "errno", @@ -1956,14 +1956,14 @@ checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] name = "serde_json" -version = "1.0.106" +version = "1.0.107" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cc66a619ed80bf7a0f6b17dd063a84b88f6dea1813737cf469aef1d081142c2" +checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" dependencies = [ "itoa", "ryu", @@ -1978,7 +1978,7 @@ checksum = "8725e1dfadb3a50f7e5ce0b1a540466f6ed3fe7a0fca2ac2b8b831d31316bd00" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -2025,9 +2025,9 @@ dependencies = [ [[package]] name = "sha2" -version = "0.10.7" +version = "0.10.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "479fb9d862239e610720565ca91403019f2f00410f1864c5aa7479b950a76ed8" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ "cfg-if", "cpufeatures", @@ -2036,9 +2036,9 @@ dependencies = [ [[package]] name = "sharded-slab" -version = "0.1.4" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" dependencies = [ "lazy_static", ] @@ -2097,9 +2097,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" +checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" [[package]] name = "socket2" @@ -2146,9 +2146,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.33" +version = "2.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9caece70c63bfba29ec2fed841a09851b14a235c60010fa4de58089b6c025668" +checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" dependencies = [ "proc-macro2", "quote", @@ -2183,22 +2183,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.48" +version = "1.0.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7" +checksum = "1177e8c6d7ede7afde3585fd2513e611227efd6481bd78d2e82ba1ce16557ed4" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.48" +version = "1.0.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" +checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -2213,9 +2213,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17f6bb557fd245c28e6411aa56b6403c689ad95061f50e4be16c274e70a17e48" +checksum = "426f806f4089c493dcac0d24c29c01e2c38baf8e30f1b716ee37e83d200b18fe" dependencies = [ "deranged", "itoa", @@ -2226,15 +2226,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a942f44339478ef67935ab2bbaec2fb0322496cf3cbe84b261e06ac3814c572" +checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20" dependencies = [ "time-core", ] @@ -2289,7 +2289,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -2305,9 +2305,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.8" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d" dependencies = [ "bytes", "futures-core", @@ -2338,16 +2338,16 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.0.0", + "indexmap 2.0.2", "toml_datetime", "winnow", ] [[package]] name = "tonic" -version = "0.10.0" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5469afaf78a11265c343a88969045c1568aa8ecc6c787dbf756e92e70f199861" +checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" dependencies = [ "async-stream", "async-trait", @@ -2372,15 +2372,15 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.10.0" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b477abbe1d18c0b08f56cd01d1bc288668c5b5cfd19b2ae1886bbf599c546f1" +checksum = "9d021fc044c18582b9a2408cd0dd05b1596e3ecdb5c4df822bb0183545683889" dependencies = [ "prettyplease", "proc-macro2", "prost-build", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -2446,7 +2446,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -2493,9 +2493,9 @@ dependencies = [ [[package]] name = "typenum" -version = "1.16.0" +version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" +checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" [[package]] name = "ucd-trie" @@ -2643,9 +2643,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "winnow" -version = "0.5.15" +version = "0.5.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c2e3184b9c4e92ad5167ca73039d0c42476302ab603e2fec4487511f38ccefc" +checksum = "037711d82167854aff2018dfd193aa0fef5370f456732f0d5a0c59b0f1b4b907" dependencies = [ "memchr", ] @@ -2676,5 +2676,5 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] diff --git a/src/api/mod.rs b/src/api/mod.rs index 60f5e7fe..4509fb13 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -11,7 +11,7 @@ pub use helium_proto::{ }; pub use server::LocalServer; -use crate::{Error, Result}; +use crate::{Error, PublicKey, Result}; impl TryFrom for crate::packet_router::RouterStatus { type Error = Error; @@ -20,7 +20,7 @@ impl TryFrom for crate::packet_router::RouterStatus { Ok(Self { uri: http::Uri::from_str(&value.uri)?, connected: value.connected, - session_key: helium_crypto::PublicKey::try_from(value.session_key).ok(), + session_key: PublicKey::try_from(value.session_key).ok(), }) } } diff --git a/src/beaconer.rs b/src/beaconer.rs index 50d30242..3e3a77c1 100644 --- a/src/beaconer.rs +++ b/src/beaconer.rs @@ -1,16 +1,14 @@ //! This module provides proof-of-coverage (PoC) beaconing support. - use crate::{ - error::DecodeError, gateway::{self, BeaconResp}, message_cache::MessageCache, region_watcher, - service::{entropy::EntropyService, poc::PocIotService}, + service::{entropy::EntropyService, poc::PocIotService, Reconnect}, settings::Settings, - sign, sync, Base64, Keypair, PacketUp, PublicKey, RegionParams, Result, + sync, Base64, DecodeError, PacketUp, PublicKey, RegionParams, Result, }; use futures::TryFutureExt; -use helium_proto::{services::poc_lora, Message as ProtoMessage}; +use helium_proto::services::poc_lora::{self, lora_stream_response_v1}; use http::Uri; use std::sync::Arc; use time::{Duration, Instant}; @@ -38,23 +36,24 @@ impl MessageSender { pub struct Beaconer { /// Beacon/Witness handling disabled disabled: bool, - /// keypair to sign reports with - keypair: Arc, /// gateway packet transmit message queue transmit: gateway::MessageSender, /// Our receive queue. messages: MessageReceiver, + /// Service to deliver PoC reports to + service: PocIotService, + /// Service reconnect trigger + reconnect: Reconnect, /// Region change queue region_watch: region_watcher::MessageReceiver, /// Beacon interval interval: Duration, - // Time next beacon attempt is o be made + // Time next beacon attempt is to be made next_beacon_time: Instant, /// Last seen beacons last_seen: MessageCache>, /// Use for channel plan and FR parameters - region_params: RegionParams, - poc_ingest_uri: Uri, + region_params: Arc, entropy_uri: Uri, } @@ -66,14 +65,13 @@ impl Beaconer { transmit: gateway::MessageSender, ) -> Self { let interval = Duration::seconds(settings.poc.interval as i64); - let poc_ingest_uri = settings.poc.ingest_uri.clone(); let entropy_uri = settings.poc.entropy_uri.clone(); - let keypair = settings.keypair.clone(); - let region_params = region_watcher::current_value(®ion_watch); + let service = PocIotService::new(settings.poc.ingest_uri.clone(), settings.keypair.clone()); + let reconnect = Reconnect::default(); + let region_params = Arc::new(region_watcher::current_value(®ion_watch)); let disabled = settings.poc.disable; Self { - keypair, transmit, messages, region_watch, @@ -84,9 +82,10 @@ impl Beaconer { // cause the beacon to not occur next_beacon_time: Instant::now() + interval, region_params, - poc_ingest_uri, + service, entropy_uri, disabled, + reconnect, } } @@ -94,6 +93,7 @@ impl Beaconer { info!( beacon_interval = self.interval.whole_seconds(), disabled = self.disabled, + uri = %self.service.uri, "starting" ); @@ -128,7 +128,7 @@ impl Beaconer { if self.region_params.params.is_empty() { // Calculate a random but deterministic time offset // for this hotspot's beacons - let offset = mk_beacon_offset(self.keypair.public_key(), self.interval); + let offset = mk_beacon_offset(self.service.gateway_key(), self.interval); // Get a delay for the first beacon based on the // deterministic offset and the timestamp in the // first region params. If there's an error @@ -138,32 +138,42 @@ impl Beaconer { info!(delay = delay.whole_seconds(), "first beacon"); self.next_beacon_time = Instant::now() + delay; } - self.region_params = region_watcher::current_value(&self.region_watch); + self.region_params = Arc::new(region_watcher::current_value(&self.region_watch)); info!(region = RegionParams::to_string(&self.region_params), "region updated"); }, Err(_) => warn!("region watch disconnected"), - } - + }, + service_message = self.service.recv() => match service_message { + Ok(lora_stream_response_v1::Response::Offer(message)) => { + let session_result = self.handle_session_offer(message).await; + if session_result.is_ok() { + // (Re)set retry count to max to maximize time to + // next disconnect from service + self.reconnect.retry_count = self.reconnect.max_retries; + } else { + // Failed to handle session offer, disconnect + self.service.disconnect(); + } + self.reconnect.update_next_time(session_result.is_err()); + }, + Err(err) => { + warn!(?err, "ingest error"); + self.reconnect.update_next_time(true); + }, + }, + _ = self.reconnect.wait() => { + let reconnect_result = self.handle_reconnect().await; + self.reconnect.update_next_time(reconnect_result.is_err()); + }, } } } - pub async fn mk_beacon(&self) -> Result { - self.region_params.check_valid()?; - - let mut entropy_service = EntropyService::new(self.entropy_uri.clone()); - let remote_entropy = entropy_service.get_entropy().await?; - let local_entropy = beacon::Entropy::local()?; - - let beacon = beacon::Beacon::new(remote_entropy, local_entropy, &self.region_params)?; - Ok(beacon) - } - /// Sends a gateway-to-gateway packet. /// /// See [`gateway::MessageSender::transmit_beacon`] - pub async fn send_beacon(&self, beacon: beacon::Beacon) -> Result { + pub async fn send_beacon(&mut self, beacon: beacon::Beacon) -> Result { let beacon_id = beacon .beacon_data() .map(|data| data.to_b64()) @@ -178,59 +188,43 @@ impl Beaconer { .map_ok(|BeaconResp { powe, tmst }| (powe, tmst)) .await?; - // Construct concurrent futures for connecting to the poc ingester and - // signing the report - let report_fut = self.mk_beacon_report(beacon.clone(), powe, tmst); - let service_fut = PocIotService::connect(self.poc_ingest_uri.clone()); - - match tokio::try_join!(report_fut, service_fut) { - Ok((report, mut poc_service)) => { - poc_service - .submit_beacon(report) - .inspect_err(|err| warn!(beacon_id, %err, "submit poc beacon report")) - .inspect_ok(|_| info!(beacon_id, "poc beacon report submitted")) - .await? - } - Err(err) => { - warn!(beacon_id, %err, "poc beacon report"); - } - } + Self::mk_beacon_report( + beacon.clone(), + powe, + tmst, + self.service.gateway_key().clone(), + ) + .and_then(|report| self.service.submit_beacon(report)) + .inspect_err(|err| warn!(beacon_id, %err, "submit poc beacon report")) + .inspect_ok(|_| info!(beacon_id, "poc beacon report submitted")) + .await?; Ok(beacon) } - async fn mk_beacon_report( - &self, - beacon: beacon::Beacon, - conducted_power: i32, - tmst: u32, - ) -> Result { - let mut report = poc_lora::LoraBeaconReportReqV1::try_from(beacon)?; - report.tx_power = conducted_power; - report.tmst = tmst; - report.pub_key = self.keypair.public_key().to_vec(); - report.signature = sign(self.keypair.clone(), report.encode_to_vec()).await?; - Ok(report) + async fn handle_session_offer( + &mut self, + message: poc_lora::LoraStreamSessionOfferV1, + ) -> Result { + self.service.session_init(&message.nonce).await } - async fn mk_witness_report( - &self, - packet: PacketUp, - payload: Vec, - ) -> Result { - let mut report = poc_lora::LoraWitnessReportReqV1::try_from(packet)?; - report.data = payload; - report.pub_key = self.keypair.public_key().to_vec(); - report.signature = sign(self.keypair.clone(), report.encode_to_vec()).await?; - Ok(report) + async fn handle_reconnect(&mut self) -> Result { + // Do not send waiting reports on ok here since we wait for a session + // offer. Also do not reset the reconnect retry counter since only a + // session key indicates a good connection + self.service + .reconnect() + .inspect_err(|err| warn!(%err, "failed to reconnect")) + .await } async fn handle_beacon_tick(&mut self) { if self.disabled { return; } - let last_beacon = self - .mk_beacon() + + let last_beacon = Self::mk_beacon(self.region_params.clone(), self.entropy_uri.clone()) .inspect_err(|err| warn!(%err, "construct beacon")) .and_then(|beacon| self.send_beacon(beacon)) .map_ok_or_else(|_| None, Some) @@ -261,23 +255,49 @@ impl Beaconer { return; } - // Construct concurrent futures for connecting to the poc ingester and - // signing the report - let report_fut = self.mk_witness_report(packet, beacon_data); - let service_fut = PocIotService::connect(self.poc_ingest_uri.clone()); - - match tokio::try_join!(report_fut, service_fut) { - Ok((report, mut poc_service)) => { - let _ = poc_service - .submit_witness(report) - .inspect_err(|err| warn!(beacon_id, %err, "submit poc witness report")) - .inspect_ok(|_| info!(beacon_id, "poc witness report submitted")) - .await; - } - Err(err) => { - warn!(%err, "poc witness report"); - } - } + let _ = Self::mk_witness_report(packet, beacon_data, self.service.gateway_key().clone()) + .and_then(|report| self.service.submit_witness(report)) + .inspect_err(|err| warn!(beacon_id, %err, "submit poc witness report")) + .inspect_ok(|_| info!(beacon_id, "poc witness report submitted")) + .await; + } + + pub async fn mk_beacon( + region_params: Arc, + entropy_uri: Uri, + ) -> Result { + region_params.check_valid()?; + + let mut entropy_service = EntropyService::new(entropy_uri); + let remote_entropy = entropy_service.get_entropy().await?; + let local_entropy = beacon::Entropy::local()?; + + let beacon = beacon::Beacon::new(remote_entropy, local_entropy, ®ion_params)?; + Ok(beacon) + } + + async fn mk_beacon_report( + beacon: beacon::Beacon, + conducted_power: i32, + tmst: u32, + gateway: PublicKey, + ) -> Result { + let mut report = poc_lora::LoraBeaconReportReqV1::try_from(beacon)?; + report.pub_key = gateway.to_vec(); + report.tx_power = conducted_power; + report.tmst = tmst; + Ok(report) + } + + async fn mk_witness_report( + packet: PacketUp, + payload: Vec, + gateway: PublicKey, + ) -> Result { + let mut report = poc_lora::LoraWitnessReportReqV1::try_from(packet)?; + report.pub_key = gateway.to_vec(); + report.data = payload; + Ok(report) } } @@ -287,7 +307,7 @@ fn mk_beacon_offset(key: &PublicKey, interval: Duration) -> Duration { use rand::{Rng, SeedableRng}; use sha2::Digest; - let hash = sha2::Sha256::digest(&key.to_vec()); + let hash = sha2::Sha256::digest(key.to_vec()); let mut rng = rand::rngs::StdRng::from_seed(*hash.as_ref()); Duration::seconds(rng.gen_range(0..interval.whole_seconds())) } @@ -371,14 +391,14 @@ mod test { const PUBKEY_1: &str = "13WvV82S7QN3VMzMSieiGxvuaPKknMtf213E5JwPnboDkUfesKw"; const PUBKEY_2: &str = "14HZVR4bdF9QMowYxWrumcFBNfWnhDdD5XXA5za1fWwUhHxxFS1"; - let pubkey_1 = helium_crypto::PublicKey::from_str(PUBKEY_1).expect("public key"); + let pubkey_1 = crate::PublicKey::from_str(PUBKEY_1).expect("public key"); let offset_1 = mk_beacon_offset(&pubkey_1, time::Duration::hours(6)); // Same key and interval should always end up at the same offset assert_eq!( offset_1, mk_beacon_offset(&pubkey_1, time::Duration::hours(6)) ); - let pubkey_2 = helium_crypto::PublicKey::from_str(PUBKEY_2).expect("public key 2"); + let pubkey_2 = crate::PublicKey::from_str(PUBKEY_2).expect("public key 2"); let offset_2 = mk_beacon_offset(&pubkey_2, time::Duration::hours(6)); assert_eq!( offset_2, diff --git a/src/error.rs b/src/error.rs index 55671b08..a87074de 100644 --- a/src/error.rs +++ b/src/error.rs @@ -77,8 +77,8 @@ pub enum ServiceError { Stream, #[error("channel closed")] Channel, - #[error("no service")] - NoService, + #[error("no active session")] + NoSession, #[error("age {age}s > {max_age}s")] Check { age: u64, max_age: u64 }, #[error("Unable to connect to local server. Check that `helium_gateway` is running.")] @@ -170,8 +170,12 @@ impl Error { Error::Service(ServiceError::Channel) } - pub fn no_service() -> Error { - Error::Service(ServiceError::NoService) + pub fn no_session() -> Error { + Error::Service(ServiceError::NoSession) + } + + pub fn no_stream() -> Error { + Error::Service(ServiceError::Stream) } pub fn gateway_service_check(age: u64, max_age: u64) -> Error { diff --git a/src/keyed_uri.rs b/src/keyed_uri.rs index c929f5d8..50b5746a 100644 --- a/src/keyed_uri.rs +++ b/src/keyed_uri.rs @@ -43,7 +43,7 @@ impl TryFrom for KeyedUri { fn try_from(v: helium_proto::services::local::KeyedUri) -> Result { let result = Self { uri: http::Uri::from_str(&v.uri)?, - pubkey: Arc::new(helium_crypto::PublicKey::from_bytes(v.address)?), + pubkey: Arc::new(PublicKey::from_bytes(v.address)?), }; Ok(result) } @@ -63,7 +63,7 @@ impl TryFrom for KeyedUri { fn try_from(v: helium_proto::RoutingAddress) -> Result { let result = Self { uri: http::Uri::from_str(&String::from_utf8_lossy(&v.uri))?, - pubkey: Arc::new(helium_crypto::PublicKey::from_bytes(v.pub_key)?), + pubkey: Arc::new(PublicKey::from_bytes(v.pub_key)?), }; Ok(result) } diff --git a/src/keypair.rs b/src/keypair.rs index 97bae71e..dcea5d02 100644 --- a/src/keypair.rs +++ b/src/keypair.rs @@ -1,4 +1,4 @@ -use crate::*; +use crate::{DecodeError, Error, Result}; #[cfg(feature = "ecc608")] use helium_crypto::ecc608; #[cfg(feature = "tpm")] @@ -10,41 +10,29 @@ use serde::{de, Deserializer}; #[cfg(feature = "ecc608")] use std::path::Path; use std::{collections::HashMap, convert::TryFrom, fmt, fs, io, path, str::FromStr}; +use tonic::async_trait; #[derive(Debug)] pub struct Keypair(helium_crypto::Keypair); pub type PublicKey = helium_crypto::PublicKey; -pub fn load_from_file(path: &str) -> error::Result { - let data = fs::read(path)?; - Ok(helium_crypto::Keypair::try_from(&data[..])?.into()) -} - -pub fn save_to_file(keypair: &Keypair, path: &str) -> io::Result<()> { - if let Some(parent) = path::PathBuf::from(path).parent() { - fs::create_dir_all(parent)?; - }; - fs::write(path, keypair.0.to_vec())?; - Ok(()) +#[async_trait] +pub trait Sign { + async fn sign(&mut self, keypair: K) -> Result + where + K: AsRef + std::marker::Send + 'static; } -pub fn mk_session_keypair() -> Keypair { - let keypair = helium_crypto::Keypair::generate( - KeyTag { - network: Network::MainNet, - key_type: KeyType::Ed25519, - }, - &mut OsRng, - ); - keypair.into() +pub trait Verify { + fn verify(&self, pub_key: &crate::PublicKey) -> Result; } macro_rules! uri_error { ($format:expr) => { - error::DecodeError::keypair_uri(format!($format)) + DecodeError::keypair_uri(format!($format)) }; ($format:expr, $( $arg:expr ),+ ) => { - error::DecodeError::keypair_uri(format!($format, $( $arg ),+)) + DecodeError::keypair_uri(format!($format, $( $arg ),+)) }; } @@ -61,7 +49,7 @@ impl FromStr for Keypair { .parse() .map_err(|err| uri_error!("invalid keypair url \"{str}\": {err:?}"))?; match url.scheme_str() { - Some("file") | None => match load_from_file(url.path()) { + Some("file") | None => match Self::load_from_file(url.path()) { Ok(k) => Ok(k), Err(Error::IO(io_error)) if io_error.kind() == std::io::ErrorKind::NotFound => { let args = KeypairArgs::from_uri(&url)?; @@ -74,7 +62,7 @@ impl FromStr for Keypair { &mut OsRng, ) .into(); - save_to_file(&new_key, url.path()).map_err(|err| { + new_key.save_to_file(url.path()).map_err(|err| { uri_error!("unable to save key file \"{}\": {err:?}", url.path()) })?; Ok(new_key) @@ -86,7 +74,7 @@ impl FromStr for Keypair { }, #[cfg(feature = "ecc608")] Some("ecc") => { - let args = KeypairArgs::from_uri(&url).map_err(error::DecodeError::keypair_uri)?; + let args = KeypairArgs::from_uri(&url).map_err(DecodeError::keypair_uri)?; let bus_address = url.port_u16().unwrap_or(96); let slot = args.get::("slot", 0)?; @@ -113,7 +101,7 @@ impl FromStr for Keypair { } #[cfg(feature = "tpm")] Some("tpm") => { - let args = KeypairArgs::from_uri(&url).map_err(error::DecodeError::keypair_uri)?; + let args = KeypairArgs::from_uri(&url).map_err(DecodeError::keypair_uri)?; let network = args.get("network", Network::MainNet)?; let path = url.path(); @@ -137,6 +125,38 @@ impl std::ops::Deref for Keypair { } } +impl Keypair { + pub fn new() -> Self { + let keypair = helium_crypto::Keypair::generate( + KeyTag { + network: Network::MainNet, + key_type: KeyType::Ed25519, + }, + &mut OsRng, + ); + keypair.into() + } + + pub fn load_from_file(path: &str) -> Result { + let data = fs::read(path)?; + Ok(helium_crypto::Keypair::try_from(&data[..])?.into()) + } + + pub fn save_to_file(&self, path: &str) -> io::Result<()> { + if let Some(parent) = path::PathBuf::from(path).parent() { + fs::create_dir_all(parent)?; + }; + fs::write(path, self.0.to_vec())?; + Ok(()) + } +} + +impl Default for Keypair { + fn default() -> Self { + Self::new() + } +} + #[derive(Debug)] struct KeypairArgs(HashMap); diff --git a/src/lib.rs b/src/lib.rs index ac59cf19..dce3c23c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,9 +19,9 @@ mod base64; pub(crate) use crate::base64::Base64; pub use beacon::{Region, RegionParams}; -pub use error::{Error, Result}; +pub use error::{DecodeError, Error, Result}; pub use keyed_uri::KeyedUri; -pub use keypair::{Keypair, PublicKey}; +pub use keypair::{Keypair, PublicKey, Sign, Verify}; pub use packet::{PacketDown, PacketUp}; pub use settings::Settings; @@ -34,7 +34,7 @@ pub type Future = Pin> + Send>>; /// A type alias for `Stream` that may result in `crate::error::Error` pub type Stream = Pin> + Send>>; -pub async fn sign(keypair: K, data: Vec) -> Result> +async fn sign(keypair: K, data: Vec) -> Result> where K: AsRef + std::marker::Send + 'static, { @@ -49,13 +49,46 @@ where .await? } -macro_rules! verify { - ($key: expr, $msg: expr, $sig: ident) => {{ - let mut _msg = $msg.clone(); - _msg.$sig = vec![]; - let buf = _msg.encode_to_vec(); - $key.verify(&buf, &$msg.$sig).map_err(Error::from) - }}; +macro_rules! impl_sign { + ($type: ty) => { + #[tonic::async_trait] + impl Sign for $type { + async fn sign(&mut self, keypair: K) -> Result + where + K: AsRef + std::marker::Send + 'static, + { + self.signature = crate::sign(keypair, self.encode_to_vec()).await?; + Ok(()) + } + } + }; } +pub(crate) use impl_sign; -pub(crate) use verify; +macro_rules! impl_verify { + ($type: ty) => { + impl crate::Verify for $type { + fn verify(&self, pub_key: &crate::PublicKey) -> Result { + use helium_crypto::Verify as _; + let mut _msg = self.clone(); + _msg.signature = vec![]; + let buf = _msg.encode_to_vec(); + pub_key + .verify(&buf, &self.signature) + .map_err(crate::Error::from) + } + } + }; +} +pub(crate) use impl_verify; + +// macro_rules! verify { +// ($key: expr, $msg: expr, $sig: ident) => {{ +// let mut _msg = $msg.clone(); +// _msg.$sig = vec![]; +// let buf = _msg.encode_to_vec(); +// $key.verify(&buf, &$msg.$sig).map_err(Error::from) +// }}; +// } + +// pub(crate) use verify; diff --git a/src/packet.rs b/src/packet.rs index a6ddf776..8d5b86ee 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -1,4 +1,4 @@ -use crate::{error::DecodeError, Error, PublicKey, Region, Result}; +use crate::{DecodeError, Error, PublicKey, Region, Result}; use helium_proto::services::{ poc_lora, router::{PacketRouterPacketDownV1, PacketRouterPacketUpV1}, diff --git a/src/packet_router/mod.rs b/src/packet_router/mod.rs index 63f8b2ba..007aaba1 100644 --- a/src/packet_router/mod.rs +++ b/src/packet_router/mod.rs @@ -1,31 +1,21 @@ use crate::{ gateway, - keypair::mk_session_keypair, message_cache::{CacheMessage, MessageCache}, - service::packet_router::PacketRouterService, - sign, sync, Base64, Keypair, PacketUp, Result, Settings, + service::{packet_router::PacketRouterService, Reconnect}, + sync, Base64, PacketUp, PublicKey, Result, Settings, }; -use exponential_backoff::Backoff; use futures::TryFutureExt; -use helium_proto::{ - services::router::{ - envelope_down_v1, envelope_up_v1, PacketRouterPacketDownV1, PacketRouterPacketUpV1, - PacketRouterSessionInitV1, PacketRouterSessionOfferV1, - }, - Message as ProtoMessage, +use helium_proto::services::router::{ + envelope_down_v1, PacketRouterPacketDownV1, PacketRouterPacketUpV1, PacketRouterSessionOfferV1, }; use serde::Serialize; -use std::{sync::Arc, time::Instant as StdInstant}; -use tokio::time::{self, Duration, Instant}; +use std::{ops::Deref, time::Instant as StdInstant}; +use tokio::time::Duration; use tracing::{debug, info, warn}; const STORE_GC_INTERVAL: Duration = Duration::from_secs(60); -const RECONNECT_BACKOFF_RETRIES: u32 = 40; -const RECONNECT_BACKOFF_MIN_WAIT: Duration = Duration::from_secs(5); -const RECONNECT_BACKOFF_MAX_WAIT: Duration = Duration::from_secs(1800); // 30 minutes - #[derive(Debug)] pub enum Message { Uplink { @@ -40,7 +30,7 @@ pub struct RouterStatus { #[serde(with = "http_serde::uri")] pub uri: http::Uri, pub connected: bool, - pub session_key: Option, + pub session_key: Option, } pub type MessageSender = sync::MessageSender; @@ -64,9 +54,7 @@ pub struct PacketRouter { messages: MessageReceiver, transmit: gateway::MessageSender, service: PacketRouterService, - reconnect_retry: u32, - session_key: Option>, - keypair: Arc, + reconnect: Reconnect, store: MessageCache, } @@ -80,14 +68,13 @@ impl PacketRouter { let service = PacketRouterService::new(router_settings.uri.clone(), settings.keypair.clone()); let store = MessageCache::new(router_settings.queue); + let reconnect = Reconnect::default(); Self { service, - keypair: settings.keypair.clone(), - session_key: None, transmit, messages, store, - reconnect_retry: 0, + reconnect, } } @@ -95,16 +82,6 @@ impl PacketRouter { pub async fn run(&mut self, shutdown: &triggered::Listener) -> Result { info!(uri = %self.service.uri, "starting"); - let reconnect_backoff = Backoff::new( - RECONNECT_BACKOFF_RETRIES, - RECONNECT_BACKOFF_MIN_WAIT, - RECONNECT_BACKOFF_MAX_WAIT, - ); - - // Use a deadline based sleep for reconnect to allow the store gc timer - // to fire without resetting the reconnect timer - let mut reconnect_sleep = Instant::now() + RECONNECT_BACKOFF_MIN_WAIT; - loop { tokio::select! { _ = shutdown.clone() => { @@ -114,76 +91,61 @@ impl PacketRouter { message = self.messages.recv() => match message { Some(Message::Uplink{packet, received}) => if self.handle_uplink(packet, received).await.is_err() { - self.disconnect(); + self.service.disconnect(); warn!("router disconnected"); - reconnect_sleep = self.next_connect(&reconnect_backoff, true); + self.reconnect.update_next_time(true); }, Some(Message::Status(tx_resp)) => { let status = RouterStatus { uri: self.service.uri.clone(), connected: self.service.is_connected(), - session_key: self.session_key.as_ref().map(|keypair| keypair.public_key().to_owned()), + session_key: self.service.session_key().cloned(), }; tx_resp.send(status) } None => warn!("ignoring closed message channel"), }, - _ = time::sleep_until(reconnect_sleep) => { - reconnect_sleep = self.handle_reconnect(&reconnect_backoff).await; + _ = self.reconnect.wait() => { + let reconnect_result = self.handle_reconnect().await; + self.reconnect.update_next_time(reconnect_result.is_err()); }, router_message = self.service.recv() => match router_message { - Ok(Some(envelope_down_v1::Data::Packet(message))) => self.handle_downlink(message).await, - Ok(Some(envelope_down_v1::Data::SessionOffer(message))) => { + Ok(envelope_down_v1::Data::Packet(message)) => self.handle_downlink(message).await, + Ok(envelope_down_v1::Data::SessionOffer(message)) => { let session_result = self.handle_session_offer(message).await; - if session_result.is_err() { - self.disconnect(); + if session_result.is_ok() { + // (Re)set retry count to max to maximize time to + // next disconnect from service + self.reconnect.retry_count = self.reconnect.max_retries; + } else { + // Failed fto handle session offer, disconnect + self.service.disconnect(); } - reconnect_sleep = self.next_connect(&reconnect_backoff, session_result.is_err()); - }, - Ok(None) => { - warn!("router disconnected"); - reconnect_sleep = self.next_connect(&reconnect_backoff, true) + self.reconnect.update_next_time(session_result.is_err()); }, Err(err) => { warn!(?err, "router error"); - reconnect_sleep = self.next_connect(&reconnect_backoff, true) + self.reconnect.update_next_time(true); }, } } } } - fn next_connect(&mut self, reconnect_backoff: &Backoff, inc_retry: bool) -> Instant { - if inc_retry { - if self.reconnect_retry == RECONNECT_BACKOFF_RETRIES { - self.reconnect_retry = 0; - } else { - self.reconnect_retry += 1; - } - } - let backoff = reconnect_backoff - .next(self.reconnect_retry) - .unwrap_or(RECONNECT_BACKOFF_MAX_WAIT); - Instant::now() + backoff - } - - async fn handle_reconnect(&mut self, reconnect_backoff: &Backoff) -> Instant { - let reconnect_result = self.service.reconnect().await; + async fn handle_reconnect(&mut self) -> Result { // Do not send waiting packets on ok here since we wait for a sesson // offer. Also do not reset the reconnect retry counter since only a // session key indicates a good connection - if let Err(err) = &reconnect_result { - warn!(%err, "failed to connect"); - } - self.next_connect(reconnect_backoff, reconnect_result.is_err()) + self.service + .reconnect() + .inspect_err(|err| warn!(%err, "failed to reconnect")) + .await } async fn handle_uplink(&mut self, uplink: PacketUp, received: StdInstant) -> Result { self.store.push_back(uplink, received); if self.service.is_connected() { - if let Some(session_key) = &self.session_key { - self.send_waiting_packets(session_key.clone()).await?; - } + self.send_waiting_packets().await?; } Ok(()) } @@ -193,32 +155,18 @@ impl PacketRouter { } async fn handle_session_offer(&mut self, message: PacketRouterSessionOfferV1) -> Result { - let session_key = mk_session_key_init(self.keypair.clone(), &message) - .and_then(|(session_key, session_init)| { - self.service.send(session_init).map_ok(|_| session_key) - }) - .inspect_err(|err| warn!(%err, "failed to initialize session")) - .await?; - self.session_key = Some(session_key.clone()); - info!(session_key = %session_key.public_key(),"initialized session"); - self.send_waiting_packets(session_key.clone()) + self.service.session_init(&message.nonce).await?; + self.send_waiting_packets() .inspect_err(|err| warn!(%err, "failed to send queued packets")) - .await?; - self.reconnect_retry = RECONNECT_BACKOFF_RETRIES; - Ok(()) + .await } - fn disconnect(&mut self) { - self.service.disconnect(); - self.session_key = None; - } - - async fn send_waiting_packets(&mut self, keypair: Arc) -> Result { + async fn send_waiting_packets(&mut self) -> Result { while let (removed, Some(packet)) = self.store.pop_front(STORE_GC_INTERVAL) { if removed > 0 { info!(removed, "discarded queued packets"); } - if let Err(err) = self.send_packet(&packet, keypair.clone()).await { + if let Err(err) = self.send_packet(&packet).await { warn!(%err, "failed to send uplink"); self.store.push_front(packet); return Err(err); @@ -227,44 +175,11 @@ impl PacketRouter { Ok(()) } - async fn send_packet( - &mut self, - packet: &CacheMessage, - keypair: Arc, - ) -> Result { + async fn send_packet(&mut self, packet: &CacheMessage) -> Result { debug!(packet_hash = packet.hash().to_b64(), "sending packet"); - let uplink = mk_uplink(packet, keypair).await?; - self.service.send(uplink).await + let mut uplink: PacketRouterPacketUpV1 = packet.deref().into(); + uplink.hold_time = packet.hold_time().as_millis() as u64; + self.service.send_uplink(uplink).await } } - -pub async fn mk_uplink( - packet: &CacheMessage, - keypair: Arc, -) -> Result { - use std::ops::Deref; - let mut uplink: PacketRouterPacketUpV1 = packet.deref().into(); - uplink.hold_time = packet.hold_time().as_millis() as u64; - uplink.signature = sign(keypair, uplink.encode_to_vec()).await?; - let envelope = envelope_up_v1::Data::Packet(uplink); - Ok(envelope) -} - -pub async fn mk_session_key_init( - keypair: Arc, - offer: &PacketRouterSessionOfferV1, -) -> Result<(Arc, envelope_up_v1::Data)> { - let session_keypair = Arc::new(mk_session_keypair()); - let session_key = session_keypair.public_key(); - - let mut session_init = PacketRouterSessionInitV1 { - gateway: keypair.public_key().into(), - session_key: session_key.into(), - nonce: offer.nonce.clone(), - signature: vec![], - }; - session_init.signature = sign(keypair, session_init.encode_to_vec()).await?; - let envelope = envelope_up_v1::Data::SessionInit(session_init); - Ok((session_keypair, envelope)) -} diff --git a/src/service/conduit.rs b/src/service/conduit.rs new file mode 100644 index 00000000..bbffcf7c --- /dev/null +++ b/src/service/conduit.rs @@ -0,0 +1,190 @@ +use crate::{ + service::{CONNECT_TIMEOUT, RPC_TIMEOUT}, + Error, Keypair, PublicKey, Result, Sign, +}; +use futures::TryFutureExt; +use helium_proto::services::{Channel, Endpoint}; +use http::Uri; +use std::sync::Arc; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tracing::{info, warn}; + +/// The time between TCP keepalive messages to keep the connection to the packet +/// router open. Some load balancer disconnect after a number of seconds. AWS +/// NLBs are hardcoded to 350s so we pick a slightly shorter timeframe to send +/// keepalives +pub const TCP_KEEP_ALIVE_DURATION: std::time::Duration = std::time::Duration::from_secs(300); +pub const CONDUIT_CAPACITY: usize = 50; + +/// A conduit service maintains a re-connectable connection to a remote service. +#[derive(Debug)] +pub struct ConduitService> { + pub uri: Uri, + session_keypair: Option>, + conduit: Option>, + keypair: Arc, + client: C, +} + +#[derive(Debug)] +struct Conduit { + tx: mpsc::Sender, + rx: tonic::Streaming, +} + +#[tonic::async_trait] +pub trait ConduitClient { + async fn init( + &mut self, + endpoint: Channel, + tx: mpsc::Sender, + client_rx: ReceiverStream, + keypair: Arc, + ) -> Result>; + + async fn mk_session_init( + &self, + nonce: &[u8], + session_key: &PublicKey, + keypair: Arc, + ) -> Result; +} + +impl Conduit { + async fn new>( + uri: Uri, + client: &mut C, + keypair: Arc, + ) -> Result { + let endpoint = Endpoint::from(uri) + .timeout(RPC_TIMEOUT) + .connect_timeout(CONNECT_TIMEOUT) + .tcp_keepalive(Some(TCP_KEEP_ALIVE_DURATION)) + .connect_lazy(); + let (tx, client_rx) = mpsc::channel(CONDUIT_CAPACITY); + let rx = client + .init( + endpoint, + tx.clone(), + ReceiverStream::new(client_rx), + keypair, + ) + .await?; + Ok(Self { tx, rx }) + } + + async fn recv(&mut self) -> Result> { + Ok(self.rx.message().await?) + } + + async fn send(&mut self, msg: U) -> Result { + Ok(self.tx.send(msg).await?) + } +} + +impl> ConduitService { + pub fn new(uri: Uri, client: C, keypair: Arc) -> Self { + Self { + uri, + keypair, + client, + conduit: None, + session_keypair: None, + } + } + + pub async fn send(&mut self, msg: U) -> Result { + if self.conduit.is_none() { + self.connect().await?; + } + // Unwrap since the above connect early exits if no conduit is created + match self.conduit.as_mut().unwrap().send(msg).await { + Ok(()) => Ok(()), + other => { + self.disconnect(); + other + } + } + } + + pub async fn recv(&mut self) -> Result { + // Since recv is usually called from a select loop we don't try a + // connect every time it is called since the rate for attempted + // connections in failure setups would be as high as the loop rate of + // the caller. This relies on either a reconnect attempt or a message + // send at a later time to reconnect the conduit. + if self.conduit.is_none() { + futures::future::pending::<()>().await; + return Err(Error::no_stream()); + } + match self.conduit.as_mut().unwrap().recv().await { + Ok(Some(msg)) => Ok(msg), + Ok(None) => { + self.disconnect(); + Err(Error::no_stream()) + } + Err(err) => { + self.disconnect(); + Err(err) + } + } + } + + pub fn disconnect(&mut self) { + self.conduit = None; + self.session_keypair = None; + } + + pub async fn connect(&mut self) -> Result { + let conduit = + Conduit::new(self.uri.clone(), &mut self.client, self.keypair.clone()).await?; + self.conduit = Some(conduit); + Ok(()) + } + + pub async fn reconnect(&mut self) -> Result { + self.disconnect(); + self.connect().await + } + + pub fn is_connected(&self) -> bool { + self.conduit.is_some() + } + + pub fn gateway_key(&self) -> &PublicKey { + self.keypair.public_key() + } + + pub fn session_key(&self) -> Option<&PublicKey> { + self.session_keypair.as_ref().map(|k| k.public_key()) + } + + pub fn session_keypair(&self) -> Option> { + self.session_keypair.clone() + } + + pub async fn session_sign(&self, msg: &mut M) -> Result { + if let Some(keypair) = self.session_keypair.as_ref() { + msg.sign(keypair.clone()).await?; + Ok(()) + } else { + Err(Error::no_session()) + } + } + + pub async fn session_init(&mut self, nonce: &[u8]) -> Result { + let session_keypair = Arc::new(Keypair::new()); + let session_key = session_keypair.public_key(); + let msg = self + .client + .mk_session_init(nonce, session_key, self.keypair.clone()) + .await?; + self.send(msg) + .inspect_err(|err| warn!(%err, "failed to initialize session")) + .await?; + self.session_keypair = Some(session_keypair.clone()); + info!(%session_key, "initialized session"); + Ok(()) + } +} diff --git a/src/service/config.rs b/src/service/config.rs index eaeaaae8..d9fcb32e 100644 --- a/src/service/config.rs +++ b/src/service/config.rs @@ -1,10 +1,14 @@ use crate::{ + impl_sign, impl_verify, service::{CONNECT_TIMEOUT, RPC_TIMEOUT}, - sign, verify, Error, KeyedUri, Keypair, Region, RegionParams, Result, + KeyedUri, Keypair, Region, RegionParams, Result, Sign, Verify, }; -use helium_crypto::Verify; use helium_proto::{ - services::{self, iot_config::GatewayRegionParamsReqV1, Channel, Endpoint}, + services::{ + self, + iot_config::{GatewayRegionParamsReqV1, GatewayRegionParamsResV1}, + Channel, Endpoint, + }, Message, }; use std::sync::Arc; @@ -39,10 +43,13 @@ impl ConfigService { address: keypair.public_key().to_vec(), signature: vec![], }; - req.signature = sign(keypair, req.encode_to_vec()).await?; + req.sign(keypair).await?; let resp = self.client.region_params(req).await?.into_inner(); - verify!(&self.uri.pubkey, resp, signature)?; + resp.verify(&self.uri.pubkey)?; Ok(RegionParams::try_from(resp)?) } } + +impl_sign!(GatewayRegionParamsReqV1); +impl_verify!(GatewayRegionParamsResV1); diff --git a/src/service/mod.rs b/src/service/mod.rs index b3c061cd..87920d67 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -1,9 +1,61 @@ -use std::time::Duration; +use tokio::time::{self, Duration, Instant}; pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(10); pub const RPC_TIMEOUT: Duration = Duration::from_secs(5); +pub const RECONNECT_BACKOFF_RETRIES: u32 = 40; +pub const RECONNECT_BACKOFF_MIN_WAIT: Duration = Duration::from_secs(5); +pub const RECONNECT_BACKOFF_MAX_WAIT: Duration = Duration::from_secs(1800); // 30 minutes + +pub mod conduit; pub mod config; pub mod entropy; pub mod packet_router; pub mod poc; + +#[derive(Debug)] +pub struct Reconnect { + backoff: exponential_backoff::Backoff, + next_time: Instant, + pub max_wait: Duration, + pub max_retries: u32, + pub retry_count: u32, +} + +impl Default for Reconnect { + fn default() -> Self { + Self::new( + RECONNECT_BACKOFF_RETRIES, + RECONNECT_BACKOFF_MIN_WAIT, + RECONNECT_BACKOFF_MAX_WAIT, + ) + } +} + +impl Reconnect { + pub fn new(retries: u32, min: Duration, max: Duration) -> Self { + Self { + backoff: exponential_backoff::Backoff::new(retries, min, max), + next_time: Instant::now() + min, + max_retries: retries, + max_wait: max, + retry_count: 0, + } + } + + pub fn wait(&self) -> time::Sleep { + time::sleep_until(self.next_time) + } + + pub fn update_next_time(&mut self, inc_retry: bool) { + if inc_retry { + if self.retry_count == self.max_retries { + self.retry_count = 0; + } else { + self.retry_count += 1; + } + } + let backoff = self.backoff.next(self.retry_count).unwrap_or(self.max_wait); + self.next_time = Instant::now() + backoff; + } +} diff --git a/src/service/packet_router.rs b/src/service/packet_router.rs index a9327f92..394cf05a 100644 --- a/src/service/packet_router.rs +++ b/src/service/packet_router.rs @@ -1,95 +1,48 @@ -use std::{ - sync::Arc, - time::{SystemTime, UNIX_EPOCH}, -}; - use crate::{ - error::DecodeError, - service::{CONNECT_TIMEOUT, RPC_TIMEOUT}, - sign, Error, Keypair, Result, + impl_sign, + service::conduit::{ConduitClient, ConduitService}, + DecodeError, Error, Keypair, PublicKey, Result, Sign, }; - use helium_proto::{ services::{ router::{ envelope_down_v1, envelope_up_v1, EnvelopeDownV1, EnvelopeUpV1, PacketRouterClient, - PacketRouterRegisterV1, + PacketRouterPacketUpV1, PacketRouterRegisterV1, PacketRouterSessionInitV1, }, - Channel, Endpoint, + Channel, }, Message, }; - use http::Uri; +use std::{ + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, +}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; - -type PacketClient = PacketRouterClient; - -type PacketSender = mpsc::Sender; -type PacketReceiver = tonic::Streaming; +use tonic::async_trait; // The router service maintains a re-connectable connection to a remote packet // router. The service will connect when (re)connect or a packet send is // attempted. It will ensure that the register rpc is called on the constructed // connection before a packet is sent. -#[derive(Debug)] -pub struct PacketRouterService { - pub uri: Uri, - conduit: Option, - keypair: Arc, -} - -/// A router conduit is the tx/rx stream pair for the `route` rpc on the -/// `packet_router` service. It does not connect on construction but on the -/// first messsage sent. -#[derive(Debug)] -struct PacketRouterConduit { - tx: PacketSender, - rx: PacketReceiver, -} - -pub const CONDUIT_CAPACITY: usize = 50; - -/// The time between TCP keepalive messages to keep the connection to the packet -/// router open. Some load balancer disconnect after a number of seconds. AWS -/// NLBs are hardcoded to 350s so we pick a slightly shorter timeframe to send -/// keepalives -pub const TCP_KEEP_ALIVE_DURATION: std::time::Duration = std::time::Duration::from_secs(300); - -impl PacketRouterConduit { - async fn new(uri: Uri) -> Result { - let endpoint = Endpoint::from(uri) - .timeout(RPC_TIMEOUT) - .connect_timeout(CONNECT_TIMEOUT) - .tcp_keepalive(Some(TCP_KEEP_ALIVE_DURATION)) - .connect_lazy(); - let mut client = PacketClient::new(endpoint); - let (tx, client_rx) = mpsc::channel(CONDUIT_CAPACITY); - let rx = client - .route(ReceiverStream::new(client_rx)) - .await? - .into_inner(); - Ok(Self { tx, rx }) - } - - async fn recv(&mut self) -> Result> { - match self.rx.message().await { - Ok(Some(msg)) => match msg.data { - Some(data) => Ok(Some(data)), - None => Err(DecodeError::invalid_envelope()), - }, - Ok(None) => Ok(None), - Err(err) => Err(err.into()), - } - } - - async fn send(&mut self, msg: envelope_up_v1::Data) -> Result { - let msg = EnvelopeUpV1 { data: Some(msg) }; - Ok(self.tx.send(msg).await?) - } - - async fn register(&mut self, keypair: Arc) -> Result { +pub struct PacketRouterService( + ConduitService, +); + +pub struct PacketRouterConduitClient {} + +#[async_trait] +impl ConduitClient for PacketRouterConduitClient { + async fn init( + &mut self, + endpoint: Channel, + tx: mpsc::Sender, + client_rx: ReceiverStream, + keypair: Arc, + ) -> Result> { + let mut client = PacketRouterClient::::new(endpoint); + let rx = client.route(client_rx).await?.into_inner(); let mut msg = PacketRouterRegisterV1 { timestamp: SystemTime::now() .duration_since(UNIX_EPOCH) @@ -99,73 +52,69 @@ impl PacketRouterConduit { signature: vec![], session_capable: true, }; - msg.signature = sign(keypair.clone(), msg.encode_to_vec()).await?; + msg.sign(keypair.clone()).await?; let msg = EnvelopeUpV1 { data: Some(envelope_up_v1::Data::Register(msg)), }; - Ok(self.tx.send(msg).await?) + tx.send(msg).await.map_err(|_| Error::channel())?; + Ok(rx) } -} -impl PacketRouterService { - pub fn new(uri: Uri, keypair: Arc) -> Self { - Self { - uri, - conduit: None, - keypair, - } + async fn mk_session_init( + &self, + nonce: &[u8], + session_key: &PublicKey, + keypair: Arc, + ) -> Result { + let mut session_init = PacketRouterSessionInitV1 { + gateway: keypair.public_key().into(), + session_key: session_key.into(), + nonce: nonce.to_vec(), + signature: vec![], + }; + session_init.sign(keypair).await?; + let envelope = EnvelopeUpV1 { + data: Some(envelope_up_v1::Data::SessionInit(session_init)), + }; + Ok(envelope) } +} - pub async fn send(&mut self, msg: envelope_up_v1::Data) -> Result { - if self.conduit.is_none() { - self.connect().await?; - } - // Unwrap since the above connect early exits if no conduit is created - match self.conduit.as_mut().unwrap().send(msg).await { - Ok(()) => Ok(()), - other => { - self.disconnect(); - other - } - } - } +impl_sign!(PacketRouterRegisterV1); +impl_sign!(PacketRouterPacketUpV1); +impl_sign!(PacketRouterSessionInitV1); - pub async fn recv(&mut self) -> Result> { - // Since recv is usually called from a select loop we don't try a - // connect every time it is called since the rate for attempted - // connections in failure setups would be as high as the loop rate of - // the caller. This relies on either a reconnect attempt or a packet - // send at a later time to reconnect the conduit. - if self.conduit.is_none() { - futures::future::pending::<()>().await; - return Ok(None); - } - match self.conduit.as_mut().unwrap().recv().await { - Ok(msg) if msg.is_some() => Ok(msg), - other => { - self.disconnect(); - other - } - } +impl std::ops::Deref for PacketRouterService { + type Target = ConduitService; + fn deref(&self) -> &Self::Target { + &self.0 } +} - pub fn disconnect(&mut self) { - self.conduit = None; +impl std::ops::DerefMut for PacketRouterService { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 } +} - pub async fn connect(&mut self) -> Result { - let mut conduit = PacketRouterConduit::new(self.uri.clone()).await?; - conduit.register(self.keypair.clone()).await?; - self.conduit = Some(conduit); - Ok(()) +impl PacketRouterService { + pub fn new(uri: Uri, keypair: Arc) -> Self { + let client = PacketRouterConduitClient {}; + Self(ConduitService::new(uri, client, keypair)) } - pub async fn reconnect(&mut self) -> Result { - self.disconnect(); - self.connect().await + pub async fn send_uplink(&mut self, mut msg: PacketRouterPacketUpV1) -> Result { + self.session_sign(&mut msg).await?; + let msg = EnvelopeUpV1 { + data: Some(envelope_up_v1::Data::Packet(msg)), + }; + self.0.send(msg).await } - pub fn is_connected(&self) -> bool { - self.conduit.is_some() + pub async fn recv(&mut self) -> Result { + self.0.recv().await.and_then(|msg| match msg.data { + Some(data) => Ok(data), + None => Err(DecodeError::invalid_envelope()), + }) } } diff --git a/src/service/poc.rs b/src/service/poc.rs index 89ad11bc..466eab1a 100644 --- a/src/service/poc.rs +++ b/src/service/poc.rs @@ -1,37 +1,112 @@ use crate::{ - service::{CONNECT_TIMEOUT, RPC_TIMEOUT}, - Result, + impl_sign, + service::conduit::{ConduitClient, ConduitService}, + DecodeError, Keypair, PublicKey, Result, Sign, }; -use helium_proto::services::{ - self, - poc_lora::{LoraBeaconReportReqV1, LoraWitnessReportReqV1}, - Channel, Endpoint, +use helium_proto::{ + services::{ + poc_lora::{ + self, lora_stream_request_v1, lora_stream_response_v1, LoraBeaconReportReqV1, + LoraStreamRequestV1, LoraStreamResponseV1, LoraWitnessReportReqV1, + }, + Channel, + }, + Message as ProtoMessage, }; use http::Uri; +use std::sync::Arc; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::async_trait; -type PocIotClient = helium_proto::services::poc_lora::Client; +// The poc service maintains a re-connectable connection to a remote poc +// ingester. The service will (re)connect when a poc report send is attempted. +// It will ensure that the stream_requests rpc is called on the constructed +// connection before a report is sent. +pub struct PocIotService( + ConduitService, +); -#[derive(Debug)] -pub struct PocIotService(PocIotClient); +pub struct PocIotConduitClient {} + +#[async_trait] +impl ConduitClient for PocIotConduitClient { + async fn init( + &mut self, + endpoint: Channel, + _tx: mpsc::Sender, + client_rx: ReceiverStream, + _keypair: Arc, + ) -> Result> { + let mut client = poc_lora::Client::::new(endpoint); + let rx = client.stream_requests(client_rx).await?.into_inner(); + Ok(rx) + } + + async fn mk_session_init( + &self, + nonce: &[u8], + session_key: &PublicKey, + keypair: Arc, + ) -> Result { + let mut session_init = poc_lora::LoraStreamSessionInitV1 { + pub_key: keypair.public_key().into(), + session_key: session_key.into(), + nonce: nonce.to_vec(), + signature: vec![], + }; + session_init.sign(keypair).await?; + let envelope = LoraStreamRequestV1 { + request: Some(lora_stream_request_v1::Request::SessionInit(session_init)), + }; + Ok(envelope) + } +} + +impl_sign!(poc_lora::LoraStreamSessionInitV1); +impl_sign!(poc_lora::LoraBeaconReportReqV1); +impl_sign!(poc_lora::LoraWitnessReportReqV1); + +impl std::ops::Deref for PocIotService { + type Target = ConduitService; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::ops::DerefMut for PocIotService { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} impl PocIotService { - pub async fn connect(uri: Uri) -> Result { - let channel = Endpoint::from(uri) - .connect_timeout(CONNECT_TIMEOUT) - .timeout(RPC_TIMEOUT) - .connect() - .await?; - let client = services::poc_lora::Client::new(channel); - Ok(Self(client)) + pub fn new(uri: Uri, keypair: Arc) -> Self { + let client = PocIotConduitClient {}; + Self(ConduitService::new(uri, client, keypair)) + } + + pub async fn send(&mut self, msg: lora_stream_request_v1::Request) -> Result { + let msg = LoraStreamRequestV1 { request: Some(msg) }; + self.0.send(msg).await + } + + pub async fn recv(&mut self) -> Result { + self.0.recv().await.and_then(|msg| match msg.response { + Some(data) => Ok(data), + None => Err(DecodeError::invalid_envelope()), + }) } - pub async fn submit_beacon(&mut self, req: LoraBeaconReportReqV1) -> Result { - _ = self.0.submit_lora_beacon(req).await?; - Ok(()) + pub async fn submit_beacon(&mut self, mut req: LoraBeaconReportReqV1) -> Result { + self.0.session_sign(&mut req).await?; + let msg = lora_stream_request_v1::Request::BeaconReport(req); + self.send(msg).await } - pub async fn submit_witness(&mut self, req: LoraWitnessReportReqV1) -> Result { - _ = self.0.submit_lora_witness(req).await?; - Ok(()) + pub async fn submit_witness(&mut self, mut req: LoraWitnessReportReqV1) -> Result { + self.0.session_sign(&mut req).await?; + let msg = lora_stream_request_v1::Request::WitnessReport(req); + self.send(msg).await } }