Skip to content
Prev Previous commit
Next Next commit
Update test_logging.py from 3.13.7
  • Loading branch information
ShaharNaveh committed Sep 9, 2025
commit 1966c3eaa85cd9efbb020bcff67b6a8926062859
121 changes: 12 additions & 109 deletions Lib/test/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -1115,8 +1115,7 @@ class SMTPHandlerTest(BaseTest):
# bpo-14314, bpo-19665, bpo-34092: don't wait forever
TIMEOUT = support.LONG_TIMEOUT

# TODO: RUSTPYTHON
@unittest.skip(reason="RUSTPYTHON hangs")
@unittest.skip("TODO: RUSTPYTHON; hangs")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was added manually

def test_basic(self):
sockmap = {}
server = TestSMTPServer((socket_helper.HOST, 0), self.process_message, 0.001,
Expand Down Expand Up @@ -2154,8 +2153,7 @@ def handle_request(self, request):
request.end_headers()
self.handled.set()

# TODO: RUSTPYTHON
@unittest.skip("TODO: RUSTPYTHON; flaky test")
@unittest.skip('TODO: RUSTPYTHON; flaky test')
def test_output(self):
# The log message sent to the HTTPHandler is properly received.
logger = logging.getLogger("http")
Expand Down Expand Up @@ -4060,8 +4058,7 @@ def _mpinit_issue121723(qspec, message_to_log):
# log a message (this creates a record put in the queue)
logging.getLogger().info(message_to_log)

# TODO: RUSTPYTHON; ImportError: cannot import name 'SemLock'
@unittest.expectedFailure
@unittest.expectedFailure # TODO: RUSTPYTHON; ImportError: cannot import name 'SemLock'
@skip_if_tsan_fork
@support.requires_subprocess()
def test_multiprocessing_queues(self):
Expand Down Expand Up @@ -4121,8 +4118,7 @@ def test_90195(self):
# Logger should be enabled, since explicitly mentioned
self.assertFalse(logger.disabled)

# TODO: RUSTPYTHON; ImportError: cannot import name 'SemLock'
@unittest.expectedFailure
@unittest.expectedFailure # TODO: RUSTPYTHON; ImportError: cannot import name 'SemLock'
def test_111615(self):
# See gh-111615
import_helper.import_module('_multiprocessing') # see gh-113692
Expand Down Expand Up @@ -4171,91 +4167,6 @@ def __init__(self, *args, **kwargs):
handler = logging.getHandlerByName('custom')
self.assertEqual(handler.custom_kwargs, custom_kwargs)

# TODO: RUSTPYTHON; ImportError: cannot import name 'SemLock'
@unittest.expectedFailure
# See gh-91555 and gh-90321
@support.requires_subprocess()
def test_deadlock_in_queue(self):
queue = multiprocessing.Queue()
handler = logging.handlers.QueueHandler(queue)
logger = multiprocessing.get_logger()
level = logger.level
try:
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
logger.debug("deadlock")
finally:
logger.setLevel(level)
logger.removeHandler(handler)

def test_recursion_in_custom_handler(self):
class BadHandler(logging.Handler):
def __init__(self):
super().__init__()
def emit(self, record):
logger.debug("recurse")
logger = logging.getLogger("test_recursion_in_custom_handler")
logger.addHandler(BadHandler())
logger.setLevel(logging.DEBUG)
logger.debug("boom")

@threading_helper.requires_working_threading()
def test_thread_supression_noninterference(self):
lock = threading.Lock()
logger = logging.getLogger("test_thread_supression_noninterference")

# Block on the first call, allow others through
#
# NOTE: We need to bypass the base class's lock, otherwise that will
# block multiple calls to the same handler itself.
class BlockOnceHandler(TestHandler):
def __init__(self, barrier):
super().__init__(support.Matcher())
self.barrier = barrier

def createLock(self):
self.lock = None

def handle(self, record):
self.emit(record)

def emit(self, record):
if self.barrier:
barrier = self.barrier
self.barrier = None
barrier.wait()
with lock:
pass
super().emit(record)
logger.info("blow up if not supressed")

barrier = threading.Barrier(2)
handler = BlockOnceHandler(barrier)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

t1 = threading.Thread(target=logger.debug, args=("1",))
with lock:

# Ensure first thread is blocked in the handler, hence supressing logging...
t1.start()
barrier.wait()

# ...but the second thread should still be able to log...
t2 = threading.Thread(target=logger.debug, args=("2",))
t2.start()
t2.join(timeout=3)

self.assertEqual(len(handler.buffer), 1)
self.assertTrue(handler.matches(levelno=logging.DEBUG, message='2'))

# The first thread should still be blocked here
self.assertTrue(t1.is_alive())

# Now the lock has been released the first thread should complete
t1.join()
self.assertEqual(len(handler.buffer), 2)
self.assertTrue(handler.matches(levelno=logging.DEBUG, message='1'))

class ManagerTest(BaseTest):
def test_manager_loggerclass(self):
Expand Down Expand Up @@ -4663,8 +4574,7 @@ def test_dollars(self):
f = logging.Formatter('${asctime}--', style='$')
self.assertTrue(f.usesTime())

# TODO: RUSTPYTHON; ValueError: Unexpected error parsing format string
@unittest.expectedFailure
@unittest.expectedFailure # TODO: RUSTPYTHON; ValueError: Unexpected error parsing format string
def test_format_validate(self):
# Check correct formatting
# Percentage style
Expand Down Expand Up @@ -4838,8 +4748,7 @@ def test_defaults_parameter(self):
def test_invalid_style(self):
self.assertRaises(ValueError, logging.Formatter, None, None, 'x')

# TODO: RUSTPYTHON; AttributeError: 'struct_time' object has no attribute 'tm_gmtoff'
@unittest.expectedFailure
@unittest.expectedFailure # TODO: RUSTPYTHON; AttributeError: 'struct_time' object has no attribute 'tm_gmtoff'
def test_time(self):
r = self.get_record()
dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 0, utc)
Expand All @@ -4854,8 +4763,7 @@ def test_time(self):
f.format(r)
self.assertEqual(r.asctime, '1993-04-21 08:03:00,123')

# TODO: RUSTPYTHON; AttributeError: 'struct_time' object has no attribute 'tm_gmtoff'
@unittest.expectedFailure
@unittest.expectedFailure # TODO: RUSTPYTHON; AttributeError: 'struct_time' object has no attribute 'tm_gmtoff'
def test_default_msec_format_none(self):
class NoMsecFormatter(logging.Formatter):
default_msec_format = None
Expand Down Expand Up @@ -5257,8 +5165,7 @@ def __init__(self, name='MyLogger', level=logging.NOTSET):
h.close()
logging.setLoggerClass(logging.Logger)

# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.expectedFailure # TODO: RUSTPYTHON
def test_logging_at_shutdown(self):
# bpo-20037: Doing text I/O late at interpreter shutdown must not crash
code = textwrap.dedent("""
Expand All @@ -5278,8 +5185,7 @@ def __del__(self):
self.assertIn("exception in __del__", err)
self.assertIn("ValueError: some error", err)

# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.expectedFailure # TODO: RUSTPYTHON
def test_logging_at_shutdown_open(self):
# bpo-26789: FileHandler keeps a reference to the builtin open()
# function to be able to open or reopen the file during Python
Expand Down Expand Up @@ -6480,8 +6386,7 @@ def rotator(source, dest):
rh.close()

class TimedRotatingFileHandlerTest(BaseFileTest):
# TODO: RUSTPYTHON
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
@unittest.expectedFailureIfWindows('TODO: RUSTPYTHON')
@unittest.skipIf(support.is_wasi, "WASI does not have /dev/null.")
def test_should_not_rollover(self):
# See bpo-45401. Should only ever rollover regular files
Expand Down Expand Up @@ -6535,8 +6440,7 @@ def test_rollover(self):
print(tf.read())
self.assertTrue(found, msg=msg)

# TODO: RUSTPYTHON
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
@unittest.expectedFailureIfWindows('TODO: RUSTPYTHON')
def test_rollover_at_midnight(self, weekly=False):
os_helper.unlink(self.fn)
now = datetime.datetime.now()
Expand Down Expand Up @@ -6580,8 +6484,7 @@ def test_rollover_at_midnight(self, weekly=False):
for i, line in enumerate(f):
self.assertIn(f'testing1 {i}', line)

# TODO: RUSTPYTHON
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
@unittest.expectedFailureIfWindows('TODO: RUSTPYTHON')
def test_rollover_at_weekday(self):
self.test_rollover_at_midnight(weekly=True)

Expand Down
Loading