python类get()的实例源码

import_cards.py 文件源码 项目:django-magic-cards 作者: pbaranay 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def fetch_data():
    try:
        r = requests.get(MTG_JSON_URL)
    except requests.ConnectionError:
        r = requests.get(FALLBACK_MTG_JSON_URL)
    with closing(r), zipfile.ZipFile(io.BytesIO(r.content)) as archive:
        unzipped_files = archive.infolist()
        if len(unzipped_files) != 1:
            raise RuntimeError("Found an unexpected number of files in the MTGJSON archive.")
        data = archive.read(archive.infolist()[0])
    decoded_data = data.decode('utf-8')
    sets_data = json.loads(decoded_data)
    return sets_data
get_data.py 文件源码 项目:X-ray-classification 作者: bendidi 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def main():
    for url in url_list :
        try:
            r = requests.get(url)
        except : continue
        tree = html.fromstring(r.text)

        script = tree.xpath('//script[@language="javascript"]/text()')[0]

        json_string = regex.findall(script)[0]
        json_data = json.loads(json_string)

        next_page_url = tree.xpath('//footer/a/@href')

        links = [domain + x['nodeRef'] for x in json_data]
        for link in links:
            extract(link)
service.py 文件源码 项目:ISB-CGC-pipelines 作者: isb-cgc 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def sendRequest(ip, port, route, data=None, protocol="http"):
        url = "{protocol}://{ip}:{port}{route}".format(protocol=protocol, ip=ip, port=port, route=route)

        if data is not None:
            try:
                resp = requests.post(url, data=data)

            except requests.HTTPError as e:
                raise PipelineServiceError("{reason}".format(reason=e))

        else:
            try:
                resp = requests.get(url)

            except requests.HTTPError as e:
                raise PipelineServiceError("{reason}".format(reason=e))

        return resp
urlcategory.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run(self):
        Analyzer.run(self)

        if self.data_type == 'domain' or self.data_type == 'url':
            try:
                pattern = re.compile("(?:Category: )([\w\s]+)")
                baseurl = 'http://www.fortiguard.com/webfilter?q='
                url = baseurl + self.getData()
                req = requests.get(url)
                category_match = re.search(pattern, req.content, flags=0)
                self.report({
                    'category': category_match.group(1)
                })
            except ValueError as e:
                self.unexpectedError(e)
        else:
            self.notSupported()
crawl_xici_ip.py 文件源码 项目:ArticleSpider 作者: mtianyan 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def judge_ip(self, ip, port):
        #??ip????
        http_url = "http://www.baidu.com"
        proxy_url = "http://{0}:{1}".format(ip, port)
        try:
            proxy_dict = {
                "http":proxy_url,
            }
            response = requests.get(http_url, proxies=proxy_dict)
        except Exception as e:
            print ("invalid ip and port")
            self.delete_ip(ip)
            return False
        else:
            code = response.status_code
            if code >= 200 and code < 300:
                print ("effective ip")
                return True
            else:
                print  ("invalid ip and port")
                self.delete_ip(ip)
                return False
service.py 文件源码 项目:ISB-CGC-pipelines 作者: isb-cgc 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def watchJob(jobId, exchangeName):
        queue = PipelineQueue('PIPELINE_JOB_{j}'.format(j=jobId))
        queue.bindToExchange(exchangeName, jobId)

        while True:
            body, method = queue.get()

            if method:
                body = json.loads(body)

                if body["current_status"] == "SUCCEEDED":
                    return jobId
                else:
                    raise PipelineServiceError("Job {j} has current status {s}!".format(j=jobId, s=body["current_status"]))
            else:
                pass
utils.py 文件源码 项目:ISB-CGC-pipelines 作者: isb-cgc 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _verify(self):
        try:
            self.read(self.path)
        except IOError as e:
            print "Couldn't open {path}: {reason}".format(path=self.path, reason=e)
            exit(-1)
        else:
            d = {}
            for name, attrs in self._configParams.iteritems():
                if attrs["required"]:
                    if not self.has_section(attrs["section"]):
                        raise LookupError("missing required section {s} in the configuration!\nRUN `isb-cgc-pipelines config` to correct the configuration".format(s=attrs["section"]))

                    if not self.has_option(attrs["section"], name):
                        raise LookupError("missing required option {o} in section {s}!\nRun `isb-cgc-pipelines config` to correct the configuration".format(s=attrs["section"], o=name))

                try:
                    d[name] = self.get(attrs["section"], name)

                except NoOptionError:
                    pass
                except NoSectionError:
                    pass

            return d
main.py 文件源码 项目:optimum-arena 作者: ovidiugiorgi 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_best(url):
    url = 'http://www.infoarena.ro' + url
    source_code = requests.get(url)
    plain_text = source_code.text
    soup = BeautifulSoup(plain_text, "html.parser")
    name = soup.find('span', {'class': 'username'}).find('a')['href'][35:]
    tests = soup.find_all('td', {'class': 'number'})
    max_ms = -1
    for test in tests:
        test = test.string
        if test.endswith('ms'):
            time = int(test.strip('ms'))
            max_ms = max(max_ms, time)
    if name not in d or max_ms < d[name][0]:
        d[name] = (max_ms, url)
    print(max_ms, name, url)
flora.py 文件源码 项目:flora 作者: Lamden 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def list(package_name):
    # lists all of the packages for a user, or all of the implementations for a package

    # <username> / <package> , <implementation>

    # detemine if there's a user and package, or just a user
    p = split_package_name(package_name)
    if p['username'] != None:
        # get all of the packages and print their names in a pretty print
        if p['package'] != None:
            # get all implementations and print their names in a pretty print
            if p['implementation'] != None:
                print('Cannot list one specific implementation. Use "print".')
                return
            return
        return

    print('Error parsing arguments. Got {}. Specify in format such that: <username>/<package> with <package> being optional.'.format(p))

# @cli.command()
# @click.argument('payload')
# def print(payload):

#   pass
STFDevicesControl.py 文件源码 项目:PhonePerformanceMeasure 作者: KyleCe 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def __get_api_conf(self, sfile, conf_name):
        full_path = Fun.get_file_in_directory_full_path(sfile)
        print full_path
        if not os.path.exists(full_path):
            print("Error: Cannot get config file")
            sys.exit(-1)
        sfile = full_path
        conf = ConfigParser.ConfigParser()
        conf.read(sfile)
        print conf.sections()
        try:
            self.url = conf.get(conf_name, "url")
            self.access_token = conf.get(conf_name, "access_token")
            self.api_token = conf.get(conf_name, "api_token")
        except Exception, e:
            print("Error: " + str(e))
            sys.exit(-1)

    # ????
pull_prod.py 文件源码 项目:ELO-Darts 作者: pwgraham91 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def pull_user_data(session):
    print 'pulling users'
    user_data = requests.get(u"{}{}".format(config.prod_url, 'export/users'))
    loaded_data = json.loads(user_data.text)
    for user_dict in loaded_data:
        user = User(
            id=user_dict['id'],
            name=user_dict['name'],
            email=user_dict['email'],
            admin=user_dict['admin'],
            avatar=user_dict['avatar'],
            active=user_dict['active'],
            created_at=user_dict['created_at'],
            elo=user_dict['elo'],
            wins=user_dict['wins'],
            losses=user_dict['losses']
        )
        session.add(user)
    session.commit()
    print 'done pulling users'
pull_prod.py 文件源码 项目:ELO-Darts 作者: pwgraham91 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def pull_game_data(session):
    print 'pulling games'
    game_data = requests.get(u"{}{}".format(config.prod_url, 'export/games'))
    loaded_data = json.loads(game_data.text)
    for game_dict in loaded_data:
        game = Game(
            id=game_dict['id'],
            created_at=game_dict['created_at'],
            deleted_at=game_dict['deleted_at'],
            winner_id=game_dict['winner_id'],
            winner_elo_score=game_dict['winner_elo_score'],
            loser_id=game_dict['loser_id'],
            loser_elo_score=game_dict['loser_elo_score'],
            submitted_by_id=game_dict['submitted_by_id']
        )
        session.add(game)
    session.commit()
    print 'done pulling games'
codeship.py 文件源码 项目:flash_services 作者: textbook 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def format_data(cls, data):
        """Re-format the response data for the front-end.

        Arguments:
          data (:py:class:`dict`): The JSON data from the response.

        Returns:
          :py:class:`dict`: The re-formatted data.

        """
        builds = [cls.format_build(build) for build in data.get('builds', [])]
        estimate_time(builds)
        return dict(
            builds=builds[:4],
            health=health_summary(builds),
            name=data.get('repository_name'),
        )
codeship.py 文件源码 项目:flash_services 作者: textbook 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def format_build(cls, build):
        """Re-format the build data for the front-end.

        Arguments:
          build (:py:class:`dict`): The JSON data from the response.

        Returns:
          :py:class:`dict`: The re-formatted data.

        """
        start, finish, elapsed = elapsed_time(
            build.get('started_at'),
            build.get('finished_at'),
        )
        return super().format_build(dict(
            author=build.get('github_username'),
            duration=(
                None if start is None or finish is None else finish - start
            ),
            elapsed=elapsed,
            message=build.get('message'),
            outcome=build.get('status'),
            started_at=start,
        ))
github.py 文件源码 项目:flash_services 作者: textbook 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def format_data(cls, name, data):
        """Re-format the response data for the front-end.

        Arguments:
          data (:py:class:`list`): The JSON data from the response.
          name (:py:class:`str`): The name of the repository.

        Returns:
          :py:class:`dict`: The re-formatted data.

        """
        return dict(
            commits=[cls.format_commit(commit.get('commit', {}))
                     for commit in data[:5] or []],
            name=name,
        )
github.py 文件源码 项目:flash_services 作者: textbook 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def half_life(issues):
        """Calculate the half life of the service's issues.

        Args:
          issues (:py:class:`list`): The service's issue data.

        Returns:
          :py:class:`datetime.timedelta`: The half life of the issues.

        """
        lives = []
        for issue in issues:
            start = safe_parse(issue.get('created_at'))
            end = safe_parse(issue.get('closed_at'))
            if start and end:
                lives.append(end - start)
        if lives:
            lives.sort()
            size = len(lives)
            return lives[((size + (size % 2)) // 2) - 1]
tracker.py 文件源码 项目:flash_services 作者: textbook 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def details(self, iteration):
        """Update the project data with more details.

        Arguments:
          iteration (:py:class:`int`): The current iteration number.

        Returns:
          :py:class:`dict`: Additional detail on the current iteration.

        """
        url = self.url_builder(
            '/projects/{id}/iterations/{number}',
            params={'number': iteration, 'id': self.project_id},
            url_params={'fields': ':default,velocity,stories'},
        )
        response = requests.get(url, headers=self.headers)
        if response.status_code == 200:
            update = response.json()
            return dict(
                stories=self.story_summary(update.get('stories', [])),
                velocity=update.get('velocity', 'unknown'),
            )
        else:
            logger.error('failed to update project iteration details')
        return {}
coveralls.py 文件源码 项目:flash_services 作者: textbook 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def format_data(self, name, data):
        """Re-format the response data for the front-end.

        Arguments:
          data (:py:class:`dict`): The JSON data from the response.
          name (:py:class:`str`): The name of the repository.

        Returns:
          :py:class:`dict`: The re-formatted data.

        """
        builds = [self.format_build(build) for build in data.get('builds', [])]

        return dict(
            builds=builds[:4],
            health=self.health(builds[0] if builds else None),
            name=name,
        )
coveralls.py 文件源码 项目:flash_services 作者: textbook 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def format_build(cls, build):
        """Re-format the build data for the front-end.

        Arguments:
          build (:py:class:`dict`): The JSON data from the response.

        Returns:
          :py:class:`dict`: The re-formatted data.

        """
        coverage = build.get('covered_percent')
        message = build.get('commit_message')
        return dict(
            author=build.get('committer_name') or '<no author>',
            committed=occurred(build.get('created_at')),
            coverage=None if coverage is None else '{:.1f}%'.format(coverage),
            message_text=remove_tags(message) if message else None,
            raw_coverage=coverage,
        )
travis.py 文件源码 项目:flash_services 作者: textbook 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def format_data(self, data):
        """Re-format the response data for the front-end.

        Arguments:
          data (:py:class:`dict`): The JSON data from the response.

        Returns:
          :py:class:`dict`: The re-formatted data.

        """
        commits = {commit['id']: commit for commit in data.get('commits', [])}
        builds = [
            self.format_build(build, commits.get(build.get('commit_id'), {}))
            for build in data.get('builds', [])
        ]
        estimate_time(builds)
        return dict(
            builds=builds[:4],
            health=health_summary(builds),
            name=self.repo,
        )
travis.py 文件源码 项目:flash_services 作者: textbook 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def format_build(cls, build, commit):  # pylint: disable=arguments-differ
        """Re-format the build and commit data for the front-end.

        Arguments:
          build (:py:class:`dict`): The build data from the response.
          commit (:py:class:`dict`): The commit data from the response.

        Returns:
          :py:class:`dict`: The re-formatted data.

        """
        start, finish, elapsed = elapsed_time(
            build.get('started_at'),
            build.get('finished_at'),
        )
        return super().format_build(dict(
            author=commit.get('author_name'),
            duration=(
                None if start is None or finish is None else finish - start
            ),
            elapsed=elapsed,
            message=commit.get('message'),
            outcome=build.get('state'),
            started_at=start,
        ))
k8s_client.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def watch(self, path):
        params = {'watch': 'true'}
        url = self._base_url + path
        header = {}
        if self.token:
            header.update({'Authorization': 'Bearer %s' % self.token})

        # TODO(ivc): handle connection errors and retry on failure
        while True:
            with contextlib.closing(
                    requests.get(url, params=params, stream=True,
                                 cert=self.cert, verify=self.verify_server,
                                 headers=header)) as response:
                if not response.ok:
                    raise exc.K8sClientException(response.text)
                for line in response.iter_lines(delimiter='\n'):
                    line = line.strip()
                    if line:
                        yield jsonutils.loads(line)
numerapi.py 文件源码 项目:numerai 作者: gansanay 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def download_current_dataset(self, dest_path='.', unzip=True):
        now = datetime.now().strftime('%Y%m%d')
        file_name = 'numerai_dataset_{0}.zip'.format(now)
        dest_file_path ='{0}/{1}'.format(dest_path, file_name)

        r = requests.get(self._dataset_url)
        if r.status_code!=200:
            return r.status_code

        with open(dest_file_path, "wb") as fp:
            for byte in r.content:
                fp.write(byte)

        if unzip:
            with zipfile.ZipFile(dest_file_path, "r") as z:
                z.extractall(dest_path)
        return r.status_code
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def get_file_report(self, this_hash):
        """ Get the scan results for a file.

        You can also specify a CSV list made up of a combination of hashes and scan_ids
        (up to 4 items with the standard request rate), this allows you to perform a batch
        request with one single call.
        i.e. {'resource': '99017f6eebbac24f351415dd410d522d, 88817f6eebbac24f351415dd410d522d'}.

        :param this_hash: The md5/sha1/sha256/scan_ids hash of the file whose dynamic behavioural report you want to
                            retrieve or scan_ids from a previous call to scan_file.
        :return:
        """
        params = {'apikey': self.api_key, 'resource': this_hash}

        try:
            response = requests.get(self.base + 'file/report', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_url_report(self, this_url, scan='0'):
        """ Get the scan results for a URL. (can do batch searches like get_file_report)

        :param this_url: a URL will retrieve the most recent report on the given URL. You may also specify a scan_id
                         (sha256-timestamp as returned by the URL submission API) to access a specific report. At the
                         same time, you can specify a CSV list made up of a combination of hashes and scan_ids so as
                         to perform a batch request with one single call (up to 4 resources per call with the standard
                         request rate). When sending multiples, the scan_ids or URLs must be separated by a new line
                         character.
        :param scan: (optional): this is an optional parameter that when set to "1" will automatically submit the URL
                      for analysis if no report is found for it in VirusTotal's database. In this case the result will
                      contain a scan_id field that can be used to query the analysis report later on.
        :return: JSON response
        """
        params = {'apikey': self.api_key, 'resource': this_url, 'scan': scan}

        try:
            response = requests.get(self.base + 'url/report', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_upload_url(self):
        """ Get a special URL for submitted files bigger than 32MB.

        In order to submit files bigger than 32MB you need to obtain a special upload URL to which you
        can POST files up to 200MB in size. This API generates such a URL.

        :return: JSON special upload URL to which you can POST files up to 200MB in size.
        """
        params = {'apikey': self.api_key}

        try:
            response = requests.get(self.base + 'file/scan/upload_url', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        if response.status_code == requests.codes.ok:
            return response.json()['upload_url']
        else:
            return dict(response_code=response.status_code)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_file_behaviour(self, this_hash):
        """ Get a report about the behaviour of the file in sand boxed environment.

        VirusTotal runs a distributed setup of Cuckoo sandbox machines that execute the files we receive. Execution is
        attempted only once, upon first submission to VirusTotal, and only Portable Executables under 10MB in size are
        ran. The execution of files is a best effort process, hence, there are no guarantees about a report being
        generated for a given file in our dataset.

        If a file did indeed produce a behavioural report, a summary of it can be obtained by using the file scan
        lookup call providing the additional HTTP POST parameter allinfo=1. The summary will appear under the
        behaviour-v1 property of the additional_info field in the JSON report.

        :param this_hash: The md5/sha1/sha256 hash of the file whose dynamic behavioural report you want to retrieve.
        :return: full JSON report of the file's execution as returned by the Cuckoo JSON report encoder.
        """
        params = {'apikey': self.api_key, 'hash': this_hash}

        try:
            response = requests.get(self.base + 'file/behaviour', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 84 收藏 0 点赞 0 评论 0
def get_file_distribution(self, before='', after='', reports='false', limit='1000'):
        """ Get a live feed with the latest files submitted to VirusTotal.

        Allows you to retrieve a live feed of absolutely all uploaded files to VirusTotal, and download them for
        further scrutiny. This API requires you to stay synced with the live submissions as only a backlog of 6
        hours is provided at any given point in time.

        :param before: (optional) Retrieve files received before the given timestamp, in timestamp descending order.
        :param after: (optional) Retrieve files received after the given timestamp, in timestamp ascending order.
        :param reports: (optional) Include the files' antivirus results in the response. Possible values are 'true' or
        'false' (default value is 'false').
        :param limit: (optional) Retrieve limit file items at most (default: 1000).
        :return: JSON response: please see https://www.virustotal.com/en/documentation/private-api/#file-distribution
        """
        params = {'apikey': self.api_key, 'before': before, 'after': after, 'reports': reports, 'limit': limit}

        try:
            response = requests.get(self.base + 'file/distribution', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_url_report(self, this_url, scan='0', allinfo=1):
        """ Get the scan results for a URL.

        :param this_url: A URL for which you want to retrieve the most recent report. You may also specify a scan_id
        (sha256-timestamp as returned by the URL submission API) to access a specific report. At the same time, you
        can specify a CSV list made up of a combination of urls and scan_ids (up to 25 items) so as to perform a batch
        request with one single call. The CSV list must be separated by new line characters.
        :param scan: (optional) This is an optional parameter that when set to "1" will automatically submit the URL
        for analysis if no report is found for it in VirusTotal's database. In this case the result will contain a
        scan_id field that can be used to query the analysis report later on.
        :param allinfo: (optional) If this parameter is specified and set to "1" additional info regarding the URL
        (other than the URL scanning engine results) will also be returned. This additional info includes VirusTotal
        related metadata (first seen date, last seen date, files downloaded from the given URL, etc.) and the output
        of other tools and datasets when fed with the URL.
        :return: JSON response
        """

        params = {'apikey': self.api_key, 'resource': this_url, 'scan': scan, 'allinfo': allinfo}

        try:
            response = requests.get(self.base + 'url/report', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_url_distribution(self, after=None, reports='true', limit=1000):
        """ Get a live feed with the lastest URLs submitted to VirusTotal.

        Allows you to retrieve a live feed of URLs submitted to VirusTotal, along with their scan reports. This
        call enables you to stay synced with VirusTotal URL submissions and replicate our dataset.

        :param after: (optional) Retrieve URLs received after the given timestamp, in timestamp ascending order.
        :param reports:  (optional) When set to "true" each item retrieved will include the results for each particular
        URL scan (in exactly the same format as the URL scan retrieving API). If the parameter is not specified, each
        item returned will only contain the scanned URL and its detection ratio.
        :param limit: (optional) Retrieve limit file items at most (default: 1000).
        :return: JSON response
        """

        params = {'apikey': self.api_key, 'after': after, 'reports': reports, 'limit': limit}

        try:
            response = requests.get(self.base + 'url/distribution', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)


问题


面经


文章

微信
公众号

扫码关注公众号