def _clean_check(cmd, target):
"""
Run the command to download target. If the command fails, clean up before
re-raising the error.
"""
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError:
if os.access(target, os.F_OK):
os.unlink(target)
raise
python类F_OK的实例源码
def __init__(self, dir, file_template=_default_file_template,
truncate_slug_length=40,
version_locations=None,
sourceless=False, output_encoding="utf-8"):
self.dir = dir
self.file_template = file_template
self.version_locations = version_locations
self.truncate_slug_length = truncate_slug_length or 40
self.sourceless = sourceless
self.output_encoding = output_encoding
self.revision_map = revision.RevisionMap(self._load_revisions)
if not os.access(dir, os.F_OK):
raise util.CommandError("Path doesn't exist: %r. Please use "
"the 'init' command to create a new "
"scripts folder." % dir)
def write_script(
scriptdir, rev_id, content, encoding='ascii', sourceless=False):
old = scriptdir.revision_map.get_revision(rev_id)
path = old.path
content = textwrap.dedent(content)
if encoding:
content = content.encode(encoding)
with open(path, 'wb') as fp:
fp.write(content)
pyc_path = util.pyc_file_from_path(path)
if os.access(pyc_path, os.F_OK):
os.unlink(pyc_path)
script = Script._from_path(scriptdir, path)
old = scriptdir.revision_map.get_revision(script.revision)
if old.down_revision != script.down_revision:
raise Exception("Can't change down_revision "
"on a refresh operation.")
scriptdir.revision_map.add_revision(script, _replace=True)
if sourceless:
make_sourceless(path)
def getTasksProcForce(self) :
""" Get tasks list by bruteforcing /proc """
for i in range(1, 65535, 1) :
if(os.access("/proc/" + str(i) + "/status", os.F_OK) == True):
l = self.openProcTaskStatus("/proc/" + str(i) + "/status")
if(l != []):
self.tasks_procforce.map_tasks[int(l[0])] = [int(l[1]), int(l[2]), l[3], 0, 0]
self.tasks_procforce.list_tasks.append(int(l[0]))
if(os.access("/proc/" + str(i) + "/task", os.F_OK) == True):
for srep in os.listdir("/proc/" + str(i) + "/task"):
if(srep != l[0]):
ll = self.openProcTaskStatus("/proc/" + str(i) + "/task/" + srep + "/status")
self.tasks_procforce.map_tasks[int(ll[0])] = [int(ll[1]), int(ll[2]), ll[3], 0, 1]
self.tasks_procforce.list_tasks.append(int(ll[0]))
def _clone_test_db(self, number, verbosity, keepdb=False):
source_database_name = self.connection.settings_dict['NAME']
target_database_name = self.get_test_db_clone_settings(number)['NAME']
# Forking automatically makes a copy of an in-memory database.
if not self.connection.is_in_memory_db(source_database_name):
# Erase the old test database
if os.access(target_database_name, os.F_OK):
if keepdb:
return
if verbosity >= 1:
print("Destroying old test database for alias %s..." % (
self._get_database_display_str(verbosity, target_database_name),
))
try:
os.remove(target_database_name)
except Exception as e:
sys.stderr.write("Got an error deleting the old test database: %s\n" % e)
sys.exit(2)
try:
shutil.copy(source_database_name, target_database_name)
except Exception as e:
sys.stderr.write("Got an error cloning the test database: %s\n" % e)
sys.exit(2)
def _popen(command, args):
import os
path = os.environ.get("PATH", os.defpath).split(os.pathsep)
path.extend(('/sbin', '/usr/sbin'))
for dir in path:
executable = os.path.join(dir, command)
if (os.path.exists(executable) and
os.access(executable, os.F_OK | os.X_OK) and
not os.path.isdir(executable)):
break
else:
return None
# LC_ALL to ensure English output, 2>/dev/null to prevent output on
# stderr (Note: we don't have an example where the words we search for
# are actually localized, but in theory some system could do so.)
cmd = 'LC_ALL=C %s %s 2>/dev/null' % (executable, args)
return os.popen(cmd)
def mountpoint_choosen(option):
if option is None:
return
from Screens.ChoiceBox import ChoiceBox
print "scanning", option
(description, mountpoint, session) = option
res = scanDevice(mountpoint)
list = [ (r.description, r, res[r], session) for r in res ]
if not list:
from Screens.MessageBox import MessageBox
if os.access(mountpoint, os.F_OK|os.R_OK):
session.open(MessageBox, _("No displayable files on this medium found!"), MessageBox.TYPE_INFO, simple = True, timeout = 5)
else:
print "ignore", mountpoint, "because its not accessible"
return
session.openWithCallback(execute, ChoiceBox,
title = _("The following files were found..."),
list = list)
def on_debug_activate(self):
dotfile = "/home/pi/dev/bossjones-github/scarlett_os/_debug/generator-player.dot"
pngfile = "/home/pi/dev/bossjones-github/scarlett_os/_debug/generator-player-pipeline.png" # NOQA
parent_dir = get_parent_dir(dotfile)
mkdir_if_does_not_exist(parent_dir)
if os.access(dotfile, os.F_OK):
os.remove(dotfile)
if os.access(pngfile, os.F_OK):
os.remove(pngfile)
if not fname_exists(dotfile):
touch_empty_file(dotfile)
if not fname_exists(pngfile):
touch_empty_file(pngfile)
Gst.debug_bin_to_dot_file(self.pipeline,
Gst.DebugGraphDetails.ALL, "generator-player")
os.system('/usr/bin/dot' + " -Tpng -o " + pngfile + " " + dotfile)
# Gstreamer callbacks.
def _popen(command, args):
import os
path = os.environ.get("PATH", os.defpath).split(os.pathsep)
path.extend(('/sbin', '/usr/sbin'))
for dir in path:
executable = os.path.join(dir, command)
if (os.path.exists(executable) and
os.access(executable, os.F_OK | os.X_OK) and
not os.path.isdir(executable)):
break
else:
return None
# LC_ALL to ensure English output, 2>/dev/null to prevent output on
# stderr (Note: we don't have an example where the words we search for
# are actually localized, but in theory some system could do so.)
cmd = 'LC_ALL=C %s %s 2>/dev/null' % (executable, args)
return os.popen(cmd)
def _popen(command, args):
import os
path = os.environ.get("PATH", os.defpath).split(os.pathsep)
path.extend(('/sbin', '/usr/sbin'))
for dir in path:
executable = os.path.join(dir, command)
if (os.path.exists(executable) and
os.access(executable, os.F_OK | os.X_OK) and
not os.path.isdir(executable)):
break
else:
return None
# LC_ALL to ensure English output, 2>/dev/null to prevent output on
# stderr (Note: we don't have an example where the words we search for
# are actually localized, but in theory some system could do so.)
cmd = 'LC_ALL=C %s %s 2>/dev/null' % (executable, args)
return os.popen(cmd)
def set_imageinfo( memoryFilePath ):
path = r'{}'.format( memoryFilePath )
path = os.path.abspath( path )
try:
if os.access( path, os.F_OK):
if os.access( path, os.R_OK ):
cwd = os.getcwd()
imageinfo = [ 'vol.py', '-f', '{}'.format( path ), 'imageinfo', \
'--output=text', \
'--output-file={}'.format( os.path.join( cwd, 'imageinfo.text' ))]
return imageinfo
else:
print '\n[!] Error File Permissions: No Read Access for {}\n'.format( path )
else:
print '\n[!] Error FilePath: Does not exist {}\n'.format( path )
except Exception as set_imageinfo_error:
print '[!] EXCEPTION ERROR: < set_imageinfo > function'
print set_imageinfo_error
def _popen(command, args):
import os
path = os.environ.get("PATH", os.defpath).split(os.pathsep)
path.extend(('/sbin', '/usr/sbin'))
for dir in path:
executable = os.path.join(dir, command)
if (os.path.exists(executable) and
os.access(executable, os.F_OK | os.X_OK) and
not os.path.isdir(executable)):
break
else:
return None
# LC_ALL to ensure English output, 2>/dev/null to prevent output on
# stderr (Note: we don't have an example where the words we search for
# are actually localized, but in theory some system could do so.)
cmd = 'LC_ALL=C %s %s 2>/dev/null' % (executable, args)
return os.popen(cmd)
def _popen(command, args):
import os
path = os.environ.get("PATH", os.defpath).split(os.pathsep)
path.extend(('/sbin', '/usr/sbin'))
for dir in path:
executable = os.path.join(dir, command)
if (os.path.exists(executable) and
os.access(executable, os.F_OK | os.X_OK) and
not os.path.isdir(executable)):
break
else:
return None
# LC_ALL to ensure English output, 2>/dev/null to prevent output on
# stderr (Note: we don't have an example where the words we search for
# are actually localized, but in theory some system could do so.)
cmd = 'LC_ALL=C %s %s 2>/dev/null' % (executable, args)
return os.popen(cmd)
def _popen(command, args):
import os
path = os.environ.get("PATH", os.defpath).split(os.pathsep)
path.extend(('/sbin', '/usr/sbin'))
for dir in path:
executable = os.path.join(dir, command)
if (os.path.exists(executable) and
os.access(executable, os.F_OK | os.X_OK) and
not os.path.isdir(executable)):
break
else:
return None
# LC_ALL to ensure English output, 2>/dev/null to prevent output on
# stderr (Note: we don't have an example where the words we search for
# are actually localized, but in theory some system could do so.)
cmd = 'LC_ALL=C %s %s 2>/dev/null' % (executable, args)
return os.popen(cmd)
def _popen(command, args):
import os
path = os.environ.get("PATH", os.defpath).split(os.pathsep)
path.extend(('/sbin', '/usr/sbin'))
for dir in path:
executable = os.path.join(dir, command)
if (os.path.exists(executable) and
os.access(executable, os.F_OK | os.X_OK) and
not os.path.isdir(executable)):
break
else:
return None
# LC_ALL to ensure English output, 2>/dev/null to prevent output on
# stderr (Note: we don't have an example where the words we search for
# are actually localized, but in theory some system could do so.)
cmd = 'LC_ALL=C %s %s 2>/dev/null' % (executable, args)
return os.popen(cmd)
def _popen(command, args):
import os
path = os.environ.get("PATH", os.defpath).split(os.pathsep)
path.extend(('/sbin', '/usr/sbin'))
for dir in path:
executable = os.path.join(dir, command)
if (os.path.exists(executable) and
os.access(executable, os.F_OK | os.X_OK) and
not os.path.isdir(executable)):
break
else:
return None
# LC_ALL to ensure English output, 2>/dev/null to prevent output on
# stderr (Note: we don't have an example where the words we search for
# are actually localized, but in theory some system could do so.)
cmd = 'LC_ALL=C %s %s 2>/dev/null' % (executable, args)
return os.popen(cmd)
def mountpoint_choosen(option):
if option is None:
return
from Screens.ChoiceBox import ChoiceBox
print "scanning", option
(description, mountpoint, session) = option
res = scanDevice(mountpoint)
list = [ (r.description, r, res[r], session) for r in res ]
if not list:
from Screens.MessageBox import MessageBox
if os.access(mountpoint, os.F_OK|os.R_OK):
session.open(MessageBox, _("No displayable files on this medium found!"), MessageBox.TYPE_INFO, simple = True, timeout = 5)
else:
print "ignore", mountpoint, "because its not accessible"
return
session.openWithCallback(execute, ChoiceBox,
title = _("The following files were found..."),
list = list)
def _set_cwd(self):
try:
cwd = os.getcwd()
if not os.access(cwd, os.F_OK|os.R_OK):
raise
return cwd
except:
# we don't have access to the cwd, probably because of sudo.
# Try and move to a neutral location to prevent errors
for cwd in [os.path.expandvars('$HOME'), tempfile.gettempdir()]:
try:
if os.access(cwd, os.F_OK|os.R_OK):
os.chdir(cwd)
return cwd
except:
pass
# we won't error here, as it may *not* be a problem,
# and we don't want to break modules unnecessarily
return None
def wrap_code(self, routine, helpers=[]):
workdir = self.filepath or tempfile.mkdtemp("_sympy_compile")
if not os.access(workdir, os.F_OK):
os.mkdir(workdir)
oldwork = os.getcwd()
os.chdir(workdir)
try:
sys.path.append(workdir)
self._generate_code(routine, helpers)
self._prepare_files(routine)
self._process_files(routine)
mod = __import__(self.module_name)
finally:
sys.path.remove(workdir)
CodeWrapper._module_counter += 1
os.chdir(oldwork)
if not self.filepath:
shutil.rmtree(workdir)
return self._get_wrapped_function(mod)
def __init__(self, dir, file_template=_default_file_template,
truncate_slug_length=40,
version_locations=None,
sourceless=False, output_encoding="utf-8"):
self.dir = dir
self.file_template = file_template
self.version_locations = version_locations
self.truncate_slug_length = truncate_slug_length or 40
self.sourceless = sourceless
self.output_encoding = output_encoding
self.revision_map = revision.RevisionMap(self._load_revisions)
if not os.access(dir, os.F_OK):
raise util.CommandError("Path doesn't exist: %r. Please use "
"the 'init' command to create a new "
"scripts folder." % dir)
def write_script(
scriptdir, rev_id, content, encoding='ascii', sourceless=False):
old = scriptdir.revision_map.get_revision(rev_id)
path = old.path
content = textwrap.dedent(content)
if encoding:
content = content.encode(encoding)
with open(path, 'wb') as fp:
fp.write(content)
pyc_path = util.pyc_file_from_path(path)
if os.access(pyc_path, os.F_OK):
os.unlink(pyc_path)
script = Script._from_path(scriptdir, path)
old = scriptdir.revision_map.get_revision(script.revision)
if old.down_revision != script.down_revision:
raise Exception("Can't change down_revision "
"on a refresh operation.")
scriptdir.revision_map.add_revision(script, _replace=True)
if sourceless:
make_sourceless(path)
def crear_repo(package_name):
md5hash_pn = md5(package_name).hexdigest()
first_pref = md5hash_pn[0:2]
second_pref = md5hash_pn[2:4]
workingdir = root_git_dir+"/"+first_pref+"/"+second_pref+"/" + package_name
if not(os.access(root_git_dir+"/"+first_pref, os.F_OK)):
os.mkdir(root_git_dir+"/"+first_pref)
os.mkdir(root_git_dir+"/"+first_pref+"/"+second_pref)
elif not(os.access(root_git_dir+"/"+first_pref+"/"+second_pref, os.F_OK)):
os.mkdir(root_git_dir+"/"+first_pref+"/"+second_pref)
repo = pygit2.init_repository(workingdir)
dashed_package_name=package_name.replace('.','-').lower()
myRemote = repo.remotes.create(package_name, gitlab_url+'/marvin/'+dashed_package_name+'.git')
gl = Gitlab (gitlab_url, gitlab_token)
gl.auth()
p = gl.Project({'name': package_name, 'public':True})
p.save()
return repo
def __init__(self, dir, file_template=_default_file_template,
truncate_slug_length=40,
version_locations=None,
sourceless=False, output_encoding="utf-8",
timezone=None):
self.dir = dir
self.file_template = file_template
self.version_locations = version_locations
self.truncate_slug_length = truncate_slug_length or 40
self.sourceless = sourceless
self.output_encoding = output_encoding
self.revision_map = revision.RevisionMap(self._load_revisions)
self.timezone = timezone
if not os.access(dir, os.F_OK):
raise util.CommandError("Path doesn't exist: %r. Please use "
"the 'init' command to create a new "
"scripts folder." % dir)
def write_script(
scriptdir, rev_id, content, encoding='ascii', sourceless=False):
old = scriptdir.revision_map.get_revision(rev_id)
path = old.path
content = textwrap.dedent(content)
if encoding:
content = content.encode(encoding)
with open(path, 'wb') as fp:
fp.write(content)
pyc_path = util.pyc_file_from_path(path)
if os.access(pyc_path, os.F_OK):
os.unlink(pyc_path)
script = Script._from_path(scriptdir, path)
old = scriptdir.revision_map.get_revision(script.revision)
if old.down_revision != script.down_revision:
raise Exception("Can't change down_revision "
"on a refresh operation.")
scriptdir.revision_map.add_revision(script, _replace=True)
if sourceless:
make_sourceless(path)
def setTmpDir(self):
"""Set the tmpDir attribute to a reasonnable location for a temporary
directory"""
if os.name != 'nt':
# On unix use /tmp by default
self.tmpDir = os.environ.get("TMPDIR", "/tmp")
self.tmpDir = os.environ.get("TMP", self.tmpDir)
else:
# On Windows use the current directory
self.tmpDir = os.environ.get("TMPDIR", "")
self.tmpDir = os.environ.get("TMP", self.tmpDir)
self.tmpDir = os.environ.get("TEMP", self.tmpDir)
if not os.path.isdir(self.tmpDir):
self.tmpDir = ""
elif not os.access(self.tmpDir, os.F_OK + os.W_OK):
self.tmpDir = ""
def do_cleanup(self):
"""cleanup temporary files"""
super(ModelIterationSample, self).do_cleanup()
if self.temp:
for i in (
self.transform,
self.inv_transform,
self.corr_transform,
self.corr_mri,
self.diff_bias,
self.corr_bias,
self.transform_grid,
self.inv_transform_grid,
self.corr_transform_grid0,
self.corr_transform_grid1,
):
if os.access(i, os.F_OK):
os.unlink(i)
def _create_test_db(self, verbosity, autoclobber, keepdb=False):
test_database_name = self._get_test_db_name()
if keepdb:
return test_database_name
if not self.connection.is_in_memory_db(test_database_name):
# Erase the old test database
if verbosity >= 1:
print("Destroying old test database '%s'..." % self.connection.alias)
if os.access(test_database_name, os.F_OK):
if not autoclobber:
confirm = input(
"Type 'yes' if you would like to try deleting the test "
"database '%s', or 'no' to cancel: " % test_database_name
)
if autoclobber or confirm == 'yes':
try:
os.remove(test_database_name)
except Exception as e:
sys.stderr.write("Got an error deleting the old test database: %s\n" % e)
sys.exit(2)
else:
print("Tests cancelled.")
sys.exit(1)
return test_database_name
def __init__(self, dir, file_template=_default_file_template,
truncate_slug_length=40,
version_locations=None,
sourceless=False, output_encoding="utf-8"):
self.dir = dir
self.file_template = file_template
self.version_locations = version_locations
self.truncate_slug_length = truncate_slug_length or 40
self.sourceless = sourceless
self.output_encoding = output_encoding
self.revision_map = revision.RevisionMap(self._load_revisions)
if not os.access(dir, os.F_OK):
raise util.CommandError("Path doesn't exist: %r. Please use "
"the 'init' command to create a new "
"scripts folder." % dir)
def write_script(
scriptdir, rev_id, content, encoding='ascii', sourceless=False):
old = scriptdir.revision_map.get_revision(rev_id)
path = old.path
content = textwrap.dedent(content)
if encoding:
content = content.encode(encoding)
with open(path, 'wb') as fp:
fp.write(content)
pyc_path = util.pyc_file_from_path(path)
if os.access(pyc_path, os.F_OK):
os.unlink(pyc_path)
script = Script._from_path(scriptdir, path)
old = scriptdir.revision_map.get_revision(script.revision)
if old.down_revision != script.down_revision:
raise Exception("Can't change down_revision "
"on a refresh operation.")
scriptdir.revision_map.add_revision(script, _replace=True)
if sourceless:
make_sourceless(path)
def _popen(command, args):
import os
path = os.environ.get("PATH", os.defpath).split(os.pathsep)
path.extend(('/sbin', '/usr/sbin'))
for dir in path:
executable = os.path.join(dir, command)
if (os.path.exists(executable) and
os.access(executable, os.F_OK | os.X_OK) and
not os.path.isdir(executable)):
break
else:
return None
# LC_ALL to ensure English output, 2>/dev/null to prevent output on
# stderr (Note: we don't have an example where the words we search for
# are actually localized, but in theory some system could do so.)
cmd = 'LC_ALL=C %s %s 2>/dev/null' % (executable, args)
return os.popen(cmd)