def __init__(self, queue, DEBUG=config.DEBUG, reset=False, socksport=None):
if not socksport:
socksport = config.SOCKS_PORT
## TODO add checks that a socks proxy is even open
## TODO add Tor checks to make sure circuits are operating
threading.Thread.__init__(self)
self.reset = reset # Whether to check if a url has been collected
self.queue = queue # Multithreading queue of urls
self.proxysettings = [
'--proxy=127.0.0.1:%s' % socksport,
'--proxy-type=socks5',
]
#self.proxysettings = [] # DEBUG
#self.ignore_ssl = ['--ignore-ssl-errors=true', '--ssl-protocols=any']
self.ignore_ssl = []
self.service_args = self.proxysettings + self.ignore_ssl
self.failcount = 0 # Counts failures
self.donecount = 0 # Counts successes
self.tor = tor.tor() # Manages Tor via control port
if DEBUG: # PhantomJS logs very verbosely when debug logging is enabled
logging.basicConfig(level=logging.INFO)
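The first TODO above — verifying that a SOCKS proxy is actually listening — could be handled with a quick TCP probe before the thread starts. A minimal sketch, with an illustrative helper name not taken from the original code:

import socket

def socks_proxy_open(host='127.0.0.1', port=9050, timeout=3):
    """Return True if something is accepting TCP connections on the SOCKS port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False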
Python PhantomJS() example snippets
def _get_webdriver(self):
"""Return a webdriver instance and set it up
with the according profile/ proxies.
Chrome is quite fast, but not as stealthy as PhantomJS.
Returns:
The appropriate webdriver mode according to self.browser_type.
If no webdriver mode could be found, return False.
"""
if self.browser_type == 'chrome':
return self._get_Chrome()
elif self.browser_type == 'firefox':
return self._get_Firefox()
elif self.browser_type == 'phantomjs':
return self._get_PhantomJS()
return False
def process_request(self, request, spider):
if 'PhantomJS' in request.meta:  # dict.has_key() is Python 2 only
log.debug('PhantomJS Requesting: %s' % request.url)
ua = None
try:
ua = UserAgent().random
except Exception:
    # fall back to a fixed UA when fake-useragent cannot fetch its data
    ua = 'Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11'
webdriver.DesiredCapabilities.PHANTOMJS['phantomjs.page.settings.userAgent'] = ua
try:
self.driver.get(request.url)
content = self.driver.page_source.encode('utf-8')
url = self.driver.current_url.encode('utf-8')
except Exception:
return HtmlResponse(request.url, encoding='utf-8', status=503, body='')
if content == '<html><head></head><body></body></html>':
return HtmlResponse(request.url, encoding='utf-8', status=503, body='')
else:
return HtmlResponse(url, encoding='utf-8', status=200, body=content)
else:
log.debug('Common Requesting: %s' % request.url)
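For this downloader middleware to take effect it must be registered in the Scrapy settings; a minimal sketch (module path and priority here are illustrative, not from the original project):

# settings.py
DOWNLOADER_MIDDLEWARES = {
    'myproject.middlewares.PhantomJSMiddleware': 543,  # hypothetical module path
}
# Requests then opt in per request:
# yield scrapy.Request(url, meta={'PhantomJS': True})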
def main(number):
url = 'http://www.bilibili.com/video/av' + str(number) + '/'
dcap = dict(DesiredCapabilities.PHANTOMJS)
dcap["phantomjs.page.settings.userAgent"] = (
"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:50.0) Gecko/20100101 Firefox/50.0"
)
dcap["phantomjs.page.settings.loadImages"] = False
# phantomjs.exe lives under G:\Anaconda3\phantomjs\bin
driver = webdriver.PhantomJS(executable_path='G:\\Anaconda3\\phantomjs\\bin\\phantomjs.exe',
desired_capabilities=dcap)
try:
driver.get(url)
# time.sleep(random.uniform(1, 5))
content = driver.page_source  # grab the rendered page source; cleanup happens in finally
soup = BeautifulSoup(content, 'lxml')
getInfo(soup)
except Exception:
pass
finally:
if driver:
driver.quit()
def getSoup(start, stop):
try:
for number in range(start, stop + 1):
url = 'http://www.bilibili.com/video/av'+str(number)+'/'
dcap = dict(DesiredCapabilities.PHANTOMJS)
dcap["phantomjs.page.settings.userAgent"] = (
"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:50.0) Gecko/20100101 Firefox/50.0"
)
dcap["phantomjs.page.settings.loadImages"] = False
# phantomjs.exe lives under G:\Anaconda3\phantomjs\bin
driver = webdriver.PhantomJS(executable_path='G:\\Anaconda3\\phantomjs\\bin\\phantomjs.exe',
desired_capabilities=dcap)
driver.get(url)
# time.sleep(1)  # optionally pause to let dynamic content finish loading
content = driver.page_source  # grab the rendered page source
driver.quit()  # quit() closes every window; the separate close() was redundant
soup = BeautifulSoup(content, 'lxml')
getInfo(soup)
except Exception:
pass
def give_me_the_page(n, user_name, password, browser, pt=None):
    if not pt:
        if browser == 'Chrome':
            pt = webdriver.Chrome()
        elif browser == 'Safari':
            pt = webdriver.Safari()
        else:
            pt = webdriver.PhantomJS()
pt.get('http://electsys.sjtu.edu.cn/edu/login.aspx')
time.sleep(1)
pt.execute_script("""var img=document.getElementById('form-input').getElementsByTagName('div')[2].getElementsByTagName('img')[0];
var d=document.createElement('CANVAS');
var cxt=d.getContext('2d');
d.width=img.width;
d.height=img.height;
cxt.drawImage(img,0,0);
img.src=d.toDataURL('png');""")
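The injected script above redraws the CAPTCHA <img> onto a canvas and swaps its src for a data URL, which makes the image extractable without a second HTTP request. A hedged sketch of reading it back on the Python side (the element lookup mirrors the script above):

import base64

data_url = pt.execute_script(
    "return document.getElementById('form-input')"
    ".getElementsByTagName('div')[2].getElementsByTagName('img')[0].src;")
# strip the 'data:image/png;base64,' prefix before decoding
png_bytes = base64.b64decode(data_url.split(',', 1)[1])
with open('captcha.png', 'wb') as f:
    f.write(png_bytes)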
def _click_page(total_posts, pool_size, group_index):
_log.info('Group {}: starting...'.format(group_index + 1))
if group_index > 0 and total_posts < pool_size * group_index:
return
# launch a headless browser
_driver = webdriver.PhantomJS()
_driver.get('https://www.xncoding.com/archives/')
global TRY_COUNT
for k in range(1, TRY_COUNT + 1):
# _log.info('Group {}: pass {} running...'.format(group_index + 1, k))
for i in range(pool_size * group_index, min(pool_size * (group_index + 1), total_posts)):
l_xpath = '(//article/header/h1[@class="post-title"]/a[@class="post-title-link"])[{}]'.format(i + 1)
ele = WebDriverWait(_driver, 2).until(
EC.presence_of_element_located((By.XPATH, l_xpath))
)
ele.click()
WebDriverWait(_driver, 5).until(
EC.presence_of_element_located((By.XPATH, '//div[@class="post-body"]'))
)
_driver.back()
_log.info('Group {}: finished.'.format(group_index + 1))
_driver.quit()  # quit() also shuts down the PhantomJS process; close() alone leaves it running
def just_click():
# launch a headless browser
_driver = webdriver.PhantomJS()
_driver.get('https://www.xncoding.com/archives/')
# driver.maximize_window()
posts_count = len(_driver.find_elements_by_xpath(
'//article/header/h1[@class="post-title"]/a[@class="post-title-link"]'))
for cc in range(1, posts_count + 1):
l_xpath = '(//article/header/h1[@class="post-title"]/a[@class="post-title-link"])[{}]'.format(cc)
ele = WebDriverWait(_driver, 10).until(
EC.element_to_be_clickable((By.XPATH, l_xpath))
)
_log.info('Clicking post {}...'.format(cc))
ele.click()
WebDriverWait(_driver, 10).until(
EC.presence_of_element_located((By.XPATH, '//div[@class="post-body"]'))
)
_driver.back()
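A possible driver for `_click_page`, splitting the posts into fixed-size groups and running one thread per group (the pool size and the way `total_posts` is obtained are assumptions):

import threading

def click_all(total_posts, pool_size=10):
    groups = (total_posts + pool_size - 1) // pool_size  # ceiling division
    threads = [threading.Thread(target=_click_page,
                                args=(total_posts, pool_size, g))
               for g in range(groups)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()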
def start_PhantomJS():
uaList = []
with open('Base_Data\\Ualist.txt') as f:  # one user-agent string per line
    uaList = [line.rstrip('\n') for line in f]
i = random.choice(uaList)
headers = {
'Accept':'*/*',
'Accept-Language':'zh-CN,zh;q=1',
'User-Agent': i,
'Connection': 'keep-alive'
}
service_args = [
#'--proxy=127.0.0.1:9999',
#'--proxy-type=http',
'--ignore-ssl-errors=true',
]
for key,value in headers.items():
webdriver.DesiredCapabilities.PHANTOMJS['phantomjs.page.customHeaders.{}'.format(key)] = value
webdriver.DesiredCapabilities.PHANTOMJS['phantomjs.page.settings.userAgent'] = i
dr = webdriver.PhantomJS(executable_path=r'C:\\Users\\sorano\\Desktop\\???????\\Asuna Sword\\bin\\phantomjs.exe',service_args=service_args)
return dr,uaList
def find_hackathon(self):
print('--- Fetching hackathons ---\n')
driver = webdriver.PhantomJS()
driver.get('https://www.hackerearth.com/challenges/')
res = driver.page_source
soup = BeautifulSoup(res, 'lxml')
upcoming = soup.find('div', {'class': 'upcoming challenge-list'})
if upcoming is not None:
all_hackathons = upcoming.find_all('div', {'class': 'challenge-content'})
for i, hackathon in enumerate(all_hackathons, 1):
challenge_type = hackathon.find('div', {'class': 'challenge-type'}).text.replace("\n", " ").strip()
challenge_name = hackathon.find('div', {'class': 'challenge-name'}).text.replace("\n", " ").strip()
date_time = hackathon.find('div', {'class': 'challenge-list-meta challenge-card-wrapper'}).text.replace("\n", " ").strip()
print("[{}] {}\n{}\n{}\n\n".format(str(i), challenge_name, challenge_type, date_time))
else:
print("No hackathon data found.")
def login(rollno, password):
driver = webdriver.PhantomJS()
driver.get("http://slcm.manipal.edu/loginForm.aspx")
user_field = driver.find_element_by_id("txtUserid")
pass_field = driver.find_element_by_id("txtpassword")
user_field.send_keys(rollno)
pass_field.send_keys(password)
sleep(0.5)
driver.find_element_by_css_selector('#btnLogin').click()
sleep(1)
try:
    driver.find_element_by_id("txtUserid")
    return None  # the login form is still present, so authentication failed
except NoSuchElementException:  # from selenium.common.exceptions
    pass  # form is gone: login succeeded
return driver
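Typical usage of `login()`: a `None` return signals bad credentials, anything else is a live session. A short, hedged example with illustrative credentials:

driver = login('160905001', 'hunter2')
if driver is None:
    print('Login failed: check roll number and password.')
else:
    print('Logged in, current page: ' + driver.current_url)
    driver.quit()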
def create_selenium_driver(browser='chrome'):
# set default browser string based on env (if available)
env_browser = os.environ.get('TOASTER_TESTS_BROWSER')
if env_browser:
browser = env_browser
if browser == 'chrome':
return webdriver.Chrome(
service_args=["--verbose", "--log-path=selenium.log"]
)
elif browser == 'firefox':
return webdriver.Firefox()
elif browser == 'marionette':
capabilities = DesiredCapabilities.FIREFOX
capabilities['marionette'] = True
return webdriver.Firefox(capabilities=capabilities)
elif browser == 'ie':
return webdriver.Ie()
elif browser == 'phantomjs':
return webdriver.PhantomJS()
else:
msg = 'Selenium driver for browser %s is not available' % browser
raise RuntimeError(msg)
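Usage sketch: the environment variable overrides the argument, so the same test run can be pointed at another browser without code changes:

import os

os.environ['TOASTER_TESTS_BROWSER'] = 'phantomjs'  # overrides the default
driver = create_selenium_driver('chrome')          # PhantomJS wins here
driver.get('http://localhost:8000/')
driver.quit()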
def add_url_links(self, links, url=''):
    k = 0
    for link in sorted(links, key=lambda _: random.random()):  # shuffle; the original lambda shadowed the counter k
        lp = uprs.urlparse(link)
        if lp.scheme in ('http', 'https') and not self.blacklisted(link):
            if self.add_link(link): k += 1
        if k > self.max_links_per_page: break
if self.verbose or self.debug:
current_url = url # default
try:
@self.phantomjs_short_timeout
def phantomjs_current_url(): return self.driver.current_url
current_url = phantomjs_current_url()
# the current_url method breaks on a lot of sites, e.g.
# python3 -c 'from selenium import webdriver; driver = webdriver.PhantomJS(); driver.get("https://github.com"); print(driver.title); print(driver.current_url); driver.quit()'
except Exception as e:
if self.debug: print('.current_url exception:\n{}'.format(e))
if self.debug:
print("{}: {:d} links added, {:d} total, {:.1f} bits domain entropy".format(current_url,k,self.link_count(),self.domain_entropy()))
elif self.verbose:
self.print_progress(current_url,num_links=k)
def load_driver(config, vdisplay=None):
"""Initialize a weddriver selected in config with given config.
Args:
config (dict): The configuration loaded previously in Cabu.
Returns:
webdriver (selenium.webdriver): An instance of selenium webdriver or None.
"""
if config['DRIVER_NAME'] == 'Firefox':
driver = load_firefox(config)
elif config['DRIVER_NAME'] == 'Chrome':
driver = load_chrome(config)
elif config['DRIVER_NAME'] == 'PhantomJS':
driver = load_phantomjs(config)
elif not config.get('DRIVER_NAME'):
return None
else:
raise DriverException(vdisplay, 'Driver unrecognized.')
driver.set_page_load_timeout(config['DRIVER_PAGE_TIMEOUT'])
driver.set_window_size(config['DRIVER_WINDOWS_WIDTH'], config['DRIVER_WINDOWS_HEIGHT'])
return driver
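A minimal config dict that would satisfy `load_driver` (key names are taken from the function body; the values are illustrative):

config = {
    'DRIVER_NAME': 'PhantomJS',
    'DRIVER_PAGE_TIMEOUT': 30,      # seconds before set_page_load_timeout trips
    'DRIVER_WINDOWS_WIDTH': 1366,
    'DRIVER_WINDOWS_HEIGHT': 768,
}
driver = load_driver(config)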
def init_driver(self):
global driver
if self.is_initialized:
return
if self.driver_name == 'chrome':
driver = webdriver.Chrome(executable_path=self.driver_path)
elif self.driver_name == 'phantomjs':
driver = webdriver.PhantomJS(executable_path=self.driver_path)
elif self.driver_name == 'firefox':
driver = webdriver.Firefox(executable_path=self.driver_path)
else:
raise Exception(
'Driver "{}" is not supported'.format(self.driver_name))
self.is_initialized = True
driver.set_window_size(self.width, self.height)
driver.implicitly_wait(5)
def get_webdriver(self):
service_args = []
if self.webdriver_config.proxy:
service_args.extend([
"--proxy=" + self.webdriver_config.proxy,
"--proxy-type=http",
"--ignore-ssl-errors=true"
])
dcapability = dict(DesiredCapabilities.PHANTOMJS)
if self.webdriver_config.header:
dcapability["phantomjs.page.settings.userAgent"] = self.webdriver_config.header['User-Agent']
dcapability["phantomjs.page.customHeaders.User-Agent"] = self.webdriver_config.header['User-Agent']
dcapability["takesScreenshot"] = True
driver = webdriver.PhantomJS(self.webdriver_config.phantomjs_path,
service_args=service_args,
desired_capabilities=dcapability)
driver.set_page_load_timeout(self.webdriver_config.timeout)
return driver
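A sketch of the `webdriver_config` object this method expects, inferred from the attributes it reads (the attribute names match the usage above; the class itself is an assumption):

class WebdriverConfig(object):
    def __init__(self):
        self.proxy = '127.0.0.1:8080'            # or None to skip the proxy args
        self.header = {'User-Agent': 'Mozilla/5.0 (compatible; MyBot/1.0)'}
        self.phantomjs_path = '/usr/local/bin/phantomjs'
        self.timeout = 30                        # page-load timeout in seconds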
def process_request(self, request, spider):
    try:
        driver = webdriver.PhantomJS()  # render the page with a headless browser
        # driver = webdriver.Firefox()
        print("---" + str(request.meta["page"]) + "-----js url start-------")
        print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        driver.get(self.pc_index_url + "&page=" + str(request.meta["page"]))
        # time.sleep(1)
        tmp = driver.find_element_by_id('sf-item-list-data').get_attribute("innerHTML")
        print("---" + str(request.meta["page"]) + "-----js url end-------")
        print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        body = tmp
        return HtmlResponse(driver.current_url, body=body, encoding='utf-8', request=request)
    except Exception as e:  # "except Exception, e" is Python 2 only
        print("-------------------")
        print(e.__doc__)
        print(str(e))  # e.message was removed in Python 3
        print("-------------------")
Source: spider_4_standalone_selenium.py, from the scrapy-training project by scrapinghub.
def scrape():
driver = webdriver.PhantomJS()
driver.get('http://quotes.toscrape.com/js-onclick')
while True:
sel = parsel.Selector(text=driver.page_source)
for quote in sel.css('div.quote'):
print({
'text': quote.css('span.text::text').extract_first(),
'author': quote.css('span small::text').extract_first(),
'tags': quote.css('div.tags a.tag::text').extract(),
})
try:
next_button = driver.find_element_by_css_selector('li.next > a')
next_button.click()
except NoSuchElementException:
break
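One caveat in `scrape()`: after `next_button.click()` the loop immediately re-reads `driver.page_source`, which can race the navigation. A hedged variant of the loop body waits for the old quote element to go stale before re-parsing:

from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

first_quote = driver.find_element_by_css_selector('div.quote')
next_button.click()
WebDriverWait(driver, 10).until(EC.staleness_of(first_quote))  # old page replaced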
def get_html_by_webdriver(url, proxies=''):
html = None
try:
driver = webdriver.PhantomJS()
if proxies:
    proxy = webdriver.Proxy()
    proxy.proxy_type = ProxyType.MANUAL
    proxy.http_proxy = proxies  # e.g. '220.248.229.45:3128'
    # merge the proxy settings into webdriver.DesiredCapabilities.PHANTOMJS
    proxy.add_to_capabilities(webdriver.DesiredCapabilities.PHANTOMJS)
    driver.start_session(webdriver.DesiredCapabilities.PHANTOMJS)
driver.get(url)
html = driver.page_source
# driver.save_screenshot('1.png')  # screenshot for debugging
driver.close()
except Exception as e:
log.error(e)
return html if html and len(html) < 1024 * 1024 else None  # discard pages larger than 1 MB
def _unshorten_linkbucks(self, uri):
try:
with closing(PhantomJS(
service_log_path=os.path.dirname(os.path.realpath(__file__)) + '/ghostdriver.log')) as browser:
browser.get(uri)
# wait 5 seconds
time.sleep(5)
page_source = browser.page_source
link = re.findall(r'skiplink(.*?)\>', page_source)
if link is not None:
link = re.sub(r'\shref\=|\"', '', link[0])
if link == '':
return uri, 'Failed to extract link.'
return link, 200
else:
return uri, 'Failed to extract link.'
except Exception as e:
return uri, str(e)
def fulltext_extract(bookmark):
browser = webdriver.PhantomJS(service_args=[
"--ignore-ssl-errors=true",
"--ssl-protocol=tlsv1",
"--load-images=no"])
fulltext_bookmark = Bookmark.query.get(bookmark.id)
browser.get(fulltext_bookmark.main_url)
body = browser.find_element_by_tag_name('body')
bodytext = body.text
soup = BeautifulSoup4(bodytext)  # BeautifulSoup4 is presumably an import alias for bs4.BeautifulSoup
full_text = soup.text
full_text = " ".join(full_text.split())
full_text = full_text.replace('\n', '')
full_text = full_text.encode('utf-8')
fulltext_bookmark.full_text = full_text
db.session.commit()
browser.quit()
def phantomjs_opened(self):
capabilities = DesiredCapabilities.PHANTOMJS.copy()
proxy = proxy_pool.random_choice_proxy()
capabilities['proxy'] = {
'proxyType': 'MANUAL',
'ftpProxy': proxy,
'sslProxy': proxy,
'httpProxy': proxy,
'noProxy': None
}
# capabilities['phantomjs.cli.args'] = [
# '--proxy-auth=' + evar.get('WONDERPROXY_USER') + ':' + evar.get('WONDERPROXY_PASS')
# ]
driver = webdriver.PhantomJS(desired_capabilities=capabilities)
driver.set_page_load_timeout(120)
return driver
def getBestSellers(self):
best_Seller_Scraper = Best_Seller_Scraper()
print("Just assigned best_Seller_Scraper = Best_Seller_Scraper.Best_Seller_Scraper")
driver = webdriver.PhantomJS("/phantomjs-2.1.1-windows/bin/phantomjs.exe")
print("Just assigned driver = webdriver.PhantomJS()")
bestSellers = []
#Navigate to Amazon's best seller list
#Scrape all the Best Seller categories from Amazon and return them as an array
bestSellerCategories = best_Seller_Scraper.getAmazonBestSellerCategories(driver)
print("got best seller categories")
#Loop through each of the categories and pass them into the getSubCategories method
for bestSellerCategory in bestSellerCategories:
bestSellerSubCategories = best_Seller_Scraper.getSubCategories(bestSellerCategory, driver)
#Loop through each of the subCategories and pass them into the getBestSeller method
for bestSellerSubCategory in bestSellerSubCategories:
bestSellers = best_Seller_Scraper.getBestSellers(bestSellerSubCategory, driver)
#Return the bestSellers array after it has members added to it
return bestSellers
def phantomjs_process(self,request):
def do_counts(str_counts):
try:
counts = str_counts.replace(',','')
return counts
except:
return 0
def do_item(item):
if item and isinstance(item,list):
return item[0]
return item
try:
url = request.url
driver = webdriver.PhantomJS(executable_path="/usr/bin/phantomjs")
driver.get(request.url)
body = driver.page_source
driver.quit()  # release the PhantomJS process once the source is captured
response = HtmlResponse(url, body=body.encode('UTF-8'), request=request)
except Exception as e:
self.logger.error("phantomjs error: %s (url: %s)", e, url)
return []
return self.parse_one_news(response)
def get_page(key_words):
html = []
b = webdriver.PhantomJS(executable_path="phantomjs.exe")
#b = webdriver.Firefox()
b.get("https://world.taobao.com/")
time.sleep(3)
b.find_element_by_id('q').send_keys(key_words)
b.find_element_by_xpath('/html/body/div[1]/div[2]/div/div/div/div[2]/div[1]/div[2]/form/div[1]/button').click()
time.sleep(3)
b.execute_script("window.scrollTo(0, document.body.scrollHeight);")
b.maximize_window()
html.append(b.page_source.encode('gbk', 'ignore'))
for i in range(99):
b.find_element_by_xpath('/html/body/div[5]/div[4]/div/div[1]/div[1]/div[4]/div/div/a[last()]/span').click()
page = str(i+1)
time.sleep(5)
b.execute_script("window.scrollTo(0, document.body.scrollHeight);")
html.append(b.page_source.encode('gbk', 'ignore'))
print("?????%s?" %page)
b.close()
return html
#/html/body/div[5]/div[4]/div/div[1]/div[1]/div[4]/div/div/a[last()]/span
#/html/body/div[5]/div[4]/div/div[1]/div[1]/div[4]/div/div/a[7]/span
def grasp_main():
count = 11
driver = webdriver.PhantomJS()
while count:
    with open("result{0}.json".format(count), 'r') as fobj:
        data_list = json.load(fobj)
        print(len(data_list))
    count -= 1
    pool = multiprocessing.Pool()
for data in data_list:
pool.apply_async(get_detail_info, args=(driver,data['href'],))
pool.close()
pool.join()
break
time.sleep(20)
driver.quit()
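Note that `grasp_main` hands a single PhantomJS instance to every pool worker; webdriver sessions are not picklable and cannot safely be shared across processes. A hedged restructuring creates one driver per worker instead (`get_detail_info` keeps its original signature):

def worker(href):
    d = webdriver.PhantomJS()  # one browser per process, never shared
    try:
        get_detail_info(d, href)
    finally:
        d.quit()

# pool.apply_async(worker, args=(data['href'],)) would replace the shared-driver call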
def request_body(url):
ret = ""
browser = webdriver.PhantomJS()
browser.get(url)  # get() returns None, so capturing it served no purpose
content = browser.page_source
browser.quit()  # release the headless browser once the source is captured
soup = BeautifulSoup(content, 'lxml')
bodys = soup.find('div', attrs={"class":"fd_article_ws "})
if not bodys:
print("Error1:" + url)
return "??????"
body = bodys.findAll('div')
if not body:
body = bodys.findAll('p')
if not body:
print("Error2:" + url)
sys.exit()
for item in body:
if item and item.text:
ret += item.text.strip() + "\n"
return ret
def request_body(url):
ret = ""
browser = webdriver.PhantomJS()
browser.get(url)  # get() returns None, so capturing it served no purpose
content = browser.page_source
browser.quit()  # release the headless browser once the source is captured
soup = BeautifulSoup(content, 'lxml')
bodys = soup.find('div', attrs={"class":"paper_content"})
if not bodys:
print("Error1:" + url)
sys.exit()
body = bodys.findAll('div')
if not body:
body = bodys.findAll('p')
if not body:
print("Error2:" + url)
sys.exit()
for item in body:
if item and item.text:
ret += item.text.strip() + "\n"
return ret
def process_request(self, request, spider):
if request.url[26] == 'c':  # site-specific heuristic: the character at index 26 marks the target URLs
ua = random.choice(self.user_agent_list)
dcap = dict(DesiredCapabilities.PHANTOMJS)
dcap["phantomjs.page.settings.userAgent"] = ua
dcap["phantomjs.page.settings.loadImages"] = False
driver = webdriver.PhantomJS(executable_path=r'E:\Webdriver\phantomjs-2.1.1-windows\bin\phantomjs.exe',
desired_capabilities=dcap)
driver.get(request.url)
sleep_time = random.randint(15, 22)
time.sleep(sleep_time)
try:
detail = driver.find_element_by_xpath('//a[@ng-click="showDetail = btnOnClick(showDetail)"]')
detail.click()
except Exception:
    pass  # the detail toggle is not present on every page
body = driver.page_source
url = driver.current_url
driver.quit()
return HtmlResponse(url=url, body=body, request=request, encoding='utf-8')
def setup_vars():
reg_variable('USER', 'User for ebay')
reg_variable('PASSWORD', 'Password for ebay')
reg_variable('DRIVER', 'Driver to use with selenium', 'PhantomJS',
validate=lambda v: v in ('Chrome', 'Firefox', 'PhantomJS')
)
reg_variable('LOCALE', 'Localization for numerics and monetary stuff',
validate=lambda v: locale.setlocale(locale.LC_ALL, v)
)
reg_variable('BID_AHEAD_SECONDS', 'How many seconds before the actually specified time the bid should be placed',
value=3, type=int
)
reg_variable('HISTORY', 'History file',
os.path.expanduser("~/.ebay_hist")
)
#reg_variable('COOKIE_FILE', 'File for cookies. (Optional)',
# os.path.expandvars('/tmp/ebay-$USER-cookie')
#)
reg_variable('DEBUG', 'Print stacktraces and write ghostdriver.log', type=bool, value=0)
reg_variable('LOGIN_URL', 'URL for ebay login page', 'https://signin.ebay.de/ws/eBayISAPI.dll?SignIn')
reg_variable('LOGIN_URL_RE', 'RegEx to check if URL is a login page', 'https://signin.ebay.de')
reg_variable('LOGIN_FIELD_PASS_RE', 'RegEx to find password input field in login page', 'passwor')
reg_variable('LOGIN_FIELD_USER_RE', 'RegEx to find user input field in login page', 'e-mail')