python类Resource()的实例源码

test_server.py 文件源码 项目:farfetchd 作者: isislovecruft 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def setUp(self):
        """Mount a :class:`CaptchaFetchResource` on a fake one-page site.

        The key pair comes from ``crypto.getRSAKey('captcha.key', bits=1024)``.
        The ``../captchas`` directory is copied to ``<cwd>/captchas`` and that
        copy is handed to the resource as its ``captchaDir``.  The resource is
        attached to a bare root ``Resource`` under ``b'fetch'``, and a
        ``DummyRequest`` for that page is kept on ``self.request``.
        """
        secret, public = crypto.getRSAKey('captcha.key', bits=1024)

        # A bare root resource with a single child page is all the "server"
        # these tests need.
        self.pagename = b'fetch'
        self.root = Resource()

        # Work on a copy of the CAPTCHA directory located under the current
        # working directory.
        localCaptchas = os.path.sep.join([os.getcwd(), 'captchas'])
        shutil.copytree('../captchas', localCaptchas)
        self.captchaDir = localCaptchas

        resourceOptions = dict(
            secretKey=secret,
            publicKey=public,
            hmacKey='abcdefghijklmnopqrstuvwxyz012345',
            captchaDir=self.captchaDir,
            useForwardedHeader=True)
        self.captchaResource = server.CaptchaFetchResource(**resourceOptions)
        self.root.putChild(self.pagename, self.captchaResource)

        # The faked request targets the page registered above.
        self.request = DummyRequest([self.pagename])
test_server.py 文件源码 项目:farfetchd 作者: isislovecruft 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def setUp(self):
        """Mount a :class:`CaptchaCheckResource` on a fake one-page site.

        The key pair comes from ``crypto.getRSAKey('captcha.key', bits=1024)``.
        The ``../captchas`` directory is copied to ``<cwd>/captchas`` and the
        location is remembered as ``self.captchaDir`` (it is not passed to the
        resource).  The resource is attached to a bare root ``Resource`` under
        ``b'check'``, and a ``DummyRequest`` for that page is kept on
        ``self.request``.
        """
        secret, public = crypto.getRSAKey('captcha.key', bits=1024)

        # A bare root resource with a single child page is all the "server"
        # these tests need.
        self.pagename = b'check'
        self.root = Resource()

        # Work on a copy of the CAPTCHA directory located under the current
        # working directory.
        localCaptchas = os.path.sep.join([os.getcwd(), 'captchas'])
        shutil.copytree('../captchas', localCaptchas)
        self.captchaDir = localCaptchas

        resourceOptions = dict(
            secretKey=secret,
            publicKey=public,
            hmacKey='abcdefghijklmnopqrstuvwxyz012345',
            useForwardedHeader=True)
        self.captchaResource = server.CaptchaCheckResource(**resourceOptions)
        self.root.putChild(self.pagename, self.captchaResource)

        # The faked request targets the page registered above.
        self.request = DummyRequest([self.pagename])
server.py 文件源码 项目:farfetchd 作者: isislovecruft 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def getClientIP(request, useForwardedHeader=False):
    """Work out the IP address of the client that made **request**.

    When **useForwardedHeader** is false, the address reported by
    ``request.getClientIP()`` is returned as-is.  Otherwise only the
    ``'X-Forwarded-For'`` header is consulted: its last comma-separated
    entry is taken, stripped of surrounding whitespace, and returned if
    ``isIPAddress`` accepts it.  A rejected entry is logged and discarded.

    :type request: :api:`twisted.web.http.Request`
    :param request: A ``Request`` for a :api:`twisted.web.resource.Resource`.
    :param bool useForwardedHeader: If ``True``, take the address from the
        ``'X-Forwarded-For'`` header instead of from the connection.
    :rtype: ``None`` or :any:`str`
    :returns: The client's IP address, or ``None`` if the header was
        requested but is missing, empty, or does not end in an IP address.
    """
    if not useForwardedHeader:
        return request.getClientIP()

    forwarded = request.getHeader("X-Forwarded-For")
    if not forwarded:
        return None

    candidate = forwarded.split(",")[-1].strip()
    if isIPAddress(candidate):
        return candidate

    log.msg("Got weird X-Forwarded-For value %r" % forwarded)
    return None
controller.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, m, inputhandlers=None, view=None, controllers=None, templateDirectory = None):
        """Initialise the controller around the model C{m}.

        @param m: the model; stored as C{self.model}.
        @param inputhandlers: deprecated alias for C{controllers}.  A truthy
            value is used as the handler list and a deprecation message is
            printed.
        @param view: accepted but not stored here; C{self.view} always starts
            as C{None} and the caller must call C{setView} before rendering.
        @param controllers: handler list used when C{inputhandlers} is not
            given (or is falsy).
        @param templateDirectory: when not C{None}, stored on the instance as
            C{self.templateDirectory}.
        """
        resource.Resource.__init__(self)
        self.model = m
        # setView() has to be called by whoever uses this controller before it
        # is rendered, so no view is recorded yet.
        self.view = None
        self.subcontrollers = []
        if self.setupStacks:
            self.setupControllerStack()

        if inputhandlers:
            print("The inputhandlers arg is deprecated, please use controllers instead")
            self._inputhandlers = inputhandlers
        elif inputhandlers is None and controllers is None:
            self._inputhandlers = []
        else:
            self._inputhandlers = controllers

        if templateDirectory is not None:
            self.templateDirectory = templateDirectory

        self._parent = None
        self._valid = {}
        self._invalid = {}
        self._process = {}
script.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def ResourceScript(path, registry):
    """
    Execute the Python file at 'path' and return the 'resource' global it
    defines, which should be an instance of (a subclass of)
    web.resource.Resource; it will be rendered.

    The script runs in a namespace that provides '__file__', 'registry',
    and the 'cache' / 'recache' callables of a CacheScanner built for this
    path. 'resource' is pre-set to the noRsrc placeholder, which is what
    gets returned if the script never assigns it.

    If the script raises AlreadyCached, the first argument of that exception
    is returned instead. Otherwise, when the scanner's doCache flag is set
    and the script did assign 'resource', the result is stored with
    registry.cachePath(path, rsrc) before being returned.
    """
    cs = CacheScanner(path, registry)
    glob = {'__file__': path,
            'resource': noRsrc,
            'registry': registry,
            'cache': cs.cache,
            'recache': cs.recache}
    try:
        # NOTE(review): execfile runs whatever code the file contains; confirm
        # that callers only pass trusted paths.
        # The same dict serves as both globals and locals of the script.
        execfile(path, glob, glob)
    except AlreadyCached, ac:
        return ac.args[0]
    rsrc = glob['resource']
    # Only cache a resource the script actually provided.
    if cs.doCache and rsrc is not noRsrc:
        registry.cachePath(path, rsrc)
    return rsrc
test_distrib.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def testDistrib(self):
        """Fetch a page through a ResourceSubscription and check its body.

        A publisher site exposing the text "root" at "there" is served over PB
        on an ephemeral port.  A second site mounts a subscription to it at
        "here", and the test requests "/here/there" from that second site.

        Returns the Deferred from the page fetch, which fires once the body
        has been compared against 'root'.
        """
        # Publishing side: the site whose resources are exported over PB.
        published_root = resource.Resource()
        published_root.putChild("there", static.Data("root", "text/plain"))
        publisher_site = server.Site(published_root)
        self.f1 = PBServerFactory(distrib.ResourcePublisher(publisher_site))
        self.port1 = reactor.listenTCP(0, self.f1)

        # Subscribing side: an HTTP site that forwards "here" to the publisher.
        pb_port = self.port1.getHost().port
        self.sub = distrib.ResourceSubscription("127.0.0.1", pb_port)
        front_root = resource.Resource()
        front_root.putChild("here", self.sub)
        front_site = MySite(front_root)
        self.port2 = reactor.listenTCP(0, front_site)

        url = "http://127.0.0.1:%d/here/there" % self.port2.getHost().port
        fetched = client.getPage(url)
        fetched.addCallback(self.failUnlessEqual, 'root')
        return fetched
static.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self, path, defaultType="text/html", ignoredExts=(), registry=None, allowExt=0):
        """Create a file resource for the given path.

        @param path: passed to C{filepath.FilePath.__init__}.
        @param defaultType: stored as C{self.defaultType}.
        @param ignoredExts: sequence copied into the C{self.ignoredExts} list.
            Passing C{0} or C{1} is the legacy boolean form: it emits a warning
            and is mapped to C{['*']} (truthy) or C{[]} (falsy).
        @param registry: stored as C{self.registry}; a new C{Registry()} is
            created when this is falsy.
        @param allowExt: legacy flag; a truthy value emits the same warning and
            results in C{['*']}.
        """
        resource.Resource.__init__(self)
        filepath.FilePath.__init__(self, path)
        self.defaultType = defaultType

        usesLegacyFlag = ignoredExts in (0, 1) or allowExt
        if not usesLegacyFlag:
            self.ignoredExts = list(ignoredExts)
        else:
            warnings.warn("ignoredExts should receive a list, not a boolean")
            if not (ignoredExts or allowExt):
                self.ignoredExts = []
            else:
                self.ignoredExts = ['*']

        self.registry = registry or Registry()
rewrite.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def alias(aliasPath, sourcePath):
    """
    Build a request rewriter that maps one URL prefix onto another.

    Both arguments are '/'-separated paths.  The returned callable takes a
    request; if the request's postpath starts with the segments of aliasPath,
    those segments are replaced by the segments of sourcePath and
    request.path is rebuilt from prepath + postpath.  Requests that do not
    match are left untouched.

    Limitation: request.prepath is not adjusted, so a target resource that
    builds links from it will produce links relative to the aliased-to path
    rather than the alias.
    """
    aliasSegments = aliasPath.split('/')
    sourceSegments = sourcePath.split('/')
    prefixLength = len(aliasSegments)

    def rewriter(request):
        if request.postpath[:prefixLength] != aliasSegments:
            return
        remainder = request.postpath[prefixLength:]
        request.postpath = sourceSegments + remainder
        request.path = '/' + '/'.join(request.prepath + request.postpath)

    return rewriter
test_node.py 文件源码 项目:p2pool-bch 作者: amarian12 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
        """Create an instance of ``cls`` with a running node, P2P node and web port.

        Written as a generator: it yields the result of starting the node and
        hands the finished instance back through ``defer.returnValue``
        (presumably driven by ``defer.inlineCallbacks`` -- the decorator is not
        visible here).

        The instance ends up with ``n`` (the node, with ``p2p_node`` attached),
        ``wb`` (the worker bridge) and ``web_port`` (a TCP listener on an
        ephemeral port serving the worker interface).  ``peer_ports`` are the
        local ports the P2P node is told to connect to.
        """
        inst = cls()

        inst.n = node.Node(factory, bitcoind, [], [], net)
        yield inst.n.start()

        # All peers live on localhost; only the port differs.
        peer_addrs = [('127.0.0.1', peer_port) for peer_port in peer_ports]
        inst.n.p2p_node = node.P2PNode(
            inst.n,
            port=0,
            max_incoming_conns=1000000,
            addr_store={},
            connect_addrs=peer_addrs,
        )
        inst.n.p2p_node.start()

        inst.wb = work.WorkerBridge(
            node=inst.n,
            my_pubkey_hash=random.randrange(2**160),
            donation_percentage=random.uniform(0, 10),
            merged_urls=merged_urls,
            worker_fee=3,
            args=math.Object(donation_percentage=random.uniform(0, 10), address='foo', worker_fee=3, timeaddresses=1000),
            pubkeys=main.keypool(),
            bitcoind=bitcoind,
        )

        # Expose the worker interface over HTTP on an ephemeral port.
        site_root = resource.Resource()
        worker_interface.WorkerInterface(inst.wb).attach_to(site_root)
        inst.web_port = reactor.listenTCP(0, server.Site(site_root))

        defer.returnValue(inst)
deferred_resource.py 文件源码 项目:p2pool-bch 作者: amarian12 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def render(self, request):
        """Render via ``resource.Resource.render``, allowing a Deferred result.

        The base render is invoked through ``defer.maybeDeferred``.  On
        success a non-``None`` result is written to the request, which is then
        finished.  On failure the response code is set to 500, the body
        ``'---ERROR---'`` is written, the request is finished and the failure
        is logged.  Either callback returns without doing anything (including
        logging) when ``request.channel`` is ``None``, i.e. the client has
        disconnected.

        Always returns ``server.NOT_DONE_YET``.
        """
        def client_gone():
            return request.channel is None

        def on_result(body):
            if client_gone():
                return
            if body is not None:
                request.write(body)
            request.finish()

        def on_failure(fail):
            if client_gone():
                return
            # Has no effect if part of the response was already written.
            request.setResponseCode(500)
            request.write('---ERROR---')
            request.finish()
            log.err(fail, "Error in DeferredResource handler:")

        pending = defer.maybeDeferred(resource.Resource.render, self, request)
        pending.addCallbacks(on_result, on_failure)
        return server.NOT_DONE_YET
test_node.py 文件源码 项目:p2pool-unitus 作者: amarian12 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
        """Create an instance of ``cls`` with a running node, P2P node and web port.

        Written as a generator: it yields the result of starting the node and
        hands the finished instance back through ``defer.returnValue``
        (presumably driven by ``defer.inlineCallbacks`` -- the decorator is not
        visible here).

        The instance ends up with ``n`` (the node, with ``p2p_node`` attached),
        ``wb`` (the worker bridge) and ``web_port`` (a TCP listener on an
        ephemeral port serving the worker interface).  ``peer_ports`` are the
        local ports the P2P node is told to connect to.
        """
        inst = cls()

        inst.n = node.Node(factory, bitcoind, [], [], net)
        yield inst.n.start()

        # All peers live on localhost; only the port differs.
        peer_addrs = [('127.0.0.1', peer_port) for peer_port in peer_ports]
        inst.n.p2p_node = node.P2PNode(
            inst.n,
            port=0,
            max_incoming_conns=1000000,
            addr_store={},
            connect_addrs=peer_addrs,
        )
        inst.n.p2p_node.start()

        inst.wb = work.WorkerBridge(
            node=inst.n,
            my_pubkey_hash=random.randrange(2**160),
            donation_percentage=random.uniform(0, 10),
            merged_urls=merged_urls,
            worker_fee=3,
        )

        # Expose the worker interface over HTTP on an ephemeral port.
        site_root = resource.Resource()
        worker_interface.WorkerInterface(inst.wb).attach_to(site_root)
        inst.web_port = reactor.listenTCP(0, server.Site(site_root))

        defer.returnValue(inst)
deferred_resource.py 文件源码 项目:p2pool-unitus 作者: amarian12 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def render(self, request):
        """Render via ``resource.Resource.render``, allowing a Deferred result.

        The base render is invoked through ``defer.maybeDeferred``.  On
        success a non-``None`` result is written to the request, which is then
        finished.  On failure the response code is set to 500, the body
        ``'---ERROR---'`` is written, the request is finished and the failure
        is logged.  Either callback returns without doing anything (including
        logging) when ``request.channel`` is ``None``, i.e. the client has
        disconnected.

        Always returns ``server.NOT_DONE_YET``.
        """
        def client_gone():
            return request.channel is None

        def on_result(body):
            if client_gone():
                return
            if body is not None:
                request.write(body)
            request.finish()

        def on_failure(fail):
            if client_gone():
                return
            # Has no effect if part of the response was already written.
            request.setResponseCode(500)
            request.write('---ERROR---')
            request.finish()
            log.err(fail, "Error in DeferredResource handler:")

        pending = defer.maybeDeferred(resource.Resource.render, self, request)
        pending.addCallbacks(on_result, on_failure)
        return server.NOT_DONE_YET
website.py 文件源码 项目:flowder 作者: amir-khakshour 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __init__(self, app, config):
        """Build the root web resource and attach its children.

        Reads ``debug``, ``node_name`` (default: this host's name),
        ``static_serve_path`` (default ``'files'``) and ``storage_path`` from
        ``config``.  Mounts ``Home`` at ``''``, ``Jobs`` at ``'jobs'`` and a
        ``File`` over the storage path at the static serve path.  Every
        ``(name, class path)`` pair in the ``services`` config section is then
        loaded with ``load_object``, instantiated with this resource, and
        mounted under its name.
        """
        resource.Resource.__init__(self)
        self.app = app
        self.IApp = IServiceCollection(app, app)

        self.debug = config.getboolean('debug', False)
        self.nodename = config.get('node_name', socket.gethostname())
        serve_under = config.get('static_serve_path', 'files')
        storage_root = config.get('storage_path')

        # Built-in pages.
        self.putChild('', Home(self))
        self.putChild('jobs', Jobs(self))
        self.putChild(serve_under, File(storage_root))

        # Pluggable services named in the configuration.
        for child_name, dotted_class in config.items('services', ()):
            service_class = load_object(dotted_class)
            self.putChild(child_name, service_class(self))
test_node.py 文件源码 项目:p2pool-dgb-sha256 作者: ilsawa 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
        """Create an instance of ``cls`` with a running node, P2P node and web port.

        Written as a generator: it yields the result of starting the node and
        hands the finished instance back through ``defer.returnValue``
        (presumably driven by ``defer.inlineCallbacks`` -- the decorator is not
        visible here).

        The instance ends up with ``n`` (the node, with ``p2p_node`` attached),
        ``wb`` (the worker bridge) and ``web_port`` (a TCP listener on an
        ephemeral port serving the worker interface).  ``peer_ports`` are the
        local ports the P2P node is told to connect to.
        """
        inst = cls()

        inst.n = node.Node(factory, bitcoind, [], [], net)
        yield inst.n.start()

        # All peers live on localhost; only the port differs.
        peer_addrs = [('127.0.0.1', peer_port) for peer_port in peer_ports]
        inst.n.p2p_node = node.P2PNode(
            inst.n,
            port=0,
            max_incoming_conns=1000000,
            addr_store={},
            connect_addrs=peer_addrs,
        )
        inst.n.p2p_node.start()

        inst.wb = work.WorkerBridge(
            node=inst.n,
            my_pubkey_hash=random.randrange(2**160),
            donation_percentage=random.uniform(0, 10),
            merged_urls=merged_urls,
            worker_fee=3,
            args=math.Object(donation_percentage=random.uniform(0, 10), address='foo', worker_fee=3, timeaddresses=1000),
            pubkeys=main.keypool(),
            bitcoind=bitcoind,
        )

        # Expose the worker interface over HTTP on an ephemeral port.
        site_root = resource.Resource()
        worker_interface.WorkerInterface(inst.wb).attach_to(site_root)
        inst.web_port = reactor.listenTCP(0, server.Site(site_root))

        defer.returnValue(inst)
deferred_resource.py 文件源码 项目:p2pool-dgb-sha256 作者: ilsawa 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def render(self, request):
        """Render via ``resource.Resource.render``, allowing a Deferred result.

        The base render is invoked through ``defer.maybeDeferred``.  On
        success a non-``None`` result is written to the request, which is then
        finished.  On failure the response code is set to 500, the body
        ``'---ERROR---'`` is written, the request is finished and the failure
        is logged.  Either callback returns without doing anything (including
        logging) when ``request.channel`` is ``None``, i.e. the client has
        disconnected.

        Always returns ``server.NOT_DONE_YET``.
        """
        def client_gone():
            return request.channel is None

        def on_result(body):
            if client_gone():
                return
            if body is not None:
                request.write(body)
            request.finish()

        def on_failure(fail):
            if client_gone():
                return
            # Has no effect if part of the response was already written.
            request.setResponseCode(500)
            request.write('---ERROR---')
            request.finish()
            log.err(fail, "Error in DeferredResource handler:")

        pending = defer.maybeDeferred(resource.Resource.render, self, request)
        pending.addCallbacks(on_result, on_failure)
        return server.NOT_DONE_YET
server.py 文件源码 项目:wslink 作者: Kitware 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def handle_complex_resource_path(path, root, resource):
    """Mount ``resource`` under ``root`` at a possibly multi-segment ``path``.

    ``path`` is a text string whose segments are separated by ``/``.  It is
    UTF-8 encoded before splitting because Twisted expects byte-type URIs.
    Every segment except the last names an intermediate node: an already
    registered child of that name is reused, otherwise an empty ``Resource``
    is created and registered.  ``resource`` is registered under the last
    segment of the innermost node.

    The last segment is identified by position, not by value, so a path such
    as ``a/b/a`` registers ``resource`` only at ``a/b/a``.  Reusing existing
    intermediate nodes means registering ``a/b`` and then ``a/c`` keeps both.

    :param path: endpoint path, e.g. ``'ws'`` or ``'api/v1/ws'``.
    :param root: resource to mount under; must provide ``putChild`` and
        ``getStaticEntity`` (as ``twisted.web.resource.Resource`` does).
    :param resource: the resource to expose at ``path``.
    """
    segments = path.encode('utf-8').split(b'/')
    parent = root
    for segment in segments[:-1]:
        child = parent.getStaticEntity(segment)
        if child is None:
            child = Resource()
            parent.putChild(segment, child)
        parent = child
    parent.putChild(segments[-1], resource)


# =============================================================================
# Start webserver
# =============================================================================
controller.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, m, inputhandlers=None, view=None, controllers=None, templateDirectory = None):
        """Initialise the controller around the model C{m}.

        @param m: the model; stored as C{self.model}.
        @param inputhandlers: deprecated alias for C{controllers}.  A truthy
            value is used as the handler list and a deprecation message is
            printed.
        @param view: accepted but not stored here; C{self.view} always starts
            as C{None} and the caller must call C{setView} before rendering.
        @param controllers: handler list used when C{inputhandlers} is not
            given (or is falsy).
        @param templateDirectory: when not C{None}, stored on the instance as
            C{self.templateDirectory}.
        """
        resource.Resource.__init__(self)
        self.model = m
        # setView() has to be called by whoever uses this controller before it
        # is rendered, so no view is recorded yet.
        self.view = None
        self.subcontrollers = []
        if self.setupStacks:
            self.setupControllerStack()

        if inputhandlers:
            print("The inputhandlers arg is deprecated, please use controllers instead")
            self._inputhandlers = inputhandlers
        elif inputhandlers is None and controllers is None:
            self._inputhandlers = []
        else:
            self._inputhandlers = controllers

        if templateDirectory is not None:
            self.templateDirectory = templateDirectory

        self._parent = None
        self._valid = {}
        self._invalid = {}
        self._process = {}
script.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def ResourceScript(path, registry):
    """
    Execute the Python file at 'path' and return the 'resource' global it
    defines, which should be an instance of (a subclass of)
    web.resource.Resource; it will be rendered.

    The script runs in a namespace that provides '__file__', 'registry',
    and the 'cache' / 'recache' callables of a CacheScanner built for this
    path. 'resource' is pre-set to the noRsrc placeholder, which is what
    gets returned if the script never assigns it.

    If the script raises AlreadyCached, the first argument of that exception
    is returned instead. Otherwise, when the scanner's doCache flag is set
    and the script did assign 'resource', the result is stored with
    registry.cachePath(path, rsrc) before being returned.
    """
    cs = CacheScanner(path, registry)
    glob = {'__file__': path,
            'resource': noRsrc,
            'registry': registry,
            'cache': cs.cache,
            'recache': cs.recache}
    try:
        # NOTE(review): execfile runs whatever code the file contains; confirm
        # that callers only pass trusted paths.
        # The same dict serves as both globals and locals of the script.
        execfile(path, glob, glob)
    except AlreadyCached, ac:
        return ac.args[0]
    rsrc = glob['resource']
    # Only cache a resource the script actually provided.
    if cs.doCache and rsrc is not noRsrc:
        registry.cachePath(path, rsrc)
    return rsrc
test_distrib.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def testDistrib(self):
        """Fetch a page through a ResourceSubscription and check its body.

        A publisher site exposing the text "root" at "there" is served over PB
        on an ephemeral port.  A second site mounts a subscription to it at
        "here", and the test requests "/here/there" from that second site.

        Returns the Deferred from the page fetch, which fires once the body
        has been compared against 'root'.
        """
        # Publishing side: the site whose resources are exported over PB.
        published_root = resource.Resource()
        published_root.putChild("there", static.Data("root", "text/plain"))
        publisher_site = server.Site(published_root)
        self.f1 = PBServerFactory(distrib.ResourcePublisher(publisher_site))
        self.port1 = reactor.listenTCP(0, self.f1)

        # Subscribing side: an HTTP site that forwards "here" to the publisher.
        pb_port = self.port1.getHost().port
        self.sub = distrib.ResourceSubscription("127.0.0.1", pb_port)
        front_root = resource.Resource()
        front_root.putChild("here", self.sub)
        front_site = MySite(front_root)
        self.port2 = reactor.listenTCP(0, front_site)

        url = "http://127.0.0.1:%d/here/there" % self.port2.getHost().port
        fetched = client.getPage(url)
        fetched.addCallback(self.failUnlessEqual, 'root')
        return fetched
static.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, path, defaultType="text/html", ignoredExts=(), registry=None, allowExt=0):
        """Create a file resource for the given path.

        @param path: passed to C{filepath.FilePath.__init__}.
        @param defaultType: stored as C{self.defaultType}.
        @param ignoredExts: sequence copied into the C{self.ignoredExts} list.
            Passing C{0} or C{1} is the legacy boolean form: it emits a warning
            and is mapped to C{['*']} (truthy) or C{[]} (falsy).
        @param registry: stored as C{self.registry}; a new C{Registry()} is
            created when this is falsy.
        @param allowExt: legacy flag; a truthy value emits the same warning and
            results in C{['*']}.
        """
        resource.Resource.__init__(self)
        filepath.FilePath.__init__(self, path)
        self.defaultType = defaultType

        usesLegacyFlag = ignoredExts in (0, 1) or allowExt
        if not usesLegacyFlag:
            self.ignoredExts = list(ignoredExts)
        else:
            warnings.warn("ignoredExts should receive a list, not a boolean")
            if not (ignoredExts or allowExt):
                self.ignoredExts = []
            else:
                self.ignoredExts = ['*']

        self.registry = registry or Registry()
rewrite.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def alias(aliasPath, sourcePath):
    """
    Build a request rewriter that maps one URL prefix onto another.

    Both arguments are '/'-separated paths.  The returned callable takes a
    request; if the request's postpath starts with the segments of aliasPath,
    those segments are replaced by the segments of sourcePath and
    request.path is rebuilt from prepath + postpath.  Requests that do not
    match are left untouched.

    Limitation: request.prepath is not adjusted, so a target resource that
    builds links from it will produce links relative to the aliased-to path
    rather than the alias.
    """
    aliasSegments = aliasPath.split('/')
    sourceSegments = sourcePath.split('/')
    prefixLength = len(aliasSegments)

    def rewriter(request):
        if request.postpath[:prefixLength] != aliasSegments:
            return
        remainder = request.postpath[prefixLength:]
        request.postpath = sourceSegments + remainder
        request.path = '/' + '/'.join(request.prepath + request.postpath)

    return rewriter
test_node.py 文件源码 项目:p2pool-ltc 作者: ilsawa 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
        """Create an instance of ``cls`` with a running node, P2P node and web port.

        Written as a generator: it yields the result of starting the node and
        hands the finished instance back through ``defer.returnValue``
        (presumably driven by ``defer.inlineCallbacks`` -- the decorator is not
        visible here).

        The instance ends up with ``n`` (the node, with ``p2p_node`` attached),
        ``wb`` (the worker bridge) and ``web_port`` (a TCP listener on an
        ephemeral port serving the worker interface).  ``peer_ports`` are the
        local ports the P2P node is told to connect to.
        """
        inst = cls()

        inst.n = node.Node(factory, bitcoind, [], [], net)
        yield inst.n.start()

        # All peers live on localhost; only the port differs.
        peer_addrs = [('127.0.0.1', peer_port) for peer_port in peer_ports]
        inst.n.p2p_node = node.P2PNode(
            inst.n,
            port=0,
            max_incoming_conns=1000000,
            addr_store={},
            connect_addrs=peer_addrs,
        )
        inst.n.p2p_node.start()

        inst.wb = work.WorkerBridge(
            node=inst.n,
            my_pubkey_hash=random.randrange(2**160),
            donation_percentage=random.uniform(0, 10),
            merged_urls=merged_urls,
            worker_fee=3,
            args=math.Object(donation_percentage=random.uniform(0, 10), address='foo', worker_fee=3, timeaddresses=1000),
            pubkeys=main.keypool(),
            bitcoind=bitcoind,
        )

        # Expose the worker interface over HTTP on an ephemeral port.
        site_root = resource.Resource()
        worker_interface.WorkerInterface(inst.wb).attach_to(site_root)
        inst.web_port = reactor.listenTCP(0, server.Site(site_root))

        defer.returnValue(inst)
deferred_resource.py 文件源码 项目:p2pool-ltc 作者: ilsawa 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def render(self, request):
        """Render via ``resource.Resource.render``, allowing a Deferred result.

        The base render is invoked through ``defer.maybeDeferred``.  On
        success a non-``None`` result is written to the request, which is then
        finished.  On failure the response code is set to 500, the body
        ``'---ERROR---'`` is written, the request is finished and the failure
        is logged.  Either callback returns without doing anything (including
        logging) when ``request.channel`` is ``None``, i.e. the client has
        disconnected.

        Always returns ``server.NOT_DONE_YET``.
        """
        def client_gone():
            return request.channel is None

        def on_result(body):
            if client_gone():
                return
            if body is not None:
                request.write(body)
            request.finish()

        def on_failure(fail):
            if client_gone():
                return
            # Has no effect if part of the response was already written.
            request.setResponseCode(500)
            request.write('---ERROR---')
            request.finish()
            log.err(fail, "Error in DeferredResource handler:")

        pending = defer.maybeDeferred(resource.Resource.render, self, request)
        pending.addCallbacks(on_result, on_failure)
        return server.NOT_DONE_YET
test_node.py 文件源码 项目:p2pool-bsty 作者: amarian12 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
        """Create an instance of ``cls`` with a running node, P2P node and web port.

        Written as a generator: it yields the result of starting the node and
        hands the finished instance back through ``defer.returnValue``
        (presumably driven by ``defer.inlineCallbacks`` -- the decorator is not
        visible here).

        The instance ends up with ``n`` (the node, with ``p2p_node`` attached),
        ``wb`` (the worker bridge) and ``web_port`` (a TCP listener on an
        ephemeral port serving the worker interface).  ``peer_ports`` are the
        local ports the P2P node is told to connect to.
        """
        inst = cls()

        inst.n = node.Node(factory, bitcoind, [], [], net)
        yield inst.n.start()

        # All peers live on localhost; only the port differs.
        peer_addrs = [('127.0.0.1', peer_port) for peer_port in peer_ports]
        inst.n.p2p_node = node.P2PNode(
            inst.n,
            port=0,
            max_incoming_conns=1000000,
            addr_store={},
            connect_addrs=peer_addrs,
        )
        inst.n.p2p_node.start()

        inst.wb = work.WorkerBridge(
            node=inst.n,
            my_pubkey_hash=random.randrange(2**160),
            donation_percentage=random.uniform(0, 10),
            merged_urls=merged_urls,
            worker_fee=3,
        )

        # Expose the worker interface over HTTP on an ephemeral port.
        site_root = resource.Resource()
        worker_interface.WorkerInterface(inst.wb).attach_to(site_root)
        inst.web_port = reactor.listenTCP(0, server.Site(site_root))

        defer.returnValue(inst)
deferred_resource.py 文件源码 项目:p2pool-bsty 作者: amarian12 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def render(self, request):
        """Render via ``resource.Resource.render``, allowing a Deferred result.

        The base render is invoked through ``defer.maybeDeferred``.  On
        success a non-``None`` result is written to the request, which is then
        finished.  On failure the response code is set to 500, the body
        ``'---ERROR---'`` is written, the request is finished and the failure
        is logged.  Either callback returns without doing anything (including
        logging) when ``request.channel`` is ``None``, i.e. the client has
        disconnected.

        Always returns ``server.NOT_DONE_YET``.
        """
        def client_gone():
            return request.channel is None

        def on_result(body):
            if client_gone():
                return
            if body is not None:
                request.write(body)
            request.finish()

        def on_failure(fail):
            if client_gone():
                return
            # Has no effect if part of the response was already written.
            request.setResponseCode(500)
            request.write('---ERROR---')
            request.finish()
            log.err(fail, "Error in DeferredResource handler:")

        pending = defer.maybeDeferred(resource.Resource.render, self, request)
        pending.addCallbacks(on_result, on_failure)
        return server.NOT_DONE_YET
test_node.py 文件源码 项目:p2pool-cann 作者: ilsawa 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
        """Create an instance of ``cls`` with a running node, P2P node and web port.

        Written as a generator: it yields the result of starting the node and
        hands the finished instance back through ``defer.returnValue``
        (presumably driven by ``defer.inlineCallbacks`` -- the decorator is not
        visible here).

        The instance ends up with ``n`` (the node, with ``p2p_node`` attached),
        ``wb`` (the worker bridge) and ``web_port`` (a TCP listener on an
        ephemeral port serving the worker interface).  ``peer_ports`` are the
        local ports the P2P node is told to connect to.
        """
        inst = cls()

        inst.n = node.Node(factory, bitcoind, [], [], net)
        yield inst.n.start()

        # All peers live on localhost; only the port differs.
        peer_addrs = [('127.0.0.1', peer_port) for peer_port in peer_ports]
        inst.n.p2p_node = node.P2PNode(
            inst.n,
            port=0,
            max_incoming_conns=1000000,
            addr_store={},
            connect_addrs=peer_addrs,
        )
        inst.n.p2p_node.start()

        inst.wb = work.WorkerBridge(
            node=inst.n,
            my_pubkey_hash=random.randrange(2**160),
            donation_percentage=random.uniform(0, 10),
            merged_urls=merged_urls,
            worker_fee=3,
        )

        # Expose the worker interface over HTTP on an ephemeral port.
        site_root = resource.Resource()
        worker_interface.WorkerInterface(inst.wb).attach_to(site_root)
        inst.web_port = reactor.listenTCP(0, server.Site(site_root))

        defer.returnValue(inst)
deferred_resource.py 文件源码 项目:p2pool-cann 作者: ilsawa 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def render(self, request):
        """Render via ``resource.Resource.render``, allowing a Deferred result.

        The base render is invoked through ``defer.maybeDeferred``.  On
        success a non-``None`` result is written to the request, which is then
        finished.  On failure the response code is set to 500, the body
        ``'---ERROR---'`` is written, the request is finished and the failure
        is logged.  Either callback returns without doing anything (including
        logging) when ``request.channel`` is ``None``, i.e. the client has
        disconnected.

        Always returns ``server.NOT_DONE_YET``.
        """
        def client_gone():
            return request.channel is None

        def on_result(body):
            if client_gone():
                return
            if body is not None:
                request.write(body)
            request.finish()

        def on_failure(fail):
            if client_gone():
                return
            # Has no effect if part of the response was already written.
            request.setResponseCode(500)
            request.write('---ERROR---')
            request.finish()
            log.err(fail, "Error in DeferredResource handler:")

        pending = defer.maybeDeferred(resource.Resource.render, self, request)
        pending.addCallbacks(on_result, on_failure)
        return server.NOT_DONE_YET
vmware_exporter.py 文件源码 项目:vmware_exporter 作者: rverchere 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def main():
    """Parse the command line and serve the metrics resource over HTTP.

    Options: ``-c/--config`` (configuration file, default ``config.yml``)
    and ``-p/--port`` (HTTP port, default 9272).  A ``VMWareMetricsResource``
    built from the parsed arguments is mounted at ``/metrics`` and the
    reactor is run until it is stopped.
    """
    cli = argparse.ArgumentParser(description='VMWare metrics exporter for Prometheus')
    cli.add_argument(
        '-c', '--config',
        dest='config_file',
        default='config.yml',
        help="configuration file",
    )
    cli.add_argument(
        '-p', '--port',
        dest='port',
        type=int,
        default=9272,
        help="HTTP port to expose metrics",
    )
    options = cli.parse_args()

    # The only page served is /metrics.
    site_root = Resource()
    site_root.putChild(b'metrics', VMWareMetricsResource(options))
    site = Site(site_root)

    print("Starting web server on port {}".format(options.port))
    reactor.listenTCP(options.port, site)
    reactor.run()
test_measurement.py 文件源码 项目:bwscanner 作者: TheTorProject 项目源码 文件源码 阅读 13 收藏 0 点赞 0 评论 0
def setUp(self):
        """Prepare Tor and start a local HTTP server that returns sized bodies.

        Written as a generator that yields on the parent ``setUp``, on
        ``setconf_fetch_all_descs`` and on acquiring the port and listener
        (presumably driven by ``inlineCallbacks`` -- the decorator is not
        visible here).

        The server answers ``GET /<size>`` with ``<size>`` ``'a'`` characters,
        where ``<size>`` is a number optionally followed by ``k`` (x 2**10) or
        ``M`` (x 2**20).  The chosen port is stored on ``self.port`` and the
        listener on ``self.test_service``.
        """
        yield super(TestBwscan, self).setUp()
        yield setconf_fetch_all_descs(self.tor)

        class DummyResource(Resource):
            isLeaf = True

            def render_GET(self, request):
                # The last URI segment encodes the requested body size.
                size = request.uri.split('/')[-1]
                if 'k' in size:
                    size = int(size[:-1])*(2**10)
                elif 'M' in size:
                    size = int(size[:-1])*(2**20)
                else:
                    # No unit suffix: a plain byte count.  Without this
                    # conversion ``size`` would still be a string and the
                    # multiplication below would raise TypeError.
                    size = int(size)
                return 'a'*size

        self.port = yield available_tcp_port(reactor)
        self.test_service = yield reactor.listenTCP(
            self.port, Site(DummyResource()))
test_listener.py 文件源码 项目:bwscanner 作者: TheTorProject 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def setUp(self):
        """Create the bandwidth listener and a local server with an 8 MiB page.

        Written as a generator that yields on the parent ``setUp``, on the
        ``StreamBandwidthListener`` and on acquiring the port and listener
        (presumably driven by ``inlineCallbacks`` -- the decorator is not
        visible here).

        Sets ``fetch_size``, ``stream_bandwidth_listener``, ``port``, ``site``,
        ``test_service`` and ``not_enough_measurements`` on the test case.
        """
        yield super(TestStreamBandwidthListener, self).setUp()
        self.fetch_size = 8 * (2**20)  # 8MB
        self.stream_bandwidth_listener = yield StreamBandwidthListener(self.tor)

        class DummyResource(Resource):
            # Answers every GET itself with a fixed-size body of 'a' characters.
            isLeaf = True

            def render_GET(self, request):
                return 'a' * (8 * 2**20)

        self.port = yield available_tcp_port(reactor)
        self.site = Site(DummyResource())
        self.test_service = yield reactor.listenTCP(self.port, self.site)

        message = "Not enough measurements to calculate STREAM_BW samples."
        self.not_enough_measurements = NotEnoughMeasurements(message)


问题


面经


文章

微信
公众号

扫码关注公众号