python类uuid4()的实例源码

workflow3.py 文件源码 项目:alfred-mpd 作者: deanishe 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def session_id(self):
        """A unique session ID every time the user uses the workflow.

        .. versionadded:: 1.25

        The session ID persists while the user is using this workflow.
        It expires when the user runs a different workflow or closes
        Alfred.

        """
        if not self._session_id:
            sid = os.getenv('_WF_SESSION_ID')
            if not sid:
                from uuid import uuid4
                sid = uuid4().hex
                self.setvar('_WF_SESSION_ID', sid)

            self._session_id = sid

        return self._session_id
test_utils_tasks.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_share_target_is_fetched_if_no_target_found(self, mock_retrieve):
        entity = base.Share(
            guid=str(uuid4()), handle=self.remote_profile.handle, target_guid="notexistingguid",
            target_handle=self.remote_profile2.handle, public=True,
        )
        mock_retrieve.return_value = entities.PostFactory(
            guid=entity.target_guid, handle=self.remote_profile2.handle,
        )
        process_entity_share(entity, self.remote_profile)
        mock_retrieve.assert_called_once_with(entity.target_id, sender_key_fetcher=sender_key_fetcher)
        self.assertTrue(Content.objects.filter(guid=entity.target_guid, content_type=ContentType.CONTENT).exists())
        self.assertTrue(
            Content.objects.filter(
                guid=entity.guid, share_of__guid=entity.target_guid, content_type=ContentType.SHARE
            ).exists()
        )
models.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def save(self, *args, **kwargs):
        if self.parent and self.share_of:
            raise ValueError("Can't be both a reply and a share!")
        self.cache_data()

        if self.parent:
            self.content_type = ContentType.REPLY
            # Ensure replies have sane values
            self.visibility = self.parent.visibility
            self.pinned = False
        elif self.share_of:
            self.content_type = ContentType.SHARE

        if not self.pk:
            if not self.guid:
                self.guid = uuid4()
            if self.pinned:
                max_order = Content.objects.top_level().filter(author=self.author).aggregate(Max("order"))["order__max"]
                if max_order is not None:  # If max_order is None, there is likely to be no content yet
                    self.order = max_order + 1

        self.fix_local_uploads()
        super().save(*args, **kwargs)
        self.cache_related_object_data()
__init__.py 文件源码 项目:flash_services 作者: textbook 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def define_services(config):
    """Define the service settings for the current app.

    Arguments:
      config (:py:class:`list`): The service configuration required.

    Returns:
      :py:class:`collections.OrderedDict`: Configured services.

    Raises:
      :py:class:`ValueError`: If a non-existent service is requested.

    """
    services = OrderedDict()
    for settings in config:
        name = settings['name']
        if name not in SERVICES:
            logger.warning('unknown service %r', name)
            continue
        services[uuid4().hex] = SERVICES[name].from_config(**settings)
    return services
test_json_kafka_offsets.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_as_dict(self):
        topic_1 = str(uuid.uuid4())
        partition_1 = random.randint(0, 1024)
        until_offset_1 = random.randint(0, sys.maxsize)
        from_offset_1 = random.randint(0, sys.maxsize)
        app_name_1 = str(uuid.uuid4())

        offset_spec = OffsetSpec(
            app_name=app_name_1, topic=topic_1, partition=partition_1,
            from_offset=from_offset_1, until_offset=until_offset_1)
        offset_spec_dict = JSONOffsetSpecs.as_dict(offset_spec)
        self.assertions_on_offset(
            used_value={
                "topic": topic_1, "partition": partition_1,
                "app_name": app_name_1, "from_offset": from_offset_1,
                "until_offset": until_offset_1},
            offset_value=offset_spec_dict)
fake.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _fake_vif(cls=osv_vif.VIFOpenVSwitch):
    vif = cls(
        id=uuid.uuid4(),
        vif_name='h_interface',
        bridge_name='bridge',
        address='3e:94:b7:31:a0:83',
        port_profile=osv_objects.vif.VIFPortProfileOpenVSwitch(
            interface_id='89eccd45-43e9-43d8-b4cc-4c13db13f782',
            profile_id=str(uuid.uuid4()),
        ),
    )
    vif.network = osv_objects.network.Network(id=uuid.uuid4(), mtu=1)
    subnet = osv_objects.subnet.Subnet(
        uuid=uuid.uuid4(),
        dns=['192.168.0.1'],
        cidr='192.168.0.0/24',
        gateway='192.168.0.1',
        routes=osv_objects.route.RouteList(objects=[]),
    )
    subnet.ips = osv_objects.fixed_ip.FixedIPList(objects=[])
    subnet.ips.objects.append(
        osv_objects.fixed_ip.FixedIP(address='192.168.0.2'))
    vif.network.subnets.objects.append(subnet)
    return vif
test_binding.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def setUp(self):
        super(TestDriverMixin, self).setUp()
        self.instance_info = osv_objects.instance_info.InstanceInfo(
            uuid=uuid.uuid4(), name='foo')
        self.ifname = 'c_interface'
        self.netns = '/proc/netns/1234'

        # Mock IPDB context managers
        self.ipdbs = {}
        self.m_bridge_iface = mock.Mock(__exit__=mock.Mock())
        self.m_c_iface = mock.Mock()
        self.m_h_iface = mock.Mock()
        self.h_ipdb, self.h_ipdb_exit = self._mock_ipdb_context_manager(None)
        self.c_ipdb, self.c_ipdb_exit = self._mock_ipdb_context_manager(
            self.netns)
        self.m_create = mock.Mock()
        self.c_ipdb.create = mock.Mock(
            return_value=mock.Mock(
                __enter__=mock.Mock(return_value=self.m_create),
                __exit__=mock.Mock()))
methods.py 文件源码 项目:djaio 作者: Sberned 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def execute(self):
        try:
            name = self.params['name']
            guid =  uuid.uuid4()
            query = """
            INSERT INTO test_table(name, guid) values(%(name)s, %(guid)s);
            """
            query_get = """
            SELECT id, name, guid from test_table where guid = %(guid)s;
            """
            ins = await self.app.db.execute('test_db', query, {'name': name, 'guid': guid}, 'insert')
            _data = await self.app.db.execute('test_db', query_get, {'guid':guid}, 'select')
            self.result = [dict(d) for d in _data]
        except Exception as e:
            self.errors.append({
                'code': 502,
                'message': '{}'.format(e)
            })
        return self.result
graph.py 文件源码 项目:graph 作者: noxern 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def slack(text: hug.types.text):
    """Returns JSON containing an attachment with an image url for the Slack integration"""
    title = text

    if text == 'top250':
        top250_res = requests.get(IMDB_URL + '/chart/toptv', headers={'Accept-Language': 'en'})
        top250_page = html.fromstring(top250_res.text)
        candidates = top250_page.xpath('//*[@data-caller-name="chart-top250tv"]//tr/td[2]/a')

        title = random.choice(candidates).text

    return dict(
        response_type='in_channel',
        attachments=[
            dict(image_url=GRAPH_URL + f'/graph?title={quote(title)}&uuid={uuid.uuid4()}')
        ]
    )
test_webhooks.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_unregistered_event(self):
        project = self.project  # force creation
        url = '/plugins/github/organizations/{}/webhook/'.format(
            project.organization.id,
        )

        secret = 'b3002c3e321d4b7880360d397db2ccfd'

        OrganizationOption.objects.set_value(
            organization=project.organization,
            key='github:webhook_secret',
            value=secret,
        )

        response = self.client.post(
            path=url,
            data=PUSH_EVENT_EXAMPLE,
            content_type='application/json',
            HTTP_X_GITHUB_EVENT='UnregisteredEvent',
            HTTP_X_HUB_SIGNATURE='sha1=98196e70369945ffa6b248cf70f7dc5e46dff241',
            HTTP_X_GITHUB_DELIVERY=six.text_type(uuid4())
        )

        assert response.status_code == 204
test_webhooks.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_invalid_signature_event(self):
        project = self.project  # force creation

        url = '/plugins/github/organizations/{}/webhook/'.format(
            project.organization.id,
        )

        secret = '2d7565c3537847b789d6995dca8d9f84'

        OrganizationOption.objects.set_value(
            organization=project.organization,
            key='github:webhook_secret',
            value=secret,
        )

        response = self.client.post(
            path=url,
            data=PUSH_EVENT_EXAMPLE,
            content_type='application/json',
            HTTP_X_GITHUB_EVENT='push',
            HTTP_X_HUB_SIGNATURE='sha1=33521abeaaf9a57c2abf486e0ccd54d23cf36fec',
            HTTP_X_GITHUB_DELIVERY=six.text_type(uuid4())
        )

        assert response.status_code == 401
test_webhooks.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_simple(self):
        url = '/plugins/github/installations/webhook/'

        response = self.client.post(
            path=url,
            data=INSTALLATION_EVENT_EXAMPLE,
            content_type='application/json',
            HTTP_X_GITHUB_EVENT='installation',
            HTTP_X_HUB_SIGNATURE='sha1=348e46312df2901e8cb945616ee84ce30d9987c9',
            HTTP_X_GITHUB_DELIVERY=six.text_type(uuid4())
        )

        assert response.status_code == 204

        assert Integration.objects.filter(
            provider='github_apps',
            external_id=2,
            name='octocat',
        ).exists()
plugin.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def get_webhook_secret(self, organization):
        lock = locks.get('github:webhook-secret:{}'.format(organization.id), duration=60)
        with lock.acquire():
            # TODO(dcramer): get_or_create would be a useful native solution
            secret = OrganizationOption.objects.get_value(
                organization=organization,
                key='github:webhook_secret',
            )
            if secret is None:
                secret = uuid4().hex + uuid4().hex
                OrganizationOption.objects.set_value(
                    organization=organization,
                    key='github:webhook_secret',
                    value=secret,
                )
        return secret
test_utils.py 文件源码 项目:ckanext-validation 作者: frictionlessdata 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_delete_upload_file(self, mock_open):

        resource_id = str(uuid.uuid4())
        path = '/doesnt_exist/resources/{}/{}/{}'.format(
            resource_id[0:3], resource_id[3:6], resource_id[6:]
        )

        patcher = fake_filesystem_unittest.Patcher()
        patcher.setUp()
        patcher.fs.CreateFile(path)

        assert os.path.exists(path)

        delete_local_uploaded_file(resource_id)

        assert not os.path.exists(path)

        patcher.tearDown()
test_utils.py 文件源码 项目:ckanext-validation 作者: frictionlessdata 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_delete_file_not_deleted_if_resources_first(self, mock_open):

        resource_id = str(uuid.uuid4())
        path = '/doesnt_exist/resources/{}'.format(resource_id)

        patcher = fake_filesystem_unittest.Patcher()
        patcher.setUp()
        patcher.fs.CreateFile(path)

        assert os.path.exists(path)
        with mock.patch('ckanext.validation.utils.get_local_upload_path',
                        return_value=path):
            delete_local_uploaded_file(resource_id)

        assert not os.path.exists(path)
        assert os.path.exists('/doesnt_exist/resources')

        patcher.tearDown()
test_utils.py 文件源码 项目:ckanext-validation 作者: frictionlessdata 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_delete_file_not_deleted_if_resources_second(self, mock_open):

        resource_id = str(uuid.uuid4())
        path = '/doesnt_exist/resources/data/{}'.format(resource_id)

        patcher = fake_filesystem_unittest.Patcher()
        patcher.setUp()
        patcher.fs.CreateFile(path)

        assert os.path.exists(path)
        with mock.patch('ckanext.validation.utils.get_local_upload_path',
                        return_value=path):
            delete_local_uploaded_file(resource_id)

        assert not os.path.exists(path)
        assert os.path.exists('/doesnt_exist/resources')

        patcher.tearDown()
test_utils.py 文件源码 项目:ckanext-validation 作者: frictionlessdata 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_delete_passes_if_os_exeception(self, mock_open):

        resource_id = str(uuid.uuid4())
        path = '/doesnt_exist/resources/{}/{}/{}'.format(
            resource_id[0:3], resource_id[3:6], resource_id[6:]
        )

        patcher = fake_filesystem_unittest.Patcher()
        patcher.setUp()
        patcher.fs.CreateFile(path)

        assert os.path.exists(path)
        with mock.patch('ckanext.validation.utils.os.remove',
                        side_effect=OSError):

            delete_local_uploaded_file(resource_id)


        patcher.tearDown()
__init__.py 文件源码 项目:PyPlanet 作者: PyPlanet 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_ghost_replay(self, login):
        replay_name = 'dedimania_{}.Replay.Gbx'.format(uuid.uuid4().hex)
        try:
            await self.instance.gbx('SaveBestGhostsReplay', login, replay_name)
        except:
            return None
        try:
            async with self.instance.storage.open('UserData/Replays/{}'.format(replay_name)) as ghost_file:
                return await ghost_file.read()
        except FileNotFoundError as e:
            message = '$f00Error: Dedimania requires you to have file access on the server. We can\'t fetch' \
                      'the driven replay!'
            logger.error('Please make sure we can access the dedicated files. Configure your storage driver correctly! '
                         '{}'.format(str(e)))
            await self.instance.chat(message)
            raise DedimaniaException('Can\'t access replay file')
        except PermissionError as e:
            message = '$f00Error: Dedimania requires you to have file access on the server. We can\'t fetch' \
                      'the driven replay because of an permission problem!'
            logger.error('We can\'t read files in the dedicated folder, your permissions don\'t allow us to read it! '
                         '{}'.format(str(e)))
            await self.instance.chat(message)
            raise DedimaniaException('Can\'t access files due to permission problems')
swift_utils.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def sync_rings_request(self, broker_token, builders_only=False):
        """Request for peers to sync rings from leader.

        NOTE: this action must only be performed by the cluster leader.

        :param broker_token: token to identify sync request.
        :param builders_only: if False, tell peers to sync builders only (not
                              rings).
        """
        if not is_elected_leader(SWIFT_HA_RES):
            errmsg = "Leader function called by non-leader"
            raise SwiftProxyCharmException(errmsg)

        rq = self.template()
        rq['trigger'] = str(uuid.uuid4())

        if builders_only:
            rq['sync-only-builders'] = 1

        rq['broker-token'] = broker_token
        rq['broker-timestamp'] = "{:f}".format(time.time())
        rq['builder-broker'] = self._hostname
        return rq
swift_utils.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def notify_leader_changed(self, token):
        """Notify peers that leader has changed.

        The token passed in must be that associated with the sync we claim to
        have been interrupted. It will be re-used by the restored leader once
        it receives this notification.

        NOTE: this action must only be performed by the cluster leader that
              has relinquished it's leader status as part of the current hook
              context.
        """
        if not is_elected_leader(SWIFT_HA_RES):
            errmsg = "Leader function called by non-leader"
            raise SwiftProxyCharmException(errmsg)

        rq = self.template()
        rq['trigger'] = str(uuid.uuid4())
        rq[self.KEY_NOTIFY_LEADER_CHANGED] = token
        return rq
swift_utils.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def notify_storage_rings_available():
    """Notify peer swift-storage relations that they should synchronise ring
    and builder files.

    Note that this should only be called from the leader unit.
    """
    if not is_elected_leader(SWIFT_HA_RES):
        log("Ring availability storage-relation broadcast requested by "
            "non-leader - skipping", level=WARNING)
        return

    hostname = get_hostaddr()
    hostname = format_ipv6_addr(hostname) or hostname
    path = os.path.basename(get_www_dir())
    rings_url = 'http://{}/{}'.format(hostname, path)
    trigger = uuid.uuid4()
    # Notify storage nodes that there is a new ring to fetch.
    log("Notifying storage nodes that new rings are ready for sync.",
        level=INFO)
    for relid in relation_ids('swift-storage'):
        relation_set(relation_id=relid, swift_hash=get_swift_hash(),
                     rings_url=rings_url, trigger=trigger)
test_swift_utils.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_cluster_rpc_stop_proxy_ack(self, mock_uuid):
        mock_uuid.uuid4.return_value = 'token2'
        rpc = swift_utils.SwiftProxyClusterRPC()
        rq = rpc.stop_proxy_ack(echo_token='token1', echo_peers_only='1')
        self.assertEqual({'trigger': 'token2',
                          'broker-token': None,
                          'builder-broker': None,
                          'broker-timestamp': None,
                          'peers-only': '1',
                          'leader-changed-notification': None,
                          'resync-request': None,
                          'stop-proxy-service': None,
                          'stop-proxy-service-ack': 'token1',
                          'sync-only-builders': None}, rq)

        template_keys = set(rpc.template())
        self.assertTrue(set(rq.keys()).issubset(template_keys))
test_swift_utils.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_cluster_rpc_sync_request(self, mock_uuid, mock_time):
        mock_time.time = mock.Mock(return_value=float(1.234))
        mock_uuid.uuid4.return_value = 'token2'
        rpc = swift_utils.SwiftProxyClusterRPC()
        rq = rpc.sync_rings_request('token1')
        self.assertEqual({'trigger': 'token2',
                          'broker-token': 'token1',
                          'broker-timestamp': '1.234000',
                          'builder-broker': '1.2.3.4',
                          'peers-only': None,
                          'leader-changed-notification': None,
                          'resync-request': None,
                          'stop-proxy-service': None,
                          'stop-proxy-service-ack': None,
                          'sync-only-builders': None}, rq)

        template_keys = set(rpc.template())
        self.assertTrue(set(rq.keys()).issubset(template_keys))
test_swift_utils.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_cluster_rpc_notify_leader_changed(self, mock_uuid):
        mock_uuid.uuid4.return_value = 'e4b67426-6cc0-4aa3-829d-227999cd0a75'
        rpc = swift_utils.SwiftProxyClusterRPC()
        rq = rpc.notify_leader_changed('token1')
        self.assertEqual({'trigger': 'e4b67426-6cc0-4aa3-829d-227999cd0a75',
                          'broker-token': None,
                          'builder-broker': None,
                          'broker-timestamp': None,
                          'peers-only': None,
                          'leader-changed-notification': 'token1',
                          'stop-proxy-service': None,
                          'stop-proxy-service-ack': None,
                          'resync-request': None,
                          'sync-only-builders': None}, rq)

        template_keys = set(rpc.template().keys())
        self.assertTrue(set(rq.keys()).issubset(template_keys))
test_controller.py 文件源码 项目:python-libjuju 作者: juju 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_disable_enable_user(event_loop):
    async with base.CleanController() as controller:
        username = 'test-disable{}'.format(uuid.uuid4())
        user = await controller.add_user(username)

        await user.disable()
        assert not user.enabled
        assert user.disabled

        fresh = await controller.get_user(username)  # fetch fresh copy
        assert not fresh.enabled
        assert fresh.disabled

        await user.enable()
        assert user.enabled
        assert not user.disabled

        fresh = await controller.get_user(username)  # fetch fresh copy
        assert fresh.enabled
        assert not fresh.disabled
test_controller.py 文件源码 项目:python-libjuju 作者: juju 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_change_user_password(event_loop):
    async with base.CleanController() as controller:
        username = 'test-password{}'.format(uuid.uuid4())
        user = await controller.add_user(username)
        await user.set_password('password')
        # Check that we can connect with the new password.
        new_connection = None
        try:
            kwargs = controller.connection().connect_params()
            kwargs['username'] = username
            kwargs['password'] = 'password'
            new_connection = await Connection.connect(**kwargs)
        except JujuAPIError:
            raise AssertionError('Unable to connect with new password')
        finally:
            if new_connection:
                await new_connection.close()
test_controller.py 文件源码 项目:python-libjuju 作者: juju 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def test_grant_revoke(event_loop):
    async with base.CleanController() as controller:
        username = 'test-grant{}'.format(uuid.uuid4())
        user = await controller.add_user(username)
        await user.grant('superuser')
        assert user.access == 'superuser'
        fresh = await controller.get_user(username)  # fetch fresh copy
        assert fresh.access == 'superuser'
        await user.grant('login')
        assert user.access == 'login'
        fresh = await controller.get_user(username)  # fetch fresh copy
        assert fresh.access == 'login'
        await user.revoke()
        assert user.access is ''
        fresh = await controller.get_user(username)  # fetch fresh copy
        assert fresh.access is ''
__init__.py 文件源码 项目:daisy 作者: llllllllll 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _(node, dask, scope):
    def retrieve(term):
        try:
            return scope[term]
        except KeyError:
            scope[term] = ret = _ltree_to_dask(term, dask, scope)
            return ret

    name = '%s-%s' % (node.func, uuid4())
    dask[name] = (
        apply,
        retrieve(node.func),
        list(map(retrieve, node.args)),
        (dict, list(map(list, valmap(retrieve, node.kwargs).items()))),
    )
    scope[node] = name
    return name
test_saved_query_api.py 文件源码 项目:lecli 作者: rapid7 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_get_saved_query(mocked_url, mocked_rw_apikey, mocked_account_resource_id, capsys):
    saved_query_id = str(uuid.uuid4())
    mocked_url.return_value = '', MOCK_API_URL
    mocked_rw_apikey.return_value = str(uuid.uuid4())
    mocked_account_resource_id.return_value = str(uuid.uuid4())
    httpretty.register_uri(httpretty.GET, MOCK_API_URL, status=200,
                           content_type='application/json',
                           body=json.dumps({'saved_query': SAVED_QUERY_RESPONSE}))

    api.get_saved_query(saved_query_id)
    out, err = capsys.readouterr()

    assert "Name:" in out
    assert "Logs:" in out
    assert "ID:" in out
    assert "Statement:" in out
    assert "Time range:" in out
    assert "From:" in out
    assert "To:" in out
test_saved_query_api.py 文件源码 项目:lecli 作者: rapid7 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_patch_saved_query_none_fields(mocked_url, mocked_rw_apikey, mocked_account_resource_id,
                                       capsys):
    test_saved_query_id = str(uuid.uuid4())
    mocked_url.return_value = '', MOCK_API_URL
    mocked_rw_apikey.return_value = str(uuid.uuid4())
    mocked_account_resource_id.return_value = str(uuid.uuid4())
    httpretty.register_uri(httpretty.PATCH, MOCK_API_URL, status=200,
                           content_type='application/json',
                           body=json.dumps({"saved_query": SAVED_QUERY_RESPONSE}))

    api.update_saved_query(test_saved_query_id, name=None,
                           statement="new_statement")
    out, err = capsys.readouterr()

    assert "Saved query with id %s updated" % test_saved_query_id in out
    body = json.loads(httpretty.last_request().body)['saved_query']
    assert "name" not in body
    assert "statement" in body['leql']


问题


面经


文章

微信
公众号

扫码关注公众号