python类Site()的实例源码

test_web.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def setUp(self):
        self.resrc = SimpleResource()
        self.resrc.putChild('', self.resrc)
        self.site = server.Site(self.resrc)
        self.site = server.Site(self.resrc)
        self.site.logFile = log.logfile

        # HELLLLLLLLLLP!  This harness is Very Ugly.
        self.channel = self.site.buildProtocol(None)
        self.transport = http.StringTransport()
        self.transport.close = lambda *a, **kw: None
        self.transport.disconnecting = lambda *a, **kw: 0
        self.transport.getPeer = lambda *a, **kw: "peer"
        self.transport.getHost = lambda *a, **kw: "host"
        self.channel.makeConnection(self.transport)
        for l in ["GET / HTTP/1.1",
                  "Accept: text/html"]:
            self.channel.lineReceived(l)
test_distrib.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testDistrib(self):
        # site1 is the publisher
        r1 = resource.Resource()
        r1.putChild("there", static.Data("root", "text/plain"))
        site1 = server.Site(r1)
        self.f1 = PBServerFactory(distrib.ResourcePublisher(site1))
        self.port1 = reactor.listenTCP(0, self.f1)
        self.sub = distrib.ResourceSubscription("127.0.0.1",
                                                self.port1.getHost().port)
        r2 = resource.Resource()
        r2.putChild("here", self.sub)
        f2 = MySite(r2)
        self.port2 = reactor.listenTCP(0, f2)
        d = client.getPage("http://127.0.0.1:%d/here/there" % \
                           self.port2.getHost().port)
        d.addCallback(self.failUnlessEqual, 'root')
        return d
test_webclient.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def setUp(self):
        name = str(id(self)) + "_webclient"
        if not os.path.exists(name):
            os.mkdir(name)
            f = open(os.path.join(name, "file"), "wb")
            f.write("0123456789")
            f.close()
        r = static.File(name)
        r.putChild("redirect", util.Redirect("/file"))
        r.putChild("wait", LongTimeTakingResource())
        r.putChild("error", ErrorResource())
        r.putChild("nolength", NoLengthResource())
        r.putChild("host", HostHeaderResource())
        r.putChild("payload", PayloadResource())
        r.putChild("broken", BrokenDownloadResource())
        site = server.Site(r, timeout=None)
        self.port = self._listen(site)
        self.portno = self.port.getHost().port
test_webclient.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setUp(self):
        plainRoot = static.Data('not me', 'text/plain')
        tlsRoot = static.Data('me neither', 'text/plain')

        plainSite = server.Site(plainRoot, timeout=None)
        tlsSite = server.Site(tlsRoot, timeout=None)

        from twisted import test
        self.tlsPort = reactor.listenSSL(0, tlsSite,
                                         contextFactory=ssl.DefaultOpenSSLContextFactory(
            sibpath(test.__file__, 'server.pem'),
            sibpath(test.__file__, 'server.pem'),
            ),
                                         interface="127.0.0.1")
        self.plainPort = reactor.listenTCP(0, plainSite, interface="127.0.0.1")

        self.plainPortno = self.plainPort.getHost().port
        self.tlsPortno = self.tlsPort.getHost().port

        plainRoot.putChild('one', util.Redirect(self.getHTTPS('two')))
        tlsRoot.putChild('two', util.Redirect(self.getHTTP('three')))
        plainRoot.putChild('three', util.Redirect(self.getHTTPS('four')))
        tlsRoot.putChild('four', static.Data('FOUND IT!', 'text/plain'))
test_node.py 文件源码 项目:p2pool-bch 作者: amarian12 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
        self = cls()

        self.n = node.Node(factory, bitcoind, [], [], net)
        yield self.n.start()

        self.n.p2p_node = node.P2PNode(self.n, port=0, max_incoming_conns=1000000, addr_store={}, connect_addrs=[('127.0.0.1', peer_port) for peer_port in peer_ports])
        self.n.p2p_node.start()

        wb = work.WorkerBridge(node=self.n, my_pubkey_hash=random.randrange(2**160), donation_percentage=random.uniform(0, 10), merged_urls=merged_urls, worker_fee=3, args=math.Object(donation_percentage=random.uniform(0, 10), address='foo', worker_fee=3, timeaddresses=1000), pubkeys=main.keypool(), bitcoind=bitcoind)
        self.wb = wb
        web_root = resource.Resource()
        worker_interface.WorkerInterface(wb).attach_to(web_root)
        self.web_port = reactor.listenTCP(0, server.Site(web_root))

        defer.returnValue(self)
test_node.py 文件源码 项目:p2pool-unitus 作者: amarian12 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
        self = cls()

        self.n = node.Node(factory, bitcoind, [], [], net)
        yield self.n.start()

        self.n.p2p_node = node.P2PNode(self.n, port=0, max_incoming_conns=1000000, addr_store={}, connect_addrs=[('127.0.0.1', peer_port) for peer_port in peer_ports])
        self.n.p2p_node.start()

        wb = work.WorkerBridge(node=self.n, my_pubkey_hash=random.randrange(2**160), donation_percentage=random.uniform(0, 10), merged_urls=merged_urls, worker_fee=3)
        self.wb = wb
        web_root = resource.Resource()
        worker_interface.WorkerInterface(wb).attach_to(web_root)
        self.web_port = reactor.listenTCP(0, server.Site(web_root))

        defer.returnValue(self)
redirector.py 文件源码 项目:vault-redirector-twisted 作者: manheim 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def run(self):
        """setup the site, start listening on port, setup the looping call to
        :py:meth:`~.update_active_node` every ``self.poll_interval`` seconds,
        and start the Twisted reactor"""
        # get the active node before we start anything...
        self.active_node_ip_port = self.get_active_node()
        if self.active_node_ip_port is None:
            logger.critical("ERROR: Could not get active vault node from "
                            "Consul. Exiting.")
            raise SystemExit(3)
        logger.warning("Initial Vault active node: %s",
                       self.active_node_ip_port)
        site = Site(VaultRedirectorSite(self))
        # setup our HTTP(S) listener
        if self.tls_factory is not None:
            self.listentls(site)
        else:
            self.listentcp(site)
        # setup the update_active_node poll every POLL_INTERVAL seconds
        self.add_update_loop()
        logger.warning('Starting Twisted reactor (event loop)')
        self.run_reactor()
smart-contract-rest-api.py 文件源码 项目:neo-python 作者: CityOfZion 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def main():
    # Setup the blockchain
    blockchain = LevelDBBlockchain(settings.LEVELDB_PATH)
    Blockchain.RegisterBlockchain(blockchain)
    dbloop = task.LoopingCall(Blockchain.Default().PersistBlocks)
    dbloop.start(.1)
    NodeLeader.Instance().Start()

    # Disable smart contract events for external smart contracts
    settings.set_log_smart_contract_events(False)

    # Start a thread with custom code
    d = threading.Thread(target=custom_background_code)
    d.setDaemon(True)  # daemonizing the thread will kill it when the main thread is quit
    d.start()

    # Hook up Klein API to Twisted reactor
    endpoint_description = "tcp:port=%s:interface=localhost" % API_PORT
    endpoint = endpoints.serverFromString(reactor, endpoint_description)
    endpoint.listen(Site(app.resource()))

    # Run all the things (blocking call)
    logger.info("Everything setup and running. Waiting for events...")
    reactor.run()
    logger.info("Shutting down.")
test_node.py 文件源码 项目:p2pool-dgb-sha256 作者: ilsawa 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
        self = cls()

        self.n = node.Node(factory, bitcoind, [], [], net)
        yield self.n.start()

        self.n.p2p_node = node.P2PNode(self.n, port=0, max_incoming_conns=1000000, addr_store={}, connect_addrs=[('127.0.0.1', peer_port) for peer_port in peer_ports])
        self.n.p2p_node.start()

        wb = work.WorkerBridge(node=self.n, my_pubkey_hash=random.randrange(2**160), donation_percentage=random.uniform(0, 10), merged_urls=merged_urls, worker_fee=3, args=math.Object(donation_percentage=random.uniform(0, 10), address='foo', worker_fee=3, timeaddresses=1000), pubkeys=main.keypool(), bitcoind=bitcoind)
        self.wb = wb
        web_root = resource.Resource()
        worker_interface.WorkerInterface(wb).attach_to(web_root)
        self.web_port = reactor.listenTCP(0, server.Site(web_root))

        defer.returnValue(self)
test_web.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def setUp(self):
        self.resrc = SimpleResource()
        self.resrc.putChild('', self.resrc)
        self.site = server.Site(self.resrc)
        self.site = server.Site(self.resrc)
        self.site.logFile = log.logfile

        # HELLLLLLLLLLP!  This harness is Very Ugly.
        self.channel = self.site.buildProtocol(None)
        self.transport = http.StringTransport()
        self.transport.close = lambda *a, **kw: None
        self.transport.disconnecting = lambda *a, **kw: 0
        self.transport.getPeer = lambda *a, **kw: "peer"
        self.transport.getHost = lambda *a, **kw: "host"
        self.channel.makeConnection(self.transport)
        for l in ["GET / HTTP/1.1",
                  "Accept: text/html"]:
            self.channel.lineReceived(l)
test_distrib.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def testDistrib(self):
        # site1 is the publisher
        r1 = resource.Resource()
        r1.putChild("there", static.Data("root", "text/plain"))
        site1 = server.Site(r1)
        self.f1 = PBServerFactory(distrib.ResourcePublisher(site1))
        self.port1 = reactor.listenTCP(0, self.f1)
        self.sub = distrib.ResourceSubscription("127.0.0.1",
                                                self.port1.getHost().port)
        r2 = resource.Resource()
        r2.putChild("here", self.sub)
        f2 = MySite(r2)
        self.port2 = reactor.listenTCP(0, f2)
        d = client.getPage("http://127.0.0.1:%d/here/there" % \
                           self.port2.getHost().port)
        d.addCallback(self.failUnlessEqual, 'root')
        return d
test_webclient.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def setUp(self):
        name = str(id(self)) + "_webclient"
        if not os.path.exists(name):
            os.mkdir(name)
            f = open(os.path.join(name, "file"), "wb")
            f.write("0123456789")
            f.close()
        r = static.File(name)
        r.putChild("redirect", util.Redirect("/file"))
        r.putChild("wait", LongTimeTakingResource())
        r.putChild("error", ErrorResource())
        r.putChild("nolength", NoLengthResource())
        r.putChild("host", HostHeaderResource())
        r.putChild("payload", PayloadResource())
        r.putChild("broken", BrokenDownloadResource())
        site = server.Site(r, timeout=None)
        self.port = self._listen(site)
        self.portno = self.port.getHost().port
test_webclient.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def setUp(self):
        plainRoot = static.Data('not me', 'text/plain')
        tlsRoot = static.Data('me neither', 'text/plain')

        plainSite = server.Site(plainRoot, timeout=None)
        tlsSite = server.Site(tlsRoot, timeout=None)

        from twisted import test
        self.tlsPort = reactor.listenSSL(0, tlsSite,
                                         contextFactory=ssl.DefaultOpenSSLContextFactory(
            sibpath(test.__file__, 'server.pem'),
            sibpath(test.__file__, 'server.pem'),
            ),
                                         interface="127.0.0.1")
        self.plainPort = reactor.listenTCP(0, plainSite, interface="127.0.0.1")

        self.plainPortno = self.plainPort.getHost().port
        self.tlsPortno = self.tlsPort.getHost().port

        plainRoot.putChild('one', util.Redirect(self.getHTTPS('two')))
        tlsRoot.putChild('two', util.Redirect(self.getHTTP('three')))
        plainRoot.putChild('three', util.Redirect(self.getHTTPS('four')))
        tlsRoot.putChild('four', static.Data('FOUND IT!', 'text/plain'))
mockserver.py 文件源码 项目:scrapy-cdr 作者: TeamHG-Memex 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('resource')
    args = parser.parse_args()
    module_name, name = args.resource.rsplit('.', 1)
    sys.path.append('.')
    resource = getattr(import_module(module_name), name)()
    http_port = reactor.listenTCP(PORT, Site(resource))

    def print_listening():
        host = http_port.getHost()
        print('Mock server {} running at http://{}:{}'.format(
            resource, host.host, host.port))

    reactor.callWhenRunning(print_listening)
    reactor.run()
test_node.py 文件源码 项目:p2pool-ltc 作者: ilsawa 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
        self = cls()

        self.n = node.Node(factory, bitcoind, [], [], net)
        yield self.n.start()

        self.n.p2p_node = node.P2PNode(self.n, port=0, max_incoming_conns=1000000, addr_store={}, connect_addrs=[('127.0.0.1', peer_port) for peer_port in peer_ports])
        self.n.p2p_node.start()

        wb = work.WorkerBridge(node=self.n, my_pubkey_hash=random.randrange(2**160), donation_percentage=random.uniform(0, 10), merged_urls=merged_urls, worker_fee=3, args=math.Object(donation_percentage=random.uniform(0, 10), address='foo', worker_fee=3, timeaddresses=1000), pubkeys=main.keypool(), bitcoind=bitcoind)
        self.wb = wb
        web_root = resource.Resource()
        worker_interface.WorkerInterface(wb).attach_to(web_root)
        self.web_port = reactor.listenTCP(0, server.Site(web_root))

        defer.returnValue(self)
test_node.py 文件源码 项目:p2pool-bsty 作者: amarian12 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
        self = cls()

        self.n = node.Node(factory, bitcoind, [], [], net)
        yield self.n.start()

        self.n.p2p_node = node.P2PNode(self.n, port=0, max_incoming_conns=1000000, addr_store={}, connect_addrs=[('127.0.0.1', peer_port) for peer_port in peer_ports])
        self.n.p2p_node.start()

        wb = work.WorkerBridge(node=self.n, my_pubkey_hash=random.randrange(2**160), donation_percentage=random.uniform(0, 10), merged_urls=merged_urls, worker_fee=3)
        self.wb = wb
        web_root = resource.Resource()
        worker_interface.WorkerInterface(wb).attach_to(web_root)
        self.web_port = reactor.listenTCP(0, server.Site(web_root))

        defer.returnValue(self)
test_node.py 文件源码 项目:p2pool-cann 作者: ilsawa 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
        self = cls()

        self.n = node.Node(factory, bitcoind, [], [], net)
        yield self.n.start()

        self.n.p2p_node = node.P2PNode(self.n, port=0, max_incoming_conns=1000000, addr_store={}, connect_addrs=[('127.0.0.1', peer_port) for peer_port in peer_ports])
        self.n.p2p_node.start()

        wb = work.WorkerBridge(node=self.n, my_pubkey_hash=random.randrange(2**160), donation_percentage=random.uniform(0, 10), merged_urls=merged_urls, worker_fee=3)
        self.wb = wb
        web_root = resource.Resource()
        worker_interface.WorkerInterface(wb).attach_to(web_root)
        self.web_port = reactor.listenTCP(0, server.Site(web_root))

        defer.returnValue(self)
vmware_exporter.py 文件源码 项目:vmware_exporter 作者: rverchere 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def main():
    parser = argparse.ArgumentParser(description='VMWare metrics exporter for Prometheus')
    parser.add_argument('-c', '--config', dest='config_file',
                        default='config.yml', help="configuration file")
    parser.add_argument('-p', '--port', dest='port', type=int,
                        default=9272, help="HTTP port to expose metrics")

    args = parser.parse_args()

    # Start up the server to expose the metrics.
    root = Resource()
    root.putChild(b'metrics', VMWareMetricsResource(args))

    factory = Site(root)
    print("Starting web server on port {}".format(args.port))
    reactor.listenTCP(args.port, factory)
    reactor.run()
test_measurement.py 文件源码 项目:bwscanner 作者: TheTorProject 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def setUp(self):
        yield super(TestBwscan, self).setUp()
        yield setconf_fetch_all_descs(self.tor)

        class DummyResource(Resource):
            isLeaf = True
            def render_GET(self, request):
                size = request.uri.split('/')[-1]
                if 'k' in size:
                    size = int(size[:-1])*(2**10)
                elif 'M' in size:
                    size = int(size[:-1])*(2**20)
                return 'a'*size

        self.port = yield available_tcp_port(reactor)
        self.test_service = yield reactor.listenTCP(
            self.port, Site(DummyResource()))
test_listener.py 文件源码 项目:bwscanner 作者: TheTorProject 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def setUp(self):
        yield super(TestStreamBandwidthListener, self).setUp()
        self.fetch_size = 8*2**20 # 8MB
        self.stream_bandwidth_listener = yield StreamBandwidthListener(self.tor)

        class DummyResource(Resource):
            isLeaf = True
            def render_GET(self, request):
                return 'a'*8*2**20

        self.port = yield available_tcp_port(reactor)
        self.site = Site(DummyResource())
        self.test_service = yield reactor.listenTCP(self.port, self.site)

        self.not_enough_measurements = NotEnoughMeasurements(
            "Not enough measurements to calculate STREAM_BW samples.")
http.py 文件源码 项目:kotori 作者: daq-tools 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def startService(self):
        """
        Start TCP listener on designated HTTP port,
        serving ``HttpChannelContainer`` as root resource.
        """

        # Don't start service twice
        if self.running == 1:
            return

        self.running = 1

        # Prepare startup
        http_listen = self.settings.kotori.http_listen
        http_port   = int(self.settings.kotori.http_port)
        log.info('Starting HTTP service on {http_listen}:{http_port}', http_listen=http_listen, http_port=http_port)

        # Configure root Site object and start listening to requests.
        # This must take place only once - can't bind to the same port multiple times!
        factory = LocalSite(self.root)
        reactor.listenTCP(http_port, factory, interface=http_listen)
server.py 文件源码 项目:kotori 作者: daq-tools 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def boot_frontend(config, debug=False):
    """
    Boot a Pyramid WSGI application as Twisted component
    """

    http_port = int(config.get('config-web', 'http_port'))
    websocket_uri = unicode(config.get('wamp', 'listen'))

    # https://stackoverflow.com/questions/13122519/serving-pyramid-application-using-twistd/13138610#13138610
    config = resource_filename('kotori.frontend', 'development.ini')
    application = get_app(config, 'main')

    # https://twistedmatrix.com/documents/13.1.0/web/howto/web-in-60/wsgi.html
    resource = WSGIResource(reactor, reactor.getThreadPool(), application)

    reactor.listenTCP(http_port, Site(resource))
test_web.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_sessionUIDGeneration(self):
        """
        L{site.getSession} generates L{Session} objects with distinct UIDs from
        a secure source of entropy.
        """
        site = server.Site(resource.Resource())
        # Ensure that we _would_ use the unpredictable random source if the
        # test didn't stub it.
        self.assertIdentical(site._entropy, os.urandom)

        def predictableEntropy(n):
            predictableEntropy.x += 1
            return (unichr(predictableEntropy.x) * n).encode("charmap")
        predictableEntropy.x = 0
        self.patch(site, "_entropy", predictableEntropy)
        a = self.getAutoExpiringSession(site)
        b = self.getAutoExpiringSession(site)
        self.assertEqual(a.uid, b"01" * 0x20)
        self.assertEqual(b.uid, b"02" * 0x20)
        # This functionality is silly (the value is no longer used in session
        # generation), but 'counter' was a public attribute since time
        # immemorial so we should make sure if anyone was using it to get site
        # metrics or something it keeps working.
        self.assertEqual(site.counter, 2)
test_web.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setUp(self):
        self.resrc = SimpleResource()
        self.resrc.putChild(b'', self.resrc)
        self.resrc.putChild(b'with-content-type', SimpleResource(b'image/jpeg'))
        self.site = server.Site(self.resrc)
        self.site.startFactory()
        self.addCleanup(self.site.stopFactory)

        # HELLLLLLLLLLP!  This harness is Very Ugly.
        self.channel = self.site.buildProtocol(None)
        self.transport = http.StringTransport()
        self.transport.close = lambda *a, **kw: None
        self.transport.disconnecting = lambda *a, **kw: 0
        self.transport.getPeer = lambda *a, **kw: "peer"
        self.transport.getHost = lambda *a, **kw: "host"
        self.channel.makeConnection(self.transport)
test_web.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_processingFailedNoTraceback(self):
        """
        L{Request.processingFailed} when the site has C{displayTracebacks} set
        to C{False} does not write out the failure, but give a generic error
        message.
        """
        d = DummyChannel()
        request = server.Request(d, 1)
        request.site = server.Site(resource.Resource())
        request.site.displayTracebacks = False
        fail = failure.Failure(Exception("Oh no!"))
        request.processingFailed(fail)

        self.assertNotIn(b"Oh no!", request.transport.written.getvalue())
        self.assertIn(
            b"Processing Failed", request.transport.written.getvalue()
        )

        # Since we didn't "handle" the exception, flush it to prevent a test
        # failure
        self.assertEqual(1, len(self.flushLoggedErrors()))
test_web.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 75 收藏 0 点赞 0 评论 0
def test_processingFailedDisplayTracebackHandlesUnicode(self):
        """
        L{Request.processingFailed} when the site has C{displayTracebacks} set
        to C{True} writes out the failure, making UTF-8 items into HTML
        entities.
        """
        d = DummyChannel()
        request = server.Request(d, 1)
        request.site = server.Site(resource.Resource())
        request.site.displayTracebacks = True
        fail = failure.Failure(Exception(u"\u2603"))
        request.processingFailed(fail)

        self.assertIn(b"☃", request.transport.written.getvalue())

        # On some platforms, we get a UnicodeError when trying to
        # display the Failure with twisted.python.log because
        # the default encoding cannot display u"\u2603".  Windows for example
        # uses a default encodig of cp437 which does not support u"\u2603".
        self.flushLoggedErrors(UnicodeError)

        # Since we didn't "handle" the exception, flush it to prevent a test
        # failure
        self.assertEqual(1, len(self.flushLoggedErrors()))
test_web.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_sessionDifferentFromSecureSession(self):
        """
        L{Request.session} and L{Request.secure_session} should be two separate
        sessions with unique ids and different cookies.
        """
        d = DummyChannel()
        d.transport = DummyChannel.SSL()
        request = server.Request(d, 1)
        request.site = server.Site(resource.Resource())
        request.sitepath = []
        secureSession = request.getSession()
        self.assertIsNotNone(secureSession)
        self.addCleanup(secureSession.expire)
        self.assertEqual(request.cookies[0].split(b"=")[0],
                         b"TWISTED_SECURE_SESSION")
        session = request.getSession(forceNotSecure=True)
        self.assertIsNotNone(session)
        self.assertEqual(request.cookies[1].split(b"=")[0], b"TWISTED_SESSION")
        self.addCleanup(session.expire)
        self.assertNotEqual(session.uid, secureSession.uid)
test_web.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_sessionAttribute(self):
        """
        On a L{Request}, the C{session} attribute retrieves the associated
        L{Session} only if it has been initialized.  If the request is secure,
        it retrieves the secure session.
        """
        site = server.Site(resource.Resource())
        d = DummyChannel()
        d.transport = DummyChannel.SSL()
        request = server.Request(d, 1)
        request.site = site
        request.sitepath = []
        self.assertIs(request.session, None)
        insecureSession = request.getSession(forceNotSecure=True)
        self.addCleanup(insecureSession.expire)
        self.assertIs(request.session, None)
        secureSession = request.getSession()
        self.addCleanup(secureSession.expire)
        self.assertIsNot(secureSession, None)
        self.assertIsNot(secureSession, insecureSession)
        self.assertIs(request.session, secureSession)
test_distrib.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testDistrib(self):
        # site1 is the publisher
        r1 = resource.Resource()
        r1.putChild("there", static.Data("root", "text/plain"))
        site1 = server.Site(r1)
        self.f1 = PBServerFactory(distrib.ResourcePublisher(site1))
        self.port1 = reactor.listenTCP(0, self.f1)
        self.sub = distrib.ResourceSubscription("127.0.0.1",
                                                self.port1.getHost().port)
        r2 = resource.Resource()
        r2.putChild("here", self.sub)
        f2 = MySite(r2)
        self.port2 = reactor.listenTCP(0, f2)
        agent = client.Agent(reactor)
        d = agent.request(b"GET", "http://127.0.0.1:%d/here/there" % \
                          (self.port2.getHost().port,))
        d.addCallback(client.readBody)
        d.addCallback(self.assertEqual, 'root')
        return d
test_webclient.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def setUp(self):
        plainRoot = Data(b'not me', 'text/plain')
        tlsRoot = Data(b'me neither', 'text/plain')

        plainSite = server.Site(plainRoot, timeout=None)
        tlsSite = server.Site(tlsRoot, timeout=None)

        self.tlsPort = reactor.listenSSL(
            0, tlsSite,
            contextFactory=ssl.DefaultOpenSSLContextFactory(
                serverPEMPath, serverPEMPath),
            interface="127.0.0.1")
        self.plainPort = reactor.listenTCP(0, plainSite, interface="127.0.0.1")

        self.plainPortno = self.plainPort.getHost().port
        self.tlsPortno = self.tlsPort.getHost().port

        plainRoot.putChild(b'one', Redirect(self.getHTTPS('two')))
        tlsRoot.putChild(b'two', Redirect(self.getHTTP('three')))
        plainRoot.putChild(b'three', Redirect(self.getHTTPS('four')))
        tlsRoot.putChild(b'four', Data(b'FOUND IT!', 'text/plain'))


问题


面经


文章

微信
公众号

扫码关注公众号