python类exceptions()的实例源码

Plugin.py 文件源码 项目:Chainmail 作者: Chainmail-Project 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def new_version_available(self) -> bool:
        """
        Checks the remote manifest to see if a new version is available
        :return: Whether or not there is a new version available
        """
        if self.manifest.get("remote_manifest", "") == "":
            return False
        self.logger.info("Checking for update...")
        try:
            manifest = requests.get(self.manifest.get("remote_manifest", "")).json()
            version_remote = manifest["version"].split(".")
            version_local = self.manifest["version"].split(".")
            for i in range(len(version_local)):
                if int(version_local[i]) < int(get_item_from_list(version_remote, i, "0")):
                    self.logger.info(f"An update is available. Current version is v{self.manifest['version']}, updated version is v{manifest['version']}.")
                    return True
            self.logger.info("No update required.")
            return False
        except requests.exceptions.ConnectionError:
            self.logger.warning("Failed to check for update.")
            return False
util.py 文件源码 项目:ipwb 作者: oduwsdl 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def isDaemonAlive(hostAndPort="{0}:{1}".format(IPFSAPI_IP, IPFSAPI_PORT)):
    """Ensure that the IPFS daemon is running via HTTP before proceeding"""
    client = ipfsapi.Client(IPFSAPI_IP, IPFSAPI_PORT)

    try:
        # OSError if ipfs not installed, redundant of below
        # subprocess.call(['ipfs', '--version'], stdout=open(devnull, 'wb'))

        # ConnectionError/AttributeError if IPFS daemon not running
        client.id()
        return True
    except (ConnectionError, exceptions.AttributeError):
        logError("Daemon is not running at http://" + hostAndPort)
        return False
    except OSError:
        logError("IPFS is likely not installed. "
                 "See https://ipfs.io/docs/install/")
        sys.exit()
    except:
        logError('Unknown error in retrieving daemon status')
        logError(sys.exc_info()[0])
caservice.py 文件源码 项目:fabric-sdk-py 作者: hyperledger 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def enroll(self, enrollment_id, enrollment_secret):
        """Enroll a registered user in order to receive a signed X509 certificate

        Args:
            enrollment_id (str): The registered ID to use for enrollment
            enrollment_secret (str): The secret associated with the
                                     enrollment ID

        Returns: PEM-encoded X509 certificate

        Raises:
            RequestException: errors in requests.exceptions
            ValueError: Failed response, json parse error, args missing

        """
        private_key = self._crypto.generate_private_key()
        csr = self._crypto.generate_csr(private_key, x509.Name(
            [x509.NameAttribute(NameOID.COMMON_NAME, six.u(enrollment_id))]))
        cert = self._ca_client.enroll(
            enrollment_id, enrollment_secret,
            csr.public_bytes(Encoding.PEM).decode("utf-8"))

        return Enrollment(private_key, cert)
webinspect.py 文件源码 项目:webinspectapi 作者: target 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, host, username=None, password=None, verify_ssl=True, user_agent=None, cert=None):

        self.host = host
        self.username = username
        self.password = password
        self.cert = cert
        self.verify_ssl = verify_ssl

        if not user_agent:
            self.user_agent = 'webinspectapi/' + version
        else:
            self.user_agent = user_agent

        if not self.verify_ssl:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        # Set auth_type based on what's been provided
        if username is not None:
            self.auth_type = 'basic'
        elif cert is not None:
            self.auth_type = 'certificate'
        else:
            self.auth_type = 'unauthenticated'
resolver.py 文件源码 项目:metatab 作者: Metatab 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def find_decl_doc(self, name):


        raise IncludeError(name)

        import requests
        from requests.exceptions import InvalidSchema
        url = METATAB_ASSETS_URL + name + '.csv'
        try:
            # See if it exists online in the official repo
            r = requests.head(url, allow_redirects=False)
            if r.status_code == requests.codes.ok:
                return url

        except InvalidSchema:
            pass  # It's probably FTP
client_test.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_request_retry(self, mock_request):
        """Tests that two connection errors will be handled"""

        class CustomMock(object):
            i = 0

            def connection_error(self, *args, **kwargs):
                self.i += 1

                if self.i < 3:
                    raise requests.exceptions.ConnectionError
                else:
                    r = requests.Response()
                    r.status_code = 204
                    return r

        mock_request.side_effect = CustomMock().connection_error

        cli = InfluxDBClient(database='db')
        cli.write_points(
            self.dummy_points
        )
client_test.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def test_request_retry_raises(self, mock_request):
        """Tests that three connection errors will not be handled"""

        class CustomMock(object):
            i = 0

            def connection_error(self, *args, **kwargs):
                self.i += 1

                if self.i < 4:
                    raise requests.exceptions.ConnectionError
                else:
                    r = requests.Response()
                    r.status_code = 200
                    return r

        mock_request.side_effect = CustomMock().connection_error

        cli = InfluxDBClient(database='db')

        with self.assertRaises(requests.exceptions.ConnectionError):
            cli.write_points(self.dummy_points)
client_test.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_request_retry(self, mock_request):
        """Tests that two connection errors will be handled"""

        class CustomMock(object):
            i = 0

            def connection_error(self, *args, **kwargs):
                self.i += 1

                if self.i < 3:
                    raise requests.exceptions.ConnectionError
                else:
                    r = requests.Response()
                    r.status_code = 200
                    return r

        mock_request.side_effect = CustomMock().connection_error

        cli = InfluxDBClient(database='db')
        cli.write_points(
            self.dummy_points
        )
client_test.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_request_retry_raises(self, mock_request):
        """Tests that three connection errors will not be handled"""

        class CustomMock(object):
            i = 0

            def connection_error(self, *args, **kwargs):
                self.i += 1

                if self.i < 4:
                    raise requests.exceptions.ConnectionError
                else:
                    r = requests.Response()
                    r.status_code = 200
                    return r

        mock_request.side_effect = CustomMock().connection_error

        cli = InfluxDBClient(database='db')

        with self.assertRaises(requests.exceptions.ConnectionError):
            cli.write_points(self.dummy_points)
wsman.py 文件源码 项目:python-wsmanclient 作者: intelsdi-x 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, endpoint, resource_uri, optimization=True,
                 max_elems=100, filter_query=None, filter_dialect=None):
        self.endpoint = endpoint
        self.resource_uri = resource_uri
        self.filter_dialect = None
        self.filter_query = None
        self.optimization = optimization
        self.max_elems = max_elems

        if filter_query is not None:
            try:
                self.filter_dialect = FILTER_DIALECT_MAP[filter_dialect]
            except KeyError:
                valid_opts = ', '.join(FILTER_DIALECT_MAP)
                raise exceptions.WSManInvalidFilterDialect(
                    invalid_filter=filter_dialect, supported=valid_opts)

            self.filter_query = filter_query
communication.py 文件源码 项目:bpy_lambda 作者: bcongdon 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def blender_id_server_validate(token) -> typing.Tuple[typing.Optional[str], typing.Optional[str]]:
    """Validate the auth token with the server.

    @param token: the authentication token
    @type token: str
    @returns: tuple (expiry, error).
        The expiry is the expiry date of the token if it is valid, else None.
        The error is None if the token is valid, or an error message when it's invalid.
    """

    import requests
    import requests.exceptions

    try:
        r = requests.post(blender_id_endpoint('u/validate_token'),
                          data={'token': token}, verify=True)
    except requests.exceptions.RequestException as e:
        return (str(e), None)

    if r.status_code != 200:
        return (None, 'Authentication token invalid')

    response = r.json()
    return (response['token_expires'], None)
communication.py 文件源码 项目:bpy_lambda 作者: bcongdon 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def make_authenticated_call(method, url, auth_token, data):
    """Makes a HTTP call authenticated with the OAuth token."""

    import requests
    import requests.exceptions

    try:
        r = requests.request(method,
                             blender_id_endpoint(url),
                             data=data,
                             headers={'Authorization': 'Bearer %s' % auth_token},
                             verify=True)
    except (requests.exceptions.HTTPError,
            requests.exceptions.ConnectionError) as e:
        raise BlenderIdCommError(str(e))

    return r
botan.py 文件源码 项目:flibusta_bot 作者: Kurbezz 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def track(token, uid, message, name='Message'):
    try:
        r = requests.post(
            TRACK_URL,
            params={"token": token, "uid": uid, "name": name},
            data=json.dumps(message),
            headers={'Content-type': 'application/json'},
        )
        return r.json()
    except requests.exceptions.Timeout:
        # set up for a retry, or continue in a retry loop
        return False
    except (requests.exceptions.RequestException, ValueError) as e:
        # catastrophic error
        print(e)
        return False
connection.py 文件源码 项目:python-matchlightsdk 作者: TerbiumLabs 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _request(self, method, url, data=None, **kwargs):
        try:
            response = self.session.request(method, url, data=data, **kwargs)
            if response.status_code == 200:
                return response
            else:
                try:
                    data = response.json()
                except ValueError:
                    data = None
                raise matchlight.error.APIError(
                    response.status_code, response.reason, data)
        except requests.exceptions.RetryError:
            raise matchlight.error.ConnectionError(
                'Matchlight API request failed with too many retries')
        except requests.exceptions.ConnectionError:
            raise matchlight.error.ConnectionError(
                'Matchlight API request failed with connection error')
registration.py 文件源码 项目:CommunityCellularManager 作者: facebookincubator 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def register_update(eapi):
    """Ensures the inbound URL for the BTS is up to date."""
    vpn_ip = system_utilities.get_vpn_ip()
    vpn_status = 'up' if vpn_ip else 'down'

    # This could fail when offline! Must handle connection exceptions.
    params = {
        'bts_uuid': _get_snowflake(),
        'vpn_status': vpn_status,
        'vpn_ip': vpn_ip,
        'federer_port': '80',
    }
    try:
        d = _send_cloud_req(
            requests.get,
            '/bts/register',
            'BTS registration',
            params=params,
            headers=eapi.auth_header,
            timeout=11)
        if 'bts_secret' in d:
            conf['bts_secret'] = d['bts_secret']
    except RegistrationError as ex:
        logger.error(str(ex))
clients.py 文件源码 项目:aiolocust 作者: kpidata 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __exit__(self, exc, value, traceback):
        if self._is_reported:
            # if the user has already manually marked this response as failure or success
            # we can ignore the default haviour of letting the response code determine the outcome
            return exc is None

        if exc:
            if isinstance(value, ResponseError):
                self.failure(value)
            else:
                return False
        else:
            try:
                self.raise_for_status()
            except requests.exceptions.RequestException as e:
                self.failure(e)
            else:
                self.success()
        return True
client.py 文件源码 项目:telnet-iot-honeypot 作者: Phype 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def put_session(self, session, retry=True):

        try:
            r = requests.put(self.url + "/conns", auth=self.auth, json=session, timeout=20.0)
        except requests.exceptions.RequestException:
            dbg("Cannot connect to backend")
            return []

        if r.status_code == 200:
            return r.json()
        elif retry:
            msg = r.raw.read()
            dbg("Backend upload failed, retrying (" + str(msg) + ")")
            return self.put_session(session, False)
        else:
            msg = r.raw.read()
            raise IOError(msg)
client.py 文件源码 项目:telnet-iot-honeypot 作者: Phype 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def put_sample_info(self, f, retry=True):
        try:
            sha256 = f["sha256"]
            r = requests.put(self.url + "/sample/" + sha256, auth=self.auth, json=f, timeout=20.0)
        except requests.exceptions.RequestException:
            dbg("Cannot connect to backend")
            return

        if r.status_code == 200:
            return r.json()
        elif retry:
            msg = r.raw.read()
            dbg("Backend upload failed, retrying (" + str(msg) + ")")
            return self.put_sample_info(f, False)
        else:
            msg = r.raw.read()
            raise IOError(msg)
client.py 文件源码 项目:telnet-iot-honeypot 作者: Phype 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def put_sample(self, data, retry=True):

        try:
            r = requests.post(self.url + "/file", auth=self.auth, data=data, timeout=20.0)
        except requests.exceptions.RequestException:
            dbg("Cannot connect to backend")
            return

        if r.status_code == 200:
            return
        elif retry:
            msg = r.raw.read()
            dbg("Backend upload failed, retrying (" + str(msg) + ")")
            return self.put_sample(sha256, filename, False)
        else:
            msg = r.raw.read()
            raise IOError(msg)
test_ez_outlet.py 文件源码 项目:ezoutlet 作者: jtpereyda 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_reset_no_response_get(self, mock_time, mock_requests, mock_get_url):
        """
        Given: Mock requests configured to raise requests.exceptions.ConnectTimeout on get.
          and: EzOutlet initialized with an IP address and timeout.
        When: Calling reset(post_reset_delay, ez_outlet_reset_interval).
        Then: ez_outlet._get_url is called using the IP address with ez_outlet.RESET_URL_PATH.
         and: requests.get(ez_outlet._get_url's result, timeout, proxies=PROXY_SETTINGS_NONE) is called.
        """
        _ = mock_time

        # Given
        self.configure_mock_requests(mock_requests=mock_requests)

        # When
        try:
            self.uut.reset(post_reset_delay=self.post_reset_delay,
                           ez_outlet_reset_interval=self.ez_outlet_reset_interval)
        except ezoutlet.exceptions.EzOutletError:
            pass  # exception tested elsewhere

        # Then
        mock_get_url.assert_called_with(self.hostname, ez_outlet.EzOutlet.RESET_URL_PATH)
        mock_requests.get.assert_called_once_with(sample_url, timeout=self.timeout, proxies=PROXY_SETTINGS_NONE)
test_ez_outlet.py 文件源码 项目:ezoutlet 作者: jtpereyda 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_reset_no_response_raise(self, mock_time, mock_requests, mock_get_url):
        """
        Given: Mock requests configured to raise requests.exceptions.ConnectTimeout on get.
          and: EzOutlet initialized with an IP address and timeout.
        When: Calling reset(post_reset_delay, ez_outlet_reset_interval).
        Then: reset() raises ez_outlet.EzOutletError, e.
         and: str(e) == ez_outlet.EzOutlet.NO_RESPONSE_MSG.format(timeout).
        """
        _ = mock_time
        _ = mock_get_url

        # Given
        self.configure_mock_requests(mock_requests=mock_requests)

        # When
        with self.assertRaises(ezoutlet.exceptions.EzOutletError) as e:
            self.uut.reset(post_reset_delay=self.post_reset_delay,
                           ez_outlet_reset_interval=self.ez_outlet_reset_interval)

        # Then
        self.assertEqual(str(e.exception),
                         ez_outlet.EzOutlet.NO_RESPONSE_MSG.format(self.timeout))
test_ez_outlet.py 文件源码 项目:ezoutlet 作者: jtpereyda 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_reset_no_response_no_sleep(self, mock_time, mock_requests, mock_get_url):
        """
        Given: Mock requests configured to raise requests.exceptions.ConnectTimeout on get.
          and: EzOutlet initialized with an IP address and timeout.
        When: Calling reset(post_reset_delay, ez_outlet_reset_interval).
        Then: time.sleep(post_reset_delay + ez_outlet_reset_interval) is _not_ called.
        """
        _ = mock_get_url

        # Given
        self.configure_mock_requests(mock_requests=mock_requests)

        # When
        try:
            self.uut.reset(post_reset_delay=self.post_reset_delay,
                           ez_outlet_reset_interval=self.ez_outlet_reset_interval)
        except ezoutlet.exceptions.EzOutletError:
            pass  # exception tested elsewhere

        # Then
        mock_time.sleep.assert_not_called()


# Suppress since PyCharm doesn't recognize @mock.patch.object
# noinspection PyUnresolvedReferences
test_ez_outlet.py 文件源码 项目:ezoutlet 作者: jtpereyda 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_reset_unexpected_response_get(self, mock_time, mock_requests, mock_get_url):
        """
        Given: Mock requests module configured to give unexpected_response_contents
          and: EzOutlet initialized with an IP address and timeout.
        When: Calling reset(post_reset_delay, ez_outlet_reset_interval).
        Then: ez_outlet._get_url is called using the IP address with ez_outlet.RESET_URL_PATH.
         and: requests.get(ez_outlet._get_url's result, timeout, proxies=PROXY_SETTINGS_NONE) is called.
        """
        _ = mock_time

        # Given
        self.configure_mock_requests(mock_requests=mock_requests)

        # When
        try:
            self.uut.reset(post_reset_delay=self.post_reset_delay,
                           ez_outlet_reset_interval=self.ez_outlet_reset_interval)
        except ezoutlet.exceptions.EzOutletError:
            pass  # exception tested elsewhere

        # Then
        mock_get_url.assert_called_with(self.hostname, ez_outlet.EzOutlet.RESET_URL_PATH)
        mock_requests.get.assert_called_once_with(sample_url, timeout=self.timeout, proxies=PROXY_SETTINGS_NONE)
test_ez_outlet.py 文件源码 项目:ezoutlet 作者: jtpereyda 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_reset_unexpected_response_no_sleep(self, mock_time, mock_requests, mock_get_url):
        """
        Given: Mock requests module configured to give unexpected_response_contents
          and: EzOutlet initialized with an IP address and timeout.
        When: Calling reset(post_reset_delay, ez_outlet_reset_interval).
        Then: time.sleep(post_reset_delay + ez_outlet_reset_interval) is _not_ called.
        """
        _ = mock_get_url

        # Given
        self.configure_mock_requests(mock_requests=mock_requests)

        # When
        try:
            self.uut.reset(post_reset_delay=self.post_reset_delay,
                           ez_outlet_reset_interval=self.ez_outlet_reset_interval)
        except ezoutlet.exceptions.EzOutletError:
            pass  # exception tested elsewhere

        # Then
        mock_time.sleep.assert_not_called()
translation_api.py 文件源码 项目:Facebook-Bot 作者: codelovin 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def directions(self, proxies=None):
    """
    Returns list with translate directions
    >>> translate = YandexTranslate("trnsl.1.1.20130421T140201Z.323e508a33e9d84b.f1e0d9ca9bcd0a00b0ef71d82e6cf4158183d09e")
    >>> directions = translate.directions
    >>> len(directions) > 0
    True
    """
    try:
      response = requests.get(self.url("langs"), params={"key": self.api_key}, proxies=proxies)
    except requests.exceptions.ConnectionError:
      raise YandexTranslateException(self.error_codes[503])
    else:
      response = response.json()
    status_code = response.get("code", 200)
    if status_code != 200:
      raise YandexTranslateException(status_code)
    return response.get("dirs")
connection.py 文件源码 项目:qarnot-sdk-python 作者: qarnot 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def retrieve_pool(self, uuid):
        """Retrieve a :class:`qarnot.pool.Pool` from its uuid

        :param str uuid: Desired pool uuid
        :rtype: :class:`~qarnot.pool.Pool`
        :returns: Existing pool defined by the given uuid
        :raises qarnot.exceptions.MissingPoolException: pool does not exist
        :raises qarnot.exceptions.UnauthorizedException: invalid credentials
        :raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
        """

        response = self._get(get_url('pool update', uuid=uuid))
        if response.status_code == 404:
            raise MissingPoolException(response.json()['message'])
        raise_on_error(response)
        return Pool.from_json(self, response.json())
connection.py 文件源码 项目:qarnot-sdk-python 作者: qarnot 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def retrieve_task(self, uuid):
        """Retrieve a :class:`qarnot.task.Task` from its uuid

        :param str uuid: Desired task uuid
        :rtype: :class:`~qarnot.task.Task`
        :returns: Existing task defined by the given uuid
        :raises qarnot.exceptions.MissingTaskException: task does not exist
        :raises qarnot.exceptions.UnauthorizedException: invalid credentials
        :raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
        """

        response = self._get(get_url('task update', uuid=uuid))
        if response.status_code == 404:
            raise MissingTaskException(response.json()['message'])
        raise_on_error(response)
        return Task.from_json(self, response.json())
connection.py 文件源码 项目:qarnot-sdk-python 作者: qarnot 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def retrieve_disk(self, uuid):
        """Retrieve a :class:`~qarnot.disk.Disk` from its uuid

        :param str uuid: Desired disk uuid
        :rtype: :class:`~qarnot.disk.Disk`
        :returns: Existing disk defined by the given uuid
        :raises ValueError: no such disk
        :raises qarnot.exceptions.MissingDiskException: disk does not exist
        :raises qarnot.exceptions.UnauthorizedException: invalid credentials
        :raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
        """

        response = self._get(get_url('disk info', name=uuid))
        if response.status_code == 404:
            raise MissingDiskException(response.json()['message'])
        raise_on_error(response)
        return Disk.from_json(self, response.json())
connection.py 文件源码 项目:qarnot-sdk-python 作者: qarnot 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def create_disk(self, description, lock=False, tags=None):
        """Create a new :class:`~qarnot.disk.Disk`.

        :param str description: a short description of the disk
        :param bool lock: prevents the disk to be removed accidentally
        :param tags: custom tags
        :type tags: list(`str`)

        :rtype: :class:`qarnot.disk.Disk`
        :returns: The created :class:`~qarnot.disk.Disk`.

        :raises qarnot.exceptions.MaxDiskException: disk quota reached
        :raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
        :raises qarnot.exceptions.UnauthorizedException: invalid credentials
        """
        disk = Disk(self, description, lock=lock, tags=tags)
        disk.create()
        return disk
connection.py 文件源码 项目:qarnot-sdk-python 作者: qarnot 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def profiles(self):
        """Get list of profiles available on the cluster.

        :rtype: List of :class:`Profile`

        :raises qarnot.exceptions.UnauthorizedException: invalid credentials
        :raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
        """

        url = get_url('profiles')
        response = self._get(url)
        raise_on_error(response)
        profiles_list = []
        for p in response.json():
            url = get_url('profile details', profile=p)
            response2 = self._get(url)
            if response2.status_code == 404:
                continue
            raise_on_error(response2)
            profiles_list.append(Profile(response2.json()))
        return profiles_list


问题


面经


文章

微信
公众号

扫码关注公众号