def write_pid_to_pidfile(pidfile_path):
""" Write the PID in the named PID file.
Get the numeric process ID (“PID”) of the current process
and write it to the named file as a line of text.
"""
open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
open_mode = 0o644
pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
pidfile = os.fdopen(pidfile_fd, 'w')
# According to the FHS 2.3 section on PID files in /var/run:
#
# The file must consist of the process identifier in
# ASCII-encoded decimal, followed by a newline character. For
# example, if crond was process number 25, /var/run/crond.pid
# would contain three characters: two, five, and newline.
pid = os.getpid()
pidfile.write("%s\n" % pid)
pidfile.close()
python类fdopen()的实例源码
def write_pid_to_pidfile(pidfile_path):
""" Write the PID in the named PID file.
Get the numeric process ID (“PID”) of the current process
and write it to the named file as a line of text.
"""
open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
open_mode = 0o644
pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
pidfile = os.fdopen(pidfile_fd, 'w')
# According to the FHS 2.3 section on PID files in /var/run:
#
# The file must consist of the process identifier in
# ASCII-encoded decimal, followed by a newline character. For
# example, if crond was process number 25, /var/run/crond.pid
# would contain three characters: two, five, and newline.
pid = os.getpid()
pidfile.write("%s\n" % pid)
pidfile.close()
def test_FILE_stored_explicitly():
ffi = FFI()
ffi.cdef("int myprintf11(const char *, int); FILE *myfile;")
lib = ffi.verify("""
#include <stdio.h>
FILE *myfile;
int myprintf11(const char *out, int value) {
return fprintf(myfile, out, value);
}
""")
import os
fdr, fdw = os.pipe()
fw1 = os.fdopen(fdw, 'wb', 256)
lib.myfile = ffi.cast("FILE *", fw1)
#
fw1.write(b"X")
r = lib.myprintf11(b"hello, %d!\n", ffi.cast("int", 42))
fw1.close()
assert r == len("hello, 42!\n")
#
result = os.read(fdr, 256)
os.close(fdr)
# the 'X' might remain in the user-level buffer of 'fw1' and
# end up showing up after the 'hello, 42!\n'
assert result == b"Xhello, 42!\n" or result == b"hello, 42!\nX"
def test_ffi_buffer_with_file(self):
import tempfile, os, array
fd, filename = tempfile.mkstemp()
f = os.fdopen(fd, 'r+b')
a = ffi.new("int[]", list(range(1005)))
try:
ffi.buffer(a, 512)
except NotImplementedError as e:
py.test.skip(str(e))
f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
f.seek(0)
assert f.read() == array.array('i', range(1000)).tostring()
f.seek(0)
b = ffi.new("int[]", 1005)
f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
f.close()
os.unlink(filename)
def test_ffi_buffer_with_file(self):
ffi = FFI(backend=self.Backend())
import tempfile, os, array
fd, filename = tempfile.mkstemp()
f = os.fdopen(fd, 'r+b')
a = ffi.new("int[]", list(range(1005)))
try:
ffi.buffer(a, 512)
except NotImplementedError as e:
py.test.skip(str(e))
f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
f.seek(0)
assert f.read() == array.array('i', range(1000)).tostring()
f.seek(0)
b = ffi.new("int[]", 1005)
f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
f.close()
os.unlink(filename)
def set(self, key, value, timeout=None):
if timeout is None:
timeout = int(time() + self.default_timeout)
elif timeout != 0:
timeout = int(time() + timeout)
filename = self._get_filename(key)
self._prune()
try:
fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
dir=self._path)
with os.fdopen(fd, 'wb') as f:
pickle.dump(timeout, f, 1)
pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
rename(tmp, filename)
os.chmod(filename, self._mode)
except (IOError, OSError):
return False
else:
return True
def set(self, key, value, timeout=None):
if timeout is None:
timeout = int(time() + self.default_timeout)
elif timeout != 0:
timeout = int(time() + timeout)
filename = self._get_filename(key)
self._prune()
try:
fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
dir=self._path)
with os.fdopen(fd, 'wb') as f:
pickle.dump(timeout, f, 1)
pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
rename(tmp, filename)
os.chmod(filename, self._mode)
except (IOError, OSError):
return False
else:
return True
def _decompress_bz2(filename, blocksize=900*1024):
"""
Decompress .tar.bz2 to .tar on disk (for faster access)
Use TemporaryFile to guarentee write access.
"""
if not filename.endswith('.tar.bz2'):
return filename
fd, path = mkstemp()
with os.fdopen(fd, 'wb') as fo:
with open(filename, 'rb') as fi:
z = bz2.BZ2Decompressor()
for block in iter(lambda: fi.read(blocksize), b''):
fo.write(z.decompress(block))
return path
def write_pid_to_pidfile(pidfile_path):
""" Write the PID in the named PID file.
Get the numeric process ID (“PID”) of the current process
and write it to the named file as a line of text.
"""
open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
open_mode = 0o644
pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
pidfile = os.fdopen(pidfile_fd, 'w')
# According to the FHS 2.3 section on PID files in /var/run:
#
# The file must consist of the process identifier in
# ASCII-encoded decimal, followed by a newline character. For
# example, if crond was process number 25, /var/run/crond.pid
# would contain three characters: two, five, and newline.
pid = os.getpid()
pidfile.write("%s\n" % pid)
pidfile.close()
def spawn(argv, stdout=False):
"""Asynchronously run a program. argv[0] is the executable name, which
must be fully qualified or in the path. If stdout is True, return
a file object corresponding to the child's standard output; otherwise,
return the child's process ID.
argv must be strictly str objects to avoid encoding confusion.
"""
types = map(type, argv)
if not (min(types) == max(types) == str):
raise TypeError("executables and arguments must be str objects")
logger.debug("Running %r" % " ".join(argv))
args = GLib.spawn_async(argv=argv, flags=GLib.SpawnFlags.SEARCH_PATH,
standard_output=stdout)
if stdout:
return os.fdopen(args[2])
else:
return args[0]
def set(self, key, value, timeout=None):
if timeout is None:
timeout = int(time() + self.default_timeout)
elif timeout != 0:
timeout = int(time() + timeout)
filename = self._get_filename(key)
self._prune()
try:
fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
dir=self._path)
with os.fdopen(fd, 'wb') as f:
pickle.dump(timeout, f, 1)
pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
rename(tmp, filename)
os.chmod(filename, self._mode)
except (IOError, OSError):
return False
else:
return True
def main():
if sys.platform in ['win32', 'cygwin']:
return 0
# This script is called by gclient. gclient opens its hooks subprocesses with
# (stdout=subprocess.PIPE, stderr=subprocess.STDOUT) and then does custom
# output processing that breaks printing '\r' characters for single-line
# updating status messages as printed by curl and wget.
# Work around this by setting stderr of the update.sh process to stdin (!):
# gclient doesn't redirect stdin, and while stdin itself is read-only, a
# dup()ed sys.stdin is writable, try
# fd2 = os.dup(sys.stdin.fileno()); os.write(fd2, 'hi')
# TODO: Fix gclient instead, http://crbug.com/95350
return subprocess.call(
[os.path.join(os.path.dirname(__file__), 'update.sh')] + sys.argv[1:],
stderr=os.fdopen(
os.dup(
sys.stdin.fileno())))
def close(self):
""" Implements GVSvgDoc.close() """
GVDocBase.close(self)
# Make sure the extension is correct
if self._filename[-4:] != ".svg":
self._filename += ".svg"
# Create a temporary dot file
(handle, tmp_dot) = tempfile.mkstemp(".gv" )
dotfile = os.fdopen(handle, "wb")
dotfile.write(self._dot.getvalue())
dotfile.close()
# Generate the SVG file.
os.system( 'dot -Tsvg:cairo -o"%s" "%s"' % (self._filename, tmp_dot) )
# Delete the temporary dot file
os.remove(tmp_dot)
#-------------------------------------------------------------------------------
#
# GVSvgzDoc
#
#-------------------------------------------------------------------------------
def close(self):
""" Implements GVSvgzDoc.close() """
GVDocBase.close(self)
# Make sure the extension is correct
if self._filename[-5:] != ".svgz":
self._filename += ".svgz"
# Create a temporary dot file
(handle, tmp_dot) = tempfile.mkstemp(".gv" )
dotfile = os.fdopen(handle, "wb")
dotfile.write(self._dot.getvalue())
dotfile.close()
# Generate the SVGZ file.
os.system( 'dot -Tsvgz -o"%s" "%s"' % (self._filename, tmp_dot) )
# Delete the temporary dot file
os.remove(tmp_dot)
#-------------------------------------------------------------------------------
#
# GVPngDoc
#
#-------------------------------------------------------------------------------
def close(self):
""" Implements GVPngDoc.close() """
GVDocBase.close(self)
# Make sure the extension is correct
if self._filename[-4:] != ".png":
self._filename += ".png"
# Create a temporary dot file
(handle, tmp_dot) = tempfile.mkstemp(".gv" )
dotfile = os.fdopen(handle, "wb")
dotfile.write(self._dot.getvalue())
dotfile.close()
# Generate the PNG file.
os.system( 'dot -Tpng -o"%s" "%s"' % (self._filename, tmp_dot) )
# Delete the temporary dot file
os.remove(tmp_dot)
#-------------------------------------------------------------------------------
#
# GVJpegDoc
#
#-------------------------------------------------------------------------------
def close(self):
""" Implements GVGifDoc.close() """
GVDocBase.close(self)
# Make sure the extension is correct
if self._filename[-4:] != ".gif":
self._filename += ".gif"
# Create a temporary dot file
(handle, tmp_dot) = tempfile.mkstemp(".gv" )
dotfile = os.fdopen(handle, "wb")
dotfile.write(self._dot.getvalue())
dotfile.close()
# Generate the GIF file.
os.system( 'dot -Tgif -o"%s" "%s"' % (self._filename, tmp_dot) )
# Delete the temporary dot file
os.remove(tmp_dot)
#-------------------------------------------------------------------------------
#
# GVPdfGvDoc
#
#-------------------------------------------------------------------------------
def set(self, key, value, timeout=None):
if timeout is None:
timeout = self.default_timeout
filename = self._get_filename(key)
self._prune()
try:
fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
dir=self._path)
f = os.fdopen(fd, 'wb')
try:
pickle.dump(int(time() + timeout), f, 1)
pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
finally:
f.close()
rename(tmp, filename)
os.chmod(filename, self._mode)
except (IOError, OSError):
pass
def safe_open(path, mode="w", chmod=None, buffering=None):
"""Safely open a file.
:param str path: Path to a file.
:param str mode: Same os `mode` for `open`.
:param int chmod: Same as `mode` for `os.open`, uses Python defaults
if ``None``.
:param int buffering: Same as `bufsize` for `os.fdopen`, uses Python
defaults if ``None``.
"""
# pylint: disable=star-args
open_args = () if chmod is None else (chmod,)
fdopen_args = () if buffering is None else (buffering,)
return os.fdopen(
os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR, *open_args),
mode, *fdopen_args)
def _init_dirs(self):
test_dirs = ['a a', 'b', 'c\c', 'D_']
config = 'improbable'
root = tempfile.mkdtemp()
def cleanup():
try:
os.removedirs(root)
except (FileNotFoundError, OSError):
pass
os.chdir(root)
for dir_ in test_dirs:
os.mkdir(dir_, 0o0750)
f = '{0}.toml'.format(config)
flags = os.O_WRONLY | os.O_CREAT
rel_path = '{0}/{1}'.format(dir_, f)
abs_file_path = os.path.join(root, rel_path)
with os.fdopen(os.open(abs_file_path, flags, 0o0640), 'w') as fp:
fp.write("key = \"value is {0}\"\n".format(dir_))
return root, config, cleanup
def set(self, key, value, timeout=None):
if timeout is None:
timeout = self.default_timeout
filename = self._get_filename(key)
self._prune()
try:
fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
dir=self._path)
with os.fdopen(fd, 'wb') as f:
pickle.dump(int(time() + timeout), f, 1)
pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
rename(tmp, filename)
os.chmod(filename, self._mode)
except (IOError, OSError):
return False
else:
return True