
Incremental update #798

Open
dmpetrov opened this issue Jan 7, 2025 · 16 comments · May be fixed by #928
Assignees
Labels
enhancement New feature or request

Comments

@dmpetrov
Member

dmpetrov commented Jan 7, 2025

Description

A user can do an incremental update manually like this:

def my_embedding(file: File) -> list[float]:
    return [...]

dc = DataChain.from_storage("s3://bkt/dir1/*.jpg")
# Create 1st version
dc = dc.map(emb=my_embedding).save("image_emb")

...
# update

new = DataChain.from_storage("s3://bkt/dir1/*.jpg")
old = DataChain.from_dataset("image_emb")
diff = new.diff(old).map(emb=my_embedding)

# Create 2nd version
res = old.union(diff).save("image_emb")

It would be great if this could be supported out of the box. Users could then update datasets directly from the UI.

def my_embedding(file: File) -> list[float]:
    return [...]

# Create 1st version
dc = DataChain.incremental_dataset("s3://bkt/dir1/*.jpg", my_embedding, "image_emb")
...
# Update to 2nd version
dc = dc.update()

Challenges:

  • We need to preserve the environment since we use custom Python code (my_embedding()). So, Inline project meta #776 might be a prerequisite.
  • Not every dataset is updatable. Do we need a special type (hopefully not)?
  • It might be better to generalize this story to updatable pipelines and solve that more general case.
@dmpetrov dmpetrov added the enhancement New feature or request label Jan 7, 2025
@ilongin ilongin self-assigned this Jan 7, 2025
@ilongin
Contributor

ilongin commented Feb 7, 2025

@dmpetrov I'm looking at the example for incremental update in the issue description and I'm not sure it will be enough for users. It assumes someone only needs to add one new column for each new record in the bucket, but I think real-world scenarios will be much more complex. For example, someone could be adding multiple new columns, generating new ones, filtering something out, etc.

The first thing that comes to my mind is to introduce a "contract" where the user defines a custom function that receives a chain and returns the updated chain. That function should contain all the modifications that are needed. So, something like:

def update_dc(dc: DataChain) -> DataChain:
    return dc.filter(...).map(...).gen(...).merge(...)

dc = DataChain.incremental_dataset("s3://bkt/dir1/*.jpg", update_dc, "image_emb")
dc.update() # first update
dc.update() # second update
...

My first idea is to:

  1. Save the function as a string (e.g. using dill.source.getsource(fn))
  2. Dynamically import the function on each .update() call and apply it to the new rows (see the sketch below)
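
A minimal sketch of that round-trip, assuming dill is installed (the names here are illustrative, not an existing DataChain API):

from dill.source import getsource

def update_dc(dc):
    # user-defined update logic; any imports it needs must live inside the function
    return dc

# 1. capture the function body as text and store it alongside the dataset
src = getsource(update_dc)

# 2. later, on .update(): re-create the function from the stored source and apply it
namespace: dict = {}
exec(src, namespace)
restored_fn = namespace["update_dc"]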

Constraints:

  1. The user must put all needed imports inside the function for it to work
  2. The user must not call any other function from the script inside it. It can call 3rd-party libs, but as mentioned in 1), they need to be imported inside the function

I'm not sure how we can mitigate these constraints, since we cannot evaluate the whole script: it can contain a bunch of other unrelated code that should not run during an incremental update of a specific dataset.
Any thoughts appreciated.

@dmpetrov
Member Author

dmpetrov commented Feb 9, 2025

Thank you @ilongin for the thoughtful feedback!

You are right that this is not enough. The proposed approach looks good. Some comments are below.

"image_emb"

Why is this needed? Shouldn't it be temporary, without a name?

  • The user must put all needed imports inside the function for it to work

Only if you run the code in a separate environment/process. Is that the plan?

  • The user must not call any other function from the script inside it. It can call 3rd-party libs, but as mentioned in 1), they need to be imported inside the function

That might be ok.

Also, have you considered using requirements in function comments like here: #776 (comment)

It can look like:

def my_embedding(file: File) -> list[float]:
    """
# /// script
# requires-python = ">=3.12"
# dependencies = [
#   "pandas < 2.1.0"
# ]
# ///
    """
    return [...]

It looks pretty ugly if you ask me. Is there any other way of doing this? Our MLEM project had some functionality to capture all requirements from the current environment/process.

It can also work at the script level, not the function level:

dc = DataChain.incremental_dataset("s3://bkt/dir1/*.jpg", script="my_update.py", func="my_embedding")

my_update.py:

# /// script
# requires-python = ">=3.12"
# dependencies = [
#   "pandas < 2.1.0"
#   "datachain > 0.6"
# ]
# ///

import pandas as pd
from datachain import File

def my_embedding(file: File) -> list[float]:
    return [...]

The script approach looks cleaner and we can start with it. But we need something for functions as well.
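
For reference, a rough sketch of how such an inline metadata block could be read back (a simplified parser; read_script_metadata is a hypothetical helper, not a DataChain API):

import re
import tomllib  # Python 3.11+

def read_script_metadata(path: str) -> dict:
    """Extract the `# /// script ... # ///` block from a file and parse it as TOML."""
    with open(path, encoding="utf-8") as f:
        text = f.read()
    match = re.search(r"^# /// script\s*$(.*?)^# ///\s*$", text, re.MULTILINE | re.DOTALL)
    if not match:
        return {}
    # strip the leading "# " comment markers inside the block
    toml_text = "\n".join(
        line[2:] if line.startswith("# ") else line.lstrip("#")
        for line in match.group(1).splitlines()
    )
    return tomllib.loads(toml_text)

meta = read_script_metadata("my_update.py")
print(meta.get("dependencies", []))  # e.g. ['pandas < 2.1.0', 'datachain > 0.6']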

@shcheklein
Member

My 2 cents / ideas (a custom function and capturing envs seem complicated, tbh):

Since a dataset can be considered a graph of steps, can we go back to the root (probably a storage, or reading a newer version of another dataset), get the diff there, apply the steps only to that delta, and do a union at the end? Most likely that's what most people want, no?

I also assume users will be fine running their DataChain code, so we don't need to control the environment. Something like:

DataChain.from_storage("s3://bkt/dir1/").filter(...).save("something", incremental=True)

It has to figure out how to apply the filter only to a subset of files - all new files in the storage since the last run (not sure about deletions?).
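
Conceptually, such an incremental save would expand to the manual diff/union pattern from the issue description on a re-run (a sketch of the intended behavior, not actual DataChain internals):

prev = DataChain.from_dataset("something")      # previously saved version
src = DataChain.from_storage("s3://bkt/dir1/")  # re-list the source
delta = src.diff(prev)                          # only files not seen before
res = prev.union(delta.filter(...))             # re-apply the recorded steps to the delta only
res.save("something")                           # becomes the next version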

wdyt, folks?

@ilongin
Contributor

ilongin commented Feb 9, 2025

Why is this needed? Shouldn't it be temporary, without a name?

Hm, I don't see why it should be temporary. My thought was that when the user calls DataChain.incremental_update(...) the first time, it will create a dataset, and every .update() will just create a new version of it. BTW, you also put a name in your example in the issue description.

Only if you run the code in a separate environment/process. Is that the plan?

Yes, I think we need to run this "update" function in a separate env/process, i.e. someone could create another script that just does:

ds = DataChain.from_dataset("my-incremental-update-ds")
ds.update()

and this should work.

Also, have you considered using requirements in function comments like here: #776 (comment)
It can also work on scripts level, not function level:

I would leave this at the script level and save it into the dataset. Then we can use it on each incremental update call. I'm looking into that issue in parallel, and I think inline script metadata with a custom [tool] section is definitely the way to go (as you also put in that task description).

@ilongin
Contributor

ilongin commented Feb 9, 2025

My 2 cents / ideas (a custom function and capturing envs seem complicated, tbh):

Since a dataset can be considered a graph of steps, can we go back to the root (probably a storage, or reading a newer version of another dataset), get the diff there, apply the steps only to that delta, and do a union at the end? Most likely that's what most people want, no?

I also assume users will be fine running their DataChain code, so we don't need to control the environment. Something like:

DataChain.from_storage("s3://bkt/dir1/").filter(...).save("something", incremental=True)
It has to figure out how to apply the filter only to a subset of files - all new files in the storage since the last run (not sure about deletions?).

wdyt, folks?

Yeah, this seems like a good idea as well. The only thing is that if someone wants to use this dataset in some other script / use case and wants to make sure it has the latest data, he would need to copy all that code (you put only .filter() in the example, but it could be much more complex than that, with adding signals, rows, etc., and maybe dependencies on other parts of the script). Whereas if we save that "update function" somehow and make it run in a separate env/process, he could just call .update():

my_new_ds = DataChain.from_dataset("my-incremental-ds").update().filter(...).map(...).save("some-new-dataset")

@shcheklein
Member

The only thing is that if someone wants to use this dataset in some other script / use case and wants to make sure it has the latest data

Yep, I assume that people would control the dataset from its own script. Users won't be updating it (seems like an advanced case?)

@dmpetrov
Member Author

It has to figure out how to apply the filter only to a subset of files - all new files in the storage since the last run (not sure about deletions?).

@shcheklein is your idea to apply only filters, but not functions? If that's the case, I'm afraid it will rule out very basic use cases like delta updates for embeddings.

@dmpetrov
Member Author

def update_dc(dc: DataChain) -> DataChain:
    return dc.filter(...).map(...).gen(...).merge(...)

@ilongin this might be a good workaround, but only if we can't find a way to incorporate it into the regular API.

@dmpetrov
Member Author

dmpetrov commented Feb 14, 2025

The big goal here is to support an "Update" button in the dataset UI. So, all required information has to be stored within the dataset.

In realistic cases, people need to recompute embeddings for new files using their custom functions. This is a very basic use case we initially heard from users. Filters are also needed. And it could be a combination of (multiple) filters and (multiple) mappers like:

def my_embedding(file: File) -> list[float]:
    return [...]

target_product = my_embedding(File("product_example.jpg"))

(
    DataChain
    .from_storage("s3://bkt/dir1/")
    .filter(C("file.path").glob("*.jpg"))
    .map(emb=my_embedding)
    .mutate(dist=func.cosine_distance(C("emb"), target_product))
    .filter(C("dist") < 0.107)
    .save("like_product", incremental=True)  # It should know about both filters as well as the custom function
)

# The most tricky part here is the requirements.txt

What is outside of the scope: join, group, aggregate, distinct, etc. These operations are not incremental-update compatible since they require the whole context (all files).

The question is how we can implement this functionality step by step. We should probably start with @shcheklein's idea (if I understood it correctly) of saving only chains, but not custom functions, and progress to the whole functionality.

  1. Serialize filters in incremental datasets (see the sketch after this list)
  2. Implement "Update" button in UI
  3. Serialize mutate/...
  4. Serialize custom code
  5. Serialize requirements
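
Purely as an illustration of what "serializing" these pieces could mean, the metadata stored with an incremental dataset version might look something like this (a hypothetical format, not an actual proposal):

import json

dataset_meta = {
    "source": "s3://bkt/dir1/",
    "steps": [
        {"op": "filter", "expr": "file.path glob '*.jpg'"},                      # 1. serialized filters
        {"op": "map", "signal": "emb", "code_ref": "..."},                        # 4. serialized custom code
        {"op": "mutate", "signal": "dist", "expr": "cosine_distance(emb, ...)"},  # 3. serialized mutate
        {"op": "filter", "expr": "dist < 0.107"},
    ],
    "requirements": ["pandas < 2.1.0"],                                           # 5. serialized requirements
}
print(json.dumps(dataset_meta, indent=2))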

I'd love to hear your feedback, folks!

@ilongin
Contributor

ilongin commented Feb 14, 2025

@dmpetrov thanks for the examples.

So if we need to support that "Update" button in the UI, we definitely need to store all the info in the dataset, as you mentioned.

The easiest solution to this, and I think that's what Ivan meant as well in his comment (@shcheklein please confirm), is for the user to just re-run the same script where he created the dataset (of course, we also need that incremental flag in .save()). We already have the whole query script saved in Studio (we just need to add saving of the environment, like requirements etc., I think). The only catch is that the user needs to make sure he doesn't have any other side effects in the script itself, like creating other datasets that he doesn't want re-run and that are not relevant to the incremental one. This would be some kind of implicit "contract", but also common sense for the user.
The only missing part, besides the incremental flag and its implementation, is to add that "Update" button, which basically just automates:

  1. the user clicking on a dataset version to open the script
  2. the user clicking "Run" on the script again.

I think we could even implement .update() on DataChain, which would re-run the whole script itself and create a new version of the incremental dataset. This would mean we run a script inside a script in Studio, so I'm not sure how that would work atm.

If this contract of making sure there are no other side effects in the script is too limiting (although I don't see why it would be), then the other option is what I suggested: a different type of "contract" where the user needs to put all those mappers, filters, etc. in a special function and make sure all the imports it needs are inside the function itself. Then we would serialize that function (as a string), along with all requirements and other settings, and run it on demand. The advantage of this approach is that we can have a handy .update() function (as seen in the example below) that can be run in another script without copy/pasting the whole thing, like those filters, mappers, etc. The downsides are a slightly more complex / custom "contract", a worse API (the need for a special .create_incremental(...) factory method, as in the example), and a harder implementation.

Then we would have something like this:

def update_fn(dc: DataChain) -> DataChain:
    from datachain import func, File, C

    def my_embedding(file: File) -> list[float]:
        return [...]

    target_product = my_embedding(File("product_example.jpg"))

    return (
        dc
        .filter(C("file.path").glob("*.jpg"))
        .map(emb=my_embedding)
        .mutate(dist=func.cosine_distance(C("emb"), target_product))
        .filter(C("dist") < 0.107)
    )

ds = DataChain.create_incremental(DataChain.from_storage("s3://bkt/dir1/"), "like_product", update_fn)

# Some other script
dc = DataChain.from_dataset("like_product").update()  # this is now possible to run in another script as we save everything we needed in the dataset itself. We can 

I think we should choose between these 2 proposals, and I think the first one, re-running the script itself, is the best. I don't think we should go down the path of separately serializing filters, mutate, mappers, etc., as it seems too complicated.

@shcheklein
Member

Discussed with @dmpetrov - I think he understands the idea I was suggesting here.

One new insight here - it seems we'll need to define the incremental flag at the source level (or both levels) (from_storage(..., incremental=True/False)), since in certain cases we'll want to keep it non-incremental, e.g. when we read a single file and we always need that file in the pipeline.

@ilongin
Contributor

ilongin commented Feb 15, 2025

Discussed with @dmpetrov - I think he understands the idea I was suggesting here.

One new insight here - it seems we'll need to define the incremental flag at the source level (or both levels) (from_storage(..., incremental=True/False)), since in certain cases we'll want to keep it non-incremental, e.g. when we read a single file and we always need that file in the pipeline.

@shcheklein ok, so the final decision is to go with the first approach from my last comment (re-running the whole script every time the user wants to do an incremental update)?

@shcheklein
Member

@ilongin can we, as a first step, do a few examples - like actual scripts from some demo projects - to see how they will look? Or what would it take to do a prototype and experiment with it? Something quick and dirty?

@ilongin
Contributor

ilongin commented Feb 17, 2025

@shcheklein the scripts will look the same as they do now; the user just needs to remember not to put side effects in them, since they will be run multiple times, once for each update. I started working on this and will post some examples here when it's done for the CLI. For Studio, some additional work will be needed, which can be done in step 2.
I think for the CLI I should be able to make it work in a day or two.

@dmpetrov
Member Author

One new insight here - it seems we'll need to define the incremental flag at the source level (or both levels) (from_storage(..., incremental=True/False)), since in certain cases we'll want to keep it non-incremental, e.g. when we read a single file and we always need that file in the pipeline.

I thought more about this... It looks like defining it at the dataset level is ok, since an incremental dataset has to have a single source (bucket or dataset). Otherwise, merge() would be needed, which is not possible for incrementals - join, group, aggregate, distinct are prohibited.

@dmpetrov
Member Author

ok, so the final decision is to go with the first approach from my last comment (re-running the whole script every time the user wants to do an incremental update)?

It would be great to check if we can do this at the chain level (not re-running a script). If it's too complicated, then we should take the script level as a shortcut.

As you might remember, we went through this issue before with map(). The first implementation of map() / add_signal() also worked only at the script level initially, so only one map() was possible if the user needed parallel or distributed compute. Later, David figured out that a function can be pickled with all its dependencies using the cloudpickle lib. This lib is also used in Spark, Dask and similar projects. So, he was able to make map() work at the function level, not the script level.

It would be great if we can do the same here, without limiting users to scripts.
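
For reference, a minimal sketch of that cloudpickle round-trip (generic Python, not DataChain internals):

import pickle
import cloudpickle

def make_mapper(scale: float):
    # closure over `scale`; cloudpickle serializes the function together with
    # the values it captures, unlike the standard pickle module
    def mapper(x: float) -> float:
        return x * scale
    return mapper

blob = cloudpickle.dumps(make_mapper(2.5))  # bytes that could be stored with a dataset version
restored = pickle.loads(blob)               # plain pickle can load the payload back
assert restored(4.0) == 10.0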

@ilongin ilongin linked a pull request Feb 19, 2025 that will close this issue