python类get()的实例源码

logsetup.py 文件源码 项目:psyplot 作者: Chilipp 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _get_home():
    """Find user's home directory if possible.
    Otherwise, returns None.

    :see:  http://mail.python.org/pipermail/python-list/2005-February/325395.html

    This function is copied from matplotlib version 1.4.3, Jan 2016
    """
    try:
        if six.PY2 and sys.platform == 'win32':
            path = os.path.expanduser(b"~").decode(sys.getfilesystemencoding())
        else:
            path = os.path.expanduser("~")
    except ImportError:
        # This happens on Google App Engine (pwd module is not present).
        pass
    else:
        if os.path.isdir(path):
            return path
    for evar in ('HOME', 'USERPROFILE', 'TMP'):
        path = os.environ.get(evar)
        if path is not None and os.path.isdir(path):
            return path
    return None
client_cli.py 文件源码 项目:sawtooth-mktplace 作者: hyperledger-archives 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _dump_offer_list(self, offerids, sortkey='name'):
        """
        Utility routine to dump a list of offers
        """
        offers = []
        for offerid in offerids:
            offer = self.MarketState.State[offerid]
            offer['id'] = offerid
            offers.append(offer)

        print "{0:8} {1:35} {2:35} {3}".format(
            'Ratio', 'Input Asset (What You Pay)',
            'Output Asset (What You Get)', 'Name')
        for offer in sorted(offers, key=lambda h: h[sortkey]):
            iholding = self.MarketState.State[offer['input']]
            oholding = self.MarketState.State[offer['output']]
            name = self.MarketState.i2n(offer.get('id'))
            print "{0:<8} {1:35} {2:35} {3}".format(
                offer['ratio'], self.MarketState.i2n(iholding['asset']),
                self.MarketState.i2n(oholding['asset']), name)
util.py 文件源码 项目:ci_scripts 作者: twotwo 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def svn_ver(svn_dir, do_update=True):
        """pull the latest content and get version
        svn info <path> | grep "Last Changed Rev" | cut -d" " -f4
        svn info --show-item last-changed-revision <path>
        """
        if do_update:
            cmd = 'svn up %s' % svn_dir
            (cost, out, err) = Command.excute(cmd)
            if len(err) > 0:
                Command.logger.error('excute[%s]: %s' %(cmd, err))

        cmd = 'svn info --show-item last-changed-revision %s' % svn_dir
        (cost, out, err) = Command.excute(cmd)
        if len(err) > 0:
            Command.logger.error('excute[%s]: %s' %(cmd, err))
        if len(out) > 0:
            return out.strip()
        else:
            return ''
util.py 文件源码 项目:ci_scripts 作者: twotwo 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, ini_file, dry_run=False):
        self.dry_run = dry_run

        self.build_info = []
        self.logger = logging.getLogger('util.agentBuilder')

        import ConfigParser
        # config = ConfigParser.RawConfigParser(allow_no_value=True)
        config = ConfigParser.ConfigParser()
        config.read(ini_file)
        root_dir = config.get('base', 'root_dir')

        self.lib_base_dir = config.get('base', 'lib_base_dir', 0, {'root_dir': root_dir})
        self.channels_dir = config.get('base', 'channels_dir', 0, {'root_dir': root_dir})

        self.demo_dir = config.get('demo', 'demo_dir', 0, {'root_dir': root_dir})
        from datetime import date
        self.apk_dir = config.get('demo', 'apk_dir', 0, {'root_dir': root_dir, 'day':date.today().strftime('%m%d')})

        self.plugin_dir = config.get('plugins', 'plugin_dir', 0, {'root_dir': root_dir})
core.py 文件源码 项目:sir-bot-a-lot 作者: pyslackers 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def update(self):
        """
        Update sirbot

        Trigger the update method of the plugins. This is needed if the plugins
        need to perform update migration (i.e database)
        """
        logger.info('Updating Sir Bot-a-lot')
        for name, plugin in self._plugins.items():
            plugin_update = getattr(plugin['plugin'], 'update', None)
            if callable(plugin_update):
                logger.info('Updating %s', name)
                await plugin_update(self.config.get(name, {}), self._plugins)
                logger.info('%s updated', name)
        self._session.close()
        logger.info('Sir Bot-a-lot updated')
config.py 文件源码 项目:ckanext-extractor 作者: stadt-karlsruhe 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get(setting):
    """
    Get configuration setting.

    ``setting`` is the setting without the ``ckanext.extractor.``
    prefix.

    Handles defaults and transformations.
    """
    setting = 'ckanext.extractor.' + setting
    value = config.get(setting, DEFAULTS[setting])
    for transformation in TRANSFORMATIONS[setting]:
        value = transformation(value)
    return value


# Adapted from ckanext-archiver
cli.py 文件源码 项目:fulmar 作者: tylderen 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def show_projects(ctx):
    """Show projects."""
    from fulmar.scheduler.projectdb import projectdb

    projects = projectdb.get_all()

    table = []
    headers = ['project_name', 'updated_time', 'is_stopped']
    for _, project in projects.iteritems():
        project_name = project.get('project_name')
        update_timestamp = project.get('update_time')
        update_time = datetime.datetime.fromtimestamp(update_timestamp).strftime('%Y-%m-%d %H:%M:%S')
        is_stopped = 'True' if project.get('is_stopped') else 'False'
        table.append([project_name, update_time, is_stopped])

    click.echo(tabulate(table, headers, tablefmt="grid", numalign="right"))
jlog.py 文件源码 项目:falsy 作者: pingf 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def trace(self, kwargs):
        exc_type, exc_value, exc_traceback = sys.exc_info()
        stack = traceback.extract_tb(exc_traceback)
        lines = []
        for i, s in enumerate(stack):
            filename = s.filename
            l = len(filename)
            shortfile = kwargs.get('shortfile', 40)
            if l > shortfile:
                filename = filename[filename.find('/', l - shortfile):]
            line = '%-40s:%-4s %s' % (
                blue() + filename, yellow() + str(s.lineno),
                '|' + '-' * (i * 4) + cyan() + s.name + ':' + red() + s.line)
            lines.append(line)
        lines = '\n\t'.join(lines)
        kwargs['extra'] = {
            'trace': magenta() + str(exc_type) + ' ' + bold() + magenta() + str(exc_value) + '\n\t' + lines}
osproject.py 文件源码 项目:setupproject-openstack 作者: jriguera 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setup_logging(self):
        self.logpath = os.environ.get(self.LOGENVCONF, self.LOGGING)
        try:
            #logging.config.fileConfig(self.logpath)
            with open(self.logpath) as logconfig:
                logdata = yaml.load(logconfig)
                logging.config.dictConfig(logdata)
        except Exception as e:
            print("Error with '%s': %s" % (self.logpath, e))
            logging.basicConfig(level=self.LOGLEVEL, format=self.OUTPUT_FORMAT)
            lfile = False
        else:
            lfile = True
        self.logger = logging.getLogger(self.PROG)
        if not lfile:
            self.logger.info("Using default logging settings")
        else:
            self.logger.info("Using logging settings from '%s'" % self.logpath)
osproject.py 文件源码 项目:setupproject-openstack 作者: jriguera 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def setup_users(self, users=[], domain='default'):
        users_defined = []
        for user in users:
            name = user['name']
            mail = user.get('email', None)
            desc = user.get('description', None)
            password = user.get('password', None)
            udomain = user.get('domain', domain)
            groups = user.get('groups', [])
            user_id = self.setup_user(name, password, mail, desc, udomain)
            group_ids = []
            for group in groups:
                gr = self.setup_group(group, None, udomain)
                group_ids.append(gr.id)
            self.setup_user_groups(user_id, group_ids, udomain)
            users_defined.append(user_id)
        return users_defined
server.py 文件源码 项目:mergeit 作者: insolite 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def gitlab_push(request, config):
    data = json.loads((yield from request.content.read()).decode())
    branch = data['ref'].split('refs/heads/')[1]
    config.reload()
    repo_manager = RepoManager(config['name'],
                               config['uri'],
                               config['merge_workspace'])
    handler = PushHandler(config,
                          branch,
                          data['commits'],
                          repo_manager)
    # re.match(r'(.+?:\/\/.+?)\/', data['repository']['homepage']).group(1),
    loop = asyncio.get_event_loop()
    # Close connection first, then handle
    # (if gitlab can not get response for too long it is repeating request)
    loop.call_soon(handler.handle)
    return web.Response()
server.py 文件源码 项目:health-mosconi 作者: GNUHealth-Mosconi 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def start_servers(self):
        ssl = config.get('ssl', 'privatekey')
        # Launch Server
        if config.get('jsonrpc', 'listen'):
            from trytond.protocols.jsonrpc import JSONRPCDaemon
            for hostname, port in parse_listen(
                    config.get('jsonrpc', 'listen')):
                self.jsonrpcd.append(JSONRPCDaemon(hostname, port, ssl))
                self.logger.info("starting JSON-RPC%s protocol on %s:%d",
                    ssl and ' SSL' or '', hostname or '*', port)

        if config.get('xmlrpc', 'listen'):
            from trytond.protocols.xmlrpc import XMLRPCDaemon
            for hostname, port in parse_listen(
                    config.get('xmlrpc', 'listen')):
                self.xmlrpcd.append(XMLRPCDaemon(hostname, port, ssl))
                self.logger.info("starting XML-RPC%s protocol on %s:%d",
                    ssl and ' SSL' or '', hostname or '*', port)

        if config.get('webdav', 'listen'):
            from trytond.protocols.webdav import WebDAVServerThread
            for hostname, port in parse_listen(
                    config.get('webdav', 'listen')):
                self.webdavd.append(WebDAVServerThread(hostname, port, ssl))
                self.logger.info("starting WebDAV%s protocol on %s:%d",
                    ssl and ' SSL' or '', hostname or '*', port)

        for servers in (self.xmlrpcd, self.jsonrpcd, self.webdavd):
            for server in servers:
                server.start()
log.py 文件源码 项目:scarlett_os 作者: bossjones 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def setup_console_logging(verbosity_level):
    if verbosity_level < min(LOG_LEVELS.keys()):
        verbosity_level = min(LOG_LEVELS.keys())
    if verbosity_level > max(LOG_LEVELS.keys()):
        verbosity_level = max(LOG_LEVELS.keys())

    # loglevels = config.get('loglevels', {})
    loglevels = {}
    has_debug_loglevels = any([
        level < logging.INFO for level in loglevels.values()])

    verbosity_filter = VerbosityFilter(verbosity_level, loglevels)

    if verbosity_level < 1 and not has_debug_loglevels:
        log_format = "%(levelname)-8s %(message)s"
    else:
        # log_format = "(%(threadName)-9s) %(log_color)s%(levelname)-8s%(reset)s (%(funcName)-5s) %(message_log_color)s%(message)s"
        log_format = "%(levelname)-8s %(asctime)s [%(process)d:%(threadName)s] %(name)s\n  %(message)s"
        # source: Logging Cookbook — Python 3.6.0 documentation
        # log_format = "%(asctime)-15s %(name)-5s %(levelname)-8s IP: %(ip)-15s User: %(user)-8s %(message)s"
    formatter = logging.Formatter(log_format)

    # if config['logging']['color']:
    #     handler = ColorizingStreamHandler({})
    # else:
    #     handler = logging.StreamHandler()
    handler = ColorizingStreamHandler({})
    handler.addFilter(verbosity_filter)
    handler.setFormatter(formatter)

    logging.getLogger('').addHandler(handler)


# def setup_debug_logging_to_file(config):
#     formatter = logging.Formatter(config['logging']['debug_format'])
#     handler = logging.handlers.RotatingFileHandler(
#         config['logging']['debug_file'], maxBytes=10485760, backupCount=3)
#     handler.setFormatter(formatter)
#
#     logging.getLogger('').addHandler(handler)
__init__.py 文件源码 项目:python-aws-ecr-deployer 作者: filc 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def create_app(config=None):
    app = Flask(__name__)
    app.config.update(config)
    app.wsgi_app = SessionMiddleware(app.wsgi_app, config['session'])
    app.register_blueprint(bp)

    if config.get('logger'):
        logging.config.dictConfig(config.get('logger'))

    _configure_error_handlers(app)
    _setup_requests(app)
    return app
util.py 文件源码 项目:ci_scripts 作者: twotwo 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def git_ver(git_dir):
        """pull the latest content and get version
        git revision cmd: git rev-list --count HEAD
        """
        cmd = 'git -C %s pull' % git_dir
        (cost, out, err) = Command.excute(cmd)
        if len(err) > 0:
            Command.logger.error('excute[%s]: %s' %(cmd, err))
        cmd = 'git -C %s rev-list --count HEAD' % git_dir
        (cost, out, err) = Command.excute(cmd)
        if len(err) > 0:
            Command.logger.error('excute[%s]: %s' %(cmd, err))
            return ''
        else:
            return out.strip('\n')
__init__.py 文件源码 项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def connect_euca(host=None, aws_access_key_id=None, aws_secret_access_key=None,
                 port=8773, path='/services/Eucalyptus', is_secure=False,
                 **kwargs):
    """
    Connect to a Eucalyptus service.

    :type host: string
    :param host: the host name or ip address of the Eucalyptus server

    :type aws_access_key_id: string
    :param aws_access_key_id: Your AWS Access Key ID

    :type aws_secret_access_key: string
    :param aws_secret_access_key: Your AWS Secret Access Key

    :rtype: :class:`boto.ec2.connection.EC2Connection`
    :return: A connection to Eucalyptus server
    """
    from boto.ec2 import EC2Connection
    from boto.ec2.regioninfo import RegionInfo

    # Check for values in boto config, if not supplied as args
    if not aws_access_key_id:
        aws_access_key_id = config.get('Credentials',
                                       'euca_access_key_id',
                                       None)
    if not aws_secret_access_key:
        aws_secret_access_key = config.get('Credentials',
                                           'euca_secret_access_key',
                                           None)
    if not host:
        host = config.get('Boto', 'eucalyptus_host', None)

    reg = RegionInfo(name='eucalyptus', endpoint=host)
    return EC2Connection(aws_access_key_id, aws_secret_access_key,
                         region=reg, port=port, path=path,
                         is_secure=is_secure, **kwargs)
__init__.py 文件源码 项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def connect_walrus(host=None, aws_access_key_id=None,
                   aws_secret_access_key=None,
                   port=8773, path='/services/Walrus', is_secure=False,
                   **kwargs):
    """
    Connect to a Walrus service.

    :type host: string
    :param host: the host name or ip address of the Walrus server

    :type aws_access_key_id: string
    :param aws_access_key_id: Your AWS Access Key ID

    :type aws_secret_access_key: string
    :param aws_secret_access_key: Your AWS Secret Access Key

    :rtype: :class:`boto.s3.connection.S3Connection`
    :return: A connection to Walrus
    """
    from boto.s3.connection import S3Connection
    from boto.s3.connection import OrdinaryCallingFormat

    # Check for values in boto config, if not supplied as args
    if not aws_access_key_id:
        aws_access_key_id = config.get('Credentials',
                                       'euca_access_key_id',
                                       None)
    if not aws_secret_access_key:
        aws_secret_access_key = config.get('Credentials',
                                           'euca_secret_access_key',
                                           None)
    if not host:
        host = config.get('Boto', 'walrus_host', None)

    return S3Connection(aws_access_key_id, aws_secret_access_key,
                        host=host, port=port, path=path,
                        calling_format=OrdinaryCallingFormat(),
                        is_secure=is_secure, **kwargs)
__init__.py 文件源码 项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def connect_ia(ia_access_key_id=None, ia_secret_access_key=None,
               is_secure=False, **kwargs):
    """
    Connect to the Internet Archive via their S3-like API.

    :type ia_access_key_id: string
    :param ia_access_key_id: Your IA Access Key ID.  This will also look
        in your boto config file for an entry in the Credentials
        section called "ia_access_key_id"

    :type ia_secret_access_key: string
    :param ia_secret_access_key: Your IA Secret Access Key.  This will also
        look in your boto config file for an entry in the Credentials
        section called "ia_secret_access_key"

    :rtype: :class:`boto.s3.connection.S3Connection`
    :return: A connection to the Internet Archive
    """
    from boto.s3.connection import S3Connection
    from boto.s3.connection import OrdinaryCallingFormat

    access_key = config.get('Credentials', 'ia_access_key_id',
                            ia_access_key_id)
    secret_key = config.get('Credentials', 'ia_secret_access_key',
                            ia_secret_access_key)

    return S3Connection(access_key, secret_key,
                        host='s3.us.archive.org',
                        calling_format=OrdinaryCallingFormat(),
                        is_secure=is_secure, **kwargs)
admin_cli.py 文件源码 项目:sawtooth-validator 作者: hyperledger-archives 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def local_main(config):
    controller = ClientController(config['LedgerURL'],
                                  keystring=config.get('SigningKey'))
    controller.cmdloop()
core.py 文件源码 项目:sir-bot-a-lot 作者: pyslackers 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _initialize_plugins(self):
        """
        Initialize the plugins

        Query the configuration and the plugins for info
        (name, registry name, start priority, etc)
        """
        logger.debug('Initializing plugins')
        plugins = self._pm.hook.plugins(loop=self._loop)
        if plugins:
            for plugin in plugins:
                name = plugin.__name__
                registry_name = plugin.__registry__ or plugin.__name__
                config = self.config.get(name, {})

                priority = config.get('priority', 50)

                if priority:
                    self._plugins[name] = {
                        'plugin': plugin,
                        'config': config,
                        'priority': priority,
                        'factory': registry_name
                    }

                    self._start_priority[priority].append(name)
        else:
            logger.error('No plugins found')
utils.py 文件源码 项目:experiment-manager 作者: softfire-eu 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_config(section, key, default=None):
    config = get_config_parser()
    if default is None:
        return config.get(section=section, option=key)
    try:
        return config.get(section=section, option=key)
    except configparser.NoOptionError:
        return default
    except configparser.NoSectionError:
        return default
config.py 文件源码 项目:ckanext-extractor 作者: stadt-karlsruhe 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def is_field_indexed(field):
    """
    Check if a metadata field is configured to be indexed.
    """
    return _any_match(field.lower(), get('indexed_fields'))
config.py 文件源码 项目:ckanext-extractor 作者: stadt-karlsruhe 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def is_format_indexed(format):
    """
    Check if a resource format is configured to be indexed.
    """
    return _any_match(format.lower(), get('indexed_formats'))
config.py 文件源码 项目:cointrader 作者: toirl 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, configfile=None):

        self.verbose = False
        self.market = "poloniex"
        self.api_key = None
        self.api_secret = None

        if configfile:
            logging.config.fileConfig(configfile.name)
            config = configparser.ConfigParser()
            config.readfp(configfile)
            exchange = config.get("DEFAULT", "exchange")
            self.api_key = config.get(exchange, "api_key")
            self.api_secret = config.get(exchange, "api_secret")
__init__.py 文件源码 项目:learneveryword 作者: karan 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def connect_euca(host=None, aws_access_key_id=None, aws_secret_access_key=None,
                 port=8773, path='/services/Eucalyptus', is_secure=False,
                 **kwargs):
    """
    Connect to a Eucalyptus service.

    :type host: string
    :param host: the host name or ip address of the Eucalyptus server

    :type aws_access_key_id: string
    :param aws_access_key_id: Your AWS Access Key ID

    :type aws_secret_access_key: string
    :param aws_secret_access_key: Your AWS Secret Access Key

    :rtype: :class:`boto.ec2.connection.EC2Connection`
    :return: A connection to Eucalyptus server
    """
    from boto.ec2 import EC2Connection
    from boto.ec2.regioninfo import RegionInfo

    # Check for values in boto config, if not supplied as args
    if not aws_access_key_id:
        aws_access_key_id = config.get('Credentials',
                                       'euca_access_key_id',
                                       None)
    if not aws_secret_access_key:
        aws_secret_access_key = config.get('Credentials',
                                           'euca_secret_access_key',
                                           None)
    if not host:
        host = config.get('Boto', 'eucalyptus_host', None)

    reg = RegionInfo(name='eucalyptus', endpoint=host)
    return EC2Connection(aws_access_key_id, aws_secret_access_key,
                         region=reg, port=port, path=path,
                         is_secure=is_secure, **kwargs)
__init__.py 文件源码 项目:learneveryword 作者: karan 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def connect_walrus(host=None, aws_access_key_id=None,
                   aws_secret_access_key=None,
                   port=8773, path='/services/Walrus', is_secure=False,
                   **kwargs):
    """
    Connect to a Walrus service.

    :type host: string
    :param host: the host name or ip address of the Walrus server

    :type aws_access_key_id: string
    :param aws_access_key_id: Your AWS Access Key ID

    :type aws_secret_access_key: string
    :param aws_secret_access_key: Your AWS Secret Access Key

    :rtype: :class:`boto.s3.connection.S3Connection`
    :return: A connection to Walrus
    """
    from boto.s3.connection import S3Connection
    from boto.s3.connection import OrdinaryCallingFormat

    # Check for values in boto config, if not supplied as args
    if not aws_access_key_id:
        aws_access_key_id = config.get('Credentials',
                                       'euca_access_key_id',
                                       None)
    if not aws_secret_access_key:
        aws_secret_access_key = config.get('Credentials',
                                           'euca_secret_access_key',
                                           None)
    if not host:
        host = config.get('Boto', 'walrus_host', None)

    return S3Connection(aws_access_key_id, aws_secret_access_key,
                        host=host, port=port, path=path,
                        calling_format=OrdinaryCallingFormat(),
                        is_secure=is_secure, **kwargs)
__init__.py 文件源码 项目:learneveryword 作者: karan 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def connect_ia(ia_access_key_id=None, ia_secret_access_key=None,
               is_secure=False, **kwargs):
    """
    Connect to the Internet Archive via their S3-like API.

    :type ia_access_key_id: string
    :param ia_access_key_id: Your IA Access Key ID.  This will also look
        in your boto config file for an entry in the Credentials
        section called "ia_access_key_id"

    :type ia_secret_access_key: string
    :param ia_secret_access_key: Your IA Secret Access Key.  This will also
        look in your boto config file for an entry in the Credentials
        section called "ia_secret_access_key"

    :rtype: :class:`boto.s3.connection.S3Connection`
    :return: A connection to the Internet Archive
    """
    from boto.s3.connection import S3Connection
    from boto.s3.connection import OrdinaryCallingFormat

    access_key = config.get('Credentials', 'ia_access_key_id',
                            ia_access_key_id)
    secret_key = config.get('Credentials', 'ia_secret_access_key',
                            ia_secret_access_key)

    return S3Connection(access_key, secret_key,
                        host='s3.us.archive.org',
                        calling_format=OrdinaryCallingFormat(),
                        is_secure=is_secure, **kwargs)
log.py 文件源码 项目:flask-maple 作者: honmaple 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def init_app(self, app):
        config = app.config.get('LOGGING', DEFAULT_LOG)
        logs_folder = config['LOGGING_FOLDER']
        formatter = Formatter(
            config.get('formatter', DEFAULT_LOG['formatter']))
        info_log = os.path.join(logs_folder, config.get('info',
                                                        DEFAULT_LOG['info']))

        info_file_handler = logging.handlers.RotatingFileHandler(
            info_log, maxBytes=100000, backupCount=10)

        info_file_handler.setLevel(logging.INFO)
        info_file_handler.setFormatter(formatter)
        app.logger.addHandler(info_file_handler)

        error_log = os.path.join(logs_folder, config.get('error',
                                                         DEFAULT_LOG['error']))

        error_file_handler = logging.handlers.RotatingFileHandler(
            error_log, maxBytes=100000, backupCount=10)

        error_file_handler.setLevel(logging.ERROR)
        error_file_handler.setFormatter(formatter)
        app.logger.addHandler(error_file_handler)

        if app.config.get('send_mail', DEFAULT_LOG['send_mail']):
            credentials = (config['MAIL_USERNAME'], config['MAIL_PASSWORD'])
            mailhost = (config['MAIL_SERVER'], config['MAIL_PORT'])
            mail_handler = ThreadedSMTPHandler(
                secure=(),
                mailhost=mailhost,
                fromaddr=config['MAIL_DEFAULT_SENDER'],
                toaddrs=config['MAIL_ADMIN'],
                subject=config.get('subject', DEFAULT_LOG['subject']),
                credentials=credentials)

            mail_handler.setLevel(logging.ERROR)
            mail_handler.setFormatter(formatter)
            app.logger.addHandler(mail_handler)
gitlab_mr.py 文件源码 项目:gitlab-merge-request 作者: anti-social 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_project_by_path(self, project_path):
        try:
            return self.gitlab.projects.get(project_path)
        except GitlabConnectionError as e:
            err('Cannot connect to the gitlab server: %s', e)
        except GitlabGetError:
            err('Project [%s] not found', project_path)
        except GitlabError as e:
            err('Error when getting project [%s]: %s' % (project_path, e))
gitlab_mr.py 文件源码 项目:gitlab-merge-request 作者: anti-social 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_remote_branch_name(self, project, local_branch, remote):
        # check if there is upstream for local branch
        tracking_branch = self.repo.branches[local_branch].tracking_branch()
        if tracking_branch:
            remote_branch = tracking_branch.name.partition('/')[2]
        else:
            remote_branch = local_branch
        try:
            project.branches.get(remote_branch)
        except GitlabGetError:
            err('Branch [%s] from project [%s] not found',
                remote_branch, project.path_with_namespace)
        except GitlabConnectionError as e:
            err('%s', e)
        return remote_branch


问题


面经


文章

微信
公众号

扫码关注公众号