python类warn()的实例源码

asyncdns.py 文件源码 项目:ShadowSocks 作者: immqy 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def handle_event(self, sock, fd, event):
        if sock != self._sock:
            return
        if event & eventloop.POLL_ERR:
            logging.error('dns socket err')
            self._loop.remove(self._sock)
            self._sock.close()
            # TODO when dns server is IPv6
            self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
                                       socket.SOL_UDP)
            self._sock.setblocking(False)
            self._loop.add(self._sock, eventloop.POLL_IN, self)
        else:
            data, addr = sock.recvfrom(1024)
            if addr[0] not in self._servers:
                logging.warn('received a packet other than our dns')
                return
            self._handle_data(data)
weiboarc.py 文件源码 项目:sfm-weibo-harvester 作者: gwu-libraries 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get(self, *args, **kwargs):
        try:
            r = self.client.get(*args, **kwargs)
            # if rate limit reach
            if r.status_code == 429:
                seconds = self.wait_time()
                logging.warn("Rate limit 429 from Weibo API, Sleep %d to try...", seconds)
                time.sleep(seconds)
                r = self.get(*args, **kwargs)
            return r
        except APIError, e:
            # if rate limit reach
            log.error("caught APIError error %s", e)
            if e.error_code in [10022, 10023, 10024]:
                seconds = self.wait_time()
                logging.warn("Rate limit %d from Weibo API, Sleep %d to try...", e.error_code, seconds)
                time.sleep(seconds)
                return self.get(*args, **kwargs)
            else:
                raise e
        except requests.exceptions.ConnectionError as e:
            log.error("caught connection error %s", e)
            self._connect()
            return self.get(*args, **kwargs)
logger.py 文件源码 项目:scarlett_os 作者: bossjones 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def setup_logger():
    from colorlog import ColoredFormatter
    from gettext import gettext as _  # noqa

    try:
        """Return a logging obj with a default ColoredFormatter."""
        formatter = ColoredFormatter(
            "%(asctime)s %(name)-12s (%(threadName)-9s) %(log_color)s%(levelname)-8s%(reset)s (%(funcName)-5s) %(message_log_color)s%(message)s",  # noqa
            datefmt=None,
            reset=True,
            log_colors={
                'DEBUG': 'cyan',
                'INFO': 'green',
                'WARNING': 'yellow',
                'ERROR': 'red',
                'CRITICAL': 'bold_red',
                'TRACE': 'purple'
            },
            secondary_log_colors={
                'message': {
                    'ERROR': 'red',
                    'CRITICAL': 'red',
                    'DEBUG': 'yellow',
                    'INFO': 'yellow,bg_blue'
                }
            },
            style='%'
        )

        handler = logging.StreamHandler()
        handler.setFormatter(formatter)
        logging.getLogger('').addHandler(handler)
        logging.root.setLevel(logging.DEBUG)
    except ImportError:
        # No color available, use default config
        logging.basicConfig(format='%(levelname)s: %(message)s')
        logging.warn("Disabling color, you really want to install colorlog.")
resource.py 文件源码 项目:django-corenlp 作者: arunchaganty 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_data_or_download(dir_name, file_name, url='', size='unknown'):
    """Returns the data. if the data hasn't been downloaded, then first download the data.

    :param dir_name: directory to look in
    :param file_name: file name to retrieve
    :param url: if the file is not found, then download it from this url
    :param size: the expected size
    :return: path to the requested file
    """
    dname = os.path.join(stanza.DATA_DIR, dir_name)
    fname = os.path.join(dname, file_name)
    if not os.path.isdir(dname):
        assert url, 'Could not locate data {}, and url was not specified. Cannot retrieve data.'.format(dname)
        os.makedirs(dname)
    if not os.path.isfile(fname):
        assert url, 'Could not locate data {}, and url was not specified. Cannot retrieve data.'.format(fname)
        logging.warn('downloading from {}. This file could potentially be *very* large! Actual size ({})'.format(url, size))
        with open(fname, 'wb') as f:
            f.write(get_from_url(url))
    return fname
resource.py 文件源码 项目:django-corenlp 作者: arunchaganty 项目源码 文件源码 阅读 58 收藏 0 点赞 0 评论 0
def get_data_or_download(dir_name, file_name, url='', size='unknown'):
    """Returns the data. if the data hasn't been downloaded, then first download the data.

    :param dir_name: directory to look in
    :param file_name: file name to retrieve
    :param url: if the file is not found, then download it from this url
    :param size: the expected size
    :return: path to the requested file
    """
    dname = os.path.join(stanza.DATA_DIR, dir_name)
    fname = os.path.join(dname, file_name)
    if not os.path.isdir(dname):
        assert url, 'Could not locate data {}, and url was not specified. Cannot retrieve data.'.format(dname)
        os.makedirs(dname)
    if not os.path.isfile(fname):
        assert url, 'Could not locate data {}, and url was not specified. Cannot retrieve data.'.format(fname)
        logging.warn('downloading from {}. This file could potentially be *very* large! Actual size ({})'.format(url, size))
        with open(fname, 'wb') as f:
            f.write(get_from_url(url))
    return fname
pipe.py 文件源码 项目:gae-dataflow 作者: amygdala 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def process(self, element):

    created_at = element.properties.get('created_at', None)
    cav = None
    if created_at:
      cav = created_at.timestamp_value
      cseconds = cav.seconds
    else:
      return
    crdt = datetime.datetime.fromtimestamp(cseconds)
    logging.warn("crdt: %s", crdt)
    logging.warn("earlier: %s", self.earlier)
    if crdt > self.earlier:
      # return only the elements (datastore entities) with a 'created_at' date
      # within the last self.days days.
      yield element
db_to_gtfs.py 文件源码 项目:db-api-to-gtfs 作者: patrickbr 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def fetch_json(self, url):
        """Fetch remote json"""

        timeout = 1
        while True:
            try:
                logging.debug('Opening %s.', url)
                response = urllib2.urlopen(url)
                break
            except urllib2.HTTPError as err:
                if timeout <= MAX_TIMEOUT:
                    logging.warn('Error opening %s, error code %d, reason is %s.', url, err.code, err.reason)
                    logging.warn('Waiting for %ds before retrying.', timeout)
                    time.sleep(timeout)
                    timeout *= 2
                else:
                    logging.error('Error opening %s, error code %d, reason is %s.', url, err.code, err.reason)
                    raise err

        data = json.load(response)

        return data
mqtt.py 文件源码 项目:broadlink-mqtt 作者: eschava 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def record(device, file):
    logging.debug("Recording command to file " + file)
    # receive packet
    device.enter_learning()
    ir_packet = None
    attempt = 0
    while ir_packet is None and attempt < 6:
        time.sleep(5)
        ir_packet = device.check_data()
        attempt = attempt + 1
    if ir_packet is not None:
        # write to file
        directory = os.path.dirname(file)
        if not os.path.exists(directory):
            os.makedirs(directory)
        with open(file, 'wb') as f:
            f.write(str(ir_packet).encode('hex'))
        logging.debug("Done")
    else:
        logging.warn("No command received")
utils.py 文件源码 项目:pyradio-nepal 作者: sandipbgt 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_player():
    """
    :returns a radio instance
    """
    radio = None
    try:
        radio = player.VlcPlayer()
    except Exception as e:
        logging.warn('Failed to load first player option, trying another, %s' % str(e))
        radio = player.MpPlayer()
    finally:
        return radio
utils.py 文件源码 项目:pyradio-nepal 作者: sandipbgt 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_stations():
    """
    :returns list of all the(available) radio stations fetching from a server
    helps to manage station list in one place
    """
    stations_json = []
    try:
        print("Fetching stations data from server.")
        response = requests.get(STATION_FETCH_URL)
        stations_json = response.json()
    except Exception as e:
        logging.warn('Unable to load data from server. Processing local data.')
        stations_json = get_stations_from_json()
    finally:
        return _format_station_json_to_dict(stations_json)
data_source_tables_gen.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def merge_all_files_into_pytables(file_dir, file_out):
    """
    process each file into pytables
    """
    start = None
    start = datetime.datetime.now()
    out_h5 = tables.openFile(file_out,
                             mode="w",
                             title="bars",
                             filters=tables.Filters(complevel=9,
                                                    complib='zlib'))
    table = None
    for file_in in glob.glob(file_dir + "/*.gz"):
        gzip_file = gzip.open(file_in)
        expected_header = ["dt", "sid", "open", "high", "low", "close",
                           "volume"]
        csv_reader = csv.DictReader(gzip_file)
        header = csv_reader.fieldnames
        if header != expected_header:
            logging.warn("expected header %s\n" % (expected_header))
            logging.warn("header_found %s" % (header))
            return

        for current_date, rows in parse_csv(csv_reader):
            table = out_h5.createTable("/TD", "date_" + current_date,
                                       OHLCTableDescription,
                                       expectedrows=len(rows),
                                       createparents=True)
            table.append(rows)
            table.flush()
        if table is not None:
            table.flush()
    end = datetime.datetime.now()
    diff = (end - start).seconds
    logging.debug("finished  it took %d." % (diff))
api_v2.py 文件源码 项目:mblog 作者: moling3650 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def api_delete_item_v2(table, id, request):
    models = {'user': User, 'blog': Blog, 'comment': Comment}
    check_user(request.__user__)
    item = await models[table].find(id)
    if item:
        await item.remove()
    else:
        logging.warn('id: %s not exist in %s' % (id, table))
    return dict(id=id)


# ???????????
orm.py 文件源码 项目:mblog 作者: moling3650 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def save(self):
        args = list(map(self.getValueOrDefault, self.__mappings__))
        rows = await execute(self.__insert__, args)
        if rows != 1:
            logging.warn('failed to insert record: affected rows: %s' % rows)

    # ?????????????
orm.py 文件源码 项目:mblog 作者: moling3650 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def update(self):
        args = list(map(self.get, self.__fields__))
        rows = await execute(self.__update__, args)
        if rows != 1:
            logging.warn('failed to update by primary key: affected rows: %s' % rows)

    # ????????????
orm.py 文件源码 项目:mblog 作者: moling3650 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def remove(self):
        args = [self.get(self.__primary_key__)]
        rows = await execute(self.__delete__, args)
        if rows != 1:
            logging.warn('failed to remove by primary key: affected rows: %s' % rows)
api.py 文件源码 项目:mblog 作者: moling3650 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def api_delete_item(table, id, request):
    models = {'users': User, 'blogs': Blog, 'comments': Comment, 'oauth': Oauth}
    check_user(request.__user__)
    item = await models[table].find(id)
    if item:
        await item.remove()
    else:
        logging.warn('id: %s not exist in %s' % (id, table))
    return dict(id=id)
PeerProtocolTCP.py 文件源码 项目:PyPPSPP 作者: justas- 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def send_data(self, data):
        """Wrap data in framer's header and send it"""
        packet = bytearray()
        packet.extend(struct.pack('>I', len(data)))
        packet.extend(data)

        try:
            self._transport.write(packet)
        except Exception as exc:
            #logging.warn("Conn: {} Exception when sending: {}"
            #             .format(self._connection_id, e))
            logging.exception('Conn: %s Exception while sending', self._connection_id, exc_info=exc)
            self.remove_all_members()
PeerProtocolTCP.py 文件源码 项目:PyPPSPP 作者: justas- 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def pause_writing(self):
        logging.warn("PEER PROTOCOL IS OVER THE HIGH-WATER MARK")
        self._throttle = True
PeerProtocolTCP.py 文件源码 项目:PyPPSPP 作者: justas- 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def resume_writing(self):
        logging.warn("PEER PROTOCL IS DRAINED BELOW THE HIGH-WATER MARK")
        self._throttle = False
ledbat-sink.py 文件源码 项目:PyPPSPP 作者: justas- 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def pause_writing(self):
        logging.warn("UDP SOCKET OVER HIGH-WATER MARK")


问题


面经


文章

微信
公众号

扫码关注公众号