def chrome(self):
# https://github.com/SeleniumHQ/selenium/blob/master/py/selenium/webdriver/remote/webdriver.py
# http://www.guguncube.com/2983/python-testing-selenium-with-google-chrome
# https://gist.github.com/addyosmani/5336747
# http://blog.likewise.org/2015/01/setting-up-chromedriver-and-the-selenium-webdriver-python-bindings-on-ubuntu-14-dot-04/
# https://sites.google.com/a/chromium.org/chromedriver/getting-started
# http://stackoverflow.com/questions/8255929/running-webdriver-chrome-with-selenium
chrome = webdriver.Chrome()
return chrome
# @property
# def firefox(self):
# profile = webdriver.FirefoxProfile()
# #firefox = webdriver.Firefox(firefox_profile=profile)
# firefox = WebDriver(firefox_profile=profile)
# return firefox
python类org()的实例源码
def create_session(max_connections=8, proxies=None):
"""
Creates a session object that can be used by multiple :class:`Dropbox` and
:class:`DropboxTeam` instances. This lets you share a connection pool
amongst them, as well as proxy parameters.
:param int max_connections: Maximum connection pool size.
:param dict proxies: See the `requests module
<http://docs.python-requests.org/en/latest/user/advanced/#proxies>`_
for more details.
:rtype: :class:`requests.sessions.Session`. `See the requests module
<http://docs.python-requests.org/en/latest/user/advanced/#session-objects>`_
for more details.
"""
# We only need as many pool_connections as we have unique hostnames.
session = pinned_session(pool_maxsize=max_connections)
if proxies:
session.proxies = proxies
return session
def create_session(max_connections=8, proxies=None):
"""
Creates a session object that can be used by multiple :class:`Dropbox` and
:class:`DropboxTeam` instances. This lets you share a connection pool
amongst them, as well as proxy parameters.
:param int max_connections: Maximum connection pool size.
:param dict proxies: See the `requests module
<http://docs.python-requests.org/en/latest/user/advanced/#proxies>`_
for more details.
:rtype: :class:`requests.sessions.Session`. `See the requests module
<http://docs.python-requests.org/en/latest/user/advanced/#session-objects>`_
for more details.
"""
# We only need as many pool_connections as we have unique hostnames.
session = pinned_session(pool_maxsize=max_connections)
if proxies:
session.proxies = proxies
return session
def getshardhost(p_apikey, p_orgid):
#Looks up shard URL for a specific org. Use this URL instead of 'dashboard.meraki.com'
# when making API calls with API accounts that can access multiple orgs.
#On failure returns 'null'
time.sleep(API_EXEC_DELAY)
try:
r = requests.get('https://dashboard.meraki.com/api/v0/organizations/%s/snmp' % p_orgid, headers={'X-Cisco-Meraki-API-Key': p_apikey, 'Content-Type': 'application/json'})
except:
printusertext('ERROR 02: Unable to contact Meraki cloud')
sys.exit(2)
if r.status_code != requests.codes.ok:
return 'null'
rjson = r.json()
return(rjson['hostname'])
def _request(query=None, method="GET", json=None, data=None, headers=None, params=None):
"""
Queries like /repos/:id needs to be appended to the base URL,
Queries like https://raw.githubusercontent.com need not.
full list of kwargs see http://docs.python-requests.org/en/master/api/#requests.request
"""
if query[0] == "/":
query = BASE_URL + query
kwargs = {
"auth": AUTH,
"json": json if json else None,
"data": data if data else None,
"headers": headers if headers else None,
"params": params if params else None,
}
return requests.request(method, query, **kwargs)
def post_settings(greeting_text):
""" Sets the START Button and also the Greeting Text.
The payload for the START Button will be 'USER_START'
:param str greeting_text: Desired Greeting Text (160 chars).
:return: `Response object <http://docs.python-requests.org/en/\
master/api/#requests.Response>`_
"""
# Set the greeting texts
url = TD_STS_URL + PAGE_ACCESS_TOKEN
txtpayload = {}
txtpayload['setting_type'] = 'greeting'
txtpayload['greeting'] = {'text': greeting_text}
response_msg = json.dumps(txtpayload)
status = requests.post(url, headers=HEADER, data=response_msg)
# Set the start button
btpayload = {}
btpayload['setting_type'] = 'call_to_actions'
btpayload['thread_state'] = 'new_thread'
btpayload['call_to_actions'] = [{'payload': 'USER_START'}]
response_msg = json.dumps(btpayload)
status = requests.post(url, headers=HEADER, data=response_msg)
return status
def post_start_button(payload='START'):
""" Sets the Thread Settings Greeting Text
(/docs/messenger-platform/thread-settings/get-started-button).
:param str payload: Desired postback payload (Default START).
:return: `Response object <http://docs.python-requests.org/en/\
master/api/#requests.Response>`_
"""
url = TD_STS_URL + PAGE_ACCESS_TOKEN
btpayload = {}
btpayload["setting_type"] = "call_to_actions"
btpayload["thread_state"] = "new_thread"
btpayload["call_to_actions"] = [{"payload": payload}]
response_msg = json.dumps(btpayload)
status = requests.post(url, headers=HEADER, data=response_msg)
return status
def post_domain_whitelisting(whitelisted_domains, domain_action_type='add'):
""" Sets the whistelisted domains for the Messenger Extension
(/docs/messenger-platform/thread-settings/domain-whitelisting).
:param list whistelisted_domains: Domains to be whistelisted.
:param str domain_action_type: Action to run `add/remove` (Defaut add).
:return: `Response object <http://docs.python-requests.org/en/\
master/api/#requests.Response>`_
"""
url = TD_STS_URL + PAGE_ACCESS_TOKEN
payload = {}
payload['setting_type'] = 'domain_whitelisting'
payload['whitelisted_domains'] = whitelisted_domains
payload['domain_action_type'] = domain_action_type
data = json.dumps(payload)
status = requests.post(url, headers=HEADER, data=data)
return status
def typing(fbid, sender_action):
""" Displays/Hides the typing gif/mark seen on facebook chat
(/docs/messenger-platform/send-api-reference/sender-actions)
:param str fbid: User id to display action.
:param str sender_action: `typing_off/typing_on/mark_seen`.
:return: `Response object <http://docs.python-requests.org/en/\
master/api/#requests.Response>`_
"""
url = MSG_URL + PAGE_ACCESS_TOKEN
payload = {}
payload['recipient'] = {'id': fbid}
payload['sender_action'] = sender_action
data = json.dumps(payload)
status = requests.post(url, headers=HEADER, data=data)
return status
# Send API Content Type
def post_text_message(fbid, message):
""" Sends a common text message
(/docs/messenger-platform/send-api-reference/text-message)
:param str fbid: User id to send the text.
:param str message: Text to be displayed for the user (230 chars).
:return: `Response object <http://docs.python-requests.org/en/\
master/api/#requests.Response>`_
"""
url = MSG_URL + PAGE_ACCESS_TOKEN
payload = {}
payload['recipient'] = {'id': fbid}
payload['message'] = {'text': message} # Limit 320 chars
data = json.dumps(payload)
status = requests.post(url, headers=HEADER, data=data)
return status
def post_attachment(fbid, media_url, file_type):
""" Sends a media attachment
(/docs/messenger-platform/send-api-reference/contenttypes)
:param str fbid: User id to send the audio.
:param str url: Url of a hosted media.
:param str type: image/audio/video/file.
:return: `Response object <http://docs.python-requests.org/en/\
master/api/#requests.Response>`_
"""
url = MSG_URL + PAGE_ACCESS_TOKEN
payload = {}
payload['recipient'] = {'id': fbid}
attachment = {"type": file_type, "payload": {"url": media_url}}
payload['message'] = {"attachment": attachment}
data = json.dumps(payload)
status = requests.post(url, headers=HEADER, data=data)
return status
def post_start_button(payload='START'):
""" Sets the **Get Started button**.
:usage:
>>> payload = 'GET_STARTED'
>>> response = fbbotw.post_start_button(payload=payload)
:param str payload: Desired postback payload (Default 'START').
:return: `Response object <http://docs.python-requests.org/en/\
master/api/#requests.Response>`_
:facebook docs: `/get-started-button <https://developers.facebook\
.com/docs/messenger-platform/messenger-profile/get-started-button>`_
"""
url = MESSENGER_PROFILE_URL.format(access_token=PAGE_ACCESS_TOKEN)
payload_data = {}
payload_data['get_started'] = {'payload': payload}
data = json.dumps(payload_data)
status = requests.post(url, headers=HEADER, data=data)
return status
def post_domain_whitelist(whitelisted_domains):
""" Sets the whistelisted domains for the Messenger Extension
:usage:
>>> # Define the array of domains to be whitelisted (Max 10)
>>> whistelisted_domains = [
"https://myfirst_domain.com",
"https://another_domain.com"
]
>>> fbbotw.post_domain_whitelist(
whitelisted_domains=whitelisted_domains
)
:param list whistelisted_domains: Domains to be whistelisted (Max 10).
:return: `Response object <http://docs.python-requests.org/en/\
master/api/#requests.Response>`_
:facebook docs: `/domain-whitelisting <https://developers.facebook.\
com/docs/messenger-platform/messenger-profile/domain-whitelisting>`_
"""
url = MESSENGER_PROFILE_URL.format(access_token=PAGE_ACCESS_TOKEN)
payload = {}
payload['whitelisted_domains'] = whitelisted_domains
data = json.dumps(payload)
status = requests.post(url, headers=HEADER, data=data)
return status
def post_account_linking_url(account_linking_url):
""" Sets the **liking_url** to connect the user with your business login
:usage:
>>> my_callback_linking_url = "https://my_business_callback.com"
>>> response = fbbotw.post_account_linking_url(
account_linking_url=my_callback_linking_url
)
:param str account_linking_url: URL to the account linking OAuth flow.
:return: `Response object <http://docs.python-requests.org/en/\
master/api/#requests.Response>`_
:facebook docs: `/account-linking-url <https://developers.facebook.\
com/docs/messenger-platform/messenger-profile/account-linking-url>`_
"""
url = MESSENGER_PROFILE_URL.format(access_token=PAGE_ACCESS_TOKEN)
payload = {}
payload['account_linking_url'] = account_linking_url
data = json.dumps(payload)
status = requests.post(url, headers=HEADER, data=data)
return status
def __init__(self, conf):
self.conf = conf
client.Configuration().host = self.conf.kubernetes.kube_host
self.v1 = client.CoreV1Api()
self.v1extention = client.ExtensionsV1beta1Api()
# Create namespace if not exists
self._ensure_namespace()
# Get templates.
template_loader = jinja2.FileSystemLoader(
searchpath=os.path.dirname(TEMPLATES_DIR)
)
jinja_env = jinja2.Environment(
loader=template_loader, autoescape=True, trim_blocks=True,
lstrip_blocks=True
)
self.deployment_template = jinja_env.get_template('deployment.j2')
self.service_template = jinja_env.get_template('service.j2')
self.pod_template = jinja_env.get_template('pod.j2')
# Refer to
# http://docs.python-requests.org/en/master/user/advanced/#session-objects
self.session = requests.Session()
def speech(self, audio_file, verbose=None, headers=None):
""" Sends an audio file to the /speech API.
Uses the streaming feature of requests (see `req`), so opening the file
in binary mode is strongly reccomended (see
http://docs.python-requests.org/en/master/user/advanced/#streaming-uploads).
Add Content-Type header as specified here: https://wit.ai/docs/http/20160526#post--speech-link
:param audio_file: an open handler to an audio file
:param verbose:
:param headers: an optional dictionary with request headers
:return:
"""
params = {}
headers = headers or {}
if verbose:
params['verbose'] = True
resp = req(self.logger, self.access_token, 'POST', '/speech', params,
data=audio_file, headers=headers)
return resp
def __init__(self, apikey=API_KEY, apiurl=API_URL, accept_tac=ACCEPT_TAC, timeout=None, verify_ssl=True, retries=3, proxies=None):
"""
Create a JoeSandbox object.
Parameters:
apikey: the api key
apiurl: the api url
accept_tac: Joe Sandbox Cloud requires accepting the Terms and Conditions.
https://jbxcloud.joesecurity.org/resources/termsandconditions.pdf
timeout: Timeout in seconds for accessing the API. Raises a ConnectionError on timeout.
verify_ssl: Enable or disable checking SSL certificates.
retries: Number of times requests should be retried if they timeout.
proxies: Proxy settings, see the requests library for more information:
http://docs.python-requests.org/en/master/user/advanced/#proxies
"""
self.apikey = apikey
self.apiurl = apiurl.rstrip("/")
self.accept_tac = accept_tac
self.timeout = timeout
self.retries = retries
self.session = requests.Session()
self.session.verify = verify_ssl
self.session.proxies = proxies
def call_api(self, action, params=None, **kwargs):
"""
??API????????SSL?????????
http://www.python-requests.org/en/latest/user/advanced/#ssl-cert-verification
:param action: Method Name?
:param params: Dictionary,form params for api.
:param timeout: (optional) Float describing the timeout of the request.
:return:
"""
return self._http_call(
url=self._join_url(self.api_host, "%s.%s" % (action, self.response_type)),
method="POST",
data=params,
headers=self._headers(),
**kwargs
)
def __init__(self, ise_node, ers_user, ers_pass, verify=False, disable_warnings=False, timeout=2):
"""
Class to interact with Cisco ISE via the ERS API
:param ise_node: IP Address of the primary admin ISE node
:param ers_user: ERS username
:param ers_pass: ERS password
:param verify: Verify SSL cert
:param disable_warnings: Disable requests warnings
:param timeout: Query timeout
"""
self.ise_node = ise_node
self.user_name = ers_user
self.user_pass = ers_pass
self.url_base = 'https://{0}:9060/ers'.format(self.ise_node)
self.ise = requests.session()
self.ise.auth = (self.user_name, self.user_pass)
self.ise.verify = verify # http://docs.python-requests.org/en/latest/user/advanced/#ssl-cert-verification
self.disable_warnings = disable_warnings
self.timeout = timeout
self.ise.headers.update({'Connection': 'keep_alive'})
if self.disable_warnings:
requests.packages.urllib3.disable_warnings()
def create_session(max_connections=8, proxies=None):
"""
Creates a session object that can be used by multiple :class:`Dropbox` and
:class:`DropboxTeam` instances. This lets you share a connection pool
amongst them, as well as proxy parameters.
:param int max_connections: Maximum connection pool size.
:param dict proxies: See the `requests module
<http://docs.python-requests.org/en/latest/user/advanced/#proxies>`_
for more details.
:rtype: :class:`requests.sessions.Session`. `See the requests module
<http://docs.python-requests.org/en/latest/user/advanced/#session-objects>`_
for more details.
"""
# We only need as many pool_connections as we have unique hostnames.
session = pinned_session(pool_maxsize=max_connections)
if proxies:
session.proxies = proxies
return session
def get_doi_el(wiki, dbcnf):
""" Set of DOI codes from external links. """
dois = set([])
doiquery = """SELECT el_to
FROM externallinks
WHERE el_index LIKE 'https://org.doi.dx./10%'
OR el_index LIKE 'http://org.doi.dx./10%'"""
with get_connection(wiki, dbcnf) as connection:
cursor = connection.cursor()
cursor.execute(doiquery)
for link in cursor.fetchall():
try:
doi = re.findall('10.+$', link[0].decode('utf-8'))[0]
if doi:
dois.add(unquote(doi))
except IndexError:
continue
# print "Found %d DOI external links on %s" % (len(dois), wiki)
return dois
def get_doai_oa(doi):
""" Given a DOI, return DOAI target URL if green open access,
None otherwise.
"""
doaiurl = 'http://doai.io/{}'.format(doi)
try:
doai = SESSIONDOAI.head(url=doaiurl)
except requests.ConnectionError:
time.sleep(random.randint(1, 100))
return False
if doai.status_code == 302:
url = doai.headers['Location']
if re.search('doi.org', url):
return None
else:
return url
def message(client, feed_id, payload):
# Message function will be called when a subscribed feed has a new value.
# The feed_id parameter identifies the feed, and the payload parameter has
# the new value.
print('Feed {0} received new value: {1}'.format(feed_id, payload))
# Update physical dashboard depending on the changed feed.
# Notice the feed_id is checked to find out which feed changed, then the
# appropriate physical dashboard widget is changed.
if feed_id == 'pi-dashboard-slider':
# The requests.post function will make an HTTP request against the
# dashboard. See the requests documentation for more information:
# http://docs.python-requests.org/en/latest/
requests.post('{0}/widgets/slider'.format(DASHBOARD_URL), data={'value': payload})
elif feed_id == 'pi-dashboard-humidity':
requests.post('{0}/widgets/humidity'.format(DASHBOARD_URL), data={'value': payload})
elif feed_id == 'pi-dashboard-temp':
requests.post('{0}/widgets/temp'.format(DASHBOARD_URL), data={'value': payload})
# Create an MQTT client instance.
def test_multiline():
url = 'http://httpbin.org/post'
data = b'content=Im\r\na multiline\r\n\r\nsentence\r\n'
headers = {
'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8',
'Accept': 'text/plain',
}
HTTPretty.register_uri(
HTTPretty.POST,
url,
)
response = requests.post(url, data=data, headers=headers)
expect(response.status_code).to.equal(200)
expect(HTTPretty.last_request.method).to.equal('POST')
expect(HTTPretty.last_request.path).to.equal('/post')
expect(HTTPretty.last_request.body).to.equal(data)
expect(HTTPretty.last_request.headers['content-length']).to.equal('37')
expect(HTTPretty.last_request.headers['content-type']).to.equal('application/x-www-form-urlencoded; charset=utf-8')
expect(len(HTTPretty.latest_requests)).to.equal(1)
def test_octet_stream():
url = 'http://httpbin.org/post'
data = b"\xf5\x00\x00\x00" # utf-8 with invalid start byte
headers = {
'Content-Type': 'application/octet-stream',
}
HTTPretty.register_uri(
HTTPretty.POST,
url,
)
response = requests.post(url, data=data, headers=headers)
expect(response.status_code).to.equal(200)
expect(HTTPretty.last_request.method).to.equal('POST')
expect(HTTPretty.last_request.path).to.equal('/post')
expect(HTTPretty.last_request.body).to.equal(data)
expect(HTTPretty.last_request.headers['content-length']).to.equal('4')
expect(HTTPretty.last_request.headers['content-type']).to.equal('application/octet-stream')
expect(len(HTTPretty.latest_requests)).to.equal(1)
def test_multipart():
url = 'http://httpbin.org/post'
data = b'--xXXxXXyYYzzz\r\nContent-Disposition: form-data; name="content"\r\nContent-Type: text/plain; charset=utf-8\r\nContent-Length: 68\r\n\r\nAction: comment\nText: Comment with attach\nAttachment: x1.txt, x2.txt\r\n--xXXxXXyYYzzz\r\nContent-Disposition: form-data; name="attachment_2"; filename="x.txt"\r\nContent-Type: text/plain\r\nContent-Length: 4\r\n\r\nbye\n\r\n--xXXxXXyYYzzz\r\nContent-Disposition: form-data; name="attachment_1"; filename="x.txt"\r\nContent-Type: text/plain\r\nContent-Length: 4\r\n\r\nbye\n\r\n--xXXxXXyYYzzz--\r\n'
headers = {'Content-Length': '495', 'Content-Type': 'multipart/form-data; boundary=xXXxXXyYYzzz', 'Accept': 'text/plain'}
HTTPretty.register_uri(
HTTPretty.POST,
url,
)
response = requests.post(url, data=data, headers=headers)
expect(response.status_code).to.equal(200)
expect(HTTPretty.last_request.method).to.equal('POST')
expect(HTTPretty.last_request.path).to.equal('/post')
expect(HTTPretty.last_request.body).to.equal(data)
expect(HTTPretty.last_request.headers['content-length']).to.equal('495')
expect(HTTPretty.last_request.headers['content-type']).to.equal('multipart/form-data; boundary=xXXxXXyYYzzz')
expect(len(HTTPretty.latest_requests)).to.equal(1)
def test_httpretty_allows_to_chose_if_querystring_should_be_matched():
"HTTPretty should provide a way to not match regexes that have a different querystring"
HTTPretty.register_uri(
HTTPretty.GET,
re.compile("https://example.org/(?P<endpoint>\w+)/$"),
body="Nudge, nudge, wink, wink. Know what I mean?",
match_querystring=True
)
response = requests.get('https://example.org/what/')
expect(response.text).to.equal('Nudge, nudge, wink, wink. Know what I mean?')
try:
requests.get('https://example.org/what/?flying=coconuts')
raised = False
except requests.ConnectionError:
raised = True
assert raised is True
def _request(self, method, urlpath, **kwargs):
"""Makes an HTTP call using the Python requests library.
Refer to http://docs.python-requests.org/en/master/api/ for more information on supported
options and features.
Args:
method: HTTP method name as a string (e.g. get, post).
urlpath: URL path of the remote endpoint. This will be appended to the server's base URL.
kwargs: An additional set of keyword arguments to be passed into requests API
(e.g. json, params).
Returns:
dict: The parsed JSON response.
"""
resp = self._session.request(method, ID_TOOLKIT_URL + urlpath, **kwargs)
resp.raise_for_status()
return resp.json()
def request(self, method, url, **kwargs):
"""Makes an HTTP call using the Python requests library.
This is the sole entry point to the requests library. All other helper methods in this
class call this method to send HTTP requests out. Refer to
http://docs.python-requests.org/en/master/api/ for more information on supported options
and features.
Args:
method: HTTP method name as a string (e.g. get, post).
url: URL of the remote endpoint.
kwargs: An additional set of keyword arguments to be passed into the requests API
(e.g. json, params).
Returns:
Response: An HTTP response object.
Raises:
RequestException: Any requests exceptions encountered while making the HTTP call.
"""
resp = self._session.request(method, self._base_url + url, **kwargs)
resp.raise_for_status()
return resp
def _add_encoded_geom(df, geom_col):
"""Add encoded geometry to DataFrame"""
# None if not a GeoDataFrame
is_geopandas = getattr(df, '_geometry_column_name', None)
if is_geopandas is None and geom_col is None:
warn('`encode_geom` works best with Geopandas '
'(http://geopandas.org/) and/or shapely '
'(https://pypi.python.org/pypi/Shapely).')
geom_col = 'geometry' if 'geometry' in df.columns else None
if geom_col is None:
raise KeyError('Geometries were requested to be encoded '
'but a geometry column was not found in the '
'DataFrame.'.format(geom_col=geom_col))
elif is_geopandas and geom_col:
warn('Geometry column of the input DataFrame does not '
'match the geometry column supplied. Using user-supplied '
'column...\n'
'\tGeopandas geometry column: {}\n'
'\tSupplied `geom_col`: {}'.format(is_geopandas,
geom_col))
elif is_geopandas and geom_col is None:
geom_col = is_geopandas
# updates in place
df['the_geom'] = df[geom_col].apply(_encode_geom)
return None