Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,11 @@
"birthtime",
"IFEXEC",
// "stat"
"FIRMLINK"
"FIRMLINK",
// CPython internal names
"sysdict",
"settraceallthreads",
"setprofileallthreads"
],
// flagWords - list of words to be always considered incorrect
"flagWords": [
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ env:
# test_posixpath: OSError: (22, 'The filename, directory name, or volume label syntax is incorrect. (os error 123)')
# test_venv: couple of failing tests
WINDOWS_SKIPS: >-
test_asyncio
test_glob
test_rlcompleter
test_pathlib
Expand Down
12 changes: 12 additions & 0 deletions Lib/test/test_asyncio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import os
from test import support
from test.support import load_package_tests
from test.support import import_helper

support.requires_working_socket(module=True)

# Skip tests if we don't have concurrent.futures.
import_helper.import_module('concurrent.futures')

def load_tests(*args):
return load_package_tests(os.path.dirname(__file__), *args)
4 changes: 4 additions & 0 deletions Lib/test/test_asyncio/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from . import load_tests
import unittest

unittest.main()
8 changes: 8 additions & 0 deletions Lib/test/test_asyncio/echo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import os

if __name__ == '__main__':
while True:
buf = os.read(0, 1024)
if not buf:
break
os.write(1, buf)
6 changes: 6 additions & 0 deletions Lib/test/test_asyncio/echo2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import os

if __name__ == '__main__':
buf = os.read(0, 1024)
os.write(1, b'OUT:'+buf)
os.write(2, b'ERR:'+buf)
11 changes: 11 additions & 0 deletions Lib/test/test_asyncio/echo3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import os

if __name__ == '__main__':
while True:
buf = os.read(0, 1024)
if not buf:
break
try:
os.write(1, b'OUT:'+buf)
except OSError as ex:
os.write(2, b'ERR:' + ex.__class__.__name__.encode('ascii'))
269 changes: 269 additions & 0 deletions Lib/test/test_asyncio/functional.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
import asyncio
import asyncio.events
import contextlib
import os
import pprint
import select
import socket
import tempfile
import threading
from test import support


class FunctionalTestCaseMixin:

def new_loop(self):
return asyncio.new_event_loop()

def run_loop_briefly(self, *, delay=0.01):
self.loop.run_until_complete(asyncio.sleep(delay))

def loop_exception_handler(self, loop, context):
self.__unhandled_exceptions.append(context)
self.loop.default_exception_handler(context)

def setUp(self):
self.loop = self.new_loop()
asyncio.set_event_loop(None)

self.loop.set_exception_handler(self.loop_exception_handler)
self.__unhandled_exceptions = []

def tearDown(self):
try:
self.loop.close()

if self.__unhandled_exceptions:
print('Unexpected calls to loop.call_exception_handler():')
pprint.pprint(self.__unhandled_exceptions)
self.fail('unexpected calls to loop.call_exception_handler()')

finally:
asyncio.set_event_loop(None)
self.loop = None

def tcp_server(self, server_prog, *,
family=socket.AF_INET,
addr=None,
timeout=support.LOOPBACK_TIMEOUT,
backlog=1,
max_clients=10):

if addr is None:
if hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
with tempfile.NamedTemporaryFile() as tmp:
addr = tmp.name
else:
addr = ('127.0.0.1', 0)

sock = socket.create_server(addr, family=family, backlog=backlog)
if timeout is None:
raise RuntimeError('timeout is required')
if timeout <= 0:
raise RuntimeError('only blocking sockets are supported')
sock.settimeout(timeout)

return TestThreadedServer(
self, sock, server_prog, timeout, max_clients)

def tcp_client(self, client_prog,
family=socket.AF_INET,
timeout=support.LOOPBACK_TIMEOUT):

sock = socket.socket(family, socket.SOCK_STREAM)

if timeout is None:
raise RuntimeError('timeout is required')
if timeout <= 0:
raise RuntimeError('only blocking sockets are supported')
sock.settimeout(timeout)

return TestThreadedClient(
self, sock, client_prog, timeout)

def unix_server(self, *args, **kwargs):
if not hasattr(socket, 'AF_UNIX'):
raise NotImplementedError
return self.tcp_server(*args, family=socket.AF_UNIX, **kwargs)

def unix_client(self, *args, **kwargs):
if not hasattr(socket, 'AF_UNIX'):
raise NotImplementedError
return self.tcp_client(*args, family=socket.AF_UNIX, **kwargs)

@contextlib.contextmanager
def unix_sock_name(self):
with tempfile.TemporaryDirectory() as td:
fn = os.path.join(td, 'sock')
try:
yield fn
finally:
try:
os.unlink(fn)
except OSError:
pass

def _abort_socket_test(self, ex):
try:
self.loop.stop()
finally:
self.fail(ex)


##############################################################################
# Socket Testing Utilities
##############################################################################


class TestSocketWrapper:

def __init__(self, sock):
self.__sock = sock

def recv_all(self, n):
buf = b''
while len(buf) < n:
data = self.recv(n - len(buf))
if data == b'':
raise ConnectionAbortedError
buf += data
return buf

def start_tls(self, ssl_context, *,
server_side=False,
server_hostname=None):

ssl_sock = ssl_context.wrap_socket(
self.__sock, server_side=server_side,
server_hostname=server_hostname,
do_handshake_on_connect=False)

try:
ssl_sock.do_handshake()
except:
ssl_sock.close()
raise
finally:
self.__sock.close()

self.__sock = ssl_sock

def __getattr__(self, name):
return getattr(self.__sock, name)

def __repr__(self):
return '<{} {!r}>'.format(type(self).__name__, self.__sock)


class SocketThread(threading.Thread):

def stop(self):
self._active = False
self.join()

def __enter__(self):
self.start()
return self

def __exit__(self, *exc):
self.stop()


class TestThreadedClient(SocketThread):

def __init__(self, test, sock, prog, timeout):
threading.Thread.__init__(self, None, None, 'test-client')
self.daemon = True

self._timeout = timeout
self._sock = sock
self._active = True
self._prog = prog
self._test = test

def run(self):
try:
self._prog(TestSocketWrapper(self._sock))
except Exception as ex:
self._test._abort_socket_test(ex)


class TestThreadedServer(SocketThread):

def __init__(self, test, sock, prog, timeout, max_clients):
threading.Thread.__init__(self, None, None, 'test-server')
self.daemon = True

self._clients = 0
self._finished_clients = 0
self._max_clients = max_clients
self._timeout = timeout
self._sock = sock
self._active = True

self._prog = prog

self._s1, self._s2 = socket.socketpair()
self._s1.setblocking(False)

self._test = test

def stop(self):
try:
if self._s2 and self._s2.fileno() != -1:
try:
self._s2.send(b'stop')
except OSError:
pass
finally:
super().stop()

def run(self):
try:
with self._sock:
self._sock.setblocking(False)
self._run()
finally:
self._s1.close()
self._s2.close()

def _run(self):
while self._active:
if self._clients >= self._max_clients:
return

r, w, x = select.select(
[self._sock, self._s1], [], [], self._timeout)

if self._s1 in r:
return

if self._sock in r:
try:
conn, addr = self._sock.accept()
except BlockingIOError:
continue
except TimeoutError:
if not self._active:
return
else:
raise
else:
self._clients += 1
conn.settimeout(self._timeout)
try:
with conn:
self._handle_client(conn)
except Exception as ex:
self._active = False
try:
raise
finally:
self._test._abort_socket_test(ex)

def _handle_client(self, sock):
self._prog(TestSocketWrapper(sock))

@property
def addr(self):
return self._sock.getsockname()
Loading
Loading