def __setitem__(self, key, message):
"""Replace the keyed message; raise KeyError if it doesn't exist."""
old_subpath = self._lookup(key)
temp_key = self.add(message)
temp_subpath = self._lookup(temp_key)
if isinstance(message, MaildirMessage):
# temp's subdir and suffix were specified by message.
dominant_subpath = temp_subpath
else:
# temp's subdir and suffix were defaults from add().
dominant_subpath = old_subpath
subdir = os.path.dirname(dominant_subpath)
if self.colon in dominant_subpath:
suffix = self.colon + dominant_subpath.split(self.colon)[-1]
else:
suffix = ''
self.discard(key)
new_path = os.path.join(self._path, subdir, key + suffix)
os.rename(os.path.join(self._path, temp_subpath), new_path)
if isinstance(message, MaildirMessage):
os.utime(new_path, (os.path.getatime(new_path),
message.get_date()))
python类utime()的实例源码
def _caches_to_file(cache_path, start, end, name, cb, concat):
start_time = time()
if concat:
all_data = []
for i in range(start, end):
data = load(os.path.join(cache_path, "{0}.jb".format(i)))
all_data.extend(data)
dump(all_data, name, 3)
else:
target_path = os.path.join(cache_path, name[:-3])
if not os.path.exists(target_path):
os.makedirs(target_path)
for i in range(start, end):
src_file_path = os.path.join(cache_path, "{0}.jb".format(i))
basename = os.path.basename(src_file_path)
target_file_path = os.path.join(target_path, basename)
shutil.move(src_file_path, target_file_path)
finished_flag = os.path.join(target_path, '.finished')
with open(finished_flag, 'a'):
os.utime(finished_flag, None)
logging.debug("Finished saving data to {0}. Took {1}s".format(name, time()-start_time))
cb()
touch_compiler_timestamps.py 文件源码
项目:mongo_module_ninja
作者: RedBeard0531
项目源码
文件源码
阅读 45
收藏 0
点赞 0
评论 0
def run_if_needed(base_file, then_file, now_file):
# Python uses doubles for mtime so it can't precisely represent linux's
# nanosecond precision. Round up to next whole second to ensure we get a
# stable timestamp that is guaranteed to be >= the timestamp of the
# compiler. This also avoids issues if the compiler is on a file system
# with high-precision timestamps, but the build directory isn't.
base_stat = os.stat(base_file)
mtime = math.ceil(base_stat.st_mtime)
atime = math.ceil(base_stat.st_atime)
if (os.path.exists(then_file)
and os.path.exists(now_file)
and os.stat(then_file).st_mtime == mtime):
return # Don't need to do anything.
createIfNeeded(now_file)
os.utime(now_file, None) # None means now
createIfNeeded(then_file)
os.utime(then_file, (atime, mtime))
def test_file_save_outside_roamer(self):
digest = self.session.get_digest('egg.txt')
path = os.path.join(TEST_DIR, 'egg.txt')
with open(path, 'a') as egg_file:
egg_file.write(' extra content')
os.utime(path, (1330712280, 1330712292))
self.session.process()
second_session = MockSession(DOC_DIR)
second_session.add_entry('egg.txt', digest)
second_session.process()
path = os.path.join(DOC_DIR, 'egg.txt')
self.assertTrue(os.path.exists(path))
with open(path, 'r') as egg_file:
self.assertEqual(egg_file.read(), 'egg file content extra content')
def copystat(cls, src, dest, copy_own=True, copy_xattr=True):
"""
Copy all stat info (mode bits, atime, mtime, flags) from `src` to
`dest`. If `copy_own=True`, the uid and gid are also copied.
If `copy_xattr=True`, the extended attributes are also copied
(only available on Linux).
"""
st = os.stat(src)
mode = stat.S_IMODE(st.st_mode)
os.chmod(dest, mode=mode)
os.utime(dest, ns=(st.st_atime_ns, st.st_mtime_ns))
if hasattr(st, "st_flags"):
os.chflags(dest, flags=st.st_flags)
if copy_own:
os.chown(dest, uid=st.st_uid, gid=st.st_gid)
if copy_xattr:
cls.copyxattr(src, dest)
def copystat(src, dst):
"""Copy all stat info (mode bits, atime, mtime, flags) from src to dst"""
st = os.stat(src)
mode = stat.S_IMODE(st.st_mode)
if hasattr(os, 'utime'):
os.utime(dst, (st.st_atime, st.st_mtime))
if hasattr(os, 'chmod'):
os.chmod(dst, mode)
if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
try:
os.chflags(dst, st.st_flags)
except OSError, why:
for err in 'EOPNOTSUPP', 'ENOTSUP':
if hasattr(errno, err) and why.errno == getattr(errno, err):
break
else:
raise
def blankAFile(file_path):
'''
truncate a file to zero bytes, and preserve its original modification time
Adapted from 'Keeping Large intermediate files' (http://www.ruffus.org.uk/faq.html)
:param file: Input file path
:return: None
'''
if os.path.exists(file_path):
timeInfo = os.stat(file_path) # retrieve current time stamp of the file
try:
f = open(file_path,'w')
except IOError:
pass
else:
f.truncate(0)
f.close()
# change the time of the file back to what it was
os.utime(file_path,(timeInfo.st_atime, timeInfo.st_mtime))
print file_path + ' blanked to save disk-space.'
else:
print 'blankAFile: ' + file_path + ' not found.'
sys.exit(1)
def try_utime(self, filename, last_modified_hdr):
"""Try to set the last-modified time of the given file."""
if last_modified_hdr is None:
return
if not os.path.isfile(encodeFilename(filename)):
return
timestr = last_modified_hdr
if timestr is None:
return
filetime = timeconvert(timestr)
if filetime is None:
return filetime
# Ignore obviously invalid dates
if filetime == 0:
return
try:
os.utime(filename, (time.time(), filetime))
except Exception:
pass
return filetime
def handle(self):
local_item_tmp_path = self.local_parent_path + get_tmp_filename(self.item_name)
try:
with open(local_item_tmp_path, 'wb') as f:
self.drive.download_file(file=f, size=self._item.size, item_id=self._item.id)
local_sha1 = hasher.hash_value(local_item_tmp_path)
item_sha1 = None
if self._item.file_props is not None and self._item.file_props.hashes is not None:
item_sha1 = self._item.file_props.hashes.sha1
if item_sha1 is None:
self.logger.warn('Remote file %s has not sha1 property, we keep the file but cannot check correctness of it',
self.local_path)
elif local_sha1 != item_sha1:
self.logger.error('Mismatch hash of download file %s : remote:%s,%d local:%s %d', self.local_path,
self._item.file_props.hashes.sha1, self._item.size, local_sha1, os.path.getsize(local_item_tmp_path))
return
os.rename(local_item_tmp_path, self.local_path)
t = datetime_to_timestamp(self._item.modified_time)
os.utime(self.local_path, (t, t))
os.chown(self.local_path, OS_USER_ID, OS_USER_GID)
self.items_store.update_item(self._item, ItemRecordStatuses.DOWNLOADED)
except (IOError, OSError) as e:
self.logger.error('An IO error occurred when downloading "%s":\n%s.', self.local_path, traceback.format_exc())
except errors.OneDriveError as e:
self.logger.error('An API error occurred when downloading "%s":\n%s.', self.local_path, traceback.format_exc())
def Main():
parser = argparse.ArgumentParser()
parser.add_argument('files', nargs='+')
parser.add_argument('-f', '--force', action='store_true',
help="don't err on missing")
parser.add_argument('--stamp', required=True, help='touch this file')
args = parser.parse_args()
for f in args.files:
try:
os.remove(f)
except OSError:
if not args.force:
print >>sys.stderr, "'%s' does not exist" % f
return 1
with open(args.stamp, 'w'):
os.utime(args.stamp, None)
return 0
def setUp(self):
super(HeatConfigTest, self).setUp()
self.fake_hook_path = self.relative_path(__file__, 'hook-fake.py')
self.heat_config_path = self.relative_path(
__file__,
'..',
'heat-config/os-refresh-config/configure.d/55-heat-config')
self.hooks_dir = self.useFixture(fixtures.TempDir())
self.deployed_dir = self.useFixture(fixtures.TempDir())
with open(self.fake_hook_path) as f:
fake_hook = f.read()
for hook in self.fake_hooks:
hook_name = self.hooks_dir.join(hook)
with open(hook_name, 'w') as f:
os.utime(hook_name, None)
f.write(fake_hook)
f.flush()
os.chmod(hook_name, 0o755)
self.env = os.environ.copy()
def setUp(self):
super(HeatConfigDockerComposeORCTest, self).setUp()
self.fake_hook_path = self.relative_path(__file__, 'hook-fake.py')
self.heat_config_docker_compose_path = self.relative_path(
__file__,
'..',
'heat-config-docker-compose/os-refresh-config/configure.d/'
'50-heat-config-docker-compose')
self.docker_compose_dir = self.useFixture(fixtures.TempDir())
with open(self.fake_hook_path) as f:
fake_hook = f.read()
for hook in self.fake_hooks:
hook_name = self.docker_compose_dir.join(hook)
with open(hook_name, 'w') as f:
os.utime(hook_name, None)
f.write(fake_hook)
f.flush()
os.chmod(hook_name, 0o755)
def setUp(self):
super(HeatConfigKubeletORCTest, self).setUp()
self.fake_hook_path = self.relative_path(__file__, 'hook-fake.py')
self.heat_config_kubelet_path = self.relative_path(
__file__,
'..',
'heat-config-kubelet/os-refresh-config/configure.d/'
'50-heat-config-kubelet')
self.manifests_dir = self.useFixture(fixtures.TempDir())
with open(self.fake_hook_path) as f:
fake_hook = f.read()
for hook in self.fake_hooks:
hook_name = self.manifests_dir.join(hook)
with open(hook_name, 'w') as f:
os.utime(hook_name, None)
f.write(fake_hook)
f.flush()
os.chmod(hook_name, 0o755)
def execute_run_box_basics(output="default"):
if os.path.exists(DEFAULT_LOCAL_FRECKLES_BOX_BASICS_MARKER):
return {"return_code": -1}
task_config = [{'tasks':
['makkus.box-basics']
}]
result = create_and_run_nsbl_runner(task_config, task_metadata={}, output_format=output, ask_become_pass=True, run_box_basics=False)
if result["return_code"] == 0:
with open(DEFAULT_LOCAL_FRECKLES_BOX_BASICS_MARKER, 'a'):
os.utime(DEFAULT_LOCAL_FRECKLES_BOX_BASICS_MARKER, None)
return result
def touch(path, times=None, dirs=False):
''' perform UNIX touch with extra
based on: http://stackoverflow.com/questions/12654772/create-empty-file-using-python
Args:
path: file path to touch
times: a 2-tuple of the form (atime, mtime) where each member is an int or float expressing seconds.
defaults to current time.
dirs: is set, create folders if not exists
'''
if dirs:
basedir = os.path.dirname(path)
if not os.path.exists(basedir):
os.makedirs(basedir)
with open(path, 'a'):
os.utime(path, times=times)
def touch(self, mode=0o666, exist_ok=True):
"""
Create this file with the given access mode, if it doesn't exist.
"""
if self._closed:
self._raise_closed()
if exist_ok:
# First try to bump modification time
# Implementation note: GNU touch uses the UTIME_NOW option of
# the utimensat() / futimens() functions.
try:
self._accessor.utime(self, None)
except OSError:
# Avoid exception chaining
pass
else:
return
flags = os.O_CREAT | os.O_WRONLY
if not exist_ok:
flags |= os.O_EXCL
fd = self._raw_open(flags, mode)
os.close(fd)
def run(self):
if not self.has_npm():
log.error("`npm` unavailable. If you're running this command using sudo, make sure `npm` is available to sudo")
else:
env = os.environ.copy()
env['PATH'] = npm_path
log.info("Installing build dependencies with npm. This may take a while...")
check_call(["npm", "install"], cwd=jsroot, stdout=sys.stdout, stderr=sys.stderr, **prckws)
os.utime(node_modules, None)
for t in self.targets:
if not os.path.exists(t):
msg = "Missing file: %s" % t
if not self.has_npm():
msg += "\nnpm is required to build a development version of jupyter-" + name
raise ValueError(msg)
# update package data in case this created new files
update_package_data(self.distribution)
def try_utime(self, filename, last_modified_hdr):
"""Try to set the last-modified time of the given file."""
if last_modified_hdr is None:
return
if not os.path.isfile(encodeFilename(filename)):
return
timestr = last_modified_hdr
if timestr is None:
return
filetime = timeconvert(timestr)
if filetime is None:
return filetime
# Ignore obviously invalid dates
if filetime == 0:
return
try:
os.utime(filename, (time.time(), filetime))
except Exception:
pass
return filetime
def set_file_utime(filename, desired_time):
"""
Set the utime of a file, and if it fails, raise a more explicit error.
:param filename: the file to modify
:param desired_time: the epoch timestamp to set for atime and mtime.
:raises: SetFileUtimeError: if you do not have permission (errno 1)
:raises: OSError: for all errors other than errno 1
"""
try:
os.utime(filename, (desired_time, desired_time))
except OSError as e:
# Only raise a more explicit exception when it is a permission issue.
if e.errno != errno.EPERM:
raise e
raise SetFileUtimeError(
("The file was downloaded, but attempting to modify the "
"utime of the file failed. Is the file owned by another user?"))
def check_for_update():
if os.path.exists(FILE_UPDATE):
mtime = os.path.getmtime(FILE_UPDATE)
last = datetime.utcfromtimestamp(mtime).strftime('%Y-%m-%d')
today = datetime.utcnow().strftime('%Y-%m-%d')
if last == today:
return
try:
with open(FILE_UPDATE, 'a'):
os.utime(FILE_UPDATE, None)
request = urllib2.Request(
CORE_VERSION_URL,
urllib.urlencode({'version': main.__version__}),
)
response = urllib2.urlopen(request)
with open(FILE_UPDATE, 'w') as update_json:
update_json.write(response.read())
except (urllib2.HTTPError, urllib2.URLError):
pass
def copystat(src, dst):
"""Copy all stat info (mode bits, atime, mtime, flags) from src to dst"""
st = os.stat(src)
mode = stat.S_IMODE(st.st_mode)
if hasattr(os, 'utime'):
os.utime(dst, (st.st_atime, st.st_mtime))
if hasattr(os, 'chmod'):
os.chmod(dst, mode)
if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
try:
os.chflags(dst, st.st_flags)
except OSError, why:
for err in 'EOPNOTSUPP', 'ENOTSUP':
if hasattr(errno, err) and why.errno == getattr(errno, err):
break
else:
raise
def main(reactor, *args):
client = http.HTTPClient()
fname = os.path.join(tmp, str(uuid.uuid4()))
yield httpRequest(client._agent, URI, method='GET', saveto=fname)
filesize = os.path.getsize(fname)
assert filesize > 1
# touch file to 5 minutes in the past
past = int(os.path.getmtime(fname)) - 300
print "PAST MTIME", past
os.utime(fname, (past, past))
assert os.path.getmtime(fname) == past
yield httpRequest(client._agent, URI, method='GET', saveto=fname)
# it was not modified
current = os.path.getmtime(fname)
print "CURRENT MTIME", current
assert int(current) == past
print 'OK'
shutil.rmtree(tmp)
def _create_dummy_file(self, base, size):
filepath = '{base}/{name}.dat'.format(base=base, name=self._gen_rand_str(5))
with open(filepath, 'wb') as f:
f.truncate(size)
# Change file creation and modification to random time
atime, mtime = self.get_rand_time_pair()
os.utime(filepath, times=(atime, mtime), follow_symlinks=False)
# Change st_birthtime on MacOS
if platform == 'darwin':
created_str = datetime.fromtimestamp(atime).strftime('%m/%d/%Y %H:%M:%S')
# subprocess.run with list arguments and shell=False behaves very strange
# and sets its st_birthtime to earlier than given timestamp very weirdly
command = 'SetFile -d "{0}" {1}'.format(created_str, filepath)
process = subprocess.Popen(command,
shell=True, close_fds=True,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
self._tasks_birthtime_flags.append(process)
self._total_size += size
return filepath, atime, mtime, os.stat(filepath).st_ctime
def copystat(src, dst):
"""Copy all stat info (mode bits, atime, mtime, flags) from src to dst"""
st = os.stat(src)
mode = stat.S_IMODE(st.st_mode)
if hasattr(os, 'utime'):
os.utime(dst, (st.st_atime, st.st_mtime))
if hasattr(os, 'chmod'):
os.chmod(dst, mode)
if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
try:
os.chflags(dst, st.st_flags)
except OSError, why:
for err in 'EOPNOTSUPP', 'ENOTSUP':
if hasattr(errno, err) and why.errno == getattr(errno, err):
break
else:
raise
def check_for_update():
if os.path.exists(FILE_UPDATE):
mtime = os.path.getmtime(FILE_UPDATE)
last = datetime.utcfromtimestamp(mtime).strftime('%Y-%m-%d')
today = datetime.utcnow().strftime('%Y-%m-%d')
if last == today:
return
try:
with open(FILE_UPDATE, 'a'):
os.utime(FILE_UPDATE, None)
request = urllib2.Request(
CORE_VERSION_URL,
urllib.urlencode({'version': main.__version__}),
)
response = urllib2.urlopen(request)
with open(FILE_UPDATE, 'w') as update_json:
update_json.write(response.read())
except (urllib2.HTTPError, urllib2.URLError):
pass
def extractall(self, path=".", members=None):
"""Extract all members from the archive to the current working
directory and set owner, modification time and permissions on
directories afterwards. `path' specifies a different directory
to extract to. `members' is optional and must be a subset of the
list returned by getmembers().
"""
directories = []
if members is None:
members = self
for tarinfo in members:
if tarinfo.isdir():
# Extract directories with a safe mode.
directories.append(tarinfo)
tarinfo = copy.copy(tarinfo)
tarinfo.mode = 0o700
# Do not set_attrs directories, as we will do that further down
self.extract(tarinfo, path, set_attrs=not tarinfo.isdir())
# Reverse sort directories.
directories.sort(key=lambda a: a.name)
directories.reverse()
# Set correct owner, mtime and filemode on directories.
for tarinfo in directories:
dirpath = os.path.join(path, tarinfo.name)
try:
self.chown(tarinfo, dirpath)
self.utime(tarinfo, dirpath)
self.chmod(tarinfo, dirpath)
except ExtractError as e:
if self.errorlevel > 1:
raise
else:
self._dbg(1, "tarfile: %s" % e)
def utime(self, tarinfo, targetpath):
"""Set modification time of targetpath according to tarinfo.
"""
if not hasattr(os, 'utime'):
return
try:
os.utime(targetpath, (tarinfo.mtime, tarinfo.mtime))
except EnvironmentError as e:
raise ExtractError("could not change modification time")
#--------------------------------------------------------------------------
def copystat(src, dst):
"""Copy all stat info (mode bits, atime, mtime, flags) from src to dst"""
st = os.stat(src)
mode = stat.S_IMODE(st.st_mode)
if hasattr(os, 'utime'):
os.utime(dst, (st.st_atime, st.st_mtime))
if hasattr(os, 'chmod'):
os.chmod(dst, mode)
if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
try:
os.chflags(dst, st.st_flags)
except OSError as why:
if (not hasattr(errno, 'EOPNOTSUPP') or
why.errno != errno.EOPNOTSUPP):
raise
def extractall(self, path=".", members=None, *, numeric_owner=False):
"""Extract all members from the archive to the current working
directory and set owner, modification time and permissions on
directories afterwards. `path' specifies a different directory
to extract to. `members' is optional and must be a subset of the
list returned by getmembers(). If `numeric_owner` is True, only
the numbers for user/group names are used and not the names.
"""
directories = []
if members is None:
members = self
for tarinfo in members:
if tarinfo.isdir():
# Extract directories with a safe mode.
directories.append(tarinfo)
tarinfo = copy.copy(tarinfo)
tarinfo.mode = 0o700
# Do not set_attrs directories, as we will do that further down
self.extract(tarinfo, path, set_attrs=not tarinfo.isdir(),
numeric_owner=numeric_owner)
# Reverse sort directories.
directories.sort(key=lambda a: a.name)
directories.reverse()
# Set correct owner, mtime and filemode on directories.
for tarinfo in directories:
dirpath = os.path.join(path, tarinfo.name)
try:
self.chown(tarinfo, dirpath, numeric_owner=numeric_owner)
self.utime(tarinfo, dirpath)
self.chmod(tarinfo, dirpath)
except ExtractError as e:
if self.errorlevel > 1:
raise
else:
self._dbg(1, "tarfile: %s" % e)
def utime(self, tarinfo, targetpath):
"""Set modification time of targetpath according to tarinfo.
"""
if not hasattr(os, 'utime'):
return
try:
os.utime(targetpath, (tarinfo.mtime, tarinfo.mtime))
except OSError:
raise ExtractError("could not change modification time")
#--------------------------------------------------------------------------