def loop(self):
    """Main server loop for accepting connections.

    Runs forever, so it is better called on its own thread. Every accepted
    connection is logged, wrapped in a server-side TLS socket (using
    ``server.crt`` / ``server.key``) and stored as a ``Host`` in
    ``self.clients["hosts"]`` under the current serial number, which is then
    incremented.

    Raises:
        sock_error: when accepting a connection fails. The original exception
            is re-raised unchanged so its errno and message are preserved.
    """
    while True:
        try:
            (csock, (ipaddr, port)) = self.connection["sock"].accept()
            self._log("L", "New connection from %s:%s" % (str(ipaddr),
                                                         str(port)))
        except sock_error:
            # A bare ``raise`` keeps the errno, message and traceback of the
            # failure; ``raise sock_error`` would replace them with a new,
            # empty exception instance.
            raise
        try:
            csock = ssl.wrap_socket(csock, server_side=True, certfile="server.crt",
                                    keyfile="server.key",
                                    ssl_version=ssl.PROTOCOL_TLSv1_2)
        except AttributeError:  # All PROTOCOL consts are merged on TLS in Python2.7.13
            csock = ssl.wrap_socket(csock, server_side=True, certfile="server.crt",
                                    keyfile="server.key",
                                    ssl_version=ssl.PROTOCOL_TLS)
        serial = self.clients["serial"]
        self.clients["hosts"][str(serial)] = Host(csock, ipaddr, port, serial)
        self.clients["serial"] += 1
# Example source code using Python's ssl.PROTOCOL_TLSv1_2
def _get_ssl_options(cls, cert_options):
    """Translate ``cert_options`` into a TLS context restricted to HTTP/2.

    ``cert_options`` is indexed with the keys ``validate_cert``,
    ``ca_certs``, ``client_key`` and ``client_cert``. The resulting options
    dict is converted with ``ssl_options_to_context`` and the returned object
    advertises ``h2`` through ALPN.
    """
    options = {}
    if cert_options['validate_cert']:
        options["cert_reqs"] = ssl.CERT_REQUIRED

    # Fall back to the default CA bundle only when none was supplied.
    ca_bundle = cert_options['ca_certs']
    options["ca_certs"] = _default_ca_certs() if ca_bundle is None else ca_bundle

    # Client key / certificate are optional and copied only when present.
    for source_key, target_key in (('client_key', 'keyfile'),
                                   ('client_cert', 'certfile')):
        value = cert_options[source_key]
        if value is not None:
            options[target_key] = value

    # According to RFC 7540, deployments of HTTP/2 that use TLS 1.2 MUST
    # support TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256.
    options["ciphers"] = "ECDH+AESGCM"
    options["ssl_version"] = ssl.PROTOCOL_TLSv1_2

    context = ssl_options_to_context(options)
    context.set_alpn_protocols(['h2'])
    return context
def _get_ssl_options(cls, cert_options):
    """Build the HTTP/2 TLS context described by ``cert_options``.

    Reads the ``validate_cert``, ``ca_certs``, ``client_key`` and
    ``client_cert`` entries of ``cert_options``, pins the cipher string and
    protocol version, converts the options with
    ``netutil.ssl_options_to_context`` and sets ALPN to ``h2`` on the result.
    """
    settings = {}

    if cert_options['validate_cert']:
        settings["cert_reqs"] = ssl.CERT_REQUIRED

    supplied_ca = cert_options['ca_certs']
    if supplied_ca is None:
        # No CA bundle given: use the HTTP client's default one.
        settings["ca_certs"] = simple_httpclient._default_ca_certs()
    else:
        settings["ca_certs"] = supplied_ca

    client_key = cert_options['client_key']
    if client_key is not None:
        settings["keyfile"] = client_key

    client_cert = cert_options['client_cert']
    if client_cert is not None:
        settings["certfile"] = client_cert

    # According to RFC 7540, deployments of HTTP/2 that use TLS 1.2 MUST
    # support TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256.
    settings["ciphers"] = "ECDH+AESGCM"
    settings["ssl_version"] = ssl.PROTOCOL_TLSv1_2

    tls_context = netutil.ssl_options_to_context(settings)
    tls_context.set_alpn_protocols(['h2'])
    return tls_context
def __init__(self, listen_host, listen_port, server_key, server_cert, ca_cert, image_path, image_types, exec_handlers, static_paths):
    """Set up the inotify notifier and the TLS-protected WSGI server.

    One inotify watch (``IN_MOVED_TO``) is added for each
    ``image_path/<image_type>`` directory. ``exec_handlers`` and
    ``static_paths`` are iterables of ``key=value`` strings (or a falsy
    value) and are turned into dicts before being handed to the request
    handler. The WSGI server is created with ``cert_reqs=ssl.CERT_REQUIRED``
    and TLS 1.2.
    """
    websockets = WebSockets()
    watch_manager = pyinotify.WatchManager()
    self._notifier = pyinotify.Notifier(watch_manager, INotifyHandler(websockets))

    for image_type in image_types:
        watch_manager.add_watch(os.path.join(image_path, image_type),
                                pyinotify.IN_MOVED_TO)

    # Each entry is split on the first '=' only, so values may contain '='.
    exec_handlers = dict(entry.split('=', 1) for entry in (exec_handlers or []))
    static_paths = dict(entry.split('=', 1) for entry in (static_paths or []))

    request_handler = HTTPRequestHandler(
        image_path, image_types, exec_handlers, static_paths, websockets)
    bind_address = (listen_host, listen_port)
    self._httpd = geventserver.WSGIServer(
        bind_address,
        request_handler,
        keyfile=server_key,
        certfile=server_cert,
        ca_certs=ca_cert,
        cert_reqs=ssl.CERT_REQUIRED,
        ssl_version=ssl.PROTOCOL_TLSv1_2)
def get_ssl_version(sock):
    """Guess the TLS protocol constant from the first bytes pending on ``sock``.

    Peeks (without consuming) up to 16 bytes and uses the byte at offset 10
    as a 1-based index into TLSv1, TLSv1.1, TLSv1.2.

    Returns:
        The matching ``ssl.PROTOCOL_*`` constant. When the byte is missing
        or outside 1..3, the peeked bytes are printed as "Unexpected SSL
        version" and ``ssl.PROTOCOL_TLSv1_2`` is returned.
    """
    # Seth behaves differently depending on the TLS protocol
    # https://bugs.python.org/issue31453
    # This is an ugly hack (as if the rest of this wasn't...)
    versions = [
        ssl.PROTOCOL_TLSv1,
        ssl.PROTOCOL_TLSv1_1,
        ssl.PROTOCOL_TLSv1_2,
    ]
    firstbytes = sock.recv(16, socket.MSG_PEEK)
    if len(firstbytes) > 10:
        minor = firstbytes[10]
        # Explicit range check: a value of 0 must not wrap around to
        # versions[-1] via negative indexing and skip the report below.
        if 1 <= minor <= len(versions):
            return versions[minor - 1]
    print("Unexpected SSL version: %s" % hexlify(firstbytes))
    return versions[-1]
# def launch_rdp_client():
# time.sleep(1)
# p = subprocess.Popen(
# ["xfreerdp",
# "/v:%s:%d" % (args.bind_ip, consts.RELAY_PORT),
# "/u:%s\\%s" % (domain, user),
# ],
# )
def connect(self):
    """Connect to the config server over a websocket and run until it closes.

    Sleeps for a random whole number of seconds in
    ``[0, 2**connection_attempts)`` before connecting, then increments
    ``connection_attempts``. ``wss://`` URLs use mutual TLS 1.2 with the
    module-level ``ca_cert``, ``client_pem`` and ``client_crt``.
    """
    backoff_seconds = random.randrange(0, 2**self.connection_attempts)
    time.sleep(backoff_seconds)
    self.connection_attempts += 1

    # websocket.enableTrace(True)
    app = websocket.WebSocketApp(self.config_server_url,
                                 on_message=self.on_message,
                                 on_error=self.on_error,
                                 on_close=self.on_close)
    app.on_open = self.on_open

    if not self.config_server_url.startswith("wss://"):
        app.run_forever()
        return

    tls_options = {
        "cert_reqs": ssl.CERT_REQUIRED,
        "ca_certs": ca_cert,
        "ssl_version": ssl.PROTOCOL_TLSv1_2,
        "keyfile": client_pem,
        "certfile": client_crt,
    }
    app.run_forever(sslopt=tls_options)
def main():
    """Run the websocket client against the URL in argv[1] or ``ws_url``.

    ``wss://`` hosts are contacted with mutual TLS 1.2 using the module-level
    ``ca_cert``, ``client_pem`` and ``client_crt``; any other URL is used
    without TLS options.
    """
    # websocket.enableTrace(True)
    host = sys.argv[1] if len(sys.argv) >= 2 else ws_url

    app = websocket.WebSocketApp(host,
                                 on_message=on_message,
                                 on_error=on_error,
                                 on_close=on_close)
    app.on_open = on_open

    if not host.startswith("wss://"):
        app.run_forever()
        return

    tls_options = {
        "cert_reqs": ssl.CERT_REQUIRED,
        "ca_certs": ca_cert,
        "ssl_version": ssl.PROTOCOL_TLSv1_2,
        "keyfile": client_pem,
        "certfile": client_crt,
    }
    app.run_forever(sslopt=tls_options)
def run(self):
    """ Start the RPyC server

    With both ``certfile`` and ``keyfile`` set, the server is created with an
    ``SSLAuthenticator`` and TLS 1.2 options; otherwise it is created without
    them. The server is stored in ``self.server`` and started.
    """
    server_kwargs = {
        'hostname': self.host,
        'port': self.port,
        'protocol_config': {'allow_all_attrs': True},
    }
    tls_configured = self.certfile is not None and self.keyfile is not None
    if tls_configured:
        # NOTE(review): the TLS settings are passed to ThreadedServer, not to
        # SSLAuthenticator -- confirm that is what these classes expect.
        server_kwargs.update(
            authenticator=SSLAuthenticator(self.certfile, self.keyfile),
            cert_reqs=ssl.CERT_REQUIRED,
            ciphers='EECDH+AESGCM:EDH+AESGCM:AES256+EECDH:AES256+EDH',
            ssl_version=ssl.PROTOCOL_TLSv1_2)
    self.server = ThreadedServer(self.serviceClass, **server_kwargs)
    self.server.start()
def parse_authorized_certs():
    """Return the concatenated DER certificates from the authorized_certs file.

    Only lines consisting of exactly two space-separated fields are used; the
    second field is base64-decoded. The result is validated by loading it
    into a throw-away ``SSLContext``. Returns ``b''`` when the file does not
    exist or when reading, decoding or validation fails (a diagnostic is
    printed in the failure case).
    """
    if not os.path.isfile(AUTHORIZED_CERTS_PATH):
        return b''

    with open(AUTHORIZED_CERTS_PATH, 'r') as certs_file:
        try:
            decoded = []
            for line in certs_file:
                fields = line.split(' ')
                if len(fields) == 2:
                    decoded.append(base64.b64decode(fields[1]))
            authorized_certs = b''.join(decoded)

            # testing if valid certificates
            probe = ssl.SSLContext(protocol=ssl.PROTOCOL_TLSv1_2)
            probe.load_verify_locations(cadata=authorized_certs)
            return authorized_certs
        except Exception as e:
            print_with_timestamp('Corrupted authorized_certs file: {}'.format(e))
            print_with_timestamp('Please look at authorized_certs and '
                                 'search for obvious errors located at {}'.format(AUTHORIZED_CERTS_PATH))
            print_with_timestamp('Or delete the file altogether, '
                                 'but then you would have to pair your phone(s) again')
            return b''
def startSocket():
    """Accept one TLS client on localhost:12345 and process what it sends.

    The accepted connection is wrapped in a server-side TLS 1.2 socket using
    the module-level ``certificate`` and ``key``. The data returned by
    ``deal_with_client`` is passed to ``writeMailTofile`` and ``checkMail``.

    All sockets, including the listening one, are closed before the data is
    processed, also when the handshake or the client handler fails. Errors
    propagate to the caller.
    """
    bindsocket = socket.socket()
    try:
        bindsocket.bind(('localhost', 12345))
        bindsocket.listen(1)
        # Only a single connection is ever served, so no accept loop is needed.
        newsocket, _ = bindsocket.accept()
        try:
            connstream = ssl.wrap_socket(newsocket,
                                         server_side=True,
                                         certfile=certificate,
                                         keyfile=key, ssl_version=ssl.PROTOCOL_TLSv1_2)
            try:
                data = deal_with_client(connstream)
            finally:
                try:
                    connstream.shutdown(socket.SHUT_RDWR)
                finally:
                    connstream.close()
        finally:
            newsocket.close()
    finally:
        # The listening socket was previously left open.
        bindsocket.close()
    writeMailTofile(data)
    checkMail(data)
def test_ipv4_secure_websocket_connection_by_router_instance(
        self, config_path, router
):
    """Call ``get_todays_date`` via a router instance and compare with today.

    Skipped when the ``ssl`` module has no ``PROTOCOL_TLSv1_2``.
    """
    if not hasattr(ssl, 'PROTOCOL_TLSv1_2'):
        pytest.skip('Python Environment does not support TLS')

    with DateService(router=router) as service:
        wait_for_registrations(service, 1)

        client = Client(router=router)
        with client:
            wait_for_session(client)
            result = client.rpc.get_todays_date()

    expected = date.today().isoformat()
    assert result == expected
def test_ipv4_secure_websocket_connection_by_router_url(self, router):
    """Call ``get_todays_date`` via the router URL and compare with today.

    Service and client both connect to ``wss://localhost:9443`` with the
    testing server certificate. Skipped when the ``ssl`` module has no
    ``PROTOCOL_TLSv1_2``.
    """
    assert router.url == "wss://localhost:9443"

    if not hasattr(ssl, 'PROTOCOL_TLSv1_2'):
        pytest.skip('Python Environment does not support TLS')

    with DateService(
        url="wss://localhost:9443",
        cert_path="./wampy/testing/keys/server_cert.pem",
    ) as service:
        wait_for_registrations(service, 1)

        client = Client(
            url="wss://localhost:9443",
            cert_path="./wampy/testing/keys/server_cert.pem",
        )
        with client:
            wait_for_session(client)
            result = client.rpc.get_todays_date()

    expected = date.today().isoformat()
    assert result == expected
def set_host(self, **kwargs):
    """ Set host to connect to.

    Reads ``user``, ``password``, ``host`` and ``port`` from ``kwargs``, and
    ``ca`` when present. The client is re-initialised with the user as client
    id, given credentials, a retained last-will on the status topic and the
    three callbacks, optionally configured for TLS 1.2, and then connected
    asynchronously.

    :param kwargs: Host Configuration
    :type kwargs: dict
    """
    username = kwargs["user"]
    self._status_topic = "{}agents/{}".format(self._base, username)

    mqtt_client = self.client
    mqtt_client.reinitialise(client_id=username,
                             clean_session=self._clean_session)
    mqtt_client.username_pw_set(username=username,
                                password=kwargs["password"])
    mqtt_client.will_set(self._status_topic, payload=b'\x00'.decode(),
                         qos=2, retain=True)

    mqtt_client.on_connect = self._on_connect
    mqtt_client.on_disconnect = self._on_disconnect
    mqtt_client.on_message = self._on_message

    # TLS is enabled only when a CA bundle is supplied.
    if "ca" in kwargs:
        mqtt_client.tls_set(ca_certs=kwargs["ca"],
                            cert_reqs=ssl.CERT_REQUIRED,
                            tls_version=ssl.PROTOCOL_TLSv1_2,
                            ciphers=None)

    keepalive = self._keepalive // 1000
    mqtt_client.connect_async(kwargs["host"], kwargs["port"], keepalive)
def run(self):
    """Create the HTTP(S) server on ``self.port`` and serve forever.

    The server binds to all interfaces. When ``https_allow`` is set and
    ``cert_file`` exists, the listening socket is wrapped in TLS 1.2. If
    setup fails, the user is notified, a new IP is looked up and written to
    the config file, and the server is NOT started (a restart is requested).
    """
    global httpd, ui_player
    print('starting server...')
    server_address = ('', self.port)
    server_start = False
    try:
        httpd = ThreadedHTTPServer(server_address, testHTTPServer_RequestHandler)
        if self.https_allow and self.cert_file:
            if os.path.exists(self.cert_file):
                httpd.socket = ssl.wrap_socket(
                    httpd.socket, certfile=self.cert_file,
                    ssl_version=ssl.PROTOCOL_TLSv1_2)
        server_start = True
    except Exception:
        # Was a bare ``except:``, which also intercepted KeyboardInterrupt
        # and SystemExit and turned them into an "IP changed" notification.
        txt = 'Your local IP changed..or port is blocked\n..Trying to find new IP'
        send_notification(txt)
        self.ip = get_ip()
        txt = 'Your New Address is '+self.ip + '\n Please restart the player'
        send_notification(txt)
        change_config_file(self.ip, self.port)
        server_address = (self.ip, self.port)
        self.ui.local_ip = self.ip
        #httpd = ThreadedHTTPServer(server_address, testHTTPServer_RequestHandler)
    if server_start:
        print('running server...at..'+self.ip+':'+str(self.port))
        httpd.serve_forever()
    else:
        print('server..not..started..')
def test_tls1_2_enabled(self):
    """Connect through ``_connect_socket`` requesting TLS 1.2."""
    tls_version = ssl.PROTOCOL_TLSv1_2
    self._connect_socket(ssl_version=tls_version)
def test_good_cipher(self):
    """Connect with TLS 1.2 restricted to ECDHE-RSA-AES128-GCM-SHA256."""
    connect_kwargs = {
        'ssl_version': ssl.PROTOCOL_TLSv1_2,
        'ciphers': 'ECDHE-RSA-AES128-GCM-SHA256',
    }
    self._connect_socket(**connect_kwargs)
def test_bad_ciphers(self):
    """Each listed cipher must make the TLS 1.2 connection raise socket.error."""
    rejected = ('DH+3DES', 'ADH', 'AECDH', 'RC4', 'aNULL', 'MD5')
    for cipher in rejected:
        with self.assertRaises(socket.error):
            self._connect_socket(ssl_version=ssl.PROTOCOL_TLSv1_2,
                                 ciphers=cipher)
def on_message(client, userdata, msg):
    """Message callback: print an alert line, then the topic and payload.

    ``client`` and ``userdata`` are accepted for the callback signature but
    not used.
    """
    # Call form instead of the Python 2 print statement: identical output on
    # Python 2, and no syntax error on Python 3.
    print("Alert...")
    print(msg.topic + " " + str(msg.payload))
#client.on_connect = on_connect
#client.on_message = on_message
#client.tls_insecure_set(True)
#client.tls_set("airmap.io.crt",cert_reqs=ssl.CERT_NONE,tls_version=ssl.PROTOCOL_TLSv1_2)
#client.tls_insecure_set(True)
def test_protocol(self):
    """The TLS 1.2 suite exposes ``ssl.PROTOCOL_TLSv1_2`` as an int protocol."""
    suite = auth.TLS12AuthenticationSuite()
    observed = suite.protocol

    self.assertIsInstance(observed, int)
    self.assertEqual(ssl.PROTOCOL_TLSv1_2, suite.protocol)
def __init__(self):
    """
    Create a TLS12AuthenticationSuite object.

    Sets the protocol to TLS 1.2 and the cipher string to the
    colon-separated list below.
    """
    super(TLS12AuthenticationSuite, self).__init__()
    self._protocol = ssl.PROTOCOL_TLSv1_2

    # Order and repeated entries are kept exactly as they were so the
    # resulting cipher string is unchanged.
    cipher_names = [
        'AES128-SHA256',
        'AES256-SHA256',
        'DH-DSS-AES256-SHA256',
        'DH-DSS-AES128-SHA256',
        'DH-RSA-AES128-SHA256',
        'DHE-DSS-AES128-SHA256',
        'DHE-RSA-AES128-SHA256',
        'DH-DSS-AES256-SHA256',
        'DH-RSA-AES256-SHA256',
        'DHE-DSS-AES256-SHA256',
        'DHE-RSA-AES256-SHA256',
        'ECDH-ECDSA-AES128-SHA256',
        'ECDH-ECDSA-AES256-SHA256',
        'ECDHE-ECDSA-AES128-SHA256',
        'ECDHE-ECDSA-AES256-SHA384',
        'ECDH-RSA-AES128-SHA256',
        'ECDH-RSA-AES256-SHA384',
        'ECDHE-RSA-AES128-SHA256',
        'ECDHE-RSA-AES256-SHA384',
        'ECDHE-ECDSA-AES128-GCM-SHA256',
        'ECDHE-ECDSA-AES256-GCM-SHA384',
        'ECDHE-ECDSA-AES128-SHA256',
        'ECDHE-ECDSA-AES256-SHA384',
    ]
    self._ciphers = ':'.join(cipher_names)
def set_args(self, args):
    """Store ``args`` via ``DaneChecker`` and prepare a verifying TLS context.

    The context uses TLS 1.2, requires certificates and loads its trust
    anchors from ``args.castore``; it is kept in ``self._sslcontext``.
    """
    DaneChecker.set_args(self, args)

    context = SSLContext(PROTOCOL_TLSv1_2)
    context.verify_mode = CERT_REQUIRED
    context.load_verify_locations(args.castore)

    self._sslcontext = context
def test_protocol(self):
    """Check that the suite's protocol is the int ``ssl.PROTOCOL_TLSv1_2``."""
    suite = auth.TLS12AuthenticationSuite()
    reported_protocol = suite.protocol

    self.assertIsInstance(reported_protocol, int)
    self.assertEqual(ssl.PROTOCOL_TLSv1_2, suite.protocol)
def __init__(self):
    """
    Create a TLS12AuthenticationSuite object.

    The protocol is set to TLS 1.2 and the cipher string is assembled from
    the names below, joined with ':'.
    """
    super(TLS12AuthenticationSuite, self).__init__()
    self._protocol = ssl.PROTOCOL_TLSv1_2

    # Kept in the existing order, repeated names included, so the joined
    # string stays byte-identical.
    suite_ciphers = (
        'AES128-SHA256',
        'AES256-SHA256',
        'DH-DSS-AES256-SHA256',
        'DH-DSS-AES128-SHA256',
        'DH-RSA-AES128-SHA256',
        'DHE-DSS-AES128-SHA256',
        'DHE-RSA-AES128-SHA256',
        'DH-DSS-AES256-SHA256',
        'DH-RSA-AES256-SHA256',
        'DHE-DSS-AES256-SHA256',
        'DHE-RSA-AES256-SHA256',
        'ECDH-ECDSA-AES128-SHA256',
        'ECDH-ECDSA-AES256-SHA256',
        'ECDHE-ECDSA-AES128-SHA256',
        'ECDHE-ECDSA-AES256-SHA384',
        'ECDH-RSA-AES128-SHA256',
        'ECDH-RSA-AES256-SHA384',
        'ECDHE-RSA-AES128-SHA256',
        'ECDHE-RSA-AES256-SHA384',
        'ECDHE-ECDSA-AES128-GCM-SHA256',
        'ECDHE-ECDSA-AES256-GCM-SHA384',
        'ECDHE-ECDSA-AES128-SHA256',
        'ECDHE-ECDSA-AES256-SHA384',
    )
    separator = ':'
    self._ciphers = separator.join(suite_ciphers)
def init():
    """Resolve the an2linux paths, create directories and check the key pair.

    The config directory is ``$XDG_CONFIG_HOME/an2linux`` (falling back to
    ``~/.config/an2linux`` when the variable is unset or empty); the temp
    directory is ``<system tmp>/an2linux``. Both are created if missing. When
    the certificate or private key file is missing, or they fail to load, a
    new pair is generated (overwriting the old one).

    Returns:
        tuple: ``(CONF_FILE_PATH, CERTIFICATE_PATH, RSA_PRIVATE_KEY_PATH,
        AUTHORIZED_CERTS_PATH, DHPARAM_PATH, TMP_DIR_PATH)``.
    """
    XDG_CONFIG_HOME = os.environ.get('XDG_CONFIG_HOME')
    if XDG_CONFIG_HOME is None or XDG_CONFIG_HOME == '':
        XDG_CONFIG_HOME = os.path.join(os.path.expanduser('~'), '.config')

    CONF_DIR_PATH = os.path.join(XDG_CONFIG_HOME, 'an2linux')
    CONF_FILE_PATH = os.path.join(CONF_DIR_PATH, 'config')
    CERTIFICATE_PATH = os.path.join(CONF_DIR_PATH, 'certificate.pem')
    RSA_PRIVATE_KEY_PATH = os.path.join(CONF_DIR_PATH, 'rsakey.pem')
    AUTHORIZED_CERTS_PATH = os.path.join(CONF_DIR_PATH, 'authorized_certs')
    DHPARAM_PATH = os.path.join(CONF_DIR_PATH, 'dhparam.pem')
    TMP_DIR_PATH = os.path.join(tempfile.gettempdir(), 'an2linux')

    if not os.path.exists(CONF_DIR_PATH):
        os.makedirs(CONF_DIR_PATH)

    if not os.path.exists(TMP_DIR_PATH):
        os.makedirs(TMP_DIR_PATH)

    if not os.path.isfile(CERTIFICATE_PATH) or not os.path.isfile(RSA_PRIVATE_KEY_PATH):
        generate_server_private_key_and_certificate(CERTIFICATE_PATH, RSA_PRIVATE_KEY_PATH)
    else:
        # test if valid private key / certificate
        try:
            ssl.SSLContext(protocol=ssl.PROTOCOL_TLSv1_2).load_cert_chain(CERTIFICATE_PATH,
                                                                         RSA_PRIVATE_KEY_PATH)
            # Read through a context manager so the file handle is closed
            # deterministically instead of being left to the garbage collector.
            with open(CERTIFICATE_PATH, 'r') as cert_file:
                pem_text = cert_file.read()
            ssl.PEM_cert_to_DER_cert(pem_text)
        except (ssl.SSLError, ValueError) as e:
            print_with_timestamp('Something went wrong trying to load your private key and certificate: {}'.format(e))
            print_with_timestamp('Will generate new key overwriting old key and certificate')
            generate_server_private_key_and_certificate(CERTIFICATE_PATH, RSA_PRIVATE_KEY_PATH)

    return CONF_FILE_PATH, CERTIFICATE_PATH, RSA_PRIVATE_KEY_PATH, AUTHORIZED_CERTS_PATH, DHPARAM_PATH, TMP_DIR_PATH
def connect_mqtt_client(self):
    """Connect to the AWS IoT endpoint.

    Creates the MQTT client for ``self.client_id``, hooks up the log
    callback, configures TLS 1.2 with a CA file, client certificate and key
    (mutual authentication), and connects to ``self.endpoint_address``.
    """
    client = mqtt.Client(self.client_id)
    self.mqtt_client = client
    client.on_log = self._on_mqtt_log

    # Resolve the CA file, client certificate and client key, in that order.
    ca_path = self.get_abs_path(CAFILE)
    cert_path = self.get_abs_path(CERTIFICATE_CERT)
    key_path = self.get_abs_path(CERTIFICATE_KEY)
    self.mqtt_client.tls_set(ca_path, cert_path, key_path,
                             ssl.CERT_REQUIRED, ssl.PROTOCOL_TLSv1_2)

    # 8883 is the default port for MQTT over SSL/TLS
    mqtt_tls_port = 8883
    self.mqtt_client.connect(self.endpoint_address, port=mqtt_tls_port)
def init_poolmanager(self, connections, maxsize, block=False):
    """Create the pool manager with the TLS version pinned to TLS 1.2."""
    pool_options = {
        'num_pools': connections,
        'maxsize': maxsize,
        'block': block,
        'ssl_version': ssl.PROTOCOL_TLSv1_2,
    }
    self.poolmanager = PoolManager(**pool_options)
def run(self):
    """Start the media server (HTTP or HTTPS) and serve until it stops.

    In HTTPS mode a missing ``cert.pem`` is announced through
    ``cert_signal`` and the server is created only if the certificate exists
    afterwards. On an ``OSError`` whose text contains "errno 99", the LAN IP
    is looked up again, stored in the config file, and a plain HTTP server
    is created on the new address.
    """
    global httpd,home
    logger.info('starting server...')
    httpd = None
    try:
        cert = os.path.join(home,'cert.pem')
        if ui.https_media_server and not os.path.exists(cert):
            self.cert_signal.emit(cert)
        if not ui.https_media_server:
            httpd = ThreadedHTTPServerLocal((self.ip,self.port), HTTPServer_RequestHandler)
            self.media_server_start.emit('http')
        elif ui.https_media_server and os.path.exists(cert):
            httpd = ThreadedHTTPServerLocal((self.ip,self.port), HTTPServer_RequestHandler)
            httpd.socket = ssl.wrap_socket(httpd.socket,certfile=cert,ssl_version=ssl.PROTOCOL_TLSv1_2)
            self.media_server_start.emit('https')
        #httpd = MyTCPServer(server_address, HTTPServer_RequestHandler)
    except OSError as err:
        message = str(err)
        logger.info(message)
        if 'errno 99' in message.lower():
            send_notification('Your local IP changed..or port is blocked.\n..Trying to find new IP')
            self.ip = get_lan_ip()
            send_notification('Your New Address is '+self.ip+':'+str(self.port) + '\n Please restart the player')
            change_config_file(self.ip,self.port)
            #httpd = MyTCPServer(server_address, HTTPServer_RequestHandler)
            httpd = ThreadedHTTPServerLocal((self.ip,self.port), HTTPServer_RequestHandler)
    if not httpd:
        logger.info('server not started')
        return
    logger.info('running server...at..'+self.ip+':'+str(self.port))
    #httpd.allow_reuse_address = True
    httpd.serve_forever()
    logger.info('quitting http server')
def run(self):
    """Create the HTTP/HTTPS media server and block in ``serve_forever``.

    Plain HTTP is used when ``ui.https_media_server`` is false. Otherwise the
    server socket is wrapped in TLS 1.2 with ``cert.pem`` from ``home``; if
    that file is missing, ``cert_signal`` is emitted first and the server is
    only created when the file exists afterwards. An ``OSError`` mentioning
    "errno 99" triggers a LAN IP refresh and a plain HTTP server on the new
    address.
    """
    global httpd,home
    logger.info('starting server...')
    httpd = None
    try:
        cert = os.path.join(home,'cert.pem')
        if ui.https_media_server and not os.path.exists(cert):
            self.cert_signal.emit(cert)
        if not ui.https_media_server:
            address = (self.ip,self.port)
            httpd = ThreadedHTTPServerLocal(address, HTTPServer_RequestHandler)
            self.media_server_start.emit('http')
        elif ui.https_media_server and os.path.exists(cert):
            address = (self.ip,self.port)
            httpd = ThreadedHTTPServerLocal(address, HTTPServer_RequestHandler)
            httpd.socket = ssl.wrap_socket(httpd.socket,certfile=cert,ssl_version=ssl.PROTOCOL_TLSv1_2)
            self.media_server_start.emit('https')
        #httpd = MyTCPServer(server_address, HTTPServer_RequestHandler)
    except OSError as error:
        error_text = str(error)
        logger.info(error_text)
        address_unavailable = 'errno 99' in error_text.lower()
        if address_unavailable:
            notice = 'Your local IP changed..or port is blocked.\n..Trying to find new IP'
            send_notification(notice)
            self.ip = get_lan_ip()
            notice = 'Your New Address is '+self.ip+':'+str(self.port) + '\n Please restart the player'
            send_notification(notice)
            change_config_file(self.ip,self.port)
            address = (self.ip,self.port)
            #httpd = MyTCPServer(server_address, HTTPServer_RequestHandler)
            httpd = ThreadedHTTPServerLocal(address, HTTPServer_RequestHandler)
    if httpd:
        logger.info('running server...at..'+self.ip+':'+str(self.port))
        #httpd.allow_reuse_address = True
        httpd.serve_forever()
        logger.info('quitting http server')
        return
    logger.info('server not started')
def register_router(self, router):
    """Register ``router`` and record its IP version, TLS version and certificate.

    Raises ``WampyError`` when the ``ssl`` module does not provide
    ``PROTOCOL_TLSv1_2``.
    """
    super(SecureWebSocket, self).register_router(router)

    self.ipv = router.ipv

    # PROTOCOL_TLSv1_1 and PROTOCOL_TLSv1_2 are only available if Python is
    # linked with OpenSSL 1.0.1 or later.
    try:
        tls_protocol = ssl.PROTOCOL_TLSv1_2
    except AttributeError:
        raise WampyError("Your Python Environment does not support TLS")
    self.ssl_version = tls_protocol

    self.certificate = router.certificate
def __init__(self, ip_address=None, port=None, ssl_version=ssl.PROTOCOL_TLSv1_2):
    """Remember the TLS version, then initialise the base fingerprinter.

    ``ssl_version`` defaults to TLS 1.2; ``ip_address`` and ``port`` are
    forwarded to the parent initialiser.
    """
    self.ssl_version = ssl_version
    endpoint = {'ip_address': ip_address, 'port': port}
    super(BaseSslTcpFingerprinter, self).__init__(**endpoint)
# Static Methods
# Class Methods
# Public Methods