Skip to content

Commit

Permalink
Merge pull request #912 from julep-ai/c/async-system-tool-execution
Browse files Browse the repository at this point in the history
chore(agents-api): Run sync system tool executions in new processes
  • Loading branch information
creatorrr authored Nov 29, 2024
2 parents b72c3f0 + 9d68580 commit 8aa82e5
Showing 1 changed file with 20 additions and 2 deletions.
22 changes: 20 additions & 2 deletions agents-api/agents_api/activities/execute_system.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from typing import Any
from uuid import UUID

Expand All @@ -22,6 +24,9 @@
from ..models.developer import get_developer
from .utils import get_handler

# For running synchronous code in the background
process_pool_executor = ProcessPoolExecutor()


@auto_blob_store(deep=True)
@beartype
Expand Down Expand Up @@ -105,13 +110,26 @@ async def execute_system(
developer_id = arguments.pop("developer_id")
session_id = arguments.pop("session_id", None)
data = CreateSessionRequest(**arguments)
return handler(developer_id=developer_id, session_id=session_id, data=data)

# In case sessions.create becomes asynchronous in the future
if asyncio.iscoroutinefunction(handler):
return await handler()

# Run the synchronous function in another process
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
process_pool_executor, partial(handler, developer_id, session_id, data)
)

# Handle regular operations
if asyncio.iscoroutinefunction(handler):
return await handler(**arguments)
return handler(**arguments)

# Run the synchronous function in another process
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
process_pool_executor, partial(handler, **arguments)
)
except BaseException as e:
if activity.in_activity():
activity.logger.error(f"Error in execute_system_call: {e}")
Expand Down

0 comments on commit 8aa82e5

Please sign in to comment.