def __enter__(self):
    """Switch stdin to unbuffered, non-echoing mode and return ``self``.

    Two independent snapshots of the terminal attributes are taken:
    ``self.old_term`` is left untouched, while ``self.new_term`` is
    modified and applied to the terminal.
    """
    stdin_fd = sys.stdin.fileno()
    self.fd = stdin_fd
    self.new_term = termios.tcgetattr(stdin_fd)
    self.old_term = termios.tcgetattr(stdin_fd)

    # Index 3 holds the local-mode flags: drop line buffering and echo.
    disabled = termios.ICANON | termios.ECHO
    self.new_term[3] &= ~disabled

    # TCSAFLUSH: apply once pending output is written, discarding unread input.
    termios.tcsetattr(stdin_fd, termios.TCSAFLUSH, self.new_term)
    return self
# python类ICANON的实例源码 (Python example source code using termios.ICANON)
def setup(self):
    """Put the terminal behind ``self.fd`` into character-at-a-time mode.

    Canonical line editing, echo and signal-generating keys are turned
    off, and reads are configured to block until at least one byte is
    available (``VMIN=1``, ``VTIME=0``).  The change takes effect
    immediately (``TCSANOW``).
    """
    attrs = termios.tcgetattr(self.fd)

    # Index 3 is the local-mode flag word.
    lflag_off = termios.ICANON | termios.ECHO | termios.ISIG
    attrs[3] &= ~lflag_off

    # Index 6 is the control-character array.
    control_chars = attrs[6]
    control_chars[termios.VMIN] = 1
    control_chars[termios.VTIME] = 0

    termios.tcsetattr(self.fd, termios.TCSANOW, attrs)
def sendeof(self):
    """Send an end-of-file character to the child.

    The EOF character makes the pending parent output buffer get
    delivered to the waiting child program without waiting for
    end-of-line.  When it is the first character on a line, the
    ``read()`` in the user program returns 0, which signifies
    end-of-file.  For this to work as expected the caller must make sure
    the EOF is sent at the beginning of a line; this method does not
    send a newline itself.
    """
    # NOTE: an earlier approach (previously kept here as commented-out
    # code) temporarily enabled ICANON on stdin and wrote termios.CEOF,
    # or Ctrl-D when CEOF was missing, directly to self.child_fd.
    if not hasattr(termios, 'VEOF'):
        # Platform does not define VEOF, so assume Ctrl-D.
        eof_char = chr(4)
    else:
        # Ask the child's terminal which character it treats as EOF.
        control_chars = termios.tcgetattr(self.child_fd)[6]
        eof_char = control_chars[termios.VEOF]
    self.send(eof_char)
def __init__(self):
    """Initialise console I/O state; on POSIX, make stdin non-canonical.

    Only one instance may be initialised per process; this is tracked
    through the module-level ``CIO_STARTED`` flag.  The flag is set only
    after the platform-specific setup has succeeded, so a failed attempt
    (for example stdin not being a terminal) does not block a later
    retry.

    On POSIX the original stdin attributes are stored, together with the
    file descriptor, in ``self.fh_ocfg`` and ``stop_canon_input`` is
    registered with ``atexit`` to receive that list (presumably to
    restore the settings on exit -- confirm against its definition).

    Raises:
        Exception: if already initialised, or if ``os.name`` is neither
            ``'posix'`` nor ``'nt'``.
        termios.error: on POSIX when stdin is probably not a tty.
    """
    global CIO_STARTED
    if CIO_STARTED:
        raise Exception("cannot init cio twice")

    self.buf = ''
    self.fh_ocfg = []

    if os.name == 'posix':
        # For posix, stdin has to be switched to non-canonical input.
        self.posix = True
        fh = sys.stdin.fileno()
        # If the following call fails, we are probably called with a
        # stdin which is not a tty.
        ocfg = termios.tcgetattr(fh)
        # Second, independent snapshot that is safe to modify.
        cfg = termios.tcgetattr(fh)
        cfg[3] = cfg[3] & ~termios.ICANON & ~termios.ECHO
        # Newline translation (INLCR / OCRNL) is intentionally untouched.
        # VMIN=0 / VTIME=0: reads return immediately, even with no data.
        cfg[6][termios.VMIN] = 0
        cfg[6][termios.VTIME] = 0
        termios.tcsetattr(fh, termios.TCSAFLUSH, cfg)
        self.fh_ocfg.extend((fh, ocfg))
        atexit.register(stop_canon_input, self.fh_ocfg)
    elif os.name == 'nt':
        # For windows, the terminal needs no configuration.
        self.posix = False
    else:
        # Only posix and windows are known.
        raise Exception("os variant %s not supported" % repr(os.name))

    # Mark as started only once everything above has succeeded.
    CIO_STARTED = True
def setup(self):
    """Snapshot the terminal attributes, then enter single-key input mode.

    The untouched attributes of ``self.fd`` are kept in ``self.old``.
    A second snapshot has ICANON, ECHO and ISIG cleared and is set to
    block for one byte per read (``VMIN=1``, ``VTIME=0``) before being
    applied immediately.
    """
    self.old = termios.tcgetattr(self.fd)

    settings = termios.tcgetattr(self.fd)
    # Index 3: local-mode flags.
    for flag in (termios.ICANON, termios.ECHO, termios.ISIG):
        settings[3] &= ~flag
    # Index 6: control characters.
    settings[6][termios.VMIN], settings[6][termios.VTIME] = 1, 0

    termios.tcsetattr(self.fd, termios.TCSANOW, settings)
def run(self):
    """Capture debug-serial output and push it line by line to ``result_q``.

    Opens ``self.dbgser_tty_name``, configures it for 115200 baud,
    non-canonical input without echo, and reads one character at a time
    until ``self.stoprequest`` is set.  Every completed line (without
    its ``"\\n"``; ``"\\r"`` characters are dropped) is put on
    ``self.result_q``.  On exit the previous port settings are restored
    and any unterminated partial line is pushed as well.

    Raises:
        ValueError: when not running on a POSIX system.
    """
    if os.name != "posix":
        raise ValueError("Can only be run on Posix systems")

    # Characters of the line currently being assembled.
    pending = []

    # While PySerial would be preferable and more machine-independent,
    # it does not support echo suppression.
    with open(self.dbgser_tty_name, 'r+') as dbgser:
        # Configure the debug serial port.
        oldattrs = termios.tcgetattr(dbgser)
        newattrs = termios.tcgetattr(dbgser)
        newattrs[4] = termios.B115200  # ispeed
        newattrs[5] = termios.B115200  # ospeed
        newattrs[3] = newattrs[3] & ~termios.ICANON & ~termios.ECHO
        # VMIN=0 with VTIME=10 (tenths of a second): read() gives up
        # after about a second, so the stop request is polled regularly.
        newattrs[6][termios.VMIN] = 0
        newattrs[6][termios.VTIME] = 10
        termios.tcsetattr(dbgser, termios.TCSANOW, newattrs)

        # As long as we weren't asked to stop, try to capture dbgserial
        # output and push each line up the result queue.
        try:
            while not self.stoprequest.isSet():
                ch = dbgser.read(1)
                if not ch or ch == "\r":
                    # Read timed out, or a carriage return to discard.
                    continue
                if ch == "\n":
                    # Push the line (sans newline) into our queue and
                    # start collecting the next one.
                    self.result_q.put("".join(pending))
                    pending = []
                else:
                    pending.append(ch)
        except IOError:
            # Best effort: a read error simply ends the capture.
            pass
        finally:
            # Restore previous settings.
            termios.tcsetattr(dbgser, termios.TCSAFLUSH, oldattrs)
            # Flush any partial line.
            if pending:
                self.result_q.put("".join(pending))
def _patch_lflag(cls, attrs):
    """Return the local-mode flags ``attrs`` with the raw-mode bits cleared.

    ECHO, ICANON, IEXTEN and ISIG are switched off; every other bit is
    passed through unchanged.
    """
    cleared = attrs
    for flag in (termios.ECHO, termios.ICANON, termios.IEXTEN, termios.ISIG):
        cleared &= ~flag
    return cleared
def _patch_lflag(cls, attrs):
    """Return the local-mode flags ``attrs`` with the cooked-mode bits set.

    ECHO, ICANON, IEXTEN and ISIG are switched on; every other bit is
    passed through unchanged.
    """
    enabled = termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG
    return enabled | attrs
def prepare_tty():
    """Set the terminal in char mode (return each keyboard press at once)
    and switch off echoing of this input; return the original settings.

    Returns a ``(stdin_fd, old_stdin_config)`` tuple, where
    ``old_stdin_config`` is the attribute list read before any change.
    """
    stdin_fd = sys.stdin.fileno()  # will most likely be 0
    old_stdin_config = termios.tcgetattr(stdin_fd)

    # A second, independent snapshot that is safe to modify.
    working_copy = termios.tcgetattr(stdin_fd)
    iflag, oflag, cflag, lflag, ispeed, ospeed, cc = working_copy

    cc[termios.VTIME] = 1
    cc[termios.VMIN] = 1

    # ICRNL is intentionally not part of this mask.
    input_off = (
        termios.IGNBRK
        | termios.BRKINT
        | termios.PARMRK
        | termios.ISTRIP
        | termios.INLCR
        | termios.IGNCR
        | termios.IXON
    )
    iflag &= ~input_off

    # Output flags are left alone (OPOST stays as it was).
    cflag |= termios.CS8

    # ISIG is intentionally not part of this mask.
    local_off = (
        termios.ECHO
        | termios.ECHONL
        | termios.ICANON
        | termios.IEXTEN
    )
    lflag &= ~local_off

    new_config = [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]
    termios.tcsetattr(stdin_fd, termios.TCSANOW, new_config)
    return (stdin_fd, old_stdin_config)
def setup(self):
    """Save the current attributes of ``self.fd`` and go to raw-ish input.

    ``self.old`` receives the unmodified attributes.  The applied copy
    has canonical mode, echo and signal keys disabled and returns from
    ``read`` as soon as one byte is available.
    """
    fd = self.fd
    self.old = termios.tcgetattr(fd)

    attrs = termios.tcgetattr(fd)
    disabled = termios.ICANON | termios.ECHO | termios.ISIG
    attrs[3] = attrs[3] & ~disabled  # local-mode flags
    cc = attrs[6]                    # control characters
    cc[termios.VMIN] = 1
    cc[termios.VTIME] = 0

    termios.tcsetattr(fd, termios.TCSANOW, attrs)
def setup(self):
    """Disable line buffering, echo and signal keys on ``self.fd``.

    Reads are set to block until one byte arrives (``VMIN=1``) with no
    timeout (``VTIME=0``); the new attributes apply immediately.
    """
    settings = termios.tcgetattr(self.fd)
    # Index 3: local-mode flags.
    for flag in (termios.ICANON, termios.ECHO, termios.ISIG):
        settings[3] &= ~flag
    # Index 6: control characters.
    settings[6][termios.VMIN], settings[6][termios.VTIME] = 1, 0
    termios.tcsetattr(self.fd, termios.TCSANOW, settings)
def __enter__(self):
    """Make stdin unbuffered and non-echoing; return ``self.query_keyboard``.

    ``self.old_term`` keeps an unmodified snapshot of the terminal
    attributes, while ``self.new_term`` is the modified copy that gets
    applied.
    """
    self.fd = sys.stdin.fileno()
    self.new_term = termios.tcgetattr(self.fd)
    self.old_term = termios.tcgetattr(self.fd)

    # Clear canonical mode and echo in the local-mode flags (index 3).
    for flag in (termios.ICANON, termios.ECHO):
        self.new_term[3] &= ~flag

    # Switch to the unbuffered terminal.
    termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.new_term)
    return self.query_keyboard
def _fix_tty(self):
    """Set suitable tty options.

    Derives raw-mode attributes (equivalent to ``cfmakeraw``) from
    ``self.tcattr`` and applies them to ``STDIN`` immediately.
    ``self.tcattr`` itself is not modified.
    """
    assert self.tcattr is not None
    iflag, oflag, cflag, lflag, ispeed, ospeed, chars = self.tcattr  # pylint:disable=unpacking-non-sequence

    input_off = (
        termios.IGNBRK | termios.BRKINT | termios.PARMRK | termios.ISTRIP
        | termios.INLCR | termios.IGNCR | termios.ICRNL | termios.IXON
    )
    local_off = (
        termios.ECHO | termios.ECHONL | termios.ICANON | termios.ISIG
        | termios.IEXTEN
    )

    raw_iflag = iflag & ~input_off
    raw_oflag = oflag & ~termios.OPOST
    raw_lflag = lflag & ~local_off
    # Drop the character-size and parity bits, then select 8-bit characters.
    raw_cflag = (cflag & ~(termios.CSIZE | termios.PARENB)) | termios.CS8

    raw_attrs = [raw_iflag, raw_oflag, raw_cflag, raw_lflag, ispeed, ospeed, chars]
    termios.tcsetattr(STDIN, termios.TCSANOW, raw_attrs)
def _patch_lflag(cls, attrs):
    """Strip ECHO, ICANON, IEXTEN and ISIG from the local-mode flags.

    All other bits of ``attrs`` are returned unchanged.
    """
    raw_off = termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG
    patched = attrs & ~raw_off
    return patched
def setup(self):
    """Configure ``self.fd`` for unbuffered, silent, signal-free input.

    ICANON, ECHO and ISIG are cleared from the local-mode flags and the
    read parameters are set to ``VMIN=1`` / ``VTIME=0`` (block until one
    byte is available).  Applied with ``TCSANOW``.
    """
    fd = self.fd
    attrs = termios.tcgetattr(fd)
    attrs[3] &= ~(termios.ICANON | termios.ECHO | termios.ISIG)
    read_params = attrs[6]
    read_params[termios.VMIN] = 1
    read_params[termios.VTIME] = 0
    termios.tcsetattr(fd, termios.TCSANOW, attrs)
def enable():
    """Turn off echo and canonical (line-buffered) mode on stdin.

    On the first call, while the module-level ``old_tc`` is still
    ``None``, a copy of the current terminal attributes is stored there;
    later calls leave ``old_tc`` untouched.  The new attributes are
    applied with ``TCSADRAIN``, i.e. after pending output has been
    transmitted.
    """
    global old_tc
    stdin_fd = sys.stdin.fileno()

    # Set terminal attributes.
    new = termios.tcgetattr(stdin_fd)
    if old_tc is None:
        # Shallow copy is enough: only the integer flag word at index 3
        # is replaced below, the nested control-character list is not
        # modified.
        old_tc = new[:]
    new[3] &= ~(termios.ECHO | termios.ICANON)
    termios.tcsetattr(stdin_fd, termios.TCSADRAIN, new)
def prepare(self):
    """Per-readline preparation of the terminal and screen state.

    Saves the current terminal state of ``self.input_fd``, applies a
    raw-style copy of it, resets the screen bookkeeping attributes,
    emits the ``_smkx`` code and installs ``self.__sigwinch`` as the
    SIGWINCH handler, remembering the previous handler in
    ``self.old_sigwinch``.
    """
    # Per-readline preparations:
    self.__svtermstate = tcgetattr(self.input_fd)
    raw = self.__svtermstate.copy()

    input_off = (termios.BRKINT | termios.INPCK
                 | termios.ISTRIP | termios.IXON)
    control_off = termios.CSIZE | termios.PARENB
    # NOTE(review): the "* 1" looks like a manual on/off switch for
    # clearing ISIG -- confirm before simplifying.
    local_off = (termios.ICANON | termios.ECHO
                 | termios.IEXTEN | (termios.ISIG * 1))

    raw.iflag &= ~input_off
    raw.oflag &= ~termios.OPOST
    raw.cflag &= ~control_off
    raw.cflag |= termios.CS8
    raw.lflag &= ~local_off
    raw.cc[termios.VMIN] = 1
    raw.cc[termios.VTIME] = 0
    tcsetattr(self.input_fd, termios.TCSADRAIN, raw)

    # Reset screen bookkeeping.
    self.screen = []
    self.height, self.width = self.getheightwidth()

    self.__buffer = []
    self.__posxy = (0, 0)
    self.__gone_tall = 0
    self.__move = self.__move_short
    self.__offset = 0

    self.__maybe_write_code(self._smkx)

    try:
        previous_handler = signal.signal(signal.SIGWINCH, self.__sigwinch)
    except ValueError:
        # signal.signal() raises ValueError outside the main thread;
        # in that case the handler is simply not installed.
        pass
    else:
        self.old_sigwinch = previous_handler
def setup(self):
    """Remember the terminal attributes in ``self.old`` and go unbuffered.

    A fresh copy of the attributes of ``self.fd`` gets ICANON, ECHO and
    ISIG removed and ``VMIN=1`` / ``VTIME=0`` set, and is then applied
    immediately.
    """
    self.old = termios.tcgetattr(self.fd)

    updated = termios.tcgetattr(self.fd)
    lflag = updated[3]
    lflag &= ~termios.ICANON
    lflag &= ~termios.ECHO
    lflag &= ~termios.ISIG
    updated[3] = lflag
    updated[6][termios.VMIN] = 1
    updated[6][termios.VTIME] = 0

    termios.tcsetattr(self.fd, termios.TCSANOW, updated)
def warning(message, answer=None):
    """
    Print a warning message to stderr and optionally wait for an answer.

    :param message: message
    :param answer: list of supported options. Default is first item.
        The list is never modified.  When it is omitted or empty,
        nothing is read from stdin.
    :return: ``None`` when no answer was requested; otherwise the
        lower-cased key that was pressed if it occurs in one of the
        options, else the first option.  The first option is also
        returned when stdin is not a terminal.
    """
    use_color = sys.stderr.isatty()
    c = ""

    if use_color:
        sys.stderr.write("\n\x1b[92;01m%s " % message)
    else:
        sys.stderr.write("\n%s " % message)

    # A single key press can only be read from a real terminal; without
    # one, fall through and use the default answer.
    if answer and sys.stdin.isatty():
        fd = sys.stdin.fileno()
        oldterm = termios.tcgetattr(fd)
        newattr = termios.tcgetattr(fd)
        newattr[3] = newattr[3] & ~termios.ICANON & ~termios.ECHO
        oldflags = fcntl.fcntl(fd, fcntl.F_GETFL)
        try:
            # Both changes are made inside the try block so the finally
            # clause restores the terminal even if one of them fails.
            termios.tcsetattr(fd, termios.TCSANOW, newattr)
            fcntl.fcntl(fd, fcntl.F_SETFL, oldflags | os.O_NONBLOCK)
            while True:
                try:
                    c = sys.stdin.read(1)
                    break
                except IOError:
                    # No input available yet on the non-blocking fd.
                    pass
        finally:
            termios.tcsetattr(fd, termios.TCSAFLUSH, oldterm)
            fcntl.fcntl(fd, fcntl.F_SETFL, oldflags)

    c = ("%s" % c).lower()

    # Echo the pressed key (if any) and reset the colour.
    if use_color:
        sys.stderr.write(" %s\x1b[39;49;00m\n\n" % c)
    else:
        sys.stderr.write(" %s\n\n" % c)

    if not answer:
        return None

    # An empty key would be "in" every string option, so skip matching.
    if c:
        for it in answer:
            if c in it:
                return c

    # Default: first option, without mutating the caller's list.
    return answer[0]
def setup(self):
    """Switch ``self.fd`` to immediate, non-echoing key input.

    Clears ICANON, ECHO and ISIG and makes ``read`` wait for one byte
    (``VMIN=1``) without an inter-byte timer (``VTIME=0``).  The new
    attributes are applied right away (``TCSANOW``).
    """
    term_attrs = termios.tcgetattr(self.fd)
    iflag, oflag, cflag, lflag, ispeed, ospeed, cc = term_attrs

    lflag &= ~(termios.ICANON | termios.ECHO | termios.ISIG)
    cc[termios.VMIN] = 1
    cc[termios.VTIME] = 0

    term_attrs[3] = lflag
    termios.tcsetattr(self.fd, termios.TCSANOW, term_attrs)