def scan_file(self, this_file):
    """Submit a file to be scanned by VirusTotal.

    :param this_file: File to be scanned (32MB file size limit): a path to
        an existing file, a StringIO buffer, or an already-open file object.
    :return: JSON response that contains scan_id and permalink, or a dict
        with an 'error' key on failure.
    """
    params = {'apikey': self.api_key}
    handle = None
    try:
        # isinstance() is the idiomatic type check (was: type(...) == str).
        if isinstance(this_file, str) and os.path.isfile(this_file):
            handle = open(this_file, 'rb')
            files = {'file': (this_file, handle)}
        elif isinstance(this_file, StringIO.StringIO):
            files = {'file': this_file.read()}
        else:
            files = {'file': this_file}
    except TypeError as e:
        # str(e): the .message attribute is deprecated and absent in Python 3.
        return dict(error=str(e))
    try:
        response = requests.post(self.base + 'file/scan', files=files, params=params, proxies=self.proxies)
    except requests.RequestException as e:
        return dict(error=str(e))
    finally:
        # Close the handle we opened ourselves to avoid leaking the descriptor.
        if handle is not None:
            handle.close()
    return _return_response_and_status_code(response)
# Examples of requests.RequestException usage (collected source snippets)
def rescan_file(self, this_hash):
    """Rescan a previously submitted file or schedule a scan for the future.

    :param this_hash: a md5/sha1/sha256 hash. You can also specify a CSV list
        made up of a combination of any of the three allowed hashes (up to 25
        items) to perform a batch request with one single call. The file must
        already be present in the VirusTotal file store.
    :return: JSON response that contains scan_id and permalink, or a dict
        with an 'error' key on failure.
    """
    params = {'apikey': self.api_key, 'resource': this_hash}
    try:
        response = requests.post(self.base + 'file/rescan', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        # str(e): the .message attribute is deprecated and absent in Python 3.
        return dict(error=str(e))
    return _return_response_and_status_code(response)
def get_file_report(self, this_hash):
    """Get the scan results for a file.

    You can also specify a CSV list made up of a combination of hashes and
    scan_ids (up to 4 items with the standard request rate) to perform a
    batch request with one single call, e.g.
    {'resource': '99017f6eebbac24f351415dd410d522d, 88817f6eebbac24f351415dd410d522d'}.

    :param this_hash: The md5/sha1/sha256 hash of the file whose report you
        want to retrieve, or scan_ids from a previous call to scan_file.
    :return: JSON response, or a dict with an 'error' key on failure.
    """
    params = {'apikey': self.api_key, 'resource': this_hash}
    try:
        response = requests.get(self.base + 'file/report', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        # str(e): the .message attribute is deprecated and absent in Python 3.
        return dict(error=str(e))
    return _return_response_and_status_code(response)
def scan_url(self, this_url):
    """Submit a URL to be scanned by VirusTotal.

    :param this_url: The URL that should be scanned. This parameter accepts a
        list of URLs (up to 4 with the standard request rate) so as to perform
        a batch scanning request with one single call. The URLs must be
        separated by a new line character.
    :return: JSON response that contains scan_id and permalink, or a dict
        with an 'error' key on failure.
    """
    params = {'apikey': self.api_key, 'url': this_url}
    try:
        response = requests.post(self.base + 'url/scan', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        # str(e): the .message attribute is deprecated and absent in Python 3.
        return dict(error=str(e))
    return _return_response_and_status_code(response)
def get_url_report(self, this_url, scan='0'):
    """Get the scan results for a URL (supports batches like get_file_report).

    :param this_url: a URL to retrieve the most recent report for. You may
        also specify a scan_id (sha256-timestamp as returned by the URL
        submission API) to access a specific report, or a CSV list combining
        hashes and scan_ids (up to 4 resources per call with the standard
        request rate), separated by new line characters.
    :param scan: (optional) when set to "1", automatically submits the URL for
        analysis if no report is found for it in VirusTotal's database; the
        result then contains a scan_id field for querying the report later.
    :return: JSON response, or a dict with an 'error' key on failure.
    """
    params = {'apikey': self.api_key, 'resource': this_url, 'scan': scan}
    try:
        response = requests.get(self.base + 'url/report', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        # str(e): the .message attribute is deprecated and absent in Python 3.
        return dict(error=str(e))
    return _return_response_and_status_code(response)
def scan_file(self, this_file, notify_url=None, notify_changes_only=None):
    """Submit a file to be scanned by VirusTotal.

    Before submitting, consider retrieving the latest report on the file; if
    it is recent enough you may save time and bandwidth by using it. File
    size limit is 32MB; to submit files up to 200MB you must request a
    special upload URL (see get_upload_url).

    :param this_file: Path of the file to be uploaded.
    :param notify_url: A URL to which a POST notification should be sent when
        the scan finishes.
    :param notify_changes_only: Used in conjunction with notify_url; indicates
        if POST notifications should be sent only when the scan results differ
        from the previous analysis.
    :return: JSON response that contains scan_id and permalink, or a dict
        with an 'error' key on failure.
    """
    params = {'apikey': self.api_key}
    # 'with' guarantees the handle is closed even if the request fails
    # (the original leaked the descriptor on every call).
    with open(this_file, 'rb') as handle:
        files = {'file': (this_file, handle)}
        try:
            response = requests.post(self.base + 'file/scan', files=files, params=params, proxies=self.proxies)
        except requests.RequestException as e:
            # str(e): the .message attribute is deprecated and absent in Python 3.
            return dict(error=str(e))
    return _return_response_and_status_code(response)
def get_upload_url(self):
    """Get a special URL for submitting files bigger than 32MB.

    In order to submit files bigger than 32MB you need to obtain a special
    upload URL to which you can POST files up to 200MB in size. This API
    generates such a URL.

    :return: the special upload URL string on success, a dict with a
        'response_code' key on non-200 status, or a dict with an 'error'
        key on request failure.
    """
    params = {'apikey': self.api_key}
    try:
        response = requests.get(self.base + 'file/scan/upload_url', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        # str(e): the .message attribute is deprecated and absent in Python 3.
        return dict(error=str(e))
    if response.status_code == requests.codes.ok:
        return response.json()['upload_url']
    else:
        return dict(response_code=response.status_code)
def get_file_behaviour(self, this_hash):
    """Get a report about the behaviour of a file in a sandboxed environment.

    VirusTotal runs a distributed setup of Cuckoo sandbox machines that
    execute received files. Execution is attempted only once, upon first
    submission, and only Portable Executables under 10MB are run; execution
    is best effort, so a report is not guaranteed for every file.

    :param this_hash: The md5/sha1/sha256 hash of the file whose dynamic
        behavioural report you want to retrieve.
    :return: full JSON report of the file's execution as returned by the
        Cuckoo JSON report encoder, or a dict with an 'error' key on failure.
    """
    params = {'apikey': self.api_key, 'hash': this_hash}
    try:
        response = requests.get(self.base + 'file/behaviour', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        # str(e): the .message attribute is deprecated and absent in Python 3.
        return dict(error=str(e))
    return _return_response_and_status_code(response)
def get_file(self, this_hash):
    """Download a file by its hash.

    Downloads a file from VirusTotal's store given one of its hashes. Can be
    used in conjunction with the file searching call to download samples that
    match a given set of criteria.

    :param this_hash: The md5/sha1/sha256 hash of the file you want to
        download.
    :return: raw downloaded bytes (response.content) on success, otherwise a
        dict describing the error / response code.
    """
    params = {'apikey': self.api_key, 'hash': this_hash}
    try:
        response = requests.get(self.base + 'file/download', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        # str(e): the .message attribute is deprecated and absent in Python 3.
        return dict(error=str(e))
    if response.status_code == requests.codes.ok:
        return response.content
    elif response.status_code == 403:
        # Download is a private-API-only feature.
        return dict(error='You tried to perform calls to functions for which you require a Private API key.',
                    response_code=response.status_code)
    elif response.status_code == 404:
        return dict(error='File not found.', response_code=response.status_code)
    else:
        return dict(response_code=response.status_code)
def get_url_report(self, this_url, scan='0', allinfo=1):
    """Get the scan results for a URL.

    :param this_url: A URL to retrieve the most recent report for. You may
        also specify a scan_id (sha256-timestamp as returned by the URL
        submission API), or a CSV list combining urls and scan_ids (up to 25
        items) separated by new line characters for a batch request.
    :param scan: (optional) when set to "1", automatically submits the URL
        for analysis if no report is found; the result then contains a
        scan_id field for querying the report later.
    :param allinfo: (optional) when set to "1", additional info regarding the
        URL is returned: VirusTotal metadata (first/last seen dates, files
        downloaded from the URL, etc.) and the output of other tools and
        datasets fed with the URL.
    :return: JSON response, or a dict with an 'error' key on failure.
    """
    params = {'apikey': self.api_key, 'resource': this_url, 'scan': scan, 'allinfo': allinfo}
    try:
        response = requests.get(self.base + 'url/report', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        # str(e): the .message attribute is deprecated and absent in Python 3.
        return dict(error=str(e))
    return _return_response_and_status_code(response)
def get_url_distribution(self, after=None, reports='true', limit=1000):
    """Get a live feed with the latest URLs submitted to VirusTotal.

    Allows you to retrieve a live feed of URLs submitted to VirusTotal, along
    with their scan reports, staying synced with VirusTotal URL submissions.

    :param after: (optional) Retrieve URLs received after the given timestamp,
        in timestamp ascending order.
    :param reports: (optional) When set to "true" each item retrieved includes
        the results for each particular URL scan (same format as the URL scan
        retrieving API); otherwise each item only contains the scanned URL and
        its detection ratio.
    :param limit: (optional) Retrieve at most this many items (default: 1000).
    :return: JSON response, or a dict with an 'error' key on failure.
    """
    params = {'apikey': self.api_key, 'after': after, 'reports': reports, 'limit': limit}
    try:
        response = requests.get(self.base + 'url/distribution', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        # str(e): the .message attribute is deprecated and absent in Python 3.
        return dict(error=str(e))
    return _return_response_and_status_code(response)
def get_ip_report(self, this_ip):
    """Get information about a given IP address.

    Retrieves a report on a given IP address (including the information
    recorded by VirusTotal's Passive DNS infrastructure).

    :param this_ip: A valid IPv4 address in dotted quad notation; for the
        time being only IPv4 addresses are supported.
    :return: JSON response, or a dict with an 'error' key on failure.
    """
    params = {'apikey': self.api_key, 'ip': this_ip}
    try:
        response = requests.get(self.base + 'ip-address/report', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        # str(e): the .message attribute is deprecated and absent in Python 3.
        return dict(error=str(e))
    return _return_response_and_status_code(response)
def put_comments(self, resource, comment):
    """Post a comment on a file or URL.

    Comments will be publicly visible in VirusTotal Community, under the
    corresponding tab in the reports for each particular item. They can range
    from in-the-wild locations to full reverse engineering reports — anything
    that may help other analysts.

    :param resource: Either an md5/sha1/sha256 hash of the file you want to
        review or the URL itself that you want to comment on.
    :param comment: The actual review; you can tag it using the "#"
        twitter-like syntax (e.g. #disinfection #zbot) and reference users
        using the "@" syntax (e.g. @VirusTotalTeam).
    :return: JSON response, or a dict with an 'error' key on failure.
    """
    params = {'apikey': self.api_key, 'resource': resource, 'comment': comment}
    try:
        response = requests.post(self.base + 'comments/put', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        # str(e): the .message attribute is deprecated and absent in Python 3.
        return dict(error=str(e))
    return _return_response_and_status_code(response)
def get_comments(self, resource, before=None):
    """Get VirusTotal Community comments for a file or URL.

    Comments are user-submitted reviews on a given item, from in-the-wild
    locations of files up to fully-featured reverse engineering reports.

    :param resource: Either an md5/sha1/sha256 hash of the file or the URL
        itself you want to retrieve comments for.
    :param before: (optional) A datetime token that allows you to iterate over
        all comments on a specific item whenever it has been commented on
        more than 25 times.
    :return: JSON response with comments sorted in descending date order, or
        a dict with an 'error' key on failure.
    """
    params = dict(apikey=self.api_key, resource=resource, before=before)
    try:
        response = requests.get(self.base + 'comments/get', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        # str(e): the .message attribute is deprecated and absent in Python 3.
        return dict(error=str(e))
    return _return_response_and_status_code(response)
def get_hashes_from_search(self, query, page=None):
    """Run a VirusTotal Intelligence search programmatically.

    Even without a Private Mass API key you can automate VirusTotal
    Intelligence searches much like the file-searching API call.

    :param query: a VirusTotal Intelligence search string in accordance with
        the file search documentation
        <https://www.virustotal.com/intelligence/help/file-search/>.
    :param page: the next_page property of a previously issued query's
        results; omit for the very first query (first page of results).
    :return: (next_page, response) tuple on success, or a dict with an
        'error' key if the request failed.
    """
    params = {'query': query, 'apikey': self.api_key, 'page': page}
    try:
        response = requests.get(self.base + 'search/programmatic/', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        # str(e): the .message attribute is deprecated and absent in Python 3.
        return dict(error=str(e))
    return response.json()['next_page'], response
def view_autocomplete(self, request, group, **kwargs):
    """Serve issue-id autocomplete suggestions from the story search endpoint."""
    field = request.GET.get('autocomplete_field')
    query = request.GET.get('autocomplete_query')
    # Only the issue_id field is autocompleted, and only for non-empty queries.
    if field != 'issue_id' or not query:
        return Response({'issue_id': []})
    encoded_query = query.encode('utf-8')
    search_url = '%s?%s' % (self.build_api_url(group, 'search'), urlencode({'query': encoded_query}))
    try:
        api_response = self.make_api_request(group.project, search_url)
        raw_body = safe_urlread(api_response)
    except (requests.RequestException, PluginError) as exc:
        return self.handle_api_error(exc)
    try:
        payload = json.loads(raw_body)
    except ValueError as exc:
        return self.handle_api_error(exc)
    # The stories list is nested one level inside the 'stories' envelope.
    matches = payload.get('stories', {}).get('stories', [])
    issues = []
    for story in matches:
        issues.append({'text': '(#%s) %s' % (story['id'], story['name']), 'id': story['id']})
    return Response({field: issues})
def link_issue(self, request, group, form_data, **kwargs):
    """Attach the user's comment to the Pivotal story named in form_data."""
    comment = form_data.get('comment')
    # Nothing to post without a comment.
    if not comment:
        return
    story_url = '%s/%s/comments' % (self.build_api_url(group, 'stories'), form_data['issue_id'])
    try:
        api_response = self.make_api_request(group.project, story_url, json_data={"text": comment})
        raw_body = safe_urlread(api_response)
    except requests.RequestException as exc:
        raise PluginError('Error communicating with Pivotal: %s' % (six.text_type(exc), ))
    try:
        payload = json.loads(raw_body)
    except ValueError as exc:
        raise PluginError('Error communicating with Pivotal: %s' % (six.text_type(exc), ))
    # Surface API-level errors (4xx and up) to the caller.
    if api_response.status_code > 399:
        raise PluginError(payload['error'])
def _stream(self, url, headers=None):
    """Stream *url* in 64KiB pieces, yielding each piece to the caller.

    :param url: URL to fetch with streaming enabled.
    :param headers: optional extra HTTP headers for the request.
    :raises exceptions.RetryableIOError: if the underlying request fails.
    :raises exceptions.ContentLengthMismatch: if the number of bytes received
        differs from the Content-Length header (when present).
    """
    # None sentinel instead of a mutable {} default, which would be shared
    # across calls and could accumulate state.
    if headers is None:
        headers = {}
    response = self.__get(url, headers=headers, stream=True, timeout=self.timeout)
    length = 0
    piece_size = 65536
    try:
        for piece in response.iter_content(piece_size):
            length += len(piece)
            yield piece
    except requests.RequestException as re:
        raise exceptions.RetryableIOError(re)
    # Verify we received exactly as many bytes as the server promised.
    if CONTENT_LENGTH in response.headers:
        content_length = int(response.headers[CONTENT_LENGTH])
        if content_length != length:
            raise exceptions.ContentLengthMismatch(
                "Length mismatch {} != {}".format(content_length, length))
def test_push_results_to_db_request_post_failed(self):
    """push_results_to_db() must return False and log an error when the
    HTTP POST to the results database raises RequestException."""
    dic = self._get_env_dict(None)
    CONST.__setattr__('results_test_db_url', self.db_url)
    # Patch the environment, the module's logger.error and requests.post so
    # the POST fails deterministically without touching the network.
    with mock.patch.dict(os.environ,
                         dic,
                         clear=True), \
            mock.patch('functest.utils.functest_utils.logger.error') \
            as mock_logger_error, \
            mock.patch('functest.utils.functest_utils.requests.post',
                       side_effect=requests.RequestException):
        self.assertFalse(functest_utils.
                         push_results_to_db(self.project, self.case_name,
                                            self.start_date,
                                            self.stop_date,
                                            self.result, self.details))
        # The logged message is matched with a regex helper, not verbatim,
        # because it embeds the (variable) target URL.
        mock_logger_error.assert_called_once_with(test_utils.
                                                  RegexMatch("Pushing "
                                                             "Result to"
                                                             " DB"
                                                             "(\S+\s*) "
                                                             "failed:"))
def callback(request_id, message, callback_url, scheduled_at, last_retry=None, retry_delay=None, _attempts=None):
    """POST *message* to *callback_url*; on failure, reschedule via retry.

    NOTE(review): callback.retry suggests this is a task-queue task (e.g.
    Celery-style) whose decorator provides .retry — confirm against the
    surrounding module.
    """
    if retry_delay is None:
        retry_delay = DEFAULT_RETRY_DELAY
    try:
        response = requests.post(callback_url, data=message)
        # Treat HTTP error statuses (4xx/5xx) as failures, not just
        # connection-level errors.
        response.raise_for_status()
    except requests.RequestException:
        # Re-enqueue the same payload, recording when this retry happened
        # and delaying the next attempt by retry_delay.
        callback.retry(
            request_id=request_id,
            message=message,
            callback_url=callback_url,
            scheduled_at=scheduled_at,
            last_retry=int(time.time()),
            retry_delay=retry_delay,
            delay=retry_delay,
            taskid=request_id,
            _attempts=_attempts
        )
def http_request(method, url, session=requests, **kwargs):
    """ Wrapper for 'requests' silencing exceptions a little bit. """
    # Defaults: 30s timeout; TLS verification off (deliberate for a scanner).
    kwargs.setdefault('timeout', 30.0)
    kwargs.setdefault('verify', False)
    requester = getattr(session, method.lower())
    try:
        return requester(url, **kwargs)
    except (requests.exceptions.MissingSchema, requests.exceptions.InvalidSchema):
        print_error("Invalid URL format: {}".format(url))
    except requests.exceptions.ConnectionError:
        print_error("Connection error: {}".format(url))
    except requests.RequestException as error:
        print_error(error)
    except socket.error as err:
        print_error(err)
    except KeyboardInterrupt:
        print_info()
        print_status("Module has been stopped")
    # Falls through returning None on any handled exception.
def get_product_info(self, serial, retry=True):
if self.debug:
print '\t[+] Checking possible product "%s"' % serial
timeout = 10
try:
resp = self.requests.get(self.url + '?productId=' + serial, verify=True, timeout=timeout)
msg = 'Status code: %s' % str(resp.status_code)
if str(resp.status_code) == '401':
print '\t[!] HTTP error. Message was: %s' % msg
print '\t[!] waiting for 30 seconds to let the api server calm down'
# suspecting blockage due to to many api calls. Put in a pause of 30 seconds and go on
time.sleep(30)
if retry:
print '\n[!] Retry'
self.get_product_info(serial, False)
else:
return None
else:
return resp.json()
except requests.RequestException as e:
self.error_msg(e)
return None
def get_access_key(self):
    # Exchange the configured API key/secret for an OAuth access token
    # scoped to warranty lookups. Exits the process on request failure.
    # (Python 2 snippet: print statements kept as-is.)
    if self.debug:
        print '\t[+] Getting HP access token'
    timeout = 10
    payload = {
        'apiKey': self.api_key,
        'apiSecret': self.api_secret,
        'grantType': 'client_credentials',
        'scope': 'warranty'
    }
    headers = {
        'Accept': 'application/json',
        'Content-type': 'application/x-www-form-urlencoded'
    }
    try:
        resp = requests.post(self.url + '/oauth/v1/token',
                             data=payload, headers=headers, verify=True, timeout=timeout)
        # NOTE(review): raises KeyError if the response lacks 'access_token'
        # (e.g. on an OAuth error payload) — confirm intended behavior.
        result = json.loads(resp.text)
        return result['access_token']
    except requests.RequestException as e:
        self.error_msg(e)
        # Hard exit: without a token, the rest of the run cannot proceed.
        sys.exit()
def http_request(self, call, url, **kwargs):
    """Invoke *call* (a requests function) on *url*, wrapping failures.

    Raises LunrHttpError for non-200 responses and LunrError for
    transport-level failures.
    """
    try:
        # Strip keyword arguments that carry no value.
        kwargs = self.unused(kwargs)
        if self.client.timeout:
            kwargs['timeout'] = self.client.timeout
        if self.debug:
            print("-- %s on %s with %s " % (call.__name__.upper(),
                                            url, kwargs))
        resp = call(url, **kwargs)
        if self.debug:
            print("-- response: %s " % resp.text)
        # Success path: decode the body and hand back (payload, status).
        if resp.status_code == 200:
            return response(json.loads(resp.text), resp.status_code)
        raise LunrHttpError("%s returned '%s' with '%s'" %
                            (url, resp.status_code,
                             json.loads(resp.text)['reason']),
                            resp.status_code)
    except requests.RequestException as e:
        raise LunrError(str(e))
def http_request(method, url, session=requests, **kwargs):
    """ Wrapper for 'requests' silencing exceptions a little bit.

    Dispatches to session.<method>(url, **kwargs) and returns the response,
    or None when any of the handled exceptions occurs (each is reported via
    the print_* helpers instead of propagating).
    """
    kwargs.setdefault('timeout', 30.0)
    # NOTE(review): verify=False disables TLS certificate checks — apparently
    # deliberate for this tool, but it exposes traffic to interception.
    kwargs.setdefault('verify', False)
    try:
        return getattr(session, method.lower())(url, **kwargs)
    except (requests.exceptions.MissingSchema, requests.exceptions.InvalidSchema):
        print_error("Invalid URL format: {}".format(url))
        return
    except requests.exceptions.ConnectionError:
        print_error("Connection error: {}".format(url))
        return
    except requests.RequestException as error:
        print_error(error)
        return
    except socket.error as err:
        print_error(err)
        return
    except KeyboardInterrupt:
        # User abort: report it and fall through (returns None).
        print_info()
        print_status("Module has been stopped")
def _get_freegeoip() -> Optional[Dict[str, Any]]:
    """Query freegeoip.io for location data."""
    try:
        raw_info = requests.get(FREEGEO_API, timeout=5).json()
    except (requests.RequestException, ValueError):
        # Network failure or non-JSON body: no location available.
        return None
    # freegeoip's field names match ours one-to-one.
    fields = ('ip', 'country_code', 'country_name', 'region_code',
              'region_name', 'city', 'zip_code', 'time_zone',
              'latitude', 'longitude')
    return {field: raw_info.get(field) for field in fields}
def _get_ip_api() -> Optional[Dict[str, Any]]:
    """Query ip-api.com for location data."""
    try:
        raw_info = requests.get(IP_API, timeout=5).json()
    except (requests.RequestException, ValueError):
        # Network failure or non-JSON body: no location available.
        return None
    # Map our canonical field names onto ip-api.com's camelCase schema.
    field_map = {
        'ip': 'query',
        'country_code': 'countryCode',
        'country_name': 'country',
        'region_code': 'region',
        'region_name': 'regionName',
        'city': 'city',
        'zip_code': 'zip',
        'time_zone': 'timezone',
        'latitude': 'lat',
        'longitude': 'lon',
    }
    return {ours: raw_info.get(theirs) for ours, theirs in field_map.items()}
def pull_poloniex_data():
    """Fetch the full Poloniex ticker, archive the raw JSON payload as an
    ExchangeData row, and persist per-pair price/volume data.

    Returns an error string on request failure, None otherwise.
    """
    try:
        logger.info("pulling Poloniex data...")
        # NOTE(review): 'get' and 'RequestException' appear to be imported
        # bare from requests at module level — confirm.
        req = get('https://poloniex.com/public?command=returnTicker')
        data = req.json()
        timestamp = time.time()
        # Archive the raw payload alongside a capture timestamp.
        poloniex_data_point = ExchangeData.objects.create(
            source=POLONIEX,
            data=json.dumps(data),
            timestamp=timestamp
        )
        logger.info("Saving Poloniex price, volume data...")
        _save_prices_and_volumes(data, timestamp)
    except RequestException:
        return 'Error to collect data from Poloniex'
def request(self, method, url, **kwargs):
    """Ensure a User-Agent header is set, then delegate to the base session.

    Wraps transport failures in RequestError.
    """
    supplied = kwargs.get('headers')
    if supplied:
        headers = dict(supplied)
        # Header names are case-insensitive: compare in Title-Case form.
        has_agent = any(name.title() == 'User-Agent' for name in headers)
        if not has_agent:
            headers['User-Agent'] = self.user_agent
    else:
        headers = self.default_headers
    kwargs['headers'] = headers
    try:
        return super(RequestsSession, self).request(method, url, **kwargs)
    except requests.RequestException:
        raise RequestError
def working(self, priority: int, url: str, keys: dict, deep: int, repeat: int, proxies=None) -> (int, bool, object):
    """
    working function, must "try, except" and don't change the parameters and return
    :return (fetch_result, proxies_state, content): fetch_result can be -2(fetch failed, stop thread), -1(fetch failed), 0(need repeat), 1(fetch success)
    :return (fetch_result, proxies_state, content): proxies_state can be True(avaiable), False(unavaiable)
    :return (fetch_result, proxies_state, content): content can be any object, for example string, list, etc
    """
    logging.debug("%s start: %s", self.__class__.__name__, CONFIG_FETCH_MESSAGE % (priority, keys, deep, repeat, url))
    # Random jitter before fetching, presumably to avoid hammering the
    # target and to spread worker requests over time.
    time.sleep(random.randint(0, self._sleep_time))
    try:
        fetch_result, proxies_state, content = self.url_fetch(priority, url, keys, deep, repeat, proxies=proxies)
    except requests.RequestException:
        # Network-level failure: give up (-1) once the repeat budget is
        # exhausted, otherwise signal a repeat (0).
        if repeat >= self._max_repeat:
            fetch_result, proxies_state, content = -1, True, None
            logging.error("%s error: %s, %s", self.__class__.__name__, extract_error_info(), CONFIG_FETCH_MESSAGE % (priority, keys, deep, repeat, url))
        else:
            fetch_result, proxies_state, content = 0, True, None
            logging.debug("%s repeat: %s, %s", self.__class__.__name__, extract_error_info(), CONFIG_FETCH_MESSAGE % (priority, keys, deep, repeat, url))
    except Exception:
        # Any other failure is terminal for this URL: no repeat.
        fetch_result, proxies_state, content = -1, True, None
        logging.error("%s error: %s, %s", self.__class__.__name__, extract_error_info(), CONFIG_FETCH_MESSAGE % (priority, keys, deep, repeat, url))
    logging.debug("%s end: fetch_result=%s, proxies_state=%s, url=%s", self.__class__.__name__, fetch_result, proxies_state, url)
    return fetch_result, proxies_state, content