python类print_()的实例源码

test_tailchaser.py 文件源码 项目:tailchaser 作者: thanos 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_filter():
    """Check that ``--filter-re`` limits what reaches the consumer.

    Writes 1000 lines to a temporary log file, half prefixed ``odd:`` and
    half prefixed ``even:``, then runs ``main`` in backfill-only mode with a
    filter regex matching the ``odd:`` lines.  The consumer coroutine counts
    every record it is sent, and the test asserts that count is 500.
    """
    global filter_count
    # Start from a known value so the assertion does not depend on whatever
    # module state earlier tests (or a missing initialiser) left behind.
    filter_count = 0

    work_dir = tempfile.mkdtemp(prefix='tail-test_filter-tail_from_dir')
    six.print_('generating log files', work_dir)
    test_file = os.path.join(work_dir, 'test.log')
    with open(test_file, 'wb') as file_to_tail:
        six.print_('file to tail with filter', file_to_tail)
        for i in range(1000):
            # NOTE(review): the labels are swapped relative to the parity of
            # ``i``; kept as-is because only the 500/500 split matters here.
            if i % 2 == 0:
                line = "odd: %d\n" % i
            else:
                line = "even: %d\n" % i
            # The file is opened in binary mode, so encode explicitly; writing
            # a text string here raises TypeError on Python 3.
            file_to_tail.write(line.encode('ascii'))

    def consumer_gen():
        """Coroutine that counts each record sent to it."""
        global filter_count
        while True:
            yield ()
            filter_count += 1

    consumer = consumer_gen()
    consumer.send(None)  # prime the coroutine up to its first ``yield``
    vargs = [__name__, '--only-backfill', '--clear-checkpoint', '--filter-re=odd:\\s*\\d+']
    vargs.append(test_file)
    main(vargs, consumer)
    assert (500 == filter_count)
test_tailchaser.py 文件源码 项目:tailchaser 作者: thanos 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def test_backfill(log_handler=RotatingWithDelayFileHandler, consumer=None, tail_to_dir=None, vargs=None):
    """Backfill generated log files and compare them with the tailed output.

    Args:
        log_handler: object whose ``generate(path, emits)`` creates the
            source log files under a fresh temporary directory.
        consumer: primed coroutine that receives records from ``main``.  When
            omitted, a default consumer appends ``record[2]`` to a file named
            ``record[1][0]`` inside ``tail_to_dir``.
        tail_to_dir: directory for the default consumer's output; a temporary
            directory is created when not given.
        vargs: argument list for ``main``; defaults to a backfill-only run
            with the checkpoint cleared.  The source glob pattern is added as
            the last argument.
    """
    tail_from_dir = tempfile.mkdtemp(prefix='tail-test_backfill-tail_from_dir')
    six.print_('generating log files', tail_from_dir)
    log_handler.generate(os.path.join(tail_from_dir, 'test.log'), BACKFILL_EMITS)
    if not tail_to_dir:
        tail_to_dir = tempfile.mkdtemp(prefix='tail-test_backfill-tail_to_dir')

    if not consumer:
        def consumer_gen():
            """Append each record's payload to its destination file."""
            while True:
                record = yield ()
                dst_path = os.path.join(tail_to_dir, record[1][0])
                # Close the handle after every record instead of leaving it
                # to the garbage collector.
                with open(dst_path, 'ab') as dst_file:
                    dst_file.write(record[2])

        consumer = consumer_gen()
        consumer.send(None)  # prime the coroutine up to its first ``yield``
    six.print_('start tailer', tail_to_dir)
    source_pattern = os.path.join(tail_from_dir, '*')
    if not vargs:
        vargs = [__name__, '--only-backfill', '--clear-checkpoint']
    # Build a new list so a caller-supplied ``vargs`` is not mutated.
    vargs = list(vargs) + [source_pattern]
    main(vargs, consumer)
    cmp_files(source_pattern, tail_to_dir, Tailer.make_sig)
    six.print_('all done', tail_to_dir)
log.py 文件源码 项目:jira_reporting_scripts 作者: andrew-hamlin-sp 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def eprint(msg, *args, **kwargs):
        '''Print ``msg`` and any extra positional arguments to stderr.

        Extra positional arguments were previously accepted but silently
        discarded; they are now forwarded so that ``eprint(a, b)`` prints
        both values.  Keyword arguments are passed through to ``print_``.
        '''
        print_(msg, *args, file=sys.stderr, **kwargs)
svm.py 文件源码 项目:Particle-Picking-Cryo-EM 作者: hqythu 项目源码 文件源码 阅读 46 收藏 0 点赞 0 评论 0
def main():
    """Fit a PCA + linear SVM pipeline and report validation accuracy.

    Loads the data via ``load_data``, flattens each sample to ``64 * 64``
    values, reuses a PCA pickled in ``pca.pickle`` when one can be loaded
    (fitting a new 64-component PCA otherwise), trains the SVC on the
    projected training data, prints the validation accuracy, and pickles
    both models to ``pca.pickle`` and ``svm.pickle``.
    """
    six.print_('loading data')
    train_x, train_y, val_x, val_y = load_data()
    train_x = train_x.reshape(-1, 64 * 64)
    val_x = val_x.reshape(-1, 64 * 64)
    six.print_('load data complete')

    six.print_('start PCA')
    try:
        # NOTE(review): unpickling executes arbitrary code; only safe while
        # pca.pickle is a cache file written by this script itself.
        with open('pca.pickle', 'rb') as pca_file:
            pca = pickle.load(pca_file)
    except Exception:
        # Missing or unreadable cache: fit from scratch.  ``Exception``
        # (rather than a bare ``except``) lets Ctrl-C and SystemExit through.
        pca = decomposition.PCA(n_components=8 * 8)
        pca.fit(train_x)
    train_x = pca.transform(train_x)
    six.print_('PCA complete')

    clf = SVC(C=0.0001, kernel='linear', verbose=True, max_iter=100)
    six.print_('start training')
    clf.fit(train_x, train_y)
    six.print_('training complete')

    val_x = pca.transform(val_x)
    acc = sum(val_y == clf.predict(val_x)) / float(len(val_y))
    six.print_(acc)

    # Use context managers so both files are flushed and closed on exit.
    with open('pca.pickle', 'wb') as pca_file:
        pickle.dump(pca, pca_file)
    with open('svm.pickle', 'wb') as svm_file:
        pickle.dump(clf, svm_file)
train.py 文件源码 项目:Particle-Picking-Cryo-EM 作者: hqythu 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def main():
    """Train the network built by a user-selected module and save its weights.

    Command-line options:
        --model     importable module name exposing ``build()`` (required)
        --lr        SGD learning rate (default 0.001)
        --decay     SGD decay (default 1e-4)
        --momentum  SGD momentum (default 0.9)
        --batch     batch size (default 128)
        --epoch     number of epochs (default 100)
        --output    weights file name without the ``.hdf5`` suffix
                    (default ``weight``)
    """
    parser = argparse.ArgumentParser(description='Train a neural network')

    # Required: without it ``import_module(None)`` fails with an opaque
    # error instead of a usage message.
    parser.add_argument('--model', type=str, required=True)
    parser.add_argument('--lr', type=float, default=0.001)
    parser.add_argument('--decay', type=float, default=1e-4)
    parser.add_argument('--momentum', type=float, default=0.9)
    parser.add_argument('--batch', type=int, default=128)
    parser.add_argument('--epoch', type=int, default=100)
    parser.add_argument('--output', type=str, default='weight')
    args = parser.parse_args()

    model = importlib.import_module(args.model).build()

    six.print_('loading data')
    (train_x, train_y, val_x, val_y) = load_data()
    six.print_('load data complete')

    sgd = SGD(lr=args.lr,
              decay=args.decay,
              momentum=args.momentum,
              nesterov=True)
    model.compile(loss='binary_crossentropy', optimizer=sgd,
                  metrics=['accuracy'])
    six.print_('build model complete')

    six.print_('start training')
    model.fit(train_x, train_y, batch_size=args.batch, nb_epoch=args.epoch,
              verbose=2,
              shuffle=True,
              validation_data=(val_x, val_y))
    model.save_weights(args.output + '.hdf5')
multiprocessing_executor.py 文件源码 项目:botoflow 作者: boto 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def join(self):
        """Block until the process queue has been joined.

        If a KeyboardInterrupt arrives while waiting, a notice is written to
        stderr and the queue is joined once more.  ``_running`` is cleared
        once the wait completes.
        """
        queue = self._process_queue
        try:
            queue.join()
        except KeyboardInterrupt:
            six.print_("\nTerminating, please wait...", file=sys.stderr)
            queue.join()

        self._running = False
test_six.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_print_():
    """Exercise ``six.print_``: default stdout, ``file``, ``end``, ``sep``,
    text-type output, ``file=None``, printing ``None`` and ``flush``."""
    original_stdout = sys.stdout
    captured = sys.stdout = six.moves.StringIO()
    try:
        six.print_("Hello,", "person!")
    finally:
        sys.stdout = original_stdout
    assert captured.getvalue() == "Hello, person!\n"

    def printed(*values, **options):
        # Print into a fresh in-memory buffer and hand back what was written.
        buf = six.StringIO()
        six.print_(*values, file=buf, **options)
        return buf.getvalue()

    assert printed("Hello,", "person!") == "Hello, person!\n"
    assert printed("Hello,", "person!", end="") == "Hello, person!"
    assert printed("Hello,", "person!", sep="X") == "Hello,Xperson!\n"

    text = printed(six.u("Hello,"), six.u("person!"))
    assert isinstance(text, six.text_type)
    assert text == six.u("Hello, person!\n")

    six.print_("Hello", file=None) # This works.
    assert printed(None) == "None\n"

    class FlushableStringIO(six.StringIO):
        # Records whether flush() has been called.
        def __init__(self):
            six.StringIO.__init__(self)
            self.flushed = False

        def flush(self):
            self.flushed = True

    tracker = FlushableStringIO()
    six.print_("Hello", file=tracker)
    assert not tracker.flushed
    six.print_("Hello", file=tracker, flush=True)
    assert tracker.flushed
test_six.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_print_encoding(monkeypatch):
    """Check that ``six.print_`` honours the target's ``encoding``/``errors``."""
    # Fool the type checking in print_.
    monkeypatch.setattr(six, "file", six.BytesIO, raising=False)

    def make_sink(encoding, errors):
        # Byte buffer carrying the attributes print_ looks at.
        sink = six.BytesIO()
        sink.encoding = encoding
        sink.errors = errors
        return sink

    utf8_sink = make_sink("utf-8", None)
    six.print_(six.u("\u053c"), end="", file=utf8_sink)
    assert utf8_sink.getvalue() == six.b("\xd4\xbc")

    ascii_sink = make_sink("ascii", "strict")
    py.test.raises(UnicodeEncodeError, six.print_, six.u("\u053c"), file=ascii_sink)
    # Same buffer, now with a lenient error handler.
    ascii_sink.errors = "backslashreplace"
    six.print_(six.u("\u053c"), end="", file=ascii_sink)
    assert ascii_sink.getvalue() == six.b("\\u053c")
test_six.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_print_exceptions():
    """``six.print_`` must raise TypeError for each of these keyword sets."""
    for bad_kwargs in ({"x": 3}, {"end": 3}, {"sep": 42}):
        py.test.raises(TypeError, six.print_, **bad_kwargs)
retrieve_tweets.py 文件源码 项目:humor 作者: micyril 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def read_ids(filename):
    """Read whitespace-separated integer ids from ``filename``.

    Args:
        filename: path of a text file containing integers separated by any
            whitespace.

    Returns:
        A list of ints in file order (empty for an empty file).

    If the file cannot be read, a message is printed and the process exits
    with status 1.
    """
    # Local import keeps the block self-contained; ``sys.exit`` replaces the
    # ``exit`` helper, which is only injected by the ``site`` module.
    import sys

    try:
        with open(filename) as f:
            # ``token`` instead of ``id`` to avoid shadowing the builtin.
            return [int(token) for token in f.read().split()]
    except IOError:
        six.print_("Can't read the file", filename)
        sys.exit(1)
retrieve_tweets.py 文件源码 项目:humor 作者: micyril 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def retrieve_statuses(api, ids_portion):
    """Look up statuses for ``ids_portion``, waiting out rate limits.

    Args:
        api: object exposing ``statuses_lookup(ids)``.
        ids_portion: ids passed straight through to ``statuses_lookup``.

    Returns:
        Whatever ``api.statuses_lookup(ids_portion)`` returns.

    On a rate-limit error a notice is printed once, then the call is retried
    every 30 seconds until it succeeds.  Any other Tweepy error, whether on
    the first attempt or on a retry, prints the failure and exits with
    status 1.  Previously an error during a retry escaped unhandled, and the
    first retry was issued without waiting.
    """
    # Local import keeps the block self-contained; ``sys.exit`` replaces the
    # ``exit`` helper, which is only injected by the ``site`` module.
    import sys

    announced = False
    while True:
        try:
            return api.statuses_lookup(ids_portion)
        # Handler order is kept from the original: rate limit is checked
        # before the general Tweepy error.
        except tweepy.error.RateLimitError:
            if not announced:
                six.print_("\nRate limit has been achieved. Waiting...")
                announced = True
            time.sleep(30)
        except tweepy.error.TweepError as e:
            six.print_("Failed to look up:", str(e))
            sys.exit(1)
test_six.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_print_():
    """Check ``six.print_`` against stdout, explicit files, ``end``/``sep``,
    text-type arguments, ``file=None``, a ``None`` value and ``flush``."""
    real_stdout = sys.stdout
    fake_stdout = sys.stdout = six.moves.StringIO()
    try:
        six.print_("Hello,", "person!")
    finally:
        sys.stdout = real_stdout
    assert fake_stdout.getvalue() == "Hello, person!\n"

    # Keyword options paired with the exact text they must produce.
    cases = (
        ({}, "Hello, person!\n"),
        ({"end": ""}, "Hello, person!"),
        ({"sep": "X"}, "Hello,Xperson!\n"),
    )
    for options, expected in cases:
        sink = six.StringIO()
        six.print_("Hello,", "person!", file=sink, **options)
        assert sink.getvalue() == expected

    sink = six.StringIO()
    six.print_(six.u("Hello,"), six.u("person!"), file=sink)
    written = sink.getvalue()
    assert isinstance(written, six.text_type)
    assert written == six.u("Hello, person!\n")

    six.print_("Hello", file=None) # This works.

    sink = six.StringIO()
    six.print_(None, file=sink)
    assert sink.getvalue() == "None\n"

    class FlushableStringIO(six.StringIO):
        # Remembers whether flush() was invoked.
        def __init__(self):
            six.StringIO.__init__(self)
            self.flushed = False

        def flush(self):
            self.flushed = True

    flushable = FlushableStringIO()
    six.print_("Hello", file=flushable)
    assert not flushable.flushed
    six.print_("Hello", file=flushable, flush=True)
    assert flushable.flushed
test_six.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_print_encoding(monkeypatch):
    """``six.print_`` should encode using the file's ``encoding``/``errors``."""
    # Fool the type checking in print_.
    monkeypatch.setattr(six, "file", six.BytesIO, raising=False)
    glyph = six.u("\u053c")

    utf8_out = six.BytesIO()
    utf8_out.encoding, utf8_out.errors = "utf-8", None
    six.print_(glyph, end="", file=utf8_out)
    assert utf8_out.getvalue() == six.b("\xd4\xbc")

    ascii_out = six.BytesIO()
    ascii_out.encoding, ascii_out.errors = "ascii", "strict"
    py.test.raises(UnicodeEncodeError, six.print_, glyph, file=ascii_out)

    # Reuse the same buffer with a lenient error handler.
    ascii_out.errors = "backslashreplace"
    six.print_(glyph, end="", file=ascii_out)
    assert ascii_out.getvalue() == six.b("\\u053c")
test_six.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_print_exceptions():
    """Each keyword/value pair below must make ``six.print_`` raise TypeError."""
    for keyword, value in (("x", 3), ("end", 3), ("sep", 42)):
        py.test.raises(TypeError, six.print_, **{keyword: value})
test_mediawiki_writer.py 文件源码 项目:pytablewriter 作者: thombashi 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def simple_write_callback(iter_count, iteration_length):
    """Print progress as ``<iter_count>/<iteration_length>``."""
    import six

    progress = "{:d}/{:d}".format(iter_count, iteration_length)
    six.print_(progress)
__main__.py 文件源码 项目:rqalpha 作者: ricequant 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def examples(directory):
    """
    Generate example strategies to target folder

    Copies the ``examples`` directory that sits next to this file into
    ``<directory>/examples``.  If the destination already exists a notice is
    printed; any other OSError (missing source, permissions, ...) is
    re-raised instead of being silently ignored.
    """
    source_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "examples")

    try:
        shutil.copytree(source_dir, os.path.join(directory, "examples"))
    except OSError as e:
        if e.errno != errno.EEXIST:
            # Do not hide real failures behind a silent no-op.
            raise
        six.print_("Folder examples is exists.")
__main__.py 文件源码 项目:rqalpha 作者: ricequant 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def version(**kwargs):
    """
    Print the installed rqalpha version.

    Keyword arguments are accepted but not used.
    """
    from rqalpha import version_info as current_version

    six.print_("Current Version: ", current_version)
__main__.py 文件源码 项目:rqalpha 作者: ricequant 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def generate_config(directory):
    """
    Copy the ``config.yml`` next to this file into ``directory`` and print
    the absolute path of the copy.
    """
    package_dir = os.path.dirname(os.path.realpath(__file__))
    destination = os.path.abspath(os.path.join(directory, 'config.yml'))
    shutil.copy(os.path.join(package_dir, "config.yml"), destination)
    six.print_("Config file has been generated in", destination)


# For Mod Cli
main.py 文件源码 项目:rqalpha 作者: ricequant 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def output_profile_result(env):
    """Print the profiler statistics and publish them on the event bus.

    The stats are captured in an in-memory buffer, stripped of trailing
    whitespace, printed, and sent as the ``result`` of an
    ``ON_LINE_PROFILER_RESULT`` event.
    """
    buffer = six.StringIO()
    env.profile_deco.print_stats(buffer)
    report = buffer.getvalue().rstrip()
    six.print_(report)
    profile_event = Event(EVENT.ON_LINE_PROFILER_RESULT, result=report)
    env.event_bus.publish_event(profile_event)
daybar_store.py 文件源码 项目:rqalpha 作者: ricequant 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_bars(self, order_book_id, fields=None):
        """Return the stored bars for ``order_book_id``.

        The row range comes from ``self._index``; when the id is missing a
        notice is printed and ``None`` is returned.  ``fields`` defaults to
        every table column after the first.  A single requested field
        returns just that converted column; otherwise a structured array
        with a ``datetime`` column followed by the requested fields is built.
        """
        try:
            start, stop = self._index[order_book_id]
        except KeyError:
            six.print_(_(u"No data for {}").format(order_book_id))
            return

        table = self._table
        converter = self._converter

        if fields is None:
            # the first is date
            fields = table.names[1:]

        if len(fields) == 1:
            only = fields[0]
            return converter.convert(only, table.cols[only][start:stop])

        # remove datetime if exist in fields
        self._remove_(fields, 'datetime')

        columns = [('datetime', np.uint64)]
        for name in fields:
            columns.append((name, converter.field_type(name, table.cols[name].dtype)))

        result = np.empty(shape=(stop - start, ), dtype=np.dtype(columns))
        for name in fields:
            result[name] = converter.convert(name, table.cols[name][start:stop])

        # The stored date column is scaled by 1000000 to form ``datetime``.
        result['datetime'] = table.cols['date'][start:stop]
        result['datetime'] *= 1000000

        return result


问题


面经


文章

微信
公众号

扫码关注公众号