Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merge upstream/main into gh-138122-fix
  • Loading branch information
pablogsal committed Dec 7, 2025
commit a01bd24b1b2550c08f038f8e15644d2fc8a3f4e3
95 changes: 33 additions & 62 deletions Lib/test/test_external_inspection.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import contextlib
import unittest
import os
import textwrap
Expand Down Expand Up @@ -218,13 +217,16 @@ def requires_subinterpreters(meth):

# Errors that can occur transiently when reading process memory without synchronization
RETRIABLE_ERRORS = (
"Bad address",
"Task list appears corrupted",
"Invalid linked list structure reading remote memory",
"Invalid string length",
"Unknown error reading memory",
"Unhandled frame owner",
"Failed to parse initial frame",
"Failed to process frame chain",
"Failed to unwind stack",
"process_vm_readv",
)


Expand Down Expand Up @@ -2275,10 +2277,8 @@ def _get_frames_with_retry(self, unwinder, required_funcs):
funcs = {f.funcname for f in thread.frame_info}
if required_funcs.issubset(funcs):
return thread.frame_info
except RuntimeError as e:
if _is_retriable_error(e):
pass
else:
except (OSError, RuntimeError) as e:
if not _is_retriable_error(e):
raise
time.sleep(0.1)
return None
Expand Down Expand Up @@ -2802,68 +2802,39 @@ def foo2():
make_unwinder,
):
unwinder = make_unwinder(cache_frames=True)
buffer = b""

def recv_msg():
"""Receive a single message from socket."""
nonlocal buffer
while b"\n" not in buffer:
chunk = client_socket.recv(256)
if not chunk:
return None
buffer += chunk
msg, buffer = buffer.split(b"\n", 1)
return msg

def get_thread_frames(target_funcs):
"""Get frames for thread matching target functions."""
for _ in range(self.MAX_TRIES):
try:
traces = unwinder.get_stack_trace()
except (RuntimeError, OSError):
time.sleep(0.1)
continue
all_threads = [t for i in traces for t in i.threads]
for thread in all_threads:
funcs = [f.funcname for f in thread.frame_info]
if any(f in funcs for f in target_funcs):
return funcs
time.sleep(0.1)
return None

# Message dispatch table: signal -> required functions for that thread
dispatch = {
b"t1:baz1": {"baz1", "bar1", "foo1"},
b"t2:baz2": {"baz2", "bar2", "foo2"},
b"t1:blech1": {"blech1", "foo1"},
b"t2:blech2": {"blech2", "foo2"},
}

# Track results for each sync point
results = {}

# Process 4 sync points: baz1, baz2, blech1, blech2
# With the lock, threads are serialized - handle one at a time
for _ in range(4):
msg = recv_msg()
self.assertIsNotNone(msg, "Expected message from subprocess")

# Determine which thread/function and take snapshot
if msg == b"t1:baz1":
funcs = get_thread_frames(["baz1", "bar1", "foo1"])
self.assertIsNotNone(funcs, "Thread 1 not found at baz1")
results["t1:baz1"] = funcs
elif msg == b"t2:baz2":
funcs = get_thread_frames(["baz2", "bar2", "foo2"])
self.assertIsNotNone(funcs, "Thread 2 not found at baz2")
results["t2:baz2"] = funcs
elif msg == b"t1:blech1":
funcs = get_thread_frames(["blech1", "foo1"])
self.assertIsNotNone(funcs, "Thread 1 not found at blech1")
results["t1:blech1"] = funcs
elif msg == b"t2:blech2":
funcs = get_thread_frames(["blech2", "foo2"])
self.assertIsNotNone(funcs, "Thread 2 not found at blech2")
results["t2:blech2"] = funcs

# Release thread to continue
# Process 4 sync points (order depends on thread scheduling)
buffer = _wait_for_signal(client_socket, b"\n")
for i in range(4):
# Extract first message from buffer
msg, sep, buffer = buffer.partition(b"\n")
self.assertIn(msg, dispatch, f"Unexpected message: {msg}")

# Sample frames for the thread at this sync point
required_funcs = dispatch[msg]
frames = self._get_frames_with_retry(unwinder, required_funcs)
self.assertIsNotNone(frames, f"Thread not found for {msg}")
results[msg] = [f.funcname for f in frames]

# Release thread and wait for next message (if not last)
client_socket.sendall(b"k")
if i < 3:
buffer += _wait_for_signal(client_socket, b"\n")

# Validate Phase 1: baz snapshots
t1_baz = results.get("t1:baz1")
t2_baz = results.get("t2:baz2")
t1_baz = results.get(b"t1:baz1")
t2_baz = results.get(b"t2:baz2")
self.assertIsNotNone(t1_baz, "Missing t1:baz1 snapshot")
self.assertIsNotNone(t2_baz, "Missing t2:baz2 snapshot")

Expand All @@ -2888,8 +2859,8 @@ def get_thread_frames(target_funcs):
self.assertNotIn("foo1", t2_baz)

# Validate Phase 2: blech snapshots (cache invalidation test)
t1_blech = results.get("t1:blech1")
t2_blech = results.get("t2:blech2")
t1_blech = results.get(b"t1:blech1")
t2_blech = results.get(b"t2:blech2")
self.assertIsNotNone(t1_blech, "Missing t1:blech1 snapshot")
self.assertIsNotNone(t2_blech, "Missing t2:blech2 snapshot")

Expand Down
Loading
You are viewing a condensed version of this merge commit. You can view the full changes here.