def testParseStream(self):
dstr = StringIO('2014 January 19')
self.assertEqual(parse(dstr), datetime(2014, 1, 19))
python类StringIO()的实例源码
def _colorize(self, text, token_type):
if not self.formatter:
return text
out = StringIO()
self.formatter.format([(token_type, text)], out)
return out.getvalue()
def gather_eol_dates(context, directory=DISTRO_INFO_DIRECTORY):
"""
Gather release `end of life`_ dates from distro-info-data_ CSV files.
:param context: An execution context created by :mod:`executor.contexts`.
:param directory: The pathname of a directory with CSV files containing
end-of-life dates (a string, defaults to
:data:`DISTRO_INFO_DIRECTORY`).
:returns: A dictionary like :data:`KNOWN_EOL_DATES`.
"""
known_dates = {}
if context.is_directory(directory):
for entry in context.list_entries(directory):
filename = os.path.join(directory, entry)
basename, extension = os.path.splitext(entry)
if extension.lower() == '.csv':
distributor_id = basename.lower()
known_dates[distributor_id] = {}
contents = context.read_file(filename)
for row in csv.DictReader(StringIO(contents)):
series = row.get('series')
eol = row.get('eol-server') or row.get('eol')
if series and eol:
eol = time.mktime(parse_date(eol) + (-1, -1, -1))
known_dates[distributor_id][series] = int(eol)
return known_dates
def process_upload(app, document, filename):
if document[0] == '{':
f = StringIO(document)
return Job.deserialize(f)
else:
return planning.plan_job(document, filename=filename)
def main(loops, level):
board, solution = LEVELS[level]
order = DESCENDING
strategy = Done.FIRST_STRATEGY
stream = StringIO()
board = board.strip()
expected = solution.rstrip()
range_it = xrange(loops)
t0 = perf.perf_counter()
for _ in range_it:
stream = StringIO()
solve_file(board, strategy, order, stream)
output = stream.getvalue()
stream = None
dt = perf.perf_counter() - t0
output = '\n'.join(line.rstrip() for line in output.splitlines())
if output != expected:
raise AssertionError("got a wrong answer:\n%s\nexpected: %s"
% (output, expected))
return dt
def _save_output_data(self):
# Only try to get sys.stdout and sys.sterr as they not be
# StringIO yet, e.g. when test fails during __call__
try:
self._stdout_data = sys.stdout.getvalue()
self._stderr_data = sys.stderr.getvalue()
except AttributeError:
pass
def reset(self):
self.out = StringIO()
self.messages = []
def tokenize_str(code):
return list(tokenize.generate_tokens(StringIO(code).readline))
def reset(self):
self.out = StringIO()
self.messages = []
def tokenize_str(code):
return list(tokenize.generate_tokens(StringIO(code).readline))
def _SerializeToUnicode(val, info=None, version=None, nsMap=None):
if version is None:
try:
if isinstance(val, list):
itemType = val.Item
version = itemType._version
else:
if val is None:
# neither val nor version is given
return ''
# Pick up the version from val
version = val._version
except AttributeError:
version = BASE_VERSION
if info is None:
info = Object(name="object", type=object, version=version, flags=0)
writer = StringIO()
SoapSerializer(writer, version, nsMap).Serialize(val, info)
return writer.getvalue()
## Serialize fault detail
#
# Serializes a fault as the content of the detail element in a
# soapenv:Fault (i.e. without a LocalizedMethodFault wrapper).
#
# This function assumes CheckField(info, val) was already called
# @param val the value to serialize
# @param info the field
# @param version the version
# @param nsMap a dict of xml ns -> prefix
# @return the serialized object as a unicode string
def unzip_gz(data):
return gzip.GzipFile(fileobj = StringIO(data)).read()
def __init__(self, log_file_path = None, print_to_console = True, prefix = None):
"""
:param log_file_path: The path to save the records, or None if you just want to keep it in memory
:param print_to_console:
"""
self._print_to_console = print_to_console
if log_file_path is not None:
# self._log_file_path = os.path.join(base_dir, log_file_path.replace('%T', now))
make_file_dir(log_file_path)
self.log = open(log_file_path, 'w')
else:
self.log = StringIO()
self._log_file_path = log_file_path
self.old_stdout = _ORIGINAL_STDOUT
self.prefix = None if prefix is None else prefix
def test_record_good():
"""
Tests that when we record a sequence of events, then
repeat it exactly, the Record class:
1) Records it correctly
2) Does not raise any errors
"""
# Record a sequence of events
output = StringIO()
recorder = Record(file_object=output, replay=False)
num_lines = 10
for i in xrange(num_lines):
recorder.handle_line(str(i) + '\n')
# Make sure they were recorded correctly
output_value = output.getvalue()
assert output_value == ''.join(str(i) + '\n' for i in xrange(num_lines))
# Make sure that the playback functionality doesn't raise any errors
# when we repeat them
output = StringIO(output_value)
playback_checker = Record(file_object=output, replay=True)
for i in xrange(num_lines):
playback_checker.handle_line(str(i) + '\n')
def test_record_bad():
"""
Tests that when we record a sequence of events, then
do something different on playback, the Record class catches it.
"""
# Record a sequence of events
output = StringIO()
recorder = Record(file_object=output, replay=False)
num_lines = 10
for i in xrange(num_lines):
recorder.handle_line(str(i) + '\n')
# Make sure that the playback functionality doesn't raise any errors
# when we repeat some of them
output_value = output.getvalue()
output = StringIO(output_value)
playback_checker = Record(file_object=output, replay=True)
for i in xrange(num_lines // 2):
playback_checker.handle_line(str(i) + '\n')
# Make sure it raises an error when we deviate from the recorded sequence
try:
playback_checker.handle_line('0\n')
except MismatchError:
return
raise AssertionError("Failed to detect mismatch between recorded sequence "
" and repetition of it.")
def test_record_mode_good():
"""
Like test_record_good, but some events are recorded by the
theano RecordMode. We don't attempt to check the
exact string value of the record in this case.
"""
# Record a sequence of events
output = StringIO()
recorder = Record(file_object=output, replay=False)
record_mode = RecordMode(recorder)
i = iscalar()
f = function([i], i, mode=record_mode, name='f')
num_lines = 10
for i in xrange(num_lines):
recorder.handle_line(str(i) + '\n')
f(i)
# Make sure that the playback functionality doesn't raise any errors
# when we repeat them
output_value = output.getvalue()
output = StringIO(output_value)
playback_checker = Record(file_object=output, replay=True)
playback_mode = RecordMode(playback_checker)
i = iscalar()
f = function([i], i, mode=playback_mode, name='f')
for i in xrange(num_lines):
playback_checker.handle_line(str(i) + '\n')
f(i)
def test_pydotprint_cond_highlight():
"""
This is a REALLY PARTIAL TEST.
I did them to help debug stuff.
"""
# Skip test if pydot is not available.
if not theano.printing.pydot_imported:
raise SkipTest('pydot not available')
x = tensor.dvector()
f = theano.function([x], x * 2)
f([1, 2, 3, 4])
s = StringIO()
new_handler = logging.StreamHandler(s)
new_handler.setLevel(logging.DEBUG)
orig_handler = theano.logging_default_handler
theano.theano_logger.removeHandler(orig_handler)
theano.theano_logger.addHandler(new_handler)
try:
theano.printing.pydotprint(f, cond_highlight=True,
print_output_file=False)
finally:
theano.theano_logger.addHandler(orig_handler)
theano.theano_logger.removeHandler(new_handler)
assert (s.getvalue() == 'pydotprint: cond_highlight is set but there'
' is no IfElse node in the graph\n')
def test2_invalid_neg(self):
n = as_tensor_variable(rand(2, 3))
old_stderr = sys.stderr
sys.stderr = StringIO()
try:
try:
eval_outputs(max_and_argmax(n, -3))
assert False
except ValueError as e:
pass
finally:
sys.stderr = old_stderr
def test2_invalid_neg(self):
for fct, nfct in [(argmax, numpy.argmax), (argmin, numpy.argmin)]:
n = as_tensor_variable(rand(2, 3))
old_stderr = sys.stderr
sys.stderr = StringIO()
try:
try:
eval_outputs(fct(n, -3))
assert False
except ValueError as e:
pass
finally:
sys.stderr = old_stderr
def test2_invalid_neg(self):
for fct in [max, min]:
n = as_tensor_variable(rand(2, 3))
old_stderr = sys.stderr
sys.stderr = StringIO()
try:
try:
eval_outputs(fct(n, -3))
assert False
except ValueError as e:
pass
finally:
sys.stderr = old_stderr