Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(agents-api): Add retry policies to temporal workflows/activities #551

Merged
merged 12 commits into from
Oct 5, 2024
Previous commit
Next commit
run poe check
  • Loading branch information
HamadaSalhab committed Oct 2, 2024
commit 578f4732f165e30c6ddd744b32f3181fa319c688
4 changes: 2 additions & 2 deletions agents-api/agents_api/clients/temporal.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from datetime import timedelta
from uuid import UUID

-from ..common.retry_policies import DEFAULT_RETRY_POLICY
from temporalio.client import Client, TLSConfig

from ..autogen.openapi_model import TransitionTarget
from ..common.protocol.tasks import ExecutionInput
+from ..common.retry_policies import DEFAULT_RETRY_POLICY
from ..env import (
temporal_client_cert,
temporal_namespace,
Expand Down Expand Up @@ -55,7 +55,7 @@ async def run_task_execution_workflow(
task_queue=temporal_task_queue,
id=str(job_id),
run_timeout=timedelta(days=31),
-retry_policy=DEFAULT_RETRY_POLICY
+retry_policy=DEFAULT_RETRY_POLICY,
# TODO: Should add search_attributes for queryability
)

Expand Down
1 change: 1 addition & 0 deletions agents-api/agents_api/common/retry_policies.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import timedelta
+
from temporalio.common import RetryPolicy

DEFAULT_RETRY_POLICY = RetryPolicy(
Expand Down
4 changes: 2 additions & 2 deletions agents-api/agents_api/routers/docs/create_doc.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
from typing import Annotated
from uuid import UUID, uuid4

-from ...common.retry_policies import DEFAULT_RETRY_POLICY
from fastapi import BackgroundTasks, Depends
from starlette.status import HTTP_201_CREATED
from temporalio.client import Client as TemporalClient

from ...activities.types import EmbedDocsPayload
from ...autogen.openapi_model import CreateDocRequest, ResourceCreatedResponse
from ...clients import temporal
+from ...common.retry_policies import DEFAULT_RETRY_POLICY
from ...dependencies.developer_id import get_developer_id
from ...env import temporal_task_queue, testing
from ...models.docs.create_doc import create_doc as create_doc_query
Expand Down Expand Up @@ -42,7 +42,7 @@ async def run_embed_docs_task(
embed_payload,
task_queue=temporal_task_queue,
id=str(job_id),
-retry_policy=DEFAULT_RETRY_POLICY
+retry_policy=DEFAULT_RETRY_POLICY,
)

# TODO: Remove this conditional once we have a way to run workflows in
Expand Down
2 changes: 1 addition & 1 deletion agents-api/agents_api/workflows/demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ async def run(self, a: int, b: int) -> int:
demo_activity,
args=[a, b],
start_to_close_timeout=timedelta(seconds=30),
-retry_policy=DEFAULT_RETRY_POLICY
+retry_policy=DEFAULT_RETRY_POLICY,
)
2 changes: 1 addition & 1 deletion agents-api/agents_api/workflows/embed_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ async def run(self, embed_payload: EmbedDocsPayload) -> None:
embed_docs,
embed_payload,
schedule_to_close_timeout=timedelta(seconds=600),
-retry_policy=DEFAULT_RETRY_POLICY
+retry_policy=DEFAULT_RETRY_POLICY,
)
2 changes: 1 addition & 1 deletion agents-api/agents_api/workflows/mem_rating.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ async def run(self, memory: str) -> None:
mem_rating,
memory,
schedule_to_close_timeout=timedelta(seconds=600),
-retry_policy=DEFAULT_RETRY_POLICY
+retry_policy=DEFAULT_RETRY_POLICY,
)
2 changes: 1 addition & 1 deletion agents-api/agents_api/workflows/summarization.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ async def run(self, session_id: str) -> None:
summarization,
session_id,
schedule_to_close_timeout=timedelta(seconds=600),
-retry_policy=DEFAULT_RETRY_POLICY
+retry_policy=DEFAULT_RETRY_POLICY,
)
13 changes: 7 additions & 6 deletions agents-api/agents_api/workflows/task_execution/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
from datetime import timedelta
from typing import Any

-from ...common.retry_policies import DEFAULT_RETRY_POLICY
from pydantic import RootModel
from temporalio import workflow
from temporalio.exceptions import ApplicationError

+from ...common.retry_policies import DEFAULT_RETRY_POLICY
+
# Import necessary modules and types
with workflow.unsafe.imports_passed_through():
from ...activities import task_steps
Expand Down Expand Up @@ -387,7 +388,7 @@ async def run(
task_steps.raise_complete_async,
args=[context, output],
schedule_to_close_timeout=timedelta(days=31),
-retry_policy=DEFAULT_RETRY_POLICY
+retry_policy=DEFAULT_RETRY_POLICY,
)

state = PartialTransition(type="resume", output=result)
Expand Down Expand Up @@ -420,7 +421,7 @@ async def run(
task_steps.raise_complete_async,
args=[context, tool_calls_input],
schedule_to_close_timeout=timedelta(days=31),
-retry_policy=DEFAULT_RETRY_POLICY
+retry_policy=DEFAULT_RETRY_POLICY,
)

# Feed the tool call results back to the model
Expand All @@ -432,7 +433,7 @@ async def run(
schedule_to_close_timeout=timedelta(
seconds=30 if debug or testing else 600
),
-retry_policy=DEFAULT_RETRY_POLICY
+retry_policy=DEFAULT_RETRY_POLICY,
)
state = PartialTransition(output=new_response.output, type="resume")

Expand Down Expand Up @@ -476,7 +477,7 @@ async def run(
task_steps.raise_complete_async,
args=[context, tool_call],
schedule_to_close_timeout=timedelta(days=31),
-retry_policy=DEFAULT_RETRY_POLICY
+retry_policy=DEFAULT_RETRY_POLICY,
)

state = PartialTransition(output=tool_call_response, type="resume")
Expand Down Expand Up @@ -507,7 +508,7 @@ async def run(
schedule_to_close_timeout=timedelta(
seconds=30 if debug or testing else 600
),
-retry_policy=DEFAULT_RETRY_POLICY
+retry_policy=DEFAULT_RETRY_POLICY,
)

state = PartialTransition(output=tool_call_response)
Expand Down
8 changes: 5 additions & 3 deletions agents-api/agents_api/workflows/task_execution/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

from temporalio import workflow
from temporalio.exceptions import ApplicationError
+
from ...common.retry_policies import DEFAULT_RETRY_POLICY
+
with workflow.unsafe.imports_passed_through():
from ...activities import task_steps
from ...autogen.openapi_model import (
Expand Down Expand Up @@ -33,7 +35,7 @@ async def continue_as_child(
previous_inputs,
user_state,
],
-retry_policy=DEFAULT_RETRY_POLICY
+retry_policy=DEFAULT_RETRY_POLICY,
)


Expand Down Expand Up @@ -170,7 +172,7 @@ async def execute_map_reduce_step(
task_steps.base_evaluate,
args=[reduce, {"results": result, "_": output}],
schedule_to_close_timeout=timedelta(seconds=30),
-retry_policy=DEFAULT_RETRY_POLICY
+retry_policy=DEFAULT_RETRY_POLICY,
)

return result
Expand Down Expand Up @@ -246,7 +248,7 @@ async def execute_map_reduce_step_parallel(
extra_lambda_strs,
],
schedule_to_close_timeout=timedelta(seconds=30),
-retry_policy=DEFAULT_RETRY_POLICY
+retry_policy=DEFAULT_RETRY_POLICY,
)

except BaseException as e:
Expand Down
4 changes: 2 additions & 2 deletions agents-api/agents_api/workflows/task_execution/transition.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from datetime import timedelta

from temporalio import workflow
-from ...common.retry_policies import DEFAULT_RETRY_POLICY
from temporalio.exceptions import ApplicationError

from ...activities import task_steps
Expand All @@ -11,6 +10,7 @@
TransitionTarget,
)
from ...common.protocol.tasks import PartialTransition, StepContext
+from ...common.retry_policies import DEFAULT_RETRY_POLICY


async def transition(
Expand Down Expand Up @@ -45,7 +45,7 @@ async def transition(
task_steps.transition_step,
args=[context, transition_request],
schedule_to_close_timeout=timedelta(seconds=30),
-retry_policy=DEFAULT_RETRY_POLICY
+retry_policy=DEFAULT_RETRY_POLICY,
)

except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion agents-api/agents_api/workflows/truncation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ async def run(self, session_id: str, token_count_threshold: int) -> None:
truncation,
args=[session_id, token_count_threshold],
schedule_to_close_timeout=timedelta(seconds=600),
-retry_policy=DEFAULT_RETRY_POLICY
+retry_policy=DEFAULT_RETRY_POLICY,
)
4 changes: 3 additions & 1 deletion agents-api/notebooks/03-summarise.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,9 @@
" messages.append(user(start_message))\n",
"\n",
" print(\"Starting chatml generation\")\n",
-" trim_result = generate(messages, model=\"gpt-4-turbo\", temperature=0.1, stop=[\"</ct\"])\n",
+" trim_result = generate(\n",
+" messages, model=\"gpt-4-turbo\", temperature=0.1, stop=[\"</ct\"]\n",
+" )\n",
" print(\"End chatml generation\")\n",
" messages.append(trim_result)\n",
"\n",
Expand Down
Loading
Loading