python类RequestException()的实例源码

virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def scan_file(self, this_file):
        """ Submit a file to be scanned by VirusTotal

        :param this_file: File to be scanned (32MB file size limit)
        :return: JSON response that contains scan_id and permalink.
        """
        params = {'apikey': self.api_key}
        try:
            if type(this_file) == str and os.path.isfile(this_file):
                files = {'file': (this_file, open(this_file, 'rb'))}
            elif isinstance(this_file, StringIO.StringIO):
                files = {'file': this_file.read()}
            else:
                files = {'file': this_file}
        except TypeError as e:
            return dict(error=e.message)

        try:
            response = requests.post(self.base + 'file/scan', files=files, params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def rescan_file(self, this_hash):
        """ Rescan a previously submitted filed or schedule an scan to be performed in the future.

        :param this_hash: a md5/sha1/sha256 hash. You can also specify a CSV list made up of a combination of any of
                          the three allowed hashes (up to 25 items), this allows you to perform a batch request with
                          one single call. Note that the file must already be present in our file store.
        :return: JSON response that contains scan_id and permalink.
        """
        params = {'apikey': self.api_key, 'resource': this_hash}

        try:
            response = requests.post(self.base + 'file/rescan', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_file_report(self, this_hash):
        """ Get the scan results for a file.

        You can also specify a CSV list made up of a combination of hashes and scan_ids
        (up to 4 items with the standard request rate), this allows you to perform a batch
        request with one single call.
        i.e. {'resource': '99017f6eebbac24f351415dd410d522d, 88817f6eebbac24f351415dd410d522d'}.

        :param this_hash: The md5/sha1/sha256/scan_ids hash of the file whose dynamic behavioural report you want to
                            retrieve or scan_ids from a previous call to scan_file.
        :return:
        """
        params = {'apikey': self.api_key, 'resource': this_hash}

        try:
            response = requests.get(self.base + 'file/report', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def scan_url(self, this_url):
        """ Submit a URL to be scanned by VirusTotal.

        :param this_url: The URL that should be scanned. This parameter accepts a list of URLs (up to 4 with the
                         standard request rate) so as to perform a batch scanning request with one single call. The
                         URLs must be separated by a new line character.
        :return: JSON response that contains scan_id and permalink.
        """
        params = {'apikey': self.api_key, 'url': this_url}

        try:
            response = requests.post(self.base + 'url/scan', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_url_report(self, this_url, scan='0'):
        """ Get the scan results for a URL. (can do batch searches like get_file_report)

        :param this_url: a URL will retrieve the most recent report on the given URL. You may also specify a scan_id
                         (sha256-timestamp as returned by the URL submission API) to access a specific report. At the
                         same time, you can specify a CSV list made up of a combination of hashes and scan_ids so as
                         to perform a batch request with one single call (up to 4 resources per call with the standard
                         request rate). When sending multiples, the scan_ids or URLs must be separated by a new line
                         character.
        :param scan: (optional): this is an optional parameter that when set to "1" will automatically submit the URL
                      for analysis if no report is found for it in VirusTotal's database. In this case the result will
                      contain a scan_id field that can be used to query the analysis report later on.
        :return: JSON response
        """
        params = {'apikey': self.api_key, 'resource': this_url, 'scan': scan}

        try:
            response = requests.get(self.base + 'url/report', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def scan_file(self, this_file, notify_url=None, notify_changes_only=None):
        """ Submit a file to be scanned by VirusTotal.

        Allows you to send a file for scanning with VirusTotal. Before performing your submissions we encourage you to
        retrieve the latest report on the files, if it is recent enough you might want to save time and bandwidth by
        making use of it. File size limit is 32MB, in order to submmit files up to 200MB in size you must request a
        special upload URL.

        :param this_file: The file to be uploaded.
        :param notify_url: A URL to which a POST notification should be sent when the scan finishes.
        :param notify_changes_only: Used in conjunction with notify_url. Indicates if POST notifications should be
                                    sent only if the scan results differ from the previous analysis.
        :return: JSON response that contains scan_id and permalink.
        """
        params = {'apikey': self.api_key}
        files = {'file': (this_file, open(this_file, 'rb'))}

        try:
            response = requests.post(self.base + 'file/scan', files=files, params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_upload_url(self):
        """ Get a special URL for submitted files bigger than 32MB.

        In order to submit files bigger than 32MB you need to obtain a special upload URL to which you
        can POST files up to 200MB in size. This API generates such a URL.

        :return: JSON special upload URL to which you can POST files up to 200MB in size.
        """
        params = {'apikey': self.api_key}

        try:
            response = requests.get(self.base + 'file/scan/upload_url', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        if response.status_code == requests.codes.ok:
            return response.json()['upload_url']
        else:
            return dict(response_code=response.status_code)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_file_behaviour(self, this_hash):
        """ Get a report about the behaviour of the file in sand boxed environment.

        VirusTotal runs a distributed setup of Cuckoo sandbox machines that execute the files we receive. Execution is
        attempted only once, upon first submission to VirusTotal, and only Portable Executables under 10MB in size are
        ran. The execution of files is a best effort process, hence, there are no guarantees about a report being
        generated for a given file in our dataset.

        If a file did indeed produce a behavioural report, a summary of it can be obtained by using the file scan
        lookup call providing the additional HTTP POST parameter allinfo=1. The summary will appear under the
        behaviour-v1 property of the additional_info field in the JSON report.

        :param this_hash: The md5/sha1/sha256 hash of the file whose dynamic behavioural report you want to retrieve.
        :return: full JSON report of the file's execution as returned by the Cuckoo JSON report encoder.
        """
        params = {'apikey': self.api_key, 'hash': this_hash}

        try:
            response = requests.get(self.base + 'file/behaviour', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_file(self, this_hash):
        """ Download a file by its hash.

        Downloads a file from VirusTotal's store given one of its hashes. This call can be used in conjuction with
        the file searching call in order to download samples that match a given set of criteria.

        :param this_hash: The md5/sha1/sha256 hash of the file you want to download.
        :return: Downloaded file in response.content
        """
        params = {'apikey': self.api_key, 'hash': this_hash}

        try:
            response = requests.get(self.base + 'file/download', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        if response.status_code == requests.codes.ok:
            return response.content
        elif response.status_code == 403:
            return dict(error='You tried to perform calls to functions for which you require a Private API key.',
                        response_code=response.status_code)
        elif response.status_code == 404:
            return dict(error='File not found.', response_code=response.status_code)
        else:
            return dict(response_code=response.status_code)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_url_report(self, this_url, scan='0', allinfo=1):
        """ Get the scan results for a URL.

        :param this_url: A URL for which you want to retrieve the most recent report. You may also specify a scan_id
        (sha256-timestamp as returned by the URL submission API) to access a specific report. At the same time, you
        can specify a CSV list made up of a combination of urls and scan_ids (up to 25 items) so as to perform a batch
        request with one single call. The CSV list must be separated by new line characters.
        :param scan: (optional) This is an optional parameter that when set to "1" will automatically submit the URL
        for analysis if no report is found for it in VirusTotal's database. In this case the result will contain a
        scan_id field that can be used to query the analysis report later on.
        :param allinfo: (optional) If this parameter is specified and set to "1" additional info regarding the URL
        (other than the URL scanning engine results) will also be returned. This additional info includes VirusTotal
        related metadata (first seen date, last seen date, files downloaded from the given URL, etc.) and the output
        of other tools and datasets when fed with the URL.
        :return: JSON response
        """

        params = {'apikey': self.api_key, 'resource': this_url, 'scan': scan, 'allinfo': allinfo}

        try:
            response = requests.get(self.base + 'url/report', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_url_distribution(self, after=None, reports='true', limit=1000):
        """ Get a live feed with the lastest URLs submitted to VirusTotal.

        Allows you to retrieve a live feed of URLs submitted to VirusTotal, along with their scan reports. This
        call enables you to stay synced with VirusTotal URL submissions and replicate our dataset.

        :param after: (optional) Retrieve URLs received after the given timestamp, in timestamp ascending order.
        :param reports:  (optional) When set to "true" each item retrieved will include the results for each particular
        URL scan (in exactly the same format as the URL scan retrieving API). If the parameter is not specified, each
        item returned will only contain the scanned URL and its detection ratio.
        :param limit: (optional) Retrieve limit file items at most (default: 1000).
        :return: JSON response
        """

        params = {'apikey': self.api_key, 'after': after, 'reports': reports, 'limit': limit}

        try:
            response = requests.get(self.base + 'url/distribution', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_ip_report(self, this_ip):
        """ Get information about a given IP address.

        Retrieves a report on a given IP address (including the information recorded by VirusTotal's Passive DNS
        infrastructure).

        :param this_ip: A valid IPv4 address in dotted quad notation, for the time being only IPv4 addresses are
        supported.
        :return: JSON response
        """
        params = {'apikey': self.api_key, 'ip': this_ip}

        try:
            response = requests.get(self.base + 'ip-address/report', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def put_comments(self, resource, comment):
        """ Post a comment on a file or URL.

        Allows you to place comments on URLs and files, these comments will be publicly visible in VirusTotal
        Community, under the corresponding tab in the reports for each particular item.

        Comments can range from URLs and locations where a given file was found in the wild to full reverse
        engineering reports on a given malware specimen, anything that may help other analysts in extending their
        knowledge about a particular file or URL.

        :param resource: Either an md5/sha1/sha256 hash of the file you want to review or the URL itself that you want
        to comment on.
        :param comment: The actual review, you can tag it using the "#" twitter-like syntax (e.g. #disinfection #zbot)
        and reference users using the "@" syntax (e.g. @VirusTotalTeam).
        :return: JSON response
        """
        params = {'apikey': self.api_key, 'resource': resource, 'comment': comment}

        try:
            response = requests.post(self.base + 'comments/put', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_comments(self, resource, before=None):
        """ Get comments for a file or URL.

        Retrieve a list of VirusTotal Community comments for a given file or URL. VirusTotal Community comments are
        user submitted reviews on a given item, these comments may contain anything from the in-the-wild locations of
        files up to fully-featured reverse engineering reports on a given sample.

        :param resource: Either an md5/sha1/sha256 hash of the file or the URL itself you want to retrieve.
        :param before: (optional) A datetime token that allows you to iterate over all comments on a specific item
        whenever it has been commented on more than 25 times.
        :return: JSON response - The application answers with the comments sorted in descending order according to
        their date.
        """
        params = dict(apikey=self.api_key, resource=resource, before=before)

        try:
            response = requests.get(self.base + 'comments/get', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_hashes_from_search(self, query, page=None):
        """ Get the scan results for a file.

        Even if you do not have a Private Mass API key that you can use, you can still automate VirusTotal Intelligence
        searches pretty much in the same way that the searching for files api call works.

        :param query: a VirusTotal Intelligence search string in accordance with the file search documentation .
            <https://www.virustotal.com/intelligence/help/file-search/>
        :param page: the next_page property of the results of a previously issued query to this API. This parameter
            should not be provided if it is the very first query to the API, i.e. if we are retrieving the
            first page of results.
        apikey: the API key associated to a VirusTotal Community account with VirusTotal Intelligence privileges.
        """
        params = {'query': query, 'apikey': self.api_key, 'page': page}

        try:
            response = requests.get(self.base + 'search/programmatic/', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return response.json()['next_page'], response
plugin.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def view_autocomplete(self, request, group, **kwargs):
        field = request.GET.get('autocomplete_field')
        query = request.GET.get('autocomplete_query')
        if field != 'issue_id' or not query:
            return Response({'issue_id': []})
        query = query.encode('utf-8')
        _url = '%s?%s' % (self.build_api_url(group, 'search'), urlencode({'query': query}))
        try:
            req = self.make_api_request(group.project, _url)
            body = safe_urlread(req)
        except (requests.RequestException, PluginError) as e:
            return self.handle_api_error(e)

        try:
            json_resp = json.loads(body)

        except ValueError as e:
            return self.handle_api_error(e)

        resp = json_resp.get('stories', {})
        stories = resp.get('stories', [])
        issues = [{'text': '(#%s) %s' % (i['id'], i['name']), 'id': i['id']} for i in stories]

        return Response({field: issues})
plugin.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def link_issue(self, request, group, form_data, **kwargs):
        comment = form_data.get('comment')
        if not comment:
            return
        _url = '%s/%s/comments' % (self.build_api_url(group, 'stories'), form_data['issue_id'])
        try:
            req = self.make_api_request(group.project, _url, json_data={"text": comment})
            body = safe_urlread(req)
        except requests.RequestException as e:
            msg = six.text_type(e)
            raise PluginError('Error communicating with Pivotal: %s' % (msg, ))

        try:
            json_resp = json.loads(body)
        except ValueError as e:
            msg = six.text_type(e)
            raise PluginError('Error communicating with Pivotal: %s' % (msg, ))

        if req.status_code > 399:
            raise PluginError(json_resp['error'])
io.py 文件源码 项目:htsget 作者: jeromekelleher 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _stream(self, url, headers={}):

        response = self.__get(url, headers=headers, stream=True, timeout=self.timeout)
        length = 0
        piece_size = 65536
        try:
            for piece in response.iter_content(piece_size):
                length += len(piece)
                yield piece
        except requests.RequestException as re:
            raise exceptions.RetryableIOError(re)
        if CONTENT_LENGTH in response.headers:
            content_length = int(response.headers[CONTENT_LENGTH])
            if content_length != length:
                raise exceptions.ContentLengthMismatch(
                    "Length mismatch {} != {}".format(content_length, length))
test_functest_utils.py 文件源码 项目:functest 作者: opnfv 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_push_results_to_db_request_post_failed(self):
        dic = self._get_env_dict(None)
        CONST.__setattr__('results_test_db_url', self.db_url)
        with mock.patch.dict(os.environ,
                             dic,
                             clear=True), \
                mock.patch('functest.utils.functest_utils.logger.error') \
                as mock_logger_error, \
                mock.patch('functest.utils.functest_utils.requests.post',
                           side_effect=requests.RequestException):
            self.assertFalse(functest_utils.
                             push_results_to_db(self.project, self.case_name,
                                                self.start_date,
                                                self.stop_date,
                                                self.result, self.details))
            mock_logger_error.assert_called_once_with(test_utils.
                                                      RegexMatch("Pushing "
                                                                 "Result to"
                                                                 " DB"
                                                                 "(\S+\s*) "
                                                                 "failed:"))
queue.py 文件源码 项目:doppler 作者: TakumiHQ 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def callback(request_id, message, callback_url, scheduled_at, last_retry=None, retry_delay=None, _attempts=None):
    if retry_delay is None:
        retry_delay = DEFAULT_RETRY_DELAY
    try:
        response = requests.post(callback_url, data=message)
        response.raise_for_status()
    except requests.RequestException:
        callback.retry(
            request_id=request_id,
            message=message,
            callback_url=callback_url,
            scheduled_at=scheduled_at,
            last_retry=int(time.time()),
            retry_delay=retry_delay,
            delay=retry_delay,
            taskid=request_id,
            _attempts=_attempts
        )
__init__.py 文件源码 项目:routersploit 作者: reverse-shell 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def http_request(method, url, session=requests, **kwargs):
    """ Wrapper for 'requests' silencing exceptions a little bit. """

    kwargs.setdefault('timeout', 30.0)
    kwargs.setdefault('verify', False)

    try:
        return getattr(session, method.lower())(url, **kwargs)
    except (requests.exceptions.MissingSchema, requests.exceptions.InvalidSchema):
        print_error("Invalid URL format: {}".format(url))
        return
    except requests.exceptions.ConnectionError:
        print_error("Connection error: {}".format(url))
        return
    except requests.RequestException as error:
        print_error(error)
        return
    except socket.error as err:
        print_error(err)
        return
    except KeyboardInterrupt:
        print_info()
        print_status("Module has been stopped")
warranty_ibm_lenovo.py 文件源码 项目:warranty_check 作者: device42 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_product_info(self, serial, retry=True):
        if self.debug:
            print '\t[+] Checking possible product "%s"' % serial
        timeout = 10
        try:
            resp = self.requests.get(self.url + '?productId=' + serial, verify=True, timeout=timeout)
            msg = 'Status code: %s' % str(resp.status_code)
            if str(resp.status_code) == '401':
                print '\t[!] HTTP error. Message was: %s' % msg
                print '\t[!] waiting for 30 seconds to let the api server calm down'
                # suspecting blockage due to to many api calls. Put in a pause of 30 seconds and go on
                time.sleep(30)
                if retry:
                    print '\n[!] Retry'
                    self.get_product_info(serial, False)
                else:
                    return None
            else:
                return resp.json()
        except requests.RequestException as e:
            self.error_msg(e)
            return None
warranty_hp.py 文件源码 项目:warranty_check 作者: device42 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_access_key(self):
        if self.debug:
            print '\t[+] Getting HP access token'

        timeout = 10
        payload = {
            'apiKey': self.api_key,
            'apiSecret': self.api_secret,
            'grantType': 'client_credentials',
            'scope': 'warranty'
        }
        headers = {
            'Accept': 'application/json',
            'Content-type': 'application/x-www-form-urlencoded'
        }

        try:
            resp = requests.post(self.url + '/oauth/v1/token',
                                 data=payload, headers=headers, verify=True, timeout=timeout)
            result = json.loads(resp.text)
            return result['access_token']

        except requests.RequestException as e:
            self.error_msg(e)
            sys.exit()
base.py 文件源码 项目:python-lunrclient 作者: rackerlabs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def http_request(self, call, url, **kwargs):
        try:
            # Remove args with no value
            kwargs = self.unused(kwargs)
            if self.client.timeout:
                kwargs['timeout'] = self.client.timeout

            if self.debug:
                print("-- %s on %s with %s " % (call.__name__.upper(),
                                                url, kwargs))
            resp = call(url, **kwargs)
            if self.debug:
                print("-- response: %s " % resp.text)
            if resp.status_code != 200:
                raise LunrHttpError("%s returned '%s' with '%s'" %
                                    (url, resp.status_code,
                                     json.loads(resp.text)['reason']),
                                    resp.status_code)
            return response(json.loads(resp.text), resp.status_code)
        except requests.RequestException as e:
            raise LunrError(str(e))
__init__.py 文件源码 项目:purelove 作者: hucmosin 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def http_request(method, url, session=requests, **kwargs):
    """ Wrapper for 'requests' silencing exceptions a little bit. """

    kwargs.setdefault('timeout', 30.0)
    kwargs.setdefault('verify', False)

    try:
        return getattr(session, method.lower())(url, **kwargs)
    except (requests.exceptions.MissingSchema, requests.exceptions.InvalidSchema):
        print_error("Invalid URL format: {}".format(url))
        return
    except requests.exceptions.ConnectionError:
        print_error("Connection error: {}".format(url))
        return
    except requests.RequestException as error:
        print_error(error)
        return
    except socket.error as err:
        print_error(err)
        return
    except KeyboardInterrupt:
        print_info()
        print_status("Module has been stopped")
location.py 文件源码 项目:scarlett_os 作者: bossjones 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _get_freegeoip() -> Optional[Dict[str, Any]]:
    """Query freegeoip.io for location data."""
    try:
        raw_info = requests.get(FREEGEO_API, timeout=5).json()
    except (requests.RequestException, ValueError):
        return None

    return {
        'ip': raw_info.get('ip'),
        'country_code': raw_info.get('country_code'),
        'country_name': raw_info.get('country_name'),
        'region_code': raw_info.get('region_code'),
        'region_name': raw_info.get('region_name'),
        'city': raw_info.get('city'),
        'zip_code': raw_info.get('zip_code'),
        'time_zone': raw_info.get('time_zone'),
        'latitude': raw_info.get('latitude'),
        'longitude': raw_info.get('longitude'),
    }
location.py 文件源码 项目:scarlett_os 作者: bossjones 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _get_ip_api() -> Optional[Dict[str, Any]]:
    """Query ip-api.com for location data."""
    try:
        raw_info = requests.get(IP_API, timeout=5).json()
    except (requests.RequestException, ValueError):
        return None

    return {
        'ip': raw_info.get('query'),
        'country_code': raw_info.get('countryCode'),
        'country_name': raw_info.get('country'),
        'region_code': raw_info.get('region'),
        'region_name': raw_info.get('regionName'),
        'city': raw_info.get('city'),
        'zip_code': raw_info.get('zip'),
        'time_zone': raw_info.get('timezone'),
        'latitude': raw_info.get('lat'),
        'longitude': raw_info.get('lon'),
    }
trawl_poloniex.py 文件源码 项目:core 作者: IntelligentTrading 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def pull_poloniex_data():
    try:
        logger.info("pulling Poloniex data...")
        req = get('https://poloniex.com/public?command=returnTicker')

        data = req.json()
        timestamp = time.time()

        poloniex_data_point = ExchangeData.objects.create(
            source=POLONIEX,
            data=json.dumps(data),
            timestamp=timestamp
        )
        logger.info("Saving Poloniex price, volume data...")
        _save_prices_and_volumes(data, timestamp)


    except RequestException:
        return 'Error to collect data from Poloniex'
session.py 文件源码 项目:pywxclient 作者: justdoit0823 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def request(self, method, url, **kwargs):

        headers = kwargs.get('headers')
        if headers:
            headers = dict(headers)
            if 'User-Agent' not in map(str.title, headers.keys()):
                headers['User-Agent'] = self.user_agent
        else:
            headers = self.default_headers

        kwargs['headers'] = headers

        try:
            return super(RequestsSession, self).request(method, url, **kwargs)
        except requests.RequestException:
            raise RequestError
inst_fetch.py 文件源码 项目:PSpider 作者: xianhu 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def working(self, priority: int, url: str, keys: dict, deep: int, repeat: int, proxies=None) -> (int, bool, object):
        """
        working function, must "try, except" and don't change the parameters and return
        :return (fetch_result, proxies_state, content): fetch_result can be -2(fetch failed, stop thread), -1(fetch failed), 0(need repeat), 1(fetch success)
        :return (fetch_result, proxies_state, content): proxies_state can be True(avaiable), False(unavaiable)
        :return (fetch_result, proxies_state, content): content can be any object, for example string, list, etc
        """
        logging.debug("%s start: %s", self.__class__.__name__, CONFIG_FETCH_MESSAGE % (priority, keys, deep, repeat, url))

        time.sleep(random.randint(0, self._sleep_time))
        try:
            fetch_result, proxies_state, content = self.url_fetch(priority, url, keys, deep, repeat, proxies=proxies)
        except requests.RequestException:
            if repeat >= self._max_repeat:
                fetch_result, proxies_state, content = -1, True, None
                logging.error("%s error: %s, %s", self.__class__.__name__, extract_error_info(), CONFIG_FETCH_MESSAGE % (priority, keys, deep, repeat, url))
            else:
                fetch_result, proxies_state, content = 0, True, None
                logging.debug("%s repeat: %s, %s", self.__class__.__name__, extract_error_info(), CONFIG_FETCH_MESSAGE % (priority, keys, deep, repeat, url))
        except Exception:
            fetch_result, proxies_state, content = -1, True, None
            logging.error("%s error: %s, %s", self.__class__.__name__, extract_error_info(), CONFIG_FETCH_MESSAGE % (priority, keys, deep, repeat, url))

        logging.debug("%s end: fetch_result=%s, proxies_state=%s, url=%s", self.__class__.__name__, fetch_result, proxies_state, url)
        return fetch_result, proxies_state, content


问题


面经


文章

微信
公众号

扫码关注公众号