def format_yahoo_index_url(symbol, start_date, end_date):
    """
    Build the Yahoo Finance CSV query URL for an index symbol.

    The start and end dates are each split into month, day and year
    query parameters; months are sent zero indexed. The frequency
    parameter is fixed to daily.
    """
    query = [
        ('s', symbol),
        # Start of the range: month (zero indexed), day, year.
        ('a', start_date.month - 1),
        ('b', start_date.day),
        ('c', start_date.year),
        # End of the range: month (zero indexed), day, year.
        ('d', end_date.month - 1),
        ('e', end_date.day),
        ('f', end_date.year),
        # Daily frequency.
        ('g', 'd'),
    ]
    return 'http://ichart.finance.yahoo.com/table.csv?' + urlencode(query)
# Example source code using Python's urlencode()
def unauthorized_token_request(self):
    """Return request for unauthorized token (first stage).

    The request-token URL is built from the extra request-token
    arguments plus the scope argument, signed through an OAuth1 client,
    and the signed URL is returned.
    """
    query = self.request_token_extra_arguments()
    query.update(self.get_scope_argument())
    client_key, client_secret = self.get_key_and_secret()
    # No decoding is requested on Python 3: decoding='utf-8' produces
    # errors with python-requests there, since the final URL will be of
    # type bytes.
    if six.PY3:
        decoding = None
    else:
        decoding = 'utf-8'
    state = self.get_or_create_state()
    signer = OAuth1(
        client_key,
        client_secret,
        callback_uri=self.get_redirect_uri(state),
        decoding=decoding,
        signature_method=SIGNATURE_HMAC,
        signature_type=SIGNATURE_TYPE_QUERY,
    )
    unsigned_url = self.REQUEST_TOKEN_URL + '?' + urlencode(query)
    # Only the first element of the signing result (the URL) is used.
    signed_url, _, _ = signer.client.sign(unsigned_url)
    return signed_url
def auth_url(self):
    """Return redirect url.

    When either STATE_PARAMETER or REDIRECT_STATE is set, a state value
    is kept in the session under ``<name>_state`` (an existing value is
    reused, otherwise a new token is generated) and handed to
    ``auth_params``. Otherwise ``auth_params`` receives ``None``.
    """
    if self.STATE_PARAMETER or self.REDIRECT_STATE:
        # The state is stored in the session for further request
        # validation. It is passed as the state parameter (as specified
        # in the OAuth2 spec) but is also added to redirect_uri, so the
        # request can still be verified if the provider doesn't
        # implement the state parameter.
        session_key = self.name + '_state'
        state = self.strategy.session_get(session_key) or self.state_token()
        self.strategy.session_set(session_key, state)
    else:
        state = None
    query = self.auth_params(state)
    query.update(self.get_scope_argument())
    query.update(self.auth_extra_arguments())
    return '?'.join((self.AUTHORIZATION_URL, urlencode(query)))
def modify_start_url(self, start_url):
    """
    Rewrite a SAML redirect URL so the request ID it carries is fixed.

    The SAMLRequest query parameter is decoded and inflated, its single
    ``ID="..."`` attribute is replaced with ``ID="TEST_ID"``, and the
    URL is rebuilt around the re-encoded request, so the request is
    always identical.
    """
    # Split the URL and keep only the first value of each query parameter.
    scheme, netloc, path, path_params, raw_query, fragment = urlparse(start_url)
    params = {key: values[0] for key, values in parse_qs(raw_query).items()}

    # Recover the XML being sent and pin its ID attribute.
    saml_xml = OneLogin_Saml2_Utils.decode_base64_and_inflate(
        params['SAMLRequest']
    ).decode()
    saml_xml, replaced = re.subn(r'ID="[^"]+"', 'ID="TEST_ID"', saml_xml)
    # Exactly one ID attribute is expected to have been rewritten.
    self.assertEqual(replaced, 1)

    # Put the modified request back and reassemble the URL.
    params['SAMLRequest'] = OneLogin_Saml2_Utils.deflate_and_base64_encode(
        saml_xml
    )
    return urlunparse(
        (scheme, netloc, path, path_params, urlencode(params), fragment)
    )
def run_test(test_no):
    """Run a single test, selected by its index."""
    query = urlencode({'case': test_no, 'agent': USER_AGENT})
    run_ws(server + '/runCase?' + query)
def run_test_cases(case_tuples):
    """Run a number of test cases from their 'case tuple'.

    Each case tuple is printed and then requested from the server's
    ``/runCase`` endpoint. Once every case has been requested, the
    reports are updated.

    :param case_tuples: iterable of case tuple identifiers to run.
    """
    for case_tuple in case_tuples:
        print("\n[{}]".format(case_tuple))
        # The case tuple must be part of the query string; without it
        # every iteration would issue the very same request.
        # NOTE(review): 'casetuple' is the parameter name the wstest
        # fuzzing server is expected to accept here -- confirm.
        qs = urlencode({'agent': USER_AGENT, 'casetuple': case_tuple})
        url = server + '/runCase?{}'.format(qs)
        run_ws(url)
    update_report()
def update_report():
    """Tell wstest to update reports."""
    print("Updating reports...")
    query = urlencode({'agent': USER_AGENT})
    report_socket = WebSocket(server + "/updateReports?" + query)
    # Iterate the connection to completion, discarding whatever it yields.
    for _item in report_socket:
        pass
def prepare_request_uri(self, redirect_uri='http://localhost:3000/', scope=list(), force_verify=False, state=''):
    """Build the 'authorize' URI requesting a token response.

    The query carries the client id, the redirect URI, the scopes
    joined by single spaces, the lower-cased string form of
    ``force_verify`` and the ``state`` value.
    """
    query = Qry('authorize')
    fields = (
        ('response_type', 'token'),
        ('client_id', self.client_id),
        ('redirect_uri', redirect_uri),
        ('scope', ' '.join(scope)),
        ('force_verify', str(force_verify).lower()),
        ('state', state),
    )
    for key, value in fields:
        query.add_param(key, value)
    return query.url + '?' + urlencode(query.params)
def prepare_token_uri(self, scope=list()):
    """Build the 'token' URI for a client-credentials grant.

    The query carries the client id, the client secret, the fixed grant
    type ``client_credentials`` and the scopes joined by single spaces.
    """
    query = Qry('token')
    fields = (
        ('client_id', self.client_id),
        ('client_secret', self.client_secret),
        ('grant_type', 'client_credentials'),
        ('scope', ' '.join(scope)),
    )
    for key, value in fields:
        query.add_param(key, value)
    return query.url + '?' + urlencode(query.params)
def prepare_revoke_uri(self, token):
    """Build the 'revoke' URI for ``token``, identified by the client id."""
    query = Qry('revoke')
    for key, value in (('client_id', self.client_id), ('token', token)):
        query.add_param(key, value)
    return query.url + '?' + urlencode(query.params)
def auth_url(self):
    """Return authorization redirect url.

    The callback is the absolute form of ``redirect_uri`` with a leading
    ``https`` replaced by ``http``; it is sent as the ``cb`` parameter.
    """
    # Only the key is needed here; the secret is not used.
    app_key, _secret = self.get_key_and_secret()
    callback_uri = sub(
        r'^https', 'http', self.strategy.absolute_uri(self.redirect_uri)
    )
    return 'https://www.twilio.com/authorize/{0}?{1}'.format(
        app_key, urlencode({'cb': callback_uri})
    )
def revoke_token(self, token, uid):
    """Revoke ``token`` when a revoke URL is configured.

    Returns the processed revoke response, or ``None`` when
    REVOKE_TOKEN_URL is not set. The URL-encoded parameters are sent as
    the request data for every method other than GET.
    """
    if not self.REVOKE_TOKEN_URL:
        return None
    url = self.revoke_token_url(token, uid)
    params = self.revoke_token_params(token, uid)
    headers = self.revoke_token_headers(token, uid)
    if self.REVOKE_TOKEN_METHOD != 'GET':
        payload = urlencode(params)
    else:
        payload = None
    response = self.request(
        url,
        params=params,
        headers=headers,
        data=payload,
        method=self.REVOKE_TOKEN_METHOD,
    )
    return self.process_revoke_token_response(response)
def oauth_authorization_request(self, token):
    """Generate OAuth request to authorize token.

    ``token`` may be a dict or a query string; a non-dict value is
    parsed with ``parse_qs`` first. The returned URL is the
    authorization URL followed by the encoded extra arguments, scope,
    token parameter and redirect URI.
    """
    token_data = token if isinstance(token, dict) else parse_qs(token)
    # Fall back to an empty dict when there are no extra arguments.
    query = self.auth_extra_arguments() or {}
    query.update(self.get_scope_argument())
    token_name = self.OAUTH_TOKEN_PARAMETER_NAME
    query[token_name] = token_data.get(self.OAUTH_TOKEN_PARAMETER_NAME)
    state = self.get_or_create_state()
    query[self.REDIRECT_URI_PARAMETER_NAME] = self.get_redirect_uri(state)
    base_url = self.authorization_url()
    return '{0}?{1}'.format(base_url, urlencode(query))
def auth_url(self):
    """Return redirect url.

    The query is built from the auth params (for the current state), the
    scope argument and the extra auth arguments. Unless REDIRECT_STATE
    is set, the encoded query is unquoted again before being appended.
    """
    state = self.get_or_create_state()
    query = self.auth_params(state)
    query.update(self.get_scope_argument())
    query.update(self.auth_extra_arguments())
    query_string = urlencode(query)
    if not self.REDIRECT_STATE:
        # redirect_uri matching is strictly enforced, so the provider's
        # value has to be matched exactly.
        query_string = unquote(query_string)
    base_url = self.authorization_url()
    return '{0}?{1}'.format(base_url, query_string)
def register_oauth_flow():
    """Register URIs used for OAuth flow.

    Two endpoints are registered with httpretty: a GET on the authorize
    page returning a fixed body, and a POST on the access token endpoint
    answered by a callback that returns a form-encoded body.
    """
    # https://developer.github.com/v3/oauth/
    httpretty.register_uri(
        httpretty.GET,
        'https://github.com/login/oauth/authorize',
        status=200,
        body='User required to accept/reject scopes on this page',
    )

    def access_token_callback(request, uri, headers):
        # The token exchange must be a POST carrying all required fields.
        assert request.method == 'POST'
        form = request.parse_request_body(request.body)
        assert 'client_id' in form
        assert 'client_secret' in form
        assert 'code' in form
        assert 'redirect_uri' in form

        code = form['code'][0]
        if code == 'bad_verification_code':
            # Error payload for the one code treated as invalid.
            payload = {
                'error_uri': 'http://developer.github.com/v3/oauth/'
                             '#bad-verification-code',
                'error_description': 'The code passed is '
                                     'incorrect or expired.',
                'error': 'bad_verification_code',
            }
        else:
            # Any other code yields a token derived from the code itself.
            payload = {
                'access_token': '%s_token' % form['code'][0],
                'scope': 'admin:repo_hook,user:email',
                'token_type': 'bearer',
            }

        headers['content-type'] = 'application/x-www-form-urlencoded'
        return (200, headers, urllib_parse.urlencode(payload))

    httpretty.register_uri(
        httpretty.POST,
        'https://github.com/login/oauth/access_token',
        body=access_token_callback,
    )
def record_to_dict(record):
    """Convert ``record`` into a dictionary.

    The dictionary holds the record's descriptive fields, its bounding
    box (taken from ``wkt2geom`` on the record's WKT geometry), a tile
    URL and links derived from the identifier, and an envelope geoshape.
    A legend URL is added for 'OGC:WMS' records, registry tags are
    merged in from the record XML, and references / source host are
    added when the record has links / a source.

    Note: a non-empty ``record.title`` is overwritten in place with its
    ASCII-only form.
    """
    # Strip non-ASCII characters from the title if it is not empty.
    if record.title:
        record.title = record.title.encode('ascii', 'ignore').decode('utf-8')

    bbox = wkt2geom(record.wkt_geometry)
    min_x = bbox[0]
    min_y = bbox[1]
    max_x = bbox[2]
    max_y = bbox[3]

    tile_url = '/layer/%s/wmts/%s/default_grid/{z}/{x}/{y}.png' % (record.identifier, record.title_alternate)
    links = {
        'xml': '/'.join(['layer', record.identifier, 'xml']),
        'yml': '/'.join(['layer', record.identifier, 'yml']),
        'png': '/'.join(['layer', record.identifier, 'png']),
    }
    # The envelope is given as two corners: [min_x, max_y] and [max_x, min_y].
    geoshape = {
        'type': 'envelope',
        'coordinates': [
            [min_x, max_y], [max_x, min_y]
        ],
    }

    record_dict = {
        'title': record.title,
        'abstract': record.abstract,
        'title_alternate': record.title_alternate,
        'checks_list': [],
        'bbox': bbox,
        'min_x': min_x,
        'min_y': min_y,
        'max_x': max_x,
        'max_y': max_y,
        'source': record.source,
        'source_type': record.type,
        'tile_url': tile_url,
        'layer_date': record.date_modified,
        'layer_originator': record.creator,
        'layer_identifier': record.identifier,
        'links': links,
        # 'rectangle': box(min_x, min_y, max_x, max_y),
        'layer_geoshape': geoshape,
    }

    # WMS records additionally get a GetLegendGraphic URL.
    if record.format == 'OGC:WMS':
        legend_query = urlencode({
            'SERVICE': 'WMS',
            'VERSION': '1.1.1',
            'REQUEST': 'GetLegendGraphic',
            'FORMAT': 'image/png',
            'LAYER': record.title_alternate,
        })
        record_dict['legend_url'] = ('/layer/%s/service?' % record.identifier) + legend_query

    record_dict = include_registry_tags(record_dict, record.xml)

    if record.links:
        record_dict['references'] = parse_references(record.links)

    if record.source:
        record_dict['source_host'] = urlparse(record.source).netloc

    return record_dict