def ex_print (type, msg, ret):
colorama.init()
# Define color and style constants
c_error = Fore.RED
c_action = Fore.YELLOW
c_ok = Fore.GREEN
c_white = Fore.WHITE
s_br = Style.BRIGHT
s_reset = Style.RESET_ALL
message = {
"error": c_error + s_br,
"action": c_action,
"positive": c_ok + s_br,
"info": c_white + s_br,
"reset": s_reset
}
style = message.get(type, s_reset)
if ret == 0:
print(style + msg, end = "")
else:
print(style + msg)
python类RESET_ALL的实例源码
def option(question, options, default=1):
print(question)
options = list(options)
for idx, element in enumerate(options):
print("{}) {}".format(idx+1,element))
while True:
i = input("{Style.BRIGHT}{Fore.BLUE}:: {Fore.RESET}Please pick[{}]: {Style.RESET_ALL}".format(default, Style=Style, Fore=Fore))
try:
if i == "":
i = default
else:
i = int(i)
if 0 < i <= len(options):
return options[i-1]
except:
pass
return None
def size():
cache_dir = cfg.cache.dir
source_dir = cfg.source.dir
cache_size = 0
for f in cache_dir.walkfiles():
cache_size += f.size
source_size = 0
for f in source_dir.walkfiles():
source_size += f.size
print("{Style.BRIGHT}Cache: {Style.RESET_ALL} {}".format(
humanize.naturalsize(cache_size, binary=True),
Style=Style
))
print("{Style.BRIGHT}Source:{Style.RESET_ALL} {}".format(
humanize.naturalsize(source_size, binary=True),
Style=Style
))
def console_write(self, color, message):
""" Writes message to console.
:param color: the colorama color representation.
:param message: str the message to write.
"""
msg_encoded = message.encode('ascii', 'ignore')
if config.USE_24HOUR:
ts = time.strftime('%H:%M:%S')
else:
ts = time.strftime('%I:%M:%S:%p')
if config.CONSOLE_COLORS:
msg = COLOR['white'] + '[' + ts + '] ' + Style.RESET_ALL + color + msg_encoded
else:
msg = '[' + ts + '] ' + msg_encoded
try:
print(msg)
except UnicodeEncodeError as ue:
log.error(ue, exc_info=True)
if config.DEBUG_MODE:
traceback.print_exc()
if config.CHAT_LOGGING:
write_to_log('[' + ts + '] ' + message, self.roomname)
def banner():
print(Style.DIM)
print(' ___________________________')
print(' / /\\')
print(' / sadboyzvone\'s _/ /\\')
print(' / Intel 8080 / \/')
print(' / Assembler /\\')
print('/___________________________/ /')
print('\___________________________\/')
print(' \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\'
+ Style.RESET_ALL + Style.BRIGHT)
print(Fore.WHITE + '\nPowered by ' + Fore.BLUE + 'Pyt'
+ Fore.YELLOW + 'hon' + Fore.WHITE
+ '\nCopyright (C) 2017, Zvonimir Rudinski')
# Print usage information
def rebuild_entry(self, entry, styles):
entry = list(entry)
# This is the file path.
entry[0] = os.path.basename(entry[0]) if self.strip else entry[0]
# Always an int (entry line number)
entry[1] = str(entry[1])
new_entry = [
styles['line'].format(entry[1]) + Style.RESET_ALL,
styles['module'].format(entry[0]) + Style.RESET_ALL,
styles['context'].format(entry[2]) + Style.RESET_ALL,
styles['call'].format(entry[3]) + Style.RESET_ALL
]
if self.conservative:
new_entry[0], new_entry[1] = new_entry[1], new_entry[0]
return new_entry
def test_tag_chars():
with raises(ValueError):
am = AnsiMarkup(tag_sep='{')
with raises(ValueError):
am = AnsiMarkup(tag_sep='(-)')
with raises(ValueError):
am = AnsiMarkup(tag_sep='qq')
am = AnsiMarkup(tag_sep='{}')
r1 = p('0<b>1<d>2</d>3</b>4')
r2 = am.parse('0{b}1{d}2{/d}3{/b}4')
assert r1 == r2 == '0' + S.BRIGHT + '1' + S.DIM + '2' + S.RESET_ALL + S.BRIGHT + '3' + S.RESET_ALL + '4'
assert s('<b>1</b>') == am.strip('{b}1{/b}') == '1'
def reader(wordlist, chunks_size, verbose):
""" Load up chunks_sizes of the wordlist
into the queue
"""
queue = Queue()
chunk = list(islice(wordlist, chunks_size))
while chunk:
# Get chunks_size records from the wordlist
if verbose:
print(Fore.BLUE + "[QUEUE]" + DEBUG + "inserting into queue:")
print("{}".format(chunk) + Style.RESET_ALL)
queue.put(chunk)
chunk = list(islice(wordlist, chunks_size))
return queue
def signal_handler(signum, *kwargs):
""" A handler for various interrupts
"""
global exit_flag
exit_flag = True
if signum == signal.SIGINT:
print(ERROR + "user quit" + Style.RESET_ALL)
else:
print(ERROR + "signal caught: {}".format(signum) + Style.RESET_ALL)
print("[*] shutting down at {}".format(time.ctime()))
# let time for the threads to terminate
time.sleep(2)
sys.exit(0)
def _update_ui():
global timeout_count, result_codes, connection_error_count
print '\r',
for k, v in result_codes.iteritems():
print "%s:" % k,
if k == '200 OK':
print(Fore.LIGHTGREEN_EX),
else:
print(Fore.RED),
print "%s " % v,
print(Style.RESET_ALL),
if timeout_count > 0:
print('Timeouts: '+Fore.YELLOW + str(timeout_count) + Style.RESET_ALL) + ' ',
if connection_error_count >0:
print('Connection Errors: '+Fore.RED + str(connection_error_count) + Style.RESET_ALL),
sys.stdout.flush()
def console_write(self, color, message):
""" Writes message to console.
:param color: the colorama color representation.
:param message: str the message to write.
"""
msg_encoded = message.encode('ascii', 'ignore')
if config.USE_24HOUR:
ts = time.strftime('%H:%M:%S')
else:
ts = time.strftime('%I:%M:%S:%p')
if config.CONSOLE_COLORS:
msg = COLOR['white'] + '[' + ts + '] ' + Style.RESET_ALL + color + msg_encoded
else:
msg = '[' + ts + '] ' + msg_encoded
try:
print(msg)
except UnicodeEncodeError as ue:
log.error(ue, exc_info=True)
if config.DEBUG_MODE:
traceback.print_exc()
if config.CHAT_LOGGING:
write_to_log('[' + ts + '] ' + message, self.roomname)
def local_exec(cmd, env=None, shell=False, check_retcode=True):
LOG.debug("{}Executing locally: '{}'{}".format(
Style.DIM, cmd if isinstance(cmd, basestring) else " ".join(cmd),
Style.RESET_ALL))
if env is not None:
# Sanitize env because it doees not digest non string values
env = dict((key, str(value)) for key, value in env.items())
# Unite the system and custom environments
env = dict(os.environ, **env)
proc = subprocess.Popen(cmd, env=env, stderr=subprocess.PIPE,
stdout=subprocess.PIPE, shell=shell)
out, err = proc.communicate()
ret_code = proc.returncode
_proceed_exec_result(
out.decode('ascii', 'ignore'), err.decode('ascii', 'ignore'),
ret_code, check_retcode)
return ret_code, out, err
def ssh_exec_live(ssh, cmd, timeout=None, check_retcode=True):
LOG.debug(u"{}Calling SSH live: '{}'{}".format(Style.DIM, cmd,
Style.RESET_ALL))
try:
interact = SSHClientInteraction(ssh, timeout=timeout, display=True,
logger=logging.getLogger())
interact.expect('.*')
interact.send(cmd + "; exit $?") # needed to not depend on prompt type
interact.tail()
ret_code = interact.channel.recv_exit_status()
except Exception:
LOG.debug("Something went wrong in 'ssh_exec_live':\n{}".format(
format_exception(sys.exc_info())
))
raise
_proceed_exec_result("", "", ret_code, check_retcode)
return ret_code, "", ""
def console_write(self, color, message):
"""
Writes message to console.
:param color: the colorama color representation.
:param message: str the message to write.
"""
if config.USE_24HOUR:
ts = time.strftime('%H:%M:%S')
else:
ts = time.strftime('%I:%M:%S:%p')
if config.CONSOLE_COLORS:
msg = COLOR['white'] + '[' + ts + '] ' + Style.RESET_ALL + color + message
else:
msg = '[' + ts + '] ' + message
try:
print(msg)
except UnicodeEncodeError as ue:
log.error(ue, exc_info=True)
if config.DEBUG_MODE:
traceback.print_exc()
if config.CHAT_LOGGING:
write_to_log('[' + ts + '] ' + message, self.room_name)
def authorize_concept(self, concept):
if '2' not in concept.sf:
raise ValueError('No vocabulary code (2) given!')
if concept.sf['2'] in self.vocabularies:
vocab = self.vocabularies[concept.sf['2']]
else:
log.info(Fore.RED + '?' + Style.RESET_ALL + ' Could not authorize: %s', concept)
return
response = vocab.authorize_term(concept.term, concept.tag)
if response.get('id') is not None:
identifier = response.get('id')
if concept.sf.get('0'):
if concept.sf.get('0') == ANY_VALUE:
pass # ignore ANY_VALUE
elif identifier != concept.sf['0']:
identifier = pick_one('The $$0 value does not match the authority record id. ' +
'Please select which to use',
[concept.sf['0'], identifier])
concept.sf['0'] = identifier
log.info(Fore.GREEN + '?' + Style.RESET_ALL + ' Authorized: %s', concept)
else:
log.info(Fore.RED + '?' + Style.RESET_ALL + ' Could not authorize: %s', concept)
def echo_llamalist_stats(dev_names, badges=False, proof=True):
stats = {}
for dev_name in dev_names:
counts = get_llama_stats(dev_name) if not badges else get_badges_stats(dev_name)
dev_name = counts.get('Name')
if not dev_name: continue
stats[dev_name] = counts
print('@{} {:,} badges sent, {:,} badges received'.format(dev_name, counts['Given'], counts['Received']))
print(Fore.GREEN + '---' + Style.RESET_ALL)
num = 1
#+k[1]['Received']
for dev_name, counts in sorted(stats.items(), key=lambda k: k[1]['Given'], reverse=True):
if proof:
print('{}. @{} {:,} badges sent, {:,} badges received <a href="https://{}.deviantart.com/badges/{}">[proof]</a><br>'
.format(num, dev_name, counts['Given'], counts['Received'], dev_name, '' if badges else 'llama/'))
else:
print('{}. @{} {:,} badges sent, {:,} badges received<br>'
.format(num, dev_name, counts['Given'], counts['Received']))
num += 1
def TOR_PROC_CHECK():
isTorRunnin = False
TOR_INFO = {}
TOR_PROC = None
for proc in psutil.process_iter():
try:
pinfo = proc.as_dict(attrs=['pid', 'name'])
except psutil.NoSuchProcess:
pass
else:
if pinfo['name'] == "tor":
isTorRunnin = True
TOR_INFO['pid'] = pinfo['pid']
TOR_INFO['name'] = pinfo['name']
break
if isTorRunnin == True:
print ("[" + Fore.GREEN + Style.BRIGHT + "+" + Style.RESET_ALL + "]" + Fore.GREEN + Style.BRIGHT + " Tor is running." + Style.RESET_ALL)
TOR_PROC = psutil.Process(int(TOR_INFO['pid']))
return TOR_PROC
else:
print ("[" + Fore.RED + Style.BRIGHT + "-" + Style.RESET_ALL + "]" + Fore.RED + Style.BRIGHT + " Tor is not running." + Style.RESET_ALL)
exit()
def TABLE_PRETTYPRINT(TOR_PROC):
AF_INET6 = getattr(socket, 'AF_INET6', object())
PMAP = {
(AF_INET, SOCK_STREAM): 'tcp',
(AF_INET6, SOCK_STREAM): 'tcp6',
(AF_INET, SOCK_DGRAM): 'udp',
(AF_INET6, SOCK_DGRAM): 'udp6',
}
print (Fore.BLUE + Style.BRIGHT +"\t=> Process name : %s\n\t=> PID : %s"%(TOR_PROC.name(),TOR_PROC.pid))
print Style.RESET_ALL
templete = "%-15s %-25s %-25s %s"
print (templete % ("Proto", "Local address", "Remote address", "Status"))
print (templete % ("=====", "=============", "==============", "======"))
for attr in TOR_PROC.connections(kind='inet'):
LADDR = "%s:%s"%(attr.laddr)
RADDR = None
if attr.raddr:
RADDR = "%s:%s"%(attr.raddr)
print (templete % (PMAP[(attr.family, attr.type)], LADDR, RADDR or '-', attr.status))
print
def printBoard(self):
"""
Print board to terminal for debugging
"""
def getItem(item):
if item == Board.BLACK :
return Fore.WHITE + "|" + Fore.BLACK + "O"
elif item == Board.WHITE :
return Fore.WHITE + "|" + Fore.WHITE + "O"
else:
return Fore.WHITE + "| "
def getRow(row):
return "".join(map(getItem,row))
print("\t" + Back.GREEN + " BOARD ")
print("\t" + Back.GREEN + Fore.WHITE + " |0|1|2|3|4|5|6|7")
for i in range(8):
print("\t" + Back.GREEN + Fore.WHITE + "{}{}".format(i,
getRow(self.board[i])))
sys.stdout.write(Style.RESET_ALL)
def printBoard(self, t='live'):
"""
Print board to terminal for debugging
@param string type 'virtual' or 'live'
"""
board = self.board if t == 'live' else self.virtualBoard
print(Back.GREEN + "\t BOARD ")
# Print column titles
head = Back.GREEN + Fore.WHITE + "\t "
for i in range(self.board.shape[0]):
head += '|' + str(i)
print(head)
# Print rows
for i in range(self.board.shape[0]):
print(Back.GREEN + Fore.WHITE + "\t" + str(i) + getRow(board[i]))
print(Style.RESET_ALL)
return
def process_url(self, url, info):
""" Accepts a URL and the array of file info generated for it by this class, and then attempts to download it using any possible handler.
Returns whatever the handlers do, which should be a path to the file itself or the contianing directory for an album.
+Also returns False or None if no appropriate handler was found, or if the handler told us not to download anything.
"""
ret_val = False # Default to 'False', meaning no file was located by a handler.
for h in self.handlers:
print("\tChecking handler: %s" % h.tag)
ret = h.handle(url, info)
if ret is None:
# None is returned when the handler specifically wants this URL to be "finished", but not added to the files list.
stringutil.print_color(Fore.GREEN, "\t+Handler '%s' completed correctly, but returned no files!" % h.tag )
ret_val = None
break
if ret:
# The handler will return a file/directory name if it worked properly.
ret_val = stringutil.normalize_file(ret)
stringutil.out("%s\t+Handler '%s' completed correctly! %s%s" % (Fore.GREEN, h.tag, stringutil.fit(ret_val, 75), Style.RESET_ALL) )
break
#
#
if ret_val is False:
stringutil.error("\t!No handlers were able to accept this URL." )
return ret_val
#
def log(self, prefix, text, line=False):
now = datetime.now()
message = ""
if prefix == '?':
c = Fore.CYAN
elif prefix == '+':
c = Fore.GREEN
elif prefix == '-':
c = Fore.RED
elif prefix == '!':
c = Fore.YELLOW
c = Style.BRIGHT + c
e = Style.RESET_ALL + Fore.RESET
if line:
print c+"["+now.strftime("%Y-%m-%d %H:%M:%S")+"]["+prefix+"] "+text+e
else :
print "["+now.strftime("%Y-%m-%d %H:%M:%S")+"]["+c+prefix+e+"] "+text
def log(prefix, text, line=False):
now = datetime.now()
message = ""
if prefix == '?':
c = Fore.CYAN
elif prefix == '+':
c = Fore.GREEN
elif prefix == '-':
c = Fore.RED
elif prefix == '!':
c = Fore.YELLOW
c = Style.BRIGHT + c
e = Style.RESET_ALL + Fore.RESET
if line:
print c+"["+now.strftime("%Y-%m-%d %H:%M")+"]["+prefix+"] "+text+e
else :
print "["+now.strftime("%Y-%m-%d %H:%M")+"]["+c+prefix+e+"] "+text
def colorprint(text_color, bail_result=False):
'''
Colorprint text into the console. Call it as a curried function.
greenPrinter = colorprinter('GREEN')
greenPrinter('this will be green')
OR:
colorprint('GREEN')('this will be green')
'''
def printer(text):
''' This actually does the printing part. Allows for reusable color printers. '''
color = Fore.GREEN if text_color == 'GREEN' else Fore.RED
if not bail_result:
print(color + text)
print(Style.RESET_ALL)
else:
return color + text
return printer
def status_loop(self):
x = self.targets
status = ""
for i in range(0, len(x), 3):
a1 = "%d. %s (%s)" % (i+1, x[i]['team'], x[i]['ip'])
if i + 1 < len(x):
a2 = "%d. %s (%s)" % (i+2, x[i+1]['team'], x[i+1]['ip'])
else:
a2 = ""
if i + 2 < len(x):
a3 = "%d. %s (%s)" % (i+3, x[i+2]['team'], x[i+2]['ip'])
else:
a3 = ""
a1f = Fore.GREEN if x[i]['compromised'] else Fore.RED
if i + 1 < len(x):
a2f = Fore.GREEN if x[i+1]['compromised'] else Fore.RED
if i + 2 < len(x):
a3f = Fore.GREEN if x[i+2]['compromised'] else Fore.RED
a1 = a1f + a1.ljust(45, " ") + Style.RESET_ALL
a2 = a2f + a2.ljust(45, " ") + Style.RESET_ALL
a3 = a3f + a3.ljust(20, " ") + Style.RESET_ALL
status += ("%s%s%s\n" % (a1, a2, a3))
open("status", 'w').write(status)
self.loop.call_later(self.detect_interval, self.status_work)
def require(package, requirement=None):
requirement = requirement or package
try:
return __import__(package)
except ImportError:
from colorama import Fore, Style
print(
Fore.YELLOW,
'This example requires the {!r} package. Install it using:'.
format(requirement),
Style.RESET_ALL,
sep=''
)
print()
print(
Fore.YELLOW,
' $ pip install {!s}'.format(requirement),
Style.RESET_ALL,
sep=''
)
print()
raise
def getparam(ser, p_name, p_data):
"""Decode a specific parameter from the serial port."""
printc(Back.RED + p_name, prepend='\n')
for typ in p_data:
schr = ser.read()
if typ == 'h':
printc(c2hex(schr), Fore.GREEN)
elif typ == 'H':
if len(HIGHLIGHT):
HIGHLIGHT.popleft()
HIGHLIGHT.append(c2hex(schr))
printc(c2hex(schr), Fore.BLUE)
elif typ == 'a':
printc(c2ascii(schr) or c2hex(schr), Fore.WHITE, prepend='')
else:
printc(c2hex(schr), Fore.WHITE)
printc(Style.RESET_ALL, prepend='')
def test_colour_with_string_with_single_stack_status(self):
strings = [
"string string {0} string".format(status)
for status in sorted(self.statuses.keys())
]
responses = [
self.stack_status_colourer.colour(string)
for string in strings
]
assert responses == [
"string string {0}{1}{2} string".format(
self.statuses[status],
status,
Style.RESET_ALL
)
for status in sorted(self.statuses.keys())
]
def colour(self, string):
"""
Colours all Stack Statueses in ``string``.
The colours applied are defined in
``sceptre.stack_status_colourer.StackStatusColourer.STACK_STATUS_CODES``
:param string: A string to colour.
:type string: str
:returns: The string with all stack status values coloured.
:rtype: str
"""
stack_statuses = re.findall(self.STACK_STATUS_PATTERN, string)
for status in stack_statuses:
string = re.sub(
r"\b{0}\b".format(status),
"{0}{1}{2}".format(
self.STACK_STATUS_CODES[status], status, Style.RESET_ALL
),
string
)
return string
def _get_color_printer(stream=sys.stdout, colors=True):
"""
Return a print function tied to a particular stream, along with coloring info.
"""
try:
from colorama import init, Fore, Back, Style
init(autoreset=True)
except ImportError:
Fore = Back = Style = _NoColor()
if not colors:
Fore = Back = Style = _NoColor()
def color_print(s, fore='', color='', end=''):
"""
"""
print(color + s, file=stream, end='')
print(Style.RESET_ALL, file=stream, end='')
print(end=end)
return color_print, Fore, Back, Style