def client_establish_socket(self) -> None:
    """Connect the IPC client, retrying until the listener accepts.

    The target port is TXM_DD_LISTEN_SOCKET when
    `self.settings.data_diode_sockets` is truthy, otherwise
    NH_LISTEN_SOCKET. The connection object returned by
    `multiprocessing.connection.Client` is stored in `self.interface`.

    A failed attempt (socket.error) is retried after a 0.1 second
    pause, with no upper bound on the number of attempts. A
    KeyboardInterrupt raised at any point hands control to
    graceful_exit().
    """
    try:
        phase("Waiting for connection to NH", offset=11)

        connected = False
        while not connected:
            try:
                # The setting is re-read on every attempt, so the port
                # is selected inside the retry loop.
                if self.settings.data_diode_sockets:
                    port = TXM_DD_LISTEN_SOCKET
                else:
                    port = NH_LISTEN_SOCKET

                address = ('localhost', port)
                self.interface = multiprocessing.connection.Client(address)
                phase("Established", done=True)
                connected = True
            except socket.error:
                # Connection attempt failed; wait briefly and try again.
                time.sleep(0.1)
    except KeyboardInterrupt:
        graceful_exit()
评论列表
文章目录