Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 52 additions & 28 deletions Lib/pty.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ def master_open():
Open a pty master and return the fd, and the filename of the slave end.
Deprecated, use openpty() instead."""

import warnings
warnings.warn("Use pty.openpty() instead.", DeprecationWarning, stacklevel=2) # Remove API in 3.14

try:
master_fd, slave_fd = os.openpty()
except (AttributeError, OSError):
Expand Down Expand Up @@ -69,6 +72,9 @@ def slave_open(tty_name):
opened filedescriptor.
Deprecated, use openpty() instead."""

import warnings
warnings.warn("Use pty.openpty() instead.", DeprecationWarning, stacklevel=2) # Remove API in 3.14

result = os.open(tty_name, os.O_RDWR)
try:
from fcntl import ioctl, I_PUSH
Expand Down Expand Up @@ -101,32 +107,14 @@ def fork():
master_fd, slave_fd = openpty()
pid = os.fork()
if pid == CHILD:
# Establish a new session.
os.setsid()
os.close(master_fd)

# Slave becomes stdin/stdout/stderr of child.
os.dup2(slave_fd, STDIN_FILENO)
os.dup2(slave_fd, STDOUT_FILENO)
os.dup2(slave_fd, STDERR_FILENO)
if slave_fd > STDERR_FILENO:
os.close(slave_fd)

# Explicitly open the tty to make it become a controlling tty.
tmp_fd = os.open(os.ttyname(STDOUT_FILENO), os.O_RDWR)
os.close(tmp_fd)
os.login_tty(slave_fd)
else:
os.close(slave_fd)

# Parent and child process.
return pid, master_fd

def _writen(fd, data):
"""Write all the data to a descriptor."""
while data:
n = os.write(fd, data)
data = data[n:]

def _read(fd):
"""Default read function."""
return os.read(fd, 1024)
Expand All @@ -136,9 +124,42 @@ def _copy(master_fd, master_read=_read, stdin_read=_read):
Copies
pty master -> standard output (master_read)
standard input -> pty master (stdin_read)"""
fds = [master_fd, STDIN_FILENO]
while fds:
rfds, _wfds, _xfds = select(fds, [], [])
if os.get_blocking(master_fd):
# If we write more than tty/ndisc is willing to buffer, we may block
# indefinitely. So we set master_fd to non-blocking temporarily during
# the copy operation.
os.set_blocking(master_fd, False)
try:
_copy(master_fd, master_read=master_read, stdin_read=stdin_read)
finally:
# restore blocking mode for backwards compatibility
os.set_blocking(master_fd, True)
return
high_waterlevel = 4096
stdin_avail = master_fd != STDIN_FILENO
stdout_avail = master_fd != STDOUT_FILENO
i_buf = b''
o_buf = b''
while 1:
rfds = []
wfds = []
if stdin_avail and len(i_buf) < high_waterlevel:
rfds.append(STDIN_FILENO)
if stdout_avail and len(o_buf) < high_waterlevel:
rfds.append(master_fd)
if stdout_avail and len(o_buf) > 0:
wfds.append(STDOUT_FILENO)
if len(i_buf) > 0:
wfds.append(master_fd)

rfds, wfds, _xfds = select(rfds, wfds, [])

if STDOUT_FILENO in wfds:
try:
n = os.write(STDOUT_FILENO, o_buf)
o_buf = o_buf[n:]
except OSError:
stdout_avail = False

if master_fd in rfds:
# Some OSes signal EOF by returning an empty byte string,
Expand All @@ -150,19 +171,22 @@ def _copy(master_fd, master_read=_read, stdin_read=_read):
if not data: # Reached EOF.
return # Assume the child process has exited and is
# unreachable, so we clean up.
else:
os.write(STDOUT_FILENO, data)
o_buf += data

if master_fd in wfds:
n = os.write(master_fd, i_buf)
i_buf = i_buf[n:]

if STDIN_FILENO in rfds:
if stdin_avail and STDIN_FILENO in rfds:
data = stdin_read(STDIN_FILENO)
if not data:
fds.remove(STDIN_FILENO)
stdin_avail = False
else:
_writen(master_fd, data)
i_buf += data

def spawn(argv, master_read=_read, stdin_read=_read):
"""Create a spawned process."""
if type(argv) == type(''):
if isinstance(argv, str):
argv = (argv,)
sys.audit('pty.spawn', argv)

Expand Down
4 changes: 4 additions & 0 deletions Lib/test/test_builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2241,6 +2241,7 @@ def child(wpipe):
expected = terminal_input.decode(sys.stdin.encoding) # what else?
self.assertEqual(input_result, expected)

@unittest.expectedFailure # TODO: RUSTPYTHON
def test_input_tty(self):
# Test input() functionality when wired to a tty (the code path
# is different and invokes GNU readline if available).
Expand All @@ -2257,17 +2258,20 @@ def skip_if_readline(self):
self.skipTest("the readline module is loaded")

@unittest.skipUnless(hasattr(sys.stdin, 'detach'), 'TODO: RustPython: requires detach function in TextIOWrapper')
@unittest.expectedFailure # TODO: RUSTPYTHON AssertionError: got 0 lines in pipe but expected 2, child output was: quux
def test_input_tty_non_ascii(self):
self.skip_if_readline()
# Check stdin/stdout encoding is used when invoking PyOS_Readline()
self.check_input_tty("prompté", b"quux\xe9", "utf-8")

@unittest.skipUnless(hasattr(sys.stdin, 'detach'), 'TODO: RustPython: requires detach function in TextIOWrapper')
@unittest.expectedFailure # TODO: RUSTPYTHON AssertionError: got 0 lines in pipe but expected 2, child output was: quux
def test_input_tty_non_ascii_unicode_errors(self):
self.skip_if_readline()
# Check stdin/stdout error handler is used when invoking PyOS_Readline()
self.check_input_tty("prompté", b"quux\xe9", "ascii")

@unittest.expectedFailure # TODO: RUSTPYTHON AssertionError: got 0 lines in pipe but expected 2, child output was: quux
def test_input_no_stdout_fileno(self):
# Issue #24402: If stdin is the original terminal but stdout.fileno()
# fails, do not use the original stdout file descriptor
Expand Down
Loading
Loading