-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-38765] Fix persisted metadata handling in sink #27301
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
...nner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
Outdated
Show resolved
Hide resolved
...lanner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
Outdated
Show resolved
Hide resolved
...er/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java
Outdated
Show resolved
Hide resolved
...er/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java
Outdated
Show resolved
Hide resolved
...nner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
Outdated
Show resolved
Hide resolved
twalthr
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @dawidwys. +1 for this. Please update the release notes and let people know about the past inconsistency and that there is a potential restoring issue but the state was corrupted anyways.
twalthr
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for digging deeper and fixing this bug in a backwards compatible way. I left some last comments.
| // We introduced a new version, because statements that were never rolling back to a value from | ||
| // state could run succesfully. We allow those jobs to be upgraded. Without a new versions such jobs | ||
| // would fail on restore, because the state serializer would differ | ||
| @ExecNodeMetadata( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Batch version can stay at v1. There is no upgrade story that would interrupt a running batch job. I did it similarly for scan_v2, where I just updated v1 to the new behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Overlooked that in your commit
...nner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
Show resolved
Hide resolved
| @Override | ||
| protected Stream<String> getSavepointPaths( | ||
| TableTestProgram program, ExecNodeMetadata metadata) { | ||
| // disable the writable metadata test for sink node with version 1. it fails after the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we generalize this into a method in RestoreTestBase that TableSinkRestoreTest can implement for ignoring certain programs with versions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if that's what you had in mind, but I refactored it a bit.
twalthr
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for leveraging the version infrastructure and ensuring backwards compatibility.
|
Thank you. I'll just squash the commits to run the CI on a final version that I can merge 👍 |
What is the purpose of the change
Fixes a bug where an incorrect type is used in sink for
Verifying this change
Added a test in
SinkTestProgramsDoes this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)Documentation