python类OrderedDict()的实例源码

utils.py 文件源码 项目:IgDiscover 作者: NBISweden 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def iterative_consensus(sequences, program='muscle-medium', threshold=0.6,
        subsample_size=200, maximum_subsample_size=1600):
    """
    Compute a consensus sequence of the given sequences, but do not use all
    sequences if there are many: First, try to compute the consensus from a
    small subsample. If there are 'N' bases, increase the subsample size and
    repeat until either there are no more 'N' bases, all available sequences
    have been used or maximum_subsample_size is reached.
    """
    while True:
        sample = downsampled(sequences, subsample_size)
        aligned = multialign(OrderedDict(enumerate(sample)), program=program)
        cons = consensus(aligned, threshold=threshold).strip('N')
        if 'N' not in cons:
            # This consensus is good enough
            break
        if len(sequences) <= subsample_size:
            # We have already used all the sequences that are available
            break
        subsample_size *= 2
        if subsample_size > maximum_subsample_size:
            break
    return cons
pydotenv.py 文件源码 项目:libbuild 作者: appscode 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def set_key(dotenv_path, key_to_set, value_to_set, quote_mode="always"):
    """
    Adds or Updates a key/value to the given .env

    If the .env path given doesn't exist, fails instead of risking creating
    an orphan .env somewhere in the filesystem
    """
    key_to_set = str(key_to_set)
    value_to_set = str(value_to_set).strip("'").strip('"')
    if not os.path.exists(dotenv_path):
        warnings.warn("can't write to %s - it doesn't exist." % dotenv_path)
        return None, key_to_set, value_to_set
    dotenv_as_dict = OrderedDict(parse_dotenv(dotenv_path))
    dotenv_as_dict[key_to_set] = value_to_set
    success = flatten_and_write(dotenv_path, dotenv_as_dict, quote_mode)
    return success, key_to_set, value_to_set
period.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def rollover(self):
        self.starting_value = self.ending_value
        self.starting_exposure = self.ending_exposure
        self.starting_cash = self.ending_cash
        self.period_cash_flow = 0.0
        self.pnl = 0.0
        self.processed_transactions = {}
        self.orders_by_modified = {}
        self.orders_by_id = OrderedDict()

        payout_assets = self._payout_last_sale_prices.keys()

        for asset in payout_assets:
            if asset in self._payout_last_sale_prices:
                self._payout_last_sale_prices[asset] = \
                    self.position_tracker.positions[asset].last_sale_price
            else:
                del self._payout_last_sale_prices[asset]
period.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __setstate__(self, state):

        OLDEST_SUPPORTED_STATE = 3
        version = state.pop(VERSION_LABEL)

        if version < OLDEST_SUPPORTED_STATE:
            raise BaseException("PerformancePeriod saved state is too old.")

        processed_transactions = {}
        processed_transactions.update(state.pop('processed_transactions'))

        orders_by_id = OrderedDict()
        orders_by_id.update(state.pop('orders_by_id'))

        orders_by_modified = {}
        orders_by_modified.update(state.pop('orders_by_modified'))
        self.processed_transactions = processed_transactions
        self.orders_by_id = orders_by_id
        self.orders_by_modified = orders_by_modified

        self._execution_cash_flow_multipliers = {}

        self.__dict__.update(state)
position_tracker.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __setstate__(self, state):
        OLDEST_SUPPORTED_STATE = 3
        version = state.pop(VERSION_LABEL)

        if version < OLDEST_SUPPORTED_STATE:
            raise BaseException("PositionTracker saved state is too old.")

        self.asset_finder = state['asset_finder']
        self.positions = positiondict()
        # note that positions_store is temporary and gets regened from
        # .positions
        self._positions_store = zp.Positions()

        self._unpaid_dividends = state['unpaid_dividends']

        # Arrays for quick calculations of positions value
        self._position_value_multipliers = OrderedDict()
        self._position_exposure_multipliers = OrderedDict()

        # Update positions is called without a finder
        self.update_positions(state['positions'])
xlsx_usage.py 文件源码 项目:table-compositor 作者: InvestmentSystems 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def load_names_data():
    fp = os.path.join(tempfile.gettempdir(), ZIP_NAME)
    if not os.path.exists(fp):
        r = requests.get(URL_NAMES)
        with open(fp, 'wb') as f:
            f.write(r.content)

    post = collections.OrderedDict()
    with zipfile.ZipFile(fp) as zf:
        # get ZipInfo instances
        for zi in sorted(zf.infolist(), key=lambda zi: zi.filename):
            fn = zi.filename
            if fn.startswith('yob'):
                year = int(fn[3:7])
                df = pd.read_csv(
                    zf.open(zi),
                    header=None,
                    names=('name', 'gender', 'count'))
                df['year'] = year
                post[year] = df

        df = pd.concat(post.values())
        df.set_index('name', inplace=True, drop=True)
        return df
html_usage.py 文件源码 项目:table-compositor 作者: InvestmentSystems 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def load_names_data():
    fp = os.path.join(tempfile.gettempdir(), ZIP_NAME)
    if not os.path.exists(fp):
        r = requests.get(URL_NAMES)
        with open(fp, 'wb') as f:
            f.write(r.content)

    post = collections.OrderedDict()
    with zipfile.ZipFile(fp) as zf:
        # get ZipInfo instances
        for zi in sorted(zf.infolist(), key=lambda zi: zi.filename):
            fn = zi.filename
            if fn.startswith('yob'):
                year = int(fn[3:7])
                df = pd.read_csv(
                    zf.open(zi),
                    header=None,
                    names=('name', 'gender', 'count'))
                df['year'] = year
                post[year] = df

        df = pd.concat(post.values())
        df.set_index('name', inplace=True, drop=True)
        return df
common.py 文件源码 项目:cellranger 作者: 10XGenomics 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def build_metrics_summary_csv(filename, sample_properties, sample_data, pipeline):
    metrics, alarms, charts, all_prefixes = get_constants_for_pipeline(pipeline)

    tables, _ = build_tables(sample_properties, metrics, alarms, sample_data, all_prefixes=all_prefixes)
    if not tables:
        sys.stderr.write("No metrics tables were generated, skipping CSV generation.\n")
        return

    csv_metrics = collections.OrderedDict()
    for table in tables:
    if not table:
        continue
        for metric, _, value in table['rows']:
            if type(metric) == dict:
                metric = metric['v']
            if type(value) == dict:
                value = value['v']
            if metric not in csv_metrics:
                csv_metrics[metric] = value

    with open(filename, 'wb') as f:
        writer = csv.writer(f)
        writer.writerow(csv_metrics.keys())
        writer.writerow(csv_metrics.values())
data_fetcher.py 文件源码 项目:scikit-dataaccess 作者: MITHaystack 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def getAntennaLogs():
        '''
        Retrieve information about antenna changes

        @return dictionary of antenna changes
        '''
        store_location = data_util.getDataLocation('ngl_gps')
        store = pd.HDFStore(store_location, 'r')
        logs_df = store['ngl_steps']
        store.close()

        metadata = DataFetcher.getStationMetadata()

        logs_dict = OrderedDict()

        for station in metadata.index:
            offset_dates = logs_df[logs_df['Station']==station].index.unique()
            offset_dates = pd.Series(offset_dates)
            logs_dict[station] = offset_dates

        return logs_dict
modis_util.py 文件源码 项目:scikit-dataaccess 作者: MITHaystack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def getModisData(dataset, variable_name):
    '''
    Loads modis data

    @param dataset: netCDF4 dataset
    @param variable_name: Name of variable to extract from dataset

    @return (modis_data, metadata)
    '''

    variable = dataset[variable_name]
    variable.set_auto_maskandscale(False)
    data = variable[:,:]
    metadata = OrderedDict()
    for attribute in variable.ncattrs():
        metadata[attribute] = variable.getncattr(attribute)

    return data,metadata
argparse.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__(self,
                 option_strings,
                 prog,
                 parser_class,
                 dest=SUPPRESS,
                 help=None,
                 metavar=None):

        self._prog_prefix = prog
        self._parser_class = parser_class
        self._name_parser_map = _collections.OrderedDict()
        self._choices_actions = []

        super(_SubParsersAction, self).__init__(
            option_strings=option_strings,
            dest=dest,
            nargs=PARSER,
            choices=self._name_parser_map,
            help=help,
            metavar=metavar)
belt.py 文件源码 项目:aws-greengrass-mini-fulfillment 作者: awslabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, servo_group, event, belt_speed, frequency,
                 mqtt_client, master_shadow, args=(), kwargs={}):
        super(BeltControlThread, self).__init__(
            name="belt_control_thread", args=args, kwargs=kwargs
        )
        self.sg = servo_group
        self.rolling = False
        self.cmd_event = event
        self.belt_speed = belt_speed
        self.frequency = frequency
        self.reversed = False
        self.active_state = 'initialized'
        self.last_state = 'initialized'
        self.control_stages = collections.OrderedDict()
        self.control_stages['roll'] = self.roll
        self.mqttc = mqtt_client
        self.master_shadow = master_shadow

        self.master_shadow.shadowRegisterDeltaCallback(self.shadow_mgr)
        log.debug("[bct.__init__] shadowRegisterDeltaCallback()")
arm.py 文件源码 项目:aws-greengrass-mini-fulfillment 作者: awslabs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, servo_group, event, stage_topic, mqtt_client,
                 master_shadow, args=(), kwargs={}):
        super(ArmControlThread, self).__init__(
            name="arm_control_thread", args=args, kwargs=kwargs
        )
        self.sg = servo_group
        log.debug("[act.__init__] servo_group:{0}".format(self.sg))
        self.cmd_event = event
        self.active_state = 'initialized'
        self.last_state = 'initialized'
        self.control_stages = collections.OrderedDict()
        self.control_stages['home'] = self.home
        self.control_stages['find'] = self.find
        self.control_stages['pick'] = self.pick
        self.control_stages['sort'] = self.sort
        self.stage_topic = stage_topic
        self.mqtt_client = mqtt_client
        self.master_shadow = master_shadow
        self.found_box = None

        self.master_shadow.shadowRegisterDeltaCallback(self.shadow_mgr)
        log.debug("[arm.__init__] shadowRegisterDeltaCallback()")
energy_exp.py 文件源码 项目:bnn-analysis 作者: myshkov 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def main():
    experiment = EnergyExp()

    queue = OrderedDict()
    queue['HMC'] = experiment.run_baseline_hmc
    queue['SGLD'] = experiment.run_sgld
    queue['SGHMC'] = experiment.run_sghmc
    queue['pSGLD'] = experiment.run_psgld
    queue['BBB'] = experiment.run_bbb
    # queue["PBP"] = experiment.run_pbp
    queue['Dropout'] = experiment.run_dropout

    experiment.run_queue(queue, cpu=True)
    experiment.report_metrics_table(queue)

    del queue['HMC']

    max_time = 15
    experiment.plot_multiple_metrics('HMC', queue.keys(), ['KS'], max_time=max_time, title_name='KS distance')
    experiment.plot_multiple_metrics('HMC', queue.keys(), ['Precision'], max_time=max_time, title_name='Precision')
    experiment.plot_multiple_metrics('HMC', queue.keys(), ['Recall'], max_time=max_time, title_name='Recall')
    # experiment.plot_multiple_metrics("HMC", queue.keys(), ["KL"])
    # experiment.plot_multiple_metrics("HMC", queue.keys(), ["F1"], max_time=max_time, title_name="F1 score")
    # experiment.plot_multiple_metrics("HMC", queue.keys(), ["IoU"], max_time=max_time)
bh_exp.py 文件源码 项目:bnn-analysis 作者: myshkov 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def main():
    experiment = BostonHousingExp()

    queue = OrderedDict()
    # queue['HMC'] = experiment.run_baseline_hmc
    # queue['SGLD'] = experiment.run_sgld
    # queue['SGHMC'] = experiment.run_sghmc
    # queue['pSGLD'] = experiment.run_psgld
    queue["BBB"] = experiment.run_bbb
    # queue["PBP"] = experiment.run_pbp
    queue['Dropout'] = experiment.run_dropout

    experiment.run_queue(queue, cpu=True)
    experiment.report_metrics_table(queue)

    del queue['HMC']

    max_time = 15
    experiment.plot_multiple_metrics('HMC', queue.keys(), ['KS'], max_time=max_time, title_name='KS distance')
    experiment.plot_multiple_metrics('HMC', queue.keys(), ['Precision'], max_time=max_time, title_name='Precision')
    experiment.plot_multiple_metrics('HMC', queue.keys(), ['Recall'], max_time=max_time, title_name='Recall')
    # experiment.plot_multiple_metrics("HMC", queue.keys(), ["KL"])
    # experiment.plot_multiple_metrics("HMC", queue.keys(), ["F1"], max_time=max_time, title_name="F1 score")
    # experiment.plot_multiple_metrics("HMC", queue.keys(), ["IoU"], max_time=max_time)
compiler.py 文件源码 项目:pygrunt 作者: elementbound 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self):
        self.executable_path = None

        self.definitions = {}
        self.flags = {}
        self.include_dirs = DirectorySet()
        self.linker_flags = []
        self.library_dirs = DirectorySet()
        self.libraries = collections.OrderedDict()

        self.unique_flags = {}

        self.recompile = recompile.PreprocessHash(self)
router.py 文件源码 项目:django-openapi-gen 作者: Ecognize 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_root_apiview(self):
        handlers = sorted(self.handlers.items(), key = lambda x : x[1]['display'])

        def list_handlers(self, request, *args, **kwargs):
            resp = OrderedDict()

            # get all names
            for regex, data in handlers:
                name = data['name']
                alias = data['display']

                if alias != APIROOT_NAME:
                    try:
                        resp[alias] = reverse(name, args = args, kwargs = kwargs, request = request, format = kwargs.get('format', None))
                    except NoReverseMatch:
                        # here we've got a path with defined params which are not specified in request
                        continue

            return Response(resp, status = status.HTTP_200_OK)

        # get available info from schema
        info = self.schema.get('info', None)
        name = info.get('title', APIROOT_NAME).strip(' ').replace(' ', '_')
        vers = info.get('version', 'unknown')
        desc = info.get('description', 'Enumerates all available endpoints for current schema')

        # construct class
        apiroot = LazyClass(name, SwaggerViewClass)

        apiroot.set_attr('get', list_handlers)
        apiroot.set_attr('__doc__', 'v.' + vers + '\n\n' + desc)

        return apiroot().as_view()

    #: main schema processing function
test_pg_gw_utils.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_restart_map(self):
        _restart_map = nutils.restart_map()
        expect = OrderedDict([
            (nutils.PG_CONF, ['plumgrid']),
            (nutils.PG_HN_CONF, ['plumgrid']),
            (nutils.PG_HS_CONF, ['plumgrid']),
            (nutils.OPS_CONF, ['plumgrid']),
            (nutils.PG_IFCS_CONF, []),
        ])
        self.assertEqual(expect, _restart_map)
        for item in _restart_map:
            self.assertTrue(item in _restart_map)
            self.assertTrue(expect[item] == _restart_map[item])
utils.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _extract_services_list_helper(services):
    """Extract a OrderedDict of {service: [ports]} of the supplied services
    for use by the other functions.

    The services object can either be:
      - None : no services were passed (an empty dict is returned)
      - a list of strings
      - A dictionary (optionally OrderedDict) {service_name: {'service': ..}}
      - An array of [{'service': service_name, ...}, ...]

    @param services: see above
    @returns OrderedDict(service: [ports], ...)
    """
    if services is None:
        return {}
    if isinstance(services, dict):
        services = services.values()
    # either extract the list of services from the dictionary, or if
    # it is a simple string, use that. i.e. works with mixed lists.
    _s = OrderedDict()
    for s in services:
        if isinstance(s, dict) and 'service' in s:
            _s[s['service']] = s.get('ports', [])
        if isinstance(s, str):
            _s[s] = []
    return _s
utils.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _check_running_services(services):
    """Check that the services dict provided is actually running and provide
    a list of (service, boolean) tuples for each service.

    Returns both a zipped list of (service, boolean) and a list of booleans
    in the same order as the services.

    @param services: OrderedDict of strings: [ports], one for each service to
                     check.
    @returns [(service, boolean), ...], : results for checks
             [boolean]                  : just the result of the service checks
    """
    services_running = [service_running(s) for s in services]
    return list(zip(services, services_running)), services_running


问题


面经


文章

微信
公众号

扫码关注公众号