Example source code for the Python class tempfile.SpooledTemporaryFile()
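
tempfile.SpooledTemporaryFile behaves like an ordinary temporary file, but keeps its contents in an in-memory buffer until they exceed max_size (or until fileno() is called), at which point it rolls over to a real file on disk. A minimal standalone sketch of that behaviour, not taken from any of the projects below:

import tempfile

# Keep up to 1 KiB in memory; anything beyond that rolls over to disk.
with tempfile.SpooledTemporaryFile(max_size=1024, mode='w+b') as spool:
    spool.write(b'a' * 512)    # still buffered in memory
    spool.write(b'b' * 1024)   # exceeds max_size, so the buffer rolls over
    spool.seek(0)
    print(len(spool.read()))   # 1536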

test_tempfile.py (project: ndk-python, author: gittor)
def test_properties(self):
        f = tempfile.SpooledTemporaryFile(max_size=10)
        f.write(b'x' * 10)
        self.assertFalse(f._rolled)
        self.assertEqual(f.mode, 'w+b')
        self.assertIsNone(f.name)
        with self.assertRaises(AttributeError):
            f.newlines
        with self.assertRaises(AttributeError):
            f.encoding

        f.write(b'x')
        self.assertTrue(f._rolled)
        self.assertEqual(f.mode, 'w+b')
        self.assertIsNotNone(f.name)
        with self.assertRaises(AttributeError):
            f.newlines
        with self.assertRaises(AttributeError):
            f.encoding
testing.py (project: Helix, author: 3lackrush)
def initCase(switches, count):
    _failures.failedItems = []
    _failures.failedParseOn = None
    _failures.failedTraceBack = None

    paths.SQLMAP_OUTPUT_PATH = tempfile.mkdtemp(prefix="%s%d-" % (MKSTEMP_PREFIX.TESTING, count))
    paths.SQLMAP_DUMP_PATH = os.path.join(paths.SQLMAP_OUTPUT_PATH, "%s", "dump")
    paths.SQLMAP_FILES_PATH = os.path.join(paths.SQLMAP_OUTPUT_PATH, "%s", "files")

    logger.debug("using output directory '%s' for this test case" % paths.SQLMAP_OUTPUT_PATH)

    LOGGER_HANDLER.stream = sys.stdout = tempfile.SpooledTemporaryFile(max_size=0, mode="w+b", prefix="sqlmapstdout-")

    cmdLineOptions = cmdLineParser()

    if switches:
        for key, value in switches.items():
            if key in cmdLineOptions.__dict__:
                cmdLineOptions.__dict__[key] = value

    initOptions(cmdLineOptions, True)
    init()
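
initCase above redirects sys.stdout into a SpooledTemporaryFile so that everything a test prints can be captured and inspected afterwards. A standalone Python 3 sketch of that pattern; the helper name run_with_captured_stdout is illustrative and not part of sqlmap:

import sys
import tempfile

def run_with_captured_stdout(func):
    # Swap stdout for an in-memory spooled file, run the callable,
    # then restore stdout and return whatever was printed.
    original_stdout = sys.stdout
    sys.stdout = capture = tempfile.SpooledTemporaryFile(max_size=0, mode='w+')
    try:
        func()
    finally:
        sys.stdout = original_stdout
    capture.seek(0)
    output = capture.read()
    capture.close()
    return output

print(run_with_captured_stdout(lambda: print("hello")))   # hello
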
testing.py (project: autoscan, author: b01u)
def initCase(switches, count):
    Failures.failedItems = []
    Failures.failedParseOn = None
    Failures.failedTraceBack = None

    paths.SQLMAP_OUTPUT_PATH = tempfile.mkdtemp(prefix="sqlmaptest-%d-" % count)
    paths.SQLMAP_DUMP_PATH = os.path.join(paths.SQLMAP_OUTPUT_PATH, "%s", "dump")
    paths.SQLMAP_FILES_PATH = os.path.join(paths.SQLMAP_OUTPUT_PATH, "%s", "files")

    logger.debug("using output directory '%s' for this test case" % paths.SQLMAP_OUTPUT_PATH)

    LOGGER_HANDLER.stream = sys.stdout = tempfile.SpooledTemporaryFile(max_size=0, mode="w+b", prefix="sqlmapstdout-")

    cmdLineOptions = cmdLineParser()

    if switches:
        for key, value in switches.items():
            if key in cmdLineOptions.__dict__:
                cmdLineOptions.__dict__[key] = value

    initOptions(cmdLineOptions, True)
    init()
test_tempfile.py (project: kbe_server, author: xiaohaoppy)
def test_exports(self):
        # There are no surprising symbols in the tempfile module
        dict = tempfile.__dict__

        expected = {
            "NamedTemporaryFile" : 1,
            "TemporaryFile" : 1,
            "mkstemp" : 1,
            "mkdtemp" : 1,
            "mktemp" : 1,
            "TMP_MAX" : 1,
            "gettempprefix" : 1,
            "gettempdir" : 1,
            "tempdir" : 1,
            "template" : 1,
            "SpooledTemporaryFile" : 1,
            "TemporaryDirectory" : 1,
        }

        unexp = []
        for key in dict:
            if key[0] != '_' and key not in expected:
                unexp.append(key)
        self.assertTrue(len(unexp) == 0,
                        "unexpected keys: %s" % unexp)
test_tempfile.py (project: kbe_server, author: xiaohaoppy)
def test_properties(self):
        f = tempfile.SpooledTemporaryFile(max_size=10)
        f.write(b'x' * 10)
        self.assertFalse(f._rolled)
        self.assertEqual(f.mode, 'w+b')
        self.assertIsNone(f.name)
        with self.assertRaises(AttributeError):
            f.newlines
        with self.assertRaises(AttributeError):
            f.encoding

        f.write(b'x')
        self.assertTrue(f._rolled)
        self.assertEqual(f.mode, 'rb+')
        self.assertIsNotNone(f.name)
        with self.assertRaises(AttributeError):
            f.newlines
        with self.assertRaises(AttributeError):
            f.encoding
test_tempfile.py (project: kbe_server, author: xiaohaoppy)
def test_text_newline_and_encoding(self):
        f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
                                          newline='', encoding='utf-8')
        f.write("\u039B\r\n")
        f.seek(0)
        self.assertEqual(f.read(), "\u039B\r\n")
        self.assertFalse(f._rolled)
        self.assertEqual(f.mode, 'w+')
        self.assertIsNone(f.name)
        self.assertIsNone(f.newlines)
        self.assertIsNone(f.encoding)

        f.write("\u039B" * 20 + "\r\n")
        f.seek(0)
        self.assertEqual(f.read(), "\u039B\r\n" + ("\u039B" * 20) + "\r\n")
        self.assertTrue(f._rolled)
        self.assertEqual(f.mode, 'w+')
        self.assertIsNotNone(f.name)
        self.assertIsNotNone(f.newlines)
        self.assertEqual(f.encoding, 'utf-8')
test_tempfile.py (project: kbe_server, author: xiaohaoppy)
def test_truncate_with_size_parameter(self):
        # A SpooledTemporaryFile can be truncated to zero size
        f = tempfile.SpooledTemporaryFile(max_size=10)
        f.write(b'abcdefg\n')
        f.seek(0)
        f.truncate()
        self.assertFalse(f._rolled)
        self.assertEqual(f._file.getvalue(), b'')
        # A SpooledTemporaryFile can be truncated to a specific size
        f = tempfile.SpooledTemporaryFile(max_size=10)
        f.write(b'abcdefg\n')
        f.truncate(4)
        self.assertFalse(f._rolled)
        self.assertEqual(f._file.getvalue(), b'abcd')
        # A SpooledTemporaryFile rolls over if truncated to large size
        f = tempfile.SpooledTemporaryFile(max_size=10)
        f.write(b'abcdefg\n')
        f.truncate(20)
        self.assertTrue(f._rolled)
        if has_stat:
            self.assertEqual(os.fstat(f.fileno()).st_size, 20)
hls_decrypt.py (project: linuxacademy-dl, author: vassim)
def decrypt(self):
        decrypted_chunk = SpooledTemporaryFile(
            max_size=self.POOL_SIZE,
            mode='wb+'
        )
        cipher = AES.new(self.key, AES.MODE_CBC, self.iv)

        next_chunk = ''
        finished = False

        while not finished:
            chunk, next_chunk = next_chunk, \
                self.chunk_stream.read(1024 * AES.block_size)

            chunk = cipher.decrypt(chunk)

            if len(next_chunk) == 0:
                chunk = self.pkcs7_reverse_padded_chunk(chunk)
                finished = True
            if chunk:
                decrypted_chunk.write(chunk)

        decrypted_chunk.seek(0)
        return decrypted_chunk
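
The loop above hands the final block to self.pkcs7_reverse_padded_chunk, which is not part of this excerpt. A minimal sketch of what a PKCS#7 unpadding helper of that kind might look like; this is an assumption about its behaviour, not the project's actual implementation:

def pkcs7_unpad(block, block_size=16):
    # PKCS#7: the last byte encodes how many padding bytes were added,
    # so strip exactly that many from the end.
    if not block:
        return block
    last = block[-1]
    pad_len = last if isinstance(last, int) else ord(last)   # bytes vs. str
    if not 1 <= pad_len <= block_size:
        raise ValueError("invalid PKCS#7 padding")
    return block[:-pad_len]
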
net.py (project: skorch, author: dnouri)
def __setstate__(self, state):
        disable_cuda = False
        for key in self.cuda_dependent_attributes_:
            if key not in state:
                continue
            dump = state.pop(key)
            with tempfile.SpooledTemporaryFile() as f:
                f.write(dump)
                f.seek(0)
                if state['use_cuda'] and not torch.cuda.is_available():
                    disable_cuda = True
                    val = torch.load(
                        f, map_location=lambda storage, loc: storage)
                else:
                    val = torch.load(f)
            state[key] = val
        if disable_cuda:
            warnings.warn(
                "Model configured to use CUDA but no CUDA devices "
                "available. Loading on CPU instead.",
                DeviceWarning)
            state['use_cuda'] = False

        self.__dict__.update(state)
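
__setstate__ expects each CUDA-dependent attribute to arrive as raw bytes produced by torch.save. A hedged sketch of the matching __getstate__ side, under that assumption and reusing the module's tempfile and torch imports; it is not necessarily skorch's exact implementation:

def __getstate__(self):
    state = self.__dict__.copy()
    for key in self.cuda_dependent_attributes_:
        if key not in state:
            continue
        # Serialize with torch.save and keep only the raw bytes, so that
        # __setstate__ can torch.load them on whatever device is available.
        with tempfile.SpooledTemporaryFile() as f:
            torch.save(state[key], f)
            f.seek(0)
            state[key] = f.read()
    return state
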
web.py (project: qgis_resources_sharing, author: akbargumbira)
def __call__(self, environ, start_response):
        if environ.get('HTTP_CONTENT_ENCODING', '') == 'gzip':
            try:
                environ['wsgi.input'].tell()
                wsgi_input = environ['wsgi.input']
            except (AttributeError, IOError, NotImplementedError):
                # The gzip implementation in the standard library of Python 2.x
                # requires working '.seek()' and '.tell()' methods on the input
                # stream.  Read the data into a temporary file to work around
                # this limitation.
                wsgi_input = tempfile.SpooledTemporaryFile(16 * 1024 * 1024)
                shutil.copyfileobj(environ['wsgi.input'], wsgi_input)
                wsgi_input.seek(0)

            environ['wsgi.input'] = gzip.GzipFile(filename=None, fileobj=wsgi_input, mode='r')
            del environ['HTTP_CONTENT_ENCODING']
            if 'CONTENT_LENGTH' in environ:
                del environ['CONTENT_LENGTH']

        return self.app(environ, start_response)
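
The comment in the middleware states the key constraint: gzip.GzipFile needs working seek() and tell() on its input, so a non-seekable stream is first copied into a SpooledTemporaryFile. The same trick works for any non-seekable source; a standalone sketch (the helper name make_seekable is illustrative):

import shutil
import tempfile

def make_seekable(stream, spool_size=16 * 1024 * 1024):
    # Copy a possibly non-seekable stream into a spooled temp file so
    # that callers which require seek()/tell() can work with it.
    spool = tempfile.SpooledTemporaryFile(spool_size)
    shutil.copyfileobj(stream, spool)
    spool.seek(0)
    return spool

# Typical use, mirroring the middleware above:
#   wsgi_input = make_seekable(environ['wsgi.input'])
#   environ['wsgi.input'] = gzip.GzipFile(fileobj=wsgi_input, mode='r')
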
wsgiprox.py (project: wsgiprox, author: webrecorder)
def buffer_iter(cls, orig_iter, buff_size=65536):
        out = SpooledTemporaryFile(buff_size)
        size = 0

        for buff in orig_iter:
            size += len(buff)
            out.write(buff)

        content_length_str = str(size)
        out.seek(0)

        def read_iter():
            while True:
                buff = out.read(buff_size)
                if not buff:
                    break
                yield buff

        return content_length_str, read_iter()


# ============================================================================
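
The point of buffer_iter above is to learn the total size of a lazily produced body before re-streaming it, for example to emit a Content-Length header. A standalone sketch of the same idea against the stdlib class directly:

from tempfile import SpooledTemporaryFile

def body_chunks():
    yield b'hello '
    yield b'spooled '
    yield b'world'

# Drain the generator into a spooled buffer to measure it.
spool = SpooledTemporaryFile(max_size=65536)
total = sum(spool.write(chunk) for chunk in body_chunks())
spool.seek(0)

headers = [('Content-Length', str(total))]
print(headers, spool.read())   # [('Content-Length', '19')] b'hello spooled world'
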
Peer.py (project: zeronet-debian, author: bashrc)
def getFile(self, site, inner_path):
        # Use streamFile if client supports it
        if config.stream_downloads and self.connection and self.connection.handshake and self.connection.handshake["rev"] > 310:
            return self.streamFile(site, inner_path)

        location = 0
        if config.use_tempfiles:
            buff = tempfile.SpooledTemporaryFile(max_size=16 * 1024, mode='w+b')
        else:
            buff = StringIO()

        s = time.time()
        while True:  # Read in 512k parts
            res = self.request("getFile", {"site": site, "inner_path": inner_path, "location": location})

            if not res or "body" not in res:  # Error
                return False

            buff.write(res["body"])
            res["body"] = None  # Save memory
            if res["location"] == res["size"]:  # End of file
                break
            else:
                location = res["location"]

        self.download_bytes += res["location"]
        self.download_time += (time.time() - s)
        if self.site:
            self.site.settings["bytes_recv"] = self.site.settings.get("bytes_recv", 0) + res["location"]
        buff.seek(0)
        return buff

    # Download file out of msgpack context to save memory and cpu
Peer.py (project: zeronet-debian, author: bashrc)
def streamFile(self, site, inner_path):
        location = 0
        if config.use_tempfiles:
            buff = tempfile.SpooledTemporaryFile(max_size=16 * 1024, mode='w+b')
        else:
            buff = StringIO()

        s = time.time()
        while True:  # Read in 512k parts
            res = self.request("streamFile", {"site": site, "inner_path": inner_path, "location": location}, stream_to=buff)

            if not res:  # Error
                self.log("Invalid response: %s" % res)
                return False

            if res["location"] == res["size"]:  # End of file
                break
            else:
                location = res["location"]

        self.download_bytes += res["location"]
        self.download_time += (time.time() - s)
        self.site.settings["bytes_recv"] = self.site.settings.get("bytes_recv", 0) + res["location"]
        buff.seek(0)
        return buff

    # Send a ping request
scratchdir.py (project: scratchdir, author: ahawker)
def spooled(self, max_size: int = 0, mode: str = 'w+b', buffering: int = -1,
                encoding: typing.Optional[str] = None, newline: typing.Optional[str] = None,
                suffix: typing.Optional[str] = DEFAULT_SUFFIX, prefix: typing.Optional[str] = DEFAULT_PREFIX,
                dir: typing.Optional[str] = None) -> typing.IO:
        """
        Create a new spooled temporary file within the scratch dir.

        This returns a :class:`~tempfile.SpooledTemporaryFile` which is a specialized object that wraps a
        :class:`StringIO`/:class:`BytesIO` instance that transparently overflows into a file on the disk once it
        reaches a certain size.

        By default, a spooled file will never roll over to disk.

        :param max_size: (Optional) max size before the in-memory buffer rolls over to disk
        :type max_size: :class:`~int`
        :param mode: (Optional) mode to open the file with
        :type mode: :class:`~str`
        :param buffering: (Optional) size of the file buffer
        :type buffering: :class:`~int`
        :param encoding: (Optional) encoding to open the file with
        :type encoding: :class:`~str`
        :param newline: (Optional) newline argument to open the file with
        :type newline: :class:`~str` or :class:`~NoneType`
        :param suffix: (Optional) filename suffix
        :type suffix: :class:`~str` or :class:`~NoneType`
        :param prefix: (Optional) filename prefix
        :type prefix: :class:`~str` or :class:`~NoneType`
        :param dir: (Optional) relative path to directory within the scratch dir where the file should exist
        :type dir: :class:`~str` or :class:`~NoneType`
        :return: SpooledTemporaryFile instance
        :rtype: :class:`~tempfile.SpooledTemporaryFile`
        """
        return tempfile.SpooledTemporaryFile(max_size, mode, buffering, encoding,
                                             newline, suffix, prefix, self.join(dir))
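
Because max_size defaults to 0, a file created by spooled() stays in memory until it is rolled over explicitly (or until fileno() is requested), exactly as the docstring says. A standalone sketch of that behaviour using the stdlib class directly, independent of scratchdir:

import tempfile

f = tempfile.SpooledTemporaryFile(max_size=0, mode='w+b')
f.write(b'x' * (1024 * 1024))   # 1 MiB, still held entirely in memory
f.rollover()                    # force the buffer out to a real temp file
print(f.name is not None)       # True: the data now lives on disk
f.close()
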
warcwriter.py (project: warcio, author: webrecorder)
def _create_temp_file(cls):
        return tempfile.SpooledTemporaryFile(max_size=512*1024)


# ============================================================================
msc.py (project: pymsc, author: jam1garner)
def readFromBytes(self, b, headerEndianess='>'):
        with tempfile.SpooledTemporaryFile(mode='w+b') as f:
            f.write(b)
            f.seek(0)
            self.readFromFile(f, headerEndianess)
files.py (project: archive-Holmes-Totem-Service-Library, author: HolmesProcessing)
def __enter__(self):
        """
        Create the temporary file in memory first; once it uses too much memory
        it is automatically relocated to the filesystem.
        """
        self.file = tempfile.SpooledTemporaryFile(max_size=self.max_size)
        return self.file
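
Only __enter__ is shown in the excerpt; a minimal sketch of how the surrounding context manager might be structured, with a hypothetical class name and a close-on-exit __exit__ (not the project's actual code):

import tempfile

class SpooledTempContext:
    """Yield a spooled temp file and make sure it is closed on exit."""

    def __init__(self, max_size=10 * 1024 * 1024):
        self.max_size = max_size
        self.file = None

    def __enter__(self):
        # Buffered in memory first; rolls over to disk beyond max_size.
        self.file = tempfile.SpooledTemporaryFile(max_size=self.max_size)
        return self.file

    def __exit__(self, exc_type, exc_value, traceback):
        # Closing also removes the on-disk file if a rollover happened.
        self.file.close()
        return False
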
models.py (project: open-ledger, author: creativecommons)
def generate_hash(self):
        """Requests the image as found in `url` and generates a perceptual_hash from it"""
        # This is slow: it has to get the image and spool it to a tempfile, then compute
        # the hash
        return None
#        req = requests.get(self.url)
#        if req.status_code == 200:
#            buff = tempfile.SpooledTemporaryFile(max_size=1e9)
#            downloaded = 0
#            filesize = int(req.headers.get('content-length', 1000))  # Set a default length for the test client
#            for chunk in req.iter_content():
#                downloaded += len(chunk)
#                buff.write(chunk)
#            buff.seek(0)
#            im = PillowImage.open(io.BytesIO(buff.read()))
#            return str(imagehash.average_hash(im))
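
The commented-out body sketches the intended flow: stream the download into a spooled temp file, then hash the image. A standalone, hedged version of the same idea; requests, Pillow and imagehash are third-party packages, and the function name is illustrative rather than part of open-ledger:

import tempfile

import imagehash
import requests
from PIL import Image

def perceptual_hash_of(url, max_mem=10 * 1024 * 1024):
    # Stream the response into a spooled temp file so large images do not
    # have to be held fully in memory before hashing.
    resp = requests.get(url, stream=True)
    if resp.status_code != 200:
        return None
    buff = tempfile.SpooledTemporaryFile(max_size=max_mem)
    for chunk in resp.iter_content(chunk_size=8192):
        buff.write(chunk)
    buff.seek(0)
    return str(imagehash.average_hash(Image.open(buff)))
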
test_tempfile.py (project: zippy, author: securesystemslab)
def do_create(self, max_size=0, dir=None, pre="", suf=""):
        if dir is None:
            dir = tempfile.gettempdir()
        try:
            file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir, prefix=pre, suffix=suf)
        except:
            self.failOnException("SpooledTemporaryFile")

        return file
test_tempfile.py (project: zippy, author: securesystemslab)
def test_basic(self):
        # SpooledTemporaryFile can create files
        f = self.do_create()
        self.assertFalse(f._rolled)
        f = self.do_create(max_size=100, pre="a", suf=".txt")
        self.assertFalse(f._rolled)

