python类instance()的实例源码

run_api.py 文件源码 项目:biomatch_prototype_api 作者: CBIIT 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def gene_to_variants():
    """Return a JSON response mapping each variant identifier to gene counts.

    Records come from three sources, in order: the aggregate information
    from ``PatientProcessor`` and the gene-fusion variants from
    ``PatientAccessor`` for ``'driverGene'`` and ``'partnerGene'``.  Each
    record is indexed as a mapping with an ``'_id'`` sub-mapping (holding
    ``'identifier'`` and, optionally, ``'gene'``) and a ``'count'`` value.

    Identifiers that ``AltIdentifierController`` flags as needing an
    alternative are replaced by the identifier it creates for the record.

    Returns:
        The result of ``jsonify`` applied to
        ``{'gene for each variant': {identifier: {gene: total_count}}}``.
    """
    instances = (
        list(PatientProcessor().build_list_of_aggregate_information(1))
        + list(PatientAccessor().get_variant_for_gene_gene_fusions('', 'driverGene'))
        + list(PatientAccessor().get_variant_for_gene_gene_fusions('', 'partnerGene'))
    )

    variant_gene_dict = {}
    for instance in instances:
        record_id = instance['_id']
        identifier = record_id['identifier']
        if AltIdentifierController.needs_an_alternative_identifier(identifier):
            identifier = AltIdentifierController.create_new_identifier(instance)

        # Every record contributes, not only the first one seen for an
        # identifier; otherwise later genes/counts are silently dropped.
        gene_counts = variant_gene_dict.setdefault(identifier, {})
        if 'gene' in record_id:
            gene = record_id['gene']
            # Look up the running total under the same key it is stored
            # under, so repeated (identifier, gene) pairs accumulate.
            gene_counts[gene] = instance['count'] + gene_counts.get(gene, 0)

    return jsonify({'gene for each variant': variant_gene_dict})
test_base.py 文件源码 项目:koi 作者: openpermissions 项目源码 文件源码 阅读 227 收藏 0 点赞 0 评论 0
def test_prepare_invalid_access(verify_token, options):
    """``prepare`` fails with a 500 when the request method has no access entry.

    The handler's ``METHOD_ACCESS`` table deliberately omits ``GET``, which
    is the method set on the request.  The test expects ``base.HTTPError``
    with status code 500 and expects ``verify_token`` not to be called.

    NOTE(review): ``verify_token`` and ``options`` are presumably injected
    mocks/fixtures; the code supplying them is outside this snippet.
    """
    read_methods = ("HEAD", "OPTIONS")
    write_methods = ("POST", "PATCH", "PUT", "DELETE")
    access_map = {method: "r" for method in read_methods}
    access_map.update({method: "w" for method in write_methods})

    handler = mock_handler(base.AuthHandler)
    handler.METHOD_ACCESS = access_map
    handler.request.method = 'GET'

    with pytest.raises(base.HTTPError) as exc:
        IOLoop.instance().run_sync(handler.prepare)

    raised = exc.value
    assert raised.status_code == 500
    assert not verify_token.called
web.py 文件源码 项目:get_started_with_respeaker 作者: respeaker 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def listen(self, port, address="", **kwargs):
        """Start an HTTP server for this application on ``port``.

        Convenience shortcut that builds an `.HTTPServer` around this
        application and calls its ``listen(port, address)`` method.  Extra
        keyword arguments are forwarded to the `.HTTPServer` constructor.

        For advanced uses (e.g. multi-process mode) do not use this method;
        create an `.HTTPServer` yourself and call its
        `.TCPServer.bind`/`.TCPServer.start` methods directly.

        Calling this method does not start the event loop: you still need
        ``IOLoop.instance().start()`` afterwards to start the server.
        """
        # Imported lazily rather than at module level because HTTPServer
        # is not importable on appengine.
        from tornado.httpserver import HTTPServer
        HTTPServer(self, **kwargs).listen(port, address)
testing.py 文件源码 项目:teleport 作者: eomsoft 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Release per-test IOLoop resources, then surface pending errors.

        Order matters: Subprocess is reset first so it can be reused with a
        new ioloop, the test's loop is cleared as current, the loop is
        closed when it is not the global instance, and only then does the
        base ``tearDown`` run, followed by re-raising any stored exception.
        """
        Subprocess.uninitialize()
        self.io_loop.clear_current()

        # Close the loop (and every file descriptor it still holds) unless
        # it is the initialized global instance.  This avoids fd leaks when
        # tests run repeatedly in one process with autoreload, because curl
        # does not set FD_CLOEXEC on its file descriptors.
        loop_is_private = (not IOLoop.initialized() or
                           self.io_loop is not IOLoop.instance())
        if loop_is_private:
            self.io_loop.close(all_fds=True)

        super(AsyncTestCase, self).tearDown()

        # Last chance to raise, in a way the unittest machinery understands,
        # an exception that escaped or that the StackContext caught when
        # there was no wait() to re-raise it.
        self.__rethrow()
run.py 文件源码 项目:quupod 作者: alvinwan 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def main(app: Flask, tornado: bool=False) -> None:
    """Create the database tables, then serve ``app``.

    Tables are created inside the application context.  When ``tornado``
    is false the app is served through ``socketio.run`` using the
    ``INIT`` config mapping as keyword arguments.  Otherwise the app is
    wrapped in a Tornado ``WSGIContainer`` and served by an ``HTTPServer``
    on the ``INIT_PORT`` config value until the IOLoop stops.
    """
    with app.app_context():
        db.create_all()
        print('[OK] Database creation complete.')

    if not tornado:
        socketio.run(app, **app.config['INIT'])
        return

    # Tornado is imported only when it is actually requested.
    from tornado.wsgi import WSGIContainer
    from tornado.httpserver import HTTPServer
    from tornado.ioloop import IOLoop

    server = HTTPServer(WSGIContainer(app))
    port = int(app.config['INIT_PORT'])
    server.listen(port)
    IOLoop.instance().start()
testing.py 文件源码 项目:projects-2017-2 作者: ncss 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Release per-test IOLoop resources, then surface pending errors.

        Order matters: Subprocess is reset first so it can be reused with a
        new ioloop, the test's loop is cleared as current, the loop is
        closed when it is not the global instance, and only then does the
        base ``tearDown`` run, followed by re-raising any stored exception.
        """
        Subprocess.uninitialize()
        self.io_loop.clear_current()

        # Close the loop (and every file descriptor it still holds) unless
        # it is the initialized global instance.  This avoids fd leaks when
        # tests run repeatedly in one process with autoreload, because curl
        # does not set FD_CLOEXEC on its file descriptors.
        loop_is_private = (not IOLoop.initialized() or
                           self.io_loop is not IOLoop.instance())
        if loop_is_private:
            self.io_loop.close(all_fds=True)

        super(AsyncTestCase, self).tearDown()

        # Last chance to raise, in a way the unittest machinery understands,
        # an exception that escaped or that the StackContext caught when
        # there was no wait() to re-raise it.
        self.__rethrow()
client.py 文件源码 项目:aweasome_learning 作者: Knight-ZXW 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def run_tests():
    """Drive every test case offered by the websocket test server.

    Generator-style coroutine (it yields the futures returned by
    ``websocket_connect`` and ``read_message``).  It asks
    ``/getCaseCount`` how many cases exist, echoes back every message
    received for each ``/runCase`` connection until that connection
    yields ``None``, requests ``/updateReports``, and finally stops the
    IOLoop.

    NOTE(review): presumably wrapped by a coroutine decorator that is
    outside this snippet.
    """
    count_url = options.url + '/getCaseCount'
    control_conn = yield websocket_connect(count_url, None)
    case_count = int((yield control_conn.read_message()))
    logging.info('running %d cases', case_count)
    closing_msg = yield control_conn.read_message()
    assert closing_msg is None

    for case_no in range(1, case_count + 1):
        logging.info('running test case %d', case_no)
        case_url = options.url + '/runCase?case=%d&agent=%s' % (case_no, options.name)
        case_conn = yield websocket_connect(case_url, None, compression_options={})
        # Echo each message back in the same mode (binary for bytes)
        # until read_message yields None.
        payload = yield case_conn.read_message()
        while payload is not None:
            case_conn.write_message(payload, binary=isinstance(payload, bytes))
            payload = yield case_conn.read_message()

    report_url = options.url + '/updateReports?agent=%s' % options.name
    report_conn = yield websocket_connect(report_url, None)
    closing_msg = yield report_conn.read_message()
    assert closing_msg is None
    IOLoop.instance().stop()
testing.py 文件源码 项目:aweasome_learning 作者: Knight-ZXW 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Release per-test IOLoop resources, then surface pending errors.

        Order matters: Subprocess is reset first so it can be reused with a
        new ioloop, the test's loop is cleared as current, the loop is
        closed when it is not the global instance, and only then does the
        base ``tearDown`` run, followed by re-raising any stored exception.
        """
        Subprocess.uninitialize()
        self.io_loop.clear_current()

        # Close the loop (and every file descriptor it still holds) unless
        # it is the initialized global instance.  This avoids fd leaks when
        # tests run repeatedly in one process with autoreload, because curl
        # does not set FD_CLOEXEC on its file descriptors.
        loop_is_private = (not IOLoop.initialized() or
                           self.io_loop is not IOLoop.instance())
        if loop_is_private:
            self.io_loop.close(all_fds=True)

        super(AsyncTestCase, self).tearDown()

        # Last chance to raise, in a way the unittest machinery understands,
        # an exception that escaped or that the StackContext caught when
        # there was no wait() to re-raise it.
        self.__rethrow()
testing.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Release per-test IOLoop resources, then surface pending errors.

        Order matters: Subprocess is reset first so it can be reused with a
        new ioloop, the test's loop is cleared as current, the loop is
        closed when it is not the global instance, and only then does the
        base ``tearDown`` run, followed by re-raising any stored exception.
        """
        Subprocess.uninitialize()
        self.io_loop.clear_current()

        # Close the loop (and every file descriptor it still holds) unless
        # it is the initialized global instance.  This avoids fd leaks when
        # tests run repeatedly in one process with autoreload, because curl
        # does not set FD_CLOEXEC on its file descriptors.
        loop_is_private = (not IOLoop.initialized() or
                           self.io_loop is not IOLoop.instance())
        if loop_is_private:
            self.io_loop.close(all_fds=True)

        super(AsyncTestCase, self).tearDown()

        # Last chance to raise, in a way the unittest machinery understands,
        # an exception that escaped or that the StackContext caught when
        # there was no wait() to re-raise it.
        self.__rethrow()
testserver.py 文件源码 项目:deb-python-httpretty 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def start(self):
        """Launch the test HTTP server in a child process.

        The handlers returned by ``self.get_handlers()`` are served on
        ``self.port`` by an ``HTTPServer`` running inside a separate
        ``Process``, which is stored on ``self.process``.  HTTPretty is
        disabled in the parent before the child is created and again
        inside the child.
        """
        def go(app, port, data=None):
            # ``data`` is accepted so the positional call from the parent
            # keeps working; it is not read here.  The default is None
            # rather than ``{}`` to avoid a shared mutable default.
            from httpretty import HTTPretty
            HTTPretty.disable()

            http = HTTPServer(app)
            # NOTE(review): second disable() looks redundant; kept as in
            # the original since its purpose cannot be confirmed from here.
            HTTPretty.disable()

            http.listen(int(port))
            IOLoop.instance().start()

        app = self.get_handlers()

        data = {}
        args = (app, self.port, data)
        HTTPretty.disable()
        self.process = Process(target=go, args=args)
        self.process.start()
        # Fixed pause after starting the child; presumably gives the server
        # time to begin listening before the caller sends requests.
        time.sleep(1)
client.py 文件源码 项目:browser_vuln_check 作者: lcatro 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def run_tests():
    """Drive every test case offered by the websocket test server.

    Generator-style coroutine (it yields the futures returned by
    ``websocket_connect`` and ``read_message``).  It asks
    ``/getCaseCount`` how many cases exist, echoes back every message
    received for each ``/runCase`` connection until that connection
    yields ``None``, requests ``/updateReports``, and finally stops the
    IOLoop.

    NOTE(review): presumably wrapped by a coroutine decorator that is
    outside this snippet.
    """
    count_url = options.url + '/getCaseCount'
    control_conn = yield websocket_connect(count_url, None)
    case_count = int((yield control_conn.read_message()))
    logging.info('running %d cases', case_count)
    closing_msg = yield control_conn.read_message()
    assert closing_msg is None

    for case_no in range(1, case_count + 1):
        logging.info('running test case %d', case_no)
        case_url = options.url + '/runCase?case=%d&agent=%s' % (case_no, options.name)
        case_conn = yield websocket_connect(case_url, None, compression_options={})
        # Echo each message back in the same mode (binary for bytes)
        # until read_message yields None.
        payload = yield case_conn.read_message()
        while payload is not None:
            case_conn.write_message(payload, binary=isinstance(payload, bytes))
            payload = yield case_conn.read_message()

    report_url = options.url + '/updateReports?agent=%s' % options.name
    report_conn = yield websocket_connect(report_url, None)
    closing_msg = yield report_conn.read_message()
    assert closing_msg is None
    IOLoop.instance().stop()
testing.py 文件源码 项目:browser_vuln_check 作者: lcatro 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Release per-test IOLoop resources, then surface pending errors.

        Order matters: Subprocess is reset first so it can be reused with a
        new ioloop, the test's loop is cleared as current, the loop is
        closed when it is not the global instance, and only then does the
        base ``tearDown`` run, followed by re-raising any stored exception.
        """
        Subprocess.uninitialize()
        self.io_loop.clear_current()

        # Close the loop (and every file descriptor it still holds) unless
        # it is the initialized global instance.  This avoids fd leaks when
        # tests run repeatedly in one process with autoreload, because curl
        # does not set FD_CLOEXEC on its file descriptors.
        loop_is_private = (not IOLoop.initialized() or
                           self.io_loop is not IOLoop.instance())
        if loop_is_private:
            self.io_loop.close(all_fds=True)

        super(AsyncTestCase, self).tearDown()

        # Last chance to raise, in a way the unittest machinery understands,
        # an exception that escaped or that the StackContext caught when
        # there was no wait() to re-raise it.
        self.__rethrow()
testing.py 文件源码 项目:TornadoWeb 作者: VxCoder 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Release per-test IOLoop resources, then surface pending errors.

        Order matters: Subprocess is reset first so it can be reused with a
        new ioloop, the test's loop is cleared as current, the loop is
        closed when it is not the global instance, and only then does the
        base ``tearDown`` run, followed by re-raising any stored exception.
        """
        Subprocess.uninitialize()
        self.io_loop.clear_current()

        # Close the loop (and every file descriptor it still holds) unless
        # it is the initialized global instance.  This avoids fd leaks when
        # tests run repeatedly in one process with autoreload, because curl
        # does not set FD_CLOEXEC on its file descriptors.
        loop_is_private = (not IOLoop.initialized() or
                           self.io_loop is not IOLoop.instance())
        if loop_is_private:
            self.io_loop.close(all_fds=True)

        super(AsyncTestCase, self).tearDown()

        # Last chance to raise, in a way the unittest machinery understands,
        # an exception that escaped or that the StackContext caught when
        # there was no wait() to re-raise it.
        self.__rethrow()
testing.py 文件源码 项目:PyQYT 作者: collinsctk 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Release per-test IOLoop resources, then surface pending errors.

        Order matters: Subprocess is reset first so it can be reused with a
        new ioloop, the test's loop is cleared as current, the loop is
        closed when it is not the global instance, and only then does the
        base ``tearDown`` run, followed by re-raising any stored exception.
        """
        Subprocess.uninitialize()
        self.io_loop.clear_current()

        # Close the loop (and every file descriptor it still holds) unless
        # it is the initialized global instance.  This avoids fd leaks when
        # tests run repeatedly in one process with autoreload, because curl
        # does not set FD_CLOEXEC on its file descriptors.
        loop_is_private = (not IOLoop.initialized() or
                           self.io_loop is not IOLoop.instance())
        if loop_is_private:
            self.io_loop.close(all_fds=True)

        super(AsyncTestCase, self).tearDown()

        # Last chance to raise, in a way the unittest machinery understands,
        # an exception that escaped or that the StackContext caught when
        # there was no wait() to re-raise it.
        self.__rethrow()
testing.py 文件源码 项目:ProgrameFacil 作者: Gpzim98 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Release per-test IOLoop resources, then surface pending errors.

        Order matters: Subprocess is reset first so it can be reused with a
        new ioloop, the test's loop is cleared as current, the loop is
        closed when it is not the global instance, and only then does the
        base ``tearDown`` run, followed by re-raising any stored exception.
        """
        Subprocess.uninitialize()
        self.io_loop.clear_current()

        # Close the loop (and every file descriptor it still holds) unless
        # it is the initialized global instance.  This avoids fd leaks when
        # tests run repeatedly in one process with autoreload, because curl
        # does not set FD_CLOEXEC on its file descriptors.
        loop_is_private = (not IOLoop.initialized() or
                           self.io_loop is not IOLoop.instance())
        if loop_is_private:
            self.io_loop.close(all_fds=True)

        super(AsyncTestCase, self).tearDown()

        # Last chance to raise, in a way the unittest machinery understands,
        # an exception that escaped or that the StackContext caught when
        # there was no wait() to re-raise it.
        self.__rethrow()
client.py 文件源码 项目:ProgrameFacil 作者: Gpzim98 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def run_tests():
    """Drive every test case offered by the websocket test server.

    Generator-style coroutine (it yields the futures returned by
    ``websocket_connect`` and ``read_message``).  It asks
    ``/getCaseCount`` how many cases exist, echoes back every message
    received for each ``/runCase`` connection until that connection
    yields ``None``, requests ``/updateReports``, and finally stops the
    IOLoop.

    NOTE(review): presumably wrapped by a coroutine decorator that is
    outside this snippet.
    """
    count_url = options.url + '/getCaseCount'
    control_conn = yield websocket_connect(count_url, None)
    case_count = int((yield control_conn.read_message()))
    logging.info('running %d cases', case_count)
    closing_msg = yield control_conn.read_message()
    assert closing_msg is None

    for case_no in range(1, case_count + 1):
        logging.info('running test case %d', case_no)
        case_url = options.url + '/runCase?case=%d&agent=%s' % (case_no, options.name)
        case_conn = yield websocket_connect(case_url, None, compression_options={})
        # Echo each message back in the same mode (binary for bytes)
        # until read_message yields None.
        payload = yield case_conn.read_message()
        while payload is not None:
            case_conn.write_message(payload, binary=isinstance(payload, bytes))
            payload = yield case_conn.read_message()

    report_url = options.url + '/updateReports?agent=%s' % options.name
    report_conn = yield websocket_connect(report_url, None)
    closing_msg = yield report_conn.read_message()
    assert closing_msg is None
    IOLoop.instance().stop()


问题


面经


文章

微信
公众号

扫码关注公众号