python类JSONDecodeError()的实例源码

api.py 文件源码 项目:pyTBA 作者: Thing342 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def tba_get(self, path):
        """Query the TBA API and return the decoded JSON body.

        :param path: (str) Request path relative to the API root
            (https://www.thebluealliance.com/api/v2/).
        :return: The value parsed from the response JSON. When the body is not
            valid JSON the URL and the raw body are printed and None is returned.
        :raises Exception: if the stored ``X-TBA-App-Id`` header is empty.
        """
        if self.app_id['X-TBA-App-Id'] == "":
            raise Exception('An API key is required for TBA. Please use set_api_key() to set one.')

        endpoint = 'https://www.thebluealliance.com/api/v2/' + path
        body = self.session.get(endpoint, headers=self.app_id).text
        try:
            parsed = json.loads(body)
        except json.JSONDecodeError:
            # Dump what was requested and what came back to aid debugging.
            print(endpoint)
            print(body)
            return None
        return parsed
parse_indepexpends.py 文件源码 项目:SuperPACs 作者: SpencerNorris 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def donations_helper():
    """Collect 2016-cycle independent expenditures aimed at known legislators.

    Builds a "First Last" -> member-record index from the Congress API (house
    first, then senate, so a senate record overwrites a house record with the
    same name), then walks every committee whose ``cycles`` contains 2016 and
    keeps each of its independent expenditures whose ``candidate_name`` is in
    that index.

    :return: list of expenditure dicts, each tagged with ``committee_id`` and
        ``propublica_candidate_id``.
    """
    print("donations_helper")
    FecApiObj = FECAPI(FEC_APIKEY)
    committees = FecApiObj.get_committees()
    PPCampFinObj = CampaignFinanceAPI(ProPublica_APIKEY)
    PPCongressApi = CongressAPI(ProPublica_APIKEY)

    # One pass per chamber instead of two copy-pasted loops.
    legislator_index = {}
    for chamber in ('house', 'senate'):
        members = PPCongressApi.list_members(chamber)["results"][0]["members"]
        for legislator in members:
            name = str(legislator['first_name']) + " " + str(legislator['last_name'])
            legislator_index[name] = legislator

    print("starting to iterate through superpacs")
    donations = []
    for committee in committees:
        if 2016 not in committee['cycles']:
            continue
        try:
            indepExpend = PPCampFinObj.get_indep_expends(str(committee['committee_id']))

            for expend in indepExpend["results"]:
                try:
                    expend['committee_id'] = str(committee['committee_id'])
                    expend['propublica_candidate_id'] = str(legislator_index[expend['candidate_name']]['id'])
                    donations.append(expend)
                except KeyError:
                    # Candidate is not a sitting legislator we indexed (or the
                    # record lacks a field): skip this expenditure.
                    pass
        except JSONDecodeError:
            # Best effort: a committee whose response is not JSON is skipped.
            pass
    return donations
json.py 文件源码 项目:PyPlanet 作者: PyPlanet 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def load(self):
        """Read every settings file, merge the JSON objects and publish them.

        Files are merged in the order of ``self.files``; later files override
        earlier ones. Keys are upper-cased when copied into ``self.settings``.

        :raises ImproperlyConfigured: if any file contains invalid JSON.
        """
        # Let the parent prepare/load the directory first.
        super().load()

        merged = dict()
        try:
            for name in self.files:
                with open(os.path.join(self.directory, name), 'r') as handle:
                    merged.update(json.load(handle))
        except json.JSONDecodeError as exc:
            raise ImproperlyConfigured(
                'Your settings file(s) contain invalid JSON syntax! Please fix and restart!, {}'.format(str(exc))
            )

        # Publish with upper-cased keys.
        for name, value in merged.items():
            self.settings[name.upper()] = value
facade.py 文件源码 项目:python-libjuju 作者: juju 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def from_json(cls, data):
        """Build a ``cls`` instance from an instance, a JSON string or a mapping.

        An existing ``cls`` instance is returned unchanged. A string is parsed
        as JSON first (``json.JSONDecodeError`` propagates). Keys are renamed
        through ``cls._toPy`` where a mapping exists, then passed as keyword
        arguments to ``cls`` (``TypeError`` from the constructor propagates).
        A falsy ``data`` yields ``cls()``.
        """
        if isinstance(data, cls):
            return data
        if isinstance(data, str):
            data = json.loads(data)
        kwargs = {cls._toPy.get(key, key): value for key, value in (data or {}).items()}
        return cls(**kwargs)
executor.py 文件源码 项目:bitcoin-trading-system 作者: vinicius-ronconi 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def execute(self):
        """Run the trading system once and, if an interval is set, re-arm a timer.

        Timeouts, connection errors and JSON decode errors are ignored. A
        ``TradingSystemException`` is printed with a timestamp; any other
        exception is printed with its args and a traceback. None of them stop
        the next scheduled run.
        """
        stamp_format = '%Y-%m-%d %H:%M:%S'
        try:
            self.system.run()
        except (ReadTimeout, ConnectionError, JSONDecodeError):
            pass
        except exceptions.TradingSystemException as e:
            stamp = datetime.now().strftime(stamp_format)
            print('{time} - {text}'.format(time=stamp, text=str(e)))
        except Exception as e:
            stamp = datetime.now().strftime(stamp_format)
            print('{time} - {text} - {args}'.format(time=stamp, text=str(e), args=e.args))
            traceback.print_exc()

        if not self.interval:
            return
        threading.Timer(self.interval, self.execute).start()
__init__.py 文件源码 项目:craton 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def assertJSON(self, response):
        """Assert that a non-empty response body is canonically formatted JSON.

        An empty body is accepted. Otherwise the body must parse as JSON and
        must equal its own re-serialisation with 2-space indent, sorted keys,
        ``(',', ': ')`` separators and a trailing newline.
        """
        if not response.text:
            return
        try:
            data = json.loads(response.text)
        except json.JSONDecodeError:
            self.fail("Response data is not JSON.")
        else:
            canonical = json.dumps(
                data, indent=2, sort_keys=True, separators=(',', ': ')
            ) + "\n"
            self.assertEqual(canonical, response.text)
utils.py 文件源码 项目:pheweb 作者: statgen 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_phenolist():
    """Load the phenotype list and URL-quote every ``phenocode``.

    Raises PheWebError when the file is missing or unreadable; re-raises the
    JSON decode error (after printing a hint) when the file is not valid JSON.
    """
    # TODO: should this be memoized?
    from .file_utils import common_filepaths
    phenolist_path = common_filepaths['phenolist']
    try:
        with open(os.path.join(phenolist_path)) as handle:
            phenos = json.load(handle)
    except (FileNotFoundError, PermissionError):
        raise PheWebError(
            "You need a file to define your phenotypes at '{}'.\n".format(phenolist_path) +
            "For more information on how to make one, see <https://github.com/statgen/pheweb#3-make-a-list-of-your-phenotypes>")
    except json.JSONDecodeError:
        print("Your file at '{}' contains invalid json.\n".format(phenolist_path) +
              "The error it produced was:")
        raise
    for entry in phenos:
        entry['phenocode'] = urllib.parse.quote_plus(entry['phenocode'])
    return phenos
statistic.py 文件源码 项目:py_fake_server 作者: Telichkin 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def with_json(self, json_dict: dict) -> "Statistic":
        """Check that the current request's body is JSON equal to ``json_dict``.

        Both sides are compared as key-sorted JSON text. A body that is not
        valid JSON, or that differs from the expectation, appends an
        explanatory entry to ``self._error_messages``; nothing is raised.

        :param json_dict: the expected JSON payload.
        :return: ``self`` so expectations can be chained.
        """
        body = json.dumps(json_dict, sort_keys=True)

        # "skip" is not a registered codec error handler and made decode()
        # raise LookupError on any non-UTF-8 byte; "ignore" is the handler that
        # actually drops undecodable bytes.
        actual_body = self.get_current_request().body.decode("utf-8", errors="ignore")
        requested_time = self._current_request_index + 1
        try:
            actual_json_dict = json.loads(actual_body)
        except json.JSONDecodeError:
            self._error_messages.append(f"\nFor the {requested_time} time: with json {body}.\n"
                                        f"But for the {requested_time} time: json was corrupted "
                                        f"{actual_body!r}.")
            return self

        # Normalise key order on the received side too before comparing.
        actual_body = json.dumps(actual_json_dict, sort_keys=True)
        if body != actual_body:
            self._error_messages.append(f"\nFor the {requested_time} time: with json {body}.\n"
                                        f"But for the {requested_time} time: json was {actual_body}.")
        return self
handlers.py 文件源码 项目:seproxer 作者: Rastii 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _load_stored_result_from_file(self, default=None):
        """Return the parsed results file, or ``default`` when unavailable.

        An unreadable file yields ``default``. A file that cannot be parsed as
        JSON is logged, renamed to ``corrupted.<8 hex chars>.<results file
        name>`` inside the results directory, and ``default`` is returned.
        """
        try:
            with open(self._results_file_path) as results_file:
                raw = results_file.read()
        except IOError:
            return default

        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError as e:
            # Move the unparsable file aside under a unique backup name.
            short_id = str(uuid.uuid4())[:8]
            backup_file_name = "corrupted.{}.{}".format(short_id, self._results_file_name)
            logger.error(
                "Unable to parse file {}: {}, renaming to {} and creating new file".format(
                    self._results_file_path, e, backup_file_name,
                )
            )
            os.rename(
                self._results_file_path,
                "{}/{}".format(self._results_directory, backup_file_name),
            )
            return default
        return parsed
util.py 文件源码 项目:sonic-py-swsssdk 作者: Azure 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def setup_logging(config_file_path, log_level=logging.INFO):
    """
    Logging configuration helper.

    Loads a JSON dictConfig from ``config_file_path``. If the file cannot be
    read, is not valid JSON, or is rejected with a ValueError, falls back to
    ``logging.basicConfig`` at ``log_level`` and logs the failure on the root
    logger.

    :param config_file_path: file path to logging configuration file.
    https://docs.python.org/3/library/logging.config.html#object-connections
    :param log_level: level used for the fallback configuration; defaults to logging.INFO
    :return: None - access the logger by name as described in the config--or the "root" logger as a backup.
    """
    try:
        with open(config_file_path, 'rt') as f:
            config = json.load(f)
        logging.config.dictConfig(config)
    except (ValueError, IOError, OSError):
        # json.JSONDecodeError is throwable in Python3.5+ -- subclass of ValueError
        # basicConfig's keyword is `level`; the former `log_level=` keyword is
        # unknown to it and raised ValueError from inside this fallback.
        logging.basicConfig(level=log_level)
        logging.root.exception(
            "Could not load specified logging configuration '{}'. Verify the filepath exists and is compliant with: "
            "[https://docs.python.org/3/library/logging.config.html#object-connections]".format(config_file_path))
conversation-redirect.py 文件源码 项目:Examples 作者: hutomadotAI 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, callResult):
        """Summarise an HTTP call result as ``success`` / ``text`` / ``response``.

        :param callResult: response object exposing ``json()``, ``status_code``
            and ``reason``.

        When the body is a JSON object with a truthy ``status`` entry, the
        JSON is kept in ``self.response``, ``self.text`` becomes
        ``"<code>: <info>"`` and ``self.success`` is True for code 200.
        In every other case (no JSON, no ``status``) ``self.text`` is built
        from the HTTP status line and ``self.success`` stays False.
        """
        self.success = False
        try:
            # decode from json if possible
            result = callResult.json()
        except JSONDecodeError:
            result = None

        # A missing 'status' key (or a non-object body) used to raise an
        # uncaught KeyError/TypeError; treat it like "no status" instead.
        status = result.get('status') if isinstance(result, dict) else None
        if status:
            # the response came from the API server, so keep a copy of the json
            self.response = result
            code = status['code']
            self.text = str(code) + ': ' + status['info']
            # a 200 OK means the json object has the answer data
            if code == 200:
                self.success = True
        else:
            # otherwise, assemble a description from the HTTP results
            self.text = 'Error ' + str(callResult.status_code) + ': ' + callResult.reason


# chat API call
ChatSample.py 文件源码 项目:Examples 作者: hutomadotAI 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, callResult):
        """Summarise an HTTP call result as ``success`` / ``text`` / ``response``.

        :param callResult: response object exposing ``json()``, ``status_code``
            and ``reason``.

        When the body is a JSON object with a truthy ``status`` entry, the
        JSON is kept in ``self.response``, ``self.text`` becomes
        ``"<code>: <info>"`` and ``self.success`` is True for code 200.
        In every other case (no JSON, no ``status``) ``self.text`` is built
        from the HTTP status line and ``self.success`` stays False.
        """
        self.success = False
        try:
            # decode from json if possible
            result = callResult.json()
        except JSONDecodeError:
            result = None

        # A missing 'status' key (or a non-object body) used to raise an
        # uncaught KeyError/TypeError; treat it like "no status" instead.
        status = result.get('status') if isinstance(result, dict) else None
        if status:
            # the response came from the API server, so keep a copy of the json
            self.response = result
            code = status['code']
            self.text = str(code) + ': ' + status['info']
            # a 200 OK means the json object has the answer data
            if code == 200:
                self.success = True
        else:
            # otherwise, assemble a description from the HTTP results
            self.text = 'Error ' + str(callResult.status_code) + ': ' + callResult.reason


# chat API call
forms.py 文件源码 项目:ava-website 作者: ava-project 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_manifest(self, archive):
        """Extract, parse and validate ``manifest.json`` from an uploaded zip.

        :param archive: upload object exposing ``temporary_file_path()``.
        :return: the parsed manifest.
        :raises ValidationError: for a bad zip, a missing upload file, a missing
            manifest, invalid JSON, or a manifest rejected by validation.
        """
        try:
            with ZipFile(archive.temporary_file_path()) as plugin:
                print(plugin.namelist())
                prefix = self.get_prefix(plugin)
                # Non-empty prefixes address the manifest inside that folder.
                if len(prefix):
                    prefix = prefix + '/'
                else:
                    prefix = ''
                with plugin.open('{}manifest.json'.format(prefix)) as manifest_file:
                    raw_manifest = manifest_file.read()
                    manifest = json.loads(raw_manifest)
                validate_manifest(manifest)
                return manifest
        except BadZipFile:
            raise ValidationError('Bad .zip format')
        except FileNotFoundError:
            raise ValidationError('Error with upload, please try again')
        except KeyError:
            raise ValidationError('No manifest.json found in archive')
        except json.JSONDecodeError:
            raise ValidationError('Error with manifest.json, bad Json Format')
        except avasdk.exceptions.ValidationError as e:
            raise ValidationError('Error in manifest.json ({})'.format(e))
base.py 文件源码 项目:drydock 作者: att-comdev 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def req_json(self, req):
        """Return the request body decoded from JSON.

        :return: None when the request has no content (or the stream yields
            None); otherwise the parsed JSON value.
        :raises errors.InvalidFormat: when the content type is not exactly
            ``application/json`` (case-insensitive) or the body is not valid JSON.
        """
        if req.content_length is None or req.content_length == 0:
            return None

        is_json = (req.content_type is not None
                   and req.content_type.lower() == 'application/json')
        if not is_json:
            raise errors.InvalidFormat("Requires application/json payload")

        raw_body = req.stream.read(req.content_length or 0)
        if raw_body is None:
            return None

        try:
            return json.loads(raw_body.decode('utf-8'))
        except json.JSONDecodeError as jex:
            # Report the offending payload on stdout and through self.error.
            report = "Invalid JSON in request: \n%s" % raw_body.decode('utf-8')
            print(report)
            self.error(req.context, report)
            raise errors.InvalidFormat("%s: Invalid JSON in body: %s" %
                                       (req.path, jex))
handler.py 文件源码 项目:calm 作者: bagrat 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _parse_and_update_body(self, handler_def):
        """Replace a non-empty request body with its parsed JSON form.

        When ``handler_def.consumes`` is set, the parsed JSON is additionally
        converted through ``handler_def.consumes.from_json``. An empty body is
        left untouched.

        :raises BadRequestError: for malformed JSON or a failed conversion.
        """
        if not self.request.body:
            return

        try:
            payload = json.loads(self.request.body.decode('utf-8'))
        except json.JSONDecodeError:
            raise BadRequestError(
                "Malformed request body. JSON is expected."
            )

        if handler_def.consumes:
            try:
                payload = handler_def.consumes.from_json(payload)
            except ValidationError:
                # TODO: log warning or error
                raise BadRequestError("Bad data structure.")

        self.request.body = payload
worker.py 文件源码 项目:pyree-old 作者: DrLuke 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def parseMessage(self, msg):
        """Decode a JSON message and dispatch it on its ``msgtype`` field.

        Messages that are not valid JSON, are not a JSON object, or carry an
        unknown or missing ``msgtype`` are ignored. Previously only invalid
        JSON was ignored, while ``[]`` or ``{}`` crashed with
        TypeError/KeyError.

        :param msg: raw JSON text of the message.
        """
        print(msg)
        try:
            decoded = json.loads(msg)
        except json.JSONDecodeError:
            return

        if not isinstance(decoded, dict):
            return

        # `msg_type` rather than `type`, which would shadow the builtin.
        msg_type = decoded.get("msgtype")
        if msg_type == "control":
            self.handleControl(decoded)
        elif msg_type == "sheetdelta":
            self.passSheetDelta(decoded)
        elif msg_type == "request":
            self.handleRequest(decoded)
        elif msg_type == "nodedata":
            self.passNodedata(decoded)
workerManager.py 文件源码 项目:pyree-old 作者: DrLuke 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def discoverWorkers(self):
        """Discover new workers via udp broadcasts"""
        # Zero timeout: only poll the discovery socket.
        readable, _writable, _errored = select([self.discoverysocket], [], [], 0)
        if not readable:
            return

        payload = self.discoverysocket.recvfrom(4096)[0]
        try:
            info = json.loads(bytes.decode(payload))
        except json.JSONDecodeError:
            info = {}

        if not ("ip" in info and "port" in info):
            return

        # Prefer the announced host name; fall back to "ip:port".
        if "host" in info:
            name = info["host"]
        else:
            name = info["ip"] + ":" + str(info["port"])
        if name in self.workers:
            return

        # Item type passed to Qt is 1001 (the earlier comment said 1000 -- TODO confirm).
        treeItem = QTreeWidgetItem(1001)
        treeItem.setText(0, name)
        self.treeWidget.addTopLevelItem(treeItem)
        self.grabPeriodicInfos()    # Grab monitor data
        worker = Worker(info, treeItem, nodeDataJar=self.nodeDataJar)
        self.workers[name] = worker
        self.workers[name].tick(self.sheetDeltaMemory)
        self.workers[name].synchronize()
backend.py 文件源码 项目:parsec-cloud 作者: Scille 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
async def _ws_recv_handler(self):
        """Receive websocket messages forever and route them.

        Command responses and notifications arrive on the same websocket. A
        message containing ``status`` is a command response and is put on
        ``self._resp_queue``; anything else is treated as an event and sent
        through the signal named by ``recv['event']`` with ``recv['sender']``.
        Messages that cannot be decoded or lack the expected keys are logged
        and dropped.

        Declared ``async`` because the body awaits the websocket; ``await``
        inside a plain ``def`` is a syntax error.
        """
        while True:
            raw = await self._websocket.recv()
            try:
                if isinstance(raw, bytes):
                    raw = raw.decode()
                recv = ejson_loads(raw)
                if 'status' in recv:
                    # Message response
                    self._resp_queue.put_nowait(recv)
                else:
                    # Event
                    self._signal_ns.signal(recv['event']).send(recv['sender'])
            except (KeyError, TypeError, json.JSONDecodeError):
                # Lazy %-args: no eager formatting, and safe if raw is a tuple.
                logger.warning('Backend server sent invalid message: %s', raw)
session.py 文件源码 项目:parsec-cloud 作者: Scille 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def handshake(self):
        """Run the challenge/response handshake as an effect-yielding generator.

        Sends a freshly generated challenge, receives the raw answer, checks
        the answer against the public key fetched for the claimed identity and,
        on success, stores that identity in ``self.id``.

        :raises HandshakeError: if ``self.id`` is already set, if the answer
            cannot be decoded, or if verification fails. In the last two cases
            the error's raw form is sent to the peer before raising.
        """
        if self.id:
            raise HandshakeError('Handshake already done.')
        challenge = _generate_challenge()
        query = {'handshake': 'challenge', 'challenge': challenge}
        # Send the challenge, then wait for the peer's raw answer.
        yield Effect(EHandshakeSend(ejson_dumps(query)))
        raw_resp = yield Effect(EHandshakeRecv())
        try:
            resp = ejson_loads(raw_resp)
        except (TypeError, json.JSONDecodeError):
            error = HandshakeError('Invalid challenge response format')
            yield Effect(EHandshakeSend(error.to_raw()))
            raise error
        # NOTE(review): errors raised by the schema load are not handled here -- confirm intended.
        resp = HandshakeAnswerSchema().load(resp)
        claimed_identity = resp['identity']
        try:
            # Verify the answer over the encoded challenge with the claimed identity's key.
            pubkey = yield Effect(EPubKeyGet(claimed_identity))
            pubkey.verify(resp['answer'], challenge.encode())
            yield Effect(EHandshakeSend('{"status": "ok", "handshake": "done"}'))
            # Only record the identity once the success message has been sent.
            self.id = claimed_identity
        except (TypeError, PubKeyNotFound, InvalidSignature):
            error = HandshakeError('Invalid signature, challenge or identity')
            yield Effect(EHandshakeSend(error.to_raw()))
            raise error
json_encoder.py 文件源码 项目:floto 作者: babbel 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def load_string(json_string):
        """Deserialize `json_string`, falling back to the input itself.

        Used for activity results, which may be plain strings or JSON
        documents.

        Parameters
        ----------
        json_string : any

        Returns
        -------

        obj
            The decoded Python object, or `json_string` unchanged when it is
            not a valid JSON document (or not a type json can load).

        """
        try:
            return json.loads(json_string)
        except (TypeError, json.JSONDecodeError):
            return json_string
ui_io.py 文件源码 项目:ui2 作者: controversial 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _json_get(inp):
    """ Get a Python object (list or dict) regardless of whether data is passed
    as a JSON string, a file path, or is already a python object.

    Returns the parsed data, as well as "native" if the data was already a
    Python object, "str" if the data was passed as a JSON string, or the path
    if the data passed was a file path """

    # Already a Python object: hand it back as-is. Previously `data` was never
    # assigned on this path, so the return raised UnboundLocalError.
    if isinstance(inp, (dict, list)):
        return inp, "native"

    try:                                                    # JSON string
        data = json.loads(inp)
        dataformat = "str"
    except json.JSONDecodeError:                            # JSON filepath
        # Store the filename in the dataformat variable if the input is a
        # file, because it's just one fewer variable to keep track of
        dataformat = inp
        with open(inp, encoding="utf-8") as f:
            data = json.load(f)

    return data, dataformat
rest_server.py 文件源码 项目:loopchain 作者: theloopkr 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def post(self):
        """Forward the posted JSON as a query and return the result as a dict.

        The returned dict carries ``response_code`` and ``response``; the
        latter is the parsed JSON when the backend answer is JSON, otherwise
        the raw answer. On a gRPC deadline-exceeded error only the timeout
        ``response_code`` is set.
        """
        request_body = json.dumps(request.get_json())
        channel = get_channel_name_from_json(request.get_json())
        query_data = {}
        try:
            # TODO: consider making this call asynchronous.
            response = ServerComponents().query(request_body, channel)
            logging.debug(f"query result : {response}")
            query_data['response_code'] = str(response.response_code)
            try:
                payload = json.loads(response.response)
            except json.JSONDecodeError:
                logging.warning("your response is not json, your response(" + str(response.response) + ")")
                payload = response.response
            query_data['response'] = payload

        except _Rendezvous as e:
            logging.error(f'Execute Query Error : {e}')
            if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
                # TODO: extract the REST timeout handling into a shared helper.
                logging.debug("gRPC timeout !!!")
                query_data['response_code'] = str(message_code.Response.timeout_exceed)

        return query_data
rest_server.py 文件源码 项目:loopchain 作者: theloopkr 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get(self):
        """Look up a transaction by hash and return it as a JSON-ready dict.

        The result holds ``response_code``, ``data`` (parsed JSON, or the raw
        value when it is not JSON, or "" when empty), ``meta`` (parsed JSON or
        "" when empty), ``more_info`` and base64-encoded ``signature`` and
        ``public_key``.
        """
        args = ServerComponents().parser.parse_args()
        response = ServerComponents().get_transaction(args['hash'], get_channel_name_from_args(args))
        tx_data = {}
        tx_data['response_code'] = str(response.response_code)
        tx_data['data'] = ""
        # Compare by value: `is not 0` tests object identity and only works by
        # accident of small-int caching (SyntaxWarning on Python 3.8+).
        if len(response.data) != 0:
            try:
                tx_data['data'] = json.loads(response.data)
            except json.JSONDecodeError:
                logging.warning("your data is not json, your data(" + str(response.data) + ")")
                tx_data['data'] = response.data

        tx_data['meta'] = ""
        if len(response.meta) != 0:
            # NOTE(review): unlike `data`, invalid JSON in `meta` propagates -- confirm intended.
            tx_data['meta'] = json.loads(response.meta)

        tx_data['more_info'] = response.more_info
        b64_sign = base64.b64encode(response.signature)
        tx_data['signature'] = b64_sign.decode()
        b64_public_key = base64.b64encode(response.public_key)
        tx_data['public_key'] = b64_public_key.decode()

        return tx_data
rest_server.py 文件源码 项目:loopchain 作者: theloopkr 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get(self):
        """Return the invoke result of a transaction as a JSON-ready dict.

        On success the dict holds the backend ``response_code`` and the parsed
        result (tagged with ``jsonrpc: '2.0'``) under ``response``. When the
        result is empty or not valid JSON, ``response_code`` is the stringified
        failure code.
        """
        logging.debug('transaction result')
        args = ServerComponents().parser.parse_args()
        logging.debug('tx_hash : ' + args['hash'])
        channel_name = get_channel_name_from_args(args)
        response = ServerComponents().get_invoke_result(args['hash'], channel_name)
        verify_result = dict()
        verify_result['response_code'] = str(response.response_code)
        # Compare by value: `is not 0` tests object identity and only works by
        # accident of small-int caching (SyntaxWarning on Python 3.8+).
        if len(response.result) != 0:
            try:
                result = json.loads(response.result)
                result['jsonrpc'] = '2.0'
                verify_result['response'] = result
            except json.JSONDecodeError:
                # Log the field that failed to parse (`result`); the old code
                # read `response.data`, which is not what was parsed.
                logging.warning("your data is not json, your data(" + str(response.result) + ")")
                # Stringified like every other response_code in this handler.
                verify_result['response_code'] = str(message_code.Response.fail)
        else:
            verify_result['response_code'] = str(message_code.Response.fail)
        return verify_result
bundle.py 文件源码 项目:fedoidc 作者: OpenIDC 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def jwks_to_keyjar(jwks, iss=''):
    """
    Build a KeyJar holding the keys of a JWKS.

    :param jwks: The JWKS, either as a dict or as its JSON string
        representation
    :param iss: Issuer the keys are imported for (default ``''``)
    :return: A :py:class:`oic.utils.keyio.KeyJar` instance
    :raises ValueError: if a non-dict ``jwks`` is not valid JSON
    """
    if isinstance(jwks, dict):
        key_set = jwks
    else:
        try:
            key_set = json.loads(jwks)
        except json.JSONDecodeError:
            raise ValueError('No proper JSON')

    jar = KeyJar()
    jar.import_jwks(key_set, issuer=iss)
    return jar
key_auth.py 文件源码 项目:django-api-bouncer 作者: menecio 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_key_from_headers(self, request, key_names, key_in_body=False):
        """Return the first API key found under any of ``key_names``.

        With ``key_in_body`` the request body is parsed as JSON and searched;
        a body that is not valid JSON yields None. Otherwise each name is
        looked up in ``request.META`` as ``HTTP_<NAME>`` (upper-cased, dashes
        replaced by underscores).

        :return: the key value, or None when no name matches.
        """
        if key_in_body:
            try:
                payload = json.loads(request.body.decode('utf-8'))
            except json.JSONDecodeError:
                return None
            for candidate in key_names:
                if candidate in payload:
                    return payload[candidate]
            return None

        for header in key_names:
            meta_key = 'HTTP_{0}'.format(header.upper().replace('-', '_'))
            if meta_key in request.META:
                return request.META[meta_key]

        return None
audio.py 文件源码 项目:Shallus-Bot 作者: cgropp 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def check_files():
    """Create the audio settings file, or back-fill any missing default keys.

    A settings file that cannot be decoded is overwritten with the defaults
    before the consistency check runs.
    """
    default = {"VOLUME": 50, "MAX_LENGTH": 3700, "VOTE_ENABLED": True,
               "MAX_CACHE": 0, "SOUNDCLOUD_CLIENT_ID": None,
               "TITLE_STATUS": True, "AVCONV": False, "VOTE_THRESHOLD": 50,
               "SERVERS": {}}
    settings_path = "data/audio/settings.json"

    if not os.path.isfile(settings_path):
        print("Creating default audio settings.json...")
        dataIO.save_json(settings_path, default)
        return

    # consistency check
    try:
        current = dataIO.load_json(settings_path)
    except JSONDecodeError:
        # settings.json keeps getting corrupted for unknown reasons. Let's
        # try to keep it from making the cog load fail.
        dataIO.save_json(settings_path, default)
        current = dataIO.load_json(settings_path)

    if current.keys() == default.keys():
        return
    for key in default.keys():
        if key in current.keys():
            continue
        current[key] = default[key]
        print(
            "Adding " + str(key) + " field to audio settings.json")
    dataIO.save_json(settings_path, current)
channel.py 文件源码 项目:odooku 作者: odooku 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def listen(self, ws, environ):
        """Serve one websocket until it closes, errors or sends invalid JSON.

        The socket is registered on entry and unregistered on exit. Every
        received JSON message is answered in a spawned greenlet with a fresh
        request object built from a copy of ``environ``.
        """
        self._add(ws)
        while not ws.closed:
            try:
                raw = ws.receive()
            except WebSocketError:
                break

            if raw is None:
                continue

            try:
                payload = json.loads(raw)
            except json.JSONDecodeError:
                break

            # Odoo heavily relies on httprequests, so each message gets a new
            # httprequest based on the original environ from the socket
            # initialization request.
            httprequest = werkzeug.wrappers.Request(environ.copy())
            odoo.http.root.setup_session(httprequest)
            odoo.http.root.setup_db(httprequest)
            odoo.http.root.setup_lang(httprequest)
            gevent.spawn(self.respond, ws, httprequest, payload)

        self._remove(ws)
conf.py 文件源码 项目:iCover 作者: littleneko 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __get_conf_from_file(self, file_name):
        """Load settings from a JSON file into upper-case attributes on ``self``.

        Keys missing from the file fall back to the built-in defaults below.
        Any failure prints a short message and exits the process with status 1.

        :param file_name: path of the JSON configuration file.
        """
        try:
            # `with` guarantees the handle is closed; it used to be left open.
            with open(file_name, 'r') as json_file:
                setting_dict = json.load(json_file)

            self.TRUST_P = setting_dict.get('trust_p', 0.45)
            self.COUNTRY = setting_dict.get('country', 'JP')
            self.COVER_MIN_SIZE = setting_dict.get('cover_min_size', 1000)
            self.SAVE_COVER_TYPE = setting_dict.get('save_cover_type', 1)
            self.DOWNLOAD_DIR = setting_dict.get('download_dir', './cover')
            self.SEARCH_POSTFIX_NAME = setting_dict.get('search_postfix_name', 'cue,m4a,flac')
            self.FILE_NAME_FORMAT = setting_dict.get('file_name_format', '')
            self.REPLACE_COVER = setting_dict.get('replace_cover', 1)
            self.SEARCH_THREAD_NUM = setting_dict.get('search_thread_num', 6)
            self.DOWNLOAD_THREAD_NUM = setting_dict.get('download_thread_num', 5)
        except FileNotFoundError:
            print("Conf file 'setting.json' not found")
            exit(1)
        except json.JSONDecodeError:
            print("Conf file err")
            exit(1)
        except Exception:
            # Last-resort boundary: any other failure also aborts start-up.
            print("Some err occur")
            exit(1)
project_settings_module.py 文件源码 项目:call_map 作者: nccgroup 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def load_from_persistent_storage(self):
        """Decode every project file that exists and return the results by key.

        precedence: files > (modules, scripts)

        Note that `files` will only be changed manually. It is not updated by
        changing `modules` or `scripts`. However, `files` will affect
        `modules` and `scripts`.

        Files with invalid JSON are logged and skipped; missing files are
        skipped silently. An unset or non-existent project directory yields
        an empty dict.
        """
        decoded = {}
        if not (self.project_directory and self.project_directory.exists()):
            return decoded

        for key, project_file in self.project_files.items():
            try:
                category = category_type[key]
                payload = json.loads(project_file.read_text())
                decoded[key] = serialize.decode(category, payload)
            except json.JSONDecodeError as err:
                logger.error(err)
            except FileNotFoundError:
                pass

        return decoded


问题


面经


文章

微信
公众号

扫码关注公众号