def process_request(self, request, spider):
if 'PhantomJS' in request.meta:
log.debug('PhantomJS Requesting: %s' % request.url)
ua = None
try:
ua = UserAgent().random
except Exception:  # fall back to a fixed UA if fake_useragent cannot load its data
ua = 'Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11'
webdriver.DesiredCapabilities.PHANTOMJS['phantomjs.page.settings.userAgent'] = ua
try:
self.driver.get(request.url)
content = self.driver.page_source.encode('utf-8')
url = self.driver.current_url.encode('utf-8')
except Exception:
return HtmlResponse(request.url, encoding='utf-8', status=503, body='')
if content == '<html><head></head><body></body></html>':
return HtmlResponse(request.url, encoding='utf-8', status=503, body='')
else:
return HtmlResponse(url, encoding='utf-8', status=200, body=content)
else:
log.debug('Common Requesting: %s' % request.url)
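# Falling through here returns None, which tells Scrapy to download the
# request through the normal downloader instead of PhantomJS.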
async def req_handle():  # must be declared async: it awaits asyncio.gather below
ua = UserAgent()
def do_req(u):
return requests.get(u, headers={'user-agent': ua.random})
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
loop = asyncio.get_event_loop()
futures = [
loop.run_in_executor(executor, do_req, HOST+w) for w in words
]
for response in await asyncio.gather(*futures):
if response.status_code < 400:
if response.url[-1] == '/':
print("--DIR: %s - %i" % (response.url, response.status_code))
else:
print("%s - %i (%i bytes)" % (response.url, response.status_code, len(response.content)))
pass
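# Sketch of a possible entry point for the coroutine above (not part of the
# original source); it assumes HOST, words and max_threads are defined at
# module level:
#
#     loop = asyncio.get_event_loop()
#     loop.run_until_complete(req_handle())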
def __init__(self, search_page_url):
self.search_page_url = search_page_url
req = Request(
search_page_url,
data=None,
headers={
'User-Agent': UserAgent().chrome
}
)
self.html = urlopen(req).read().decode('utf-8')
self.soup = BeautifulSoup(self.html, 'html.parser')
self.num_results = None
for f in self.soup.find_all('strong'):
if '????????' in f.text:
if f.text.split()[0].isdigit():
self.num_results = int(f.text.split()[0])
def get_page(url, options={}):
ua = UserAgent()
base_headers = {
'User-Agent': ua.random,
'Accept-Encoding': 'gzip, deflate, sdch',
'Accept-Language': 'zh-CN,zh;q=0.8'
}
headers = dict(base_headers, **options)
print('Getting', url)
try:
r = requests.get(url, headers=headers)
print('Getting result', url, r.status_code)
if r.status_code == 200:
return r.text
except ConnectionError:
print('Crawling Failed', url)
return None
def fillProxyPool(self):
global offset
while self.llen < self.size:
url = self.url + '&offset=' + str(offset)
offset += 50
ua = UserAgent()
headers = {'User-Agent' : ua.random}
response = requests.get(url, headers=headers)
soup = BeautifulSoup(response.text, 'lxml')
lists = soup.find('tbody').find_all('tr')
for ls in lists:
tds = ls.find_all('td')
proxy = ''.join(tds[0].text.split())
_type = ''.join(tds[1].text.split()).lower()
validity = self.checkValidity(_type, proxy)
if validity == True:
self.r.lpush(_type, proxy)
print('1 proxy added: %s. http: %d; https: %d.'
% (proxy, self.r.llen('http'), self.r.llen('https')))
self.__class__.llen += self.r.llen('http') + self.r.llen('https')
def checkValidity(self, _type, proxy):
proxyDict = {_type : _type + '://' + proxy}
ua = UserAgent()
headers = {'User-Agent' : ua.random}
try:
if _type == 'http':
r = requests.get(self.http_test_url, proxies=proxyDict,\
headers=headers, timeout=2)
else:
r = requests.get(self.https_test_url, proxies=proxyDict,\
headers=headers, timeout=2)
except Exception:
return False
soup = BeautifulSoup(r.text, 'lxml')
try:
retDict = eval(soup.find('body').text)
except Exception:
return False
# the test URL echoes the caller's IP; matching it confirms the proxy works
if proxy.split(':')[0] == retDict['origin']:
return True
return False
def get_news_st():
""" Get News From ST """
# Get Text
headers = {'User-Agent': UserAgent().random}
website = r.get('http://www.straitstimes.com/container/custom-landing-page/breaking-news',
headers=headers)
website_text = website.text
# Parse HTML using BS
soup = BeautifulSoup(website_text, 'html.parser')
# Find all Headlines
headlines = soup.findAll('span', {'class' : 'story-headline'})
time_lines = soup.findAll('div', {'class' : 'node-postdate'})
count_ = 0
final_text = "<b>Top Singapore Headlines</b>\n\n"
# Loop Through Headlines!
for headline in headlines[:5]:
final_text += '<a href="' + 'http://www.straitstimes.com' + headline.a['href'] + '">'
final_text += headline.get_text()[1:] + "</a>"
final_text += "\n" + time_lines[count_].get_text() + "\n\n"
count_ += 1
return final_text
def __init__(self, crawler):
super(RandomUserAgentMiddleware, self).__init__()
self.ua = UserAgent()
self.per_proxy = crawler.settings.get('RANDOM_UA_PER_PROXY', False)
self.ua_type = crawler.settings.get('RANDOM_UA_TYPE', 'random')
self.proxy2ua = {}
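# Scrapy builds downloader middlewares through a from_crawler hook; a minimal
# sketch of the classmethod this __init__(self, crawler) signature implies
# (not shown in the original excerpt):
#
#     @classmethod
#     def from_crawler(cls, crawler):
#         return cls(crawler)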
def __init__(self, crawler):
super(RandomUserAgentMiddlware, self).__init__()
self.ua = UserAgent()
self.ua_type = crawler.settings.get("RANDOM_UA_TYPE", "random")
def __init__(self, user_agent=''):
self.user_agent = user_agent
try:
self.faker = UserAgent()
except Exception as e:
log.debug("Fake-useragent error, use default. (%s)" % e.message)
self.faker = None
def process_request(self, request, spider):
ua = None
if self.faker:
ua = self.faker.random
else:
ua = random.choice(self.user_agent_list)
log.debug("Current UserAgent: %s" % ua)
request.headers.setdefault('User-Agent', ua)
def __init__(self, settings, user_agent="Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:54.0) Gecko/20100101"
" Firefox/54.0"):
super(RandomUserAgentMiddleware, self).__init__()
self.user_agent = user_agent
try:
self.user_agent_engine = UserAgent()
except Exception as ex:
logging.error("Failed to create user agent engine object. Reason: %s", ex)
def getHeader(self, host='', cookie=''):
# build a random User-Agent
ua = UserAgent()
# return request headers appropriate to the supplied host/cookie
if host:
return {
'User-Agent': ua.random,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.8,en;q=0.6',
'Connection': 'keep-alive',
'Accept-Encoding': 'gzip, deflate, sdch',
'Upgrade-Insecure-Requests': '1',
'Host': host,
'Cookie': cookie
}
elif not host and not cookie:
return {
'User-Agent': ua.random,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.8,en;q=0.6',
'Connection': 'keep-alive',
'Accept-Encoding': 'gzip, deflate, sdch',
'Upgrade-Insecure-Requests': '1',
}
else:
return {
'User-Agent': ua.random,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.8,en;q=0.6',
'Connection': 'keep-alive',
'Accept-Encoding': 'gzip, deflate, sdch',
'Upgrade-Insecure-Requests': '1',
'Cookie': cookie
}
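# Hypothetical call site for getHeader (names here are illustrative, not from
# the original project):
#
#     headers = self.getHeader(host='www.example.com', cookie=cookie_str)
#     response = requests.get(url, headers=headers)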
def __init__(self, car_id):
self.car_id = car_id
self.req = Request(
'https://www.car.gr/%s' % self.car_id,
data=None,
headers={
'User-Agent': UserAgent().chrome
}
)
self.html = urlopen(self.req).read().decode('utf-8')
self.soup = BeautifulSoup(self.html, 'html.parser')
def __init__(self, crawler):
super(RandomUserAgentMiddleware, self).__init__()
self.ua = UserAgent()
self.ua_type = crawler.settings.get('RANDOM_UA_TYPE', 'random')
def get_user_agent():
ua = UserAgent()
return ua.random
def get_user_agent():
ua = UserAgent()
return ua.random
def get_user_agent():
ua = UserAgent()
return ua.random
def get_user_agent():
ua = UserAgent()
return ua.random
def get_user_agent():
ua = UserAgent()
return ua.random
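# Typical use of the get_user_agent helper above (illustrative only; `url`
# is a placeholder):
#
#     headers = {'User-Agent': get_user_agent()}
#     response = requests.get(url, headers=headers)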
def __init__(self, crawler):
super(RandomUserAgentMiddleware, self).__init__()
self.ua = UserAgent()
self.ua_type = crawler.settings.get('RANDOM_UA_TYPE', 'random')
def retrieve_dls(self):
ua = UserAgent()
ua.update()  # refresh fake_useragent's cached data; the bare attribute access was a no-op
user_agent = ua.random
for dl_url in self.dl_urls:
headers = { 'User-Agent': user_agent }
req = urllib2.Request(dl_url, headers=headers)
browse = urllib2.urlopen(req)
csv_str = browse.read()
csv_f = StringIO.StringIO(csv_str)
reader = csv.reader(csv_f, delimiter=',')
headers = reader.next()
for row in reader:
ds = zip(headers, row)
self.result_sets.append(dict(ds))
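# Note: the snippet above targets Python 2 (urllib2, StringIO, reader.next()).
# Rough Python 3 equivalents, for reference only:
#
#     from urllib.request import Request, urlopen
#     from io import StringIO
#     header_row = next(reader)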
def __init__(self, crawler):
super(RandomUserAgentMiddlware, self).__init__()
self.ua = UserAgent()
self.ua_type = crawler.settings.get("RANDOM_UA_TYPE", "random")
def __fetch_goods__(self):
us = UserAgent()
self.headers['User-Agent'] = us.random
def __init__(self, pool_maxsize=100):
super().__init__()
https_adapter = HTTPAdapter(pool_maxsize=pool_maxsize)
self.mount('https://www.showroom-live.com', https_adapter)
self.headers = {"UserAgent": ua_str}
def process_request(self, request, spider):
user_agent = UserAgent()
ua = user_agent.random
if ua:
#print ua
print "********Current UserAgent:%s************" %ua
#log.msg('Current UserAgent: '+ua, level='INFO')
request.headers.setdefault('User-Agent', ua)
def process_request(self, request, spider):
user_agent = UserAgent()
ua = user_agent.random
if ua:
log.msg('Current UserAgent: '+ua, level=log.INFO)
request.headers.setdefault('User-Agent', ua)
def getHTMLText(url,code="utf-8"):
try:
ua = UserAgent()  # use a random User-Agent header to imitate a real browser
headers1 = {'User-Agent': ua.random}
r = requests.get(url,headers=headers1)
r.raise_for_status()
r.encoding = code
return r.text
except Exception:
return "getHTML error"
def getHTMLText(url,code="utf-8"):
try:
ua = UserAgent()  # use a random User-Agent header to imitate a real browser
headers1 = {'User-Agent': ua.random}
r = requests.get(url,headers=headers1)
r.raise_for_status()
r.encoding = code
return r.text
except Exception:
return "getHTML error"
def getHTMLText(url,code="ascii"):
try:
ua = UserAgent()
headers1 = {'User-Agent': ua.random}  # use a random header to imitate human behaviour
r = requests.get(url,headers=headers1)
r.raise_for_status()
r.encoding = code
return r.text
except Exception:
return "getHTML error"