def add():
    """Handle the add/remove form for the address list in ``blastlist.txt``.

    For a POST whose ``submit`` field is ``'Add'``, the stripped ``addr``
    field is appended to the file as a new line and ``listadded.html`` is
    rendered. When ``submit`` is ``'Remove'``, the file is rewritten without
    the lines equal to the address and ``listremoved.html`` is rendered.

    Any other request falls through and returns ``None``.
    """
    if request.method == 'POST':
        action = request.form['submit']
        if action == 'Add':
            addr = request.form['addr'].strip()
            # io.open() in text mode needs text; only decode real byte strings.
            # Calling .decode() on text fails outright on Python 3 and on
            # non-ASCII input on Python 2.
            if isinstance(addr, bytes):
                addr = addr.decode('utf-8')
            with io.open('blastlist.txt', 'a', encoding="utf-8") as f:
                f.write(addr + u'\r\n')
            return render_template('listadded.html', addr=addr)
        elif action == 'Remove':
            addr = request.form['addr'].strip()
            if isinstance(addr, bytes):
                addr = addr.decode('utf-8')
            with io.open('blastlist.txt', 'r', encoding="utf-8") as f:
                lines = f.readlines()
            # Compare whole entries: a substring test would also drop
            # addresses that merely contain ``addr`` and would wipe the
            # entire list when ``addr`` is empty.
            kept = [line for line in lines if line.strip() != addr]
            with io.open('blastlist.txt', 'w', encoding="utf-8") as f:
                f.writelines(kept)
            return render_template('listremoved.html', addr=addr)
python类open()的实例源码
def __iter__(self):
    """
    Yield one token list per line of the corpus.

    ``self.fname`` may name a single file or a directory; for a directory,
    every entry listed directly inside it is read. Each line is lower-cased,
    stripped of ``string.punctuation`` characters and split on whitespace,
    then yielded as ``[self.begin, word1, word2, ..., self.end]``.
    """
    if os.path.isdir(self.fname):
        paths = [os.path.join(self.fname, entry) for entry in os.listdir(self.fname)]
    else:
        paths = [self.fname]
    for path in paths:
        with open(path) as handle:
            text = handle.read()
            for raw_line in text.split("\n"):
                cleaned = "".join(
                    [ch for ch in raw_line.lower() if ch not in string.punctuation]
                )
                tokens = cleaned.strip().split()
                yield [self.begin] + tokens + [self.end]
def __iter__(self):
    """
    Yield one token list per line of the corpus.

    ``self.fname`` may name a directory, in which case every entry listed
    directly inside it is read, or a single file. Each line is lower-cased,
    stripped of ``string.punctuation`` characters and split on whitespace,
    then yielded as ``[self.begin, word1, word2, ..., self.end]``.
    """
    if os.path.isdir(self.fname):
        filenames = [os.path.join(self.fname, f) for f in os.listdir(self.fname)]
    else:
        # Without this fallback ``filenames`` is unbound for a plain file.
        filenames = [self.fname]
    for langpath in filenames:
        # Open the loop variable; the previous code opened an undefined name.
        with open(langpath) as f:
            doc = f.read()
            for line in doc.split("\n"):
                sent = "".join(
                    [ch for ch in line.lower() if ch not in string.punctuation]
                ).strip().split()
                sent = [self.begin] + sent + [self.end]
                yield sent
def run_script(self, script_name, namespace):
    """Execute the metadata script ``scripts/<script_name>`` in ``namespace``.

    ``namespace['__file__']`` is set to the script's file name. If that file
    exists on disk its contents are compiled and executed; otherwise the
    text returned by ``get_metadata`` (with line endings normalized to
    ``\\n``) is executed and registered in ``linecache`` under that name.

    Raises ``ResolutionError`` when the script is not in the metadata.
    """
    script = 'scripts/' + script_name
    if not self.has_metadata(script):
        raise ResolutionError("No script named %r" % script_name)
    script_text = self.get_metadata(script).replace('\r\n', '\n')
    script_text = script_text.replace('\r', '\n')
    script_filename = self._fn(self.egg_info, script)
    namespace['__file__'] = script_filename
    if os.path.exists(script_filename):
        # Read inside ``with`` so the handle is closed deterministically.
        with open(script_filename) as script_file:
            source = script_file.read()
        code = compile(source, script_filename, 'exec')
        exec(code, namespace, namespace)
    else:
        from linecache import cache
        # Entry layout: (size, mtime, lines, fullname).
        cache[script_filename] = (
            len(script_text), 0, script_text.split('\n'), script_filename
        )
        script_code = compile(script_text, script_filename, 'exec')
        exec(script_code, namespace, namespace)
def read_manifest(self):
    """Read the manifest file (named by 'self.manifest') and use it to
    fill in 'self.filelist', the list of files to include in the source
    distribution.

    Blank lines and lines starting with '#' are ignored. On Python 3,
    lines that cannot be decoded as UTF-8 are logged and skipped.
    """
    log.info("reading manifest file '%s'", self.manifest)
    # ``with`` closes the manifest even if processing a line raises.
    with open(self.manifest, 'rb') as manifest:
        for line in manifest:
            # The manifest must contain UTF-8. See #303.
            if six.PY3:
                try:
                    line = line.decode('UTF-8')
                except UnicodeDecodeError:
                    log.warn("%r not UTF-8 decodable -- skipping" % line)
                    continue
            # ignore comments and blank lines
            line = line.strip()
            if line.startswith('#') or not line:
                continue
            self.filelist.append(line)
def uninstall_link(self):
    """Remove the egg-link file and any .pth entry for this distribution.

    If the egg-link exists but its contents are neither ``[egg_path]`` nor
    ``[egg_path, setup_path]``, a warning is logged and nothing is removed.
    Nothing is deleted or updated when ``self.dry_run`` is set.
    """
    if os.path.exists(self.egg_link):
        log.info("Removing %s (link to %s)", self.egg_link, self.egg_base)
        # ``with`` closes the handle even if reading the link fails.
        with open(self.egg_link) as egg_link_file:
            contents = [line.rstrip() for line in egg_link_file]
        if contents not in ([self.egg_path],
                            [self.egg_path, self.setup_path]):
            log.warn("Link points to %s: uninstall aborted", contents)
            return
        if not self.dry_run:
            os.unlink(self.egg_link)
    if not self.dry_run:
        self.update_pth(self.dist)  # remove any .pth link to us
    if self.distribution.scripts:
        # XXX should also check for entry point scripts!
        log.warn("Note: you must uninstall or replace scripts manually!")
def install_egg_scripts(self, dist):
    """Install scripts for ``dist``.

    For any distribution other than ``self.dist`` the work is delegated to
    ``easy_install.install_egg_scripts``. Otherwise wrapper scripts are
    installed first, and then each entry of ``self.distribution.scripts``
    is read from disk and handed to ``self.install_script``.
    """
    if dist is not self.dist:
        # Installing a dependency, so fall back to normal behavior
        return easy_install.install_egg_scripts(self, dist)

    # new-style: wrapper scripts in the script dir, pointing to dist.scripts
    self.install_wrapper_scripts(dist)

    # old-style: copy the text of each declared script
    declared_scripts = self.distribution.scripts or []
    for declared in declared_scripts:
        full_path = os.path.abspath(convert_path(declared))
        base_name = os.path.basename(full_path)
        with io.open(full_path) as stream:
            text = stream.read()
        self.install_script(dist, base_name, text, full_path)
def save(self):
    """Write changed .pth file back to disk.

    Does nothing unless ``self.dirty`` is set. With at least one path, the
    wrapped, newline-joined relative paths are written to ``self.filename``
    (an existing symlink there is unlinked first). With no paths, an
    existing file is deleted. ``self.dirty`` is cleared afterwards.
    """
    if not self.dirty:
        return

    relative = [self.make_relative(p) for p in self.paths]
    if relative:
        log.debug("Saving %s", self.filename)
        wrapped = self._wrap_lines(relative)
        payload = '\n'.join(wrapped) + '\n'
        # Replace a symlink instead of writing through it.
        if os.path.islink(self.filename):
            os.unlink(self.filename)
        with open(self.filename, 'wt') as out:
            out.write(payload)
    elif os.path.exists(self.filename):
        log.debug("Deleting empty %s", self.filename)
        os.unlink(self.filename)

    self.dirty = False
def run_script(self, script_name, namespace):
    """Execute the metadata script ``scripts/<script_name>`` in ``namespace``.

    ``namespace['__file__']`` is set to the script's file name. If that file
    exists on disk its contents are compiled and executed; otherwise the
    text returned by ``get_metadata`` (with line endings normalized to
    ``\\n``) is executed and registered in ``linecache`` under that name.

    Raises ``ResolutionError`` when the script is not in the metadata.
    """
    script = 'scripts/' + script_name
    if not self.has_metadata(script):
        raise ResolutionError("No script named %r" % script_name)
    script_text = self.get_metadata(script).replace('\r\n', '\n')
    script_text = script_text.replace('\r', '\n')
    script_filename = self._fn(self.egg_info, script)
    namespace['__file__'] = script_filename
    if os.path.exists(script_filename):
        # Read inside ``with`` so the handle is closed deterministically.
        with open(script_filename) as script_file:
            source = script_file.read()
        code = compile(source, script_filename, 'exec')
        exec(code, namespace, namespace)
    else:
        from linecache import cache
        # Entry layout: (size, mtime, lines, fullname).
        cache[script_filename] = (
            len(script_text), 0, script_text.split('\n'), script_filename
        )
        script_code = compile(script_text, script_filename, 'exec')
        exec(script_code, namespace, namespace)
def run_script(self, script_name, namespace):
    """Execute the metadata script ``scripts/<script_name>`` in ``namespace``.

    ``namespace['__file__']`` is set to the script's file name. If that file
    exists on disk its contents are compiled and executed; otherwise the
    text returned by ``get_metadata`` (with line endings normalized to
    ``\\n``) is executed and registered in ``linecache`` under that name.

    Raises ``ResolutionError`` when the script is not in the metadata.
    """
    script = 'scripts/' + script_name
    if not self.has_metadata(script):
        raise ResolutionError("No script named %r" % script_name)
    script_text = self.get_metadata(script).replace('\r\n', '\n')
    script_text = script_text.replace('\r', '\n')
    script_filename = self._fn(self.egg_info, script)
    namespace['__file__'] = script_filename
    if os.path.exists(script_filename):
        # Read inside ``with`` so the handle is closed deterministically.
        with open(script_filename) as script_file:
            source = script_file.read()
        code = compile(source, script_filename, 'exec')
        exec(code, namespace, namespace)
    else:
        from linecache import cache
        # Entry layout: (size, mtime, lines, fullname).
        cache[script_filename] = (
            len(script_text), 0, script_text.split('\n'), script_filename
        )
        script_code = compile(script_text, script_filename, 'exec')
        exec(script_code, namespace, namespace)
def read_manifest(self):
    """Read the manifest file (named by 'self.manifest') and use it to
    fill in 'self.filelist', the list of files to include in the source
    distribution.

    Blank lines and lines starting with '#' are ignored. On Python 3,
    lines that cannot be decoded as UTF-8 are logged and skipped.
    """
    log.info("reading manifest file '%s'", self.manifest)
    # ``with`` closes the manifest even if processing a line raises.
    with open(self.manifest, 'rb') as manifest:
        for line in manifest:
            # The manifest must contain UTF-8. See #303.
            if six.PY3:
                try:
                    line = line.decode('UTF-8')
                except UnicodeDecodeError:
                    log.warn("%r not UTF-8 decodable -- skipping" % line)
                    continue
            # ignore comments and blank lines
            line = line.strip()
            if line.startswith('#') or not line:
                continue
            self.filelist.append(line)
def uninstall_link(self):
    """Remove the egg-link file and any .pth entry for this distribution.

    If the egg-link exists but its contents are neither ``[egg_path]`` nor
    ``[egg_path, setup_path]``, a warning is logged and nothing is removed.
    Nothing is deleted or updated when ``self.dry_run`` is set.
    """
    if os.path.exists(self.egg_link):
        log.info("Removing %s (link to %s)", self.egg_link, self.egg_base)
        # ``with`` closes the handle even if reading the link fails.
        with open(self.egg_link) as egg_link_file:
            contents = [line.rstrip() for line in egg_link_file]
        if contents not in ([self.egg_path],
                            [self.egg_path, self.setup_path]):
            log.warn("Link points to %s: uninstall aborted", contents)
            return
        if not self.dry_run:
            os.unlink(self.egg_link)
    if not self.dry_run:
        self.update_pth(self.dist)  # remove any .pth link to us
    if self.distribution.scripts:
        # XXX should also check for entry point scripts!
        log.warn("Note: you must uninstall or replace scripts manually!")
def install_egg_scripts(self, dist):
    """Install scripts for ``dist``.

    For any distribution other than ``self.dist`` the work is delegated to
    ``easy_install.install_egg_scripts``. Otherwise wrapper scripts are
    installed first, and then each entry of ``self.distribution.scripts``
    is read from disk and handed to ``self.install_script``.
    """
    if dist is not self.dist:
        # Installing a dependency, so fall back to normal behavior
        return easy_install.install_egg_scripts(self, dist)

    # new-style: wrapper scripts in the script dir, pointing to dist.scripts
    self.install_wrapper_scripts(dist)

    # old-style: copy the text of each declared script
    declared_scripts = self.distribution.scripts or []
    for declared in declared_scripts:
        full_path = os.path.abspath(convert_path(declared))
        base_name = os.path.basename(full_path)
        with io.open(full_path) as stream:
            text = stream.read()
        self.install_script(dist, base_name, text, full_path)
def save(self):
    """Write changed .pth file back to disk.

    Does nothing unless ``self.dirty`` is set. With at least one path, the
    wrapped, newline-joined relative paths are written to ``self.filename``
    (an existing symlink there is unlinked first). With no paths, an
    existing file is deleted. ``self.dirty`` is cleared afterwards.
    """
    if not self.dirty:
        return

    relative = [self.make_relative(p) for p in self.paths]
    if relative:
        log.debug("Saving %s", self.filename)
        wrapped = self._wrap_lines(relative)
        payload = '\n'.join(wrapped) + '\n'
        # Replace a symlink instead of writing through it.
        if os.path.islink(self.filename):
            os.unlink(self.filename)
        with open(self.filename, 'wt') as out:
            out.write(payload)
    elif os.path.exists(self.filename):
        log.debug("Deleting empty %s", self.filename)
        os.unlink(self.filename)

    self.dirty = False
def write_parallel_text(sources, targets, output_prefix):
    """
    Writes two files where each line corresponds to one example
      - <output_prefix>/sources.txt
      - <output_prefix>/targets.txt

    Both paths are built by joining ``output_prefix`` with the file name,
    so ``output_prefix`` is used as a directory.

    Args:
      sources: Iterator of source strings
      targets: Iterator of target strings
      output_prefix: Prefix for the output file
    """
    source_filename = os.path.abspath(os.path.join(output_prefix, "sources.txt"))
    target_filename = os.path.abspath(os.path.join(output_prefix, "targets.txt"))

    # Sources are fully written (and reported) before targets are started.
    for out_path, records in ((source_filename, sources),
                              (target_filename, targets)):
        with io.open(out_path, "w", encoding='utf8') as out_file:
            for record in records:
                out_file.write(record + "\n")
        print("Wrote {}".format(out_path))
def pngValidator(path=None, data=None, fileObj=None):
    """
    Version 3+.

    Check that image data starts with the PNG signature.

    Exactly one source is consulted, in this order of precedence: ``path``
    (opened in binary mode), ``data`` (sliced), ``fileObj`` (read, then
    seeked back to where it was). Returns ``(True, None)`` on a match and
    ``(False, message)`` otherwise.
    """
    assert not (path is None and data is None and fileObj is None)
    if path is not None:
        with open(path, "rb") as imageFile:
            signature = imageFile.read(8)
    elif data is not None:
        signature = data[:8]
    elif fileObj is not None:
        # Peek at the first 8 bytes without moving the caller's position.
        start = fileObj.tell()
        signature = fileObj.read(8)
        fileObj.seek(start)
    if signature == pngSignature:
        return True, None
    return False, "Image does not begin with the PNG signature."
# -------------------
# layercontents.plist
# -------------------
def testInvalidLayerContentsFormat(self):
    """UFOReader.getGlyphSet raises UFOLibError for a malformed layercontents.plist."""
    # case 1: the file holds the bogus text "test"
    self.makeUFO()
    plistPath = os.path.join(self.ufoPath, "layercontents.plist")
    os.remove(plistPath)
    with open(plistPath, "w") as bogusFile:
        bogusFile.write("test")
    self.assertRaises(UFOLibError, UFOReader(self.ufoPath).getGlyphSet)

    # case 2: the plist holds a dict
    self.makeUFO()
    plistPath = os.path.join(self.ufoPath, "layercontents.plist")
    os.remove(plistPath)
    asDict = {
        "public.default" : "glyphs",
        "layer 1" : "glyphs.layer 1",
        "layer 2" : "glyphs.layer 2",
    }
    with open(plistPath, "wb") as plistFile:
        writePlist(asDict, plistFile)
    self.assertRaises(UFOLibError, UFOReader(self.ufoPath).getGlyphSet)
# layer contents invalid name format
def testInvalidLayerContentsFormat(self):
    """Constructing a UFOWriter raises UFOLibError for a malformed layercontents.plist."""
    # case 1: the file holds the bogus text "test"
    self.makeUFO()
    plistPath = os.path.join(self.ufoPath, "layercontents.plist")
    os.remove(plistPath)
    with open(plistPath, "w") as bogusFile:
        bogusFile.write("test")
    self.assertRaises(UFOLibError, UFOWriter, self.ufoPath)

    # case 2: the plist holds a dict
    self.makeUFO()
    plistPath = os.path.join(self.ufoPath, "layercontents.plist")
    os.remove(plistPath)
    asDict = {
        "public.default" : "glyphs",
        "layer 1" : "glyphs.layer 1",
        "layer 2" : "glyphs.layer 2",
    }
    with open(plistPath, "wb") as plistFile:
        writePlist(asDict, plistFile)
    self.assertRaises(UFOLibError, UFOWriter, self.ufoPath)
# __init__: layer contents invalid name format
def testGetGlyphSets(self):
    """Each layer's glyph set exposes the keys of that layer's contents.plist."""
    self.makeUFO()
    # hack contents.plist so the two extra layers have distinct glyph names
    for layerDir, contents in (
        ("glyphs.layer 1", dict(b="a.glif")),
        ("glyphs.layer 2", dict(c="a.glif")),
    ):
        contentsPath = os.path.join(self.ufoPath, layerDir, "contents.plist")
        with open(contentsPath, "wb") as contentsFile:
            writePlist(contents, contentsFile)
    # now test
    writer = UFOWriter(self.ufoPath)
    # default
    defaultKeys = list(writer.getGlyphSet().keys())
    self.assertEqual(["a"], defaultKeys)
    # layer 1
    layer1Keys = list(writer.getGlyphSet("layer 1", defaultLayer=False).keys())
    self.assertEqual(["b"], layer1Keys)
    # layer 2
    layer2Keys = list(writer.getGlyphSet("layer 2", defaultLayer=False).keys())
    self.assertEqual(["c"], layer2Keys)
# make a new font with two layers
def testNewFontThreeLayers(self):
    """A new UFO written with three layers gets three directories and an ordered layercontents.plist."""
    self.clearUFO()
    writer = UFOWriter(self.ufoPath)
    writer.getGlyphSet("layer 1", defaultLayer=False)
    writer.getGlyphSet()
    writer.getGlyphSet("layer 2", defaultLayer=False)
    writer.writeLayerContents(["layer 1", "public.default", "layer 2"])
    # directories
    for layerDir in ("glyphs", "glyphs.layer 1", "glyphs.layer 2"):
        found = os.path.exists(os.path.join(self.ufoPath, layerDir))
        self.assertEqual(True, found)
    # layer contents
    plistPath = os.path.join(self.ufoPath, "layercontents.plist")
    with open(plistPath, "rb") as plistFile:
        written = readPlist(plistFile)
    wanted = [["layer 1", "glyphs.layer 1"], ["public.default", "glyphs"], ["layer 2", "glyphs.layer 2"]]
    self.assertEqual(wanted, written)
# add a layer to an existing font
def testRenameLayer(self):
    """Renaming "layer 1" to "layer 3" moves its directory and updates layercontents.plist."""
    self.makeUFO()
    writer = UFOWriter(self.ufoPath)
    writer.renameGlyphSet("layer 1", "layer 3")
    writer.writeLayerContents(["public.default", "layer 3", "layer 2"])
    # directories: the old name is gone, the others are present
    for layerDir, shouldExist in (
        ("glyphs", True),
        ("glyphs.layer 1", False),
        ("glyphs.layer 2", True),
        ("glyphs.layer 3", True),
    ):
        found = os.path.exists(os.path.join(self.ufoPath, layerDir))
        self.assertEqual(shouldExist, found)
    # layer contents
    plistPath = os.path.join(self.ufoPath, "layercontents.plist")
    with open(plistPath, "rb") as plistFile:
        written = readPlist(plistFile)
    wanted = [['public.default', 'glyphs'], ['layer 3', 'glyphs.layer 3'], ['layer 2', 'glyphs.layer 2']]
    self.assertEqual(wanted, written)
def testRemoveLayer(self):
    """Deleting "layer 1" removes its directory and its layercontents.plist entry."""
    self.makeUFO()
    writer = UFOWriter(self.ufoPath)
    writer.deleteGlyphSet("layer 1")
    writer.writeLayerContents(["public.default", "layer 2"])
    # directories
    for layerDir, shouldExist in (
        ("glyphs", True),
        ("glyphs.layer 1", False),
        ("glyphs.layer 2", True),
    ):
        found = os.path.exists(os.path.join(self.ufoPath, layerDir))
        self.assertEqual(shouldExist, found)
    # layer contents
    plistPath = os.path.join(self.ufoPath, "layercontents.plist")
    with open(plistPath, "rb") as plistFile:
        written = readPlist(plistFile)
    wanted = [["public.default", "glyphs"], ["layer 2", "glyphs.layer 2"]]
    self.assertEqual(wanted, written)
# remove default layer
def testRemoveDefaultLayer(self):
    """Deleting "public.default" removes the "glyphs" directory and its layercontents.plist entry."""
    self.makeUFO()
    writer = UFOWriter(self.ufoPath)
    writer.deleteGlyphSet("public.default")
    # directories
    for layerDir, shouldExist in (
        ("glyphs", False),
        ("glyphs.layer 1", True),
        ("glyphs.layer 2", True),
    ):
        found = os.path.exists(os.path.join(self.ufoPath, layerDir))
        self.assertEqual(shouldExist, found)
    # layer contents
    plistPath = os.path.join(self.ufoPath, "layercontents.plist")
    with open(plistPath, "rb") as plistFile:
        written = readPlist(plistFile)
    wanted = [["layer 1", "glyphs.layer 1"], ["layer 2", "glyphs.layer 2"]]
    self.assertEqual(wanted, written)
# remove unknown layer
def testWrite(self):
    """Groups and kerning written as format version 2 match the expected down-converted data."""
    writer = UFOWriter(self.dstDir, formatVersion=2)
    writer.setKerningGroupConversionRenameMaps(self.downConversionMapping)
    writer.writeKerning(self.kerning)
    writer.writeGroups(self.groups)
    # test groups
    groupsPath = os.path.join(self.dstDir, "groups.plist")
    with open(groupsPath, "rb") as groupsFile:
        groupsOnDisk = readPlist(groupsFile)
    self.assertEqual(groupsOnDisk, self.expectedWrittenGroups)
    # test kerning
    kerningPath = os.path.join(self.dstDir, "kerning.plist")
    with open(kerningPath, "rb") as kerningFile:
        kerningOnDisk = readPlist(kerningFile)
    self.assertEqual(kerningOnDisk, self.expectedWrittenKerning)
    self.tearDownUFO()
def writeLayerInfo(self, info):
    """
    Write the layer info found on ``info`` to the layer info plist.

    Every attribute named in ``layerInfoVersion3ValueData`` that ``info``
    has is collected, skipping ``None`` values and an empty ``lib``. The
    collected data is validated and written to ``LAYERINFO_FILENAME``
    inside ``self.dirName``.

    Raises ``GlifLibError`` when the UFO format version is below 3 or when
    reading an attribute from ``info`` fails.
    """
    if self.ufoFormatVersion < 3:
        raise GlifLibError("layerinfo.plist is not allowed in UFO %d." % self.ufoFormatVersion)
    # gather data
    collected = {}
    for attrName in list(layerInfoVersion3ValueData.keys()):
        if not hasattr(info, attrName):
            continue
        try:
            attrValue = getattr(info, attrName)
        except AttributeError:
            raise GlifLibError("The supplied info object does not support getting a necessary attribute (%s)." % attrName)
        if attrValue is None:
            continue
        if attrName == 'lib' and not attrValue:
            continue
        collected[attrName] = attrValue
    # validate
    collected = validateLayerInfoVersion3Data(collected)
    # write file
    plistPath = os.path.join(self.dirName, LAYERINFO_FILENAME)
    with open(plistPath, "wb") as plistFile:
        writePlist(collected, plistFile)
# read caching
def readBytesFromPath(self, path, encoding=None):
    """
    Returns the bytes in the file at the given path.
    The path must be relative to the UFO path.
    Returns None if the file does not exist.
    An encoding may be passed if needed; the file is then opened
    in text mode with that encoding instead of in binary mode.

    Raises UFOLibError if the path is a directory.
    """
    fullPath = os.path.join(self._path, path)
    if not self._checkForFile(fullPath):
        return None
    if os.path.isdir(fullPath):
        raise UFOLibError("%s is a directory." % path)
    if encoding:
        f = open(fullPath, encoding=encoding)
    else:
        # Binary mode takes no encoding, so none is passed.
        f = open(fullPath, "rb")
    # ``with`` closes the handle even if read() raises.
    with f:
        return f.read()
def getReadFileForPath(self, path, encoding=None):
    """
    Returns a file (or file-like) object for the
    file at the given path. The path must be relative
    to the UFO path. Returns None if the file does not exist.
    An encoding may be passed if needed.

    Raises UFOLibError if the path is a directory.

    Note: The caller is responsible for closing the open file.
    """
    fullPath = os.path.join(self._path, path)
    if not self._checkForFile(fullPath):
        return None
    if os.path.isdir(fullPath):
        raise UFOLibError("%s is a directory." % path)
    if encoding:
        # Binary mode cannot be combined with an encoding (open() raises
        # ValueError), so decode through a text-mode handle instead.
        f = open(fullPath, "r", encoding=encoding)
    else:
        f = open(fullPath, "r")
    return f
def writeFileAtomically(text, path, encoding="utf-8"):
    """
    Write text into the file at path, skipping the write when the file
    already holds exactly that text so its modification date is preserved.
    When text is empty, an existing file is removed and nothing is written.
    An encoding may be passed if needed.

    Note: despite the name, the new contents are written directly to
    path; no temporary file is involved.
    """
    if os.path.exists(path):
        with open(path, "r", encoding=encoding) as existing:
            previous = existing.read()
        if previous == text:
            return
        # if the text is empty, remove the existing file
        if not text:
            os.remove(path)
    if not text:
        return
    with open(path, "w", encoding=encoding) as out:
        out.write(text)
def consume(self, doc, payload):
    """
    Write the document's text into the target directory and record the
    location in ``doc.meta['text_file']``.

    The file name combines the first eight characters of the document ID
    with the sanitized base name of the document path. The target
    directory is created when it does not exist yet.

    :param doc: Document object.
    :param payload: File pointer belonging to document (not used here).
    :type doc: ``gransk.core.document.Document``
    :type payload: ``file``
    """
    id_prefix = doc.docid[0:8]
    safe_name = document.secure_path(os.path.basename(doc.path))
    new_filename = '%s-%s' % (id_prefix, safe_name)

    if not os.path.exists(self.root):
        os.makedirs(self.root)

    new_path = os.path.join(self.root, new_filename)
    with io.open(new_path, 'w', encoding='utf-8') as text_file:
        text_file.write(doc.text)

    doc.meta['text_file'] = new_path
def __call__(self, value):
    """Open the file named by ``value`` and return the open file object.

    ``None`` is passed through unchanged. A relative path is resolved
    against ``self.directory``. The file is opened with ``self.mode`` and,
    when it is not ``None``, ``self.buffering``.

    Raises ``ValueError`` if opening the file fails with ``IOError``.
    """
    if value is None:
        return value

    path = unicode(value)
    if not os.path.isabs(path):
        path = os.path.join(self.directory, path)

    try:
        if self.buffering is None:
            value = open(path, self.mode)
        else:
            value = open(path, self.mode, self.buffering)
    except IOError as error:
        # ``value`` still holds the caller's original argument here.
        raise ValueError('Cannot open {0} with mode={1} and buffering={2}: {3}'.format(
            value, self.mode, self.buffering, error))

    return value