def session_id(self):
"""A unique session ID every time the user uses the workflow.
.. versionadded:: 1.25
The session ID persists while the user is using this workflow.
It expires when the user runs a different workflow or closes
Alfred.
"""
if not self._session_id:
sid = os.getenv('_WF_SESSION_ID')
if not sid:
from uuid import uuid4
sid = uuid4().hex
self.setvar('_WF_SESSION_ID', sid)
self._session_id = sid
return self._session_id
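A minimal standalone sketch of the same pattern, assuming a setvar() helper that persists an Alfred workflow variable so it is re-exported through the environment on the next invocation:

import os
from uuid import uuid4

def get_session_id(setvar):
    # Reuse the ID Alfred passed back via the environment, if any.
    sid = os.getenv('_WF_SESSION_ID')
    if not sid:
        sid = uuid4().hex
        setvar('_WF_SESSION_ID', sid)  # hypothetical persistence helper
    return sid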
def test_share_target_is_fetched_if_no_target_found(self, mock_retrieve):
entity = base.Share(
guid=str(uuid4()), handle=self.remote_profile.handle, target_guid="notexistingguid",
target_handle=self.remote_profile2.handle, public=True,
)
mock_retrieve.return_value = entities.PostFactory(
guid=entity.target_guid, handle=self.remote_profile2.handle,
)
process_entity_share(entity, self.remote_profile)
mock_retrieve.assert_called_once_with(entity.target_id, sender_key_fetcher=sender_key_fetcher)
self.assertTrue(Content.objects.filter(guid=entity.target_guid, content_type=ContentType.CONTENT).exists())
self.assertTrue(
Content.objects.filter(
guid=entity.guid, share_of__guid=entity.target_guid, content_type=ContentType.SHARE
).exists()
)
def save(self, *args, **kwargs):
if self.parent and self.share_of:
raise ValueError("Can't be both a reply and a share!")
self.cache_data()
if self.parent:
self.content_type = ContentType.REPLY
# Ensure replies have sane values
self.visibility = self.parent.visibility
self.pinned = False
elif self.share_of:
self.content_type = ContentType.SHARE
if not self.pk:
if not self.guid:
self.guid = uuid4()
if self.pinned:
max_order = Content.objects.top_level().filter(author=self.author).aggregate(Max("order"))["order__max"]
if max_order is not None: # If max_order is None, there is likely to be no content yet
self.order = max_order + 1
self.fix_local_uploads()
super().save(*args, **kwargs)
self.cache_related_object_data()
def define_services(config):
    """Define the service settings for the current app.

    Unknown service names are skipped with a warning rather than
    raising an error.

    Arguments:
        config (:py:class:`list`): The service configuration required.

    Returns:
        :py:class:`collections.OrderedDict`: Configured services.
    """
services = OrderedDict()
for settings in config:
name = settings['name']
if name not in SERVICES:
logger.warning('unknown service %r', name)
continue
services[uuid4().hex] = SERVICES[name].from_config(**settings)
return services
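A hedged usage sketch; the service names and settings here are invented for illustration, and SERVICES is assumed to map names to classes exposing a from_config() factory:

config = [
    {'name': 'tracker', 'api_token': 'abc123'},  # assumed known service
    {'name': 'no-such-service'},                 # skipped with a warning
]
services = define_services(config)
# Each configured service is stored under a fresh uuid4().hex key, so the
# same service type can be configured more than once without clashing.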
def test_as_dict(self):
topic_1 = str(uuid.uuid4())
partition_1 = random.randint(0, 1024)
until_offset_1 = random.randint(0, sys.maxsize)
from_offset_1 = random.randint(0, sys.maxsize)
app_name_1 = str(uuid.uuid4())
offset_spec = OffsetSpec(
app_name=app_name_1, topic=topic_1, partition=partition_1,
from_offset=from_offset_1, until_offset=until_offset_1)
offset_spec_dict = JSONOffsetSpecs.as_dict(offset_spec)
self.assertions_on_offset(
used_value={
"topic": topic_1, "partition": partition_1,
"app_name": app_name_1, "from_offset": from_offset_1,
"until_offset": until_offset_1},
offset_value=offset_spec_dict)
def _fake_vif(cls=osv_vif.VIFOpenVSwitch):
vif = cls(
id=uuid.uuid4(),
vif_name='h_interface',
bridge_name='bridge',
address='3e:94:b7:31:a0:83',
port_profile=osv_objects.vif.VIFPortProfileOpenVSwitch(
interface_id='89eccd45-43e9-43d8-b4cc-4c13db13f782',
profile_id=str(uuid.uuid4()),
),
)
vif.network = osv_objects.network.Network(id=uuid.uuid4(), mtu=1)
subnet = osv_objects.subnet.Subnet(
uuid=uuid.uuid4(),
dns=['192.168.0.1'],
cidr='192.168.0.0/24',
gateway='192.168.0.1',
routes=osv_objects.route.RouteList(objects=[]),
)
subnet.ips = osv_objects.fixed_ip.FixedIPList(objects=[])
subnet.ips.objects.append(
osv_objects.fixed_ip.FixedIP(address='192.168.0.2'))
vif.network.subnets.objects.append(subnet)
return vif
def setUp(self):
super(TestDriverMixin, self).setUp()
self.instance_info = osv_objects.instance_info.InstanceInfo(
uuid=uuid.uuid4(), name='foo')
self.ifname = 'c_interface'
self.netns = '/proc/netns/1234'
# Mock IPDB context managers
self.ipdbs = {}
self.m_bridge_iface = mock.Mock(__exit__=mock.Mock())
self.m_c_iface = mock.Mock()
self.m_h_iface = mock.Mock()
self.h_ipdb, self.h_ipdb_exit = self._mock_ipdb_context_manager(None)
self.c_ipdb, self.c_ipdb_exit = self._mock_ipdb_context_manager(
self.netns)
self.m_create = mock.Mock()
self.c_ipdb.create = mock.Mock(
return_value=mock.Mock(
__enter__=mock.Mock(return_value=self.m_create),
__exit__=mock.Mock()))
async def execute(self):
try:
name = self.params['name']
guid = uuid.uuid4()
query = """
INSERT INTO test_table(name, guid) values(%(name)s, %(guid)s);
"""
query_get = """
SELECT id, name, guid from test_table where guid = %(guid)s;
"""
ins = await self.app.db.execute('test_db', query, {'name': name, 'guid': guid}, 'insert')
        _data = await self.app.db.execute('test_db', query_get, {'guid': guid}, 'select')
self.result = [dict(d) for d in _data]
except Exception as e:
self.errors.append({
'code': 502,
'message': '{}'.format(e)
})
return self.result
def slack(text: hug.types.text):
"""Returns JSON containing an attachment with an image url for the Slack integration"""
title = text
if text == 'top250':
top250_res = requests.get(IMDB_URL + '/chart/toptv', headers={'Accept-Language': 'en'})
top250_page = html.fromstring(top250_res.text)
candidates = top250_page.xpath('//*[@data-caller-name="chart-top250tv"]//tr/td[2]/a')
title = random.choice(candidates).text
return dict(
response_type='in_channel',
attachments=[
dict(image_url=GRAPH_URL + f'/graph?title={quote(title)}&uuid={uuid.uuid4()}')
]
)
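The uuid={uuid.uuid4()} query parameter is a cache buster: Slack caches attachment images by URL, so appending a fresh UUID forces it to fetch a new graph even when the title repeats. A minimal sketch, with GRAPH_URL as a placeholder:

import uuid
from urllib.parse import quote

GRAPH_URL = 'https://graph.example.invalid'  # placeholder for illustration
title = 'The Wire'
# Identical titles still yield distinct URLs, defeating Slack's image cache.
image_url = f'{GRAPH_URL}/graph?title={quote(title)}&uuid={uuid.uuid4()}'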
def test_unregistered_event(self):
project = self.project # force creation
url = '/plugins/github/organizations/{}/webhook/'.format(
project.organization.id,
)
secret = 'b3002c3e321d4b7880360d397db2ccfd'
OrganizationOption.objects.set_value(
organization=project.organization,
key='github:webhook_secret',
value=secret,
)
response = self.client.post(
path=url,
data=PUSH_EVENT_EXAMPLE,
content_type='application/json',
HTTP_X_GITHUB_EVENT='UnregisteredEvent',
HTTP_X_HUB_SIGNATURE='sha1=98196e70369945ffa6b248cf70f7dc5e46dff241',
HTTP_X_GITHUB_DELIVERY=six.text_type(uuid4())
)
assert response.status_code == 204
def test_invalid_signature_event(self):
project = self.project # force creation
url = '/plugins/github/organizations/{}/webhook/'.format(
project.organization.id,
)
secret = '2d7565c3537847b789d6995dca8d9f84'
OrganizationOption.objects.set_value(
organization=project.organization,
key='github:webhook_secret',
value=secret,
)
response = self.client.post(
path=url,
data=PUSH_EVENT_EXAMPLE,
content_type='application/json',
HTTP_X_GITHUB_EVENT='push',
HTTP_X_HUB_SIGNATURE='sha1=33521abeaaf9a57c2abf486e0ccd54d23cf36fec',
HTTP_X_GITHUB_DELIVERY=six.text_type(uuid4())
)
assert response.status_code == 401
def test_simple(self):
url = '/plugins/github/installations/webhook/'
response = self.client.post(
path=url,
data=INSTALLATION_EVENT_EXAMPLE,
content_type='application/json',
HTTP_X_GITHUB_EVENT='installation',
HTTP_X_HUB_SIGNATURE='sha1=348e46312df2901e8cb945616ee84ce30d9987c9',
HTTP_X_GITHUB_DELIVERY=six.text_type(uuid4())
)
assert response.status_code == 204
assert Integration.objects.filter(
provider='github_apps',
external_id=2,
name='octocat',
).exists()
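The HTTP_X_HUB_SIGNATURE values above are HMAC-SHA1 digests of the raw request body keyed with the webhook secret, which is why a mismatched secret produces the 401 in the invalid-signature test. A sketch of how such a signature is computed, assuming PUSH_EVENT_EXAMPLE is the raw bytes payload used by these tests:

import hmac
from hashlib import sha1

secret = 'b3002c3e321d4b7880360d397db2ccfd'
signature = 'sha1=' + hmac.new(
    secret.encode('utf-8'), PUSH_EVENT_EXAMPLE, sha1).hexdigest()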
def get_webhook_secret(self, organization):
lock = locks.get('github:webhook-secret:{}'.format(organization.id), duration=60)
with lock.acquire():
# TODO(dcramer): get_or_create would be a useful native solution
secret = OrganizationOption.objects.get_value(
organization=organization,
key='github:webhook_secret',
)
if secret is None:
secret = uuid4().hex + uuid4().hex
OrganizationOption.objects.set_value(
organization=organization,
key='github:webhook_secret',
value=secret,
)
return secret
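Concatenating two uuid4().hex digests yields a 64-character hex secret (roughly 122 random bits per UUID). A quick illustration:

from uuid import uuid4

secret = uuid4().hex + uuid4().hex
assert len(secret) == 64  # two 32-character hex digests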
def test_delete_upload_file(self, mock_open):
resource_id = str(uuid.uuid4())
path = '/doesnt_exist/resources/{}/{}/{}'.format(
resource_id[0:3], resource_id[3:6], resource_id[6:]
)
patcher = fake_filesystem_unittest.Patcher()
patcher.setUp()
patcher.fs.CreateFile(path)
assert os.path.exists(path)
delete_local_uploaded_file(resource_id)
assert not os.path.exists(path)
patcher.tearDown()
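The three path segments mirror the sharded upload layout these tests assume, splitting on the first six characters of the resource ID; /doesnt_exist is the fake storage root used with the fake filesystem:

import uuid

resource_id = str(uuid.uuid4())
path = '/doesnt_exist/resources/{}/{}/{}'.format(
    resource_id[0:3], resource_id[3:6], resource_id[6:])
# e.g. /doesnt_exist/resources/3fa/85f/64-5717-4562-b3fc-2c963f66afa6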
def test_delete_file_not_deleted_if_resources_first(self, mock_open):
resource_id = str(uuid.uuid4())
path = '/doesnt_exist/resources/{}'.format(resource_id)
patcher = fake_filesystem_unittest.Patcher()
patcher.setUp()
patcher.fs.CreateFile(path)
assert os.path.exists(path)
with mock.patch('ckanext.validation.utils.get_local_upload_path',
return_value=path):
delete_local_uploaded_file(resource_id)
assert not os.path.exists(path)
assert os.path.exists('/doesnt_exist/resources')
patcher.tearDown()
def test_delete_file_not_deleted_if_resources_second(self, mock_open):
resource_id = str(uuid.uuid4())
path = '/doesnt_exist/resources/data/{}'.format(resource_id)
patcher = fake_filesystem_unittest.Patcher()
patcher.setUp()
patcher.fs.CreateFile(path)
assert os.path.exists(path)
with mock.patch('ckanext.validation.utils.get_local_upload_path',
return_value=path):
delete_local_uploaded_file(resource_id)
assert not os.path.exists(path)
assert os.path.exists('/doesnt_exist/resources')
patcher.tearDown()
def test_delete_passes_if_os_exception(self, mock_open):
resource_id = str(uuid.uuid4())
path = '/doesnt_exist/resources/{}/{}/{}'.format(
resource_id[0:3], resource_id[3:6], resource_id[6:]
)
patcher = fake_filesystem_unittest.Patcher()
patcher.setUp()
patcher.fs.CreateFile(path)
assert os.path.exists(path)
with mock.patch('ckanext.validation.utils.os.remove',
side_effect=OSError):
delete_local_uploaded_file(resource_id)
patcher.tearDown()
async def get_ghost_replay(self, login):
replay_name = 'dedimania_{}.Replay.Gbx'.format(uuid.uuid4().hex)
try:
await self.instance.gbx('SaveBestGhostsReplay', login, replay_name)
    except Exception:
return None
try:
async with self.instance.storage.open('UserData/Replays/{}'.format(replay_name)) as ghost_file:
return await ghost_file.read()
except FileNotFoundError as e:
        message = '$f00Error: Dedimania requires you to have file access on the server. We can\'t fetch ' \
                  'the driven replay!'
logger.error('Please make sure we can access the dedicated files. Configure your storage driver correctly! '
'{}'.format(str(e)))
await self.instance.chat(message)
raise DedimaniaException('Can\'t access replay file')
except PermissionError as e:
        message = '$f00Error: Dedimania requires you to have file access on the server. We can\'t fetch ' \
                  'the driven replay because of a permission problem!'
logger.error('We can\'t read files in the dedicated folder, your permissions don\'t allow us to read it! '
'{}'.format(str(e)))
await self.instance.chat(message)
raise DedimaniaException('Can\'t access files due to permission problems')
def sync_rings_request(self, broker_token, builders_only=False):
"""Request for peers to sync rings from leader.
NOTE: this action must only be performed by the cluster leader.
:param broker_token: token to identify sync request.
:param builders_only: if False, tell peers to sync builders only (not
rings).
"""
if not is_elected_leader(SWIFT_HA_RES):
errmsg = "Leader function called by non-leader"
raise SwiftProxyCharmException(errmsg)
rq = self.template()
rq['trigger'] = str(uuid.uuid4())
if builders_only:
rq['sync-only-builders'] = 1
rq['broker-token'] = broker_token
rq['broker-timestamp'] = "{:f}".format(time.time())
rq['builder-broker'] = self._hostname
return rq
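Each message carries a fresh uuid4 trigger so peers can distinguish a new broadcast from a repeat of the previous one. A minimal standalone sketch of the template/trigger idiom (not the charm's actual helper):

import time
import uuid

def make_rpc(**attrs):
    # A fresh trigger marks this message as new; the timestamp orders it.
    rq = {'trigger': str(uuid.uuid4()),
          'broker-timestamp': '{:f}'.format(time.time())}
    rq.update(attrs)
    return rq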
def notify_leader_changed(self, token):
"""Notify peers that leader has changed.
The token passed in must be that associated with the sync we claim to
have been interrupted. It will be re-used by the restored leader once
it receives this notification.
NOTE: this action must only be performed by the cluster leader that
has relinquished it's leader status as part of the current hook
context.
"""
if not is_elected_leader(SWIFT_HA_RES):
errmsg = "Leader function called by non-leader"
raise SwiftProxyCharmException(errmsg)
rq = self.template()
rq['trigger'] = str(uuid.uuid4())
rq[self.KEY_NOTIFY_LEADER_CHANGED] = token
return rq
def notify_storage_rings_available():
"""Notify peer swift-storage relations that they should synchronise ring
and builder files.
Note that this should only be called from the leader unit.
"""
if not is_elected_leader(SWIFT_HA_RES):
log("Ring availability storage-relation broadcast requested by "
"non-leader - skipping", level=WARNING)
return
hostname = get_hostaddr()
hostname = format_ipv6_addr(hostname) or hostname
path = os.path.basename(get_www_dir())
rings_url = 'http://{}/{}'.format(hostname, path)
trigger = uuid.uuid4()
# Notify storage nodes that there is a new ring to fetch.
log("Notifying storage nodes that new rings are ready for sync.",
level=INFO)
for relid in relation_ids('swift-storage'):
relation_set(relation_id=relid, swift_hash=get_swift_hash(),
rings_url=rings_url, trigger=trigger)
def test_cluster_rpc_stop_proxy_ack(self, mock_uuid):
mock_uuid.uuid4.return_value = 'token2'
rpc = swift_utils.SwiftProxyClusterRPC()
rq = rpc.stop_proxy_ack(echo_token='token1', echo_peers_only='1')
self.assertEqual({'trigger': 'token2',
'broker-token': None,
'builder-broker': None,
'broker-timestamp': None,
'peers-only': '1',
'leader-changed-notification': None,
'resync-request': None,
'stop-proxy-service': None,
'stop-proxy-service-ack': 'token1',
'sync-only-builders': None}, rq)
template_keys = set(rpc.template())
self.assertTrue(set(rq.keys()).issubset(template_keys))
def test_cluster_rpc_sync_request(self, mock_uuid, mock_time):
mock_time.time = mock.Mock(return_value=float(1.234))
mock_uuid.uuid4.return_value = 'token2'
rpc = swift_utils.SwiftProxyClusterRPC()
rq = rpc.sync_rings_request('token1')
self.assertEqual({'trigger': 'token2',
'broker-token': 'token1',
'broker-timestamp': '1.234000',
'builder-broker': '1.2.3.4',
'peers-only': None,
'leader-changed-notification': None,
'resync-request': None,
'stop-proxy-service': None,
'stop-proxy-service-ack': None,
'sync-only-builders': None}, rq)
template_keys = set(rpc.template())
self.assertTrue(set(rq.keys()).issubset(template_keys))
def test_cluster_rpc_notify_leader_changed(self, mock_uuid):
mock_uuid.uuid4.return_value = 'e4b67426-6cc0-4aa3-829d-227999cd0a75'
rpc = swift_utils.SwiftProxyClusterRPC()
rq = rpc.notify_leader_changed('token1')
self.assertEqual({'trigger': 'e4b67426-6cc0-4aa3-829d-227999cd0a75',
'broker-token': None,
'builder-broker': None,
'broker-timestamp': None,
'peers-only': None,
'leader-changed-notification': 'token1',
'stop-proxy-service': None,
'stop-proxy-service-ack': None,
'resync-request': None,
'sync-only-builders': None}, rq)
template_keys = set(rpc.template().keys())
self.assertTrue(set(rq.keys()).issubset(template_keys))
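These tests pin uuid.uuid4 so the trigger field is deterministic. A hedged sketch of the mocking pattern, assuming the decorator patches uuid where swift_utils looks it up:

from unittest import mock

@mock.patch('swift_utils.uuid')  # patch target path is an assumption
def test_trigger_is_deterministic(mock_uuid):
    mock_uuid.uuid4.return_value = 'token2'
    # Any code calling swift_utils.uuid.uuid4() now returns 'token2',
    # making dict comparisons like the ones above reproducible.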
async def test_disable_enable_user(event_loop):
async with base.CleanController() as controller:
username = 'test-disable{}'.format(uuid.uuid4())
user = await controller.add_user(username)
await user.disable()
assert not user.enabled
assert user.disabled
fresh = await controller.get_user(username) # fetch fresh copy
assert not fresh.enabled
assert fresh.disabled
await user.enable()
assert user.enabled
assert not user.disabled
fresh = await controller.get_user(username) # fetch fresh copy
assert fresh.enabled
assert not fresh.disabled
async def test_change_user_password(event_loop):
async with base.CleanController() as controller:
username = 'test-password{}'.format(uuid.uuid4())
user = await controller.add_user(username)
await user.set_password('password')
# Check that we can connect with the new password.
new_connection = None
try:
kwargs = controller.connection().connect_params()
kwargs['username'] = username
kwargs['password'] = 'password'
new_connection = await Connection.connect(**kwargs)
except JujuAPIError:
raise AssertionError('Unable to connect with new password')
finally:
if new_connection:
await new_connection.close()
async def test_grant_revoke(event_loop):
async with base.CleanController() as controller:
username = 'test-grant{}'.format(uuid.uuid4())
user = await controller.add_user(username)
await user.grant('superuser')
assert user.access == 'superuser'
fresh = await controller.get_user(username) # fetch fresh copy
assert fresh.access == 'superuser'
await user.grant('login')
assert user.access == 'login'
fresh = await controller.get_user(username) # fetch fresh copy
assert fresh.access == 'login'
await user.revoke()
        assert user.access == ''
fresh = await controller.get_user(username) # fetch fresh copy
        assert fresh.access == ''
def _(node, dask, scope):
def retrieve(term):
try:
return scope[term]
except KeyError:
scope[term] = ret = _ltree_to_dask(term, dask, scope)
return ret
name = '%s-%s' % (node.func, uuid4())
dask[name] = (
apply,
retrieve(node.func),
list(map(retrieve, node.args)),
(dict, list(map(list, valmap(retrieve, node.kwargs).items()))),
)
scope[node] = name
return name
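Appending a uuid4 to the task name guarantees that two syntactically identical calls get distinct keys in the dask graph, so their results are never conflated. A minimal sketch of the idea:

from uuid import uuid4

dask = {}

def add_task(func_name, task):
    # The random suffix keeps repeated calls to the same function distinct.
    key = '%s-%s' % (func_name, uuid4())
    dask[key] = task
    return key

k1 = add_task('sum', (sum, [1, 2]))
k2 = add_task('sum', (sum, [1, 2]))
assert k1 != k2  # same computation, separate graph entries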
def test_get_saved_query(mocked_url, mocked_rw_apikey, mocked_account_resource_id, capsys):
saved_query_id = str(uuid.uuid4())
mocked_url.return_value = '', MOCK_API_URL
mocked_rw_apikey.return_value = str(uuid.uuid4())
mocked_account_resource_id.return_value = str(uuid.uuid4())
httpretty.register_uri(httpretty.GET, MOCK_API_URL, status=200,
content_type='application/json',
body=json.dumps({'saved_query': SAVED_QUERY_RESPONSE}))
api.get_saved_query(saved_query_id)
out, err = capsys.readouterr()
assert "Name:" in out
assert "Logs:" in out
assert "ID:" in out
assert "Statement:" in out
assert "Time range:" in out
assert "From:" in out
assert "To:" in out
def test_patch_saved_query_none_fields(mocked_url, mocked_rw_apikey, mocked_account_resource_id,
capsys):
test_saved_query_id = str(uuid.uuid4())
mocked_url.return_value = '', MOCK_API_URL
mocked_rw_apikey.return_value = str(uuid.uuid4())
mocked_account_resource_id.return_value = str(uuid.uuid4())
httpretty.register_uri(httpretty.PATCH, MOCK_API_URL, status=200,
content_type='application/json',
body=json.dumps({"saved_query": SAVED_QUERY_RESPONSE}))
api.update_saved_query(test_saved_query_id, name=None,
statement="new_statement")
out, err = capsys.readouterr()
assert "Saved query with id %s updated" % test_saved_query_id in out
body = json.loads(httpretty.last_request().body)['saved_query']
assert "name" not in body
assert "statement" in body['leql']