def run(self, sync):
    """Reconcile this project's branches in the local DB with the remote list.

    The remote branch list is fetched through ``sync.get``; for every entry
    whose ``ref`` matches ``self.branch_re``, group 1 of the match is taken
    as the branch name.  Local branches absent from the remote set are
    deleted from the session, and remote branches absent locally are created
    on the project.  Each change is logged at info level.
    """
    app = sync.app
    endpoint = 'projects/%s/branches/' % urlparse.quote_plus(self.project_name)
    ref_matches = (self.branch_re.match(entry['ref']) for entry in sync.get(endpoint))
    remote_names = {found.group(1) for found in ref_matches if found}

    with app.db.getSession() as session:
        project = session.getProjectByName(self.project_name)
        by_name = {branch.name: branch for branch in project.branches}
        local_names = set(by_name)

        # Branches known locally but no longer reported by the remote.
        for stale in local_names - remote_names:
            session.delete(by_name[stale])
            self.log.info("Deleted branch %s from project %s in local DB.", stale, project.name)

        # Branches reported by the remote that the local DB lacks.
        for missing in remote_names - local_names:
            project.createBranch(missing)
            self.log.info("Added branch %s to project %s in local DB.", missing, project.name)
python类quote_plus()的实例源码
def main(total_users, host, user, password, db_name):
    """Connect to a MySQL database through pymysql and populate it.

    A SQLAlchemy engine is created from the given connection details, a
    session is opened with ``create_session`` and handed, together with
    ``total_users``, to ``populate_db``.  The session is closed even when
    ``populate_db`` raises.
    """
    url_parts = {
        'user': user,
        # Only the password is URL-quoted before being placed in the URL.
        'password': parse.quote_plus(password),
        'host': host,
        'db_name': db_name,
    }
    db_url = 'mysql+pymysql://{user}:{password}@{host}/{db_name}'.format(**url_parts)
    engine = sqlalchemy.create_engine(db_url)

    session = create_session(engine)
    try:
        populate_db(session, total_users)
    finally:
        session.close()
def format_twitter_post_to_share(post):
    """Return ``format_twitter_post(post)`` escaped with ``quote_plus``."""
    formatted = format_twitter_post(post)
    return quote_plus(formatted)
test_confidential_client_flow.py 文件源码
项目:globus-sdk-python
作者: globus
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def test_oauth2_start_flow_default(self):
    """
    Start a default GlobusAuthorizationCodeFlowManager on ``self.cac``,
    then check the flow's initial values, the authorize URL built from it,
    and that exchanging an invalid code raises AuthAPIError.
    """
    client = self.cac
    default_scopes = " ".join(DEFAULT_REQUESTED_SCOPES)

    # no flow manager exists until one is started
    self.assertIsNone(client.current_oauth2_flow_manager)

    # a flow started with only a redirect URI carries the default values
    started = client.oauth2_start_flow("uri")
    self.assertIsInstance(started, GlobusAuthorizationCodeFlowManager)
    self.assertEqual(started.redirect_uri, "uri")
    self.assertEqual(started.requested_scopes, default_scopes)
    self.assertEqual(started.state, "_default")
    self.assertFalse(started.refresh_tokens)

    # the authorize URL must contain every expected fragment
    authorize_url = client.oauth2_get_authorize_url()
    fragments = (
        client.base_url + "v2/oauth2/authorize?",
        "client_id=" + client.client_id,
        "redirect_uri=uri",
        "scope=" + quote_plus(default_scopes),
        "state=_default",
        "access_type=online",
    )
    for fragment in fragments:
        self.assertIn(fragment, authorize_url)

    # exchanging a bogus code must raise AuthAPIError with a 401 status
    with self.assertRaises(AuthAPIError) as raised:
        client.oauth2_exchange_code_for_tokens("invalid_code")
    error = raised.exception
    self.assertEqual(error.http_status, 401)
    self.assertEqual(error.code, "Error")
def test_oauth2_start_flow_default(self):
    """
    Start a default GlobusNativeAppFlowManager on ``self.nac``, then check
    the flow's initial values, the authorize URL built from it, and that
    exchanging an invalid code raises AuthAPIError.
    """
    client = self.nac
    default_scopes = " ".join(DEFAULT_REQUESTED_SCOPES)

    # no flow manager exists until one is started
    self.assertIsNone(client.current_oauth2_flow_manager)

    # a flow started without arguments carries the default values
    started = client.oauth2_start_flow()
    self.assertIsInstance(started, GlobusNativeAppFlowManager)
    self.assertEqual(started.redirect_uri,
                     client.base_url + "v2/web/auth-code")
    self.assertEqual(started.requested_scopes, default_scopes)
    self.assertEqual(started.state, "_default")
    self.assertFalse(started.refresh_tokens)

    # the authorize URL must contain every expected fragment
    authorize_url = client.oauth2_get_authorize_url()
    fragments = (
        client.base_url + "v2/oauth2/authorize?",
        "client_id=" + client.client_id,
        "redirect_uri=" + quote_plus(client.base_url + "v2/web/auth-code"),
        "scope=" + quote_plus(default_scopes),
        "state=_default",
        "code_challenge=" + quote_plus(started.challenge),
        "access_type=online",
    )
    for fragment in fragments:
        self.assertIn(fragment, authorize_url)

    # exchanging a bogus code must raise AuthAPIError with a 401 status
    with self.assertRaises(AuthAPIError) as raised:
        client.oauth2_exchange_code_for_tokens("invalid_code")
    error = raised.exception
    self.assertEqual(error.http_status, 401)
    self.assertEqual(error.code, "Error")
def test_oauth2_start_flow_specified(self):
    """
    Start a GlobusNativeAppFlowManager on ``self.nac`` with explicit
    parameters, then check the flow's values, the authorize URL built from
    it, and that exchanging an invalid code raises AuthAPIError.
    """
    client = self.nac
    verifier_text = "v" * 43

    # no flow manager exists until one is started
    self.assertIsNone(client.current_oauth2_flow_manager)

    # the started flow must reflect exactly what was passed in
    started = client.oauth2_start_flow(
        requested_scopes="scopes",
        redirect_uri="uri",
        state="state",
        verifier=verifier_text,
        refresh_tokens=True,
    )
    self.assertIsInstance(started, GlobusNativeAppFlowManager)
    self.assertEqual(started.redirect_uri, "uri")
    self.assertEqual(started.requested_scopes, "scopes")
    self.assertEqual(started.state, "state")
    self.assertTrue(started.refresh_tokens)

    # the authorize URL must contain every expected fragment; the challenge
    # is recomputed from the same verifier text for comparison
    authorize_url = client.oauth2_get_authorize_url()
    _, remade_challenge = make_native_app_challenge(verifier_text)
    fragments = (
        client.base_url + "v2/oauth2/authorize?",
        "client_id=" + client.client_id,
        "redirect_uri=uri",
        "scope=scopes",
        "state=state",
        "code_challenge=" + quote_plus(remade_challenge),
        "access_type=offline",
    )
    for fragment in fragments:
        self.assertIn(fragment, authorize_url)

    # exchanging a bogus code must raise AuthAPIError with a 401 status
    with self.assertRaises(AuthAPIError) as raised:
        client.oauth2_exchange_code_for_tokens("invalid_code")
    error = raised.exception
    self.assertEqual(error.http_status, 401)
    self.assertEqual(error.code, "Error")
def build_dynamic_field(self, group, field_meta):
    """
    Build the form-field keyword dict for one JIRA field description.

    ``field_meta`` must provide ``schema``, ``name`` and ``required``.
    Returns a dict describing the field (label, required, type and any
    type-specific extras), or None for field kinds that are not supported
    (time tracking, worklogs, attachments).
    """
    schema = field_meta['schema']

    # defaults: a plain text field
    kind = 'text'
    field = {
        'label': field_meta['name'],
        'required': field_meta['required'],
    }

    schema_type = schema['type']
    custom = schema.get('custom')
    items = schema.get('items')

    # specialise the defaults according to the field configuration
    if schema_type in ['securitylevel', 'priority'] \
            or custom == JIRA_CUSTOM_FIELD_TYPES['select']:
        kind = 'select'
        field['choices'] = self.make_choices(field_meta.get('allowedValues'))
    elif field_meta.get('autoCompleteUrl') and (items == 'user' or schema_type == 'user'):
        # user pickers are served through the plugin's autocomplete endpoint
        kind = 'select'
        endpoint = '/api/0/issues/%s/plugins/%s/autocomplete' % (group.id, self.slug)
        field['url'] = '%s?jira_url=%s' % (
            endpoint, quote_plus(field_meta['autoCompleteUrl']),
        )
        field['has_autocomplete'] = True
        field['placeholder'] = 'Start typing to search for a user'
    elif schema_type in ['timetracking']:
        # TODO: Implement timetracking (currently unsupported altogether)
        return None
    elif items in ['worklog', 'attachment']:
        # TODO: Implement worklogs and attachments someday
        return None
    elif schema_type == 'array' and schema['items'] != 'string':
        kind = 'select'
        field.update(
            {
                'multiple': True,
                'choices': self.make_choices(field_meta.get('allowedValues')),
                'default': []
            }
        )

    # handled separately from the chain above because several field types
    # can additionally be configured with a custom property
    if custom and custom == JIRA_CUSTOM_FIELD_TYPES['textarea']:
        kind = 'textarea'

    field['type'] = kind
    return field
def encode_uri(uri):
    """Return ``uri`` with its host, path and query made ASCII-safe.

    The network location is IDNA-encoded, the path is percent-encoded with
    ``quote_plus`` (``/`` is kept as-is), and the query string is parsed
    with ``parse_qsl`` and re-serialised with ``urlencode``.  Scheme and
    fragment are passed through untouched.

    :param uri: the URI to encode, as text
    :returns: the encoded URI
    """
    parts = list(urlsplit(uri))
    parts[1] = parts[1].encode('idna').decode('ascii')
    # quote_plus() encodes text as UTF-8 and already returns text, so no
    # decode step is needed afterwards.
    parts[2] = quote_plus(parts[2], safe='/')
    # urlencode() percent-encodes every value itself; quoting the values
    # beforehand would escape them twice ("%3A" -> "%253A").
    parts[3] = urlencode(parse_qsl(parts[3]))
    return urlunsplit(parts)
def get(self, arg, value):
    """Retrieve a single user record by ID or by display name.

    Warnings:
        User display names are not unique. A display-name lookup fails if
        more than one returned user has exactly that display name.

    Args:
        arg (str): ``'id'`` to look the user up by ID; any other value
            triggers a search by display name
        value (str): the user ID or display name to look for

    Returns:
        User: User instance matching the provided inputs

    Raises:
        ValueError: No matching user found, or multiple users share the
            requested display name
    """
    if arg == 'id':
        response = self._swimlane.request('get', 'user/{}'.format(value))
        try:
            record = response.json()
        except ValueError:
            # A body that cannot be parsed as JSON is reported as "not found"
            raise ValueError('Unable to find user with ID "{}"'.format(value))
        return User(self._swimlane, record)

    search_path = 'user/search?query={}'.format(quote_plus(value))
    candidates = self._swimlane.request('get', search_path).json()

    # Keep only the results whose display name is exactly the requested one
    exact = [entry for entry in candidates if entry.get('displayName') == value]

    if not exact:
        raise ValueError('Unable to find user with display name "{}"'.format(value))

    if len(exact) > 1:
        matching_ids = ', '.join(['"{}"'.format(entry['id']) for entry in exact])
        raise ValueError('Multiple users returned with display name "{}". Matching user IDs: {}'.format(
            value,
            matching_ids
        ))

    return User(self._swimlane, exact[0])
def test_oauth2_get_authorize_url_native(self):
    """
    Attach a GlobusNativeAppFlowManager to an AuthClient and check the
    authorize URL it yields, first with default and then with explicitly
    specified flow parameters.
    """
    client_id = get_client_data()["native_app_client1"]["id"]
    client = globus_sdk.AuthClient(client_id=client_id)
    authorize_prefix = client.base_url + "v2/oauth2/authorize?"

    # --- flow manager built with default parameters ---
    manager = globus_sdk.auth.GlobusNativeAppFlowManager(client)
    client.current_oauth2_flow_manager = manager

    authorize_url = client.oauth2_get_authorize_url()
    fragments = (
        authorize_prefix,
        "client_id=" + client.client_id,
        "redirect_uri=" + quote_plus(client.base_url + "v2/web/auth-code"),
        "scope=" + quote_plus(" ".join(DEFAULT_REQUESTED_SCOPES)),
        "state=_default",
        "response_type=code",
        "code_challenge=" + quote_plus(manager.challenge),
        "code_challenge_method=S256",
        "access_type=online",
    )
    for fragment in fragments:
        self.assertIn(fragment, authorize_url)

    # --- flow manager built with specified parameters ---
    verifier_text = "a" * 43
    manager = globus_sdk.auth.GlobusNativeAppFlowManager(
        client,
        requested_scopes="scopes",
        redirect_uri="uri",
        state="state",
        verifier=verifier_text,
        refresh_tokens=True,
    )
    client.current_oauth2_flow_manager = manager

    authorize_url = client.oauth2_get_authorize_url()
    # recompute the challenge from the same verifier text for comparison
    _, remade_challenge = make_native_app_challenge(verifier_text)
    fragments = (
        authorize_prefix,
        "client_id=" + client.client_id,
        "redirect_uri=uri",
        "scope=scopes",
        "state=state",
        "response_type=code",
        "code_challenge=" + quote_plus(remade_challenge),
        "code_challenge_method=S256",
        "access_type=offline",
    )
    for fragment in fragments:
        self.assertIn(fragment, authorize_url)
def test_oauth2_get_authorize_url_confidential(self):
    """
    Attach a GlobusAuthorizationCodeFlowManager to an AuthClient and check
    the authorize URL it yields, first with default and then with
    explicitly specified flow parameters.
    """
    client_id = get_client_data()["confidential_app_client1"]["id"]
    client = globus_sdk.AuthClient(client_id=client_id)
    authorize_prefix = client.base_url + "v2/oauth2/authorize?"

    # --- flow manager built with default parameters ---
    manager = globus_sdk.auth.GlobusAuthorizationCodeFlowManager(
        client, "uri")
    client.current_oauth2_flow_manager = manager

    authorize_url = client.oauth2_get_authorize_url()
    fragments = (
        authorize_prefix,
        "client_id=" + client.client_id,
        "redirect_uri=uri",
        "scope=" + quote_plus(" ".join(DEFAULT_REQUESTED_SCOPES)),
        "state=_default",
        "response_type=code",
        "access_type=online",
    )
    for fragment in fragments:
        self.assertIn(fragment, authorize_url)

    # --- flow manager built with specified parameters ---
    manager = globus_sdk.auth.GlobusAuthorizationCodeFlowManager(
        client,
        requested_scopes="scopes",
        redirect_uri="uri",
        state="state",
        refresh_tokens=True,
    )
    client.current_oauth2_flow_manager = manager

    authorize_url = client.oauth2_get_authorize_url()
    fragments = (
        authorize_prefix,
        "client_id=" + client.client_id,
        "redirect_uri=uri",
        "scope=scopes",
        "state=state",
        "response_type=code",
        "access_type=offline",
    )
    for fragment in fragments:
        self.assertIn(fragment, authorize_url)
def selectors_to_qs(selectors):
    """Convert list of selector dict to query string.

    Each selector is a dict with a ``key``, an optional ``value`` (a single
    value, or a list/tuple of values) and an optional ``op``.  A missing
    ``op`` defaults to ``=``.  Equality ops (``=``, ``==``, ``in``) are
    rendered as ``key in (v1,v2)`` and inequality ops (``!=``, ``notin``) as
    ``key notin (v1,v2)``; without a value they render as ``key`` and
    ``!key`` respectively.  Non-string values are converted with ``str``.

    :param list selectors: list of dicts representing selectors
    :returns: querystring, or None when ``selectors`` is not a non-empty
        list, or when any selector lacks a key or uses an unknown op
    :rtype: str|None
    """
    if not isinstance(selectors, list) or not selectors:
        return None

    text_type = type(u'')
    terms = []
    for selector in selectors:
        key = selector.get('key')
        if not key:
            # invalid w/o key: the whole selector list is rejected
            return None

        value = selector.get('value')
        # default missing op to equal with the assumption that the
        # intent is exists or equal.
        op = selector.get('op', '=')

        # set-based has equivalence to equality-based, therefore leverage
        # the set-based formatting for both families of ops
        if op in ('=', '==', 'in'):
            keyword, bare_form = 'in', '{}'
        elif op in ('!=', 'notin'):
            keyword, bare_form = 'notin', '!{}'
        else:
            # unknown op: the whole selector list is rejected
            return None

        if value is None:
            # exists / not exists
            terms.append(bare_form.format(key))
            continue

        values = value if isinstance(value, (list, tuple)) else [value]
        # Text is passed through unchanged; anything else (ints, bools, ...)
        # is stringified so that join() cannot fail on it.
        rendered = ','.join(
            v if isinstance(v, text_type) else str(v) for v in values
        )
        terms.append('{} {} ({})'.format(key, keyword, rendered))

    # every selector was valid; format as proper query string
    return '?labelSelector=' + url_parse.quote_plus(','.join(terms))