def fetch_valid_json(cmd):
"""Assert stdout contains valid JSON
:param cmd: program and arguments
:type cmd: [str]
:returns: parsed JSON AST
"""
returncode, stdout, stderr = exec_command(cmd)
assert returncode == 0
assert stderr == b''
try:
return json.loads(stdout.decode('utf-8'))
except json.JSONDecodeError:
error_text = 'Command "{}" returned invalid JSON'.format(' '.join(cmd))
raise Exception(error_text)
python类JSONDecodeError()的实例源码
def do_request(ns):
method = METHODS[ns.command]
destinations = dictify(ns.destinations) if hasattr(ns, 'destinations') else None
json = jsonify(ns, destinations=destinations if destinations else None)
validate(ns.api_url, throw=True)
response = requests.request(method, ns.api_url / 'autocert', json=json)
status = response.status_code
try:
json = response.json()
output(json)
except JSONDecodeError as jde:
print('status =', status)
print('JSONDecodeError =', jde)
print('text =', response.text)
return -1
return status
def read(filepath):
path = None
try:
if os.path.isfile(os.path.expanduser(filepath)):
path = os.path.expanduser(filepath)
elif os.path.isfile('./.keystorerc'):
path = './.keystorerc'
else:
raise OSError('''The config file .keystorerc is not found in home or local working directory.
Please refer to https://pypi.python.org/pypi/keystore for setting up a .keystorerc file.''')
with open(path) as f:
conf = json.loads(f.read())
return conf
except OSError as err:
print('Unable to open config: {}'.format(err), file=sys.stderr)
return None
except json.JSONDecodeError as err:
print('Unable to parse config: {}'.format(err), file=sys.stderr)
return None
def show_response(self, response):
"""Print an HTTP Response."""
if response.headers.get('Content-Type', None) == 'application/json':
try:
body = json.dumps(response.json(), indent=2)
except json.JSONDecodeError:
body = response.text
else:
body = response.text
http_txt = self.HTTP_TPL.substitute(
http_version=str(float(response.raw.version) / 10),
status_code=response.status_code,
reason=response.reason,
headers=self.key_value_pairs(response.headers),
body=body,
)
return highlight(http_txt, self.http_lexer, self.formatter)
def populate_user_blacklist(self):
for user in self.user_blacklist:
user_id_url = self.url_user_detail % (user)
info = self.s.get(user_id_url)
# prevent error if 'Account of user was deleted or link is invalid
from json import JSONDecodeError
try:
all_data = json.loads(info.text)
except JSONDecodeError as e:
self.write_log('Account of user %s was deleted or link is '
'invalid' % (user))
else:
# prevent exception if user have no media
id_user = all_data['user']['id']
# Update the user_name with the user_id
self.user_blacklist[user] = id_user
log_string = "Blacklisted user %s added with ID: %s" % (user,
id_user)
self.write_log(log_string)
time.sleep(5 * random.random())
def load_json(ctx, filename):
if filename is None:
if sys.stdin.isatty():
click.echo(ctx.get_usage())
click.echo("Try `jsoncut --help' for more information.")
sys.exit(0)
else:
filename = '-'
try:
with click.open_file(filename) as file_:
return json.load(file_)
except EnvironmentError as e:
if not sys.stdin.isatty():
sys.stdin.read()
click.echo(exc.default_error_mesg_fmt(e), err=True)
sys.exit(1)
except json.JSONDecodeError as e:
click.echo(exc.default_error_mesg_fmt(e), err=True)
sys.exit(1)
def get_latest_log_entry_for(name,**kwargs):
db = connect_db()
c = db.cursor()
refine_search = ''
if 'successfully_uploaded' == True:
refine_search = 'AND uploaded = 1 '
c.execute('SELECT info_json, uploadable, uploaded FROM log WHERE name=? '+refine_search+'ORDER BY time DESC LIMIT 1',(name,))
try:
entry = c.fetchone()
result = json.loads(entry[0])
uploadable = entry[1]
uploaded = entry[2]
except (json.JSONDecodeError,TypeError):
result = {}
uploadable = None
uploaded = None
return result, uploadable, uploaded
def get_candles_df(self, currency_pair, epoch_start, epoch_end, period=False):
"""
Returns candlestick chart data in pandas dataframe
"""
try:
data = self.get_candles(currency_pair, epoch_start, epoch_end, period)
df = pd.DataFrame(data)
df = df.tail(1)
df['close'] = df['close'].astype(float)
df['volume'] = df['volume'].astype(float)
df['pair'] = currency_pair
return df
except (PoloniexError, JSONDecodeError) as e:
print()
print(colored('!!! Got exception while retrieving polo data:' + str(e) + ', pair: ' + currency_pair, 'red'))
return pd.DataFrame()
def recv(self):
n = 0
self._soc.settimeout(0.1)
try:
got = self._soc.recv()
while 1:
try:
val = json.loads(got)
self._handle_event(val['method'], val['params'])
n += 1
break
except json.JSONDecodeError as e:
self._handle_event(got[:e.pos])
n += 1
got = got[e.pos:]
except websocket.WebSocketTimeoutException:
pass
self._soc.settimeout(None)
return n
def read_json(conn, length):
"""
Reads json request of the given length from the client socket.
Specified length must be available in the socket; otherwise function
will raise BlockingIOError.
:param conn: active client socket to read data from
:param length: length of the json content
:return: dictionary corresponding to received json
:rtype: dict
:raise json.JSONDecodeError:
:raise BlockingIOError:
"""
buffer = io.BytesIO()
while length > 0:
chunk = conn.recv(min(2048, length))
length -= buffer.write(chunk)
content = buffer.getvalue().decode('utf-8')
if not content:
return None
else:
return json.loads(content)
def parse_sofa_message(message):
match = SOFA_REGEX.match(message)
if not match:
raise SyntaxError("Invalid SOFA message")
body = match.group('json')
try:
body = json.loads(body)
except json.JSONDecodeError:
raise SyntaxError("Invalid SOFA message: body is not valid json")
type = match.group('type').lower()
if type not in VALID_SOFA_TYPES:
raise SyntaxError("Invalid SOFA type")
if type not in IMPLEMENTED_SOFA_TYPES:
raise NotImplementedError("SOFA type '{}' has not been implemented yet".format(match.group('type')))
try:
return IMPLEMENTED_SOFA_TYPES[type](**body)
except TypeError:
raise SyntaxError("Invalid SOFA message: body contains unexpected fields")
def __call__(self, request):
if isinstance(request, (bytes, str)):
try:
request = json_decode(request)
except json.JSONDecodeError:
return _parse_error(request)
# check batch request
if isinstance(request, list):
resp = []
for r in request:
result = await self._handle_single_request(r)
if result:
resp.append(result)
# if all were notifications
if not resp:
return None
return resp
# standard single request
return await self._handle_single_request(request)
def __init__(self, *args, **kwargs):
super_self = super()
super_self.__init__()
super_self.__setattr__('__values__', {})
if len(args) > 0:
json_obj = args[0]
else:
json_obj = {}
if isinstance(json_obj, str):
try:
json_obj = json.loads(json_obj)
except json.JSONDecodeError:
raise TypeError('json object is not valid (dict-like or json string) for conversion to model: {!r}'
.format(json_obj))
self.from_dict(json_obj)
self.from_dict(kwargs)
def json(self, branch='master', filename=''):
"""Retrieve _filename_ from GitLab.
Args:
branch (str): Git Branch to find file.
filename (str): Name of file to retrieve.
Returns:
dict: Decoded JSON.
Raises:
SystemExit: Invalid JSON provided.
"""
file_contents = self.get(branch=branch, filename=filename)
try:
json_dict = json.loads(file_contents)
# TODO: Use json.JSONDecodeError when Python 3.4 has been deprecated
except ValueError as error:
msg = ('"{filename}" appears to be invalid json. '
'Please validate it with http://jsonlint.com. '
'JSON decoder error:\n'
'{error}').format(
filename=filename, error=error)
raise SystemExit(msg)
LOG.debug('JSON object:\n%s', json_dict)
return json_dict
def process_log(file_path):
"""
Expects header as first line in log file. Header begins with comment character '#'. The line is a json string dump of
a dictionary that contains the following keys:
name: str, Task name
maintainer: str, Name of person
tags: list of tags associated with the task. can be empty
properties: list of properties associated with the task. can be empty
run_id: str, a run ID for the task run
timestamp: str, timestamp for the task run
:param file_path:
:return:
"""
# read header
if isinstance(file_path, str):
with open(file_path) as f:
line = f.readline()
else:
line = file_path.readline()
if not line.startswith("#"):
raise ValueError("Expecting header in log file")
try:
metadata = json.loads(line[1:])
if 'timestamp' in metadata:
metadata['timestamp'] = dateutil_parse(metadata['timestamp'])
except JSONDecodeError as e:
metadata = {"name": "", "timestamp": "", "run_id": ""}
df = parse_log(file_path)
return df, metadata
def validate_json(value):
try:
json.loads(value)
except json.JSONDecodeError:
raise ValidationError("Not valid json")
def submit(request):
"""Endpoint for submitting new crawls.
Expects POST['data'] to be populated with data for a single document group."""
# TODO authentication? Secret keys?
# TODO stop processing the documents when submitted; use processing queues
input_raw = request.POST.get('data')
if not input_raw:
return HttpResponseBadRequest('POST["data"] is not set!')
try:
input = json.loads(input_raw)
except json.JSONDecodeError:
return HttpResponseBadRequest('POST["data"] is not valid json')
input_hash = hash.dict_sha1(input)
submitted, created = models.SubmittedData.objects.update_or_create(
sha1=input_hash,
data=input
)
try:
doc, new = process(submitted)
index_data(doc)
except ProcessingInputError as e:
print(e)
return HttpResponseServerError('error: ' + str(e))
response = {
'status': 'ok',
'new': new,
}
return JsonResponse(response)
def method(self, key, **data):
""" Return a result of executing vk's method `method`
Function for special cases only!
This method doesn't process nor errors nor captcha.
"""
url = f"https://api.vk.com/method/{key}?access_token={self.token}&v={VERSION}"
if data is None:
data = {}
if data.get("_replace_nl", True):
for k, v in data.items():
data[k] = v.replace("\n", "<br>")
if "_replace_nl" in data:
del data["_replace_nl"]
async with self.session.post(url, data=data, **self.req_kwargs) as resp:
try:
results = json_iter_parse(await resp.text())
for data in results:
if 'response' in data:
return data['response']
except json.JSONDecodeError:
self.logger.error("Error while executing vk method: vk's response is wrong!")
return False
return False
def _get_page(self, page_number):
data = {
'mid': str(self._mid),
'page': str(page_number),
'_': '1496132105785'
}
pages = 0
fansnumber = 0
fans_ids = ""
try:
url = "http://space.bilibili.com/ajax/friend/GetFansList?" + urlencode(data)
response = requests.get(url)
if response.status_code != 200:
return None
html_cont = response.text
try:
data = json.loads(html_cont)
if data and (data.get('status') is True):
if data and 'data' in data.keys():
if(page_number == 1):
pages = data.get('data').get('pages')
fansnumber = data.get('data').get('results')
for fans in data.get('data').get('list'):
fans_ids = str(fans.get('fid')) + ',' + fans_ids
elif (data.get('data') == "????????"):
pages = 0
fansnumber = 0
except JSONDecodeError:
pass
self._fans_ids = fans_ids + self._fans_ids
return pages, fansnumber
except RequestException:
return self._get_page(page_number)
def _getinfo(self, cont):
aids = ""
tag_ids = ""
tag_names = ""
tag_counts = ""
try:
data = json.loads(cont)
if data and 'data' in data.keys():
for video in data.get('data').get('vlist'):
aids = str(video.get('aid')) + ',' + aids
return aids
except JSONDecodeError:
pass