python类s()的实例源码

link_manager.py 文件源码 项目:djangocms-link-manager 作者: divio 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def validate_mailto(self, parts, verify_exists=False):
        """
        Check whether the address of a ``mailto:`` URL is a valid e-mail.

        The check is delegated to Django's ``EmailValidator``.

        :param parts: mapping of URL components; only ``parts['path']`` is
            inspected
        :param verify_exists: accepted but currently ignored
        :return: ``True`` when the validator accepts the address, ``False``
            when it raises ``ValidationError``
        """
        check_email = EmailValidator()
        try:
            check_email(parts['path'])
        except ValidationError:
            return False
        return True
_environ_config.py 文件源码 项目:environ_config 作者: hynek 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def to_config(config_cls, environ=os.environ):
    """
    Build a configuration from *environ* for *config_cls*.

    The actual construction is delegated to ``_to_config``; this function
    only supplies the lookup callback that maps one setting to one
    environment variable.

    :param config_cls: configuration class; its ``_prefix`` (if truthy) is
        prepended to every derived variable name
    :param environ: mapping to read variables from (defaults to
        ``os.environ``)
    :return: whatever ``_to_config`` returns
    :raises MissingEnvValueError: from the lookup callback, when a variable
        is absent and its default is the ``RAISE`` sentinel
    """
    app_prefix = (config_cls._prefix,) if config_cls._prefix else ()

    def default_get(environ, metadata, prefix, name):
        """Return the value of a single setting from *environ*."""
        entry = metadata[CNF_KEY]
        var = entry.name
        if var is None:
            # No explicit name: derive PREFIX_SUBPREFIX_NAME, upper-cased.
            var = "_".join(app_prefix + prefix + (name,)).upper()

        log.debug("looking for env var '%s'." % (var,))
        value = environ.get(var, entry.default)
        if value is RAISE:
            raise MissingEnvValueError(var)
        return value

    return _to_config(config_cls, default_get, environ, ())
secrets.py 文件源码 项目:environ_config 作者: hynek 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _get(self, environ, metadata, prefix, name):
        """
        Look up one secret in *environ* and wrap it in ``_SecretStr``.

        When the attribute's metadata carries no explicit variable name, the
        name is derived from ``self.vault_prefix`` (called with *environ* if
        it is callable), *prefix* and *name*, joined with ``_`` and
        upper-cased.

        :raises MissingSecretError: if the variable is absent and its default
            is the ``RAISE`` sentinel
        """
        entry = metadata[CNF_KEY]

        var = entry.name
        if var is None:
            vault_prefix = self.vault_prefix
            if callable(vault_prefix):
                vault_prefix = vault_prefix(environ)
            var = "_".join((vault_prefix,) + prefix + (name,)).upper()

        log.debug("looking for env var '%s'." % (var,))
        secret = environ.get(var, entry.default)
        if secret is RAISE:
            raise MissingSecretError(var)
        return _SecretStr(secret)
io_handling.py 文件源码 项目:aws-encryption-sdk-cli 作者: awslabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _ensure_dir_exists(filename):
    # type: (str) -> None
    """Creates the parent directory tree of a file if it does not already exist.

    :param str filename: Full path to file in destination directory
    """
    # os.path.dirname keeps the filesystem root intact ("/file" -> "/") and
    # understands every separator valid on the platform. Splitting on os.sep
    # reduced a root-level path to "" and made os.makedirs fail.
    dest_final_dir = os.path.dirname(filename)
    if not dest_final_dir:
        # File is in current directory
        _LOGGER.debug('Target dir is current dir')
        return
    try:
        os.makedirs(dest_final_dir)
    except _file_exists_error():
        # os.makedirs(... exist_ok=True) does not work in 2.7
        pass
    else:
        _LOGGER.info('Created directory: %s', dest_final_dir)
io_handling.py 文件源码 项目:aws-encryption-sdk-cli 作者: awslabs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def output_filename(source_filename, destination_dir, mode, suffix):
    # type: (str, str, str, str) -> str
    """Builds the destination path for a source file: same file name, placed in
    the destination directory, with a suffix appended.

    :param str source_filename: Full file path to source file
    :param str destination_dir: Full file path to destination directory
    :param str mode: Operating mode (encrypt/decrypt); selects the default suffix
    :param str suffix: Suffix to append to output filename; ``None`` selects
        the default for ``mode`` from ``OUTPUT_SUFFIX``
    :returns: Full file path of new destination file in destination directory
    :rtype: str
    """
    if suffix is not None:
        _LOGGER.debug('Using custom suffix "%s" to create output file', suffix)
    else:
        suffix = OUTPUT_SUFFIX[mode]
    # Everything after the last path separator (the whole string if none).
    _, _, filename = source_filename.rpartition(os.sep)
    _LOGGER.debug('Duplicating filename %s into %s', filename, destination_dir)
    destination = os.path.join(destination_dir, filename)
    return destination + suffix
extract_tokens.py 文件源码 项目:python-miio 作者: rytilahti 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def read_tokens(self, db) -> Iterator[DeviceConfig]:
        """Yield the device entries found in a database file.

        The database flavour is detected by probing ``sqlite_master`` for the
        ``devicerecord`` table (handled by ``read_android``) or the
        ``ZDEVICE`` table (handled by ``read_apple``). If neither table
        exists an error is logged and nothing is yielded.

        The path and the opened connection are kept on ``self.db`` and
        ``self.conn``.

        :param str db: Database file"""
        self.db = db
        _LOGGER.info("Reading database from %s" % db)
        self.conn = sqlite3.connect(db)
        self.conn.row_factory = sqlite3.Row

        with self.conn:
            # Both probes run up front, before any rows are yielded.
            android_table = self.conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='devicerecord';").fetchone()
            apple_table = self.conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='ZDEVICE'").fetchone()

            if android_table is not None:
                yield from self.read_android()
            elif apple_table is not None:
                yield from self.read_apple()
            else:
                _LOGGER.error("Error, unknown database type!")
onion_transport.py 文件源码 项目:txmix 作者: applied-mixnetworks 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def do_start(self):
        """
        Start listening locally and publish an ephemeral onion service.

        Steps, each awaited by yielding: wait for the Tor configuration to
        bootstrap, listen on the local endpoint, then register the hidden
        service with Tor. The local endpoint is a unix socket when
        ``self.onion_unix_socket`` is non-empty, otherwise the TCP
        interface/port given by ``self.onion_tcp_interface_ip`` and
        ``self.onion_tcp_port``.

        NOTE(review): this is a generator; presumably it is driven as a
        Deferred-based coroutine by a decorator not visible here -- confirm.
        """
        # save a TorConfig so we can later use it to send messages
        self.torconfig = txtorcon.TorConfig(control=self.tor.protocol)
        yield self.torconfig.post_bootstrap

        if len(self.onion_unix_socket) == 0:
            listen_desc = "tcp:interface=%s:%s" % (self.onion_tcp_interface_ip, self.onion_tcp_port)
        else:
            listen_desc = "unix:%s" % self.onion_unix_socket
        local_endpoint = endpoints.serverFromString(self.reactor, listen_desc)
        proxy_factory = OnionDatagramProxyFactory(received_handler=lambda x: self.datagram_received(x))
        yield local_endpoint.listen(proxy_factory)

        # Hidden-service line: "<onion port> <local target>"
        if len(self.onion_unix_socket) == 0:
            hs_line = "%s %s:%s" % (self.onion_port, self.onion_tcp_interface_ip, self.onion_tcp_port)
        else:
            hs_line = "%s unix:%s" % (self.onion_port, self.onion_unix_socket)
        hidden_service = txtorcon.torconfig.EphemeralHiddenService([hs_line], key_blob_or_type=self.onion_key)
        yield hidden_service.add_to_tor(self.tor.protocol)
formation.py 文件源码 项目:bay 作者: eventbrite 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def validate(self):
        """
        Drop links and devmodes that the container cannot actually support.

        A link whose target container is not among the dependencies reported
        by the container graph is removed with a warning, as is any devmode
        the container does not define.

        :raises ValueError: if a link target is still an unresolved string
        """
        # Verify all link targets are possible
        for alias, target in list(self.links.items()):
            if isinstance(target, str):
                raise ValueError("Link target {} is still a string!".format(target))
            allowed = self.container.graph.dependencies(self.container)
            if target.container in allowed:
                continue
            warnings.warn("It is not possible to link %s to %s as %s" % (target, self.container, alias))
            del self.links[alias]
        # Verify devmodes exist
        for devmode in tuple(self.devmodes):
            if devmode in self.container.devmodes:
                continue
            warnings.warn("Invalid devmode %s on container %s" % (devmode, self.container.name))
            self.devmodes.remove(devmode)
formation.py 文件源码 项目:bay 作者: eventbrite 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def resolve_links(self):
        """
        Replace link targets that are still names with formation instances.

        String targets come from an introspection pass that could not yet
        resolve them. Each is looked up in ``self.formation``; a name that
        cannot be found has its link removed and a warning issued instead of
        raising.

        :raises ValueError: if a target is neither a string nor a
            ``ContainerInstance``
        """
        for alias, target in list(self.links.items()):
            if not isinstance(target, str):
                if isinstance(target, ContainerInstance):
                    # Already resolved
                    continue
                raise ValueError("Invalid link value {}".format(repr(target)))
            try:
                instance = self.formation[target]
            except KeyError:
                # We don't error here as that would prevent you stopping orphaned containers;
                # the link is deleted and the user warned, so `up` will recreate it if it's orphaned.
                del self.links[alias]
                warnings.warn("Could not resolve link {} to an instance for {}".format(target, self.name))
            else:
                self.links[alias] = instance
argument_types.py 文件源码 项目:bay 作者: eventbrite 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def convert(self, value, param, ctx):
        """
        Map *value* onto one of ``self.choices``.

        An exact match is returned unchanged. Otherwise, if the context
        provides a ``token_normalize_func``, the first choice whose
        normalized form equals the normalized value is returned. If nothing
        matches, ``self.fail`` is called with an "invalid choice" message.

        The context is also remembered on ``self.context``.
        """
        self.context = ctx

        # Exact match
        if value in self.choices:
            return value

        # Match through normalization
        normalize = None if ctx is None else ctx.token_normalize_func
        if normalize is not None:
            value = normalize(value)
            for candidate in self.choices:
                if normalize(candidate) == value:
                    return candidate

        self.fail('invalid choice: %s. %s' % (PURPLE(value), self.get_missing_message(param, value)), param, ctx)
images.py 文件源码 项目:bay 作者: eventbrite 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def image_version(self, image_name, image_tag):
        """
        Return the Docker image ``Id`` for the given image name and tag.

        The tag ``"local"`` is looked up as ``"latest"``.

        :raises ImageNotFoundException: if the host reports the image as not
            found
        """
        tag = "latest" if image_tag == "local" else image_tag
        reference = "{}:{}".format(image_name, tag)
        try:
            docker_info = self.host.client.inspect_image(reference)
        except NotFound:
            # TODO: Maybe auto-build if we can?
            raise ImageNotFoundException(
                "Cannot find image {}:{}".format(image_name, tag),
                image=image_name,
                image_tag=tag,
            )
        return docker_info['Id']
attach.py 文件源码 项目:bay 作者: eventbrite 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def attach(app, container, host, command):
    """
    Open a shell (or run a command) inside a running container.

    With a command, it is joined with spaces and run through
    ``/bin/bash -lc``; without one, an interactive ``/bin/bash`` is started.
    On the first introspected instance that matches *container*, the process
    exits with the status code of ``docker exec``. If no instance matches, a
    hint is printed.
    """
    shell = ['/bin/bash', '-lc', ' '.join(command)] if command else ['/bin/bash']

    # See if the container is running
    formation = FormationIntrospector(host, app.containers).introspect()
    for instance in formation:
        if instance.container == container:
            # Forward TERM (when set and non-empty) ahead of the shell
            term = os.environ.get("TERM")
            pre_args = ["env", "TERM=%s" % term] if term else []
            # Launch into an attached shell
            docker_cmd = ["docker", "exec", "-it", instance.name] + pre_args + shell
            sys.exit(subprocess.call(docker_cmd))
    # It's not running ;(
    click.echo(RED("Container {name} is not running. It must be started to attach - try `bay run {name}`.".format(
        name=container.name,
    )))
volume.py 文件源码 项目:bay 作者: eventbrite 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def destroy(app, host, name):
    """
    Remove one named volume from the host.

    Garbage collection runs first so stopped containers are cleaned up
    before the volume is checked. The volume is not removed while any
    introspected instance still uses it; the outcome is reported through
    the task status either way.
    """
    task = Task("Destroying volume {}".format(name))
    # Run GC first to clean up stopped containers
    from .gc import GarbageCollector
    GarbageCollector(host).gc_all(task)
    # Remove the volume
    formation = FormationIntrospector(host, app.containers).introspect()
    users = [inst.container.name for inst in formation.get_instances_using_volume(name)]
    if users:
        task.finish(status="Volume {} is in use by container(s): {}".format(
            name, ",".join(users)), status_flavor=Task.FLAVOR_BAD)
        return
    try:
        host.client.remove_volume(name)
    except NotFound:
        task.add_extra_info("There is no volume called {}".format(name))
        task.finish(status="Not found", status_flavor=Task.FLAVOR_BAD)
    else:
        task.finish(status="Done", status_flavor=Task.FLAVOR_GOOD)
shell.py 文件源码 项目:seashore 作者: shopkick 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def setenv(self, key, val):
        """
        Set or unset a variable in the internal environment.

        Only the internal mapping ``self._env`` is modified.

        :param key: name of variable (stored as ``str(key)``)
        :param val: value of variable (stored as ``str(val)``); ``None``
            removes the variable if it is present
        """
        if val is None:
            self._env.pop(key, None)
            return
        # Both keys and values must be strings
        env_key, env_val = str(key), str(val)
        self._env[env_key] = env_val
archive.py 文件源码 项目:aptrepo 作者: jwodder 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def fetch_suite(self, suite):
        """
        Download and parse the release file of *suite*.

        A suite name ending in ``/`` is treated as a flat repository located
        directly under ``self.uri``; any other name lives under ``dists/``.
        ``InRelease`` is requested first; on a 4xx response the plain
        ``Release`` file is fetched instead.

        :return: a ``FlatRepository`` for flat suites, otherwise a ``Suite``
        """
        flat = suite.endswith('/')
        path_parts = (suite,) if flat else ('dists', suite)
        baseurl = joinurl(self.uri, *path_parts)
        log.info('Fetching InRelease file from %s', baseurl)
        r = self.session.get(joinurl(baseurl, 'InRelease'))
        if 400 <= r.status_code < 500:
            log.info('Server returned %d; fetching Release file instead',
                     r.status_code)
            r = self.session.get(joinurl(baseurl, 'Release'))
            r.raise_for_status()
            release = ReleaseFile.parse(r.content)
        else:
            r.raise_for_status()
            release = ReleaseFile.parse_signed(r.content)
        ### TODO: Handle/fetch/verify PGP stuff
        repo_cls = FlatRepository if flat else Suite
        return repo_cls(self, suite, release)
urltool.py 文件源码 项目:Codado 作者: corydodt 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _iterClass(self, cls, prefix=''):
        """
        Walk the url_map of *cls* and of every nested app it references,
        yielding the result of ``dumpRule`` for each rule.

        Rules whose converted form has a truthy ``branch`` are skipped.
        """
        # Work list of (prefix, class, rules). It grows while being iterated,
        # so nested apps are visited after the ones already queued.
        pending = [(prefix, cls, cls.app.url_map.iter_rules())]
        for rulePrefix, ruleClass, rules in pending:
            for rule in rules:
                converted = dumpRule(ruleClass, rule, rulePrefix)
                if converted.branch:
                    continue

                if converted.subKlein:
                    subClass = namedAny(converted.subKlein)
                    pending.append((converted.rulePath, subClass, subClass.app.url_map.iter_rules()))

                yield converted
_sync.py 文件源码 项目:trio 作者: python-trio 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def statistics(self):
        """Return an object containing debugging information.

        Currently the following fields are defined:

        * ``borrowed_tokens``: The number of tokens currently borrowed from
          the sack.
        * ``total_tokens``: The total number of tokens in the sack. Usually
          this will be larger than ``borrowed_tokens``, but it's possible for
          it to be smaller if :attr:`total_tokens` was recently decreased.
        * ``borrowers``: A list of all tasks or other entities that currently
          hold a token.
        * ``tasks_waiting``: The number of tasks blocked on this
          :class:`CapacityLimiter`'s :meth:`acquire` or
          :meth:`acquire_on_behalf_of` methods.

        """
        holders = self._borrowers
        return _CapacityLimiterStatistics(
            borrowed_tokens=len(holders),
            total_tokens=self._total_tokens,
            # Exposed as a list rather than a frozenset, in case one borrower
            # is ever allowed to hold several tokens
            borrowers=list(holders),
            tasks_waiting=len(self._lot),
        )
_sync.py 文件源码 项目:trio 作者: python-trio 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def statistics(self):
        """Return an object containing debugging information.

        The following fields are currently provided:

        * ``locked``: boolean, the result of :meth:`locked`.
        * ``owner``: the :class:`trio.hazmat.Task` currently holding the lock,
          or None if the lock is not held.
        * ``tasks_waiting``: The number of tasks blocked on this lock's
          :meth:`acquire` method.

        """
        fields = dict(
            locked=self.locked(),
            owner=self._owner,
            tasks_waiting=len(self._lot),
        )
        return _LockStatistics(**fields)
_sync.py 文件源码 项目:trio 作者: python-trio 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def statistics(self):
        """Returns an object containing debugging information.

        Currently the following fields are defined:

        * ``qsize``: The number of items currently in the queue.
        * ``capacity``: The maximum number of items the queue can hold.
        * ``tasks_waiting_put``: The number of tasks blocked on this queue's
          :meth:`put` method.
        * ``tasks_waiting_get``: The number of tasks blocked on this queue's
          :meth:`get` method.
        * ``tasks_waiting_join``: The ``tasks_waiting`` count reported by the
          queue's internal join lot (presumably the tasks blocked in
          :meth:`join` -- confirm).

        """
        return _QueueStats(
            qsize=len(self._data),
            capacity=self.capacity,
            tasks_waiting_put=self._put_semaphore.statistics().tasks_waiting,
            tasks_waiting_get=self._get_semaphore.statistics().tasks_waiting,
            tasks_waiting_join=self._join_lot.statistics().tasks_waiting
        )
_threads.py 文件源码 项目:trio 作者: python-trio 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def current_default_worker_thread_limiter():
    """Get the default :class:`CapacityLimiter` used by
    :func:`run_sync_in_worker_thread`.

    The limiter is created on first use with ``DEFAULT_LIMIT`` tokens and
    cached on ``_limiter_local``. The usual reason to call this is to adjust
    its :attr:`~CapacityLimiter.total_tokens` attribute.

    """
    if hasattr(_limiter_local, "limiter"):
        return _limiter_local.limiter
    limiter = CapacityLimiter(DEFAULT_LIMIT)
    _limiter_local.limiter = limiter
    return limiter


# Eventually we might build this into a full-fledged deadlock-detection
# system; see https://github.com/python-trio/trio/issues/182
# But for now we just need an object to stand in for the thread, so we can
# keep track of who's holding the CapacityLimiter's token.
test_run.py 文件源码 项目:trio 作者: python-trio 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_main_and_task_both_crash(recwarn):
    """A crash in main plus a crash in a child task surface together as a
    MultiError, whether or not main waited for the child first."""
    async def crasher():
        raise ValueError

    async def main(wait):
        async with _core.open_nursery() as nursery:
            crasher_task = nursery.spawn(crasher)
            if wait:
                await crasher_task.wait()
            raise KeyError

    for wait in (True, False):
        with pytest.raises(_core.MultiError) as excinfo:
            _core.run(main, wait)
        print(excinfo.value)
        raised_types = {type(exc) for exc in excinfo.value.exceptions}
        assert raised_types == {ValueError, KeyError}
test_run.py 文件源码 项目:trio 作者: python-trio 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def test_broken_abort():
    """An abort callback that returns an illegal value (None) must make the
    run fail with TrioInternalError."""
    async def main():
        # These yields are here to work around an annoying warning -- we're
        # going to crash the main loop, and if we (by chance) do this before
        # the run_sync_soon task runs for the first time, then Python gives us
        # a spurious warning about it not being awaited. (I mean, the warning
        # is correct, but here we're testing our ability to deliver a
        # semi-meaningful error after things have gone totally pear-shaped, so
        # it's not relevant.) By letting the run_sync_soon_task run first, we
        # avoid the warning.
        await _core.checkpoint()
        await _core.checkpoint()
        with _core.open_cancel_scope() as scope:
            # Cancel first so that the abort callback passed below is invoked
            scope.cancel()
            # None is not a legal return value here
            await _core.wait_task_rescheduled(lambda _: None)

    with pytest.raises(_core.TrioInternalError):
        _core.run(main)

    # Because this crashes, various __del__ methods print complaints on
    # stderr. Make sure that they get run now, so the output is attached to
    # this test.
    gc_collect_harder()
test_run.py 文件源码 项目:trio 作者: python-trio 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_system_task_crash_KeyboardInterrupt():
    """A KeyboardInterrupt raised inside a system task propagates out of
    ``run`` as-is."""
    async def ki():
        raise KeyboardInterrupt

    async def main():
        _core.spawn_system_task(ki)
        await sleep_forever()

    # Unlike other system-task crashes, KI is not wrapped in TrioInternalError
    expect_ki = pytest.raises(KeyboardInterrupt)
    with expect_ki:
        _core.run(main)


# This used to fail because checkpoint was a yield followed by an immediate
# reschedule. So we had:
# 1) this task yields
# 2) this task is rescheduled
# ...
# 3) next iteration of event loop starts, runs timeouts
# 4) this task has timed out
# 5) ...but it's on the run queue, so the timeout is queued to be delivered
#    the next time that it's blocked.
test_run.py 文件源码 项目:trio 作者: python-trio 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
async def test_exc_info_after_yield_error():
    """A bare ``raise`` re-raises the original KeyError even after another
    exception was delivered to the task while it was suspended inside the
    ``except`` block.

    This must be a coroutine function: the body uses ``async with`` and
    ``await`` directly.
    """
    child_task = None

    async def child():
        nonlocal child_task
        child_task = _core.current_task()

        try:
            raise KeyError
        except Exception:
            try:
                await sleep_forever()
            except Exception:
                # Deliberately swallow the injected ValueError; the point is
                # that the bare raise below still sees the KeyError.
                pass
            raise

    with pytest.raises(KeyError):
        async with _core.open_nursery() as nursery:
            nursery.start_soon(child)
            await wait_all_tasks_blocked()
            # Wake the sleeping child with an error instead of a value
            _core.reschedule(child_task, _core.Error(ValueError()))


# Similar to previous test -- if the ValueError() gets sent in via 'throw',
# then Python's normal implicit chaining stuff is broken.
test_run.py 文件源码 项目:trio 作者: python-trio 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
async def test_TrioToken_run_sync_soon_massive_queue():
    """Queue many ``run_sync_soon`` callbacks and check they all run, in
    FIFO order.

    This must be a coroutine function: the body awaits
    ``wait_all_tasks_blocked``.
    """
    # There are edge cases in the wakeup fd code when the wakeup fd overflows,
    # so let's try to make that happen. This is also just a good stress test
    # in general. (With the current-as-of-2017-02-14 code using a socketpair
    # with minimal buffer, Linux takes 6 wakeups to fill the buffer and MacOS
    # takes 1 wakeup. So 1000 is overkill if anything. Windows OTOH takes
    # ~600,000 wakeups, but has the same code paths...)
    COUNT = 1000
    token = _core.current_trio_token()
    # One-element list so the callback can mutate the count
    counter = [0]

    def cb(i):
        # This also tests FIFO ordering of callbacks
        assert counter[0] == i
        counter[0] += 1

    for i in range(COUNT):
        token.run_sync_soon(cb, i)
    await wait_all_tasks_blocked()
    assert counter[0] == COUNT
_multierror.py 文件源码 项目:trio 作者: python-trio 项目源码 文件源码 阅读 13 收藏 0 点赞 0 评论 0
def __exit__(self, etype, exc, tb):
        """Pass the in-flight exception, if any, through ``self._handler``.

        ``MultiError.filter`` is applied to *exc* with the handler. Depending
        on its result this method:

        * returns ``False`` when the very same exception object comes back,
          so the interpreter re-raises it;
        * returns ``True`` when the result is ``None``, which swallows the
          exception;
        * otherwise raises the filtered exception, restoring the
          ``__context__`` it had before being raised.

        When no exception is in flight nothing is done (``None`` is returned).
        """
        if exc is not None:
            filtered_exc = MultiError.filter(self._handler, exc)
            if filtered_exc is exc:
                # Let the interpreter re-raise it
                return False
            if filtered_exc is None:
                # Swallow the exception
                return True
            # When we raise filtered_exc, Python will unconditionally blow
            # away its __context__ attribute and replace it with the original
            # exc we caught. So after we raise it, we have to pause it while
            # it's in flight to put the correct __context__ back.
            old_context = filtered_exc.__context__
            try:
                raise filtered_exc
            finally:
                # The finally clause runs while filtered_exc is propagating,
                # which is the moment its __context__ can be put back.
                _, value, _ = sys.exc_info()
                assert value is filtered_exc
                value.__context__ = old_context
_io_epoll.py 文件源码 项目:trio 作者: python-trio 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def flags(self):
        """Return the epoll event mask for the current waiters.

        ``EPOLLIN`` is included when ``read_task`` is set and ``EPOLLOUT``
        when ``write_task`` is set; ``None`` is returned when neither is.
        """
        mask = 0
        if self.read_task is not None:
            mask |= select.EPOLLIN
        if self.write_task is not None:
            mask |= select.EPOLLOUT
        # XX EPOLLEXCLUSIVE is deliberately not set: it is unclear whether it
        # is actually safe, see
        # https://stackoverflow.com/questions/41582560/how-does-epolls-epollexclusive-mode-interact-with-level-triggering
        # ONESHOT used to be set here too, but it is confusing/complicated:
        # it cannot be combined with EPOLLEXCLUSIVE, and it doesn't delete
        # the registration but just "disables" it, so you re-enable with CTL
        # rather than ADD (or something?)...
        # https://lkml.org/lkml/2016/2/4/541
        return mask or None
test_base.py 文件源码 项目:parsec-cloud 作者: Scille 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_do():
    """A ``@do`` generator is turned into a single chained effect whose
    result is the generator's return value."""
    @attr.s
    class EDo:
        arg = attr.ib()

    @do
    def do_func(a, b):
        done_a = yield Effect(EDo(a))
        done_b = yield Effect(EDo(b))
        return [done_a, done_b]

    def perform_edo(intent):
        return 'done: %s' % intent.arg

    effect = do_func(1, 2)
    assert isinstance(effect, Effect)
    assert isinstance(effect.intent, ChainedIntent)
    dispatcher = TypeDispatcher({EDo: perform_edo})
    result = sync_perform(dispatcher, effect)
    assert result == ['done: 1', 'done: 2']
test_base.py 文件源码 项目:parsec-cloud 作者: Scille 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
async def test_chained_intent(self):
        """A ChainedIntent drives a generator of effects and resolves to the
        generator's return value.

        This must be a coroutine function: the body awaits
        ``asyncio_perform``.
        """
        @attr.s
        class ENumToString:
            num = attr.ib()

        def collect_intent_results():
            intent_results = []
            for i in range(5):
                # Each yielded effect is resolved and its result sent back in
                res = yield Effect(ENumToString(i))
                intent_results.append(res)
            return ''.join(intent_results)

        effect = Effect(ChainedIntent(collect_intent_results()))
        dispatcher = TypeDispatcher({
            ENumToString: lambda intent: str(intent.num)
        })
        ret = await asyncio_perform(dispatcher, effect)
        assert ret == '01234'
block.py 文件源码 项目:parsec-cloud 作者: Scille 项目源码 文件源码 阅读 49 收藏 0 点赞 0 评论 0
def block_connection_factory(url):
    """Create the block-store connection described by *url*.

    Supported forms:

    * ``s3:<region>:<bucket>:<id>:<secret>`` -> ``S3BlockConnection``
    * ``http://...`` -> ``RESTBlockConnection``

    :raises SystemExit: if the URL scheme is unknown, the s3 URL does not
        have exactly five ``:``-separated fields, or the S3 backend cannot
        be imported
    """
    if url.startswith('s3:'):
        # Validate the URL on its own, before touching the optional backend:
        # a malformed URL is then always reported as such, and a ValueError
        # raised while importing can no longer be mistaken for a bad URL.
        try:
            _, region, bucket, key_id, key_secret = url.split(':')
        except ValueError:
            raise SystemExit('Invalid s3 block store '
                             '(should be `s3:<region>:<bucket>:<id>:<secret>`).')
        try:
            # Imported lazily so the S3 dependency stays optional
            from parsec.core.block_s3 import S3BlockConnection
        except ImportError as exc:
            raise SystemExit('Parsec needs boto3 to support S3 block storage (error: %s).' %
                             exc)
        return S3BlockConnection(region, bucket, key_id, key_secret)
    elif url.startswith('http://'):
        return RESTBlockConnection(url)
    else:
        raise SystemExit('Unknown block store `%s`.' % url)


问题


面经


文章

微信
公众号

扫码关注公众号