def fetch_data():
try:
r = requests.get(MTG_JSON_URL)
except requests.ConnectionError:
r = requests.get(FALLBACK_MTG_JSON_URL)
with closing(r), zipfile.ZipFile(io.BytesIO(r.content)) as archive:
unzipped_files = archive.infolist()
if len(unzipped_files) != 1:
raise RuntimeError("Found an unexpected number of files in the MTGJSON archive.")
        data = archive.read(unzipped_files[0])
decoded_data = data.decode('utf-8')
sets_data = json.loads(decoded_data)
return sets_data
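For reference, a minimal usage sketch; the imports below are the ones this snippet relies on, and the two URL constants are assumptions about the surrounding module, not part of the original code:

import io
import json
import zipfile
from contextlib import closing

import requests

MTG_JSON_URL = 'https://mtgjson.com/json/AllSets.json.zip'  # assumed primary URL
FALLBACK_MTG_JSON_URL = MTG_JSON_URL                        # assumed mirror

sets_data = fetch_data()
print(len(sets_data))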
def main():
    for url in url_list:
        try:
            r = requests.get(url)
        except requests.RequestException:
            # Skip URLs that fail to load instead of swallowing every
            # exception with a bare except.
            continue
tree = html.fromstring(r.text)
script = tree.xpath('//script[@language="javascript"]/text()')[0]
json_string = regex.findall(script)[0]
json_data = json.loads(json_string)
next_page_url = tree.xpath('//footer/a/@href')
links = [domain + x['nodeRef'] for x in json_data]
for link in links:
extract(link)
def sendRequest(ip, port, route, data=None, protocol="http"):
url = "{protocol}://{ip}:{port}{route}".format(protocol=protocol, ip=ip, port=port, route=route)
if data is not None:
        try:
            resp = requests.post(url, data=data)
            # requests raises HTTPError only from raise_for_status();
            # transport failures surface as RequestException subclasses.
            resp.raise_for_status()
        except requests.RequestException as e:
            raise PipelineServiceError("{reason}".format(reason=e))
    else:
        try:
            resp = requests.get(url)
            resp.raise_for_status()
        except requests.RequestException as e:
            raise PipelineServiceError("{reason}".format(reason=e))
return resp
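A hedged usage sketch for sendRequest; the host, port, route, and payload are purely illustrative:

# GET branch (no data); host and route are hypothetical.
resp = sendRequest("127.0.0.1", 8080, "/status")
print(resp.status_code)

# POST branch: passing data switches to requests.post().
resp = sendRequest("127.0.0.1", 8080, "/jobs", data={"job_id": "123"})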
def run(self):
Analyzer.run(self)
    if self.data_type in ('domain', 'url'):
try:
pattern = re.compile("(?:Category: )([\w\s]+)")
baseurl = 'http://www.fortiguard.com/webfilter?q='
url = baseurl + self.getData()
            req = requests.get(url)
            # req.content is bytes; match against the decoded text with the
            # compiled pattern.
            category_match = pattern.search(req.text)
            if category_match is None:
                raise ValueError('no category found in the Fortiguard response')
            self.report({
                'category': category_match.group(1)
            })
        except ValueError as e:
            self.unexpectedError(e)
else:
self.notSupported()
def judge_ip(self, ip, port):
    # Check whether this proxy ip/port actually works.
http_url = "http://www.baidu.com"
proxy_url = "http://{0}:{1}".format(ip, port)
try:
        proxy_dict = {
            "http": proxy_url,
        }
        response = requests.get(http_url, proxies=proxy_dict)
    except Exception as e:
        print("invalid ip and port")
        self.delete_ip(ip)
        return False
    else:
        code = response.status_code
        if 200 <= code < 300:
            print("effective ip")
            return True
        else:
            print("invalid ip and port")
            self.delete_ip(ip)
            return False
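A minimal harness for judge_ip; the host class and its delete_ip implementation are stand-ins for whatever proxy pool the real code maintains:

class ProxyChecker(object):  # hypothetical host class
    judge_ip = judge_ip  # bind the function above as a method

    def delete_ip(self, ip):
        # Stub: the real class presumably removes the proxy from storage.
        print("dropping {0}".format(ip))

checker = ProxyChecker()
checker.judge_ip("127.0.0.1", 8888)  # True if the proxy reaches baidu.com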
def watchJob(jobId, exchangeName):
queue = PipelineQueue('PIPELINE_JOB_{j}'.format(j=jobId))
queue.bindToExchange(exchangeName, jobId)
while True:
body, method = queue.get()
if method:
body = json.loads(body)
if body["current_status"] == "SUCCEEDED":
return jobId
else:
raise PipelineServiceError("Job {j} has current status {s}!".format(j=jobId, s=body["current_status"]))
        else:
            # No message on the queue yet; keep polling.
            pass
def _verify(self):
try:
self.read(self.path)
except IOError as e:
print "Couldn't open {path}: {reason}".format(path=self.path, reason=e)
exit(-1)
else:
d = {}
        for name, attrs in self._configParams.items():
if attrs["required"]:
if not self.has_section(attrs["section"]):
raise LookupError("missing required section {s} in the configuration!\nRUN `isb-cgc-pipelines config` to correct the configuration".format(s=attrs["section"]))
if not self.has_option(attrs["section"], name):
raise LookupError("missing required option {o} in section {s}!\nRun `isb-cgc-pipelines config` to correct the configuration".format(s=attrs["section"], o=name))
try:
d[name] = self.get(attrs["section"], name)
except NoOptionError:
pass
except NoSectionError:
pass
return d
def get_best(url):
url = 'http://www.infoarena.ro' + url
source_code = requests.get(url)
plain_text = source_code.text
soup = BeautifulSoup(plain_text, "html.parser")
name = soup.find('span', {'class': 'username'}).find('a')['href'][35:]
tests = soup.find_all('td', {'class': 'number'})
max_ms = -1
    for test in tests:
        test = test.string
        # Guard against cells that have no text node.
        if test and test.endswith('ms'):
            time = int(test.strip('ms'))
            max_ms = max(max_ms, time)
if name not in d or max_ms < d[name][0]:
d[name] = (max_ms, url)
print(max_ms, name, url)
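get_best accumulates results into a module-level dict d keyed by username; a minimal driver might look like this (the job path is hypothetical):

d = {}  # username -> (slowest test in ms, submission URL)
get_best('/job_detail/12345')  # hypothetical infoarena submission path
for name, (ms, url) in d.items():
    print(name, ms, url)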
def list(package_name):
# lists all of the packages for a user, or all of the implementations for a package
# <username> / <package> , <implementation>
    # determine if there's a user and package, or just a user
p = split_package_name(package_name)
    if p['username'] is not None:
        # get all of the packages and print their names in a pretty print
        if p['package'] is not None:
            # get all implementations and print their names in a pretty print
            if p['implementation'] is not None:
print('Cannot list one specific implementation. Use "print".')
return
return
return
print('Error parsing arguments. Got {}. Specify in format such that: <username>/<package> with <package> being optional.'.format(p))
# @cli.command()
# @click.argument('payload')
# def print(payload):
# pass
def __get_api_conf(self, sfile, conf_name):
full_path = Fun.get_file_in_directory_full_path(sfile)
    print(full_path)
if not os.path.exists(full_path):
print("Error: Cannot get config file")
sys.exit(-1)
sfile = full_path
conf = ConfigParser.ConfigParser()
conf.read(sfile)
    print(conf.sections())
try:
self.url = conf.get(conf_name, "url")
self.access_token = conf.get(conf_name, "access_token")
self.api_token = conf.get(conf_name, "api_token")
    except Exception as e:
print("Error: " + str(e))
sys.exit(-1)
# Pull data from the production server
def pull_user_data(session):
    print('pulling users')
user_data = requests.get(u"{}{}".format(config.prod_url, 'export/users'))
loaded_data = json.loads(user_data.text)
for user_dict in loaded_data:
user = User(
id=user_dict['id'],
name=user_dict['name'],
email=user_dict['email'],
admin=user_dict['admin'],
avatar=user_dict['avatar'],
active=user_dict['active'],
created_at=user_dict['created_at'],
elo=user_dict['elo'],
wins=user_dict['wins'],
losses=user_dict['losses']
)
session.add(user)
session.commit()
    print('done pulling users')
def pull_game_data(session):
    print('pulling games')
game_data = requests.get(u"{}{}".format(config.prod_url, 'export/games'))
loaded_data = json.loads(game_data.text)
for game_dict in loaded_data:
game = Game(
id=game_dict['id'],
created_at=game_dict['created_at'],
deleted_at=game_dict['deleted_at'],
winner_id=game_dict['winner_id'],
winner_elo_score=game_dict['winner_elo_score'],
loser_id=game_dict['loser_id'],
loser_elo_score=game_dict['loser_elo_score'],
submitted_by_id=game_dict['submitted_by_id']
)
session.add(game)
session.commit()
    print('done pulling games')
def format_data(cls, data):
"""Re-format the response data for the front-end.
Arguments:
data (:py:class:`dict`): The JSON data from the response.
Returns:
:py:class:`dict`: The re-formatted data.
"""
builds = [cls.format_build(build) for build in data.get('builds', [])]
estimate_time(builds)
return dict(
builds=builds[:4],
health=health_summary(builds),
name=data.get('repository_name'),
)
def format_build(cls, build):
"""Re-format the build data for the front-end.
Arguments:
build (:py:class:`dict`): The JSON data from the response.
Returns:
:py:class:`dict`: The re-formatted data.
"""
start, finish, elapsed = elapsed_time(
build.get('started_at'),
build.get('finished_at'),
)
return super().format_build(dict(
author=build.get('github_username'),
duration=(
None if start is None or finish is None else finish - start
),
elapsed=elapsed,
message=build.get('message'),
outcome=build.get('status'),
started_at=start,
))
def format_data(cls, name, data):
"""Re-format the response data for the front-end.
Arguments:
data (:py:class:`list`): The JSON data from the response.
name (:py:class:`str`): The name of the repository.
Returns:
:py:class:`dict`: The re-formatted data.
"""
return dict(
        commits=[cls.format_commit(commit.get('commit', {}))
                 # (data or []) guards against a None payload before slicing
                 for commit in (data or [])[:5]],
name=name,
)
def half_life(issues):
"""Calculate the half life of the service's issues.
Args:
issues (:py:class:`list`): The service's issue data.
Returns:
:py:class:`datetime.timedelta`: The half life of the issues.
"""
lives = []
for issue in issues:
start = safe_parse(issue.get('created_at'))
end = safe_parse(issue.get('closed_at'))
if start and end:
lives.append(end - start)
    if lives:
        lives.sort()
        size = len(lives)
        # Lower median of the sorted lifetimes.
        return lives[((size + (size % 2)) // 2) - 1]
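A worked example, assuming safe_parse accepts ISO-8601 timestamps as its use here suggests; the issue data is fabricated purely for illustration:

sample_issues = [
    {'created_at': '2017-01-01T00:00:00Z', 'closed_at': '2017-01-03T00:00:00Z'},
    {'created_at': '2017-01-01T00:00:00Z', 'closed_at': '2017-01-11T00:00:00Z'},
    {'created_at': '2017-01-01T00:00:00Z'},  # still open, so it is skipped
]
# lives is [2 days, 10 days]; the lower median is 2 days.
print(half_life(sample_issues))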
def details(self, iteration):
"""Update the project data with more details.
Arguments:
iteration (:py:class:`int`): The current iteration number.
Returns:
:py:class:`dict`: Additional detail on the current iteration.
"""
url = self.url_builder(
'/projects/{id}/iterations/{number}',
params={'number': iteration, 'id': self.project_id},
url_params={'fields': ':default,velocity,stories'},
)
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
update = response.json()
return dict(
stories=self.story_summary(update.get('stories', [])),
velocity=update.get('velocity', 'unknown'),
)
else:
logger.error('failed to update project iteration details')
return {}
def format_data(self, name, data):
"""Re-format the response data for the front-end.
Arguments:
data (:py:class:`dict`): The JSON data from the response.
name (:py:class:`str`): The name of the repository.
Returns:
:py:class:`dict`: The re-formatted data.
"""
builds = [self.format_build(build) for build in data.get('builds', [])]
return dict(
builds=builds[:4],
health=self.health(builds[0] if builds else None),
name=name,
)
def format_build(cls, build):
"""Re-format the build data for the front-end.
Arguments:
build (:py:class:`dict`): The JSON data from the response.
Returns:
:py:class:`dict`: The re-formatted data.
"""
coverage = build.get('covered_percent')
message = build.get('commit_message')
return dict(
author=build.get('committer_name') or '<no author>',
committed=occurred(build.get('created_at')),
coverage=None if coverage is None else '{:.1f}%'.format(coverage),
message_text=remove_tags(message) if message else None,
raw_coverage=coverage,
)
def format_data(self, data):
"""Re-format the response data for the front-end.
Arguments:
data (:py:class:`dict`): The JSON data from the response.
Returns:
:py:class:`dict`: The re-formatted data.
"""
commits = {commit['id']: commit for commit in data.get('commits', [])}
builds = [
self.format_build(build, commits.get(build.get('commit_id'), {}))
for build in data.get('builds', [])
]
estimate_time(builds)
return dict(
builds=builds[:4],
health=health_summary(builds),
name=self.repo,
)
def format_build(cls, build, commit): # pylint: disable=arguments-differ
"""Re-format the build and commit data for the front-end.
Arguments:
build (:py:class:`dict`): The build data from the response.
commit (:py:class:`dict`): The commit data from the response.
Returns:
:py:class:`dict`: The re-formatted data.
"""
start, finish, elapsed = elapsed_time(
build.get('started_at'),
build.get('finished_at'),
)
return super().format_build(dict(
author=commit.get('author_name'),
duration=(
None if start is None or finish is None else finish - start
),
elapsed=elapsed,
message=commit.get('message'),
outcome=build.get('state'),
started_at=start,
))
def watch(self, path):
params = {'watch': 'true'}
url = self._base_url + path
header = {}
if self.token:
header.update({'Authorization': 'Bearer %s' % self.token})
# TODO(ivc): handle connection errors and retry on failure
while True:
with contextlib.closing(
requests.get(url, params=params, stream=True,
cert=self.cert, verify=self.verify_server,
headers=header)) as response:
if not response.ok:
raise exc.K8sClientException(response.text)
for line in response.iter_lines(delimiter='\n'):
line = line.strip()
if line:
yield jsonutils.loads(line)
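The method is a generator of decoded watch events; a sketch of consuming it, where the client construction and the pods path are assumptions following Kubernetes API conventions:

client = K8sClient(base_url, token=token, cert=None, verify_server=True)  # assumed constructor
for event in client.watch('/api/v1/namespaces/default/pods'):
    # Each event is a dict like {'type': 'ADDED', 'object': {...}}.
    print(event.get('type'))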
def download_current_dataset(self, dest_path='.', unzip=True):
now = datetime.now().strftime('%Y%m%d')
file_name = 'numerai_dataset_{0}.zip'.format(now)
    dest_file_path = '{0}/{1}'.format(dest_path, file_name)
    r = requests.get(self._dataset_url)
    if r.status_code != 200:
        return r.status_code
    with open(dest_file_path, "wb") as fp:
        # Write the payload in one shot; iterating over r.content yields
        # ints on Python 3, which file.write() rejects.
        fp.write(r.content)
if unzip:
with zipfile.ZipFile(dest_file_path, "r") as z:
z.extractall(dest_path)
return r.status_code
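A usage sketch; the client class name follows the numerai API wrapper this snippet appears to come from, but treat the constructor as an assumption:

api = NumerAPI()  # hypothetical instantiation
status = api.download_current_dataset(dest_path='/tmp', unzip=True)
if status != 200:
    print('download failed with HTTP status', status)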
def get_file_report(self, this_hash):
""" Get the scan results for a file.
You can also specify a CSV list made up of a combination of hashes and scan_ids
(up to 4 items with the standard request rate), this allows you to perform a batch
request with one single call.
    e.g. {'resource': '99017f6eebbac24f351415dd410d522d, 88817f6eebbac24f351415dd410d522d'}.
:param this_hash: The md5/sha1/sha256/scan_ids hash of the file whose dynamic behavioural report you want to
retrieve or scan_ids from a previous call to scan_file.
:return:
"""
params = {'apikey': self.api_key, 'resource': this_hash}
try:
response = requests.get(self.base + 'file/report', params=params, proxies=self.proxies)
except requests.RequestException as e:
            return dict(error=str(e))  # e.message is Python 2 only; str(e) is portable
return _return_response_and_status_code(response)
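A hedged example of the single and batch forms the docstring describes; vt stands for an already-constructed client, and the hashes are placeholders:

report = vt.get_file_report('99017f6eebbac24f351415dd410d522d')

# Batch form: one call, comma-separated resources.
batch = vt.get_file_report(
    '99017f6eebbac24f351415dd410d522d, 88817f6eebbac24f351415dd410d522d')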
def get_url_report(self, this_url, scan='0'):
""" Get the scan results for a URL. (can do batch searches like get_file_report)
:param this_url: a URL will retrieve the most recent report on the given URL. You may also specify a scan_id
(sha256-timestamp as returned by the URL submission API) to access a specific report. At the
same time, you can specify a CSV list made up of a combination of hashes and scan_ids so as
to perform a batch request with one single call (up to 4 resources per call with the standard
request rate). When sending multiples, the scan_ids or URLs must be separated by a new line
character.
:param scan: (optional): this is an optional parameter that when set to "1" will automatically submit the URL
for analysis if no report is found for it in VirusTotal's database. In this case the result will
contain a scan_id field that can be used to query the analysis report later on.
:return: JSON response
"""
params = {'apikey': self.api_key, 'resource': this_url, 'scan': scan}
try:
response = requests.get(self.base + 'url/report', params=params, proxies=self.proxies)
except requests.RequestException as e:
            return dict(error=str(e))
return _return_response_and_status_code(response)
def get_upload_url(self):
""" Get a special URL for submitted files bigger than 32MB.
In order to submit files bigger than 32MB you need to obtain a special upload URL to which you
can POST files up to 200MB in size. This API generates such a URL.
:return: JSON special upload URL to which you can POST files up to 200MB in size.
"""
params = {'apikey': self.api_key}
try:
response = requests.get(self.base + 'file/scan/upload_url', params=params, proxies=self.proxies)
except requests.RequestException as e:
            return dict(error=str(e))
if response.status_code == requests.codes.ok:
return response.json()['upload_url']
else:
return dict(response_code=response.status_code)
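A sketch of using the returned URL for a large sample; the file path is hypothetical, and the multipart field name 'file' mirrors the regular file/scan endpoint, so treat it as an assumption:

upload_url = vt.get_upload_url()
if isinstance(upload_url, str):  # a dict here means the request failed
    with open('/tmp/big_sample.bin', 'rb') as fh:
        response = requests.post(upload_url, files={'file': fh})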
def get_file_behaviour(self, this_hash):
""" Get a report about the behaviour of the file in sand boxed environment.
VirusTotal runs a distributed setup of Cuckoo sandbox machines that execute the files we receive. Execution is
attempted only once, upon first submission to VirusTotal, and only Portable Executables under 10MB in size are
ran. The execution of files is a best effort process, hence, there are no guarantees about a report being
generated for a given file in our dataset.
If a file did indeed produce a behavioural report, a summary of it can be obtained by using the file scan
lookup call providing the additional HTTP POST parameter allinfo=1. The summary will appear under the
behaviour-v1 property of the additional_info field in the JSON report.
:param this_hash: The md5/sha1/sha256 hash of the file whose dynamic behavioural report you want to retrieve.
:return: full JSON report of the file's execution as returned by the Cuckoo JSON report encoder.
"""
params = {'apikey': self.api_key, 'hash': this_hash}
try:
response = requests.get(self.base + 'file/behaviour', params=params, proxies=self.proxies)
except requests.RequestException as e:
            return dict(error=str(e))
return _return_response_and_status_code(response)
def get_file_distribution(self, before='', after='', reports='false', limit='1000'):
""" Get a live feed with the latest files submitted to VirusTotal.
Allows you to retrieve a live feed of absolutely all uploaded files to VirusTotal, and download them for
further scrutiny. This API requires you to stay synced with the live submissions as only a backlog of 6
hours is provided at any given point in time.
:param before: (optional) Retrieve files received before the given timestamp, in timestamp descending order.
:param after: (optional) Retrieve files received after the given timestamp, in timestamp ascending order.
:param reports: (optional) Include the files' antivirus results in the response. Possible values are 'true' or
'false' (default value is 'false').
:param limit: (optional) Retrieve limit file items at most (default: 1000).
:return: JSON response: please see https://www.virustotal.com/en/documentation/private-api/#file-distribution
"""
params = {'apikey': self.api_key, 'before': before, 'after': after, 'reports': reports, 'limit': limit}
try:
response = requests.get(self.base + 'file/distribution', params=params, proxies=self.proxies)
except requests.RequestException as e:
            return dict(error=str(e))
return _return_response_and_status_code(response)
def get_url_report(self, this_url, scan='0', allinfo=1):
""" Get the scan results for a URL.
:param this_url: A URL for which you want to retrieve the most recent report. You may also specify a scan_id
(sha256-timestamp as returned by the URL submission API) to access a specific report. At the same time, you
can specify a CSV list made up of a combination of urls and scan_ids (up to 25 items) so as to perform a batch
request with one single call. The CSV list must be separated by new line characters.
:param scan: (optional) This is an optional parameter that when set to "1" will automatically submit the URL
for analysis if no report is found for it in VirusTotal's database. In this case the result will contain a
scan_id field that can be used to query the analysis report later on.
:param allinfo: (optional) If this parameter is specified and set to "1" additional info regarding the URL
(other than the URL scanning engine results) will also be returned. This additional info includes VirusTotal
related metadata (first seen date, last seen date, files downloaded from the given URL, etc.) and the output
of other tools and datasets when fed with the URL.
:return: JSON response
"""
params = {'apikey': self.api_key, 'resource': this_url, 'scan': scan, 'allinfo': allinfo}
try:
response = requests.get(self.base + 'url/report', params=params, proxies=self.proxies)
except requests.RequestException as e:
            return dict(error=str(e))
return _return_response_and_status_code(response)
def get_url_distribution(self, after=None, reports='true', limit=1000):
""" Get a live feed with the lastest URLs submitted to VirusTotal.
Allows you to retrieve a live feed of URLs submitted to VirusTotal, along with their scan reports. This
call enables you to stay synced with VirusTotal URL submissions and replicate our dataset.
:param after: (optional) Retrieve URLs received after the given timestamp, in timestamp ascending order.
:param reports: (optional) When set to "true" each item retrieved will include the results for each particular
URL scan (in exactly the same format as the URL scan retrieving API). If the parameter is not specified, each
item returned will only contain the scanned URL and its detection ratio.
:param limit: (optional) Retrieve limit file items at most (default: 1000).
:return: JSON response
"""
params = {'apikey': self.api_key, 'after': after, 'reports': reports, 'limit': limit}
try:
response = requests.get(self.base + 'url/distribution', params=params, proxies=self.proxies)
except requests.RequestException as e:
            return dict(error=str(e))
return _return_response_and_status_code(response)
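A hedged sketch of staying synced with the feed by paging forward on the last timestamp seen; the 'results' and 'timestamp' field names are assumptions about the private API's payload:

result = vt.get_url_distribution(after=None, reports='false', limit=1000)
if isinstance(result, dict) and result.get('results'):
    # Hypothetical field: resume from the newest item's timestamp.
    next_after = result['results'][-1].get('timestamp')
    result = vt.get_url_distribution(after=next_after, limit=1000)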