python类iteritems()的实例源码

_brave.py 文件源码 项目:brave 作者: chorusai 项目源码 文件源码 阅读 53 收藏 0 点赞 0 评论 0
def __parse_relations(self):
        relation_args = {}
        for rel in self.doc_data['relations']:
            key, name, role_ents = rel
            for role, ent_key in role_ents:
                curr_roles = relation_args.get(name, {})
                curr_types = curr_roles.get(role, set())
                curr_types.add(self.ent_dict[ent_key])
                curr_roles[role] = curr_types
                relation_args[name] = curr_roles
        range_ = range(0, len(relations_palette), (len(relations_palette) // len(relation_args.keys())))
        colors = [relations_palette[i] for i in range_]
        rel_colors = dict(zip(relation_args.keys(), colors))
        relation_types = []
        for name, args in iteritems(relation_args):
            rel_dict = {
                'args': [{'role': role, 'targets': list(targets)} for role, targets in iteritems(args)],
                'color': rel_colors[name],
                'dashArray': '3,3',
                'labels': [name, name[0:3]],
                'type': name

            }
            relation_types.append(rel_dict)
        self.coll_data['relation_types'] = relation_types
formatter.py 文件源码 项目:PyAthena 作者: laughingman7743 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def format(self, operation, parameters=None):
        if not operation or not operation.strip():
            raise ProgrammingError('Query is none or empty.')
        operation = operation.strip()

        if operation.upper().startswith('SELECT') or operation.upper().startswith('WITH'):
            escaper = _escape_presto
        else:
            escaper = _escape_hive

        kwargs = dict()
        if parameters:
            if isinstance(parameters, dict):
                for k, v in iteritems(parameters):
                    func = self.get_formatter(v)
                    kwargs.update({k: func(self, escaper, v)})
            else:
                raise ProgrammingError('Unsupported parameter ' +
                                       '(Support for dict only): {0}'.format(parameters))

        return (operation % kwargs).strip() if kwargs else operation.strip()
connection.py 文件源码 项目:PyAthenaJDBC 作者: laughingman7743 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _build_driver_args(self, **kwargs):
        props = jpype.java.util.Properties()
        if self.credential_file:
            props.setProperty('aws_credentials_provider_class',
                              'com.amazonaws.athena.jdbc.shaded.' +
                              'com.amazonaws.auth.PropertiesFileCredentialsProvider')
            props.setProperty('aws_credentials_provider_arguments',
                              self.credential_file)
        elif self.token:
            props.setProperty('aws_credentials_provider_class',
                              'com.amazonaws.athena.jdbc.shaded.' +
                              'com.amazonaws.auth.DefaultAWSCredentialsProviderChain')
        else:
            props.setProperty('user', self.access_key)
            props.setProperty('password', self.secret_key)
        props.setProperty('s3_staging_dir', self.s3_staging_dir)
        for k, v in iteritems(kwargs):
            if k and v:
                props.setProperty(k, v)
        return props
formatter.py 文件源码 项目:PyAthenaJDBC 作者: laughingman7743 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def format(self, operation, parameters=None):
        if not operation or not operation.strip():
            raise ProgrammingError('Query is none or empty.')
        operation = operation.strip()

        if operation.upper().startswith('SELECT') or operation.upper().startswith('WITH'):
            escaper = _escape_presto
        else:
            escaper = _escape_hive

        kwargs = dict()
        if parameters:
            if isinstance(parameters, dict):
                for k, v in iteritems(parameters):
                    func = self.get_formatter(v)
                    kwargs.update({k: func(self, escaper, v)})
            else:
                raise ProgrammingError('Unsupported parameter ' +
                                       '(Support for dict only): {0}'.format(parameters))

        return (operation % kwargs).strip() if kwargs else operation.strip()
openqa_review.py 文件源码 项目:openqa_review 作者: okurz 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_arch_state_results(arch, current_details, previous_details, output_state_results=False):
    result_re = re.compile(arch + '_')
    test_results = current_details.find_all('td', id=result_re)
    test_results_previous = previous_details.find_all('td', id=result_re)
    # find differences from previous to current (result_X)
    test_results_dict = {i['id']: i for i in test_results}
    test_results_previous_dict = {i['id']: i for i in test_results_previous if i['id'] in test_results_dict.keys()}
    states = SortedDict(get_state(v, test_results_previous_dict) for k, v in iteritems(test_results_dict))
    # intermediate step:
    # - print report of differences
    interesting_states = SortedDict({k.split(arch + '_')[1]: v for k, v in iteritems(states) if v['state'] != 'STABLE'})
    if output_state_results:
        print("arch: %s" % arch)
        for state in interesting_states_names:
            print("\n%s:\n\t%s\n" % (state, ', '.join(k for k, v in iteritems(interesting_states) if v['state'] == state)))
    return interesting_states
openqa_review.py 文件源码 项目:openqa_review 作者: okurz 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def find_builds(builds, running_threshold=0):
    """Find finished builds, ignore still running or empty."""
    threshold = float(running_threshold) if running_threshold is not None else 0

    # filter out empty builds
    def non_empty(r):
        return r['total'] != 0 and r['total'] > r['skipped'] and not ('build' in r.keys() and r['build'] is None)
    builds = {build: result for build, result in iteritems(builds) if non_empty(result)}

    finished = {build: result for build, result in iteritems(builds) if not result['unfinished'] or
                (100 * float(result['unfinished']) / result['total']) <= threshold}

    log.debug("Found the following finished non-empty builds: %s" % ', '.join(finished.keys()))
    if len(finished) < 2:
        raise NotEnoughBuildsError("not enough finished builds found")
    assert len(finished.keys()) >= 2
    return finished.keys()
openqa_review.py 文件源码 项目:openqa_review 作者: okurz 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, browser, args, root_url, job_groups):
        """Create openQA review report."""
        self.browser = browser
        self.args = args
        self.root_url = root_url
        self.job_groups = job_groups

        self._label = 'Gathering data and processing report'
        self._progress = 0
        self.report = SortedDict()

        for k, v in iteritems(job_groups):
            log.info("Processing '%s'" % v)
            if args.no_progress or not humanfriendly_available:
                self.report[k] = self._one_report(v)
            else:
                with AutomaticSpinner(label=self._next_label()):
                    self.report[k] = self._one_report(v)
            self._progress += 1
        if not args.no_progress:
            sys.stderr.write("\r%s\n" % self._next_label())  # It's nice to see 100%, too :-)
openqa_review.py 文件源码 项目:openqa_review 作者: okurz 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def reminder_comment_on_issues(report, min_days_unchanged=MIN_DAYS_UNCHANGED):
    processed_issues = set()
    report.report = SortedDict({p: pr for p, pr in iteritems(report.report) if isinstance(pr, ProductReport)})
    for product, pr in iteritems(report.report):
        for arch, ar in iteritems(pr.reports):
            for issue_status, issue_types in iteritems(ar.issues):
                for issue_type, ies in iteritems(issue_types):
                    for ie in ies:
                        issue = ie.bug
                        if issue:
                            bugref = issue.bugref.replace('bnc', 'bsc').replace('boo', 'bsc')
                            if bugref not in processed_issues:
                                try:
                                    reminder_comment_on_issue(ie, min_days_unchanged)
                                except HTTPError as e:  # pragma: no cover
                                    log.error("Encountered error trying to post a reminder comment on issue '%s': %s. Skipping." % (ie, e))
                                    continue
                                processed_issues.add(bugref)
models.py 文件源码 项目:statik 作者: thanethomson 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def find_additional_rels(self, all_models):
        """Attempts to scan for additional relationship fields for this model based on all of the other models'
        structures and relationships.
        """
        for model_name, model in iteritems(all_models):
            if model_name != self.name:
                for field_name in model.field_names:
                    field = model.fields[field_name]
                    # if this field type references the current model
                    if field.field_type == self.name and field.back_populates is not None and \
                            (isinstance(field, StatikForeignKeyField) or isinstance(field, StatikManyToManyField)):
                        self.additional_rels[field.back_populates] = {
                            'to_model': model_name,
                            'back_populates': field_name,
                            'secondary':
                                (model_name, field.field_type) if isinstance(field, StatikManyToManyField) else None
                        }
                        logger.debug('Additional relationship %s.%s -> %s (%s)' % (
                            self.name,
                            field.back_populates,
                            model_name,
                            self.additional_rels[field.back_populates]
                        ))
utils.py 文件源码 项目:statik 作者: thanethomson 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def deep_merge_dict(a, b):
    """Deep merges dictionary b into dictionary a."""
    _a = copy(a)
    _b = copy(b)

    for key_b, val_b in iteritems(_b):
        # if it's a sub-dictionary
        if isinstance(val_b, dict):
            if key_b not in _a or not isinstance(_a[key_b], dict):
                _a[key_b] = {}

            # perform the deep merge recursively
            _a[key_b] = deep_merge_dict(_a[key_b], val_b)
        else:
            _a[key_b] = val_b

    # b should now be deep-merged into a
    return _a
utils.py 文件源码 项目:statik 作者: thanethomson 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def dict_strip(d):
    """Strips whitespace from the string values of the given dictionary (recursively).

    Args:
        d: A dictionary object.

    Returns:
        A new dictionary object, whose string values' whitespace has been stripped out.
    """
    _d = deepcopy(d)
    for k, v in iteritems(d):
        if isinstance(v, str):
            _d[k] = v.strip()
        elif isinstance(v, dict):
            _d[k] = dict_strip(v)

    return _d
bitbucket_buildbot.py 文件源码 项目:buildbot-contrib 作者: buildbot 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def addChange(self, dummy, remote, changei, src='hg'):
        """
        Sends changes from the commit to the buildmaster.
        """
        logging.debug("addChange %s, %s", repr(remote), repr(changei))
        try:
            change = changei.next()
        except StopIteration:
            remote.broker.transport.loseConnection()
            return None

        logging.info("New revision: %s", change['revision'][:8])
        for key, value in iteritems(change):
            logging.debug("  %s: %s", key, value)

        change['src'] = src
        deferred = remote.callRemote('addChange', change)
        deferred.addCallback(self.addChange, remote, changei, src)
        return deferred
github_buildbot.py 文件源码 项目:buildbot-contrib 作者: buildbot 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def addChange(self, _, remote, changei, src='git'):
        """
        Sends changes from the commit to the buildmaster.
        """
        logging.debug("addChange %r, %r", remote, changei)
        try:
            change = changei.next()
        except StopIteration:
            remote.broker.transport.loseConnection()
            return None

        logging.info("New revision: %s", change['revision'][:8])
        for key, value in iteritems(change):
            logging.debug("  %s: %s", key, value)

        change['src'] = src
        deferred = remote.callRemote('addChange', change)
        deferred.addCallback(self.addChange, remote, changei, src)
        return deferred
misc.py 文件源码 项目:FightstickDisplay 作者: calexil 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def elements(self):
        '''Iterator over elements repeating each as many times as its count.

        >>> c = Counter('ABCABC')
        >>> sorted(c.elements())
        ['A', 'A', 'B', 'B', 'C', 'C']

        If an element's count has been set to zero or is a negative number,
        elements() will ignore it.

        '''
        for elem, count in iteritems(self):
            for _ in range(count):
                yield elem

    # Override dict methods where the meaning changes for Counter objects.
_dual_basis_trotter_error.py 文件源码 项目:FermiLib 作者: ProjectQ-Framework 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def ordered_dual_basis_terms_no_info(dual_basis_hamiltonian):
    """Give terms from the dual basis Hamiltonian in dictionary output order.

    Args:
        dual_basis_hamiltonian (FermionOperator): The Hamiltonian.

    Returns:
        A list of terms from the dual basis Hamiltonian in simulated order.
    """
    n_qubits = count_qubits(dual_basis_hamiltonian)
    terms = []

    for operators, coefficient in iteritems(dual_basis_hamiltonian.terms):
        terms += [FermionOperator(operators, coefficient)]

    return terms
ws_thread.py 文件源码 项目:bitmex-market-maker 作者: amanelis 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_ticker(self, symbol):
        '''Return a ticker object. Generated from instrument.'''

        instrument = self.get_instrument(symbol)

        # If this is an index, we have to get the data from the last trade.
        if instrument['symbol'][0] == '.':
            ticker = {}
            ticker['mid'] = ticker['buy'] = ticker['sell'] = ticker['last'] = instrument['markPrice']
        # Normal instrument
        else:
            bid = instrument['bidPrice'] or instrument['lastPrice']
            ask = instrument['askPrice'] or instrument['lastPrice']
            ticker = {
                "last": instrument['lastPrice'],
                "buy": bid,
                "sell": ask,
                "mid": (bid + ask) / 2
            }

        # The instrument has a tickSize. Use it to round values.
        return {k: round(float(v or 0), instrument['tickLog']) for k, v in iteritems(ticker)}
misc.py 文件源码 项目:cryptogram 作者: xinmingzhang 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def elements(self):
        '''Iterator over elements repeating each as many times as its count.

        >>> c = Counter('ABCABC')
        >>> sorted(c.elements())
        ['A', 'A', 'B', 'B', 'C', 'C']

        If an element's count has been set to zero or is a negative number,
        elements() will ignore it.

        '''
        for elem, count in iteritems(self):
            for _ in range(count):
                yield elem

    # Override dict methods where the meaning changes for Counter objects.
factory.py 文件源码 项目:osisoftpy 作者: dstcontrols 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def create(factory, thing, session, webapi=None):
    """
    Return an object created with factory
    :param webapi: 
    :param factory: 
    :param params: 
    :param session: 
    :return: 
    """


    payload = dict(map(lambda k_v: (k_v[0].lower(), k_v[1]), iteritems(thing)))

    # added to avoid creating Value objects if the value was considered bad values
    # but we don't need this since we don't want the library to cull bad values that
    # the pi web api gave us.
    #
    # if 'good' in payload:
    #     if not payload['good']:
    #         return None

    payload.update({'session': session, 'webapi': webapi})
    thing = factory.create(**payload)
    return thing
ws_thread.py 文件源码 项目:market-maker 作者: Behappy123 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_ticker(self, symbol):
        '''Return a ticker object. Generated from instrument.'''

        instrument = self.get_instrument(symbol)

        # If this is an index, we have to get the data from the last trade.
        if instrument['symbol'][0] == '.':
            ticker = {}
            ticker['mid'] = ticker['buy'] = ticker['sell'] = ticker['last'] = instrument['markPrice']
        # Normal instrument
        else:
            bid = instrument['bidPrice'] or instrument['lastPrice']
            ask = instrument['askPrice'] or instrument['lastPrice']
            ticker = {
                "last": instrument['lastPrice'],
                "buy": bid,
                "sell": ask,
                "mid": (bid + ask) / 2
            }

        # The instrument has a tickSize. Use it to round values.
        return {k: round(float(v or 0), instrument['tickLog']) for k, v in iteritems(ticker)}
license_identifier.py 文件源码 项目:lid 作者: codeauroraforum 项目源码 文件源码 阅读 68 收藏 0 点赞 0 评论 0
def get_top_candidates(self, source):
        # First, compute n-grams for all lines in the source file
        src_ng = n_grams.NGrams()
        src_ng.parse_text_list_items(
            source.lines,
            universe_ng=self.license_library.universe_n_grams)

        # Measure n-gram similarity relative to all licenses in the library
        similarities = OrderedDict()
        for license_name, lic in iteritems(self.license_library.licenses):
            similarity_score = lic.n_grams.measure_similarity(src_ng)
            similarities[license_name] = similarity_score

        # Filter out low-scoring licenses
        best_score = max(similarities.values())
        current_threshold = max(self.threshold,
                                best_score * self.keep_fraction_of_best)

        top_candidates = OrderedDict()
        for license_name, score in iteritems(similarities):
            if score >= current_threshold:
                top_candidates[license_name] = score

        return top_candidates
runner.py 文件源码 项目:wft4galaxy 作者: phnmnl 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def find_missing_tools(self, workflow=None):
        """
        Find tools required by the workflow to test and not installed on the configured Galaxy server.

        :type workflow: :class:`bioblend.galaxy.objects.wrappers.Workflow`
        :param workflow: an optional instance of :class:`bioblend.galaxy.objects.wrappers.Workflow`

        :rtype: list
        :return: the list of missing tools
        """
        _logger.debug("Checking required tools ...")
        workflow = self.get_galaxy_workflow() if not workflow else workflow
        available_tools = self._galaxy_instance.tools.list()
        missing_tools = []
        _logger.debug("Available tools: %s", ", ".join(["{0}, {1}".format(t.id, t.version) for t in available_tools]))
        for order, step in _iteritems(workflow.steps):
            if step.tool_id and len([t for t in available_tools
                                     if t.id == step.tool_id and t.version == step.tool_version]) == 0:
                missing_tools.append((step.tool_id, step.tool_version))
        _logger.debug("Missing tools: {0}".format("None"
                                                  if len(missing_tools) == 0
                                                  else ", ".join(["{0} (version {1})"
                                                                 .format(x[0], x[1]) for x in missing_tools])))
        _logger.debug("Checking required tools: DONE")
        return missing_tools
runner.py 文件源码 项目:wft4galaxy 作者: phnmnl 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def cleanup(self, output_folder=None):
        """
        Perform a complete clean up of the data produced during the execution of a workflow test,
        i.e., the uploaded workflow and the created history are removed from Galaxy and the actual
        output datasets (downloaded from Galaxy) are deleted from the output path of the local file system.
        """
        _logger.debug("Cleanup of workflow test '%s'...", self._uuid)
        for test_uuid, test_result in _iteritems(self._test_cases):
            if test_result.output_history:
                self._galaxy_instance.histories.delete(test_result.output_history.id)
            self.cleanup_output_folder(test_result)
        if self._galaxy_workflow:
            self._workflow_loader.unload_workflow(self._galaxy_workflow.id)
            self._galaxy_workflow = None
        _logger.debug("Cleanup of workflow test '%s': DONE", self._uuid)
        if output_folder and _os.path.exists(output_folder):
            _shutil.rmtree(output_folder)
            _logger.debug("Deleted WF output folder '%s': DONE", output_folder)
diagnostic_interface.py 文件源码 项目:iCompleteMe 作者: jerrymarino 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _GetKeptAndNewSigns( placed_signs, buffer_number_to_line_to_diags,
                         next_sign_id ):
  new_signs = []
  kept_signs = []
  for buffer_number, line_to_diags in iteritems(
                                            buffer_number_to_line_to_diags ):
    if not vimsupport.BufferIsVisible( buffer_number ):
      continue

    for line, diags in iteritems( line_to_diags ):
      # Only one sign is visible by line.
      first_diag = diags[ 0 ]
      sign = _DiagSignPlacement( next_sign_id,
                                 line,
                                 buffer_number,
                                 _DiagnosticIsError( first_diag ) )
      if sign not in placed_signs:
        new_signs.append( sign )
        next_sign_id += 1
      else:
        # We use .index here because `sign` contains a new id, but
        # we need the sign with the old id to unplace it later on.
        # We won't be placing the new sign.
        kept_signs.append( placed_signs[ placed_signs.index( sign ) ] )
  return new_signs, kept_signs, next_sign_id
get_reads.py 文件源码 项目:NAIBR 作者: raphael-group 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def largest_overlap(items):
    positions_i = collections.defaultdict(int)
    positions_j = collections.defaultdict(int)
    for disc in items:
        start,end = signi(disc)
        for pos in range(start,end):
            positions_i[pos] += 1
        start,end = signj(disc)
        for pos in range(start,end):
            positions_j[pos] += 1
    i = 0
    overlap_i = 0
    for pos,overlap in iteritems(positions_i):
        if overlap > overlap_i:
            overlap_i = overlap
            i = pos
    j = 0
    overlap_j = 0
    for pos,overlap in iteritems(positions_j):
        if overlap > overlap_j:
            overlap_j = overlap
            j = pos
    return i,j,overlap_i,overlap_j
get_reads.py 文件源码 项目:NAIBR 作者: raphael-group 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_candidates(discs,reads_by_LR):
    candidates = []
    r = lmax
    p_len,p_rate,barcode_overlap = get_distributions(reads_by_LR)
    if p_len == None or p_rate == None:
        return None,None,None
    num_cands = 0
    for key,items in iteritems(discs):
        orient = key[4]
        si,ei,sj,ej = disc_intersection(items)
        if si and sj and len(items) >= MIN_DISCS:
            i = coordinates(si,ei,orient[0])
            j = coordinates(sj,ej,orient[1])
            cand = copy.copy(items[0])
            cand.i = i
            cand.j = j
            barcode_overlaps = barcode_overlap[(cand.chrm,int(cand.i/d)*d,cand.nextchrm,int(cand.j/d)*d)]
            if not inblacklist(cand) and ((cand.chrm == cand.nextchrm and cand.j-cand.i < d)\
                or barcode_overlaps >= k):
                already_appended = sum([1 for x in candidates if x.i == cand.i and x.j == cand.j])
                if not already_appended:
                    num_cands += 1
                    candidates.append(cand)
    return candidates,p_len,p_rate
distributions.py 文件源码 项目:NAIBR 作者: raphael-group 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_distributions(reads_by_LR):
    LRs = []
    global barcode_overlap,LRs_by_barcode
    LRs_by_barcode = collections.defaultdict(list)
    barcode_overlap = collections.defaultdict(int)
    for key,reads in iteritems(reads_by_LR):
        chrom,barcode = key
        barcode_LRS = linked_reads(reads,chrom)
        LRs += barcode_LRS
        LRs_by_barcode[barcode] += barcode_LRS
    for barcode,barcode_LRS in iteritems(LRs_by_barcode):
        if len(barcode_LRS) > 1:
            get_overlap(barcode_LRS)
    if len(LRs) < 100:
        return None,None,None
    p_rate = get_rate_distr(LRs)
    p_len = get_length_distr(LRs)   
    return p_len,p_rate,barcode_overlap
Table.py 文件源码 项目:LTTL 作者: axanthos 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def to_transposed(self):
        """Return a transposed copy of the crosstab"""
        new_col_ids = self.row_ids[:]
        return PivotCrosstab(
            self.col_ids[:],
            new_col_ids,
            dict(
                (tuple(reversed(key)), count)
                for key, count in iteritems(self.values)
            ),
            self.header_col_id,
            self.header_col_type,
            self.header_row_id,
            self.header_row_type,
            dict([(col_id, 'continuous') for col_id in new_col_ids]),
            None,
            self.missing,
            self.header_col_id,    # TODO: check this (was self._cached_row_id).
        )

    # TODO: test.
Utils.py 文件源码 项目:LTTL 作者: axanthos 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def sample_dict(dictionary, sample_size):
    """Return a randomly sampled frequency dict"""
    new_dict = dict()
    num_to_sample = sample_size
    num_to_process = sum(itervalues(dictionary))
    for (k, v) in iteritems(dictionary):
        for i in range(v):
            if random.random() < num_to_sample / num_to_process:
                new_dict[k] = new_dict.get(k, 0) + 1
                num_to_sample -= 1
            num_to_process -= 1
            if num_to_sample == 0:
                break
        else:
            continue
        break
    if num_to_sample > 0:
        raise ValueError(u'Not enough elements in dictionary')
    return new_dict
create_logger.py 文件源码 项目:pylogging 作者: ansrivas 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __set_log_levels(level_dict):
    """Set the log levels for any log-handler for e.g. level_dict = {'requests':'error'}."""
    if not isinstance(level_dict, dict):
        rwt(TypeError('Expecting dict object with format:  \{\'requests\':\'warning\'\} \n' +
                      'Available levels are: {0}'.format(LogLevel.levels.keys)))
    else:
        for key, val in iteritems(level_dict):
            logging.getLogger(key).setLevel(LogLevel.get_level(val))
export.py 文件源码 项目:blog_django 作者: chnpmy 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _to_xml(self, xml, data):
        if isinstance(data, (list, tuple)):
            for item in data:
                xml.startElement("row", {})
                self._to_xml(xml, item)
                xml.endElement("row")
        elif isinstance(data, dict):
            for key, value in iteritems(data):
                key = key.replace(' ', '_')
                xml.startElement(key, {})
                self._to_xml(xml, value)
                xml.endElement(key)
        else:
            xml.characters(smart_text(data))


问题


面经


文章

微信
公众号

扫码关注公众号