def create_grpc_channel(target, pem=None, opts=None):
    """Open a gRPC channel to ``target``, using TLS only when ``pem`` is given.

    Args:
        target: server address, handed to grpc unchanged (described by the
            original author as a url that includes ``host:port``).
        pem: SSL/TLS PEM file content as bytes. ``None`` selects an
            insecure channel.
        opts: grpc channel options, forwarded untouched. Options mentioned
            by the original author:
            ``grpc.default_authority`` (default authority) and
            ``grpc.ssl_target_name_override`` (ssl target name override).

    Returns:
        The channel returned by ``grpc.secure_channel`` or
        ``grpc.insecure_channel``.
    """
    if pem is not None:
        ssl_creds = grpc.ssl_channel_credentials(pem)
        return grpc.secure_channel(target, ssl_creds, opts)
    return grpc.insecure_channel(target, opts)
python类secure_channel()的实例源码
def get_channel(self):
    """Return this object's gRPC channel, creating it on first use.

    On the first call the device is looked up through
    ``self.adapter_agent``, the CA bundle, client certificate and client key
    are read from ``/voltha/pki``, and a mutually-authenticated channel to
    ``device.host_and_port`` is opened and stored on ``self.channel``.
    Later calls return the stored channel.

    Returns:
        The channel stored on ``self.channel``.

    Raises:
        IOError / OSError: if any of the PKI files cannot be read. The
            failure is logged first and then re-raised, instead of falling
            through to code that would use undefined certificate variables.
    """
    if self.channel is None:
        device = self.adapter_agent.get_device(self.device_id)

        # Read in the certificates. Binary mode: grpc takes PEM data as byte
        # strings, and this keeps the content free of newline translation.
        try:
            with open('/voltha/pki/voltha-CA.pem', 'rb') as f:
                trusted_certs = f.read()
            with open('/voltha/pki/voltha.crt', 'rb') as f:
                client_cert = f.read()
            with open('/voltha/pki/voltha.key', 'rb') as f:
                client_key = f.read()
        except (IOError, OSError) as e:
            log.error('failed-to-read-cert-keys', reason=e)
            # Without the key material no credentials can be built; surface
            # the real I/O error to the caller.
            raise

        # create credentials
        credentials = grpc.ssl_channel_credentials(
            root_certificates=trusted_certs,
            private_key=client_key,
            certificate_chain=client_cert)

        # create channel using ssl credentials
        # Server's CN Name, Ugly but no other Choice.
        my_server_host_override_string = "ABCD"
        self.channel = grpc.secure_channel(
            device.host_and_port,
            credentials,
            options=(('grpc.ssl_target_name_override',
                      my_server_host_override_string,),))
    return self.channel
cisco_grpc_client.py 文件源码
项目:ios-xr-grpc-python
作者: cisco-grpc-connection-libs
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def __init__(self, host, port, timeout, user, password, creds=None, options=None):
    """Set up the channel, stub, timeout and login metadata for a device.

    When ``creds`` is given a TLS channel is opened to ``host:port``;
    otherwise an insecure channel is used.

    :param user: Username for device login
    :param password: Password for device login
    :param host: The ip address for the device
    :param port: The port for the device
    :param timeout: how long before the rpc call times out; stored as a float
    :param creds: Input of the pem file, passed to
        ``implementations.ssl_channel_credentials``; ``None`` selects an
        insecure channel
    :param options: TLS server name, sent as the
        ``grpc.ssl_target_name_override`` channel option (only used when
        ``creds`` is given)
    :type password: str
    :type user: str
    :type host: str
    :type port: int
    :type timeout: int
    :type creds: str
    :type options: str
    """
    # Identity comparison is the correct test for "argument not supplied".
    if creds is not None:
        self._target = '%s:%d' % (host, port)
        self._creds = implementations.ssl_channel_credentials(creds)
        self._options = options
        channel = grpc.secure_channel(
            self._target,
            self._creds,
            (('grpc.ssl_target_name_override', self._options,),))
        # Wrap the channel in the `implementations` Channel type, which is
        # what the stub factory below is given in both branches.
        self._channel = implementations.Channel(channel)
    else:
        self._host = host
        self._port = port
        self._channel = implementations.insecure_channel(self._host, self._port)
    self._stub = ems_grpc_pb2.beta_create_gRPCConfigOper_stub(self._channel)
    self._timeout = float(timeout)
    self._metadata = [('username', user), ('password', password)]
def __init__(self, rpc_port):
    """Open a TLS channel to ``localhost:<rpc_port>`` and build the stub.

    Args:
        rpc_port: port of the local RPC server; also stored on ``self.port``.

    Raises:
        IOError / OSError: if ``tls.cert`` (looked up relative to the
            current working directory) cannot be read.
    """
    self.port = rpc_port
    # Read the certificate as bytes (what grpc expects) and close the file
    # deterministically instead of leaving the handle to the garbage collector.
    with open('tls.cert', 'rb') as cert_file:
        cred = grpc.ssl_channel_credentials(cert_file.read())
    channel = grpc.secure_channel('localhost:{}'.format(rpc_port), cred)
    self.stub = lnrpc_grpc.LightningStub(channel)
def getGRPCChannel(ipAddress, port, root_certificates, ssl_target_name_override):
    """Create a TLS-secured gRPC channel to ``ipAddress:port``.

    Args:
        ipAddress: host part of the target address.
        port: port part of the target address.
        root_certificates: root certificates handed to
            ``grpc.ssl_channel_credentials``.
        ssl_target_name_override: value used for both the
            ``grpc.ssl_target_name_override`` and ``grpc.default_authority``
            channel options.

    Returns:
        The channel returned by ``grpc.secure_channel``. The maximum receive
        message length is set to 100 MiB.
    """
    address = "{0}:{1}".format(ipAddress, port)
    tls_creds = grpc.ssl_channel_credentials(root_certificates=root_certificates)
    channel_options = (
        ('grpc.ssl_target_name_override', ssl_target_name_override,),
        ('grpc.default_authority', ssl_target_name_override,),
        ('grpc.max_receive_message_length', 100 * 1024 * 1024),
    )
    return grpc.secure_channel(address, tls_creds, options=channel_options)
def run():
    """Greet the server behind ``lb:50051`` over TLS every three seconds, forever.

    The root certificate is read from ``/certs/cert.pem``. Each iteration
    sends one ``SayHello`` request and logs either the reply or the failure;
    the loop never returns.

    Raises:
        IOError / OSError: if the certificate file cannot be read.
    """
    # Read the certificate inside a context manager so the handle is closed.
    with open('/certs/cert.pem', 'rb') as cert_file:
        creds = grpc.ssl_channel_credentials(root_certificates=cert_file.read())
    channel = grpc.secure_channel('lb:50051', creds)
    stub = helloworld_pb2_grpc.GreeterStub(channel)
    while True:
        try:
            response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
            # NOTE(review): a successful reply is logged at error level;
            # kept as-is in case the logger is configured to show only
            # errors -- confirm whether info level was intended.
            logger.error("Greeter client received: " + response.message)
        except Exception as e:
            logger.error('Could not connect load-balancer. error {}'.format(e))
        # Same pause after success and after failure.
        time.sleep(3)
def secure_channel(target, credentials, options=None, *, loop=None, executor=None,
                   standalone_pool_for_streaming=False):
    """Create a secure Channel to a server.

    The underlying channel is built by ``_grpc.secure_channel`` and then
    wrapped in this module's ``Channel``.

    Args:
        target: The server address.
        credentials: A ChannelCredentials instance.
        options: An optional list of key-value pairs (channel args in gRPC
            runtime) to configure the channel.
        loop: Keyword-only; forwarded to ``Channel`` unchanged
            (presumably the event loop to use -- confirm against ``Channel``).
        executor: Keyword-only; forwarded to ``Channel`` unchanged.
        standalone_pool_for_streaming: Keyword-only; forwarded to ``Channel``
            unchanged.

    Returns:
        A Channel object.
    """
    underlying = _grpc.secure_channel(target, credentials, options)
    return Channel(underlying, loop, executor, standalone_pool_for_streaming)
def run():
    """Connect to ``localhost:50051`` over TLS, create a user, then list users.

    The server certificate in ``server.crt`` is used as the trusted root.
    If the channel is not ready within 10 seconds the process exits with an
    error message. A failing ``CreateUser`` call is reported and does not
    stop the subsequent ``GetUsers`` call.
    """
    # Read in the certificate and encode it to bytes for grpc.
    with open('server.crt') as cert_file:
        root_pem = cert_file.read().encode()

    # Create credentials and the channel that uses them.
    tls_credentials = grpc.ssl_channel_credentials(root_certificates=root_pem)
    channel = grpc.secure_channel('localhost:50051', tls_credentials)

    # Wait for the connection; sys.exit raises, so nothing below runs on timeout.
    try:
        grpc.channel_ready_future(channel).result(timeout=10)
    except grpc.FutureTimeoutError:
        sys.exit('Error connecting to server')

    stub = users_service.UsersStub(channel)
    call_metadata = [('ip', '127.0.0.1')]

    try:
        created = stub.CreateUser(
            users_messages.CreateUserRequest(username='tom'),
            metadata=call_metadata,
        )
    except grpc.RpcError as e:
        print('CreateUser failed with {0}: {1}'.format(e.code(), e.details()))
    else:
        print("User created:", created.user.username)

    wanted_users = [
        users_messages.User(username="alexa", user_id=1),
        users_messages.User(username="christie", user_id=1),
    ]
    request = users_messages.GetUsersRequest(user=wanted_users)
    # The GetUsers result is iterated and each item printed as it arrives.
    for resp in stub.GetUsers(request):
        print(resp)
def __init__(self, host='localhost', port=2379,
             ca_cert=None, cert_key=None, cert_cert=None, timeout=None,
             user=None, password=None):
    """Open a channel to ``host:port``, authenticate if asked, and build stubs.

    Args:
        host: server host name.
        port: server port.
        ca_cert: CA certificate passed to ``self._get_secure_creds``. When
            given, a secure channel is used; when ``None`` the channel is
            insecure and ``cert_key`` / ``cert_cert`` are ignored.
        cert_key: client key; must be given together with ``cert_cert``.
        cert_cert: client certificate; must be given together with
            ``cert_key``.
        timeout: stored on ``self.timeout``; passed to the authenticate call
            and to the watcher.
        user: user name for authentication; requires ``password``.
        password: password for authentication; requires ``user``.

    Raises:
        ValueError: if ``ca_cert`` is given with only one of ``cert_key`` /
            ``cert_cert``, or if only one of ``user`` / ``password`` is
            given. Both checks run before any channel is opened.
    """
    cert_params = [c is not None for c in (cert_cert, cert_key)]
    cred_params = [c is not None for c in (user, password)]

    # Reject half-specified argument pairs up front so that a bad call never
    # leaves an opened channel behind.
    if ca_cert is not None and any(cert_params) and not all(cert_params):
        # some of the cert parameters are set
        raise ValueError(
            'to use a secure channel ca_cert is required by itself, '
            'or cert_cert and cert_key must both be specified.')
    if any(cred_params) and not all(cred_params):
        # ValueError (a subclass of Exception) matches the cert check above.
        raise ValueError(
            'if using authentication credentials both user and password '
            'must be specified.'
        )

    self._url = '{host}:{port}'.format(host=host, port=port)

    if ca_cert is not None:
        # cert_key and cert_cert are either both set or both None here, so a
        # single call covers "CA only" and "CA plus client certificate".
        credentials = self._get_secure_creds(ca_cert, cert_key, cert_cert)
        self.uses_secure_channel = True
        self.channel = grpc.secure_channel(self._url, credentials)
    else:
        self.uses_secure_channel = False
        self.channel = grpc.insecure_channel(self._url)

    self.timeout = timeout
    self.call_credentials = None

    if all(cred_params):
        # Exchange user/password for a token and attach it to later calls.
        self.auth_stub = etcdrpc.AuthStub(self.channel)
        auth_request = etcdrpc.AuthenticateRequest(
            name=user,
            password=password
        )
        resp = self.auth_stub.Authenticate(auth_request, self.timeout)
        self.call_credentials = grpc.metadata_call_credentials(
            EtcdTokenCallCredentials(resp.token))

    self.kvstub = etcdrpc.KVStub(self.channel)
    self.watcher = watch.Watcher(
        etcdrpc.WatchStub(self.channel),
        timeout=self.timeout,
        call_credentials=self.call_credentials,
    )
    self.clusterstub = etcdrpc.ClusterStub(self.channel)
    self.leasestub = etcdrpc.LeaseStub(self.channel)
    self.maintenancestub = etcdrpc.MaintenanceStub(self.channel)
    self.transactions = Transactions()
def secure_authorized_channel(
        credentials, request, target, ssl_credentials=None, **kwargs):
    """Creates a secure authorized gRPC channel.

    This creates a channel with SSL and :class:`AuthMetadataPlugin`. This
    channel can be used to create a stub that can make authorized requests.

    Example::

        import google.auth
        import google.auth.transport.grpc
        import google.auth.transport.requests
        from google.cloud.speech.v1 import cloud_speech_pb2

        # Get credentials.
        credentials, _ = google.auth.default()

        # Get an HTTP request function to refresh credentials.
        request = google.auth.transport.requests.Request()

        # Create a channel. Argument order is (credentials, request, target).
        channel = google.auth.transport.grpc.secure_authorized_channel(
            credentials, request, 'speech.googleapis.com:443')

        # Use the channel to create a stub.
        cloud_speech_pb2.create_Speech_stub(channel)

    Args:
        credentials (google.auth.credentials.Credentials): The credentials to
            add to requests.
        request (google.auth.transport.Request): A HTTP transport request
            object used to refresh credentials as needed. Even though gRPC
            is a separate transport, there's no way to refresh the credentials
            without using a standard http transport.
        target (str): The host and port of the service.
        ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
            credentials. This can be used to specify different certificates.
            When omitted, ``grpc.ssl_channel_credentials()`` is called with
            no arguments.
        kwargs: Additional arguments to pass to :func:`grpc.secure_channel`.

    Returns:
        grpc.Channel: The created gRPC channel.
    """
    # Create the metadata plugin for inserting the authorization header.
    metadata_plugin = AuthMetadataPlugin(credentials, request)

    # Create a set of grpc.CallCredentials using the metadata plugin.
    google_auth_credentials = grpc.metadata_call_credentials(metadata_plugin)

    if ssl_credentials is None:
        ssl_credentials = grpc.ssl_channel_credentials()

    # Combine the ssl credentials and the authorization credentials.
    composite_credentials = grpc.composite_channel_credentials(
        ssl_credentials, google_auth_credentials)

    return grpc.secure_channel(target, composite_credentials, **kwargs)