def create_grpc_channel(target, pem=None, opts=None):
"""Construct a grpc channel.
Args:
target: url of target include host:port
pem: ssl/tls pem file as bytes
opts: grpc channel options
grpc.default_authority: default authority
grpc.ssl_target_name_override: ssl target name override
Returns:
grpc channel
"""
if pem is None:
return grpc.insecure_channel(target, opts)
else:
creds = grpc.ssl_channel_credentials(pem)
return grpc.secure_channel(target, creds, opts)
python类ssl_channel_credentials()的实例源码
def _get_secure_creds(self, ca_cert, cert_key=None, cert_cert=None):
cert_key_file = None
cert_cert_file = None
with open(ca_cert, 'rb') as f:
ca_cert_file = f.read()
if cert_key is not None:
with open(cert_key, 'rb') as f:
cert_key_file = f.read()
if cert_cert is not None:
with open(cert_cert, 'rb') as f:
cert_cert_file = f.read()
return grpc.ssl_channel_credentials(
ca_cert_file,
cert_key_file,
cert_cert_file
)
def get_channel(self):
if self.channel is None:
device = self.adapter_agent.get_device(self.device_id)
# read in certificate
try:
with open('/voltha/pki/voltha-CA.pem') as f:
trusted_certs = f.read()
with open('/voltha/pki/voltha.crt') as f:
client_cert = f.read()
with open('/voltha/pki/voltha.key') as f:
client_key = f.read()
except Exception as e:
log.error('failed-to-read-cert-keys', reason=e)
# create credentials
credentials = grpc.ssl_channel_credentials( root_certificates=trusted_certs, private_key=client_key, certificate_chain=client_cert)
# create channel using ssl credentials
my_server_host_override_string = "ABCD" # Server's CN Name, Ugly but no other Choice.
self.channel = grpc.secure_channel(device.host_and_port, credentials, options=(('grpc.ssl_target_name_override', my_server_host_override_string,),))
return self.channel
def create_grpc_channel(target, credentials, ssl_credentials_file=None,
grpc_channel_options=[]):
"""Create and return a gRPC channel.
Args:
credentials(google.oauth2.credentials.Credentials): OAuth2 credentials.
ssl_credentials_file(str): Path to SSL credentials.pem file
(for testing).
grpc_channel_options([(option_name, option_val)]): gRPC channel options.
Returns:
grpc.Channel.
"""
ssl_credentials = None
if ssl_credentials_file:
with open(ssl_credentials_file) as f:
ssl_credentials = grpc.ssl_channel_credentials(f.read())
http_request = google.auth.transport.requests.Request()
# TODO(proppy): figure out if/why we need to force a refresh.
# if yes, consider remove access token from serialized credentials.
credentials.refresh(http_request)
return google.auth.transport.grpc.secure_authorized_channel(
credentials, http_request, target,
ssl_credentials=ssl_credentials,
options=grpc_channel_options)
def __init__(self, rpc_port):
self.port = rpc_port
cred = grpc.ssl_channel_credentials(open('tls.cert').read())
channel = grpc.secure_channel('localhost:{}'.format(rpc_port), cred)
self.stub = lnrpc_grpc.LightningStub(channel)
def getGRPCChannel(ipAddress, port, root_certificates, ssl_target_name_override):
# channel = grpc.insecure_channel("{0}:{1}".format(ipAddress, 7051), options = [('grpc.max_message_length', 100*1024*1024)])
# creds = grpc.ssl_channel_credentials(root_certificates=root_certificates, private_key=private_key, certificate_chain=certificate_chain)
creds = grpc.ssl_channel_credentials(root_certificates=root_certificates)
channel = grpc.secure_channel("{0}:{1}".format(ipAddress, port), creds,
options=(('grpc.ssl_target_name_override', ssl_target_name_override,),('grpc.default_authority', ssl_target_name_override,),('grpc.max_receive_message_length', 100*1024*1024)))
# print("Returning GRPC for address: {0}".format(ipAddress))
return channel
def run():
creds = grpc.ssl_channel_credentials(root_certificates=open('/certs/cert.pem', 'rb').read())
channel = grpc.secure_channel('lb:50051', creds)
stub = helloworld_pb2_grpc.GreeterStub(channel)
while True:
try:
response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
logger.error("Greeter client received: " + response.message)
time.sleep(3)
except Exception as e:
logger.error('Could not connect load-balancer. error {}'.format(e))
time.sleep(3)
def run():
# read in certificate
with open('server.crt') as f:
trusted_certs = f.read().encode()
# create credentials
credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs)
channel = grpc.secure_channel('localhost:50051', credentials)
try:
grpc.channel_ready_future(channel).result(timeout=10)
except grpc.FutureTimeoutError:
sys.exit('Error connecting to server')
else:
stub = users_service.UsersStub(channel)
metadata = [('ip', '127.0.0.1')]
try:
response = stub.CreateUser(
users_messages.CreateUserRequest(username='tom'),
metadata=metadata,
)
except grpc.RpcError as e:
print('CreateUser failed with {0}: {1}'.format(e.code(), e.details()))
else:
print("User created:", response.user.username)
request = users_messages.GetUsersRequest(
user=[users_messages.User(username="alexa", user_id=1),
users_messages.User(username="christie", user_id=1)]
)
response = stub.GetUsers(request)
for resp in response:
print(resp)
def secure_authorized_channel(
credentials, request, target, ssl_credentials=None, **kwargs):
"""Creates a secure authorized gRPC channel.
This creates a channel with SSL and :class:`AuthMetadataPlugin`. This
channel can be used to create a stub that can make authorized requests.
Example::
import google.auth
import google.auth.transport.grpc
import google.auth.transport.requests
from google.cloud.speech.v1 import cloud_speech_pb2
# Get credentials.
credentials, _ = google.auth.default()
# Get an HTTP request function to refresh credentials.
request = google.auth.transport.requests.Request()
# Create a channel.
channel = google.auth.transport.grpc.secure_authorized_channel(
credentials, 'speech.googleapis.com:443', request)
# Use the channel to create a stub.
cloud_speech.create_Speech_stub(channel)
Args:
credentials (google.auth.credentials.Credentials): The credentials to
add to requests.
request (google.auth.transport.Request): A HTTP transport request
object used to refresh credentials as needed. Even though gRPC
is a separate transport, there's no way to refresh the credentials
without using a standard http transport.
target (str): The host and port of the service.
ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
credentials. This can be used to specify different certificates.
kwargs: Additional arguments to pass to :func:`grpc.secure_channel`.
Returns:
grpc.Channel: The created gRPC channel.
"""
# Create the metadata plugin for inserting the authorization header.
metadata_plugin = AuthMetadataPlugin(credentials, request)
# Create a set of grpc.CallCredentials using the metadata plugin.
google_auth_credentials = grpc.metadata_call_credentials(metadata_plugin)
if ssl_credentials is None:
ssl_credentials = grpc.ssl_channel_credentials()
# Combine the ssl credentials and the authorization credentials.
composite_credentials = grpc.composite_channel_credentials(
ssl_credentials, google_auth_credentials)
return grpc.secure_channel(target, composite_credentials, **kwargs)