Preserve pane index through reshuffle. #34348
@@ -124,6 +124,7 @@ var (
CoderTimer = cdrUrn(pipepb.StandardCoders_TIMER)

CoderKV = cdrUrn(pipepb.StandardCoders_KV)
CoderTuple = "beam:coder:tuple:v1"
This seems suspiciously out of place, being a magic string and also not a standard coder. What is the story behind it?
I placed it here because Python switches between the KV coder and the tuple coder depending on the number of elements, which causes Python tests to fail with the Prism runner:
beam/sdks/python/apache_beam/coders/coders.py
Line 1232 in 4fe34db
if self.is_kv_coder():
For example, with the pane index change there ends up being a tuple coder with 3 elements (because pane info is now included)
coders {
key: "ref_Coder_TupleCoder_6"
value {
spec {
urn: "beam:coder:tuple:v1"
}
component_coder_ids: "ref_Coder_BytesCoder_1"
component_coder_ids: "ref_Coder_NullableCoder_7"
component_coder_ids: "ref_Coder_FastPrimitivesCoder_8"
}
}
But without pane info it falls back to the KV coder, which Prism supports:
coders {
key: "ref_Coder_TupleCoder_6"
value {
spec {
urn: "beam:coder:kv:v1"
}
component_coder_ids: "ref_Coder_BytesCoder_1"
component_coder_ids: "ref_Coder_NullableCoder_7"
}
}
Prism error without this change:
ERROR:root:prism error building stage stage-002:
unknown coder urn key: beam:coder:tuple:v1
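To make the switch described above concrete, here is a minimal sketch of the arity-based URN choice. The helper name `urn_for_components` is hypothetical and this is not the SDK's actual code; it only mirrors the behavior shown in the two proto dumps (two components advertise the KV URN, three advertise the tuple URN).

```python
KV_URN = "beam:coder:kv:v1"
TUPLE_URN = "beam:coder:tuple:v1"


def urn_for_components(component_ids):
    """Hypothetical helper: pick the URN a Python tuple coder would advertise."""
    # Exactly two components look like a key/value pair, so the standard
    # KV URN is used and any runner can understand it.
    if len(component_ids) == 2:
        return KV_URN
    # Any other arity is advertised with the Python-specific tuple URN.
    return TUPLE_URN


# Component ids copied from the proto dumps above.
without_pane_info = urn_for_components(
    ["ref_Coder_BytesCoder_1", "ref_Coder_NullableCoder_7"])
with_pane_info = urn_for_components(
    ["ref_Coder_BytesCoder_1", "ref_Coder_NullableCoder_7",
     "ref_Coder_FastPrimitivesCoder_8"])
```

Under this sketch, adding pane info as a third component is what moves the coder from the KV URN to the tuple URN that Prism rejects.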
This seems like something we probably need to fix another way. The actual shuffle needs KVs, with everything we want to preserve reified into the value component.
> The actual shuffle needs KVs
What do you mean by this? The way I understand it is
- Reshuffle adds random keys (k, v)
- ReifyMetadata maintains a kv, with a nested tuple as a value (value, timestamp, pane_info)
return key, (value, timestamp, pane_info)
I guess if we want to avoid this, we can use a nested KV in ReifyMetadata so that it is
- key, (value, (timestamp, pane_info))
Then the regular KV coder should work?
Or can we also use windowed_value as the value in the reify output instead of a tuple with the metadata?
The original reify just used a KV as the value in the reify function:
return key, (value, timestamp)
We are now including pane info as mentioned above, so it becomes a tuple.
This only happens in the global window case; in the custom window case the value for reify is a windowed value:
return key, windowed_value.WindowedValue(value, timestamp, [window])
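The two shapes discussed above can be compared with a small sketch. The function names and the placeholder values (a string standing in for pane info, an integer for the timestamp) are hypothetical, not Beam's actual ReifyMetadata code, and whether coder inference would really pick the KV coder for the nested form is not shown here.

```python
def reify_flat(key, value, timestamp, pane_info):
    # Shape from this PR: the value is a 3-element tuple.
    return key, (value, timestamp, pane_info)


def reify_nested(key, value, timestamp, pane_info):
    # Proposed alternative: every level is a pair.
    return key, (value, (timestamp, pane_info))


def restore_nested(element):
    # Undo reify_nested after the shuffle.
    key, (value, (timestamp, pane_info)) = element
    return key, value, timestamp, pane_info


def max_arity(obj):
    """Largest tuple length found anywhere in a nested tuple structure."""
    if not isinstance(obj, tuple):
        return 0
    return max([len(obj)] + [max_arity(item) for item in obj])


flat = reify_flat(b"k", "v", 1000, "pane-0")
nested = reify_nested(b"k", "v", 1000, "pane-0")
```

In the nested form no tuple has more than two elements, which is the property the question above relies on; the flat form contains a 3-element tuple.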
I see. Yeah, KV(key, (value, timestamp, pane_info)) should be fine. The runner should not need any understanding of the coder for (value, timestamp, pane_info), since that is in general a user type / coder.
OK... @lostluck does know Prism best. But it isn't in line with my take on the model. Tuple is a language-specific esoteric coder that isn't part of the Beam model and shouldn't be explicitly understood by anything outside the Python SDK.
Oh and of course big follow-up question: this coder is not new... presumably it already works, so why does this change require it to become runner-understood?
It's as Claude says.
Basically the Python Tuple coder is an outlier: It pretends to be a standard beam coder, with an arbitrary number of components, and Python plays fast and loose with the notion of coder types. No other "custom coder" uses exposed sub components, essentially. Custom coders are usually fully opaque.
The problem here is I tried to avoid needing to enumerate all Beam coders with sub components that needed processing. We already had a "leaf" list, why do we also need a "composite" list? That means there are two approaches:
- This current PR's approach: Promote the janky Python approach to be a known thing for all runners/SDKs. Since the URN is already pretending to be a standard URN, this isn't too hard, and it permits other SDKs to interoperate with that coder. AKA, turning the exception into part of the standard.
- We are forced to specify known composite coders to avoid length prefixing them unnecessarily.
So instead of just the set of Known Leaf Coders, we would have the set of Known Composite Coders, that don't need length prefixing. Anything else should just be length prefixed.
E.g. we add a list of the known composite URNs that should not be length prefixed, next to the existing leaf list:
var leafCoders = map[string]struct{}{
The check should go where unknowns are already wrapped, so it's the same logic, e.g.
if len(c.GetComponentCoderIds()) == 0 && !leaf {
would become
if (len(c.GetComponentCoderIds()) == 0 && !leaf) || !isKnownCompositeCoder(c) {
This has two risks though:
- Changed length prefixing behavior may mean tests that are currently passing will fail. It'll be important to run the Java suite locally (I don't trust the GitHub action to run uncached when it's just a Prism-side change. Only noticed that after I got re-orged. If it takes less than 20m to run, it was cached and can't be trusted).
- The converse issue: What if the Python SDK doesn't know how to deal with a runner-side wrapped, length-prefixed tuple coder? Then a Python fix would be needed. This would hopefully be evident in the test suite's uses of the tuple coder.
(There's an issue with Java Row coders failing deep in the Java SDK when they're wrapped in a length prefix. The introspection doesn't know how to skip the LP wrapper; see #32931.)
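The second approach above can be sketched as a small decision function. This is written in Python purely for illustration (Prism itself is Go). The two URN sets are illustrative subsets, not Prism's actual lists, and `needs_length_prefix` is a hypothetical name. Splitting the check on whether a coder has components is also an assumption of this sketch, made so that known leaf coders are never wrapped.

```python
# Illustrative subsets only; the real lists would live in the runner.
KNOWN_LEAF_URNS = frozenset({
    "beam:coder:bytes:v1",
    "beam:coder:varint:v1",
})
KNOWN_COMPOSITE_URNS = frozenset({
    "beam:coder:kv:v1",
    "beam:coder:iterable:v1",
})


def needs_length_prefix(urn, component_ids):
    """Hypothetical check: should the runner wrap this coder in a length prefix?"""
    if not component_ids:
        # No components: wrap anything that is not a known leaf coder.
        return urn not in KNOWN_LEAF_URNS
    # Has components: wrap anything that is not a known composite coder.
    return urn not in KNOWN_COMPOSITE_URNS


tuple_wrapped = needs_length_prefix(
    "beam:coder:tuple:v1",
    ["ref_Coder_BytesCoder_1", "ref_Coder_NullableCoder_7",
     "ref_Coder_FastPrimitivesCoder_8"])
kv_wrapped = needs_length_prefix(
    "beam:coder:kv:v1",
    ["ref_Coder_BytesCoder_1", "ref_Coder_NullableCoder_7"])
```

With these sets the Python tuple coder is treated as opaque and length prefixed, while the KV coder is still decomposed by the runner.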
Added a link to the tracking issue for the Prism tuple coder problem to the PR description: #32636
> Oh and of course big follow-up question: this coder is not new... presumably it already works, so why does this change require it to become runner-understood?
And specifically for this question: if it's not in the validates runner suite, it's extremely hard to test when one doesn't use the SDK. In this case, there was exactly one test for it. There are only ~70 validates runner tests for Python.
Should fix #32636.