def read_chat_history():
choice = select_a_friend()
if choice is not None:
print (Fore.BLUE + "Messages sent are shown in blue color \n" + Fore.GREEN + " Received Messages and Read Messages are shown in green color:"+Fore.RESET)
chats = friends[choice].chats
for chat in chats:
if chat.sent_by_me:
print (Fore.RED + str(chat['time']) + " " + Fore.BLUE + friends[choice]['name'] + " " + Fore.RESET + chat['message'])
else:
print (Fore.RED + str(chat['time']) + " " + Fore.GREEN + friends[choice]['name'] + " " + Fore.RESET + chat['message'])
else:
print "Wrong choice"
python类BLUE的实例源码
def option(question, options, default=1):
print(question)
options = list(options)
for idx, element in enumerate(options):
print("{}) {}".format(idx+1,element))
while True:
i = input("{Style.BRIGHT}{Fore.BLUE}:: {Fore.RESET}Please pick[{}]: {Style.RESET_ALL}".format(default, Style=Style, Fore=Fore))
try:
if i == "":
i = default
else:
i = int(i)
if 0 < i <= len(options):
return options[i-1]
except:
pass
return None
def banner():
print(Style.DIM)
print(' ___________________________')
print(' / /\\')
print(' / sadboyzvone\'s _/ /\\')
print(' / Intel 8080 / \/')
print(' / Assembler /\\')
print('/___________________________/ /')
print('\___________________________\/')
print(' \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\'
+ Style.RESET_ALL + Style.BRIGHT)
print(Fore.WHITE + '\nPowered by ' + Fore.BLUE + 'Pyt'
+ Fore.YELLOW + 'hon' + Fore.WHITE
+ '\nCopyright (C) 2017, Zvonimir Rudinski')
# Print usage information
def reader(wordlist, chunks_size, verbose):
""" Load up chunks_sizes of the wordlist
into the queue
"""
queue = Queue()
chunk = list(islice(wordlist, chunks_size))
while chunk:
# Get chunks_size records from the wordlist
if verbose:
print(Fore.BLUE + "[QUEUE]" + DEBUG + "inserting into queue:")
print("{}".format(chunk) + Style.RESET_ALL)
queue.put(chunk)
chunk = list(islice(wordlist, chunks_size))
return queue
def test_forecast_formatted_as_expected(self, print_say_mock):
with patch.object(requests, 'get', return_value=MyResponse) as get_mock:
forecast.main(self.CI_instance, 'Some location')
last_call = call(
"\tMin temperature: {} {}\n".format(
'17.0', self.units['str_units']),
self.CI_instance,
Fore.BLUE
)
third_call = call(
"\tWeather: {}".format('Clear'),
self.CI_instance,
Fore.BLUE
)
self.assertEqual(last_call, print_say_mock.mock_calls[-1])
self.assertEqual(third_call, print_say_mock.mock_calls[2])
def play(data):
if len(data) == 0:
print(Fore.BLUE + "Song name doesn't exist. (play '"'song name'"') " + Fore.RESET)
else:
wanted = data
find = os.popen("ls | grep -i " + '"' + wanted + '"')
music = str(find.readline())
if not music:
os.system("instantmusic -s " + wanted + " 2> /dev/null")
find = os.popen("ls -tc --hide='__*' --hide='*.py'")
music = str(find.readline()).replace("\n", "")
os.system("XDG_CURRENT_DESKTOP= DESKTOP_SESSION= xdg-open " +
music.replace(" ", "\ ").replace(" (", " \("). replace(")", "\)") +
" 2> /dev/null")
else:
os.system("XDG_CURRENT_DESKTOP= DESKTOP_SESSION= xdg-open " +
music.replace(" ", "\ ").replace(" (", " \("). replace(")", "\)") +
" 2> /dev/null")
def live_score(desc):
mid = match_id(desc)
data = c.livescore(mid)
score = {}
score['matchinfo'] = "{}, {}".format(data['matchinfo']['mnum'], data['matchinfo']['mchdesc'])
score['status'] = "{}, {}".format(data['matchinfo']['mchstate'].title(), data['matchinfo']['status'])
score['bowling'] = data['bowling']
score['batting'] = data['batting']
text = ''
text += Fore.LIGHTYELLOW_EX + score['matchinfo'] + '\n' + score['status'] + '\n\n'
text += Fore.BLUE + score['batting']['team'] + '\n' + Fore.BLACK
for scr in reversed(score['batting']['score']):
text += "{} :- {}/{} in {} overs\n".format(scr['desc'], scr['runs'], scr['wickets'], scr['overs'])
for b in reversed(score['batting']['batsman']):
text += "{} : {}({}) \n".format(b['name'].strip('*'), b['runs'], b['balls'])
text += Fore.BLUE + "\n" + score['bowling']['team'] + '\n' + Fore.BLACK
for scr in reversed(score['bowling']['score']):
text += "{} :- {}/{} in {} overs\n".format(scr['desc'], scr['runs'], scr['wickets'], scr['overs'])
for b in reversed(score['bowling']['bowler']):
text += "{} : {}/{} \n".format(b['name'].strip('*'), b['wickets'], b['runs'])
text += Fore.RESET
return text
def __init__(self, ethers_path, static_ethers, wipe):
"""
:param ethers_path: path to the ethers file
:param static_ethers: path to static ethers file (only when wiping)
:param wipe: wipe the ethers
"""
self.ethers_path = ethers_path
self.wipe = wipe
self.ethers = {}
EthersManager.assert_writable(self.ethers_path)
if wipe:
print(Fore.RED + Style.BRIGHT + "The ethers file will be wiped!")
if static_ethers:
print(Fore.BLUE + "The static ethers will be loaded")
self.load_ethers(static_ethers)
else:
print(Fore.BLUE + "The ethers file will be created from scratch")
else:
self.load_ethers(self.ethers_path)
def add_worker(self, mac, num, response):
try:
if not 1 <= int(num) <= 255: raise
except:
response.status_code = 400
response.data = "Invalid num: num=%s" % (num)
return
ip = self.worker_ip_format.replace('N', num)
print(Fore.BLUE + "Worker PC connected: MAC=%s IP=%s" % (mac, ip))
result = self.ethers_manager.add_ether(mac, ip)
if result:
print(Fore.RED + result)
response.data = result
response.status_code = 400
else:
response.data = ip
def defrag(self):
download_queue = self.handle.get_download_queue()
downloading = [piece['piece_index'] for piece in download_queue]
numerales = ""
pieces = self.status.pieces
for i, piece in enumerate(pieces):
numeral = Fore.GREEN + "#" if piece else Fore.RED + "#"
if i in downloading:
numeral = Fore.YELLOW + "v"
elif self._served_blocks is not None and self._served_blocks[i]:
numeral = Fore.BLUE + ">"
numeral += str(self.handle.piece_priority(i))
numerales += numeral
if numerales != "":
numerales = term.magenta("\nPieces download state:\n" + numerales)
return "%s\n" % numerales
def TABLE_PRETTYPRINT(TOR_PROC):
AF_INET6 = getattr(socket, 'AF_INET6', object())
PMAP = {
(AF_INET, SOCK_STREAM): 'tcp',
(AF_INET6, SOCK_STREAM): 'tcp6',
(AF_INET, SOCK_DGRAM): 'udp',
(AF_INET6, SOCK_DGRAM): 'udp6',
}
print (Fore.BLUE + Style.BRIGHT +"\t=> Process name : %s\n\t=> PID : %s"%(TOR_PROC.name(),TOR_PROC.pid))
print Style.RESET_ALL
templete = "%-15s %-25s %-25s %s"
print (templete % ("Proto", "Local address", "Remote address", "Status"))
print (templete % ("=====", "=============", "==============", "======"))
for attr in TOR_PROC.connections(kind='inet'):
LADDR = "%s:%s"%(attr.laddr)
RADDR = None
if attr.raddr:
RADDR = "%s:%s"%(attr.raddr)
print (templete % (PMAP[(attr.family, attr.type)], LADDR, RADDR or '-', attr.status))
print
def getparam(ser, p_name, p_data):
"""Decode a specific parameter from the serial port."""
printc(Back.RED + p_name, prepend='\n')
for typ in p_data:
schr = ser.read()
if typ == 'h':
printc(c2hex(schr), Fore.GREEN)
elif typ == 'H':
if len(HIGHLIGHT):
HIGHLIGHT.popleft()
HIGHLIGHT.append(c2hex(schr))
printc(c2hex(schr), Fore.BLUE)
elif typ == 'a':
printc(c2ascii(schr) or c2hex(schr), Fore.WHITE, prepend='')
else:
printc(c2hex(schr), Fore.WHITE)
printc(Style.RESET_ALL, prepend='')
def earthquake_print(data):
for i in data['features']:
if i['id'] not in UPDATES:
UPDATES.append(i['id'])
print('___________________________________________________________________________________________________')
print(i['properties']['title'], end=' MAG: ')
print(Fore.RED+str(i['properties']['mag']))
print('Location:', i['properties']['place'])
alert = i['properties']['alert']
if alert == 'green':
print(Fore.RED + 'Type: ' + i['properties']['type'], end=' ')
elif alert == 'yellow':
print(Fore.YELLOW + 'Type: ' + i['properties']['type'], end=' ')
else:
print(Fore.BLUE + 'Type: ' + i['properties']['type'], end=' ')
ms = i['properties']['time']
print(datetime.datetime.fromtimestamp(ms/1000.0))
print('INFO:', i['properties']['url'])
def walker(node, source_lines, indent=''):
"""Recursively visit the ast in a preorder traversal."""
node_name = str(node)[0:str(node).index('(')]
value = None
if hasattr(node, 'value'):
if '(' in str(node.value):
value = str(node.value)[0:str(node.value).index('(')]
else:
value = node.value
name = node.name if hasattr(node, 'name') else None
print('{}{} {} (name: {}, value: {})'.format(
indent, CHAR_TUBE, node_name, name, value))
lines = [line for line in node.as_string().split('\n')]
for line in lines:
print(indent + FILL + '>>>' + Fore.BLUE + line + Fore.RESET)
for child in node.get_children():
walker(child, source_lines, indent + FILL + CHAR_PIPE)
def get_comment_list(insta_username) :
media_id=get_post_id(insta_username)
request_url=BASE_URL+"media/%s/comments/?access_token=%s"%(media_id,APP_ACCESS_TOKEN)
print "Get Requested URL:%s"%(request_url)
comment_info=requests.get(request_url).json()
if comment_info['meta']['code']==200 :
if len(comment_info['data']) :
for x in range(0,len(comment_info['data'])) :
comment_id=comment_info['data'][x]['id']
comment_text=comment_info['data'][x]['text']
print Fore.BLUE+Style.BRIGHT+"Comments are %s"%(comment_text)
else:
print "there is no comment regarding this post"
else :
print "status code other than 200 found"
def on_member_join(member):
server_id = member.server.id
user_id = member.id
banonjoin = "SELECT COUNT(*) FROM Hackbans WHERE user_id = '" + user_id + "' AND server_id = '" + server_id + "'"
read_from_db(banonjoin)
if datta[0][0] < 1:
print(Fore.BLUE + "A non-hackbanned user joined, they were not bananaed! :D")
elif datta[0][0] > 0:
print("user is hackbananaed")
print(Fore.RED + "A hackbanned user joined but was bananaed! :D")
await client.ban(member)
print(Fore.BLUE + str(datta))
## HACKBAN ##
######################## Permissions ########################
def inlineinteractions(postid, posterusername, decodedposttext):
# Accepts all keys. Initial setup: [enter] moves to next post, 'r' replies, 'rp' reposts, '*' stars, 'p' pins:
inlinecommand = input(Fore.BLUE + Style.DIM + "[enter],r,rp,*,p " + Style.RESET_ALL)
if inlinecommand == "r":
postidreply = postid
poster = posterusername
replyinline(postidreply, poster)
if inlinecommand == "rp":
postidrepost = postid
repostinline(postidrepost)
if inlinecommand == "*":
postidstar = postid
starinline(postidstar)
if inlinecommand == "p":
postidpin = postid
pininline(postidpin)
# DEFINE 10C GET TIMELINE SUBROUTINES:
# Define the 'timelinebase' subroutine:
def main(self):
start = 0
website = self.get_options('website')
if "http//" in website:
website = website.replace('http//','http://')
print "* Checking for " + Fore.BLUE + website + Style.RESET_ALL
if website[-1:] == "/":
website = website[:-1]
try:
requests.get(website)
print Fore.GREEN + "* url is stable" + Style.RESET_ALL
start = 1
except:
print Fore.RED + "* url schema not correct" + Style.RESET_ALL
if start == 1:
for line in self.cms:
print "* checking " + str(line)
for path in self.cms[line]:
complet_url = website + path
req = requests.get(complet_url)
if req.status_code in self.status_code:
print Fore.GREEN + "* possible using " + str(line) + " with : " + str(complet_url) + Style.RESET_ALL
self.export.append(complet_url + " ("+str(line)+")")
def main(self):
domain_list = []
load_name = self.get_options("enterprise")
print Style.BRIGHT + Fore.BLUE + "Search domain name for "+load_name + Style.RESET_ALL
start_with = ["www.","http://","https://"]
end_with = [".com",".fr",".org",".de",".eu"]
for line in start_with:
for end_line in end_with:
domain = line + str(load_name) + end_line
try:
return_code = urllib.urlopen(domain).getcode()
return_code = str(return_code)
if return_code != "404":
domain_list.append(domain)
print Fore.GREEN + "- "+Style.RESET_ALL + domain
except:
Back.YELLOW + Fore.BLACK + "Can't get return code" + Style.RESET_ALL
if len(domain_list) > 0:
for domain in domain_list:
self.export.append(domain)
else:
print Fore.RED + "No domain found" + Style.RESET_ALL
def main(self):
server = "www.google.fr"
limit = 100
if self.get_options('limit') != '':
limit = int(self.get_options('limit'))
url = "http://"+server+"/search?num="+str(limit)+"&start=10&hl=en&meta=&q=site%3Afr.viadeo.com/fr/profile/%20"+self.get_options('enterprise')
r=requests.get(url)
results = r.content
regex = re.compile("\>fr\.viadeo\.com\/fr\/profile\/(.*?)\<\/cite")
output = regex.findall(results)
if len(output) > 0:
print Fore.GREEN + "Viadeo result : "+ Style.RESET_ALL
for line in output:
if line.strip() != "":
print Fore.BLUE + "* " + Style.RESET_ALL + line.strip()
self.export.append(line.strip())
else:
print Fore.RED + "* "+Style.RESET_ALL + "No result found"
def main(self):
domain = self.get_options('domain')
if "://" in domain:
domain = domain.split("://")[1]
if domain[:-1] == "/":
domain = domain[-1]
try:
response = subprocess.check_output(["sslyze", "--regular", domain])
if response != "":
explode = response.split('\n')
for line in explode:
self.export.append(line)
print Fore.BLUE + "* " + Style.RESET_ALL + line
else:
print Fore.RED + "* " + Style.RESET_ALL + "Can't get SSL/TLS with sslyze"
except OSError as e:
if e.errno == os.errno.ENOENT:
print e
else:
# Something else went wrong while trying to run `sslyze`
raise
def main(self):
try:
response = subprocess.check_output(["wafw00f", self.get_options("domain")])
if "is behind a" in response:
regex = re.compile("is behind a(.*)")
result = regex.findall(response)
print Fore.GREEN + "* " + Style.RESET_ALL + "Firewall found"
print Fore.BLUE + "* " + Style.RESET_ALL + result[0].strip()
self.export.append(result[0].strip())
else:
print Fore.RED + "* " + Style.RESET_ALL + "Can't get firewall with wafw00f"
except OSError as e:
if e.errno == os.errno.ENOENT:
print e
else:
# Something else went wrong while trying to run `wget`
raise
def main(self):
detail = None
website = self.require["website"][0]["value"]
if "://" in self.get_options('website'):
website = self.get_options('website').split('://')[1]
try:
whois_information = pythonwhois.get_whois(website)
detail = whois_information['contacts']['registrant']
except:
print Fore.RED + "Please use correct name without http(s)://" + Style.RESET_ALL
export = []
total = ""
if detail != None:
for element in detail:
print Fore.BLUE + "* " + Style.RESET_ALL + element + " : " + str(detail[element])
total = element + " : "+ str(detail[element])
self.export.append(total)
else:
print Fore.RED + "Can't get whois information for : " +self.get_options('website') + Style.RESET_ALL
def browser_hacks():
print Fore.YELLOW + " ! For use module please use :use moduleName" + Style.RESET_ALL
if os.path.exists("core/BHDB/"):
list_module = glob.glob("core/BHDB/*.py")
for module in list_module:
if ".py" in module:
module_name = module.split(".py")[0]
module_name = module_name.replace('core/BHDB/','')
if "__init__" not in module:
description = "No module description found"
if "#description:" in open(module).read():
description = open(module).read().split("#description:")[1]
description = description.split("#")[0]
print Fore.BLUE + " * "+ Style.RESET_ALL + module_name + " " + description
else:
print Back.RED + Fore.BLACK + "Browserhacking directory not found" + Style.RESET_ALL
def print_blue(string):
if windows_client(): reinit()
print (Fore.BLUE + Style.BRIGHT + string + Style.RESET_ALL)
if windows_client(): deinit()
def yes_no(question, default="yes"):
"""Ask a yes/no question via input() and return their answer.
"question" is a string that is presented to the user.
"default" is the presumed answer if the user just hits <Enter>.
It must be "yes" (the default), "no" or None (meaning
an answer is required of the user).
The "answer" return value is True for "yes" or False for "no".
"""
valid = {"yes": True, "y": True, "ye": True,
"no": False, "n": False}
if default is None:
prompt = " [y/n] "
elif default == "yes":
prompt = " [Y/n] "
elif default == "no":
prompt = " [y/N] "
else:
raise ValueError("invalid default answer: '%s'" % default)
while True:
sys.stdout.write(Style.BRIGHT + Fore.BLUE + ":: " + Fore.RESET + question + prompt + Style.NORMAL)
choice = input().lower()
if default is not None and choice == '':
return valid[default]
elif choice in valid:
return valid[choice]
else:
sys.stdout.write("Please respond with 'yes' or 'no' "
"(or 'y' or 'n').\n")
def printIt( p, sha, type, repo, branch, login, message, filler):
#clean up comments that have \n in them
message = message.replace("\n"," ")
#print Fore.RED+sha[:SHACOLW].ljust(SHACOLW," ")+Fore.RESET,type[:EVENTCOLW].ljust(EVENTCOLW,filler),repo[:DFLTCOLW].ljust(DFLTCOLW,filler),branch[:BRANCHCOLW].ljust(BRANCHCOLW,filler),Fore.BLUE+login.ljust(DFLTCOLW," ")+Fore.RESET,message.ljust(MSGCOLW," ")
line = Fore.RED+sha[:SHACOLW].ljust(SHACOLW," ")+Fore.RESET
line += " " + type[:EVENTCOLW].ljust(EVENTCOLW,filler)
line += " " + repo[:DFLTCOLW].ljust(DFLTCOLW,filler)
line += " " + branch[:BRANCHCOLW].ljust(BRANCHCOLW,filler)
line += " " + Fore.BLUE+login.ljust(DFLTCOLW," ")+Fore.RESET
line += " " + message.ljust(MSGCOLW," ")
line += "\n"
try:
p.stdin.write(line)
except:
# Stop loop on "Invalid pipe" or "Invalid argument".
# No sense in continuing with broken pipe.
exit(1)
return
#----------------------------------------------------------------------------------------------------------------------
#process arguments
def printHelp():
print('\nThis ' + Fore.BLUE + 'Intel' + Fore.WHITE
+ ' 8080 assembler was made for ' + Fore.BLUE + 'Project '
+ Fore.YELLOW + 'Week' + Fore.WHITE + ' at my school.')
print('It is written in ' + Fore.BLUE + 'Pyt' + Fore.YELLOW + 'hon'
+ Fore.WHITE)
print('Modules: ' + Fore.RED + 'Co' + Fore.BLUE + 'lo'
+ Fore.YELLOW + 'ra' + Fore.GREEN + 'ma' + Fore.WHITE)
print('\nPass a file path as an arguement.')
# Main function
def test_usertags():
user_tags = {
'info': F.GREEN + S.BRIGHT,
'info1': p('<g><b>'),
'call': lambda: F.BLUE + B.RED
}
am = AnsiMarkup(tags=user_tags)
assert am.parse('<info>1</info>') == F.GREEN + S.BRIGHT + '1' + S.RESET_ALL
assert am.parse('<info>1</info>') == am.parse('<info1>1</info1>')
assert am.parse('<call>1</call>') == F.BLUE + B.RED + '1' + S.RESET_ALL
assert am.strip('<info1>1</info1>') == '1'
def do_clock(self, s):
"""Gives information about time."""
print_say(ctime(), self, Fore.BLUE)