Skip to content

Commit 130a6c2

Browse files
committed
new improved local box
1 parent 680e091 commit 130a6c2

File tree

1 file changed

+240
-0
lines changed

1 file changed

+240
-0
lines changed

src/codeboxapi/local.py

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
"""
2+
Local implementation of CodeBox.
3+
This is useful for testing and development.c
4+
In case you don't put an api_key,
5+
this is the default CodeBox.
6+
"""
7+
8+
import asyncio
9+
import os
10+
import subprocess
11+
from os import PathLike
12+
from queue import Queue
13+
from threading import Thread
14+
from typing import (
15+
AsyncGenerator,
16+
AsyncIterator,
17+
BinaryIO,
18+
Generator,
19+
Iterator,
20+
Literal,
21+
Self,
22+
Union,
23+
)
24+
25+
from jupyter_client.manager import KernelManager
26+
27+
from . import utils
28+
from .codebox import CodeBox, CodeBoxFile, ExecChunk
29+
from .config import settings
30+
31+
32+
# todo implement inactivity timeout to close kernel after 10 minutes of last method call
33+
class LocalBox(CodeBox):
34+
"""
35+
LocalBox is a CodeBox implementation that runs code locally.
36+
This is useful for testing and development.
37+
"""
38+
39+
_instance: Self | None = None
40+
41+
def __new__(cls, *_, **__):
42+
if not cls._instance:
43+
cls._instance = super().__new__(cls)
44+
else:
45+
if settings.debug:
46+
print(
47+
"INFO: Using a LocalBox which is not fully isolated\n"
48+
" and not scalable across multiple parallel users.\n"
49+
" Make sure to use a CODEBOX_API_KEY in production.\n"
50+
" Set envar CODEBOX_DEBUG=False to not see this again.\n"
51+
)
52+
return cls._instance
53+
54+
def __init__(self, /, **kwargs) -> None:
55+
super().__init__()
56+
os.environ["PYDEVD_DISABLE_FILE_VALIDATION"] = "1"
57+
self.kernel = KernelManager()
58+
self.cwd = settings.default_working_dir
59+
# startup
60+
utils.check_installed("jupyter-client")
61+
os.makedirs(self.cwd, exist_ok=True)
62+
if not self.kernel.is_alive():
63+
self.kernel = KernelManager(ip=os.getenv("LOCALHOST", "127.0.0.1"))
64+
self.kernel.start_kernel(cwd=self.cwd)
65+
66+
def stream_exec(
67+
self,
68+
code: str | PathLike,
69+
language: Literal["python", "bash"] = "python",
70+
timeout: float | None = None,
71+
cwd: str | None = None,
72+
) -> Generator[ExecChunk, None, None]:
73+
"""
74+
Creates a Generator that streams chunks of the output of the code execution
75+
"""
76+
code = utils.resolve_pathlike(code)
77+
78+
if language == "python":
79+
msg_queue: Queue[dict | None] = Queue()
80+
81+
def output_hook(msg):
82+
msg_queue.put(msg)
83+
84+
def execute_code():
85+
self.kernel.client().execute_interactive(code, output_hook=output_hook)
86+
msg_queue.put(None)
87+
88+
execution_thread = Thread(target=execute_code)
89+
execution_thread.start()
90+
91+
while True:
92+
msg = msg_queue.get()
93+
if msg is None:
94+
break
95+
yield utils.parse_message(msg)
96+
97+
execution_thread.join()
98+
99+
elif language == "bash":
100+
with utils.raise_timeout(timeout):
101+
process = subprocess.Popen(
102+
code,
103+
shell=True,
104+
stdout=subprocess.PIPE,
105+
stderr=subprocess.STDOUT,
106+
text=True,
107+
)
108+
if process.stdout:
109+
for line in process.stdout:
110+
yield ExecChunk(type="stream", content=line.strip())
111+
process.wait()
112+
if process.returncode != 0:
113+
yield ExecChunk(type="error", content="Command execution failed")
114+
else:
115+
raise ValueError(f"Unsupported language: {language}")
116+
117+
def upload(
118+
self,
119+
file_name: str,
120+
content: BinaryIO | bytes | str,
121+
timeout: float | None = None,
122+
) -> CodeBoxFile:
123+
with utils.raise_timeout(timeout):
124+
file_path = os.path.join(self.cwd, file_name)
125+
with open(file_path, "wb") as file:
126+
if isinstance(content, str):
127+
file.write(content.encode())
128+
elif isinstance(content, BinaryIO):
129+
while chunk := content.read(8192):
130+
file.write(chunk)
131+
else:
132+
file.write(content)
133+
file_size = os.path.getsize(file_path)
134+
return CodeBoxFile(
135+
remote_path=file_path,
136+
size=file_size,
137+
codebox=self,
138+
)
139+
140+
def stream_download(
141+
self,
142+
file_name: str,
143+
timeout: float | None = None,
144+
) -> Iterator[bytes]:
145+
with utils.raise_timeout(timeout):
146+
with open(os.path.join(self.cwd, file_name), "rb") as file:
147+
yield file.read()
148+
149+
async def astream_exec(
150+
self,
151+
code: Union[str, PathLike],
152+
language: Literal["python", "bash"] = "python",
153+
timeout: float | None = None,
154+
cwd: str | None = None,
155+
) -> AsyncGenerator[ExecChunk, None]:
156+
code = utils.resolve_pathlike(code)
157+
158+
if language == "python":
159+
msg_queue: asyncio.Queue = asyncio.Queue()
160+
161+
async def output_hook(msg):
162+
await msg_queue.put(msg)
163+
164+
execution_task = asyncio.create_task(
165+
self.kernel.client()._async_execute_interactive(
166+
code, output_hook=output_hook, timeout=timeout
167+
)
168+
)
169+
170+
try:
171+
while not execution_task.done() or not msg_queue.empty():
172+
msg = await msg_queue.get()
173+
yield utils.parse_message(msg)
174+
finally:
175+
if not execution_task.done():
176+
execution_task.cancel()
177+
try:
178+
await execution_task
179+
except asyncio.CancelledError:
180+
pass
181+
182+
elif language == "bash":
183+
async with asyncio.timeout(timeout):
184+
process = await asyncio.create_subprocess_shell(
185+
code,
186+
stdout=asyncio.subprocess.PIPE,
187+
stderr=asyncio.subprocess.STDOUT,
188+
cwd=cwd,
189+
)
190+
if process.stdout:
191+
async for line in process.stdout:
192+
yield ExecChunk(type="stream", content=line.decode().strip())
193+
await process.wait()
194+
if process.returncode != 0:
195+
yield ExecChunk(type="error", content="Command execution failed")
196+
197+
else:
198+
raise ValueError(f"Unsupported language: {language}")
199+
200+
async def aupload(
201+
self,
202+
file_name: str,
203+
content: BinaryIO | bytes | str,
204+
timeout: float | None = None,
205+
) -> CodeBoxFile:
206+
import aiofiles
207+
208+
async with asyncio.timeout(timeout):
209+
file_path = os.path.join(self.cwd, file_name)
210+
async with aiofiles.open(file_path, "wb") as file:
211+
if isinstance(content, str):
212+
await file.write(content.encode())
213+
elif isinstance(content, BinaryIO):
214+
while chunk := content.read(8192):
215+
await file.write(chunk)
216+
else:
217+
await file.write(content)
218+
219+
file_size = await aiofiles.os.path.getsize(file_path)
220+
return CodeBoxFile(
221+
remote_path=file_path,
222+
size=file_size,
223+
codebox=self,
224+
)
225+
226+
async def astream_download(
227+
self,
228+
remote_file_path: str,
229+
timeout: float | None = None,
230+
) -> AsyncIterator[bytes]:
231+
import aiofiles
232+
233+
async with asyncio.timeout(timeout):
234+
async with aiofiles.open(
235+
os.path.join(self.cwd, remote_file_path), "rb"
236+
) as f:
237+
yield await f.read()
238+
239+
def __del__(self):
240+
self.kernel.shutdown_kernel()

0 commit comments

Comments
 (0)