python类Namespace()的实例源码

run.py 文件源码 项目:pygrunt 作者: elementbound 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _parse_args(args):
    parser = argparse.ArgumentParser()

    if any([arg == '--version' for arg in args]):
        return argparse.Namespace(version=True)

    parser.add_argument('script', help='Script to run')
    parser.add_argument('target', nargs='?', default='build', help='Target object to build; defaults to \'build\'')

    parser.add_argument('--version', action='store_true', help='Print version info and exit')
    parser.add_argument('--clear', action='store_true', help='Clear output directory')
    parser.add_argument('--clear-cache', action='store_true', help='Clear cache before compiling')

    parser.add_argument('--threads', '-t', type=int, help='Set thread count; defaults to cores*2')
    parser.add_argument('--no-threading', '-nt', action='store_true', help='Disable multithreaded compiling')

    # TODO: Make target '*' instead of '?' so multiple targets could be ran from the same command

    return parser.parse_args(args)
params.py 文件源码 项目:PyPlanet 作者: PyPlanet 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def parse(self, argv):
        """
        Parse arguments.

        :param argv: arguments.
        """
        values = dict()

        self.data = None
        self._errors = list()

        for idx, param in enumerate(self.params):
            try:
                values[param['dest'] or param['name']] = self.parse_parameter(param, argv, idx)
            except ParamException as e:
                self._errors.append(str(e))

        self.data = Namespace(**values)
map.py 文件源码 项目:PyPlanet 作者: PyPlanet 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def list_action_remove(self, player, values, map_dictionary, view, **kwargs):
        # Check permission.
        if not await self.instance.permission_manager.has_permission(player, 'admin:remove_map'):
            await self.instance.chat(
                '$f00You don\'t have the permission to perform this action!',
                player
            )
            return

        # Ask for confirmation.
        cancel = bool(await ask_confirmation(player, 'Are you sure you want to remove the map \'{}\'$z$s from the server?'.format(
            map_dictionary['name']
        ), size='sm'))
        if cancel is True:
            return

        # Simulate command.
        await self.remove_map(player, Namespace(nr=map_dictionary['id']))

        # Reload parent view.
        await view.refresh(player)
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def run_with_args(args, parser):
    # type: (argparse.Namespace, argparse.ArgumentParser) -> int
    set_logging_parameters(args, parser)
    start_time = time.time()
    ret = OK
    try:
        if args.profile:
            outline("Profiling...")
            profile("ret = whatstyle(args, parser)", locals(), globals())
        else:
            ret = whatstyle(args, parser)
    except IOError as exc:
        # If the output is piped into a pager like 'less' we get a broken pipe when
        # the pager is quit early and that is ok.
        if exc.errno == errno.EPIPE:
            pass
        elif str(exc) == 'Stream closed':
            pass
        else:
            raise
        if not PY2:
            sys.stderr.close()
    iprint(INFO_TIME, 'Run time: %s seconds' % (time.time() - start_time))
    return ret
test_pjf_server.py 文件源码 项目:PyJFuzz 作者: mseclab 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_start_object(self):
        server = PJFServer(configuration=PJFConfiguration(Namespace(ports={"servers": {"HTTP_PORT": 8080, "HTTPS_PORT": 8443}},
                                                   html=False, level=6, command=["radamsa"], stdin=True,
                                                   json={"a": "test"}, indent=True, strong_fuzz=False, url_encode=False,
                                                   parameters=[], notify=False, debug=False, content_type="text/plain",
                                                                    utf8=False, nologo=True)))
        server.run()
        json_http = urllib2.urlopen("http://127.0.0.1:8080").read()
        try:
            import requests
            requests.packages.urllib3.disable_warnings()
            json_https = requests.get('https://127.0.0.1:8443', verify=False).content
            self.assertTrue(json_https)
        except ImportError:
            pass
        self.assertTrue(json_http)
        server.stop()
nwnc.py 文件源码 项目:NoWannaNoCry 作者: dolang 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def cli_args():
    """Parse the command line arguments.

    :return: The parsed arguments.
    :rtype: argparse.Namespace
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--check', action='store_true',
                        help="check if the system is vulnerable to WCry")
    parser.add_argument('-m', '--mitigate', action='store_true',
                        help="mitigate the system's vulnerability by disabling the"
                             " SMBv1 protocol, if necessary; implies --check")
    parser.add_argument('-f', '--fix', action='store_true')
    parser.add_argument('--download-directory',
                        help="Optionally specify a directory where the Microsoft"
                             " KB update is saved when using --fix")
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit(1)
    # else:
    return parser.parse_args()
__init__.py 文件源码 项目:backend.ai-client-py 作者: lablup 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def register_command(handler: Callable[[argparse.Namespace], None],
                     main_parser: Optional[ArgParserType]=None,
                    ) -> Callable[[argparse.Namespace], None]:
    if main_parser is None:
        main_parser = global_argparser
    if id(main_parser) not in _subparsers:
        subparsers = main_parser.add_subparsers(title='commands',
                                                dest='command')
        _subparsers[id(main_parser)] = subparsers
    else:
        subparsers = _subparsers[id(main_parser)]

    @functools.wraps(handler)
    def wrapped(args):
        handler(args)

    doc_summary = handler.__doc__.split('\n\n')[0]
    inner_parser = subparsers.add_parser(handler.__name__.replace('_', '-'),
                                         description=handler.__doc__,
                                         help=doc_summary)
    inner_parser.set_defaults(function=wrapped)
    wrapped.register_command = functools.partial(register_command,
                                                 main_parser=inner_parser)
    wrapped.add_argument = inner_parser.add_argument
    return wrapped
arg_parsing.py 文件源码 项目:aws-encryption-sdk-cli 作者: awslabs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __call__(
            self,
            parser,  # type: argparse.ArgumentParser
            namespace,  # type: argparse.Namespace
            values,  # type: Union[ARGPARSE_TEXT, Sequence[Any], None]
            option_string=None  # type: Optional[ARGPARSE_TEXT]
    ):
        # type: (...) -> None
        """Checks to make sure that the destination is empty before writing.

        :raises parser.error: if destination is already set
        """
        if getattr(namespace, self.dest) is not None:  # type: ignore # typeshed doesn't know about Action.dest yet?
            parser.error('{} argument may not be specified more than once'.format(option_string))
            return
        setattr(namespace, self.dest, values)  # type: ignore # typeshed doesn't know about Action.dest yet?
upnp_reciever.py 文件源码 项目:Static-UPnP 作者: nigelb 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def start(self):
        self.setup_sockets()
        import StaticUPnP_Settings
        permissions = Namespace(**StaticUPnP_Settings.permissions)
        print(permissions)
        if permissions.drop_permissions:
            self.drop_privileges(permissions.user, permissions.group)

        self.running = Value(ctypes.c_int, 1)
        self.queue = Queue()
        self.reciever_thread = Process(target=self.socket_handler, args=(self.queue, self.running))
        self.reciever_thread.start()
        self.schedule_thread = Process(target=self.schedule_handler, args=(self.running,))
        self.schedule_thread.start()
        self.response_thread = Process(target=self.response_handler, args=(self.queue, self.running))
        self.response_thread.start()
util.py 文件源码 项目:Static-UPnP 作者: nigelb 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_interface_addresses(logger):
    import StaticUPnP_Settings
    interface_config = Namespace(**StaticUPnP_Settings.interfaces)
    ip_addresses = StaticUPnP_Settings.ip_addresses
    if len(ip_addresses) == 0:
        import netifaces
        ifs = netifaces.interfaces()
        if len(interface_config.include) > 0:
            ifs = interface_config.include
        if len(interface_config.exclude) > 0:
            for iface in interface_config.exclude:
                ifs.remove(iface)

        for i in ifs:
            addrs = netifaces.ifaddresses(i)
            if netifaces.AF_INET in addrs:
                for addr in addrs[netifaces.AF_INET]:
                    ip_addresses.append(addr['addr'])
                    logger.info("Regestering multicast on %s: %s"%(i, addr['addr']))
    return ip_addresses
main.py 文件源码 项目:kudubot 作者: namboy94 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def parse_args() -> argparse.Namespace:  # pragma: no cover
    """
    Parses the Command Line Arguments using argparse

    :return: The parsed arguments
    """

    parser = argparse.ArgumentParser()
    parser.add_argument("connection", help="The Type of Connection to use")

    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Activates verbose output")
    parser.add_argument("-d", "--debug", action="store_true",
                        help="Activates debug-level logging output")
    parser.add_argument("-q", "--quiet", action="store_true",
                        help="Disables all text output")
    parser.add_argument("-c", "--config",
                        default=os.path.join(os.path.expanduser("~"),
                                             ".kudubot"),
                        help="Overrides the configuration directory location")

    return parser.parse_args()
test_main.py 文件源码 项目:MrBam 作者: OpenGene 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_init_2(tmpdir):
    "it should open sam file if provided"

    make_bam(tmpdir.strpath, """
             123456789_123456789_
        r1 + ...........
        r1 -       ......*....
        r2 +    .........*.
        r2 -        .....*.......
    """)

    o = Namespace(query="test.vcf", cfdna=tmpdir.join("test.bam").strpath, gdna=None, output=None)

    init(o)

    assert isinstance(o.cfdna, AlignmentFile)
    assert o.gdna == None
test_main.py 文件源码 项目:MrBam 作者: OpenGene 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_init_3(tmpdir):
    "it should generate a proper output file name if not provided"

    make_bam(tmpdir.strpath, """
             123456789_123456789_
        r1 + ...........
        r1 -       ......*....
        r2 +    .........*.
        r2 -        .....*.......
    """)

    o = Namespace(query="test.vcf", cfdna=tmpdir.join("test.bam").strpath, gdna=None, output=None)

    init(o)

    assert o.output != None
test_aggregate.py 文件源码 项目:MrBam 作者: OpenGene 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_aggregate_reads_3():
    "it should ignore when 3+ reads share the same name"

    o = Namespace(verbos=False, qual=20, mismatch_limit=-1)

    reads = (
        ("r1", 'A', 60, 2, 11, -1, 2, 9,  False, True),
        ("r1", 'C', 60, 2, 11, -1, 2, -9, True, True),
        ("r1", 'C', 60, 2, 11, -1, 2, 9,  False,  True),
        ("r2", 'C', 60, 2, 11, -1, 0, 0,  True,  False)
    )

    unique_pairs, unique_single, _, nerror, *_ = aggregate_reads(o, reads)

    assert len(unique_pairs) == 0
    assert len(unique_single) == 1
    assert nerror == 3
test_aggregate.py 文件源码 项目:MrBam 作者: OpenGene 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_aggregate_reads_4():
    "it should ignore when base in overlap area inconsistent between two reads"

    o = Namespace(verbos=False, qual=20, mismatch_limit=-1)

    reads = (
        ("r1", 'A', 60, 2, 11, -1, 4, 11,  False, True),
        ("r1", 'C', 60, 4, 13, -1, 2, -11, True, True),
        ("r2", 'C', 60, 3, 12, -1, 5, 11,  False, True),
        ("r2", 'C', 60, 5, 14, -1, 3, -11, True, True)
    )

    unique_pairs, unique_single, *_, ninconsis = aggregate_reads(o, reads)

    assert len(unique_pairs) == 1
    assert ninconsis == 2
test_aggregate.py 文件源码 项目:MrBam 作者: OpenGene 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_aggregate_reads_5():
    "it should drop reads that has too much mismatch"

    o = Namespace(verbos=False, qual=20, mismatch_limit=2)

    reads = (
        ("r1", 'C', 60, 2, 11, 1, 4, 11,  False, True),
        ("r1", 'C', 60, 4, 13, 1, 2, -11, True, True),
        ("r2", 'C', 60, 3, 12, 3, 5, 11,  False, True),
        ("r2", 'C', 60, 5, 14, 1, 3, -11, True, True),
        ("r3", 'C', 60, 6, 14, 3, 0, 0, True, False),
        ("r4", 'C', 60, 7, 14, 1, 0, 0, True, False)
    )

    unique_pairs, unique_single, *_, nlowq, ninconsis = aggregate_reads(o, reads)

    assert len(unique_pairs) == 1
    assert len(unique_single) == 1
    assert nlowq == 3
test_count.py 文件源码 项目:MrBam 作者: OpenGene 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_count_different_type_1():
    "it should NOT count dna that more than 10% reads have different bases"

    o = Namespace(verbos=False, allow_inconsist=False)

    pair = {
        ('ref', 2, True): [
            ('A', 60),
            ('T', 60),
            ('T', 60)
        ],
        ('ref', 3, False): [
            ('A', 60),
            ('T', 60),
            ('T', 60)
        ]
    }

    mor, mnr, msr, oor, onr, osr, moa, mna, msa, ooa, ona, osa, inconsis = count_different_type(o, pair, {}, 'T', 'A')

    assert inconsis == 6
    assert sum((mor, mnr, msr, oor, onr, osr, moa, mna, msa, ooa, ona, osa)) == 0
test_bam.py 文件源码 项目:MrBam 作者: OpenGene 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_get_reads_1(tmpdir):
    "it should get all but only the reads that covers the given position"

    make_bam(tmpdir.strpath, """
             123456789_123456789_12
        r1 + ...........
        r1 -      ......*....
        r2 +   .........*.
        r2 -       .....*.......
        r3 +       ...........
        r3 -            ....*......
        r4 +       ...........
        r4 -            ...........
             123456789_123456789_12
    """)

    o = Namespace(verbos=False, mismatch_limit=-1)
    sam = AlignmentFile(tmpdir.join("test.bam").strpath)

    assert sum( 1 for _ in get_reads(o, sam, 'ref', '4') )  == 2
    assert sum( 1 for _ in get_reads(o, sam, 'ref', '12') ) == 7
    assert sum( 1 for _ in get_reads(o, sam, 'ref', '20') ) == 2
test_bam.py 文件源码 项目:MrBam 作者: OpenGene 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_get_reads_2(tmpdir):
    "it should read properties correctly"

    make_bam(tmpdir.strpath, """
             123456789_123
        r1 + ...*.......
        r1 -   .*.........
    """)

    o = Namespace(verbos=False, mismatch_limit=-1)
    sam = AlignmentFile(tmpdir.join("test.bam").strpath)

    r = next(get_reads(o, sam, 'ref', '4'))

    assert r[0] == "r1" # name
    assert r[3] == 0 # 0-based pos
    assert r[4] == 11 # length
    assert r[5] == -1 # mismatch, not caculated
    assert r[6] == 2 # mate pos
    assert r[7] == 13 # template length
    assert r[8] == False # is_reverse
    assert r[9] == True # paired and mapped
test_bam.py 文件源码 项目:MrBam 作者: OpenGene 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_pad_softclip_2(tmpdir):
    "it should ignore more than two reads which share the same name"

    make_bam(tmpdir.strpath, """
        r1 + __.*.......
        r1 -   .*.......__
        r1 -   .*.......__
        r2 +   .*.......__
        r2 -   .*.......__
    """)

    o = Namespace(verbos=False, mismatch_limit=-1)
    sam = AlignmentFile(tmpdir.join("test.bam").strpath)

    adjusted_pos = pad_softclip(sam)

    assert sum(1 for startpos, length in adjusted_pos.values() if startpos != -1) == 1
test_bam.py 文件源码 项目:MrBam 作者: OpenGene 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_pad_softclip_3(tmpdir):
    "it should pad softclipped bases"

    make_bam(tmpdir.strpath, """
             123456789_123
        r1 + __.*.......
        r1 -   .*.........
        r2 - ...*.......
        r2 +   .*.......__
    """)

    o = Namespace(verbos=False, mismatch_limit=-1)
    sam = AlignmentFile(tmpdir.join("test.bam").strpath)

    adjusted_pos = pad_softclip(sam)

    assert adjusted_pos["r1"] == (0, 13) # 0-based position
    assert adjusted_pos["r2"] == (0, 13)
runner.py 文件源码 项目:webhook2lambda2sqs 作者: jantman 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_api_id(config, args):
    """
    Get the API ID from Terraform, or from AWS if that fails.

    :param config: configuration
    :type config: :py:class:`~.Config`
    :param args: command line arguments
    :type args: :py:class:`argparse.Namespace`
    :return: API Gateway ID
    :rtype: str
    """
    try:
        logger.debug('Trying to get Terraform rest_api_id output')
        runner = TerraformRunner(config, args.tf_path)
        outputs = runner._get_outputs()
        depl_id = outputs['rest_api_id']
        logger.debug("Terraform rest_api_id output: '%s'", depl_id)
    except Exception:
        logger.info('Unable to find API rest_api_id from Terraform state;'
                    ' querying AWS.', exc_info=1)
        aws = AWSInfo(config)
        depl_id = aws.get_api_id()
        logger.debug("AWS API ID: '%s'", depl_id)
    return depl_id
predict.py 文件源码 项目:allennlp 作者: allenai 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _predict(predictors: Dict[str, str]):
    def predict_inner(args: argparse.Namespace) -> None:
        predictor = _get_predictor(args, predictors)
        output_file = None

        if args.silent and not args.output_file:
            print("--silent specified without --output-file.")
            print("Exiting early because no output will be created.")
            sys.exit(0)

        # ExitStack allows us to conditionally context-manage `output_file`, which may or may not exist
        with ExitStack() as stack:
            input_file = stack.enter_context(args.input_file)  # type: ignore
            if args.output_file:
                output_file = stack.enter_context(args.output_file)  # type: ignore

            _run(predictor, input_file, output_file, args.batch_size, not args.silent, args.cuda_device)

    return predict_inner
argfinder.py 文件源码 项目:azure-cli-shell 作者: Azure 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_parsed_args(self, comp_words):
        """ gets the parsed args from a patched parser """
        active_parsers = self._patch_argument_parser()

        parsed_args = argparse.Namespace()

        self.completing = True
        if USING_PYTHON2:
            # Python 2 argparse only properly works with byte strings.
            comp_words = [ensure_bytes(word) for word in comp_words]

        try:
            stderr = sys.stderr
            sys.stderr = io.open(os.devnull, "w")

            active_parsers[0].parse_known_args(comp_words, namespace=parsed_args)

            sys.stderr.close()
            sys.stderr = stderr
        except BaseException:
            pass

        self.completing = False
        return parsed_args
test_main.py 文件源码 项目:artman 作者: googleapis 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def test_with_config(self, is_file, open_):
        # Create our stand-in config file.
        config_file = textwrap.dedent(u"""\
        ---
        local_paths:
          reporoot: ~/Code
        publish: local
        """)
        is_file.return_value = True
        open_.return_value = io.StringIO(config_file)

        # Get the config and test the result.
        flags = Namespace(user_config='/bogus/file.yaml', command='not_init')
        user_config = main.read_user_config(flags)
        assert user_config == {
            'local_paths': {'reporoot': '~/Code'},
            'publish': 'local',
        }
test_github.py 文件源码 项目:artman 作者: googleapis 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_with_ssh_repo(self, login):
        # Set up test data to return when we attempt to make the
        # pull request.
        gh = mock.MagicMock(spec=github3.github.GitHub)
        login.return_value = gh
        url = 'https://github.com/me/repo/pulls/1/'
        gh.repository().create_pull.return_value = Namespace(html_url=url)

        # Run the task.
        task = github.CreateGitHubPullRequest()
        pr = task.execute(**self.task_kwargs)

        # Assert we got the correct result.
        assert pr.html_url == url

        # Assert that the correct methods were called.
        login.assert_called_once_with('lukesneeringer', '1335020400')
        gh.repository.assert_called_with('me', 'repo')
        gh.repository().create_pull.assert_called_once_with(
            base='master',
            body='This pull request was generated by artman. '
                 'Please review it thoroughly before merging.',
            head='pubsub-python-v1',
            title='Python GAPIC: Pubsub v1',
        )
test_github.py 文件源码 项目:artman 作者: googleapis 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_with_http_url(self, login):
        # Set up test data to return when we attempt to make the
        # pull request.
        gh = mock.MagicMock(spec=github3.github.GitHub)
        login.return_value = gh
        url = 'https://github.com/me/repo/pulls/1/'
        gh.repository().create_pull.return_value = Namespace(html_url=url)

        # Run the task.
        task = github.CreateGitHubPullRequest()
        pr = task.execute(**dict(self.task_kwargs, git_repo={
            'location': 'https://github/me/repo/',
        }))

        # Assert we got the correct result.
        assert pr.html_url == url

        # Assert that the correct repository method was still called.
        gh.repository.assert_called_with('me', 'repo')
params.py 文件源码 项目:ParlAI 作者: facebookresearch 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def print_args(self):
        """Print out all the arguments in this parser."""
        if not self.opt:
            self.parse_args(print_args=False)
        values = {}
        for key, value in self.opt.items():
            values[str(key)] = str(value)
        for group in self._action_groups:
            group_dict = {
                a.dest: getattr(self.args, a.dest, None)
                for a in group._group_actions
            }
            namespace = argparse.Namespace(**group_dict)
            count = 0
            for key in namespace.__dict__:
                if key in values:
                    if count == 0:
                        print('[ ' + group.title + ': ] ')
                    count += 1
                    print('[  ' + key + ': ' + values[key] + ' ]')
test_sequana_gui.py 文件源码 项目:sequana 作者: sequana 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _test_standalone_sequana(qtbot, tmpdir):
    wkdir = TemporaryDirectory()
    inputdir = os.path.realpath(
            sequana_data("Hm2_GTGAAA_L005_R1_001.fastq.gz")).rsplit(os.sep,1)[0]

    # Standalone for sequana given a wkdir and pipeline and input_dir
    args = Namespace(pipeline="quality_control", wkdir=wkdir.name,
                input_directory=inputdir)
    widget = sequana_gui.SequanaGUI(ipython=False, user_options=args)
    qtbot.addWidget(widget)
    assert widget.mode == "sequana"
    widget.force = True
    widget.save_project()

    widget.click_run()
    count = 0
    while widget.process.state() and count < 5:
        time.sleep(0.5)
        count+=0.5
    widget.click_stop()
    time.sleep(1)
main.py 文件源码 项目:python-mysql-binlog-pubsub 作者: tarzanjw 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _setup_arg_parser(argv):
    """ Parse arguments from command line
    Args:
        argv list: by default it's command line arguments

    Returns:
        argparse.Namespace: parsed argument
    """
    parser = argparse.ArgumentParser(
        description='MySQL binlog to Google Cloud Pub/Sub')
    parser.add_argument('conf',
                        help='configuration file for publishing')
    parser.add_argument('--loglevel', '-l', default=None,
                        help='log level for root')

    if os.path.isfile('logging.ini'):
        _log_file = 'logging.ini'
    else:
        _log_file = None

    parser.add_argument('--logconf', default=_log_file,
                        help='INI file log configuration')
    args = parser.parse_args(argv)
    return args


问题


面经


文章

微信
公众号

扫码关注公众号