def create_grpc_channel(target, credentials, ssl_credentials_file=None,
grpc_channel_options=[]):
"""Create and return a gRPC channel.
Args:
credentials(google.oauth2.credentials.Credentials): OAuth2 credentials.
ssl_credentials_file(str): Path to SSL credentials.pem file
(for testing).
grpc_channel_options([(option_name, option_val)]): gRPC channel options.
Returns:
grpc.Channel.
"""
ssl_credentials = None
if ssl_credentials_file:
with open(ssl_credentials_file) as f:
ssl_credentials = grpc.ssl_channel_credentials(f.read())
http_request = google.auth.transport.requests.Request()
# TODO(proppy): figure out if/why we need to force a refresh.
# if yes, consider remove access token from serialized credentials.
credentials.refresh(http_request)
return google.auth.transport.grpc.secure_authorized_channel(
credentials, http_request, target,
ssl_credentials=ssl_credentials,
options=grpc_channel_options)
评论列表
文章目录