def _popen(command, args):
import os
path = os.environ.get("PATH", os.defpath).split(os.pathsep)
path.extend(('/sbin', '/usr/sbin'))
for dir in path:
executable = os.path.join(dir, command)
if (os.path.exists(executable) and
os.access(executable, os.F_OK | os.X_OK) and
not os.path.isdir(executable)):
break
else:
return None
# LC_ALL to ensure English output, 2>/dev/null to prevent output on
# stderr (Note: we don't have an example where the words we search for
# are actually localized, but in theory some system could do so.)
cmd = 'LC_ALL=C %s %s 2>/dev/null' % (executable, args)
return os.popen(cmd)
Python os.F_OK 常量的实例源码
def _popen(command, args):
import os
path = os.environ.get("PATH", os.defpath).split(os.pathsep)
path.extend(('/sbin', '/usr/sbin'))
for dir in path:
executable = os.path.join(dir, command)
if (os.path.exists(executable) and
os.access(executable, os.F_OK | os.X_OK) and
not os.path.isdir(executable)):
break
else:
return None
# LC_ALL to ensure English output, 2>/dev/null to prevent output on
# stderr (Note: we don't have an example where the words we search for
# are actually localized, but in theory some system could do so.)
cmd = 'LC_ALL=C %s %s 2>/dev/null' % (executable, args)
return os.popen(cmd)
def execute(self):
    """Rename the currently selected file to the name given to the command.

    The new name is taken from ``self.rest(1)``.  A usage notice is shown
    when it is empty, nothing happens when it equals the current basename,
    and an error notice is shown when a file of that name already exists.
    Otherwise the file is renamed and the file manager is pointed at a
    ``File`` object built from the new name.
    """
    from ranger.container.file import File
    from os import access

    target = self.rest(1)
    if not target:
        return self.fm.notify('Syntax: rename <newname>', bad=True)

    unchanged = target == self.fm.thisfile.basename
    if unchanged:
        return

    # Refuse to overwrite whatever already lives under the requested name.
    if access(target, os.F_OK):
        return self.fm.notify("Can't rename: file already exists!", bad=True)

    self.fm.rename(self.fm.thisfile, target)

    # Re-select the file under its new name.
    renamed = File(target)
    self.fm.thisdir.pointed_obj = renamed
    self.fm.thisfile = renamed
def _create_test_db(self, verbosity, autoclobber, keepdb=False):
test_database_name = self._get_test_db_name()
if keepdb:
return test_database_name
if not self.connection.is_in_memory_db(test_database_name):
# Erase the old test database
if verbosity >= 1:
print("Destroying old test database '%s'..." % self.connection.alias)
if os.access(test_database_name, os.F_OK):
if not autoclobber:
confirm = input(
"Type 'yes' if you would like to try deleting the test "
"database '%s', or 'no' to cancel: " % test_database_name
)
if autoclobber or confirm == 'yes':
try:
os.remove(test_database_name)
except Exception as e:
sys.stderr.write("Got an error deleting the old test database: %s\n" % e)
sys.exit(2)
else:
print("Tests cancelled.")
sys.exit(1)
return test_database_name
def __init__(self, dir, file_template=_default_file_template,
             truncate_slug_length=40,
             version_locations=None,
             sourceless=False, output_encoding="utf-8"):
    """Store the script-directory settings and verify the directory exists.

    :param dir: path of the scripts directory
    :param file_template: stored unchanged as ``self.file_template``
    :param truncate_slug_length: stored as given; a falsy value becomes 40
    :param version_locations: stored unchanged
    :param sourceless: stored unchanged
    :param output_encoding: stored unchanged
    :raises util.CommandError: if ``dir`` does not exist
    """
    self.dir = dir
    self.file_template = file_template
    self.version_locations = version_locations
    # A falsy length (None, 0) falls back to the default of 40.
    self.truncate_slug_length = (
        truncate_slug_length if truncate_slug_length else 40)
    self.sourceless = sourceless
    self.output_encoding = output_encoding
    self.revision_map = revision.RevisionMap(self._load_revisions)

    if os.access(dir, os.F_OK):
        return

    message = ("Path doesn't exist: %r. Please use "
               "the 'init' command to create a new "
               "scripts folder." % dir)
    raise util.CommandError(message)
def write_script(
        scriptdir, rev_id, content, encoding='ascii', sourceless=False):
    """Overwrite the script file of an existing revision and reload it.

    :param scriptdir: object whose ``revision_map`` holds the revision
    :param rev_id: identifier of the revision whose file is rewritten
    :param content: new file text; it is dedented before being written
    :param encoding: when truthy, the text is encoded with it before writing
    :param sourceless: when true, ``make_sourceless`` is applied afterwards
    :raises Exception: if the rewritten script declares a different
        ``down_revision`` than the revision it replaces
    """
    script_path = scriptdir.revision_map.get_revision(rev_id).path

    payload = textwrap.dedent(content)
    if encoding:
        payload = payload.encode(encoding)
    with open(script_path, 'wb') as handle:
        handle.write(payload)

    # Drop any compiled file left next to the source that was just rewritten.
    stale_pyc = util.pyc_file_from_path(script_path)
    if os.access(stale_pyc, os.F_OK):
        os.unlink(stale_pyc)

    refreshed = Script._from_path(scriptdir, script_path)
    previous = scriptdir.revision_map.get_revision(refreshed.revision)
    if previous.down_revision != refreshed.down_revision:
        raise Exception("Can't change down_revision "
                        "on a refresh operation.")
    scriptdir.revision_map.add_revision(refreshed, _replace=True)

    if sourceless:
        make_sourceless(script_path)
def _clone_test_db(self, number, verbosity, keepdb=False):
source_database_name = self.connection.settings_dict['NAME']
target_database_name = self.get_test_db_clone_settings(number)['NAME']
# Forking automatically makes a copy of an in-memory database.
if not self.is_in_memory_db(source_database_name):
# Erase the old test database
if os.access(target_database_name, os.F_OK):
if keepdb:
return
if verbosity >= 1:
print("Destroying old test database for alias %s..." % (
self._get_database_display_str(verbosity, target_database_name),
))
try:
os.remove(target_database_name)
except Exception as e:
sys.stderr.write("Got an error deleting the old test database: %s\n" % e)
sys.exit(2)
try:
shutil.copy(source_database_name, target_database_name)
except Exception as e:
sys.stderr.write("Got an error cloning the test database: %s\n" % e)
sys.exit(2)
def test_sftp_session(server):
    """Exercise put/get/listdir over SFTP for every user of ``server``.

    For each user a file is uploaded to a fresh temporary directory,
    downloaded again under a second name, and both copies are compared.
    Listing a non-existent directory is expected to raise ``IOError``.

    :param server: object providing ``users`` and a ``client(uid)`` context
        manager whose value offers ``open_sftp()``
    """
    import shutil

    for uid in server.users:
        target_dir = tempfile.mkdtemp()
        # mkdtemp() never removes what it creates, so clean up explicitly,
        # including when one of the assertions below fails.
        try:
            target_fname = os.path.join(target_dir, "foo")
            assert not os.access(target_fname, os.F_OK)

            with server.client(uid) as c:
                sftp = c.open_sftp()
                sftp.put(__file__, target_fname, confirm=True)
                assert files_equal(target_fname, __file__)

                second_copy = os.path.join(target_dir, "bar")
                assert not os.access(second_copy, os.F_OK)
                sftp.get(target_fname, second_copy)
                assert files_equal(target_fname, second_copy)

                dir_contents = sftp.listdir(target_dir)
                assert len(dir_contents) == 2
                assert "foo" in dir_contents
                assert "bar" in dir_contents

                with raises(IOError):
                    sftp.listdir("/123_no_dir")
        finally:
            shutil.rmtree(target_dir, ignore_errors=True)
def _search_for_model(names, workdir):
"""
Check if a model file is available.
:param names: list of model names to be searcher for, highest ranking
first
:type names: str
:param workdir: the workdir where the model file is located
:type name: str
:type workdir: str
:returns: the path to the model file or None if model file not found
"""
for name in names:
path = os.path.join(workdir, name)
if os.access(path, os.F_OK):
return path
return None
def mountpoint_choosen(option):
if option is None:
return
from Screens.ChoiceBox import ChoiceBox
print "scanning", option
(description, mountpoint, session) = option
res = scanDevice(mountpoint)
list = [ (r.description, r, res[r], session) for r in res ]
if not list:
from Screens.MessageBox import MessageBox
if os.access(mountpoint, os.F_OK|os.R_OK):
session.open(MessageBox, _("No displayable files on this medium found!"), MessageBox.TYPE_INFO, simple = True, timeout = 5)
else:
print "ignore", mountpoint, "because its not accessible"
return
session.openWithCallback(execute, ChoiceBox,
title = _("The following files were found..."),
list = list)
def _clean_check(cmd, target):
"""
Run the command to download target. If the command fails, clean up before
re-raising the error.
"""
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError:
if os.access(target, os.F_OK):
os.unlink(target)
raise
def _clean_check(cmd, target):
"""
Run the command to download target. If the command fails, clean up before
re-raising the error.
"""
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError:
if os.access(target, os.F_OK):
os.unlink(target)
raise
def _clean_check(cmd, target):
"""
Run the command to download target.
If the command fails, clean up before re-raising the error.
"""
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError:
if os.access(target, os.F_OK):
os.unlink(target)
raise
def _clean_check(cmd, target):
"""
Run the command to download target. If the command fails, clean up before
re-raising the error.
"""
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError:
if os.access(target, os.F_OK):
os.unlink(target)
raise
def _clean_check(cmd, target):
"""
Run the command to download target. If the command fails, clean up before
re-raising the error.
"""
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError:
if os.access(target, os.F_OK):
os.unlink(target)
raise
def _clean_check(cmd, target):
"""
Run the command to download target. If the command fails, clean up before
re-raising the error.
"""
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError:
if os.access(target, os.F_OK):
os.unlink(target)
raise
def test_generateScriptEnv(self):
    """Check that a script written by generateScript() runs with its environment.

    The script touches the path held in ``ENV_TEST``; the test asserts that
    the script file exists and is readable/executable, that running it
    returns status 0, and that the touched file exists afterwards.
    """
    import shutil
    import tempfile

    # Use a private scratch directory rather than fixed /tmp names: a
    # leftover /tmp/test.txt from an earlier failed run would otherwise
    # make the "file got created" assertion pass without the script
    # having done anything.
    workdir = tempfile.mkdtemp()
    filename = os.path.join(workdir, "test.sh")
    testFile = os.path.join(workdir, "test.txt")
    cmds = """
X="${ENV_TEST}"
touch "${X}"
exit 0
"""
    try:
        generateScript(filename, cmds, {"ENV_TEST": testFile})

        # check permissions
        self.assertTrue(os.access(filename, os.F_OK))
        self.assertTrue(os.access(filename, os.X_OK | os.R_OK))

        # execute the script and expect it to create the file
        status = subprocess.call([filename])
        self.assertEqual(status, 0)

        # check that the testFile got created by the script
        self.assertTrue(os.access(testFile, os.F_OK))
    finally:
        # cleanup, also when one of the assertions above failed
        shutil.rmtree(workdir, ignore_errors=True)
def execute(self):
    """Rename the selected file and carry its bookmarks and tags along.

    The new name is taken from ``self.rest(1)``.  A usage notice is shown
    when it is empty, nothing happens when it equals the file's current
    relative path, and an error notice is shown when the target already
    exists.  Tags and bookmarks are only modified after
    ``self.fm.rename`` reports success, so a rejected or failed rename
    leaves them exactly as they were.
    """
    from os import access

    new_name = self.rest(1)
    if not new_name:
        return self.fm.notify('Syntax: rename <newname>', bad=True)

    old_name = self.fm.thisfile.relative_path
    if new_name == old_name:
        return

    if access(new_name, os.F_OK):
        return self.fm.notify("Can't rename: file already exists!", bad=True)

    # Snapshot the tags that belong to the file without touching the tag
    # store yet; iterating over a copy of the keys keeps the later
    # removals from interfering with the iteration.
    old_path = self.fm.thisfile.path
    all_tags = self.fm.tags.tags
    tagged = dict((f, all_tags[f]) for f in list(all_tags)
                  if str(f).startswith(old_path))

    if not self.fm.rename(self.fm.thisfile, new_name):
        return

    # Imported lazily: only a successful rename needs a File object.
    from ranger.container.file import File
    f = File(new_name)

    # Update bookmarks that were pointing on the previous name
    obsoletebookmarks = [b for b in self.fm.bookmarks
                         if b[1].path == self.fm.thisfile]
    if obsoletebookmarks:
        for key, _ in obsoletebookmarks:
            self.fm.bookmarks[key] = f
        self.fm.bookmarks.update_if_outdated()

    self.fm.thisdir.pointed_obj = f
    self.fm.thisfile = f

    # Move the tags over to the new name and persist them once.
    if tagged:
        for t in tagged:
            self.fm.tags.remove(t)
        for t in tagged:
            self.fm.tags.tags[t.replace(old_name, new_name)] = tagged[t]
        self.fm.tags.dump()
def _select_config_file_path():
    """
    Return the first openATTIC configuration pathname that exists and is
    both readable and writable.

    :raises CommandExecutionError: if none of the known locations qualifies
    """
    possible_paths = ("/etc/default/openattic", "/etc/sysconfig/openattic")

    for candidate in possible_paths:
        if not os.access(candidate, os.F_OK):
            continue
        if os.access(candidate, os.R_OK | os.W_OK):
            return candidate

    message = ("No openATTIC config file found in the following locations: "
               "{}".format(possible_paths))
    raise CommandExecutionError(message)
def init(config, directory, template='generic'):
    """Initialize a new scripts directory.

    Creates ``directory`` and its ``versions`` sub-directory, then
    populates it from the named template: ``alembic.ini.mako`` is rendered
    to the configured config file (unless that file already exists) and
    every other regular file is copied as-is.

    :param config: object providing ``get_template_directory()`` and
        ``config_file_name``
    :param directory: path of the scripts directory to create
    :param template: name of the template directory to copy from
    :raises util.CommandError: if ``directory`` already exists or the
        template cannot be found
    """
    if os.access(directory, os.F_OK):
        raise util.CommandError("Directory %s already exists" % directory)

    template_dir = os.path.join(config.get_template_directory(),
                                template)
    if not os.access(template_dir, os.F_OK):
        raise util.CommandError("No such template %r" % template)

    util.status("Creating directory %s" % os.path.abspath(directory),
                os.makedirs, directory)

    versions = os.path.join(directory, 'versions')
    util.status("Creating directory %s" % os.path.abspath(versions),
                os.makedirs, versions)

    script = ScriptDirectory(directory)

    # Stays None when the template ships no 'alembic.ini.mako'; without
    # this default the closing message would hit an unbound local.
    config_file = None
    for file_ in os.listdir(template_dir):
        file_path = os.path.join(template_dir, file_)
        if file_ == 'alembic.ini.mako':
            config_file = os.path.abspath(config.config_file_name)
            if os.access(config_file, os.F_OK):
                util.msg("File %s already exists, skipping" % config_file)
            else:
                script._generate_template(
                    file_path,
                    config_file,
                    script_location=directory
                )
        elif os.path.isfile(file_path):
            output_file = os.path.join(directory, file_)
            script._copy_file(
                file_path,
                output_file
            )

    # Only point the user at a config file when one was actually handled.
    if config_file is not None:
        util.msg("Please edit configuration/connection/logging "
                 "settings in %r before proceeding." % config_file)