Skip to content
Merged
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add tests for frame caching in remote debugging
Tests cover the key frame caching scenarios: cache hits when the stack is stable, cache invalidation when functions return or new ones are called, partial stack reuse when only the top changes, and recursive frames. Also verifies that cache_frames=True produces equivalent results to cache_frames=False.
  • Loading branch information
pablogsal committed Dec 1, 2025
commit c56c1e156b33162fb668676ff8727ed95e5ff614
362 changes: 362 additions & 0 deletions Lib/test/test_external_inspection.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import contextlib
import unittest
import os
import textwrap
Expand Down Expand Up @@ -2038,5 +2039,366 @@ def busy_thread():
p.stderr.close()


class TestFrameCaching(unittest.TestCase):
"""Test that frame caching produces correct results.

Uses socket-based synchronization for deterministic testing.
All tests verify cache reuse via object identity checks (assertIs).
"""

maxDiff = None

@contextlib.contextmanager
def _target_process(self, script_body):
"""Context manager for running a target process with socket sync."""
port = find_unused_port()
script = f"""\
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', {port}))
{textwrap.dedent(script_body)}
"""

with os_helper.temp_dir() as work_dir:
script_dir = os.path.join(work_dir, "script_pkg")
os.mkdir(script_dir)

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(("localhost", port))
server_socket.settimeout(SHORT_TIMEOUT)
server_socket.listen(1)

script_name = _make_test_script(script_dir, "script", script)
client_socket = None
p = None
try:
p = subprocess.Popen([sys.executable, script_name])
client_socket, _ = server_socket.accept()
server_socket.close()

def make_unwinder(cache_frames=True):
return RemoteUnwinder(p.pid, all_threads=True, cache_frames=cache_frames)

yield p, client_socket, make_unwinder

except PermissionError:
self.skipTest("Insufficient permissions to read the stack trace")
finally:
if client_socket:
client_socket.close()
if p:
p.kill()
p.terminate()
p.wait(timeout=SHORT_TIMEOUT)

def _wait_for_signal(self, client_socket, signal):
"""Block until signal received from target."""
response = b""
while signal not in response:
chunk = client_socket.recv(64)
if not chunk:
break
response += chunk
return response

def _get_frames(self, unwinder, required_funcs):
"""Sample and return frame_info list for thread containing required_funcs."""
traces = unwinder.get_stack_trace()
for interp in traces:
for thread in interp.threads:
funcs = [f.funcname for f in thread.frame_info]
if required_funcs.issubset(set(funcs)):
return thread.frame_info
return None

def _sample_frames(self, client_socket, unwinder, wait_signal, send_ack, required_funcs):
"""Wait for signal, sample frames, send ack. Returns frame_info list."""
self._wait_for_signal(client_socket, wait_signal)
frames = self._get_frames(unwinder, required_funcs)
client_socket.sendall(send_ack)
return frames

@skip_if_not_supported
@unittest.skipIf(
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support",
)
def test_cache_hit_same_stack(self):
"""Test that 3 consecutive samples reuse cached frame objects (identity check)."""
script_body = """\
def level3():
sock.sendall(b"sync1")
sock.recv(16)
sock.sendall(b"sync2")
sock.recv(16)
sock.sendall(b"sync3")
sock.recv(16)

def level2():
level3()

def level1():
level2()

level1()
"""

with self._target_process(script_body) as (p, client_socket, make_unwinder):
unwinder = make_unwinder(cache_frames=True)
expected = {"level1", "level2", "level3"}

frames1 = self._sample_frames(client_socket, unwinder, b"sync1", b"ack", expected)
frames2 = self._sample_frames(client_socket, unwinder, b"sync2", b"ack", expected)
frames3 = self._sample_frames(client_socket, unwinder, b"sync3", b"done", expected)

self.assertIsNotNone(frames1)
self.assertIsNotNone(frames2)
self.assertIsNotNone(frames3)
self.assertEqual(len(frames1), len(frames2))
self.assertEqual(len(frames2), len(frames3))

# All frames must be identical objects (cache reuse)
for i, (f1, f2, f3) in enumerate(zip(frames1, frames2, frames3)):
self.assertIs(f1, f2, f"Frame {i}: samples 1-2 must be same object")
self.assertIs(f2, f3, f"Frame {i}: samples 2-3 must be same object")
self.assertIs(f1, f3, f"Frame {i}: samples 1-3 must be same object")

@skip_if_not_supported
@unittest.skipIf(
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support",
)
def test_cache_invalidation_on_return(self):
"""Test cache invalidation when stack shrinks (function returns)."""
script_body = """\
def inner():
sock.sendall(b"at_inner")
sock.recv(16)

def outer():
inner()
sock.sendall(b"at_outer")
sock.recv(16)

outer()
"""

with self._target_process(script_body) as (p, client_socket, make_unwinder):
unwinder = make_unwinder(cache_frames=True)

frames_deep = self._sample_frames(
client_socket, unwinder, b"at_inner", b"ack", {"inner", "outer"})
frames_shallow = self._sample_frames(
client_socket, unwinder, b"at_outer", b"done", {"outer"})

self.assertIsNotNone(frames_deep)
self.assertIsNotNone(frames_shallow)

funcs_deep = [f.funcname for f in frames_deep]
funcs_shallow = [f.funcname for f in frames_shallow]

self.assertIn("inner", funcs_deep)
self.assertIn("outer", funcs_deep)
self.assertNotIn("inner", funcs_shallow)
self.assertIn("outer", funcs_shallow)

@skip_if_not_supported
@unittest.skipIf(
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support",
)
def test_cache_invalidation_on_call(self):
"""Test cache invalidation when stack grows (new function called)."""
script_body = """\
def deeper():
sock.sendall(b"at_deeper")
sock.recv(16)

def middle():
sock.sendall(b"at_middle")
sock.recv(16)
deeper()

def top():
middle()

top()
"""

with self._target_process(script_body) as (p, client_socket, make_unwinder):
unwinder = make_unwinder(cache_frames=True)

frames_before = self._sample_frames(
client_socket, unwinder, b"at_middle", b"ack", {"middle", "top"})
frames_after = self._sample_frames(
client_socket, unwinder, b"at_deeper", b"done", {"deeper", "middle", "top"})

self.assertIsNotNone(frames_before)
self.assertIsNotNone(frames_after)

funcs_before = [f.funcname for f in frames_before]
funcs_after = [f.funcname for f in frames_after]

self.assertIn("middle", funcs_before)
self.assertIn("top", funcs_before)
self.assertNotIn("deeper", funcs_before)

self.assertIn("deeper", funcs_after)
self.assertIn("middle", funcs_after)
self.assertIn("top", funcs_after)

@skip_if_not_supported
@unittest.skipIf(
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support",
)
def test_partial_stack_reuse(self):
"""Test that unchanged bottom frames are reused when top changes (A→B→C to A→B→D)."""
script_body = """\
def func_c():
sock.sendall(b"at_c")
sock.recv(16)

def func_d():
sock.sendall(b"at_d")
sock.recv(16)

def func_b():
func_c()
func_d()

def func_a():
func_b()

func_a()
"""

with self._target_process(script_body) as (p, client_socket, make_unwinder):
unwinder = make_unwinder(cache_frames=True)

# Sample at C: stack is A→B→C
frames_c = self._sample_frames(
client_socket, unwinder, b"at_c", b"ack", {"func_a", "func_b", "func_c"})
# Sample at D: stack is A→B→D (C returned, D called)
frames_d = self._sample_frames(
client_socket, unwinder, b"at_d", b"done", {"func_a", "func_b", "func_d"})

self.assertIsNotNone(frames_c)
self.assertIsNotNone(frames_d)

# Find func_a and func_b frames in both samples
def find_frame(frames, funcname):
for f in frames:
if f.funcname == funcname:
return f
return None

frame_a_in_c = find_frame(frames_c, "func_a")
frame_b_in_c = find_frame(frames_c, "func_b")
frame_a_in_d = find_frame(frames_d, "func_a")
frame_b_in_d = find_frame(frames_d, "func_b")

self.assertIsNotNone(frame_a_in_c)
self.assertIsNotNone(frame_b_in_c)
self.assertIsNotNone(frame_a_in_d)
self.assertIsNotNone(frame_b_in_d)

# The bottom frames (A, B) should be the SAME objects (cache reuse)
self.assertIs(frame_a_in_c, frame_a_in_d, "func_a frame should be reused from cache")
self.assertIs(frame_b_in_c, frame_b_in_d, "func_b frame should be reused from cache")

@skip_if_not_supported
@unittest.skipIf(
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support",
)
def test_recursive_frames(self):
"""Test caching with same function appearing multiple times (recursion)."""
script_body = """\
def recurse(n):
if n <= 0:
sock.sendall(b"sync1")
sock.recv(16)
sock.sendall(b"sync2")
sock.recv(16)
else:
recurse(n - 1)

recurse(5)
"""

with self._target_process(script_body) as (p, client_socket, make_unwinder):
unwinder = make_unwinder(cache_frames=True)

frames1 = self._sample_frames(
client_socket, unwinder, b"sync1", b"ack", {"recurse"})
frames2 = self._sample_frames(
client_socket, unwinder, b"sync2", b"done", {"recurse"})

self.assertIsNotNone(frames1)
self.assertIsNotNone(frames2)

# Should have multiple "recurse" frames (6 total: recurse(5) down to recurse(0))
recurse_count = sum(1 for f in frames1 if f.funcname == "recurse")
self.assertEqual(recurse_count, 6, "Should have 6 recursive frames")

self.assertEqual(len(frames1), len(frames2))

# All frames should be identical objects (cache reuse)
for i, (f1, f2) in enumerate(zip(frames1, frames2)):
self.assertIs(f1, f2, f"Frame {i}: recursive frames must be same object")

@skip_if_not_supported
@unittest.skipIf(
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support",
)
def test_cache_vs_no_cache_equivalence(self):
"""Test that cache_frames=True and cache_frames=False produce equivalent results."""
script_body = """\
def level3():
sock.sendall(b"ready")
sock.recv(16)

def level2():
level3()

def level1():
level2()

level1()
"""

with self._target_process(script_body) as (p, client_socket, make_unwinder):
self._wait_for_signal(client_socket, b"ready")

# Sample with cache
unwinder_cache = make_unwinder(cache_frames=True)
frames_cached = self._get_frames(unwinder_cache, {"level1", "level2", "level3"})

# Sample without cache
unwinder_no_cache = make_unwinder(cache_frames=False)
frames_no_cache = self._get_frames(unwinder_no_cache, {"level1", "level2", "level3"})

client_socket.sendall(b"done")

self.assertIsNotNone(frames_cached)
self.assertIsNotNone(frames_no_cache)

# Same number of frames
self.assertEqual(len(frames_cached), len(frames_no_cache))

# Same function names in same order
funcs_cached = [f.funcname for f in frames_cached]
funcs_no_cache = [f.funcname for f in frames_no_cache]
self.assertEqual(funcs_cached, funcs_no_cache)

# Same line numbers
lines_cached = [f.lineno for f in frames_cached]
lines_no_cache = [f.lineno for f in frames_no_cache]
self.assertEqual(lines_cached, lines_no_cache)


if __name__ == "__main__":
unittest.main()