def display_warning_to_user(self):
    """Show the pending commands and target devices, then ask for confirmation.

    Prints every entry of ``self.commands`` followed by the sorted keys of
    ``self.devices`` and prompts the user on standard input.

    Returns:
        True when the lower-cased answer is exactly ``'y'``, False otherwise.
    """
    print(Fore.CYAN + "\nYou are about to run the following commands:")
    for cmd in self.commands:
        print(cmd)

    print(Fore.CYAN + "\nOn the following devices:")
    for device_name in sorted(self.devices.keys()):
        print(device_name)

    prompt = Fore.RED + "\nAre you sure you wish to proceed? (y/n) " + Fore.WHITE
    answer = input(prompt)
    return answer.lower() == 'y'
# Example source code for the Python class CYAN (python类CYAN的实例源码)
def main(city=0):
    """Print an umbrella recommendation for *city* from today's forecast.

    Args:
        city: City name to query. When falsy, the city is taken from
            ``getLocation()['city']``.

    The weather condition id of the first forecast entry selects the message:
    300-500 suggests an umbrella might be needed, anything above 700 says it
    can stay at home, and every other id advises taking one.
    """
    if not city:
        city = getLocation()['city']
    # Hand the query to requests via ``params`` so the city name is
    # URL-encoded (spaces, '&', non-ASCII), and bound the wait so a stalled
    # connection cannot block the caller forever.
    # NOTE(review): the API key is hard-coded in source; consider moving it
    # to configuration.
    r = requests.get(
        "http://api.openweathermap.org/data/2.5/forecast/daily",
        params=[
            ("q", city),
            ("cnt", 1),
            ("APPID", "ab6ec687d641ced80cc0c935f9dd8ac9"),
            ("units", "metric"),
        ],
        timeout=10,
    )
    j = r.json()
    rain = j['list'][0]['weather'][0]['id']
    if 300 <= rain <= 500:  # In case of drizzle or light rain
        print(Fore.CYAN + "It appears that you might need an umbrella today." + Fore.RESET)
    elif rain > 700:
        print(Fore.CYAN + "Good news! You can leave your umbrella at home for today!" + Fore.RESET)
    else:
        print(Fore.CYAN + "Uhh, bad luck! If you go outside, take your umbrella with you." + Fore.RESET)
def add_contestant(self, mac, row, col, response):
    """Register a contestant PC and report the outcome through *response*.

    Args:
        mac: MAC address passed to ``self.ethers_manager.add_ether``.
        row: Row value; must convert to an integer in 1..255.
        col: Column value; must convert to an integer in 1..255.
        response: Object whose ``status_code`` and ``data`` attributes are
            set by this method.

    When row or col is invalid, ``response.status_code`` becomes 400 and
    ``response.data`` an explanatory message. Otherwise the IP is built from
    ``self.contestant_ip_format`` by substituting 'R' and 'C'; a truthy
    result from ``add_ether`` is reported as a 400 error, a falsy one stores
    the IP in ``response.data``.
    """
    # Validate explicitly instead of using a bare ``raise`` inside a bare
    # ``except``: only conversion failures count as invalid input, and
    # unrelated exceptions (e.g. KeyboardInterrupt) are no longer swallowed.
    try:
        valid = 1 <= int(row) <= 255 and 1 <= int(col) <= 255
    except (TypeError, ValueError):
        valid = False
    if not valid:
        response.status_code = 400
        response.data = "Invalid row/col: row=%s col=%s" % (row, col)
        return
    ip = self.contestant_ip_format.replace('R', row).replace('C', col)
    print(Fore.CYAN + "Contestant PC connected: MAC=%s IP=%s" % (mac, ip))
    result = self.ethers_manager.add_ether(mac, ip)
    if result:
        print(Fore.RED + result)
        response.data = result
        response.status_code = 400
    else:
        response.data = ip
def __data_parser__(self, data):
    """Extract goods entries from a search-result HTML document.

    Every ``<li class="gl-item">`` element yields a dict with the keys
    ``url``, ``intro``, ``price``, ``sales`` and ``belong``. An item that
    lacks an expected node or has a non-numeric price is reported through
    ``error()`` and left out.

    Returns:
        The list of dicts built from the items that parsed cleanly.
    """
    tree = etree.HTML(data)
    goods = []
    for item in tree.xpath('//li[@class="gl-item"]'):
        try:
            img_div = item.xpath('.//div[contains(@class, "p-img")]')[0]
            url = img_div.xpath('./a/@href')[0]
            intro = img_div.xpath('./a/@title')[0]
            price_div = item.xpath('.//div[@class="p-price"]')[0]
            price = float(price_div.xpath('./strong/i/text()')[0].strip())
            commit_div = item.xpath('.//div[@class="p-commit"]')[0]
            sales = commit_div.xpath('./strong/a/text()')[0].strip()
            goods.append({
                'url': url,
                'intro': intro,
                'price': price,
                'sales': sales,
                'belong': colorful_text('??', Fore.CYAN),
            })
        except (IndexError, ValueError):
            error()
    return goods
def __data_parser__(self, data):
    """Convert the auction list of a search response into goods dicts.

    Reads ``data['mods']['itemlist']['data']['auctions']`` and maps every
    auction to a dict with the keys ``intro``, ``price``, ``delivery``,
    ``sales``, ``belong`` and ``url``. A positive delivery fee and the
    Tmall marker are wrapped with ``colorful_text``.

    Returns:
        The list of goods dicts, or an empty list when there are no auctions
        or a required key is missing (both cases are reported via ``error``).
    """
    try:
        auctions = data['mods']['itemlist']['data']['auctions']
        if not auctions:
            error('Ops, get no goods..')
            return []
        goods = []
        for entry in auctions:
            intro = entry["raw_title"]
            price = float(entry["view_price"])
            if float(entry["view_fee"]) > 0:
                delivery = colorful_text(entry["view_fee"], Fore.RED)
            else:
                delivery = entry["view_fee"]
            sales = int(entry["view_sales"].split('?')[0])
            if entry.get('shopcard', {}).get('isTmall', False):
                belong = colorful_text("??", Fore.CYAN)
            else:
                belong = "??"
            goods.append({
                'intro': intro,
                'price': price,
                'delivery': delivery,
                'sales': sales,
                'belong': belong,
                'url': entry["detail_url"],
            })
        return goods
    except KeyError:
        error('Ops, some key error happened..')
        return []
def print_goods(search_result):
    """Print the sorted search results as a table, then open the detail prompt.

    Each goods dict receives an ``index`` entry holding its position in the
    sorted list before its row is added to the table.

    :param search_result: search result in taobao and jd
    :return: None
    """
    ordered = sort_by_money(search_result)
    table = PrettyTable(TABLE_TITLE)
    for position, item in enumerate(ordered):
        item["index"] = position
        table.add_row([item.get(key, None) for key in ITEM_KEY])
    print(colorful_text('ready to hands chopping?', Fore.CYAN))
    print(table)
    open_detail_page(ordered)
def Gen_Dict():
    """Interactively generate a wordlist and write it to the file ``tempwlist``.

    Asks whether a custom charset should be used, then for the minimum and
    maximum word length. Every word produced by ``listwords(charset, n)`` for
    each length n in that range is written on its own line, and the number of
    words written is printed at the end.
    """
    ch = str(raw_input(Fore.CYAN + "Want to enter custom charset??(Enter y or n): " + Fore.RESET))
    if ch == 'y':
        charset = str(raw_input(Fore.CYAN + "Enter custom charset: " + Fore.RESET))
    else:
        # Any answer other than 'y' uses the default charset; previously an
        # answer that was neither 'y' nor 'n' left ``charset`` unbound.
        # ascii_lowercase replaces string.letters[0:26], which depends on the
        # active locale.
        charset = string.ascii_lowercase
    # raw_input + int() instead of Python 2 input(), which evaluates whatever
    # the user types as an expression.
    min_length = int(raw_input(Fore.CYAN + "Enter min passwd length: " + Fore.RESET))
    max_length = int(raw_input(Fore.CYAN + "Enter max passwd length: " + Fore.RESET))
    count = 0
    # The context manager closes the file even if word generation fails.
    with open("tempwlist", "w") as f:
        for wordlen in range(min_length, max_length + 1):
            for word in listwords(charset, wordlen):
                f.write(word + '\n')
                count += 1
    print(Fore.GREEN + "\nDictionary created with %s words....\n" % count + Fore.RESET)
def app_header(self):
    """Print the application banner followed by version and project details.

    The banner and title are printed in dim red, the details block (version,
    release date, links and author) in dim cyan.
    """
    banner = ''.join((
        '\n',
        ' ??????????? ???? ?????? ???? ??? ?????? ??????? ??????????????? \n',
        ' ????????????? ?????????????????? ??????????????????? ????????????????\n',
        ' ??? ????????????????????????? ?????????????? ?????????? ????????\n',
        ' ??? ???????????????????????????????????????? ????????? ????????\n',
        ' ??????????? ??? ?????? ?????? ????????? ??????????????????????? ???\n',
        ' ?????????? ?????? ?????? ???????? ??? ??????? ??????????? ???\n',
    ))
    title = ' Connection Manager\n'
    details = ''.join((
        ' {}\n'.format('-' * 70),
        ' Version : {}\n'.format(__version__),
        ' Release date : {}\n'.format(__release_date__),
        ' Github : https://github.com/fachrioktavian/CManager\n',
        ' Dev by : {0} - ({1})\n'.format(__author__, __author_email__),
        ' Official : https://dracos-linux.org\n',
        ' {}\n'.format('-' * 70),
    ))
    print (Fore.RED + Style.DIM + banner + title)
    print (Fore.CYAN + Style.DIM + details)
def user_liked_saved(username, scan_upvoted=True, scan_saved=True):
    """Yield a RedditElement for each saved and/or upvoted item of a user.

    The module-level ``_user`` is reused when its name matches *username*
    case-insensitively; otherwise the redditor is looked up via ``_reddit``.
    Saved items are yielded first (if *scan_saved*), then upvoted ones (if
    *scan_upvoted*). NotFound and Forbidden errors from prawcore are reported
    through ``stringutil.error`` and end the generator.
    """
    try:
        is_current_user = _user.name.lower() == username.lower()
        redditor = _user if is_current_user else _reddit.redditor(username)
        if scan_saved:
            stringutil.print_color(Fore.CYAN, '\tLoading %s\'s Saved Posts...' % redditor.name)
            for saved_item in redditor.saved(limit=None):
                yield RedditElement(saved_item)
        if scan_upvoted:
            stringutil.print_color(Fore.CYAN, '\tLoading %s\'s Upvoted Posts...' % redditor.name)
            for upvoted_item in redditor.upvoted(limit=None):
                yield RedditElement(upvoted_item)
    except prawcore.exceptions.NotFound:
        stringutil.error('Cannot locate comments or submissions for nonexistent user: %s' % username)
    except prawcore.Forbidden:
        stringutil.error('Cannot load Upvoted/Saved Posts from the User "%s", because they are private!' % username)
def log(self, prefix, text, line=False):
    """Print a timestamped log line with a colour-coded prefix.

    Args:
        prefix: One-character tag. '?' is cyan, '+' green, '-' red and
            '!' yellow; any other tag is printed without a colour.
        text: Message to print after the tag.
        line: When true the whole line is coloured, otherwise only the tag.
    """
    colours = {
        '?': Fore.CYAN,
        '+': Fore.GREEN,
        '-': Fore.RED,
        '!': Fore.YELLOW,
    }
    # An unknown prefix used to leave the colour unbound and crash with
    # UnboundLocalError; it now falls back to no colour.
    c = Style.BRIGHT + colours.get(prefix, '')
    e = Style.RESET_ALL + Fore.RESET
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # Single-argument print(...) behaves the same on Python 2 and 3.
    if line:
        print(c + "[" + stamp + "][" + prefix + "] " + text + e)
    else:
        print("[" + stamp + "][" + c + prefix + e + "] " + text)
def log(prefix, text, line=False):
    """Print a timestamped (minute resolution) log line with a coloured prefix.

    Args:
        prefix: One-character tag. '?' is cyan, '+' green, '-' red and
            '!' yellow; any other tag is printed without a colour.
        text: Message to print after the tag.
        line: When true the whole line is coloured, otherwise only the tag.
    """
    colours = {
        '?': Fore.CYAN,
        '+': Fore.GREEN,
        '-': Fore.RED,
        '!': Fore.YELLOW,
    }
    # An unknown prefix used to leave the colour unbound and crash with
    # UnboundLocalError; it now falls back to no colour.
    c = Style.BRIGHT + colours.get(prefix, '')
    e = Style.RESET_ALL + Fore.RESET
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M")
    # Single-argument print(...) behaves the same on Python 2 and 3.
    if line:
        print(c + "[" + stamp + "][" + prefix + "] " + text + e)
    else:
        print("[" + stamp + "][" + c + prefix + e + "] " + text)
def upload_firebase(self):
    """Upload every category's items to Firebase under a prompted username.

    Builds a mapping of category name to the ``items`` values of its rows,
    asks for a username, and stores the mapping at ``/todo-cli`` under that
    name. Prints 'Done!' and exits when the put returns a truthy value,
    otherwise prints 'Try again'.
    """
    data = {}
    for cat in session.query(Category).all():
        rows = session.query(Items).filter(Items.category_id==cat.id)
        data[cat.category] = [row.items for row in rows]
    name = click.prompt(click.style('Please enter your username:', fg='cyan', bold=True))
    print(Fore.GREEN + 'Syncing..... ')
    if firebase.put( '/todo-cli', name, data):
        print(Fore.CYAN + 'Done!')
        exit()
    else:
        print('Try again')
async def ban(ctx, *, member : discord.Member = None):
    """Ban *member* from the server on behalf of the invoking user.

    Returns silently when the invoker lacks the ``ban_members`` permission.
    Replies with an error embed when no member was given or when the ban
    fails with 'Privilege is too low'. On success the command is logged to
    the console and a confirmation embed is sent.

    Raises:
        Exception: any ban failure other than 'Privilege is too low' is
            re-raised instead of being reported as a successful ban.
    """
    # Declared ``async`` because the body awaits client calls; ``await`` is a
    # syntax error inside a plain ``def``.
    if not ctx.message.author.server_permissions.ban_members:
        return
    if not member:
        embed = discord.Embed(description = ctx.message.author.mention + ", you did not specify a user to ban! :x:", color = 0xF00000)
        return await client.say(embed = embed)
    try:
        await client.ban(member)
    except Exception as e:
        if 'Privilege is too low' not in str(e):
            # Unknown failure: propagate rather than announce a ban that did
            # not happen.
            raise
        print(Fore.RED + "Command Failed To Execute |\n Command Ran In:[" + ctx.message.server.id + "]\n User:[" + ctx.message.author.id + "]\n Channel:[" + ctx.message.channel.id + "]\n Reason: " + Fore.YELLOW + "Insufficient Permissions! Both user and bot need Ban Members permission!")
        embed = discord.Embed(description = "Privilege is too low. :x:", color = 0xF00000)
        return await client.say(embed = embed)
    # Log before returning; the success log used to sit after the final
    # ``return`` and could never run.
    print(Fore.CYAN + "Command Successfully Executed |\n Command Ran In:[" + ctx.message.server.id + "]\n User:[" + ctx.message.author.id + "]\n Channel:[" + ctx.message.channel.id + "]")
    embed = discord.Embed(description = "**%s** has been banned."%member.name, color = 0xF00000)
    return await client.say(embed = embed)
async def kick(ctx, *, member : discord.Member = None):
    """Kick *member* from the server on behalf of the invoking user.

    Returns silently when the invoker lacks the ``kick_members`` permission.
    Replies with a plain message when no member was given and with an error
    embed when the kick fails with 'Privilege is too low'. On success the
    command is logged to the console and a confirmation embed is sent.

    Raises:
        Exception: any kick failure other than 'Privilege is too low' is
            re-raised instead of being reported as a successful kick.
    """
    # Declared ``async`` because the body awaits client calls; ``await`` is a
    # syntax error inside a plain ``def``.
    if not ctx.message.author.server_permissions.kick_members:
        return
    if not member:
        return await client.say(ctx.message.author.mention + "Specify a user to kick!")
    try:
        await client.kick(member)
    except Exception as e:
        if 'Privilege is too low' not in str(e):
            # Unknown failure: propagate rather than announce a kick that did
            # not happen.
            raise
        # The reason text previously read "Inusfficient" (typo).
        print(Fore.RED + "Command Failed To Execute |\n Command Ran In:[" + ctx.message.server.id + "]\n User:[" + ctx.message.author.id + "]\n Channel:[" + ctx.message.channel.id + "]\n Reason: " + Fore.YELLOW + "Insufficient Permissions! Both user and bot need Kick Members permission!")
        embed = discord.Embed(description = "Privilege is too low. :x:", color = 0xF00000)
        return await client.say(embed = embed)
    # Log before returning; the success log used to sit after the final
    # ``return`` and could never run.
    print(Fore.CYAN + "Command Successfully Executed |\n Command Ran In:[" + ctx.message.server.id + "]\n User:[" + ctx.message.author.id + "]\n Channel:[" + ctx.message.channel.id + "]")
    embed = discord.Embed(description = "**%s** has been kicked."%member.name, color = 0xF00000)
    return await client.say(embed = embed)
def menu():
    """Print the command menu: a highlighted title, then the Main and Admin entries."""
    # Using colour from colorama: https://pypi.python.org/pypi/colorama
    # Formatting e.g.: Fore.COLOUR, Back.COLOUR, Style.DIM with e.g. DIM, RED, CYAN, etc.:
    title = Fore.BLACK + Back.WHITE + "10cbazbt3 menu:" + Style.RESET_ALL
    print(title)
    print(Fore.YELLOW + Style.DIM + "Main:")
    # The remaining entries inherit the dim yellow style set just above.
    entries = (
        " b = Blurb m = Mentions",
        " r = Reply t = Timeline",
        " blog = BLOG o = Own blurbs",
        " pins = PINS",
        "Admin:",
        " Login = Login menu = show Menu",
        " Logout = Logout. sites = my Sites",
        " exit = Exit",
    )
    for entry in entries:
        print(entry)
    print(Style.RESET_ALL)
# DEFINE 10C POST INTERACTIONS:
# LOTS OF DUPLICATION HERE!
# Define the 'blurb' (social post) subroutine:
def print_cyan(string):
    """Print *string* in bright cyan and reset the style afterwards.

    When ``windows_client()`` is true, ``reinit()`` is called before printing
    and ``deinit()`` after it.
    """
    if windows_client():
        reinit()
    styled = Fore.CYAN + Style.BRIGHT + string + Style.RESET_ALL
    print (styled)
    if windows_client():
        deinit()
def print_status(status, depth=0):
    """Print a coloured summary of a status dict and its children.

    Args:
        status: Dict with a ``status`` entry, a ``childrenStatus`` dict of
            similar dicts, and optional ``vpn``/``error`` entries. Every
            other key is printed as a name/value pair.
        depth: Unused by this function; kept for interface compatibility.

    A status with ``vpn == 'disabled'`` prints a single line and returns.
    Otherwise one line is printed for the status itself and for each child
    (green for 'on'/'starting', red for 'failed', any error appended in red),
    followed by the remaining key/value pairs with values in cyan.
    """
    if status.get('vpn') == 'disabled':
        print('vpn ' + Fore.RED + 'disabled' + Fore.RESET)
        return
    # list(...) is required on Python 3, where dict.items() returns a view
    # that cannot be concatenated to a list; on Python 2 it is a no-op copy.
    for name, v in [('status', status)] + list(status['childrenStatus'].items()):
        line = Fore.RESET + name.ljust(12)
        if v['status'] in ('on', 'starting'):
            line += Fore.GREEN
        elif v['status'] == 'failed':
            line += Fore.RED
        line += v['status']
        if v.get('error'):
            line += Fore.RED + " (%s)" % v['error']
        line += Fore.RESET
        print(line)
    for k, v in status.items():
        if k in ('status', 'childrenStatus', 'error'):
            continue
        if k == 'up':
            k = '??? '
        elif k == 'down':
            k = '??? '
        print(Fore.RESET + k.ljust(12) + Fore.CYAN + str(v) + Fore.RESET)
def log_debug(msg, logger=LOG, color=Fore.CYAN):
    """Log *msg* at DEBUG level on *logger*, prefixed by *color* and followed by a style reset."""
    decorated = '{}{}{}'.format(color, msg, Style.RESET_ALL)
    logger.debug(decorated)
def jenkins_accept_connections(sock_server, handler, bind_ip='0.0.0.0',
                               port=JENKINS_TCP_SERVER_PORT):
    """
    Creates a simple TCP/UDP server in a different thread, yields its
    ``connection_list`` to the caller, and shuts the server down afterwards.

    NOTE(review): callers in this file use this generator in a ``with``
    statement, so it is presumably wrapped by ``contextlib.contextmanager``;
    the decorator is not visible here -- confirm.

    :param sock_server: SocketServer.BaseServer class
    :param handler: SocketServer.BaseRequestHandler subclass
    :param bind_ip: Interface to bind a given server to. Defaults to '0.0.0.0',
        i.e. listen to all interfaces
    :param port: port to listen to
    """
    sock_server.allow_reuse_address = True
    server = sock_server((bind_ip, port), handler)
    server.connection_list = []
    thread_name = threading.current_thread().name + '_accepted_connections'
    server_thread = threading.Thread(name=thread_name,
                                     target=server.serve_forever)
    try:
        server_thread.start()
        # ``sock_server`` is a class (it is instantiated above), so the
        # original isinstance() check was always False and the port wait
        # never ran. UDPServer derives from TCPServer and must be excluded
        # explicitly from the TCP port wait.
        is_tcp = (issubclass(sock_server, SocketServer.TCPServer) and
                  not issubclass(sock_server, SocketServer.UDPServer))
        if is_tcp:
            utils.wait_net_port(bind_ip, port, 3, 1)
        LOG.debug('{}Starting SocketServer in a new thread{}'.format(
            Fore.CYAN, Style.RESET_ALL))
        yield server.connection_list
    finally:
        LOG.debug('{}Shutting down SocketServer{}'.format(
            Fore.CYAN, Style.RESET_ALL))
        server.shutdown()
        server.server_close()
def _check_visible_ip(pod, specs, connection_list):
    """Assert that the last recorded connection came from the expected IP.

    For a pod with a public IP the expected address is ``pod.public_ip``;
    otherwise it is the ``hostIP`` from the pod's entry in *specs*. The
    expected and actual values are logged at DEBUG level before the check.
    """
    if not pod.public_ip:
        message = (
            '{}Check if pod IP is visible as node IP for pod without '
            'public IP\nExpected: {} Actual: {}{}'.format(
                Fore.CYAN, specs[pod.name]['hostIP'], connection_list[-1],
                Style.RESET_ALL))
        LOG.debug(message)
        utils.assert_eq(connection_list[-1], specs[pod.name]['hostIP'])
        return

    message = (
        '{}Check if pod IP is visible as public IP for pod with '
        'public IP\nExpected: {} Actual: {}{}'.format(
            Fore.CYAN, pod.public_ip, connection_list[-1],
            Style.RESET_ALL))
    LOG.debug(message)
    utils.assert_eq(connection_list[-1], pod.public_ip)
def test_SNAT_rules(cluster):
    """Check the source IP seen by a test server for connections made from each pod.

    For every pod returned by ``setup_pods`` the test pings the jenkins host,
    then opens a TCP and a UDP connection to a temporary server with ``nc``
    and verifies the source address recorded by that server.
    """
    container_ids, container_ips, pods, specs = setup_pods(cluster)
    # --------- Test that SNAT rules are applied correctly --------
    jenkins_ip = _get_jenkins_ip(cluster)
    LOG.debug('{}Test that SNAT rules work properly{}'.format(
        Fore.CYAN, Style.RESET_ALL))

    log_template = "Check SNAT rules for pod '{}' public IP: '{}' host node: '{}'"
    bind_ip = '0.0.0.0'
    tcp_cmd = 'nc -z -v {} {}'.format(jenkins_ip, JENKINS_TCP_SERVER_PORT)
    udp_cmd = 'nc -u -z -v {} {}'.format(jenkins_ip,
                                         JENKINS_UDP_SERVER_PORT)

    for name, pod in pods.items():
        msg = log_template.format(name, pod.public_ip, specs[name]['host'])

        # Check if pod can ping jenkins
        ping(pod, container_ids[name], jenkins_ip)

        # Check if SNAT rules work properly for TCP connections
        tcp_banner = '{}TCP check {}{}'.format(Style.DIM, msg, Style.RESET_ALL)
        LOG.debug(tcp_banner)
        with jenkins_accept_connections(
                SocketServer.TCPServer, MyRequestHandler, bind_ip,
                JENKINS_TCP_SERVER_PORT) as tcp_connections:
            pod.docker_exec(container_ids[name], tcp_cmd)
            _check_visible_ip(pod, specs, tcp_connections)

        # Check if SNAT rules work properly for UDP connections
        udp_banner = '{}UDP check {}{}'.format(Style.DIM, msg, Style.RESET_ALL)
        LOG.debug(udp_banner)
        with jenkins_accept_connections(
                SocketServer.UDPServer, MyRequestHandler, bind_ip,
                JENKINS_UDP_SERVER_PORT) as udp_connections:
            pod.docker_exec(container_ids[name], udp_cmd)
            _check_visible_ip(pod, specs, udp_connections)
def info(self, message):
    """Print *message* in cyan, prefixed by the current thread's name in magenta."""
    thread_tag = Fore.MAGENTA + threading.current_thread().name + ': '
    print(thread_tag + Fore.CYAN + message + Fore.RESET)
def log_info(message):
    """Print *message* in cyan and reset the foreground colour afterwards."""
    coloured = Fore.CYAN + message + Fore.RESET
    print(coloured)
def log_info(message):
    """Print *message* in cyan and reset the foreground colour afterwards."""
    coloured = Fore.CYAN + message + Fore.RESET
    print(coloured)
def info(self, message):
    """Print *message* in cyan, prefixed by the current thread's name in magenta."""
    thread_tag = Fore.MAGENTA + threading.current_thread().name + ': '
    print(thread_tag + Fore.CYAN + message + Fore.RESET)
def log_info(message):
    """Print *message* in cyan and reset the foreground colour afterwards."""
    coloured = Fore.CYAN + message + Fore.RESET
    print(coloured)
def log_info(message):
    """Print *message* in cyan and reset the foreground colour afterwards."""
    coloured = Fore.CYAN + message + Fore.RESET
    print(coloured)
def good_bye(word, default='has'):
    """Print a two-line farewell about the database and exit via ``sys.exit()``.

    Args:
        word: Past participle describing what happened to the database.
        default: Auxiliary verb placed before 'been' (defaults to 'has').
    """
    summary = '*-* Your database {1} been {0}. *-*'.format(word, default)
    print(Fore.BLUE + Style.BRIGHT + summary)
    farewell = '*_* Have a nice day! *_*'
    print(Fore.CYAN + Style.BRIGHT + farewell)
    sys.exit()
def formatEntry(e:Entry):
    """Render an entry as a coloured, bordered multi-line string.

    The block consists of a rule line, the ID and priority, the name, the
    path, the bibtex title and author (or '<No Bibtex>' when ``e.bibtex`` is
    falsy), the tags, the comment lines and a closing rule line. Every body
    line starts with a coloured '| ' prefix.
    """
    reset = _S.RESET_ALL
    id_style = reset + _B.BLUE
    priority_style = reset + _F.LIGHTGREEN_EX
    tag_style = reset + _F.RED
    name_style = reset + _B.RED
    border = reset + _F.CYAN
    path_style = reset + _F.YELLOW
    title_style = reset + _F.LIGHTMAGENTA_EX
    authors_style = reset + _F.MAGENTA
    prefix = border + '| ' + reset
    rule = '--------------------------------------------------------------------------------'

    # Each comment line is followed by a newline and the prefix of the next line.
    if e.comment != '':
        comment_parts = [s + '\n' + prefix for s in e.comment.split('\n')]
    else:
        comment_parts = []

    parts = [
        border, rule, reset, '\n',
        prefix, id_style, 'ID : ', '{:>5}'.format(e.ID or ''), reset, ' '*47,
        priority_style, '{:>20}'.format(e.priority), reset, '\n',
        prefix, name_style, e.name, reset, '\n',
        prefix, path_style, e.pathstr(), reset, '\n',
    ]
    if e.bibtex:
        parts += [
            prefix, title_style, e.bibtex.get('title', ''), reset, '\n',
            prefix, authors_style, e.bibtex.get('author', ''), reset, '\n',
        ]
    else:
        parts += [prefix, title_style, '<No Bibtex>', reset, '\n']

    tag_line = (reset + ' ').join(''.join((tag_style, '#', t)) for t in e.tags)
    parts += [prefix, tag_line, '\n']
    parts += [prefix, reset]
    parts += comment_parts
    parts += ['\n', border, rule, reset, '\n']
    return ''.join(parts)
def _run(self, marc_record):
    """Print the record id (and title, if enabled) plus its 6XX fields.

    When ``self.show_subjects`` is set, every field whose tag starts with
    '6' is printed: in yellow when its subfield '2' equals
    ``self.source.sf['2']``, in cyan otherwise.

    Returns:
        0, signalling that the record was not modified.
    """
    if self.show_titles:
        heading = '{}\t{}'.format(marc_record.id, marc_record.title())
    else:
        heading = marc_record.id
    print(heading.encode('utf-8'))

    if self.show_subjects:
        for field in marc_record.fields:
            if not field.tag.startswith('6'):
                continue
            if field.sf('2') == self.source.sf['2']:
                colour = Fore.YELLOW
            else:
                colour = Fore.CYAN
            print(' {}{}{}'.format(colour, field, Style.RESET_ALL).encode('utf-8'))
    return 0  # No, we didn't modify anything