def encrypt_file(file, keys=None):
    '''Encrypt file data with the same method as the Send browser/js client.

    The input is read in CHUNK_SIZE pieces, encrypted with AES in GCM mode
    and written to a spooled temporary file. The value of cipher.digest()
    is appended after the ciphertext.

    :param file: readable binary file object; it is closed before returning.
    :param keys: object providing ``encryptKey`` and ``encryptIV``. When
        omitted, a new ``secretKeys()`` is created for this call.
    :return: the spooled temporary file, rewound to offset 0.
    '''
    # Build the default per call. A ``keys=secretKeys()`` default would be
    # evaluated once at definition time, making every defaulted call share
    # the same key/IV pair, which GCM must never do.
    if keys is None:
        keys = secretKeys()
    key = keys.encryptKey
    iv = keys.encryptIV

    pbar = progbar(fileSize(file))
    encData = tempfile.SpooledTemporaryFile(max_size=SPOOL_SIZE, mode='w+b')
    try:
        cipher = Cryptodome.Cipher.AES.new(key, Cryptodome.Cipher.AES.MODE_GCM, iv)
        # iter() with a sentinel stops at the first empty read (end of file).
        for chunk in iter(lambda: file.read(CHUNK_SIZE), b''):
            encData.write(cipher.encrypt(chunk))
            pbar.update(len(chunk))
        encData.write(cipher.digest())
        encData.seek(0)
    except BaseException:
        # Do not leak the partially written buffer when encryption fails.
        encData.close()
        raise
    finally:
        pbar.close()
        file.close()
    return encData
# Example source code for the Python class SpooledTemporaryFile()
def test_exports(self):
    """Check that the tempfile module exposes no surprising public symbols."""
    known_public = {
        "NamedTemporaryFile",
        "TemporaryFile",
        "mkstemp",
        "mkdtemp",
        "mktemp",
        "TMP_MAX",
        "gettempprefix",
        "gettempdir",
        "tempdir",
        "template",
        "SpooledTemporaryFile",
        "TemporaryDirectory",
    }
    # Names starting with an underscore are private and therefore ignored.
    surprises = [name for name in tempfile.__dict__
                 if name[0] != '_' and name not in known_public]
    self.assertTrue(len(surprises) == 0,
                    "unexpected keys: %s" % surprises)
def test_exports(self):
    """Verify every public name in tempfile is one of the expected ones."""
    allowed = frozenset([
        "NamedTemporaryFile",
        "TemporaryFile",
        "mkstemp",
        "mkdtemp",
        "mktemp",
        "TMP_MAX",
        "gettempprefix",
        "gettempdir",
        "tempdir",
        "template",
        "SpooledTemporaryFile",
    ])
    leftovers = []
    for symbol in vars(tempfile):
        # Skip private names and the ones we already know about.
        if symbol[0] == '_' or symbol in allowed:
            continue
        leftovers.append(symbol)
    self.assertTrue(len(leftovers) == 0,
                    "unexpected keys: %s" % leftovers)
def test_exports(self):
    """Fail if tempfile has a public attribute outside the expected list."""
    expected_names = (
        "NamedTemporaryFile",
        "TemporaryFile",
        "mkstemp",
        "mkdtemp",
        "mktemp",
        "TMP_MAX",
        "gettempprefix",
        "gettempdir",
        "tempdir",
        "template",
        "SpooledTemporaryFile",
    )

    def is_unexpected(name):
        # Public (no leading underscore) and not in the expected list.
        return name[0] != '_' and name not in expected_names

    unexpected = list(filter(is_unexpected, tempfile.__dict__))
    self.assertTrue(len(unexpected) == 0,
                    "unexpected keys: %s" % unexpected)
def test_text_mode(self):
    """A text-mode SpooledTemporaryFile reads and writes (Unicode) strings.

    Pieces are appended one at a time; after each write the whole file is
    read back from the start and compared with everything written so far.
    """
    spool = tempfile.SpooledTemporaryFile(mode='w+', max_size=10)
    pieces = (
        "abc\n",
        "def\n",
        "xyzzy\n",
        # Check that Ctrl+Z doesn't truncate the file
        "foo\x1abar\n",
    )
    written = ""
    for piece in pieces:
        spool.write(piece)
        written += piece
        spool.seek(0)
        self.assertEqual(spool.read(), written)
def api_download(service, fileId, authorisation):
    '''Given a Send url, download and return the encrypted data.

    :param service: base URL of the service; 'api/download/' + fileId is
        appended to it as-is.
    :param fileId: identifier of the file to fetch.
    :param authorisation: value passed through unpadded_urlsafe_b64encode()
        and sent in the 'send-v1' Authorization header.
    :return: spooled temporary file holding the response body, rewound to
        offset 0.
    :raises: whatever ``raise_for_status()`` raises for an error response;
        KeyError if the response has no Content-length header.
    '''
    headers = {'Authorization' : 'send-v1 ' + unpadded_urlsafe_b64encode(authorisation)}
    url = service + 'api/download/' + fileId
    r = requests.get(url, headers=headers, stream=True)
    try:
        r.raise_for_status()
        content_length = int(r.headers['Content-length'])
        data = tempfile.SpooledTemporaryFile(max_size=SPOOL_SIZE, mode='w+b')
        pbar = progbar(content_length)
        try:
            for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
                data.write(chunk)
                pbar.update(len(chunk))
        except BaseException:
            # Do not leak the partially filled buffer on a failed download.
            data.close()
            raise
        finally:
            pbar.close()
    finally:
        # A streamed response keeps its connection until it is closed.
        r.close()
    data.seek(0)
    return data
def test_properties(self):
    """Check mode/name/newlines/encoding before and after rollover."""
    spool = tempfile.SpooledTemporaryFile(max_size=10)

    # Exactly max_size bytes written: not expected to have rolled over yet.
    spool.write(b'x' * 10)
    self.assertFalse(spool._rolled)
    self.assertEqual(spool.mode, 'w+b')
    self.assertIsNone(spool.name)
    for attr in ('newlines', 'encoding'):
        self.assertRaises(AttributeError, getattr, spool, attr)

    # One more byte exceeds max_size and is expected to trigger rollover.
    spool.write(b'x')
    self.assertTrue(spool._rolled)
    self.assertEqual(spool.mode, 'w+b')
    self.assertIsNotNone(spool.name)
    for attr in ('newlines', 'encoding'):
        self.assertRaises(AttributeError, getattr, spool, attr)
def test_properties(self):
    """Binary spooled file: properties before and after exceeding max_size."""
    tmp = tempfile.SpooledTemporaryFile(max_size=10)

    def expect_no_text_attributes():
        # A binary-mode file is expected to expose neither attribute.
        with self.assertRaises(AttributeError):
            tmp.newlines
        with self.assertRaises(AttributeError):
            tmp.encoding

    tmp.write(b'x' * 10)
    self.assertFalse(tmp._rolled)
    self.assertEqual(tmp.mode, 'w+b')
    self.assertIsNone(tmp.name)
    expect_no_text_attributes()

    tmp.write(b'x')
    self.assertTrue(tmp._rolled)
    self.assertEqual(tmp.mode, 'w+b')
    self.assertIsNotNone(tmp.name)
    expect_no_text_attributes()
def initCase(switches, count):
    """Prepare global state for one test case.

    Resets the failure bookkeeping, points the output paths at a new
    temporary directory, redirects stdout and the logger handler into a
    temporary file, applies ``switches`` to the parsed command line options
    and runs the initialisation routines.
    """
    _failures.failedItems = []
    _failures.failedParseOn = None
    _failures.failedTraceBack = None

    output_dir = tempfile.mkdtemp(prefix="%s%d-" % (MKSTEMP_PREFIX.TESTING, count))
    paths.SQLMAP_OUTPUT_PATH = output_dir
    paths.SQLMAP_DUMP_PATH = os.path.join(output_dir, "%s", "dump")
    paths.SQLMAP_FILES_PATH = os.path.join(output_dir, "%s", "files")

    logger.debug("using output directory '%s' for this test case" % output_dir)

    # The same file object receives both log records and stdout writes.
    captured = tempfile.SpooledTemporaryFile(max_size=0, mode="w+b", prefix="sqlmapstdout-")
    LOGGER_HANDLER.stream = captured
    sys.stdout = captured

    cmdLineOptions = cmdLineParser()

    # Only options the parser already knows are overridden.
    option_store = cmdLineOptions.__dict__
    for key, value in (switches or {}).items():
        if key in option_store:
            option_store[key] = value

    initOptions(cmdLineOptions, True)
    init()
def is_available(cls):
    """Report whether this festival-based engine can be used.

    Requires the parent's availability check plus the ``text2wave`` and
    ``festival`` executables. ``festival --pipe`` is then run with empty
    stdin, and the result is False if its combined output contains
    'No default voice found'.
    """
    prerequisites_met = (super(cls, cls).is_available() and
                         diagnose.check_executable('text2wave') and
                         diagnose.check_executable('festival'))
    if not prerequisites_met:
        return False

    logger = logging.getLogger(__name__)
    cmd = ['festival', '--pipe']
    with tempfile.SpooledTemporaryFile() as out_f:
        with tempfile.SpooledTemporaryFile() as in_f:
            quoted = [pipes.quote(arg) for arg in cmd]
            logger.debug('Executing %s', ' '.join(quoted))
            # stdout and stderr share one buffer so both are inspected.
            subprocess.call(cmd, stdin=in_f, stdout=out_f,
                            stderr=out_f)
            out_f.seek(0)
            output = out_f.read().strip()
            if output:
                logger.debug("Output was: '%s'", output)
            return ('No default voice found' not in output)
def say(self, phrase, *args):
    """Synthesize ``phrase`` with text2wave and play the resulting wav file.

    The phrase is fed to the command on stdin, the audio is written to a
    named temporary .wav file, and anything printed on stderr is logged at
    debug level.
    """
    self._logger.debug("Saying '%s' with '%s'", phrase, self.SLUG)
    cmd = ['text2wave']
    with tempfile.NamedTemporaryFile(suffix='.wav') as out_f:
        with tempfile.SpooledTemporaryFile() as in_f:
            in_f.write(phrase)
            in_f.seek(0)
            with tempfile.SpooledTemporaryFile() as err_f:
                quoted = [pipes.quote(arg) for arg in cmd]
                self._logger.debug('Executing %s', ' '.join(quoted))
                subprocess.call(cmd, stdin=in_f, stdout=out_f,
                                stderr=err_f)
                err_f.seek(0)
                output = err_f.read()
                if output:
                    self._logger.debug("Output was: '%s'", output)
        # Play while the named temporary file is still open.
        self.play(out_f.name)
def say(self, phrase, *args):
    """Synthesize ``phrase`` with flite and play the resulting wav file.

    ``self.voice``, when set, is passed via ``-voice``. The audio is written
    to a temporary .wav file, which is removed afterwards even if synthesis
    or playback raises. Anything the command prints is logged at debug level.
    """
    self._logger.debug("Saying '%s' with '%s'", phrase, self.SLUG)
    cmd = ['flite']
    if self.voice:
        cmd.extend(['-voice', self.voice])
    cmd.extend(['-t', phrase])
    # delete=False: only the name is needed here; the file is removed in the
    # finally clause below.
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
        fname = f.name
    try:
        cmd.append(fname)
        with tempfile.SpooledTemporaryFile() as out_f:
            self._logger.debug('Executing %s',
                               ' '.join([pipes.quote(arg)
                                         for arg in cmd]))
            subprocess.call(cmd, stdout=out_f, stderr=out_f)
            out_f.seek(0)
            output = out_f.read().strip()
        if output:
            self._logger.debug("Output was: '%s'", output)
        self.play(fname)
    finally:
        # Previously skipped when the call or playback failed, leaving the
        # wav file behind.
        os.remove(fname)
def transcribe(self, fp, mode=None):
    """Run julius on the audio in ``fp`` and return the recognised texts.

    The command's stdout is scanned with ``self._pattern``. Matches are
    ordered by their integer index and empty texts are dropped. The result
    is a list of strings, ``['']`` when nothing was recognised.
    """
    julius_args = ['julius',
                   '-quiet',
                   '-nolog',
                   '-input', 'stdin',
                   '-dfa', self._vocabulary.dfa_file,
                   '-v', self._vocabulary.dict_file,
                   '-h', self._hmmdefs,
                   '-hlist', self._tiedlist,
                   '-forcedict']
    cmd = [str(arg) for arg in julius_args]
    self._logger.debug('Executing: %r', cmd)

    with tempfile.SpooledTemporaryFile() as out_f:
        with tempfile.SpooledTemporaryFile() as err_f:
            subprocess.call(cmd, stdin=fp, stdout=out_f, stderr=err_f)
        out_f.seek(0)
        matches = self._pattern.findall(out_f.read())
        results = [(int(index), text) for index, text in matches]

    ordered = sorted(results, key=lambda x: x[0])
    # Fall back to a single empty string when no non-empty text was found.
    transcribed = [text for _, text in ordered if text] or ['']
    self._logger.info('Transcribed: %r', transcribed)
    return transcribed
def test_exports(self):
    """Ensure tempfile's public namespace matches the expected symbol list."""
    expected = dict.fromkeys([
        "NamedTemporaryFile",
        "TemporaryFile",
        "mkstemp",
        "mkdtemp",
        "mktemp",
        "TMP_MAX",
        "gettempprefix",
        "gettempdir",
        "tempdir",
        "template",
        "SpooledTemporaryFile",
        "TemporaryDirectory",
    ], 1)
    namespace = tempfile.__dict__
    # Collect public names (no leading underscore) that are not expected.
    extras = [key for key in namespace
              if not (key[0] == '_' or key in expected)]
    self.assertTrue(len(extras) == 0,
                    "unexpected keys: %s" % extras)
def test_properties(self):
    """Check file properties in memory and after rolling over.

    After rollover the reported mode is expected to be 'rb+'.
    """
    spooled = tempfile.SpooledTemporaryFile(max_size=10)
    text_only_attrs = ('newlines', 'encoding')

    spooled.write(b'x' * 10)
    self.assertFalse(spooled._rolled)
    self.assertEqual(spooled.mode, 'w+b')
    self.assertIsNone(spooled.name)
    for attr in text_only_attrs:
        with self.assertRaises(AttributeError):
            getattr(spooled, attr)

    # The eleventh byte exceeds max_size.
    spooled.write(b'x')
    self.assertTrue(spooled._rolled)
    self.assertEqual(spooled.mode, 'rb+')
    self.assertIsNotNone(spooled.name)
    for attr in text_only_attrs:
        with self.assertRaises(AttributeError):
            getattr(spooled, attr)
def test_text_newline_and_encoding(self):
    """Text-mode file with explicit newline/encoding, before and after rollover."""
    short_line = "\u039B\r\n"
    long_line = "\u039B" * 20 + "\r\n"
    spool = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
                                          newline='', encoding='utf-8')

    # Short write: the test expects no rollover yet.
    spool.write(short_line)
    spool.seek(0)
    self.assertEqual(spool.read(), short_line)
    self.assertFalse(spool._rolled)
    self.assertEqual(spool.mode, 'w+')
    self.assertIsNone(spool.name)
    self.assertIsNone(spool.newlines)
    self.assertIsNone(spool.encoding)

    # Long write: the test expects the file to have rolled over.
    spool.write(long_line)
    spool.seek(0)
    self.assertEqual(spool.read(), short_line + long_line)
    self.assertTrue(spool._rolled)
    self.assertEqual(spool.mode, 'w+')
    self.assertIsNotNone(spool.name)
    self.assertIsNotNone(spool.newlines)
    self.assertEqual(spool.encoding, 'utf-8')
def test_truncate_with_size_parameter(self):
    """Exercise truncate() with no size, a smaller size and a larger size."""
    payload = b'abcdefg\n'

    # A SpooledTemporaryFile can be truncated to zero size
    empty_case = tempfile.SpooledTemporaryFile(max_size=10)
    empty_case.write(payload)
    empty_case.seek(0)
    empty_case.truncate()
    self.assertFalse(empty_case._rolled)
    self.assertEqual(empty_case._file.getvalue(), b'')

    # A SpooledTemporaryFile can be truncated to a specific size
    shrink_case = tempfile.SpooledTemporaryFile(max_size=10)
    shrink_case.write(payload)
    shrink_case.truncate(4)
    self.assertFalse(shrink_case._rolled)
    self.assertEqual(shrink_case._file.getvalue(), b'abcd')

    # A SpooledTemporaryFile rolls over if truncated to large size
    grow_case = tempfile.SpooledTemporaryFile(max_size=10)
    grow_case.write(payload)
    grow_case.truncate(20)
    self.assertTrue(grow_case._rolled)
    if has_stat:
        on_disk_size = os.fstat(grow_case.fileno()).st_size
        self.assertEqual(on_disk_size, 20)
def csv_masks(request, hashfile_id):
    """Return the password-mask counts of one hashfile as a CSV download.

    Looks up the Hashfile (404 if it does not exist), counts cracked entries
    per password mask with a raw SQL query, and sends the rows
    ``count, password_mask`` as an attachment named ``<hashfile name>_masks.csv``.
    """
    hashfile = get_object_or_404(Hashfile, id=hashfile_id)

    # didn't found the correct way in pure django...
    res = Cracked.objects.raw("SELECT id, password_mask, COUNT(*) AS count FROM Hashcat_cracked USE INDEX (hashfileid_id_index) WHERE hashfile_id=%s GROUP BY password_mask ORDER BY count DESC", [hashfile.id])

    # 'w+' because the buffer is read back below; the with block closes it
    # deterministically instead of leaving that to garbage collection.
    with tempfile.SpooledTemporaryFile(mode='w+') as fp:
        csvfile = csv.writer(fp, quotechar='"', quoting=csv.QUOTE_ALL)
        for item in res:
            csvfile.writerow([item.count, item.password_mask])
        fp.seek(0)  # rewind the file handle
        csvfile_data = fp.read()

    # Debug output: SQL text and timing of the most recent query.
    for query in connection.queries[-1:]:
        print(query["sql"])
        print(query["time"])

    response = HttpResponse(csvfile_data, content_type='application/force-download') # mimetype is replaced by content_type for django 1.7
    response['Content-Disposition'] = 'attachment; filename=%s_masks.csv' % hashfile.name
    return response
def download():
    """Bundle the files matching the requested tracks into a zip download.

    The ``tracks`` query value is a '|'-separated list. For each entry,
    "-track.csv" is replaced by "*" and every match under config.TRACKDIR is
    added to the archive under its path relative to that directory.
    Returns the rewound temporary file holding the archive.
    """
    requested = request.query.tracks.split('|')

    # write the archive into a temporary in-memory file-like object
    temp = tempfile.SpooledTemporaryFile()
    with zipfile.ZipFile(temp, 'w', zipfile.ZIP_DEFLATED) as archive:
        for trackrel in requested:
            pattern = trackrel.replace("-track.csv", "*")
            for path in config.TRACKDIR.glob(pattern):
                arcname = str(path.relative_to(config.TRACKDIR))
                archive.write(str(path), arcname)
    temp.seek(0)

    # force a download; give it a filename and mime type
    response.set_header('Content-Disposition', 'attachment; filename="data.zip"')
    response.set_header('Content-Type', 'application/zip')

    # relying on garbage collector to delete tempfile object
    # (and hence the file itself) when done sending
    return temp
def initCase(switches, count):
    """Set up global state for test case number ``count``.

    Clears the recorded failures, creates a new output directory, sends
    stdout and the logger handler to a temporary file, overrides the
    parsed command line options with ``switches`` and initialises the run.
    """
    Failures.failedItems = []
    Failures.failedParseOn = None
    Failures.failedTraceBack = None

    case_dir = tempfile.mkdtemp(prefix="sqlmaptest-%d-" % count)
    paths.SQLMAP_OUTPUT_PATH = case_dir
    paths.SQLMAP_DUMP_PATH = os.path.join(case_dir, "%s", "dump")
    paths.SQLMAP_FILES_PATH = os.path.join(case_dir, "%s", "files")

    logger.debug("using output directory '%s' for this test case" % case_dir)

    # One shared file object collects both log records and stdout writes.
    sink = tempfile.SpooledTemporaryFile(max_size=0, mode="w+b", prefix="sqlmapstdout-")
    LOGGER_HANDLER.stream = sink
    sys.stdout = sink

    cmdLineOptions = cmdLineParser()

    if switches:
        known = cmdLineOptions.__dict__
        # Switches the parser does not define are ignored.
        for key in switches:
            if key in known:
                known[key] = switches[key]

    initOptions(cmdLineOptions, True)
    init()
def test_exports(self):
    """Confirm tempfile has no public symbols other than the listed ones."""
    listed = [
        "NamedTemporaryFile",
        "TemporaryFile",
        "mkstemp",
        "mkdtemp",
        "mktemp",
        "TMP_MAX",
        "gettempprefix",
        "gettempdir",
        "tempdir",
        "template",
        "SpooledTemporaryFile",
    ]
    unlisted = []
    for name in tempfile.__dict__:
        if name[0] == '_':
            # private helper, not part of the public surface
            continue
        if name not in listed:
            unlisted.append(name)
    self.assertTrue(len(unlisted) == 0,
                    "unexpected keys: %s" % unlisted)
def test_properties(self):
    """Properties of a binary spooled file on both sides of the size limit."""
    handle = tempfile.SpooledTemporaryFile(max_size=10)

    def check(rolled, name_check):
        # Assertions shared by both phases; only the rollover flag and the
        # expectation about ``name`` differ between them.
        self.assertEqual(handle._rolled, rolled)
        self.assertEqual(handle.mode, 'w+b')
        name_check(handle.name)
        self.assertRaises(AttributeError, getattr, handle, 'newlines')
        self.assertRaises(AttributeError, getattr, handle, 'encoding')

    handle.write(b'x' * 10)
    check(False, self.assertIsNone)

    handle.write(b'x')
    check(True, self.assertIsNotNone)
def test_exports(self):
    """Guard against new, unreviewed public names appearing in tempfile."""
    reviewed = set((
        "NamedTemporaryFile",
        "TemporaryFile",
        "mkstemp",
        "mkdtemp",
        "mktemp",
        "TMP_MAX",
        "gettempprefix",
        "gettempdir",
        "tempdir",
        "template",
        "SpooledTemporaryFile",
        "TemporaryDirectory",
    ))

    def needs_review(symbol):
        # Public names only; underscore-prefixed ones are implementation detail.
        return symbol[0] != '_' and symbol not in reviewed

    unreviewed = [symbol for symbol in vars(tempfile) if needs_review(symbol)]
    self.assertTrue(len(unreviewed) == 0,
                    "unexpected keys: %s" % unreviewed)
def test_properties(self):
    """Binary spooled file: 'w+b' while in memory, 'rb+' once rolled over."""
    buf = tempfile.SpooledTemporaryFile(max_size=10)

    # Phase 1: ten bytes written, rollover not expected.
    buf.write(b'x' * 10)
    self.assertFalse(buf._rolled)
    self.assertEqual(buf.mode, 'w+b')
    self.assertIsNone(buf.name)
    self.assertRaises(AttributeError, lambda: buf.newlines)
    self.assertRaises(AttributeError, lambda: buf.encoding)

    # Phase 2: one more byte, rollover expected.
    buf.write(b'x')
    self.assertTrue(buf._rolled)
    self.assertEqual(buf.mode, 'rb+')
    self.assertIsNotNone(buf.name)
    self.assertRaises(AttributeError, lambda: buf.newlines)
    self.assertRaises(AttributeError, lambda: buf.encoding)
def test_text_newline_and_encoding(self):
    """Check newline/encoding handling of a text-mode spooled file."""
    options = dict(mode='w+', max_size=10, newline='', encoding='utf-8')
    tmp = tempfile.SpooledTemporaryFile(**options)

    first = "\u039B\r\n"
    tmp.write(first)
    tmp.seek(0)
    self.assertEqual(tmp.read(), first)
    # Expectations while the file has not rolled over.
    self.assertFalse(tmp._rolled)
    self.assertEqual(tmp.mode, 'w+')
    self.assertIsNone(tmp.name)
    self.assertIsNone(tmp.newlines)
    self.assertIsNone(tmp.encoding)

    second = "\u039B" * 20 + "\r\n"
    tmp.write(second)
    tmp.seek(0)
    self.assertEqual(tmp.read(), first + second)
    # Expectations once the file has rolled over.
    self.assertTrue(tmp._rolled)
    self.assertEqual(tmp.mode, 'w+')
    self.assertIsNotNone(tmp.name)
    self.assertIsNotNone(tmp.newlines)
    self.assertEqual(tmp.encoding, 'utf-8')
def test_truncate_with_size_parameter(self):
    """truncate() to zero, to a smaller size, and to a size above max_size."""

    def fresh_file():
        # Every scenario starts from the same eight bytes of content.
        tmp = tempfile.SpooledTemporaryFile(max_size=10)
        tmp.write(b'abcdefg\n')
        return tmp

    # A SpooledTemporaryFile can be truncated to zero size
    f = fresh_file()
    f.seek(0)
    f.truncate()
    self.assertFalse(f._rolled)
    self.assertEqual(f._file.getvalue(), b'')

    # A SpooledTemporaryFile can be truncated to a specific size
    f = fresh_file()
    f.truncate(4)
    self.assertFalse(f._rolled)
    self.assertEqual(f._file.getvalue(), b'abcd')

    # A SpooledTemporaryFile rolls over if truncated to large size
    f = fresh_file()
    f.truncate(20)
    self.assertTrue(f._rolled)
    if has_stat:
        self.assertEqual(os.fstat(f.fileno()).st_size, 20)
def say(self, phrase):
    """Synthesize ``phrase`` with text2wave and return the audio data.

    The voice is selected with ``-eval (voice_<self.voice>)``. The phrase is
    passed on stdin, stderr output is logged at debug level, and whatever
    the command wrote to stdout is returned.
    """
    voice_expr = '(voice_%s)' % self.voice
    cmd = ['text2wave', '-eval', voice_expr]
    with tempfile.SpooledTemporaryFile() as out_f:
        with tempfile.SpooledTemporaryFile() as in_f:
            in_f.write(phrase)
            in_f.seek(0)
            with tempfile.SpooledTemporaryFile() as err_f:
                quoted = [pipes.quote(arg) for arg in cmd]
                self._logger.debug('Executing %s', ' '.join(quoted))
                subprocess.call(cmd, stdin=in_f, stdout=out_f,
                                stderr=err_f)
                err_f.seek(0)
                output = err_f.read()
        if output:
            self._logger.debug("Output was: '%s'", output)
        out_f.seek(0)
        audio = out_f.read()
    return audio
def say(self, phrase):
    """Synthesize ``phrase`` with EXECUTABLE and return the wav file contents.

    The command writes WAVE output to a temporary file, which is read back
    and removed afterwards even if the command or the read raises. Anything
    the command prints is logged at debug level.
    """
    # delete=False: only the name is needed here; the file is removed in the
    # finally clause below.
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
        fname = f.name
    try:
        cmd = [EXECUTABLE, '-o', fname,
               '--file-format=WAVE',
               str(phrase)]
        self._logger.debug('Executing %s', ' '.join([pipes.quote(arg)
                                                     for arg in cmd]))
        with tempfile.SpooledTemporaryFile() as out_f:
            subprocess.call(cmd, stdout=out_f, stderr=out_f)
            out_f.seek(0)
            output = out_f.read()
            if output:
                self._logger.debug("Output was: '%s'", output)
        with open(fname, 'rb') as wav_file:
            data = wav_file.read()
    finally:
        # Previously skipped on failure, leaving the wav file behind.
        os.remove(fname)
    return data
def mp3_to_wave(self, filename):
    """Decode the MP3 at ``filename`` and return it as WAV file data.

    Frames are read from a mad.MadFile until it returns None and written
    through the wave module into a temporary file, whose full contents are
    returned.
    """
    mf = mad.MadFile(filename)
    with tempfile.SpooledTemporaryFile() as f:
        wav = wave.open(f, mode='wb')
        wav.setframerate(mf.samplerate())
        if mf.mode() == mad.MODE_SINGLE_CHANNEL:
            channels = 1
        else:
            channels = 2
        wav.setnchannels(channels)
        # 4L is the sample width of 32 bit audio
        wav.setsampwidth(4)
        while True:
            frame = mf.read()
            if frame is None:
                break
            wav.writeframes(frame)
        wav.close()
        f.seek(0)
        data = f.read()
    return data
def test_exports(self):
    """Assert that tempfile exports only the documented public names."""
    documented = {
        "NamedTemporaryFile": 1,
        "TemporaryFile": 1,
        "mkstemp": 1,
        "mkdtemp": 1,
        "mktemp": 1,
        "TMP_MAX": 1,
        "gettempprefix": 1,
        "gettempdir": 1,
        "tempdir": 1,
        "template": 1,
        "SpooledTemporaryFile": 1,
    }
    module_names = tempfile.__dict__
    # Keep only public names (no leading underscore) that are undocumented.
    undocumented = [
        name for name in module_names
        if name[0] != '_'
        if name not in documented
    ]
    self.assertTrue(len(undocumented) == 0,
                    "unexpected keys: %s" % undocumented)