Move XCom serialization and deserialization methods to the Task SDK #45231

Open
Tracked by #44481
kaxil opened this issue Dec 27, 2024 · 1 comment
kaxil commented Dec 27, 2024

Currently, as part of AIP-72, we have implemented XCom API endpoints in the Task Execution API Server that expect a JSON-serialized string:

def set_xcom(
    dag_id: str,
    run_id: str,
    task_id: str,
    key: str,
    value: Annotated[
        str,
        Body(
            description="A JSON-formatted string representing the value to set for the XCom.",
            openapi_examples={
                "simple_value": {
                    "summary": "Simple value",
                    "value": '"value1"',
                },
                "dict_value": {
                    "summary": "Dictionary value",
                    "value": '{"key2": "value2"}',
                },
                "list_value": {
                    "summary": "List value",
                    "value": '["value1"]',
                },
            },
        ),
    ],

This was done so that the Task SDK can expose the same interface for multi-language support (Python, Go, Java, etc.). The contract is simple: the API Server always deals with a JSON-formatted string, and the responsibility for serializing native objects to a string and deserializing them back lies with the language-specific clients.
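As a rough illustration of that contract from the Python client's side, here is a minimal sketch. The helper names `serialize_xcom` and `deserialize_xcom` are hypothetical and are not part of the Task SDK; the sketch only assumes that the value is JSON-serializable and uses the standard library `json` module.

```python
import json
from typing import Any


def serialize_xcom(value: Any) -> str:
    """Hypothetical client-side helper: native object -> JSON-formatted string."""
    return json.dumps(value)


def deserialize_xcom(payload: str) -> Any:
    """Hypothetical client-side helper: JSON-formatted string -> native object."""
    return json.loads(payload)


# The strings below match the openapi_examples in the endpoint signature above.
simple_payload = serialize_xcom("value1")
dict_payload = serialize_xcom({"key2": "value2"})
list_payload = serialize_xcom(["value1"])

# Round trip: what the client sends is what the client gets back.
round_tripped = deserialize_xcom(dict_payload)
```

With this split, the server only ever sees `str` values, so a Go or Java client could implement the same two helpers with its own JSON library.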

To do that, we should move the current serialization and deserialization logic to the clients. This will also allow XCom backends to be stored and used correctly, and those backends can then handle different languages as well.

As part of this task, we should also figure out how to avoid serialization and deserialization by SQLAlchemy, since we use the JSON type for the value column, which leads to double or triple serialization:

  1. The Task SDK serializes the value and sends it to the API Server
  2. The API Server calls XCom.serialize
  3. The SQLAlchemy JSON type serializes it again

Result of triple serialization:

[image]
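The effect can be reproduced in isolation with a small sketch. It assumes, for illustration only, that each of the three layers behaves like a plain `json.dumps` call; the real `XCom.serialize` and SQLAlchemy JSON type may differ in detail, so the variable names below are stand-ins rather than the actual code paths.

```python
import json

value = {"key2": "value2"}

# Stand-in for step 1: the Task SDK client serializes the native object.
once = json.dumps(value)
# Stand-in for step 2: the API Server serializes the already-serialized string.
twice = json.dumps(once)
# Stand-in for step 3: the JSON column type serializes it yet again.
thrice = json.dumps(twice)

# Each extra pass wraps the previous result in quotes and escapes it further.
print(once)
print(twice)
print(thrice)

# A single json.loads no longer recovers the original value;
# it takes one call per serialization pass to get back to the dict.
after_one_load = json.loads(thrice)
after_three_loads = json.loads(json.loads(json.loads(thrice)))
```

A reader that decodes only once gets back a string instead of a dict, which is why the issue proposes a single serialization step owned by the client.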
kaxil self-assigned this Dec 27, 2024
dosubot bot added the area:core label Dec 27, 2024
kaxil added a commit to astronomer/airflow that referenced this issue Dec 27, 2024
Fixing this so main works with most of the XCom example. But proper fix will be part of apache#45231
kaxil added a commit that referenced this issue Dec 27, 2024
Fixing this so main works with most of the XCom example. But proper fix will be part of #45231
jason810496 pushed a commit to jason810496/airflow that referenced this issue Dec 28, 2024
Fixing this so main works with most of the XCom example. But proper fix will be part of apache#45231
@amoghrajesh
Contributor

I think this might be the right issue in which to introduce an "XComAccessor" kind of interface. I am adding support for handling error cases in #45341 to match the legacy behavior.

LefterisXefteris pushed a commit to LefterisXefteris/airflow that referenced this issue Jan 5, 2025
Fixing this so main works with most of the XCom example. But proper fix will be part of apache#45231