@contextlib.contextmanager
def _atomic_write(filename):
    # Create the temporary file in the target directory so the final
    # rename stays on the same filesystem (and is therefore atomic).
    path = os.path.dirname(filename)
    try:
        file = tempfile.NamedTemporaryFile(delete=False, dir=path, mode="w+")
        yield file
        file.flush()
        os.fsync(file.fileno())
        os.rename(file.name, filename)
    finally:
        try:
            os.remove(file.name)
        except OSError as e:
            # After a successful rename the temporary name no longer exists.
            if e.errno != errno.ENOENT:
                raise
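# A minimal usage sketch for the context manager above. The path and payload
# are hypothetical; it assumes contextlib, errno, os, and tempfile are
# imported at module level.
with _atomic_write("/tmp/settings.json") as f:
    f.write('{"theme": "dark"}')
# Readers of /tmp/settings.json now see either the old contents or the new
# contents in full, never a partially written file.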
def __save(self):
    if self.__asynchronous == 0:
        state = {
            "version"    : _BobState.CUR_VERSION,
            "byNameDirs" : self.__byNameDirs,
            "results"    : self.__results,
            "inputs"     : self.__inputs,
            "jenkins"    : self.__jenkins,
            "dirStates"  : self.__dirStates,
            "buildState" : self.__buildState,
        }
        tmpFile = self.__path + ".new"
        try:
            with open(tmpFile, "wb") as f:
                pickle.dump(state, f)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmpFile, self.__path)
        except OSError as e:
            raise ParseError("Error saving workspace state: " + str(e))
        self.__dirty = False
    else:
        self.__dirty = True
def writeKeyToFile(key, filename):
    """Write **key** to **filename**, with ``0400`` permissions.

    If **filename** doesn't exist, it will be created. If it does exist
    already, and is writable by the owner of the current process, then it will
    be truncated to zero-length and overwritten.

    :param bytes key: A key (or some other private data) to write to
        **filename**.
    :param str filename: The path of the file to write to.
    :raises: Any exceptions which may occur.
    """
    logging.info("Writing key to file: %r", filename)
    # os.O_BINARY only exists on Windows; fall back to 0 elsewhere.
    flags = os.O_WRONLY | os.O_TRUNC | os.O_CREAT | getattr(os, "O_BINARY", 0)
    fd = os.open(filename, flags, 0o400)
    os.write(fd, key)
    os.fsync(fd)
    os.close(fd)
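# A usage sketch for writeKeyToFile. The path is hypothetical, and os.urandom
# merely stands in for real key material.
secret = os.urandom(32)
writeKeyToFile(secret, "/tmp/example.key")
# The file is created with mode 0400 (subject to the process umask) and its
# contents reach the disk before close(), because of the fsync() call.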
def atomicWrite(dest, content, mode="w"):
try:
with open(dest + "-new", mode) as f:
f.write(content)
f.flush()
os.fsync(f.fileno())
if os.path.isfile(dest + "-old"): # Previous incomplete write
os.rename(dest + "-old", dest + "-old-%s" % time.time())
os.rename(dest, dest + "-old")
os.rename(dest + "-new", dest)
os.unlink(dest + "-old")
return True
except Exception, err:
from Debug import Debug
logging.error(
"File %s write failed: %s, reverting..." %
(dest, Debug.formatException(err))
)
if os.path.isfile(dest + "-old") and not os.path.isfile(dest):
os.rename(dest + "-old", dest)
return False
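# A usage sketch for atomicWrite. The path and payload are hypothetical. The
# function returns True on success and False after reverting, so the caller
# decides whether to retry or abort.
if not atomicWrite("/tmp/content.json", '{"title": "example"}'):
    logging.error("Could not update /tmp/content.json, keeping previous version")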
def smb2Flush(connId, smbServer, recvPacket):
    connData = smbServer.getConnectionData(connId)

    respSMBCommand = smb2.SMB2Flush_Response()
    flushRequest = smb2.SMB2Flush(recvPacket['Data'])

    if str(flushRequest['FileID']) in connData['OpenedFiles']:
        fileHandle = connData['OpenedFiles'][str(flushRequest['FileID'])]['FileHandle']
        errorCode = STATUS_SUCCESS
        try:
            os.fsync(fileHandle)
        except Exception as e:
            smbServer.log("SMB2_FLUSH %s" % e, logging.ERROR)
            errorCode = STATUS_ACCESS_DENIED
    else:
        errorCode = STATUS_INVALID_HANDLE

    smbServer.setConnectionData(connId, connData)
    return [respSMBCommand], None, errorCode
def save(url: str, local_path: str) -> str:
    """Stream the remote file at ``url`` to ``local_path`` and return the path."""
    with closing(requests.get(url, stream=True)) as response:
        if response.status_code != 200:
            # Note: this only reports the problem; the body is written anyway.
            print((
                '[ERROR]: Unable to download remote package. Has your '
                'authorized URL expired? Is there internet connectivity?'
            ))
        with open(local_path, 'wb') as f:
            for chunk in response:
                f.write(chunk)
            f.flush()
            os.fsync(f.fileno())
    return local_path
def callback(params, iteration, args):
    global X_test
    global y_test
    if (iteration + 1) in test_iters:
        idx = np.where(test_iters == (iteration + 1))[0][0]
        outfile1 = outfile1s[idx]
        outfile2 = outfile2s[idx]
        params_dict = unflatten_dict(params, args[0])
        model.update_hypers(params_dict)
        # We make predictions for the test set
        mf, vf = model.predict_f(X_test)
        mf, vf = mf[:, 0], vf[:, 0]
        # We compute the test error and log lik
        test_nll = compute_nll(y_test, mf, vf, 'cdf')
        outfile2.write('%.6f\n' % test_nll)
        outfile2.flush()
        os.fsync(outfile2.fileno())
        test_error = compute_error(y_test, mf, vf, 'cdf')
        outfile1.write('%.6f\n' % test_error)
        outfile1.flush()
        os.fsync(outfile1.fileno())


# train
def atomic_output(output, filename=None, mode=0o0644, quiet=False):
    if filename:
        tmp = None
        try:
            tmp = tempfile.NamedTemporaryFile(prefix='tmp2html.',
                                              dir=os.path.dirname(filename),
                                              delete=False)
            tmp.write(output.encode('utf8'))
            tmp.flush()
            os.fsync(tmp.fileno())
            tmp.close()
            # Only move the file into place once it is fully written;
            # renaming in a finally block would install a truncated file
            # after a failed write.
            os.chmod(tmp.name, mode)
            os.rename(tmp.name, filename)
            tmp = None
            if not quiet:
                print('Wrote HTML to: {}'.format(filename))
        except IOError as e:
            print(e)
        finally:
            # Clean up the temporary file if the rename did not happen.
            if tmp is not None:
                tmp.close()
                os.remove(tmp.name)
    else:
        print(output.encode('utf8'))
@contextmanager
def atomic_write(filepath, binary=False, fsync=False):
    """Writeable file object that atomically updates a file (using a
    temporary file). In some cases (namely Python < 3.3 on Windows), this
    could result in an existing file being temporarily unlinked.

    :param filepath: the file path to be opened
    :param binary: whether to open the file in a binary mode instead of textual
    :param fsync: whether to force write the file to disk
    """
    tmppath = filepath + '~'
    while os.path.isfile(tmppath):
        tmppath += '~'
    try:
        with open(tmppath, 'wb' if binary else 'w') as file:
            yield file
            if fsync:
                file.flush()
                os.fsync(file.fileno())
        # os.replace() on Python 3.3+, or a shim with the same semantics.
        replace(tmppath, filepath)
    finally:
        try:
            os.remove(tmppath)
        except (IOError, OSError):
            pass
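# A usage sketch for the atomic_write context manager above, assuming it is
# decorated with @contextmanager as shown. The path is hypothetical.
with atomic_write("/tmp/report.txt", fsync=True) as f:
    f.write("all rows processed\n")
# On success the temporary file replaces /tmp/report.txt in one step; on an
# exception the original file is left untouched and the temporary is removed.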
def handle_result(self, data, **kwargs):
    if self.fp is None:
        return
    fp = self.fp
    host_identifier = kwargs.get('host_identifier')
    created = dt.datetime.utcnow().isoformat()
    try:
        for item in extract_results(data):
            json_dump({
                '@version': 1,
                '@host_identifier': host_identifier,
                '@timestamp': item.timestamp.isoformat(),
                'log_type': 'result',
                'action': item.action,
                'columns': item.columns,
                'name': item.name,
                'created': created,
            }, fp)
            fp.write('\r\n')
    finally:
        fp.flush()
        os.fsync(fp.fileno())
def test_write(self):
    with gzip.GzipFile(self.filename, 'wb') as f:
        f.write(data1 * 50)

        # Try flush and fileno.
        f.flush()
        f.fileno()
        if hasattr(os, 'fsync'):
            os.fsync(f.fileno())
        f.close()

    # Test multiple close() calls.
    f.close()

# The following test_write_xy methods test that write accepts
# the corresponding bytes-like object type as input
# and that the data written equals bytes(xy) in all cases.
def refresh_token(self, email, refresh_token):
    api_key = os.path.join(TOKEN_DIR, self.firebase.api_key)
    self.user = self.firebase.auth().refresh(refresh_token)
    self.user['email'] = email
    self.user['expiration'] = time.time() + API_KEY_COOLDOWN
    self.expired = False
    # if not os.path.exists(api_key) or \
    #         time.time() - os.path.getmtime(api_key) > HALF_HOUR:
    # Rename to ensure atomic writes to json file
    # (technically more safe, but slower)
    # Note: this assumes the temp dir is on the same filesystem as TOKEN_DIR,
    # otherwise os.rename() fails across devices.
    tmp_api_key = os.path.join(tempfile.gettempdir(),
                               "api_key_%s" % rand_string(32))
    with open(tmp_api_key, 'w') as f:
        f.write(json.dumps(self.user))
        f.flush()
        os.fsync(f.fileno())
    os.rename(tmp_api_key, api_key)
def _send_command(self, command, parameters):
    try:
        if self._serial.inWaiting() != 0:
            raise SIReaderException(
                'Input buffer must be empty before sending command. '
                'Currently %s bytes in the input buffer.' % self._serial.inWaiting())
        command_string = command + int2byte(len(parameters)) + parameters
        crc = SIReader._crc(command_string)
        cmd = SIReader.STX + command_string + crc + SIReader.ETX
        if self._debug:
            print("==>> command '%s', parameters %s, crc %s" % (
                hexlify(command).decode('ascii'),
                ' '.join([hexlify(int2byte(c)).decode('ascii') for c in parameters]),
                hexlify(crc).decode('ascii'),
            ))
        self._serial.write(cmd)
    except (SerialException, OSError) as msg:
        raise SIReaderException('Could not send command: %s' % msg)

    if self._logfile:
        self._logfile.write('s %s %s\n' % (datetime.now(), cmd))
        self._logfile.flush()
        os.fsync(self._logfile.fileno())
    return self._read_command()
def _write(self):
    if threading.currentThread().isDaemon():
        log.warning('daemon thread cannot write wallet')
        return
    if not self.modified:
        return
    s = json.dumps(self.data, indent=4, sort_keys=True)
    temp_path = "%s.tmp.%s" % (self.path, os.getpid())
    with open(temp_path, "w") as f:
        f.write(s)
        f.flush()
        os.fsync(f.fileno())

    if os.path.exists(self.path):
        mode = os.stat(self.path).st_mode
    else:
        mode = stat.S_IREAD | stat.S_IWRITE

    # perform atomic write on POSIX systems
    try:
        os.rename(temp_path, self.path)
    except OSError:
        # Windows: rename() cannot overwrite, so remove the target first
        # (no longer atomic).
        os.remove(self.path)
        os.rename(temp_path, self.path)
    os.chmod(self.path, mode)
    self.modified = False
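# The remove-then-rename fallback above is needed because os.rename() refuses
# to overwrite an existing destination on Windows. On Python 3.3+, os.replace()
# provides overwrite semantics on every platform (and is atomic on POSIX), so
# the whole try/except block can collapse to a single call. A self-contained
# sketch with hypothetical paths:
import os

with open("wallet.json.tmp", "w") as f:
    f.write('{"modified": false}')
    f.flush()
    os.fsync(f.fileno())
os.replace("wallet.json.tmp", "wallet.json")  # overwrites atomically on POSIX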
def flush(self, fsync=False):
    """
    Force all buffered modifications to be written to disk.

    Parameters
    ----------
    fsync : bool (default False)
        call ``os.fsync()`` on the file handle to force writing to disk.

    Notes
    -----
    Without ``fsync=True``, flushing may not guarantee that the OS writes
    to disk. With fsync, the operation will block until the OS claims the
    file has been written; however, other caching layers may still
    interfere.
    """
    if self._handle is not None:
        self._handle.flush()
        if fsync:
            try:
                os.fsync(self._handle.fileno())
            except OSError:
                pass
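# A usage sketch for the flush method above, which belongs to pandas'
# HDFStore. It requires pandas with PyTables installed; the path is
# hypothetical.
import pandas as pd

store = pd.HDFStore("/tmp/example.h5")
store.put("frame", pd.DataFrame({"a": [1, 2, 3]}))
store.flush(fsync=True)   # block until the OS reports the data as written
store.close()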
def run(self):
    file_size_dl = 0
    response = requests.get(self.url, stream=True)
    data_chunks = response.iter_content(chunk_size=1024)
    if not os.path.exists('bin'):
        os.mkdir('bin')
    with open('bin/ffmpeg.7z', 'wb') as f:
        while not self.is_aborted():
            try:
                chunk = next(data_chunks)
                file_size_dl += len(chunk)
                logger.info("FFmpeg downloader: Downloaded chunk: {chunk}".format(chunk=len(chunk)))
                logger.info("FFmpeg downloader: Total downloaded so far: {total}".format(total=file_size_dl))
                logger.info("FFmpeg downloader: Remaining: {r}".format(r=self.file_size - file_size_dl))
                if chunk:
                    f.write(chunk)
                    f.flush()
                    # This makes the download super slow.
                    # os.fsync(f.fileno())
                wx.CallAfter(self.parent.ff_update, message=file_size_dl)
            except StopIteration:
                wx.CallAfter(self.parent.ff_complete)
                break
def store_index(self, ipath):
    self.logger.info("# indexed_fasta.store_index('%s')" % ipath)

    # write to tmp-file first and in the end rename in order to have this atomic
    # otherwise parallel building of the same index may screw it up.
    import tempfile
    tmp = tempfile.NamedTemporaryFile(mode="w", dir=os.path.dirname(ipath), delete=False)
    for chrom in sorted(self.chrom_stats.keys()):
        ofs, ldata, skip, skipchar, size = self.chrom_stats[chrom]
        tmp.write("%s\t%d\t%d\t%d\t%r\t%d\n" % (chrom, ofs, ldata, skip, skipchar, size))

    # make sure everything is on disk: flush Python's buffer first,
    # then ask the OS to write out its own buffers.
    tmp.flush()
    os.fsync(tmp.fileno())
    tmp.close()

    # make it accessible to everyone
    import stat
    os.chmod(tmp.name, stat.S_IROTH | stat.S_IRGRP | stat.S_IRUSR)

    # this is atomic on POSIX as we have created tmp in the same directory,
    # therefore same filesystem
    os.rename(tmp.name, ipath)
def write_image(self, filename):
    with open(filename, 'rb') as src, \
            open(self.storage.path, 'wb') as dst:
        src.seek(0, os.SEEK_END)
        size = src.tell()
        src.seek(0, os.SEEK_SET)
        count = 0
        next_report = time() + 3  # renamed from 'stat' to avoid shadowing the stat module
        while True:
            data = src.read(1024 * 1024)
            if not data:
                break
            dst.write(data)
            dst.flush()
            os.fsync(dst.fileno())
            count += len(data)
            if time() > next_report:
                next_report += 3
                print("writing image {:.0%}".format(count / size))
        dst.flush()
        os.fsync(dst.fileno())
def copy(self, sync=True, verify=True):
    """
    The same as in the base class but tunes the block device for better
    performance before starting writing. Additionally, it forces block
    device synchronization from time to time in order to make sure we do
    not get stuck in 'fsync()' for too long. The problem is that the
    kernel synchronizes block devices when the file is closed, and the
    result is that if the user interrupts us while we are copying the data,
    the program will be blocked in 'close()' waiting for the block device
    synchronization, which may last minutes for a slow USB stick. This is
    a very bad user experience, and we work around this effect by
    synchronizing from time to time.
    """
    self._tune_block_device()

    try:
        BmapCopy.copy(self, sync, verify)
    finally:
        self._restore_bdev_settings()
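# A minimal standalone sketch of the "sync from time to time" technique the
# docstring describes: fsync after every N megabytes so that close() never
# has to flush a huge backlog of dirty data. Paths and the interval are
# hypothetical, not taken from BmapCopy.
import os

SYNC_INTERVAL = 64 * 1024 * 1024  # fsync after every 64 MiB written

def copy_with_periodic_sync(src_path, dst_path, chunk_size=1024 * 1024):
    unsynced = 0
    with open(src_path, "rb") as src, open(dst_path, "wb") as dst:
        while True:
            chunk = src.read(chunk_size)
            if not chunk:
                break
            dst.write(chunk)
            unsynced += len(chunk)
            if unsynced >= SYNC_INTERVAL:
                dst.flush()
                os.fsync(dst.fileno())  # bound the amount of unwritten data
                unsynced = 0
        dst.flush()
        os.fsync(dst.fileno())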