python类debug()的实例源码

app.py 文件源码 项目:aniping 作者: kuruoujou 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def scan_scrapers():
    """On demand scrapper scanning handler.

    For some reason the scheduler doesn't always work, this endpoint allows for
    instant scanning, assuming it's not already occurring. The function is aborted
    with a ``404`` message to hide the page if the user is not authenticated.

    Scanning can take a long time - 20 to 30 minutes - so it's recommended this
    endpoint be called asynchronously.

    Returns:
       JSON formatted output to identify that scanning has completed or is already
       ongoing.
    """
    log.debug("Entering scan_scrapers.")
    if fe.check_login_id(escape(session['logged_in'])):
        log.debug("User is logged in, attempting to begin scan.")
        if not fe.scrape_shows():
            log.debug("scrape_shows returned false, either the lockfile exists incorrectly or scraping is ongoing.")
            return jsonify({"scan":"failure", "reason":"A scan is ongoing"})
        log.debug("scrape_shows just returned. Returning success.")
        return jsonify({"scan":"success"})
    log.debug("User cannot be authenticated, send 404 to hide page.")
    abort(404)
model.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def response(self, resp, content):
    """Convert the response wire format into a Python object.

    Args:
      resp: httplib2.Response, the HTTP response headers and status
      content: string, the body of the HTTP response

    Returns:
      The body de-serialized as a Python object.

    Raises:
      googleapiclient.errors.HttpError if a non 2xx response is received.
    """
    self._log_response(resp, content)
    # Error handling is TBD, for example, do we retry
    # for some operation/error combinations?
    if resp.status < 300:
      if resp.status == 204:
        # A 204: No Content response should be treated differently
        # to all the other success states
        return self.no_content_response
      return self.deserialize(content)
    else:
      logging.debug('Content from bad request was: %s' % content)
      raise HttpError(resp, content)
arbitrage.py 文件源码 项目:bitcoin-arbitrage 作者: ucfyao 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def main(self):
        parser = argparse.ArgumentParser()
        parser.add_argument("-d", "--debug", help="debug verbose mode",
                            action="store_true")
        parser.add_argument("-v", "--verbose", help="info verbose mode",
                            action="store_true")
        parser.add_argument("-o", "--observers", type=str,
                            help="observers, example: -oLogger,Emailer")
        parser.add_argument("-m", "--markets", type=str,
                            help="markets, example: -mHaobtcCNY,Bitstamp")
        parser.add_argument("-s", "--status", help="status", action="store_true")
        parser.add_argument("command", nargs='*', default="watch",
                            help='verb: "watch|replay-history|get-balance|list-public-markets|get-broker-balance"')
        args = parser.parse_args()
        self.init_logger(args)
        self.exec_command(args)
build.py 文件源码 项目:foxbms-setup 作者: foxBMS 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def start_process(cmd, supress_output=False):
    """Starts the build process by passing the command string to the
    command line

    Args:
        cmd (string): command for the build process.
        supress_output (bool): Indicates if logging is active for the build .
    """
    logging.debug(cmd)
    proc = subprocess.Popen(cmd, stdout=None, stderr=subprocess.PIPE)
    out, err = proc.communicate()
    rtn_code = proc.returncode

    if supress_output is False:
        if out:
            logging.info(out)
        if err:
            logging.error(err)

    if rtn_code == 0 or rtn_code is None:
        logging.info('Success: Process return code %s', str(rtn_code))
    else:
        logging.error('Error: Process return code %s', str(rtn_code))
        sys.exit(1)
MCP4725.py 文件源码 项目:Adafruit_Python_MCP4725 作者: adafruit 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def set_voltage(self, value, persist=False):
        """Set the output voltage to specified value.  Value is a 12-bit number
        (0-4095) that is used to calculate the output voltage from:

          Vout =  (VDD*value)/4096

        I.e. the output voltage is the VDD reference scaled by value/4096.
        If persist is true it will save the voltage value in EEPROM so it
        continues after reset (default is false, no persistence).
        """
        # Clamp value to an unsigned 12-bit value.
        if value > 4095:
            value = 4095
        if value < 0:
            value = 0
        logging.debug('Setting value to {0:04}'.format(value))
        # Generate the register bytes and send them.
        # See datasheet figure 6-2:
        #   https://www.adafruit.com/datasheets/mcp4725.pdf 
        reg_data = [(value >> 4) & 0xFF, (value << 4) & 0xFF]
        if persist:
            self._device.writeList(WRITEDACEEPROM, reg_data)
        else:
            self._device.writeList(WRITEDAC, reg_data)
scrape.py 文件源码 项目:hearthscan-bot 作者: d-schmidt 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def main():
    print("see log scrape.log")
    if os.path.isfile("scrape.log"):
        os.remove("scrape.log")
    log.basicConfig(filename="scrape.log",
            format='%(asctime)s %(levelname)s %(message)s',
            level=log.DEBUG)

    try:
        log.debug("main() full scrape will take 5-10 minutes")
        cards, tokens = loadJsonCards()

        saveCardsAsJson("data/cards.json", loadSets(allcards=cards))

        # a lot of token names are not unique
        # a static, handmade list of ids is more reliable
        if os.path.isfile('data/tokenlist.json'):
            with open('data/tokenlist.json', 'r', encoding='utf8') as f:
                saveCardsAsJson("data/tokens.json", loadTokens(tokens, json.load(f)))
    except Exception as e:
        log.exception("main() error %s", e)
console.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_console_log(session, arg_dict):
    try:
        raw_dom_id = arg_dict['dom_id']
    except KeyError:
        raise dom0_pluginlib.PluginError("Missing dom_id")
    try:
        dom_id = int(raw_dom_id)
    except ValueError:
        raise dom0_pluginlib.PluginError("Invalid dom_id")

    logfile = open(CONSOLE_LOG_FILE_PATTERN % dom_id, 'rb')
    try:
        try:
            log_content = _last_bytes(logfile)
        except IOError, e:  # noqa
            msg = "Error reading console: %s" % e
            logging.debug(msg)
            raise dom0_pluginlib.PluginError(msg)
    finally:
        logfile.close()

    return base64.b64encode(zlib.compress(log_content))
ipxe.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def _write_file(filename, data):
    # If the ISO was tampered with such that the destination is a symlink,
    # that could allow a malicious user to write to protected areas of the
    # dom0 filesystem. /HT to comstud for pointing this out.
    #
    # Short-term, checking that the destination is not a symlink should be
    # sufficient.
    #
    # Long-term, we probably want to perform all file manipulations within a
    # chroot jail to be extra safe.
    if os.path.islink(filename):
        raise RuntimeError('SECURITY: Cannot write to symlinked destination')

    logging.debug("Writing to file '%s'" % filename)
    f = open(filename, 'w')
    try:
        f.write(data)
    finally:
        f.close()
xenhost.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def iptables_config(session, args):
    # command should be either save or restore
    logging.debug("iptables_config:enter")
    logging.debug("iptables_config: args=%s", args)
    cmd_args = pluginlib.exists(args, 'cmd_args')
    logging.debug("iptables_config: cmd_args=%s", cmd_args)
    process_input = pluginlib.optional(args, 'process_input')
    logging.debug("iptables_config: process_input=%s", process_input)
    cmd = json.loads(cmd_args)
    cmd = map(str, cmd)

    # either execute iptable-save or iptables-restore
    # command must be only one of these two
    # process_input must be used only with iptables-restore
    if len(cmd) > 0 and cmd[0] in ('iptables-save',
                                   'iptables-restore',
                                   'ip6tables-save',
                                   'ip6tables-restore'):
        result = _run_command(cmd, process_input)
        ret_str = json.dumps(dict(out=result, err=''))
        logging.debug("iptables_config:exit")
        return ret_str
    # else don't do anything and return an error
    else:
        raise pluginlib.PluginError("Invalid iptables command")
parse_trace.py 文件源码 项目:git-stacktrace 作者: pinterest 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def prep_blob(self, blob):
        """Cleanup input."""
        # remove empty lines
        if type(blob) == list:
            blob = [line for line in blob if line.strip() != '']
            if len(blob) == 1:
                blob = blob[0].replace('\\n', '\n').split('\n')
        # Split by line
        if type(blob) == str or type(blob) == six.text_type:
            lines = blob.split('\n')
        elif type(blob) == list:
            if len(blob) == 1:
                lines = blob[0].split('\n')
            else:
                lines = [line.rstrip() for line in blob]
        else:
            message = "Unknown input format"
            log.debug("%s - '%s", message, blob)
            raise ParseException(message)
        return lines
websocket.py 文件源码 项目:engel 作者: Dalloriam 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def register(self, event, callback, selector=None):
        logging.debug('Registering: ' + str(event))

        if selector:
            key = str(id(callback))
        else:
            key = '_'

        self.handlers[event][key].append(callback)

        if event not in ('init', 'load', 'close'):
            capture = False
            if selector is None:
                selector = 'html'
                capture = True

            logging.debug('Dispatching: ' + str(event))
            self.dispatch({
                'name': 'subscribe',
                'event': event,
                'selector': selector,
                'capture': capture,
                'key': str(id(callback))
            })
charm_helpers_sync.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def get_filter(opts=None):
    opts = opts or []
    if 'inc=*' in opts:
        # do not filter any files, include everything
        return None

    def _filter(dir, ls):
        incs = [opt.split('=').pop() for opt in opts if 'inc=' in opt]
        _filter = []
        for f in ls:
            _f = os.path.join(dir, f)

            if not os.path.isdir(_f) and not _f.endswith('.py') and incs:
                if True not in [fnmatch(_f, inc) for inc in incs]:
                    logging.debug('Not syncing %s, does not match include '
                                  'filters (%s)' % (_f, incs))
                    _filter.append(f)
                else:
                    logging.debug('Including file, which matches include '
                                  'filters (%s): %s' % (incs, _f))
            elif (os.path.isfile(_f) and not _f.endswith('.py')):
                logging.debug('Not syncing file: %s' % f)
                _filter.append(f)
            elif (os.path.isdir(_f) and not
                  os.path.isfile(os.path.join(_f, '__init__.py'))):
                logging.debug('Not syncing directory: %s' % f)
                _filter.append(f)
        return _filter
    return _filter
charm_helpers_sync.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def sync_directory(src, dest, opts=None):
    if os.path.exists(dest):
        logging.debug('Removing existing directory: %s' % dest)
        shutil.rmtree(dest)
    logging.info('Syncing directory: %s -> %s.' % (src, dest))

    shutil.copytree(src, dest, ignore=get_filter(opts))
    ensure_init(dest)
storage.py 文件源码 项目:txt2evernote 作者: Xunius 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self):
        logging.debug("Storage engine : %s", engine)
        Base.metadata.create_all(engine)
        Session = sessionmaker(bind=engine)
        self.session = Session()
net.py 文件源码 项目:defuse_division 作者: lelandbatey 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def msg_recv(conn, sendfunc, closefunc):
    '''
    Function msg_recv reads null-delimited series of bytes from `conn`, which
    is a socket. Each series of bytes is then de-serialized into a json object,
    and `sendfunc` is called with that json object.
    `closefunc` is called if/when the socket `conn` is closed.
    '''
    buf = bytes()
    while True:
        try:
            data = conn.recv(8192)

            # No data means the connection is closed
            if not data:
                closefunc()
                return

            inbuf = buf + data
            if SEP in inbuf:
                parts = inbuf.split(SEP)
                # logging.debug("Length of parts: {}".format(len(parts)))
                tosend = [parts[0]]
                for p in parts[1:-1]:
                    tosend.append(p)
                buf = parts[-1]
                for msg in tosend:
                    m = gzip.decompress(msg)
                    m = m.decode('utf-8')
                    logging.debug("Msg: {}".format(m[:150]+'...' if len(m) > 150 else m))
                    obj = json.loads(m)
                    sendfunc(obj)
            else:
                buf += data
        except Exception as e:
            logging.exception(e)
server.py 文件源码 项目:defuse_division 作者: lelandbatey 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def send_input(self, inpt):
        # Just pass the input to the parent bout, but with info saying that
        # this input comes from this player
        logging.debug(inpt)
        self.bout.send_input({'player': self.name, 'input': inpt})
client.py 文件源码 项目:defuse_division 作者: lelandbatey 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, host, port):
        self.host = host
        self.port = int(port)
        self.stateq = queue.Queue()
        self.clientsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.clientsock.connect((self.host, self.port))
        net.msg_recv(self.clientsock, self.stateq.put, lambda: None)
        conf = self.stateq.get()
        logging.debug("Conf: {}".format(conf))
        self.name = conf['name']
client.py 文件源码 项目:defuse_division 作者: lelandbatey 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def send_input(self, inpt):
        logging.debug('PlayerClient "{}" sending: {}'.format(
            self.name, net.json_dump(inpt)))
        if isinstance(inpt, dict) and 'change-name' in inpt:
            self.name = inpt['change-name']
        net.send(self.clientsock, inpt)
        # self.clientsock.sendall(net.json_dump(inpt).encode('utf-8')+net.SEP)
queue_example.py 文件源码 项目:logscan 作者: magedu 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def consumer(e, q):
    while not e.is_set():
        message = q.get()
        time.sleep(0.1)
        logging.debug('consume {0}'.format(message))
queue_example.py 文件源码 项目:logscan 作者: magedu 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def producer(e, q):
    for i in range(10):
        q.put(i)
        time.sleep(0.1)
        logging.debug('produce {0}'.format(i))
    if q.empty():
        e.set()


问题


面经


文章

微信
公众号

扫码关注公众号