def _is_daemon():
# The process group for a foreground process will match the
# process group of the controlling terminal. If those values do
# not match, or ioctl() fails on the stdout file handle, we assume
# the process is running in the background as a daemon.
# http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics
try:
is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno())
except OSError as err:
if err.errno == errno.ENOTTY:
# Assume we are a daemon because there is no terminal.
is_daemon = True
else:
raise
except UnsupportedOperation:
# Could not get the fileno for stdout, so we must be a daemon.
is_daemon = True
return is_daemon
python类tcgetpgrp()的实例源码
def _is_daemon():
# The process group for a foreground process will match the
# process group of the controlling terminal. If those values do
# not match, or ioctl() fails on the stdout file handle, we assume
# the process is running in the background as a daemon.
# http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics
try:
is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno())
except io.UnsupportedOperation:
# Could not get the fileno for stdout, so we must be a daemon.
is_daemon = True
except OSError as err:
if err.errno == errno.ENOTTY:
# Assume we are a daemon because there is no terminal.
is_daemon = True
else:
raise
return is_daemon
def initialise(self):
if not self.enabled:
return
if termios is None:
self.enabled = False
return
try:
self.tty = open("/dev/tty", "w+")
os.tcgetpgrp(self.tty.fileno())
self.clean_tcattr = termios.tcgetattr(self.tty)
iflag, oflag, cflag, lflag, ispeed, ospeed, cc = self.clean_tcattr
new_lflag = lflag & (0xffffffff ^ termios.ICANON)
new_cc = cc[:]
new_cc[termios.VMIN] = 0
self.cbreak_tcattr = [
iflag, oflag, cflag, new_lflag, ispeed, ospeed, new_cc]
except Exception:
self.enabled = False
return
def stop_was_requested(self):
"""Check whether a 'keyboard stop' instruction has been sent.
Returns true if ^X has been sent on the controlling terminal.
Consumes all available input on /dev/tty.
"""
if not self.enabled:
return False
# Don't try to read the terminal if we're in the background.
# There's a race here, if we're backgrounded just after this check, but
# I don't see a clean way to avoid it.
if os.tcgetpgrp(self.tty.fileno()) != os.getpid():
return False
try:
termios.tcsetattr(self.tty, termios.TCSANOW, self.cbreak_tcattr)
except EnvironmentError:
return False
try:
seen_ctrl_x = False
while True:
c = os.read(self.tty.fileno(), 1)
if not c:
break
if c == "\x18":
seen_ctrl_x = True
except EnvironmentError:
seen_ctrl_x = False
finally:
termios.tcsetattr(self.tty, termios.TCSANOW, self.clean_tcattr)
return seen_ctrl_x
def get_name_for_fd(fd):
"""
Return the process name for a given process ID.
"""
try:
pgrp = os.tcgetpgrp(fd)
except OSError:
# See: https://github.com/jonathanslenders/pymux/issues/46
return
try:
with open('/proc/%s/cmdline' % pgrp, 'rb') as f:
return f.read().decode('utf-8', 'ignore').partition('\0')[0]
except IOError:
pass
def get_name_for_fd(fd):
"""
Return the process name for a given process ID.
"""
try:
pgrp = os.tcgetpgrp(fd)
except OSError:
return
try:
return get_proc_name(pgrp)
except IOError:
pass
def show_progress(self, done=False):
"""Display progress bar if enabled and if running on correct terminal"""
if SHOWPROG and self.showprog and (done or self.index % 500 == 0) \
and (os.getpgrp() == os.tcgetpgrp(sys.stderr.fileno())):
rows, columns = struct.unpack('hh', fcntl.ioctl(2, termios.TIOCGWINSZ, "1234"))
if columns < 100:
sps = 30
else:
# Terminal is wide enough, include bytes/sec
sps = 42
# Progress bar length
wlen = int(columns) - len(str_units(self.filesize)) - sps
# Number of bytes per progress bar unit
xunit = float(self.filesize)/wlen
# Progress bar units done so far
xdone = int(self.offset/xunit)
xtime = time.time()
progress = 100.0*self.offset/self.filesize
# Display progress only if there is some change in progress
if (done and not self.progdone) or (self.prevdone != xdone or \
int(self.prevtime) != int(xtime) or \
round(self.prevprog) != round(progress)):
if done:
# Do not display progress again when done=True
self.progdone = 1
otime = xtime - self.timestart # Overall time
tdelta = xtime - self.prevtime # Segment time
self.prevprog = progress
self.prevdone = xdone
self.prevtime = xtime
# Number of progress bar units for completion
slen = wlen - xdone
if done:
# Overall average bytes/sec
bps = self.offset / otime
else:
# Segment average bytes/sec
bps = (self.offset - self.prevoff) / tdelta
self.prevoff = self.offset
# Progress bar has both foreground and background colors
# as green and in case the terminal does not support colors
# then a "=" is displayed instead instead of a green block
pbar = " [\033[32m\033[42m%s\033[m%s] " % ("="*xdone, " "*slen)
# Add progress percentage and how many bytes have been
# processed so far relative to the total number of bytes
pbar += "%5.1f%% %9s/%s" % (progress, str_units(self.offset), str_units(self.filesize))
if columns < 100:
sys.stderr.write("%s %-6s\r" % (pbar, str_time(otime)))
else:
# Terminal is wide enough, include bytes/sec
sys.stderr.write("%s %9s/s %-6s\r" % (pbar, str_units(bps), str_time(otime)))
if done:
sys.stderr.write("\n")