Skip to content
Merged
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add thread test
  • Loading branch information
pablogsal committed Dec 1, 2025
commit adb04294a8c23e1190c924586c53a063d6a355f1
160 changes: 160 additions & 0 deletions Lib/test/test_external_inspection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2399,6 +2399,166 @@ def level1():
lines_no_cache = [f.lineno for f in frames_no_cache]
self.assertEqual(lines_cached, lines_no_cache)

@skip_if_not_supported
@unittest.skipIf(
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support",
)
def test_cache_per_thread_isolation(self):
"""Test that frame cache is per-thread and cache invalidation works independently."""
script_body = """\
import threading

lock = threading.Lock()

def sync(msg):
with lock:
sock.sendall(msg + b"\\n")
sock.recv(1)

# Thread 1 functions
def baz1():
sync(b"t1:baz1")

def bar1():
baz1()

def blech1():
sync(b"t1:blech1")

def foo1():
bar1() # Goes down to baz1, syncs
blech1() # Returns up, goes down to blech1, syncs

# Thread 2 functions
def baz2():
sync(b"t2:baz2")

def bar2():
baz2()

def blech2():
sync(b"t2:blech2")

def foo2():
bar2() # Goes down to baz2, syncs
blech2() # Returns up, goes down to blech2, syncs

t1 = threading.Thread(target=foo1)
t2 = threading.Thread(target=foo2)
t1.start()
t2.start()
t1.join()
t2.join()
"""

with self._target_process(script_body) as (p, client_socket, make_unwinder):
unwinder = make_unwinder(cache_frames=True)
buffer = b""

def recv_msg():
"""Receive a single message from socket."""
nonlocal buffer
while b"\n" not in buffer:
chunk = client_socket.recv(256)
if not chunk:
return None
buffer += chunk
msg, buffer = buffer.split(b"\n", 1)
return msg

def get_thread_frames(target_funcs):
"""Get frames for thread matching target functions."""
for _ in range(3):
try:
traces = unwinder.get_stack_trace()
for interp in traces:
for thread in interp.threads:
funcs = [f.funcname for f in thread.frame_info]
if any(f in funcs for f in target_funcs):
return funcs
except RuntimeError:
pass
return None

# Track results for each sync point
results = {}

# Process 4 sync points: baz1, baz2, blech1, blech2
# With the lock, threads are serialized - handle one at a time
for _ in range(4):
msg = recv_msg()
self.assertIsNotNone(msg, "Expected message from subprocess")

# Determine which thread/function and take snapshot
if msg == b"t1:baz1":
funcs = get_thread_frames(["baz1", "bar1", "foo1"])
self.assertIsNotNone(funcs, "Thread 1 not found at baz1")
results["t1:baz1"] = funcs
elif msg == b"t2:baz2":
funcs = get_thread_frames(["baz2", "bar2", "foo2"])
self.assertIsNotNone(funcs, "Thread 2 not found at baz2")
results["t2:baz2"] = funcs
elif msg == b"t1:blech1":
funcs = get_thread_frames(["blech1", "foo1"])
self.assertIsNotNone(funcs, "Thread 1 not found at blech1")
results["t1:blech1"] = funcs
elif msg == b"t2:blech2":
funcs = get_thread_frames(["blech2", "foo2"])
self.assertIsNotNone(funcs, "Thread 2 not found at blech2")
results["t2:blech2"] = funcs

# Release thread to continue
client_socket.sendall(b"k")

# Validate Phase 1: baz snapshots
t1_baz = results.get("t1:baz1")
t2_baz = results.get("t2:baz2")
self.assertIsNotNone(t1_baz, "Missing t1:baz1 snapshot")
self.assertIsNotNone(t2_baz, "Missing t2:baz2 snapshot")

# Thread 1 at baz1: should have foo1->bar1->baz1
self.assertIn("baz1", t1_baz)
self.assertIn("bar1", t1_baz)
self.assertIn("foo1", t1_baz)
self.assertNotIn("blech1", t1_baz)
# No cross-contamination
self.assertNotIn("baz2", t1_baz)
self.assertNotIn("bar2", t1_baz)
self.assertNotIn("foo2", t1_baz)

# Thread 2 at baz2: should have foo2->bar2->baz2
self.assertIn("baz2", t2_baz)
self.assertIn("bar2", t2_baz)
self.assertIn("foo2", t2_baz)
self.assertNotIn("blech2", t2_baz)
# No cross-contamination
self.assertNotIn("baz1", t2_baz)
self.assertNotIn("bar1", t2_baz)
self.assertNotIn("foo1", t2_baz)

# Validate Phase 2: blech snapshots (cache invalidation test)
t1_blech = results.get("t1:blech1")
t2_blech = results.get("t2:blech2")
self.assertIsNotNone(t1_blech, "Missing t1:blech1 snapshot")
self.assertIsNotNone(t2_blech, "Missing t2:blech2 snapshot")

# Thread 1 at blech1: bar1/baz1 should be GONE (cache invalidated)
self.assertIn("blech1", t1_blech)
self.assertIn("foo1", t1_blech)
self.assertNotIn("bar1", t1_blech, "Cache not invalidated: bar1 still present")
self.assertNotIn("baz1", t1_blech, "Cache not invalidated: baz1 still present")
# No cross-contamination
self.assertNotIn("blech2", t1_blech)

# Thread 2 at blech2: bar2/baz2 should be GONE (cache invalidated)
self.assertIn("blech2", t2_blech)
self.assertIn("foo2", t2_blech)
self.assertNotIn("bar2", t2_blech, "Cache not invalidated: bar2 still present")
self.assertNotIn("baz2", t2_blech, "Cache not invalidated: baz2 still present")
# No cross-contamination
self.assertNotIn("blech1", t2_blech)


if __name__ == "__main__":
unittest.main()