def route_micro(micro):
'''
Micro to real URL redirection handler.
'''
try:
temp = lookup_micro(micro)
if urlcheck(temp):
return redirect(temp)
elif domaincheck(temp):
return redirect("http://" + temp)
elif ipcheck(temp.split(':')[0]) and urlcheck('http://' + temp):
# checks for plain ip or an ip with something after it
return redirect("http://" + temp)
else:
abort(404)
except Exception as e:
# If micro is not registered, handle the exception from trying to look
# it up and raise a 404 HTTP error.
sys.stderr.write(str(e))
abort(404)
def __do_http_request(self, type_, url, data):
"""make http get and post requests"""
parsed_url = self.__parse_url(url)
parameter = self.__get_parameter_from_parsed_url(parsed_url)
hostname = self.__get_host_from_parsed_url(parsed_url)
url = hostname + parsed_url.path # url is overwritten
payload = {
parameter: data
}
    if type_ == 'GET':
        response = requests.get(url, params=payload)
    elif type_ == 'POST':
        response = requests.post(url, data=payload)
    else:
        response = None
    return self.__validate_request_status(response)
def import_extract_main(chars={}, datafile=os.path.join("data", "ccew", "extract_main_charity.csv")):
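    """Read extract_main_charity.csv and add company numbers, websites and latest income to the charity records."""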
with open(datafile, encoding="latin1") as a:
csvreader = csv.reader(a, doublequote=False, escapechar='\\')
ccount = 0
for row in csvreader:
if len(row) > 1:
row = clean_row(row)
if row[1]:
chars[row[0]]["company_number"].append({
"number": parse_company_number(row[1]),
"url": "http://beta.companieshouse.gov.uk/company/" + parse_company_number(row[1]),
"source": "ccew"
})
if row[9]:
chars[row[0]]["url"] = row[9]
if row[6]:
chars[row[0]]["latest_income"] = int(row[6])
ccount += 1
if ccount % 10000 == 0:
print('\r', "[CCEW] %s charities read from extract_main_charity.csv" % ccount, end='')
print('\r', "[CCEW] %s charities read from extract_main_charity.csv" % ccount)
return chars
def clean_chars(chars={}, pc_es=None, es_pc_index="postcode", es_pc_type="postcode"):
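    """Add location details from postcode lookups plus cleaned URL, domain, org-ids and alternative names to each charity."""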
ccount = 0
for c in chars:
if pc_es:
geo_data = fetch_postcode(chars[c]["geo"]["postcode"], pc_es, es_pc_index, es_pc_type)
if geo_data:
chars[c]["geo"]["location"] = geo_data[0]
chars[c]["geo"]["areas"] = geo_data[1]
chars[c]["url"] = parse_url(chars[c]["url"])
chars[c]["domain"] = get_domain(chars[c]["url"])
chars[c]['org-ids'] = add_org_id_prefix(chars[c])
chars[c]["alt_names"] = [n["name"] for n in chars[c]["names"] if n["name"] != chars[c]["known_as"]]
# @TODO capitalisation of names
ccount += 1
if ccount % 10000 == 0:
            print('\r', "[Geo] location details added for %s charities" % ccount, end='')
    print('\r', "[Geo] location details added for %s charities" % ccount)
return chars
def checkType(self, argument):
"""
Identify observable type
"""
if validators.url(argument):
return "URL"
elif validators.md5(argument):
return "MD5"
elif validators.sha1(argument):
return "SHA1"
elif validators.sha256(argument):
return "SHA256"
elif validators.sha512(argument):
return "SHA512"
elif validators.ipv4(argument):
return "IPv4"
elif validators.ipv6(argument):
return "IPv6"
elif validators.domain(argument):
return "domain"
else:
mod.display("MAIN", argument, "ERROR", "Unable to retrieve observable type")
return None
def do_import_bookmarks(filename):
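    """Parse a Netscape-format HTML bookmark file into a list of [title, url] pairs."""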
content = []
first = _("Oops, import failed")
    second = _("could be corrupted or an invalid HTML bookmark file")
with open(filename) as f: l = f.readlines()
if not re.findall("<!DOCTYPE NETSCAPE-Bookmark-file-1>", l[0], re.IGNORECASE):
dialog().error(first, "<span size='small'>\"<b>{}</b>\" {}.</span>".format(filename, second))
return True
title = re.findall(r'<a[^>]*>(.*?)</a>', str(l), re.IGNORECASE)
url = re.findall(r'<a[^>]* href="([^"]*)"', str(l), re.IGNORECASE)
for c, i in enumerate(title):
if title[c] and url[c]: content.append([title[c]] + [url[c]])
return content
def do_export_bookmarks(list):
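    """Serialise the given bookmarks into Netscape bookmark-file HTML."""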
content = []
header = "<!DOCTYPE NETSCAPE-Bookmark-file-1><!--This is an automatically generated file.\
It will be read and overwritten. Do Not Edit! --><META HTTP-EQUIV=\"Content-Type\" CONTENT=\"text/html;\
charset=UTF-8\"><Title>{}</Title><H1>{}</H1><DL><p>".format(_("Bookmarks"), _("Bookmarks"))
footer = "</DL><p>"
content.append(header)
for i in list:
timestamp = int(datetime.datetime.strptime(i[0], "%Y-%m-%d %H:%M").timestamp())
title = i[1]
url = i[2]
content.append("<DT><A HREF=\"{}\" ADD_DATE=\"{}\">{}</A>".format(url, timestamp, title))
content.append(footer)
    content = "".join(content)
return content
def on_insert_bookmarks(self, title, url):
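    """Insert a bookmark into the database unless the URL is already stored."""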
with bookmarks_con:
cur = bookmarks_con.cursor()
cur.execute("SELECT * FROM bookmarks;")
urls = cur.fetchall()
if len(urls) != 0:
for i in urls:
if url == i[1]:
return True
cur.execute("INSERT INTO bookmarks VALUES(?, ?, ?);",\
(title.replace("\n","").strip(), url, time.strftime("%Y-%m-%d %H:%M")))
self.refresh_liststore(1)
return True
def on_js_switch(self, button, active):
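    """Toggle JavaScript for the current tab and reload the page if its URL is valid."""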
if not set_enable_javascript: return True
page = self.tabs[self.current_page][0]
settings = page.webview.get_settings()
if button.get_active():
settings.set_property("enable-javascript", True)
self.js_label.set_markup(self.jse_label_text)
else:
settings.set_property("enable-javascript", False)
self.js_label.set_markup(self.jsd_label_text)
page.webview.set_settings(settings)
url = page.webview.get_uri()
if url and validators.url(url): page.webview.reload()
def on_decide_destination(self, download, name):
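    """Choose a download file name, deriving one from the domain and MIME type when none is given."""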
url = download.get_request().get_uri()
if not name: name = get_domain(url).replace(".", "_")
    if "." not in name:
mime = download.get_response().get_mime_type()
suf = mime.split("/")
name = "{}.{}".format(name, suf[1])
for i in self.dlview:
for a in i:
if type(a) == Gtk.ModelButton:
if a.get_name().split("/")[-1] == name:
self.downloads_menu.show()
return True
if url: pathchooser().save(name, download, url)
def dynamic_title(self, view, title):
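    """Update the tab label for the given webview, truncating long titles into a tooltip."""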
url = view.get_uri()
if not url and not title: title = tab_name
if not title: title = url
counter = 0
for tab, widget in self.tabs:
widget = self.check_tab(widget, 0)
if tab.webview is view:
if widget:
widget.set_text(minify(title, 50))
widget.set_tooltip_text("")
if len(title) > 50: widget.set_tooltip_text(title)
counter += 1
def is_url_safe(url):
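    """Accept only public HTTPS URLs, applying the whitelist and blacklist from the environment."""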
if not url.startswith("https://"):
return False
if not validators.url(url, public=True):
return False
whitelist_urls = os.environ["WHITELISTED_CALLBACK_URLS"].split(';')
if url in whitelist_urls:
return True
forbidden_urls = os.environ["FORBIDDEN_CALLBACK_URLS"].split(';')
for furl in forbidden_urls:
if furl in url:
return False
return True
def __init__(self, url, max_worker=10, timeout=3,
scan_dict=None, verbose=False, status=None):
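    """Store the scan configuration and fall back to a default set of accepted status codes."""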
self.site_lang = ''
self.raw_base_url = url
self.base_url = url
self.max_worker = max_worker
self.timeout = timeout
self.scan_dict = scan_dict
self.verbose = verbose
self.first_item = ''
self.dict_data = {}
self.first_queue = []
self.found_items = {}
if status is None or len(status) == 0:
self.status = [200, 301, 302, 304, 401, 403]
else:
self.status = [int(t) for t in status]
def on_response(self, url, item, method, response, queue):
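    """Record items with an accepted status code and queue follow-up backup-file or POST checks."""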
if response.code in self.status:
if item in self.found_items:
return
self.found_items[item] = None
logger.warning('[Y] %s %s %s' % (response.code, method, url))
        # if the found item is a script file, also queue HEAD checks for likely backup copies of it
if any(map(item.endswith, ['.php', '.asp', '.jsp'])):
bak_list = self.make_bak_file_list(item)
bak_list = [(t, 'HEAD') for t in bak_list]
queue.extendleft(bak_list)
else:
if response.code == 405 and method != 'POST':
queue.appendleft((item, 'POST'))
if self.verbose:
logger.info('[N] %s %s %s' % (response.code, method, url))
def prepare_url(self):
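    """Split the raw URL into a base URL and first path item, guessing the site language from its extension."""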
url_parsed = urlparse(self.raw_base_url)
items = url_parsed.path.split('/')
if len(items) > 0:
item = items[-1]
items = items[:-1]
new_path = '/'.join(items)
else:
item = ''
new_path = url_parsed.path
url = urlunparse((url_parsed.scheme, url_parsed.netloc, new_path, '', '', ''))
if item.endswith('.php'):
self.site_lang = 'php'
elif item.endswith('.asp'):
self.site_lang = 'asp'
elif item.endswith('.aspx'):
self.site_lang = 'aspx'
if self.site_lang != '':
logger.info('site_lang: %s' % self.site_lang)
self.base_url = url
self.first_item = item
logger.info('base_url: %s' % url)
logger.info('first_item: %s' % item)
# core_extract_comments.py (from the amazon-reviews-scraper project by philipperemy)
def get_comments_based_on_keyword(search):
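    """Search Amazon.co.jp for the keyword and scrape the comments of every product found."""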
logging.info('SEARCH = {}'.format(search))
url = 'http://www.amazon.co.jp/s/ref=nb_sb_noss_2?url=search-alias%3Daps&field-keywords=' + \
search + '&rh=i%3Aaps%2Ck%3A' + search
soup = get_soup(url)
items = []
for a in soup.find_all('a', class_='s-access-detail-page'):
if a.find('h2') is not None and validators.url(a.get('href')):
name = str(a.find('h2').string)
link = a.get('href')
items.append((link, name))
logging.info('Found {} items.'.format(len(items)))
for (link, name) in items:
logging.debug('link = {}, name = {}'.format(link, name))
product_id = extract_product_id(link)
get_comments_with_product_id(product_id)
def get_paginated_list(results, url, start, size, page_size=settings.PAGINATION_SIZE):
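    """Wrap the results in a pagination envelope with previous and next URLs built from the start offset."""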
# check if page exists
count = size
# make response
obj = {}
obj['start'] = start
obj['page_size'] = page_size
obj['count'] = count
# make URLs
# make previous url
if start == 1:
obj['previous'] = ''
else:
start_copy = max(1, start - page_size)
page_size_copy = start - 1
obj['previous'] = url + '?start=%d' % (start_copy)
# make next url
if start + page_size > count:
obj['next'] = ''
else:
start_copy = start + page_size
obj['next'] = url + '?start=%d' % (start_copy)
# finally extract result according to bounds
obj['results'] = results
return obj
def format_urls_in_text(text):
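    """Wrap URL-like words in the text with HTML anchor tags."""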
new_text = []
accepted_protocols = ['http://', 'https://', 'ftp://', 'ftps://']
for word in str(text).split():
new_word = word
accepted = [protocol for protocol in accepted_protocols if protocol in new_word]
if not accepted:
new_word = 'http://{0}'.format(new_word)
        if validators.url(new_word):
new_word = '<a href="{0}">{1}</a>'.format(new_word, word)
else:
new_word = word
new_text.append(new_word)
return ' '.join(new_text)
def loadm3u(url):
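    """Download an M3U playlist and raise an exception if the response is not an EXTM3U file."""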
hdr = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
'Accept-Encoding': 'none',
'Accept-Language': 'en-US,en;q=0.8',
'Connection': 'keep-alive'}
req = urllib2.Request(url, headers=hdr)
response = urllib2.urlopen(req)
data = response.read()
    if 'EXTM3U' not in data:
raise Exception(url + " is not a m3u8 file.")
#return data.encode('utf-8')
return data
def dictToM3U(cumulustv):
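    """Convert a CumulusTV channel dictionary into M3U playlist text."""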
channels = cumulustv["channels"]
channelDataMap = [
("number", "tvg-id"),
("name", "tvg-name"),
("logo", "tvg-logo"),
("genres", "group-title"),
("country", "tvg-country"),
("lang", "tvg-language")
]
m3uStr = "#EXTM3U\n"
for channel in channels:
m3uStr += "#EXTINF:-1"
for dataId, extinfId in channelDataMap:
if channel[dataId] is not None and channel[dataId] != "":
m3uStr += " " + extinfId + "=\"" + channel[dataId].strip() + "\""
m3uStr += "," + channel["name"].strip() + "\n"
m3uStr += channel["url"] + "\n"
return m3uStr
def get_api_url():
"""
Get management url from the config file
"""
config_key = 'api_url'
try:
url = CONFIG.get(URL_SECTION, config_key)
if validators.url(str(url)):
return url
else:
print_config_error_and_exit(URL_SECTION, 'REST API URL(%s)' % config_key)
except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
return DEFAULT_API_URL
def build_url(nodes):
"""
    Build a URL from the given list of nodes and return the path and full URL respectively.
Ordering is important
"""
path = str.join('/', nodes)
url = str.join('/', [get_api_url(), path])
return path, url
def parse_result(self):
"""
Abstract parse_result method.
    It is called when the analysis is finished.
    It updates the malware record with the extracted indicators.
"""
if not self._result:
return
json_ole = self.json_decode(self._result)
if not json_ole:
return
for item in json_ole:
if "IOC" in item["type"]:
score = 7
if "URL" in item['description'] and validators.url(item['keyword']):
extract_malware = self.malware.add_extract_malware(
self.module_cls_name, item['keyword'], Type.get_label(Type.URL))
Input.analyse_malware(extract_malware)
elif "AutoExec" in item["type"]:
score = 7
elif "Suspicious" in item["type"]:
score = 5
elif "VBA string" in item["type"]:
score = 3
elif "Hex String" in item["type"]:
score = 1
else:
score = -1
indicator = Indicator.factory(module_cls_name=self.module_cls_name,
name="item",
content_type=Type.JSON,
content=json.dumps(item),
score=score)
self._malware.get_module_status(self.module_cls_name
).add_indicator(indicator)
def success():
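    """Render plots for the job or report data referenced by the URL stored in the session."""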
try:
URL = session["url"]
        if URL.find("jobs") != -1 and URL.find("metadata") == -1:
data = urllib.urlopen(URL).read()
data = json.loads(data)
temp = data["job_ids"]
if temp:
info = {}
for ID in temp:
url = URL + "?id=" + ID + "&type=metadata"
data_temp = urllib.urlopen(url).read()
data_temp = json.loads(data_temp)
report_data = get_data(data_temp)[-1]
info[ID] = report_data
return render_template('plot_jobs.html', results=info)
if validators.url(URL):
data = urllib.urlopen(URL).read()
else:
data = open("./static/testdata/" + URL).read()
data = json.loads(data)
response = get_data(data)
if response[0] == "single":
metrics, report_data = response[1], response[2]
results = response[3]
return render_template('plot_tables.html',
metrics=metrics, report_data=report_data,
results=results)
else:
return render_template('plot_multi_data.html',
results=response[1])
except Exception as e:
session['server_error'] = e.message + ' ' + repr(e.args)
return redirect(url_for('file_not_found'))
def url():
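    """Store the submitted URL in the session and redirect to the success view."""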
if request.method == 'POST':
url = request.form['url']
session["url"] = url
return redirect(url_for('success'))
def validate_result(current, default, type):
"""
    Validate the data according to its type (URL, email, etc.); return current only when it is valid and default is not.
"""
if current is None:
current = ""
if default is None:
default = ""
if type == "URL" and validators.url(current, require_tld=True) and not validators.url(default, require_tld=True):
return current
if type == "EMAIL" and validators.email(current) and not validators.email(default):
return current
return default
def fetch_tsv_data(gid):
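    """Fetch a single tab of the Google spreadsheet as TSV."""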
base_url = 'https://docs.google.com/spreadsheets/d/' + SHEET_ID + '/export?format=tsv'
url = base_url + '&gid=' + gid
logging.info('GET ' + url)
res = urllib2.urlopen(url)
return res.read()
def iterate_bytechunks(hashme, is_string, use_json, hash_many):
"""
Prep our bytes.
"""
# URL
if not is_string and validators.url(hashme):
if not use_json:
click.echo("Hashing content of URL " + click.style(hashme, bold=True) + "..", err=not hash_many)
try:
response = requests.get(hashme)
except requests.exceptions.ConnectionError as e:
raise ValueError("Not a valid URL. :(")
except Exception as e:
raise ValueError("Not a valid URL. {}.".format(e))
if response.status_code != 200:
click.echo("Response returned %s. :(" % response.status_code, err=True)
bytechunks = response.iter_content()
# File
elif os.path.exists(hashme) and not is_string:
if os.path.isdir(hashme):
if not use_json:
click.echo(click.style("Skipping", fg="yellow") + " directory " + "'" + hashme + "'..", err=True)
return None
if not use_json:
click.echo("Hashing file " + click.style(hashme, bold=True) + "..", err=not hash_many)
bytechunks = FileIter(open(hashme, mode='rb'))
# String
else:
if not use_json:
click.echo("Hashing string " + click.style(hashme, bold=True) + "..", err=not hash_many)
bytechunks = (hashme.encode('utf-8'), )
return bytechunks
def test_valid_project_url():
assert validators.url(quantopian_tools.__project_url__)
def post(self):
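    """Validate the submitted URL and create a new source, rejecting invalid or duplicate URLs."""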
url = self.get_body_argument("url")
if not validators.url(url):
self.set_status(400, "bad URL")
return
with self._connect() as connection:
try:
createSource(connection, url)
except sqlite3.IntegrityError:
self.set_status(400, "duplicate URL")
return
self.set_status(201)