def _dump(self, file=None, format=None, **options):
    """Write the image to ``file`` and return the path that was written.

    When ``file`` is falsy a temporary file is created with
    ``tempfile.mkstemp`` (its suffix is ``'.' + format`` when a format is
    given).  ``self.load()`` is always called before anything is written.

    With no format, or with the ``"PPM"`` format, the data is written through
    ``self.im.save_ppm``.  For any other format the path gets a
    ``"." + format`` extension appended unless it already ends with
    ``format``, and the work is delegated to ``self.save`` together with any
    extra keyword ``options``.
    """
    import tempfile

    suffix = '.' + format if format else ''

    if not file:
        # mkstemp hands back an open descriptor; only the path is needed.
        handle, file = tempfile.mkstemp(suffix)
        os.close(handle)

    self.load()

    if format and format != "PPM":
        if not file.endswith(format):
            file = file + "." + format
        self.save(file, format, **options)
    else:
        self.im.save_ppm(file)

    return file
python类mkstemp()的实例源码
def convert(cls, report, data):
    """Convert the report data to another mimetype if necessary.

    The input format is ``report.template_extension``; the output format is
    ``report.extension`` or, when that is falsy, the template extension.

    Returns a ``(format, data)`` tuple.  When the output format is a key of
    ``MIMETYPES`` the data is returned untouched.  Otherwise the data is
    written to a temporary file and ``unoconv`` is run on it; the converted
    bytes are read from its standard output.

    Raises ``Exception`` carrying unoconv's standard error output when the
    command exits with a non-zero status.
    """
    input_format = report.template_extension
    output_format = report.extension or report.template_extension

    if output_format in MIMETYPES:
        return output_format, data

    fd, path = tempfile.mkstemp(suffix=(os.extsep + input_format),
        prefix='trytond_')
    try:
        # The write happens inside the try so the temporary file is removed
        # even when writing the input data fails.
        with os.fdopen(fd, 'wb+') as fp:
            fp.write(data)

        oext = FORMAT2EXT.get(output_format, output_format)
        cmd = ['unoconv', '--connection=%s' % config.get('report', 'unoconv'),
            '-f', oext, '--stdout', path]
        # stderr must be piped, otherwise communicate() returns None for it
        # and the exception below carries no diagnostic at all.
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        stdoutdata, stderrdata = proc.communicate()
        if proc.wait() != 0:
            raise Exception(stderrdata)
        return oext, stdoutdata
    finally:
        os.remove(path)
def test_basic_config():
    """Check the supervisor command generated from the test configuration.

    ``testcfg`` is dumped as YAML into a temporary file, loaded through
    ``ny.get_config`` and turned into a supervisor configuration by
    ``ny.write_supervisor_conf``.  The resulting ``program:testtunnel``
    command must contain the expected ``sshuttle`` invocation.
    """
    fd, path = tempfile.mkstemp()
    try:
        # Close the file before it is read back so the content is on disk.
        with os.fdopen(fd, 'w') as f:
            f.write(yaml.dump(testcfg))

        cfg = ny.get_config(path)
        ny.write_supervisor_conf()

        config = ConfigParser.ConfigParser()
        with open(cfg['supervisor.conf']) as conf_file:
            config.readfp(conf_file)

        command = config.get('program:testtunnel', 'command')
        print(command)
        assert 'sshuttle -r 1.1.1.1 2.2.2.2 -x 3.3.3.3' in command
    finally:
        # Do not leave the temporary configuration file behind.
        os.remove(path)
def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
    """Store ``value`` under ``key`` in the cache directory.

    The entry is first written to a temporary file inside ``self._dir`` and
    then moved onto its final name with ``file_move_safe``.  The file holds
    the pickled expiry (from ``self.get_backend_timeout(timeout)``) followed
    by the zlib-compressed pickle of ``value``.  If anything fails before the
    move completes, the temporary file is removed.
    """
    self._createdir()  # Cache dir can be deleted at any time.
    target = self._key_to_file(key, version)
    self._cull()  # make some room if necessary
    handle, scratch_path = tempfile.mkstemp(dir=self._dir)
    moved = False
    try:
        with io.open(handle, 'wb') as stream:
            expiry = self.get_backend_timeout(timeout)
            header = pickle.dumps(expiry, -1)
            stream.write(header)
            payload = zlib.compress(pickle.dumps(value), -1)
            stream.write(payload)
        file_move_safe(scratch_path, target, allow_overwrite=True)
        moved = True
    finally:
        # Only clean up when the temporary file was not moved into place.
        if not moved:
            os.remove(scratch_path)
def set(self, key, value, timeout=None):
    """Store ``value`` under ``key`` on the file system.

    ``timeout`` of ``None`` means ``self.default_timeout`` seconds from now,
    ``0`` is stored as-is, and any other value is taken as seconds from now.
    The expiry and the value are pickled into a temporary file inside
    ``self._path`` which is then renamed onto the final file name and given
    the mode ``self._mode``.

    Returns ``True`` on success and ``False`` when an ``IOError`` or
    ``OSError`` occurs.  Other exceptions (for example pickling errors)
    propagate.  In every failure case a temporary file that was not renamed
    into place is removed.
    """
    if timeout is None:
        timeout = int(time() + self.default_timeout)
    elif timeout != 0:
        timeout = int(time() + timeout)
    filename = self._get_filename(key)
    self._prune()
    tmp = None  # path of a temporary file that still needs cleaning up
    try:
        fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                   dir=self._path)
        with os.fdopen(fd, 'wb') as f:
            pickle.dump(timeout, f, 1)
            pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
        rename(tmp, filename)
        tmp = None  # the file now lives under its final name
        os.chmod(filename, self._mode)
    except (IOError, OSError):
        return False
    else:
        return True
    finally:
        if tmp is not None:
            try:
                os.remove(tmp)
            except OSError:
                # Best effort: the original error (if any) matters more.
                pass
def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server.

    A certificate/key pair from ``generate_adhoc_ssl_pair`` is dumped in PEM
    format into two temporary files, which are registered for removal at
    interpreter exit, and the context is built from those files with
    ``load_ssl_context``.
    """
    crypto = _get_openssl_crypto_module()
    import tempfile
    import atexit

    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)

    # File objects guarantee complete writes and close both descriptors even
    # when dumping or writing fails (raw os.write may write only partially).
    with os.fdopen(cert_handle, 'wb') as cert_out, \
            os.fdopen(pkey_handle, 'wb') as pkey_out:
        cert_out.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        pkey_out.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))

    ctx = load_ssl_context(cert_file, pkey_file)
    return ctx
def create_temporary_ca_file(anchor_list):
    """
    Concatenate all the certificates (PEM format for the export) in
    'anchor_list' and write the result to a temporary file created with
    mkstemp() from the tempfile module. On success the file name is
    returned, None otherwise.

    Each element of 'anchor_list' must provide an ``output(fmt="PEM")``
    method. Text results are encoded as ASCII before being written.

    If you are used to OpenSSL tools, this function builds a CAfile
    that can be used for certificate and CRL check.

    On failure the temporary file is removed, so nothing is left behind.
    """
    try:
        fd, fname = tempfile.mkstemp()
    except Exception:
        return None

    try:
        # The file object closes the descriptor on every path and, unlike a
        # raw os.write(), always writes the complete buffer.
        with os.fdopen(fd, 'wb') as out:
            for anchor in anchor_list:
                pem = anchor.output(fmt="PEM")
                if not isinstance(pem, bytes):
                    pem = pem.encode('ascii')
                out.write(pem)
    except Exception:
        try:
            os.unlink(fname)
        except OSError:
            pass
        return None
    return fname
def validate(self, inline=False):
    """Validate the workflow object.

    The workflow is saved to a temporary CWL file (absolute step paths,
    no working directory, without triggering validation again) and that
    file is read back with ``load_cwl``.  The temporary file is always
    removed afterwards.  Per the original documentation the validation
    itself is done by cwltool.

    Args:
        inline: forwarded to ``save``; when true the steps are saved inline.
    """
    handle, scratch = tempfile.mkstemp()
    os.close(handle)

    # validate=False keeps save() from calling back into this method.
    save_options = dict(inline=inline, validate=False, relative=False,
                        wd=False)
    try:
        self.save(scratch, **save_options)
        loaded = load_cwl(scratch)
        document_loader, processobj, metadata, uri = loaded
    finally:
        os.remove(scratch)
def _pack(self, fname, encoding):
    """Save the workflow in packed form to ``fname``.

    Packed means that all tools and subworkflows are included in the
    workflow file that is created.  According to the original notes, a
    packed workflow cannot be loaded and used in scriptcwl.

    The workflow is first saved unpacked to a temporary file, loaded with
    ``load_cwl``, and the output of ``print_pack`` is then written to
    ``fname`` using ``encoding``.
    """
    handle, scratch = tempfile.mkstemp()
    os.close(handle)

    save_options = dict(validate=False, wd=False, inline=False,
                        relative=False, pack=False)
    try:
        self.save(scratch, **save_options)
        loaded = load_cwl(scratch)
        document_loader, processobj, metadata, uri = loaded
    finally:
        # The temporary file is only needed for loading.
        os.remove(scratch)

    with codecs.open(fname, 'wb', encoding=encoding) as out:
        packed = print_pack(document_loader, processobj, uri, metadata)
        out.write(packed)
def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
    """Store ``value`` under ``key`` in the cache directory.

    The entry is written to a temporary file inside ``self._dir`` and then
    moved onto its final name with ``file_move_safe``.  The file holds the
    pickled expiry (from ``self.get_backend_timeout(timeout)``) followed by
    the zlib-compressed pickle of ``value``, both pickled with
    ``pickle.HIGHEST_PROTOCOL``.  If anything fails before the move
    completes, the temporary file is removed.
    """
    self._createdir()  # Cache dir can be deleted at any time.
    target = self._key_to_file(key, version)
    self._cull()  # make some room if necessary
    handle, scratch_path = tempfile.mkstemp(dir=self._dir)
    moved = False
    try:
        with io.open(handle, 'wb') as stream:
            expiry = self.get_backend_timeout(timeout)
            header = pickle.dumps(expiry, pickle.HIGHEST_PROTOCOL)
            stream.write(header)
            pickled_value = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
            stream.write(zlib.compress(pickled_value))
        file_move_safe(scratch_path, target, allow_overwrite=True)
        moved = True
    finally:
        # Only clean up when the temporary file was not moved into place.
        if not moved:
            os.remove(scratch_path)
def create_temporary_ca_file(anchor_list):
    """
    Concatenate all the certificates (PEM format for the export) in
    'anchor_list' and write the result to a temporary file created with
    mkstemp() from the tempfile module. On success the file name is
    returned, None otherwise.

    Each element of 'anchor_list' must provide an ``output(fmt="PEM")``
    method. Text results are encoded as ASCII before being written.

    If you are used to OpenSSL tools, this function builds a CAfile
    that can be used for certificate and CRL check.

    On failure the temporary file is removed, so nothing is left behind.
    """
    try:
        fd, fname = tempfile.mkstemp()
    except Exception:
        return None

    try:
        # The file object closes the descriptor on every path and, unlike a
        # raw os.write(), always writes the complete buffer.
        with os.fdopen(fd, 'wb') as out:
            for anchor in anchor_list:
                pem = anchor.output(fmt="PEM")
                if not isinstance(pem, bytes):
                    pem = pem.encode('ascii')
                out.write(pem)
    except Exception:
        try:
            os.unlink(fname)
        except OSError:
            pass
        return None
    return fname
def _findLib_gcc(name):
    """Ask the C compiler's linker which file satisfies ``-l<name>``.

    A shell command picks ``gcc`` (or ``cc`` as a fallback) and links with
    ``-Wl,-t`` so the linker traces the files it opens; the combined output
    is searched for the first token that looks like ``lib<name>.*``.

    Returns the matched token, or ``None`` when nothing matches.  Raises
    ``OSError`` when the command's close status equals 10.

    ``name`` is inserted into a shell command line unquoted, so it must come
    from a trusted source.
    """
    expr = r'[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name)
    # The temporary file only serves as a throw-away linker output path.
    fdout, ccout = tempfile.mkstemp()
    os.close(fdout)
    cmd = 'if type gcc >/dev/null 2>&1; then CC=gcc; elif type cc >/dev/null 2>&1; then CC=cc;else exit 10; fi;' \
          'LANG=C LC_ALL=C $CC -Wl,-t -o ' + ccout + ' 2>&1 -l' + name
    try:
        f = os.popen(cmd)
        try:
            trace = f.read()
        finally:
            rv = f.close()
    finally:
        try:
            os.unlink(ccout)
        except OSError as e:
            # The linker may already have removed its output file.
            if e.errno != errno.ENOENT:
                raise
    # NOTE(review): on POSIX, close() reports a wait()-encoded status, so a
    # shell "exit 10" may not compare equal to 10 here -- confirm before
    # relying on this branch.
    if rv == 10:
        raise OSError('gcc or cc command not found')
    res = re.search(expr, trace)
    if not res:
        return None
    return res.group(0)
def mktemp(self, *args, **kwds):
    """Create a temp file that is cleaned up at the end of the test.

    All arguments are forwarded to ``tempfile.mkstemp``.  The returned path
    is remembered in ``self._mktemp_queue``; the first call registers a
    single cleanup callback (via ``self.addCleanup``) that removes every
    queued file that still exists and then empties the queue.
    """
    self.require_writeable_filesystem()
    handle, path = tempfile.mkstemp(*args, **kwds)
    os.close(handle)

    pending = self._mktemp_queue
    if pending is None:
        pending = self._mktemp_queue = []

        def cleaner():
            for leftover in pending:
                if os.path.exists(leftover):
                    os.remove(leftover)
            del pending[:]

        # Registered once; later calls only append to the shared queue.
        self.addCleanup(cleaner)

    pending.append(path)
    return path
def set(self, key, value, timeout=None):
    """Store ``value`` under ``key`` on the file system.

    The timeout is normalized with ``self._normalize_timeout``.  The
    normalized timeout and the value are pickled into a temporary file
    inside ``self._path`` which is then renamed onto the final file name and
    given the mode ``self._mode``.

    Returns ``True`` on success and ``False`` when an ``IOError`` or
    ``OSError`` occurs.  Other exceptions (for example pickling errors)
    propagate.  In every failure case a temporary file that was not renamed
    into place is removed.
    """
    timeout = self._normalize_timeout(timeout)
    filename = self._get_filename(key)
    self._prune()
    tmp = None  # path of a temporary file that still needs cleaning up
    try:
        fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                   dir=self._path)
        with os.fdopen(fd, 'wb') as f:
            pickle.dump(timeout, f, 1)
            pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
        rename(tmp, filename)
        tmp = None  # the file now lives under its final name
        os.chmod(filename, self._mode)
    except (IOError, OSError):
        return False
    else:
        return True
    finally:
        if tmp is not None:
            try:
                os.remove(tmp)
            except OSError:
                # Best effort: the original error (if any) matters more.
                pass
def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server.

    A certificate/key pair from ``generate_adhoc_ssl_pair`` is dumped in PEM
    format into two temporary files, which are registered for removal at
    interpreter exit, and the context is built from those files with
    ``load_ssl_context``.
    """
    crypto = _get_openssl_crypto_module()
    import tempfile
    import atexit

    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)

    # File objects guarantee complete writes and close both descriptors even
    # when dumping or writing fails (raw os.write may write only partially).
    with os.fdopen(cert_handle, 'wb') as cert_out, \
            os.fdopen(pkey_handle, 'wb') as pkey_out:
        cert_out.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        pkey_out.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))

    ctx = load_ssl_context(cert_file, pkey_file)
    return ctx
def set(self, key, value, timeout=None):
    """Store ``value`` under ``key`` on the file system.

    The timeout is normalized with ``self._normalize_timeout``.  The
    normalized timeout and the value are pickled into a temporary file
    inside ``self._path`` which is then renamed onto the final file name and
    given the mode ``self._mode``.

    Returns ``True`` on success and ``False`` when an ``IOError`` or
    ``OSError`` occurs.  Other exceptions (for example pickling errors)
    propagate.  In every failure case a temporary file that was not renamed
    into place is removed.
    """
    timeout = self._normalize_timeout(timeout)
    filename = self._get_filename(key)
    self._prune()
    tmp = None  # path of a temporary file that still needs cleaning up
    try:
        fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                   dir=self._path)
        with os.fdopen(fd, 'wb') as f:
            pickle.dump(timeout, f, 1)
            pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
        rename(tmp, filename)
        tmp = None  # the file now lives under its final name
        os.chmod(filename, self._mode)
    except (IOError, OSError):
        return False
    else:
        return True
    finally:
        if tmp is not None:
            try:
                os.remove(tmp)
            except OSError:
                # Best effort: the original error (if any) matters more.
                pass
def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server.

    A certificate/key pair from ``generate_adhoc_ssl_pair`` is dumped in PEM
    format into two temporary files, which are registered for removal at
    interpreter exit, and the context is built from those files with
    ``load_ssl_context``.
    """
    crypto = _get_openssl_crypto_module()
    import tempfile
    import atexit

    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)

    # File objects guarantee complete writes and close both descriptors even
    # when dumping or writing fails (raw os.write may write only partially).
    with os.fdopen(cert_handle, 'wb') as cert_out, \
            os.fdopen(pkey_handle, 'wb') as pkey_out:
        cert_out.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        pkey_out.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))

    ctx = load_ssl_context(cert_file, pkey_file)
    return ctx
def test_ffi_buffer_with_file(self):
    """Round-trip an ``ffi.buffer`` through a real file object.

    The first 1000 ints of a 1005-element array are written to a temporary
    file from a buffer, compared against the bytes of an equivalent
    ``array.array``, and then read back with ``readinto`` into a second
    buffer.  The test is skipped when ``ffi.buffer`` raises
    ``NotImplementedError``.  The file is closed and removed on every exit
    path, including skip and assertion failure.
    """
    import tempfile
    import os
    import array
    fd, filename = tempfile.mkstemp()
    f = os.fdopen(fd, 'r+b')
    try:
        a = ffi.new("int[]", list(range(1005)))
        try:
            ffi.buffer(a, 512)
        except NotImplementedError as e:
            py.test.skip(str(e))
        f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
        f.seek(0)
        ints = array.array('i', range(1000))
        # array.tostring() was removed in Python 3.9; tobytes() does not
        # exist on Python 2, so pick whichever is available.
        if hasattr(ints, 'tobytes'):
            expected = ints.tobytes()
        else:
            expected = ints.tostring()
        assert f.read() == expected
        f.seek(0)
        b = ffi.new("int[]", 1005)
        f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
        # Only the first 1000 ints were written, so the tail of b stays 0.
        assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
    finally:
        f.close()
        os.unlink(filename)
def test_ffi_buffer_with_file(self):
    """Round-trip an ``ffi.buffer`` through a real file object.

    Using an ``FFI`` instance built on ``self.Backend()``, the first 1000
    ints of a 1005-element array are written to a temporary file from a
    buffer, compared against the bytes of an equivalent ``array.array``,
    and then read back with ``readinto`` into a second buffer.  The test is
    skipped when ``ffi.buffer`` raises ``NotImplementedError``.  The file is
    closed and removed on every exit path, including skip and assertion
    failure.
    """
    ffi = FFI(backend=self.Backend())
    import tempfile
    import os
    import array
    fd, filename = tempfile.mkstemp()
    f = os.fdopen(fd, 'r+b')
    try:
        a = ffi.new("int[]", list(range(1005)))
        try:
            ffi.buffer(a, 512)
        except NotImplementedError as e:
            py.test.skip(str(e))
        f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
        f.seek(0)
        ints = array.array('i', range(1000))
        # array.tostring() was removed in Python 3.9; tobytes() does not
        # exist on Python 2, so pick whichever is available.
        if hasattr(ints, 'tobytes'):
            expected = ints.tobytes()
        else:
            expected = ints.tostring()
        assert f.read() == expected
        f.seek(0)
        b = ffi.new("int[]", 1005)
        f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
        # Only the first 1000 ints were written, so the tail of b stays 0.
        assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
    finally:
        f.close()
        os.unlink(filename)
def __init__(self, executable_path, port=0, service_args=None, log_path=None):
    """
    Creates a new instance of the Service

    :Args:
     - executable_path : Path to PhantomJS binary
     - port : Port the service is running on
     - service_args : A List of other command line options to pass to PhantomJS
     - log_path: Path for PhantomJS service to log to

    When no ``--cookies-file=`` option is present in the service arguments,
    a temporary cookies file is created and passed to PhantomJS; its path is
    kept in ``self._cookie_temp_file`` (``None`` otherwise).
    """
    import os

    # Work on a copy so appending below never mutates the caller's list.
    self.service_args = [] if service_args is None else service_args[:]
    if not log_path:
        log_path = "ghostdriver.log"
    if not self._args_contain("--cookies-file="):
        # mkstemp returns an open descriptor; close it right away since only
        # the path is handed on, otherwise the descriptor leaks.
        cookie_fd, self._cookie_temp_file = tempfile.mkstemp()
        os.close(cookie_fd)
        self.service_args.append("--cookies-file=" + self._cookie_temp_file)
    else:
        self._cookie_temp_file = None

    service.Service.__init__(self, executable_path, port=port, log_file=open(log_path, 'w'))