def finalize(self):
    """Close the build-id cache and remove the workspace lock file.

    Must only be called when no asynchronous work is outstanding and the
    state is not dirty (enforced by an assertion).  A failure to remove the
    lock file is reported as a warning on stderr instead of being raised.
    """
    assert self.__asynchronous == 0 and not self.__dirty

    def warn(message):
        # Imported lazily: these are only needed on the failure paths.
        from .tty import colorize
        from sys import stderr
        print(colorize(message, "33"), file=stderr)

    cache = self.__buildIdCache
    if cache is not None:
        cache.close()
        self.__buildIdCache = None

    lock_path = self.__lock
    if not lock_path:
        return
    try:
        os.unlink(lock_path)
    except FileNotFoundError:
        warn("Warning: lock file was deleted while Bob was still running!")
    except OSError as e:
        warn("Warning: cannot unlock workspace: "+str(e))
# Example source code using Python's unlink()
def download_file(file, url, optional = False):
    """Download ``url`` into the local path ``file``, reporting on stdout.

    If the download raises IOError, a message is printed, any partially
    written ``file`` is removed and the exception is re-raised.  This also
    happens when ``optional`` is true; only the printed message differs.

    :param file: local path to store the download in
    :param url: URL to fetch
    :param optional: if true, report a failed download as acceptable
    :raises IOError: propagated from the download
    """
    try:
        # end=" " keeps the later "OK" / error text on the same line
        print("Downloading", url + "...", end=" ")
        sys.stdout.flush()
        my_urlretrieve(url, file)
        print("OK")
        sys.stdout.flush()
    except IOError as e:
        if optional:
            print("missing but optional, so that's OK")
        else:
            print(e)
        sys.stdout.flush()
        # Do not leave a truncated download behind
        if os.path.exists(file):
            os.unlink(file)
        raise
# Create a file of the given size, containing NULs, without holes.
def install(self):
    """Run the installation unless an installed image already exists.

    With the 'noemu' VMM the distribution is downloaded and the
    installation always runs.  Otherwise the installation is skipped when
    the path returned by ``wd0_path()`` exists.  If the installation fails,
    whatever it left at that path is removed and the error is re-raised.
    """
    # This is needed for Xen and noemu, where we get the kernel
    # from the dist rather than the installed image
    self.dist.set_workdir(self.workdir)

    if self.vmm == 'noemu':
        self.dist.download()
        self._install()
        return

    # Already installed?
    if os.path.exists(self.wd0_path()):
        return

    try:
        self._install()
    except:
        # Bare except: the cleanup runs for any exception type, and the
        # exception is then re-raised unchanged.
        leftover = self.wd0_path()
        if os.path.exists(leftover):
            os.unlink(leftover)
        raise
# Boot the virtual machine (installing it first if it's not
# installed already). The vmm_args argument applies when
# booting, but not when installing. Does not wait for
# a login prompt.
def test_plotscene():
    """Export a window with two plots to SVG, then remove the output file."""
    # Create the file with delete=False and close it straight away.  Taking
    # ``.name`` from an unreferenced NamedTemporaryFile leaves a path whose
    # file is deleted once the object is collected, so the name could be
    # claimed by something else before the exporter writes to it.
    tmp = tempfile.NamedTemporaryFile(suffix='.svg', delete=False)
    tmp.close()
    tempfilename = tmp.name
    try:
        print("using %s as a temporary file" % tempfilename)
        pg.setConfigOption('foreground', (0,0,0))
        w = pg.GraphicsWindow()
        w.show()
        p1 = w.addPlot()
        p2 = w.addPlot()
        p1.plot([1,3,2,3,1,6,9,8,4,2,3,5,3], pen={'color':'k'})
        p1.setXRange(0,5)
        p2.plot([1,5,2,3,4,6,1,2,4,2,3,5,3], pen={'color':'k', 'cosmetic':False, 'width': 0.3})
        app.processEvents()
        app.processEvents()

        ex = pg.exporters.SVGExporter(w.scene())
        ex.export(fileName=tempfilename)
    finally:
        # clean up after the test is done, even if plotting or export failed
        os.unlink(tempfilename)
def test_plotscene():
    """Export a window with two plots to SVG, then remove the output file."""
    # Create the file with delete=False and close it straight away.  Taking
    # ``.name`` from an unreferenced NamedTemporaryFile leaves a path whose
    # file is deleted once the object is collected, so the name could be
    # claimed by something else before the exporter writes to it.
    tmp = tempfile.NamedTemporaryFile(suffix='.svg', delete=False)
    tmp.close()
    tempfilename = tmp.name
    try:
        print("using %s as a temporary file" % tempfilename)
        pg.setConfigOption('foreground', (0,0,0))
        w = pg.GraphicsWindow()
        w.show()
        p1 = w.addPlot()
        p2 = w.addPlot()
        p1.plot([1,3,2,3,1,6,9,8,4,2,3,5,3], pen={'color':'k'})
        p1.setXRange(0,5)
        p2.plot([1,5,2,3,4,6,1,2,4,2,3,5,3], pen={'color':'k', 'cosmetic':False, 'width': 0.3})
        app.processEvents()
        app.processEvents()

        ex = pg.exporters.SVGExporter(w.scene())
        ex.export(fileName=tempfilename)
    finally:
        # clean up after the test is done, even if plotting or export failed
        os.unlink(tempfilename)
def is_running(name):
    """
    Report whether a task is currently running under ``name``.

    The PID is read from the task's pidfile.  If that process no longer
    exists, the stale pidfile is removed.

    :param name: name of task
    :type name: ``unicode``
    :returns: ``True`` if task with name ``name`` is running, else ``False``
    :rtype: ``Boolean``
    """
    pidfile = _pid_file(name)

    if os.path.exists(pidfile):
        with open(pidfile, 'rb') as fp:
            pid = int(fp.read().strip())

        if _process_exists(pid):
            return True

        # Recorded process is gone: drop the stale pidfile if still there
        if os.path.exists(pidfile):
            os.unlink(pidfile)

    return False
def _delete_directory_contents(self, dirpath, filter_func):
    """Delete the entries of a directory that ``filter_func`` accepts.

    Does nothing if ``dirpath`` does not exist.  Real subdirectories are
    removed recursively; files and symbolic links are unlinked.  Every
    deletion is logged at debug level.

    :param dirpath: path to directory to clear
    :type dirpath: ``unicode`` or ``str``
    :param filter_func: called with each entry's filename; the entry is
        deleted only if the result is true
    :type filter_func: ``callable``
    """
    if not os.path.exists(dirpath):
        return

    for filename in os.listdir(dirpath):
        if not filter_func(filename):
            continue
        path = os.path.join(dirpath, filename)
        # A symlink to a directory satisfies isdir(), but shutil.rmtree()
        # refuses to operate on symlinks: remove the link itself and leave
        # its target alone.
        if os.path.isdir(path) and not os.path.islink(path):
            shutil.rmtree(path)
        else:
            os.unlink(path)
        self.logger.debug('Deleted : %r', path)
def is_running(name):
    """
    Report whether a task is currently running under ``name``.

    The PID is read from the task's pidfile.  If that process no longer
    exists, the stale pidfile is removed.

    :param name: name of task
    :type name: ``unicode``
    :returns: ``True`` if task with name ``name`` is running, else ``False``
    :rtype: ``Boolean``
    """
    pidfile = _pid_file(name)

    if os.path.exists(pidfile):
        with open(pidfile, 'rb') as fp:
            pid = int(fp.read().strip())

        if _process_exists(pid):
            return True

        # Recorded process is gone: drop the stale pidfile if still there
        if os.path.exists(pidfile):
            os.unlink(pidfile)

    return False
def service_resume(service_name, init_dir="/etc/init",
                   initd_dir="/etc/init.d"):
    """Resume a system service.

    Re-enable the service using whichever init system is detected, then
    start it unless it is already running.

    :param service_name: name of the service to resume
    :param init_dir: directory searched for the Upstart ``.conf`` file
    :param initd_dir: directory searched for the SysV init script
    :returns: the result of ``service_running`` if it is true, otherwise
        the result of ``service_start``
    :raises ValueError: if no init system could be detected for the service
    """
    upstart_conf = os.path.join(init_dir, "{}.conf".format(service_name))
    sysv_script = os.path.join(initd_dir, service_name)

    if init_is_systemd():
        service('enable', service_name)
    elif os.path.exists(upstart_conf):
        # For Upstart, removing the override file is what re-enables the job
        override = os.path.join(init_dir, '{}.override'.format(service_name))
        if os.path.exists(override):
            os.unlink(override)
    elif os.path.exists(sysv_script):
        subprocess.check_call(["update-rc.d", service_name, "enable"])
    else:
        raise ValueError(
            "Unable to detect {0} as SystemD, Upstart {1} or"
            " SysV {2}".format(
                service_name, upstart_conf, sysv_script))

    return service_running(service_name) or service_start(service_name)
def download(self, source, dest):
    """
    Download an archive file.

    For http(s) URLs carrying ``user:password@`` credentials, the
    credentials are stripped from the URL and installed as a global HTTP
    basic-auth opener before the request is made.  If writing ``dest``
    fails, the partial file is removed and the error is re-raised.

    :param str source: URL pointing to an archive file.
    :param str dest: Local path location to download archive file to.
    """
    # propagate all exceptions
    # URLError, OSError, etc
    proto, netloc, path, params, query, fragment = urlparse(source)
    if proto in ('http', 'https'):
        auth, barehost = splituser(netloc)
        if auth is not None:
            source = urlunparse((proto, barehost, path, params, query, fragment))
            username, password = splitpasswd(auth)
            passman = HTTPPasswordMgrWithDefaultRealm()
            # Realm is set to None in add_password to force the username and password
            # to be used whatever the realm
            passman.add_password(None, source, username, password)
            authhandler = HTTPBasicAuthHandler(passman)
            opener = build_opener(authhandler)
            install_opener(opener)
    response = urlopen(source)
    try:
        with open(dest, 'wb') as dest_file:
            dest_file.write(response.read())
    except Exception:
        if os.path.isfile(dest):
            os.unlink(dest)
        # Bare raise keeps the original traceback
        raise
    finally:
        # Release the connection whether or not the copy succeeded
        response.close()
# Mandatory file validation via Sha1 or MD5 hashing.
def test_cli_rs3tohtml():
    """conversion to HTML on the commandline"""
    # delete=False + close(): the CLI writes to the path by name
    temp_html = tempfile.NamedTemporaryFile(suffix='.html', delete=False)
    temp_html.close()
    try:
        cli([RS3_FILEPATH, temp_html.name])
        with open(temp_html.name, 'r') as html_file:
            assert EXPECTED_HTML in html_file.read()
    finally:
        # Remove the output file even when the conversion or assertion fails
        os.unlink(temp_html.name)
def rs3topng(rs3_filepath, png_filepath=None):
    """Convert a RS3 file into a PNG image of the RST tree.

    The RS3 file is first rendered to a temporary HTML file, which is then
    loaded in PhantomJS (via selenium) to take a screenshot.

    If no output filename is given, the PNG image is returned
    as a byte string (which is useful for embedding).

    :param rs3_filepath: path of the RS3 file to convert
    :param png_filepath: optional path to write the PNG image to
    :returns: the PNG data if ``png_filepath`` is not given, else ``None``
    :raises ImportError: if selenium is not installed
    :raises WebDriverException: if the PhantomJS driver cannot be started
    """
    try:
        from selenium import webdriver
        from selenium.common.exceptions import WebDriverException
    except ImportError:
        raise ImportError(
            'Please install selenium: pip install selenium')

    html_str = rs3tohtml(rs3_filepath)

    temp = tempfile.NamedTemporaryFile(suffix='.html', delete=False)
    try:
        temp.write(html_str.encode('utf8'))
        temp.close()

        try:
            driver = webdriver.PhantomJS()
        except WebDriverException as err:
            raise WebDriverException(
                'Please install phantomjs: http://phantomjs.org/\n' + err.msg)

        try:
            driver.get(temp.name)
            png_str = driver.get_screenshot_as_png()
        finally:
            # Shut the browser process down instead of leaking it
            driver.quit()
    finally:
        # Always remove the temporary HTML file; close() is a no-op if the
        # file was already closed above.
        temp.close()
        os.unlink(temp.name)

    if png_filepath:
        # Binary mode: the screenshot is raw PNG data, not text
        with open(png_filepath, 'wb') as png_file:
            png_file.write(png_str)
    else:
        return png_str
def release(self):
    """Mark the lock as released and delete `self.lockfile`.

    A lockfile that no longer exists is not treated as an error; any
    other failure to delete it is re-raised.
    """
    self._locked = False
    try:
        os.unlink(self.lockfile)
    except (OSError, IOError) as exc:  # pragma: no cover
        # errno 2 is ENOENT: the file is already gone, nothing to do
        if exc.errno == 2:
            return
        raise exc
def cache_data(self, name, data):
    """Store ``data`` in the cache under ``name``.

    Passing ``None`` as ``data`` deletes the corresponding cache file
    instead, if it exists.

    :param name: name of datastore
    :param data: data to store. This may be any object supported by
        the cache serializer
    """
    serializer = manager.serializer(self.cache_serializer)
    cache_path = self.cachefile('%s.%s' % (name, self.cache_serializer))

    if data is not None:
        with atomic_writer(cache_path, 'wb') as file_obj:
            serializer.dump(data, file_obj)
        self.logger.debug('Cached data saved at : %s', cache_path)
        return

    # data is None: remove any existing cache file
    if os.path.exists(cache_path):
        os.unlink(cache_path)
        self.logger.debug('Deleted cache file : %s', cache_path)
def test_bearer_token(self, m_cfg, m_get):
    """Check that the token file's content is sent as a Bearer header.

    ``m_cfg`` and ``m_get`` are presumably mocks injected by patch
    decorators outside this block -- confirm against the test class.
    """
    token_content = (
        "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJrdWJlcm5ldGVzL3Nl"
        "cnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc"
        "3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bn"
        "Qvc2VjcmV0Lm5hbWUiOiJkZWZhdWx0LXRva2VuLWh4M3QxIiwia3ViZXJuZXRlcy5"
        "pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQubmFtZSI6ImRlZmF1bHQi"
        "LCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC51a"
        "WQiOiIxYTkyM2ZmNi00MDkyLTExZTctOTMwYi1mYTE2M2VkY2ViMDUiLCJzdWIiOi"
        "JzeXN0ZW06c2VydmljZWFjY291bnQ6a3ViZS1zeXN0ZW06ZGVmYXVsdCJ9.lzcPef"
        "DQ-uzF5cD-5pLwTKpRvtvvxKB4LX8TLymrPLMTth8WGr1vT6jteJPmLiDZM2C5dZI"
        "iFJpOw4LL1XLullik-ls-CmnTWq97NvlW1cZolC0mNyRz6JcL7gkH8WfUSjLA7x80"
        "ORalanUxtl9-ghMGKCtKIACAgvr5gGT4iznGYQQRx_hKURs4O6Js5vhwNM6UuOKeW"
        "GDDAlhgHMG0u59z3bhiBLl6jbQktZsu8c3diXniQb3sYqYQcGKUm1IQFujyA_ByDb"
        "5GUtCv1BOPL_-IjYtvdJD8ZzQ_UnPFoYQklpDyJLB7_7qCGcfVEQbnSCh907NdKo4"
        "w_8Wkn2y-Tg")
    token_file = tempfile.NamedTemporaryFile(mode="w+t", delete=False)
    try:
        # Point the config at the token file, then write and close it
        # before the client is created
        m_cfg.kubernetes.token_file = token_file.name
        token_file.write(token_content)
        token_file.close()
        m_cfg.kubernetes.ssl_verify_server_crt = False

        request_path = '/test'
        k8s_client.K8sClient(self.base_url).get(request_path)

        m_get.assert_called_once_with(
            self.base_url + request_path,
            cert=(None, None),
            headers={'Authorization': 'Bearer {}'.format(token_content)},
            verify=False)
    finally:
        os.unlink(m_cfg.kubernetes.token_file)
def get_exit_code(self):
    """Wait for the process to finish, delete its script file and return
    the process's return code."""
    proc = self.process
    proc.wait()
    # The script file is only removed once the process has finished
    os.unlink(self.tmp_script_filename)
    return proc.returncode
def release(self):
    """Release the lock by removing ``self.lock_file``.

    Raises NotLocked if the lock is not held at all, and NotMyLock if it
    is held but ``i_am_locking()`` is false.
    """
    if not self.is_locked():
        raise NotLocked("%s is not locked" % self.path)
    if not self.i_am_locking():
        raise NotMyLock("%s is locked, but not by me" % self.path)
    os.unlink(self.lock_file)
def release(self):
    """Release the lock by removing ``self.unique_name`` and then the
    ``self.lock_file`` directory.

    Raises NotLocked if the lock is not held at all, and NotMyLock if it
    is held but ``self.unique_name`` does not exist.
    """
    if not self.is_locked():
        raise NotLocked("%s is not locked" % self.path)
    if not os.path.exists(self.unique_name):
        raise NotMyLock("%s is locked, but not by me" % self.path)
    os.unlink(self.unique_name)
    os.rmdir(self.lock_file)
def break_lock(self):
    """Remove the ``self.lock_file`` directory and every entry in it.

    Does nothing if the directory does not exist.
    """
    lock_dir = self.lock_file
    if not os.path.exists(lock_dir):
        return
    for entry in os.listdir(lock_dir):
        os.unlink(os.path.join(lock_dir, entry))
    os.rmdir(lock_dir)
def acquire(self, timeout=None):
    """Acquire the lock by hard-linking ``unique_name`` to ``lock_file``.

    ``timeout`` falls back to ``self.timeout`` when it is None.  If it is
    still None, the call retries until the lock is obtained.  With a
    positive timeout, LockTimeout is raised once that many seconds have
    passed; with a zero or negative timeout, AlreadyLocked is raised as
    soon as the deadline check fires.  LockFailed is raised if the unique
    file cannot be created.
    """
    try:
        open(self.unique_name, "wb").close()
    except IOError:
        raise LockFailed("failed to create %s" % self.unique_name)

    if timeout is None:
        timeout = self.timeout
    deadline = time.time()
    if timeout is not None and timeout > 0:
        deadline += timeout

    while True:
        # Try and create a hard link to it.
        try:
            os.link(self.unique_name, self.lock_file)
        except OSError:
            # Link creation failed. Maybe we've double-locked?
            if os.stat(self.unique_name).st_nlink == 2:
                # The original link plus the one I created == 2. We're
                # good to go.
                return

            # Otherwise the lock creation failed.
            if timeout is not None and time.time() > deadline:
                os.unlink(self.unique_name)
                if timeout > 0:
                    raise LockTimeout("Timeout waiting to acquire"
                                      " lock for %s" %
                                      self.path)
                raise AlreadyLocked("%s is already locked" %
                                    self.path)

            # Sleep a tenth of the timeout, or 0.1s when there is no
            # timeout or that tenth evaluates as false.
            pause = timeout / 10 if timeout is not None else 0.1
            time.sleep(pause or 0.1)
        else:
            # Link creation succeeded. We're good to go.
            return