def __init__(self, _channel, loop=None, executor=None, standalone_pool_for_streaming=False):
    """Wrap a gRPC channel for use together with an asyncio event loop.

    Args:
        _channel: the grpc.Channel being wrapped
        loop: asyncio event loop; when None, the loop returned by
            ``get_event_loop()`` is used
        executor: a thread pool, or None to use the default pool of the loop
        standalone_pool_for_streaming: create a new thread pool (with 1
            thread) for each streaming method
    """
    self._channel = _channel
    # Only fall back to the ambient event loop when the caller gave none.
    self._loop = _asyncio.get_event_loop() if loop is None else loop
    self._executor = executor
    self._standalone_pool = standalone_pool_for_streaming
    # Starts out empty; nothing in the constructor populates it.
    self._subscribe_map = {}
python类Channel()的实例源码
def channel_ready_future(channel):
    """Return a Future that completes once the channel reports READY.

    The Future is created on ``channel._loop``. Cancelling it does not
    affect the channel's state machine; it merely detaches the Future from
    the channel's connectivity notifications.

    Args:
        channel: A Channel object.

    Returns:
        A Future object whose result is set to None when the channel
        connectivity is ChannelConnectivity.READY.
    """
    ready = channel._loop.create_future()

    def _on_state(state):
        # Notifications arriving after the future finished (for example
        # after a cancel) are ignored.
        if ready.done():
            return
        if state is _grpc.ChannelConnectivity.READY:
            ready.set_result(None)

    def _detach(_finished):
        # Stop listening as soon as the future is done, however it ended.
        channel.unsubscribe(_on_state)

    ready.add_done_callback(_detach)
    channel.subscribe(_on_state, try_to_connect=True)
    return ready
def __init__(self, target, options=None, credentials=None):
    """Open the underlying cygrpc channel and attach its helper state.

    Args:
        target: server address; stored as given and passed to cygrpc after
            ``_common.encode``.
        options: optional channel options; None is treated as an empty
            tuple.
        credentials: handed to ``cygrpc.Channel`` unchanged.
    """
    if options is None:
        options = ()
    self.target = target

    encoded_target = _common.encode(target)
    channel_args = _common.channel_args(_options(options))
    raw_channel = cygrpc.Channel(encoded_target, channel_args, credentials)
    self.channel = raw_channel

    # Both helpers are built around the same low-level channel object.
    call_state = _ChannelCallState(raw_channel)
    self.managed_call = _channel_managed_call_management(call_state)
    self.connectivity_state = _ChannelConnectivityState(raw_channel)
def get_channel(self):
    """Return the channel for the currently selected target.

    The address comes from ``self.select_target()``. A channel already
    stored in ``self._channels`` under that address is reused; otherwise a
    new ``Channel`` is created, stored, and returned.
    """
    target_addr = self.select_target()
    try:
        selected = self._channels[target_addr]
    except KeyError:
        # First request for this address: create the channel and remember it.
        selected = self._channels[target_addr] = Channel(target_addr)
    return selected
def _retrieve_schema(self):
    """
    Retrieve schema from gRPC end-point, and save all *.proto files
    (together with their decompressed descriptors) in the work directory.

    The work directory is created when missing, and existing non-hidden
    entries in its /tmp location are purged before the new files are
    written. Both steps use the standard library rather than shell command
    strings, so a work directory containing spaces or shell
    metacharacters is handled literally.

    Returns:
        The ``yang_from`` field of the GetSchema response.

    Raises:
        OSError: if the work directory cannot be created or a file cannot
            be written.
        Errors raised by the GetSchema call itself are not caught here.
    """
    import glob
    import shutil

    assert isinstance(self.channel, grpc.Channel)
    stub = SchemaServiceStub(self.channel)
    # No error handling around the RPC: failures propagate to the caller.
    schemas = stub.GetSchema(Empty())

    os.makedirs(self.work_dir, exist_ok=True)

    # Safety net: the purge path is always rebuilt under /tmp, so a work
    # directory outside /tmp never has its contents deleted.
    purge_root = '/tmp/%s' % self.work_dir.replace('/tmp/', '')
    purge_pattern = os.path.join(glob.escape(purge_root), '*')
    for stale_entry in glob.glob(purge_pattern):
        if os.path.isdir(stale_entry) and not os.path.islink(stale_entry):
            shutil.rmtree(stale_entry, ignore_errors=True)
        else:
            try:
                os.remove(stale_entry)
            except OSError:
                # Best-effort cleanup, as with 'rm -f': a leftover file
                # must not abort the schema download.
                pass

    for proto_file in schemas.protos:
        # NOTE(review): file_name comes from the remote end-point and is
        # joined to work_dir unchecked -- confirm it can never contain
        # path separators or '..'.
        proto_fname = proto_file.file_name
        # TODO: Do we need to process a set of files using a prefix
        # instead of just one?
        proto_content = proto_file.proto
        log.info('saving-proto', fname=proto_fname, dir=self.work_dir,
                 length=len(proto_content))
        with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
            f.write(proto_content)

        desc_content = decompress(proto_file.descriptor)
        desc_fname = proto_fname.replace('.proto', '.desc')
        log.info('saving-descriptor', fname=desc_fname, dir=self.work_dir,
                 length=len(desc_content))
        with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
            f.write(desc_content)

    return schemas.yang_from
def insecure_channel(target, options=None, *, loop=None, executor=None,
                     standalone_pool_for_streaming=False):
    """Build a Channel on top of an insecure gRPC channel to a server.

    Args:
        target: The server address
        options: An optional list of key-value pairs (channel args in gRPC
            runtime) to configure the channel.
        loop: keyword-only; forwarded to Channel as its loop argument.
        executor: keyword-only; forwarded to Channel as its executor
            argument.
        standalone_pool_for_streaming: keyword-only; forwarded to Channel.

    Returns:
        A Channel object.
    """
    raw_channel = _grpc.insecure_channel(target, options)
    return Channel(raw_channel, loop, executor, standalone_pool_for_streaming)
def secure_channel(target, credentials, options=None, *, loop=None, executor=None,
                   standalone_pool_for_streaming=False):
    """Build a Channel on top of a secure gRPC channel to a server.

    Args:
        target: The server address.
        credentials: A ChannelCredentials instance.
        options: An optional list of key-value pairs (channel args in gRPC
            runtime) to configure the channel.
        loop: keyword-only; forwarded to Channel as its loop argument.
        executor: keyword-only; forwarded to Channel as its executor
            argument.
        standalone_pool_for_streaming: keyword-only; forwarded to Channel.

    Returns:
        A Channel object.
    """
    raw_channel = _grpc.secure_channel(target, credentials, options)
    return Channel(raw_channel, loop, executor, standalone_pool_for_streaming)
def secure_authorized_channel(
        credentials, request, target, ssl_credentials=None, **kwargs):
    """Creates a secure authorized gRPC channel.

    This creates a channel with SSL and :class:`AuthMetadataPlugin`. This
    channel can be used to create a stub that can make authorized requests.

    Example::

        import google.auth
        import google.auth.transport.grpc
        import google.auth.transport.requests
        from google.cloud.speech.v1 import cloud_speech_pb2

        # Get credentials.
        credentials, _ = google.auth.default()
        # Get an HTTP request function to refresh credentials.
        request = google.auth.transport.requests.Request()
        # Create a channel. Note the argument order: the request object
        # comes before the target address.
        channel = google.auth.transport.grpc.secure_authorized_channel(
            credentials, request, 'speech.googleapis.com:443')
        # Use the channel to create a stub.
        cloud_speech_pb2.create_Speech_stub(channel)

    Args:
        credentials (google.auth.credentials.Credentials): The credentials to
            add to requests.
        request (google.auth.transport.Request): A HTTP transport request
            object used to refresh credentials as needed. Even though gRPC
            is a separate transport, there's no way to refresh the credentials
            without using a standard http transport.
        target (str): The host and port of the service.
        ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
            credentials. This can be used to specify different certificates.
            When omitted, ``grpc.ssl_channel_credentials()`` is used.
        kwargs: Additional arguments to pass to :func:`grpc.secure_channel`.

    Returns:
        grpc.Channel: The created gRPC channel.
    """
    # Create the metadata plugin for inserting the authorization header.
    metadata_plugin = AuthMetadataPlugin(credentials, request)

    # Create a set of grpc.CallCredentials using the metadata plugin.
    google_auth_credentials = grpc.metadata_call_credentials(metadata_plugin)

    if ssl_credentials is None:
        ssl_credentials = grpc.ssl_channel_credentials()

    # Combine the ssl credentials and the authorization credentials.
    composite_credentials = grpc.composite_channel_credentials(
        ssl_credentials, google_auth_credentials)

    return grpc.secure_channel(target, composite_credentials, **kwargs)