Skip to content

[Draft] Allow setting ClientOptions for all datafusion.object_store contexts #1083

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use std::sync::Arc;
use std::time::Duration;

use pyo3::prelude::*;

Expand All @@ -24,6 +25,7 @@ use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder};
use object_store::http::{HttpBuilder, HttpStore};
use object_store::local::LocalFileSystem;
use object_store::ClientOptions;
use pyo3::exceptions::PyValueError;
use url::Url;

Expand Down Expand Up @@ -164,6 +166,41 @@ impl PyGoogleCloudContext {
}
}

#[pyclass(name = "ClientOptions", module = "datafusion.store", subclass)]
#[derive(Debug, Clone)]
pub struct PyClientOptions {
pub inner: ClientOptions,
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Main question I have is does this need to be Arc<ClientOptions> rather than just ClientOptions?

}

impl Default for PyClientOptions {
fn default() -> Self {
Self::new()
}
}

#[pymethods]
impl PyClientOptions {
#[pyo3(signature=())]
#[new]
pub fn new() -> Self {
Self {
inner: ClientOptions::new(),
}
}

#[pyo3(signature = (timeout))]
pub fn with_timeout(&mut self, timeout: Duration) -> Self {
self.inner = self.inner.clone().with_timeout(timeout);
self.clone()
}

#[pyo3(signature = (timeout))]
pub fn with_connect_timeout(&mut self, timeout: Duration) -> Self {
self.inner = self.inner.clone().with_connect_timeout(timeout);
self.clone()
}
}

#[pyclass(name = "AmazonS3", module = "datafusion.store", subclass)]
#[derive(Debug, Clone)]
pub struct PyAmazonS3Context {
Expand All @@ -174,14 +211,15 @@ pub struct PyAmazonS3Context {
#[pymethods]
impl PyAmazonS3Context {
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (bucket_name, region=None, access_key_id=None, secret_access_key=None, endpoint=None, allow_http=false, imdsv1_fallback=false))]
#[pyo3(signature = (bucket_name, region=None, access_key_id=None, secret_access_key=None, endpoint=None, client_options=None, allow_http=false, imdsv1_fallback=false))]
#[new]
fn new(
bucket_name: String,
region: Option<String>,
access_key_id: Option<String>,
secret_access_key: Option<String>,
endpoint: Option<String>,
client_options: Option<PyClientOptions>,
//retry_config: RetryConfig,
allow_http: bool,
imdsv1_fallback: bool,
Expand Down Expand Up @@ -209,6 +247,10 @@ impl PyAmazonS3Context {
builder = builder.with_imdsv1_fallback();
};

if let Some(client_options) = client_options {
builder = builder.with_client_options(client_options.inner);
};

let store = builder
.with_bucket_name(bucket_name.clone())
//.with_retry_config(retry_config) #TODO: add later
Expand Down Expand Up @@ -250,6 +292,7 @@ impl PyHttpContext {
}

pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyClientOptions>()?;
m.add_class::<PyAmazonS3Context>()?;
m.add_class::<PyMicrosoftAzureContext>()?;
m.add_class::<PyGoogleCloudContext>()?;
Expand Down