Skip to content

Commit

Permalink
Merge pull request #911 from julep-ai/f/async_boto3
Browse files Browse the repository at this point in the history
feat(agents-api): Add async boto client
  • Loading branch information
creatorrr authored Nov 29, 2024
2 parents d3b37db + c095f09 commit b72c3f0
Show file tree
Hide file tree
Showing 40 changed files with 331 additions and 793 deletions.
4 changes: 1 addition & 3 deletions agents-api/agents_api/activities/execute_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
TextOnlyDocSearchRequest,
VectorDocSearchRequest,
)
from ..autogen.Sessions import CreateSessionRequest
from ..autogen.Tools import SystemDef
from ..common.protocol.tasks import StepContext
from ..common.storage_handler import auto_blob_store, load_from_blob_store_if_remote
from ..env import testing
Expand All @@ -35,7 +33,7 @@ async def execute_system(
arguments: dict[str, Any] = system.arguments or {}

if set(arguments.keys()) == {"bucket", "key"}:
arguments = load_from_blob_store_if_remote(arguments)
arguments = await load_from_blob_store_if_remote(arguments)

arguments["developer_id"] = context.execution_input.developer_id

Expand Down
9 changes: 7 additions & 2 deletions agents-api/agents_api/activities/sync_items_remote.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from typing import Any

from beartype import beartype
Expand All @@ -10,14 +11,18 @@
async def save_inputs_remote_fn(inputs: list[Any]) -> list[Any | RemoteObject]:
from ..common.storage_handler import store_in_blob_store_if_large

return [store_in_blob_store_if_large(input) for input in inputs]
return await asyncio.gather(
*[store_in_blob_store_if_large(input) for input in inputs]
)


@beartype
async def load_inputs_remote_fn(inputs: list[Any | RemoteObject]) -> list[Any]:
from ..common.storage_handler import load_from_blob_store_if_remote

return [load_from_blob_store_if_remote(input) for input in inputs]
return await asyncio.gather(
*[load_from_blob_store_if_remote(input) for input in inputs]
)


save_inputs_remote = activity.defn(name="save_inputs_remote")(save_inputs_remote_fn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async def evaluate_step(
else context.current_step.evaluate
)

values = context.prepare_for_step(include_remote=True) | additional_values
values = await context.prepare_for_step(include_remote=True) | additional_values

output = simple_eval_dict(expr, values)
result = StepOutcome(output=output)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async def for_each_step(context: StepContext) -> StepOutcome:
assert isinstance(context.current_step, ForeachStep)

output = await base_evaluate(
context.current_step.foreach.in_, context.prepare_for_step()
context.current_step.foreach.in_, await context.prepare_for_step()
)
return StepOutcome(output=output)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async def if_else_step(context: StepContext) -> StepOutcome:
assert isinstance(context.current_step, IfElseWorkflowStep)

expr: str = context.current_step.if_
output = await base_evaluate(expr, context.prepare_for_step())
output = await base_evaluate(expr, await context.prepare_for_step())
output: bool = bool(output)

result = StepOutcome(output=output)
Expand Down
2 changes: 1 addition & 1 deletion agents-api/agents_api/activities/task_steps/log_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async def log_step(context: StepContext) -> StepOutcome:
template: str = context.current_step.log
output = await render_template(
template,
context.prepare_for_step(include_remote=True),
await context.prepare_for_step(include_remote=True),
skip_vars=["developer_id"],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async def map_reduce_step(context: StepContext) -> StepOutcome:
assert isinstance(context.current_step, MapReduceStep)

output = await base_evaluate(
context.current_step.over, context.prepare_for_step()
context.current_step.over, await context.prepare_for_step()
)

return StepOutcome(output=output)
Expand Down
2 changes: 1 addition & 1 deletion agents-api/agents_api/activities/task_steps/prompt_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def format_tool(tool: Tool) -> dict:
async def prompt_step(context: StepContext) -> StepOutcome:
# Get context data
prompt: str | list[dict] = context.current_step.model_dump()["prompt"]
context_data: dict = context.prepare_for_step(include_remote=True)
context_data: dict = await context.prepare_for_step(include_remote=True)

# If the prompt is a string and starts with $_ then we need to evaluate it
should_evaluate_prompt = isinstance(prompt, str) and prompt.startswith(
Expand Down
2 changes: 1 addition & 1 deletion agents-api/agents_api/activities/task_steps/return_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async def return_step(context: StepContext) -> StepOutcome:
assert isinstance(context.current_step, ReturnStep)

exprs: dict[str, str] = context.current_step.return_
output = await base_evaluate(exprs, context.prepare_for_step())
output = await base_evaluate(exprs, await context.prepare_for_step())

result = StepOutcome(output=output)
return result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async def set_value_step(
try:
expr = override_expr if override_expr is not None else context.current_step.set

values = context.prepare_for_step() | additional_values
values = await context.prepare_for_step() | additional_values
output = simple_eval_dict(expr, values)
result = StepOutcome(output=output)

Expand Down
2 changes: 1 addition & 1 deletion agents-api/agents_api/activities/task_steps/switch_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async def switch_step(context: StepContext) -> StepOutcome:
output: int = -1
cases: list[str] = [c.case for c in context.current_step.switch]

evaluator = get_evaluator(names=context.prepare_for_step())
evaluator = get_evaluator(names=await context.prepare_for_step())

for i, case in enumerate(cases):
result = evaluator.eval(case)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async def tool_call_step(context: StepContext) -> StepOutcome:
raise ApplicationError(f"Tool {tool_name} not found in the toolset")

arguments = await base_evaluate(
context.current_step.arguments, context.prepare_for_step()
context.current_step.arguments, await context.prepare_for_step()
)

call_id = generate_call_id()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ async def transition_step(
transition_info: CreateTransitionRequest,
) -> Transition:
# Load output from blob store if it is a remote object
transition_info.output = load_from_blob_store_if_remote(transition_info.output)
transition_info.output = await load_from_blob_store_if_remote(
transition_info.output
)

# Create transition
transition = await create_execution_transition_async(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async def wait_for_input_step(context: StepContext) -> StepOutcome:
assert isinstance(context.current_step, WaitForInputStep)

exprs = context.current_step.wait_for_input.info
output = await base_evaluate(exprs, context.prepare_for_step())
output = await base_evaluate(exprs, await context.prepare_for_step())

result = StepOutcome(output=output)
return result
Expand Down
2 changes: 1 addition & 1 deletion agents-api/agents_api/activities/task_steps/yield_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async def yield_step(context: StepContext) -> StepOutcome:
], f"Workflow {workflow} not found in task"

# Evaluate the expressions in the arguments
arguments = await base_evaluate(exprs, context.prepare_for_step())
arguments = await base_evaluate(exprs, await context.prepare_for_step())

# Transition to the first step of that workflow
transition_target = TransitionTarget(
Expand Down
Loading

0 comments on commit b72c3f0

Please sign in to comment.