def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Ensure a directory exists at ``path`` with the given owner and mode.

    :param path: directory to create; it is made absolute before use.
    :param owner: user name looked up with ``pwd.getpwnam``.
    :param group: group name looked up with ``grp.getgrnam``.
    :param perms: mode passed to ``os.makedirs`` and ``os.chmod``.
    :param force: when true, a non-directory already sitting at ``path``
        is unlinked and replaced by a directory.

    Ownership and mode are applied to the path whether or not it was
    created by this call.
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group, perms))
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)
    if not os.path.exists(target):
        os.makedirs(target, perms)
    elif force and not os.path.isdir(target):
        # Something other than a directory is in the way: replace it.
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(target)
        os.makedirs(target, perms)
    os.chown(target, uid, gid)
    os.chmod(target, perms)
# Example source code using Python's unlink()
def _clean_check(cmd, target):
"""
Run the command to download target. If the command fails, clean up before
re-raising the error.
"""
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError:
if os.access(target, os.F_OK):
os.unlink(target)
raise
def dot2graph(self, dot, format='svg'):
    """Render Graphviz source by shelling out to the ``dot`` program.

    :param dot: graph description written verbatim to a temporary file
        opened in binary mode.
    :param format: output format handed to ``dot`` via ``-T``.
    :returns: the contents of the output file produced by ``dot``.  The
        exit status of ``dot`` is not checked, so the result is whatever
        ended up in the output file.

    Both temporary files are removed even when writing the input or
    reading the output raises.
    """
    # NOTE(review): the original two-line comment here was lost to an
    # encoding error; it mentioned Windows and NamedTemporaryFile, and
    # presumably explained why delete=False is used -- confirm.
    dotfile = NamedTemporaryFile(delete=False)
    try:
        try:
            dotfile.write(dot)
        finally:
            # Close (and thereby flush) before dot reads the file.
            dotfile.close()
        outfile = NamedTemporaryFile(delete=False)
        try:
            os.system('dot -Efontname=sans -Nfontname=sans %s -o%s -T%s' % (
                dotfile.name, outfile.name, format))
            return outfile.read()
        finally:
            outfile.close()
            os.unlink(outfile.name)
    finally:
        os.unlink(dotfile.name)
def test_rs3topng():
    """rs3 file is converted to PNG, both in memory and on disk."""
    in_memory_png = rstviewer.rs3topng(RS3_FILEPATH)

    # Convert a second time, now writing to a file, and compare the two.
    handle = tempfile.NamedTemporaryFile(suffix='.png', delete=False)
    handle.close()
    written_path = handle.name
    rstviewer.rs3topng(RS3_FILEPATH, written_path)
    with open(written_path, 'r') as png_file:
        on_disk_png = png_file.read()
    assert in_memory_png == on_disk_png
    os.unlink(written_path)

    # generated images might not be 100% identical, probably
    # because of the font used
    matches = []
    for expected_path in (EXPECTED_PNG1, EXPECTED_PNG2):
        with open(expected_path, 'r') as expected_png_file:
            matches.append(in_memory_png == expected_png_file.read())
    assert matches[0] or matches[1]
def test_cli_rs3topng():
    """conversion to PNG on the commandline"""
    handle = tempfile.NamedTemporaryFile(suffix='.png', delete=False)
    handle.close()
    output_path = handle.name

    # calling `rstviewer -f png input.rs3 output.png` will end the program
    # with sys.exit(0), so we'll have to catch this here.
    with pytest.raises(SystemExit) as serr:
        cli(['-f', 'png', RS3_FILEPATH, output_path])
        # NOTE(review): if cli() always raises SystemExit, the two lines
        # below never run -- confirm and consider checking serr instead.
        out, err = pytest.capsys.readouterr()
        assert err == 0

    with open(output_path, 'r') as png_file:
        png_str = png_file.read()
    os.unlink(output_path)

    # generated images might not be 100% identical, probably
    # because of the font used
    matches = []
    for expected_path in (EXPECTED_PNG1, EXPECTED_PNG2):
        with open(expected_path, 'r') as expected_png_file:
            matches.append(png_str == expected_png_file.read())
    assert matches[0] or matches[1]
def is_running(name):
    """Report whether a task registered under ``name`` is still alive.

    :param name: name of task
    :type name: ``unicode``
    :returns: ``True`` if the process recorded in the task's pidfile
        exists, else ``False``
    :rtype: ``Boolean``

    A pidfile whose process no longer exists is deleted as a side effect.
    """
    pidfile = _pid_file(name)
    if not os.path.exists(pidfile):
        return False

    with open(pidfile, 'rb') as file_obj:
        raw_pid = file_obj.read()
    pid = int(raw_pid.strip())

    if _process_exists(pid):
        return True

    # The recorded process is gone: drop the stale pidfile if still there.
    if os.path.exists(pidfile):
        os.unlink(pidfile)
    return False
def _delete_directory_contents(self, dirpath, filter_func):
"""Delete all files in a directory.
:param dirpath: path to directory to clear
:type dirpath: ``unicode`` or ``str``
:param filter_func function to determine whether a file shall be
deleted or not.
:type filter_func ``callable``
"""
if os.path.exists(dirpath):
for filename in os.listdir(dirpath):
if not filter_func(filename):
continue
path = os.path.join(dirpath, filename)
if os.path.isdir(path):
shutil.rmtree(path)
else:
os.unlink(path)
self.logger.debug('Deleted : %r', path)
def _start_kuryr_manage_daemon(self):
    """Serve pool-manager requests on the configured UNIX socket.

    Any socket file left at the configured path is removed first; a
    failure to remove it is re-raised only if the file still exists.
    A ``KeyboardInterrupt`` stops the server quietly.  Any other
    exception is logged, and the listening socket is closed when the
    server object had already been created.
    """
    LOG.info("Pool manager started")
    server_address = oslo_cfg.CONF.pool_manager.sock_file
    try:
        os.unlink(server_address)
    except OSError:
        if os.path.exists(server_address):
            raise
    # Bound up front so the error handler can tell whether construction
    # of the server succeeded before it tries to close the socket.
    httpd = None
    try:
        httpd = UnixDomainHttpServer(server_address, RequestHandler)
        httpd.serve_forever()
    except KeyboardInterrupt:
        pass
    except Exception:
        LOG.exception('Failed to start Pool Manager.')
        if httpd is not None:
            httpd.socket.close()
def run(self):
    """Delete all but the ``self.keep`` newest files for each pattern.

    For every suffix in ``self.match`` the distribution name, ``*`` and
    the suffix are combined into a glob inside ``self.dist_dir``.  The
    matches are ordered newest first by modification time and everything
    after the first ``self.keep`` entries is deleted, unless
    ``self.dry_run`` is set, in which case deletions are only logged.
    """
    self.run_command("egg_info")
    from glob import glob
    for pattern in self.match:
        pattern = self.distribution.get_name() + '*' + pattern
        candidates = glob(os.path.join(self.dist_dir, pattern))
        newest_first = sorted(
            ((os.path.getmtime(f), f) for f in candidates), reverse=True)
        log.info("%d file(s) matching %s", len(newest_first), pattern)
        for _mtime, stale in newest_first[self.keep:]:
            log.info("Deleting %s", stale)
            if self.dry_run:
                continue
            if os.path.isdir(stale):
                shutil.rmtree(stale)
            else:
                os.unlink(stale)
def save(self):
    """Write the .pth file back to disk if it has unsaved changes.

    Does nothing when ``self.dirty`` is false.  When there are paths to
    record, the file is rewritten (a symlink at that location is removed
    first so the link target is not written through).  When there are no
    paths, an existing file is deleted.  ``self.dirty`` is cleared at the
    end.
    """
    if not self.dirty:
        return

    rel_paths = [self.make_relative(entry) for entry in self.paths]
    if rel_paths:
        log.debug("Saving %s", self.filename)
        data = '\n'.join(self._wrap_lines(rel_paths)) + '\n'
        if os.path.islink(self.filename):
            os.unlink(self.filename)
        with open(self.filename, 'wt') as f:
            f.write(data)
    elif os.path.exists(self.filename):
        log.debug("Deleting empty %s", self.filename)
        os.unlink(self.filename)

    self.dirty = False
def zap_pyfiles(self):
    """Remove ``.py`` sources and un-tag bytecode under ``self.bdist_dir``.

    Every file ending in ``.py`` is deleted.  Files inside a
    ``__pycache__`` directory named ``<name>.<tag>.pyc`` are moved one
    level up as ``<name>.pyc``, replacing any file already there.  Files
    in ``__pycache__`` that do not follow that naming are left alone
    instead of raising ``AttributeError`` on the failed match.
    """
    log.info("Removing .py files from temporary directory")
    pattern = r'(?P<name>.+)\.(?P<magic>[^.]+)\.pyc'
    for base, dirs, files in walk_egg(self.bdist_dir):
        for name in files:
            path = os.path.join(base, name)

            if name.endswith('.py'):
                log.debug("Deleting %s", path)
                os.unlink(path)

            if base.endswith('__pycache__'):
                path_old = path
                m = re.match(pattern, name)
                if m is None:
                    # Not a tagged bytecode file; nothing to rename.
                    continue
                path_new = os.path.join(
                    base, os.pardir, m.group('name') + '.pyc')
                log.info(
                    "Renaming file from [%s] to [%s]" % (path_old, path_new))
                # Best-effort removal of a previous file at the new name.
                try:
                    os.remove(path_new)
                except OSError:
                    pass
                os.rename(path_old, path_new)
def run(self):
    """Delete all but the ``self.keep`` newest files for each pattern.

    For every suffix in ``self.match`` the distribution name, ``*`` and
    the suffix are combined into a glob inside ``self.dist_dir``.  The
    matches are ordered newest first by modification time and everything
    after the first ``self.keep`` entries is deleted, unless
    ``self.dry_run`` is set, in which case deletions are only logged.
    """
    self.run_command("egg_info")
    from glob import glob
    for pattern in self.match:
        pattern = self.distribution.get_name() + '*' + pattern
        candidates = glob(os.path.join(self.dist_dir, pattern))
        newest_first = sorted(
            ((os.path.getmtime(f), f) for f in candidates), reverse=True)
        log.info("%d file(s) matching %s", len(newest_first), pattern)
        for _mtime, stale in newest_first[self.keep:]:
            log.info("Deleting %s", stale)
            if self.dry_run:
                continue
            if os.path.isdir(stale):
                shutil.rmtree(stale)
            else:
                os.unlink(stale)
def save(self):
    """Write the .pth file back to disk if it has unsaved changes.

    Does nothing when ``self.dirty`` is false.  When there are paths to
    record, the file is rewritten (a symlink at that location is removed
    first so the link target is not written through).  When there are no
    paths, an existing file is deleted.  ``self.dirty`` is cleared at the
    end.
    """
    if not self.dirty:
        return

    rel_paths = [self.make_relative(entry) for entry in self.paths]
    if rel_paths:
        log.debug("Saving %s", self.filename)
        data = '\n'.join(self._wrap_lines(rel_paths)) + '\n'
        if os.path.islink(self.filename):
            os.unlink(self.filename)
        with open(self.filename, 'wt') as f:
            f.write(data)
    elif os.path.exists(self.filename):
        log.debug("Deleting empty %s", self.filename)
        os.unlink(self.filename)

    self.dirty = False
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Ensure a directory exists at ``path`` with the given owner and mode.

    :param path: directory to create; it is made absolute before use.
    :param owner: user name looked up with ``pwd.getpwnam``.
    :param group: group name looked up with ``grp.getgrnam``.
    :param perms: mode passed to ``os.makedirs`` and ``os.chmod``.
    :param force: when true, a non-directory already sitting at ``path``
        is unlinked and replaced by a directory.

    Ownership and mode are applied to the path whether or not it was
    created by this call.
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group, perms))
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)
    if not os.path.exists(target):
        os.makedirs(target, perms)
    elif force and not os.path.isdir(target):
        # Something other than a directory is in the way: replace it.
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(target)
        os.makedirs(target, perms)
    os.chown(target, uid, gid)
    os.chmod(target, perms)
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Ensure a directory exists at ``path`` with the given owner and mode.

    :param path: directory to create; it is made absolute before use.
    :param owner: user name looked up with ``pwd.getpwnam``.
    :param group: group name looked up with ``grp.getgrnam``.
    :param perms: mode passed to ``os.makedirs`` and ``os.chmod``.
    :param force: when true, a non-directory already sitting at ``path``
        is unlinked and replaced by a directory.

    Ownership and mode are applied to the path whether or not it was
    created by this call.
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group, perms))
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)
    if not os.path.exists(target):
        os.makedirs(target, perms)
    elif force and not os.path.isdir(target):
        # Something other than a directory is in the way: replace it.
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(target)
        os.makedirs(target, perms)
    os.chown(target, uid, gid)
    os.chmod(target, perms)
def generate_pdf(card):
    """
    Make a PDF label from a card.

    :param card: card object; its ``name``, ``url`` and ``labels``
        attributes are read (each label must expose ``name``).
    :return: the value of ``pdf.output(dest='S')``, i.e. the PDF document
        as an in-memory buffer rather than a file on disk.

    The QR code is written to a temporary PNG that is created atomically
    with ``mkstemp`` and removed even if embedding it fails.
    """
    import tempfile

    from eclaire.base import SPECIAL_LABELS

    pdf = FPDF('L', 'mm', (62, 140))
    pdf.set_margins(2.8, 2.8, 2.8)
    pdf.set_auto_page_break(False, margin=0)

    pdf.add_page()

    font = pkg_resources.resource_filename('eclaire', 'font/Clairifont.ttf')
    pdf.add_font('Clairifont', fname=font, uni=True)
    pdf.set_font('Clairifont', size=48)

    pdf.multi_cell(0, 18, txt=card.name.upper(), align='L')

    qrcode = generate_qr_code(card.url)
    # mkstemp creates the file itself, so no other process can claim the
    # name between choosing it and writing to it (unlike mktemp).
    fd, qrcode_file = tempfile.mkstemp(suffix='.png', prefix='trello_qr_')
    os.close(fd)
    try:
        qrcode.save(qrcode_file)
        pdf.image(qrcode_file, 118, 35, 20, 20)
    finally:
        os.unlink(qrcode_file)

    # May we never speak of this again.
    # Paints a white filled rectangle over the strip starting at y=55;
    # presumably to hide title text overflowing the label -- confirm.
    pdf.set_fill_color(255, 255, 255)
    pdf.rect(0, 55, 140, 20, 'F')

    pdf.set_font('Clairifont', '', 16)
    pdf.set_y(-4)
    labels = ', '.join([label.name for label in card.labels
                        if label.name not in SPECIAL_LABELS])
    pdf.multi_cell(0, 0, labels, 0, 'R')

    return pdf.output(dest='S')
def makelink(self, tarinfo, targetpath):
    """Create the symbolic or hard link described by ``tarinfo``.

    On platforms lacking ``os.symlink`` or ``os.link`` the member the
    link refers to is extracted to ``targetpath`` instead; an unresolvable
    reference is reported as ``ExtractError``.  Where links are supported,
    anything already at ``targetpath`` is removed before the link is made.
    For a hard link whose target is not present on disk, the referenced
    member is extracted in its place.
    """
    links_supported = hasattr(os, "symlink") and hasattr(os, "link")
    if not links_supported:
        try:
            self._extract_member(self._find_link_target(tarinfo), targetpath)
        except KeyError:
            raise ExtractError("unable to resolve link inside archive")
        return

    # For systems that support symbolic and hard links.
    if tarinfo.issym():
        if os.path.lexists(targetpath):
            os.unlink(targetpath)
        os.symlink(tarinfo.linkname, targetpath)
        return

    # See extract().
    if not os.path.exists(tarinfo._link_target):
        self._extract_member(self._find_link_target(tarinfo), targetpath)
        return

    if os.path.lexists(targetpath):
        os.unlink(targetpath)
    os.link(tarinfo._link_target, targetpath)
def _find_grail_rc(self):
import glob
import pwd
import socket
import tempfile
tempdir = os.path.join(tempfile.gettempdir(),
".grail-unix")
user = pwd.getpwuid(os.getuid())[0]
filename = os.path.join(tempdir, user + "-*")
maybes = glob.glob(filename)
if not maybes:
return None
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
for fn in maybes:
# need to PING each one until we find one that's live
try:
s.connect(fn)
except socket.error:
# no good; attempt to clean it out, but don't fail:
try:
os.unlink(fn)
except IOError:
pass
else:
return s
def __init__(self, proxies=None, **x509):
    """Set up the opener's proxy map, client-certificate files and caches.

    :param proxies: mapping of proxies; when ``None`` the result of
        ``getproxies()`` is used.  It must provide ``has_key``.
    :param x509: optional ``key_file`` and ``cert_file`` entries; missing
        ones are stored as ``None``.
    """
    proxies = getproxies() if proxies is None else proxies
    assert hasattr(proxies, 'has_key'), "proxies must be a mapping"
    self.proxies = proxies
    self.key_file, self.cert_file = (
        x509.get('key_file'), x509.get('cert_file'))
    self.addheaders = [('User-Agent', self.version)]
    self.__tempfiles = []
    # A reference to os.unlink is kept on the instance.  See cleanup()
    self.__unlink = os.unlink
    # Undocumented feature: if you assign {} to tempcache, it is used
    # to cache files retrieved with self.retrieve().  This is not
    # enabled by default since it does not work for changing documents
    # (and there is no logic to check expiration headers yet).
    self.tempcache = None
    # Undocumented feature: you can use a different ftp cache by
    # assigning to the .ftpcache member; in case you want logically
    # independent URL openers.
    # XXX This is not threadsafe.  Bah.
    self.ftpcache = ftpcache
def nextfile(self):
    """Finish with the current file and reset per-file state.

    Restores ``sys.stdout`` if it had been saved, closes the output and
    input files (the input only when it is not stdin), removes the backup
    file unless ``self._backup`` is set, and clears the line buffer.
    Each handle is detached from the instance before it is acted on.
    """
    savestdout, self._savestdout = self._savestdout, 0
    if savestdout:
        sys.stdout = savestdout

    output, self._output = self._output, 0
    if output:
        output.close()

    current, self._file = self._file, 0
    if current and not self._isstdin:
        current.close()

    backupfilename, self._backupfilename = self._backupfilename, 0
    if backupfilename and not self._backup:
        # Best effort: a failure to remove the backup is ignored.
        try:
            os.unlink(backupfilename)
        except OSError:
            pass

    self._isstdin = False
    self._buffer = []
    self._bufindex = 0
def close(self,
          remove=os.unlink,error=os.error):
    """Close the pipe and delete the temporary file, if any.

    :param remove: callable used to delete ``self.tmpfile``.
    :param error: exception type from ``remove`` that is ignored.
    :returns: the value of ``self.pipe.close()``, or 255 when there is
        no pipe.
    """
    rc = self.pipe.close() if self.pipe else 255
    if self.tmpfile:
        try:
            remove(self.tmpfile)
        except error:
            pass
    return rc
# Alias
def putsequences(self, sequences):
    """Write the set of sequences back to the folder.

    :param sequences: mapping of sequence name to a list of message
        numbers.  One ``name: numbers`` line is written per entry.

    The sequences file is only opened once there is something to write.
    When the mapping is empty the file is removed instead (a failure to
    remove it is ignored).  The file is closed even if a write fails.
    """
    fullname = self.getsequencesfilename()
    f = None
    try:
        # items() exists on both Python 2 and Python 3 mappings.
        for key, seq in sequences.items():
            s = IntSet('', ' ')
            s.fromlist(seq)
            if f is None:
                f = open(fullname, 'w')
            f.write('%s: %s\n' % (key, s.tostring()))
    finally:
        if f is not None:
            f.close()
    if f is None:
        try:
            os.unlink(fullname)
        except os.error:
            pass
def removemessages(self, list):
    """Remove one or more messages -- may raise os.error.

    Each message file is renamed to its comma-prefixed name (any file
    already at that name is deleted first, best effort).  Messages that
    were renamed are dropped from all sequences.  Rename failures are
    collected while the remaining messages are still processed: a single
    failure is re-raised as is, several are raised together as one
    ``os.error`` carrying ``'multiple errors:'`` and the list of errors.
    """
    errors = []
    deleted = []
    for n in list:
        path = self.getmessagefilename(n)
        commapath = self.getmessagefilename(',' + str(n))
        try:
            os.unlink(commapath)
        except os.error:
            pass
        try:
            os.rename(path, commapath)
        except os.error as msg:
            errors.append(msg)
        else:
            deleted.append(n)
    if deleted:
        self.removefromallsequences(deleted)
    if errors:
        if len(errors) == 1:
            # The collected object already is an os.error instance.
            raise errors[0]
        else:
            raise os.error('multiple errors:', errors)
def copymessage(self, n, tofolder, ton):
    """Copy message ``n`` over message ``ton`` in ``tofolder``.

    The destination may or may not already exist; if it does, it is
    first renamed to its comma-prefixed backup name.  Should the copy
    fail, any partially written destination file is removed and the
    error propagates.
    """
    path = self.getmessagefilename(n)
    # Opening (and immediately closing) the source checks that it exists.
    with open(path):
        pass
    topath = tofolder.getmessagefilename(ton)
    backuptopath = tofolder.getmessagefilename(',%d' % ton)
    try:
        os.rename(topath, backuptopath)
    except os.error:
        pass
    copied = False
    try:
        tofolder.setlast(None)
        shutil.copy2(path, topath)
        copied = True
    finally:
        if not copied:
            try:
                os.unlink(topath)
            except os.error:
                pass
def pack(self):
    """Re-name messages to eliminate numbering gaps. Invalidates keys.

    Keys are visited in the order ``self.iterkeys()`` yields them and
    each message whose key is not the next consecutive number is moved
    to that number.  ``self._next_key`` is updated, and when anything
    moved the stored sequences are rewritten with the new numbers.
    """
    sequences = self.get_sequences()
    prev = 0
    changes = []
    for key in self.iterkeys():
        wanted = prev + 1
        if key != wanted:
            changes.append((key, wanted))
            source = os.path.join(self._path, str(key))
            destination = os.path.join(self._path, str(wanted))
            if hasattr(os, 'link'):
                os.link(source, destination)
                os.unlink(source)
            else:
                os.rename(source, destination)
        prev = wanted
    self._next_key = prev + 1
    if not changes:
        return
    for key_list in sequences.values():
        for old, new in changes:
            if old in key_list:
                key_list[key_list.index(old)] = new
    self.set_sequences(sequences)
def _findLib_gcc(name):
expr = r'[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name)
fdout, ccout = tempfile.mkstemp()
os.close(fdout)
cmd = 'if type gcc >/dev/null 2>&1; then CC=gcc; elif type cc >/dev/null 2>&1; then CC=cc;else exit 10; fi;' \
'$CC -Wl,-t -o ' + ccout + ' 2>&1 -l' + name
try:
f = os.popen(cmd)
try:
trace = f.read()
finally:
rv = f.close()
finally:
try:
os.unlink(ccout)
except OSError, e:
if e.errno != errno.ENOENT:
raise
if rv == 10:
raise OSError, 'gcc or cc command not found'
res = re.search(expr, trace)
if not res:
return None
return res.group(0)
def test_save_svgz_filename():
    """Saving to a ``.svgz`` file name yields gzip-compressed SVG."""
    import gzip
    qr = segno.make_qr('test')
    handle = tempfile.NamedTemporaryFile('wb', suffix='.svgz', delete=False)
    handle.close()
    path = handle.name
    qr.save(path)
    expected = b'\x1f\x8b\x08'  # gzip magic number
    with open(path, mode='rb') as raw:
        val = raw.read(len(expected))
    unzipped = gzip.open(path)
    try:
        content = unzipped.read(6)
    finally:
        unzipped.close()
    os.unlink(path)
    assert expected == val
    assert b'<?xml ' == content
def test_write_unicode_filename():
    """Title and description survive a save to a file given by name."""
    qr = segno.make_qr('test')
    handle = tempfile.NamedTemporaryFile('wt', suffix='.svg', delete=False)
    handle.close()
    path = handle.name
    title = 'mürrische Mädchen'
    desc = '?'
    qr.save(path, title=title, desc=desc)
    with open(path, mode='rb') as svg_file:
        root = _parse_xml(svg_file)
        # Rewind so the XML declaration can be checked as well.
        svg_file.seek(0)
        val = svg_file.read(6)
    os.unlink(path)
    assert b'<?xml ' == val
    assert title == _get_title(root).text
    assert desc == _get_desc(root).text
def grab(bbox=None):
    """Capture the screen and return it as an image.

    :param bbox: optional box passed to ``crop``; when falsy the whole
        capture is returned.

    On ``darwin`` the ``screencapture`` tool writes a temporary PNG that
    is loaded and then deleted; elsewhere the raw pixels come from
    ``grabber()``.
    """
    if sys.platform != "darwin":
        size, data = grabber()
        # Each row is padded to a multiple of 4 bytes.
        stride = (size[0]*3 + 3) & -4
        im = Image.frombytes(
            "RGB", size, data,
            # RGB, 32-bit line padding, origin in lower left corner
            "raw", "BGR", stride, -1
        )
    else:
        handle, screenshot_path = tempfile.mkstemp('.png')
        os.close(handle)
        subprocess.call(['screencapture', '-x', screenshot_path])
        im = Image.open(screenshot_path)
        # Read the pixel data now, before the file is removed.
        im.load()
        os.unlink(screenshot_path)
    if bbox:
        im = im.crop(bbox)
    return im
def test_infile_outfile(self):
    """The tool reads from an input file and writes to an output file."""
    with tempfile.NamedTemporaryFile() as infile:
        infile.write(self.data.encode())
        infile.flush()
        # outfile will get overwritten by tool, so the delete
        # may not work on some platforms. Do it manually.
        outfile = tempfile.NamedTemporaryFile()
        out_path = outfile.name
        try:
            tool_output = self.runTool(args=[infile.name, out_path])
            self.assertEqual(tool_output, ''.encode())
            with open(out_path, 'rb') as f:
                written = f.read()
            self.assertEqual(written, self.expect.encode())
        finally:
            outfile.close()
            if os.path.exists(out_path):
                os.unlink(out_path)