Skip to content

Commit

Permalink
fix(agents-api): Fix blob store, ifelse branch, temporal postgres (#882)
Browse files Browse the repository at this point in the history
<!-- ELLIPSIS_HIDDEN -->



> [!IMPORTANT]
> Fixes blob store handling, improves retry logic, and updates
configurations for agents API, memory store, and scheduler.
> 
>   - **Blob Store Handling**:
> - In `execute_system.py`, added logic to load arguments from blob
store if they consist of exactly the keys `bucket` and `key`.
> - In `storage_handler.py`, updated `load_from_blob_store_if_remote()`
to handle dicts whose only keys are `bucket` and `key`.
>   - **Retry Logic**:
> - In `models/utils.py`, increased retry attempts from 2 to 4 for
queries retried when `is_resource_busy()` detects a busy resource.
> - Updated error handling in `cozo_query()` to check both `e` and
`e.resp` for 'busy' status.
>   - **Workflow Execution**:
> - In `helpers.py`, modified `execute_if_else_branch()` to handle
`None` else_branch by setting a default `EvaluateStep`.
>   - **Configuration Updates**:
> - In `memory-store/docker-compose.yml`, added `develop` section for
file watching and rebuild actions.
> - In `memory-store/options`, increased `max_background_jobs` to 32 and
`max_subcompactions` to 8.
> - In `scheduler/docker-compose.yml`, set `temporal-db-data` volume to
external.
> 
> <sup>This description was created by </sup>[<img alt="Ellipsis"
src="https://img.shields.io/badge/Ellipsis-blue?color=175173">](https://www.ellipsis.dev?ref=julep-ai%2Fjulep&utm_source=github&utm_medium=referral)<sup>
for fa011a9. It will automatically
update as commits are pushed.</sup>

<!-- ELLIPSIS_HIDDEN -->

---------

Co-authored-by: Julep Developers <[email protected]>
Co-authored-by: whiterabbit1983 <[email protected]>
  • Loading branch information
3 people authored Nov 25, 2024
1 parent 7810756 commit 4113fab
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 9 deletions.
7 changes: 6 additions & 1 deletion agents-api/agents_api/activities/execute_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
)
from ..autogen.Sessions import CreateSessionRequest
from ..autogen.Tools import SystemDef
from ..common.protocol.remote import RemoteObject
from ..common.protocol.tasks import StepContext
from ..common.storage_handler import auto_blob_store
from ..common.storage_handler import auto_blob_store, load_from_blob_store_if_remote
from ..env import testing
from ..models.developer import get_developer
from .utils import get_handler
Expand All @@ -33,6 +34,10 @@ async def execute_system(
) -> Any:
"""Execute a system call with the appropriate handler and transformed arguments."""
arguments: dict[str, Any] = system.arguments or {}

if set(arguments.keys()) == {"bucket", "key"}:
arguments = load_from_blob_store_if_remote(arguments)

arguments["developer_id"] = context.execution_input.developer_id

# Unbox all the arguments
Expand Down
4 changes: 4 additions & 0 deletions agents-api/agents_api/common/storage_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ def load_from_blob_store_if_remote(x: Any | RemoteObject) -> Any:
elif isinstance(x, RemoteList):
x = list(x)

elif isinstance(x, dict) and set(x.keys()) == {"bucket", "key"}:
fetched = s3.get_object(x["key"])
return deserialize(fetched)

return x


Expand Down
4 changes: 2 additions & 2 deletions agents-api/agents_api/models/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def is_resource_busy(e: Exception) -> bool:
return isinstance(e, HTTPException) and e.status_code == 429

@retry(
stop=stop_after_attempt(2),
stop=stop_after_attempt(4),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception(is_resource_busy),
)
Expand Down Expand Up @@ -254,7 +254,7 @@ def wrapper(*args: P.args, client=None, **kwargs: P.kwargs) -> pd.DataFrame:

debug and print(repr(e))

if "busy" in str(getattr(e, "resp", e)).lower():
if "busy" in (str(e) + str(getattr(e, "resp", e))).lower():
raise HTTPException(
status_code=429, detail="Resource busy. Please try again later."
) from e
Expand Down
6 changes: 5 additions & 1 deletion agents-api/agents_api/workflows/task_execution/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
with workflow.unsafe.imports_passed_through():
from ...activities import task_steps
from ...autogen.openapi_model import (
EvaluateStep,
TransitionTarget,
Workflow,
WorkflowStep,
Expand Down Expand Up @@ -90,14 +91,17 @@ async def execute_if_else_branch(
context: StepContext,
execution_input: ExecutionInput,
then_branch: WorkflowStep,
else_branch: WorkflowStep,
else_branch: WorkflowStep | None,
condition: bool,
previous_inputs: RemoteList | list[Any],
user_state: dict[str, Any] = {},
) -> Any:
workflow.logger.info(f"If-Else step: Condition evaluated to {condition}")
chosen_branch = then_branch if condition else else_branch

if chosen_branch is None:
chosen_branch = EvaluateStep(evaluate={"output": "_"})

if_else_wf_name = f"`{context.cursor.workflow}`[{context.cursor.step}].if_else"
if_else_wf_name += ".then" if condition else ".else"

Expand Down
10 changes: 9 additions & 1 deletion memory-store/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ services:
ports:
- "9070:9070"

develop:
watch:
- action: sync+restart
path: ./options
target: /data/cozo.db/options
- action: rebuild
path: Dockerfile

labels:
ofelia.enabled: "true"
ofelia.job-exec.backupcron.schedule: "@every 3h"
Expand All @@ -35,4 +43,4 @@ volumes:
cozo_data:
external: true
cozo_backup:
external: true
external: true
8 changes: 4 additions & 4 deletions memory-store/options
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
compaction_readahead_size=2097152
strict_bytes_per_sync=false
bytes_per_sync=1048576
max_background_jobs=16
max_background_jobs=32
avoid_flush_during_shutdown=false
max_background_flushes=-1
delayed_write_rate=16777216
max_open_files=-1
max_subcompactions=2
max_subcompactions=8
writable_file_max_buffer_size=5048576
wal_bytes_per_sync=0
max_background_compactions=-1
Expand Down Expand Up @@ -100,10 +100,10 @@
memtable_protection_bytes_per_key=0
target_file_size_multiplier=1
report_bg_io_stats=false
write_buffer_size=267108864
write_buffer_size=534217728
memtable_huge_page_size=0
max_successive_merges=0
max_write_buffer_number=25
max_write_buffer_number=50
prefix_extractor=rocksdb.CappedPrefix.9
bottommost_compression_opts={checksum=false;max_dict_buffer_bytes=0;enabled=false;max_dict_bytes=0;max_compressed_bytes_per_kb=896;parallel_threads=1;zstd_max_train_bytes=0;level=32767;use_zstd_dict_trainer=true;strategy=0;window_bits=-14;}
paranoid_file_checks=false
Expand Down
1 change: 1 addition & 0 deletions scheduler/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,4 @@ services:

volumes:
temporal-db-data:
external: true

0 comments on commit 4113fab

Please sign in to comment.