Example source code for Python's iteritems()
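The snippets below are collected from open-source projects and show how iteritems() is used to loop over a dictionary's key/value pairs. In these code bases iteritems is typically imported from a Python 2/3 compatibility helper such as six (from six import iteritems) or future.utils, which resolves to dict.iteritems() on Python 2 and dict.items() on Python 3, so the pairs are produced lazily on Python 2 instead of building an intermediate list. A minimal, self-contained sketch (assuming the six library; the settings dict is made up for illustration):

from six import iteritems

# iterate over key/value pairs in a Python 2/3 compatible way
settings = {'host': 'localhost', 'port': 8080}
for key, value in iteritems(settings):
    print('%s=%s' % (key, value))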

export.py (project: dream_blog, author: fanlion)
def _to_xml(self, xml, data):
        if isinstance(data, (list, tuple)):
            for item in data:
                xml.startElement("row", {})
                self._to_xml(xml, item)
                xml.endElement("row")
        elif isinstance(data, dict):
            for key, value in iteritems(data):
                key = key.replace(' ', '_')
                xml.startElement(key, {})
                self._to_xml(xml, value)
                xml.endElement(key)
        else:
            xml.characters(smart_text(data))
extransform.py (project: ngraph, author: NervanaSystems)
def run_device_tensor_initializations(self):
        for device_tensor_view, host_tensor in iteritems(self.device_initializations):
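            # indexing the device view with the empty tuple () assigns the host tensor's contents into the whole device tensor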
            device_tensor_view[()] = host_tensor
        self.device_initializations = dict()
expass.py (project: ngraph, author: NervanaSystems)
def end_pass(self, **kwargs):
        super(SSAConversion, self).end_pass(**kwargs)
        for source_tensor_decl, current_exop in iteritems(self.tensor_map):
            if current_exop.output_decls[0].tensor_decl is source_tensor_decl:
                continue
            if not source_tensor_decl.is_output:
                continue
            copy_exop = ExOp(computation_decl=self.computation_decl,
                             create_value=False,
                             op=WriteOp(axes=[]))
            copy_exop.add_write_arg(source_tensor_decl.exop.output_decls[0])
            copy_exop.add_input_decl(current_exop.output_decls[0])
            self.exop_block.add_exop(copy_exop)
converter.py (project: PyAthenaJDBC, author: laughingman7743)
def __init__(self):
        types = jpype.java.sql.Types
        self.jdbc_type_mappings = dict()
        for field in types.__javaclass__.getClassFields():
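            # map each java.sql.Types constant name (e.g. VARCHAR) to its integer type code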
            self.jdbc_type_mappings[field.getName()] = field.getStaticAttribute()
        _logger.debug(self.jdbc_type_mappings)
        self.converter_mappings = dict()
        for k, v in iteritems(_DEFAULT_CONVERTERS):
            type_code = self.jdbc_type_mappings.get(k, None)
            if type_code is not None:
                self.converter_mappings[type_code] = v
            else:
                _logger.warning('%s is not defined in java.sql.Types.', k)
test_openqa_review.py (project: openqa_review, author: okurz)
def test_reminder_comments_on_referenced_bugs_are_posted():
    args = bugrefs_test_args_factory()
    args.verbose_test = 1
    args.query_issue_status = True
    args.dry_run = True
    report = openqa_review.generate_report(args)

    # test double comment prevention code
    p, pr = list(iteritems(report.report))[0]
    report.report[p + 237] = pr

    openqa_review.reminder_comment_on_issues(report)
    args.dry_run = False
openqa_review.py (project: openqa_review, author: okurz)
def get_results_by_bugref(results, args):
    include_tags = ['STILL_FAILING', 'NEW_ISSUE']
    if args.include_softfails:
        include_tags += soft_fail_states

    # plain for-loop with append is most efficient: https://stackoverflow.com/questions/11276473/append-to-a-dict-of-lists-with-a-dict-comprehension
    results_by_bugref = defaultdict(list)
    for k, v in iteritems(results):
        if not re.match('(' + '|'.join(include_tags) + ')', v['state']):
            continue
        key = v['bugref'] if (args.bugrefs and 'bugref' in v and v['bugref']) else 'todo'
        results_by_bugref[key].append(dict(v, **{'name': k}))
    return results_by_bugref
openqa_review.py (project: openqa_review, author: okurz)
def __init__(self, arch, results, args, root_url, progress_browser, bugzilla_browser, test_browser):
        """Construct an archreport object with options."""
        self.arch = arch
        self.args = args
        self.root_url = root_url
        self.progress_browser = progress_browser
        self.bugzilla_browser = bugzilla_browser
        self.test_browser = test_browser

        self.status_badge = set_status_badge([i['state'] for i in results.values()])

        if self.args.bugrefs and self.args.include_softfails:
            self._search_for_bugrefs_for_softfailures(results)

        # if a ticket is known and it refers to both a STILL_FAILING scenario and any NEW_ISSUE, we regard that as STILL_FAILING, just visible in more
        # scenarios ...
        # ... otherwise (no ticket linked) we do not group the results, because we cannot tell whether they are really the same issue, and handle them separately
        results_by_bugref = SortedDict(get_results_by_bugref(results, self.args))
        self.issues = defaultdict(lambda: defaultdict(list))
        for bugref, result_list in iteritems(results_by_bugref):
            if not re.match('(poo|bsc|boo)#', bugref):
                log.info('Skipping unknown bugref \'%s\' in \'%s\'' % (bugref, result_list))
                continue
            bug = result_list[0]
            issue = Issue(bug['bugref'], bug['bugref_href'], self.args.query_issue_status, self.progress_browser, self.bugzilla_browser)
            self.issues[issue_state(result_list)][issue_type(bugref)].append(IssueEntry(self.args, self.root_url, result_list, bug=issue))

        # left to handle are the issues marked with 'todo'
        todo_results = results_by_bugref.get('todo', [])
        new_issues = (r for r in todo_results if r['state'] == 'NEW_ISSUE')
        self.issues['new']['todo'].extend(IssueEntry.for_each(self.args, self.root_url, new_issues, test_browser))
        existing_issues = (r for r in todo_results if r['state'] == 'STILL_FAILING')
        self.issues['existing']['todo'].extend(IssueEntry.for_each(self.args, self.root_url, existing_issues, test_browser))
        if self.args.include_softfails:
            new_soft_fails = [r for r in todo_results if r['state'] == 'NEW_SOFT_ISSUE']
            existing_soft_fails = [r for r in todo_results if r['state'] == 'STILL_SOFT_FAILING']
            if new_soft_fails:
                self.issues['new']['product'].append(IssueEntry(self.args, self.root_url, new_soft_fails))
            if existing_soft_fails:
                self.issues['existing']['product'].append(IssueEntry(self.args, self.root_url, existing_soft_fails))
openqa_review.py (project: openqa_review, author: okurz)
def total_issues(self):
        """Return Number of issue entries for this arch."""
        total = 0
        for issue_status, issue_types in iteritems(self.issues):
            for issue_type, ies in iteritems(issue_types):
                total += len(ies)
        return total
openqa_review.py (project: openqa_review, author: okurz)
def __init__(self, browser, job_group_url, root_url, args):
        """Construct a product report object with options."""
        self.args = args
        self.job_group_url = job_group_url
        self.group = job_group_url.split('/')[-1]
        current_url, previous_url = get_build_urls_to_compare(browser, job_group_url, args.builds, args.against_reviewed, args.running_threshold)
        # read last finished
        current_details = browser.get_soup(current_url)
        previous_details = browser.get_soup(previous_url)
        for details in current_details, previous_details:
            assert sum(int(badge.text) for badge in details.find_all(class_='badge')) > 0, \
                "invalid page with no test results found reading %s and %s, make sure you specified valid builds (leading zero missing?)" \
                % (current_url, previous_url)
        current_summary = parse_summary(current_details)
        previous_summary = parse_summary(previous_details)

        changes = {k: v - previous_summary.get(k, 0) for k, v in iteritems(current_summary) if k != 'none' and k != 'incomplete'}
        log.info("Changes since last build:\n\t%s" % '\n\t'.join("%s: %s" % (k, v) for k, v in iteritems(changes)))

        self.build = get_build_nr(current_url)
        self.ref_build = get_build_nr(previous_url)

        # for each architecture iterate over all test results
        cur_archs, prev_archs = (set(arch.text for arch in details.find_all('th', id=re.compile('flavor_'))) for details in [current_details, previous_details])
        archs = cur_archs
        if args.arch:
            assert args.arch in cur_archs, "Selected arch {} was not found in test results {}".format(args.arch, cur_archs)
            archs = [args.arch]
        self.missing_archs = sorted(prev_archs - cur_archs)
        if self.missing_archs:
            log.info("%s missing completely from current run: %s" %
                     (pluralize(len(self.missing_archs), "architecture is", "architectures are"), ', '.join(self.missing_archs)))

        # create arch reports
        self.reports = SortedDict()
        progress_browser = progress_browser_factory(args) if args.query_issue_status else None
        bugzilla_browser = bugzilla_browser_factory(args) if args.query_issue_status else None
        for arch in sorted(archs):
            results = get_arch_state_results(arch, current_details, previous_details, args.output_state_results)
            self.reports[arch] = ArchReport(arch, results, args, root_url, progress_browser, bugzilla_browser, browser)
openqa_review.py (project: openqa_review, author: okurz)
def get_job_groups(browser, root_url, args):
    if args.job_group_urls:
        job_group_urls = args.job_group_urls.split(',')
        log.info("Acting on specified job group URL(s): %s" % ', '.join(job_group_urls))
        job_groups = {i: url for i, url in enumerate(job_group_urls)}
    else:
        parent_groups = get_parent_job_groups(browser, root_url, args)
        if args.no_progress or not humanfriendly_available:
            results = browser.get_json(urljoin(root_url, 'api/v1/job_groups'))
        else:
            with AutomaticSpinner(label='Retrieving job groups'):
                results = browser.get_json(urljoin(root_url, 'api/v1/job_groups'))

        def _pgroup_prefix(group):
            try:
                return '%s / %s' % (parent_groups[group['parent_id']], group['name'])
            except KeyError:
                return group['name']

        job_groups = {}
        for job_group in results:
            job_groups[_pgroup_prefix(job_group)] = urljoin(root_url, '/group_overview/%i' % job_group['id'])
        if args.job_groups:
            job_pattern = re.compile('(%s)' % '|'.join(args.job_groups.split(',')))
            job_groups = {k: v for k, v in iteritems(job_groups) if job_pattern.search(k)}
            log.info("Job group URL for %s: %s" % (args.job_groups, job_groups))
        if args.exclude_job_groups:
            job_pattern = re.compile('(%s)' % '|'.join(args.exclude_job_groups.split(',')))
            job_groups = {k: v for k, v in iteritems(job_groups) if not job_pattern.search(k)}
            log.info("Job group URL excluding %s: %s" % (args.exclude_job_groups, job_groups))
    return SortedDict(job_groups)
openqa_review.py (project: openqa_review, author: okurz)
def __str__(self):
        """Generate markdown."""
        report_str = ""
        for k, v in iteritems(self.report):
            report_str += '# %s\n\n%s\n---\n' % (k, v)
        return report_str
test_database.py (project: statik, author: thanethomson)
def assertInstanceEqual(self, expected, inst):
        for field_name, field_value in iteritems(expected):
            self.assertEqual(field_value, getattr(inst, field_name))
database.py (project: statik, author: thanethomson)
def find_backrefs(self):
        for model_name, model in iteritems(self.models):
            logger.debug('Attempting to find backrefs for model: %s' % model_name)
            model.find_additional_rels(self.models)
database.py (project: statik, author: thanethomson)
def create_db(self, models):
        """Creates the in-memory SQLite database from the model
        configuration."""
        # first create the table definitions
        self.tables = dict([(model_name, self.create_model_table(model)) for model_name, model in iteritems(models)])
        # now create the tables in memory
        logger.debug("Creating %d database table(s)..." % len(self.tables))
        self.Base.metadata.create_all(self.engine)
        self.load_all_model_data(models)
database.py (project: statik, author: thanethomson)
def query(self, query, additional_locals=None, safe_mode=False):
        """Executes the given SQLAlchemy query string.

        Args:
            query: The SQLAlchemy ORM query (or Python code) to be executed.
            additional_locals: Any additional local variables to inject into the execution context when executing
                the query.
            safe_mode: Boolean value indicating whether or not to execute queries in safe mode only. If True,
                this only allows MLAlchemy-style queries. If False, this allows both exec() and MLAlchemy-style
                queries. Default: False.

        Returns:
            The result of executing the query.
        """
        logger.debug("Attempting to execute database query: %s" % query)

        if safe_mode and not isinstance(query, dict):
            raise SafetyViolationError("Queries in safe mode must be MLAlchemy-style queries")

        if isinstance(query, dict):
            logger.debug("Executing query in safe mode (MLAlchemy)")
            return mlalchemy.parse_query(query).to_sqlalchemy(self.session, self.tables).all()
        else:
            logger.debug("Executing unsafe query (Python exec())")
            if additional_locals is not None:
                for k, v in iteritems(additional_locals):
                    locals()[k] = v

            exec(
                compile(
                    'result = %s' % query.strip(),
                    '<string>',
                    'exec'
                ),
                globals(),
                locals()
            )
            return locals()['result']
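For context, a hypothetical safe-mode call to this method might look like the sketch below. The model name Post, its published field, and the variable db (a StatikDatabase instance) are invented for illustration, and the dict keys follow MLAlchemy's query format as I understand it from the docstring's reference to MLAlchemy-style queries:

# hypothetical usage sketch; "Post" and its "published" field are made-up names
# safe mode only accepts an MLAlchemy-style dict query, parsed by mlalchemy
published_posts = db.query(
    {'from': 'Post', 'where': {'published': True}},
    safe_mode=True,
)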
database.py (project: statik, author: thanethomson)
def __repr__(self):
        result_lines = ["<StatikDatabaseInstance model=%s" % self.model.name]
        for field_name, field_value in iteritems(self.field_values):
            model_field = self.model.fields.get(field_name, None)
            if isinstance(model_field, StatikContentField) or isinstance(model_field, StatikTextField):
                result_lines.append("                        %s=<...>" % field_name)
            else:
                result_lines.append("                        %s=%s" % (field_name, field_value))
        result_lines[-1] += '>'
        return '\n'.join(result_lines)
views.py (project: statik, author: thanethomson)
def process_context_dynamic(self, db, safe_mode=False):
        result = {}
        for var, query in iteritems(self.context_dynamic):
            result[var] = db.query(query, safe_mode=safe_mode)
        return result
tags.py (project: statik, author: thanethomson)
def echo_arguments(*args, **kwargs):
    """ Echoes all parameters back as text (for debugging)
            {% ditto 1 2 3 %} => "ditto(1, 2, 3)"
    """
    args_string = ', '.join(map(lambda x: str(x), args))
    kwargs_string = ', '.join("%s=%s" % (k, v) for k, v in iteritems(kwargs))
    string_lst = filter(lambda x: bool(x), [args_string, kwargs_string])
    return "ditto(%s)" % ", ".join(string_lst)
project.py (project: statik, author: thanethomson)
def load_project_dynamic_context(self):
        """Loads the dynamic context for this project, if any."""
        context = {}
        for varname, query in iteritems(self.config.context_dynamic):
            context[varname] = self.db.query(query)
        return context
project.py (project: statik, author: thanethomson)
def process_views(self):
        """Processes the loaded views to generate the required output data."""
        output = {}
        logger.debug("Processing %d view(s)..." % len(self.views))
        for view_name, view in iteritems(self.views):
            # first update the view's context with the project context
            view.context.update(self.project_context)
            output = deep_merge_dict(output, view.process(self.db, safe_mode=self.safe_mode))
        return output

