Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 0 additions & 34 deletions Lib/test/test_multiprocessing_fork/test_processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,6 @@

install_tests_in_module_dict(globals(), 'fork', only_type="processes")

# TODO: RUSTPYTHON — override tests that are flaky under RustPython on Linux.
# Each subclass shadows the generated test class of the same name and skips
# the specific flaky test(s) on that platform only.
import os, sys  # TODO: RUSTPYTHON

class WithProcessesTestCondition(WithProcessesTestCondition):  # TODO: RUSTPYTHON
    @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky timeout')
    def test_notify_all(self):
        super().test_notify_all()  # TODO: RUSTPYTHON

class WithProcessesTestLock(WithProcessesTestLock):  # TODO: RUSTPYTHON
    @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky BrokenPipeError, flaky ConnectionRefusedError, flaky ConnectionResetError, flaky EOFError')
    def test_repr_lock(self):
        super().test_repr_lock()  # TODO: RUSTPYTHON

class WithProcessesTestManagerRestart(WithProcessesTestManagerRestart):  # TODO: RUSTPYTHON
    @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky BrokenPipeError, flaky ConnectionRefusedError, flaky ConnectionResetError, flaky EOFError')
    def test_rapid_restart(self):
        super().test_rapid_restart()  # TODO: RUSTPYTHON

class WithProcessesTestProcess(WithProcessesTestProcess):  # TODO: RUSTPYTHON
    @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky timeout')
    def test_args_argument(self):
        super().test_args_argument()  # TODO: RUSTPYTHON

    @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky timeout')
    def test_process(self):
        super().test_process()  # TODO: RUSTPYTHON

class WithProcessesTestPoolWorkerLifetime(WithProcessesTestPoolWorkerLifetime):  # TODO: RUSTPYTHON
    @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky timeout')
    def test_pool_worker_lifetime(self):
        super().test_pool_worker_lifetime()  # TODO: RUSTPYTHON

    @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky timeout')
    def test_pool_worker_lifetime_early_close(self):
        super().test_pool_worker_lifetime_early_close()  # TODO: RUSTPYTHON

class WithProcessesTestQueue(WithProcessesTestQueue):  # TODO: RUSTPYTHON
    @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky timeout')
    def test_fork(self):
        super().test_fork()  # TODO: RUSTPYTHON

    @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky timeout')
    def test_get(self):
        super().test_get()  # TODO: RUSTPYTHON

class WithProcessesTestSharedMemory(WithProcessesTestSharedMemory):  # TODO: RUSTPYTHON
    @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky BrokenPipeError, flaky ConnectionRefusedError, flaky ConnectionResetError, flaky EOFError')
    def test_shared_memory_SharedMemoryManager_basics(self):
        super().test_shared_memory_SharedMemoryManager_basics()  # TODO: RUSTPYTHON

if __name__ == '__main__':
    unittest.main()
25 changes: 25 additions & 0 deletions crates/vm/src/codecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,31 @@ impl ToPyObject for PyCodec {
}

impl CodecsRegistry {
/// Force-unlock the inner RwLock after fork in the child process.
///
/// # Safety
/// Must only be called after fork() in the child process when no other
/// threads exist. The calling thread must NOT hold this lock.
#[cfg(all(unix, feature = "host_env"))]
pub(crate) unsafe fn force_unlock_after_fork(&self) {
    // Fast path: the lock is free. The write guard returned by
    // `try_write` is dropped immediately, leaving the lock unlocked.
    if self.inner.try_write().is_some() {
        return;
    }
    // The lock is stuck: its holder(s) died during fork.
    if self.inner.try_read().is_none() {
        // Not readable either, so a dead thread holds it exclusively.
        // SAFETY: caller guarantees no live thread holds this lock.
        unsafe { self.inner.force_unlock_write() };
        return;
    }
    // Shared-locked by one or more dead readers. Release them one at a
    // time until the reader count hits zero and a write lock succeeds.
    while self.inner.try_write().is_none() {
        // SAFETY: every remaining read guard belongs to a dead thread.
        unsafe { self.inner.force_unlock_read() };
    }
}

pub(crate) fn new(ctx: &Context) -> Self {
::rustpython_vm::common::static_cell! {
static METHODS: Box<[PyMethodDef]>;
Expand Down
26 changes: 26 additions & 0 deletions crates/vm/src/intern.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,32 @@ impl Clone for StringPool {
}

impl StringPool {
/// Force-unlock the inner RwLock after fork in the child process.
///
/// # Safety
/// Must only be called after fork() in the child process when no other
/// threads exist. The calling thread must NOT hold this lock.
#[cfg(all(unix, feature = "host_env"))]
pub(crate) unsafe fn force_unlock_after_fork(&self) {
    // Fast path: the lock is free. The write guard returned by
    // `try_write` is dropped immediately, leaving the lock unlocked.
    if self.inner.try_write().is_some() {
        return;
    }
    // The lock is stuck: its holder(s) no longer exist after fork.
    if self.inner.try_read().is_none() {
        // Not readable either, so a dead thread holds it exclusively.
        // SAFETY: caller guarantees no live thread holds this lock.
        unsafe { self.inner.force_unlock_write() };
        return;
    }
    // Shared-locked by one or more dead readers. Release them one at a
    // time until the reader count hits zero and a write lock succeeds.
    while self.inner.try_write().is_none() {
        // SAFETY: every remaining read guard belongs to a dead thread.
        unsafe { self.inner.force_unlock_read() };
    }
}

#[inline]
pub unsafe fn intern<S: InternableString>(
&self,
Expand Down
37 changes: 28 additions & 9 deletions crates/vm/src/stdlib/posix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,22 @@ pub mod module {
#[cfg(feature = "threading")]
crate::object::reset_weakref_locks_after_fork();

// Force-unlock all global VM locks that may have been held by
// threads that no longer exist in the child process after fork.
// SAFETY: After fork, only the forking thread survives. Any lock
// held by another thread is permanently stuck. The forking thread
// does not hold these locks during fork() (a high-level Python op).
unsafe {
vm.ctx.string_pool.force_unlock_after_fork();
vm.state.codec_registry.force_unlock_after_fork();
force_unlock_mutex_after_fork(&vm.state.atexit_funcs);
force_unlock_mutex_after_fork(&vm.state.before_forkers);
force_unlock_mutex_after_fork(&vm.state.after_forkers_child);
force_unlock_mutex_after_fork(&vm.state.after_forkers_parent);
force_unlock_mutex_after_fork(&vm.state.global_trace_func);
force_unlock_mutex_after_fork(&vm.state.global_profile_func);
}

// Mark all other threads as done before running Python callbacks
#[cfg(feature = "threading")]
crate::stdlib::thread::after_fork_child(vm);
Expand All @@ -716,18 +732,21 @@ pub mod module {
vm.signal_handlers
.get_or_init(crate::signal::new_signal_handlers);

let after_forkers_child = match vm.state.after_forkers_child.try_lock() {
Some(guard) => guard.clone(),
None => {
// SAFETY: After fork in child process, only the current thread
// exists. The lock holder no longer exists.
unsafe { vm.state.after_forkers_child.force_unlock() };
vm.state.after_forkers_child.lock().clone()
}
};
let after_forkers_child: Vec<PyObjectRef> = vm.state.after_forkers_child.lock().clone();
run_at_forkers(after_forkers_child, false, vm);
}

/// Force-unlock a PyMutex if held by a dead thread after fork.
///
/// # Safety
/// Must only be called after fork() in the child process.
unsafe fn force_unlock_mutex_after_fork<T>(mutex: &crate::common::lock::PyMutex<T>) {
    // If the mutex can be acquired, it was not stuck; the temporary guard
    // is dropped right away and the mutex is left unlocked.
    let stuck = mutex.try_lock().is_none();
    if stuck {
        // SAFETY: the only possible holder died during fork, so no live
        // thread can ever release this lock; forcing it open is sound.
        unsafe { mutex.force_unlock() };
    }
}

fn py_os_after_fork_parent(vm: &VirtualMachine) {
let after_forkers_parent: Vec<PyObjectRef> = vm.state.after_forkers_parent.lock().clone();
run_at_forkers(after_forkers_parent, false, vm);
Expand Down
3 changes: 1 addition & 2 deletions crates/vm/src/stdlib/sys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -827,8 +827,7 @@ mod sys {
Ok(exc) => {
// PyErr_Display: try traceback._print_exception_bltin first
if let Ok(tb_mod) = vm.import("traceback", 0)
&& let Ok(print_exc_builtin) =
tb_mod.get_attr("_print_exception_bltin", vm)
&& let Ok(print_exc_builtin) = tb_mod.get_attr("_print_exception_bltin", vm)
&& print_exc_builtin
.call((exc.as_object().to_owned(),), vm)
.is_ok()
Expand Down
23 changes: 14 additions & 9 deletions crates/vm/src/stdlib/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -914,11 +914,13 @@ pub(crate) mod _thread {
// Reinitialize frame slot for current thread
crate::vm::thread::reinit_frame_slot_after_fork(vm);

// Clean up thread handles if we can acquire the lock.
// Use try_lock because the mutex might have been held during fork.
// If we can't acquire it, just skip - the child process will work
// correctly with new handles it creates.
if let Some(mut handles) = vm.state.thread_handles.try_lock() {
// Clean up thread handles. Force-unlock if held by a dead thread.
// SAFETY: After fork, only the current thread exists.
if vm.state.thread_handles.try_lock().is_none() {
unsafe { vm.state.thread_handles.force_unlock() };
}
{
let mut handles = vm.state.thread_handles.lock();
// Clean up dead weak refs and mark non-current threads as done
handles.retain(|(inner_weak, done_event_weak): &HandleEntry| {
let Some(inner) = inner_weak.upgrade() else {
Expand Down Expand Up @@ -957,10 +959,13 @@ pub(crate) mod _thread {
});
}

// Clean up shutdown_handles as well.
// This is critical to prevent _shutdown() from waiting on threads
// that don't exist in the child process after fork.
if let Some(mut handles) = vm.state.shutdown_handles.try_lock() {
// Clean up shutdown_handles. Force-unlock if held by a dead thread.
// SAFETY: After fork, only the current thread exists.
if vm.state.shutdown_handles.try_lock().is_none() {
unsafe { vm.state.shutdown_handles.force_unlock() };
}
{
let mut handles = vm.state.shutdown_handles.lock();
// Mark all non-current threads as done in shutdown_handles
handles.retain(|(inner_weak, done_event_weak): &ShutdownEntry| {
let Some(inner) = inner_weak.upgrade() else {
Expand Down
Loading