def _validate_lockfile(self):
    """Remove the lockfile if it does not name a live process.

    Nothing happens when the lockfile cannot be read. When its content
    is not an integer, the lock is released and the result of
    ``self.release()`` is returned. Otherwise the lock is released only
    if ``_process_exists`` reports that the recorded PID is not running.
    """
    try:
        with open(self.lockfile) as handle:
            contents = handle.read()
    except Exception:
        # Unreadable or missing lockfile: nothing to validate.
        return

    try:
        owner_pid = int(contents)
    except ValueError:
        return self.release()

    from background import _process_exists

    if _process_exists(owner_pid):
        return
    self.release()
# Example source code for Python's open()
def atomic_writer(file_path, mode):
    """Atomic file writer.

    Generator meant to be driven as a context manager: it yields a file
    object opened on a temporary sibling of ``file_path`` and moves that
    temporary file over ``file_path`` only if the caller's block
    completed without raising.

    :param file_path: path of file to write to.
    :type file_path: ``unicode``
    :param mode: same as for :func:`open`
    :type mode: string

    .. versionadded:: 1.12

    """
    temp_file_path = file_path + '.aw.temp'
    file_obj = open(temp_file_path, mode)
    try:
        with file_obj:
            yield file_obj
        # Rename only after the temporary file has been closed (and so
        # flushed); a failing flush must never leave a truncated target.
        os.rename(temp_file_path, file_path)
    finally:
        # After a successful rename the temp file is gone and this is a
        # no-op; after a failure it cleans up the partial file.
        try:
            os.remove(temp_file_path)
        except (OSError, IOError):
            pass
def write_pid_to_pidfile(pidfile_path):
    """Write the PID in the named PID file.

    Get the numeric process ID ("PID") of the current process
    and write it to the named file as a line of text.

    The file is created exclusively (``O_CREAT | O_EXCL``) with mode
    ``0o644``, so ``os.open`` raises ``OSError`` if it already exists.
    """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.
    #
    # The ``with`` block closes the file even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
def read_text_file(filename):
    """Read *filename* and return its contents as decoded text.

    The raw bytes are decoded with the first codec that succeeds out of
    utf-8, the preferred locale encoding (e.g., cp1252 on some Windows
    machines) and latin1. latin1 accepts every byte sequence, so the
    worst case is a string with some garbage characters.
    """
    with open(filename, 'rb') as stream:
        data = stream.read()

    candidates = ('utf-8', locale.getpreferredencoding(False), 'latin1')
    for codec in candidates:
        try:
            data = data.decode(codec)
            break
        except UnicodeDecodeError:
            pass

    assert type(data) != bytes  # Latin1 should have worked.
    return data
def run_script(self, script_name, namespace):
    """Execute the metadata script ``script_name`` inside ``namespace``.

    The script is looked up as ``'scripts/' + script_name``. If a file
    exists at the path returned by ``self._fn(self.egg_info, script)``
    its contents are executed; otherwise the metadata text (with line
    endings normalised to ``\\n``) is registered in ``linecache`` under
    that filename and executed. ``namespace['__file__']`` is set to the
    filename in both cases.

    :raises ResolutionError: if the metadata has no such script.
    """
    script = 'scripts/' + script_name
    if not self.has_metadata(script):
        raise ResolutionError("No script named %r" % script_name)
    script_text = self.get_metadata(script).replace('\r\n', '\n')
    script_text = script_text.replace('\r', '\n')
    script_filename = self._fn(self.egg_info, script)
    namespace['__file__'] = script_filename
    if os.path.exists(script_filename):
        # Close the file deterministically instead of relying on GC.
        with open(script_filename) as script_file:
            source = script_file.read()
        code = compile(source, script_filename, 'exec')
        exec(code, namespace, namespace)
    else:
        from linecache import cache
        # Entry layout used here: (size, mtime, lines, fullname).
        cache[script_filename] = (
            len(script_text), 0, script_text.split('\n'), script_filename
        )
        script_code = compile(script_text, script_filename, 'exec')
        exec(script_code, namespace, namespace)
def write_pid_to_pidfile(pidfile_path):
    """Write the PID in the named PID file.

    Get the numeric process ID ("PID") of the current process
    and write it to the named file as a line of text.

    The file is created exclusively (``O_CREAT | O_EXCL``) with mode
    ``0o644``, so ``os.open`` raises ``OSError`` if it already exists.
    """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.
    #
    # The ``with`` block closes the file even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
def run_script(self, script_name, namespace):
    """Execute the metadata script ``script_name`` inside ``namespace``.

    The script is looked up as ``'scripts/' + script_name``. If a file
    exists at the path returned by ``self._fn(self.egg_info, script)``
    its contents are executed; otherwise the metadata text (with line
    endings normalised to ``\\n``) is registered in ``linecache`` under
    that filename and executed. ``namespace['__file__']`` is set to the
    filename in both cases.

    :raises ResolutionError: if the metadata has no such script.
    """
    script = 'scripts/' + script_name
    if not self.has_metadata(script):
        raise ResolutionError("No script named %r" % script_name)
    script_text = self.get_metadata(script).replace('\r\n', '\n')
    script_text = script_text.replace('\r', '\n')
    script_filename = self._fn(self.egg_info, script)
    namespace['__file__'] = script_filename
    if os.path.exists(script_filename):
        # Close the file deterministically instead of relying on GC.
        with open(script_filename) as script_file:
            source = script_file.read()
        code = compile(source, script_filename, 'exec')
        exec(code, namespace, namespace)
    else:
        from linecache import cache
        # Entry layout used here: (size, mtime, lines, fullname).
        cache[script_filename] = (
            len(script_text), 0, script_text.split('\n'), script_filename
        )
        script_code = compile(script_text, script_filename, 'exec')
        exec(script_code, namespace, namespace)
def read_text_file(filename):
    """Read *filename* and return its contents as decoded text.

    The raw bytes are decoded with the first codec that succeeds out of
    utf-8, the preferred locale encoding (e.g., cp1252 on some Windows
    machines) and latin1. latin1 accepts every byte sequence, so the
    worst case is a string with some garbage characters.
    """
    with open(filename, 'rb') as stream:
        data = stream.read()

    candidates = ('utf-8', locale.getpreferredencoding(False), 'latin1')
    for codec in candidates:
        try:
            data = data.decode(codec)
            break
        except UnicodeDecodeError:
            pass

    assert type(data) != bytes  # Latin1 should have worked.
    return data
def run_script(self, script_name, namespace):
    """Execute the metadata script ``script_name`` inside ``namespace``.

    The script is looked up as ``'scripts/' + script_name``. If a file
    exists at the path returned by ``self._fn(self.egg_info, script)``
    its contents are executed; otherwise the metadata text (with line
    endings normalised to ``\\n``) is registered in ``linecache`` under
    that filename and executed. ``namespace['__file__']`` is set to the
    filename in both cases.

    :raises ResolutionError: if the metadata has no such script.
    """
    script = 'scripts/' + script_name
    if not self.has_metadata(script):
        raise ResolutionError("No script named %r" % script_name)
    script_text = self.get_metadata(script).replace('\r\n', '\n')
    script_text = script_text.replace('\r', '\n')
    script_filename = self._fn(self.egg_info, script)
    namespace['__file__'] = script_filename
    if os.path.exists(script_filename):
        # Close the file deterministically instead of relying on GC.
        with open(script_filename) as script_file:
            source = script_file.read()
        code = compile(source, script_filename, 'exec')
        exec(code, namespace, namespace)
    else:
        from linecache import cache
        # Entry layout used here: (size, mtime, lines, fullname).
        cache[script_filename] = (
            len(script_text), 0, script_text.split('\n'), script_filename
        )
        script_code = compile(script_text, script_filename, 'exec')
        exec(script_code, namespace, namespace)
def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
    """Open bzip2 compressed tar archive name for reading or writing.

    Appending is not allowed.

    When ``fileobj`` is given it is wrapped in ``_BZ2Proxy``; otherwise
    ``name`` is opened with ``bz2.BZ2File``. The archive object returned
    by ``cls.taropen`` has ``_extfileobj`` set to ``False``.

    Raises ``ValueError`` for any mode other than ``'r'`` or ``'w'``,
    ``CompressionError`` if the ``bz2`` module cannot be imported, and
    ``ReadError`` if ``cls.taropen`` fails with ``IOError``/``EOFError``.
    """
    # An exact membership test also rejects the empty string, which the
    # substring check ``mode not in "rw"`` would let through.
    if mode not in ("r", "w"):
        raise ValueError("mode must be 'r' or 'w'.")

    try:
        import bz2
    except ImportError:
        raise CompressionError("bz2 module is not available")

    opened_here = fileobj is None
    if opened_here:
        fileobj = bz2.BZ2File(name, mode, compresslevel=compresslevel)
    else:
        fileobj = _BZ2Proxy(fileobj, mode)

    try:
        t = cls.taropen(name, mode, fileobj, **kwargs)
    except (IOError, EOFError):
        # Only close what this function opened; a caller-supplied file
        # object stays under the caller's control.
        if opened_here:
            fileobj.close()
        raise ReadError("not a bzip2 file")
    t._extfileobj = False
    return t
# All *open() methods are registered here.
def _mkstemp_inner(dir, pre, suf, flags):
    """Code common to mkstemp, TemporaryFile, and NamedTemporaryFile.

    Tries up to ``TMP_MAX`` candidate names, opening ``dir/pre<name>suf``
    with ``flags`` and mode ``0o600``. Returns ``(fd, absolute_path)``
    for the first name that can be opened. A name that already exists
    (``EEXIST``) is skipped; any other ``OSError`` propagates. Raises
    ``IOError(EEXIST, ...)`` when every candidate was taken.
    """
    names = _get_candidate_names()

    for _ in range(TMP_MAX):
        name = next(names)
        path = _os.path.join(dir, pre + name + suf)
        try:
            fd = _os.open(path, flags, 0o600)
        except OSError as e:
            if e.errno == _errno.EEXIST:
                continue  # try again
            raise
        _set_cloexec(fd)
        return (fd, _os.path.abspath(path))

    raise IOError(_errno.EEXIST, "No usable temporary file name found")
# User visible interfaces.
def get_message(self, key):
    """Return a Message representation or raise a KeyError.

    The file located via ``self._lookup(key)`` is parsed with
    ``self._factory`` when one is set, otherwise with ``MaildirMessage``.
    The message's subdir, info (the part of the file name after the last
    ``self.colon``, if present) and date (the file's mtime) are then
    filled in.
    """
    subpath = self._lookup(key)
    full_path = os.path.join(self._path, subpath)

    with open(full_path, 'r') as handle:
        builder = self._factory if self._factory else MaildirMessage
        msg = builder(handle)

    subdir, name = os.path.split(subpath)
    msg.set_subdir(subdir)
    if self.colon in name:
        msg.set_info(name.split(self.colon)[-1])
    msg.set_date(os.path.getmtime(full_path))
    return msg
def __setitem__(self, key, message):
    """Replace the keyed message; raise KeyError if it doesn't exist.

    The message file is ``self._path/<key>``. It is truncated and then
    rewritten via ``self._dump_message``; for ``MHMessage`` instances
    ``self._dump_sequences`` is called as well. When ``self._locked`` is
    true the file is locked for the duration of the rewrite.
    """
    path = os.path.join(self._path, str(key))
    try:
        f = open(path, 'rb+')
    except IOError as e:
        if e.errno == errno.ENOENT:
            raise KeyError('No message with key: %s' % key)
        raise
    try:
        if self._locked:
            _lock_file(f)
        try:
            # Truncate through a second descriptor so ``f`` stays open
            # for the rewrite.
            os.close(os.open(path, os.O_WRONLY | os.O_TRUNC))
            self._dump_message(message, f)
            if isinstance(message, MHMessage):
                self._dump_sequences(message, key)
        finally:
            if self._locked:
                _unlock_file(f)
    finally:
        _sync_close(f)
def get_terminal_size():
    """Return the terminal size as a ``(columns, rows)`` tuple of ints.

    Sources are tried in this order:

    1. a ``TIOCGWINSZ`` ioctl on file descriptors 0, 1 and 2;
    2. the same ioctl on the controlling terminal (``os.ctermid()``);
    3. the ``LINES`` and ``COLUMNS`` environment variables;
    4. the fixed default of 25 rows by 80 columns.
    """
    def ioctl_GWINSZ(fd):
        # Returns (rows, cols), or None when the query is not possible
        # (no fcntl/termios on this platform, fd is not a terminal...).
        try:
            import fcntl
            import termios
            import struct
            return struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ,
                                                   '1234'))
        except Exception:
            return None

    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
            try:
                cr = ioctl_GWINSZ(fd)
            finally:
                os.close(fd)
        except Exception:
            # No controlling terminal, or os.ctermid is unavailable.
            pass
    if not cr:
        try:
            # The environment mapping is ``os.environ``; convert here so
            # unparsable values fall back to the default too.
            cr = (int(os.environ['LINES']), int(os.environ['COLUMNS']))
        except (KeyError, ValueError):
            cr = (25, 80)
    return int(cr[1]), int(cr[0])
def __save(self):
    """Persist the workspace state, or mark it dirty while asynchronous.

    When the asynchronous counter is non-zero nothing is written and the
    state is only flagged dirty. Otherwise the state is pickled to
    ``<path>.new``, flushed and fsynced, and then moved over the real
    path with ``os.replace``. Any ``OSError`` on the way is reported as
    ``ParseError``.
    """
    if self.__asynchronous != 0:
        self.__dirty = True
        return

    state = {
        "version" : _BobState.CUR_VERSION,
        "byNameDirs" : self.__byNameDirs,
        "results" : self.__results,
        "inputs" : self.__inputs,
        "jenkins" : self.__jenkins,
        "dirStates" : self.__dirStates,
        "buildState" : self.__buildState,
    }
    tmpFile = self.__path + ".new"
    try:
        with open(tmpFile, "wb") as out:
            pickle.dump(state, out)
            out.flush()
            os.fsync(out.fileno())
        os.replace(tmpFile, self.__path)
    except OSError as err:
        raise ParseError("Error saving workspace state: " + str(err))
    self.__dirty = False
def dump(cls, obj, file_obj):
    """Pickle ``obj`` into an already open file.

    The highest pickle protocol available is used.

    .. versionadded:: 1.8

    :param obj: Python object to serialize
    :type obj: Python object
    :param file_obj: file handle
    :type file_obj: ``file`` object

    """
    outcome = pickle.dump(obj, file_obj, protocol=pickle.HIGHEST_PROTOCOL)
    return outcome
# Set up default manager and register built-in serializers
def get_path_uid(path):
    """
    Return path's uid.

    Does not follow symlinks: https://github.com/pypa/pip/pull/935#discussion_r5307003

    Placed this function in backwardcompat due to differences on AIX and Jython,
    that should eventually go away.

    :raises OSError: When path is a symlink or can't be read.
    """
    if hasattr(os, 'O_NOFOLLOW'):
        fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
        try:
            file_uid = os.fstat(fd).st_uid
        finally:
            # Release the descriptor even if fstat fails.
            os.close(fd)
    else:  # AIX and Jython
        # WARNING: time of check vulnerability, but best we can do w/o NOFOLLOW
        if not os.path.islink(path):
            # older versions of Jython don't have `os.fstat`
            file_uid = os.stat(path).st_uid
        else:
            # raise OSError for parity with os.O_NOFOLLOW above
            raise OSError("%s is a symlink; Will not return uid for symlinks" % path)
    return file_uid
def writeKeyToFile(key, filename):
    """Write **key** to **filename**, with ``0400`` permissions.

    If **filename** doesn't exist, it will be created. If it does exist
    already, and is writable by the owner of the current process, then it will
    be truncated to zero-length and overwritten. The data is fsynced
    before the descriptor is closed.

    :param bytes key: A key (or some other private data) to write to
        **filename**.
    :param str filename: The path of the file to write to.
    :raises: Any exceptions which may occur.
    """
    logging.info("Writing key to file: %r", filename)
    # The binary-mode flag is ``os.O_BINARY`` and only exists on
    # Windows; elsewhere it contributes nothing.
    flags = os.O_WRONLY | os.O_TRUNC | os.O_CREAT | getattr(os, "O_BINARY", 0)
    fd = os.open(filename, flags, 0o400)
    try:
        # os.write may write fewer bytes than requested; keep going until
        # the whole key is on disk.
        remaining = key
        while remaining:
            written = os.write(fd, remaining)
            remaining = remaining[written:]
        os.fsync(fd)
    finally:
        os.close(fd)
def write_pid_to_pidfile(pidfile_path):
    """Write the PID in the named PID file.

    Get the numeric process ID ("PID") of the current process
    and write it to the named file as a line of text.

    The file is created exclusively (``O_CREAT | O_EXCL``) with mode
    ``0o644``, so ``os.open`` raises ``OSError`` if it already exists.
    """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.
    #
    # The ``with`` block closes the file even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
def run_script(self, script_name, namespace):
    """Execute the metadata script ``script_name`` inside ``namespace``.

    The script is looked up as ``'scripts/' + script_name``. If a file
    exists at the path returned by ``self._fn(self.egg_info, script)``
    its contents are executed; otherwise the metadata text (with line
    endings normalised to ``\\n``) is registered in ``linecache`` under
    that filename and executed. ``namespace['__file__']`` is set to the
    filename in both cases.

    :raises ResolutionError: if the metadata has no such script.
    """
    script = 'scripts/' + script_name
    if not self.has_metadata(script):
        raise ResolutionError("No script named %r" % script_name)
    script_text = self.get_metadata(script).replace('\r\n', '\n')
    script_text = script_text.replace('\r', '\n')
    script_filename = self._fn(self.egg_info, script)
    namespace['__file__'] = script_filename
    if os.path.exists(script_filename):
        # Close the file deterministically instead of relying on GC.
        with open(script_filename) as script_file:
            source = script_file.read()
        code = compile(source, script_filename, 'exec')
        exec(code, namespace, namespace)
    else:
        from linecache import cache
        # Entry layout used here: (size, mtime, lines, fullname).
        cache[script_filename] = (
            len(script_text), 0, script_text.split('\n'), script_filename
        )
        script_code = compile(script_text, script_filename, 'exec')
        exec(script_code, namespace, namespace)
def read_text_file(filename):
    """Read *filename* and return its contents as decoded text.

    The raw bytes are decoded with the first codec that succeeds out of
    utf-8, the preferred locale encoding (e.g., cp1252 on some Windows
    machines) and latin1. latin1 accepts every byte sequence, so the
    worst case is a string with some garbage characters.
    """
    with open(filename, 'rb') as stream:
        data = stream.read()

    candidates = ('utf-8', locale.getpreferredencoding(False), 'latin1')
    for codec in candidates:
        try:
            data = data.decode(codec)
            break
        except UnicodeDecodeError:
            pass

    assert type(data) != bytes  # Latin1 should have worked.
    return data
def do_magic(self):
    """Try to take the lock at ``LOCK_PATH`` and record the outcome.

    On Windows (``OS_WIN``) any existing lock file is removed and then
    re-created exclusively; an error with errno 13 sets
    ``self.is_running`` and every other error propagates.

    Elsewhere the lock file is opened for writing and locked with a
    non-blocking exclusive ``fcntl.lockf``. On failure
    ``self.is_running`` is set when ``self.fh`` is not ``None``;
    otherwise the error propagates.
    """
    if not OS_WIN:
        try:
            self.fh = open(LOCK_PATH, 'w')
            fcntl.lockf(self.fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except EnvironmentError as err:
            if self.fh is None:
                raise
            self.is_running = True
        return

    try:
        if os.path.exists(LOCK_PATH):
            os.unlink(LOCK_PATH)
        self.fh = os.open(LOCK_PATH, os.O_CREAT | os.O_EXCL | os.O_RDWR)
    except EnvironmentError as err:
        if err.errno != 13:
            raise
        self.is_running = True
def wipe(self):
    """Create (if needed), resize and zero-fill the two filter bitmaps.

    ``/dev/shm/kafl_filter0`` is sized to
    ``self.config.config_values['BITMAP_SHM_SIZE']`` bytes and
    ``/dev/shm/kafl_tfilter`` to ``0x1000000`` bytes; every byte of both
    is set to zero through a shared memory mapping.
    """
    def zero_fill(path, size):
        # Resize ``path`` to ``size`` bytes and overwrite it with zeros.
        fd = os.open(path, os.O_RDWR | os.O_SYNC | os.O_CREAT)
        try:
            os.ftruncate(fd, size)
            bitmap = mmap.mmap(fd, size, mmap.MAP_SHARED,
                               mmap.PROT_WRITE | mmap.PROT_READ)
            try:
                # One slice assignment replaces a per-byte Python loop.
                bitmap[:] = b'\x00' * size
            finally:
                bitmap.close()
        finally:
            os.close(fd)

    zero_fill("/dev/shm/kafl_filter0",
              self.config.config_values['BITMAP_SHM_SIZE'])
    zero_fill("/dev/shm/kafl_tfilter", 0x1000000)
def load(cls, file_obj):
    """Parse the JSON document in an already open file and return it.

    .. versionadded:: 1.8

    :param file_obj: file handle
    :type file_obj: ``file`` object
    :returns: object loaded from JSON file
    :rtype: object

    """
    parsed = json.load(file_obj)
    return parsed
def dump(cls, obj, file_obj):
    """Serialize object ``obj`` to open JSON file.

    The output is indented by two spaces.

    .. versionadded:: 1.8

    :param obj: Python object to serialize
    :type obj: JSON-serializable data structure
    :param file_obj: file handle
    :type file_obj: ``file`` object

    """
    # ``encoding`` is omitted on purpose: UTF-8 is already the Python 2
    # default, and Python 3's ``json.dump`` rejects the keyword.
    return json.dump(obj, file_obj, indent=2)
def load(cls, file_obj):
    """Unpickle and return the object stored in an already open file.

    .. versionadded:: 1.8

    :param file_obj: file handle
    :type file_obj: ``file`` object
    :returns: object loaded from pickle file
    :rtype: object

    """
    restored = cPickle.load(file_obj)
    return restored
def dump(cls, obj, file_obj):
    """Pickle ``obj`` into an already open file.

    The highest pickle protocol available is used.

    .. versionadded:: 1.8

    :param obj: Python object to serialize
    :type obj: Python object
    :param file_obj: file handle
    :type file_obj: ``file`` object

    """
    outcome = cPickle.dump(obj, file_obj, protocol=cPickle.HIGHEST_PROTOCOL)
    return outcome
def load(cls, file_obj):
    """Unpickle and return the object stored in an already open file.

    .. versionadded:: 1.8

    :param file_obj: file handle
    :type file_obj: ``file`` object
    :returns: object loaded from pickle file
    :rtype: object

    """
    restored = pickle.load(file_obj)
    return restored
def acquire(self, blocking=True):
    """Acquire the lock if possible.

    The lockfile is created exclusively and the current PID is written
    to it. If the lockfile already exists and ``blocking`` is ``False``,
    ``False`` is returned. Otherwise the attempt is repeated every
    `self.delay` seconds until it succeeds, or until `self.timeout` is
    exceeded, in which case `~AcquisitionError` is raised.
    """
    started = time.time()

    while True:
        # Give a stale lockfile the chance to be cleaned up first.
        self._validate_lockfile()
        try:
            descriptor = os.open(self.lockfile,
                                 os.O_CREAT | os.O_EXCL | os.O_RDWR)
            with os.fdopen(descriptor, 'w') as lock_fp:
                lock_fp.write('{0}'.format(os.getpid()))
        except OSError as err:
            if err.errno != errno.EEXIST:  # pragma: no cover
                raise

            if self.timeout and (time.time() - started) >= self.timeout:
                raise AcquisitionError('Lock acquisition timed out.')

            if not blocking:
                return False

            time.sleep(self.delay)
        else:
            self._locked = True
            return True
def _load(self):
    """Load cached settings from JSON file `self._filepath`.

    The loaded key/value pairs are merged into ``self`` via
    ``self.update`` and a deep copy is kept in ``self._original``.
    ``self._nosave`` is ``True`` while loading and is always reset to
    ``False`` afterwards, even when reading or parsing fails.
    """
    self._nosave = True
    try:
        with open(self._filepath, 'rb') as file_obj:
            # No ``encoding`` argument: UTF-8 is what ``json.load``
            # assumes anyway, and Python 3.9+ rejects the keyword.
            d = dict(json.load(file_obj).items())
        self.update(d)
        self._original = deepcopy(d)
    finally:
        self._nosave = False