def run():
    """Call a few gRPC services on the local server and print the replies.

    Opens an insecure channel to ``localhost:50055`` and then, in order:

    1. calls ``SchemaServiceStub.GetSchema`` and prints every entry of
       ``protos`` between two banner lines, followed by the byte length of
       every entry of ``descriptors``;
    2. calls ``HealthServiceStub.GetHealthStatus`` and prints ``state``;
    3. calls ``ExampleServiceStub.ListAddresses`` and prints ``addresses``.

    Every ``print`` receives a single pre-formatted string, so the code
    parses on both Python 2 and Python 3 and writes the same text on each.
    Nothing is returned; errors raised by the RPCs propagate to the caller.
    """
    channel = grpc.insecure_channel('localhost:50055')
    try:
        # Test fetch the schema
        schema_stub = schema_pb2.SchemaServiceStub(channel)
        schema = schema_stub.GetSchema(Empty())
        print('\nSchema:\n')
        rule = 30 * '~'
        for name in schema.protos:
            # 'begins' is two characters longer than 'ends', hence 35 vs 37:
            # both banner lines come out the same width.  A negative count
            # (very long name) simply yields an empty trailing rule.
            print('%s %s file begins %s\n'
                  % (rule, name, (35 - len(name)) * '~'))
            print(schema.protos[name])
            print('%s %s file ends %s'
                  % (rule, name, (37 - len(name)) * '~'))
        for name in schema.descriptors:
            print('%s -> descriptor of %d bytes'
                  % (name, len(schema.descriptors[name])))

        # Ping health state as an example
        health_stub = voltha_pb2.HealthServiceStub(channel)
        health = health_stub.GetHealthStatus(Empty())
        # The Python 2 statement `print a, b` put one space between the items.
        print('\nHealth state: %s' % (health.state,))

        # Try another API
        example_stub = voltha_pb2.ExampleServiceStub(channel)
        listing = example_stub.ListAddresses(Empty())
        # No space after the label here: the Python 2 print statement skipped
        # the separator when the previous item ended with a newline.
        print('\nExample objects returned:\n%s' % (listing.addresses,))
    finally:
        # Release the connection even when an RPC raises.  The method is
        # looked up defensively in case the installed grpcio does not
        # provide Channel.close().
        close_channel = getattr(channel, 'close', None)
        if close_channel is not None:
            close_channel()
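# Comment list            (appears to be leftover web-page navigation text)
# Article table of contents  (appears to be leftover web-page navigation text)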