def parse_default(field, ftype, fdefault):
    """Normalise a textual default value and store it on ``field``.

    Unless the value is the bool literal ``'true'``, trailing ``L``/``D``/``F``
    characters are stripped and the rest is parsed with ``literal_eval``;
    text that does not parse becomes ``None``.

    Integer results are then adjusted according to ``ftype``:

    * type names not starting with ``'u'`` or ``'fixed'`` are wrapped to a
      signed 64-bit value when bit 63 or above is set, otherwise to a signed
      32-bit value when bit 31 or above is set and the type name does not end
      in ``'64'``;
    * all other type names are masked to the width given by the last two
      characters of ``ftype``;
    * for ``'float'`` / ``'double'`` an integer with bits above 23 / 52 is
      reinterpreted as the raw bit pattern of the floating point number.

    :param field: object receiving ``default_value`` (only when the final
        value is truthy).
    :param ftype: type name string (presumably a protobuf scalar type such
        as ``'int32'`` or ``'fixed64'`` -- confirm against caller).
    :param fdefault: default value as text.
    """
    if not (ftype == 'bool' and fdefault == 'true'):
        try:
            fdefault = literal_eval(fdefault.rstrip('LDF'))
        except (ValueError, SyntaxError):
            fdefault = None

    if type(fdefault) is int:
        if ftype[0] != 'u' and ftype[:5] != 'fixed':
            # Wrap arithmetically instead of through ctypes.c_long, whose
            # width is platform dependent (it is 32 bits on some platforms).
            if fdefault >> 63:
                fdefault = _wrap_signed(fdefault, 64)
            elif fdefault >> 31 and ftype[-2:] != '64':
                fdefault = _wrap_signed(fdefault, 32)
        else:
            fdefault &= (1 << int(ftype[-2:])) - 1

        if ftype == 'float' and abs(fdefault) >> 23:
            fdefault = unpack('=f', pack('=i', fdefault))[0]
        elif ftype == 'double' and abs(fdefault) >> 52:
            fdefault = unpack('=d', pack('=q', fdefault))[0]

    # Falsy results (0, 0.0, None, '') leave the field untouched.
    if fdefault:
        field.default_value = str(fdefault)


def _wrap_signed(value, bits):
    """Reinterpret ``value`` as a two's-complement integer ``bits`` wide."""
    value &= (1 << bits) - 1
    if value >> (bits - 1):
        value -= 1 << bits
    return value
# Example source code using Python's literal_eval()
def get_default(default, setting_type):
    """Convert a raw default into a Python value for the given setting type.

    :param default: raw default; a string, ``None`` or an already typed value.
    :param setting_type: indexable pair whose first item is the type name
        (``'str_t'`` and ``'bool_t'`` are treated specially) and whose second
        item is a callable used to build a value.
    :return: ``default`` unchanged for non-string values and for ``'str_t'``
        strings; ``setting_type[1]()`` when ``default`` is ``None``; otherwise
        the parsed value, or ``None`` when nothing could be parsed.
    """
    if default is None:
        return setting_type[1]()
    if not isinstance(default, str):
        return default
    if setting_type[0] == 'str_t':
        return default

    try:
        return ast.literal_eval(default)
    except Exception:
        # Not a Python literal. ``Exception`` (rather than a bare except)
        # keeps the best-effort fallback without trapping KeyboardInterrupt
        # or SystemExit.
        if setting_type[0] == 'bool_t':
            return default.lower() == 'true'
        try:
            # Only the first space-separated token is converted.
            return setting_type[1](default.split(' ')[0])
        except Exception:
            return None
def read_settings(self):
    ''' Reads the settings from the digital_ocean.ini file

    The file is looked up in the directory containing this module. Each
    option is copied onto ``self`` only when it is present in the
    ``digital_ocean`` section; absent options leave the attribute untouched.
    '''
    section = 'digital_ocean'
    config = ConfigParser.SafeConfigParser()
    config.read(os.path.dirname(os.path.realpath(__file__)) + '/digital_ocean.ini')

    # (option name, typed getter): credentials, cache settings, private IP.
    typed_options = (
        ('api_token', config.get),
        ('cache_path', config.get),
        ('cache_max_age', config.getint),
        ('use_private_network', config.getboolean),
    )
    for option, getter in typed_options:
        if config.has_option(section, option):
            setattr(self, option, getter(section, option))

    # Group variables are stored as a Python literal in the ini file.
    if config.has_option(section, 'group_variables'):
        raw_group_variables = config.get(section, 'group_variables')
        self.group_variables = ast.literal_eval(raw_group_variables)
def get_upload_url(self, session):
    """Fetch the upload URL from the appspot endpoint.

    The endpoint is requested once. A response starting with an HTML
    doctype is logged as a probable credentials problem and the process
    exits with status 1. A response starting with ``<HTML>`` triggers a
    second request to the same endpoint. The resulting response text is
    parsed as a Python literal.

    Args:
        session: object whose ``get(url)`` returns a response exposing a
            ``text`` attribute (presumably a ``requests`` session -- confirm
            against caller).

    Returns:
        The value stored under ``'url'`` in the parsed response.
    """
    endpoint = DataManagement.__APPSPOT_URL
    response = session.get(endpoint)
    body = response.text
    if body.startswith('\n<!DOCTYPE html>'):
        self.logger.debug('Incorrect credentials. Probably. If you are sure the credentials are OK, '
                          'refresh the authentication token. If it did not work report a problem. '
                          'They might have changed something in the Matrix.')
        sys.exit(1)
    elif body.startswith('<HTML>'):
        self.logger.debug('Redirecting to upload URL')
        response = session.get(endpoint)
    parsed = ast.literal_eval(response.text)
    return parsed['url']
def cfg_from_list(cfg_list):
    """Set config keys via list (e.g., from command line).

    ``cfg_list`` is a flat sequence of alternating dotted keys and values,
    e.g. ``['TRAIN.LR', '0.01', 'GPU_ID', '1']``. Each key must already exist
    in the module-level ``__C`` config tree, and the new value must have
    exactly the same type as the value it replaces.

    Raises AssertionError for an odd-length list, an unknown key or a type
    mismatch.
    """
    from ast import literal_eval

    assert len(cfg_list) % 2 == 0
    for k, v in zip(cfg_list[0::2], cfg_list[1::2]):
        key_list = k.split('.')
        d = __C
        # Walk down to the dict that owns the final key.
        for subkey in key_list[:-1]:
            assert subkey in d
            d = d[subkey]
        subkey = key_list[-1]
        assert subkey in d
        try:
            value = literal_eval(v)
        except Exception:
            # handle the case when v is a string literal; ``Exception``
            # instead of a bare except so Ctrl-C / SystemExit still propagate
            value = v
        assert type(value) == type(d[subkey]), \
            'type {} does not match original type {}'.format(
                type(value), type(d[subkey]))
        d[subkey] = value
def checkPlugins(self):
    """Verify that every required Jenkins plugin is installed and enabled.

    Queries ``pluginManager/api/python?depth=1`` through ``self._send``. A
    non-200 answer only prints a warning to stderr. Otherwise the body is
    parsed as a Python literal and compared against ``requiredPlugins``.

    Raises:
        BuildError: if a required plugin is inactive, disabled or missing,
            or if the response cannot be interpreted.
    """
    response = self._send("GET", "pluginManager/api/python?depth=1")
    if response.status != 200:
        print("Warning: could not verify plugins: HTTP error: {} {}"
              .format(response.status, response.reason),
              file=sys.stderr)
        # The body is read and discarded.
        response.read()
        return

    try:
        plugins = ast.literal_eval(response.read().decode("utf8"))["plugins"]
        required = set(requiredPlugins.keys())
        for p in plugins:
            if p["shortName"] not in required:
                continue
            if not p["active"] or not p["enabled"]:
                raise BuildError("Plugin not enabled: " + requiredPlugins[p["shortName"]])
            required.remove(p["shortName"])
        if required:
            raise BuildError("Missing plugin(s): " + ", ".join(
                requiredPlugins[p] for p in required))
    except BuildError:
        raise
    except Exception:
        # Any other failure means the answer did not have the expected
        # shape. ``Exception`` (not a bare except) lets KeyboardInterrupt
        # and SystemExit through untouched.
        raise BuildError("Malformed Jenkins response while checking plugins!")
def collect_moves(self, reader, name):
    """Return the ``Moves`` record of the first row in ``reader`` matching ``name``.

    When the part of ``name`` after its last ``-`` is all digits, a row
    matches only if its first column equals ``name`` and the reported
    pokemon is the part before the first ``-``. Otherwise a row matches when
    ``name`` occurs anywhere in its first column and the reported pokemon is
    ``name`` itself. The pokemon name is title-cased in both cases.

    Column 1 is translated through ``switcher``; columns 2-4 are parsed as
    Python literals. ``None`` is returned when no row matches.
    """
    Moves = namedtuple('Moves', ['pokemon', 'gen', 'color', 'moves', 'versions'])
    exact_match = name.split('-')[-1].isdigit()
    pokemon = (name.split('-')[0] if exact_match else name).title()

    for row in reader:
        found = (name == row[0]) if exact_match else (name in row[0])
        if not found:
            continue
        generation = switcher[row[1]]
        color = int(ast.literal_eval(row[2]))
        moves = ast.literal_eval(row[3])
        versions = ast.literal_eval(row[4])
        return Moves(pokemon, generation, color, moves, versions)
    return None
def _unwrap_plugin_exceptions(self, func, *args, **kwargs):
    """Call ``func`` and unwrap plugin failure details.

    A ``XenAPI.Failure`` whose ``details`` has four items, starts with
    ``'XENAPI_PLUGIN_EXCEPTION'`` and has ``'Failure'`` in third position is
    re-raised as a new ``XenAPI.Failure`` built from the fourth item parsed
    as a Python literal. If that item cannot be parsed the original
    exception is raised. Every other failure, and any
    ``xmlrpclib.ProtocolError``, is logged and re-raised unchanged.
    """
    try:
        return func(*args, **kwargs)
    except XenAPI.Failure as exc:
        LOG.debug("Got exception: %s", exc)
        details = exc.details
        is_plugin_failure = (len(details) == 4 and
                             details[0] == 'XENAPI_PLUGIN_EXCEPTION' and
                             details[2] == 'Failure')
        if not is_plugin_failure:
            raise
        try:
            unwrapped = ast.literal_eval(exc.details[3])
        except Exception:
            raise exc
        raise XenAPI.Failure(unwrapped)
    except xmlrpclib.ProtocolError as exc:
        LOG.debug("Got exception: %s", exc)
        raise
def _int_handler(self):
    """
    Handler for int values. Parses QLineEdit.

    Reads the text of ``self._select_widget``. When the text is an integer
    literal, stores it in ``self._current_value`` and sets ``self._valid``
    to True with a white background. Otherwise sets ``self._valid`` to
    False with a pink background, leaving ``self._current_value`` as it was.
    """
    current = self._select_widget.text()
    try:
        # Make sure this scans as an int literal (floats, bools, strings
        # and other literals are rejected).
        if type(ast.literal_eval(current)) is not int:
            raise ValueError
        new_value = int(current)
    except (ValueError, SyntaxError, TypeError):
        # literal_eval raises SyntaxError for empty or partially typed text,
        # not only ValueError; all of these simply mean "not a valid int".
        color = "#FFB6C1"
        self._valid = False
    else:
        color = "#FFFFFF"
        self._current_value = new_value
        self._valid = True
    self._select_widget.setStyleSheet("QLineEdit {{ background-color: {} }}".format(color))
def cfg_from_list(cfg_list):
    """Set config keys via list (e.g., from command line).

    ``cfg_list`` holds alternating dotted keys and values, for example
    ``['TRAIN.LR', '0.01', 'GPU_ID', '1']``. Every key has to exist already
    in the module-level ``__C`` config tree, and the type of the new value
    has to equal the type of the value being replaced.

    Raises AssertionError for an odd-length list, an unknown key or a type
    mismatch.
    """
    from ast import literal_eval

    assert len(cfg_list) % 2 == 0
    for k, v in zip(cfg_list[0::2], cfg_list[1::2]):
        key_list = k.split('.')
        d = __C
        for subkey in key_list[:-1]:
            # ``in`` instead of dict.has_key(), which Python 3 removed.
            assert subkey in d
            d = d[subkey]
        subkey = key_list[-1]
        assert subkey in d
        try:
            value = literal_eval(v)
        except Exception:
            # handle the case when v is a string literal; a bare except
            # would also trap KeyboardInterrupt and SystemExit
            value = v
        assert type(value) == type(d[subkey]), \
            'type {} does not match original type {}'.format(
                type(value), type(d[subkey]))
        d[subkey] = value
def fixSettingDictLoadedFromResultsDf(setting_dict):
    """Restore typed values in ``setting_dict`` in place and return it.

    * ``hidden_layers``: parsed as a Python literal when it is a ``str``.
    * ``optimizer``: replaced by a ``tf.train`` optimizer class chosen by
      substring (``GradientDescent``, then ``Adagrad``, otherwise Adam).
    * ``batch_size`` / ``decay_steps``: converted with ``int``.

    Keys that are absent are left absent.
    """
    if 'hidden_layers' in setting_dict:
        layers = setting_dict['hidden_layers']
        if type(layers) is str:
            setting_dict['hidden_layers'] = ast.literal_eval(layers)

    if 'optimizer' in setting_dict:
        optimizer_name = setting_dict['optimizer']
        if 'GradientDescent' in optimizer_name:
            optimizer = tf.train.GradientDescentOptimizer
        elif 'Adagrad' in optimizer_name:
            optimizer = tf.train.AdagradOptimizer
        else:
            optimizer = tf.train.AdamOptimizer
        setting_dict['optimizer'] = optimizer

    for int_key in ('batch_size', 'decay_steps'):
        if int_key in setting_dict:
            setting_dict[int_key] = int(setting_dict[int_key])
    return setting_dict
def coalesce_ordered_dict(dict_str):
    """Merge newline-delimited dict literals into one :obj:collections.OrderedDict.

    Each line of ``dict_str`` is parsed as a literal and applied with
    ``update`` in line order, so later lines override earlier keys.

    >>> config = configparser.ConfigParser()
    >>> config.add_section('x')
    >>> config.set('x', 'y', "{'a': 0}\\n{'b': 1}")
    >>> od = coalesce_ordered_dict(config['x']['y'])
    >>> assert isinstance(od, OrderedDict)
    >>> assert od.popitem() == ('b', 1)
    >>> assert od.popitem() == ('a', 0)

    :param str dict_str: A string of newline-delimited dictionary
                         literals.
    """
    merged = OrderedDict()
    for line in dict_str.splitlines():
        merged.update(literal_eval(line))
    return merged
def aget(self, **kw):
    """GET ``self.url`` (with ``kw`` as query parameters) and wrap the result.

    Generator-based coroutine. A fresh ``aiohttp.ClientSession`` is opened
    for the request and always closed afterwards. The body is decoded as
    UTF-8 and, when the response content type is ``text/x-python``, parsed
    as a Python literal.

    :return: ``Payload.factory(status, headers, payload)``.
    """
    session = aiohttp.ClientSession()
    target = URL(self.url)
    if kw:
        target = target.with_query(**kw)
    logger.debug("GET %s", target)

    try:
        response = yield from session.get(target, timeout=10)
        raw_body = yield from response.read()
    finally:
        yield from session.close()

    # Status is only checked once the body has been read and the session closed.
    response.raise_for_status()
    text = raw_body.decode('utf-8')
    if response.content_type == 'text/x-python':
        payload = ast.literal_eval(text)
    else:
        payload = text
    return Payload.factory(response.status, response.headers, payload)
def cfg_from_list(cfg_list):
    """Set config keys via list (e.g., from command line).

    The list alternates dotted keys and values (``['A.B', '1', 'C', 'x']``).
    Keys are resolved inside the module-level ``__C`` config tree and must
    already exist there; a value is accepted only when its type equals the
    type of the value currently stored.

    Raises AssertionError for an odd-length list, an unknown key or a type
    mismatch.
    """
    from ast import literal_eval

    assert len(cfg_list) % 2 == 0
    pairs = zip(cfg_list[0::2], cfg_list[1::2])
    for k, v in pairs:
        key_list = k.split('.')
        d = __C
        for subkey in key_list[:-1]:
            # Membership test replaces dict.has_key(), absent on Python 3.
            assert subkey in d
            d = d[subkey]
        subkey = key_list[-1]
        assert subkey in d
        try:
            value = literal_eval(v)
        except Exception:
            # handle the case when v is a string literal (narrowed from a
            # bare except so KeyboardInterrupt / SystemExit propagate)
            value = v
        assert type(value) == type(d[subkey]), \
            'type {} does not match original type {}'.format(
                type(value), type(d[subkey]))
        d[subkey] = value
def cfg_from_list(cfg_list):
    """Set config keys via list (e.g., from command line).

    ``cfg_list`` alternates dotted keys and values, e.g.
    ``['TRAIN.LR', '0.01', 'GPU_ID', '1']``. Keys must already exist in the
    module-level ``__C`` config tree. The new value must be an instance of
    the type of the value it replaces.

    Raises AssertionError for an odd-length list, an unknown key or a type
    mismatch.
    """
    from ast import literal_eval

    assert len(cfg_list) % 2 == 0
    for k, v in zip(cfg_list[0::2], cfg_list[1::2]):
        key_list = k.split('.')
        d = __C
        # Descend through every key component except the last.
        for subkey in key_list[:-1]:
            assert subkey in d
            d = d[subkey]
        subkey = key_list[-1]
        assert subkey in d
        try:
            value = literal_eval(v)
        except Exception:
            # handle the case when v is a string literal; unlike a bare
            # except this does not swallow KeyboardInterrupt / SystemExit
            value = v
        assert isinstance(value, type(d[subkey])), \
            'type {} does not match original type {}'.format(
                type(value), type(d[subkey]))
        d[subkey] = value
def send(self, *args, **kwargs):
    """Record a raven message instead of sending it.

    The zlib-compressed JSON payload is decoded and appended to
    ``self.messages``. Values under ``'extra'`` that parse as Python
    literals are replaced by the parsed value; the others are kept as is.

    Raises Exception when called with a positional argument count other
    than two or three.
    """
    # In raven<6, args = (data, headers).
    # In raven 6.x args = (url, data, headers)
    # In both layouts the payload is the second-to-last positional argument.
    if len(args) not in (2, 3):
        raise Exception('raven Transport.send api seems to have changed')
    data = args[-2]

    raw = json.loads(zlib.decompress(data).decode('utf8'))
    # to make asserting easier, parse stringified literals into python objects
    extra = raw['extra']
    for key, text in list(extra.items()):
        try:
            extra[key] = ast.literal_eval(text)
        except Exception:
            # Not a literal: keep the original string.
            pass
    self.messages.append(raw)
def _callback(cls, cron):
    """Run the function configured on ``cron`` and report failures.

    ``cron.args`` (a Python literal, or empty) supplies the positional
    arguments for ``cron.function`` on the model named by ``cron.model``;
    the call runs as ``cron.user``. On any exception the transaction cursor
    is rolled back and ``cls.send_error_message(cron)`` is called as
    ``cron.user``, using the request user's language when set and the
    configured default language otherwise.
    """
    pool = Pool()
    Config = pool.get('ir.configuration')
    try:
        call_args = literal_eval(cron.args) if cron.args else []
        Model = pool.get(cron.model)
        with Transaction().set_user(cron.user.id):
            target = getattr(Model, cron.function)
            target(*call_args)
    except Exception:
        Transaction().cursor.rollback()

        req_user = cron.request_user
        if req_user.language:
            language = req_user.language.code
        else:
            language = Config.get_language()
        with Transaction().set_user(cron.user.id), \
                Transaction().set_context(language=language):
            cls.send_error_message(cron)
def cfg_from_list(cfg_list):
    """Set config keys via list (e.g., from command line).

    Expects alternating dotted keys and values such as
    ``['TRAIN.LR', '0.01', 'GPU_ID', '1']``. Every key is looked up in the
    module-level ``__C`` config tree and has to exist already; the new value
    is stored only if its type equals the type of the current value.

    Raises AssertionError for an odd-length list, an unknown key or a type
    mismatch.
    """
    from ast import literal_eval

    assert len(cfg_list) % 2 == 0
    for k, v in zip(cfg_list[0::2], cfg_list[1::2]):
        *parents, leaf = k.split('.')
        d = __C
        for subkey in parents:
            # dict.has_key() does not exist on Python 3; ``in`` works everywhere.
            assert subkey in d
            d = d[subkey]
        subkey = leaf
        assert subkey in d
        try:
            value = literal_eval(v)
        except Exception:
            # handle the case when v is a string literal; narrowed from a
            # bare except so interpreter-exit signals are not swallowed
            value = v
        assert type(value) == type(d[subkey]), \
            'type {} does not match original type {}'.format(
                type(value), type(d[subkey]))
        d[subkey] = value
def set_custom_user_vars(opt_key, settings, args):
    """Set custom user configuration variables

    Reads the list stored under ``opt_key`` in ``args``. Each entry must
    look like ``key=value`` with a non-empty key and value; the value is
    stored in ``settings`` as a parsed Python literal when possible and as
    the raw string otherwise. Nothing happens when the list is missing or
    empty.

    Raises RuntimeError for an entry that is not of the form ``key=value``.
    """
    custom_vars = args.get(opt_key)
    if not custom_vars:
        return

    for var in custom_vars:
        # parse key-value pair
        var = var.strip()
        eq_pos = var.find('=')
        # '=' must be present, not first and not last.
        if eq_pos < 1 or eq_pos > len(var) - 2:
            raise RuntimeError('Expected "key=value" for --conf-var command-line argument; got "{}"'.format(var))
        key, value = var[:eq_pos].strip(), var[eq_pos + 1:].strip()

        # interpret type of value
        try:
            parsed = ast.literal_eval(value)
        except Exception:
            # Not a literal: keep the text. ``Exception`` rather than a bare
            # except so KeyboardInterrupt / SystemExit are not swallowed.
            parsed = value
        settings[key] = parsed
def validate_custom_val(self, val):
    """ Pass in a desired custom value and ensure it is valid.

    Only ``'int'`` and ``'float'`` custom types are checked: ``val`` is
    parsed as a Python literal and compared against ``self.min_val`` and
    ``self.max_val``. A bound that is falsy (e.g. ``0``) is treated as
    "no bound". Probably should check type, etc, but let's assume fine
    for the moment.

    :raises ValidationError: when the parsed value is outside the bounds.
    """
    self.ensure_one()
    if self.custom_type not in ('int', 'float'):
        return

    minv = self.min_val
    maxv = self.max_val
    val = literal_eval(val)
    # The template is translated first and formatted afterwards, so the
    # translation lookup sees the constant source string.
    if minv and maxv and (val < minv or val > maxv):
        raise ValidationError(
            _("Selected custom value '%s' must be between %s and %s")
            % (self.name, self.min_val, self.max_val)
        )
    elif minv and val < minv:
        raise ValidationError(
            _("Selected custom value '%s' must be at least %s")
            % (self.name, self.min_val)
        )
    elif maxv and val > maxv:
        raise ValidationError(
            _("Selected custom value '%s' must be lower than %s")
            % (self.name, self.max_val + 1)
        )
def _parse_line(self, line):
    """Parse one latin-1 encoded, tab-separated command line.

    :param line: raw ``bytes`` of the line.
    :return: ``None`` for blank lines and lines whose first non-blank
        character is ``#``; otherwise ``(indent, command, args)`` where
        ``indent`` is the leading whitespace length divided by two,
        ``command`` is the first tab-separated field and ``args`` maps each
        ``name=value`` field to its value parsed as a Python literal.
    """
    text = line.decode("latin1").rstrip()
    stripped = text.lstrip()
    if not stripped or stripped.startswith("#"):
        return None

    indent = (len(text) - len(stripped)) // 2
    parts = stripped.split("\t")
    command = parts[0]
    args = {}
    # Iterating the slice avoids the Python-2-only ``xrange``.
    for part in parts[1:]:
        # Split on the first '=' only so values may themselves contain '='.
        n, v = part.split("=", 1)
        args[n] = literal_eval(v)
    return (indent, command, args)
def create_and_save_model(self):
    """Build a model from the job arguments and save it.

    ``self.job_args['model']['params']`` holds a base64-encoded Python
    literal. ``self.job_args['dataset_format']`` selects the controller
    factory: ``'libsvm'``, ``'text'``, or the generic one for anything else.
    The result is stored in ``self.model`` and, when truthy, saved to
    ``self.modelpath``.
    """
    model_params = self.job_args['model']['params']
    params = base64.b64decode(model_params)
    # b64decode returns bytes on Python 3, which literal_eval rejects;
    # decode to text first.
    if isinstance(params, bytes):
        params = params.decode('utf-8')
    list_params = literal_eval(params)

    dataset_format = self.job_args.get('dataset_format')
    if dataset_format == 'libsvm':
        self.model = self.controller.create_model_libsvm(self.data,
                                                         list_params)
    elif dataset_format == 'text':
        self.model = self.controller.create_model_text(self.data,
                                                       list_params)
    else:
        self.model = self.controller.create_model(self.data, list_params)

    if self.model:
        self.model.save(self.context, self.modelpath)
def extractJson(message: str) -> Optional[Mapping]:
    """ Returns the first json blob found in the string if any are found

    Three passes are tried in order; the first successful parse wins and
    ``None`` is returned when every pass fails.
    """
    # First pass that relies on it being in a code block
    # (raw strings keep the regex escapes from being read as invalid string escapes)
    for match in re.findall(r'\`\s*?{[\s\S]*?}\s*?\`', message):
        potJson = match[1:-1].strip()
        try:
            return json.loads(potJson)
        except ValueError:
            pass

    # The remaining passes share the same greedy brace-to-brace candidates.
    candidates = re.findall(r'{[\s\S]*}', message)

    # Second pass doesn't require the code block, but it still uses the json parser
    for match in candidates:
        try:
            return json.loads(match)
        except ValueError:
            pass

    # Third pass uses ast.literal_eval (which IS safe-it only evals literals) and some replacements to handle malformed
    # JSON. This is a horrible JSON parser and will incorrectly parse certain types of JSON, but it is far more
    # accepting so we might as well try doing this
    for match in candidates:
        try:
            return fuzzyJsonParse(match)
        except (SyntaxError, ValueError):
            pass

    return None
def __getCheatsheets(self):
    """Download the cheatsheet index and return it as ``Cheatsheet`` objects.

    The index is fetched from the download server's URL joined with
    ``self.jsonServerLocation`` and parsed as a Python literal. One
    ``Cheatsheet`` with status ``'online'`` is created per entry under
    ``'cheatsheets'``; the list is returned sorted case-insensitively by
    name.
    """
    server = self.serverManager.getDownloadServer(self.localServer)
    url = server.url
    if url[-1] != '/':
        url += '/'
    url += self.jsonServerLocation

    data = ast.literal_eval(requests.get(url).text)
    icon = self.__getIconWithName('cheatsheet')

    cheatsheets = []
    for online_id, entry in data['cheatsheets'].items():
        sheet = Cheatsheet()
        sheet.name = entry['name']
        sheet.aliases = entry['aliases']
        sheet.globalversion = data['global_version']
        sheet.version = entry['version']
        sheet.image = icon
        sheet.onlineid = online_id
        sheet.status = 'online'
        cheatsheets.append(sheet)

    cheatsheets.sort(key=lambda sheet: sheet.name.lower())
    return cheatsheets
def test_view_with_args(self):
    """Dedicated test to ensure that views with args work

    The view is first fetched with GET, then feedback is posted to it; the
    stored ``Feedback.view_args`` literal must hold the positional args and
    an empty kwargs dict.
    """
    view_name = 'thumber_tests:args_example'
    args = ('foobar',)
    path = reverse(view_name, args=args)
    http_referer = 'http://example.com{0}'.format(path)
    response = self.client.get(path, follow=True)
    self.assertContains(response, 'Example Template!', status_code=200)
    self.assertContains(response, 'Was this service useful?')

    # Post with thumber_token=ajax for a JSON response
    data = {'satisfied': 'True', 'thumber_token': 'ajax'}
    response = self.client.post(path, data, HTTP_REFERER=http_referer)

    # Check a Feedback model was created
    # (assertEqual: the assertEquals alias is deprecated and gone in Python 3.12)
    self.assertEqual(Feedback.objects.count(), 1)
    feedback = Feedback.objects.all()[0]
    view_args = ast.literal_eval(feedback.view_args)
    self.assertEqual(view_args[0], args)
    self.assertEqual(view_args[1], {})
def test_view_with_kwargs(self):
    """Dedicated test to ensure that views with kwargs work, and the kwargs get stored in the model

    The view is first fetched with GET, then feedback is posted to it; the
    stored ``Feedback.view_args`` literal must hold an empty args tuple and
    the keyword arguments.
    """
    view_name = 'thumber_tests:kwargs_example'
    kwargs = {'slug': 'foobar'}
    path = reverse(view_name, kwargs=kwargs)
    http_referer = 'http://example.com{0}'.format(path)
    response = self.client.get(path, follow=True)
    self.assertContains(response, 'Example Template!', status_code=200)
    self.assertContains(response, 'Was this service useful?')

    # Post with thumber_token=ajax for a JSON response
    data = {'satisfied': 'True', 'thumber_token': 'ajax'}
    response = self.client.post(path, data, HTTP_REFERER=http_referer)

    # Check a Feedback model was created
    # (assertEqual: the assertEquals alias is deprecated and gone in Python 3.12)
    self.assertEqual(Feedback.objects.count(), 1)
    feedback = Feedback.objects.all()[0]
    view_args = ast.literal_eval(feedback.view_args)
    self.assertEqual(view_args[1], kwargs)
    self.assertEqual(view_args[0], ())
def ner(request):
    """Tag a message by calling run_ner(); invoked through an API call.

    Reads ``message`` and ``entities`` from the GET parameters. A non-empty
    ``entities`` parameter is parsed as a Python literal; otherwise an empty
    list is used.

    Attributes:
        request: url parameters

    Returns:
        An ``HttpResponse`` whose JSON body is ``{"data": <run_ner output>}``.
    """
    params = request.GET
    message = params.get('message')
    entities_data = params.get('entities', [])
    # NOTE(review): this literal comes straight from the request; malformed
    # input makes literal_eval raise.
    entities = ast.literal_eval(entities_data) if entities_data else []

    ner_logger.debug('Start: %s -- %s' % (message, entities))
    output = run_ner(entities=entities, message=message)
    ner_logger.debug('Finished %s : %s ' % (message, output))

    body = json.dumps({'data': output})
    return HttpResponse(body, content_type='application/json')
def __creatBlocks(self):
    """
    Second part of parsing. Find blocks and create a list.

    Pairs every entry of ``self.lines`` with its entry in
    ``self.indentationList`` and builds the source text of a nested list
    literal: a deeper line opens a sub-list, a line at indentation 0 closes
    all open sub-lists, a shallower line closes one. Line text is URL-quoted
    before being embedded. ``self.blocks`` ends up holding the evaluated
    nested list.
    """
    w = list(zip(self.lines, self.indentationList))
    self.blocks = "["
    indentation = 0
    level = 0
    for entry in w:
        text, depth = entry
        quoted = '"' + urllib.parse.quote_plus(text) + '"'
        if depth > indentation:
            # Deeper than the previous line: open a sub-list.
            level += 1
            self.blocks += ",[" + quoted
        elif depth == 0:
            # Back at top level: close every open sub-list first.
            if len(self.blocks) > 1:
                self.blocks += "]" * level + ','
            self.blocks += quoted
            level = 0
        elif depth < indentation:
            # NOTE(review): index() of an element of ``w`` is always below
            # len(w), so this check looks always true -- confirm intent.
            if w.index(entry) != len(w):
                self.blocks += "]" + "," + quoted
                level -= 1
        elif depth == indentation:
            self.blocks += "," + quoted
        indentation = depth
    self.blocks += "]" * (level + 1)
    self.blocks = ast.literal_eval(self.blocks)
def cfg_from_list(cfg_list):
    """Set config keys via list (e.g., from command line).

    ``cfg_list`` is read as consecutive (dotted key, value) pairs, e.g.
    ``['TRAIN.LR', '0.01', 'GPU_ID', '1']``. Each key is resolved in the
    module-level ``__C`` config tree and must already be present; the value
    replaces the current one only when both have the same type.

    Raises AssertionError for an odd-length list, an unknown key or a type
    mismatch.
    """
    from ast import literal_eval

    assert len(cfg_list) % 2 == 0
    keys = cfg_list[0::2]
    values = cfg_list[1::2]
    for k, v in zip(keys, values):
        key_list = k.split('.')
        d = __C
        for subkey in key_list[:-1]:
            # ``in`` is the portable spelling of the removed dict.has_key().
            assert subkey in d
            d = d[subkey]
        subkey = key_list[-1]
        assert subkey in d
        try:
            value = literal_eval(v)
        except Exception:
            # handle the case when v is a string literal; ``Exception`` keeps
            # KeyboardInterrupt and SystemExit from being swallowed
            value = v
        assert type(value) == type(d[subkey]), \
            'type {} does not match original type {}'.format(
                type(value), type(d[subkey]))
        d[subkey] = value
def working_directory(directory):
    """Generator that switches into ``directory`` and switches back afterwards.

    Changes the current working directory, yields ``directory`` once, and
    restores the previous working directory when the generator is resumed,
    closed or receives an exception.

    NOTE(review): presumably meant to be wrapped with
    ``contextlib.contextmanager`` for use in ``with`` statements; no
    decorator is visible here -- confirm.
    """
    previous_dir = os.getcwd()
    try:
        os.chdir(directory)
        yield directory
    finally:
        # Always go back, even if the body raised.
        os.chdir(previous_dir)
# def extractXML(project_dir, apk_location):
# """
# Parses AndroidManifest file and returns a dictionary object
# :param project_dir: Project Location
# :param apk_location: Apk location
# :return: Parsed AndroidManifest Dictionary
# """
# with working_directory(project_dir):
# subprocess.check_output(["./gradlew", "assembleRelease"])
# with working_directory("/tmp"):
# subprocess.call(["apktool", "d", apk_location])
# with working_directory("/tmp" + "/app-release/"):
# with open("AndroidManifest.xml") as fd:
# obj_file = xmltodict.parse(fd.read())
# return ast.literal_eval(json.dumps(obj_file))