def test_properties(self):
    # Exercise attribute delegation of SpooledTemporaryFile before and
    # after the in-memory buffer rolls over to a real on-disk file.
    f = tempfile.SpooledTemporaryFile(max_size=10)
    f.write(b'x' * 10)  # exactly max_size bytes: no rollover yet
    self.assertFalse(f._rolled)
    self.assertEqual(f.mode, 'w+b')
    self.assertIsNone(f.name)  # in-memory buffer has no filesystem name
    # A binary spooled file exposes no text-layer attributes.
    with self.assertRaises(AttributeError):
        f.newlines
    with self.assertRaises(AttributeError):
        f.encoding
    f.write(b'x')  # one byte past max_size triggers rollover to disk
    self.assertTrue(f._rolled)
    # NOTE(review): expects mode 'w+b' even after rollover; modern CPython
    # reports 'rb+' for the rolled-over on-disk file -- confirm which
    # Python version this test variant targets.
    self.assertEqual(f.mode, 'w+b')
    self.assertIsNotNone(f.name)  # a real file now backs the spool
    with self.assertRaises(AttributeError):
        f.newlines
    with self.assertRaises(AttributeError):
        f.encoding
# Example source code using the Python class SpooledTemporaryFile()
def initCase(switches, count):
    """Reset failure bookkeeping and (re)initialize sqlmap for one test case."""
    # Clear the failure record left behind by any previous case.
    _failures.failedItems = []
    _failures.failedParseOn = None
    _failures.failedTraceBack = None

    # Give this case its own scratch output tree.
    paths.SQLMAP_OUTPUT_PATH = tempfile.mkdtemp(prefix="%s%d-" % (MKSTEMP_PREFIX.TESTING, count))
    paths.SQLMAP_DUMP_PATH = os.path.join(paths.SQLMAP_OUTPUT_PATH, "%s", "dump")
    paths.SQLMAP_FILES_PATH = os.path.join(paths.SQLMAP_OUTPUT_PATH, "%s", "files")
    logger.debug("using output directory '%s' for this test case" % paths.SQLMAP_OUTPUT_PATH)

    # Capture everything written to stdout (max_size=0 keeps it in memory).
    LOGGER_HANDLER.stream = sys.stdout = tempfile.SpooledTemporaryFile(max_size=0, mode="w+b", prefix="sqlmapstdout-")

    options = cmdLineParser()
    option_dict = options.__dict__
    # Overlay only the switches that correspond to real parser options.
    for key, value in (switches or {}).items():
        if key in option_dict:
            option_dict[key] = value

    initOptions(options, True)
    init()
def initCase(switches, count):
    """Prepare global sqlmap state for a single regression-test case."""
    # Start from a clean failure slate.
    Failures.failedItems = []
    Failures.failedParseOn = None
    Failures.failedTraceBack = None

    # Per-case output directory plus the dump/files templates under it.
    output_path = tempfile.mkdtemp(prefix="sqlmaptest-%d-" % count)
    paths.SQLMAP_OUTPUT_PATH = output_path
    paths.SQLMAP_DUMP_PATH = os.path.join(output_path, "%s", "dump")
    paths.SQLMAP_FILES_PATH = os.path.join(output_path, "%s", "files")
    logger.debug("using output directory '%s' for this test case" % output_path)

    # Redirect stdout into an in-memory spooled file so output can be inspected.
    stdout_spool = tempfile.SpooledTemporaryFile(max_size=0, mode="w+b", prefix="sqlmapstdout-")
    LOGGER_HANDLER.stream = sys.stdout = stdout_spool

    cmdLineOptions = cmdLineParser()
    if switches:
        # Apply only switches that the command-line parser actually knows.
        for key in switches:
            if key in cmdLineOptions.__dict__:
                cmdLineOptions.__dict__[key] = switches[key]

    initOptions(cmdLineOptions, True)
    init()
def test_exports(self):
# There are no surprising symbols in the tempfile module
dict = tempfile.__dict__
expected = {
"NamedTemporaryFile" : 1,
"TemporaryFile" : 1,
"mkstemp" : 1,
"mkdtemp" : 1,
"mktemp" : 1,
"TMP_MAX" : 1,
"gettempprefix" : 1,
"gettempdir" : 1,
"tempdir" : 1,
"template" : 1,
"SpooledTemporaryFile" : 1,
"TemporaryDirectory" : 1,
}
unexp = []
for key in dict:
if key[0] != '_' and key not in expected:
unexp.append(key)
self.assertTrue(len(unexp) == 0,
"unexpected keys: %s" % unexp)
def test_properties(self):
f = tempfile.SpooledTemporaryFile(max_size=10)
f.write(b'x' * 10)
self.assertFalse(f._rolled)
self.assertEqual(f.mode, 'w+b')
self.assertIsNone(f.name)
with self.assertRaises(AttributeError):
f.newlines
with self.assertRaises(AttributeError):
f.encoding
f.write(b'x')
self.assertTrue(f._rolled)
self.assertEqual(f.mode, 'rb+')
self.assertIsNotNone(f.name)
with self.assertRaises(AttributeError):
f.newlines
with self.assertRaises(AttributeError):
f.encoding
def test_text_newline_and_encoding(self):
    """Text-mode spooling preserves newline='' and the requested encoding."""
    f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
                                      newline='', encoding='utf-8')
    f.write("\u039B\r\n")  # Greek capital Lambda plus an explicit CRLF
    f.seek(0)
    # newline='' means the CRLF must come back untranslated.
    self.assertEqual(f.read(), "\u039B\r\n")
    self.assertFalse(f._rolled)
    self.assertEqual(f.mode, 'w+')
    self.assertIsNone(f.name)  # still an in-memory buffer
    # Before rollover the in-memory text buffer reports no newlines/encoding.
    self.assertIsNone(f.newlines)
    self.assertIsNone(f.encoding)
    f.write("\u039B" * 20 + "\r\n")  # push well past max_size: force rollover
    f.seek(0)
    self.assertEqual(f.read(), "\u039B\r\n" + ("\u039B" * 20) + "\r\n")
    self.assertTrue(f._rolled)
    self.assertEqual(f.mode, 'w+')
    self.assertIsNotNone(f.name)  # rolled over to a named on-disk file
    # The real text file now reports its newline history and encoding.
    self.assertIsNotNone(f.newlines)
    self.assertEqual(f.encoding, 'utf-8')
def test_truncate_with_size_parameter(self):
    """SpooledTemporaryFile.truncate() works with and without a size."""
    # A SpooledTemporaryFile can be truncated to zero size
    f = tempfile.SpooledTemporaryFile(max_size=10)
    f.write(b'abcdefg\n')
    f.seek(0)
    f.truncate()  # no argument: truncate at the current position (0)
    self.assertFalse(f._rolled)
    self.assertEqual(f._file.getvalue(), b'')
    # A SpooledTemporaryFile can be truncated to a specific size
    f = tempfile.SpooledTemporaryFile(max_size=10)
    f.write(b'abcdefg\n')
    f.truncate(4)
    self.assertFalse(f._rolled)
    self.assertEqual(f._file.getvalue(), b'abcd')
    # A SpooledTemporaryFile rolls over if truncated to large size
    f = tempfile.SpooledTemporaryFile(max_size=10)
    f.write(b'abcdefg\n')
    f.truncate(20)  # 20 > max_size, so the buffer must roll to disk
    self.assertTrue(f._rolled)
    # has_stat is a module-level flag defined elsewhere in the test module.
    if has_stat:
        self.assertEqual(os.fstat(f.fileno()).st_size, 20)
def decrypt(self):
    """Decrypt ``self.chunk_stream`` with AES-CBC.

    Reads the ciphertext in 1024-block chunks, strips the PKCS#7
    padding from the final chunk only, and returns a seekable
    :class:`SpooledTemporaryFile` positioned at the start of the
    plaintext.
    """
    decrypted_chunk = SpooledTemporaryFile(
        max_size=self.POOL_SIZE,
        mode='wb+'
    )
    cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
    # BUGFIX: must be bytes, not str -- on Python 3 a str here raises
    # TypeError once it is handed to cipher.decrypt() alongside the
    # bytes read from the stream. (b'' == '' on Python 2, so this is
    # backward-compatible.)
    next_chunk = b''
    finished = False
    while not finished:
        # Read one chunk ahead so we can tell when the last chunk arrives.
        chunk, next_chunk = next_chunk, \
            self.chunk_stream.read(1024 * AES.block_size)
        chunk = cipher.decrypt(chunk)
        if len(next_chunk) == 0:
            # Final chunk: remove the PKCS#7 padding before writing.
            chunk = self.pkcs7_reverse_padded_chunk(chunk)
            finished = True
        if chunk:
            decrypted_chunk.write(chunk)
    decrypted_chunk.seek(0)  # rewind so the caller reads from the start
    return decrypted_chunk
def __setstate__(self, state):
disable_cuda = False
for key in self.cuda_dependent_attributes_:
if key not in state:
continue
dump = state.pop(key)
with tempfile.SpooledTemporaryFile() as f:
f.write(dump)
f.seek(0)
if state['use_cuda'] and not torch.cuda.is_available():
disable_cuda = True
val = torch.load(
f, map_location=lambda storage, loc: storage)
else:
val = torch.load(f)
state[key] = val
if disable_cuda:
warnings.warn(
"Model configured to use CUDA but no CUDA devices "
"available. Loading on CPU instead.",
DeviceWarning)
state['use_cuda'] = False
self.__dict__.update(state)
def __call__(self, environ, start_response):
if environ.get('HTTP_CONTENT_ENCODING', '') == 'gzip':
try:
environ['wsgi.input'].tell()
wsgi_input = environ['wsgi.input']
except (AttributeError, IOError, NotImplementedError):
# The gzip implementation in the standard library of Python 2.x
# requires working '.seek()' and '.tell()' methods on the input
# stream. Read the data into a temporary file to work around
# this limitation.
wsgi_input = tempfile.SpooledTemporaryFile(16 * 1024 * 1024)
shutil.copyfileobj(environ['wsgi.input'], wsgi_input)
wsgi_input.seek(0)
environ['wsgi.input'] = gzip.GzipFile(filename=None, fileobj=wsgi_input, mode='r')
del environ['HTTP_CONTENT_ENCODING']
if 'CONTENT_LENGTH' in environ:
del environ['CONTENT_LENGTH']
return self.app(environ, start_response)
def buffer_iter(cls, orig_iter, buff_size=65536):
    """Drain *orig_iter* into a spooled temp file and re-expose it.

    Returns ``(content_length_str, read_iter)`` where the iterator
    yields the buffered data back in chunks of *buff_size* bytes.
    """
    spool = SpooledTemporaryFile(buff_size)
    total = 0
    for piece in orig_iter:
        total += len(piece)
        spool.write(piece)
    content_length_str = str(total)
    spool.seek(0)  # rewind so the reader starts at the beginning

    def read_iter():
        # Re-read the spooled data chunk by chunk.
        while True:
            data = spool.read(buff_size)
            if not data:
                break
            yield data

    return content_length_str, read_iter()
# ============================================================================
def getFile(self, site, inner_path):
    """Download a file from the peer; return a seekable buffer, or False on error."""
    # Use streamFile if client supports it
    if config.stream_downloads and self.connection and self.connection.handshake and self.connection.handshake["rev"] > 310:
        return self.streamFile(site, inner_path)
    location = 0
    # Spool small downloads in memory; roll to disk past 16 KB.
    if config.use_tempfiles:
        buff = tempfile.SpooledTemporaryFile(max_size=16 * 1024, mode='w+b')
    else:
        buff = StringIO()
    s = time.time()
    while True:  # Read in 512k parts
        res = self.request("getFile", {"site": site, "inner_path": inner_path, "location": location})
        if not res or "body" not in res:  # Error
            return False
        buff.write(res["body"])
        res["body"] = None  # Save memory
        if res["location"] == res["size"]:  # End of file
            break
        else:
            location = res["location"]
    # Account transfer statistics on the peer and (when known) the site.
    self.download_bytes += res["location"]
    self.download_time += (time.time() - s)
    if self.site:
        self.site.settings["bytes_recv"] = self.site.settings.get("bytes_recv", 0) + res["location"]
    buff.seek(0)  # rewind so the caller reads from the start
    return buff
# Download file out of msgpack context to save memory and cpu
def streamFile(self, site, inner_path):
    """Stream a file from the peer straight into a buffer.

    Unlike getFile, the body bypasses the msgpack response (via
    stream_to), saving memory and cpu. Returns the rewound buffer, or
    False on error.
    """
    location = 0
    # Spool small downloads in memory; roll to disk past 16 KB.
    if config.use_tempfiles:
        buff = tempfile.SpooledTemporaryFile(max_size=16 * 1024, mode='w+b')
    else:
        buff = StringIO()
    s = time.time()
    while True:  # Read in 512k parts
        # stream_to makes the transport write the body directly into buff.
        res = self.request("streamFile", {"site": site, "inner_path": inner_path, "location": location}, stream_to=buff)
        if not res:  # Error
            self.log("Invalid response: %s" % res)
            return False
        if res["location"] == res["size"]:  # End of file
            break
        else:
            location = res["location"]
    # Account transfer statistics.
    self.download_bytes += res["location"]
    self.download_time += (time.time() - s)
    self.site.settings["bytes_recv"] = self.site.settings.get("bytes_recv", 0) + res["location"]
    buff.seek(0)  # rewind so the caller reads from the start
    return buff
# Send a ping request
def spooled(self, max_size: int = 0, mode: str = 'w+b', buffering: int = -1,
            encoding: typing.Optional[str] = None, newline: typing.Optional[str] = None,
            suffix: typing.Optional[str] = DEFAULT_SUFFIX, prefix: typing.Optional[str] = DEFAULT_PREFIX,
            dir: typing.Optional[str] = None) -> typing.IO:
    """
    Create a new spooled temporary file within the scratch dir.

    This returns a :class:`~tempfile.SpooledTemporaryFile` which is a specialized object that wraps a
    :class:`StringIO`/:class:`BytesIO` instance that transparently overflows into a file on the disk once it
    reaches a certain size.

    By default, a spooled file will never roll over to disk.

    :param max_size: (Optional) max size before the in-memory buffer rolls over to disk
    :type max_size: :class:`~int`
    :param mode: (Optional) mode to open the file with
    :type mode: :class:`~str`
    :param buffering: (Optional) size of the file buffer
    :type buffering: :class:`~int`
    :param encoding: (Optional) encoding to open the file with
    :type encoding: :class:`~str`
    :param newline: (Optional) newline argument to open the file with
    :type newline: :class:`~str` or :class:`~NoneType`
    :param suffix: (Optional) filename suffix
    :type suffix: :class:`~str` or :class:`~NoneType`
    :param prefix: (Optional) filename prefix
    :type prefix: :class:`~str` or :class:`~NoneType`
    :param dir: (Optional) relative path to directory within the scratch dir where the file should exist
    :type dir: :class:`~str` or :class:`~NoneType`
    :return: SpooledTemporaryFile instance
    :rtype: :class:`~tempfile.SpooledTemporaryFile`
    """
    return tempfile.SpooledTemporaryFile(max_size, mode, buffering, encoding,
                                         newline, suffix, prefix, self.join(dir))
def _create_temp_file(cls):
return tempfile.SpooledTemporaryFile(max_size=512*1024)
# ============================================================================
def readFromBytes(self, b, headerEndianess='>'):
    """Parse the raw bytes *b* by wrapping them in a seekable temporary
    file and delegating to ``readFromFile``."""
    with tempfile.SpooledTemporaryFile(mode='w+b') as spool:
        spool.write(b)
        spool.seek(0)  # rewind so readFromFile starts at the header
        self.readFromFile(spool, headerEndianess)
# files.py source code
# Project: archive-Holmes-Totem-Service-Library
# Author: HolmesProcessing
# Project source code
# File source code
# Reads: 26
# Favorites: 0
# Likes: 0
# Comments: 0
def __enter__(self):
"""
Create the temporary file in memory first, when it uses too much memory
it is automatically relocated to the filesystem.
"""
self.file = tempfile.SpooledTemporaryFile(max_size=self.max_size)
return self.file
def generate_hash(self):
    """Return a perceptual hash for the image at `url` -- currently disabled.

    Fetching the image and spooling it to a temp file before hashing is
    too slow, so this always returns None. The previous implementation
    is kept for reference:

        req = requests.get(self.url)
        if req.status_code == 200:
            buff = tempfile.SpooledTemporaryFile(max_size=1e9)
            downloaded = 0
            filesize = int(req.headers.get('content-length', 1000))  # default for the test client
            for chunk in req.iter_content():
                downloaded += len(chunk)
                buff.write(chunk)
            buff.seek(0)
            im = PillowImage.open(io.BytesIO(buff.read()))
            return str(imagehash.average_hash(im))
    """
    return None
def do_create(self, max_size=0, dir=None, pre="", suf=""):
    """Create a SpooledTemporaryFile, reporting failure via the harness.

    :param max_size: spool threshold before rolling over to disk
    :param dir: directory for the backing file (defaults to the system temp dir)
    :param pre: filename prefix
    :param suf: filename suffix
    :return: the created SpooledTemporaryFile
    """
    if dir is None:
        dir = tempfile.gettempdir()
    try:
        # Renamed from 'file', which shadowed the Python 2 builtin.
        f = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir, prefix=pre, suffix=suf)
    except Exception:
        # Narrowed from a bare 'except:' so KeyboardInterrupt/SystemExit
        # can still abort the test run instead of being swallowed.
        self.failOnException("SpooledTemporaryFile")
    return f
def test_basic(self):
# SpooledTemporaryFile can create files
f = self.do_create()
self.assertFalse(f._rolled)
f = self.do_create(max_size=100, pre="a", suf=".txt")
self.assertFalse(f._rolled)