Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(agents-api): Add retry policies to temporal workflows/activities #551

Merged
merged 12 commits into from
Oct 5, 2024
Prev Previous commit
Next Next commit
feat(agents-api): Add retry policies to workflows/activities executions
  • Loading branch information
HamadaSalhab committed Oct 2, 2024
commit 940b6ac65d5f02957756f12fbae6723f0cfdc20d
2 changes: 2 additions & 0 deletions agents-api/agents_api/clients/temporal.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import timedelta
from uuid import UUID

from ..common.retry_policies import DEFAULT_RETRY_POLICY
from temporalio.client import Client, TLSConfig

from ..autogen.openapi_model import TransitionTarget
Expand Down Expand Up @@ -54,6 +55,7 @@ async def run_task_execution_workflow(
task_queue=temporal_task_queue,
id=str(job_id),
run_timeout=timedelta(days=31),
retry_policy=DEFAULT_RETRY_POLICY
# TODO: Should add search_attributes for queryability
)

Expand Down
17 changes: 17 additions & 0 deletions agents-api/agents_api/common/retry_policies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from datetime import timedelta
from temporalio.common import RetryPolicy

DEFAULT_RETRY_POLICY = RetryPolicy(
initial_interval=timedelta(seconds=1),
backoff_coefficient=2,
maximum_attempts=2,
HamadaSalhab marked this conversation as resolved.
Show resolved Hide resolved
maximum_interval=timedelta(seconds=10),
HamadaSalhab marked this conversation as resolved.
Show resolved Hide resolved
non_retryable_error_types=[
"WorkflowExecutionAlreadyStarted",
"TypeError",
"AssertionError",
"HTTPException",
"SyntaxError",
"ValueError",
HamadaSalhab marked this conversation as resolved.
Show resolved Hide resolved
],
)
2 changes: 2 additions & 0 deletions agents-api/agents_api/workflows/task_execution/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import timedelta
from typing import Any

from ...common.retry_policies import DEFAULT_RETRY_POLICY
from pydantic import RootModel
from temporalio import workflow
from temporalio.exceptions import ApplicationError
Expand Down Expand Up @@ -200,6 +201,7 @@ async def run(
schedule_to_close_timeout=timedelta(
seconds=30 if debug or testing else 600
),
retry_policy=DEFAULT_RETRY_POLICY,
)
workflow.logger.debug(
f"Step {context.cursor.step} completed successfully"
Expand Down
3 changes: 2 additions & 1 deletion agents-api/agents_api/workflows/task_execution/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from temporalio import workflow
from temporalio.exceptions import ApplicationError

from ...common.retry_policies import DEFAULT_RETRY_POLICY
with workflow.unsafe.imports_passed_through():
from ...activities import task_steps
from ...autogen.openapi_model import (
Expand Down Expand Up @@ -33,6 +33,7 @@ async def continue_as_child(
previous_inputs,
user_state,
],
retry_policy=DEFAULT_RETRY_POLICY
)


Expand Down
2 changes: 2 additions & 0 deletions agents-api/agents_api/workflows/task_execution/transition.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import timedelta

from temporalio import workflow
from ...common.retry_policies import DEFAULT_RETRY_POLICY
from temporalio.exceptions import ApplicationError

from ...activities import task_steps
Expand Down Expand Up @@ -44,6 +45,7 @@ async def transition(
task_steps.transition_step,
args=[context, transition_request],
schedule_to_close_timeout=timedelta(seconds=30),
retry_policy=DEFAULT_RETRY_POLICY
)

except Exception as e:
Expand Down