python类get()的实例源码

gitlab_mr.py 文件源码 项目:gitlab-merge-request 作者: anti-social 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def check_branch(project, branch):
    try:
        project.branches.get(branch)
    except GitlabGetError as e:
        err(
            'Cannot find branch [%(branch)s] for project [%(project)s]',
            {'branch': branch, 'project': project.path_with_namespace},
        )
    except GitlabError as e:
        err('Gitlab error: %s', e)
    except GitlabConnectionError as e:
        err('%s', e)
gitlab_mr.py 文件源码 项目:gitlab-merge-request 作者: anti-social 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def validate_mr_data(source_project, target_project, data):
    check_branch(source_project, data['source_branch'])
    check_branch(target_project, data['target_branch'])
    if not data.get('title'):
        err('Empty [title]. Specify title of the merge request.')
gitlab_mr.py 文件源码 项目:gitlab-merge-request 作者: anti-social 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def edit_mr(data, source_project, target_project, commits):
    editor = os.environ.get('EDITOR', 'nano')
    title = data.get('title')
    assignee = data.get('assignee')
    description = data.get('description')
    content = (
        'Title:\n'
        '{title}\n'
        'Assignee:\n'
        '{assignee}\n'
        'Description:\n'
        '\n'
        '# You are creating a merge request:\n'
        '#\t{outline}\n'
        '#\n'
        '# Next commits will be included in the merge request:\n'
        '#\n'
        '{commits}\n'
        '#\n'
        '# Empty title will cancel the merge request.'
    ).format(
        title='{}\n'.format(title) if title else '',
        assignee='{}\n'.format(assignee) if assignee else '',
        description='{}\n'.format(description) if description else '',
        outline=get_mr_outline(data, source_project, target_project),
        commits=format_mr_commits(commits, prefix='#\t'),
    )
    with tempfile.NamedTemporaryFile() as tf:
        tf.write(content.encode('utf-8'))
        tf.flush()
        res = subprocess.run([editor, tf.name])
        tf.seek(0)
        new_data = data.copy()
        new_data.update(parse_mr_file(tf))
        return new_data
logs.py 文件源码 项目:evernote-telegram-bot 作者: djudman 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def get_logger(name=''):
    logging.config.dictConfig(get_config(
        project_name=config['project_name'],
        logs_dir=config['logs_dir'],
        smtp_settings=config.get('smtp')
    ))
    return logging.getLogger(name)
cli.py 文件源码 项目:fulmar 作者: tylderen 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def cli(ctx, **kwargs):
    """A crawler system."""
    logging.config.fileConfig(kwargs['logging_config'])
    config = {}
    config_filepath = kwargs['config']
    if config_filepath:
        if not os.path.exists(config_filepath):
            raise IOError('No such file or directory: "%s".' % config_filepath)
        if not os.path.isfile(config_filepath):
            raise IOError('Is not a file: "%s".' % config_filepath)
        try:
            with open(config_filepath, 'r') as f:
                config = yaml.load(f)
        except Exception as err:
            raise err

    if kwargs.get('redis'):
        redis_conn = utils.connect_redis(kwargs['redis'])
    elif config.get('redis'):
        redis_conn = utils.connect_redis(config['redis']['url'])
    else:
        raise Exception('Could not find redis address.')

    mongodb_conn = None
    if kwargs.get('mongodb'):
        mongodb_conn = utils.connect_mongodb(kwargs['mongodb'])
    elif config.get('mongodb'):
        mongodb_conn = utils.connect_mongodb(config['mongodb']['url'])
    else:
        logging.warning('Could not find mongodb address. No results will be saved.')

    from fulmar.utils import LUA_RATE_LIMIT_SCRIPT
    lua_rate_limit = redis_conn.register_script(LUA_RATE_LIMIT_SCRIPT)

    setattr(utils, 'redis_conn', redis_conn)
    setattr(utils, 'lua_rate_limit',lua_rate_limit)
    setattr(utils, 'mongodb_conn', mongodb_conn)

    ctx.obj = utils.ObjectDict(ctx.obj or {})
    ctx.obj.update(config)
    return ctx
cli.py 文件源码 项目:fulmar 作者: tylderen 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def phantomjs(ctx, phantomjs_path, port, auto_restart, args):
    """
    Run phantomjs if phantomjs is installed.
    """
    args = args or ctx.default_map and ctx.default_map.get('args', [])

    import subprocess
    g = ctx.obj
    _quit = []
    phantomjs_fetcher = os.path.join(
        os.path.dirname(fulmar.__file__), 'worker/phantomjs_fetcher.js')
    cmd = [phantomjs_path,
           # this may cause memory leak: https://github.com/ariya/phantomjs/issues/12903
           #'--load-images=false',
           '--ssl-protocol=any',
           '--disk-cache=true'] + list(args or []) + [phantomjs_fetcher, str(port)]

    try:
        _phantomjs = subprocess.Popen(cmd)
    except OSError:
        logging.warning('phantomjs not found.')
        return None

    if not g.get('phantomjs_proxy'):
        g['phantomjs_proxy'] = '127.0.0.1:%s' % port

    while True:
        _phantomjs.wait()
        if _quit or not auto_restart:
            break
        _phantomjs = subprocess.Popen(cmd)
cli.py 文件源码 项目:fulmar 作者: tylderen 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def all(ctx):
    """
        Start scheduler and worker, also run phantomjs if phantomjs is installed.
    """
    g = ctx.obj
    sub_processes = []
    threads = []
    try:
        if not g.get('phantomjs_proxy'):
            phantomjs_config = g.get('phantomjs', {})
            phantomjs_config.setdefault('auto_restart', True)
            sub_processes.append(utils.run_in_subprocess(ctx.invoke, phantomjs, **phantomjs_config))
            time.sleep(2)
            if sub_processes[-1].is_alive() and not g.get('phantomjs_proxy'):
                g['phantomjs_proxy'] = '127.0.0.1:%s' % phantomjs_config.get('port', 25555)

        scheduler_config = g.get('scheduler', {})
        threads.append(utils.run_in_thread(ctx.invoke, scheduler, **scheduler_config))

        worker_config = g.get('worker', {})
        threads.append(utils.run_in_thread(ctx.invoke, worker, **worker_config))

        while threads:
            for t in threads:
                if not t.isAlive():
                  threads.remove(t)
            time.sleep(0.1)

        for sub_process in sub_processes:
            sub_process.join()

    except KeyboardInterrupt:
        logging.info('Keyboard interrupt. Bye, bye.')
    finally:
        # Need to kill subprocesses: phantomjs.
        for process in sub_processes:
            process.terminate()
cli.py 文件源码 项目:fulmar 作者: tylderen 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def start_project(ctx, project):
    """Start a project."""
    from fulmar.message_queue import newtask_queue
    from fulmar.scheduler.projectdb import projectdb

    if not os.path.exists(project):
        raise IOError('No such file or directory: "%s".' % project)

    if not os.path.isfile(project):
        raise IOError('Is not a Python file: "%s".' % project)

    if not project.endswith('.py'):
        raise TypeError('Not a standard Python file: "%s". Please make sure it is a Python file which ends with ".py".' % project)

    project_name = project.split('/')[-1].strip(' .py')
    project_data = projectdb.get(project_name)

    if not project_data:
        ctx.invoke(update_project, project_file=project)
        project_data = projectdb.get(project_name)

    if project_data.get('is_stopped'):
        project_data.update({'is_stopped': False})
        projectdb.set(project_name, project_data)

    newtask = {
        "project_name": project_name,
        'project_id': project_data.get('project_id'),
        "taskid": project_name + ': on_start',
        "url": 'first_task: ' + project_name,
        "process": {
            "callback": "on_start",
        },
        "schedule": {
            "is_cron": True
        },
    }
    newtask_queue.put(newtask)

    click.echo('Successfully start project: "%s".' % project_name)
cli.py 文件源码 项目:fulmar 作者: tylderen 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def delete_project(ctx, project_name):
    """Delete a project."""
    from fulmar.scheduler.projectdb import projectdb

    project_name = project_name.split('/')[-1].strip(' .py')

    project_data = projectdb.get(project_name)
    if not project_data:
        click.echo('Sorry, can not find project: "%s".' % project_name)
        return

    projectdb.delete(project_name)
    click.echo('\nSuccessfully delete project: "%s".\n' % project_name)
nginx-nr-agent.py 文件源码 项目:nginx-nr-agent 作者: skyzyx 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def read_config(self):
        # if self.config:
        #     return

        config = ConfigParser.RawConfigParser()
        config.read(self.config_file)

        for s in config.sections():
            if s == 'global':
                if config.has_option(s, 'poll_interval'):
                    self.poll_interval = int(config.get(s, 'poll_interval'))
                if config.has_option(s, 'newrelic_license_key'):
                    self.license_key = config.get(s, 'newrelic_license_key')
                continue

            if not config.has_option(s, 'name') or not config.has_option(s, 'url'):
                continue

            ns = NginxStatusCollector(s, config.get(s, 'name'), config.get(s, 'url'), self.poll_interval)

            if config.has_option(s, 'http_user') and config.has_option(s, 'http_pass'):
                ns.basic_auth = base64.b64encode(config.get(s, 'http_user') + b':' + config.get(s, 'http_pass'))

            self.sources.append(ns)

        self.config = config
automation.py 文件源码 项目:bounty_tools 作者: gradiuscypher 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def droplet_worker(args, config, droplet, work_queue):
    while not work_queue.empty():
        # Get target info from queue
        target = work_queue.get()
        print(droplet.id, "Grabbing work...{}")

        args.workspace = target[0]
        args.domains = target[1]
        args.droplet = droplet

        # Run recon and import to elastic
        reconng.parse_args(args, config)
        print("Done working...")
    else:
        print("DONE")
__init__.py 文件源码 项目:Chromium_DepotTools 作者: p07r0457 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def connect_euca(host=None, aws_access_key_id=None, aws_secret_access_key=None,
                 port=8773, path='/services/Eucalyptus', is_secure=False,
                 **kwargs):
    """
    Connect to a Eucalyptus service.

    :type host: string
    :param host: the host name or ip address of the Eucalyptus server

    :type aws_access_key_id: string
    :param aws_access_key_id: Your AWS Access Key ID

    :type aws_secret_access_key: string
    :param aws_secret_access_key: Your AWS Secret Access Key

    :rtype: :class:`boto.ec2.connection.EC2Connection`
    :return: A connection to Eucalyptus server
    """
    from boto.ec2 import EC2Connection
    from boto.ec2.regioninfo import RegionInfo

    # Check for values in boto config, if not supplied as args
    if not aws_access_key_id:
        aws_access_key_id = config.get('Credentials',
                                       'euca_access_key_id',
                                       None)
    if not aws_secret_access_key:
        aws_secret_access_key = config.get('Credentials',
                                           'euca_secret_access_key',
                                           None)
    if not host:
        host = config.get('Boto', 'eucalyptus_host', None)

    reg = RegionInfo(name='eucalyptus', endpoint=host)
    return EC2Connection(aws_access_key_id, aws_secret_access_key,
                         region=reg, port=port, path=path,
                         is_secure=is_secure, **kwargs)
__init__.py 文件源码 项目:Chromium_DepotTools 作者: p07r0457 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def connect_walrus(host=None, aws_access_key_id=None,
                   aws_secret_access_key=None,
                   port=8773, path='/services/Walrus', is_secure=False,
                   **kwargs):
    """
    Connect to a Walrus service.

    :type host: string
    :param host: the host name or ip address of the Walrus server

    :type aws_access_key_id: string
    :param aws_access_key_id: Your AWS Access Key ID

    :type aws_secret_access_key: string
    :param aws_secret_access_key: Your AWS Secret Access Key

    :rtype: :class:`boto.s3.connection.S3Connection`
    :return: A connection to Walrus
    """
    from boto.s3.connection import S3Connection
    from boto.s3.connection import OrdinaryCallingFormat

    # Check for values in boto config, if not supplied as args
    if not aws_access_key_id:
        aws_access_key_id = config.get('Credentials',
                                       'euca_access_key_id',
                                       None)
    if not aws_secret_access_key:
        aws_secret_access_key = config.get('Credentials',
                                           'euca_secret_access_key',
                                           None)
    if not host:
        host = config.get('Boto', 'walrus_host', None)

    return S3Connection(aws_access_key_id, aws_secret_access_key,
                        host=host, port=port, path=path,
                        calling_format=OrdinaryCallingFormat(),
                        is_secure=is_secure, **kwargs)
__init__.py 文件源码 项目:Chromium_DepotTools 作者: p07r0457 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def connect_ia(ia_access_key_id=None, ia_secret_access_key=None,
               is_secure=False, **kwargs):
    """
    Connect to the Internet Archive via their S3-like API.

    :type ia_access_key_id: string
    :param ia_access_key_id: Your IA Access Key ID.  This will also look
        in your boto config file for an entry in the Credentials
        section called "ia_access_key_id"

    :type ia_secret_access_key: string
    :param ia_secret_access_key: Your IA Secret Access Key.  This will also
        look in your boto config file for an entry in the Credentials
        section called "ia_secret_access_key"

    :rtype: :class:`boto.s3.connection.S3Connection`
    :return: A connection to the Internet Archive
    """
    from boto.s3.connection import S3Connection
    from boto.s3.connection import OrdinaryCallingFormat

    access_key = config.get('Credentials', 'ia_access_key_id',
                            ia_access_key_id)
    secret_key = config.get('Credentials', 'ia_secret_access_key',
                            ia_secret_access_key)

    return S3Connection(access_key, secret_key,
                        host='s3.us.archive.org',
                        calling_format=OrdinaryCallingFormat(),
                        is_secure=is_secure, **kwargs)
__init__.py 文件源码 项目:node-gn 作者: Shouqun 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def connect_euca(host=None, aws_access_key_id=None, aws_secret_access_key=None,
                 port=8773, path='/services/Eucalyptus', is_secure=False,
                 **kwargs):
    """
    Connect to a Eucalyptus service.

    :type host: string
    :param host: the host name or ip address of the Eucalyptus server

    :type aws_access_key_id: string
    :param aws_access_key_id: Your AWS Access Key ID

    :type aws_secret_access_key: string
    :param aws_secret_access_key: Your AWS Secret Access Key

    :rtype: :class:`boto.ec2.connection.EC2Connection`
    :return: A connection to Eucalyptus server
    """
    from boto.ec2 import EC2Connection
    from boto.ec2.regioninfo import RegionInfo

    # Check for values in boto config, if not supplied as args
    if not aws_access_key_id:
        aws_access_key_id = config.get('Credentials',
                                       'euca_access_key_id',
                                       None)
    if not aws_secret_access_key:
        aws_secret_access_key = config.get('Credentials',
                                           'euca_secret_access_key',
                                           None)
    if not host:
        host = config.get('Boto', 'eucalyptus_host', None)

    reg = RegionInfo(name='eucalyptus', endpoint=host)
    return EC2Connection(aws_access_key_id, aws_secret_access_key,
                         region=reg, port=port, path=path,
                         is_secure=is_secure, **kwargs)
__init__.py 文件源码 项目:node-gn 作者: Shouqun 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def connect_walrus(host=None, aws_access_key_id=None,
                   aws_secret_access_key=None,
                   port=8773, path='/services/Walrus', is_secure=False,
                   **kwargs):
    """
    Connect to a Walrus service.

    :type host: string
    :param host: the host name or ip address of the Walrus server

    :type aws_access_key_id: string
    :param aws_access_key_id: Your AWS Access Key ID

    :type aws_secret_access_key: string
    :param aws_secret_access_key: Your AWS Secret Access Key

    :rtype: :class:`boto.s3.connection.S3Connection`
    :return: A connection to Walrus
    """
    from boto.s3.connection import S3Connection
    from boto.s3.connection import OrdinaryCallingFormat

    # Check for values in boto config, if not supplied as args
    if not aws_access_key_id:
        aws_access_key_id = config.get('Credentials',
                                       'euca_access_key_id',
                                       None)
    if not aws_secret_access_key:
        aws_secret_access_key = config.get('Credentials',
                                           'euca_secret_access_key',
                                           None)
    if not host:
        host = config.get('Boto', 'walrus_host', None)

    return S3Connection(aws_access_key_id, aws_secret_access_key,
                        host=host, port=port, path=path,
                        calling_format=OrdinaryCallingFormat(),
                        is_secure=is_secure, **kwargs)
__init__.py 文件源码 项目:node-gn 作者: Shouqun 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def connect_ia(ia_access_key_id=None, ia_secret_access_key=None,
               is_secure=False, **kwargs):
    """
    Connect to the Internet Archive via their S3-like API.

    :type ia_access_key_id: string
    :param ia_access_key_id: Your IA Access Key ID.  This will also look
        in your boto config file for an entry in the Credentials
        section called "ia_access_key_id"

    :type ia_secret_access_key: string
    :param ia_secret_access_key: Your IA Secret Access Key.  This will also
        look in your boto config file for an entry in the Credentials
        section called "ia_secret_access_key"

    :rtype: :class:`boto.s3.connection.S3Connection`
    :return: A connection to the Internet Archive
    """
    from boto.s3.connection import S3Connection
    from boto.s3.connection import OrdinaryCallingFormat

    access_key = config.get('Credentials', 'ia_access_key_id',
                            ia_access_key_id)
    secret_key = config.get('Credentials', 'ia_secret_access_key',
                            ia_secret_access_key)

    return S3Connection(access_key, secret_key,
                        host='s3.us.archive.org',
                        calling_format=OrdinaryCallingFormat(),
                        is_secure=is_secure, **kwargs)
osproject.py 文件源码 项目:setupproject-openstack 作者: jriguera 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def dict_overwrite(base, default={}):
        """Creates a new dictionary overwriting the values from the
           default dictionary completing with the base key/values.
        """
        # clone current level
        new = default.copy()
        for key,value in base.items():
            if isinstance(value, list):
                new[key] = value[:]
            elif isinstance(value, dict):
                new[key] = dict_override(value, default.get(key, {}))
            else:
                new[key] = value
        return new
osproject.py 文件源码 项目:setupproject-openstack 作者: jriguera 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def setup_groups(self, groups=[], domain='default'):
        groups_defined = []
        for gr in groups:
            name = gr['name']
            desc = gr.get('description', None)
            gdomain = gr.get('domain', domain)
            group_id = self.setup_group(name, desc, gdomain)
            groups_defined.append(group_id)
        return groups_defined
osproject.py 文件源码 项目:setupproject-openstack 作者: jriguera 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setup_nova_quotas(self, project_id, instances=None, cores=None, 
                          ram_mb=None, floating_ips=None):
        self._get_nova_client()
        updated = False
        msg = "Updating nova quota %s for project id '%s' from %s to %s"
        qs = self.nova.quotas.get(project_id).to_dict()
        if instances is not None and qs['instances'] != instances:
            self.logger.debug(msg % (
                'instances', project_id, qs['instances'], instances))
            qs['instances'] = instances
            updated = True
        if cores is not None and qs['cores'] != cores:
            self.logger.debug(msg % ('cores', project_id, qs['cores'], cores))
            qs['cores'] = cores
            updated = True
        if ram_mb is not None and qs['ram'] != ram_mb:
            self.logger.debug(msg % (
                'ram_mb', project_id, qs['ram'], ram_mb))
            qs['ram'] = ram_mb
            updated = True
        if floating_ips is not None and qs['floating_ips'] != floating_ips:
            self.logger.debug(msg % (
                'floating_ips', project_id,  qs['floating_ips'], floating_ips))
            qs['floating_ips'] = floating_ips
            updated = True
        if updated:
            try:
                self.nova.quotas.update(
                    project_id, instances=qs['instances'], 
                    cores=qs['cores'], ram=qs['ram'], 
                    floating_ips=qs['floating_ips'])
            except nova_exceptions.ClientException as e:
                msg = "Unable to setup new Nova quotas for project id '%s': %s"
                self.logger.error(msg % (project_id, e))
                raise
            msg = "Nova quotas updated successfully for project id '%s'"
            self.logger.info(msg % project_id)
        else:
            msg = "Nova quotas update not needed for project id '%s'"
            self.logger.debug(msg % project_id)
        return updated


问题


面经


文章

微信
公众号

扫码关注公众号