def discover_domains(subdomain_id, request_result_text):
# retrieve subdomain object
subdomain = Subdomain.objects.get(id=subdomain_id)
# Create and start logger
logger = create_logger('discover_{0}.log'.format(subdomain.id))
logger.info('discover {0} START'.format(subdomain.id))
    # keep a list of extracted subdomains to limit db queries
extracted_subdomain = []
    for link in BeautifulSoup(request_result_text,
                              'html.parser',  # todo: use lxml to speed things up
                              parse_only=SoupStrainer('a')):
        # todo: this only saves 'href' attributes of 'a' elements and may miss valid entries
if link.has_attr('href'):
href = link['href']
extract_result = extract_subdomain(href)
if extract_result not in extracted_subdomain:
extracted_subdomain.append(extract_result)
new_subdomain = import_subdomain(href,
discovered_by=subdomain)
logger.info('discover found {0}'.format(new_subdomain))
logger.info('discover {0} DONE'.format(subdomain_id))
# release memory
gc.collect()
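The todo notes above suggest two easy wins: parsing with lxml for speed while still restricting the parse to 'a' elements via parse_only. A minimal standalone sketch of that pattern (the HTML and URL below are invented for illustration, and lxml is assumed to be installed):

from bs4 import BeautifulSoup, SoupStrainer

html = '<html><body><a href="https://a.example.com/x">one</a><p>ignored</p></body></html>'

# Only <a> elements are built into the tree; everything else is skipped at parse time.
only_links = SoupStrainer('a')
soup = BeautifulSoup(html, 'lxml', parse_only=only_links)

hrefs = [a['href'] for a in soup.find_all('a', href=True)]
print(hrefs)  # ['https://a.example.com/x']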
def run(self):
while True:
data = self._queue_data.get()
self._index = data[0]
html_contents = data[1]
html_contents = re.sub('<br />', '\n', html_contents)
only_main3 = SoupStrainer(class_="main3")
soup_only_main3 = BeautifulSoup(
html_contents, 'html.parser', parse_only=only_main3)
        # give up once too many consecutive empty pages have been seen
if self._num_empty > 1000:
break
        # skip pages whose only content is the "deleted" placeholder text
if soup_only_main3.get_text(strip=True) == self._delete:
self._num_empty += 1
continue
else:
self._num_empty = 0
title_poetry = soup_only_main3.find(class_='son1').h1.string
soup_only_main3.find(class_='son2').p.span.decompose()
dynasty_poetry = soup_only_main3.find(class_='son2').p.string
soup_only_main3.find(class_='son2').p.decompose()
soup_only_main3.find(class_='son2').p.span.decompose()
author_poetry = soup_only_main3.find(class_='son2').p.string
soup_only_main3.find(class_='son2').p.decompose()
soup_only_main3.find(class_='son2').p.decompose()
soup_only_main3.find(class_='yizhu').decompose()
content_poetry = soup_only_main3.find(
class_='cont',id='cont').get_text()
content_poetry = re.sub('[\n]+', '\n', content_poetry)
content_poetry = content_poetry.strip('\n')
path_html, path_txt = get_output_path(dynasty_poetry, self._index)
file_html = open(path_html, 'w')
file_html.writelines(data[1].encode('utf-8'))
file_html.close()
file_txt = open(path_txt, 'w')
file_txt.writelines(title_poetry.encode('utf-8') + '\n')
file_txt.writelines(dynasty_poetry.encode('utf-8') + '\n')
file_txt.writelines(author_poetry.encode('utf-8') + '\n')
file_txt.writelines(content_poetry.encode('utf-8') + '\n')
file_txt.close()
print '-----------------------------------------------------------'
print 'Parser: ', self._index
        print 'Title:   ', title_poetry
        print 'Dynasty: ', dynasty_poetry
        print 'Author:  ', author_poetry
        print 'Content:\n', content_poetry
print 'Parser finish'
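The parser above leans on Tag.decompose() to peel the label span off each paragraph in the son2 block and then drop the paragraph itself, so the next lookup returns the following one. A small illustration of that peeling pattern (the markup below is invented for the example):

from bs4 import BeautifulSoup

html = ('<div class="son2">'
        '<p><span>Dynasty:</span>Tang</p>'
        '<p><span>Author:</span>Li Bai</p>'
        '</div>')
soup = BeautifulSoup(html, 'html.parser')

soup.find(class_='son2').p.span.decompose()   # strip the label span
dynasty = soup.find(class_='son2').p.string   # u'Tang'
soup.find(class_='son2').p.decompose()        # drop the first <p>; the next one moves up

soup.find(class_='son2').p.span.decompose()
author = soup.find(class_='son2').p.string    # u'Li Bai'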
def _check_latest_version_by_dir(self, dirver, package, package_regex,
current_version, ud, d):
"""
Scan every directory in order to get upstream version.
"""
version_dir = ['', '', '']
version = ['', '', '']
    dirver_regex = re.compile(r"(?P<pfx>\D*)(?P<ver>(\d+[\.\-_])+(\d+))")
s = dirver_regex.search(dirver)
if s:
version_dir[1] = s.group('ver')
else:
version_dir[1] = dirver
dirs_uri = bb.fetch.encodeurl([ud.type, ud.host,
ud.path.split(dirver)[0], ud.user, ud.pswd, {}])
bb.debug(3, "DirURL: %s, %s" % (dirs_uri, package))
soup = BeautifulSoup(self._fetch_index(dirs_uri, ud, d), "html.parser", parse_only=SoupStrainer("a"))
if not soup:
return version[1]
for line in soup.find_all('a', href=True):
s = dirver_regex.search(line['href'].strip("/"))
if s:
sver = s.group('ver')
            # When the prefix is part of the version directory, make sure
            # that only the version directory is used, so strip any leading
            # directories if they exist.
#
# Example: pfx = '/dir1/dir2/v' and version = '2.5' the expected
# result is v2.5.
spfx = s.group('pfx').split('/')[-1]
version_dir_new = ['', sver, '']
if self._vercmp(version_dir, version_dir_new) <= 0:
dirver_new = spfx + sver
path = ud.path.replace(dirver, dirver_new, True) \
.split(package)[0]
uri = bb.fetch.encodeurl([ud.type, ud.host, path,
ud.user, ud.pswd, {}])
pupver = self._check_latest_version(uri,
package, package_regex, current_version, ud, d)
if pupver:
version[1] = pupver
version_dir = version_dir_new
return version[1]
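The dirver_regex above splits a directory name into a non-digit prefix and a dotted or dashed version number, which is what the prefix handling in the inner loop relies on. A quick standalone check of that behaviour (the directory names are made up):

import re

dirver_regex = re.compile(r"(?P<pfx>\D*)(?P<ver>(\d+[\.\-_])+(\d+))")

m = dirver_regex.search("v2.5")
assert m.group('pfx') == 'v' and m.group('ver') == '2.5'

m = dirver_regex.search("release-1.10.3")
assert m.group('pfx') == 'release-' and m.group('ver') == '1.10.3'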
def extract_links(response_content, unique=False, blacklist_domains=[],
whitelist_domains=[], regex=None, zen_path=None,
blacklist_extensions=[], whitelist_extensions=[]):
"""Extract links from a response content.
Args:
response_content (str): The HTML page received in a Response Object.
unique (bool): A parameter defining if the list can contain duplicates.
Defaults to False.
blacklist_domains (list): List of domains to exclude from the result.
whitelist_domains (list): List of domains to include from the result.
regex (list): A regular expression filter on the link.
Defaults to None.
zen_path (list): A selector to restrict the XPath to parse with bs4.
Returns:
links (list): A list of extracted and filtered links.
"""
    if any([item in blacklist_domains for item in whitelist_domains]) \
            or any([item in blacklist_extensions for item in whitelist_extensions]):
        raise LinkExtractorException(
            "blacklist and whitelist domains/extensions can't share common value(s).")
soup = BeautifulSoup(
response_content, "html.parser", parse_only=SoupStrainer('a')
)
    links = [a.get('href') for a in soup if a.get('href')]
if unique:
links = list(set(links))
if regex:
links = filter_links(links, regex)
    if whitelist_domains:
        for domn in whitelist_domains:
            links = filter_links(links, domn.replace('.', r'\.'), include=True)
    if blacklist_domains:
        for domn in blacklist_domains:
            links = filter_links(links, domn.replace('.', r'\.'), include=False)
    if whitelist_extensions:
        for ext in whitelist_extensions:
            links = filter_links(links, ext.replace('.', r'\.'), include=True)
    if blacklist_extensions:
        for ext in blacklist_extensions:
            links = filter_links(links, ext.replace('.', r'\.'), include=False)
return links
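A hedged usage sketch for extract_links, assuming the surrounding module provides the filter_links helper and LinkExtractorException referenced above, and that filter_links applies its regex with include/exclude semantics; the HTML is invented for illustration:

html = (
    '<a href="https://docs.example.com/guide.html">guide</a>'
    '<a href="https://cdn.example.com/logo.png">logo</a>'
    '<a href="https://spam.example.net/index.html">spam</a>'
)

links = extract_links(
    html,
    unique=True,
    whitelist_domains=['example.com'],
    blacklist_extensions=['.png'],
)
# Expected to keep only the example.com HTML link, dropping the .png asset
# and the example.net entry, depending on how filter_links matches.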
def getFriendsBirthdays(birthdays,friendsDict,s):
# --------- Getting Birthday Info -----------
relatStrainer = SoupStrainer(text=re.compile("Birthday"))
relatExt = "/about"
relatExtBeta = "&sk=about"
fbook = "https://facebook.com"
    #***** Note: additional string handling is needed because the data is scraped straight from the page markup
for friend in friendsDict:
if (friendsDict[friend].find("php") != -1):
relatURL = fbook + friendsDict[friend] + relatExtBeta
else:
relatURL = fbook + friendsDict[friend] + relatExt
relatInfo = s.get(relatURL)
soup = BeautifulSoup(relatInfo.text,"lxml",parse_only=relatStrainer)
subString = soup.find(text=re.compile("Birthday"))
        if subString is not None:
            # Cut off everything before Birthday
stringIndex = subString.find('Birthday')
subString = subString[stringIndex:]
# Cut off the prefix to get the birthdate and everything after
stringIndex = subString.find('<div>')
subString = subString[(stringIndex+5):]
# Get rid of everything after the birthday
stringIndex = subString.find('</div>')
subString = subString[:stringIndex]
# Standardize the birthday date by cutting off the year if there is one
commaIndex = subString.find(',')
if (commaIndex != -1):
subString = subString[:commaIndex]
if (subString in birthdays):
birthdays[subString].append(friend)
else:
birthdays[subString] = [friend]
print friend + " has birthday " + subString
return
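getFriendsBirthdays strains on text rather than on tags, which means only the text nodes matching the pattern survive the parse; that is why the result is then sliced with plain string methods. A minimal illustration (the markup stands in for a profile page and is invented):

import re
from bs4 import BeautifulSoup, SoupStrainer

html = '<div><span>Birthday</span><div>July 4</div><p>Hometown</p></div>'
strainer = SoupStrainer(text=re.compile("Birthday"))
soup = BeautifulSoup(html, "lxml", parse_only=strainer)

# Only the matching text node is kept; 'July 4' and 'Hometown' are discarded.
print(soup.find(text=re.compile("Birthday")))  # Birthday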
def get_film_info_dytt():
items = []
target_url = 'http://www.dy2018.com/'
content = urllib2.urlopen(target_url).read()
content = unicode(content,'GBK').encode('utf-8')
only_hotl_tags = SoupStrainer(class_='co_content222')
soup = BeautifulSoup(content, "html.parser", parse_only=only_hotl_tags)
i = 0
    key = re.compile(u'《(.+?)》')
for link in soup.find_all('li', limit=8):
if i != 0:
link_url = target_url + link.findChildren('a')[0].get('href')
link_time = link.findChildren('span')[0].string
link_title = link.findChildren('a')[0].get('title')[5:]
            file_name = re.findall(u'《(.*?)》', link_title)[0]
# print file_name.encode("utf-8")
douban_api = 'https://api.douban.com/v2/movie/search?q=' + file_name.encode("utf-8")
user_agent = 'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.6) Gecko/20091201 Firefox/3.5.6'
headers = {'User-Agent': user_agent}
req = urllib2.Request(douban_api, None, headers)
api_content = urllib2.urlopen(req)
json_content = json.load(api_content)['subjects'][0]['images']['small']
img_url = json_content
#print img_url
save_path = os.path.abspath("./icons/icon")
img_data = urllib2.urlopen(img_url).read()
file_name = save_path + str(i) + '.jpg'
output = open(file_name, 'wb+')
output.write(img_data)
output.close()
            json_item = dict(title=link_title, subtitle='Updated: ' + link_time, arg=link_url, icon='icons/icon' + str(i) + '.jpg')
items.append(json_item)
i = i + 1
return generate_xml(items)
# print(get_film_info_dytt())
def get_standard():
standard_url = 'https://www.standardmedia.co.ke/'
if check_connection(standard_url):
        response = requests.get(standard_url)
        soup = BeautifulSoup(response.text, 'lxml', parse_only=SoupStrainer('div'))
        standard = []
for link in soup.select('.col-xs-8.zero a', limit=11):
if link.get_text():
news_title = '{}({})'.format(link.get_text().strip(), link.get('href'))
standard_link = requests.get(link.get('href'))
soup_link = BeautifulSoup(standard_link.text, 'lxml', parse_only=SoupStrainer(['script', 'div']))
try:
data = json.loads(soup_link.find('script', type='application/ld+json').text.replace("\\", r"\\"))
article_date = data['dateModified']
image = data['image']['url']
if image == 'https://www.standardmedia.co.ke':
image = ''
except (ValueError, AttributeError):
print('Standard: invalid json detected')
continue
try:
content = get_content(soup_link, 'main-article')
except AttributeError:
try:
content = get_content(soup_link, 'story')
except AttributeError:
print('Standard: No content found')
continue
news_dict = {
'category': 'news',
'source': 'standard',
'title': link.get_text().strip(),
'link': link.get('href'),
'image': image,
'content': content,
'date': article_date,
'date_added': datetime.datetime.utcnow()
}
                collection.replace_one({'link': link.get('href')}, news_dict, upsert=True)
standard.append(news_dict)
return standard
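get_standard reads the article metadata out of the page's JSON-LD block; a minimal illustration of that lookup (the markup and values below are invented):

import json
from bs4 import BeautifulSoup

html = '''<script type="application/ld+json">
{"dateModified": "2017-06-01", "image": {"url": "https://example.com/a.jpg"}}
</script>'''

soup = BeautifulSoup(html, 'html.parser')
data = json.loads(soup.find('script', type='application/ld+json').string)
print(data['dateModified'], data['image']['url'])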