def extractXML(apk_location, config_location):
    """Decompile an APK with apktool and return its AndroidManifest.xml as a dict.

    @param apk_location: path to the .apk file, passed to ``apktool d``
    @param config_location: path to an ini file holding APP_NAME/app_flavor_name
    @return: the manifest parsed to a plain dict (XML -> OrderedDict -> JSON -> dict)
    """
    with working_directory("/tmp"):
        subprocess.call(["apktool", "d", apk_location])
    config = ConfigParser.ConfigParser()
    config.read(config_location)
    app_name = "app-external-release"
    # NOTE(review): ConfigParser.get raises NoOptionError when the option is
    # missing rather than returning None -- confirm the intended fallback path.
    temp = config.get("APP_NAME", "app_flavor_name")
    if temp is not None:
        app_name = temp
    with working_directory("/tmp/" + app_name):
        with open("AndroidManifest.xml") as fd:
            obj_file = xmltodict.parse(fd.read())
    # Round-trip through JSON to flatten OrderedDicts into plain literals.
    return ast.literal_eval(json.dumps(obj_file))
Python literal_eval() usage examples (collected source snippets)
def main():
    """Zip the listed input files into an output archive, optionally emitting a depfile."""
    parser = optparse.OptionParser()
    build_utils.AddDepfileOption(parser)
    parser.add_option('--inputs', help='List of files to archive.')
    parser.add_option('--output', help='Path to output archive.')
    parser.add_option('--base-dir',
                      help='If provided, the paths in the archive will be '
                           'relative to this directory', default='.')
    options, _ = parser.parse_args()

    # --inputs arrives as one argument holding a Python-literal list.
    archive_members = ast.literal_eval(options.inputs)
    build_utils.DoZip(archive_members, options.output, options.base_dir)

    if options.depfile:
        build_utils.WriteDepfile(options.depfile, options.output)
argument_datatype_class.py source file
Project: warriorframework
Author: warriorframework
Project source
File source
Views: 22
Favorites: 0
Likes: 0
Comments: 0
def convert_string_to_datatype(self):
    """Convert ``self.arg_value`` (a string) to ``self.datatype`` via a safe parse.

    On a parse failure, or when the parsed value is not an instance of the
    expected datatype, the raw string is returned unchanged and the user is
    warned via print_error/print_info.

    @return: the parsed value on success, otherwise the original string
    """
    # BUGFIX: the two halves of this message used to concatenate as
    # "doesnot match" -- a space was missing at the line break.
    err_msg = "User input argument value {0} does "\
              "not match python syntax for '{1}'".format(self.arg_value, self.datatype)
    info_msg = "Warrior FW will handle user input argument value as string (default)"
    try:
        result = ast.literal_eval(self.arg_value)
    except Exception:
        print('\n')
        print_error(err_msg)
        print_info(info_msg)
        print('\n')
        print_error('unexpected error: {0}'.format(traceback.format_exc()))
        result = self.arg_value
    else:
        if not isinstance(result, self.datatype):
            print('\n')
            print_error(err_msg)
            print_info(info_msg)
            print('\n')
            result = self.arg_value
    return result
def main(unused_argv=None):
    """Stylize the input image with the requested style(s) and save the results."""
    # Load the input image as a single-element batch (leading axis of size 1).
    image = np.expand_dims(image_utils.load_np_image(
        os.path.expanduser(FLAGS.input_image)), 0)

    output_dir = os.path.expanduser(FLAGS.output_dir)
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # --which_styles is a Python literal: either a list of style indexes or a
    # dict mapping style indexes to blend weights.
    which_styles = ast.literal_eval(FLAGS.which_styles)
    if isinstance(which_styles, dict):
        _multiple_styles(image, which_styles, output_dir)
    elif isinstance(which_styles, list):
        _multiple_images(image, which_styles, output_dir)
    else:
        raise ValueError('--which_styles must be either a list of style indexes '
                         'or a dictionary mapping style indexes to weights.')
def convert_param_dict_for_use(self, setting_dict):
    """When loading rows from a saved results df in csv format, some
    of the settings may end up being converted to a string representation
    and need to be converted back to actual numbers and objects.
    May need to be overwritten in child class.

    @param setting_dict: settings dict, possibly holding stringified values
    @return: the same dict, with values converted in place
    """
    if 'architecture' in setting_dict:
        # isinstance instead of a direct type() comparison (handles subclasses)
        if isinstance(setting_dict['architecture'], str):
            setting_dict['architecture'] = ast.literal_eval(setting_dict['architecture'])
    if 'optimizer' in setting_dict:
        # Stored as a class-name string; map back to the tf optimizer class.
        if 'GradientDescent' in setting_dict['optimizer']:
            setting_dict['optimizer'] = tf.train.GradientDescentOptimizer
        elif 'Adagrad' in setting_dict['optimizer']:
            setting_dict['optimizer'] = tf.train.AdagradOptimizer
        else:
            setting_dict['optimizer'] = tf.train.AdamOptimizer
    if 'batch_size' in setting_dict:
        setting_dict['batch_size'] = int(setting_dict['batch_size'])
        # BUGFIX: py2 print statement -> py3 print function
        print("batch size just got changed in convert_param_dict. It's an", type(setting_dict['batch_size']))
    return setting_dict
def read(self, outname):
    """Populate names0/chain and names1/mu/sigma (plus descr) from an ebf file."""
    if ebf.containsKey(outname, '/names0'):
        raw = ebf.read(outname, '/names0')
        self.names0 = list(map(str, raw))
        self.chain = ebf.read(outname, '/chain/')
    else:
        self.names0 = []

    if ebf.containsKey(outname, '/names1'):
        raw = ebf.read(outname, '/names1')
        self.names1 = list(map(str, raw))
        self.mu = ebf.read(outname, '/mu/')
        self.sigma = ebf.read(outname, '/sigma/')
    else:
        self.names1 = []

    # /descr holds a single stringified Python literal describing the run.
    self.descr = ast.literal_eval(ebf.read(outname, '/descr')[0])
def vfam_to_krona(self, vfam_file):
    """Tally vfam hits from a TSV file and write Krona-formatted rows to self.krona_in.

    Each input line (after a header) carries the vfam id in column 1 and
    stringified dicts of family and genus proportions in columns 3 and 4.

    @param vfam_file: path to the tab-separated vfam results file
    """
    vfam_dic = defaultdict(int)
    families_dic = {}
    genera_dic = {}
    with open(vfam_file, 'r') as vfam_file:
        vfam_file.readline()  # get rid of the header
        for line in vfam_file:
            splitted_line = line.split('\t')
            vfam = splitted_line[1]
            vfam_dic[vfam] += 1
            families = ast.literal_eval(splitted_line[3])  # safe eval of dict
            families_dic[vfam] = families
            genera = ast.literal_eval(splitted_line[4])
            genera_dic[vfam] = genera
    with open(self.krona_in, 'w') as o:
        for vfam, n_reads in vfam_dic.items():
            fam_total = sum(families_dic[vfam].values())
            for fam, fam_prop in families_dic[vfam].items():
                gen_total = sum(genera_dic[vfam].values())
                for genera, gen_prop in genera_dic[vfam].items():
                    # BUGFIX: force true division -- the proportions are int
                    # counts, and under Python 2 '/' silently truncated to 0.
                    n = (n_reads * (float(fam_prop) / fam_total)) * (float(gen_prop) / gen_total)
                    o.write('%.3f\t%s\t%s\t%s\n' % (n, fam, vfam, genera))
def show_connections():
    """Get list of network connections.

    Runs the infraview helper binary and parses each output line (a Python
    tuple literal) into a dict keyed by the netstat field names.

    @return: list of dicts, one per connection
    """
    # NOTE(security): shell=True is acceptable only because the command is a
    # constant string -- never interpolate user input here.
    connections = subprocess.check_output(
        "/usr/share/harbour-infraview/helper/infraview-helper",
        shell=True)
    netstat_keys = [
        "udp_tcp",
        "ConnID",
        "UID",
        "localhost",
        "localport",
        "remotehost",
        "remoteport",
        "conn_state",
        "pid",
        "exe_name"]
    dlist = []
    for line in connections.splitlines():
        fields = ast.literal_eval(line.decode("utf-8"))
        # BUGFIX(idiom): build the dict once -- the old code built a dict and
        # then immediately copied it with dict(...) for no reason.
        dlist.append(dict(zip(netstat_keys, fields)))
    return dlist
async def getAdmin(ID=logChannelID):
    """Return the administrators of Telegram chat *ID* as a list of dicts.

    BUGFIX: declared ``async`` -- the body uses ``await``, which is a
    SyntaxError inside a plain ``def``.

    @param ID: chat id to query (defaults to the log channel)
    @return: list of dicts with id/username/first_name/last_name per admin
    """
    raw = ast.literal_eval(str(await bot.api_call("getChatAdministrators", chat_id=ID)))
    adminDict = []
    for entry in raw['result']:
        user = entry['user']
        adminDict.append({
            'id': user['id'],
            'username': user['username'],
            'first_name': user['first_name'],
            # a missing last_name maps to an empty string, as before
            'last_name': user.get('last_name', '')})
    return adminDict
def read_files_worker(self, directory, queue):
    """ Read all files in a directory and output to the queue. First line
    of every file should contain the index. Worker separates first line
    and parses to dict. Tuple of index and text is added to queue.
    :directory: Source directory containing files
    :queue: Queue to add the tuples to
    """
    for file in os.scandir(directory):
        if file.is_file():
            with open(file.path, 'r', errors='replace') as f:
                text = f.readlines()
            try:
                index = literal_eval(text.pop(0).strip())
                # NOTE(review): readlines() keeps trailing newlines, so this
                # join doubles them -- confirm that is intended downstream.
                queue.put((index, '\n'.join(text)), block=True)
            except (IndexError, ValueError, SyntaxError):
                # BUGFIX: literal_eval raises ValueError/SyntaxError on a
                # malformed first line; previously only IndexError (empty
                # file) was caught and a bad file crashed the worker.
                LOGGER.error('File {0} is not classifyable'
                             .format(file.path))
    LOGGER.info('File reading worker done.')
def cfg_from_list(cfg_list):
    """Set config keys via list (e.g., from command line).

    Pairs in *cfg_list* alternate dotted key path and value; each value is
    parsed as a Python literal where possible and must keep the type of the
    existing entry in the global __C config.
    """
    from ast import literal_eval
    # assert len(cfg_list) % 2 == 0
    for k, v in zip(cfg_list[0::2], cfg_list[1::2]):
        key_list = k.split('.')
        d = __C
        for subkey in key_list[:-1]:
            assert subkey in d
            d = d[subkey]
        subkey = key_list[-1]
        assert subkey in d
        try:
            value = literal_eval(v)
        except (ValueError, SyntaxError):
            # BUGFIX: narrowed from a bare ``except:`` which also swallowed
            # KeyboardInterrupt/SystemExit; these are the errors literal_eval
            # raises when v is a plain string literal.
            value = v
        assert type(value) == type(d[subkey]), \
            'type {} does not match original type {}'.format(
                type(value), type(d[subkey]))
        d[subkey] = value
def cfg_from_list(cfg_list):
    """Set config keys via list (e.g., from command line).

    Pairs in *cfg_list* alternate dotted key path and value; each value is
    parsed as a Python literal where possible and must keep the type of the
    existing entry in the global __C config.
    """
    from ast import literal_eval
    assert len(cfg_list) % 2 == 0
    for k, v in zip(cfg_list[0::2], cfg_list[1::2]):
        key_list = k.split('.')
        d = __C
        for subkey in key_list[:-1]:
            # BUGFIX: dict.has_key() was removed in Python 3; use ``in``.
            assert subkey in d
            d = d[subkey]
        subkey = key_list[-1]
        assert subkey in d
        try:
            value = literal_eval(v)
        except (ValueError, SyntaxError):
            # BUGFIX: narrowed from a bare ``except:``; these are the errors
            # raised when v is a plain string literal.
            value = v
        assert type(value) == type(d[subkey]), \
            'type {} does not match original type {}'.format(
                type(value), type(d[subkey]))
        d[subkey] = value
def package_meta():
    """Read __init__.py for global package metadata.
    Do this without importing the package.

    @return: dict with 'version', 'license' and 'url' keys
    """
    with open('jetstream/__init__.py', 'rb') as ffinit:
        # decode once instead of once per regex search
        initcontent = ffinit.read().decode('utf-8')

    def _lookup(name):
        # Each dunder is assigned a string literal; find it and literal_eval
        # the right-hand side to strip the quotes.
        match = re.search(r'__{}__\s+=\s+(.*)'.format(name), initcontent)
        return str(ast.literal_eval(match.group(1)))

    return {
        'version': _lookup('version'),
        'license': _lookup('license'),
        'url': _lookup('url'),
    }
def api(request):
    """Receive GET requests that update miner/GPU info for the dashboard.

    @param request: Django request; expected GET params: hash, url_style
        (a stringified dict), hostname
    @return: rendered 'home/api.html' response
    """
    hostname = None
    json_data = None
    hash_id = None
    data = "this url receives get requests for updating the gpu info to the dashboard"
    context = {'data': data}
    if request.GET:
        # for testing purposes only
        # if request.GET['test_data']:
        #     Test.objects.update_or_create(request.GET)
        #     break
        # BUGFIX: use .get() -- indexing request.GET raised
        # MultiValueDictKeyError whenever a parameter was absent.
        hash_id = request.GET.get('hash')
        url_style = request.GET.get('url_style')
        if url_style:
            # NOTE(review): literal_eval on request data cannot execute code,
            # but it raises on malformed input -- confirm callers expect that.
            json_data = ast.literal_eval(url_style)
        hostname = request.GET.get('hostname')
        if json_data is not None and hostname:
            json_data['host'] = hostname
            Miner_Info.objects.filter(host=json_data['host']).update_or_create(json_data)
    return render(request, 'home/api.html', context)
async def get_map(chat, **kwargs):
    """Reply with the map (first 5 levels) of the requested or active dungeon.

    BUGFIX: declared ``async`` -- the body uses ``await``, which is a
    SyntaxError inside a plain ``def``; the two un-awaited error replies are
    now awaited for consistency with the sibling branches.
    """
    redis = kwargs.get('redis')
    args = kwargs.get('info').get('args')
    active_dungeon = await redis.hget(kwargs.get('info').get('username'), 'active_dungeon')
    if len(args) == 2:
        name, num = args
        if name in Dungeon.ACRONYMS:
            active_dungeon = Dungeon.ACRONYMS.get(name)
        else:
            return await chat.reply(f"Errore!\nLa sigla dungeon che mi ha mandato non esiste!\n"
                                    f"Opzioni valide: {', '.join(Dungeon.ACRONYMS.keys())}")
        if is_number(num):
            active_dungeon += ' ' + num
        else:
            return await chat.reply(f"Errore!\n{num} non è un numero!")
    elif not active_dungeon:
        return await chat.reply(ErrorReply.NO_ACTIVE_DUNGEONS)
    map_string = await redis.get(f'map:{active_dungeon}')
    if not map_string:
        return await chat.reply('La mappa del dungeon che hai richiesto non esiste!')
    dungeon_map = literal_eval(map_string)[:5]  # show at most the first 5 levels
    printable_map = active_dungeon + '\n\n' + ''.join([
        Dungeon.stringify_room(i, *level, kwargs.get('info').get('emojis')) for i, level in enumerate(dungeon_map, 1)])
    markup = Dungeon.map_directions(active_dungeon, 0, 5, kwargs.get('info').get('username'))
    return await chat.send_text(printable_map, reply_markup=markup, parse_mode='Markdown')
async def next_room(chat, **kwargs):
    """Advance (or jump) the sender's position in the active dungeon and reply
    with the room description.

    BUGFIX: declared ``async`` -- the body uses ``await``, which is a
    SyntaxError inside a plain ``def``; the un-awaited error reply is now
    awaited like the other branches.
    """
    redis = kwargs.get('redis')
    active_dungeon = kwargs.get('active_dungeon')
    info = kwargs.get('info')
    sender = info.get('username')
    arg = info.get('args')
    try:
        # no argument: advance one room; otherwise jump to the given room
        position = int(await redis.hget(sender, 'position')) + 1 if not arg else int(arg[0])
    except ValueError:
        return await chat.reply("Errore!\n L'argomento del comando deve essere un numero!")
    if position > Dungeon.length(active_dungeon):
        return await chat.reply('Errore!\n La stanza richiesta è maggiore ')
    dungeon_map = literal_eval(await redis.get(f"map:{active_dungeon}"))
    await redis.hset(sender, 'position', position)
    return await chat.reply(Dungeon.stringify_room(
        position,
        *dungeon_map[position-1],
        info.get('emojis')), parse_mode='Markdown')
async def stats_choice_phase2(chat, **kwargs):
    """Send completion statistics for the dungeon selected via regex match.

    BUGFIX: declared ``async`` -- the body uses ``await``, which is a
    SyntaxError inside a plain ``def``.
    """
    dungeon, num = kwargs.get('match').group(1).split(':')
    redis = kwargs.get('redis')
    dungeon_map = literal_eval(await redis.get(f'map:{dungeon} {num}'))
    # count how many times each room type appears across all levels
    counter = defaultdict(int)
    for level in dungeon_map:
        for room in level:
            counter[room] += 1
    tot_rooms = len(dungeon_map) * 3
    # dungeon_deadline = await redis.hget('dungeon_deadlines', f'{dungeon} {num}')
    # empty-string rooms are unexplored; everything else counts as completed
    percent_completed = round(((tot_rooms - (counter.get('') or 0)) / tot_rooms) * 100, 2)
    reply = f"{dungeon} {num}\nPercentuale completamento {percent_completed}%\nMonete: {counter.get('monete') or 0}\n" \
            f"Spade: {counter.get('spada') or 0}\nAsce: {counter.get('ascia') or 0}\n" \
            f"Aiuta: {counter.get('aiuta') or 0}\nMattonelle: {counter.get('mattonella') or 0}\n" \
            f"Stanze vuote: {counter.get('stanza vuota') or 0}\n" \
            f"Fontana: {counter.get('fontana') or 0}\nIncisioni: {counter.get('incisioni') or 0}"
    await chat.send_text(reply)
digital_ocean.py source file
Project: Learning-Ansible-2-Second-Edition
Author: PacktPublishing
Project source
File source
Views: 22
Favorites: 0
Likes: 0
Comments: 0
def read_settings(self):
    ''' Reads the settings from the digital_ocean.ini file '''
    config = ConfigParser.SafeConfigParser()
    ini_path = os.path.dirname(os.path.realpath(__file__)) + '/digital_ocean.ini'
    config.read(ini_path)

    section = 'digital_ocean'
    # Credentials
    if config.has_option(section, 'api_token'):
        self.api_token = config.get(section, 'api_token')
    # Cache related
    if config.has_option(section, 'cache_path'):
        self.cache_path = config.get(section, 'cache_path')
    if config.has_option(section, 'cache_max_age'):
        self.cache_max_age = config.getint(section, 'cache_max_age')
    # Private IP Address
    if config.has_option(section, 'use_private_network'):
        self.use_private_network = config.get(section, 'use_private_network')
    # Group variables (stored as a Python-literal dict in the ini file)
    if config.has_option(section, 'group_variables'):
        self.group_variables = ast.literal_eval(config.get(section, 'group_variables'))
def _pop_token(self, lineno: int, token_value: str) -> Token:
    """Remove and return the first token on *lineno* whose literal value
    equals *token_value*.

    F-string tokens are special-cased: a matching sub-span is returned as a
    positioned copy without removing the token, since several AST nodes may
    map into different parts of the same f-string.

    Raises TokenNotFoundException when no match is found.
    """
    tokensline = self._lines[lineno - 1]
    # Pop the first token with the same name in the same line
    for t in tokensline:
        if t.name != 'STRING':
            line_value = t.value
        else:
            if t.value[0] == 'f' and t.value[1] in ('"', "'"):
                # fstring: token identify as STRING but they parse into the AST as a
                # collection of nodes so the token_value is different. To find the
                # real token position we'll search inside the fstring token value.
                tok_subpos = t.value.find(token_value)
                if tok_subpos != -1:
                    # We don't remove the fstring token from the line in this case; other
                    # nodes could match different parts of it
                    newtok = deepcopy(t)
                    newtok.start.col = t.start.col + tok_subpos
                    return newtok
                raise TokenNotFoundException("Could not find token '{}' inside f-string '{}'"
                                             .format(token_value, t.value))
            else:
                # normal string; they include the single or double quotes so we liteval
                line_value = literal_eval(t.value)
        # compare both sides as strings so numeric tokens match their text form
        if str(line_value) == str(token_value):
            tokensline.remove(t)
            return t
    raise TokenNotFoundException("Token named '{}' not found in line {}"
                                 .format(token_value, lineno))
def read_piksi_settings_info(self):
    """Load piksi_settings.yaml and attach a string->value 'parser' callable
    to each setting, indexed by group and name in self.piksi_settings_info."""
    self.piksi_settings_info = tree()
    # BUGFIX(security): yaml.safe_load instead of yaml.load -- a full load can
    # instantiate arbitrary Python objects via !!python tags and is deprecated
    # without an explicit Loader. Also close the file via a context manager
    # (the old code leaked the handle).
    with open(os.path.join(PKG_PATH, 'piksi_settings.yaml'), 'r') as f:
        settings_info = yaml.safe_load(f)
    for s in settings_info:
        if s['type'].lower() == 'boolean':
            s['parser'] = lambda x: x.lower() == 'true'
        elif s['type'].lower() in ('float', 'double', 'int'):
            # numeric strings parse safely as Python literals
            s['parser'] = ast.literal_eval
        elif s['type'] == 'enum':
            # parser maps the enum string to its index in the declared list
            s['parser'] = s['enum'].index
        else:
            # default: pass the raw string through unchanged
            s['parser'] = lambda x: x
        self.piksi_settings_info[s['group']][s['name']] = s