def test_filter():
work_dir = tempfile.mkdtemp(prefix='tail-test_filter-tail_from_dir')
six.print_('generating log files', work_dir)
test_file = os.path.join(work_dir, 'test.log')
with open(test_file, 'wb') as file_to_tail:
file_name = file_to_tail.name
six.print_('file to tail with filter', file_to_tail)
for i in range(1000):
if i % 2 == 0:
line = "odd: %d\n" % i
else:
line = "even: %d\n" % i
file_to_tail.write(line)
def consumer_gen():
global filter_count
while True:
record = yield ()
filter_count += 1
consumer = consumer_gen()
consumer.send(None)
vargs = [__name__, '--only-backfill', '--clear-checkpoint', '--filter-re=odd:\\s*\\d+']
vargs.append(test_file)
main(vargs, consumer)
assert (500 == filter_count)
python类print_()的实例源码
def test_backfill(log_handler=RotatingWithDelayFileHandler, consumer=None, tail_to_dir=None, vargs=None):
tail_from_dir = tempfile.mkdtemp(prefix='tail-test_backfill-tail_from_dir')
six.print_('generating log files', tail_from_dir)
log_handler.generate(os.path.join(tail_from_dir, 'test.log'), BACKFILL_EMITS)
if not tail_to_dir:
tail_to_dir = tempfile.mkdtemp(prefix='tail-test_backfill-tail_to_dir')
if not consumer:
def consumer_gen():
while True:
record = yield ()
open(os.path.join(tail_to_dir, record[1][0]), 'ab').write(record[2])
consumer = consumer_gen()
consumer.send(None)
six.print_('start tailer', tail_to_dir)
source_pattern = os.path.join(tail_from_dir, '*')
if not vargs:
vargs = [__name__, '--only-backfill', '--clear-checkpoint']
vargs.append(source_pattern)
main(vargs, consumer)
cmp_files(source_pattern, tail_to_dir, lambda x: Tailer.make_sig(x))
six.print_('all done', tail_to_dir)
# for src_file_path in glob.glob(source_pattern):
# dst_file_path = os.path.join(tail_to_dir, Tailer.make_sig(src_file_path))
# six.print_("testing:", src_file_path, dst_file_path)
# assert (Tailer.file_opener(src_file_path).read() == Tailer.file_opener(dst_file_path).read())
def eprint (msg, *args, **kwargs):
'''print args to stderr'''
#print_(text_type(msg).encode(), file=sys.stderr, **kwargs)
print_(msg, file=sys.stderr, **kwargs)
def main():
six.print_('loading data')
train_x, train_y, val_x, val_y = load_data()
train_x = train_x.reshape(-1, 64 * 64)
val_x = val_x.reshape(-1, 64 * 64)
six.print_('load data complete')
six.print_('start PCA')
try:
pca = pickle.load(open('pca.pickle', 'rb'))
except:
pca = decomposition.PCA(n_components=8*8)
pca.fit(train_x[:])
train_x = pca.transform(train_x)
six.print_('PCA complete')
clf = SVC(C=0.0001, kernel='linear', verbose=True, max_iter=100)
six.print_('start training')
clf.fit(train_x, train_y)
six.print_('training complete')
val_x = pca.transform(val_x)
acc = sum(val_y == clf.predict(val_x)) / float(len(val_y))
print(acc)
pickle.dump(pca, open('pca.pickle', 'wb'))
pickle.dump(clf, open('svm.pickle', 'wb'))
def main():
parser = argparse.ArgumentParser(description='Train a neural network')
parser.add_argument('--model', type=str)
parser.add_argument('--lr', type=float, default=0.001)
parser.add_argument('--decay', type=float, default=1e-4)
parser.add_argument('--momentum', type=float, default=0.9)
parser.add_argument('--batch', type=int, default=128)
parser.add_argument('--epoch', type=int, default=100)
parser.add_argument('--output', type=str, default='weight')
args = parser.parse_args()
model = importlib.import_module(args.model).build()
six.print_('loading data')
(train_x, train_y, val_x, val_y) = load_data()
six.print_('load data complete')
sgd = SGD(lr=args.lr,
decay=args.decay,
momentum=args.momentum,
nesterov=True)
model.compile(loss='binary_crossentropy', optimizer=sgd,
metrics=['accuracy'])
six.print_('build model complete')
six.print_('start training')
model.fit(train_x, train_y, batch_size=args.batch, nb_epoch=args.epoch,
verbose=2,
shuffle=True,
validation_data=(val_x, val_y))
model.save_weights(args.output + '.hdf5')
def join(self):
"""Will wait till all the processes are terminated
"""
try:
self._process_queue.join()
except KeyboardInterrupt:
six.print_("\nTerminating, please wait...", file=sys.stderr)
self._process_queue.join()
self._running = False
def test_print_():
save = sys.stdout
out = sys.stdout = six.moves.StringIO()
try:
six.print_("Hello,", "person!")
finally:
sys.stdout = save
assert out.getvalue() == "Hello, person!\n"
out = six.StringIO()
six.print_("Hello,", "person!", file=out)
assert out.getvalue() == "Hello, person!\n"
out = six.StringIO()
six.print_("Hello,", "person!", file=out, end="")
assert out.getvalue() == "Hello, person!"
out = six.StringIO()
six.print_("Hello,", "person!", file=out, sep="X")
assert out.getvalue() == "Hello,Xperson!\n"
out = six.StringIO()
six.print_(six.u("Hello,"), six.u("person!"), file=out)
result = out.getvalue()
assert isinstance(result, six.text_type)
assert result == six.u("Hello, person!\n")
six.print_("Hello", file=None) # This works.
out = six.StringIO()
six.print_(None, file=out)
assert out.getvalue() == "None\n"
class FlushableStringIO(six.StringIO):
def __init__(self):
six.StringIO.__init__(self)
self.flushed = False
def flush(self):
self.flushed = True
out = FlushableStringIO()
six.print_("Hello", file=out)
assert not out.flushed
six.print_("Hello", file=out, flush=True)
assert out.flushed
def test_print_encoding(monkeypatch):
# Fool the type checking in print_.
monkeypatch.setattr(six, "file", six.BytesIO, raising=False)
out = six.BytesIO()
out.encoding = "utf-8"
out.errors = None
six.print_(six.u("\u053c"), end="", file=out)
assert out.getvalue() == six.b("\xd4\xbc")
out = six.BytesIO()
out.encoding = "ascii"
out.errors = "strict"
py.test.raises(UnicodeEncodeError, six.print_, six.u("\u053c"), file=out)
out.errors = "backslashreplace"
six.print_(six.u("\u053c"), end="", file=out)
assert out.getvalue() == six.b("\\u053c")
def test_print_exceptions():
py.test.raises(TypeError, six.print_, x=3)
py.test.raises(TypeError, six.print_, end=3)
py.test.raises(TypeError, six.print_, sep=42)
def read_ids(filename):
try:
with open(filename) as f:
return [int(id) for id in f.read().split()]
except IOError as e:
six.print_("Can't read the file", filename)
exit(1)
def retrieve_statuses(api, ids_portion):
try:
return api.statuses_lookup(ids_portion)
except tweepy.error.RateLimitError:
six.print_("\nRate limit has been achieved. Waiting...")
while True:
try:
return api.statuses_lookup(ids_portion)
except tweepy.error.RateLimitError:
time.sleep(30)
except tweepy.error.TweepError as e:
six.print_("Failed to look up:", str(e))
exit(1)
def test_print_():
save = sys.stdout
out = sys.stdout = six.moves.StringIO()
try:
six.print_("Hello,", "person!")
finally:
sys.stdout = save
assert out.getvalue() == "Hello, person!\n"
out = six.StringIO()
six.print_("Hello,", "person!", file=out)
assert out.getvalue() == "Hello, person!\n"
out = six.StringIO()
six.print_("Hello,", "person!", file=out, end="")
assert out.getvalue() == "Hello, person!"
out = six.StringIO()
six.print_("Hello,", "person!", file=out, sep="X")
assert out.getvalue() == "Hello,Xperson!\n"
out = six.StringIO()
six.print_(six.u("Hello,"), six.u("person!"), file=out)
result = out.getvalue()
assert isinstance(result, six.text_type)
assert result == six.u("Hello, person!\n")
six.print_("Hello", file=None) # This works.
out = six.StringIO()
six.print_(None, file=out)
assert out.getvalue() == "None\n"
class FlushableStringIO(six.StringIO):
def __init__(self):
six.StringIO.__init__(self)
self.flushed = False
def flush(self):
self.flushed = True
out = FlushableStringIO()
six.print_("Hello", file=out)
assert not out.flushed
six.print_("Hello", file=out, flush=True)
assert out.flushed
def test_print_encoding(monkeypatch):
# Fool the type checking in print_.
monkeypatch.setattr(six, "file", six.BytesIO, raising=False)
out = six.BytesIO()
out.encoding = "utf-8"
out.errors = None
six.print_(six.u("\u053c"), end="", file=out)
assert out.getvalue() == six.b("\xd4\xbc")
out = six.BytesIO()
out.encoding = "ascii"
out.errors = "strict"
py.test.raises(UnicodeEncodeError, six.print_, six.u("\u053c"), file=out)
out.errors = "backslashreplace"
six.print_(six.u("\u053c"), end="", file=out)
assert out.getvalue() == six.b("\\u053c")
def test_print_exceptions():
py.test.raises(TypeError, six.print_, x=3)
py.test.raises(TypeError, six.print_, end=3)
py.test.raises(TypeError, six.print_, sep=42)
def simple_write_callback(iter_count, iteration_length):
import six
six.print_("{:d}/{:d}".format(iter_count, iteration_length))
def examples(directory):
"""
Generate example strategies to target folder
"""
source_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "examples")
try:
shutil.copytree(source_dir, os.path.join(directory, "examples"))
except OSError as e:
if e.errno == errno.EEXIST:
six.print_("Folder examples is exists.")
def version(**kwargs):
"""
Output Version Info
"""
from rqalpha import version_info
six.print_("Current Version: ", version_info)
def generate_config(directory):
"""
Generate default config file
"""
default_config = os.path.join(os.path.dirname(os.path.realpath(__file__)), "config.yml")
target_config_path = os.path.abspath(os.path.join(directory, 'config.yml'))
shutil.copy(default_config, target_config_path)
six.print_("Config file has been generated in", target_config_path)
# For Mod Cli
def output_profile_result(env):
stdout_trap = six.StringIO()
env.profile_deco.print_stats(stdout_trap)
profile_output = stdout_trap.getvalue()
profile_output = profile_output.rstrip()
six.print_(profile_output)
env.event_bus.publish_event(Event(EVENT.ON_LINE_PROFILER_RESULT, result=profile_output))
def get_bars(self, order_book_id, fields=None):
try:
s, e = self._index[order_book_id]
except KeyError:
six.print_(_(u"No data for {}").format(order_book_id))
return
if fields is None:
# the first is date
fields = self._table.names[1:]
if len(fields) == 1:
return self._converter.convert(fields[0], self._table.cols[fields[0]][s:e])
# remove datetime if exist in fields
self._remove_(fields, 'datetime')
dtype = np.dtype([('datetime', np.uint64)] +
[(f, self._converter.field_type(f, self._table.cols[f].dtype))
for f in fields])
result = np.empty(shape=(e - s, ), dtype=dtype)
for f in fields:
result[f] = self._converter.convert(f, self._table.cols[f][s:e])
result['datetime'] = self._table.cols['date'][s:e]
result['datetime'] *= 1000000
return result