Python 函数 insecure_channel() 的实例源码

channel.py 文件源码 项目:fabric-sdk-py 作者: hyperledger 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def create_grpc_channel(target, pem=None, opts=None):
    """Build a grpc channel to ``target``.

    Without ``pem`` a plaintext channel is returned; with ``pem`` the
    channel is secured using credentials created from it.

    Args:
        target: url of the endpoint, including host:port
        pem: ssl/tls pem data as bytes, handed to
             grpc.ssl_channel_credentials; None selects a plaintext channel
        opts: grpc channel options, for example
                grpc.default_authority: default authority
                grpc.ssl_target_name_override: ssl target name override

    Returns:
        grpc channel

    """
    if pem is not None:
        credentials = grpc.ssl_channel_credentials(pem)
        return grpc.secure_channel(target, credentials, opts)
    return grpc.insecure_channel(target, opts)
models.py 文件源码 项目:lightning-coindesk 作者: lightninglabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def check_payment(self):
        """
        Check whether the Lightning payment for this invoice has been received.

        The invoice is looked up on the node at ``settings.LND_RPCHOST`` using
        the base64 text stored in ``self.r_hash``. When the node reports the
        invoice as settled, ``self.status`` becomes 'complete' and the object
        is saved.

        Returns True when the invoice is settled, otherwise False (including
        the case where the status is still 'pending_invoice').
        """
        if self.status == 'pending_invoice':
            return False

        channel = grpc.insecure_channel(settings.LND_RPCHOST)
        stub = lnrpc.LightningStub(channel)

        # Hand the decoded bytes over unchanged: wrapping them in str() would
        # produce the "b'...'" repr on Python 3 instead of the raw hash.
        r_hash_bytes = codecs.decode(self.r_hash.encode('utf-8'), 'base64')
        invoice_resp = stub.LookupInvoice(ln.PaymentHash(r_hash=r_hash_bytes))

        if not invoice_resp.settled:
            # Payment not received
            return False

        # Payment complete
        self.status = 'complete'
        self.save()
        return True
tiller.py 文件源码 项目:armada 作者: att-comdev 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_channel(self):
        '''
        Open an insecure gRPC channel to tiller.

        The target is "<tiller ip>:<tiller port>", and both the send and the
        receive message size options are set to MAX_MESSAGE_LENGTH. Any
        exception raised while creating the channel is replaced by
        ex.ChannelException.
        '''
        host = self._get_tiller_ip()
        port = self._get_tiller_port()
        try:
            size_limits = [
                ('grpc.max_send_message_length', MAX_MESSAGE_LENGTH),
                ('grpc.max_receive_message_length', MAX_MESSAGE_LENGTH),
            ]
            target = '%s:%s' % (host, port)
            return grpc.insecure_channel(target, options=size_limits)
        except Exception:
            raise ex.ChannelException()
p2p.py 文件源码 项目:p2p_grpc_blockchain_package 作者: Lursun 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def send(node,task,message = ""):
        """Send one task to a peer over an insecure gRPC channel.

        ``task`` packs two values: the quotient by SERVICE selects the service
        (DESCOVERY or SYNCHRONIZATION) and the remainder selects the operation
        within that service.

        Args:
            node: target handed to grpc.insecure_channel
            task: combined service/operation code
            message: payload forwarded to synchronization.Task

        On any exception ``node`` is removed through Node.delNode and the
        error is not re-raised.
        """
        try:
            channel = grpc.insecure_channel(node)
            # divmod floors the quotient, so the service code stays an integer
            # on Python 3 as well (plain "/" would produce a float there).
            taskType, task = divmod(task, SERVICE)
            if taskType == DESCOVERY:
                stub = grpc_pb2_grpc.DiscoveryStub(channel)
                if task == EXCHANGENODE:
                    response = stub.ExchangeNode(grpc_pb2.Node(number = len(Node.__Nodes),ipport = Node.getNodesList() ))
                    # Use a separate loop name: rebinding ``node`` here would make
                    # the except handler below delete the wrong peer.
                    for peer in response.ipport:
                        Node.__Nodes.add(peer)

            elif taskType == SYNCHRONIZATION:
                stub = grpc_pb2_grpc.SynchronizationStub(channel)
                synchronization.Task(stub,task,message)

        except Exception:
            Node.delNode(node)

        return
ClientTestRfBlaster.py 文件源码 项目:gRPC-Makerboards 作者: PeridotYouClod 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def run():
  """Switch RF blaster button 2 on, wait one second, switch it off, then exit.

  The front-end service is reached on localhost at the configured
  frontEndPort.
  """
  config = ProtoConfig.getConfig()
  target = 'localhost:%s' % config.ports.frontEndPort
  stub = sensors_pb2.FrontEndStub(grpc.insecure_channel(target))
  # Build both requests up front; they differ only in the ``on`` flag.
  turn_on, turn_off = [
      sensors_pb2.SendToRfBlasterRequest(button=2, on=state)
      for state in (True, False)
  ]
  stub.SendToRfBlaster(turn_on)
  sleep(1)
  stub.SendToRfBlaster(turn_off)
  exit()
Client_Streaming.py 文件源码 项目:gRPC-Makerboards 作者: PeridotYouClod 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def run():
  """Subscribe as "peridot" and print button-press events forever.

  Connects to the push front-end on localhost at the configured
  pushFrontEndPort, subscribes, then repeatedly opens the
  StreamButtonPressed stream. The request index is advanced by one for
  every event received, and the stream is reopened after a one-second
  pause whenever it ends. This function never returns.
  """
  protoConfig = ProtoConfig.getConfig()
  channel = grpc.insecure_channel('localhost:%s' % protoConfig.ports.pushFrontEndPort)
  stub = sensors_pb2.PushFrontEndStub(channel)

  subscribeRequest = sensors_pb2.SubscribeRequest(
    status=sensors_pb2.SubscribeRequest.SUBSCRIBE,
    username="peridot"
  )
  subscribeReply = stub.Subscribe(subscribeRequest)
  print(subscribeReply)
  # Fall back to 0 when the reply carries no usable start index, and use
  # that value for the request (it was previously computed but ignored).
  start_index = subscribeReply.start_index or 0
  req = sensors_pb2.GetButtonPressedRequest(index=start_index)
  while True:
    for event in stub.StreamButtonPressed(req):
      req.index += 1
      print('index: %s, event: %s' % (req.index, event))
    time.sleep(1)
test_stream_collector.py 文件源码 项目:snap-plugin-lib-py 作者: intelsdi-x 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_stream_max_metrics_buffer():
    """Check the first streamed reply when "max-metrics-buffer" is set to 5.

    Starts a MockStreamCollector, connects to the address it prints, sends a
    single metric configured with ``max-metrics-buffer: 5`` and asserts that
    the first reply takes about five seconds (after rounding) to arrive and
    carries exactly five metrics.
    """
    # Capture the collector's output so its preamble can be read back.
    sys.stdout = ThreadPrinter()
    sys.argv = ["", '{"LogLevel": 1, "PingTimeoutDuration": 5000}']
    col = MockStreamCollector("MyStreamCollector", 99)
    col.start()
    t_end = time.time() + 5
    # wait for our collector to print its preamble
    while len(sys.stdout.lines) == 0 and time.time() < t_end:
        time.sleep(.1)
    # The first printed line is JSON that includes the address to connect to.
    resp = json.loads(sys.stdout.lines[0])
    client = StreamCollectorStub(
        grpc.insecure_channel(resp["ListenAddress"]))
    metric = snap.Metric(
        namespace=[snap.NamespaceElement(value="intel"),
                   snap.NamespaceElement(value="streaming"),
                   snap.NamespaceElement(value="random"),
                   snap.NamespaceElement(value="int")],
        version=1,
        config={"max-metrics-buffer": 5},
        unit="some unit",
        description="some description")
    col_arg = CollectArg(metric).pb
    mtr = iter([col_arg])
    metrics = client.StreamMetrics(mtr)
    # Time how long the first reply takes to arrive.
    start_waiting_for_new_metric = time.time()
    a = next(metrics)
    retrieve_metric_time = time.time()
    assert round(retrieve_metric_time - start_waiting_for_new_metric) == 5
    assert len(a.Metrics_Reply.metrics) == 5
    col.stop()
test_stream_collector.py 文件源码 项目:snap-plugin-lib-py 作者: intelsdi-x 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_get_config_policy():
    """Check the config policy reported by the mock stream collector.

    Starts a MockStreamCollector, connects to the address it prints and
    asserts that GetConfigPolicy returns no error and that the "password"
    rule under "intel.streaming.random" defaults to "pass".
    """
    sys.stdout = ThreadPrinter()
    sys.argv = ["", '{"LogLevel": 1, "PingTimeoutDuration": 5000}']
    col = MockStreamCollector("MyStreamCollector", 99)
    col.start()
    deadline = time.time() + 5
    # poll until the collector has printed its preamble or five seconds pass
    while True:
        if len(sys.stdout.lines) != 0 or time.time() >= deadline:
            break
        time.sleep(.1)
    preamble = json.loads(sys.stdout.lines[0])
    channel = grpc.insecure_channel(preamble["ListenAddress"])
    client = StreamCollectorStub(channel)
    reply = client.GetConfigPolicy(Empty())
    assert reply.error == ""
    password_rule = reply.string_policy["intel.streaming.random"].rules["password"]
    assert password_rule.default == "pass"
    col.stop()
test_processor.py 文件源码 项目:snap-plugin-lib-py 作者: intelsdi-x 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def processor_client():
    """Yield a grpc ProcessorStub connected to a freshly started MockProcessor.

    The processor is started before the stub is yielded and stopped once
    the generator is resumed.
    """
    sys.stdout = ThreadPrinter()
    sys.argv = ["", '{}']
    proc = MockProcessor("MyProcessor", 1)
    proc.start()
    deadline = time.time() + 5
    # poll until the processor has printed its preamble or five seconds pass
    while True:
        if len(sys.stdout.lines) != 0 or time.time() >= deadline:
            break
        time.sleep(.1)
    preamble = json.loads(sys.stdout.lines[0])
    channel = grpc.insecure_channel(preamble["ListenAddress"])
    yield ProcessorStub(channel)
    proc.stop()
location_client.py 文件源码 项目:object-tracking 作者: athenian-robotics 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _get_values(self, pause_secs=2.0):
        """Keep reading locations from the gRPC server until ``self.stopped`` is set.

        Each pass registers this client with the server and then consumes the
        getLocations stream, storing a deep copy of every value under
        ``self.value_lock`` and calling ``self._mark_ready()`` afterwards.
        When registration or streaming raises, the failure is logged and the
        loop retries after ``pause_secs`` seconds.
        """
        stub = LocationServiceStub(grpc.insecure_channel(self.hostname))
        while not self.stopped:
            logger.info("Connecting to gRPC server at {0}...".format(self.hostname))
            try:
                client_info = ClientInfo(info="{0} client".format(socket.gethostname()))
                server_info = stub.registerClient(client_info)
            except BaseException as err:
                logger.error("Failed to connect to gRPC server at {0} [{1}]".format(self.hostname, err))
                time.sleep(pause_secs)
            else:
                logger.info("Connected to gRPC server at {0} [{1}]".format(self.hostname, server_info.info))

                try:
                    for location in stub.getLocations(client_info):
                        with self.value_lock:
                            self.__currval = copy.deepcopy(location)
                        self._mark_ready()
                except BaseException as err:
                    logger.info("Disconnected from gRPC server at {0} [{1}]".format(self.hostname, err))
                    time.sleep(pause_secs)

    # Non-blocking
grpc_client.py 文件源码 项目:voltha 作者: opencord 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def connect(self, endpoint):
        """Create an insecure gRPC channel to ``endpoint`` unless already connected.

        On success the channel is stored in ``self.channel`` and
        ``self.connected`` is set to True. Failures are logged through
        ``self.log`` and are not re-raised.
        """
        if self.connected:
            return
        try:
            self.log.info('insecurely-connecting', endpoint=endpoint)
            self.channel = grpc.insecure_channel(endpoint)
            self.connected = True
            self.log.info('insecurely-connected', endpoint=endpoint)
            return

        except _Rendezvous as e:
            # "except X as e" parses on Python 2.6+ and on Python 3; the
            # comma form used previously is Python 2 only.
            if e.code() == grpc.StatusCode.UNAVAILABLE:
                self.log.info('grpc-endpoint-not-available')
            else:
                self.log.exception(e)

        except Exception:
            self.log.exception('cannot-connect', endpoint=endpoint)
grpc_client.py 文件源码 项目:voltha 作者: opencord 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def run():
    """Exercise three services on localhost:50055 and print their replies.

    Fetches the schema (printing every proto file and the size of every
    descriptor), then the health status, then the example address list.
    Uses Python 2 print statements.
    """

    channel = grpc.insecure_channel('localhost:50055')

    # Test fetch the schema
    stub = schema_pb2.SchemaServiceStub(channel)
    res = stub.GetSchema(Empty())
    print '\nSchema:\n'
    for key in res.protos:
        # Banner widths shrink with the key length so the rulers stay roughly aligned.
        print '%s %s file begins %s\n' % (30 * '~', key, (35 - len(key)) * '~')
        print res.protos[key]
        print '%s %s file ends %s' % (30 * '~', key, (37 - len(key)) * '~')
    for key in res.descriptors:
        print '%s -> descriptor of %d bytes' % (key, len(res.descriptors[key]))

    # Ping health state as an example
    stub = voltha_pb2.HealthServiceStub(channel)
    res = stub.GetHealthStatus(Empty())
    print '\nHealth state:', res.state

    # Try another API
    stub = voltha_pb2.ExampleServiceStub(channel)
    res = stub.ListAddresses(Empty())
    print '\nExample objects returned:\n', res.addresses
sample_client_demo_timeout.py 文件源码 项目:python-grpc-demo 作者: amitsaha 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run():
    """Create user 'tom', then stream GetUsers with a 0.00001-second timeout.

    Exits with an error message when the channel to localhost:50051 is not
    ready within 10 seconds.
    """
    channel = grpc.insecure_channel('localhost:50051')
    try:
        grpc.channel_ready_future(channel).result(timeout=10)
    except grpc.FutureTimeoutError:
        sys.exit('Error connecting to server')

    # Reached only when the channel became ready (sys.exit raises otherwise).
    stub = users_service.UsersStub(channel)
    create_request = users_messages.CreateUserRequest(username='tom')
    created = stub.CreateUser(
        create_request,
        metadata=[('ip', '127.0.0.1')],
    )
    if created:
        print("User created:", created.user.username)
    wanted = [
        users_messages.User(username="alexa", user_id=1),
        users_messages.User(username="christie", user_id=1),
    ]
    lookup = users_messages.GetUsersRequest(user=wanted)
    for item in stub.GetUsers(lookup, timeout=0.00001):
        print(item)
sample_client_demo_timeout.py 文件源码 项目:python-grpc-demo 作者: amitsaha 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def run():
    """Create user 'tom', then stream GetUsers with a 0.00001-second timeout.

    Exits with an error message when the channel to localhost:50051 is not
    ready within 10 seconds.
    """
    channel = grpc.insecure_channel('localhost:50051')
    try:
        grpc.channel_ready_future(channel).result(timeout=10)
    except grpc.FutureTimeoutError:
        sys.exit('Error connecting to server')

    # Reached only when the channel became ready (sys.exit raises otherwise).
    stub = users_service.UsersStub(channel)
    create_request = users_messages.CreateUserRequest(username='tom')
    created = stub.CreateUser(
        create_request,
        metadata=[('ip', '127.0.0.1')],
    )
    if created:
        print("User created:", created.user.username)
    wanted = [
        users_messages.User(username="alexa", user_id=1),
        users_messages.User(username="christie", user_id=1),
    ]
    lookup = users_messages.GetUsersRequest(user=wanted)
    for item in stub.GetUsers(lookup, timeout=0.00001):
        print(item)
sample_client_demo.py 文件源码 项目:python-grpc-demo 作者: amitsaha 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run():
    """Create user 'tom', then print every reply streamed back by GetUsers.

    Exits with an error message when the channel to localhost:50051 is not
    ready within 10 seconds.
    """
    channel = grpc.insecure_channel('localhost:50051')
    try:
        grpc.channel_ready_future(channel).result(timeout=10)
    except grpc.FutureTimeoutError:
        sys.exit('Error connecting to server')

    # Reached only when the channel became ready (sys.exit raises otherwise).
    stub = users_service.UsersStub(channel)
    create_request = users_messages.CreateUserRequest(username='tom')
    created = stub.CreateUser(
        create_request,
        metadata=[('ip', '127.0.0.1')],
    )
    if created:
        print("User created:", created.user.username)
    wanted = [
        users_messages.User(username="alexa", user_id=1),
        users_messages.User(username="christie", user_id=1),
    ]
    lookup = users_messages.GetUsersRequest(user=wanted)
    for item in stub.GetUsers(lookup):
        print(item)
cli.py 文件源码 项目:QRL 作者: theQRL 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def tx_push(ctx, txblob):
    """Decode a hex transaction blob and push it to the node's public API.

    The blob is converted to bytes, parsed into a qrl_pb2.Transaction and
    wrapped in a Transaction. The process exits with status 1 when the blob
    cannot be decoded or when the transaction carries no signature.
    Otherwise the transaction is sent to ``ctx.obj.node_public_address``
    with a 5-second timeout and the response is printed.
    """
    # Local import: the file's import block is outside this view.
    import sys

    try:
        txbin = bytes(hstr2bin(txblob))
        pbdata = qrl_pb2.Transaction()
        pbdata.ParseFromString(txbin)
        tx = Transaction.from_pbdata(pbdata)
    except Exception:
        click.echo("tx blob is not valid")
        # sys.exit rather than quit(): quit is injected by the site module
        # and is not guaranteed to exist in every interpreter setup.
        sys.exit(1)

    tmp_json = tx.to_json()
    # FIXME: binary fields are represented in base64. Improve output
    print(tmp_json)
    if len(tx.signature) == 0:
        click.echo('Signature missing')
        sys.exit(1)

    channel = grpc.insecure_channel(ctx.obj.node_public_address)
    stub = qrl_pb2_grpc.PublicAPIStub(channel)
    pushTransactionReq = qrl_pb2.PushTransactionReq(transaction_signed=tx.pbdata)
    pushTransactionResp = stub.PushTransaction(pushTransactionReq, timeout=5)
    print(pushTransactionResp.some_response)
server_test.py 文件源码 项目:PowerSpikeGG 作者: PowerSpikeGG 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def setUpClass(cls):
        """Start one server and one client stub shared by the tests of this class."""
        # Replace the Riot API handler and the cache system with mocks so
        # neither has to be set up for real.
        for attr_name in ("riot_api_handler", "cache_manager"):
            setattr(MatchFetcher, attr_name, mock.MagicMock())

        started = start_server("123", 50002, 10)
        cls.server, cls.service = started

        # The channel and the stub are created once and reused.
        channel = grpc.insecure_channel("localhost:50002")
        cls.channel = channel
        cls.stub = service_pb2.MatchFetcherStub(channel)
models.py 文件源码 项目:lightning-coindesk 作者: lightninglabs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def generate_invoice(self, user, article):
        """
        Generate a new invoice on the node at ``settings.LND_RPCHOST``.

        The invoice is created for ``settings.MIN_VIEW_AMOUNT`` with a memo
        naming the user and the article id. The returned r_hash is stored as
        base64 text in ``self.r_hash``, the payment request is stored in
        ``self.payment_request``, the status becomes 'pending_payment' and
        the object is saved.

        Raises AssertionError when the status is not 'pending_invoice'.
        """
        # Raise explicitly instead of using an assert statement: asserts are
        # stripped when Python runs with -O, which would silently allow a
        # second invoice. The exception type is unchanged for callers.
        if self.status != 'pending_invoice':
            raise AssertionError("Already generated invoice")
        channel = grpc.insecure_channel(settings.LND_RPCHOST)
        stub = lnrpc.LightningStub(channel)

        add_invoice_resp = stub.AddInvoice(ln.Invoice(value=settings.MIN_VIEW_AMOUNT, memo="User '{}' | ArticleId {}".format(user.username, article.id)))
        r_hash_base64 = codecs.encode(add_invoice_resp.r_hash, 'base64')
        self.r_hash = r_hash_base64.decode('utf-8')
        self.payment_request = add_invoice_resp.payment_request
        self.status = 'pending_payment'
        self.save()
auth_backends.py 文件源码 项目:lightning-coindesk 作者: lightninglabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def authenticate(self, request, signature, csrf_token, username=None):
        """Authenticate a user from a signature over the CSRF token.

        The node at ``settings.LND_RPCHOST`` verifies ``signature`` against
        ``csrf_token`` and returns the signer's pubkey. An existing Profile
        with that pubkey yields its user. Otherwise a new User and Profile
        are created, provided ``username`` is between 4 and 35 characters
        long.

        Returns the user, or None when the signature is invalid.
        Raises Exception("No username provided") when no profile exists and
        ``username`` is missing or outside the accepted length range.
        """
        channel = grpc.insecure_channel(settings.LND_RPCHOST)
        stub = lnrpc.LightningStub(channel)

        verifymessage_resp = stub.VerifyMessage(ln.VerifyMessageRequest(msg=csrf_token, signature=signature))

        if not verifymessage_resp.valid:
            # Single-argument call form prints the same text on Python 2 and 3.
            print("Invalid signature")
            return None

        pubkey = verifymessage_resp.pubkey
        # Try fetching an existing profile
        try:
            profile = Profile.objects.get(identity_pubkey=pubkey)
            return profile.user
        except Profile.DoesNotExist:
            # username defaults to None, so guard before calling len() on it;
            # otherwise a TypeError escapes instead of the intended error.
            if username is None or not (3 < len(username) < 36):
                raise Exception("No username provided")
            # Create a new profile since they provided a username
            user = User(username=username)
            user.save()
            profile, created = Profile.objects.get_or_create(
                user=user,
                identity_pubkey=pubkey)
            assert created is True
            # TODO Auth them in
        return user
skl_client.py 文件源码 项目:grpc-kubernetes-skl-tutorial 作者: pprett 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run():
    """Send a 10x3 frame of random values to the prediction service and print the reply.

    The frame has columns 'a', 'b' and 'c'. The request targets model id
    '123abc' on localhost:50051.
    """
    # pandas.np was deprecated in pandas 1.0 and later removed, so take the
    # random generator from numpy directly.
    import numpy as np

    df = pd.DataFrame(columns=list('abc'), data=np.random.rand(10, 3))
    channel = grpc.insecure_channel('localhost:50051')
    stub = skl_pb2.PredictionServiceStub(channel)
    print("-------------- Predict --------------")
    model_spec = model_pb2.ModelSpec(model_id='123abc')
    req = predict_pb2.PredictionRequest(model_spec=model_spec, input=pandas_to_proto(df))
    pred = stub.Predict(req)
    print(pred)
client.py 文件源码 项目:client-python 作者: bblfsh 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, endpoint):
        """
        Set up a BblfshClient bound to one server.

        An insecure gRPC channel to ``endpoint`` is opened and a
        ProtocolServiceStub is created on top of it.

        :param endpoint: The address of the Babelfish server, \
                         for example "0.0.0.0:9432"
        :type endpoint: str
        """
        channel = grpc.insecure_channel(endpoint)
        self._channel = channel
        self._stub = ProtocolServiceStub(channel)
test_greeter_server.py 文件源码 项目:rules_protobuf 作者: pubref 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def setUp(self):
        """Start a greeter server on TEST_PORT and create a client stub for it."""
        service = greeter_server._GreeterService()
        self._server = greeter_server._GreeterServer(service, TEST_PORT)
        self._server.start()
        target = 'localhost:{port}'.format(port=TEST_PORT)
        self._client = helloworld_pb2_grpc.GreeterStub(grpc.insecure_channel(target))
greeter_client.py 文件源码 项目:rules_protobuf 作者: pubref 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def run():
    """Send SayHello for the name 'you' to localhost:50051 and print the reply message."""
    stub = helloworld_pb2_grpc.GreeterStub(grpc.insecure_channel('localhost:50051'))
    request = helloworld_pb2.HelloRequest(name='you')
    reply = stub.SayHello(request)
    print("Greeter client received: " + reply.message)
rpc_client.py 文件源码 项目:ngraph 作者: NervanaSystems 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def build_transformer(self):
        """Ask the server at ``self.server_address`` to build a transformer.

        Does nothing when the transformer is already built. Otherwise a
        channel with unlimited send/receive message sizes is opened, any
        pending close_transformer future is awaited first, and a
        BuildTransformer request for ``self.transformer_type`` is issued.

        Raises RuntimeError when the channel is not ready, when the pending
        close request reports failure, or when the build request fails.
        """
        logger.debug("client: build_transformer, server address: %s", self.server_address)
        if self.is_trans_built:
            logger.debug("client: build_transformer: transformer is already built")
            return

        channel = grpc.insecure_channel(
            self.server_address,
            options=[('grpc.max_send_message_length', -1),
                     ('grpc.max_receive_message_length', -1)])
        if not is_channel_ready(channel):
            raise RuntimeError("gRPC channel is not ready...")
        self.RPC = hetr_pb2_grpc.HetrStub(channel)

        # Wait for an earlier close request to finish before building again.
        pending_close = self.close_transformer_response_future
        if pending_close is not None:
            close_reply = pending_close.result()
            if not close_reply.status:
                raise RuntimeError("RPC close_transformer request failed: {}"
                                   .format(close_reply.message))
            self.is_trans_built = False
            self.close_transformer_response_future = None

        build_request = hetr_pb2.BuildTransformerRequest(transformer_type=self.transformer_type)
        build_reply = self.RPC.BuildTransformer(build_request, _TIMEOUT_SECONDS)
        if not build_reply.status:
            self.is_trans_built = False
            raise RuntimeError("RPC build_transformer request failed: {}".format(build_reply.message))
        self.is_trans_built = True
Client.py 文件源码 项目:gRPC-Makerboards 作者: PeridotYouClod 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def run():
  """Read every sensor once, drive the LED strip and print stored lux rows.

  Connects to the front-end and DAO services on localhost using the
  configured ports. The LED strip brightness is 100 when the button is
  pressed and 0 otherwise. Finally the 'lux' and 'date' columns of up to
  10 rows of the 'lux' table are selected and printed.
  """
  ports = ProtoConfig.getConfig().ports
  stub = sensors_pb2.FrontEndStub(
    grpc.insecure_channel('localhost:%s' % ports.frontEndPort))
  dbstub = dao_pb2.DaoStub(
    grpc.insecure_channel('localhost:%s' % ports.daoPort))

  # Each reader takes the front-end stub and returns one sensor value.
  readers = (
    ('lux: ', getLux),
    ('temperature: ', getTemperature),
    ('irButton: ', getIrButtonPressed),
    ('loudness: ', getSound),
  )
  for label, read in readers:
    print(label, read(stub))

  buttonPressed = stub.GetButtonPressed(sensors_pb2.GetButtonPressedRequest()).pressed
  print('buttonPressed', buttonPressed)
  if buttonPressed:
    brightness = 100
  else:
    brightness = 0
  stub.SetLedStrip(sensors_pb2.SetLedStripRequest(
    length=30,
    brightness=brightness,
    speed=5))

  wanted_cols = [dao_pb2.RequestCol(name=col_name) for col_name in ('lux', 'date')]
  select = dao_pb2.SelectRequest(table='lux', limit=10, cols=wanted_cols)
  columns = dbstub.Select(select).columns
  print('result %s' % columns)
FrontEndServer.py 文件源码 项目:gRPC-Makerboards 作者: PeridotYouClod 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, protoConfig):
    """Create stubs for the WioLink and Arduino services on localhost.

    The ports are read from ``protoConfig.ports.wioPort`` and
    ``protoConfig.ports.arduinoPort``.
    """
    super().__init__()
    ports = protoConfig.ports
    self.wioStub = sensors_pb2.WioLinkStub(
      grpc.insecure_channel('localhost:%s' % ports.wioPort))
    self.arduinoStub = sensors_pb2.ArduinoStub(
      grpc.insecure_channel('localhost:%s' % ports.arduinoPort))
PushFrontEnd.py 文件源码 项目:gRPC-Makerboards 作者: PeridotYouClod 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, protoConfig):
    """Create a stub for the Push service on localhost at ``protoConfig.ports.pushPort``."""
    super().__init__()
    target = 'localhost:%s' % protoConfig.ports.pushPort
    self.pushStub = sensors_pb2.PushStub(grpc.insecure_channel(target))
ps_bridge_connection.py 文件源码 项目:relaax 作者: deeplearninc 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def stub(self):
        """Return the bridge stub, creating it on first use.

        On the first call an insecure channel to ``self._server`` (formatted
        as host:port) is opened, the stub is cached in ``self._stub`` and
        ``self._init_ten_times()`` is invoked. Later calls return the cached
        stub directly.
        """
        if self._stub is not None:
            return self._stub
        channel = grpc.insecure_channel('%s:%d' % self._server)
        self._stub = bridge_pb2.BridgeStub(channel)
        self._init_ten_times()
        return self._stub
metrics_bridge_connection.py 文件源码 项目:relaax 作者: deeplearninc 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def send(self, method_name, message_factory):
        """Call ``method_name`` on the bridge stub with a freshly built message.

        When no stub exists yet, one is created for ``self._server`` and the
        call is attempted up to nine times, ignoring grpc.RpcError on each
        attempt. If all of those fail, or if the stub already existed, one
        more call is made whose result or exception reaches the caller.

        ``message_factory`` is invoked once per attempt.
        """
        if self._stub is None:
            channel = grpc.insecure_channel('%s:%d' % self._server)
            self._stub = bridge_pb2.BridgeStub(channel)
            attempts = 0
            while attempts < 9:
                attempts += 1
                rpc = getattr(self._stub, method_name)
                payload = message_factory()
                try:
                    return rpc(payload)
                except grpc.RpcError:
                    continue
        final_rpc = getattr(self._stub, method_name)
        return final_rpc(message_factory())
test_stream_collector.py 文件源码 项目:snap-plugin-lib-py 作者: intelsdi-x 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_stream():
    """Check the values and pacing of the mock collector's metric stream.

    Starts a MockStreamCollector, connects to the address it prints, sends a
    single metric and asserts that the first two replies both carry
    int64_data == 200 and that the second reply arrives about one second
    (after rounding) after the first.
    """
    # Capture the collector's output so its preamble can be read back.
    sys.stdout = ThreadPrinter()
    sys.argv = ["", '{"LogLevel": 1, "PingTimeoutDuration": 5000}']
    col = MockStreamCollector("MyStreamCollector", 99)
    col.start()
    t_end = time.time() + 5
    # wait for our collector to print its preamble
    while len(sys.stdout.lines) == 0 and time.time() < t_end:
        time.sleep(.1)
    # The first printed line is JSON that includes the address to connect to.
    resp = json.loads(sys.stdout.lines[0])
    client = StreamCollectorStub(
        grpc.insecure_channel(resp["ListenAddress"]))
    metric = snap.Metric(
        namespace=[snap.NamespaceElement(value="intel"),
                   snap.NamespaceElement(value="streaming"),
                   snap.NamespaceElement(value="random"),
                   snap.NamespaceElement(value="int")],
        version=1,
        unit="some unit",
        description="some description")
    mtr = iter([CollectArg(metric).pb])
    metrics = client.StreamMetrics(mtr)
    assert next(metrics).Metrics_Reply.metrics[0].int64_data == 200
    # Time the gap between the first and the second reply.
    start_waiting_for_new_metric = time.time()
    assert next(metrics).Metrics_Reply.metrics[0].int64_data == 200
    retrieve_metric_time = time.time()
    assert round(retrieve_metric_time - start_waiting_for_new_metric) == 1
    col.stop()


问题


面经


文章

微信
公众号

扫码关注公众号