Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write events to run.out #45

Merged
merged 3 commits into from
Feb 20, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -5,10 +5,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.5.0] - unreleased
### Added
- Write events to `run.out`. See [PR 45].

### Change
- Change `RunParameters::test_start_time` from `String` to `DateTime` for ease of use. See [PR 41].

[PR 41]: https://github.com/testground/sdk-rust/pull/41
[PR 45]: https://github.com/testground/sdk-rust/pull/45

## [0.4.0]
### Added
13 changes: 1 addition & 12 deletions src/background.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

use futures::stream::StreamExt;
use influxdb::{Client, WriteQuery};
@@ -377,17 +376,7 @@ impl BackgroundTask {
if let PayloadType::Event(ref event) = payload {
// The Testground daemon determines the success or failure of a test
// instance by parsing stdout for runtime events.
println!(
"{}",
serde_json::to_string(&LogLine {
ts: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos(),
event,
})
.unwrap(),
);
println!("{}", serde_json::to_string(&LogLine::new(event)).unwrap());
}

let request = Request {
73 changes: 68 additions & 5 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::borrow::Cow;
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;

use crate::{
background::{BackgroundTask, Command},
@@ -12,6 +15,7 @@ use clap::Parser;

use influxdb::WriteQuery;

use crate::events::LogLine;
use tokio::sync::{
mpsc::{self, channel, Sender},
oneshot,
@@ -31,21 +35,39 @@ pub struct Client {
global_seq: u64,
/// A group-scoped sequence number assigned to this test instance by the sync service.
group_seq: u64,
/// A path to `run.out`.
run_out: Option<PathBuf>,
}

impl Client {
pub async fn new_and_init() -> Result<Self, Box<dyn std::error::Error>> {
let run_parameters = RunParameters::try_parse()?;
let run_parameters: RunParameters = RunParameters::try_parse()?;

let (cmd_tx, cmd_rx) = channel(1);

let background = BackgroundTask::new(cmd_rx, run_parameters.clone()).await?;

let run_out = run_parameters
.test_outputs_path
.to_str()
.map(|path_str| {
if path_str.is_empty() {
None
} else {
let mut path = PathBuf::from(path_str);
path.push("run.out");
Some(path)
}
})
.unwrap_or(None);

// `global_seq` and `group_seq` are initialized by 0 at this point since no way to signal to the sync service.
let mut client = Self {
cmd_tx,
run_parameters,
global_seq: 0,
group_seq: 0,
run_out,
};

tokio::spawn(background.run());
@@ -64,7 +86,7 @@ impl Client {
// Note that the sdk-go only signals, but not waits.
.signal_and_wait(
format!("initialized_group_{}", client.run_parameters.test_group_id),
client.run_parameters.test_group_instance_count as u64,
client.run_parameters.test_group_instance_count,
)
.await?;

@@ -252,6 +274,8 @@ impl Client {
let json_event = serde_json::to_string(&event).expect("Event Serialization");

println!("{}", json_event);

self.write(&event.event);
}

pub async fn record_success(self) -> Result<(), Error> {
@@ -263,6 +287,10 @@ impl Client {

receiver.await.expect(BACKGROUND_SENDER)?;

self.write(&EventType::Success {
group: self.run_parameters.test_group_id.clone(),
});

Ok(())
}

@@ -271,12 +299,20 @@ impl Client {

let (sender, receiver) = oneshot::channel();

let cmd = Command::SignalFailure { error, sender };
let cmd = Command::SignalFailure {
error: error.clone(),
sender,
};

self.cmd_tx.send(cmd).await.expect(BACKGROUND_RECEIVER);

receiver.await.expect(BACKGROUND_SENDER)?;

self.write(&EventType::Failure {
group: self.run_parameters.test_group_id.clone(),
error,
});

Ok(())
}

@@ -291,15 +327,21 @@ impl Client {
let (sender, receiver) = oneshot::channel();

let cmd = Command::SignalCrash {
error,
stacktrace,
error: error.clone(),
stacktrace: stacktrace.clone(),
sender,
};

self.cmd_tx.send(cmd).await.expect(BACKGROUND_RECEIVER);

receiver.await.expect(BACKGROUND_SENDER)?;

self.write(&EventType::Crash {
groups: self.run_parameters.test_group_id.clone(),
error,
stacktrace,
});

Ok(())
}

@@ -332,4 +374,25 @@ impl Client {
pub fn group_seq(&self) -> u64 {
self.group_seq
}

/// Writes an event to `run.out`.
fn write(&self, event_type: &EventType) {
if let Some(path) = self.run_out.as_ref() {
let mut file = match File::options().create(true).append(true).open(path) {
Ok(file) => file,
Err(e) => {
eprintln!("Failed to open `run.out`: {}", e);
return;
}
};

if let Err(e) = writeln!(
file,
"{}",
serde_json::to_string(&LogLine::new(event_type)).expect("Event Serialization")
) {
eprintln!("Failed to write a log to `run.out`: {}", e);
}
}
}
}
13 changes: 13 additions & 0 deletions src/events.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![allow(dead_code)]

use serde::Serialize;
use std::time::{SystemTime, UNIX_EPOCH};

#[derive(Serialize, Debug)]
pub struct Event {
@@ -13,6 +14,18 @@ pub struct LogLine<'a> {
pub event: &'a EventType,
}

impl LogLine<'_> {
pub fn new(event: &EventType) -> LogLine {
LogLine {
ts: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos(),
event,
}
}
}

#[derive(Serialize, Debug)]
pub enum EventType {
#[serde(rename = "start_event")]