python类instance()的实例源码

recipe-576958.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_method_dispatcher():
    """
    This function can be used to test that the MethodDispatcher is working
    properly. It is called automatically when this script is executed directly.
    """
    import logging
    from tornado.ioloop import IOLoop
    from tornado.httpserver import HTTPServer
    from tornado.options import define, options, parse_command_line
    define("port", default=8888, help="Run on the given port", type=int)
    parse_command_line()
    logging.info(
        "Test Server Listening on http://0.0.0.0:%s/" % options.port
    )
    http_server = HTTPServer(TestApplication())
    http_server.listen(options.port)
    IOLoop.instance().start()
cli.py 文件源码 项目:bock 作者: afreeorange 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def web_server(wiki, port, debug=False):

    def kill_handler(signal_number, stack_frame):
        logger.info('\nStopping wiki')
        sys.exit(1)

    signal.signal(signal.SIGINT, kill_handler)

    logger.info('Starting wiki on port {}. Ctrl+C will kill it.'.format(port))
    HTTPServer(WSGIContainer(wiki)).listen(port)
    ioloop = IOLoop.instance()

    if debug:
        autoreload.start(ioloop)

    ioloop.start()
testing.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def tearDown(self):
        # Clean up Subprocess, so it can be used again with a new ioloop.
        Subprocess.uninitialize()
        self.io_loop.clear_current()
        if (not IOLoop.initialized() or
                self.io_loop is not IOLoop.instance()):
            # Try to clean up any file descriptors left open in the ioloop.
            # This avoids leaks, especially when tests are run repeatedly
            # in the same process with autoreload (because curl does not
            # set FD_CLOEXEC on its file descriptors)
            self.io_loop.close(all_fds=True)
        super(AsyncTestCase, self).tearDown()
        # In case an exception escaped or the StackContext caught an exception
        # when there wasn't a wait() to re-raise it, do so here.
        # This is our last chance to raise an exception in a way that the
        # unittest machinery understands.
        self.__rethrow()
testing.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def tearDown(self):
        # Clean up Subprocess, so it can be used again with a new ioloop.
        Subprocess.uninitialize()
        self.io_loop.clear_current()
        if (not IOLoop.initialized() or
                self.io_loop is not IOLoop.instance()):
            # Try to clean up any file descriptors left open in the ioloop.
            # This avoids leaks, especially when tests are run repeatedly
            # in the same process with autoreload (because curl does not
            # set FD_CLOEXEC on its file descriptors)
            self.io_loop.close(all_fds=True)
        super(AsyncTestCase, self).tearDown()
        # In case an exception escaped or the StackContext caught an exception
        # when there wasn't a wait() to re-raise it, do so here.
        # This is our last chance to raise an exception in a way that the
        # unittest machinery understands.
        self.__rethrow()
testing.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def tearDown(self):
        # Clean up Subprocess, so it can be used again with a new ioloop.
        Subprocess.uninitialize()
        self.io_loop.clear_current()
        if (not IOLoop.initialized() or
                self.io_loop is not IOLoop.instance()):
            # Try to clean up any file descriptors left open in the ioloop.
            # This avoids leaks, especially when tests are run repeatedly
            # in the same process with autoreload (because curl does not
            # set FD_CLOEXEC on its file descriptors)
            self.io_loop.close(all_fds=True)
        super(AsyncTestCase, self).tearDown()
        # In case an exception escaped or the StackContext caught an exception
        # when there wasn't a wait() to re-raise it, do so here.
        # This is our last chance to raise an exception in a way that the
        # unittest machinery understands.
        self.__rethrow()
test_rolld.py 文件源码 项目:rolld 作者: Hipo 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def exit_test():
    global periodic_checker
    if periodic_checker:
        periodic_checker.stop()
    os.kill(rolld_proc.pid, signal.SIGTERM)
    os.kill(nginx_proc.pid, signal.SIGTERM)
    # IOLoop.instance().add_timeout(time.time() + 5, partial(sys.exit, 0))
    # check if we have zombies left
    try:
        lines = subprocess.check_output('ps auxw | grep python | grep app.py | grep -v grep', shell=True)
        print lines
        assert len(lines) == 0
    except subprocess.CalledProcessError as grepexc:
        # grep shouldnt find anything so exit code should be 1
        if grepexc.returncode == 1:
            pass
        else:
            raise
    # if everything is fine, just stop our ioloop now.
    IOLoop.current().stop()
irc.py 文件源码 项目:labots 作者: SilverRainZ 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, host, port, nick,
            relaybots = [],
            charset = 'utf-8',
            ioloop = False):
        logger.info('Connecting to %s:%s', host, port)

        self.host = host
        self.port = port
        self.nick = nick
        self.relaybots = relaybots

        self._charset = charset
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._ioloop = ioloop or IOLoop.instance()
        self._stream = IOStream(sock, io_loop = self._ioloop)
        self._stream.connect((host, port), self._login)

        self._last_pong = time.time()
        self._timer = PeriodicCallback(self._keep_alive,
                60 * 1000, io_loop=self._ioloop)
        self._timer.start()

        self._send_timer = PeriodicCallback(self._sock_send,
                600, io_loop=self._ioloop)
        self._send_timer.start()
mc_web.py 文件源码 项目:mediachain-indexer 作者: mediachain 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def web(port = 23456,
        via_cli = False,
        ):
    """
    Bind Tornado server to specified port.    
    """

    print ('BINDING',port)

    try:
        tornado.options.parse_command_line()
        http_server = HTTPServer(Application(),
                                 xheaders=True,
                                 )
        http_server.bind(port)
        http_server.start(16) # Forks multiple sub-processes
        tornado.ioloop.IOLoop.instance().set_blocking_log_threshold(0.5)
        IOLoop.instance().start()

    except KeyboardInterrupt:
        print 'Exit'

    print ('WEB_STARTED')
testing.py 文件源码 项目:My-Web-Server-Framework-With-Python2.7 作者: syjsu 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def tearDown(self):
        # Clean up Subprocess, so it can be used again with a new ioloop.
        Subprocess.uninitialize()
        self.io_loop.clear_current()
        if (not IOLoop.initialized() or
                self.io_loop is not IOLoop.instance()):
            # Try to clean up any file descriptors left open in the ioloop.
            # This avoids leaks, especially when tests are run repeatedly
            # in the same process with autoreload (because curl does not
            # set FD_CLOEXEC on its file descriptors)
            self.io_loop.close(all_fds=True)
        super(AsyncTestCase, self).tearDown()
        # In case an exception escaped or the StackContext caught an exception
        # when there wasn't a wait() to re-raise it, do so here.
        # This is our last chance to raise an exception in a way that the
        # unittest machinery understands.
        self.__rethrow()
registry.py 文件源码 项目:nakadi-end2end 作者: zalando-nakadi 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _register_invocation(self, connector):
        use_sync_calculator = connector.interval >= 2.

        def _invoke():
            if not connector.active:
                return
            self.value += 1

            data = DataToSend(self.value, connector)

            connector.send_and_receive(data, use_sync_calculator)
            IOLoop.instance().call_later(connector.interval, _invoke)
            IOLoop.instance().call_later(connector.max_wait,
                                         functools.partial(data.on_timeout_passed, use_sync_calculator))
            self.rps.on_call()

        IOLoop.instance().add_callback(_invoke)
nakadi.py 文件源码 项目:nakadi-end2end 作者: zalando-nakadi 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def value_callback(self, value):
        async_max_callback = None
        async_callback = None
        with self.guard:
            if value in self.async_callbacks:
                count, async_callback, async_max_callback = self.async_callbacks.pop(value)
                if count != self.receivers:
                    async_callback = None
                count -= 1
                if count > 0:
                    self.async_callbacks[value] = count, None, async_max_callback
                    async_max_callback = None
            else:
                logging.error('Callback for instance {} and value {} is not found'.format(self.instance_id, value))
        if async_callback:
            IOLoop.instance().add_callback(async_callback)
        if async_max_callback:
            IOLoop.instance().add_callback(async_max_callback)
main.py 文件源码 项目:nakadi-end2end 作者: zalando-nakadi 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def start(config, port, token):
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s\t%(levelname)s\t%(message)s')
    logging.getLogger('tornado.curl_httpclient').setLevel(logging.WARN)
    logging.info('Reading configuration from {}'.format(config))
    with open(config, 'r') as f:
        items = yaml.load(f)
        connectors = load_connectors(items.get('connectors', {}))
    if not connectors:
        raise Exception('No connectors information found in {}'.format(config))
    if token:
        security.use_static_token(token)
    else:
        security.use_berry_token('end2end_nakadi')
    start_http_server(port)
    registry.instance().set_items(connectors)
    IOLoop.instance().start()
httpclient.py 文件源码 项目:time2go 作者: twitchyliquid64 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __new__(cls, io_loop=None, max_clients=10, force_instance=False, 
                **kwargs):
        io_loop = io_loop or IOLoop.instance()
        if cls is AsyncHTTPClient:
            if cls._impl_class is None:
                from tornado.simple_httpclient import SimpleAsyncHTTPClient
                AsyncHTTPClient._impl_class = SimpleAsyncHTTPClient
            impl = AsyncHTTPClient._impl_class
        else:
            impl = cls
        if io_loop in impl._async_clients() and not force_instance:
            return impl._async_clients()[io_loop]
        else:
            instance = super(AsyncHTTPClient, cls).__new__(impl)
            args = {}
            if cls._impl_kwargs:
                args.update(cls._impl_kwargs)
            args.update(kwargs)
            instance.initialize(io_loop, max_clients, **args)
            if not force_instance:
                impl._async_clients()[io_loop] = instance
            return instance
httpclient.py 文件源码 项目:time2go 作者: twitchyliquid64 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def configure(impl, **kwargs):
        """Configures the AsyncHTTPClient subclass to use.

        AsyncHTTPClient() actually creates an instance of a subclass.
        This method may be called with either a class object or the
        fully-qualified name of such a class (or None to use the default,
        SimpleAsyncHTTPClient)

        If additional keyword arguments are given, they will be passed
        to the constructor of each subclass instance created.  The
        keyword argument max_clients determines the maximum number of
        simultaneous fetch() operations that can execute in parallel
        on each IOLoop.  Additional arguments may be supported depending
        on the implementation class in use.

        Example::

           AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient")
        """
        if isinstance(impl, (unicode, bytes_type)):
            impl = import_object(impl)
        if impl is not None and not issubclass(impl, AsyncHTTPClient):
            raise ValueError("Invalid AsyncHTTPClient implementation")
        AsyncHTTPClient._impl_class = impl
        AsyncHTTPClient._impl_kwargs = kwargs
netutil.py 文件源码 项目:time2go 作者: twitchyliquid64 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def add_sockets(self, sockets):
        """Makes this server start accepting connections on the given sockets.

        The ``sockets`` parameter is a list of socket objects such as
        those returned by `bind_sockets`.
        `add_sockets` is typically used in combination with that
        method and `tornado.process.fork_processes` to provide greater
        control over the initialization of a multi-process server.
        """
        if self.io_loop is None:
            self.io_loop = IOLoop.instance()

        for sock in sockets:
            self._sockets[sock.fileno()] = sock
            add_accept_handler(sock, self._handle_connection,
                               io_loop=self.io_loop)
netutil.py 文件源码 项目:time2go 作者: twitchyliquid64 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def add_accept_handler(sock, callback, io_loop=None):
    """Adds an ``IOLoop`` event handler to accept new connections on ``sock``.

    When a connection is accepted, ``callback(connection, address)`` will
    be run (``connection`` is a socket object, and ``address`` is the
    address of the other end of the connection).  Note that this signature
    is different from the ``callback(fd, events)`` signature used for
    ``IOLoop`` handlers.
    """
    if io_loop is None:
        io_loop = IOLoop.instance()
    def accept_handler(fd, events):
        while True:
            try:
                connection, address = sock.accept()
            except socket.error, e:
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    return
                raise
            callback(connection, address)
    io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ)
twitterdemo.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def main():
    parse_command_line(final=False)
    parse_config_file(options.config_file)

    app = Application(
        [
            ('/', MainHandler),
            ('/login', LoginHandler),
            ('/logout', LogoutHandler),
        ],
        login_url='/login',
        **options.group_dict('application'))
    app.listen(options.port)

    logging.info('Listening on http://localhost:%d' % options.port)
    IOLoop.instance().start()
benchmark.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def run():
    app = Application([("/", RootHandler)])
    port = random.randrange(options.min_port, options.max_port)
    app.listen(port, address='127.0.0.1')
    signal.signal(signal.SIGCHLD, handle_sigchld)
    args = ["ab"]
    args.extend(["-n", str(options.n)])
    args.extend(["-c", str(options.c)])
    if options.keepalive:
        args.append("-k")
    if options.quiet:
        # just stops the progress messages printed to stderr
        args.append("-q")
    args.append("http://127.0.0.1:%d/" % port)
    subprocess.Popen(args)
    IOLoop.instance().start()
    IOLoop.instance().close()
    del IOLoop._instance
    assert not IOLoop.initialized()
chunk_benchmark.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def main():
    parse_command_line()
    app = Application([('/', ChunkHandler)])
    app.listen(options.port, address='127.0.0.1')
    def callback(response):
        response.rethrow()
        assert len(response.body) == (options.num_chunks * options.chunk_size)
        logging.warning("fetch completed in %s seconds", response.request_time)
        IOLoop.instance().stop()

    logging.warning("Starting fetch with curl client")
    curl_client = CurlAsyncHTTPClient()
    curl_client.fetch('http://localhost:%d/' % options.port,
                      callback=callback)
    IOLoop.instance().start()

    logging.warning("Starting fetch with simple client")
    simple_client = SimpleAsyncHTTPClient()
    simple_client.fetch('http://localhost:%d/' % options.port,
                        callback=callback)
    IOLoop.instance().start()
web.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def listen(self, port, address="", **kwargs):
        """Starts an HTTP server for this application on the given port.

        This is a convenience alias for creating an `.HTTPServer`
        object and calling its listen method.  Keyword arguments not
        supported by `HTTPServer.listen <.TCPServer.listen>` are passed to the
        `.HTTPServer` constructor.  For advanced uses
        (e.g. multi-process mode), do not use this method; create an
        `.HTTPServer` and call its
        `.TCPServer.bind`/`.TCPServer.start` methods directly.

        Note that after calling this method you still need to call
        ``IOLoop.instance().start()`` to start the server.
        """
        # import is here rather than top level because HTTPServer
        # is not importable on appengine
        from tornado.httpserver import HTTPServer
        server = HTTPServer(self, **kwargs)
        server.listen(port, address)
testing.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def tearDown(self):
        # Clean up Subprocess, so it can be used again with a new ioloop.
        Subprocess.uninitialize()
        self.io_loop.clear_current()
        if (not IOLoop.initialized() or
                self.io_loop is not IOLoop.instance()):
            # Try to clean up any file descriptors left open in the ioloop.
            # This avoids leaks, especially when tests are run repeatedly
            # in the same process with autoreload (because curl does not
            # set FD_CLOEXEC on its file descriptors)
            self.io_loop.close(all_fds=True)
        super(AsyncTestCase, self).tearDown()
        # In case an exception escaped or the StackContext caught an exception
        # when there wasn't a wait() to re-raise it, do so here.
        # This is our last chance to raise an exception in a way that the
        # unittest machinery understands.
        self.__rethrow()
client.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def run_tests():
    url = options.url + '/getCaseCount'
    control_ws = yield websocket_connect(url, None)
    num_tests = int((yield control_ws.read_message()))
    logging.info('running %d cases', num_tests)
    msg = yield control_ws.read_message()
    assert msg is None

    for i in range(1, num_tests + 1):
        logging.info('running test case %d', i)
        url = options.url + '/runCase?case=%d&agent=%s' % (i, options.name)
        test_ws = yield websocket_connect(url, None, compression_options={})
        while True:
            message = yield test_ws.read_message()
            if message is None:
                break
            test_ws.write_message(message, binary=isinstance(message, bytes))

    url = options.url + '/updateReports?agent=%s' % options.name
    update_ws = yield websocket_connect(url, None)
    msg = yield update_ws.read_message()
    assert msg is None
    IOLoop.instance().stop()
benchmark.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def run():
    app = Application([("/", RootHandler)])
    port = random.randrange(options.min_port, options.max_port)
    app.listen(port, address='127.0.0.1')
    signal.signal(signal.SIGCHLD, handle_sigchld)
    args = ["ab"]
    args.extend(["-n", str(options.n)])
    args.extend(["-c", str(options.c)])
    if options.keepalive:
        args.append("-k")
    if options.quiet:
        # just stops the progress messages printed to stderr
        args.append("-q")
    args.append("http://127.0.0.1:%d/" % port)
    subprocess.Popen(args)
    IOLoop.instance().start()
    IOLoop.instance().close()
    del IOLoop._instance
    assert not IOLoop.initialized()
httpclient.py 文件源码 项目:deprecated_thedap 作者: unitedvote 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __new__(cls, io_loop=None, max_clients=10, force_instance=False, 
                **kwargs):
        io_loop = io_loop or IOLoop.instance()
        if cls is AsyncHTTPClient:
            if cls._impl_class is None:
                from tornado.simple_httpclient import SimpleAsyncHTTPClient
                AsyncHTTPClient._impl_class = SimpleAsyncHTTPClient
            impl = AsyncHTTPClient._impl_class
        else:
            impl = cls
        if io_loop in impl._async_clients() and not force_instance:
            return impl._async_clients()[io_loop]
        else:
            instance = super(AsyncHTTPClient, cls).__new__(impl)
            args = {}
            if cls._impl_kwargs:
                args.update(cls._impl_kwargs)
            args.update(kwargs)
            instance.initialize(io_loop, max_clients, **args)
            if not force_instance:
                impl._async_clients()[io_loop] = instance
            return instance
httpclient.py 文件源码 项目:deprecated_thedap 作者: unitedvote 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def configure(impl, **kwargs):
        """Configures the AsyncHTTPClient subclass to use.

        AsyncHTTPClient() actually creates an instance of a subclass.
        This method may be called with either a class object or the
        fully-qualified name of such a class (or None to use the default,
        SimpleAsyncHTTPClient)

        If additional keyword arguments are given, they will be passed
        to the constructor of each subclass instance created.  The
        keyword argument max_clients determines the maximum number of
        simultaneous fetch() operations that can execute in parallel
        on each IOLoop.  Additional arguments may be supported depending
        on the implementation class in use.

        Example::

           AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient")
        """
        if isinstance(impl, (unicode, bytes_type)):
            impl = import_object(impl)
        if impl is not None and not issubclass(impl, AsyncHTTPClient):
            raise ValueError("Invalid AsyncHTTPClient implementation")
        AsyncHTTPClient._impl_class = impl
        AsyncHTTPClient._impl_kwargs = kwargs
netutil.py 文件源码 项目:deprecated_thedap 作者: unitedvote 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def add_sockets(self, sockets):
        """Makes this server start accepting connections on the given sockets.

        The ``sockets`` parameter is a list of socket objects such as
        those returned by `bind_sockets`.
        `add_sockets` is typically used in combination with that
        method and `tornado.process.fork_processes` to provide greater
        control over the initialization of a multi-process server.
        """
        if self.io_loop is None:
            self.io_loop = IOLoop.instance()

        for sock in sockets:
            self._sockets[sock.fileno()] = sock
            add_accept_handler(sock, self._handle_connection,
                               io_loop=self.io_loop)
netutil.py 文件源码 项目:deprecated_thedap 作者: unitedvote 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def add_accept_handler(sock, callback, io_loop=None):
    """Adds an ``IOLoop`` event handler to accept new connections on ``sock``.

    When a connection is accepted, ``callback(connection, address)`` will
    be run (``connection`` is a socket object, and ``address`` is the
    address of the other end of the connection).  Note that this signature
    is different from the ``callback(fd, events)`` signature used for
    ``IOLoop`` handlers.
    """
    if io_loop is None:
        io_loop = IOLoop.instance()
    def accept_handler(fd, events):
        while True:
            try:
                connection, address = sock.accept()
            except socket.error, e:
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    return
                raise
            callback(connection, address)
    io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ)
run_api.py 文件源码 项目:biomatch_prototype_api 作者: CBIIT 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def variant_identifier():
    variants_and_count = {}
    variants_list = list(PatientAccessor().get_variant_identifiers())
    unified_gene_fusions_list = list(PatientAccessor().get_unified_gene_fusion_identifiers())
    negative_strand_list = StrandProcessor().get_negative_strand_list('negative_strand_list')
    for instance in variants_list:
        if instance['_id']['gene'] in negative_strand_list:
            AltAndRefSwapperController.alt_and_ref_swapper(instance['_id'])
        AltIdentifierController.create_alternative_identifier(instance, [])
        variants_and_count[instance['_id']['identifier']] = instance['count'] + variants_and_count.get(instance['_id']['identifier'], 0)

    for instance in unified_gene_fusions_list:
        variants_and_count[instance['_id']] = instance['count'] + variants_and_count.get(instance['_id'], 0)

    return jsonify({'variants': variants_and_count})


# Question Number 3 ---------------------------------------------------
run_api.py 文件源码 项目:biomatch_prototype_api 作者: CBIIT 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def identifier_to_disease():
    all_links = PatientProcessor().build_list_of_aggregate_information(0)
    variant_to_disease = {}
    for instance in all_links:
        if not AltIdentifierController.needs_an_alternative_identifier(instance['_id']["identifier"]):
            if instance['_id']['identifier'] not in variant_to_disease:
                variant_to_disease[instance['_id']['identifier']] = {}
            variant_to_disease[instance['_id']['identifier']][instance['_id']['disease']] = instance['count'] + variant_to_disease[instance['_id']['identifier']].get(instance['_id']['disease'], 0)
        else:
            alternate_identifier = AltIdentifierController.create_new_identifier(instance)
            if alternate_identifier not in variant_to_disease:
                variant_to_disease[alternate_identifier] = {}
            variant_to_disease[alternate_identifier][instance['_id']['disease']] = instance['count'] + variant_to_disease[alternate_identifier].get(instance['_id']['disease'], 0)
    return jsonify({'variants:disease pairs': variant_to_disease})


# Question Number 7 ---------------------------------------------------
run_api.py 文件源码 项目:biomatch_prototype_api 作者: CBIIT 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def hotspots_not_in_patients():
    file = "/match_hotspots_v4.1_blist.bed"
    BED_data = BEDController().get_hotspots_from_bed(file)
    variant_types = ['indels', 'singleNucleotideVariants', 'copyNumberVariants', 'unifiedGeneFusions']
    positive_negative = ['ne', 'eq']
    all_links = []
    for variant_type in variant_types:    #only COSMs in bed file, so don't need to worry about reversing ref/alt
        for strand_direction in positive_negative:
            all_links += PatientAccessor().get_variant_for_gene(variant_type, '', '', strand_direction)
    patient_COSMS = set()
    for instance in all_links:
        patient_COSMS.add(instance['_id']['identifier'])
    difference_of_sets = BED_data.difference(patient_COSMS)
    return jsonify({'hotspots_in_bed_but_not_patients': list(difference_of_sets)})


# -----------------------MISC-------------------------------------------------------------------------------------------


问题


面经


文章

微信
公众号

扫码关注公众号