Skip to content

Commit

Permalink
feat(rust): make portal handshake optional
Browse files Browse the repository at this point in the history
  • Loading branch information
SanjoDeundiak committed Feb 5, 2025
1 parent 2c2d8ad commit f9f12e6
Show file tree
Hide file tree
Showing 49 changed files with 987 additions and 285 deletions.
2 changes: 1 addition & 1 deletion implementations/rust/ockam/ockam/src/remote/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl Worker for RemoteRelay {
ctx: &mut Context,
msg: Routed<Self::Message>,
) -> Result<()> {
if msg.msg_addr() == self.addresses.main_remote {
if msg.msg_addr() == &self.addresses.main_remote {
let mut local_message = msg.into_local_message();

// Remove my address from the onward_route
Expand Down
13 changes: 12 additions & 1 deletion implementations/rust/ockam/ockam_api/src/influxdb/portal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ impl NodeManagerWorker {
policy_expression,
privileged,
tls,
skip_handshake,
enable_nagle,
} = body.tcp_outlet;
let address = self
.node_manager
Expand Down Expand Up @@ -95,6 +97,8 @@ impl NodeManagerWorker {
reachable_from_default_secure_channel,
OutletAccessControl::WithPolicyExpression(policy_expression),
privileged,
skip_handshake,
enable_nagle,
)
.await
{
Expand All @@ -121,6 +125,8 @@ impl NodeManagerWorker {
disable_tcp_fallback,
privileged,
tls_certificate_provider,
skip_handshake,
enable_nagle,
} = body.tcp_inlet.clone();

//TODO: should be an easier way to tweak the multiaddr
Expand Down Expand Up @@ -189,6 +195,8 @@ impl NodeManagerWorker {
disable_tcp_fallback,
privileged,
tls_certificate_provider,
skip_handshake,
enable_nagle,
)
.await
{
Expand Down Expand Up @@ -334,7 +342,8 @@ impl InfluxDBPortals for BackgroundNodeClient {
policy_expression: Option<PolicyExpression>,
influxdb_config: InfluxDBOutletConfig,
) -> miette::Result<OutletStatus> {
let mut outlet_payload = CreateOutlet::new(to, tls, from.cloned(), true, false);
let mut outlet_payload =
CreateOutlet::new(to, tls, from.cloned(), true, false, false, false);
if let Some(policy_expression) = policy_expression {
outlet_payload.set_policy_expression(policy_expression);
}
Expand Down Expand Up @@ -376,6 +385,8 @@ impl InfluxDBPortals for BackgroundNodeClient {
disable_tcp_fallback,
false,
tls_certificate_provider,
false,
false,
);
let payload = CreateInfluxDBInlet::new(inlet_payload, lease_usage, lease_issuer_route);
Request::post("/node/influxdb_inlet").body(payload)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ impl KafkaInletController {
false,
false,
None,
false,
false,
)
.await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ impl KafkaOutletController {
false,
OutletAccessControl::WithPolicyExpression(self.policy_expression.clone()),
false,
false,
false,
)
.await
.map(|info| info.to)?;
Expand Down
22 changes: 21 additions & 1 deletion implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ pub struct CreateInlet {
#[n(12)] pub(crate) privileged: bool,
/// TLS certificate provider route.
#[n(13)] pub(crate) tls_certificate_provider: Option<MultiAddr>,
/// Skip Portal handshake for lower latency, but also lower throughput
#[n(14)] pub(crate) skip_handshake: bool,
/// Enable Nagle's algorithm for potentially higher throughput, but higher latency
#[n(15)] pub(crate) enable_nagle: bool,
}

impl CreateInlet {
Expand All @@ -69,6 +73,8 @@ impl CreateInlet {
enable_udp_puncture: bool,
disable_tcp_fallback: bool,
privileged: bool,
skip_handshake: bool,
enable_nagle: bool,
) -> Self {
Self {
listen_addr: listen,
Expand All @@ -83,6 +89,8 @@ impl CreateInlet {
disable_tcp_fallback,
privileged,
tls_certificate_provider: None,
skip_handshake,
enable_nagle,
}
}

Expand All @@ -96,6 +104,8 @@ impl CreateInlet {
enable_udp_puncture: bool,
disable_tcp_fallback: bool,
privileged: bool,
skip_handshake: bool,
enable_nagle: bool,
) -> Self {
Self {
listen_addr: listen,
Expand All @@ -110,6 +120,8 @@ impl CreateInlet {
disable_tcp_fallback,
privileged,
tls_certificate_provider: None,
skip_handshake,
enable_nagle,
}
}

Expand Down Expand Up @@ -169,7 +181,11 @@ pub struct CreateOutlet {
/// will be used.
#[n(5)] pub policy_expression: Option<PolicyExpression>,
/// Use eBPF and RawSocket to access TCP packets instead of TCP data stream.
#[n(6)] pub privileged: bool
#[n(6)] pub privileged: bool,
/// Skip Portal handshake for lower latency, but also lower throughput
#[n(7)] pub skip_handshake: bool,
/// Enable Nagle's algorithm for potentially higher throughput, but higher latency
#[n(8)] pub(crate) enable_nagle: bool,
}

impl CreateOutlet {
Expand All @@ -179,6 +195,8 @@ impl CreateOutlet {
worker_addr: Option<Address>,
reachable_from_default_secure_channel: bool,
privileged: bool,
skip_handshake: bool,
enable_nagle: bool,
) -> Self {
Self {
hostname_port,
Expand All @@ -187,6 +205,8 @@ impl CreateOutlet {
reachable_from_default_secure_channel,
policy_expression: None,
privileged,
skip_handshake,
enable_nagle,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ impl InMemoryNode {
false,
false,
None,
false,
false,
)
.await?;

Expand Down Expand Up @@ -324,6 +326,8 @@ impl InMemoryNode {
false,
OutletAccessControl::WithPolicyExpression(outlet_policy_expression),
false,
false,
false,
)
.await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub fn create_inlet_payload(
disable_tcp_fallback: bool,
privileged: bool,
tls_certificate_provider: &Option<MultiAddr>,
skip_handshake: bool,
enable_nagle: bool,
) -> CreateInlet {
let via_project = outlet_addr.matches(0, &[ProjectProto::CODE.into()]);
let mut payload = if via_project {
Expand All @@ -37,6 +39,8 @@ pub fn create_inlet_payload(
enable_udp_puncture,
disable_tcp_fallback,
privileged,
skip_handshake,
enable_nagle,
)
} else {
CreateInlet::to_node(
Expand All @@ -48,6 +52,8 @@ pub fn create_inlet_payload(
enable_udp_puncture,
disable_tcp_fallback,
privileged,
skip_handshake,
enable_nagle,
)
};
if let Some(e) = policy_expression.as_ref() {
Expand Down Expand Up @@ -80,6 +86,8 @@ impl Inlets for BackgroundNodeClient {
disable_tcp_fallback: bool,
privileged: bool,
tls_certificate_provider: &Option<MultiAddr>,
skip_handshake: bool,
enable_nagle: bool,
) -> miette::Result<Reply<InletStatus>> {
let request = {
let payload = create_inlet_payload(
Expand All @@ -95,6 +103,8 @@ impl Inlets for BackgroundNodeClient {
disable_tcp_fallback,
privileged,
tls_certificate_provider,
skip_handshake,
enable_nagle,
);
Request::post("/node/inlet").body(payload)
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ impl InMemoryNode {
disable_tcp_fallback: bool,
privileged: bool,
tls_certificate_provider: Option<MultiAddr>,
skip_handshake: bool,
enable_nagle: bool,
) -> Result<InletStatus> {
self.node_manager
.create_inlet(
Expand All @@ -48,6 +50,8 @@ impl InMemoryNode {
disable_tcp_fallback,
privileged,
tls_certificate_provider,
skip_handshake,
enable_nagle,
)
.await
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub trait Inlets {
disable_tcp_fallback: bool,
privileged: bool,
tls_certificate_provider: &Option<MultiAddr>,
skip_handshake: bool,
enable_nagle: bool,
) -> miette::Result<Reply<InletStatus>>;

async fn show_inlet(&self, ctx: &Context, alias: &str) -> miette::Result<Reply<InletStatus>>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ impl NodeManager {
disable_tcp_fallback: bool,
privileged: bool,
tls_certificate_provider: Option<MultiAddr>,
skip_handshake: bool,
enable_nagle: bool,
) -> Result<InletStatus> {
debug! {
%listen_address,
Expand All @@ -50,6 +52,8 @@ impl NodeManager {
%alias,
%enable_udp_puncture,
%disable_tcp_fallback,
%skip_handshake,
%enable_nagle,
"creating inlet"
}

Expand Down Expand Up @@ -127,6 +131,8 @@ impl NodeManager {
udp_puncture: None,
additional_route: None,
privileged,
skip_handshake,
enable_nagle,
};

let replacer = Arc::new(Mutex::new(replacer));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ impl NodeManagerWorker {
disable_tcp_fallback,
privileged,
tls_certificate_provider,
skip_handshake,
enable_nagle,
} = create_inlet;
match self
.node_manager
Expand All @@ -49,6 +51,8 @@ impl NodeManagerWorker {
disable_tcp_fallback,
privileged,
tls_certificate_provider,
skip_handshake,
enable_nagle,
)
.await
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub(super) struct InletSessionReplacer {
pub(super) udp_puncture: Option<UdpPuncture>,
pub(super) additional_route: Option<Route>,
pub(super) privileged: bool,
pub(super) skip_handshake: bool,
pub(super) enable_nagle: bool,
}

impl InletSessionReplacer {
Expand Down Expand Up @@ -109,7 +111,9 @@ impl InletSessionReplacer {
let (incoming_ac, outgoing_ac) = self.access_control(node_manager).await?;
let options = TcpInletOptions::new()
.with_incoming_access_control(incoming_ac)
.with_outgoing_access_control(outgoing_ac);
.with_outgoing_access_control(outgoing_ac)
.set_skip_handshake(self.skip_handshake)
.set_enable_nagle(self.enable_nagle);

let options = if self.udp_puncture_enabled() && self.disable_tcp_fallback {
options.paused()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ impl NodeManagerWorker {
policy_expression,
tls,
privileged,
skip_handshake,
enable_nagle,
} = create_outlet;

match self
Expand All @@ -40,6 +42,8 @@ impl NodeManagerWorker {
reachable_from_default_secure_channel,
OutletAccessControl::WithPolicyExpression(policy_expression),
privileged,
skip_handshake,
enable_nagle,
)
.await
{
Expand Down Expand Up @@ -99,6 +103,8 @@ impl NodeManager {
reachable_from_default_secure_channel: bool,
access_control: OutletAccessControl,
privileged: bool,
skip_handshake: bool,
enable_nagle: bool,
) -> Result<OutletStatus> {
let worker_addr = self.registry.outlets.generate_worker_addr(worker_addr);

Expand Down Expand Up @@ -134,7 +140,9 @@ impl NodeManager {
let mut options = TcpOutletOptions::new()
.with_incoming_access_control(incoming_ac)
.with_outgoing_access_control(outgoing_ac)
.with_tls(tls);
.with_tls(tls)
.set_skip_handshake(skip_handshake)
.set_enable_nagle(enable_nagle);
if self.project_authority().is_none() {
for api_transport_flow_control_id in &self.api_transport_flow_control_ids {
options = options.as_consumer(api_transport_flow_control_id)
Expand Down Expand Up @@ -241,6 +249,7 @@ impl NodeManager {

#[async_trait]
pub trait Outlets {
#[allow(clippy::too_many_arguments)]
async fn create_outlet(
&self,
ctx: &Context,
Expand All @@ -249,6 +258,8 @@ pub trait Outlets {
from: Option<&Address>,
policy_expression: Option<PolicyExpression>,
privileged: bool,
skip_handshake: bool,
enable_nagle: bool,
) -> miette::Result<OutletStatus>;
}

Expand All @@ -263,8 +274,18 @@ impl Outlets for BackgroundNodeClient {
from: Option<&Address>,
policy_expression: Option<PolicyExpression>,
privileged: bool,
skip_handshake: bool,
enable_nagle: bool,
) -> miette::Result<OutletStatus> {
let mut payload = CreateOutlet::new(to, tls, from.cloned(), true, privileged);
let mut payload = CreateOutlet::new(
to,
tls,
from.cloned(),
true,
privileged,
skip_handshake,
enable_nagle,
);
if let Some(policy_expression) = policy_expression {
payload.set_policy_expression(policy_expression);
}
Expand Down
Loading

0 comments on commit f9f12e6

Please sign in to comment.