def callback(_log, exit_on_error=False):
"""Helper to call the appropriate logging level"""
log = logging.getLogger("iocage")
level = _log["level"]
message = _log["message"]
if level == 'CRITICAL':
log.critical(message)
elif level == 'ERROR':
log.error(message)
elif level == 'WARNING':
log.warning(message)
elif level == 'INFO':
log.info(message)
elif level == 'DEBUG':
log.debug(message)
elif level == 'VERBOSE':
log.log(15, message)
elif level == 'NOTICE':
log.log(25, message)
elif level == 'EXCEPTION':
try:
if not os.isatty(sys.stdout.fileno()) and not exit_on_error:
raise RuntimeError(message)
else:
log.error(message)
raise SystemExit(1)
except AttributeError:
# They are lacking the fileno object
raise RuntimeError(message)
python类isatty()的实例源码
def is_a_tty():
try:
return os.isatty(sys.stdout.fileno())
except Exception:
return False
def build(containers, service_names, **kwargs):
monochrome = not os.isatty(1)
presenters = build_log_presenters(service_names, monochrome)
printer = LogPrinter(containers, presenters, **kwargs)
printer.start()
def get_terminal_height(fd=1):
"""
Returns height of terminal if it is a tty, 999 otherwise
:param fd: file descriptor (default: 1=stdout)
"""
if os.isatty(fd):
height = os.get_terminal_size(fd)[1]
else:
height = 50
return height
def get_terminal_width(fd=1):
"""
Returns width of terminal if it is a tty, 999 otherwise
:param fd: file descriptor (default: 1=stdout)
"""
if os.isatty(fd):
width = os.get_terminal_size(fd)[0]
else:
width = 50
return width
def _check_interactive(*descriptors):
for desc in descriptors:
try:
if not isatty(desc.fileno()):
return False
except Exception:
# Anything broken we are going to pretend this is not
# interactive
return False
return True # pragma: no cover
def _provide_processing_entry_feedback(self, entry):
self.processed_entries += 1
if os.isatty(sys.stdout.fileno()):
if self.processed_entries > 1:
sys.stdout.write("\033[F")
sys.stdout.write("\r\033[KProcessing entry {}/{} ({})...\n".format(
self.processed_entries,
self.expected_entries,
sanitise(entry)
))
def do_var_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None):
result = None
if sys.__stdin__.isatty():
do_prompt = self.prompt
if prompt and default is not None:
msg = "%s [%s]: " % (prompt, default)
elif prompt:
msg = "%s: " % prompt
else:
msg = 'input for %s: ' % varname
if confirm:
while True:
result = do_prompt(msg, private)
second = do_prompt("confirm " + msg, private)
if result == second:
break
self.display("***** VALUES ENTERED DO NOT MATCH ****")
else:
result = do_prompt(msg, private)
else:
result = None
self.warning("Not prompting as we are not in interactive mode")
# if result is false and default is not None
if not result and default is not None:
result = default
if encrypt:
# Circular import because encrypt needs a display class
from ansible.utils.encrypt import do_encrypt
result = do_encrypt(result, encrypt, salt_size, salt)
# handle utf-8 chars
result = to_text(result, errors='surrogate_or_strict')
return result
def _set_column_width(self):
if os.isatty(0):
tty_size = unpack('HHHH', fcntl.ioctl(0, TIOCGWINSZ, pack('HHHH', 0, 0, 0, 0)))[1]
else:
tty_size = 0
self.columns = max(79, tty_size)
def main(server, eventHandler, params):
if not os.isatty(sys.stdout.fileno()):
print("FATAL: Unable to run 'ncurses' UI without a TTY.")
return
ui = NCursesUI()
try:
curses.wrapper(ui.main, server, eventHandler, params)
except:
import traceback
traceback.print_exc()
def isatty(self):
'''This returns True if the file descriptor is open and connected to a
tty(-like) device, else False.
On SVR4-style platforms implementing streams, such as SunOS and HP-UX,
the child pty may not appear as a terminal device. This means
methods such as setecho(), setwinsize(), getwinsize() may raise an
IOError. '''
return os.isatty(self.child_fd)
def getecho(self):
'''This returns the terminal echo mode. This returns True if echo is
on or False if echo is off. Child applications that are expecting you
to enter a password often set ECHO False. See waitnoecho().
Not supported on platforms where ``isatty()`` returns False. '''
return self.ptyproc.getecho()
def setecho(self, state):
'''This sets the terminal echo mode on or off. Note that anything the
child sent before the echo will be lost, so you should be sure that
your input buffer is empty before you call setecho(). For example, the
following will work as expected::
p = pexpect.spawn('cat') # Echo is on by default.
p.sendline('1234') # We expect see this twice from the child...
p.expect(['1234']) # ... once from the tty echo...
p.expect(['1234']) # ... and again from cat itself.
p.setecho(False) # Turn off tty echo
p.sendline('abcd') # We will set this only once (echoed by cat).
p.sendline('wxyz') # We will set this only once (echoed by cat)
p.expect(['abcd'])
p.expect(['wxyz'])
The following WILL NOT WORK because the lines sent before the setecho
will be lost::
p = pexpect.spawn('cat')
p.sendline('1234')
p.setecho(False) # Turn off tty echo
p.sendline('abcd') # We will set this only once (echoed by cat).
p.sendline('wxyz') # We will set this only once (echoed by cat)
p.expect(['1234'])
p.expect(['1234'])
p.expect(['abcd'])
p.expect(['wxyz'])
Not supported on platforms where ``isatty()`` returns False.
'''
return self.ptyproc.setecho(state)
def isatty(self):
'''This returns True if the file descriptor is open and connected to a
tty(-like) device, else False.
On SVR4-style platforms implementing streams, such as SunOS and HP-UX,
the child pty may not appear as a terminal device. This means
methods such as setecho(), setwinsize(), getwinsize() may raise an
IOError. '''
return os.isatty(self.fd)
def link(source, target):
"""link(source, target) -> None
Attempt to hard link the source file to the target file name.
On OS/2, this creates a complete copy of the source file.
"""
s = os.open(source, os.O_RDONLY | os.O_BINARY)
if os.isatty(s):
raise OSError(errno.EXDEV, 'Cross-device link')
data = os.read(s, 1024)
try:
t = os.open(target, os.O_WRONLY | os.O_BINARY | os.O_CREAT | os.O_EXCL)
except OSError:
os.close(s)
raise
try:
while data:
os.write(t, data)
data = os.read(s, 1024)
except OSError:
os.close(s)
os.close(t)
os.unlink(target)
raise
os.close(s)
os.close(t)
def test_isatty(self):
if hasattr(os, "isatty"):
self.assertEqual(os.isatty(support.make_bad_fd()), False)
def setUp(self):
# isatty() and close() can hang on some platforms. Set an alarm
# before running the test to make sure we don't hang forever.
self.old_alarm = signal.signal(signal.SIGALRM, self.handle_sig)
signal.alarm(10)
def handle_sig(self, sig, frame):
self.fail("isatty hung")
def setup_tty(self):
if os.isatty(sys.stdin.fileno()):
LOG.debug('putting tty into raw mode')
self.old_settings = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin)
def restore_tty(self):
if os.isatty(sys.stdin.fileno()):
LOG.debug('restoring tty configuration')
termios.tcsetattr(sys.stdin, termios.TCSADRAIN,
self.old_settings)