python类info()的实例源码

kubectl_util.py 文件源码 项目:benchmarks 作者: tensorflow 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _PrintLogs(pod_name_prefix, job_name):
  """Prints the logs of every pod matching the given selectors.

  For each pod, `kubectl logs -p` (logs of the previous run) is tried first.
  If that command fails, the logs of the current run are fetched instead.
  Pods are selected based on pod_name_prefix and job_name.

  Args:
    pod_name_prefix: value of 'name-prefix' selector.
    job_name: value of 'job' selector.
  """
  for pod in _GetPodNames(pod_name_prefix, job_name):
    previous_cmd = [_KUBECTL, 'logs', '-p', pod]
    logging.info('Command to get logs: %s', ' '.join(previous_cmd))
    try:
      pod_logs = subprocess.check_output(previous_cmd, universal_newlines=True)
    except subprocess.CalledProcessError:
      # No previous-run logs available; fall back to the current run.
      current_cmd = [_KUBECTL, 'logs', pod]
      logging.info('Command to get logs: %s', ' '.join(current_cmd))
      pod_logs = subprocess.check_output(current_cmd, universal_newlines=True)
    print('%s logs:' % pod)
    print(pod_logs)
orm.py 文件源码 项目:mblog 作者: moling3650 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
async def select(sql, args, size=None):
    """Run a query and return the fetched rows.

    Declared ``async`` because the body awaits the connection pool and the
    cursor; a plain ``def`` containing ``await`` does not compile.

    Args:
        sql: SQL text using '?' placeholders; each '?' is rewritten to '%s'
            before the statement is executed.
        args: parameters bound to the placeholders.
        size: when truthy, at most this many rows are fetched with
            ``fetchmany``; otherwise every row is fetched with ``fetchall``.

    Returns:
        The rows produced by the cursor (an ``aiomysql.DictCursor``).
    """
    log(sql, args)
    # Both context managers release their resource on exit, even on error.
    async with __pool.get() as conn:
        async with conn.cursor(aiomysql.DictCursor) as cur:
            # The driver expects '%s' placeholders, callers write '?'.
            await cur.execute(sql.replace('?', '%s'), args)
            if size:
                resultset = await cur.fetchmany(size)
            else:
                resultset = await cur.fetchall()
        logging.info('rows returned: %s', len(resultset))
        return resultset

# Wrap SQL INSERT INTO / UPDATE / DELETE statements in an execute() helper (original comment was mis-encoded; wording reconstructed).
api.py 文件源码 项目:mblog 作者: moling3650 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
async def oauth2(code):
    """Complete a Weibo OAuth2 login for the given authorization code.

    Declared ``async`` because the body awaits HTTP responses and model
    lookups; a plain ``def`` containing ``await`` does not compile.

    Args:
        code: the authorization code to exchange for an access token.

    Returns:
        * a 'redirect:...' string pointing at the registration page when no
          Oauth record exists for the Weibo account,
        * the string 'oauth user was deleted.' when the Oauth record exists
          but its user does not,
        * otherwise the result of ``user.signin(web.HTTPFound('/'))``.
    """
    url = 'https://api.weibo.com/oauth2/access_token'
    # NOTE(review): client credentials are hard-coded in source; consider
    # loading them from configuration instead.
    payload = {
        'client_id': '366603916',
        'client_secret': 'b418efbd77094585d0a7f9ccac98a706',
        'grant_type': 'authorization_code',
        'code': code,
        'redirect_uri': 'http://www.qiangtaoli.com'
    }
    # ``async with`` so the session is closed through its async protocol.
    async with ClientSession() as session:
        async with session.post(url, data=payload) as resp:
            params = await resp.json()
        # The token response is forwarded as-is as the query parameters.
        async with session.get('https://api.weibo.com/2/users/show.json', params=params) as resp:
            info = await resp.json()
        o = await Oauth.find('weibo-' + info['idstr'])
        if not o:
            # NOTE(review): name/image are inserted into the query string
            # without URL-escaping; confirm how the consumer parses them.
            return 'redirect:/bootstrap/register?oid=weibo-%s&name=%s&image=%s' % (info['idstr'], info['name'], info['avatar_large'])
        user = await User.find(o.user_id)
        if not user:
            return 'oauth user was deleted.'
        return user.signin(web.HTTPFound('/'))


# ????
models.py 文件源码 项目:mblog 作者: moling3650 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
async def find_by_cookie(cls, cookie_str):
        """Look up the user described by a signed '<uid>-<expires>-<sha1>' cookie.

        Declared ``async`` because the body awaits ``cls.find``; a plain
        ``def`` containing ``await`` does not compile.

        Args:
            cookie_str: the raw cookie value; may be empty or None.

        Returns:
            The user with its ``password`` attribute masked as '******', or
            None when the cookie is missing, malformed, expired, refers to
            an unknown user, fails signature verification, or any exception
            occurs while processing it (the exception is logged).
        """
        import hmac  # local import: only needed for the constant-time compare

        if not cookie_str:
            return None
        try:
            parts = cookie_str.split('-')
            if len(parts) != 3:
                return None
            uid, expires, sha1 = parts
            if int(expires) < time.time():
                return None
            user = await cls.find(uid)
            if not user:
                return None
            s = '%s-%s-%s-%s' % (uid, user.get('password'), expires, COOKIE_KEY)
            expected = hashlib.sha1(s.encode('utf-8')).hexdigest()
            # Constant-time comparison avoids leaking how many leading
            # characters of the signature matched.
            if not hmac.compare_digest(sha1.encode('utf-8'), expected.encode('utf-8')):
                logging.info('invalid sha1')
                return None
            user.password = '******'
            return user
        except Exception as e:
            logging.exception(e)
            return None


# ?????
__init__.py 文件源码 项目:mblog 作者: moling3650 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def init_jinja2(app, **kw):
    """Create a jinja2 Environment and store it in ``app['__templating__']``.

    Keyword arguments override the defaults listed below. ``path`` selects
    the template directory (default: the ``templates`` directory under the
    first entry of ``__path__``) and ``filters`` is an optional mapping of
    filter name to callable that is registered on the environment.
    """
    logging.info('init jinja2...')
    defaults = (
        ('autoescape', True),
        ('block_start_string', '{%'),
        ('block_end_string', '%}'),
        ('variable_start_string', '{{'),
        ('variable_end_string', '}}'),
        ('auto_reload', True),
    )
    options = {key: kw.get(key, fallback) for key, fallback in defaults}
    default_path = os.path.join(__path__[0], 'templates')
    path = kw.get('path', default_path)
    logging.info('set jinja2 template path: %s' % path)
    env = Environment(loader=FileSystemLoader(path), **options)
    extra_filters = kw.get('filters')
    if extra_filters is not None:
        env.filters.update(extra_filters.items())
    app['__templating__'] = env
__init__.py 文件源码 项目:mblog 作者: moling3650 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
async def create_server(loop, config_mod_name):
    """Build the web application and start listening on 127.0.0.1:9900.

    Declared ``async`` because the body awaits the database pool and the
    server creation; a plain ``def`` containing ``await`` does not compile.

    Args:
        loop: the event loop used for the pool, the application and the
            listening server.
        config_mod_name: name of the configuration module to import; it
            must expose ``db_config`` and ``jinja2_config`` mappings.

    Returns:
        The server object returned by ``loop.create_server``.

    Raises:
        ImportError: if the configuration module cannot be imported.
    """
    # An ImportError simply propagates; catching it only to re-raise added
    # nothing.
    config = __import__(config_mod_name, fromlist=['get config'])

    await create_pool(loop, **config.db_config)
    app = web.Application(loop=loop, middlewares=[
        logger_factory, auth_factory, data_factory, response_factory])
    add_routes(app, 'app.route')
    add_routes(app, 'app.api')
    add_routes(app, 'app.api_v2')
    add_static(app)
    init_jinja2(app, filters=dict(datetime=datetime_filter, marked=marked_filter), **config.jinja2_config)
    server = await loop.create_server(app.make_handler(), '127.0.0.1', 9900)
    logging.info('server started at http://127.0.0.1:9900...')
    return server
sessions.py 文件源码 项目:python-freezerclient 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def take_action(self, parsed_args):
        """Remove a job from a session and log the outcome.

        Args:
            parsed_args: parsed command-line arguments providing
                ``session_id`` and ``job_id``.

        Raises:
            exceptions.ApiClientException: when the removal fails with any
                error other than the known spurious
                'Additional properties are not allowed' response.
        """
        try:
            self.app.client.sessions.remove_job(parsed_args.session_id,
                                                parsed_args.job_id)
        except Exception as error:
            # Python 3 exceptions have no ``message`` attribute; use it when
            # an exception class provides one, otherwise fall back to str().
            message = str(getattr(error, 'message', '') or error)
            # there is an error coming from the api when a job is removed
            # with the following text:
            # Additional properties are not allowed
            # ('job_event' was unexpected)
            # but in reality the job gets removed correctly.
            if 'Additional properties are not allowed' not in message:
                raise exceptions.ApiClientException(message)
        else:
            logging.info('Job {0} removed correctly from session {1}'.format(
                parsed_args.job_id, parsed_args.session_id))
sudoku_steps.py 文件源码 项目:pyku 作者: dubvulture 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def extract_digits(self, image):
        """
        Extract digits from a binary image representing a sudoku.

        The image is processed in four orientations (rotated by 90 degrees
        after each pass). In every pass each labeled object is assigned to
        the grid cell whose centroid is nearest to the object's bounding-box
        center and classified; ``prediction[0, 0]`` is accumulated as that
        orientation's score. The grid of the best-scoring orientation is
        returned.

        :param image: binary image/sudoku
        :return: 9x9 object array of predictions for the orientation with
            the highest accumulated score; cells without a digit hold 0
        """
        prob = np.zeros(4, dtype=np.float32)
        digits = np.zeros((4, 9, 9), dtype=object)
        for i in range(4):
            labeled, _ = label(image, structure=CROSS)
            for obj in find_objects(labeled):
                roi = image[obj]
                # center of bounding box
                cy = (obj[0].stop + obj[0].start) / 2
                cx = (obj[1].stop + obj[1].start) / 2
                dists = cdist([[cy, cx]], CENTROIDS, 'euclidean')
                pos = int(np.argmin(dists))
                # Floor division: a float result cannot be used as an index.
                cy, cx = pos % 9, pos // 9
                # 28x28 image, center relative to sudoku
                prediction = self.classifier.classify(morph(roi))
                current = digits[i, cy, cx]
                # Untouched cells still hold the integer 0 from np.zeros;
                # test by type and value rather than by object identity.
                if isinstance(current, int) and current == 0:
                    # Newly found digit
                    digits[i, cy, cx] = prediction
                    prob[i] += prediction[0, 0]
                elif prediction[0, 0] > current[0, 0]:
                    # Overlapping! (noise), choose the most probable prediction
                    prob[i] -= current[0, 0]
                    digits[i, cy, cx] = prediction
                    prob[i] += prediction[0, 0]
            image = np.rot90(image)
        logging.info(prob)
        return digits[np.argmax(prob)]
charm_helpers_sync.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 49 收藏 0 点赞 0 评论 0
def clone_helpers(work_dir, branch):
    '''
    Make a lightweight bzr checkout of branch in <work_dir>/charm-helpers
    and return the checkout directory.
    '''
    checkout_dir = os.path.join(work_dir, 'charm-helpers')
    logging.info('Checking out %s to %s.' % (branch, checkout_dir))
    subprocess.check_call(
        ['bzr', 'checkout', '--lightweight', branch, checkout_dir])
    return checkout_dir
charm_helpers_sync.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def ensure_init(path):
    '''
    Walk the tree rooted at the first two '/'-separated components of path
    and create an empty __init__.py in every directory that lacks one,
    eg path='/hooks/helpers/foo'/:
        hooks/
        hooks/helpers/__init__.py
        hooks/helpers/foo/__init__.py
    '''
    walk_root = os.path.join(*path.split('/')[:2])
    for current_dir, _subdirs, _files in os.walk(walk_root):
        init_py = os.path.join(current_dir, '__init__.py')
        if os.path.exists(init_py):
            continue
        logging.info('Adding missing __init__.py: %s' % init_py)
        open(init_py, 'wb').close()
charm_helpers_sync.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def sync_pyfile(src, dest):
    '''
    Copy the module <src>.py into the directory dest, creating dest when it
    does not exist. The __init__.py next to the source module is copied as
    well when present, then ensure_init(dest) is run.
    '''
    module_file = src + '.py'
    package_init = os.path.join(os.path.dirname(module_file), '__init__.py')
    logging.info('Syncing pyfile: %s -> %s.' % (module_file, dest))
    if not os.path.exists(dest):
        os.makedirs(dest)
    shutil.copy(module_file, dest)
    if os.path.isfile(package_init):
        shutil.copy(package_init, dest)
    ensure_init(dest)
charm_helpers_sync.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def sync_directory(src, dest, opts=None):
    '''
    Replace dest with a copy of the directory tree src. An existing dest is
    removed first; entries are skipped according to get_filter(opts), and
    ensure_init(dest) is run afterwards.
    '''
    dest_exists = os.path.exists(dest)
    if dest_exists:
        logging.debug('Removing existing directory: %s' % dest)
        shutil.rmtree(dest)
    logging.info('Syncing directory: %s -> %s.' % (src, dest))

    ignore_fn = get_filter(opts)
    shutil.copytree(src, dest, ignore=ignore_fn)
    ensure_init(dest)
add_albums.py 文件源码 项目:decouvrez_django 作者: oc-courses 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        """Load albums and their artists from data/albums.yml into the database.

        The YAML file is read from the ``data`` directory located one level
        above this file's directory. Each listed artist is fetched or
        created by name. An album that already exists (matched by title) is
        left untouched; otherwise it is created with the next reference
        number and linked to its artists.
        """
        reference = 0
        # open file with data
        directory = os.path.dirname(os.path.dirname(__file__))
        path = os.path.join(directory, 'data', 'albums.yml')
        with open(path, 'r') as file:
            # safe_load: plain data only, no arbitrary object construction.
            data = yaml.safe_load(file)
        for album in data['albums']:
            # Create artists
            artists = []
            for artist in album['artists']:
                try:
                    stored_artist = Artist.objects.get(name=artist)
                    lg.info('Artist found: %s'%stored_artist)
                except ObjectDoesNotExist:
                    stored_artist = Artist.objects.create(name=artist)
                    lg.info('Artist created: %s'%stored_artist)
                artists.append(stored_artist)
            # Find or create album
            try:
                stored_album = Album.objects.get(title=album['title'])
                lg.info('Album found: %s'%stored_album.title)
            except ObjectDoesNotExist:
                reference += 1
                # Keep the YAML entry (``album``) and the model instance
                # (``stored_album``) under separate names.
                stored_album = Album.objects.create(
                    title=album['title'],
                    reference=reference,
                    picture=album['picture']
                )
                # Many-to-many relations are populated through the manager.
                stored_album.artists.set(artists)
                lg.info('New album: %s'%stored_album.title)
sendmsg.py 文件源码 项目:python-driver 作者: bblfsh 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def main():
    """Emit one ParseAST request (a JSON line) per file named on the command line.

    Each file is read by trying a list of encodings in order until one
    decodes it without error; if none succeeds the content is sent as an
    empty string. Every request is written to standard output followed by a
    newline, and standard output is closed once all files are processed.
    """
    filesidx = 1
    outbuffer = sys.stdout

    files = sys.argv[filesidx:]

    d = {
        'action': 'ParseAST',
        'filepath': '',
        'content': '',
        'language': 'python',
    }

    # 'gb2312' was misspelled as 'gb2313' (an unknown codec makes open()
    # raise LookupError); the duplicated 'iso8859_15' entry was dropped.
    encodings = ('utf_8', 'iso8859_15', 'gb2312',
                 'cp1251', 'cp1252', 'cp1250', 'shift-jis', 'gbk', 'cp1256',
                 'iso8859-2', 'euc_jp', 'big5', 'cp874', 'euc_kr', 'iso8859_7',
                 'cp1255')

    for f in files:
        content = ''
        logging.info(f)
        for encoding in encodings:
            try:
                with open(f, encoding=encoding) as infile:
                    content = infile.read()
                break
            except UnicodeDecodeError:
                continue

        d.update({
            'filepath': f,
            'content': content,
        })

        # Write the payload and its newline through the same stream.
        json.dump(d, outbuffer, ensure_ascii=False)
        outbuffer.write('\n')
    outbuffer.close()
helper_functions.py 文件源码 项目:AFSCbot 作者: HadManySons 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def print_and_log(text, error=False):
    """Print text and log it prefixed with the current time.

    The prefix is time.strftime(LOG_TIME_FORMAT). The message is logged at
    ERROR level when error is truthy, otherwise at INFO level.
    """
    print(text)
    emit = logging.error if error else logging.info
    emit(time.strftime(LOG_TIME_FORMAT) + text)
DownvoteRemover.py 文件源码 项目:AFSCbot 作者: HadManySons 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def proccessComments():
    """Delete the logged-in account's comments scoring below deleteThreshold.

    Every deletion is printed and logged together with the comment's
    permalink.
    """
    own_name = str(reddit.user.me())
    for comment in reddit.redditor(own_name).comments.new(limit=None):
        # Only comments whose score is below the threshold are removed.
        if not comment.score < deleteThreshold:
            continue
        comment.delete()

        permalink = "http://www.reddit.com" + comment.permalink() + "/"
        notice = "Deleting comment: " + permalink

        print(notice)
        logging.info(time.strftime("%Y/%m/%d %H:%M:%S ") + notice)
server.py 文件源码 项目:defuse_division 作者: lelandbatey 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def local_address(fallback):
    """Returns the local address of this computer.

    A UDP socket is connected to 8.8.8.8:53 and the address the socket is
    bound to is returned.

    Args:
        fallback: address to return when the socket operations raise
            OSError.

    Returns:
        The local interface address as a string, or ``fallback``.
    """
    try:
        # The context manager closes the socket even when connect() or
        # getsockname() raises, so the descriptor is never leaked.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 53))
            interface = s.getsockname()[0]
    except OSError:
        interface = fallback
        logging.info(
            'Cannot connect to network determine interface, using fallback "{}"'.
            format(fallback))

    return interface
server.py 文件源码 项目:defuse_division 作者: lelandbatey 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def localnet_register(host, port):
    '''
    Register this server as a "_defusedivision._tcp.local." Zeroconf service
    and then sleep in a loop forever, so it is meant to run in its own
    thread. When the zeroconf package cannot be imported, an error is logged
    and the function returns without registering anything.
    '''
    try:
        from zeroconf import ServiceInfo, Zeroconf
        from time import sleep
    except ImportError:
        logging.error(
            'Zeroconf not installed, cannot register this server on the local '
            'network. Other players may still connect, but they must be told '
            'what your hostname and port are (hostname: {}, port: {})'.format(
                host, port))
        return

    advertised_interface = local_address('127.0.0.1')

    # Dots are replaced with dashes in the instance part of the service name.
    dashed_host = host.replace('.', '-')
    dashed_interface = advertised_interface.replace('.', '-')
    service_name = "{}{}._defusedivision._tcp.local.".format(
        dashed_host, dashed_interface)

    info = ServiceInfo(
        "_defusedivision._tcp.local.",
        service_name,
        address=socket.inet_aton(advertised_interface),
        port=int(port),
        weight=0,
        priority=0,
        properties=b"")

    zc = Zeroconf()
    zc.register_service(info)
    # Close the Zeroconf instance at interpreter exit.
    atexit.register(lambda: zc.close())
    while True:
        sleep(0.1)
server.py 文件源码 项目:defuse_division 作者: lelandbatey 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def send_input(self, inpt):
        """Forward inpt to the parent bout, tagged with this player's name."""
        logging.debug(inpt)
        tagged = {'player': self.name, 'input': inpt}
        self.bout.send_input(tagged)


问题


面经


文章

微信
公众号

扫码关注公众号