How to best handle external files #27396
Replies: 2 comments
-
hey @nicholasd-ff - if I were to implement this, I would do something like the following: pass a shared configuration to each asset that the sensor triggers, with a dependency defined between those assets. This lets you define the configuration once and share it across assets.
You could still define an upstream external asset here to indicate that the data comes from an S3 bucket.
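A rough sketch of that pattern, with placeholder names for the config class, assets, and sensor (the bucket-polling logic is stubbed out):

```python
import dagster as dg


class CsvConfig(dg.Config):
    # Shared run config: defined once, passed to every asset in the run.
    s3_key: str


@dg.asset
def raw_csv(config: CsvConfig) -> str:
    # First asset triggered by the sensor; knows which S3 key to fetch.
    return config.s3_key


@dg.asset(deps=[raw_csv])
def processed_csv(config: CsvConfig) -> None:
    # Downstream asset, sharing the same configuration.
    print(f"processing {config.s3_key}")


@dg.sensor(asset_selection=[raw_csv, processed_csv])
def new_csv_sensor(context: dg.SensorEvaluationContext):
    key = "data/example.csv"  # placeholder: normally discovered by polling the bucket
    yield dg.RunRequest(
        run_key=key,
        run_config=dg.RunConfig(
            ops={
                "raw_csv": CsvConfig(s3_key=key),
                "processed_csv": CsvConfig(s3_key=key),
            }
        ),
    )
```

The sensor builds one `RunConfig` and hands the same values to every targeted asset, so nothing about the S3 object has to be hard-coded in the assets themselves.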
-
Thanks, that's helpful and pretty close to how we ended up handling it. It's frustrating because it seems like the UPathIOManager should allow us to abstract over a remote S3 bucket in production and a local file system for use in local development.

What we ended up doing was defining a resource that looked like:

```python
import os
import tempfile
from dataclasses import dataclass
from datetime import datetime

import dagster as dg
from dagster_aws.s3 import S3Resource


@dataclass
class S3File:
    # CamelCased to match the AWS Python API.
    Key: str
    ETag: str
    LastModified: datetime


class S3Bucket(dg.ConfigurableResource):
    Bucket: str
    s3: dg.ResourceDependency[S3Resource]

    def download_to_dir(self, s3_file: S3File, dest_dir: str) -> str:
        # We use a temporary random file name here, omitted for brevity.
        _, filename = os.path.split(s3_file.Key)
        tempfile_name: str = os.path.join(dest_dir, filename)
        # Fetch the object from S3 into the destination directory.
        self.s3.get_client().download_file(
            Key=s3_file.Key, Bucket=self.Bucket, Filename=tempfile_name
        )
        return tempfile_name

    def s3_file_from_bucket(self, key: str) -> S3File:
        attrs = self.s3.get_client().get_object_attributes(
            Bucket=self.Bucket,
            Key=key,
            ObjectAttributes=["ETag"],
        )
        return S3File(
            Key=key,
            ETag=attrs["ETag"],
            LastModified=attrs["LastModified"],
        )
```

Then our asset definition looks like:

```python
@dg.asset
def big_csv_file(context: dg.AssetExecutionContext, ourbucket: S3Bucket) -> S3File:
    return ourbucket.s3_file_from_bucket("data/big_csv_file.csv")
```

And we use it like:

```python
@dg.asset_check(asset=big_csv_file)
def check_big_csv(ourbucket: S3Bucket, big_csv_file: S3File) -> dg.AssetCheckResult:
    with tempfile.TemporaryDirectory() as td:
        # Fetch the big file.
        temp_big_csv = ourbucket.download_to_dir(big_csv_file, td)
        # Do whatever checks you need on temp_big_csv here.
        return dg.AssetCheckResult(passed=True)  # placeholder result
```

Obviously the downside is you have to manually download it to the executor to do anything with it, but that would be true regardless.
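Providing the resource then looks something like this (the bucket name and region are placeholders):

```python
import dagster as dg
from dagster_aws.s3 import S3Resource

defs = dg.Definitions(
    assets=[big_csv_file],
    asset_checks=[check_big_csv],
    resources={
        "ourbucket": S3Bucket(
            Bucket="my-data-bucket",  # placeholder bucket name
            s3=S3Resource(region_name="eu-west-1"),  # placeholder region
        ),
    },
)
```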
-
Hi Dagster community.
I've looked at Dagster University and a bunch of the examples, read about sensors and external assets, and tried implementing our initial use case a bunch of different ways, and I'm a bit bewildered by how complicated it seems to be to handle external files.
We want to feed a bunch of CSVs from S3 into a Jupyter notebook. This seems like a natural fit for external assets and a sensor, but if we define them as follows:
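Something roughly like this, with placeholder names:

```python
import dagster as dg

# The CSV is produced outside Dagster and lives in S3.
raw_csv = dg.AssetSpec("raw_csv")


@dg.sensor()
def s3_csv_sensor(context: dg.SensorEvaluationContext) -> dg.SensorResult:
    # Placeholder: in reality, list the bucket and compare ETags / LastModified.
    return dg.SensorResult(
        asset_events=[
            dg.AssetMaterialization(
                asset_key="raw_csv",
                metadata={"Key": "data/example.csv", "ETag": "abc123"},
            )
        ]
    )


@dg.asset(deps=[raw_csv])
def notebook_input() -> None:
    # Feeds the Jupyter notebook; needs to know which S3 object to read.
    ...
```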
Then what do I do now? Accessing the metadata of an external asset is extremely awkward and requires me to do something like:
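For example, reworking `notebook_input` from the sketch above to dig the latest materialization event out of the instance:

```python
@dg.asset(deps=[raw_csv])
def notebook_input(context: dg.AssetExecutionContext) -> None:
    event = context.instance.get_latest_materialization_event(dg.AssetKey("raw_csv"))
    if event is None or event.asset_materialization is None:
        raise RuntimeError("raw_csv has never been materialized")
    metadata = event.asset_materialization.metadata
    s3_key = metadata["Key"].value  # unwrap the MetadataValue
    ...
```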
Is this really the right way to approach it? I also thought about materializing the metadata for an external S3 object as a regular Python class, something like:
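That is, a plain dataclass along these lines (names are placeholders):

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class S3ObjectRef:
    bucket: str
    key: str
    etag: str
    last_modified: datetime
```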
But then it becomes awkward to materialize the asset based on a sensor: I'd have to fetch the metadata twice, once in the sensor to trigger the materialization and again in the asset materialization, because there appears to be no way other than metadata or the run / partition key to pass information from the sensor. I feel like I'm missing something really obvious here.
I get that Dagster wants every asset as a Python object so it can be pickled, and we have other / future workflows for which that would be true, but there must be plenty of people for whom it's not always the best fit?
I found this question: #18211, but it doesn't really seem to be coming from the same place. Eventually we might want to run this on Dagster+ or on separate executors to materialize outputs in parallel, so I don't really want to materialize each CSV to a hard-coded location under DAGSTER_HOME, as most of the Dagster University examples do.