python类EX_OK的实例源码

daemon.py 文件源码 项目:concierge 作者: 9seconds 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def manage_events(self, notify):
        filename = os.path.basename(self.source_path)

        while True:
            try:
                events = notify.read()
            except KeyboardInterrupt:
                return os.EX_OK
            else:
                LOG.debug("Caught %d events", len(events))

            events = self.filter_events(filename, events)
            descriptions = self.describe_events(events)
            LOG.debug("Got %d events after filtration: %s",
                      len(descriptions), descriptions)

            if events:
                self.output()

            LOG.info("Config was managed. Going to the next loop.")
testcase.py 文件源码 项目:functest 作者: opnfv 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __str__(self):
        try:
            assert self.project_name
            assert self.case_name
            result = 'PASS' if(self.is_successful(
                ) == TestCase.EX_OK) else 'FAIL'
            msg = prettytable.PrettyTable(
                header_style='upper', padding_width=5,
                field_names=['test case', 'project', 'duration',
                             'result'])
            msg.add_row([self.case_name, self.project_name,
                         self.get_duration(), result])
            return msg.get_string()
        except AssertionError:
            self.__logger.error("We cannot print invalid objects")
            return super(TestCase, self).__str__()
run_tests.py 文件源码 项目:functest 作者: opnfv 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def run_tier(self, tier):
        """Run one tier"""
        tier_name = tier.get_name()
        tests = tier.get_tests()
        if not tests:
            LOGGER.info("There are no supported test cases in this tier "
                        "for the given scenario")
            self.overall_result = Result.EX_ERROR
        else:
            LOGGER.info("Running tier '%s'", tier_name)
            for test in tests:
                self.run_test(test)
                test_case = self.executed_test_cases[test.get_name()]
                if test_case.is_successful() != testcase.TestCase.EX_OK:
                    LOGGER.error("The test case '%s' failed.", test.get_name())
                    if test.get_project() == "functest":
                        self.overall_result = Result.EX_ERROR
                    if test.is_blocking():
                        raise BlockingTestFailed(
                            "The test case {} failed and is blocking".format(
                                test.get_name()))
        return self.overall_result
test_redisrwlock_cmdline.py 文件源码 项目:redisrwlock 作者: veshboo 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_option_repeat_interval(self):
        """test --retry and --interval options"""
        # run with --retry, see 2 lines, then kill -INT
        cmd, output = runCmdOutput(['-p', '7788', '-r'],
                                   wait=False, limit=2)
        cmd.send_signal(signal.SIGINT)
        self.assertEqual(cmd.wait(), 1)
        cmd.stdout.close()
        # run with --retry, see 4 lines, then kill -INT
        cmd, output = runCmdOutput(['-p', '7788', '-r', '-i', '1'],
                                   wait=False, limit=4)
        cmd.send_signal(signal.SIGINT)
        self.assertEqual(cmd.wait(), 1)
        cmd.stdout.close()
        # invalid --interval option argument (int > 0)
        cmd, output = runCmdOutput(['-p', '7788', '-i', '0'])
        self.assertEqual(cmd.returncode, os.EX_USAGE)
        # --interval option argument ignored if no --retry
        cmd, output = runCmdOutput(['-p', '7788', '-i', '1000'])
        self.assertEqual(cmd.returncode, os.EX_OK)
typecheck.py 文件源码 项目:temci 作者: parttimenerd 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _instancecheck_impl(self, value, info: Info) -> InfoMsg:
        if not isinstance(value, str) or value == "":
            return info.errormsg(self)
        value = os.path.expanduser(value)
        if self.allow_std and value == "-" and (self.constraint is None or self.constraint(value)):
            return info.wrap(True)
        is_valid = True
        if os.path.exists(value):
            if os.path.isfile(value) and os.access(os.path.abspath(value), os.W_OK)\
                    and (self.constraint is None or self.constraint(value)):
                return info.wrap(True)
            return info.errormsg(self)
        if not self.allow_non_existent:
            return info.errormsg(self, "File doesn't exist")
        abs_name = os.path.abspath(value)
        dir_name = os.path.dirname(abs_name)
        if os.path.exists(dir_name) and os.access(dir_name, os.EX_OK) and os.access(dir_name, os.W_OK) \
            and (self.constraint is None or self.constraint(value)):
            return info.wrap(True)
        return info.errormsg(self)
test_inventory.py 文件源码 项目:ceph-lcm 作者: Mirantis 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_main_list(monkeypatch, capsys, mocked_sysexit, mocked_configure):
    server_id = pytest.faux.gen_uuid()
    host = pytest.faux.gen_alphanumeric()
    username = pytest.faux.gen_alphanumeric()
    initiator_id = pytest.faux.gen_uuid()

    tsk = task.ServerDiscoveryTask(server_id, host, username, initiator_id)
    tsk = tsk.create()

    monkeypatch.setenv(process.ENV_ENTRY_POINT, "server_discovery")
    monkeypatch.setenv(process.ENV_TASK_ID, str(tsk._id))
    monkeypatch.setattr("sys.argv", ["progname", "--list"])

    assert inventory.main() == os.EX_OK

    mocked_sysexit.assert_not_called()

    out, _ = capsys.readouterr()
    arg = json.loads(out)
    assert arg["new"]["hosts"] == [host]
    assert arg["_meta"]["hostvars"][host]["ansible_user"] == username
test_inventory.py 文件源码 项目:ceph-lcm 作者: Mirantis 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_main_host_ok(monkeypatch, capsys, mocked_sysexit, mocked_configure):
    server_id = pytest.faux.gen_uuid()
    host = pytest.faux.gen_alphanumeric()
    username = pytest.faux.gen_alphanumeric()
    initiator_id = pytest.faux.gen_uuid()

    tsk = task.ServerDiscoveryTask(server_id, host, username, initiator_id)
    tsk = tsk.create()

    monkeypatch.setenv(process.ENV_ENTRY_POINT, "server_discovery")
    monkeypatch.setenv(process.ENV_TASK_ID, str(tsk._id))
    monkeypatch.setattr("sys.argv", ["progname", "--host", host])

    assert inventory.main() == os.EX_OK

    mocked_sysexit.assert_not_called()

    out, _ = capsys.readouterr()
    arg = json.loads(out)
    assert arg["ansible_user"] == username
migration.py 文件源码 项目:ceph-lcm 作者: Mirantis 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def run(self):
        if self.finished:
            return

        self.process = subprocess.Popen(
            [str(self.path)],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            shell=True
        )

        LOG.info("Run %s. Pid %d", self.path, self.process.pid)
        self.process.wait()
        logmethod = LOG.info if self.process.returncode == os.EX_OK \
            else LOG.warning
        logmethod("%s has been finished. Exit code %s",
                  self.path, self.process.returncode)

        self.stdout = self.process.stdout.read().decode("utf-8")
        self.stderr = self.process.stderr.read().decode("utf-8")

        if self.process.returncode != os.EX_OK:
            raise RuntimeError(
                "Program {0} has been finished with exit code {1}",
                self.path, self.process.returncode)
ceph_version.py 文件源码 项目:ceph-lcm 作者: Mirantis 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_package_version(prefix, connection, package_name):
    command = "dpkg-query --showformat='${Version}' --show %s" % shlex.quote(
        package_name)
    result = await connection.run(command)

    if result.exit_status != os.EX_OK:
        click.echo(
            "{0}package (failed {1}): {2} - {3}".format(
                prefix, result.exit_status, package_name, result.stderr.strip()
            )
        )
    else:
        click.echo(
            "{0}package (ok): {1}=={2}".format(
                prefix, package_name, result.stdout.strip()
            )
        )
test.py 文件源码 项目:aegea 作者: kislyuk 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def call(self, cmd, **kwargs):
        print('Running "{}"'.format(cmd), file=sys.stderr)
        expect = kwargs.pop("expect", [dict(return_codes=[os.EX_OK], stdout=None, stderr=None)])
        process = subprocess.Popen(cmd, stdin=kwargs.get("stdin", subprocess.PIPE), stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE, **kwargs)
        out, err = process.communicate()
        return_code = process.poll()
        out = out.decode(sys.stdin.encoding)
        err = err.decode(sys.stdin.encoding)

        def match(return_code, out, err, expected):
            exit_ok = return_code in expected["return_codes"]
            stdout_ok = re.search(expected.get("stdout") or "", out)
            stderr_ok = re.search(expected.get("stderr") or "", err)
            return exit_ok and stdout_ok and stderr_ok
        if not any(match(return_code, out, err, exp) for exp in expect):
            print(err)
            e = subprocess.CalledProcessError(return_code, cmd, output=out)
            e.stdout, e.stderr = out, err
            raise e
        return self.SubprocessResult(out, err, return_code)
ls.py 文件源码 项目:aegea 作者: kislyuk 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def grep(args):
    filter_args = dict(logGroupName=args.log_group)
    if args.log_stream:
        filter_args.update(logStreamNames=[args.log_stream])
    if args.pattern:
        filter_args.update(filterPattern=args.pattern)
    if args.start_time:
        filter_args.update(startTime=int(timestamp(args.start_time) * 1000))
    if args.end_time:
        filter_args.update(endTime=int(timestamp(args.end_time) * 1000))
    num_results = 0
    while True:
        for event in paginate(clients.logs.get_paginator("filter_log_events"), **filter_args):
            if "timestamp" not in event or "message" not in event:
                continue
            print(str(Timestamp(event["timestamp"])), event["message"])
            num_results += 1
        if args.follow:
            time.sleep(1)
        else:
            return SystemExit(os.EX_OK if num_results > 0 else os.EX_DATAERR)
driver.py 文件源码 项目:python-tldp 作者: tLDP 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def build(config, docs, **kwargs):
    if not config.pubdir:
        return ERR_NEEDPUBDIR + "to --build"
    ready, error = builddir_setup(config)
    if not ready:
        return error
    ready, error = prepare_docs_build_mode(config, docs)
    if not ready:
        return error
    buildsuccess, results = docbuild(config, docs, **kwargs)
    for x, (buildcode, source) in enumerate(results, 1):
        if buildcode:
            logger.info("success (%d of %d) available in %s",
                        x, len(results), source.working.dirname)
        else:
            logger.info("FAILURE (%d of %d) available in %s",
                        x, len(results), source.working.dirname)
    if buildsuccess:
        return os.EX_OK
    else:
        return "Build failed, see logging output in %s." % (config.builddir,)
driver.py 文件源码 项目:python-tldp 作者: tLDP 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def publish(config, docs, **kwargs):
    config.build = True
    result = build(config, docs, **kwargs)
    if result != os.EX_OK:
        return result
    for x, source in enumerate(docs, 1):
        logger.info("Publishing (%d of %d) to %s.",
                    x, len(docs), source.output.dirname)
        # -- swapdirs must raise an error if there are problems
        #
        swapdirs(source.working.dirname, source.output.dirname)
        if os.path.isdir(source.working.dirname):
            logger.debug("%s removing old directory %s",
                         source.stem, source.working.dirname)
            shutil.rmtree(source.working.dirname)
    workingdirs = list(set([x.dtworkingdir for x in docs]))
    workingdirs.append(config.builddir)
    post_publish_cleanup(workingdirs)
    return os.EX_OK
test_driver.py 文件源码 项目:python-tldp 作者: tLDP 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_summary_longnames(self):
        c = self.config
        names = self.publishDocumentsWithLongNames(5)
        stdout = io.StringIO()
        result = tldp.driver.summary(c, file=stdout)
        self.assertEqual(result, os.EX_OK)
        stdout.seek(0)
        data = stdout.read()
        self.assertTrue('and 4 more' in data)
        c.verbose = True
        stdout = io.StringIO()
        result = tldp.driver.summary(c, file=stdout)
        self.assertEqual(result, os.EX_OK)
        stdout.seek(0)
        data = stdout.read()
        for name in names:
            self.assertTrue(name in data)
test_driver.py 文件源码 项目:python-tldp 作者: tLDP 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_run(self):
        c = self.config
        ex = example.ex_linuxdoc
        self.add_published('Published-HOWTO', ex)
        self.add_new('New-HOWTO', ex)
        self.add_stale('Stale-HOWTO', ex)
        self.add_orphan('Orphan-HOWTO', ex)
        self.add_broken('Broken-HOWTO', ex)
        fullpath = opj(self.tempdir, 'sources', 'New-HOWTO.sgml')
        argv = self.argv
        argv.extend(['--publish', 'stale', 'Orphan-HOWTO', fullpath])
        exitcode = tldp.driver.run(argv)
        self.assertEqual(exitcode, os.EX_OK)
        inv = tldp.inventory.Inventory(c.pubdir, c.sourcedir)
        self.assertEqual(4, len(inv.published.keys()))
        self.assertEqual(1, len(inv.broken.keys()))
testcase.py 文件源码 项目:functest 作者: opnfv 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def is_successful(self):
        """Interpret the result of the test case.

        It allows getting the result of TestCase. It completes run()
        which only returns the execution status.

        It can be overriden if checking result is not suitable.

        Returns:
            TestCase.EX_OK if result is 'PASS'.
            TestCase.EX_TESTCASE_FAILED otherwise.
        """
        try:
            assert self.criteria
            assert self.result is not None
            if (not isinstance(self.result, str) and
                    not isinstance(self.criteria, str)):
                if self.result >= self.criteria:
                    return TestCase.EX_OK
            else:
                # Backward compatibility
                # It must be removed as soon as TestCase subclasses
                # stop setting result = 'PASS' or 'FAIL'.
                # In this case criteria is unread.
                self.__logger.warning(
                    "Please update result which must be an int!")
                if self.result == 'PASS':
                    return TestCase.EX_OK
        except AssertionError:
            self.__logger.error("Please run test before checking the results")
        return TestCase.EX_TESTCASE_FAILED
testcase.py 文件源码 项目:functest 作者: opnfv 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def push_to_db(self):
        """Push the results of the test case to the DB.

        It allows publishing the results and to check the status.

        It could be overriden if the common implementation is not
        suitable. The following attributes must be set before pushing
        the results to DB:

            * project_name,
            * case_name,
            * result,
            * start_time,
            * stop_time.

        Returns:
            TestCase.EX_OK if results were pushed to DB.
            TestCase.EX_PUSH_TO_DB_ERROR otherwise.
        """
        try:
            assert self.project_name
            assert self.case_name
            assert self.start_time
            assert self.stop_time
            pub_result = 'PASS' if self.is_successful(
                ) == TestCase.EX_OK else 'FAIL'
            if ft_utils.push_results_to_db(
                    self.project_name, self.case_name, self.start_time,
                    self.stop_time, pub_result, self.details):
                self.__logger.info(
                    "The results were successfully pushed to DB")
                return TestCase.EX_OK
            else:
                self.__logger.error("The results cannot be pushed to DB")
                return TestCase.EX_PUSH_TO_DB_ERROR
        except Exception:  # pylint: disable=broad-except
            self.__logger.exception("The results cannot be pushed to DB")
            return TestCase.EX_PUSH_TO_DB_ERROR
testcases.py 文件源码 项目:functest 作者: opnfv 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def _run(self, args):  # pylint: disable=no-self-use
        """ The built_in function to run a test case """

        case_name = args.get('testcase')
        self._update_logging_ini(args.get('task_id'))

        try:
            cmd = "run_tests -t {}".format(case_name)
            runner = ft_utils.execute_command(cmd)
        except Exception:  # pylint: disable=broad-except
            result = 'FAIL'
            LOGGER.exception("Running test case %s failed!", case_name)
        if runner == os.EX_OK:
            result = 'PASS'
        else:
            result = 'FAIL'

        env_info = {
            'installer': CONST.__getattribute__('INSTALLER_TYPE'),
            'scenario': CONST.__getattribute__('DEPLOY_SCENARIO'),
            'build_tag': CONST.__getattribute__('BUILD_TAG'),
            'ci_loop': CONST.__getattribute__('CI_LOOP')
        }
        result = {
            'task_id': args.get('task_id'),
            'testcase': case_name,
            'env_info': env_info,
            'result': result
        }

        return {'result': result}
run_tests.py 文件源码 项目:functest 作者: opnfv 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self):
        self.executed_test_cases = {}
        self.overall_result = Result.EX_OK
        self.clean_flag = True
        self.report_flag = False
        self._tiers = tb.TierBuilder(
            CONST.__getattribute__('INSTALLER_TYPE'),
            CONST.__getattribute__('DEPLOY_SCENARIO'),
            pkg_resources.resource_filename('functest', 'ci/testcases.yaml'))
run_tests.py 文件源码 项目:functest 作者: opnfv 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def main(self, **kwargs):
        """Entry point of class Runner"""
        if 'noclean' in kwargs:
            self.clean_flag = not kwargs['noclean']
        if 'report' in kwargs:
            self.report_flag = kwargs['report']
        try:
            if 'test' in kwargs:
                self.source_rc_file()
                LOGGER.debug("Test args: %s", kwargs['test'])
                if self._tiers.get_tier(kwargs['test']):
                    self.run_tier(self._tiers.get_tier(kwargs['test']))
                elif self._tiers.get_test(kwargs['test']):
                    result = self.run_test(
                        self._tiers.get_test(kwargs['test']))
                    if result != testcase.TestCase.EX_OK:
                        LOGGER.error("The test case '%s' failed.",
                                     kwargs['test'])
                        self.overall_result = Result.EX_ERROR
                elif kwargs['test'] == "all":
                    self.run_all()
                else:
                    LOGGER.error("Unknown test case or tier '%s', or not "
                                 "supported by the given scenario '%s'.",
                                 kwargs['test'],
                                 CONST.__getattribute__('DEPLOY_SCENARIO'))
                    LOGGER.debug("Available tiers are:\n\n%s",
                                 self._tiers)
                    return Result.EX_ERROR
            else:
                self.run_all()
        except BlockingTestFailed:
            pass
        except Exception:  # pylint: disable=broad-except
            LOGGER.exception("Failures when running testcase(s)")
            self.overall_result = Result.EX_ERROR
        if not self._tiers.get_test(kwargs['test']):
            self.summary(self._tiers.get_tier(kwargs['test']))
        LOGGER.info("Execution exit value: %s", self.overall_result)
        return self.overall_result
run_tests.py 文件源码 项目:functest 作者: opnfv 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def summary(self, tier=None):
        """To generate functest report showing the overall results"""
        msg = prettytable.PrettyTable(
            header_style='upper', padding_width=5,
            field_names=['env var', 'value'])
        for env_var in ['INSTALLER_TYPE', 'DEPLOY_SCENARIO', 'BUILD_TAG',
                        'CI_LOOP']:
            msg.add_row([env_var, CONST.__getattribute__(env_var)])
        LOGGER.info("Deployment description:\n\n%s\n", msg)
        msg = prettytable.PrettyTable(
            header_style='upper', padding_width=5,
            field_names=['test case', 'project', 'tier',
                         'duration', 'result'])
        tiers = [tier] if tier else self._tiers.get_tiers()
        for each_tier in tiers:
            for test in each_tier.get_tests():
                try:
                    test_case = self.executed_test_cases[test.get_name()]
                except KeyError:
                    msg.add_row([test.get_name(), test.get_project(),
                                 each_tier.get_name(), "00:00", "SKIP"])
                else:
                    result = 'PASS' if(test_case.is_successful(
                        ) == test_case.EX_OK) else 'FAIL'
                    msg.add_row(
                        [test_case.case_name, test_case.project_name,
                         self._tiers.get_tier_name(test_case.case_name),
                         test_case.get_duration(), result])
            for test in each_tier.get_skipped_test():
                msg.add_row([test.get_name(), test.get_project(),
                             each_tier.get_name(), "00:00", "SKIP"])
        LOGGER.info("FUNCTEST REPORT:\n\n%s\n", msg)
pyrate.py 文件源码 项目:pyrate-build 作者: pyrate-build 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def main():
    version_info = 'pyrate version ' + __version__
    try:
        if os.environ.get('TESTOLDIMPORTS'):
            raise ImportError()
        import argparse
        parser = argparse.ArgumentParser()
        parser.add_argument('build_file', nargs = '?', default = 'build.py',
            help = 'name of the input file - default: build.py')
        parser.add_argument('-V', '--version', action = 'version', version = version_info)
        parser.add_argument('-M', '--makefile', action = 'store_true', help = 'enable makefile mode')
        parser.add_argument('-o', '--output', nargs = 1, default = None,
            help = 'name of output build file')
        args = parser.parse_args()
        if args.output:
            args.output = args.output[0]
        bfn = args.build_file
    except ImportError:
        optparse = __import__('optparse')
        parser = optparse.OptionParser(usage = 'pyrate [options] build_file')
        parser.add_option('-V', '--version', action='store_true', help = 'display version')
        parser.add_option('-M', '--makefile', action = 'store_true', help = 'enable makefile mode')
        parser.add_option('-o', '--output', default = None,
            help = 'name of output build file', dest='output')
        (args, posargs) = parser.parse_args()
        if len(posargs) > 1:
            sys.stderr.write('too many build_file arguments provided! %s\n' % repr(posargs))
            return os.EX_USAGE
        elif not posargs:
            posargs = ['build.py']
        bfn = posargs[0]
        if args.version:
            sys.stderr.write(version_info + '\n')
            sys.exit(os.EX_OK)

    generate_build_file(bfn, args.output, args.makefile)

################################################################################
# Externals + helper functions
################################################################################
test_redisrwlock_cmdline.py 文件源码 项目:redisrwlock 作者: veshboo 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_option_help(self):
        """test --help option"""
        cmd, output = runCmdOutput(['--help'])
        self.assertEqual(cmd.returncode, os.EX_OK)
test_redisrwlock_cmdline.py 文件源码 项目:redisrwlock 作者: veshboo 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_option_version(self):
        """test --version option"""
        cmd, output = runCmdOutput(['--version'])
        self.assertEqual(cmd.returncode, os.EX_OK)
test_redisrwlock_cmdline.py 文件源码 项目:redisrwlock 作者: veshboo 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_logging_config(self):
        """test logging config from file or default"""
        topdir = os.path.dirname(os.path.dirname(__file__))
        # logging config from default
        os.system('rm %s/logging.conf' % topdir)
        cmd, output = runCmdOutput(['-p', '7788'])
        self.assertEqual(cmd.returncode, os.EX_OK)
        # logging config from file
        os.system('cp %s/logging.conf.sample %s/logging.conf' %
                  (topdir, topdir))
        cmd, output = runCmdOutput(['-p', '7788'])
        self.assertEqual(cmd.returncode, os.EX_OK)
arg.py 文件源码 项目:do-like-javac 作者: SRI-CSL 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def parse_args():
    to_parse, cmd, capturer = split_args_to_parse()

    global_argparser = create_argparser()

    args = global_argparser.parse_args(to_parse)

    if capturer:
        return args, cmd, capturer
    else:
        global_argparser.print_help()
        sys.exit(os.EX_OK)
typecheck.py 文件源码 项目:temci 作者: parttimenerd 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _instancecheck_impl(self, value, info: Info) -> InfoMsg:
        if not isinstance(value, str):
            return info.errormsg(self)
        is_valid = True
        if os.path.exists(value):
            if os.path.isdir(value) and os.access(os.path.abspath(value), os.W_OK)\
                    and (self.constraint is None or self.constraint(value)):
                return info.wrap(True)
            return info.errormsg(self)
        abs_name = os.path.abspath(value)
        dir_name = os.path.dirname(abs_name)
        if os.path.exists(dir_name) and os.access(dir_name, os.EX_OK) and os.access(dir_name, os.W_OK) \
            and (self.constraint is None or self.constraint(value)):
            return info.wrap(True)
        return info.errormsg(self)
yan.py 文件源码 项目:yan 作者: motet-a 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def main():
    argument_parser = get_argument_parser()
    options = argument_parser.parse_args()

    if options.list_checkers:
        list_checkers()
        return

    program = Program(options)
    program.check()
    program.print_issues()
    errors = [e for e in program.issues
              if e.level == 'error' or e.level == 'syntax-error']
    sys.exit(os.EX_OK if len(errors) == 0 else os.EX_DATAERR)
stallmanbot.py 文件源码 项目:homemadescripts 作者: helioloureiro 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def check_if_run():
    pid = read_file(PIDFILE)
    current_pid = os.getpid()
    if pid is None:
        return
    if int(pid) > 0 and int(pid) != current_pid:
        if os.path.exists("/proc/%d" % int(pid)):
            log("[%s] Already running - keepalive done." % time.ctime())
            sys.exit(os.EX_OK)
test_taskpool.py 文件源码 项目:ceph-lcm 作者: Mirantis 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def mocked_plugin():
    patch = unittest.mock.patch("decapod_common.plugins.get_playbook_plugins")
    with patch as ptch:
        plugin = unittest.mock.MagicMock()

        required_mock = unittest.mock.MagicMock()
        required_mock.pid = 100
        required_mock.returncode = os.EX_OK
        plugin.execute.return_value.__enter__.return_value = required_mock

        ptch.return_value.get.return_value.return_value = plugin

        yield required_mock


问题


面经


文章

微信
公众号

扫码关注公众号