python方法listenSSL()的实例源码

coinswap_run.py 文件源码 项目:CoinSwapCS 作者: AdamISZ 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def main_server(options, wallet, test_data=None):
    """Configure and start the server side of a coinswap.

    The use_ssl entry of test_data is only for tests: when test_data is
    given and its 'use_ssl' value is falsy, the SERVER/use_ssl config
    setting is forced to "false".

    Steps, in order:
      * start unspent-output monitoring for `wallet`;
      * apply regtest-only settings when the blockchain interface is a
        RegtestBitcoinCoreInterface;
      * if `options.recover` is set, load that session, back it out, run
        the reactor and return;
      * otherwise serve CoinSwapCarolJSONServer through a Tor hidden
        service, TLS, or plain HTTP, chosen from the SERVER config section.

    The reactor is started here only when no test_data is supplied (or in
    the recovery path).
    """
    if test_data and not test_data['use_ssl']:
        cs_single().config.set("SERVER", "use_ssl", "false")
    cs_single().bc_interface.start_unspent_monitoring(wallet)

    # Regtest only: lets tests exercise the confirm/unconfirm callbacks
    # for multiple transactions.
    if isinstance(cs_single().bc_interface, RegtestBitcoinCoreInterface):
        cs_single().bc_interface.tick_forward_chain_interval = 2
        cs_single().bc_interface.simulating = True
        cs_single().config.set("BLOCKCHAIN", "notify_port", "62652")
        cs_single().config.set("BLOCKCHAIN", "rpc_host", "127.0.0.2")

    # Restart option selected: read the saved state and back out.
    # (TODO is to attempt restarting normally before backing out)
    if options.recover:
        recover_id = options.recover
        recovering_carol = CoinSwapCarol(wallet, 'carolstate')
        recovering_carol.bbmb = wallet.get_balance_by_mixdepth(verbose=False)
        recovering_carol.load(sessionid=recover_id)
        recovering_carol.backout("Recovering from shutdown")
        reactor.run()
        return

    # TODO currently ignores server setting here and uses localhost
    listen_port = cs_single().config.getint("SERVER", "port")
    testing_mode = bool(test_data)
    if test_data and test_data['alt_c_class']:
        carol_class = test_data['alt_c_class']
    else:
        carol_class = CoinSwapCarol
    if test_data:
        fcs = test_data["fail_carol_state"]
    else:
        fcs = None

    def build_site():
        # One place to assemble the web site wrapping the JSON server; it is
        # invoked inside whichever transport branch is taken below.
        json_server = CoinSwapCarolJSONServer(wallet,
                                              testing_mode=testing_mode,
                                              carol_class=carol_class,
                                              fail_carol_state=fcs)
        return server.Site(json_server)

    # Hidden service has first priority
    if cs_single().config.get("SERVER", "use_onion") != "false":
        onion_site = build_site()
        hs_path = os.path.join(cs_single().homedir, "hiddenservice")
        if not os.path.exists(hs_path):
            os.makedirs(hs_path)
        # An explicit hs_dir setting replaces the default location.
        if 'hs_dir' in cs_single().config.options('SERVER'):
            hs_path = cs_single().config.get("SERVER", "hs_dir")
        onion_port = cs_single().config.getint("SERVER", "onion_port")
        tor_started = start_tor(onion_site, onion_port, hs_path)
        # Any callbacks after Tor is inited can be added here with
        # tor_started.addCallback
    elif cs_single().config.get("SERVER", "use_ssl") != "false":
        reactor.listenSSL(int(listen_port), build_site(),
                          contextFactory=get_ssl_context())
    else:
        cslog.info("WARNING! Serving over HTTP, no TLS used!")
        reactor.listenTCP(int(listen_port), build_site())

    if not test_data:
        reactor.run()
test_ssl.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_bothSidesLoseConnection(self):
        """
        When both the client and the server side of an SSL connection call
        loseConnection right after the handshake, each side's connectionLost
        should be reported with ConnectionDone (a clean close), which should
        happen only once the underlying TCP connection has disconnected.
        """
        @implementer(interfaces.IHandshakeListener)
        class CloseAfterHandshake(protocol.Protocol):
            """
            Protocol that drops the connection as soon as the handshake
            completes and reports the connectionLost reason through the
            C{done} Deferred.
            """
            gotData = False

            def __init__(self):
                self.done = defer.Deferred()

            def handshakeCompleted(self):
                self.transport.loseConnection()

            def connectionLost(self, reason):
                # The loss reason is delivered as a failure; the Deferred is
                # then removed from the instance.
                self.done.errback(reason)
                del self.done

        certOrg = "twisted.test.test_ssl"
        clientNames = (certOrg, certOrg + ", client")
        serverNames = (certOrg, certOrg + ", server")
        self.setupServerAndClient(clientNames, {}, serverNames, {})

        # Server side: a single pre-built protocol instance is handed out so
        # the test can observe its Deferred.
        serverSide = CloseAfterHandshake()
        listeningFactory = protocol.ServerFactory()
        listeningFactory.protocol = lambda: serverSide
        listeningPort = reactor.listenSSL(
            0, listeningFactory, self.serverCtxFactory)
        self.addCleanup(listeningPort.stopListening)

        # Client side: same arrangement, connecting to the port just opened.
        clientSide = CloseAfterHandshake()
        connectingFactory = protocol.ClientFactory()
        connectingFactory.protocol = lambda: clientSide
        boundPort = listeningPort.getHost().port
        reactor.connectSSL(
            '127.0.0.1', boundPort, connectingFactory, self.clientCtxFactory)

        def expectConnectionDone(failure):
            # Any failure other than ConnectionDone is re-raised by trap().
            failure.trap(ConnectionDone)

        clientClosed = clientSide.done.addErrback(expectConnectionDone)
        serverClosed = serverSide.done.addErrback(expectConnectionDone)
        return defer.gatherResults([clientClosed, serverClosed])
twistedserver.py 文件源码 项目:reflectrpc 作者: aheck 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def run(self):
        """
        Start the server and listen on host:port

        A factory is built first: an HTTP site wrapping the JSON-RPC
        resource when HTTP is enabled (optionally behind HTTP basic auth),
        otherwise a JsonRpcProtocolFactory. The factory is then bound with
        TLS, on a UNIX domain socket (host of the form unix://<path>, only
        when TLS is disabled) or on plain TCP. Finally the listening address
        is printed and reactor.run() is called.
        """
        unix_prefix = 'unix://'

        if not self.http_enabled:
            factory = JsonRpcProtocolFactory(self.rpcprocessor,
                    self.tls_client_auth_enabled)
        else:
            resource = JsonRpcHttpResource()
            resource.rpcprocessor = self.rpcprocessor
            resource.tls_client_auth_enabled = self.tls_client_auth_enabled

            if self.http_basic_auth_enabled:
                checker = PasswordChecker(self.passwdCheckFunction)
                realm = HttpPasswordRealm(resource)
                auth_portal = portal.Portal(realm, [checker])

                # On Python 2 the realm name is passed on as UTF-8 bytes.
                realm_name = 'Reflect RPC'
                if sys.version_info.major == 2:
                    realm_name = realm_name.encode('utf-8')

                # From here on the plain resource is replaced by its
                # auth-protected wrapper.
                resource = HTTPAuthSessionWrapper(
                        auth_portal, [BasicCredentialFactory(realm_name)])

            factory = server.Site(RootResource(resource))

        if self.tls_enabled:
            # With client auth enabled the certificate options are built
            # with the client-auth CA; otherwise with no argument.
            if self.tls_client_auth_enabled:
                tls_options = self.cert.options(self.client_auth_ca)
            else:
                tls_options = self.cert.options()
            reactor.listenSSL(self.port, factory, tls_options,
                    interface=self.host)
        elif self.host.startswith(unix_prefix):
            socket_path = self.host[len(unix_prefix):]
            reactor.listenUNIX(socket_path, factory,
                    backlog=self.unix_socket_backlog,
                    mode=self.unix_socket_mode,
                    wantPID=self.unix_socket_want_pid)
        else:
            reactor.listenTCP(self.port, factory, interface=self.host)

        if self.host.startswith(unix_prefix):
            banner = "Listening on %s" % (self.host)
        else:
            banner = "Listening on %s:%d" % (self.host, self.port)
        print(banner)

        reactor.run()


问题


面经


文章

微信
公众号

扫码关注公众号