Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump arrow/arrow-flight to 45.0.0 and re-enable server crate #112

Merged
merged 1 commit into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
[workspace]

members = ["crates/*"]
exclude = ["crates/server"]

[workspace.package]
authors = [
Expand All @@ -15,8 +14,8 @@ license = "MIT"

[workspace.dependencies]
ahash = "0.8.3"
arrow = "27.0.0"
arrow-flight = "27.0.0"
arrow = "45.0.0"
arrow-flight = "45.0.0"
async-compression = { version = "0.3.15", features = ["tokio", "stream", "zstd"] }
async-trait = "0.1.72"
atoi = "2.0.0"
Expand Down Expand Up @@ -59,7 +58,7 @@ thiserror = "1.0.44"
tokio = { version = "1.29.1", features = ["full"], default-features = true }
tokio-tar = "0.3.1"
tokio-util = {version = "0.7.8", features = ["io"] }
tonic = "0.8.3"
tonic = "0.9"

[workspace.metadata.release]
pre-release-commit-message = "Release {{crate_name}} {{version}}"
15 changes: 8 additions & 7 deletions crates/server/src/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,7 @@ impl TryFrom<Action> for CreateGraphFromFileConfig {
type Error = Status;

fn try_from(action: Action) -> Result<Self, Self::Error> {
serde_json::from_slice::<CreateGraphFromFileConfig>(action.body.as_slice())
.map_err(from_json_error)
serde_json::from_slice::<CreateGraphFromFileConfig>(&action.body).map_err(from_json_error)
}
}

Expand Down Expand Up @@ -214,7 +213,7 @@ impl TryFrom<Action> for RemoveGraphConfig {
type Error = Status;

fn try_from(action: Action) -> Result<Self, Self::Error> {
serde_json::from_slice::<Self>(action.body.as_slice()).map_err(from_json_error)
serde_json::from_slice::<Self>(&action.body).map_err(from_json_error)
}
}

Expand All @@ -227,7 +226,7 @@ impl TryFrom<Action> for ToRelabeledConfig {
type Error = Status;

fn try_from(action: Action) -> Result<Self, Self::Error> {
serde_json::from_slice::<Self>(action.body.as_slice()).map_err(from_json_error)
serde_json::from_slice::<Self>(&action.body).map_err(from_json_error)
}
}

Expand All @@ -248,7 +247,7 @@ impl TryFrom<Action> for ToUndirectedConfig {
type Error = Status;

fn try_from(action: Action) -> Result<Self, Self::Error> {
serde_json::from_slice::<Self>(action.body.as_slice()).map_err(from_json_error)
serde_json::from_slice::<Self>(&action.body).map_err(from_json_error)
}
}

Expand Down Expand Up @@ -276,7 +275,7 @@ impl TryFrom<Action> for ComputeConfig {
type Error = Status;

fn try_from(action: Action) -> Result<Self, Self::Error> {
serde_json::from_slice::<ComputeConfig>(action.body.as_slice()).map_err(from_json_error)
serde_json::from_slice::<ComputeConfig>(&action.body).map_err(from_json_error)
}
}

Expand Down Expand Up @@ -324,5 +323,7 @@ pub fn from_json_error(error: serde_json::Error) -> Status {

pub fn into_flight_result<T: serde::Serialize>(result: T) -> FlightResult<arrow_flight::Result> {
let result = serde_json::to_vec(&result).map_err(from_json_error)?;
Ok(arrow_flight::Result { body: result })
Ok(arrow_flight::Result {
body: result.into(),
})
}
2 changes: 1 addition & 1 deletion crates/server/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl TryFrom<Ticket> for PropertyId {
type Error = Status;

fn try_from(ticket: Ticket) -> Result<Self, Self::Error> {
serde_json::from_slice::<PropertyId>(ticket.ticket.as_slice()).map_err(from_json_error)
serde_json::from_slice::<PropertyId>(&ticket.ticket).map_err(from_json_error)
}
}

Expand Down
38 changes: 23 additions & 15 deletions crates/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,23 @@ use crate::catalog::*;
use std::borrow::Cow;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;

use arrow::datatypes::Float32Type;
use arrow::datatypes::Int64Type;
use arrow::datatypes::UInt64Type;
use arrow::error::ArrowError;
use arrow::ipc::writer;
use arrow::{datatypes::Schema, ipc::writer::IpcWriteOptions};
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::{
flight_service_server::FlightService, utils::flight_data_from_arrow_batch, Action, ActionType,
Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse,
PutResult, SchemaAsIpc, SchemaResult, Ticket,
flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
SchemaResult, Ticket,
};
use futures::{Stream, StreamExt};
use futures::stream::BoxStream;
use futures::StreamExt;
use graph::page_rank::PageRankConfig;
use graph::prelude::Components;
use graph::prelude::DeltaSteppingConfig;
Expand Down Expand Up @@ -54,18 +55,17 @@ impl Default for FlightServiceImpl {
}
}

type BoxedFlightStream<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send + Sync + 'static>>;
pub(crate) type FlightResult<T> = Result<T, Status>;

#[tonic::async_trait]
impl FlightService for FlightServiceImpl {
type DoActionStream = BoxedFlightStream<arrow_flight::Result>;
type DoExchangeStream = BoxedFlightStream<FlightData>;
type DoGetStream = BoxedFlightStream<FlightData>;
type DoPutStream = BoxedFlightStream<PutResult>;
type HandshakeStream = BoxedFlightStream<HandshakeResponse>;
type ListActionsStream = BoxedFlightStream<ActionType>;
type ListFlightsStream = BoxedFlightStream<FlightInfo>;
type DoActionStream = BoxStream<'static, Result<arrow_flight::Result, Status>>;
type DoExchangeStream = BoxStream<'static, Result<FlightData, Status>>;
type DoGetStream = BoxStream<'static, Result<FlightData, Status>>;
type DoPutStream = BoxStream<'static, Result<PutResult, Status>>;
type HandshakeStream = BoxStream<'static, Result<HandshakeResponse, Status>>;
type ListActionsStream = BoxStream<'static, Result<ActionType, Status>>;
type ListFlightsStream = BoxStream<'static, Result<FlightInfo, Status>>;

async fn do_get(&self, request: Request<Ticket>) -> FlightResult<Response<Self::DoGetStream>> {
let property_id = request.into_inner().try_into()?;
Expand All @@ -78,10 +78,18 @@ impl FlightService for FlightServiceImpl {
let ipc_write_options = IpcWriteOptions::default();
// Record batches are pre-computed and are immediately available.
// Imho, there is no need to implement lazy batch computation.
let data_gen = writer::IpcDataGenerator::default();
let mut dictionary_tracker = writer::DictionaryTracker::new(false);

let record_batches = property_entry
.batches
.iter()
.map(|batch| flight_data_from_arrow_batch(batch, &ipc_write_options).1)
.map(|batch| {
let (_, encoded_batch) = data_gen
.encoded_batch(batch, &mut dictionary_tracker, &ipc_write_options)
.expect("DictionaryTracker configured above to not error on replacement");
encoded_batch.into()
})
.map(Ok)
.collect::<Vec<_>>();

Expand Down Expand Up @@ -160,7 +168,7 @@ impl FlightService for FlightServiceImpl {

let result = serde_json::to_vec(&result).map_err(from_json_error)?;
let result = arrow_flight::PutResult {
app_metadata: result,
app_metadata: result.into(),
};

Ok(Response::new(Box::pin(futures::stream::once(async {
Expand Down