python类reactor()的实例源码

mix.py 文件源码 项目:txmix 作者: applied-mixnetworks 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def message_received(self, unwrapped_message):
        """
        Schedule one unwrapped message to be forwarded after a random delay.

        The delay is drawn from ``self._sys_rand`` in the range
        ``[0, self.max_delay]``; the message is then handed to
        ``self.protocol.packet_proxy`` via ``deferLater``.

        :param unwrapped_message: the message to forward (an
            UnwrappedMessage, according to the original documentation).
        """
        wait = self._sys_rand.randint(0, self.max_delay)
        send_action = start_action(
            action_type=u"send delayed message",
            delay=wait,
        )
        with send_action.context():
            pending = deferLater(
                self.reactor,
                wait,
                self.protocol.packet_proxy,
                unwrapped_message,
            )
            DeferredContext(pending).addActionFinish()
            self._pending_sends.add(pending)

            def _forget(result, pending=pending):
                # Runs on success and on failure alike: drop the bookkeeping
                # entry and pass the result (or failure) through unchanged.
                self._pending_sends.remove(pending)
                return result

            pending.addBoth(_forget)
test_cli.py 文件源码 项目:marathon-acme 作者: praekeltfoundation 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_cannot_listen(self):
        """
        Run the program with a storage directory and a listen address whose
        port cannot be bound (port 1). A CannotListenError is expected to be
        logged and the program should stop, but the 'certs' directory and the
        default certificate must still have been created.
        """
        storage = self.useFixture(TempDir())
        cli_args = [
            storage.path,
            '--listen', ':1',  # A port we can't listen on
        ]
        yield main(reactor, raw_args=cli_args)

        # The 'certs' directory must exist after the run
        certs_dir = storage.join('certs')
        self.assertThat(os.path.isdir(certs_dir), Equals(True))

        # ...and so must the default certificate
        default_cert = storage.join('default.pem')
        self.assertThat(os.path.isfile(default_cert), Equals(True))

        # The failure to listen must have been logged
        flush_logged_errors(CannotListenError)
clients.py 文件源码 项目:marathon-acme 作者: praekeltfoundation 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_events(self, callbacks):
        """
        Subscribe to Marathon's event stream using Server-Sent Events (SSE).

        :param callbacks:
            A dict mapping event types to functions that handle the event
            data. Events without an entry are ignored.
        :return:
            The request Deferred with ``sse_content`` chained onto it.
        """
        sse_headers = {
            'Accept': 'text/event-stream',
            'Cache-Control': 'no-store'
        }
        response_d = self.request(
            'GET', path='/v2/events', unbuffered=True, headers=sse_headers)

        def dispatch(event, data):
            on_event = callbacks.get(event)
            if on_event is None:
                # Nobody registered for this event type: skip JSON decoding
                return
            on_event(json.loads(data))

        return response_d.addCallback(
            sse_content, dispatch, reactor=self._reactor, **self._sse_kwargs)
multiple_mllp_plugin.py 文件源码 项目:gloss 作者: openhealthcare 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def makeService(self, options):
        """Build one MLLP server per entry in ``PORTS``, grouped in a MultiService.

        The receiver class named by ``options['receiver']`` is resolved and
        verified against ``IHL7Receiver``; a single ``MLLPFactory`` wrapping one
        receiver instance is shared by every server.

        :return: the ``MultiService`` holding all the endpoint services
        """
        from twisted.internet import reactor
        from txHL7.mllp import IHL7Receiver, MLLPFactory

        receiver_name = options['receiver']
        receiver_class = reflect.namedClass(receiver_name)
        verifyClass(IHL7Receiver, receiver_class)
        shared_factory = MLLPFactory(receiver_class())

        services = MultiService()
        for port_number in PORTS:
            description = "tcp:interface={0}:port={1}".format(HOST, port_number)
            listener = internet.StreamServerEndpointService(
                endpoints.serverFromString(reactor, description),
                shared_factory)
            # Name each child service after the receiver and its port
            listener.setName(u"mllp-{0}-{1}".format(receiver_name, port_number))
            services.addService(listener)
        return services
strports.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def listen(description, factory, default=None):
    """Start listening on the port that a description string refers to.

    The description is handed to C{parse}; the resulting name selects the
    reactor's C{listen<name>} method, which is called with the parsed
    positional and keyword arguments.

    @type description: C{str}
    @type factory: L{twisted.internet.interfaces.IProtocolFactory}
    @type default: C{str} or C{None}
    @rtype: C{twisted.internet.interfaces.IListeningPort}
    @return: the port corresponding to a description of a reliable
    virtual circuit server.

    See the documentation of the C{parse} function for description
    of the semantics of the arguments.
    """
    # Imported here rather than at module level, as in the original code.
    from twisted.internet import reactor
    kind, positional, keywords = parse(description, factory, default)
    listen_method = getattr(reactor, 'listen' + kind)
    return listen_method(*positional, **keywords)
mix.py 文件源码 项目:txmix 作者: applied-mixnetworks 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def message_received(self, unwrapped_message):
        """
        Queue a message and, once enough are queued, release them as a batch.

        Messages accumulate in ``self._batch``. When the batch holds at least
        ``self.threshold_count`` entries it is swapped for a fresh list,
        shuffled, and handed to ``self.batch_send`` after a random delay in
        ``[0, self.max_delay]``.

        :param unwrapped_message: the message to queue (an UnwrappedMessage,
            according to the original documentation).
        """
        # Original note: entries look like [(destination, sphinx_packet), ...]
        self._batch.append(unwrapped_message)
        if len(self._batch) < self.threshold_count:
            # Not enough traffic yet; keep collecting.
            return

        delay = self._sys_rand.randint(0, self.max_delay)
        action = start_action(
            action_type=u"send delayed message batch",
            delay=delay,
        )
        with action.context():
            released = self._batch
            self._batch = []
            # Shuffle with the same RNG that picks the delay instead of the
            # module-level ``random`` generator, so the release order does
            # not depend on the default Mersenne Twister state.
            self._sys_rand.shuffle(released)
            d = deferLater(self.reactor, delay, self.batch_send, released)
            DeferredContext(d).addActionFinish()
            self._pending_batch_sends.add(d)

            def _remove(res, d=d):
                # Runs on success and on failure alike: drop the bookkeeping
                # entry and pass the result (or failure) through unchanged.
                self._pending_batch_sends.remove(d)
                return res

            d.addBoth(_remove)
service.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _start_onion_service(self, factory):
        """
        Start listening for ``factory`` on an ``onion:80`` server endpoint.

        Bootstrap progress is logged at debug level. On success the advertised
        address is stored in ``self.uri`` and logged; on failure the error is
        logged by the errback.

        :return: the Deferred from ``endpoint.listen`` with both logging
            callbacks attached.
        """

        def progress(percent, tag, message):
            # Render a ten-character progress bar, one '#' per 10 percent
            filled = int(percent / 10)
            hashes = '#' * filled
            dots = '.' * (10 - filled)
            log.debug('[%s%s] %s' % (hashes, dots, message))

        def setup_complete(port):
            hidden_service = txtorcon.IHiddenService(port)
            self.uri = "http://%s" % (hidden_service.getHost().onion_uri)
            log.info('I have set up a hidden service, advertised at: %s'
                     % self.uri)
            local_host = hidden_service.local_address.getHost()
            log.info('locally listening on %s' % local_host)

        def setup_failed(args):
            log.error('onion service setup FAILED: %r' % args)

        endpoint = endpoints.serverFromString(reactor, 'onion:80')
        txtorcon.IProgressProvider(endpoint).add_progress_listener(progress)
        listening = endpoint.listen(factory)
        # Kept as two separate steps so the errback also sees exceptions
        # raised inside setup_complete.
        listening.addCallback(setup_complete)
        listening.addErrback(setup_failed)
        return listening
vnc_proxy_server.py 文件源码 项目:universe 作者: openai 项目源码 文件源码 阅读 66 收藏 0 点赞 0 评论 0
def connectionMade(self):
        """
        Handle a new VNC client connection by dialling the real VNC server.

        A client factory for ``VNCProxyClient`` is connected to
        ``self.factory.vnc_address``; its ``deferrable`` Deferred carries the
        session-establishment callbacks. The protocol version handshake is
        sent to the connecting client straight away.
        """
        logger.info('[%s] Connection received from VNC client', self.id)
        client_factory = protocol.ClientFactory()
        client_factory.protocol = VNCProxyClient
        client_factory.vnc_server = self
        client_factory.deferrable = defer.Deferred()
        endpoint = endpoints.clientFromString(reactor, self.factory.vnc_address)

        def _on_session(client):
            # NOTE(review): when this side is already broken the client is
            # closed but still stored and flushed below -- confirm whether an
            # early return is intended here.
            if self._broken:
                client.close()
            self.vnc_client = client
            self.flush()

        def _on_session_failed(reason):
            logger.error('[VNCProxyServer] Connection succeeded but could not establish session: %s', reason)
            self.close()

        client_factory.deferrable.addCallbacks(_on_session, _on_session_failed)

        def _on_connect_failed(reason):
            logger.error('[VNCProxyServer] Connection failed: %s', reason)
            self.close()

        connecting = endpoint.connect(client_factory)
        connecting.addErrback(_on_connect_failed)

        self.send_ProtocolVersion_Handshake()
redirector.py 文件源码 项目:vault-redirector-twisted 作者: manheim 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run(self):
        """Look up the active node, set up the site and listener, schedule
        the :py:meth:`~.update_active_node` polling loop, and start the
        Twisted reactor.

        :raises SystemExit: with code 3 when no active Vault node could be
            obtained before anything is started.
        """
        # the active node must be known before anything is started
        active = self.get_active_node()
        self.active_node_ip_port = active
        if active is None:
            logger.critical("ERROR: Could not get active vault node from "
                            "Consul. Exiting.")
            raise SystemExit(3)
        logger.warning("Initial Vault active node: %s",
                       self.active_node_ip_port)
        site = Site(VaultRedirectorSite(self))
        # pick the HTTP or HTTPS listener depending on the TLS configuration
        if self.tls_factory is None:
            self.listentcp(site)
        else:
            self.listentls(site)
        # schedule the periodic update_active_node poll
        self.add_update_loop()
        logger.warning('Starting Twisted reactor (event loop)')
        self.run_reactor()
test_redirector.py 文件源码 项目:vault-redirector-twisted 作者: manheim 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def se_requester(self):
        """
        Spawn the external requester script and poll it for completion.

        No request can be made from this process while the reactor is
        polling, so a separate ``requester.py`` process performs the request
        and a looping call checks on it every half second.
        """
        logger.debug('requester called; spawning process')
        # Twisted is event-based and this script is busy running the
        # redirector, so the request itself is done by an external script
        # started with Popen.
        target_url = 'http://127.0.0.1:%d' % self.cls.bind_port
        script = os.path.join(os.path.dirname(__file__), 'requester.py')
        command = [
            sys.executable,
            script,
            target_url,
            '/bar/baz',
            '/vault-redirector-health',
        ]
        self.poller = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            universal_newlines=True
        )
        # looping task that notices when the process stops and collects
        # its output
        checker = task.LoopingCall(self.check_request)
        self.poller_check_task = checker
        checker.clock = self.cls.reactor
        checker.start(0.5)
        logger.debug('poller_check_task started')
test_redirector.py 文件源码 项目:vault-redirector-twisted 作者: manheim 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def check_request(self):
        """
        Check whether the ``self.poller`` process has exited.

        While it is still running nothing happens. Once it has exited, the
        looping task is stopped, the exit code is asserted to be 0, the
        stripped output is stored in ``self.response`` and, if
        ``update_active`` has already been called, the reactor is stopped.
        """
        logger.debug('check_request called')
        still_running = self.poller.poll() is None
        if still_running:
            logger.debug('poller process still running')
            return
        # the process is done, so no further polling is needed
        self.poller_check_task.stop()
        assert self.poller.returncode == 0
        stdout_data, _stderr_data = self.poller.communicate()
        self.response = stdout_data.strip()
        logger.debug('check_request done; response: %s', self.response)
        # the output may not be a native str (the original note says it is
        # binary on python3); decode it in that case
        if not isinstance(self.response, str):
            self.response = self.response.decode('utf-8')
        if self.update_active_called:
            self.stop_reactor()
ldapproxy_plugin.py 文件源码 项目:privacyidea-ldap-proxy 作者: NetKnights-GmbH 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def makeService(self, options):
        """
        Called by Twisted after having parsed the command-line options.

        :param options: ``usage.Options`` instance; ``options['config']`` is
            the path of the configuration file
        :return: the server instance, a ``StreamServerEndpointService``
            listening on the endpoint named in the ``ldap-proxy`` section of
            the configuration
        :raises SystemExit: with exit code 1 when no configuration file was
            given
        """
        # Configuration is mandatory
        if options['config'] is None:
            # Parenthesised single-argument form: prints the same text under
            # the Python 2 print statement and the Python 3 print function.
            print('You need to specify a configuration file via `twistd ldap-proxy -c config.ini`.')
            sys.exit(1)

        config = load_config(options['config'])
        factory = ProxyServerFactory(config)

        # serverFromString turns the endpoint description into an endpoint
        endpoint = serverFromString(reactor, config['ldap-proxy']['endpoint'])
        return internet.StreamServerEndpointService(endpoint, factory)
proxy.py 文件源码 项目:privacyidea-ldap-proxy 作者: NetKnights-GmbH 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def connect_service_account(self):
        """
        Make a new connection to the LDAP backend server using the credentials of the service account.

        The connection is upgraded with ``startTLS`` when ``self.use_tls`` is
        set, then bound as ``self.service_account_dn``.

        :return: A Deferred that fires a `LDAPClient` instance
        :raises ldaperrors.LDAPException: when the bind fails; the connection
            is unbound before the error is re-raised
        """
        client = yield connectToLDAPEndpoint(reactor, self.proxied_endpoint_string, LDAPClient)
        if self.use_tls:
            client = yield client.startTLS()
        try:
            yield client.bind(self.service_account_dn, self.service_account_password)
        except ldaperrors.LDAPException as e:
            # Call unbind() here if an exception occurs: Otherwise, Twisted will keep the file open
            # and slowly run out of open files.
            yield client.unbind()
            # Re-raise the bind error itself once the cleanup is done.
            raise e
        defer.returnValue(client)
test_libraries.py 文件源码 项目:driveboardapp 作者: nortd 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def test_twisted(pyi_builder):
    """
    Pass ``pyi_builder.test_source`` a script that imports the Twisted
    reactor and exits with an error unless it exposes ``listenTCP``.
    """
    reactor_check_source = (
        """
        # Twisted is an event-driven networking engine.
        #
        # The 'reactor' is object that starts the eventloop.
        # There are different types of platform specific reactors.
        # Platform specific reactor is wrapped into twisted.internet.reactor module.
        from twisted.internet import reactor
        # Applications importing module twisted.internet.reactor might fail
        # with error like:
        #
        #     AttributeError: 'module' object has no attribute 'listenTCP'
        #
        # Ensure default reactor was loaded - it has method 'listenTCP' to start server.
        if not hasattr(reactor, 'listenTCP'):
            raise SystemExit('Twisted reactor not properly initialized.')
        """
    )
    pyi_builder.test_source(reactor_check_source)
activesync.py 文件源码 项目:peas 作者: mwrlabs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self, domain, username, pw, server, use_ssl, policy_key=0,
             server_version="14.0", device_type="iPhone", device_id=None,
             verbose=False):
        """
        Store the connection settings and set up the HTTP agent and the
        operation queue.

        :param domain: stored as ``self.domain``
        :param username: stored as ``self.username``
        :param pw: stored as ``self.password``
        :param server: stored as ``self.server``
        :param use_ssl: stored as ``self.use_ssl``
        :param policy_key: stored as ``self.policy_key``
        :param server_version: stored as ``self.server_version``
        :param device_type: stored as ``self.device_type``
        :param device_id: device identifier; when empty or ``None`` a random
            32-character hexadecimal id is generated
        :param verbose: stored as ``self.verbose``
        """
        # Connection and account settings
        self.server = server
        self.use_ssl = use_ssl
        self.domain = domain
        self.username = username
        self.password = pw

        # Device identity; a UUID4 without dashes is exactly 32 hex digits
        self.device_id = device_id or uuid.uuid4().hex
        self.device_type = device_type
        self.server_version = server_version
        self.policy_key = policy_key

        # Per-session state
        self.verbose = verbose
        self.folder_data = {}
        self.collection_data = {}

        # HTTP client
        context_factory = WebClientContextFactory()
        self.agent = Agent(reactor, context_factory)

        # Operation queue: queue_full is called with the first queued item
        self.operation_queue = defer.DeferredQueue()
        first_item = self.operation_queue.get()
        first_item.addCallback(self.queue_full)
        self.queue_deferred = first_item

    # Response processing
test_hpe_plugin.py 文件源码 项目:python-hpedockerplugin 作者: hpe-storage 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_hpe_create_volume_invalid_provisioning_option(self):
        """
        POST a Create request whose ``provisioning`` option is ``fake`` and
        pass the response to ``checkResponse`` together with the expected
        "Invalid input" error, then remove the volume.
        """
        name = 'test-create-volume-fake'
        endpoint_path = b"/VolumeDriver.Create"
        request_body = {u"Name": name,
                        u"Opts": {u"provisioning": u"fake"}}

        json_headers = Headers({b"content-type": [b"application/json"]})
        producer = FileBodyProducer(BytesIO(dumps(request_body)))
        http_agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory())
        url = b"UNIX://localhost" + endpoint_path
        response_d = http_agent.request(b'POST', url, json_headers, producer)

        expected_reply = json.dumps({
            u"Err": "Invalid input received: Must specify a valid " +
            "provisioning type ['thin', 'full', " +
            "'dedup'], value 'fake' is invalid."})
        response_d.addCallback(self.checkResponse, expected_reply)
        response_d.addCallback(self._remove_volume_callback, name)
        response_d.addErrback(self.cbFailed)
        return response_d
test_hpe_plugin.py 文件源码 项目:python-hpedockerplugin 作者: hpe-storage 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_hpe_create_volume_invalid_option(self):
        """
        POST a Create request carrying the unknown option ``fake`` and pass
        the response to ``checkResponse`` together with the expected
        "not a valid option" error, then remove the volume.
        """
        name = 'test-create-volume-fake'
        endpoint_path = b"/VolumeDriver.Create"
        request_body = {u"Name": name,
                        u"Opts": {u"fake": u"fake"}}

        json_headers = Headers({b"content-type": [b"application/json"]})
        producer = FileBodyProducer(BytesIO(dumps(request_body)))
        http_agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory())
        url = b"UNIX://localhost" + endpoint_path
        response_d = http_agent.request(b'POST', url, json_headers, producer)

        expected_reply = json.dumps({
            u"Err": "create volume failed, error is: fake is not a valid "
            "option. Valid options are: ['size', 'provisioning', "
            "'flash-cache']"})
        response_d.addCallback(self.checkResponse, expected_reply)
        response_d.addCallback(self._remove_volume_callback, name)
        response_d.addErrback(self.cbFailed)
        return response_d
test_hpe_plugin.py 文件源码 项目:python-hpedockerplugin 作者: hpe-storage 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _get_volume_mount_path(self, body, name):
        """
        Issue a Path request for ``name`` and check it against ``body``.

        ``body`` is the result of the previous callback in the chain (the
        mount response); it is handed to ``checkResponse`` as the expected
        value for the Path response.
        """
        endpoint_path = b"/VolumeDriver.Path"
        request_body = {u"Name": name}
        json_headers = Headers({b"content-type": [b"application/json"]})
        producer = FileBodyProducer(BytesIO(dumps(request_body)))
        http_agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory())
        url = b"UNIX://localhost" + endpoint_path
        response_d = http_agent.request(b'POST', url, json_headers, producer)
        # The Path answer is compared with what the mount returned
        response_d.addCallback(self.checkResponse, body)
        response_d.addErrback(self.cbFailed)
        return response_d
test_hpe_plugin.py 文件源码 项目:python-hpedockerplugin 作者: hpe-storage 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _mount_the_volume(self, body, name):
        """
        Issue a Mount request for the previously created volume ``name``.

        ``body`` is the result of the previous callback in the chain; it is
        accepted only because the callback signature requires it and is not
        used here.
        """
        endpoint_path = b"/VolumeDriver.Mount"
        request_body = {u"Name": name}
        json_headers = Headers({b"content-type": [b"application/json"]})
        producer = FileBodyProducer(BytesIO(dumps(request_body)))
        http_agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory())
        url = b"UNIX://localhost" + endpoint_path
        response_d = http_agent.request(b'POST', url, json_headers, producer)

        response_d.addCallback(self.getResponse)

        # A valid answer to the follow-up Path request is taken as proof
        # that the mount worked.
        # TODO: Add additional logic to verify the mountpath
        response_d.addCallback(self._get_volume_mount_path, name)
        return response_d
test_hpe_plugin.py 文件源码 项目:python-hpedockerplugin 作者: hpe-storage 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def broken_test_hpe_mount_umount_volume(self):
        """
        Create a volume, then chain mount, unmount and removal of it.

        The ``broken_`` prefix keeps the method from matching the ``test``
        naming pattern.
        """
        name = 'test-mount-volume'
        endpoint_path = b"/VolumeDriver.Create"
        request_body = {u"Name": name}

        # Step 1: create the volume that will be mounted
        json_headers = Headers({b"content-type": [b"application/json"]})
        producer = FileBodyProducer(BytesIO(dumps(request_body)))
        http_agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory())
        url = b"UNIX://localhost" + endpoint_path
        chain = http_agent.request(b'POST', url, json_headers, producer)
        chain.addCallback(self.checkResponse, json.dumps({u"Err": ''}))
        chain.addErrback(self.cbFailed)

        # Steps 2-4, in order: mount it, unmount it, remove it
        chain.addCallback(self._mount_the_volume, name)
        chain.addCallback(self._unmount_the_volume, name)
        chain.addCallback(self._remove_volume_callback, name)
        return chain
test_hpe_plugin.py 文件源码 项目:python-hpedockerplugin 作者: hpe-storage 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_hpe_get_volume(self):
        """
        Create a volume, fetch it via ``_get_volume`` expecting an empty
        status and mountpoint, then remove it.
        """
        name = 'test-get-volume'
        endpoint_path = b"/VolumeDriver.Create"
        request_body = {u"Name": name}

        # Step 1: create the volume
        json_headers = Headers({b"content-type": [b"application/json"]})
        producer = FileBodyProducer(BytesIO(dumps(request_body)))
        http_agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory())
        url = b"UNIX://localhost" + endpoint_path
        chain = http_agent.request(b'POST', url, json_headers, producer)
        chain.addCallback(self.checkResponse, json.dumps({u"Err": ''}))
        chain.addErrback(self.cbFailed)

        # Step 2: fetch the volume that was just created
        volume_info = {u"Status": {},
                       u"Mountpoint": '',
                       u"Name": name}
        expected = {u"Volume": volume_info,
                    u"Err": ''}
        chain.addCallback(self._get_volume, name, expected)

        # Step 3: remove it again
        chain.addCallback(self._remove_volume_callback, name)
        return chain
test_hpe_plugin.py 文件源码 项目:python-hpedockerplugin 作者: hpe-storage 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def broken_test_hpe_list_volume(self):
        """
        Create a volume, list volumes via ``_list_volumes`` expecting exactly
        that volume, then remove it.

        The ``broken_`` prefix keeps the method from matching the ``test``
        naming pattern.
        """
        name = 'test-list-volume'
        endpoint_path = b"/VolumeDriver.Create"
        request_body = {u"Name": name}

        # Step 1: create the volume
        json_headers = Headers({b"content-type": [b"application/json"]})
        producer = FileBodyProducer(BytesIO(dumps(request_body)))
        http_agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory())
        url = b"UNIX://localhost" + endpoint_path
        chain = http_agent.request(b'POST', url, json_headers, producer)
        chain.addCallback(self.checkResponse, json.dumps({u"Err": ''}))
        chain.addErrback(self.cbFailed)

        # Step 2: list the volumes
        listed_volume = {u"Mountpoint": '',
                         u"Name": name}
        expected = {u"Err": '',
                    u"Volumes": [listed_volume]}
        chain.addCallback(self._list_volumes, name, expected)

        # Step 3: remove the volume again
        chain.addCallback(self._remove_volume_callback, name)

        return chain
strports.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def listen(description, factory, default=None):
    """Start listening on the port that a description string refers to.

    The description is handed to C{parse}; the resulting name selects the
    reactor's C{listen<name>} method, which is called with the parsed
    positional and keyword arguments.

    @type description: C{str}
    @type factory: L{twisted.internet.interfaces.IProtocolFactory}
    @type default: C{str} or C{None}
    @rtype: C{twisted.internet.interfaces.IListeningPort}
    @return: the port corresponding to a description of a reliable
    virtual circuit server.

    See the documentation of the C{parse} function for description
    of the semantics of the arguments.
    """
    # Imported here rather than at module level, as in the original code.
    from twisted.internet import reactor
    kind, positional, keywords = parse(description, factory, default)
    listen_method = getattr(reactor, 'listen' + kind)
    return listen_method(*positional, **keywords)
test_options.py 文件源码 项目:fluiddb 作者: fluidinfo 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testValidOptionsRequest(self):
        """
        A "regular" OPTIONS request must be answered with an C{Allow} header
        and an empty body, and without any of the CORS specific headers.
        """
        agent = Agent(reactor)
        request_headers = Headers({'origin': ['http://localhost']})
        response = yield agent.request('OPTIONS', self.uri, request_headers)

        # Status and body length
        self.assertEqual(http.OK, response.code)
        self.assertEqual(0, response.length)

        # Headers: Allow must be there, the CORS ones must not
        received = response.headers
        self.assertTrue(received.hasHeader('Allow'))
        cors_only_headers = (
            'Access-Control-Allow-Origin',
            'Access-Control-Max-Age',
            'Access-Control-Allow-Credentials',
            'Access-Control-Allow-Methods',
        )
        for header_name in cors_only_headers:
            self.assertFalse(received.hasHeader(header_name))
test_objects.py 文件源码 项目:fluiddb 作者: fluidinfo 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testViaAgent(self):
        """
        Manually POST to /objects using L{twisted.web.client.Agent}.

        txFluidDB is not used because this test needs to see the raw
        response: it checks that a Location header is received and that the
        JSON payload contains both a 'URI' and an 'id'.
        """
        objects_uri = (self.txEndpoint.getRootURL() +
                       defaults.httpObjectCategoryName)
        credentials = '%s:%s' % ('testuser1', 'secret')
        basic_auth = 'Basic %s' % b64encode(credentials)
        request_headers = Headers({'accept': ['application/json'],
                                   'authorization': [basic_auth]})
        agent = Agent(reactor)
        response = yield agent.request('POST', objects_uri, request_headers)
        self.assertEqual(http.CREATED, response.code)
        self.assertTrue(response.headers.hasHeader('location'))

        # Collect the body through a ResponseGetter that fires body_d
        body_d = defer.Deferred()
        response.deliverBody(ResponseGetter(body_d))
        payload = json.loads((yield body_d))
        for key in ('URI', 'id'):
            self.assertIn(key, payload)
test_objects.py 文件源码 项目:fluiddb 作者: fluidinfo 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def testQueryUnicodePath(self):
        """Querying on a non-existent Unicode tag should give a 404.

        Part of the point is to make sure that passing a Unicode tag path
        does not trigger some other error.
        """
        path = u'çóñ/???'
        query = '%s = "hi"' % path
        quoted_query = urllib.quote(query.encode('utf-8'))
        request_uri = '%s/%s?query=%s' % (
            self.endpoint,
            defaults.httpObjectCategoryName,
            quoted_query)

        request_headers = Headers({'accept': ['application/json']})

        agent = Agent(reactor)
        response = yield agent.request('GET', request_uri, request_headers)

        self.assertEqual(http.NOT_FOUND, response.code)
test_headers.py 文件源码 项目:fluiddb 作者: fluidinfo 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testValidCORSRequest(self):
        """
        Sanity check: a request carrying an Origin header gets the CORS
        headers back, with the origin echoed in
        C{Access-Control-Allow-Origin}.
        """
        agent = Agent(reactor)
        request_headers = Headers()
        # The origin to use in the tests
        dummy_origin = 'http://foo.com'
        request_headers.addRawHeader('Origin', dummy_origin)
        response = yield agent.request('GET', self.uri, request_headers)

        # Status
        self.assertEqual(http.OK, response.code)
        # Both CORS headers must be present...
        received = response.headers
        for header_name in ('Access-Control-Allow-Origin',
                            'Access-Control-Allow-Credentials'):
            self.assertTrue(received.hasHeader(header_name))
        # ...and the allowed origins must include ours
        allowed_origins = received.getRawHeaders('Access-Control-Allow-Origin')
        self.assertTrue(dummy_origin in allowed_origins)
test_namespaces.py 文件源码 项目:fluiddb 作者: fluidinfo 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def testVersionGets404(self):
        """
        API calls used to accept a version number in the path. That is no
        longer supported, so such a request must give a 404.
        """
        version = 20100808
        uri_parts = (
            self.endpoint,
            version,
            defaults.httpNamespaceCategoryName,
            defaults.adminUsername)
        request_uri = '%s/%d/%s/%s' % uri_parts

        request_headers = Headers({'accept': ['application/json']})

        agent = Agent(reactor)
        response = yield agent.request('GET', request_uri, request_headers)
        self.assertEqual(http.NOT_FOUND, response.code)


    # TODO: Add a test for a namespace that we don't have LIST perm on.
    # although that might be done in permissions.py when that finally gets
    # added.
tx.py 文件源码 项目:deb-python-txaio 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def with_config(loop=None):
    """
    Return a ``_TxApi`` built from the module-level ``config``.

    :param loop: optional reactor; it is only checked, never stored
    :raises RuntimeError: when ``loop`` is given and ``config.loop`` is
        already set to a different object
    """
    global config
    # Short-circuits on ``loop is None`` so config.loop is only inspected
    # when a reactor was actually passed in.
    conflicting = (
        loop is not None
        and config.loop is not None
        and config.loop is not loop
    )
    if conflicting:
        raise RuntimeError(
            "Twisted has only a single, global reactor. You passed in "
            "a reactor different from the one already configured "
            "in txaio.config.loop"
        )
    return _TxApi(config)


# NOTE: beware that twisted.logger._logger.Logger copies itself via an
# overridden __get__ method when used as recommended as a class
# descriptor.  So, we override __get__ to just return ``self`` which
# means ``log_source`` will be wrong, but we don't document that as a
# key that you can depend on anyway :/
ProxyRequest.py 文件源码 项目:mgr.p2p.proxy 作者: tomusdrw 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def requestWebObject(self):
        """
        Forward this request to the host named in ``self.uri``.

        The URI is parsed, the client factory class for its scheme is looked
        up in ``self.protocols``, a ``host`` header is filled in when the
        request has none, and the factory is connected over TCP with the
        request body read from ``self.content``.
        """
        parsed_uri = urlparse.urlparse(self.uri)
        scheme = parsed_uri[0]
        host, port = self.extractHostAndPort(parsed_uri, scheme)
        query = self.extractQuery(parsed_uri)

        factory_class = self.protocols[scheme]

        # Work on a copy so the request's own header mapping is not modified
        outgoing_headers = self.getAllHeaders().copy()
        outgoing_headers.setdefault('host', host)

        log.info('Performing {} request for {}'.format(self.method, self.uri))
        # Rewind before reading so the whole body is forwarded
        self.content.seek(0, 0)
        payload = self.content.read()
        client_factory = factory_class(
            self.method, query, self.clientproto, outgoing_headers,
            payload, self)
        self.reactor.connectTCP(host, port, client_factory)


问题


面经


文章

微信
公众号

扫码关注公众号