def __init__(self, bind, ps_factory):
    """Create a single-worker gRPC server exposing the Bridge servicer.

    Args:
        bind: Two-item value interpolated into ``'%s:%d'`` to form the
            listen address (first item as text, second as an integer).
        ps_factory: Passed unchanged to ``Servicer``.
    """
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    self.server = grpc.server(executor)

    servicer = Servicer(ps_factory)
    bridge_pb2.add_BridgeServicer_to_server(servicer, self.server)

    listen_address = '%s:%d' % bind
    self.server.add_insecure_port(listen_address)
python类server()的实例源码
def start(self):
    """Start the server object stored in ``self.server``."""
    server = self.server
    server.start()
def __init__(self):
    """Initialise default plugin state, the gRPC server and the CLI flags.

    Creates a gRPC server backed by a ten-worker thread pool, resets the
    bookkeeping attributes to their defaults, builds the argparse parser and
    registers the known flags on ``self._flags``.
    """
    self.meta = None
    self.proxy = None
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    self.server = grpc.server(worker_pool)
    self._port = 0
    self._last_ping = time.time()
    self._shutting_down = False
    self._monitor = None
    self._mode = PluginMode.normal
    self._config = {}
    self._flags = _Flags()
    self.standalone_server = None

    # init argparse module and add arguments
    def _wide_help_formatter(prog):
        # Same formatter as the default one, but with a wider help column.
        return argparse.HelpFormatter(prog, max_help_position=30)

    self._parser = argparse.ArgumentParser(
        description="%(prog)s - a Snap framework plugin.",
        usage="%(prog)s [options]",
        formatter_class=_wide_help_formatter,
    )
    # Optional positional argument that is hidden from the help output.
    self._parser.add_argument(
        "framework_config",
        nargs="?",
        default=None,
        help=argparse.SUPPRESS,
    )

    # Entries are either plain tuples or Flag instances; both forms are
    # handed to add_multiple() as-is.
    # NOTE(review): tuple layout looks like (name, type, description[, default]) - confirm.
    flag_definitions = [
        ("config", FlagType.value, "JSON Snap global config"),
        ("port", FlagType.value, "GRPC server port"),
        ("stand-alone", FlagType.toggle, "enable stand alone mode"),
        ("stand-alone-port", FlagType.value, "http port for stand alone mode", 8182),
        Flag("log-level", FlagType.value, "logging level 0:panic - 5:debug", 3,
             json_name="LogLevel"),
        Flag("tls", FlagType.toggle, "enable tls", json_name="TLSEnabled"),
        Flag("root-cert-paths", FlagType.value,
             "paths to root certificate; delimited by ':'",
             json_name="RootCertPaths"),
        Flag("key-path", FlagType.value, "path to server private key",
             json_name="KeyPath"),
        Flag("cert-path", FlagType.value, "path to server certificate",
             json_name="CertPath"),
    ]
    self._flags.add_multiple(flag_definitions)
def stop_plugin(self):
    """Stop the plugin's server and block until shutdown has completed.

    Sets ``self._shutting_down``, asks ``self.server`` to stop with a grace
    period of 0 and then waits for the event returned by ``stop()`` to be
    set.
    """
    LOG.debug("plugin stopping")
    self._shutting_down = True
    stop_event = self.server.stop(0)
    # Waiting on the event itself (with a short timeout, re-checked in a
    # loop) wakes up as soon as shutdown completes instead of sleeping a
    # fixed 100 ms between polls.
    while not stop_event.is_set():
        stop_event.wait(0.1)
    LOG.debug("plugin stopped")
def _generate_preamble_and_serve(self):
    """Bind the listening port, start the server and build the preamble.

    When the ``TLSEnabled`` config entry equals ``True`` a secure port is
    added on 127.0.0.1, otherwise an insecure one. ``self._port`` is replaced
    by the value returned from the port-adding call.

    Returns:
        str: JSON document with the plugin metadata and listen address,
        terminated by a newline.

    Raises:
        Exception: if TLS setup or adding the secure port fails.
    """
    # Kept as an equality test against True so that non-bool config values
    # are treated exactly as before.
    tls_requested = self._config.get("TLSEnabled", False) == True
    if not tls_requested:
        self._port = self.server.add_insecure_port('127.0.0.1:{}'.format(self._port))
        LOG.info("Configured insecure port on {}.".format(self._port))
    else:
        try:
            self._tls_setup()
            credentials = self._generate_tls_credentials()
            self._port = self.server.add_secure_port('127.0.0.1:{}'.format(self._port), credentials)
            LOG.info("Configured secure port on {}.".format(self._port))
        except Exception as e:
            raise Exception("TLS setup failed. Unable to add secure port. {}".format(str(e)))

    self.server.start()

    meta_section = {
        "Name": self.meta.name,
        "Version": self.meta.version,
        "Type": self.meta.type,
        "RPCType": self.meta.rpc_type,
        "RPCVersion": self.meta.rpc_version,
        "ConcurrencyCount": self.meta.concurrency_count,
        "Exclusive": self.meta.exclusive,
        "CacheTTL": self.meta.cache_ttl,
        "RoutingStrategy": self.meta.routing_strategy,
        "RootCertPaths": self.meta.root_cert_paths,
        "CertPath": self.meta.server_cert_path,
        "KeyPath": self.meta.private_key_path,
        "Ciphers": self.meta.cipher_suites,
        "TLSEnabled": self._config.get("TLSEnabled"),
    }
    preamble = {
        "Meta": meta_section,
        "ListenAddress": "127.0.0.1:{!s}".format(self._port),
        "Token": None,
        "PublicKey": None,
        "Type": self.meta.type,
        "ErrorMessage": None,
        "State": PluginResponseState.plugin_success,
    }
    serialized = json.dumps(preamble, cls=_EnumEncoder)
    return serialized + "\n"
def run(self, conn):
    """Run the container's server until "quit" arrives or *conn* is closed.

    Depending on ``self._type`` a gRPC server, a ``RestServer`` or a
    ``RestServerRS`` is created and started. The method then reads
    ``(command, param)`` pairs from *conn* until the command is ``"quit"``
    and finally stops the server.

    Args:
        conn: Object whose ``recv()`` returns ``(command, param)`` pairs.
            NOTE(review): presumably one end of a multiprocessing pipe -
            confirm against the caller.
    """
    logging.debug("Container run...")

    if self._type == ServerType.GRPC:
        server = grpc.server(futures.ThreadPoolExecutor(max_workers=conf.MAX_WORKERS))
        loopchain_pb2_grpc.add_ContainerServicer_to_server(self, server)
        server.add_insecure_port('[::]:' + str(self._port))
    elif self._type == ServerType.REST_PEER:
        server = RestServer(self._port, self._peer_ip)
    else:
        server = RestServerRS(self._port)

    server.start()

    command = None
    while command != "quit":
        try:
            # NOTE(review): the original comment here was mis-encoded; it
            # appears to say that recv() waits for the next message, so no
            # sleep is needed in this loop.
            command, param = conn.recv()
            logging.debug("Container got: " + str(param))
        except EOFError as e:
            # The other end is gone: retrying recv() would fail again
            # immediately and spin forever, so leave the loop and shut down.
            logging.warning("Container conn.recv() error: " + str(e))
            break
        except Exception as e:
            # Any other receive error is logged and the loop keeps reading.
            logging.warning("Container conn.recv() error: " + str(e))

    # The gRPC server takes a grace period; the REST servers take no argument.
    if self._type == ServerType.GRPC:
        server.stop(0)
    else:
        server.stop()

    logging.info("Server Container Ended.")
def start_server(self, server, listen_address):
    """Add *listen_address* as an insecure port on *server*, start it and log.

    Args:
        server: Object providing ``add_insecure_port()`` and ``start()``.
        listen_address: Address string to bind, also written to the log.
    """
    server.add_insecure_port(listen_address)
    server.start()
    message = "Server now listen: " + listen_address
    logging.info(message)
def run(self):
    """Start both servers, run the service loop, then clean up.

    The outer server is bound on all interfaces and the inner server on
    ``conf.INNER_SERVER_BIND_IP``. While ``self.is_run()`` is true the loop
    functions are executed with a pause between rounds. On exit every
    queued subscription is un-subscribed, the broadcast process is stopped
    and both servers are stopped.
    """
    outer_address = '[::]:' + str(self.__port)
    self.start_server(self.outer_server, outer_address)

    # Bind Only loopback address (ip4) - TODO IP6
    inner_address = conf.INNER_SERVER_BIND_IP + ':' + str(self.__inner_service_port)
    self.start_server(self.inner_server, inner_address)

    # NOTE(review): the original two-line comment here was mis-encoded. It
    # appears to explain that subscribing to the Block Generator makes it
    # open a channel back to this peer, so the gRPC servers must be up
    # before subscribing - hence the wait below. Confirm.
    time.sleep(conf.WAIT_GRPC_SERVICE_START)

    try:
        while self.is_run():
            self.__run_loop_functions()
            time.sleep(conf.SLEEP_SECONDS_IN_SERVICE_NONE)
    except KeyboardInterrupt:
        logging.info("Server Stop by KeyboardInterrupt")
    finally:
        # Drain the subscription queue, un-subscribing each entry.
        while True:
            if self.__subscriptions.empty():
                break
            channel, subscribe_stub = self.__subscriptions.get()
            logging.info(f"Un subscribe to channel({channel}) server({subscribe_stub.target})")
            self.__un_subscribe(channel, self.__port, subscribe_stub)

        self.__stop_broadcast_process()

        # The inner server is only stopped when an inner port is configured.
        if self.__inner_service_port is not None:
            self.inner_server.stop(0)
        self.outer_server.stop(0)

    logging.info("Server thread Ended.")
def receive_forever():
    """Run the registration gRPC server until a keyboard interrupt.

    The thread-pool size and bind port are read via ``get_config`` with
    defaults of 5 and 50051 respectively.
    """
    worker_count = int(get_config('system', 'server_threads', 5))
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=worker_count))
    messages_pb2_grpc.add_RegistrationServiceServicer_to_server(RegistrationAgent(), server)

    binding = '[::]:%s' % get_config('messaging', 'bind_port', 50051)
    logger.info("Binding rpc registration server to: %s" % binding)
    server.add_insecure_port(binding)
    server.start()

    # Keep the calling thread parked; the server itself runs in the pool.
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        logger.debug("Stopping server")
        # NOTE(review): stop() is given True rather than a number of
        # seconds or None - confirm the intended grace period.
        server.stop(True)
def run_enqueue_server(episodes):
    """Serve an ``EnqueueServer`` over gRPC and block forever.

    Args:
        episodes: Passed unchanged to ``EnqueueServer``.
    """
    worker_pool = futures.ThreadPoolExecutor(max_workers=3)
    grpc_server = grpc.server(worker_pool)
    servicer = EnqueueServer(episodes)
    model_pb2.add_ModelServicer_to_server(servicer, grpc_server)

    bind_address = "[::]:%d" % opts.trainer_port
    grpc_server.add_insecure_port(bind_address)
    grpc_server.start()

    # Never returns: the calling thread just sleeps in ten-second steps.
    while True:
        time.sleep(10)
def __init__(self, port=None):
    """Initialise the base class as a "location server" with no gRPC server yet.

    Args:
        port: Forwarded to the base-class initialiser.
    """
    base = super(LocationServer, self)
    base.__init__(port=port, desc="location server")
    # Created later by _start_server().
    self.grpc_server = None
def _start_server(self):
    """Start the gRPC server on ``self.hostname`` and block until stopped.

    The loop ends when ``self.stopped`` becomes true or on a keyboard
    interrupt; ``self.stop()`` is called in either case.
    """
    start_message = "Starting gRPC {0} listening on {1}".format(self.desc, self.hostname)
    logger.info(start_message)

    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    self.grpc_server = grpc.server(worker_pool)
    # This object itself is registered as the service implementation.
    add_LocationServiceServicer_to_server(self, self.grpc_server)
    self.grpc_server.add_insecure_port(self.hostname)
    self.grpc_server.start()

    try:
        while True:
            if self.stopped:
                break
            time.sleep(1)
    except KeyboardInterrupt:
        # Ctrl-C simply falls through to the cleanup below.
        pass
    finally:
        self.stop()
def _run_server(port):
    """Start a ``LocationServer`` on *port* and write 100 sample locations.

    Each sample is derived from the loop counter, with a one-second pause
    after every write.
    """
    server = LocationServer(port).start()
    for step in range(100):
        sample = dict(
            x=step,
            y=step + 1,
            width=step + 2,
            height=step + 3,
            middle_inc=step + 4,
        )
        server.write_location(**sample)
        time.sleep(1)
def __init__(self, app):
    """Create the gRPC server for *app* from its configuration.

    Reads ``GRPC_WORKERS``, ``GRPC_HOST`` and ``GRPC_PORT`` from
    ``app.config``, builds a thread-pool backed gRPC server and adds an
    insecure port on ``host:port``.

    Args:
        app: Application object exposing a ``config`` mapping.
    """
    self.app = app
    self.setup_logger()

    settings = self.app.config
    self.workers = settings.get('GRPC_WORKERS')
    self.host = settings.get('GRPC_HOST')
    self.port = settings.get('GRPC_PORT')

    worker_pool = futures.ThreadPoolExecutor(max_workers=self.workers)
    self.server = grpc.server(worker_pool)
    bind_address = '{}:{}'.format(self.host, self.port)
    self.server.add_insecure_port(bind_address)

    # Flipped to True by the stop handler to end run().
    self._stopped = False
def run(self):
    """Register all servicers, start the server and block until stopped.

    Sends the ``started`` signal after the server is up and the ``stopped``
    signal once ``self._stopped`` has been set.

    Returns:
        bool: always ``True``.
    """
    registrations = self.app.servicers.items()
    for _name, registration in registrations:
        add_func, servicer = registration
        # A new servicer instance is created for every registration.
        add_func(servicer(), self.server)

    self.server.start()
    started.send(self)
    self.register_signal()

    while True:
        if self._stopped:
            break
        time.sleep(1)

    stopped.send(self)
    return True
def _stop_handler(self, signum, frame):
    """Stop the server with no grace period and set ``self._stopped``.

    Args:
        signum: Unused.
        frame: Unused.
    """
    grace_seconds = 0
    self.server.stop(grace_seconds)
    self._stopped = True
def serve():
    """Run the traced CommandLine gRPC server on port 50051 until Ctrl-C.

    Parses ``--log_payloads`` from the command line, initialises a tracer
    for the service name ``trivial-server`` and wraps the gRPC server with
    the tracing interceptor.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        '--log_payloads',
        action='store_true',
        help='log request/response objects to open-tracing spans')
    cli_args = arg_parser.parse_args()

    tracer_settings = {
        'sampler': {
            'type': 'const',
            'param': 1,
        },
        'logging': True,
    }
    config = Config(config=tracer_settings, service_name='trivial-server')
    tracer = config.initialize_tracer()
    tracer_interceptor = open_tracing_server_interceptor(
        tracer, log_payloads=cli_args.log_payloads)

    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    server = intercept_server(grpc.server(worker_pool), tracer_interceptor)
    command_line_pb2.add_CommandLineServicer_to_server(CommandLine(), server)
    server.add_insecure_port('[::]:50051')
    server.start()

    # Park the calling thread until interrupted from the keyboard.
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)

    # NOTE(review): the pauses around close() presumably give the tracer
    # time to flush - confirm.
    time.sleep(2)
    tracer.close()
    time.sleep(2)
def serve():
    """Run the traced CommandLine gRPC server on port 50051 until Ctrl-C.

    Parses ``--log_payloads`` from the command line, initialises a tracer
    for the service name ``integration-server``, hands that tracer to the
    ``CommandLine`` servicer and wraps the gRPC server with the tracing
    interceptor.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        '--log_payloads',
        action='store_true',
        help='log request/response objects to open-tracing spans')
    cli_args = arg_parser.parse_args()

    tracer_settings = {
        'sampler': {
            'type': 'const',
            'param': 1,
        },
        'logging': True,
    }
    config = Config(config=tracer_settings, service_name='integration-server')
    tracer = config.initialize_tracer()
    tracer_interceptor = open_tracing_server_interceptor(
        tracer, log_payloads=cli_args.log_payloads)

    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    server = intercept_server(grpc.server(worker_pool), tracer_interceptor)
    servicer = CommandLine(tracer)
    command_line_pb2.add_CommandLineServicer_to_server(servicer, server)
    server.add_insecure_port('[::]:50051')
    server.start()

    # Park the calling thread until interrupted from the keyboard.
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)

    # NOTE(review): the pauses around close() presumably give the tracer
    # time to flush - confirm.
    time.sleep(2)
    tracer.close()
    time.sleep(2)
def __init__(self,
             client_interceptors,
             server_interceptors,
             handler=None):
    """Start an intercepted in-process gRPC server and a channel to it.

    The server is bound to ``[::]:0`` and the port returned by the bind
    call is used to open an intercepted insecure channel on ``localhost``.

    Args:
        client_interceptors: Interceptors applied to the client channel.
        server_interceptors: Interceptors applied to the server.
        handler: Handler wrapped by the generic RPC handler. When omitted
            (or ``None``) a new ``Handler()`` is created for this instance.
    """
    # A ``Handler()`` default in the signature would be evaluated once at
    # definition time and shared by every instance; create it per call.
    self.handler = Handler() if handler is None else handler

    self._server_pool = logging_pool.pool(2)
    self._server = grpcext.intercept_server(
        grpc.server(self._server_pool), *server_interceptors)
    port = self._server.add_insecure_port('[::]:0')
    self._server.add_generic_rpc_handlers((_GenericHandler(self.handler),))
    self._server.start()

    self.channel = grpcext.intercept_channel(
        grpc.insecure_channel('localhost:%d' % port), *client_interceptors)
def serve():
    """Run the Greeter gRPC server on port 50051 until Ctrl-C."""
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    server = grpc.server(worker_pool)
    greeter = Greeter()
    helloworld_pb2_grpc.add_GreeterServicer_to_server(greeter, server)
    server.add_insecure_port('[::]:50051')
    server.start()

    # Park the calling thread until interrupted, then stop with no grace
    # period.
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)