def login(self):
    """Log in to the provider unless a ``session`` cookie is already present.

    Returns:
        bool: True when the session cookie jar already holds a ``session``
        cookie or the login page is no longer served after posting the
        credentials; False when a request yields no content or the
        provider still answers with its login page.
    """
    cookie_dict = requests.utils.dict_from_cookiejar(self.session.cookies)
    if cookie_dict.get('session'):
        # A session cookie is already stored, so no login request is sent.
        return True

    login_params = {
        'submit': 'Login',
        'username': self.username,
        'password': self.password,
        'keeplogged': 0,
    }

    if not self.get_url(self.urls['login'], post_data=login_params, returns='text'):
        logger.log(u"Unable to connect to provider", logger.WARNING)
        return False

    response = self.get_url(urljoin(self.urls['base_url'], 'index.php'), returns='text')
    if not response:
        # get_url can yield a falsy value (the login POST above is checked
        # for exactly that); re.search() would raise TypeError on None.
        logger.log(u"Unable to connect to provider", logger.WARNING)
        return False

    # Being served the login page again means the credentials were rejected.
    if re.search('<title>Login :: BJ-Share</title>', response):
        logger.log(u"Invalid username or password. Check your settings", logger.WARNING)
        return False

    return True
# Example source code for Python's urljoin()
def _request_get(self, path, params=None):
    """Issue a GET for ``path`` (joined onto BASE_URL) and return its JSON body.

    When the first answer has a 5xx status, the request is repeated up to
    ``self._max_retries`` times. Each repeat is preceded by a sleep that
    starts at ``self._initial_backoff`` seconds and doubles after every
    failed repeat. The first non-5xx repeat replaces the original response;
    if none succeeds, the original response is kept.

    Raises whatever ``raise_for_status()`` raises for the final response.
    """
    target = urljoin(BASE_URL, path)
    request_headers = self._get_request_headers()
    response = requests.get(target, params=params, headers=request_headers)

    if response.status_code >= 500:
        delay = self._initial_backoff
        for _attempt in range(self._max_retries):
            time.sleep(delay)
            retry = requests.get(target, params=params, headers=request_headers)
            if retry.status_code >= 500:
                # Still a server error: wait twice as long next time.
                delay *= 2
                continue
            response = retry
            break

    response.raise_for_status()
    return response.json()
def cloud_auth(session, login=LOGIN, password=PASSWORD):
    """Post the credentials to the auth endpoint and report success.

    Returns True when the response has an OK status and contains
    LOGIN_CHECK_STRING; returns None in every other case (request
    exception, non-OK status, or missing check string). Failures are
    logged through LOGGER when it is set.
    """
    try:
        # The form is built inside the try block so that an exception
        # raised while assembling it is handled like a request failure.
        form = {
            'Login': login,
            'Password': password,
            'page': urljoin(CLOUD_URL, '?from=promo'),
            'new_auth_form': 1,
            'Domain': get_email_domain(login),
        }
        r = session.post(
            'https://auth.mail.ru/cgi-bin/auth?lang=ru_RU&from=authpopup',
            data=form,
            verify=VERIFY_SSL,
        )
    except Exception as e:
        if LOGGER:
            LOGGER.error('Cloud auth HTTP request error: {}'.format(e))
        return None

    if r.status_code != requests.codes.ok:
        if LOGGER:
            LOGGER.error(
                'Cloud authorization request error. Check your connection. '
                'HTTP code: {}, msg: {}'.format(r.status_code, r.text))
        return None

    if LOGIN_CHECK_STRING in r.text:
        return True

    if LOGGER:
        LOGGER.error(
            'Cloud authorization request error. Check your credentials settings in {}. '
            'Do not forget to accept cloud LA by entering it in browser. '
            'HTTP code: {}, msg: {}'.format(CONFIG_FILE, r.status_code, r.text))
    return None
def get_csrf(session):
    """Fetch the CSRF token from ``tokens/csrf`` under CLOUD_URL.

    Returns the token string taken from ``body.token`` of the JSON reply,
    or None when the request raises or the status is not OK. Asserts that
    the token is exactly 32 characters long.
    """
    try:
        r = session.get(urljoin(CLOUD_URL, 'tokens/csrf'), verify=VERIFY_SSL)
    except Exception as e:
        if LOGGER:
            LOGGER.error('Get csrf HTTP request error: {}'.format(e))
        return None

    if r.status_code != requests.codes.ok:
        if LOGGER:
            LOGGER.error(
                'CSRF token request error. Check your connection and credentials settings in {}. '
                'HTTP code: {}, msg: {}'.format(CONFIG_FILE, r.status_code, r.text))
        return None

    token = r.json()['body']['token']
    assert len(token) == 32, 'invalid CSRF token <{}> lentgh'.format(token)
    return token
def get_upload_domain(session, csrf=''):
    """Return the current upload domain URL of the cloud.

    The CSRF token does not appear to be required by the session, but its
    presence is asserted anyway to guard against future changes.

    Returns the ``url`` of the first entry in ``body.upload`` of the JSON
    reply, or None when the request raises or the status is not OK.
    """
    assert csrf is not None, 'no CSRF'
    dispatcher_url = urljoin(CLOUD_URL, 'dispatcher?token=' + csrf)

    try:
        r = session.get(dispatcher_url, verify=VERIFY_SSL)
    except Exception as e:
        if LOGGER:
            LOGGER.error('Get upload domain HTTP request error: {}'.format(e))
        return None

    if r.status_code != requests.codes.ok:
        if LOGGER:
            LOGGER.error(
                'Upload domain request error. Check your connection. '
                'HTTP code: {}, msg: {}'.format(r.status_code, r.text))
        return None

    payload = r.json()
    return payload['body']['upload'][0]['url']
def _read_actions(self):
    """Build an Action for every ``actionList/action`` node of the SCPD.

    Each argument is paired with the state variable named by its
    ``relatedStateVariable`` and sorted into the input or output list by
    its ``direction``. Every Action is stored in ``self.action_map`` under
    its name and appended to ``self.actions``.
    """
    control_endpoint = urljoin(self._url_base, self._control_url)

    for node in self._findall('actionList/action'):
        action_name = node.findtext('name', namespaces=node.nsmap)
        inputs, outputs = [], []

        arguments = node.findall('argumentList/argument', namespaces=node.nsmap)
        for argument in arguments:
            text_of = partial(argument.findtext, namespaces=argument.nsmap)
            entry = (text_of('name'),
                     self.statevars[text_of('relatedStateVariable')])
            # Anything whose direction is not "in" counts as an output.
            is_input = text_of('direction').lower() == 'in'
            (inputs if is_input else outputs).append(entry)

        action = Action(control_endpoint, self.service_type, action_name,
                        inputs, outputs)
        self.action_map[action_name] = action
        self.actions.append(action)
def get_log_types():
    """Scrape the Bro script reference and list the documented log files.

    Returns a dict with a single ``logs`` key holding one entry per table
    row, each with ``file``, ``log_type`` (the file name without its
    extension), ``description``, an empty ``fields`` list and, when the row
    has a link, an absolute ``url``.
    """
    base = "https://www.bro.org/sphinx/script-reference/"
    resp = requests.get(url=base + "log-files.html")
    soup = BeautifulSoup(resp.content, "html.parser")

    entries = []
    for table in soup.find_all("table", {"class": "docutils"}):
        for row in table.find('tbody').find_all('tr'):
            cell_texts = [cell.text.strip() for cell in row.find_all('td')]
            filled = [text for text in cell_texts if text]

            file_name = filled[0]
            log_type = os.path.splitext(file_name)[0]
            log = {
                'file': file_name,
                'log_type': log_type,
                'description': filled[1],
                'fields': [],
            }

            anchor = row.find('a', href=True)
            # do not add a URL for notice_alarm.log
            if anchor is not None and 'notice_alarm' not in log_type:
                log['url'] = urljoin(base, anchor['href'])

            logger.info('adding log type: {}'.format(log_type))
            entries.append(log)

    return dict(logs=entries)
def _authenticate(email=None, password=None):
    """Log in at ``users/login`` and return the user id and access token.

    :param email: account e-mail; prompted for on stdin when None
    :param password: account password; prompted for via getpass when None
    :return: tuple ``(user_id, access_token)`` read from the ``userId`` and
        ``id`` fields of the JSON reply
    """
    if email is None:
        # raw_input only exists on Python 2; fall back to input elsewhere.
        try:
            ask = raw_input
        except NameError:
            ask = input
        email = ask('IBM QE user (e-mail) > ')

    if password is None:
        password = getpass.getpass(prompt='IBM QE password > ')

    credentials = {"email": email, "password": password}
    reply = requests.post(urljoin(_api_url, 'users/login'), data=credentials)
    reply.raise_for_status()

    payload = reply.json()
    return payload['userId'], payload['id']
def test_send_real_device_offline(monkeypatch):
    """send() raises DeviceOfflineError when the queue status has state False."""

    def mocked_requests_get(*args, **kwargs):
        class MockResponse:
            def __init__(self, json_data, status_code):
                self.json_data = json_data
                self.status_code = status_code

            def json(self):
                return self.json_data

        # The device status query is answered with "state": False.
        expected_url = urljoin(_api_url_status, 'Backends/ibmqx2/queue/status')
        if args[0] == expected_url:
            return MockResponse({"state": False}, 200)

    monkeypatch.setattr("requests.get", mocked_requests_get)

    with pytest.raises(_ibm_http_client.DeviceOfflineError):
        _ibm_http_client.send("my_json_qasm",
                              device="ibmqx2",
                              user=None, password=None,
                              shots=1, verbose=True)
def construct_collection(self, instance, spec, loc, context):
    """
    Constructor for `.collection` predicate.

    Gathers the per-field adapter configuration (the validation schemas)
    of the collection, stores it on the instance and registers an
    ApimasClient for the collection's endpoint.
    """
    # NOTE(review): super(self.__class__, self) recurses forever if this
    # class is ever subclassed; the enclosing class name is not visible here.
    instance = super(self.__class__, self).construct_collection(
        instance, spec, loc, context)
    self.init_adapter_conf(instance)

    field_schemas = {}
    for field_name, field_spec in doc.doc_get(instance, ('*',)).iteritems():
        field_schemas[field_name] = field_spec.get(self.ADAPTER_CONF, {})

    collection = context.get('parent_name')
    endpoint = urljoin(
        self.root_url,
        TRAILING_SLASH.join([loc[0], collection])) + TRAILING_SLASH

    instance[self.ADAPTER_CONF] = field_schemas
    self.clients[loc[0] + '/' + collection] = ApimasClient(
        endpoint, field_schemas)
    return instance
def _request_post(self, path, data=None, params=None):
    """POST ``data`` as JSON to ``path`` under BASE_URL.

    Returns the decoded JSON body when the status is exactly 200 and None
    for any other non-error status. Error statuses raise through
    ``raise_for_status()``.
    """
    response = requests.post(
        urljoin(BASE_URL, path),
        json=data,
        params=params,
        headers=self._get_request_headers(),
    )
    response.raise_for_status()
    if response.status_code != 200:
        return None
    return response.json()
def _request_put(self, path, data=None, params=None):
    """PUT ``data`` as JSON to ``path`` under BASE_URL.

    Returns the decoded JSON body when the status is exactly 200 and None
    for any other non-error status. Error statuses raise through
    ``raise_for_status()``.
    """
    response = requests.put(
        urljoin(BASE_URL, path),
        json=data,
        params=params,
        headers=self._get_request_headers(),
    )
    response.raise_for_status()
    if response.status_code != 200:
        return None
    return response.json()
def _request_delete(self, path, params=None):
    """Send a DELETE for ``path`` under BASE_URL.

    Returns the decoded JSON body when the status is exactly 200 and None
    for any other non-error status. Error statuses raise through
    ``raise_for_status()``.
    """
    response = requests.delete(
        urljoin(BASE_URL, path),
        params=params,
        headers=self._get_request_headers(),
    )
    response.raise_for_status()
    if response.status_code != 200:
        return None
    return response.json()
def req(url, hdr):
    """GET ``url`` relative to BASE_URL with a 10 second timeout.

    Returns the response object when the status is 200.

    Raises:
        RequestTimeoutError: on a requests timeout or connection error.
        StatusCodeError: when the status code is anything other than 200.
    """
    try:
        res = requests.get(urljoin(BASE_URL, url), headers=hdr, timeout=10.0)
    except (requests.Timeout, requests.ConnectionError):
        # Connection failures are reported the same way as timeouts.
        raise RequestTimeoutError(url)

    if res.status_code == 200:
        return res
    raise StatusCodeError(url, res.status_code)
def get_cloud_space(session, csrf='', login=LOGIN):
    """Return the free cloud space in bytes.

    Queries ``user/space`` and subtracts ``body.used`` from ``body.total``,
    each multiplied by 1024 * 1024. Returns 0 when the request raises or
    the status is not OK.
    """
    assert csrf is not None, 'no CSRF'

    epoch_seconds = time.mktime(datetime.datetime.now().timetuple())
    timestamp = str(int(epoch_seconds * 1000))
    quoted_login = quote_plus(login)
    command = ''.join([
        'user/space?api=', str(API_VER),
        '&email=', quoted_login,
        '&x-email=', quoted_login,
        '&token=', csrf,
        '&_=', timestamp,
    ])
    url = urljoin(CLOUD_URL, command)

    try:
        r = session.get(url, verify=VERIFY_SSL)
    except Exception as e:
        if LOGGER:
            LOGGER.error('Get cloud space HTTP request error: {}'.format(e))
        return 0

    if r.status_code != requests.codes.ok:
        if LOGGER:
            LOGGER.error(
                'Cloud free space request error. Check your connection. '
                'HTTP code: {}, msg: {}'.format(r.status_code, r.text))
        return 0

    body = r.json()['body']
    total_bytes = body['total'] * 1024 * 1024
    used_bytes = body['used'] * 1024 * 1024
    return total_bytes - used_bytes
def post_file(session, domain='', file='', login=LOGIN):
    """Post a file to the cloud's upload server.

    Args:
        session: object whose ``post`` method performs the HTTP request.
        domain: base URL of the upload server.
        file: string filename with path.
        login: account login, sent URL-quoted as the ``x-email`` parameter.

    Returns:
        tuple: ``(hash, size)`` parsed from the response body on success,
        ``(None, None)`` when the request raises, the status is not OK or
        the response body is empty.
    """
    assert domain is not None, 'no domain'
    assert file is not None, 'no file'

    filetype = guess_type(file)[0]
    if not filetype:
        filetype = DEFAULT_FILETYPE
        if LOGGER:
            LOGGER.warning('File {} type is unknown, using default: {}'.format(file, DEFAULT_FILETYPE))

    filename = os.path.basename(file)
    quoted_login = quote_plus(login)
    timestamp = str(int(time.mktime(datetime.datetime.now().timetuple()))) + TIME_AMEND
    url = urljoin(domain, '?cloud_domain=' + str(CLOUD_DOMAIN_ORD) + '&x-email=' + quoted_login + '&fileapi' + timestamp)

    # The context manager closes the file handle on every exit path; the
    # original left it open after the upload.
    with open(file, 'rb') as payload:
        m = MultipartEncoder(fields={'file': (quote_plus(filename), payload, filetype)})
        try:
            r = session.post(url, data=m, headers={'Content-Type': m.content_type}, verify=VERIFY_SSL)
        except Exception as e:
            if LOGGER:
                LOGGER.error('Post file HTTP request error: {}'.format(e))
            return (None, None)

    if r.status_code == requests.codes.ok:
        if len(r.content):
            # The first 40 bytes are taken as the hash; the size follows one
            # separator byte later and the last two bytes are dropped
            # (presumably a trailing CRLF — TODO confirm against the API).
            file_hash = r.content[:40].decode()
            size = int(r.content[41:-2])
            return (file_hash, size)
        elif LOGGER:
            LOGGER.error('File {} post error, no hash and size received'.format(file))
    elif LOGGER:
        LOGGER.error('File {} post error, http code: {}, msg: {}'.format(file, r.status_code, r.text))
    return (None, None)
def prepare_url(value):
    """Return a builder that joins path segments onto ``value.url``.

    The returned callable accepts any number of string segments, joins
    them with ``/`` and resolves the result against the base URL.
    """
    # Issue #1483: the base must end in exactly one slash, otherwise
    # urljoin would replace its last path component.
    base = '{}/'.format(value.url.rstrip('/'))

    def inner(*suffix):
        relative = '/'.join(suffix)
        return urljoin(base, relative)

    return inner
def test_get_rfs_url(self):
    """get_rfs_url normalises every path variant to the same URL."""
    CONF.podm.url = "https://127.0.0.1:8443"
    expected = urljoin(CONF.podm.url, "redfish/v1/Systems/1")

    # test without service_ext
    without_service_ext = (
        "/Systems/1/",
        "/Systems/1",
        "Systems/1/",
        "Systems/1",
    )
    for path in without_service_ext:
        self.assertEqual(expected, redfish.get_rfs_url(path))

    # test with service_ext
    with_service_ext = (
        "/redfish/v1/Systems/1/",
        "/redfish/v1/Systems/1",
        "redfish/v1/Systems/1/",
        "redfish/v1/Systems/1",
    )
    for path in with_service_ext:
        self.assertEqual(expected, redfish.get_rfs_url(path))
def test_get_rfs_url_with_tailing_slash(self):
    """Same expectations as above when the configured URL ends in a slash."""
    CONF.podm.url = "https://127.0.0.1:8443/"
    expected = urljoin(CONF.podm.url, "redfish/v1/Systems/1")

    # test without service_ext
    without_service_ext = (
        "/Systems/1/",
        "/Systems/1",
        "Systems/1/",
        "Systems/1",
    )
    for path in without_service_ext:
        self.assertEqual(expected, redfish.get_rfs_url(path))

    # test with service_ext
    with_service_ext = (
        "/redfish/v1/Systems/1/",
        "/redfish/v1/Systems/1",
        "redfish/v1/Systems/1/",
        "redfish/v1/Systems/1",
    )
    for path in with_service_ext:
        self.assertEqual(expected, redfish.get_rfs_url(path))
def __init__(self, url_base, service_type, service_id, control_url, scpd_url, event_sub_url):
    """Store the service URLs, download the SCPD document and parse it.

    The SCPD is fetched from ``scpd_url`` resolved against ``url_base``
    (with HTTP_TIMEOUT), parsed as XML, and then the state variables and
    actions are read from it. An HTTP error status raises through
    ``raise_for_status()``.
    """
    self._url_base = url_base
    self.service_type = service_type
    self.service_id = service_id
    self._control_url = control_url
    self.scpd_url = scpd_url
    self._event_sub_url = event_sub_url

    self.actions = []
    self.action_map = {}
    self.statevars = {}

    log = self._log = _getLogger('Service')
    log.debug('%s url_base: %s', self.service_id, self._url_base)
    log.debug('%s SCPDURL: %s', self.service_id, self.scpd_url)
    log.debug('%s controlURL: %s', self.service_id, self._control_url)
    log.debug('%s eventSubURL: %s', self.service_id, self._event_sub_url)

    scpd_location = urljoin(self._url_base, self.scpd_url)
    log.info('Reading %s', scpd_location)
    resp = requests.get(scpd_location, timeout=HTTP_TIMEOUT)
    resp.raise_for_status()

    document = etree.fromstring(resp.content)
    self.scpd_xml = document
    # Namespace-aware lookups bound to the document's own namespace map.
    self._find = partial(document.find, namespaces=document.nsmap)
    self._findtext = partial(document.findtext, namespaces=document.nsmap)
    self._findall = partial(document.findall, namespaces=document.nsmap)

    self._read_state_vars()
    self._read_actions()
def stats(self):
    """Get global cryptocurrencies statistics.

    Returns:
        dict: Global markets statistics, decoded from the ``global/``
        endpoint with this instance's int and float parsers.
    """
    endpoint = urljoin(self.urls["api"], 'global/')
    reply = get(endpoint)
    return reply.json(parse_int=self.parse_int,
                      parse_float=self.parse_float)
####### WEB PARSER METHODS #######
def _get_ranks(self, query, temp):
    """Internal helper that scrapes the gainers/losers ranking tables.

    Args:
        query: Which ranking to read, gainers or losers.
        temp: Period of the ranking: 1h, 24h or 7d.

    Returns:
        list: One dict per table row (header row skipped) with the keys
        ``symbol``, ``name``, ``24h_volume_usd``, ``price_usd`` and
        ``percent_change``.
    """
    page = self._html(urljoin(self.urls["web"], 'gainers-losers/'))
    table_id = str(query) + '-' + str(temp)
    rows = page.find('div', {'id': table_id}).find_all('tr')

    response = []
    for row in rows[1:]:
        # Drop the bare newline nodes between the cells.
        cells = [node for node in row.contents if node != '\n']

        for position, cell in enumerate(cells):
            if position == 1:
                name = str(cell.a.getText())
            elif position == 2:
                symbol = str(cell.string)
            elif position == 3:
                volume_text = sub(r'\$|,', '', cell.a.getText())
                volume_24h = self.parse_int(volume_text)
            elif position == 4:
                price_text = sub(r'\$|,', '', cell.a.getText())
                price = self.parse_float(price_text)
            elif position == 5:
                percent = self.parse_float(sub(r'%', '', cell.string))

        response.append({
            'symbol': symbol,
            'name': name,
            '24h_volume_usd': volume_24h,
            'price_usd': price,
            'percent_change': percent,
        })
    return response
def global_cap(self, bitcoin=True, start=None, end=None):
    """Get global market capitalization graphs, with or without Bitcoin.

    Args:
        bitcoin (bool, optional): Whether Bitcoin is included in the
            global market capitalization graph. Defaults to True.
        start (optional, datetime): Time to start retrieving graphs
            data. Defaults to None.
        end (optional, datetime): Time to end retrieving graphs data.
            Defaults to None.

    Returns (dict):
        List of lists with timestamp and values
    """
    if bitcoin:
        endpoint = "global/marketcap-total/"
    else:
        endpoint = "global/marketcap-altcoin/"
    url = urljoin(self.urls["graphs_api"], endpoint)

    # The time range is only applied when both bounds are given.
    if start and end:
        # NOTE(review): url is passed in AND the result is appended to it;
        # confirm _add_start_end returns only the suffix, not the full URL.
        url += self._add_start_end(url, start, end)

    return get(url).json()
def prepare_url(value):
    """Create a URL factory rooted at ``value.url``.

    The factory takes string path segments, joins them with ``/`` and
    resolves them relative to the root.
    """
    # Issue #1483: Guarantee a single trailing slash on the root so that
    # urljoin appends to it instead of replacing the last component.
    root = value.url.rstrip('/')
    root += '/'

    def inner(*suffix):
        return urljoin(root, '/'.join(suffix))

    return inner
def prepare_url(value):
    """Wrap ``value.url`` in a helper that appends path segments to it.

    Calling the helper with string segments returns the base URL extended
    by those segments joined with ``/``.
    """
    # Issue #1483: Make sure the URL always ends in one trailing slash.
    stripped = value.url.rstrip('/')
    httpbin_url = stripped + '/'

    def inner(*suffix):
        tail = '/'.join(suffix)
        return urljoin(httpbin_url, tail)

    return inner
def build_url(current_url, next_url):
    """Return ``next_url`` unchanged when ``is_url`` accepts it.

    Otherwise ``next_url`` is resolved relative to ``current_url``.
    """
    return next_url if is_url(next_url) else urljoin(current_url, next_url)
def _run(qasm, device, user_id, access_token, shots):
    """Submit ``qasm`` to ``codes/execute`` and return the execution id.

    The id is read from the ``id`` field of the JSON reply. An HTTP error
    status raises through ``raise_for_status()``.
    """
    query = {
        "access_token": access_token,
        "deviceRunType": device,
        "fromCache": "false",
        "shots": shots,
    }
    reply = requests.post(urljoin(_api_url, 'codes/execute'),
                          data=qasm,
                          params=query,
                          headers={"Content-Type": "application/json"})
    reply.raise_for_status()
    return reply.json()["id"]
def _get_result(execution_id, access_token, num_retries=300, interval=1):
    """Poll ``Executions/<execution_id>`` until its status id is "DONE".

    Returns the ``result`` field of the JSON reply once done, or None when
    ``num_retries`` polls (``interval`` seconds apart) pass without the
    execution finishing. An HTTP error status raises through
    ``raise_for_status()``.
    """
    status_url_suffix = 'Executions/{execution_id}'.format(execution_id=execution_id)
    query = {"access_token": access_token}

    for _attempt in range(num_retries):
        reply = requests.get(urljoin(_api_url, status_url_suffix), params=query)
        reply.raise_for_status()
        payload = reply.json()
        if payload["status"]["id"] == "DONE":
            return payload["result"]
        time.sleep(interval)

    return None
def __call__(self, value):
    """Resolve ``value`` against ``self.ref_endpoint`` with one trailing slash.

    None is passed through unchanged.
    """
    if value is not None:
        resolved = urljoin(self.ref_endpoint, value)
        return resolved.rstrip('/') + '/'
    return value
def format_endpoint(self, resource_id):
    """
    Build the URL of a single resource by appending its URL-quoted
    identifier, followed by a trailing slash, to the resource's endpoint.

    Example: endpoint/<pk>/
    """
    # Unicode identifiers are encoded to UTF-8 bytes before quoting.
    if isinstance(resource_id, unicode):
        resource_id = resource_id.encode("utf-8")
    quoted_id = quote(str(resource_id))
    return urljoin(self.endpoint, quoted_id) + TRAILING_SLASH