def test_timezone_in_plugin(capsys):
    """A plugin-level TIMEZONE attribute shifts crontab evaluation off UTC."""

    class ActivateImpl(MockedImpl):
        TIMEZONE = 'Asia/Tokyo'
        CRONTAB = ['0 0 * * * .print_datetime']

        def activate(self):
            self.activate_crontab()

        def print_datetime(self, polled_time):
            six.print_(polled_time.strftime('%Y-%m-%d'), end='')

    impl = ActivateImpl()
    impl.activate()
    with freeze_time('2016-01-01 00:00:01'):
        impl.poll_crontab()
    captured_out, _ = capsys.readouterr()
    # With the plugin pinned to Asia/Tokyo, UTC midnight is not local
    # midnight, so the output must not match the frozen UTC date.
    assert captured_out != '2016-01-01'
# Collected examples of six.print_() usage.
def test_timezone_in_config(capsys):
    """A TIMEZONE supplied via the bot config shifts crontab evaluation off UTC."""

    class MockConfig(object):
        TIMEZONE = 'Asia/Tokyo'

    class ActivateImpl(MockedImpl):
        CRONTAB = ['0 0 * * * .print_datetime']

        def print_datetime(self, polled_time):
            six.print_(polled_time.strftime('%Y-%m-%d'), end='')

    impl = ActivateImpl()
    impl.activate()
    impl.bot_config = MockConfig()
    with freeze_time('2016-01-01 00:00:01'):
        impl.poll_crontab()
    captured_out, _ = capsys.readouterr()
    # UTC midnight is not midnight in Asia/Tokyo, so the configured
    # timezone must keep the output from matching the frozen UTC date.
    assert captured_out != '2016-01-01'
def test_timezone_in_plugin(capsys):
    """Crontab polling honours the TIMEZONE declared on the plugin class."""

    class ActivateImpl(MockedImpl):
        TIMEZONE = 'Asia/Tokyo'
        CRONTAB = [
            '0 0 * * * .print_datetime',
        ]

        def activate(self):
            self.activate_crontab()

        def print_datetime(self, polled_time):
            six.print_(polled_time.strftime('%Y-%m-%d'), end='')

    tz_plugin = ActivateImpl()
    tz_plugin.activate()
    with freeze_time('2016-01-01 00:00:01'):
        tz_plugin.poll_crontab()
    stdout, _stderr = capsys.readouterr()
    # Asia/Tokyo is offset from UTC, so the crontab must not behave as
    # if it were evaluated at the frozen UTC instant.
    assert stdout != '2016-01-01'
def test_timezone_in_config(capsys):
    """Crontab polling honours the TIMEZONE on the attached bot_config object."""

    class MockConfig(object):
        TIMEZONE = 'Asia/Tokyo'

    class ActivateImpl(MockedImpl):
        CRONTAB = [
            '0 0 * * * .print_datetime',
        ]

        def print_datetime(self, polled_time):
            six.print_(polled_time.strftime('%Y-%m-%d'), end='')

    tz_plugin = ActivateImpl()
    tz_plugin.activate()
    setattr(tz_plugin, 'bot_config', MockConfig())
    with freeze_time('2016-01-01 00:00:01'):
        tz_plugin.poll_crontab()
    stdout, _stderr = capsys.readouterr()
    # The Tokyo offset must keep the output from matching the UTC date.
    assert stdout != '2016-01-01'
def echo(args, file=None):
    '''
    Echo a list of arguments (as given to ``subprocess.call``) to the given stream.

    This defaults to ``stdout``, but can be changed to any stream-like object
    such as a file handle.

    :param args: A string or list of strings
    :param file: A file-like object to stream output to. Defaults to ``sys.stdout``
    '''
    if file is None:
        file = sys.stdout
    # six.string_types already includes six.text_type on both Python 2
    # (basestring covers unicode) and Python 3 (str), so the original
    # ``six.string_types + (six.text_type,)`` tuple was redundant.
    if isinstance(args, six.string_types):
        args = [args]
    six.print_(*args, file=file, flush=True)
def sign(cls, args):
    """Sign.

    Reads the raw payload from stdin, signs it with the key loaded from
    ``args.key`` using algorithm ``args.alg``, and prints either the
    compact serialization or pretty-printed JSON depending on
    ``args.compact``.
    """
    # The key type is taken from the chosen algorithm (args.alg.kty).
    key = args.alg.kty.load(args.key.read())
    args.key.close()
    if args.protect is None:
        args.protect = []
    if args.compact:
        # Compact serialization carries 'alg' in the protected header.
        args.protect.append('alg')
    sig = JWS.sign(payload=sys.stdin.read().encode(), key=key, alg=args.alg,
                   protect=set(args.protect))
    if args.compact:
        six.print_(sig.to_compact().decode('utf-8'))
    else:  # JSON
        six.print_(sig.json_dumps_pretty())
def verify(cls, args):
    """Verify.

    Reads a JWS from stdin (compact or JSON serialization depending on
    ``args.compact``), optionally verifies it against the public part of
    ``args.key``, writes the payload to stdout, and returns a shell-style
    status: 0 on success, non-zero on failure.
    """
    if args.compact:
        sig = JWS.from_compact(sys.stdin.read().encode())
    else:  # JSON
        try:
            sig = JWS.json_loads(sys.stdin.read())
        except errors.Error as error:
            six.print_(error)
            return -1
    if args.key is not None:
        assert args.kty is not None
        # Only the public half is needed for verification.
        key = args.kty.load(args.key.read()).public_key()
        args.key.close()
    else:
        key = None
    sys.stdout.write(sig.payload)
    # verify() returns True on success; invert so 0 means "verified".
    return not sig.verify(key=key)
def _ingest_pairs(self, pairs, oid2nid, frame_size, limit, single_sided):
    """Ingest a large pairs table frame-by-frame, translating ids as it goes.

    :param pairs: Source table supporting ``read(start=, stop=)``.
    :param oid2nid: Mapping from original ids to new ids (vectorized below).
    :param frame_size: Number of pairs fetched per frame.
    :param limit: Total number of pairs to ingest.
    :param single_sided: Passed through to ``_translate_frame``.
    """
    oid2nid_v = np.vectorize(oid2nid.get)
    # Whole pairs set does not fit in memory, so split it in frames with
    # `frame_size` number of pairs.
    for start in range(0, limit, frame_size):
        stop = frame_size + start
        t1 = process_time()
        six.print_('Fetching pairs {0}:{1} of {2} ... '.format(start, stop, limit), end='', flush=True)
        raw_frame = pairs.read(start=start, stop=stop)
        t2 = process_time()
        six.print_('{0}s, Parsing ... '.format(int(t2 - t1)), flush=True)
        frame = self._translate_frame(raw_frame, oid2nid_v, single_sided)
        t3 = process_time()
        # BUG FIX: the original format string ('Writing ... ') had no {0}
        # placeholder, so the parse duration passed to format() was never
        # shown; restore it to match the surrounding progress messages.
        six.print_('{0}s, Writing ... '.format(int(t3 - t2)), flush=True)
        # alternate direction, to make use of cached chunks of prev frame
        self._ingest_pairs_frame(frame)
        del frame
        t4 = process_time()
        six.print_('{0}s, Done with {1}:{2} in {3}s'.format(int(t4 - t3), start, stop, int(t4 - t1)), flush=True)
def to_pairs(self, pairs):
    """Copies labels and scores from self to pairs matrix.

    Args:
        pairs (SimilarityMatrix): Target matrix receiving labels and pairs.
    """
    six.print_('copy labels', flush=True)
    self.build_label_cache()
    pairs.labels.update(self.cache_l2i)
    six.print_('copy matrix to pairs', flush=True)
    nr_rows = self.scores.shape[0]
    progress = ProgressBar()
    for row_idx in progress(six.moves.range(0, nr_rows)):
        row = self.scores[row_idx, ...]
        # Keep only the upper triangle (row < col) of the non-zero entries.
        hits = [(row_idx, col, row[col])
                for col in row.nonzero()[0]
                if row_idx < col]
        if hits:
            pairs.pairs.table.append(hits)
def test_pdt_view():
    """A view with a persistent derived table renders the expected LookML."""
    testname = 'pdt_view'
    derived = view.DerivedTable(sql="SELECT id, count(*) c FROM table GROUP BY id",
                                sql_trigger_value='DATE()',
                                indexes=['id'])
    v = view.View(testname)
    v.derived_table = derived
    v.add_field(field.Dimension('id', type='number', primary_key=True))
    v.add_field(field.Dimension('c', type='number'))
    v.add_field(field.Measure('sum_c', sql='${TABLE}.c', type='sum'))
    out = six.StringIO()
    v.generate_lookml(out, format_options=test_format_options)
    lookml = out.getvalue()
    six.print_(lookml)
    expected_path = os.path.join(os.path.dirname(__file__),
                                 'expected_output/%s.lkml' % testname)
    with open(expected_path, 'rt') as expected:
        assert lookml == expected.read()
def sign(cls, args):
    """Sign.

    Signs the payload read from stdin with the key from ``args.key``
    (loaded via the key type bound to ``args.alg``) and prints the
    compact serialization when ``args.compact`` is set, otherwise
    pretty-printed JSON.
    """
    key = args.alg.kty.load(args.key.read())
    args.key.close()
    if args.protect is None:
        args.protect = []
    if args.compact:
        # Compact serialization requires 'alg' in the protected header.
        args.protect.append('alg')
    sig = JWS.sign(payload=sys.stdin.read().encode(), key=key, alg=args.alg,
                   protect=set(args.protect))
    if args.compact:
        six.print_(sig.to_compact().decode('utf-8'))
    else:  # JSON
        six.print_(sig.json_dumps_pretty())
def verify(cls, args):
    """Verify.

    Parses a JWS from stdin (compact or JSON form), optionally verifies
    it with the public part of ``args.key``, echoes the payload to
    stdout, and returns 0 when the signature verifies (non-zero
    otherwise, -1 on parse errors).
    """
    if args.compact:
        sig = JWS.from_compact(sys.stdin.read().encode())
    else:  # JSON
        try:
            sig = JWS.json_loads(sys.stdin.read())
        except errors.Error as error:
            six.print_(error)
            return -1
    if args.key is not None:
        assert args.kty is not None
        key = args.kty.load(args.key.read()).public_key()
        args.key.close()
    else:
        key = None
    sys.stdout.write(sig.payload)
    # sig.verify() is True on success; negate for a shell-style status.
    return not sig.verify(key=key)
def single_poll(self, next_page_token=None):
    """Poll SWF once for a decision task.

    :param next_page_token: Optional continuation token for paged results.
    :returns: The raw ``poll_for_decision_task`` response.
    :raises KeyboardInterrupt: Re-raised after waiting for the long poll
        window to drain so the remote side can close the connection.
    """
    poll_time = time.time()
    try:
        kwargs = {'domain': self.domain,
                  'taskList': {'name': self.task_list},
                  'identity': self.identity}
        if next_page_token is not None:
            kwargs['nextPageToken'] = next_page_token
        # old botocore throws TypeError when unable to establish SWF connection
        return self.worker.client.poll_for_decision_task(**kwargs)
    except KeyboardInterrupt:
        # Sleep before actually exiting as the connection is not yet closed
        # on the other end.  BUG FIX: clamp at 0 — if the poll had already
        # been running for more than 60s the original value went negative
        # and time.sleep() raises ValueError on negative input.
        sleep_time = max(0, 60 - (time.time() - poll_time))
        six.print_("Exiting in {0}...".format(sleep_time), file=sys.stderr)
        time.sleep(sleep_time)
        raise
def cleanup(self, _warn=False):
    """Remove the temporary directory if it has not been cleaned up yet.

    :param _warn: When true, emit a warning that cleanup happened
        implicitly (e.g. from a finalizer) rather than explicitly.
    """
    if self.name and not self._closed:
        try:
            self._rmtree(self.name)
        except (TypeError, AttributeError) as ex:
            # Issue #10188: Emit a warning on stderr
            # if the directory could not be cleaned
            # up due to missing globals
            # (during interpreter shutdown, module globals may already be
            # None — those errors mention "None"; anything else is real).
            if "None" not in str(ex):
                raise
            six.print_("ERROR: {!r} while cleaning up {!r}".format(ex,
                                                                   self,),
                       file=_sys.stderr)
            return
        self._closed = True
        if _warn:
            # This should be a ResourceWarning, but it is not available in
            # Python 2.x.
            self._warn("Implicitly cleaning up {!r}".format(self),
                       Warning)
def print_model_suffixes(model):
    """Print all active import suffix values for all model components as a table.

    Rows are the variables ``model.x[i]``, constraints ``model.con[i]`` and
    the objective ``model.obj``; columns are the active import suffixes.
    """
    # Header row: one column per active import suffix name.
    six.print_("\t", end='')
    for name, suffix in active_import_suffix_generator(model):
        six.print_("%10s" % (name), end='')
    six.print_("")
    # One row per variable.
    for i in model.s:
        six.print_(model.x[i].name + "\t", end='')
        for name, suffix in active_import_suffix_generator(model):
            six.print_("%10s" % (suffix.get(model.x[i])), end='')
        six.print_("")
    # One row per constraint.
    for i in model.s:
        six.print_(model.con[i].name + "\t", end='')
        for name, suffix in active_import_suffix_generator(model):
            six.print_("%10s" % (suffix.get(model.con[i])), end='')
        six.print_("")
    # Objective row.
    six.print_(model.obj.name + "\t", end='')
    for name, suffix in active_import_suffix_generator(model):
        six.print_("%10s" % (suffix.get(model.obj)), end='')
    # Consistency fix: use six.print_ like the rest of the function
    # (the builtin print("") produced identical output here).
    six.print_("")
    six.print_("")
def on_recv_rsp(self, rsp_str):
    """Handle a pushed K-line response and cache the latest bar per code.

    :param rsp_str: Raw response string, decoded by the parent handler.
    :returns: ``(ret_code, cached_bar)`` where ``cached_bar`` is the most
        recent bar stored for the pushed code.
    """
    ret_code, ret_data = super(DataCache, self).on_recv_rsp(rsp_str)
    if ret_code == RET_ERROR or isinstance(ret_data, str):
        six.print_(_(u"push kline data error:{bar_data}").format(ret_data=ret_data))
    else:
        if ret_data.empty:
            self._cache['cur_kline'] = {}
        else:
            # Keep only the newest row of the pushed frame.
            bar_data = ret_data.iloc[-1:].copy()
            del bar_data['code'], bar_data['k_type']  # drop columns not kept in the cached bar
            for i in range(len(bar_data['time_key'])):  # 'YYYY-MM-DD HH:MM:SS' -> integer YYYYMMDDHHMMSS
                bar_data.loc[i, 'time_key'] = int(
                    bar_data['time_key'][i].replace('-', '').replace(' ', '').replace(':', ''))
            bar_data.rename(columns={'time_key': 'datetime', 'turnover': 'total_turnover'},
                            inplace=True)  # rename to the cache's column schema
            bar_data['volume'] = bar_data['volume'].astype('float64')  # normalize volume dtype to float64
            self._cache['cur_kline'][ret_data['code'][0]]=bar_data
    # NOTE(review): on the error path ret_data may be a string, in which
    # case this lookup would raise — confirm callers never hit that case.
    return ret_code, self._cache['cur_kline'][ret_data['code'][0]]
def print_datetime(plugin, polled_time):
    """Crontab callback: emit the polled date as YYYY-MM-DD with no newline."""
    formatted = polled_time.strftime('%Y-%m-%d')
    six.print_(formatted, end='')
def print_datetime_with_str(plugin, polled_time, prefix):
    """Crontab callback: emit prefix + polled date (YYYY-MM-DD), no newline."""
    formatted = polled_time.strftime('%Y-%m-%d')
    six.print_(prefix + formatted, end='')
def test_activate_instance_method(capsys):
    """A dotted crontab entry resolves to an instance method and fires at midnight."""

    class ActivateImpl(MockedImpl):
        CRONTAB = ['0 0 * * * .print_datetime']

        def print_datetime(self, polled_time):
            six.print_(polled_time.strftime('%Y-%m-%d'), end='')

    impl = ActivateImpl()
    impl.activate()
    with freeze_time('2016-01-01 00:00:01'):
        impl.poll_crontab()
    captured_out, _ = capsys.readouterr()
    # No timezone configured: the UTC midnight entry fires and prints the date.
    assert captured_out == '2016-01-01'
def show_plot_methods(self):
    """Print the plotmethods of this instance."""
    printer = PlotterInterface._print_func
    if printer is None:
        # Fall back to six's print shim when no custom printer is installed.
        printer = six.print_
    lines = ["%s\n %s" % entry for entry in six.iteritems(self._plot_methods)]
    return printer("\n".join(lines))
def __init__(self, description="", logger=None, logging_level=logging.INFO,
             verbose_start=True, verbose_end=True, end_in_new_line=True,
             prefix="..."):
    """Set up the reporter: route messages to a logger or plain printing.

    When *logger* is given, messages go through ``logger.log`` at
    *logging_level*; otherwise they are printed via ``six.print_``.
    """
    if logger is None:
        self.log = six.print_
    else:
        self.log = partial(logger.log, logging_level)
    self.description = prefix + description
    self.verbose_start = verbose_start
    self.verbose_end = verbose_end
    self.end_in_new_line = end_in_new_line
    # Timing fields are populated later, presumably by start/stop methods
    # defined elsewhere in the class.
    self.start_time = None
    self.end_time = None
    self.elapsed_time = None
def _rate_limit_info(response):
    """Print response rate limit information to stderr.

    Args:
        response (requests.Response): A GitHub API response.
    """
    headers = response.headers
    msg = _RATE_LIMIT_TEMPLATE.format(headers.get(_RATE_REMAINING_HEADER),
                                      headers.get(_RATE_LIMIT_HEADER),
                                      headers.get(_RATE_RESET_HEADER))
    for line in (msg, _GH_ENV_VAR_MSG):
        six.print_(line, file=sys.stderr)
def print_errors(self):
    """
    Print tracebacks for every node with state "ERROR" in a Computation
    """
    failed = (node for node in self.nodes() if self.s[node] == States.ERROR)
    for node in failed:
        # Underlined node name, blank line, traceback, blank line.
        six.print_("{}".format(node))
        six.print_("=" * len(node))
        six.print_()
        six.print_(self.v[node].traceback)
        six.print_()
def determine_shard_size(self, file_size, accumulator):
    """Determine the shard size to use for a file of ``file_size`` bytes.

    Based on <https://github.com/aleitner/shard-size-calculator/blob/master/src/shard_size.c>

    :param file_size: File size in bytes; non-positive sizes yield 0.
    :param accumulator: Current multiple index being tried (grows on recursion).
    :returns: The chosen shard size in bytes, or 0 when no size fits.
    """
    if file_size <= 0:
        return 0
    self.__logger.debug(accumulator)
    # Determine hops back by accumulator.
    if (accumulator - self.SHARD_MULTIPLES_BACK) < 0:
        hops = 0
    else:
        hops = accumulator - self.SHARD_MULTIPLES_BACK
    byte_multiple = self.shard_size(accumulator)
    check = file_size / byte_multiple
    if 0 < check <= 1:
        # Walk back until the shard size fits under the hard cap.
        while hops > 0 and self.shard_size(hops) > self.MAX_SHARD_SIZE:
            hops = 0 if hops - 1 <= 0 else hops - 1
        return self.shard_size(hops)
    # Maximum of 2 ^ 41 * 8 * 1024 * 1024.
    if accumulator > 41:
        return 0
    # BUG FIX: the recursive step was commented out (and written as the
    # C idiom ``++accumulator``, which is a no-op in Python), so this
    # path silently returned None. Recurse with the next multiple, as in
    # the reference C implementation.
    return self.determine_shard_size(file_size, accumulator + 1)
def test_dimension_group():
    """A dimension group field renders the expected LookML output."""
    testname = 'dimension_group_test'
    v = view.View(testname)
    v.add_field(field.DimensionGroup('dimension1', sql='${TABLE}.dim1'))
    out = six.StringIO()
    v.generate_lookml(out, format_options=test_format_options)
    lookml = out.getvalue()
    six.print_(lookml)
    expected_path = os.path.join(os.path.dirname(__file__),
                                 'expected_output/%s.lkml' % testname)
    with open(expected_path, 'rt') as expected:
        assert lookml == expected.read()
def test_dimension_group_no_timeframes():
    """Timeframes are omitted from LookML when the omit option is enabled."""
    testname = 'dimension_group_no_timeframes_test'
    v = view.View(testname)
    v.add_field(field.DimensionGroup('dimension1', sql='${TABLE}.dim1'))
    out = six.StringIO()
    options = base_generator.GeneratorFormatOptions(
        warning_header_comment=None, omit_time_frames_if_not_set=True)
    v.generate_lookml(out, format_options=options)
    lookml = out.getvalue()
    six.print_(lookml)
    expected_path = os.path.join(os.path.dirname(__file__),
                                 'expected_output/%s.lkml' % testname)
    with open(expected_path, 'rt') as expected:
        assert lookml == expected.read()
def test_newlines():
    """Generated LookML separates repeated dimension/measure pairs correctly."""
    testname = 'newlines_test'
    v = view.View(testname)
    for label in ('a', 'b', 'c', 'd'):
        v.add_field(field.Dimension(label, type='number'))
        v.add_field(field.Measure('sum_' + label, type='sum',
                                  sql='${{{0}}}'.format(label)))
    out = six.StringIO()
    v.generate_lookml(out, format_options=test_format_options)
    lookml = out.getvalue()
    six.print_(lookml)
    expected_path = os.path.join(os.path.dirname(__file__),
                                 'expected_output/%s.lkml' % testname)
    with open(expected_path, 'rt') as expected:
        assert lookml == expected.read()
def cmp_file(src_file, dst_file):
    """Assert that two files have byte-identical contents."""
    six.print_('testing: ', src_file, dst_file)
    src_bytes = Tailer.file_opener(src_file, 'rb').read()
    dst_bytes = Tailer.file_opener(dst_file, 'rb').read()
    assert (src_bytes == dst_bytes)
def emit(self, count):
    """Emit by printing the given count via six's print shim."""
    six.print_(count)
def doRollover(self):
    """Delay, count the rollover, then delegate to the parent handler.

    The sleep simulates a slow rollover (ROLL_DELAY seconds) so races
    around log rotation can be exercised; ``rolls`` tracks how many
    rollovers have occurred.
    """
    time.sleep(self.ROLL_DELAY)
    self.rolls += 1
    six.print_('rolls', self.rolls)
    return super(RotatingWithDelayFileHandler, self).doRollover()