python类error()的实例源码

proxy.py 文件源码 项目:Proxy46 作者: Macronut 项目源码 文件源码 阅读 46 收藏 0 点赞 0 评论 0
def main():
    """Start the proxy and DNS servers, then idle until interrupted.

    Loads the configuration, serves ``ProxyServer`` on port 1081 from a
    daemon thread, starts a ``DNSServer`` on 127.0.0.1:5353 backed by a
    ``DNSProxyResolver`` built from ``DefaultDNS``, and then keeps the main
    thread alive, flushing stderr/stdout every 600 seconds.

    A ``socket.error`` escaping the idle loop is logged and ends the
    function; ``KeyboardInterrupt`` exits the process with status 0.
    """
    loadconfig()

    proxy_server = ThreadingTCPServer(('', 1081), ProxyServer)
    thread = threading.Thread(target=proxy_server.serve_forever)
    # Daemon thread: it must not keep the process alive once main() returns.
    thread.daemon = True
    thread.start()
    # A parenthesised single argument prints identically on Python 2
    # (print statement) and Python 3 (print function).
    print('proxy running')

    resolver = DNSProxyResolver(DefaultDNS, 53, 10)
    logger = DNSLogger(log='-log_request, -log_reply', prefix=False)
    dns_server = DNSServer(resolver, port=5353, address='127.0.0.1', logger=logger, handler=DNSHandler)
    dns_server.start_thread()
    print('dns running')

    try:
        # Both servers run on their own threads; the main thread only has
        # to stay alive and periodically flush buffered output.
        while True:
            time.sleep(600)
            sys.stderr.flush()
            sys.stdout.flush()
    except socket.error as e:
        logging.error(e)
    except KeyboardInterrupt:
        sys.exit(0)
recipe-578154.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __call__(self, *args, **kwargs):
            """Invoke the named method on every item and gather the results.

            Every item is called even after an earlier one failed.  If any
            call raised, the most recent exception is re-raised once the
            loop has finished; otherwise the return values come back as a
            tuple in iteration order.
            """
            method_name = self.__name
            failure = False
            results = []
            for member in self.__array:
                bound = getattr(member, method_name)
                try:
                    value = bound(*args, **kwargs)
                except Exception as exc:
                    # Remember the failure but keep calling the rest.
                    failure = exc
                    continue
                if failure:
                    # Results are discarded once something has gone wrong.
                    continue
                results.append(value)
            if failure:
                raise failure
            return tuple(results)

################################################################################

# Provide a way of converting byte sizes into strings.
t.py 文件源码 项目:v2ex-tornado-2 作者: coderyy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def post(self):
        """Tweet the submitted ``status`` on behalf of the signed-in member.

        Redirects to ``/`` when ``CheckAuth`` yields no member and to
        ``/twitter/link`` when ``member.twitter_oauth`` is not 1.  Otherwise
        the status is cut to 140 characters, posted, the member's cached
        Twitter home entry is deleted, and the client is redirected to the
        ``Referer`` header value (``/`` when the header is absent).  The
        redirect happens whether or not the post succeeded; a failure is
        only logged.
        """
        if 'Referer' in self.request.headers:
            go = self.request.headers['Referer']
        else:
            go = '/'
        member = CheckAuth(self)
        if not member:
            self.redirect('/')
            return
        if member.twitter_oauth != 1:
            self.redirect('/twitter/link')
            return

        status = self.request.arguments['status'][0]
        if len(status) > 140:
            status = status[0:140]
        access_token = OAuthToken.from_string(member.twitter_oauth_string)
        twitter = OAuthApi(CONSUMER_KEY, CONSUMER_SECRET, access_token)
        try:
            twitter.PostUpdate(status.encode('utf-8'))
            memcache.delete('member::' + str(member.num) + '::twitter::home')
        except Exception:
            # Best effort: a failed tweet must not break the redirect, but
            # KeyboardInterrupt/SystemExit are no longer swallowed and the
            # traceback is kept so the cause can be diagnosed.
            logging.error('Failed to tweet: ' + status, exc_info=True)
        self.redirect(go)
mail.py 文件源码 项目:v2ex-tornado-2 作者: coderyy 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def receive(self, message):
        """Tweet the plain-text bodies of a mail sent to a ``tweet...`` address.

        For every ``text/plain`` body of ``message``: when the recipient
        address starts with ``tweet`` (case-insensitive) and exactly one
        ``Member`` has the sender's e-mail address, and that member has
        ``twitter_oauth == 1``, the decoded body (cut to 140 characters) is
        posted through the member's stored OAuth token.

        An error is logged when the sender does not match exactly one member
        and when posting fails; nothing is raised for a failed post.
        """
        bodies = message.bodies(content_type = 'text/plain')
        for body in bodies:
            to = extract_address(message.to)
            sender = extract_address(message.sender.lower())
            if to[0:5].lower() != 'tweet':
                continue

            q = Member.selectBy(email=sender)
            if q.count() != 1:
                logging.error("User " + sender + " doesn't have Twitter link.")
                continue

            member = q[0]
            if member.twitter_oauth != 1:
                continue

            access_token = OAuthToken.from_string(member.twitter_oauth_string)
            twitter = OAuthApi(CONSUMER_KEY, CONSUMER_SECRET, access_token)
            status = body[1].decode()
            if len(status) > 140:
                status = status[0:140]
            try:
                logging.info("About to send tweet: " + status)
                twitter.PostUpdate(status.encode('utf-8'))
                logging.info("Successfully tweet: " + status)
            except Exception:
                # Best effort per body, but do not swallow
                # KeyboardInterrupt/SystemExit, and keep the traceback.
                logging.error("Failed to tweet for " + member.username, exc_info=True)
visualize.py 文件源码 项目:mbin 作者: fanglab 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __check_input( opts, args, parser ):
    """
    Validate the two positional feature-file arguments.

    Parameters:
      opts   -- parsed options (not used by this check)
      args   -- positional arguments; exactly two file paths are expected:
                (1) <SEQ>_methyl_features.txt and (2) <SEQ>_other_features.txt
      parser -- option parser whose ``error`` method is used to report
                problems

    Returns:
      (mfeats_fn, ofeats_fn) -- the two paths taken from ``args``

    A wrong number of arguments or a path that does not exist is reported
    through ``parser.error``.
    """
    if len(args) != 2:
        # Report through the parser instead of printing and then failing
        # on args[1] with an IndexError.
        parser.error("expecting two arguments: "
                     "(1) <SEQ>_methyl_features.txt output from methylprofiles containing methylation features for mapping "
                     "(2) <SEQ>_other_features.txt output from methylprofiles containing alternative sequence features for mapping")

    mfeats_fn  = args[0]
    ofeats_fn  = args[1]

    for feats_fn in (mfeats_fn, ofeats_fn):
        if not os.path.exists(feats_fn):
            parser.error("Can't find file of sequence features (methylprofiles output) for mapping: %s" % feats_fn)

    return mfeats_fn, ofeats_fn
db_transfer.py 文件源码 项目:shadowsocksR-b 作者: hao35954514 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def pull_db_all_user(self):
        """Load all user rows from the JSON file at ``get_config().MUDB_FILE``.

        For each row, a ``forbidden_ip`` value is replaced by
        ``common.IPNetwork(value)`` and a ``forbidden_port`` value by
        ``common.PortRange(value)``.  A failed conversion is logged and the
        row keeps its original value for that key.

        Returns the decoded rows; a warning is logged when they are empty.
        """
        import json

        config_path = get_config().MUDB_FILE
        # The file is only read, so open it read-only ('rb+' needlessly
        # required write permission) and release it before post-processing.
        with open(config_path, 'rb') as f:
            rows = json.loads(f.read().decode('utf8'))

        for row in rows:
            try:
                if 'forbidden_ip' in row:
                    row['forbidden_ip'] = common.IPNetwork(row['forbidden_ip'])
            except Exception as e:
                logging.error(e)
            try:
                if 'forbidden_port' in row:
                    row['forbidden_port'] = common.PortRange(row['forbidden_port'])
            except Exception as e:
                logging.error(e)

        if not rows:
            # logging.warn is a deprecated alias of logging.warning.
            logging.warning('no user in json file')
        return rows
eventloop.py 文件源码 项目:shadowsocksR-b 作者: hao35954514 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def errno_from_exception(e):
    """Return the errno carried by an exception, or None if there is none.

    The ``errno`` attribute wins whenever the exception has one (even if
    its value is None).  Without that attribute the first element of
    ``e.args`` is used, and an exception created with no arguments yields
    None instead of an indexing error.
    """
    missing = object()
    code = getattr(e, 'errno', missing)
    if code is not missing:
        return code
    return e.args[0] if e.args else None


# from tornado
charm_helpers_sync.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def parse_config(conf_file):
    """Load the YAML document stored in *conf_file*.

    Returns the parsed document, or ``False`` (after logging an error) when
    *conf_file* is not an existing regular file.
    """
    if not os.path.isfile(conf_file):
        logging.error('Invalid config file: %s.' % conf_file)
        return False
    # The context manager closes the handle deterministically; the original
    # left the open file to the garbage collector.
    with open(conf_file) as conf:
        # NOTE(review): yaml.load without an explicit Loader can construct
        # arbitrary Python objects. Consider yaml.safe_load if this file can
        # ever come from an untrusted source.
        return yaml.load(conf.read())
storage.py 文件源码 项目:txt2evernote 作者: Xunius 项目源码 文件源码 阅读 71 收藏 0 点赞 0 评论 0
def logging(func):
        """Decorator that turns any exception raised by *func* into ``False``.

        The wrapped function's return value is passed through unchanged.
        When *func* raises, the function name and the exception text are
        logged as an error and ``False`` is returned instead.
        """
        import functools

        # NOTE(review): inside wrapper, ``logging`` resolves to the
        # module-level name (expected to be the logging module), not to this
        # decorator. That holds only while this function is defined in a
        # class body - confirm.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                logging.error("%s : %s", func.__name__, str(e))
                return False
        return wrapper
helper_functions.py 文件源码 项目:AFSCbot 作者: HadManySons 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def print_and_log(text, error=False):
    """Print *text* and also log it, prefixed with a timestamp.

    The prefix is ``time.strftime(LOG_TIME_FORMAT)``.  The entry is logged
    with ``logging.error`` when *error* is truthy and with ``logging.info``
    otherwise.
    """
    print(text)
    emit = logging.error if error else logging.info
    emit(time.strftime(LOG_TIME_FORMAT) + text)
server.py 文件源码 项目:defuse_division 作者: lelandbatey 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def localnet_register(host, port):
    '''
    Advertise this server on the local network through Zeroconf.

    Registers a ``_defusedivision._tcp.local.`` service for ``host`` and
    ``port`` and then sleeps in an endless loop, so it is meant to run in
    its own thread.  If ``zeroconf`` cannot be imported, an error naming the
    hostname and port is logged and the function returns at once.
    '''
    try:
        from zeroconf import ServiceInfo, Zeroconf
        from time import sleep
    except ImportError:
        message = (
            'Zeroconf not installed, cannot register this server on the local '
            'network. Other players may still connect, but they must be told '
            'what your hostname and port are (hostname: {}, port: {})'
        ).format(host, port)
        logging.error(message)
        return

    advertised_interface = local_address('127.0.0.1')

    # Dots are replaced so host and interface form a single name label.
    service_name = "{}{}._defusedivision._tcp.local.".format(
        host.replace('.', '-'), advertised_interface.replace('.', '-'))
    packed_address = socket.inet_aton(advertised_interface)
    info = ServiceInfo(
        "_defusedivision._tcp.local.",
        service_name,
        address=packed_address,
        port=int(port),
        weight=0,
        priority=0,
        properties=b"")

    zc = Zeroconf()
    zc.register_service(info)

    def close_zeroconf():
        zc.close()

    atexit.register(close_zeroconf)
    # Stay alive forever so the registration remains answerable.
    while True:
        sleep(0.1)
concurrency.py 文件源码 项目:defuse_division 作者: lelandbatey 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def concurrent(f):
    """Decorator that runs *f* on a background daemon thread.

    Calling the decorated function starts the thread and returns ``None``
    straight away; whatever *f* returns is discarded.  It is intended as a
    functional equivalent of the ``go func()`` syntax in Go.
    """
    def err_logger(*call_args, **call_kwargs):
        '''
        Run f and log any exception it raises, with a traceback, so that
        failures in long-running background threads are not lost.
        '''
        try:
            f(*call_args, **call_kwargs)
        except Exception as exc:
            logging.error(exc, exc_info=True)

    def rv(*args, **kwargs):
        worker = threading.Thread(target=err_logger, args=args, kwargs=kwargs)
        worker.daemon = True
        worker.start()

    return rv
display.py 文件源码 项目:defuse_division 作者: lelandbatey 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def build_contents(cell, player):
    """
    Build the Glyph that displays the contents of ``cell`` for ``player``.

    The glyph is positioned from the cell's x/y and the length of its
    contents.  Its text and colour are then derived from the cell's
    ``probed``/``flagged`` state, from whether the cell is the player's
    selected cell, and from whether the player is still living.
    """
    contents = cell['contents']
    glyph_x = ((1 + len(contents)) * cell['x']) + 1
    glyph_y = (2 * cell['y']) + 1
    glyph = Glyph(glyph_x, glyph_y, contents)
    glyph.attr = get_colorpair('black-white')

    probed = cell['probed']

    # A probed cell shows how many neighbours compare equal to True, in a
    # colour chosen from that count.
    if probed:
        mine_contacts = sum(
            int(flag == True) for flag in cell['neighbors'].values())
        glyph.strng = " {} ".format(mine_contacts)
        glyph.attr = contacts_color(mine_contacts)

    # The selected cell is highlighted regardless of the colour chosen above.
    selected = player['minefield']['selected']
    if selected == [cell['x'], cell['y']]:
        glyph.attr = get_colorpair('white-red')

    if not probed:
        glyph.strng = Contents.empty
    if cell['flagged']:
        glyph.strng = Contents.flag

    # Once the player is no longer living, mines are shown.
    if not player['living'] and cell['contents'] == Contents.mine:
        glyph.strng = Contents.mine
    return glyph
main.py 文件源码 项目:pyradio-nepal 作者: sandipbgt 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def run():
    """
    starts the player

    Prompts in a loop until the user types ``exit``:
      * ``list`` prints every station,
      * a number plays the station at that 1-based position,
      * anything else is passed to ``utils.pretty_print_stations`` as a
        search term.

    Exits with status 0 when no player is available or on ``exit``.
    """
    radio = utils.get_player()
    if not radio:
        logging.error("Player not available, exiting now!")
        sys.exit(0)

    stations = utils.get_stations()
    current = None
    while True:
        user_input = input("Enter station number ({}) or type station name to search> ".format(current)).strip()

        if user_input == "exit":
            radio.close()
            sys.exit(0)
        if user_input == 'list':
            utils.pretty_print_stations(stations)
            continue

        # Only the int() conversion decides "number vs. search term"; a
        # ValueError raised later by playback is no longer mistaken for one.
        try:
            num = int(user_input)
        except ValueError:
            utils.pretty_print_stations(stations, user_input)
            continue

        # Zero and negative numbers used to be ignored silently.
        if num < 1:
            print("Invalid station number")
            continue
        try:
            station = stations[num - 1]
        except IndexError:
            print("Invalid station number")
            continue

        radio.play(station['stream_url'])
        print("Playing: {} @ {} MHz, {}".format(station['name'], station['frequency'], station['location']))
        current = "{}. {}".format(station['count'], station['name'])
notification.py 文件源码 项目:logscan 作者: magedu 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def send_mail(self):
        """Feed mail-type messages into the mail queue until the event is set.

        Starts ``self._send_mail`` on a separate thread named
        ``send-mail-real``, then loops while ``self.__event`` is not set.
        Each pass waits on ``self.__cond``; after waking, if ``'mail'`` is
        in ``self.message.type`` the current ``self.message`` is put on
        ``self.__mail_queue`` with a one-second timeout.  A full queue is
        logged as an error and the message is not queued.
        """
        threading.Thread(target=self._send_mail, name='send-mail-real').start()
        # The event is only re-checked after each wake-up from the condition.
        while not self.__event.is_set():
            with self.__cond:
                # Blocks until another thread notifies the condition.
                self.__cond.wait()
                if 'mail' in self.message.type:
                    try:
                        self.__mail_queue.put(self.message, timeout=1)
                    except Full:
                        logging.error('mail queue is full')
check.py 文件源码 项目:logscan 作者: magedu 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def match(self, checker):
        """Feed lines into a per-checker queue until the checker's event is set.

        Creates a queue, stores it in ``self.queues`` under ``checker.name``
        and starts ``self._match(checker)`` on a new thread.  It then loops
        while ``self.events[checker.name]`` is not set: each pass waits on
        ``self.__cond`` and, after waking, puts the current ``self.line`` on
        the queue without blocking.  A full queue is logged as an error and
        the line is dropped.
        """
        # NOTE(review): Queue() is created without a size limit here, so
        # Full can presumably only occur if Queue is bounded by default -
        # confirm which Queue class is imported.
        queue = Queue()
        self.queues[checker.name] = queue
        threading.Thread(target=self._match, args=(checker, )).start()
        # The event is only re-checked after each wake-up from the condition.
        while not self.events[checker.name].is_set():
            with self.__cond:
                # Blocks until another thread notifies the condition.
                self.__cond.wait()
                try:
                    queue.put_nowait(self.line)
                except Full:
                    logging.error("match queue for {0} full".format(checker.name))
tsproxy.py 文件源码 项目:tsproxy 作者: WPO-Foundation 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def HandleResolved(self, message):
    """Act on the result of a name lookup for this connection.

    Does nothing unless the connection is in STATE_RESOLVING.  When
    ``message`` carries a non-empty ``addresses`` entry, the connection
    moves to STATE_CONNECTING, the result is stored in the module-level
    ``dns_cache`` under the hostname, and a ``connect`` message is sent.
    Otherwise the connection moves to STATE_ERROR, an error reply is
    appended to the buffer and ``handle_write`` is called.
    """
    global dns_cache
    if self.state != self.STATE_RESOLVING:
      return

    has_addresses = 'addresses' in message and len(message['addresses'])
    if not has_addresses:
      # Send host unreachable error
      self.state = self.STATE_ERROR
      self.buffer += chr(0x05) + chr(0x04) + self.requested_address
      self.handle_write()
      return

    self.state = self.STATE_CONNECTING
    self.addresses = message['addresses']
    is_localhost = message['localhost']
    dns_cache[self.hostname] = {'addresses': self.addresses, 'localhost': is_localhost}
    logging.debug('[{0:d}] Resolved {1}, Connecting'.format(self.client_id, self.hostname))
    connect_request = {'addresses': self.addresses, 'port': self.port, 'localhost': is_localhost}
    self.SendMessage('connect', connect_request)
tsproxy.py 文件源码 项目:tsproxy 作者: WPO-Foundation 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def signal_handler(signal, frame):
  """Log that the process is exiting and flag the shutdown.

  Sets the module-level ``must_exit`` to True and deletes the module-level
  ``server`` name.  Neither argument is used.
  """
  global must_exit, server
  logging.error('Exiting...')
  must_exit = True
  del server


# Wrapper around the asyncore loop that lets us poll the in/out pipes every 1ms
maas_common.py 文件源码 项目:rca-evaluation 作者: sieve-microservices 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def status_err(message=None, force_print=False, exception=None):
    """Report an ``error`` status, then raise *exception* or exit.

    Parameters:
      message     -- text passed to ``status``; when *exception* is given it
                     is cut to its last 250 characters, and the text of the
                     exception is used if no message was supplied
      force_print -- forwarded unchanged to ``status``
      exception   -- when given, re-raised after the status is reported

    Never returns normally: raises *exception* when one was given,
    otherwise calls ``sys.exit(1)``.
    """
    if exception:
        if message is None:
            # Slicing None would raise TypeError and mask the real error.
            message = str(exception)
        # a status message cannot exceed 256 characters
        # 'error ' plus up to 250 from the end of the exception
        message = message[-250:]
    status('error', message, force_print=force_print)
    if exception:
        raise exception
    sys.exit(1)
includehandler.py 文件源码 项目:kas 作者: siemens 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_config(self, repos=None):
        """
        Placeholder implementation: logs an error and always raises.

        Parameters:
          repos -- A dictionary that maps repo name to directory path

        Documented return contract (not produced by this implementation):
          (config, repos)
            config -- A dictionary containing the configuration
            repos -- A list of missing repo names that are needed
                     to create a complete configuration

        Raises:
          NotImplementedError -- on every call
        """
        # pylint: disable=no-self-use,unused-argument

        problem = 'get_config is not implemented'
        logging.error(problem)
        raise NotImplementedError()


问题


面经


文章

微信
公众号

扫码关注公众号