def _convert_pflags(self, pflags):
"convert SFTP-style open() flags to python's os.open() flags"
if (pflags & SFTP_FLAG_READ) and (pflags & SFTP_FLAG_WRITE):
flags = os.O_RDWR
elif pflags & SFTP_FLAG_WRITE:
flags = os.O_WRONLY
else:
flags = os.O_RDONLY
if pflags & SFTP_FLAG_APPEND:
flags |= os.O_APPEND
if pflags & SFTP_FLAG_CREATE:
flags |= os.O_CREAT
if pflags & SFTP_FLAG_TRUNC:
flags |= os.O_TRUNC
if pflags & SFTP_FLAG_EXCL:
flags |= os.O_EXCL
return flags
python类O_APPEND的实例源码
def _create_base_aws_cli_config_files_if_needed(adfs_config):
def touch(fname, mode=0o600):
flags = os.O_CREAT | os.O_APPEND
with os.fdopen(os.open(fname, flags, mode)) as f:
try:
os.utime(fname, None)
finally:
f.close()
aws_config_root = os.path.dirname(adfs_config.aws_config_location)
if not os.path.exists(aws_config_root):
os.mkdir(aws_config_root, 0o700)
if not os.path.exists(adfs_config.aws_credentials_location):
touch(adfs_config.aws_credentials_location)
aws_credentials_root = os.path.dirname(adfs_config.aws_credentials_location)
if not os.path.exists(aws_credentials_root):
os.mkdir(aws_credentials_root, 0o700)
if not os.path.exists(adfs_config.aws_config_location):
touch(adfs_config.aws_config_location)
def _convert_pflags(self, pflags):
"""convert SFTP-style open() flags to Python's os.open() flags"""
if (pflags & SFTP_FLAG_READ) and (pflags & SFTP_FLAG_WRITE):
flags = os.O_RDWR
elif pflags & SFTP_FLAG_WRITE:
flags = os.O_WRONLY
else:
flags = os.O_RDONLY
if pflags & SFTP_FLAG_APPEND:
flags |= os.O_APPEND
if pflags & SFTP_FLAG_CREATE:
flags |= os.O_CREAT
if pflags & SFTP_FLAG_TRUNC:
flags |= os.O_TRUNC
if pflags & SFTP_FLAG_EXCL:
flags |= os.O_EXCL
return flags
def touch(self, path, mode=0o644, dir_fd=None, **kwargs):
"""Create an empty file with selected permissions.
Args:
path (str): File path.
mode (:obj:`int`, optional): File access mode. Defaults to 0o644.
dir_fd (optional): If set, it should be a file descriptor open to a
directory and path should then be relative to that directory.
Defaults to None.
**kwargs: Arbitrary keyword arguments.
"""
flags = os.O_CREAT | os.O_APPEND
with os.fdopen(
os.open(path, flags=flags, mode=mode, dir_fd=dir_fd) ) as f:
os.utime(
f.fileno() if os.utime in os.supports_fd else path,
dir_fd=None if os.supports_fd else dir_fd,
**kwargs )
def flags(self):
'''
Adapted from http://hg.python.org/cpython/file/84cf25da86e8/Lib/_pyio.py#l154
See also open(2) which explains the modes
os.O_BINARY and os.O_TEXT are only available on Windows.
'''
return (
((self.reading and not self.updating) and os.O_RDONLY or 0) |
((self.writing and not self.updating) and os.O_WRONLY or 0) |
((self.creating_exclusively and not self.updating) and os.O_EXCL or 0) |
(self.updating and os.O_RDWR or 0) |
(self.appending and os.O_APPEND or 0) |
((self.writing or self.creating_exclusively) and os.O_CREAT or 0) |
(self.writing and os.O_TRUNC or 0) |
((self.binary and hasattr(os, 'O_BINARY')) and os.O_BINARY or 0) |
((self.text and hasattr(os, 'O_TEXT')) and os.O_TEXT or 0)
)
def _create_base_aws_cli_config_files_if_needed(google_config):
def touch(fname, mode=0o600):
flags = os.O_CREAT | os.O_APPEND
with os.fdopen(os.open(fname, flags, mode)) as f:
try:
os.utime(fname, None)
finally:
f.close()
aws_config_root = os.path.dirname(google_config.aws_config_location)
if not os.path.exists(aws_config_root):
os.mkdir(aws_config_root, 0o700)
if not os.path.exists(google_config.aws_credentials_location):
touch(google_config.aws_credentials_location)
aws_credentials_root = os.path.dirname(google_config.aws_credentials_location)
if not os.path.exists(aws_credentials_root):
os.mkdir(aws_credentials_root, 0o700)
if not os.path.exists(google_config.aws_config_location):
touch(google_config.aws_config_location)
def _convert_pflags(self, pflags):
"""convert SFTP-style open() flags to Python's os.open() flags"""
if (pflags & SFTP_FLAG_READ) and (pflags & SFTP_FLAG_WRITE):
flags = os.O_RDWR
elif pflags & SFTP_FLAG_WRITE:
flags = os.O_WRONLY
else:
flags = os.O_RDONLY
if pflags & SFTP_FLAG_APPEND:
flags |= os.O_APPEND
if pflags & SFTP_FLAG_CREATE:
flags |= os.O_CREAT
if pflags & SFTP_FLAG_TRUNC:
flags |= os.O_TRUNC
if pflags & SFTP_FLAG_EXCL:
flags |= os.O_EXCL
return flags
def mode_to_flags(mode: str):
if len(set("awrb+") | set(mode)) > 5:
raise ValueError('Invalid mode %s' % repr(mode))
if len(set(mode) & set("awr")) > 1:
raise ValueError('must have exactly one of create/read/write/append mode')
flags = 0
flags |= os.O_NONBLOCK
if '+' in mode:
flags |= os.O_CREAT
if "a" in mode:
flags |= os.O_RDWR
flags |= os.O_APPEND
elif "w" in mode:
flags |= os.O_RDWR
elif "r" in mode:
flags |= os.O_RDONLY
return flags
def _convert_pflags(self, pflags):
"""convert SFTP-style open() flags to Python's os.open() flags"""
if (pflags & SFTP_FLAG_READ) and (pflags & SFTP_FLAG_WRITE):
flags = os.O_RDWR
elif pflags & SFTP_FLAG_WRITE:
flags = os.O_WRONLY
else:
flags = os.O_RDONLY
if pflags & SFTP_FLAG_APPEND:
flags |= os.O_APPEND
if pflags & SFTP_FLAG_CREATE:
flags |= os.O_CREAT
if pflags & SFTP_FLAG_TRUNC:
flags |= os.O_TRUNC
if pflags & SFTP_FLAG_EXCL:
flags |= os.O_EXCL
return flags
def _convert_pflags(self, pflags):
"""convert SFTP-style open() flags to Python's os.open() flags"""
if (pflags & SFTP_FLAG_READ) and (pflags & SFTP_FLAG_WRITE):
flags = os.O_RDWR
elif pflags & SFTP_FLAG_WRITE:
flags = os.O_WRONLY
else:
flags = os.O_RDONLY
if pflags & SFTP_FLAG_APPEND:
flags |= os.O_APPEND
if pflags & SFTP_FLAG_CREATE:
flags |= os.O_CREAT
if pflags & SFTP_FLAG_TRUNC:
flags |= os.O_TRUNC
if pflags & SFTP_FLAG_EXCL:
flags |= os.O_EXCL
return flags
def _convert_pflags(self, pflags):
"""convert SFTP-style open() flags to Python's os.open() flags"""
if (pflags & SFTP_FLAG_READ) and (pflags & SFTP_FLAG_WRITE):
flags = os.O_RDWR
elif pflags & SFTP_FLAG_WRITE:
flags = os.O_WRONLY
else:
flags = os.O_RDONLY
if pflags & SFTP_FLAG_APPEND:
flags |= os.O_APPEND
if pflags & SFTP_FLAG_CREATE:
flags |= os.O_CREAT
if pflags & SFTP_FLAG_TRUNC:
flags |= os.O_TRUNC
if pflags & SFTP_FLAG_EXCL:
flags |= os.O_EXCL
return flags
def _convert_pflags(self, pflags):
"""convert SFTP-style open() flags to Python's os.open() flags"""
if (pflags & SFTP_FLAG_READ) and (pflags & SFTP_FLAG_WRITE):
flags = os.O_RDWR
elif pflags & SFTP_FLAG_WRITE:
flags = os.O_WRONLY
else:
flags = os.O_RDONLY
if pflags & SFTP_FLAG_APPEND:
flags |= os.O_APPEND
if pflags & SFTP_FLAG_CREATE:
flags |= os.O_CREAT
if pflags & SFTP_FLAG_TRUNC:
flags |= os.O_TRUNC
if pflags & SFTP_FLAG_EXCL:
flags |= os.O_EXCL
return flags
def _convert_pflags(self, pflags):
"""convert SFTP-style open() flags to Python's os.open() flags"""
if (pflags & SFTP_FLAG_READ) and (pflags & SFTP_FLAG_WRITE):
flags = os.O_RDWR
elif pflags & SFTP_FLAG_WRITE:
flags = os.O_WRONLY
else:
flags = os.O_RDONLY
if pflags & SFTP_FLAG_APPEND:
flags |= os.O_APPEND
if pflags & SFTP_FLAG_CREATE:
flags |= os.O_CREAT
if pflags & SFTP_FLAG_TRUNC:
flags |= os.O_TRUNC
if pflags & SFTP_FLAG_EXCL:
flags |= os.O_EXCL
return flags
def flags(self, *which):
import fcntl, os
if which:
if len(which) > 1:
raise TypeError, 'Too many arguments'
which = which[0]
else: which = '?'
l_flags = 0
if 'n' in which: l_flags = l_flags | os.O_NDELAY
if 'a' in which: l_flags = l_flags | os.O_APPEND
if 's' in which: l_flags = l_flags | os.O_SYNC
file = self._file_
if '=' not in which:
cur_fl = fcntl.fcntl(file.fileno(), fcntl.F_GETFL, 0)
if '!' in which: l_flags = cur_fl & ~ l_flags
else: l_flags = cur_fl | l_flags
l_flags = fcntl.fcntl(file.fileno(), fcntl.F_SETFL, l_flags)
if 'c' in which:
arg = ('!' not in which) # 0 is don't, 1 is do close on exec
l_flags = fcntl.fcntl(file.fileno(), fcntl.F_SETFD, arg)
if '?' in which:
which = '' # Return current flags
l_flags = fcntl.fcntl(file.fileno(), fcntl.F_GETFL, 0)
if os.O_APPEND & l_flags: which = which + 'a'
if fcntl.fcntl(file.fileno(), fcntl.F_GETFD, 0) & 1:
which = which + 'c'
if os.O_NDELAY & l_flags: which = which + 'n'
if os.O_SYNC & l_flags: which = which + 's'
return which
def get_event_log_handle(sec, flags=os.O_APPEND | os.O_CREAT | os.O_WRONLY):
localtime = time.localtime(sec)
_ = os.path.join(config.LOG_DIR, "%d-%02d-%02d.log" % (localtime.tm_year, localtime.tm_mon, localtime.tm_mday))
if _ != getattr(_thread_data, "event_log_path", None):
if not os.path.exists(_):
open(_, "w+").close()
os.chmod(_, DEFAULT_EVENT_LOG_PERMISSIONS)
_thread_data.event_log_path = _
_thread_data.event_log_handle = os.open(_thread_data.event_log_path, flags)
return _thread_data.event_log_handle
def get_error_log_handle(flags=os.O_APPEND | os.O_CREAT | os.O_WRONLY):
if not hasattr(_thread_data, "error_log_handle"):
_ = os.path.join(config.LOG_DIR, "error.log")
if not os.path.exists(_):
open(_, "w+").close()
os.chmod(_, DEFAULT_ERROR_LOG_PERMISSIONS)
_thread_data.error_log_path = _
_thread_data.error_log_handle = os.open(_thread_data.error_log_path, flags)
return _thread_data.error_log_handle
def __init__(self, server, filename, flags, attrs):
self.server = server
openFlags = 0
if flags & FXF_READ == FXF_READ and flags & FXF_WRITE == 0:
openFlags = os.O_RDONLY
if flags & FXF_WRITE == FXF_WRITE and flags & FXF_READ == 0:
openFlags = os.O_WRONLY
if flags & FXF_WRITE == FXF_WRITE and flags & FXF_READ == FXF_READ:
openFlags = os.O_RDWR
if flags & FXF_APPEND == FXF_APPEND:
openFlags |= os.O_APPEND
if flags & FXF_CREAT == FXF_CREAT:
openFlags |= os.O_CREAT
if flags & FXF_TRUNC == FXF_TRUNC:
openFlags |= os.O_TRUNC
if flags & FXF_EXCL == FXF_EXCL:
openFlags |= os.O_EXCL
if attrs.has_key("permissions"):
mode = attrs["permissions"]
del attrs["permissions"]
else:
mode = 0777
fd = server.avatar._runAsUser(os.open, filename, openFlags, mode)
if attrs:
server.avatar._runAsUser(server._setAttrs, filename, attrs)
self.fd = fd
def flags(self, *which):
import fcntl, os
if which:
if len(which) > 1:
raise TypeError, 'Too many arguments'
which = which[0]
else: which = '?'
l_flags = 0
if 'n' in which: l_flags = l_flags | os.O_NDELAY
if 'a' in which: l_flags = l_flags | os.O_APPEND
if 's' in which: l_flags = l_flags | os.O_SYNC
file = self._file_
if '=' not in which:
cur_fl = fcntl.fcntl(file.fileno(), fcntl.F_GETFL, 0)
if '!' in which: l_flags = cur_fl & ~ l_flags
else: l_flags = cur_fl | l_flags
l_flags = fcntl.fcntl(file.fileno(), fcntl.F_SETFL, l_flags)
if 'c' in which:
arg = ('!' not in which) # 0 is don't, 1 is do close on exec
l_flags = fcntl.fcntl(file.fileno(), fcntl.F_SETFD, arg)
if '?' in which:
which = '' # Return current flags
l_flags = fcntl.fcntl(file.fileno(), fcntl.F_GETFL, 0)
if os.O_APPEND & l_flags: which = which + 'a'
if fcntl.fcntl(file.fileno(), fcntl.F_GETFD, 0) & 1:
which = which + 'c'
if os.O_NDELAY & l_flags: which = which + 'n'
if os.O_SYNC & l_flags: which = which + 's'
return which
def file_flags_to_mode(flags):
modes_map = {os.O_RDONLY: 'r', os.O_WRONLY: 'w', os.O_RDWR: 'w+'}
mode = modes_map[flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)]
if flags & os.O_APPEND:
mode = mode.replace('w', 'a', 1)
mode = mode.replace('w+', 'r+')
# possible values: r, w, a, r+, a+
return mode
def file_flags_to_mode(flags):
modes_map = {os.O_RDONLY: 'r', os.O_WRONLY: 'w', os.O_RDWR: 'w+'}
mode = modes_map[flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)]
if flags & os.O_APPEND:
mode = mode.replace('w', 'a', 1)
mode = mode.replace('w+', 'r+')
# possible values: r, w, a, r+, a+
return mode
def touch(fname, mode=0o666, dir_fd=None, **kwargs):
"""
Implementation of coreutils touch
http://stackoverflow.com/a/1160227
"""
flags = os.O_CREAT | os.O_APPEND
with os.fdopen(os.open(fname, flags=flags, mode=mode, dir_fd=dir_fd)) as f:
os.utime(f.fileno() if os.utime in os.supports_fd else fname,
dir_fd=None if os.supports_fd else dir_fd, **kwargs)
def write(self, offset, data):
"""
Write C{data} into this file at position C{offset}. Extending the
file past its original end is expected. Unlike python's normal
C{write()} methods, this method cannot do a partial write: it must
write all of C{data} or else return an error.
The default implementation checks for an attribute on C{self} named
C{writefile}, and if present, performs the write operation on the
python file-like object found there. The attribute is named
differently from C{readfile} to make it easy to implement read-only
(or write-only) files, but if both attributes are present, they should
refer to the same file.
@param offset: position in the file to start reading from.
@type offset: int or long
@param data: data to write into the file.
@type data: str
@return: an SFTP error code like L{SFTP_OK}.
"""
writefile = getattr(self, 'writefile', None)
if writefile is None:
return SFTP_OP_UNSUPPORTED
try:
# in append mode, don't care about seeking
if (self.__flags & os.O_APPEND) == 0:
if self.__tell is None:
self.__tell = writefile.tell()
if offset != self.__tell:
writefile.seek(offset)
self.__tell = offset
writefile.write(data)
writefile.flush()
except IOError, e:
self.__tell = None
return SFTPServer.convert_errno(e.errno)
if self.__tell is not None:
self.__tell += len(data)
return SFTP_OK
def open(self, path, flags, attr):
"""
Open a file on the server and create a handle for future operations
on that file. On success, a new object subclassed from L{SFTPHandle}
should be returned. This handle will be used for future operations
on the file (read, write, etc). On failure, an error code such as
L{SFTP_PERMISSION_DENIED} should be returned.
C{flags} contains the requested mode for opening (read-only,
write-append, etc) as a bitset of flags from the C{os} module:
- C{os.O_RDONLY}
- C{os.O_WRONLY}
- C{os.O_RDWR}
- C{os.O_APPEND}
- C{os.O_CREAT}
- C{os.O_TRUNC}
- C{os.O_EXCL}
(One of C{os.O_RDONLY}, C{os.O_WRONLY}, or C{os.O_RDWR} will always
be set.)
The C{attr} object contains requested attributes of the file if it
has to be created. Some or all attribute fields may be missing if
the client didn't specify them.
@note: The SFTP protocol defines all files to be in "binary" mode.
There is no equivalent to python's "text" mode.
@param path: the requested path (relative or absolute) of the file
to be opened.
@type path: str
@param flags: flags or'd together from the C{os} module indicating the
requested mode for opening the file.
@type flags: int
@param attr: requested attributes of the file if it is newly created.
@type attr: L{SFTPAttributes}
@return: a new L{SFTPHandle} I{or error code}.
@rtype L{SFTPHandle}
"""
return SFTP_OP_UNSUPPORTED
def touch(fname, mode=0o666, dir_fd=None, **kwargs):
## After http://stackoverflow.com/questions/1158076/implement-touch-using-python
if sys.version_info < (3, 3, 0):
with open(fname, 'a'):
os.utime(fname, None) # set to now
else:
flags = os.O_CREAT | os.O_APPEND
with os.fdopen(os.open(fname, flags=flags, mode=mode, dir_fd=dir_fd)) as f:
os.utime(f.fileno() if os.utime in os.supports_fd else fname,
dir_fd=None if os.supports_fd else dir_fd, **kwargs)
def open(self, path, flags, attr):
try:
if (flags & os.O_CREAT) and (attr is not None):
attr._flags &= ~attr.FLAG_PERMISSIONS
paramiko.SFTPServer.set_file_attr(self._parsePath(path), attr)
if flags & os.O_WRONLY:
if flags & os.O_APPEND:
fstr = 'ab'
else:
fstr = 'wb'
elif flags & os.O_RDWR:
if flags & os.O_APPEND:
fstr = 'a+b'
else:
fstr = 'r+b'
else:
# O_RDONLY (== 0)
fstr = 'rb'
f = self.client.open(self._parsePath(path), fstr)
fobj = paramiko.SFTPHandle(flags)
fobj.filename = self._parsePath(path)
fobj.readfile = f
fobj.writefile = f
fobj.client = self.client
return fobj
# TODO: verify (socket.error when stopping file upload/download)
except IOError as e:
return paramiko.SFTPServer.convert_errno(e.errno)
def file_flags_to_mode(flags):
modes_map = {os.O_RDONLY: 'r', os.O_WRONLY: 'w', os.O_RDWR: 'w+'}
mode = modes_map[flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)]
if flags & os.O_APPEND:
mode = mode.replace('w', 'a', 1)
mode = mode.replace('w+', 'r+')
# possible values: r, w, a, r+, a+
return mode
def file_flags_to_mode(flags):
modes_map = {os.O_RDONLY: 'r', os.O_WRONLY: 'w', os.O_RDWR: 'w+'}
mode = modes_map[flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)]
if flags & os.O_APPEND:
mode = mode.replace('w', 'a', 1)
mode = mode.replace('w+', 'r+')
# possible values: r, w, a, r+, a+
return mode
def open(self, inode, flags, ctx):
flags_writable = os.O_WRONLY | os.O_RDWR | os.O_APPEND
if flags & flags_writable > 0:
raise llfuse.FUSEError(errno.EROFS)
return self._yarp_cell_relative_offset_to_handle(inode)
def test_send(self):
d1 = b"Come again?"
d2 = b"I want to buy some cheese."
fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)
w = asyncore.file_wrapper(fd)
os.close(fd)
w.write(d1)
w.send(d2)
w.close()
with open(TESTFN, 'rb') as file:
self.assertEqual(file.read(), self.d + d1 + d2)
def file_flags_to_mode(flags):
modes_map = {os.O_RDONLY: 'r', os.O_WRONLY: 'w', os.O_RDWR: 'w+'}
mode = modes_map[flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)]
if flags & os.O_APPEND:
mode = mode.replace('w', 'a', 1)
mode = mode.replace('w+', 'r+')
# possible values: r, w, a, r+, a+
return mode