Skip to content

Commit

Permalink
Merge pull request #112 from s1ck/update-server-module-arrow-45
Browse files Browse the repository at this point in the history
Bump arrow/arrow-flight to 45.0.0 and re-enable server crate
  • Loading branch information
s1ck authored Aug 4, 2023
2 parents 5c04311 + 0b321ef commit 1413320
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 27 deletions.
7 changes: 3 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
[workspace]

members = ["crates/*"]
exclude = ["crates/server"]

[workspace.package]
authors = [
Expand All @@ -15,8 +14,8 @@ license = "MIT"

[workspace.dependencies]
ahash = "0.8.3"
arrow = "27.0.0"
arrow-flight = "27.0.0"
arrow = "45.0.0"
arrow-flight = "45.0.0"
async-compression = { version = "0.3.15", features = ["tokio", "stream", "zstd"] }
async-trait = "0.1.72"
atoi = "2.0.0"
Expand Down Expand Up @@ -59,7 +58,7 @@ thiserror = "1.0.44"
tokio = { version = "1.29.1", features = ["full"], default-features = true }
tokio-tar = "0.3.1"
tokio-util = {version = "0.7.8", features = ["io"] }
tonic = "0.8.3"
tonic = "0.9"

[workspace.metadata.release]
pre-release-commit-message = "Release {{crate_name}} {{version}}"
15 changes: 8 additions & 7 deletions crates/server/src/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,7 @@ impl TryFrom<Action> for CreateGraphFromFileConfig {
type Error = Status;

fn try_from(action: Action) -> Result<Self, Self::Error> {
serde_json::from_slice::<CreateGraphFromFileConfig>(action.body.as_slice())
.map_err(from_json_error)
serde_json::from_slice::<CreateGraphFromFileConfig>(&action.body).map_err(from_json_error)
}
}

Expand Down Expand Up @@ -214,7 +213,7 @@ impl TryFrom<Action> for RemoveGraphConfig {
type Error = Status;

fn try_from(action: Action) -> Result<Self, Self::Error> {
serde_json::from_slice::<Self>(action.body.as_slice()).map_err(from_json_error)
serde_json::from_slice::<Self>(&action.body).map_err(from_json_error)
}
}

Expand All @@ -227,7 +226,7 @@ impl TryFrom<Action> for ToRelabeledConfig {
type Error = Status;

fn try_from(action: Action) -> Result<Self, Self::Error> {
serde_json::from_slice::<Self>(action.body.as_slice()).map_err(from_json_error)
serde_json::from_slice::<Self>(&action.body).map_err(from_json_error)
}
}

Expand All @@ -248,7 +247,7 @@ impl TryFrom<Action> for ToUndirectedConfig {
type Error = Status;

fn try_from(action: Action) -> Result<Self, Self::Error> {
serde_json::from_slice::<Self>(action.body.as_slice()).map_err(from_json_error)
serde_json::from_slice::<Self>(&action.body).map_err(from_json_error)
}
}

Expand Down Expand Up @@ -276,7 +275,7 @@ impl TryFrom<Action> for ComputeConfig {
type Error = Status;

fn try_from(action: Action) -> Result<Self, Self::Error> {
serde_json::from_slice::<ComputeConfig>(action.body.as_slice()).map_err(from_json_error)
serde_json::from_slice::<ComputeConfig>(&action.body).map_err(from_json_error)
}
}

Expand Down Expand Up @@ -324,5 +323,7 @@ pub fn from_json_error(error: serde_json::Error) -> Status {

pub fn into_flight_result<T: serde::Serialize>(result: T) -> FlightResult<arrow_flight::Result> {
let result = serde_json::to_vec(&result).map_err(from_json_error)?;
Ok(arrow_flight::Result { body: result })
Ok(arrow_flight::Result {
body: result.into(),
})
}
2 changes: 1 addition & 1 deletion crates/server/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl TryFrom<Ticket> for PropertyId {
type Error = Status;

fn try_from(ticket: Ticket) -> Result<Self, Self::Error> {
serde_json::from_slice::<PropertyId>(ticket.ticket.as_slice()).map_err(from_json_error)
serde_json::from_slice::<PropertyId>(&ticket.ticket).map_err(from_json_error)
}
}

Expand Down
38 changes: 23 additions & 15 deletions crates/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,23 @@ use crate::catalog::*;
use std::borrow::Cow;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;

use arrow::datatypes::Float32Type;
use arrow::datatypes::Int64Type;
use arrow::datatypes::UInt64Type;
use arrow::error::ArrowError;
use arrow::ipc::writer;
use arrow::{datatypes::Schema, ipc::writer::IpcWriteOptions};
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::{
flight_service_server::FlightService, utils::flight_data_from_arrow_batch, Action, ActionType,
Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse,
PutResult, SchemaAsIpc, SchemaResult, Ticket,
flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
SchemaResult, Ticket,
};
use futures::{Stream, StreamExt};
use futures::stream::BoxStream;
use futures::StreamExt;
use graph::page_rank::PageRankConfig;
use graph::prelude::Components;
use graph::prelude::DeltaSteppingConfig;
Expand Down Expand Up @@ -54,18 +55,17 @@ impl Default for FlightServiceImpl {
}
}

type BoxedFlightStream<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send + Sync + 'static>>;
pub(crate) type FlightResult<T> = Result<T, Status>;

#[tonic::async_trait]
impl FlightService for FlightServiceImpl {
type DoActionStream = BoxedFlightStream<arrow_flight::Result>;
type DoExchangeStream = BoxedFlightStream<FlightData>;
type DoGetStream = BoxedFlightStream<FlightData>;
type DoPutStream = BoxedFlightStream<PutResult>;
type HandshakeStream = BoxedFlightStream<HandshakeResponse>;
type ListActionsStream = BoxedFlightStream<ActionType>;
type ListFlightsStream = BoxedFlightStream<FlightInfo>;
type DoActionStream = BoxStream<'static, Result<arrow_flight::Result, Status>>;
type DoExchangeStream = BoxStream<'static, Result<FlightData, Status>>;
type DoGetStream = BoxStream<'static, Result<FlightData, Status>>;
type DoPutStream = BoxStream<'static, Result<PutResult, Status>>;
type HandshakeStream = BoxStream<'static, Result<HandshakeResponse, Status>>;
type ListActionsStream = BoxStream<'static, Result<ActionType, Status>>;
type ListFlightsStream = BoxStream<'static, Result<FlightInfo, Status>>;

async fn do_get(&self, request: Request<Ticket>) -> FlightResult<Response<Self::DoGetStream>> {
let property_id = request.into_inner().try_into()?;
Expand All @@ -78,10 +78,18 @@ impl FlightService for FlightServiceImpl {
let ipc_write_options = IpcWriteOptions::default();
// Record batches are pre-computed and are immediately available.
// Imho, there is no need to implement lazy batch computation.
let data_gen = writer::IpcDataGenerator::default();
let mut dictionary_tracker = writer::DictionaryTracker::new(false);

let record_batches = property_entry
.batches
.iter()
.map(|batch| flight_data_from_arrow_batch(batch, &ipc_write_options).1)
.map(|batch| {
let (_, encoded_batch) = data_gen
.encoded_batch(batch, &mut dictionary_tracker, &ipc_write_options)
.expect("DictionaryTracker configured above to not error on replacement");
encoded_batch.into()
})
.map(Ok)
.collect::<Vec<_>>();

Expand Down Expand Up @@ -160,7 +168,7 @@ impl FlightService for FlightServiceImpl {

let result = serde_json::to_vec(&result).map_err(from_json_error)?;
let result = arrow_flight::PutResult {
app_metadata: result,
app_metadata: result.into(),
};

Ok(Response::new(Box::pin(futures::stream::once(async {
Expand Down

0 comments on commit 1413320

Please sign in to comment.