python类DEBUG的实例源码

parser.py 文件源码 项目:fewsn-zigbee-base-station 作者: sethmbaker 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def parse_xbee_rf_data(data):

    if DEBUG:
        print data

    node_addr = binascii.b2a_hex(data['source_addr_long']) + binascii.b2a_hex((data['source_addr']))
    readings = re.findall("{.*?}", data['rf_data'])        # returns a list of K-V pair matches
    payload = {'node': node_addr}

    for reading in readings:
        item_dict = json.loads(reading)
        for k, v in item_dict.iteritems():
            sensor_name = k.encode('utf-8')
            sensor_value = v.encode('utf-8')
            if sensor_name == "temp:":
                sensor_name = "temp"
            payload['sensor'] = sensor_name
            payload['val'] = sensor_value
            # now pass the payload to the RequestBuilder and send it
            print payload
            if PRODUCTION:
                sendHTTPPost(payload)
auth_prompt_window.py 文件源码 项目:Enibar 作者: ENIB 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def ask_auth(*dargs, fail_callback=None, pass_performer=False):
    """ Decorator to ask for authorization """
    def decorator(func):
        """ Decorator wrapper """
        def wrapper(*args, **kwargs):
            """ Wrapper """
            if settings.DEBUG:
                func(*args, **kwargs)
                return
            prompt = AuthPromptWindow(dargs)
            if prompt.is_authorized:
                if pass_performer:
                    kwargs["_performer"] = prompt.user
                func(*args, **kwargs)
            else:
                if prompt.show_error:
                    gui.utils.error("Error", "Erreur d'authentification")
                if fail_callback is not None:
                    fail_callback()
        return wrapper
    return decorator
telegram_writer.py 文件源码 项目:classifieds-to-telegram 作者: DiegoHueltes 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def posts_fetcher(self):
        while True:
            for bot in self.bots:
                try:
                    if settings.DEBUG:
                        print('Checking for news {} on bot type {}-{}'.format(datetime.datetime.utcnow(), bot.web_type,
                                                                              bot.target_url))
                    chat_id = bot.chat_id or self.chat
                    posts = bot.save_last_updates(chat_id)
                    if settings.DEBUG:
                        print('-- {} New posts saved found'.format(len(posts)))
                except Exception as e:
                    exc_type, exc_value, exc_traceback = sys.exc_info()
                    error_stack = 'Save_updates ERROR: {}\n'.format(e)
                    error_stack += "".join(traceback.format_exception(exc_type, exc_value, exc_traceback))
                    self.send_error(error_stack)
            gevent.sleep(self.wait_seconds)
pipes.py 文件源码 项目:telegram-autoposter 作者: vaniakosmos 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def pre_schedule_hook(self):
        if not DEBUG:
            self.post_interval = RedditSetup.post_interval
        self.scheduler.run_repeating(self.store.clear_ids, interval=RedditSetup.db_clear_interval, first=0)
web.py 文件源码 项目:PYKE 作者: muddyfish 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def main(debug=settings.DEBUG, url="127.0.0.1"):
    log = logging.getLogger('werkzeug')
    log.setLevel(logging.DEBUG)
    file_handler = logging.FileHandler("log.log", "a")
    file_handler.setLevel(logging.DEBUG)
    log.addHandler(file_handler)
    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(logging.DEBUG)
    log.addHandler(stream_handler)
    app.debug = debug
    app.run(url)
retrieve_token.py 文件源码 项目:fewsn-zigbee-base-station 作者: sethmbaker 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_token():

    r = requests.post(TOKEN_URL, data=payload)
    print r
    print r.text
    token_dict = json.loads(r.text)
    token = token_dict['token']


    if VERBOSE:
        print token_dict

    if DEBUG:                          # TODO move to tests
        assert(token is not None)
    return token_dict['token']
app.py 文件源码 项目:aiohttp-login 作者: imbolc 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def create_app(loop):
    app = web.Application(loop=loop, debug=settings.DEBUG)
    setup_jinja(app, settings.DEBUG)
    aiohttp_session.setup(app, EncryptedCookieStorage(
        settings.SESSION_SECRET.encode('utf-8'),
        max_age=settings.SESSION_MAX_AGE))
    app.middlewares.append(aiohttp_login.flash.middleware)

    app.router.add_get('/', handlers.index)
    app.router.add_get('/users/', handlers.users, name='users')

    app['db'] = await asyncpg.create_pool(dsn=settings.DATABASE, loop=loop)
    aiohttp_login.setup(app, AsyncpgStorage(app['db']), settings.AUTH)

    return app
util.py 文件源码 项目:intel-manager-for-lustre 作者: intel-hpdd 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __call__(self, method):
        from functools import wraps

        @wraps(method)
        def timed(*args, **kw):
            if self.logger.level <= logging.DEBUG:
                ts = time.time()
                result = method(*args, **kw)
                te = time.time()

                print_args = False
                if print_args:
                    self.logger.debug('Ran %r (%s, %r) in %2.2fs' %
                            (method.__name__,
                             ", ".join(["%s" % (a,) for a in args]),
                             kw,
                             te - ts))
                else:
                    self.logger.debug('Ran %r in %2.2fs' %
                            (method.__name__,
                             te - ts))
                return result
            else:
                return method(*args, **kw)

        return timed
util.py 文件源码 项目:intel-manager-for-lustre 作者: intel-hpdd 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, label = ""):
        # Avoid importing this at module scope in order
        # to co-habit with chroma_settings()
        from django.db import connection
        self.connection = connection

        self.label = label
        self.logger.disabled = not self.enabled
        if self.enabled and not len(self.logger.handlers):
            self.logger.setLevel(logging.DEBUG)
            self.logger.addHandler(logging.FileHandler('dbperf.log'))
util.py 文件源码 项目:intel-manager-for-lustre 作者: intel-hpdd 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __enter__(self):
        if settings.DEBUG:
            self.t_initial = time.time()
            self.q_initial = len(self.connection.queries)
test_gui_auth_window.py 文件源码 项目:Enibar 作者: ENIB 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def test_debug(self):
        """ Testing debug
        """
        settings.DEBUG = True
        self.func()
        self.assertTrue(self.func_called)
        settings.DEBUG = False
dal.py 文件源码 项目:mybookshelf2 作者: izderadicka 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def init():
    global engine
    loop = asyncio.get_event_loop()
    engine = loop.run_until_complete(create_engine(DSN, echo=settings.DEBUG))
response.py 文件源码 项目:flask-skeleton 作者: axiaoxin 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def jsonify_(data):
    keycase = settings.JSON_KEYCASE
    if keycase:
        try:
            casefunc = getattr(stringcase, keycase)
            data = keycase_convert(data, casefunc)
        except AttributeError:
            log.warning(u'%s keycase is not supported, response default json. '
                        u'Supported keycase: %s'
                        % (keycase, get_support_keycase()))
    if settings.DEBUG:
        js = json.dumps(data, ensure_ascii=False, indent=4)
    else:
        js = json.dumps(data, ensure_ascii=False, separators=[',', ':'])
    return Response(js, mimetype='application/json')
__init__.py 文件源码 项目:flask-skeleton 作者: axiaoxin 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def register_decorators_on_module_funcs(modules, decorators):
    '''?decorator?????module??????
    ?????__nodeco__???False???????????
    ???????????
    eg:
        def func():
            pass
        func.__nodeco__ = True
    '''
    if not isinstance(modules, (list, tuple)):
        modules = [modules]
    if not isinstance(decorators, (list, tuple)):
        decorators = [decorators]
    for m in modules:
        for funcname, func in vars(m).iteritems():
            if (isinstance(func, types.FunctionType)
                    and not funcname.startswith('_')
                    and func.__module__ == m.__name__):
                if getattr(func, '__nodeco__', False):
                    continue
                for deco in decorators:
                    if settings.DEBUG:
                        log.debug('register %s on %s.%s'
                                  % (deco.__name__, m.__name__, funcname))
                    func = deco(func)
                    vars(m)[funcname] = func
piper_pickledb.py 文件源码 项目:piper 作者: PiperProject 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get( ID, cursor ) :
  if DEBUG :
    print " >>> running piper_pickledb get "
  return cursor.get( ID )


#########
#  EOF  #
#########
piper_mongodb.py 文件源码 项目:piper 作者: PiperProject 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get( ID, cursor ) :
  if DEBUG :
    print " >>> running piper_mongodb get "
  return cursor.find_one( { "_id" : ID } )


#########
#  EOF  #
#########
telegram_writer.py 文件源码 项目:classifieds-to-telegram 作者: DiegoHueltes 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def telegram_posts_sender(self):
        while True:
            time_to_wait = (self.wait_seconds // 3) + 1
            posts_to_sent = Crawler.get_post_to_send()
            grouped_posts = groupby(posts_to_sent, key=lambda x: x.to_send_id)
            pending_msgs_to_send = False
            for chat_id, posts in grouped_posts:
                pending_msgs_to_send = True
                posts = list(posts)[:self.post_chunk]
                if settings.DEBUG:
                    print('Sending {} new posts to chat {}'.format(len(posts), chat_id))
                for post in posts:
                    try:
                        self.telegram.sendMessage(chat_id=chat_id, text=post.description)
                        if post.image:
                            try:
                                self.telegram.sendPhoto(chat_id=chat_id, photo=post.image)
                            except BadRequest:
                                self.send_error('ERROR sending picture to {}'.format(post))
                        post.status = 'SENT'
                    except RetryAfter as e:
                        # Flood control exceeded. Retry in 175 seconds
                        self.send_error('RetryAfter error, waiting {} seconds'.format(e.retry_after))
                        time_to_wait = max(e.retry_after, time_to_wait)
                        post.status = 'ERROR'
                    except Exception as e:
                        exc_type, exc_value, exc_traceback = sys.exc_info()
                        error_stack = 'Send_updates ERROR: {}\n'.format(e)
                        error_stack += "".join(traceback.format_exception(exc_type, exc_value, exc_traceback))
                        self.send_error(error_stack)
                        post.status = 'ERROR'
                Crawler.save_posts(posts)

            if pending_msgs_to_send:
                sleep_time = 3
            else:
                sleep_time = time_to_wait
            gevent.sleep(sleep_time)
nginx.py 文件源码 项目:intel-manager-for-lustre 作者: intel-hpdd 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def handle(self, *args, **kwargs):
        """
        Generate config files for running nginx, and send the nginx command
        line to stdout.

        The reason for sending the command line to stdout instead of just running
        it is so that supervisord can directly manage the resulting nginx
        process (otherwise we would have to handle passing signals through).
        """
        from chroma_core.lib.util import site_dir

        # This command is only for development
        assert settings.DEBUG

        SITE_ROOT = site_dir()
        join_site_root = partial(os.path.join, SITE_ROOT)

        DEV_NGINX_DIR = join_site_root("dev_nginx")
        join_nginx_dir = partial(join_site_root, DEV_NGINX_DIR)

        NGINX_CONF_TEMPLATE = join_site_root("nginx.conf.template")
        NGINX_CONF = join_nginx_dir("nginx.conf")

        CHROMA_MANAGER_CONF_TEMPLATE = join_site_root("chroma-manager.conf.template")
        CHROMA_MANAGER_CONF = join_nginx_dir("chroma-manager.conf")

        if not os.path.exists(DEV_NGINX_DIR):
            os.makedirs(DEV_NGINX_DIR)

        def write_conf(template_path, conf_path):
            conf_text = Template(open(template_path).read()).render(Context({
                'var': DEV_NGINX_DIR,
                'log': SITE_ROOT,
                'SSL_PATH': settings.SSL_PATH,
                'APP_PATH': settings.APP_PATH,
                'REPO_PATH': settings.DEV_REPO_PATH,
                'HTTP_FRONTEND_PORT': settings.HTTP_FRONTEND_PORT,
                'HTTPS_FRONTEND_PORT': settings.HTTPS_FRONTEND_PORT,
                'HTTP_AGENT_PORT': settings.HTTP_AGENT_PORT,
                'HTTP_API_PORT': settings.HTTP_API_PORT,
                'REALTIME_PORT': settings.REALTIME_PORT,
                'VIEW_SERVER_PORT': settings.VIEW_SERVER_PORT
            }))
            open(conf_path, 'w').write(conf_text)

        write_conf(NGINX_CONF_TEMPLATE, NGINX_CONF)
        write_conf(CHROMA_MANAGER_CONF_TEMPLATE, CHROMA_MANAGER_CONF)

        print " ".join([self._nginx_path, "-c", NGINX_CONF])


问题


面经


文章

微信
公众号

扫码关注公众号