def speak(text, lang='es'):
    """Text to speech. For fun."""
    try:
        from googletrans import Translator
        from gtts import gTTS
        from pygame import mixer
        from tempfile import TemporaryFile

        translator = Translator()
        tts = gTTS(text=translator.translate(text, dest=lang).text, lang=lang)
        mixer.init()
        sf = TemporaryFile()
        tts.write_to_fp(sf)
        sf.seek(0)
        mixer.music.load(sf)
        mixer.music.play()
    except Exception:
        raise
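
A minimal usage sketch, assuming the optional googletrans, gTTS and pygame packages are installed (the phrase below is illustrative, not from the original project):

speak("Hello, world", lang='es')   # translates the text to Spanish and plays it through pygame's mixer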
def test_exports(self):
    # There are no surprising symbols in the tempfile module
    dict = tempfile.__dict__
    expected = {
        "NamedTemporaryFile" : 1,
        "TemporaryFile" : 1,
        "mkstemp" : 1,
        "mkdtemp" : 1,
        "mktemp" : 1,
        "TMP_MAX" : 1,
        "gettempprefix" : 1,
        "gettempdir" : 1,
        "tempdir" : 1,
        "template" : 1,
        "SpooledTemporaryFile" : 1,
        "TemporaryDirectory" : 1,
    }

    unexp = []
    for key in dict:
        if key[0] != '_' and key not in expected:
            unexp.append(key)
    self.assertTrue(len(unexp) == 0,
                    "unexpected keys: %s" % unexp)
def _body(self):
    try:
        read_func = self.environ['wsgi.input'].read
    except KeyError:
        self.environ['wsgi.input'] = BytesIO()
        return self.environ['wsgi.input']
    body_iter = self._iter_chunked if self.chunked else self._iter_body
    body, body_size, is_temp_file = BytesIO(), 0, False
    for part in body_iter(read_func, self.MEMFILE_MAX):
        body.write(part)
        body_size += len(part)
        if not is_temp_file and body_size > self.MEMFILE_MAX:
            body, tmp = TemporaryFile(mode='w+b'), body
            body.write(tmp.getvalue())
            del tmp
            is_temp_file = True
    self.environ['wsgi.input'] = body
    body.seek(0)
    return body
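
This handler buffers the WSGI request body in memory and spills it into a TemporaryFile once it grows past MEMFILE_MAX bytes. The standard library offers the same spill-to-disk behaviour directly; a minimal standalone sketch (not part of the framework above):

import tempfile

# Data stays in memory until max_size is exceeded, then rolls over to a real temporary file.
buf = tempfile.SpooledTemporaryFile(max_size=1024 * 1024, mode='w+b')
buf.write(b"request body chunk")
buf.seek(0)
print(buf.read())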
def run_coala_with_specific_file(working_dir, file):
    """Run coala in a specified directory."""
    command = ["coala", "--json", "--find-config", "--files", file]
    stdout_file = tempfile.TemporaryFile()
    kwargs = {"stdout": stdout_file,
              "cwd": working_dir}
    process = subprocess.Popen(command, **kwargs)
    retval = process.wait()
    output_str = None
    if retval == 1:
        stdout_file.seek(0)
        output_str = stdout_file.read().decode("utf-8", "ignore")
        if output_str:
            log("Output =", output_str)
        else:
            log("No results for the file")
    elif retval == 0:
        log("No issues found")
    else:
        log("Exited with:", retval)
    stdout_file.close()
    return output_str
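
A hypothetical call (the directory and file name are placeholders):

report = run_coala_with_specific_file("/path/to/project", "setup.py")
if report:
    print(report)   # JSON emitted by coala when it found issues (exit code 1)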
def bytes2zip(bytes):
    """
    RETURN COMPRESSED BYTES
    """
    if hasattr(bytes, "read"):
        buff = TemporaryFile()
        archive = gzip.GzipFile(fileobj=buff, mode='w')
        for b in bytes:
            archive.write(b)
        archive.close()
        buff.seek(0)
        from pyLibrary.env.big_data import FileString, safe_size
        return FileString(buff)

    buff = BytesIO()
    archive = gzip.GzipFile(fileobj=buff, mode='w')
    archive.write(bytes)
    archive.close()
    return buff.getvalue()
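
A quick round-trip check of the plain-bytes path, assuming the function above is importable along with its gzip/BytesIO/TemporaryFile dependencies (gzip.decompress is from the standard library):

import gzip

compressed = bytes2zip(b"hello world")
assert gzip.decompress(compressed) == b"hello world"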
def __init__(self, stream, length, _shared=None):
    """
    :param stream: THE STREAM WE WILL GET THE BYTES FROM
    :param length: THE MAX NUMBER OF BYTES WE ARE EXPECTING
    :param _shared: FOR INTERNAL USE TO SHARE THE BUFFER
    :return:
    """
    self.position = 0
    file_ = TemporaryFile()
    if not _shared:
        self.shared = Data(
            length=length,
            locker=Lock(),
            stream=stream,
            done_read=0,
            file=file_,
            buffer=mmap(file_.fileno(), length)
        )
    else:
        self.shared = _shared

    self.shared.ref_count += 1
def assert_exception_writes_error_message(self, exception, message):
    parser = cli.get_htsget_parser()
    args = parser.parse_args(["https://some.url"])
    saved_stderr = sys.stderr
    try:
        with tempfile.TemporaryFile("w+") as tmp_stderr:
            sys.stderr = tmp_stderr
            with mock.patch("htsget.get") as mocked_get, \
                    mock.patch("sys.exit") as mocked_exit, \
                    mock.patch("logging.basicConfig"):
                mocked_get.side_effect = exception
                cli.run(args)
                tmp_stderr.seek(0)
                stderr = tmp_stderr.read().strip()
                mocked_exit.assert_called_once_with(1)
    finally:
        sys.stderr = saved_stderr
    self.assertTrue(stderr.endswith(message))
def capture(self, data, term_instance=None):
    """
    Stores *data* as a temporary file and returns that file's object.
    *term_instance* can be used by overrides of this function to make
    adjustments to the terminal emulator after the *data* is captured, e.g.
    to make room for an image.
    """
    # Remove the extra \r's that the terminal adds:
    data = data.replace(b'\r\n', b'\n')
    logging.debug("capture() len(data): %s" % len(data))
    # Write the data to disk in a temporary location
    self.file_obj = tempfile.TemporaryFile()
    self.file_obj.write(data)
    self.file_obj.flush()
    # Leave it open
    return self.file_obj
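
The returned file object is left open on purpose, so a caller can rewind it and read the captured bytes back; a minimal sketch (here `terminal` stands in for whatever object owns this method):

file_obj = terminal.capture(b"screenshot bytes\r\n")
file_obj.seek(0)
assert file_obj.read() == b"screenshot bytes\n"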
def setUp(self):
    file_path = resource_filename(Requirement.parse('search_google'), 'search_google/config.json')
    with open(file_path, 'r') as in_file:
        defaults = json.load(in_file)
    buildargs = {
        'serviceName': 'customsearch',
        'version': 'v1',
        'developerKey': defaults['build_developerKey']
    }
    cseargs = {
        'q': 'google',
        'num': 1,
        'fileType': 'png',
        'cx': defaults['cx']
    }
    self.results = search_google.api.results(buildargs, cseargs)
    tempfile = TemporaryFile()
    self.tempfile = str(tempfile.name)
    tempfile.close()
    self.tempdir = str(TemporaryDirectory().name)
def __call__(self, test_case):
    module = self.constructor(*self.constructor_args)
    input = self._get_input()

    if self.reference_fn is not None:
        out = test_case._forward(module, input)
        if isinstance(out, Variable):
            out = out.data
        ref_input = self._unpack_input(deepcopy(input))
        expected_out = self.reference_fn(ref_input, test_case._get_parameters(module)[0])
        test_case.assertEqual(out, expected_out)

    # TODO: do this with in-memory files as soon as torch.save will support it
    with TemporaryFile() as f:
        test_case._forward(module, input)
        torch.save(module, f)
        f.seek(0)
        module_copy = torch.load(f)
        test_case.assertEqual(test_case._forward(module, input), test_case._forward(module_copy, input))

    self._do_test(test_case, module, input)
def __init__(self, targetfd, tmpfile=None, now=True, patchsys=False):
    """ save targetfd descriptor, and open a new
        temporary file there. If no tmpfile is
        specified a tempfile.TemporaryFile() will be opened
        in text mode.
    """
    self.targetfd = targetfd
    if tmpfile is None and targetfd != 0:
        f = tempfile.TemporaryFile('wb+')
        tmpfile = dupfile(f, encoding="UTF-8")
        f.close()
    self.tmpfile = tmpfile
    self._savefd = os.dup(self.targetfd)
    if patchsys:
        self._oldsys = getattr(sys, patchsysdict[targetfd])
    if now:
        self.start()
def writeorg(self, data):
    """ write a string to the original file descriptor
    """
    tempfp = tempfile.TemporaryFile()
    try:
        os.dup2(self._savefd, tempfp.fileno())
        tempfp.write(data)
    finally:
        tempfp.close()
def __init__(self, msg, buffer=None, scheduler=None):
    """Produce this message.

    @param msg: The message I am to produce.
    @type msg: L{IMessage}

    @param buffer: A buffer to hold the message in. If None, I will
        use a L{tempfile.TemporaryFile}.
    @type buffer: file-like
    """
    self.msg = msg
    if buffer is None:
        buffer = tempfile.TemporaryFile()
    self.buffer = buffer
    if scheduler is None:
        scheduler = iterateInReactor
    self.scheduler = scheduler
    self.write = self.buffer.write
def pytest_configure(config):
    import py
    if config.option.pastebin == "all":
        tr = config.pluginmanager.getplugin('terminalreporter')
        # if no terminal reporter plugin is present, nothing we can do here;
        # this can happen when this function executes in a slave node
        # when using pytest-xdist, for example
        if tr is not None:
            # pastebin file will be a utf-8 encoded binary file
            config._pastebinfile = tempfile.TemporaryFile('w+b')
            oldwrite = tr._tw.write

            def tee_write(s, **kwargs):
                oldwrite(s, **kwargs)
                if py.builtin._istext(s):
                    s = s.encode('utf-8')
                config._pastebinfile.write(s)

            tr._tw.write = tee_write
def __init__(self, targetfd, tmpfile=None):
    self.targetfd = targetfd
    try:
        self.targetfd_save = os.dup(self.targetfd)
    except OSError:
        self.start = lambda: None
        self.done = lambda: None
    else:
        if targetfd == 0:
            assert not tmpfile, "cannot set tmpfile with stdin"
            tmpfile = open(os.devnull, "r")
            self.syscapture = SysCapture(targetfd)
        else:
            if tmpfile is None:
                f = TemporaryFile()
                with f:
                    tmpfile = safe_text_dupfile(f, mode="wb+")
            if targetfd in patchsysdict:
                self.syscapture = SysCapture(targetfd, tmpfile)
            else:
                self.syscapture = NoCapture()
        self.tmpfile = tmpfile
        self.tmpfile_fd = tmpfile.fileno()
def save_db_objects(db_engine, db_objects):
    """Saves a collection of SQLAlchemy model objects to the database using a COPY command

    Args:
        db_engine (sqlalchemy.engine)
        db_objects (list) SQLAlchemy model objects, corresponding to a valid table
    """
    with tempfile.TemporaryFile(mode='w+') as f:
        writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
        for db_object in db_objects:
            writer.writerow([
                getattr(db_object, col.name)
                for col in db_object.__table__.columns
            ])
        f.seek(0)
        postgres_copy.copy_from(f, type(db_objects[0]), db_engine, format='csv')
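
A hypothetical usage sketch: the Event model and connection URL below are placeholders, not from the original project, and the COPY path assumes a PostgreSQL database behind the postgres_copy helper.

import sqlalchemy
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Event(Base):  # hypothetical model, for illustration only
    __tablename__ = 'events'
    id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
    label = sqlalchemy.Column(sqlalchemy.String)

engine = sqlalchemy.create_engine("postgresql://user:password@localhost/mydb")  # placeholder URL
Base.metadata.create_all(engine)
save_db_objects(engine, [Event(id=1, label="a"), Event(id=2, label="b")])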
def get_qr_image(session: CashdeskSession) -> TemporaryFile:
    # TODO: check qr code
    qr = qrcode.QRCode(
        version=1,
        error_correction=qrcode.constants.ERROR_CORRECT_H,
        box_size=10,
        border=4,
    )
    tz = timezone.get_current_timezone()
    data = '{end}\tEinnahme\t{total}\tKassensession\t#{pk}\t{supervisor}\t{user}'.format(
        end=session.end.astimezone(tz).strftime('%d.%m.%Y\t%H:%M:%S'),
        total='{0:,.2f}'.format(session.get_cash_transaction_total()).translate(str.maketrans(',.', '.,')),
        pk=session.pk,
        supervisor=session.backoffice_user_after.get_full_name(),
        user=session.user.get_full_name(),
    )
    qr.add_data(data)
    qr.make()
    f = TemporaryFile()
    img = qr.make_image()
    img.save(f)
    return f
def test_exec_command_stdout():
    # Regression test for gh-2999 and gh-2915.
    # There are several packages (nose, scipy.weave.inline, Sage inline
    # Fortran) that replace stdout, in which case it doesn't have a fileno
    # method. This is tested here, with a do-nothing command that fails if the
    # presence of fileno() is assumed in exec_command.

    # The code has a special case for posix systems, so if we are on posix test
    # both that the special case works and that the generic code works.

    # Test posix version:
    with redirect_stdout(StringIO()):
        with redirect_stderr(TemporaryFile()):
            exec_command.exec_command("cd '.'")

    if os.name == 'posix':
        # Test general (non-posix) version:
        with emulate_nonposix():
            with redirect_stdout(StringIO()):
                with redirect_stderr(TemporaryFile()):
                    exec_command.exec_command("cd '.'")
def test_dump_as_file(self):
    with open(util.get_data_filename('nginx.conf')) as handle:
        parsed = load(handle)
    parsed[-1][-1].append(UnspacedList([['server'],
                          [['listen', ' ', '443', ' ', 'ssl'],
                           ['server_name', ' ', 'localhost'],
                           ['ssl_certificate', ' ', 'cert.pem'],
                           ['ssl_certificate_key', ' ', 'cert.key'],
                           ['ssl_session_cache', ' ', 'shared:SSL:1m'],
                           ['ssl_session_timeout', ' ', '5m'],
                           ['ssl_ciphers', ' ', 'HIGH:!aNULL:!MD5'],
                           [['location', ' ', '/'],
                            [['root', ' ', 'html'],
                             ['index', ' ', 'index.html', ' ', 'index.htm']]]]]))
    with tempfile.TemporaryFile(mode='w+t') as f:
        dump(parsed, f)
        f.seek(0)
        parsed_new = load(f)
    self.assertEqual(parsed, parsed_new)
def test_comments(self):
    with open(util.get_data_filename('minimalistic_comments.conf')) as handle:
        parsed = load(handle)
    with tempfile.TemporaryFile(mode='w+t') as f:
        dump(parsed, f)
        f.seek(0)
        parsed_new = load(f)
    self.assertEqual(parsed, parsed_new)
    self.assertEqual(parsed_new, [
        ['#', " Use bar.conf when it's a full moon!"],
        ['include', 'foo.conf'],
        ['#', ' Kilroy was here'],
        ['check_status'],
        [['server'],
         [['#', ''],
          ['#', " Don't forget to open up your firewall!"],
          ['#', ''],
          ['listen', '1234'],
          ['#', ' listen 80;']]],
    ])
def setUp(self):
    file_path = resource_filename(Requirement.parse('google_streetview'), 'google_streetview/config.json')
    with open(file_path, 'r') as in_file:
        defaults = json.load(in_file)
    params = [{
        'size': '600x300',  # max 640x640 pixels
        'location': '46.414382,10.013988',
        'heading': '151.78',
        'pitch': '-0.76',
        'key': defaults['key']
    }]
    self.results = google_streetview.api.results(params)
    tempfile = TemporaryFile()
    self.tempfile = str(tempfile.name)
    tempfile.close()
    self.tempdir = str(TemporaryDirectory().name)
# corpus.py — from the Natural-Language-Processing-Python-and-NLTK examples (PacktPublishing)
def remove_line(fname, line):
    '''Remove line from file by creating a temporary file containing all lines
    from original file except those matching the given line, then copying the
    temporary file back into the original file, overwriting its contents.
    '''
    with lockfile.FileLock(fname):
        tmp = tempfile.TemporaryFile(mode='w+')
        fp = open(fname, 'r+')
        # write all lines from orig file, except if matches given line
        for l in fp:
            if l.strip() != line:
                tmp.write(l)
        # reset file pointers so entire files are copied
        fp.seek(0)
        tmp.seek(0)
        # copy tmp into fp, then truncate to remove trailing line(s)
        shutil.copyfileobj(tmp, fp)
        fp.truncate()
        fp.close()
        tmp.close()
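
A hypothetical call (the file name and line are placeholders); note that lines are compared with strip(), so surrounding whitespace in the file is ignored:

remove_line('allowed_hosts.txt', 'stale.example.com')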
def get_read_stream(self, dag_id, task_id, execution_date):
    key_name = self.get_key_name(dag_id, task_id, execution_date)
    key = self.bucket.get_key(key_name)
    if key is not None:
        import tempfile
        temp_file_stream = tempfile.TemporaryFile(mode='w+b')
        key.get_file(temp_file_stream)
        # Stream has been read in and is now at the end,
        # so reset it to the start
        temp_file_stream.seek(0)
        return temp_file_stream
    message = 'S3 key named {key_name} in bucket {bucket_name} does not exist.'.format(
        key_name=key_name, bucket_name=self.bucket_name)
    raise StorageDriverError(message)
def add_here_document(self, interp, name, content, io_number=None):
    if io_number is None:
        io_number = 0

    if name == pyshlex.unquote_wordtree(name):
        content = interp.expand_here_document(('TOKEN', content))

    # Write document content in a temporary file
    tmp = tempfile.TemporaryFile()
    try:
        tmp.write(content)
        tmp.flush()
        tmp.seek(0)
        self._add_descriptor(io_number, FileWrapper('r', tmp))
    except:
        tmp.close()
        raise
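
The same idea works for feeding a here-document to a child process: write the text into a TemporaryFile, rewind it, and hand it over as stdin. A minimal standalone sketch (the command is illustrative and POSIX-flavoured):

import subprocess
import tempfile

heredoc = tempfile.TemporaryFile()
heredoc.write(b"first line\nsecond line\n")
heredoc.seek(0)                       # rewind so the child reads from the beginning
subprocess.run(["cat"], stdin=heredoc)
heredoc.close()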
def rman(self, finalscript):
    self._setenv()
    debug("RMAN execution starts")
    BackupLogger.close()
    starttime = datetime.now()
    with TemporaryFile() as f:
        p = Popen([os.path.join(self.oraclehome, 'bin', 'rman'), "log", BackupLogger.logfile, "append"],
                  stdout=f, stderr=f, stdin=PIPE)
        # Send the script to RMAN
        p.communicate(input=finalscript)
    endtime = datetime.now()
    BackupLogger.init()
    debug("RMAN execution time %s" % (endtime - starttime))
    # If RMAN exits with any code except 0, then there was some error
    if p.returncode != 0:
        error("RMAN execution failed with code %d" % p.returncode)
        raise Exception('rman', "RMAN exited with code %d" % p.returncode)
    else:
        debug("RMAN execution successful")
def sqlplus(self, finalscript, silent=False):
    self._setenv()
    with TemporaryFile() as f:
        args = [os.path.join(self.oraclehome, 'bin', 'sqlplus')]
        if silent:
            args.append('-S')
        args.append('/nolog')
        debug("SQL*Plus execution starts")
        BackupLogger.close()
        p = Popen(args, stdout=f, stderr=f, stdin=PIPE)
        p.communicate(input=finalscript)
        BackupLogger.init()
        if p.returncode != 0:
            error("SQL*Plus exited with code %d" % p.returncode)
            raise Exception('sqlplus', "sqlplus exited with code %d" % p.returncode)
        else:
            debug("SQL*Plus execution successful")
            if silent:
                f.seek(0, 0)
                return f.read()