python类open()的实例源码

letters.py 文件源码 项目:margy 作者: opentower 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def add():
    """Add an address to, or remove one from, ``blastlist.txt``.

    Only POST requests are handled. The ``submit`` form field selects the
    action (``'Add'`` or ``'Remove'``) and the ``addr`` form field carries the
    address, which is stripped of surrounding whitespace.

    Returns the rendered ``listadded.html`` / ``listremoved.html`` template.
    Any other request method or ``submit`` value falls through and returns
    ``None``, exactly as before.
    """
    if request.method == 'POST':
        action = request.form['submit']
        if action == 'Add':
            addr = request.form['addr'].strip()
            # The file is opened in text mode, so it needs text. Decode only
            # when the form value really is a byte string; calling .decode()
            # on text breaks on non-ASCII input (and always on Python 3).
            text = addr.decode('utf-8') if isinstance(addr, bytes) else addr
            with io.open('blastlist.txt', 'a', encoding="utf-8") as f:
                f.write(text + u'\r\n')
            return render_template('listadded.html', addr=addr)
        elif action == 'Remove':
            addr = request.form['addr'].strip()
            target = addr.decode('utf-8') if isinstance(addr, bytes) else addr
            with io.open('blastlist.txt', 'r', encoding="utf-8") as f:
                lines = f.readlines()
            with io.open('blastlist.txt', 'w', encoding="utf-8") as f:
                # Compare whole entries: a substring test would also drop
                # every address that merely contains `addr`, and an empty
                # `addr` would wipe the entire list.
                f.writelines(
                    line for line in lines if line.strip() != target
                )
            return render_template('listremoved.html', addr=addr)
util.py 文件源码 项目:lang-reps 作者: chaitanyamalaviya 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __iter__(self):
        """
        Yield one token list per line of the corpus at ``self.fname``.

        ``self.fname`` may be a single file or a directory; for a directory
        every entry returned by ``os.listdir`` is read. Each line is
        lower-cased, stripped of ``string.punctuation`` characters, split on
        whitespace, and wrapped as ``[self.begin, word1, ..., self.end]``.
        """
        root = self.fname
        if os.path.isdir(root):
            paths = [os.path.join(root, entry) for entry in os.listdir(root)]
        else:
            paths = [root]
        for path in paths:
            with open(path) as handle:
                for raw_line in handle.read().split("\n"):
                    cleaned = "".join(
                        c for c in raw_line.lower()
                        if c not in string.punctuation
                    )
                    yield [self.begin] + cleaned.strip().split() + [self.end]
util.py 文件源码 项目:lang-reps 作者: chaitanyamalaviya 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __iter__(self):
        """
        Yield one token list per line of the corpus at ``self.fname``.

        ``self.fname`` may be a directory (every entry returned by
        ``os.listdir`` is read) or a single file. Each line is lower-cased,
        stripped of ``string.punctuation`` characters, split on whitespace,
        and wrapped as ``[self.begin, word1, ..., self.end]``.
        """
        if os.path.isdir(self.fname):
            filenames = [os.path.join(self.fname, f) for f in os.listdir(self.fname)]
        else:
            # Without this fallback `filenames` is unbound for a plain file.
            filenames = [self.fname]

        # The loop variable must be the name the body opens; it used to be
        # `langpath`, leaving `filename` undefined.
        for filename in filenames:
            with open(filename) as f:
                doc = f.read()
                for line in doc.split("\n"):
                    sent = "".join([ch for ch in line.lower() if ch not in string.punctuation]).strip().split()
                    sent = [self.begin] + sent + [self.end]
                    yield sent
__init__.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run_script(self, script_name, namespace):
        """Execute the metadata script ``scripts/<script_name>`` in *namespace*.

        *namespace* is used as both globals and locals and gets ``__file__``
        set to the script's file name. If that file exists on disk its
        contents are executed; otherwise the newline-normalised metadata text
        is executed and registered in ``linecache`` so tracebacks can show
        source lines.

        Raises ``ResolutionError`` when no such script is in the metadata.
        """
        script = 'scripts/' + script_name
        if not self.has_metadata(script):
            raise ResolutionError("No script named %r" % script_name)
        script_text = self.get_metadata(script).replace('\r\n', '\n')
        script_text = script_text.replace('\r', '\n')
        script_filename = self._fn(self.egg_info, script)
        namespace['__file__'] = script_filename
        if os.path.exists(script_filename):
            # Close the handle deterministically instead of leaving it to
            # the garbage collector.
            with open(script_filename) as script_file:
                source = script_file.read()
            code = compile(source, script_filename, 'exec')
            exec(code, namespace, namespace)
        else:
            from linecache import cache
            cache[script_filename] = (
                len(script_text), 0, script_text.split('\n'), script_filename
            )
            script_code = compile(script_text, script_filename, 'exec')
            exec(script_code, namespace, namespace)
sdist.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def read_manifest(self):
        """Read the manifest file (named by 'self.manifest') and use it to
        fill in 'self.filelist', the list of files to include in the source
        distribution.

        The file is read as bytes. On Python 3 each line is decoded as UTF-8
        and lines that cannot be decoded are skipped with a warning. Blank
        lines and lines starting with '#' are ignored.
        """
        log.info("reading manifest file '%s'", self.manifest)
        # The context manager closes the manifest even if a line fails.
        with open(self.manifest, 'rb') as manifest:
            for line in manifest:
                # The manifest must contain UTF-8. See #303.
                if six.PY3:
                    try:
                        line = line.decode('UTF-8')
                    except UnicodeDecodeError:
                        log.warn("%r not UTF-8 decodable -- skipping" % line)
                        continue
                # ignore comments and blank lines
                line = line.strip()
                if line.startswith('#') or not line:
                    continue
                self.filelist.append(line)
develop.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def uninstall_link(self):
        """Remove the egg-link file and the matching .pth entry.

        If ``self.egg_link`` exists, its right-stripped lines must equal
        ``[self.egg_path]`` or ``[self.egg_path, self.setup_path]``;
        otherwise a warning is logged and nothing is removed. Nothing is
        deleted or updated when ``self.dry_run`` is set. A reminder is logged
        when the distribution declares scripts.
        """
        if os.path.exists(self.egg_link):
            log.info("Removing %s (link to %s)", self.egg_link, self.egg_base)
            # Close the handle even if reading the link file fails.
            with open(self.egg_link) as egg_link_file:
                contents = [line.rstrip() for line in egg_link_file]
            if contents not in ([self.egg_path],
                                [self.egg_path, self.setup_path]):
                log.warn("Link points to %s: uninstall aborted", contents)
                return
            if not self.dry_run:
                os.unlink(self.egg_link)
        if not self.dry_run:
            self.update_pth(self.dist)  # remove any .pth link to us
        if self.distribution.scripts:
            # XXX should also check for entry point scripts!
            log.warn("Note: you must uninstall or replace scripts manually!")
develop.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def install_egg_scripts(self, dist):
        """Install scripts for *dist*.

        For any distribution other than ``self.dist`` this defers to
        ``easy_install.install_egg_scripts``. For ``self.dist`` it installs
        the wrapper scripts and then each script listed in
        ``self.distribution.scripts`` by reading its text and passing it to
        ``self.install_script``.
        """
        if dist is not self.dist:
            # A dependency is being installed: use the normal behaviour.
            return easy_install.install_egg_scripts(self, dist)

        # New-style wrapper scripts in the script dir.
        self.install_wrapper_scripts(dist)

        # Old-style scripts declared by the distribution.
        for declared in self.distribution.scripts or []:
            full_path = os.path.abspath(convert_path(declared))
            base_name = os.path.basename(full_path)
            with io.open(full_path) as stream:
                text = stream.read()
            self.install_script(dist, base_name, text, full_path)
easy_install.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def save(self):
        """Persist the .pth file if it has unsaved changes.

        Does nothing unless ``self.dirty`` is set. With at least one path the
        wrapped, newline-joined relative paths are written to
        ``self.filename`` (a symlink there is removed first). With no paths,
        an existing file is deleted. ``self.dirty`` is cleared afterwards.
        """
        if not self.dirty:
            return

        relative = [self.make_relative(path) for path in self.paths]
        if not relative:
            if os.path.exists(self.filename):
                log.debug("Deleting empty %s", self.filename)
                os.unlink(self.filename)
        else:
            log.debug("Saving %s", self.filename)
            wrapped = self._wrap_lines(relative)
            payload = '\n'.join(wrapped) + '\n'

            # Replace a symlink rather than writing through it.
            if os.path.islink(self.filename):
                os.unlink(self.filename)
            with open(self.filename, 'wt') as stream:
                stream.write(payload)

        self.dirty = False
__init__.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run_script(self, script_name, namespace):
        """Execute the metadata script ``scripts/<script_name>`` in *namespace*.

        *namespace* is used as both globals and locals and gets ``__file__``
        set to the script's file name. If that file exists on disk its
        contents are executed; otherwise the newline-normalised metadata text
        is executed and registered in ``linecache`` so tracebacks can show
        source lines.

        Raises ``ResolutionError`` when no such script is in the metadata.
        """
        script = 'scripts/' + script_name
        if not self.has_metadata(script):
            raise ResolutionError("No script named %r" % script_name)
        script_text = self.get_metadata(script).replace('\r\n', '\n')
        script_text = script_text.replace('\r', '\n')
        script_filename = self._fn(self.egg_info, script)
        namespace['__file__'] = script_filename
        if os.path.exists(script_filename):
            # Close the handle deterministically instead of leaving it to
            # the garbage collector.
            with open(script_filename) as script_file:
                source = script_file.read()
            code = compile(source, script_filename, 'exec')
            exec(code, namespace, namespace)
        else:
            from linecache import cache
            cache[script_filename] = (
                len(script_text), 0, script_text.split('\n'), script_filename
            )
            script_code = compile(script_text, script_filename, 'exec')
            exec(script_code, namespace, namespace)
__init__.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run_script(self, script_name, namespace):
        """Execute the metadata script ``scripts/<script_name>`` in *namespace*.

        *namespace* is used as both globals and locals and gets ``__file__``
        set to the script's file name. If that file exists on disk its
        contents are executed; otherwise the newline-normalised metadata text
        is executed and registered in ``linecache`` so tracebacks can show
        source lines.

        Raises ``ResolutionError`` when no such script is in the metadata.
        """
        script = 'scripts/' + script_name
        if not self.has_metadata(script):
            raise ResolutionError("No script named %r" % script_name)
        script_text = self.get_metadata(script).replace('\r\n', '\n')
        script_text = script_text.replace('\r', '\n')
        script_filename = self._fn(self.egg_info, script)
        namespace['__file__'] = script_filename
        if os.path.exists(script_filename):
            # Close the handle deterministically instead of leaving it to
            # the garbage collector.
            with open(script_filename) as script_file:
                source = script_file.read()
            code = compile(source, script_filename, 'exec')
            exec(code, namespace, namespace)
        else:
            from linecache import cache
            cache[script_filename] = (
                len(script_text), 0, script_text.split('\n'), script_filename
            )
            script_code = compile(script_text, script_filename, 'exec')
            exec(script_code, namespace, namespace)
sdist.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def read_manifest(self):
        """Read the manifest file (named by 'self.manifest') and use it to
        fill in 'self.filelist', the list of files to include in the source
        distribution.

        The file is read as bytes. On Python 3 each line is decoded as UTF-8
        and lines that cannot be decoded are skipped with a warning. Blank
        lines and lines starting with '#' are ignored.
        """
        log.info("reading manifest file '%s'", self.manifest)
        # The context manager closes the manifest even if a line fails.
        with open(self.manifest, 'rb') as manifest:
            for line in manifest:
                # The manifest must contain UTF-8. See #303.
                if six.PY3:
                    try:
                        line = line.decode('UTF-8')
                    except UnicodeDecodeError:
                        log.warn("%r not UTF-8 decodable -- skipping" % line)
                        continue
                # ignore comments and blank lines
                line = line.strip()
                if line.startswith('#') or not line:
                    continue
                self.filelist.append(line)
develop.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def uninstall_link(self):
        """Remove the egg-link file and the matching .pth entry.

        If ``self.egg_link`` exists, its right-stripped lines must equal
        ``[self.egg_path]`` or ``[self.egg_path, self.setup_path]``;
        otherwise a warning is logged and nothing is removed. Nothing is
        deleted or updated when ``self.dry_run`` is set. A reminder is logged
        when the distribution declares scripts.
        """
        if os.path.exists(self.egg_link):
            log.info("Removing %s (link to %s)", self.egg_link, self.egg_base)
            # Close the handle even if reading the link file fails.
            with open(self.egg_link) as egg_link_file:
                contents = [line.rstrip() for line in egg_link_file]
            if contents not in ([self.egg_path],
                                [self.egg_path, self.setup_path]):
                log.warn("Link points to %s: uninstall aborted", contents)
                return
            if not self.dry_run:
                os.unlink(self.egg_link)
        if not self.dry_run:
            self.update_pth(self.dist)  # remove any .pth link to us
        if self.distribution.scripts:
            # XXX should also check for entry point scripts!
            log.warn("Note: you must uninstall or replace scripts manually!")
develop.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def install_egg_scripts(self, dist):
        """Install scripts for *dist*.

        For any distribution other than ``self.dist`` this defers to
        ``easy_install.install_egg_scripts``. For ``self.dist`` it installs
        the wrapper scripts and then each script listed in
        ``self.distribution.scripts`` by reading its text and passing it to
        ``self.install_script``.
        """
        if dist is not self.dist:
            # A dependency is being installed: use the normal behaviour.
            return easy_install.install_egg_scripts(self, dist)

        # New-style wrapper scripts in the script dir.
        self.install_wrapper_scripts(dist)

        # Old-style scripts declared by the distribution.
        for declared in self.distribution.scripts or []:
            full_path = os.path.abspath(convert_path(declared))
            base_name = os.path.basename(full_path)
            with io.open(full_path) as stream:
                text = stream.read()
            self.install_script(dist, base_name, text, full_path)
easy_install.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def save(self):
        """Persist the .pth file if it has unsaved changes.

        Does nothing unless ``self.dirty`` is set. With at least one path the
        wrapped, newline-joined relative paths are written to
        ``self.filename`` (a symlink there is removed first). With no paths,
        an existing file is deleted. ``self.dirty`` is cleared afterwards.
        """
        if not self.dirty:
            return

        relative = [self.make_relative(path) for path in self.paths]
        if not relative:
            if os.path.exists(self.filename):
                log.debug("Deleting empty %s", self.filename)
                os.unlink(self.filename)
        else:
            log.debug("Saving %s", self.filename)
            wrapped = self._wrap_lines(relative)
            payload = '\n'.join(wrapped) + '\n'

            # Replace a symlink rather than writing through it.
            if os.path.islink(self.filename):
                os.unlink(self.filename)
            with open(self.filename, 'wt') as stream:
                stream.write(payload)

        self.dirty = False
generate_toy_data.py 文件源码 项目:seq2seq 作者: google 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def write_parallel_text(sources, targets, output_prefix):
  """
  Writes two UTF-8 files, one example per line:
    - sources.txt inside output_prefix
    - targets.txt inside output_prefix

  The absolute path of each file is printed once it has been written.

  Args:
    sources: Iterator of source strings
    targets: Iterator of target strings
    output_prefix: Path that "sources.txt" and "targets.txt" are joined onto
  """
  source_filename = os.path.abspath(os.path.join(output_prefix, "sources.txt"))
  target_filename = os.path.abspath(os.path.join(output_prefix, "targets.txt"))

  outputs = ((source_filename, sources), (target_filename, targets))
  for filename, records in outputs:
    with io.open(filename, "w", encoding='utf8') as out_file:
      out_file.writelines(record + "\n" for record in records)
    print("Wrote {}".format(filename))
validators.py 文件源码 项目:otRebuilder 作者: Pal3love 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def pngValidator(path=None, data=None, fileObj=None):
    """
    Version 3+.

    Compare the first 8 bytes of the image against ``pngSignature``.

    The image comes from exactly one source, tried in this order: a file at
    *path*, the in-memory *data*, or the open *fileObj* (whose position is
    restored after reading).

    Returns ``(True, None)`` on a match, otherwise ``(False, message)``.
    """
    assert path is not None or data is not None or fileObj is not None
    if path is not None:
        with open(path, "rb") as handle:
            header = handle.read(8)
    elif data is not None:
        header = data[:8]
    elif fileObj is not None:
        # Peek at the header without disturbing the caller's read position.
        start = fileObj.tell()
        header = fileObj.read(8)
        fileObj.seek(start)
    if header == pngSignature:
        return True, None
    return False, "Image does not begin with the PNG signature."

# -------------------
# layercontents.plist
# -------------------
test_UFO3.py 文件源码 项目:otRebuilder 作者: Pal3love 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testInvalidLayerContentsFormat(self):
        # Each case replaces layercontents.plist with unsupported content
        # and expects reader.getGlyphSet to raise UFOLibError.
        plistName = "layercontents.plist"

        # case 1: plain text instead of a plist
        self.makeUFO()
        plistPath = os.path.join(self.ufoPath, plistName)
        os.remove(plistPath)
        with open(plistPath, "w") as textFile:
            textFile.write("test")
        reader = UFOReader(self.ufoPath)
        with self.assertRaises(UFOLibError):
            reader.getGlyphSet()

        # case 2: a dict where the other tests read back a list of pairs
        self.makeUFO()
        plistPath = os.path.join(self.ufoPath, plistName)
        os.remove(plistPath)
        asDict = {
            "public.default" : "glyphs",
            "layer 1" : "glyphs.layer 1",
            "layer 2" : "glyphs.layer 2",
        }
        with open(plistPath, "wb") as plistFile:
            writePlist(asDict, plistFile)
        reader = UFOReader(self.ufoPath)
        with self.assertRaises(UFOLibError):
            reader.getGlyphSet()

    # layer contents invalid name format
test_UFO3.py 文件源码 项目:otRebuilder 作者: Pal3love 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testInvalidLayerContentsFormat(self):
        # Each case replaces layercontents.plist with unsupported content
        # and expects constructing a UFOWriter to raise UFOLibError.
        plistName = "layercontents.plist"

        # case 1: plain text instead of a plist
        self.makeUFO()
        plistPath = os.path.join(self.ufoPath, plistName)
        os.remove(plistPath)
        with open(plistPath, "w") as textFile:
            textFile.write("test")
        with self.assertRaises(UFOLibError):
            UFOWriter(self.ufoPath)

        # case 2: a dict where the other tests read back a list of pairs
        self.makeUFO()
        plistPath = os.path.join(self.ufoPath, plistName)
        os.remove(plistPath)
        asDict = {
            "public.default" : "glyphs",
            "layer 1" : "glyphs.layer 1",
            "layer 2" : "glyphs.layer 2",
        }
        with open(plistPath, "wb") as plistFile:
            writePlist(asDict, plistFile)
        with self.assertRaises(UFOLibError):
            UFOWriter(self.ufoPath)

    # __init__: layer contents invalid name format
test_UFO3.py 文件源码 项目:otRebuilder 作者: Pal3love 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testGetGlyphSets(self):
        self.makeUFO()
        # Rewrite contents.plist in both extra layers so each layer exposes
        # a different glyph name.
        overrides = (("glyphs.layer 1", "b"), ("glyphs.layer 2", "c"))
        for layerDir, glyphName in overrides:
            contentsPath = os.path.join(self.ufoPath, layerDir, "contents.plist")
            with open(contentsPath, "wb") as plistFile:
                writePlist({glyphName: "a.glif"}, plistFile)
        # now test
        writer = UFOWriter(self.ufoPath)
        # default
        defaultKeys = list(writer.getGlyphSet().keys())
        self.assertEqual(["a"], defaultKeys)
        # layer 1
        layerOneKeys = list(writer.getGlyphSet("layer 1", defaultLayer=False).keys())
        self.assertEqual(["b"], layerOneKeys)
        # layer 2
        layerTwoKeys = list(writer.getGlyphSet("layer 2", defaultLayer=False).keys())
        self.assertEqual(["c"], layerTwoKeys)

    # make a new font with two layers
test_UFO3.py 文件源码 项目:otRebuilder 作者: Pal3love 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def testNewFontThreeLayers(self):
        self.clearUFO()
        writer = UFOWriter(self.ufoPath)
        writer.getGlyphSet("layer 1", defaultLayer=False)
        writer.getGlyphSet()
        writer.getGlyphSet("layer 2", defaultLayer=False)
        writer.writeLayerContents(["layer 1", "public.default", "layer 2"])
        # every layer directory must have been created
        for dirName in ("glyphs", "glyphs.layer 1", "glyphs.layer 2"):
            layerPath = os.path.join(self.ufoPath, dirName)
            self.assertEqual(True, os.path.exists(layerPath))
        # layer contents must list the layers in the order written
        plistPath = os.path.join(self.ufoPath, "layercontents.plist")
        with open(plistPath, "rb") as plistFile:
            written = readPlist(plistFile)
        wanted = [["layer 1", "glyphs.layer 1"], ["public.default", "glyphs"], ["layer 2", "glyphs.layer 2"]]
        self.assertEqual(wanted, written)

    # add a layer to an existing font
test_UFO3.py 文件源码 项目:otRebuilder 作者: Pal3love 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testRenameLayer(self):
        self.makeUFO()
        writer = UFOWriter(self.ufoPath)
        writer.renameGlyphSet("layer 1", "layer 3")
        writer.writeLayerContents(["public.default", "layer 3", "layer 2"])
        # directories: the old name is gone, the new one exists
        presence = (
            ("glyphs", True),
            ("glyphs.layer 1", False),
            ("glyphs.layer 2", True),
            ("glyphs.layer 3", True),
        )
        for dirName, shouldExist in presence:
            layerPath = os.path.join(self.ufoPath, dirName)
            self.assertEqual(shouldExist, os.path.exists(layerPath))
        # layer contents
        plistPath = os.path.join(self.ufoPath, "layercontents.plist")
        with open(plistPath, "rb") as plistFile:
            written = readPlist(plistFile)
        wanted = [['public.default', 'glyphs'], ['layer 3', 'glyphs.layer 3'], ['layer 2', 'glyphs.layer 2']]
        self.assertEqual(wanted, written)
test_UFO3.py 文件源码 项目:otRebuilder 作者: Pal3love 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testRemoveLayer(self):
        self.makeUFO()
        writer = UFOWriter(self.ufoPath)
        writer.deleteGlyphSet("layer 1")
        writer.writeLayerContents(["public.default", "layer 2"])
        # directories: only the deleted layer is gone
        presence = (
            ("glyphs", True),
            ("glyphs.layer 1", False),
            ("glyphs.layer 2", True),
        )
        for dirName, shouldExist in presence:
            layerPath = os.path.join(self.ufoPath, dirName)
            self.assertEqual(shouldExist, os.path.exists(layerPath))
        # layer contents
        plistPath = os.path.join(self.ufoPath, "layercontents.plist")
        with open(plistPath, "rb") as plistFile:
            written = readPlist(plistFile)
        wanted = [["public.default", "glyphs"], ["layer 2", "glyphs.layer 2"]]
        self.assertEqual(wanted, written)

    # remove default layer
test_UFO3.py 文件源码 项目:otRebuilder 作者: Pal3love 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testRemoveDefaultLayer(self):
        self.makeUFO()
        writer = UFOWriter(self.ufoPath)
        writer.deleteGlyphSet("public.default")
        # directories: the default "glyphs" directory is gone
        presence = (
            ("glyphs", False),
            ("glyphs.layer 1", True),
            ("glyphs.layer 2", True),
        )
        for dirName, shouldExist in presence:
            layerPath = os.path.join(self.ufoPath, dirName)
            self.assertEqual(shouldExist, os.path.exists(layerPath))
        # layer contents
        plistPath = os.path.join(self.ufoPath, "layercontents.plist")
        with open(plistPath, "rb") as plistFile:
            written = readPlist(plistFile)
        wanted = [["layer 1", "glyphs.layer 1"], ["layer 2", "glyphs.layer 2"]]
        self.assertEqual(wanted, written)

    # remove unknown layer
test_UFOConversion.py 文件源码 项目:otRebuilder 作者: Pal3love 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testWrite(self):
        writer = UFOWriter(self.dstDir, formatVersion=2)
        writer.setKerningGroupConversionRenameMaps(self.downConversionMapping)
        writer.writeKerning(self.kerning)
        writer.writeGroups(self.groups)
        # groups.plist must match the expected groups
        groupsPath = os.path.join(self.dstDir, "groups.plist")
        with open(groupsPath, "rb") as plistFile:
            groupsOnDisk = readPlist(plistFile)
        self.assertEqual(groupsOnDisk, self.expectedWrittenGroups)
        # kerning.plist must match the expected kerning
        kerningPath = os.path.join(self.dstDir, "kerning.plist")
        with open(kerningPath, "rb") as plistFile:
            kerningOnDisk = readPlist(plistFile)
        self.assertEqual(kerningOnDisk, self.expectedWrittenKerning)
        self.tearDownUFO()
glifLib.py 文件源码 项目:otRebuilder 作者: Pal3love 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def writeLayerInfo(self, info):
        """
        Collect the layer info attributes from *info*, validate them and
        write them to LAYERINFO_FILENAME inside ``self.dirName``.

        Only attributes named in ``layerInfoVersion3ValueData`` are read.
        Values that are None, and a falsy 'lib', are left out.

        Raises GlifLibError when the UFO format version is below 3, or when
        an attribute reported by hasattr cannot be read.
        """
        if self.ufoFormatVersion < 3:
            raise GlifLibError("layerinfo.plist is not allowed in UFO %d." % self.ufoFormatVersion)
        # gather data
        collected = {}
        for attr in list(layerInfoVersion3ValueData.keys()):
            if not hasattr(info, attr):
                continue
            try:
                value = getattr(info, attr)
            except AttributeError:
                raise GlifLibError("The supplied info object does not support getting a necessary attribute (%s)." % attr)
            if value is None:
                continue
            if attr == 'lib' and not value:
                continue
            collected[attr] = value
        # validate
        collected = validateLayerInfoVersion3Data(collected)
        # write file
        target = os.path.join(self.dirName, LAYERINFO_FILENAME)
        with open(target, "wb") as plistFile:
            writePlist(collected, plistFile)

    # read caching
__init__.py 文件源码 项目:otRebuilder 作者: Pal3love 项目源码 文件源码 阅读 48 收藏 0 点赞 0 评论 0
def readBytesFromPath(self, path, encoding=None):
        """
        Returns the contents of the file at the given path.
        The path must be relative to the UFO path.
        Returns None if the file does not exist.

        Without an encoding the file is opened in binary mode and bytes are
        returned. With an encoding the file is opened in text mode using
        that encoding and decoded text is returned.

        Raises UFOLibError if the path is a directory.
        """
        fullPath = os.path.join(self._path, path)
        if not self._checkForFile(fullPath):
            return None
        if os.path.isdir(fullPath):
            raise UFOLibError("%s is a directory." % path)
        if encoding:
            f = open(fullPath, encoding=encoding)
        else:
            # Binary mode takes no encoding argument.
            f = open(fullPath, "rb")
        # Close the handle even if read() raises.
        with f:
            return f.read()
__init__.py 文件源码 项目:otRebuilder 作者: Pal3love 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def getReadFileForPath(self, path, encoding=None):
        """
        Returns a file (or file-like) object for the
        file at the given path. The path must be relative
        to the UFO path. Returns None if the file does not exist.
        An encoding may be passed if needed; the file is then opened
        in text mode with that encoding. Without an encoding the file
        is opened with mode "r".

        Raises UFOLibError if the path is a directory.

        Note: The caller is responsible for closing the open file.
        """
        fullPath = os.path.join(self._path, path)
        if not self._checkForFile(fullPath):
            return None
        if os.path.isdir(fullPath):
            raise UFOLibError("%s is a directory." % path)
        if encoding:
            # An encoding only makes sense in text mode; combining it with
            # "rb" makes open() raise ValueError.
            f = open(fullPath, "r", encoding=encoding)
        else:
            f = open(fullPath, "r")
        return f
__init__.py 文件源码 项目:otRebuilder 作者: Pal3love 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def writeFileAtomically(text, path, encoding="utf-8"):
    """
    Store text in the file at path, touching the file only when needed.

    If the file already holds exactly this text it is left alone, so
    its modification date is preserved. Empty text removes an existing
    file and never creates one. The encoding is used for both reading
    the old content and writing the new one.
    """
    if os.path.exists(path):
        with open(path, "r", encoding=encoding) as existing:
            unchanged = text == existing.read()
        if unchanged:
            return
        # empty text means the existing file should go away
        if not text:
            os.remove(path)
    if not text:
        return
    with open(path, "w", encoding=encoding) as out:
        out.write(text)
store_text.py 文件源码 项目:gransk 作者: pcbje 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def consume(self, doc, payload):
    """
    Store the document text as a UTF-8 file below ``self.root``.

    The file name is the first 8 characters of the document ID, a dash,
    and the secured base name of the document path. The resulting path is
    recorded in ``doc.meta['text_file']``.

    :param doc: Document object.
    :param payload: File pointer belonging to document (not used here).
    :type doc: ``gransk.core.document.Document``
    :type payload: ``file``
    """
    id_prefix = doc.docid[0:8]
    safe_name = document.secure_path(os.path.basename(doc.path))
    target_name = '%s-%s' % (id_prefix, safe_name)

    if not os.path.exists(self.root):
      os.makedirs(self.root)

    target_path = os.path.join(self.root, target_name)

    with io.open(target_path, 'w', encoding='utf-8') as sink:
      sink.write(doc.text)

    doc.meta['text_file'] = target_path
validators.py 文件源码 项目:mongodb-monitoring 作者: jruaux 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __call__(self, value):
        """Open the file named by *value* and return the file object.

        ``None`` is returned unchanged. A relative path is resolved against
        ``self.directory``. The file is opened with ``self.mode`` and, when
        ``self.buffering`` is not None, with that buffering argument.

        Raises ValueError if opening the file fails with IOError.
        """
        if value is None:
            return value

        path = unicode(value)
        if not os.path.isabs(path):
            path = os.path.join(self.directory, path)

        try:
            if self.buffering is None:
                value = open(path, self.mode)
            else:
                value = open(path, self.mode, self.buffering)
        except IOError as error:
            # `value` still holds the caller's input here, because the
            # assignment above never completed.
            raise ValueError('Cannot open {0} with mode={1} and buffering={2}: {3}'.format(
                value, self.mode, self.buffering, error))

        return value


问题


面经


文章

微信
公众号

扫码关注公众号