def handle_phantomjs(self):
    '''
    Create a PhantomJS webdriver that uses an authenticated HTTP proxy.

    Proxy settings are read from ``..//..//abuyun.conf`` as ``key = value``
    lines; the keys ``proxyHost``, ``proxyPort``, ``proxyUser`` and
    ``proxyPass`` are required. The User-Agent comes from ``self.rad_ua()``.

    :return: driver
    :raises KeyError: if one of the four proxy keys is missing from the file.
    '''
    conf = {}
    # A plain context-managed open() replaces fileinput.input(): fileinput
    # keeps module-level state, so a failure in the middle of the loop left
    # it "active" and made the next fileinput.input() call raise.
    with open("..//..//abuyun.conf") as conf_file:
        for line in conf_file:
            entry = line.replace(' ', '').replace('\r', '').replace('\n', '')
            if '=' not in entry:
                # Blank or malformed lines used to raise IndexError.
                continue
            # Split on the first '=' only so values may themselves contain '='.
            key, _, value = entry.partition('=')
            conf[key] = value
    # NOTE(review): the original indentation was lost; this status message is
    # assumed to be printed once, after the file has been parsed.
    print('??')

    # Proxy server
    proxyHost = conf["proxyHost"]
    proxyPort = conf["proxyPort"]
    # Proxy credentials
    proxyUser = conf["proxyUser"]
    proxyPass = conf["proxyPass"]

    # NOTE(review): the credentials end up on the phantomjs command line and
    # are therefore visible in the process list.
    service_args = [
        "--proxy-type=http",
        "--proxy=%(host)s:%(port)s" % {
            "host": proxyHost,
            "port": proxyPort,
        },
        "--proxy-auth=%(user)s:%(pass)s" % {
            "user": proxyUser,
            "pass": proxyPass,
        },
    ]

    phantomjs_path = r"phantomjs"
    # Copy the shared capabilities dict so the class-level default is not mutated.
    dcap = dict(DesiredCapabilities.PHANTOMJS)
    dcap["phantomjs.page.settings.userAgent"] = self.rad_ua()
    driver = webdriver.PhantomJS(
        desired_capabilities=dcap,
        executable_path=phantomjs_path,
        service_args=service_args,
    )
    return driver
# Example source code using the Python PHANTOMJS capability set (DesiredCapabilities.PHANTOMJS)
def __init__(self, executable_path="phantomjs", port=0,
             desired_capabilities=DesiredCapabilities.PHANTOMJS,
             service_args=None, service_log_path=None):
    """
    Start a PhantomJS / Ghostdriver service and attach a driver session to it.

    :Args:
     - executable_path - path to the executable. With the default value the
       executable is expected to be found on $PATH.
     - port - port for the service; 0 lets a free port be chosen.
     - desired_capabilities - dictionary of non-browser specific
       capabilities, such as "proxy" or "loggingPref".
     - service_args - list of command line arguments passed to PhantomJS.
     - service_log_path - path the phantomjs service logs to.
    """
    ghostdriver = Service(
        executable_path,
        port=port,
        service_args=service_args,
        log_path=service_log_path,
    )
    self.service = ghostdriver
    ghostdriver.start()

    try:
        RemoteWebDriver.__init__(
            self,
            command_executor=ghostdriver.service_url,
            desired_capabilities=desired_capabilities,
        )
    except:
        # The service is already running: shut it down before re-raising so
        # a failed session start does not leave it behind.
        self.quit()
        raise

    self._is_remote = False
def __init__(self, executable_path="phantomjs",
             port=0, desired_capabilities=DesiredCapabilities.PHANTOMJS,
             service_args=None, service_log_path=None):
    """
    Create a new PhantomJS / Ghostdriver instance.

    The service process is started first; the driver session is then opened
    against the service URL.

    :Args:
     - executable_path - path to the executable. With the default value the
       executable is expected to be found on $PATH.
     - port - port for the service; 0 lets a free port be chosen.
     - desired_capabilities - dictionary of non-browser specific
       capabilities, such as "proxy" or "loggingPref".
     - service_args - list of command line arguments passed to PhantomJS.
     - service_log_path - path the phantomjs service logs to.
    """
    service_options = dict(
        port=port,
        service_args=service_args,
        log_path=service_log_path,
    )
    self.service = Service(executable_path, **service_options)
    self.service.start()

    session_options = dict(
        command_executor=self.service.service_url,
        desired_capabilities=desired_capabilities,
    )
    try:
        RemoteWebDriver.__init__(self, **session_options)
    except:
        # Tear the already-started service down, then let the error propagate.
        self.quit()
        raise

    self._is_remote = False
def get_driver_phantomjs(executable_path=r"/home/lxw/Downloads/phantomjs/phantomjs-2.1.1-linux-x86_64/bin/phantomjs"):
    """
    Create a PhantomJS driver with a randomly chosen User-Agent and timeouts.

    Args:
        executable_path: path to the phantomjs binary. The default is the
            location that used to be hard-coded, so existing callers are
            unaffected.

    Returns:
        The configured ``webdriver.PhantomJS`` instance.

    References:
        PhantomJS:
        1. [Set the User-Agent for PhantomJS](http://smilejay.com/2013/12/set-user-agent-for-phantomjs/)
        2. [Selenium 2 - Setting user agent for IE and Chrome](http://stackoverflow.com/questions/6940477/selenium-2-setting-user-agent-for-ie-and-chrome)
    """
    dcap = dict(DesiredCapabilities.PHANTOMJS)

    # Setting User-Agent
    ua = random.choice(RotateUserAgentMiddleware.user_agent_list)
    if ua:
        print("Current User-Agent is:", ua)
        dcap["phantomjs.page.settings.userAgent"] = ua

    driver = webdriver.PhantomJS(executable_path=executable_path, desired_capabilities=dcap)

    # Disabled: setting an IP proxy by restarting the session with updated
    # capabilities (kept for reference; it was a no-op string literal before).
    #   proxy = webdriver.Proxy()
    #   proxy.proxy_type = ProxyType.MANUAL
    #   ip_proxy = get_proxy()
    #   if ip_proxy:
    #       proxy.http_proxy = ip_proxy
    #   proxy.add_to_capabilities(dcap)
    #   driver.start_session(dcap)

    try:
        # Page-load and script timeouts both use the module-level TIMEOUT.
        driver.set_page_load_timeout(TIMEOUT)
        driver.set_script_timeout(TIMEOUT)
    except Exception:
        # The caller never receives the driver on failure, so stop the
        # phantomjs process here instead of leaking it.
        driver.quit()
        raise
    return driver
def createHeadlessBrowser(proxy=None, XResolution=1024, YResolution=768, page_load_timeout=20):
    """
    Create a PhantomJS driver with a fixed desktop Chrome User-Agent.

    Args:
        proxy: optional proxy address passed to PhantomJS as ``--proxy``.
            When given, SSL errors are ignored and web security is disabled
            as well.
        XResolution: window width passed to ``set_window_size``.
        YResolution: window height passed to ``set_window_size``.
        page_load_timeout: value passed to ``set_page_load_timeout``
            (20 was previously hard-coded).

    Returns:
        The configured ``webdriver.PhantomJS`` instance.
    """
    dcap = dict(DesiredCapabilities.PHANTOMJS)
    dcap["phantomjs.page.settings.userAgent"] = (
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.86 Safari/537.36')

    # None is the constructor's own default for service_args, so a single
    # call covers both the proxied and the direct case.
    service_args = None
    if proxy is not None:
        service_args = [
            '--proxy={}'.format(proxy),
            '--proxy-type=https',
            '--ignore-ssl-errors=true',
            '--ssl-protocol=any',
            '--web-security=false',
        ]

    driver = webdriver.PhantomJS(service_args=service_args, desired_capabilities=dcap)
    try:
        driver.set_window_size(XResolution, YResolution)
        driver.set_page_load_timeout(page_load_timeout)
    except Exception:
        # Do not leak the phantomjs process when configuration fails.
        driver.quit()
        raise
    return driver
def _init_browser(self):
    ''' Setup selenium browser.

    Picks a random User-Agent and a random PhantomJS binary from
    ``config.PHANTOM_PATH``, then starts PhantomJS with ``self.service_args``.
    Returns the browser object, or None if the browser could not be created
    (the failure is logged).'''
    # User Agent
    uas = [
        "Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20100101 Firefox/31.0",
        "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36",
        "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:37.0) Gecko/20100101 Firefox/37.0",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36",
        "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36",
    ]
    ua = random.choice(uas)

    ## PhantomJS Binary files
    phantoms = config.PHANTOM_PATH
    phantompath = random.choice(phantoms)

    # Custom user agent
    dc = dict(DesiredCapabilities.PHANTOMJS)
    dc["phantomjs.page.settings.userAgent"] = ua
    #dc["pages.settings.XSSAuditEnabled"] = "true"

    try:
        browser = webdriver.PhantomJS(
            phantompath,
            service_args=self.service_args,
            desired_capabilities=dc
        )
    except WebDriverException as err:
        logging.error("Could not create browser. Check path")
        logging.error(err)
        return None
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt and
        # SystemExit and discarded the actual error. logging.exception keeps
        # the traceback in the log.
        logging.exception("Major problem with webdriver. "
                          "Could be related to performance. "
                          "Decrease the number of threads.")
        return None

    browser.set_page_load_timeout(45)
    ## DELETED GOOD STUFF ##
    return browser
def download(self, link, name, url):
    """
    Fetch ``link`` with PhantomJS and return its HTML.

    A random User-Agent from ``self.agents`` and a random cookie from
    ``self.cookie`` are used, and image loading is disabled. Every failure
    appends ``name`` to ``list_error.txt``.

    :param link: URL that is loaded in the browser.
    :param name: identifier written to ``list_error.txt`` on failure.
    :param url: only printed in the failure diagnostics.
    :return: ``(link, html)`` on success; ``None`` when the driver could not
        be started, the page asked for verification, or loading failed.
    """
    def record_failure():
        # Binary append: the name is written as UTF-8 bytes, which a
        # text-mode file rejects on Python 3.
        with open(r'list_error.txt', 'ab') as f:
            f.write(name.encode('utf-8') + b'\n')

    dcap = dict(DesiredCapabilities.PHANTOMJS)
    dcap["phantomjs.page.settings.userAgent"] = (
        random.choice(self.agents)
    )
    dcap["takesScreenshot"] = False
    dcap["phantomjs.page.customHeaders.Cookie"] = random.choice(self.cookie)
    # dcap["phantomjs.page.settings.resourceTimeout"] = ("1000")

    try:
        driver1 = webdriver.PhantomJS(desired_capabilities=dcap, service_args=['--load-images=no', ])
    except Exception as e:
        record_failure()
        print(datetime.datetime.now())
        print(url)
        print(e)
        return None

    try:
        driver1.set_page_load_timeout(20)
        driver1.get(link)

        # The presence of a 'page_verify' element marks a verification page.
        try:
            driver1.find_element_by_class_name('page_verify')
            needs_verify = True
        except Exception:
            needs_verify = False

        if not needs_verify:
            html = driver1.page_source
            return link, html

        print('page needs verify, stop the program')
        print('the last weixinNUM is %s\n' % name)
        self.ocr4wechat(link)
        time.sleep(5)
        record_failure()
    except Exception as e:
        record_failure()
        print(url)
        print(datetime.datetime.now())
        print(e)
    finally:
        # Always stop the phantomjs process, including on the success return.
        driver1.quit()
    return None
def downloader_html_ph(url, up_num): ##??PhantomJS??????
'''
url :??????url
up_num :?????
'''
# print driver.service
print '????????! URL?', url, ' ?????:', up_num
conf = {}
for line in fileinput.input("..//..//abuyun.conf"):
lines = line.replace(' ', '').replace('\n', '').split("=")
conf[lines[0]] = lines[1]
# ?????
proxyHost = conf["proxyHost"]
proxyPort = conf["proxyPort"]
# ???????????
proxyUser = conf["proxyUser"]
proxyPass = conf["proxyPass"]
service_args = [
"--proxy-type=http",
"--proxy=%(host)s:%(port)s" % {
"host": proxyHost,
"port": proxyPort,
},
"--proxy-auth=%(user)s:%(pass)s" % {
"user": proxyUser,
"pass": proxyPass,
},
]
phantomjs_path = r"phantomjs"
dcap = dict(DesiredCapabilities.PHANTOMJS)
# ?????UA??????????
ua = rad_ua() ##?????UA
dcap["phantomjs.page.settings.userAgent"] = ua
# ,service_args=service_args ?????
driver = webdriver.PhantomJS(desired_capabilities=dcap, executable_path=phantomjs_path)
driver.get(url)
time.sleep(2)
##???????????????
dian = ''
print '?????',
for i in range(up_num):
driver.execute_script('window.scrollTo(0,document.body.scrollHeight)')
##??????????????
time.sleep(2)
dian = dian + '.'
print '.',
print driver.current_url, '?????????????'
data = driver.page_source.encode("utf-8")
# ??????
html_parser = HTMLParser.HTMLParser()
data = html_parser.unescape(data)
return data
def get_browser(self, browser_name):
    """Return a local webdriver instance for ``browser_name``.

    Supported names are 'firefox', 'chrome', 'chrome-headless' and
    'phantomjs'; any other value raises SystemExit. The requested name is
    stored on ``self._browser_name`` before it is validated.
    """
    self._browser_name = browser_name

    supported = ('firefox', 'chrome', 'chrome-headless', 'phantomjs')
    if browser_name not in supported:
        raise SystemExit(
            "ERROR: browser type must be one of 'firefox', 'chrome', "
            "'chrome-headless' or 'phantomjs', not '{b}'".format(
                b=browser_name
            )
        )

    if browser_name == 'firefox':
        logger.debug("getting Firefox browser (local)")
        # Firefox needs a display; fall back to :0 when none is exported.
        if 'DISPLAY' not in os.environ:
            logger.debug("exporting DISPLAY=:0")
            os.environ['DISPLAY'] = ":0"
        browser = webdriver.Firefox()

    if browser_name in ('chrome', 'chrome-headless'):
        if browser_name == 'chrome':
            logger.debug("getting Chrome browser (local)")
            browser = webdriver.Chrome()
        else:
            logger.debug('getting Chrome browser (local) with --headless')
            headless_opts = Options()
            headless_opts.add_argument("--headless")
            browser = webdriver.Chrome(chrome_options=headless_opts)
        # Both Chrome flavours share the same window size and implicit wait.
        browser.set_window_size(1920, 1080)
        browser.implicitly_wait(2)

    if browser_name == 'phantomjs':
        logger.debug("getting PhantomJS browser (local)")
        capabilities = dict(DesiredCapabilities.PHANTOMJS)
        capabilities["phantomjs.page.settings.userAgent"] = self.user_agent
        phantom_args = [
            '--cookies-file={c}'.format(c=self._cookie_file),
            '--ssl-protocol=any',
            '--ignore-ssl-errors=true',
            '--web-security=false'
        ]
        browser = webdriver.PhantomJS(
            desired_capabilities=capabilities, service_args=phantom_args
        )
        browser.set_window_size(1024, 768)

    logger.debug("returning browser")
    return browser
def selenium_request(url ,isscreen = False):
    """
    Load ``url`` in PhantomJS and return the page source.

    When ``isscreen`` is true, a screenshot is saved, a fixed region of it is
    cropped and sent to the ``RClient`` recognition service, and the returned
    code is typed into the element with id ``input`` before the element with
    id ``bt`` is clicked.

    :param url: page to load.
    :param isscreen: whether to run the screenshot / recognition step.
    :return: ``driver.page_source`` after the steps above.
    """
    osurl = '%s/xici/validateimg/' % os.path.dirname(os.path.abspath("scrapy.cfg"))
    ua_list = [
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/48.0.2564.82 Chrome/48.0.2564.82 Safari/537.36",
        "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/40.0.2214.93 Safari/537.36",
        "Mozilla/5.0 (X11; OpenBSD i386) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1664.3 Safari/537.36"
    ]
    dcap = dict(DesiredCapabilities.PHANTOMJS)
    dcap["phantomjs.page.settings.resourceTimeout"] = 15
    dcap["phantomjs.page.settings.loadImages"] = True
    dcap["phantomjs.page.settings.userAgent"] = choice(ua_list)

    driver = webdriver.PhantomJS(executable_path='/Users/felixchan/Tool/phantomjs',desired_capabilities=dcap)
    # driver = webdriver.Firefox()
    try:
        driver.get(url)
        if isscreen:
            # One timestamp for both files so the pair always shares a name.
            stamp = int(time.time())
            imgURL = '%s%s.png' % (osurl, stamp)
            uploadimg = '%s%s_2.png' % (osurl, stamp)
            driver.save_screenshot(imgURL)
            time.sleep(1)

            ocr = RClient(VALIDATE['username'], VALIDATE['password'], VALIDATE['soft_id'], VALIDATE['soft_key'])
            # Fixed pixel box of the screenshot that is sent for recognition.
            left = 260
            top = 12
            right = 396
            bottom = 70
            im = Image.open(imgURL)
            im = im.crop((left, top, right, bottom))
            im.save(uploadimg)
            with open(uploadimg, 'rb') as cropped:
                ims = cropped.read()

            post_result = ocr.create(uploadimg, ims, 3040)
            varidate_code = post_result['Result']
            print(post_result)

            elem = driver.find_element_by_id('input')
            elem.send_keys(varidate_code)
            #elem.send_keys(Keys.ENTER)
            driver.find_element_by_id('bt').click()
            # NOTE(review): the original indentation was lost; the refresh is
            # assumed to belong to the verification branch - confirm.
            driver.refresh()

        driver.implicitly_wait(2)
        time.sleep(1)
        true_page = driver.page_source  # .decode('utf-8','ignore')
    finally:
        # quit() instead of close(): close() only closes the window and left
        # the phantomjs process running; finally also covers error paths.
        driver.quit()
    return true_page
def __init__(self,
             url="http://www.gsxt.gov.cn/index.html",
             search_text = u"????????????",
             input_id='keyword',
             search_element_id='btn_query',
             gt_element_class_name='gt_box',
             gt_slider_knob_name='gt_slider_knob',
             result_numbers_xpath='/html/body/div[2]/div[3]/div[1]/span',
             result_list_verify_id=None,
             result_list_verify_class=None,
             is_gap_every_broad=True):
    """
    Store the crawl settings on the instance and start a Chrome driver.

    Every argument is saved unchanged under an attribute of the same name.
    The descriptions below are inferred from the parameter names - confirm
    against the methods that consume them.

    url: page to open (an alternative used earlier was
        "http://sh.gsxt.gov.cn/notice")
    search_text: text to search for
    input_id: id of the search input element
    search_element_id: id of the search button element
    gt_element_class_name: class name of the captcha box element
    gt_slider_knob_name: class name of the captcha slider knob element
    result_numbers_xpath: xpath of the element holding the result count
    result_list_verify_id: id used to verify the result list, or
    result_list_verify_class: class used to verify the result list
    is_gap_every_broad: boolean flag, True by default
    """
    settings = (
        ('url', url),
        ('search_text', search_text),
        ('input_id', input_id),
        ('search_element_id', search_element_id),
        ('gt_element_class_name', gt_element_class_name),
        ('gt_slider_knob_name', gt_slider_knob_name),
        ('result_numbers_xpath', result_numbers_xpath),
        ('result_list_verify_id', result_list_verify_id),
        ('result_list_verify_class', result_list_verify_class),
        ('is_gap_every_broad', is_gap_every_broad),
    )
    for attribute, value in settings:
        setattr(self, attribute, value)

    # Capabilities for the PhantomJS variant below, which is disabled; they
    # are built but not used by the Chrome driver.
    phantom_caps = dict(DesiredCapabilities.PHANTOMJS)
    phantom_caps["phantomjs.page.settings.userAgent"] = (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.98 Safari/537.36"
    )
    #self.driver = webdriver.PhantomJS(desired_capabilities=phantom_caps)
    self.driver = webdriver.Chrome("/home/hee/driver/chromedriver")
    #self.driver.maximize_window()

    # Random 2-3 second pause after the browser has been started.
    startup_pause = random.uniform(2.0, 3.0)
    time.sleep(startup_pause)
def __init__(self,
             url="http://www.gsxt.gov.cn/index.html",
             search_text = u"????????????",
             input_id='keyword',
             search_element_id='btn_query',
             gt_element_class_name='gt_box',
             gt_slider_knob_name='gt_slider_knob',
             result_numbers_xpath='/html/body/div[2]/div[3]/div[1]/span',
             result_list_verify_id=None,
             result_list_verify_class=None,
             is_gap_every_broad=True):
    """
    Remember the crawl settings and launch the Chrome driver.

    Each argument is kept as-is on an attribute with the same name. The
    meanings below are inferred from the names - TODO confirm.

    url: page to open (alternative: "http://sh.gsxt.gov.cn/notice")
    search_text: text to search for
    input_id: id of the search input element
    search_element_id: id of the search button element
    gt_element_class_name: class name of the captcha box element
    gt_slider_knob_name: class name of the captcha slider knob element
    result_numbers_xpath: xpath of the element holding the result count
    result_list_verify_id: id used to verify the result list, or
    result_list_verify_class: class used to verify the result list
    is_gap_every_broad: boolean flag, True by default
    """
    # Target page and query.
    self.url, self.search_text = url, search_text

    # Locators of the search form.
    self.input_id, self.search_element_id = input_id, search_element_id

    # Locators of the captcha widget.
    self.gt_element_class_name = gt_element_class_name
    self.gt_slider_knob_name = gt_slider_knob_name

    # Locators used on the result page.
    self.result_numbers_xpath = result_numbers_xpath
    self.result_list_verify_id = result_list_verify_id
    self.result_list_verify_class = result_list_verify_class

    self.is_gap_every_broad = is_gap_every_broad

    # Only the disabled PhantomJS variant would consume these capabilities.
    user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.98 Safari/537.36"
    caps = dict(DesiredCapabilities.PHANTOMJS)
    caps["phantomjs.page.settings.userAgent"] = user_agent
    #self.driver = webdriver.PhantomJS(desired_capabilities=caps)

    # Machine-specific chromedriver locations:
    #   hee: "/home/hee/driver/chromedriver"
    #   lxw: the path used below
    chromedriver_path = r"/home/lxw/Software/chromedirver_selenium/chromedriver"
    self.driver = webdriver.Chrome(chromedriver_path)
    #self.driver.maximize_window()

    # Random 2-3 second pause after start-up.
    time.sleep(random.uniform(2.0, 3.0))
def __init__(self,
             url="http://www.gsxt.gov.cn/index.html",
             search_text = u"????????????",
             input_id='keyword',
             search_element_id='btn_query',
             gt_element_class_name='gt_box',
             gt_slider_knob_name='gt_slider_knob',
             result_numbers_xpath='/html/body/div[2]/div[3]/div[1]/span',
             result_list_verify_id=None,
             result_list_verify_class=None,
             is_gap_every_broad=True):
    """
    Keep the crawl configuration on ``self`` and open a Chrome browser.

    All arguments are stored verbatim under attributes of the same name.
    Their meanings, listed below, are inferred from the names - verify
    against the code that reads them.

    url: page to open (alternative: "http://sh.gsxt.gov.cn/notice")
    search_text: text to search for
    input_id: id of the search input element
    search_element_id: id of the search button element
    gt_element_class_name: class name of the captcha box element
    gt_slider_knob_name: class name of the captcha slider knob element
    result_numbers_xpath: xpath of the element holding the result count
    result_list_verify_id: id used to verify the result list, or
    result_list_verify_class: class used to verify the result list
    is_gap_every_broad: boolean flag, True by default
    """
    # Result-page settings.
    self.is_gap_every_broad = is_gap_every_broad
    self.result_list_verify_class = result_list_verify_class
    self.result_list_verify_id = result_list_verify_id
    self.result_numbers_xpath = result_numbers_xpath

    # Captcha widget locators.
    self.gt_slider_knob_name = gt_slider_knob_name
    self.gt_element_class_name = gt_element_class_name

    # Search form locators, query text and start page.
    self.search_element_id = search_element_id
    self.input_id = input_id
    self.search_text = search_text
    self.url = url

    # PhantomJS capabilities; the PhantomJS driver itself is disabled, so
    # this dictionary is currently unused.
    phantomjs_capabilities = dict(DesiredCapabilities.PHANTOMJS)
    phantomjs_capabilities["phantomjs.page.settings.userAgent"] = (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.98 Safari/537.36"
    )
    #self.driver = webdriver.PhantomJS(desired_capabilities=phantomjs_capabilities)

    # Machine-specific chromedriver locations:
    #   hee: the path used below
    #   lxw: r"/home/lxw/Software/chromedirver_selenium/chromedriver"
    self.driver = webdriver.Chrome("/home/hee/driver/chromedriver")
    #self.driver.maximize_window()

    # Random 2-3 second pause once the browser is up.
    delay = random.uniform(2.0, 3.0)
    time.sleep(delay)
def findTrip():
    """
    Load a fixed Ctrip flight-list page in Firefox and extract the flights.

    Entries of the ``J_flightlist2`` list that contain a
    ``train_flight_tit`` element are skipped.

    :return: list of dicts with the keys ``company``, ``flight_time``
        (``[from, to]``), ``airports`` (``[from, to]``) and ``price``; every
        value is the raw list of text nodes returned by the xpath query.
    """
    url = "http://flights.ctrip.com/booking/XMN-BJS-day-1.html?DDate1=2016-04-18"

    # The PhantomJS variant (random User-Agent, images disabled) was already
    # disabled in favour of Firefox; its unused capability setup was removed.
    driver = webdriver.Firefox()
    try:
        driver.get(url)
        driver.implicitly_wait(3)
        # Fixed wait before the page source is read.
        time.sleep(5)
        page = driver.page_source  # .decode('utf-8','ignore')
    finally:
        # quit() in a finally block: close() alone did not end the driver
        # session and was skipped entirely when loading raised.
        driver.quit()

    html = etree.HTML(page)
    fligint_div = "//div[@id='J_flightlist2']/div"
    items = html.xpath(fligint_div)

    detail = []
    # XPath positions are 1-based, hence enumerate(..., 1).
    for position, _item in enumerate(items, 1):
        flight_tr = fligint_div + '[' + str(position) + ']' + '//tr'

        istrain = html.xpath(flight_tr + "//div[@class='train_flight_tit']")
        if istrain:
            continue  # train connection, not a flight

        company = html.xpath(flight_tr + "//div[@class='info-flight J_flight_no']//text()")
        flight_time_from = html.xpath(flight_tr + "//td[@class='right']/div[1]//text()")
        flight_time_to = html.xpath(flight_tr + "//td[@class='left']/div[1]//text()")
        airports_from = html.xpath(flight_tr + "//td[@class='right']/div[2]//text()")
        airports_to = html.xpath(flight_tr + "//td[@class='left']/div[2]//text()")
        # The price is read from the first row of the entry only.
        price = html.xpath(flight_tr + "[1]//td[@class='price middle ']/span//text()")

        detail.append(
            dict(
                company=company,
                flight_time=[flight_time_from, flight_time_to],
                airports=[airports_from, airports_to],
                price=price
            ))

    # Parenthesised single-argument print behaves the same on Python 2 and 3.
    print(detail)
    return detail