def get_ip_report(self, this_ip):
""" Get information about a given IP address.
Retrieves a report on a given IP address (including the information recorded by VirusTotal's Passive DNS
infrastructure).
    :param this_ip: A valid IPv4 address in dotted-quad notation; for the time being, only IPv4
        addresses are supported.
:return: JSON response
"""
params = {'apikey': self.api_key, 'ip': this_ip}
try:
response = requests.get(self.base + 'ip-address/report', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        # RequestException does not reliably expose a .message attribute (removed in Python 3); use str(e)
        return dict(error=str(e))
return _return_response_and_status_code(response)
def get_domain_report(self, this_domain):
""" Get information about a given domain.
Retrieves a report on a given domain (including the information recorded by VirusTotal's passive DNS
infrastructure).
:param this_domain: A domain name.
:return: JSON response
"""
params = {'apikey': self.api_key, 'domain': this_domain}
try:
response = requests.get(self.base + 'domain/report', params=params, proxies=self.proxies)
except requests.RequestException as e:
        return dict(error=str(e))
return _return_response_and_status_code(response)
def get_comments(self, resource, before=None):
""" Get comments for a file or URL.
    Retrieve a list of VirusTotal Community comments for a given file or URL. VirusTotal Community comments are
    user-submitted reviews on a given item; they may contain anything from in-the-wild locations of files to
    fully-featured reverse-engineering reports on a given sample.
    :param resource: Either an MD5/SHA-1/SHA-256 hash of the file or the URL itself whose comments you want to retrieve.
:param before: (optional) A datetime token that allows you to iterate over all comments on a specific item
whenever it has been commented on more than 25 times.
:return: JSON response - The application answers with the comments sorted in descending order according to
their date.
"""
params = dict(apikey=self.api_key, resource=resource, before=before)
try:
response = requests.get(self.base + 'comments/get', params=params, proxies=self.proxies)
except requests.RequestException as e:
        return dict(error=str(e))
return _return_response_and_status_code(response)
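# The wrappers above all funnel their responses through _return_response_and_status_code,
# which is not shown in this snippet. Below is a minimal sketch of what that helper might
# look like, assuming it bundles the parsed JSON together with the HTTP status code:
def _return_response_and_status_code(response):
    """Hypothetical helper: turn a requests.Response into a plain dict."""
    try:
        results = response.json()
    except ValueError:  # body was not valid JSON
        results = dict(error='could not decode JSON response')
    return dict(results=results, response_code=response.status_code)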
def WaitForFileServerToStart(port):
"""
Wait for the Flask file server to start up. Test it by trying the
PyUpdater update URL, e.g. http://127.0.0.1:12345. If we receive
a ConnectionError, we continue waiting, but if we receive an HTTP
response code (404), we return True. For a frozen app, e.g. a
Mac .app bundle, the location of the updates must be supplied by
an environment variable, whereas when running from the source repo,
the location of the updates is likely to be ./pyu-data/deploy/
"""
url = 'http://%s:%s/fileserver-is-ready' % (LOCALHOST, port)
attempts = 0
while True:
try:
attempts += 1
requests.get(url, timeout=1)
return True
except requests.exceptions.ConnectionError:
time.sleep(0.25)
if attempts > 10:
logger.warning("WaitForFileServerToStart: timeout")
return
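# A hedged usage sketch (not from the original project): bring up any local HTTP server
# on the chosen port, then block until WaitForFileServerToStart sees a response.
# LOCALHOST and PORT here are illustrative assumptions; the original module supplies its own.
import threading
from http.server import HTTPServer, SimpleHTTPRequestHandler

LOCALHOST = '127.0.0.1'
PORT = 12345

httpd = HTTPServer((LOCALHOST, PORT), SimpleHTTPRequestHandler)
threading.Thread(target=httpd.serve_forever, daemon=True).start()

if WaitForFileServerToStart(PORT):
    print('file server is up; safe to exercise the PyUpdater client')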
def checkFactorDB(n):
"""See if the modulus is already factored on factordb.com,
and if so get the factors"""
    # FactorDB returns IDs of numbers, which act as links to the full numbers;
    # follow the IDs to retrieve the actual values.
    r = requests.get('http://www.factordb.com/index.php?query=%s' % str(n))
    regex = re.compile(r"index\.php\?id=([0-9]+)", re.IGNORECASE)
ids = regex.findall(r.text)
# These give you ID's to the actual number
num = len(ids)-2
print(ids)
print(num)
if num < 2: return 0
else: return num * (num-1) / 2
#print solve('1ELuX8Do1NDSMy4eV8H82dfFtTvKaqYyhg')
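# Hedged companion sketch, not part of the original script: factordb.com also exposes a
# JSON endpoint at /api?query=<n> (an assumption about the service, verify before relying
# on it) that returns the factorization directly when it is known.
def get_factordb_factors(n):
    """Return the known factors of n from factordb.com's assumed JSON API."""
    r = requests.get('http://factordb.com/api', params={'query': str(n)}, timeout=10)
    data = r.json()
    # each entry is [factor_as_string, exponent]; expand exponents into a flat list
    return [int(f) for f, exp in data.get('factors', []) for _ in range(int(exp))]

# e.g. get_factordb_factors(15) -> [3, 5] once factordb knows the number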
def get_audit_actions(self, date_modified, offset=0, page_length=100):
"""
    Get all actions created after a specified date. If the number of actions found is more than 100, this function
    pages through the results until it has collected all actions.
    :param date_modified: ISO-formatted date/time string. Only actions created after this date are returned.
:param offset: The index to start retrieving actions from
:param page_length: How many actions to fetch for each page of action results
:return: Array of action objects
"""
logger = logging.getLogger('sp_logger')
actions_url = self.api_url + 'actions/search'
response = self.authenticated_request_post(
actions_url,
data=json.dumps({
"modified_at": {"from": str(date_modified)},
"offset": offset,
"status": [0, 10, 50, 60]
})
)
result = self.parse_json(response.content) if response.status_code == requests.codes.ok else None
self.log_http_status(response.status_code, 'GET actions')
if result is None or None in [result.get('count'), result.get('offset'), result.get('total'), result.get('actions')]:
return None
return self.get_page_of_actions(logger, date_modified, result, offset, page_length)
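# get_page_of_actions and the other helpers used above are not shown in this snippet. As a
# standalone illustration of the offset-based paging pattern the docstring describes, here is
# a hedged sketch; the endpoint, field names and auth header are assumptions, not the SDK's API.
def fetch_all_actions(url, token, date_modified):
    import requests, json
    collected, offset = [], 0
    while True:
        resp = requests.post(
            url,
            headers={'Authorization': 'Bearer %s' % token},
            data=json.dumps({'modified_at': {'from': str(date_modified)},
                             'offset': offset, 'status': [0, 10, 50, 60]}))
        payload = resp.json()
        page = payload.get('actions', [])
        collected.extend(page)
        offset += len(page)
        # stop when a page comes back empty or we have reached the reported total
        if not page or offset >= payload.get('total', 0):
            return collected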
def recursive_scrape(url, count=0):
# The API seems to link the images in a loop, so we can stop once we see an
# image we have already seen.
if url in seen_photos:
return
seen_photos[url] = True
page = requests.get(url)
photo_json = page.json()
    print(photo_json)
yield photo_json
next_url = 'https://earthview.withgoogle.com' + photo_json['nextApi']
# Yielding from recursive functions is a bit funky
for photo_json in recursive_scrape(next_url, count + 1):
yield photo_json
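# Hedged usage sketch: seen_photos must exist as a module-level dict before the generator is
# driven, and the starting API path below is a made-up example of the nextApi-style URLs the
# loop follows (it is an assumption, not a real endpoint from the original script).
seen_photos = {}
start_url = 'https://earthview.withgoogle.com/_api/some-first-photo.json'  # hypothetical
for photo in recursive_scrape(start_url):
    print(photo.get('slug', photo))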
def facebook_id_to_username(self, facebook_id):
"""
Converts a Facebook ID to a username.
Args:
facebook_id: A string representing a Facebook ID.
Returns:
A string representing the username corresponding to facebook_id.
"""
# Store username in self.id_to_username if we have not seen the user ID yet
if facebook_id not in self.id_to_username:
graph_api_request = 'https://graph.facebook.com/' + facebook_id + '?fields=name&access_token=' + self.ACCESS_TOKEN
response_dict = requests.get(graph_api_request).json()
try:
username = response_dict['name']
except KeyError:
self.id_to_username[facebook_id] = facebook_id + '@facebook.com'
else:
self.id_to_username[facebook_id] = username
return self.id_to_username[facebook_id]
def slack(text: hug.types.text):
"""Returns JSON containing an attachment with an image url for the Slack integration"""
title = text
if text == 'top250':
top250_res = requests.get(IMDB_URL + '/chart/toptv', headers={'Accept-Language': 'en'})
top250_page = html.fromstring(top250_res.text)
candidates = top250_page.xpath('//*[@data-caller-name="chart-top250tv"]//tr/td[2]/a')
title = random.choice(candidates).text
return dict(
response_type='in_channel',
attachments=[
dict(image_url=GRAPH_URL + f'/graph?title={quote(title)}&uuid={uuid.uuid4()}')
]
)
def saveFile(self, url, page, idx):
    user_define_name = self.now_date() + '_p_' + str(page) + '_' + string.zfill(idx, 2)  # pad the index to 2 digits
    file_ext = self.file_extension(url)  # file extension
    save_file_name = user_define_name + "_" + file_ext
    # urllib.urlretrieve could be used instead of requests + open():
    # urllib.urlretrieve(item[0], self.save_path + save_file_name)
    # validate the url first
    url = self.CheckUrlValidate(url)
    try:
        pic = requests.get(url, timeout=30)
        f = open(self.store_dir + os.sep + save_file_name, 'wb')
        f.write(pic.content)
        f.close()
        print '\ndone save file ' + save_file_name
    except requests.exceptions.ReadTimeout:
        print 'save file %s failed. caused by timeout(30)' % (save_file_name)
    except Exception as e:
        print 'save file %s failed (possibly https is not supported by this Python build):' % (save_file_name)
        print e
# check whether the url starts with http: and add the prefix if it is missing
def GetTotalPage(self, html):
    # create the BeautifulSoup object
    some_soup = BeautifulSoup(html, 'html.parser')
    # get the page div
    ele_a = some_soup.find('div', attrs={'class': 'page'})
    # get the last <a> in the div (the link to the last page)
    last_a = ele_a.findAll('a')[-1]
    # strip the trailing '.html' to get the page number
    pagenum = last_a.get('href')[:-5]
    print 'pagenum :', pagenum
    # print type(last_a)
    self.SaveTotalPageToFile(pagenum)
# store the max page number to totalpage.ini
# new_page_num: the new max page number
def new_session(account):
if account.get('session',None) is None:
session = requests.session()
session.verify = True
session.headers.update({'User-Agent': 'Niantic App'}) # session.headers.update({'User-Agent': 'niantic'})
        if account['proxy'] is not None:
session.proxies.update(account['proxy'])
account['session'] = session
else:
account['session'].close()
account['session'].cookies.clear()
account['session_time'] = get_time()
account['session_hash'] = os.urandom(32)
account['api_url'] = API_URL
account['auth_ticket'] = None
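# Hedged usage sketch: new_session mutates an account dict in place. The keys below are
# inferred from how the function reads them; get_time() and API_URL must already be defined
# in the surrounding module, as they are in the original project.
account = {'proxy': None, 'session': None}
new_session(account)
assert account['session'] is not None and account['auth_ticket'] is None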
# auto_isolate_from_watchlist.py, from the cbapi-examples project (author: cbcommunity)
def get_watchlist_id_by_name(watchlistsdict):
"""
For each watchlist name specified in the config file, find the
associated watchlist ID.
NOTE: We trigger on watchlist IDs, and not on watchlist names
"""
global cbtoken
global cbserver
headers = {'X-AUTH-TOKEN': cbtoken}
r = requests.get("https://%s/api/v1/watchlist" % (cbserver),
headers=headers,
verify=False)
parsed_json = json.loads(r.text)
for watchlist in parsed_json:
        for key, value in watchlistsdict.items():
if watchlist['name'].lower() == key.lower():
watchlistsdict[key] = watchlist['id']
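# Hedged usage sketch: the watchlist names below are placeholders for whatever the config file
# supplies, and cbtoken/cbserver must be set as in the original script. After the call, the dict
# values hold the numeric IDs of any watchlist names that matched on the Carbon Black server.
watchlists = {'Example Watchlist A': None, 'Example Watchlist B': None}
get_watchlist_id_by_name(watchlists)
print(watchlists)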
def get_or_create(self, model, field, value, **kwargs):
"""
Retrieves object of class `model` with lookup key `value` from the cache. If not found,
creates the object based on `field=value` and any other `kwargs`.
Returns a tuple of `(object, created)`, where `created` is a boolean specifying whether an
`object` was created.
"""
result = self[model].get(value)
created = False
if not result:
kwargs[field] = value
result = model.objects.create(**kwargs)
self[model][value] = result
created = True
return result, created
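# Hedged usage sketch: Tag stands for any Django-style model with a 'name' field, and
# 'model_cache' for an instance of the class defining get_or_create above; both names are
# assumptions for illustration only.
tag, created = model_cache.get_or_create(Tag, 'name', 'python')
if created:
    print('inserted a new Tag row for "python"')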
def tags(self):
tags = self._fixed_tags
if self._type == 'reply': # NOTE replies don't ever get put in public directly
out_tags = []
for tag in tags:
if tag.startswith('RRID:'):
continue # we deal with the RRID itself in def rrid(self)
elif tag == self.INCOR_TAG and self.rrid:
continue
else:
out_tags.append(tag)
if self.corrected:
out_tags.append(self.CORR_TAG)
return sorted(out_tags)
else:
return [t for t in tags if not t.startswith('RRID:')] # let self.rrid handle the rrid tags
def clean_dupes(get_annos, repr_issues=False):
annos = get_annos()
seen = set()
    # seen.add() returns None (falsy), so this records each id while keeping only ids already seen
    dupes = [a.id for a in annos if a.id in seen or seen.add(a.id)]
preunduped = [a for a in annos if a.id in dupes]
for id_ in dupes:
print('=====================')
anns = sorted((a for a in annos if a.id == id_), key=lambda a: a.updated)
if not repr_issues:
[print(a.updated, HypothesisHelper(a, annos)) for a in anns]
for a in anns[:-1]: # all but latest
annos.remove(a)
deduped = [a for a in annos if a.id in dupes]
assert len(preunduped) // len(dupes) == 2, 'Somehow you have managed to get more than 1 duplicate!'
# get_annos.memoize_annos(annos)
embed()
def get_balance(address):
"""
Retrieves the balance from etherscan.io.
The balance is returned in ETH rounded to the second decimal.
"""
address = PyWalib.address_hex(address)
url = 'https://api.etherscan.io/api'
url += '?module=account&action=balance'
url += '&address=%s' % address
url += '&tag=latest'
    if ETHERSCAN_API_KEY:
        url += '&apikey=%s' % ETHERSCAN_API_KEY
# TODO: handle 504 timeout, 403 and other errors from etherscan
response = requests.get(url)
response_json = response.json()
PyWalib.handle_etherscan_error(response_json)
balance_wei = int(response_json["result"])
balance_eth = balance_wei / float(pow(10, 18))
balance_eth = round(balance_eth, ROUND_DIGITS)
return balance_eth
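# Worked example of the wei -> ETH conversion used above, independent of etherscan
# (1 ETH == 10**18 wei; ROUND_DIGITS is assumed to be 2 here):
balance_wei = 1500000000000000000
balance_eth = round(balance_wei / float(pow(10, 18)), 2)
assert balance_eth == 1.5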
def get_cids(self, cas):
"""
Use the PubChem API to get the CID
:param cas: string - CAS identifier
:return: list of CIDs
"""
uri = "http://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/name/%s/cids/json" \
"?email=%s"
try:
response = get((uri % (cas, app.config['ADMIN_EMAIL']))).json()
try:
cids = response['IdentifierList']['CID']
return cids
except KeyError:
return None
except (exceptions.ConnectionError, TimeoutError, exceptions.Timeout,
exceptions.ConnectTimeout, exceptions.ReadTimeout) as e:
        # Error: report the error and the CAS number that this error occurred on
        sys.stderr.write("Error: %s. Occurred on CAS: %s\n" % (e, cas))
sys.stderr.flush()
sys.stdout.flush()
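# Hedged usage sketch: '50-78-2' is the CAS registry number for aspirin; 'client' stands for
# an instance of the class that defines get_cids above, and the app config must provide
# ADMIN_EMAIL as in the original service.
cids = client.get_cids('50-78-2')
if cids:
    print('PubChem CIDs for aspirin: %s' % cids)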
def getRosiItem():
start = time.time()
index = 1
while True:
url = "http://www.mmxyz.net/category/rosi/page/{}/".format(index)
        res = requests.get(url, timeout=10)
if res.status_code == 404:
print("+ Time: {:.2f} S +".format(time.time()-start))
print("+ Total Pages: {} +".format(index-1))
print("+ Total Numbers: {} +".format(len(RosiItems)))
print("+-------------------------+\r\n\r\n")
return
soup = BeautifulSoup(res.content, "html.parser")
rosiList = soup.find_all("a", class_="inimg")
for rosi in rosiList:
RosiItems.append(rosi['href'])
index += 1
def getRosiItem():
start = time.time()
index = 1
while True:
url = "http://www.mmxyz.net/category/disi/page/{}/".format(index)
        res = requests.get(url, timeout=10)
if res.status_code == 404:
print("+ Time: {:.2f} S +".format(time.time()-start))
print("+ Total Pages: {} +".format(index-1))
print("+ Total Numbers: {} +".format(len(RosiItems)))
print("+-------------------------+\r\n\r\n")
return
soup = BeautifulSoup(res.content, "html.parser")
rosiList = soup.find_all("a", class_="inimg")
for rosi in rosiList:
RosiItems.append(rosi['href'])
index += 1
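# Hedged usage sketch for both crawlers above: RosiItems is a module-level list that must
# exist before getRosiItem() is called; it accumulates the item URLs found on each page.
RosiItems = []
getRosiItem()
print('collected %d item urls' % len(RosiItems))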