def tba_get(self, path):
    """Query the TBA API and return the parsed JSON response as a dict.

    :param path: (str) Request path, without the API address prefix
        (https://www.thebluealliance.com/api/v2/)
    :return: A dict parsed from the response from the API.
    :raises Exception: if no API key has been configured via set_api_key().
    :raises json.JSONDecodeError: if the response body is not valid JSON;
        the failing URL and raw body are printed first to aid debugging.
    """
    if self.app_id['X-TBA-App-Id'] == "":
        raise Exception('An API key is required for TBA. Please use set_api_key() to set one.')
    url_str = 'https://www.thebluealliance.com/api/v2/' + path
    r = self.session.get(url_str, headers=self.app_id)
    tba_txt = r.text
    try:
        return json.loads(tba_txt)
    except json.JSONDecodeError:
        # Surface context, then propagate instead of silently returning None
        # (the implicit None made callers fail far from the real cause).
        print(url_str)
        print(tba_txt)
        raise
# Python JSONDecodeError() usage examples (scraped source collection)
def donations_helper():
    """Collect 2016-cycle independent expenditures from FEC committees and
    tag each with the ProPublica id of the named candidate.

    :return: list of expenditure dicts, each augmented with 'committee_id'
        and 'propublica_candidate_id'.
    """
    print("donations_helper")
    committees = FECAPI(FEC_APIKEY).get_committees()
    camp_fin_api = CampaignFinanceAPI(ProPublica_APIKEY)
    congress_api = CongressAPI(ProPublica_APIKEY)
    # Index current members of both chambers by "First Last" name.
    legislator_index = dict()
    for chamber in ('house', 'senate'):
        members = congress_api.list_members(chamber)["results"][0]["members"]
        for legislator in members:
            name = str(legislator['first_name']) + " " + str(legislator['last_name'])
            legislator_index[name] = legislator
    print("starting to iterate through superpacs")
    donations = []
    for committee in committees:
        if 2016 not in committee['cycles']:
            continue
        committee_id = str(committee['committee_id'])
        try:
            indep_expend = camp_fin_api.get_indep_expends(committee_id)
        except JSONDecodeError:
            # Best effort: skip committees whose API response is malformed.
            continue
        for expend in indep_expend["results"]:
            try:
                expend['committee_id'] = committee_id
                expend['propublica_candidate_id'] = str(legislator_index[expend['candidate_name']]['id'])
                donations.append(expend)
            except KeyError:
                # Candidate not in the legislator index; skip this record.
                pass
    return donations
def load(self):
    """Read every configured JSON settings file, merge the results, and
    store each key upper-cased in ``self.settings``."""
    # Prepare + load directory.
    super().load()
    # Merge all files into one dict; later files win on key clashes.
    merged = dict()
    try:
        for name in self.files:
            path = os.path.join(self.directory, name)
            with open(path, 'r') as handle:
                merged.update(json.load(handle))
    except json.JSONDecodeError as e:
        raise ImproperlyConfigured(
            'Your settings file(s) contain invalid JSON syntax! Please fix and restart!, {}'.format(str(e))
        )
    # Loop and set in local settings (+ uppercase keys).
    for key, value in merged.items():
        self.settings[key.upper()] = value
def from_json(cls, data):
    """Build a ``cls`` instance from JSON text, a mapping, or pass through
    an existing instance unchanged.

    :param data: an instance of ``cls`` (returned as-is), a JSON string,
        a dict, or None (treated as an empty dict).
    :raises json.JSONDecodeError: if ``data`` is a string but not valid JSON.
    :raises TypeError: if the decoded keys do not match the constructor.
    """
    if isinstance(data, cls):
        return data
    if isinstance(data, str):
        # The original wrapped this (and the constructor call) in
        # try/except blocks that only re-raised; letting the exceptions
        # propagate directly is behaviorally identical.
        data = json.loads(data)
    kwargs = {cls._toPy.get(k, k): v for k, v in (data or {}).items()}
    return cls(**kwargs)
def execute(self):
    """Run one trading-system iteration, report failures, and re-schedule
    itself after ``self.interval`` seconds when an interval is set."""
    try:
        self.system.run()
    except (ReadTimeout, ConnectionError, JSONDecodeError):
        # Transient network / parse hiccup: silently skip this cycle.
        pass
    except exceptions.TradingSystemException as e:
        stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print('{time} - {text}'.format(time=stamp, text=str(e)))
    except Exception as e:
        # Unexpected failure: log with args and a traceback, but keep the
        # scheduler alive.
        stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print('{time} - {text} - {args}'.format(time=stamp, text=str(e), args=e.args))
        traceback.print_exc()
    if self.interval:
        threading.Timer(self.interval, self.execute).start()
def assertJSON(self, response):
    """Assert that a non-empty ``response.text`` is valid JSON and matches
    its own canonical pretty-printed form (sorted keys, indent 2, trailing
    newline)."""
    if not response.text:
        return
    try:
        data = json.loads(response.text)
    except json.JSONDecodeError:
        self.fail("Response data is not JSON.")
    else:
        formatted = json.dumps(
            data, indent=2, sort_keys=True, separators=(',', ': ')
        )
        self.assertEqual(formatted + "\n", response.text)
def get_phenolist():
    """Load and return the phenotype list from the project's phenolist file.

    Each pheno's 'phenocode' is URL-quoted in place before returning.
    Raises PheWebError when the file is missing or unreadable; re-raises
    json.JSONDecodeError (after printing context) when the file is invalid.
    """
    # TODO: should this be memoized?
    from .file_utils import common_filepaths
    filepath = common_filepaths['phenolist']
    try:
        # NOTE(review): original wrapped `filepath` in a one-argument
        # os.path.join(), which is a no-op; removed.
        with open(filepath) as f:
            phenolist = json.load(f)
    except (FileNotFoundError, PermissionError):
        raise PheWebError(
            "You need a file to define your phenotypes at '{}'.\n".format(filepath) +
            "For more information on how to make one, see <https://github.com/statgen/pheweb#3-make-a-list-of-your-phenotypes>")
    except json.JSONDecodeError:
        print("Your file at '{}' contains invalid json.\n".format(filepath) +
              "The error it produced was:")
        raise
    for pheno in phenolist:
        pheno['phenocode'] = urllib.parse.quote_plus(pheno['phenocode'])
    return phenolist
def with_json(self, json_dict: dict) -> "Statistic":
    """Record an expectation that the current request's body equals
    ``json_dict`` (compared as canonical, key-sorted JSON).

    Appends a human-readable message to ``self._error_messages`` when the
    actual body is corrupt or differs; always returns ``self`` for chaining.
    """
    body = json.dumps(json_dict, sort_keys=True)
    # BUG FIX: "skip" is not a registered codec error handler and would
    # raise LookupError on undecodable bytes; "replace" keeps the original
    # best-effort intent.
    actual_body = self.get_current_request().body.decode("utf-8", errors="replace")
    requested_time = self._current_request_index + 1
    try:
        actual_json_dict = json.loads(actual_body)
    except json.JSONDecodeError:
        self._error_messages.append(f"\nFor the {requested_time} time: with json {body}.\n"
                                    f"But for the {requested_time} time: json was corrupted "
                                    f"{actual_body.__repr__()}.")
        return self
    actual_body = json.dumps(actual_json_dict, sort_keys=True)
    if body != actual_body:
        self._error_messages.append(f"\nFor the {requested_time} time: with json {body}.\n"
                                    f"But for the {requested_time} time: json was {actual_body}.")
    return self
def _load_stored_result_from_file(self, default=None):
    """Return the deserialized contents of the results file.

    Missing/unreadable file -> ``default``.  Corrupt JSON -> the file is
    renamed to a ``corrupted.<uuid>.<name>`` backup and ``default`` is
    returned so a fresh file can be created.
    """
    # Read the raw file first; absence is expected and not an error.
    try:
        with open(self._results_file_path) as fp:
            raw = fp.read()
    except IOError:
        return default
    # Deserialize; on failure keep a backup so the bad data can be inspected.
    try:
        return json.loads(raw)
    except json.JSONDecodeError as e:
        backup_file_name = "corrupted.{}.{}".format(
            str(uuid.uuid4())[:8],
            self._results_file_name,
        )
        logger.error(
            "Unable to parse file {}: {}, renaming to {} and creating new file".format(
                self._results_file_path, e, backup_file_name,
            )
        )
        os.rename(self._results_file_path, "{}/{}".format(self._results_directory, backup_file_name))
        return default
def setup_logging(config_file_path, log_level=logging.INFO):
    """
    Logging configuration helper.
    :param config_file_path: file path to logging configuration file.
    https://docs.python.org/3/library/logging.config.html#object-connections
    :param log_level: defaults to logging.INFO
    :return: None - access the logger by name as described in the config--or the "root" logger as a backup.
    """
    try:
        with open(config_file_path, 'rt') as f:
            config = json.load(f)
        logging.config.dictConfig(config)
    except (ValueError, IOError, OSError):
        # json.JSONDecodeError is throwable in Python3.5+ -- subclass of ValueError
        # BUG FIX: basicConfig takes `level=`, not `log_level=`; the bad
        # keyword raised ValueError from inside this fallback path.
        logging.basicConfig(level=log_level)
        logging.root.exception(
            "Could not load specified logging configuration '{}'. Verify the filepath exists and is compliant with: "
            "[https://docs.python.org/3/library/logging.config.html#object-connections]".format(config_file_path))
def __init__(self, callResult):
    """Wrap an HTTP call result: parse the API's JSON envelope when present,
    setting ``self.success``, ``self.text`` and (on API replies)
    ``self.response``."""
    self.success = False
    try:
        # Decode from json if possible.
        payload = callResult.json()
        if payload['status']:
            # Response came from the API server: keep the raw JSON around.
            self.response = payload
            status_code = payload['status']['code']
            # Assemble a description of the result.
            self.text = str(status_code) + ': ' + payload['status']['info']
            # 200 OK means the json object carries the answer data.
            if status_code == 200:
                self.success = True
        else:
            # Not an API envelope: describe the HTTP result instead.
            self.text = 'Error ' + str(callResult.status_code) + ': ' + callResult.reason
    except JSONDecodeError:
        self.text = 'Error ' + str(callResult.status_code) + ': ' + callResult.reason
# chat API call
def __init__(self, callResult):
    """Interpret ``callResult`` as an API response: set ``success`` and
    ``text``, and keep the parsed JSON in ``self.response`` when the API
    answered with a status envelope."""
    self.success = False
    try:
        result = callResult.json()
    except JSONDecodeError:
        # Body was not JSON at all: describe the raw HTTP outcome.
        self.text = 'Error ' + str(callResult.status_code) + ': ' + callResult.reason
        return
    if result['status']:
        # A status field means the response came from the API server.
        self.response = result
        code = result['status']['code']
        self.text = str(code) + ': ' + result['status']['info']
        # Only a 200 carries usable answer data.
        self.success = code == 200
    else:
        self.text = 'Error ' + str(callResult.status_code) + ': ' + callResult.reason
# chat API call
def get_manifest(self, archive):
    """Extract, parse and validate ``manifest.json`` from an uploaded
    plugin .zip archive; return the manifest dict.

    :raises ValidationError: on bad zip format, upload errors, missing
        manifest, malformed JSON, or manifest schema violations.
    """
    try:
        with ZipFile(archive.temporary_file_path()) as plugin:
            # (removed leftover debug print of plugin.namelist())
            prefix = self.get_prefix(plugin)
            prefix = prefix + '/' if len(prefix) else ''
            with plugin.open('{}manifest.json'.format(prefix)) as myfile:
                manifest = json.loads(myfile.read())
                validate_manifest(manifest)
                return manifest
    except BadZipFile:
        raise ValidationError('Bad .zip format')
    except FileNotFoundError:
        raise ValidationError('Error with upload, please try again')
    except KeyError:
        raise ValidationError('No manifest.json found in archive')
    except json.JSONDecodeError:
        raise ValidationError('Error with manifest.json, bad Json Format')
    except avasdk.exceptions.ValidationError as e:
        raise ValidationError('Error in manifest.json ({})'.format(e))
def req_json(self, req):
    """Parse and return the request body as JSON, or None when the request
    carries no body.

    :raises errors.InvalidFormat: when the payload is not application/json
        or the body is not valid JSON.
    """
    if req.content_length is None or req.content_length == 0:
        return None
    # GENERALIZED: accept media-type parameters such as
    # "application/json; charset=utf-8" (the original required an exact
    # match on the full header value).
    media_type = (req.content_type or '').split(';', 1)[0].strip().lower()
    if media_type != 'application/json':
        raise errors.InvalidFormat("Requires application/json payload")
    raw_body = req.stream.read(req.content_length or 0)
    if raw_body is None:
        return None
    try:
        return json.loads(raw_body.decode('utf-8'))
    except json.JSONDecodeError as jex:
        print(
            "Invalid JSON in request: \n%s" % raw_body.decode('utf-8'))
        self.error(
            req.context,
            "Invalid JSON in request: \n%s" % raw_body.decode('utf-8'))
        raise errors.InvalidFormat("%s: Invalid JSON in body: %s" %
                                   (req.path, jex))
def _parse_and_update_body(self, handler_def):
    """Parse the request body as JSON and, when the handler declares a
    schema via ``consumes``, convert it to that structure in place."""
    if not self.request.body:
        return
    try:
        json_body = json.loads(self.request.body.decode('utf-8'))
    except json.JSONDecodeError:
        raise BadRequestError(
            "Malformed request body. JSON is expected."
        )
    new_body = json_body
    if handler_def.consumes:
        try:
            new_body = handler_def.consumes.from_json(json_body)
        except ValidationError:
            # TODO: log warning or error
            raise BadRequestError("Bad data structure.")
    self.request.body = new_body
def parseMessage(self, msg):
    """Decode a JSON control message and dispatch it to the handler that
    matches its 'msgtype' field; undecodable messages are ignored."""
    print(msg)
    try:
        decoded = json.loads(msg)
    except json.JSONDecodeError:
        return
    # Renamed from `type`, which shadowed the builtin.
    msg_type = decoded["msgtype"]
    if msg_type == "control":
        self.handleControl(decoded)
    elif msg_type == "sheetdelta":
        self.passSheetDelta(decoded)
    elif msg_type == "request":
        self.handleRequest(decoded)
    elif msg_type == "nodedata":
        self.passNodedata(decoded)
def discoverWorkers(self):
    """Discover new workers via udp broadcasts"""
    # Non-blocking poll (timeout 0) so the caller's loop is never stalled.
    rlist, wlist, elist = select([self.discoverysocket], [], [], 0)
    if rlist:
        received = self.discoverysocket.recvfrom(4096)[0]
        discoverydata = {}
        try:
            discoverydata = json.loads(bytes.decode(received))
        except json.JSONDecodeError:
            # Malformed broadcast: leave discoverydata empty so the
            # "ip"/"port" check below rejects it.
            pass
        if "ip" in discoverydata and "port" in discoverydata:
            # Prefer the advertised hostname; fall back to "ip:port".
            if "host" in discoverydata:
                name = discoverydata["host"]
            else:
                name = discoverydata["ip"] + ":" + str(discoverydata["port"])
            # Only register each worker once, keyed by its display name.
            if name not in self.workers:
                treeItem = QTreeWidgetItem(1001) # Type 1000 for Worker Item
                treeItem.setText(0, name)
                self.treeWidget.addTopLevelItem(treeItem)
                self.grabPeriodicInfos() # Grab monitor data
                self.workers[name] = Worker(discoverydata, treeItem, nodeDataJar=self.nodeDataJar)
                self.workers[name].tick(self.sheetDeltaMemory)
                self.workers[name].synchronize()
async def _ws_recv_handler(self):
    """Dispatch incoming websocket messages: command responses (those with
    a 'status' field) go to the response queue; everything else is emitted
    as an event signal.

    BUG FIX: the original declared this as a plain ``def`` while using
    ``await`` in its body, which is a SyntaxError; it must be a coroutine.
    """
    # Given command responses and notifications are all send through the
    # same websocket, separate them here, passing command response thanks
    # to a Queue.
    while True:
        raw = await self._websocket.recv()
        try:
            if isinstance(raw, bytes):
                raw = raw.decode()
            recv = ejson_loads(raw)
            if 'status' in recv:
                # Message response
                self._resp_queue.put_nowait(recv)
            else:
                # Event
                self._signal_ns.signal(recv['event']).send(recv['sender'])
        except (KeyError, TypeError, json.JSONDecodeError):
            # Dummy ???
            logger.warning('Backend server sent invalid message: %s' % raw)
def handshake(self):
    """Effect-generator implementing the server side of a challenge-response
    handshake: send a challenge, verify the signed answer against the
    claimed identity's public key, then mark the session authenticated by
    setting ``self.id``.

    Raises HandshakeError (after sending it to the peer) on a malformed
    response, unknown identity, or bad signature.
    """
    if self.id:
        raise HandshakeError('Handshake already done.')
    challenge = _generate_challenge()
    query = {'handshake': 'challenge', 'challenge': challenge}
    yield Effect(EHandshakeSend(ejson_dumps(query)))
    raw_resp = yield Effect(EHandshakeRecv())
    try:
        resp = ejson_loads(raw_resp)
    except (TypeError, json.JSONDecodeError):
        # Tell the peer why the handshake failed before aborting locally.
        error = HandshakeError('Invalid challenge response format')
        yield Effect(EHandshakeSend(error.to_raw()))
        raise error
    resp = HandshakeAnswerSchema().load(resp)
    claimed_identity = resp['identity']
    try:
        # Fetch the claimed identity's public key and check it really
        # signed our challenge.
        pubkey = yield Effect(EPubKeyGet(claimed_identity))
        pubkey.verify(resp['answer'], challenge.encode())
        yield Effect(EHandshakeSend('{"status": "ok", "handshake": "done"}'))
        self.id = claimed_identity
    except (TypeError, PubKeyNotFound, InvalidSignature):
        error = HandshakeError('Invalid signature, challenge or identity')
        yield Effect(EHandshakeSend(error.to_raw()))
        raise error
def load_string(json_string):
    """Deserialize `json_string` to Python Object. If `json_string` is not a valid json
    document, just return `json_string`. It is used in the context of activity results: floto
    handles str and JSON serialized results.
    Parameters
    ----------
    json_string : any
    Returns
    -------
    obj
    """
    try:
        return json.loads(json_string)
    except (TypeError, json.JSONDecodeError):
        # Not JSON (or not a string at all): hand back the input unchanged.
        return json_string
def _json_get(inp):
    """ Get a Python object (list or dict) regardless of whether data is passed
    as a JSON string, a file path, or is already a python object.
    Returns the parsed data, as well as "native" if the data was already a
    Python object, "str" if the data was passed as a JSON string, or the path
    if the data passed was a file path """
    if isinstance(inp, (dict, list)):  # already a Python object
        return inp, "native"
    try:  # JSON string
        return json.loads(inp), "str"
    except json.JSONDecodeError:  # JSON filepath
        # The filename doubles as the dataformat marker, so callers can
        # tell where the data came from without an extra variable.
        with open(inp, encoding="utf-8") as f:
            return json.load(f), inp
def post(self):
    """Run a query against the configured server and return a dict with
    'response_code' and 'response' (parsed JSON when possible, the raw
    response text otherwise).  A gRPC deadline overrun maps to the timeout
    response code."""
    request_body = json.dumps(request.get_json())
    channel = get_channel_name_from_json(request.get_json())
    query_data = json.loads('{}')
    try:
        # TODO Asnycronous call? ??? ???.
        response = ServerComponents().query(request_body, channel)
        logging.debug(f"query result : {response}")
        query_data['response_code'] = str(response.response_code)
        try:
            query_data['response'] = json.loads(response.response)
        except json.JSONDecodeError as e:
            # Not JSON: pass the raw response text through unchanged.
            logging.warning("your response is not json, your response(" + str(response.response) + ")")
            query_data['response'] = response.response
    except _Rendezvous as e:
        logging.error(f'Execute Query Error : {e}')
        if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
            # TODO REST ?? ??? ???(?? ??? Response code ??) Extract Method ?? ?? ??? ?? ??? ??
            logging.debug("gRPC timeout !!!")
            query_data['response_code'] = str(message_code.Response.timeout_exceed)
    return query_data
def get(self):
    """Look up a transaction by the 'hash' query argument and return its
    fields (data, meta, more_info, signature, public_key) as a JSON-ready
    dict."""
    args = ServerComponents().parser.parse_args()
    response = ServerComponents().get_transaction(args['hash'], get_channel_name_from_args(args))
    tx_data = {}
    tx_data['response_code'] = str(response.response_code)
    tx_data['data'] = ""
    # BUG FIX: the original used `len(...) is not 0`, which compares ints
    # by identity (a SyntaxWarning since 3.8); use truthiness instead.
    if response.data:
        try:
            tx_data['data'] = json.loads(response.data)
        except json.JSONDecodeError as e:
            logging.warning("your data is not json, your data(" + str(response.data) + ")")
            tx_data['data'] = response.data
    tx_data['meta'] = ""
    if response.meta:
        tx_data['meta'] = json.loads(response.meta)
    tx_data['more_info'] = response.more_info
    tx_data['signature'] = base64.b64encode(response.signature).decode()
    tx_data['public_key'] = base64.b64encode(response.public_key).decode()
    return tx_data
def get(self):
    """Return the invoke result for the transaction identified by the
    'hash' query argument: a dict with 'response_code' and, when the result
    parses as JSON, a 'response' payload tagged with jsonrpc 2.0."""
    logging.debug('transaction result')
    args = ServerComponents().parser.parse_args()
    logging.debug('tx_hash : ' + args['hash'])
    channel_name = get_channel_name_from_args(args)
    response = ServerComponents().get_invoke_result(args['hash'], channel_name)
    verify_result = dict()
    verify_result['response_code'] = str(response.response_code)
    # BUG FIX: the original used `len(...) is not 0`, which compares ints
    # by identity; use truthiness instead.
    if response.result:
        try:
            result = json.loads(response.result)
            result['jsonrpc'] = '2.0'
            verify_result['response'] = result
        except json.JSONDecodeError as e:
            # BUG FIX: the original logged `response.data` here even though
            # the field being parsed is `response.result`.
            logging.warning("your data is not json, your data(" + str(response.result) + ")")
            verify_result['response_code'] = message_code.Response.fail
    else:
        verify_result['response_code'] = str(message_code.Response.fail)
    return verify_result
def jwks_to_keyjar(jwks, iss=''):
    """
    Convert a JWKS to a KeyJar instance.
    :param jwks: String representation of a JWKS, or an already-parsed dict
    :param iss: Issuer under which to register the imported keys
    :return: A :py:class:`oic.utils.keyio.KeyJar` instance
    :raises ValueError: if ``jwks`` is a string but not valid JSON
    """
    if not isinstance(jwks, dict):
        try:
            jwks = json.loads(jwks)
        except json.JSONDecodeError as err:
            # Chain the original decode error so debugging keeps the cause.
            raise ValueError('No proper JSON') from err
    kj = KeyJar()
    kj.import_jwks(jwks, issuer=iss)
    return kj
def get_key_from_headers(self, request, key_names, key_in_body=False):
    """Look up an API key either in the JSON request body (first matching
    name wins) or in the request's HTTP_* META headers; return None when
    absent or when the body is not valid JSON."""
    if key_in_body:
        try:
            body = json.loads(request.body.decode('utf-8'))
        except json.JSONDecodeError:
            return None
        for candidate in key_names:
            if candidate in body:
                return body[candidate]
        return None
    for candidate in key_names:
        # Header names arrive as e.g. "x-api-key" -> META key "HTTP_X_API_KEY".
        meta_name = 'HTTP_{0}'.format(candidate.upper().replace('-', '_'))
        if meta_name in request.META:
            return request.META[meta_name]
    return None
def check_files():
    """Ensure data/audio/settings.json exists and contains every default
    key, creating or repairing the file as needed."""
    default = {"VOLUME": 50, "MAX_LENGTH": 3700, "VOTE_ENABLED": True,
               "MAX_CACHE": 0, "SOUNDCLOUD_CLIENT_ID": None,
               "TITLE_STATUS": True, "AVCONV": False, "VOTE_THRESHOLD": 50,
               "SERVERS": {}}
    settings_path = "data/audio/settings.json"
    if not os.path.isfile(settings_path):
        print("Creating default audio settings.json...")
        dataIO.save_json(settings_path, default)
        return
    # Consistency check on an existing file.
    try:
        current = dataIO.load_json(settings_path)
    except JSONDecodeError:
        # settings.json keeps getting corrupted for unknown reasons. Let's
        # try to keep it from making the cog load fail.
        dataIO.save_json(settings_path, default)
        current = dataIO.load_json(settings_path)
    if current.keys() != default.keys():
        # Back-fill any keys added since this settings file was written.
        for key in default.keys():
            if key not in current.keys():
                current[key] = default[key]
                print(
                    "Adding " + str(key) + " field to audio settings.json")
        dataIO.save_json(settings_path, current)
def listen(self, ws, environ):
    """Register the websocket and pump its messages until it closes: each
    JSON message is handled in its own greenlet with a fresh Odoo
    httprequest rebuilt from the original connection environ."""
    self._add(ws)
    while not ws.closed:
        try:
            message = ws.receive()
        except WebSocketError:
            break
        if message is not None:
            try:
                message = json.loads(message)
            except json.JSONDecodeError:
                # Malformed payload: stop serving this connection.
                break
            # Odoo heavily relies on httprequests, for each message
            # a new httprequest will be created. This request will be
            # based on the original environ from the socket initialization
            # request.
            httprequest = werkzeug.wrappers.Request(environ.copy())
            odoo.http.root.setup_session(httprequest)
            odoo.http.root.setup_db(httprequest)
            odoo.http.root.setup_lang(httprequest)
            gevent.spawn(self.respond, ws, httprequest, message)
    self._remove(ws)
def __get_conf_from_file(self, file_name):
    """Load configuration from ``file_name`` (JSON) into attributes,
    falling back to the documented defaults for missing keys; exits the
    process on any error."""
    try:
        # BUG FIX: use a context manager so the file handle is closed even
        # when json.load raises (the original never closed it).
        with open(file_name, 'r') as json_file:
            setting_dict = json.load(json_file)
        self.TRUST_P = setting_dict.get('trust_p', 0.45)
        self.COUNTRY = setting_dict.get('country', 'JP')
        self.COVER_MIN_SIZE = setting_dict.get('cover_min_size', 1000)
        self.SAVE_COVER_TYPE = setting_dict.get('save_cover_type', 1)
        self.DOWNLOAD_DIR = setting_dict.get('download_dir', './cover')
        self.SEARCH_POSTFIX_NAME = setting_dict.get('search_postfix_name', 'cue,m4a,flac')
        self.FILE_NAME_FORMAT = setting_dict.get('file_name_format', '')
        self.REPLACE_COVER = setting_dict.get('replace_cover', 1)
        self.SEARCH_THREAD_NUM = setting_dict.get('search_thread_num', 6)
        self.DOWNLOAD_THREAD_NUM = setting_dict.get('download_thread_num', 5)
    except FileNotFoundError:
        print("Conf file 'setting.json' not found")
        exit(1)
    except json.JSONDecodeError:
        print("Conf file err")
        exit(1)
    except Exception:
        print("Some err occur")
        exit(1)
def load_from_persistent_storage(self):
    """Decode each known project file into its category object; missing
    files are skipped and invalid JSON is logged but survived."""
    # precedence: files > (modules, scripts)
    # Note that `files` will only be changed manually
    # It is not updated by changing `modules` or `scripts`.
    # However, `files` will affect `modules` and `scripts`.
    decoded = {}
    if self.project_directory and self.project_directory.exists():
        for key, project_file in self.project_files.items():
            try:
                raw = project_file.read_text()
                decoded[key] = serialize.decode(category_type[key], json.loads(raw))
            except json.JSONDecodeError as err:
                logger.error(err)
            except FileNotFoundError:
                # Missing project files are expected; just skip them.
                pass
    return decoded