python类FieldStorage()的实例源码

demo.py 文件源码 项目:e2e-coref 作者: kentonl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def do_POST(self):
    """Answer a form POST that carries a ``text`` field with JSON predictions.

    Replies 200 with the JSON-encoded result of ``make_predictions`` when the
    form has a ``text`` field whose decoded length is at most 10000;
    otherwise replies 400 with headers only.
    """
    environ = {
        "REQUEST_METHOD": "POST",
        "CONTENT_TYPE": self.headers["Content-Type"],
    }
    form = cgi.FieldStorage(fp=self.rfile, headers=self.headers, environ=environ)

    text = form["text"].value.decode("utf-8") if "text" in form else None

    # Missing field or over-long document: reject without a body.
    if text is None or len(text) > 10000:
        self.send_response(400)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        return

    print(u"Document text: {}".format(text))
    example = make_predictions(text, self.model)
    print_predictions(example)

    self.send_response(200)
    self.send_header("Content-Type", "application/json")
    self.end_headers()
    self.wfile.write(json.dumps(example))
request.py 文件源码 项目:xxNet 作者: drzorm 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def transcode_fs(self, fs, content_type):
        """Re-encode the fields of a FieldStorage through ``_encode_multipart``.

        Field names, filenames and plain values are decoded with
        ``self.charset`` / ``self.errors`` (passed through unchanged when
        ``PY3`` is set); ``field.name`` and ``field.filename`` are updated in
        place. Returns the output object handed back by ``_encode_multipart``.
        """
        def to_text(raw):
            if PY3: # pragma: no cover
                return raw
            return raw.decode(self.charset, self.errors)

        pairs = []
        for part in fs.list or ():
            part.name = to_text(part.name)
            if not part.filename:
                # Plain form value: store the decoded value itself.
                pairs.append((part.name, to_text(part.value)))
                continue
            # File upload: keep the field object so its content can be re-read.
            part.filename = to_text(part.filename)
            pairs.append((part.name, part))

        # TODO: transcode big requests to temp file
        _, body = _encode_multipart(pairs, content_type, fout=io.BytesIO())
        return body

# TODO: remove in 1.4
file.py 文件源码 项目:server 作者: viur-framework 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def getUploads(self, field_name = None):
        """
            Get uploads sent to this handler.
            Cheeky borrowed from blobstore_handlers.py - © 2007 Google Inc.

            Args:
                field_name: Only select uploads that were sent as a specific field.

            Returns:
                A list of BlobInfo records corresponding to each upload.
                Empty list if there are no blob-info records for field_name.

        """
        uploads = collections.defaultdict(list)

        for key, value in request.current.get().request.params.items():
            # Only FieldStorage parameters tagged with a blob-key are uploads.
            if isinstance(value, cgi.FieldStorage) and "blob-key" in value.type_options:
                uploads[key].append(blobstore.parse_blob_info(value))

        if field_name:
            # .get() avoids creating an empty entry in the defaultdict.
            return list(uploads.get(field_name, []))

        # values() works on Python 2 and 3 (itervalues() is Python 2 only), and
        # a distinct loop name no longer rebinds the dict being iterated.
        return [info for group in uploads.values() for info in group]
dbtransfer.py 文件源码 项目:server 作者: viur-framework 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def getUploads(self, field_name=None):
        """
            Get uploads sent to this handler.
            Cheeky borrowed from blobstore_handlers.py - © 2007 Google Inc.

            Args:
                field_name: Only select uploads that were sent as a specific field.

            Returns:
                A list of BlobInfo records corresponding to each upload.
                Empty list if there are no blob-info records for field_name.

        """
        uploads = collections.defaultdict(list)
        for key, value in request.current.get().request.params.items():
            # Only FieldStorage parameters tagged with a blob-key are uploads.
            if isinstance(value, cgi.FieldStorage) and 'blob-key' in value.type_options:
                uploads[key].append(blobstore.parse_blob_info(value))

        if field_name:
            # .get() avoids creating an empty entry in the defaultdict.
            return list(uploads.get(field_name, []))

        # values() works on Python 2 and 3 (itervalues() is Python 2 only), and
        # a distinct loop name no longer rebinds the dict being iterated.
        return [info for group in uploads.values() for info in group]
cvs.py 文件源码 项目:postsai 作者: postsai 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def process(self):
        """Print a CGI response with information about a commit.

        Emits the response headers, then a ``#``-prefixed JSON line holding
        the formatted commit header, and finally dumps the commit diff.
        """
        form = cgi.FieldStorage()
        commit = self.read_commit(form)

        # Every header line carries its own "\r"; print() appends the "\n".
        header_lines = [
            "Content-Type: text/plain; charset='utf-8'\r",
            "Cache-Control: max-age=60\r",
        ]
        if form.getfirst("download", "false") == "true":
            header_lines.append("Content-Disposition: attachment; filename=\"patch.txt\"\r")
        # Blank line terminating the header section.
        header_lines.append("\r")
        for line in header_lines:
            print(line)

        commit_header = PostsaiCommitViewer.format_commit_header(commit)
        print("#" + json.dumps(commit_header, default=convert_to_builtin_type))
        # Flush before the diff is dumped so the header lines come out first.
        sys.stdout.flush()
        PostsaiCommitViewer.dump_commit_diff(commit)
test_cgi.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def do_test(buf, method):
    """Parse ``buf`` with ``cgi.parse`` as a GET query string or a POST body.

    Returns the parsed result, or a ``ComparableException`` wrapping whatever
    exception the parser raised. Raises ``ValueError`` for any method other
    than "GET" or "POST".
    """
    if method not in ("GET", "POST"):
        raise ValueError("unknown method: %s" % method)

    env = {'REQUEST_METHOD': method}
    if method == "GET":
        fp = None
        env['QUERY_STRING'] = buf
    else:
        fp = BytesIO(buf.encode('latin-1')) # FieldStorage expects bytes
        env['CONTENT_TYPE'] = 'application/x-www-form-urlencoded'
        env['CONTENT_LENGTH'] = str(len(buf))

    try:
        parsed = cgi.parse(fp, env, strict_parsing=1)
    except Exception as err:
        return ComparableException(err)
    return parsed
test_cgi.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_strict(self):
        # Runs every strict-parsing case through do_test (GET and POST) and,
        # for cases expecting a dict, through the FieldStorage dict interface.
        for orig, expect in parse_strict_test_cases:
            # Test basic parsing
            for method in ("GET", "POST"):
                d = do_test(orig, method)
                self.assertEqual(
                    d, expect,
                    "Error parsing %s method %s" % (repr(orig), method))

            fs = cgi.FieldStorage(environ={'QUERY_STRING': orig})
            if not isinstance(expect, dict):
                continue

            # test dict interface
            self.assertEqual(len(expect), len(fs))
            self.assertCountEqual(expect.keys(), fs.keys())
            ##self.assertEqual(norm(expect.values()), norm(fs.values()))
            ##self.assertEqual(norm(expect.items()), norm(fs.items()))
            self.assertEqual(fs.getvalue("nonexistent field", "default"), "default")

            # test individual fields
            for key, expect_val in expect.items():
                self.assertIn(key, fs)
                # Multi-valued fields compare as a list, single ones as a scalar.
                wanted = expect_val if len(expect_val) > 1 else expect_val[0]
                self.assertEqual(fs.getvalue(key), wanted)
test_cgi.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_fieldstorage_multipart(self):
        # Test basic FieldStorage multipart parsing: every parsed field must
        # match the expected name, filename and value.
        environ = {
            'REQUEST_METHOD': 'POST',
            'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY),
            'CONTENT_LENGTH': '558'}
        body = BytesIO(POSTDATA.encode('latin-1'))
        fs = cgi.FieldStorage(body, environ=environ, encoding="latin-1")
        self.assertEqual(len(fs.list), 4)
        expected_fields = [
            {'name':'id', 'filename':None, 'value':'1234'},
            {'name':'title', 'filename':None, 'value':''},
            {'name':'file', 'filename':'test.txt', 'value':b'Testing 123.\n'},
            {'name':'submit', 'filename':None, 'value':' Add '},
        ]
        # The length assertion above guarantees both sequences hold four items.
        for field, expected in zip(fs.list, expected_fields):
            for attr, want in expected.items():
                self.assertEqual(getattr(field, attr), want)
test_cgi.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_fieldstorage_multipart_leading_whitespace(self):
        # Same expectations as the basic multipart test, with the body
        # prefixed by a blank line.
        environ = {
            'REQUEST_METHOD': 'POST',
            'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY),
            'CONTENT_LENGTH': '560'}
        # Add some leading whitespace to our post data that will cause the
        # first line to not be the innerboundary.
        body = BytesIO(b"\r\n" + POSTDATA.encode('latin-1'))
        fs = cgi.FieldStorage(body, environ=environ, encoding="latin-1")
        self.assertEqual(len(fs.list), 4)
        expected_fields = [
            {'name':'id', 'filename':None, 'value':'1234'},
            {'name':'title', 'filename':None, 'value':''},
            {'name':'file', 'filename':'test.txt', 'value':b'Testing 123.\n'},
            {'name':'submit', 'filename':None, 'value':' Add '},
        ]
        # The length assertion above guarantees both sequences hold four items.
        for field, expected in zip(fs.list, expected_fields):
            for attr, want in expected.items():
                self.assertEqual(getattr(field, attr), want)
test_cgi.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_fieldstorage_multipart_w3c(self):
        # Test basic FieldStorage multipart parsing (W3C sample)
        environ = {
            'REQUEST_METHOD': 'POST',
            'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY_W3),
            'CONTENT_LENGTH': str(len(POSTDATA_W3))}
        body = BytesIO(POSTDATA_W3.encode('latin-1'))
        fs = cgi.FieldStorage(body, environ=environ, encoding="latin-1")
        self.assertEqual(len(fs.list), 2)

        submit_field, files_field = fs.list[0], fs.list[1]
        self.assertEqual(submit_field.name, 'submit-name')
        self.assertEqual(submit_field.value, 'Larry')
        self.assertEqual(files_field.name, 'files')

        # The value of the 'files' field is itself a sequence of parts.
        files = files_field.value
        self.assertEqual(len(files), 2)
        expected_files = [
            {'name': None, 'filename': 'file1.txt', 'value': b'... contents of file1.txt ...'},
            {'name': None, 'filename': 'file2.gif', 'value': b'...contents of file2.gif...'},
        ]
        # The length assertion above guarantees both sequences hold two items.
        for part, expected in zip(files, expected_files):
            for attr, want in expected.items():
                self.assertEqual(getattr(part, attr), want)
test_cgi.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_fieldstorage_part_content_length(self):
        # A single multipart part that carries its own Content-Length header
        # must be parsed into exactly one field with the expected name/value.
        boundary = "JfISa01"
        payload = """--JfISa01
Content-Disposition: form-data; name="submit-name"
Content-Length: 5

Larry
--JfISa01"""
        environ = {
            'REQUEST_METHOD': 'POST',
            'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(boundary),
            'CONTENT_LENGTH': str(len(payload))}
        body = BytesIO(payload.encode('latin-1'))
        fs = cgi.FieldStorage(body, environ=environ, encoding="latin-1")
        self.assertEqual(len(fs.list), 1)
        only_field = fs.list[0]
        self.assertEqual(only_field.name, 'submit-name')
        self.assertEqual(only_field.value, 'Larry')
fcgi.py 文件源码 项目:slugiot-client 作者: slugiot 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_app(environ, start_response):
        """Probably not the most efficient example.

        Demo WSGI application: yields an HTML page with one table row per
        ``environ`` entry (sorted by key), followed by one row per submitted
        form field when the request carries form data.
        """
        import cgi
        try:
            from html import escape  # Python 3
        except ImportError:
            from cgi import escape  # Python 2

        def cell(obj):
            # Format exactly as '%s' would, then neutralise HTML metacharacters.
            # quote=False keeps the cgi.escape default this code relied on.
            return escape('%s' % (obj,), False)

        start_response('200 OK', [('Content-Type', 'text/html')])
        yield '<html><head><title>Hello World!</title></head>\n' \
              '<body>\n' \
              '<p>Hello World!</p>\n' \
              '<table border="1">'
        # sorted() and repr() replace the Python 2-only keys().sort() and
        # backtick syntax without changing the rendered output.
        for name in sorted(environ.keys()):
            yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
                cell(name), escape(repr(environ[name]), False))

        form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
                                keep_blank_values=1)
        # form.list is None when the request body is not form data; iterating
        # it unguarded raised TypeError.
        fields = form.list or []
        if fields:
            yield '<tr><th colspan="2">Form data</th></tr>'

        for field in fields:
            # Field names and values come from the client: escape them.
            yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
                cell(field.name), cell(field.value))

        yield '</table>\n' \
              '</body></html>\n'
blobstore_handlers.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def get_uploads(self, field_name=None):
    """Get uploads sent to this handler.

    The request parameters are scanned once; the per-field result is cached
    on the handler and reused by later calls.

    Args:
      field_name: Only select uploads that were sent as a specific field.

    Returns:
      A list of BlobInfo records corresponding to each upload.
      Empty list if there are no blob-info records for field_name.
    """
    if self.__uploads is None:
      self.__uploads = collections.defaultdict(list)
      for key, value in self.request.params.items():
        # Only FieldStorage parameters tagged with a blob-key are uploads.
        if isinstance(value, cgi.FieldStorage):
          if 'blob-key' in value.type_options:
            self.__uploads[key].append(blobstore.parse_blob_info(value))

    if field_name:
      # Return a copy so callers cannot mutate the cached list; .get() avoids
      # creating an empty entry in the defaultdict.
      return list(self.__uploads.get(field_name, []))

    # values() works on Python 2 and 3 alike (itervalues() is Python 2 only).
    return [info for uploads in self.__uploads.values() for info in uploads]
blobstore_handlers.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get_file_infos(self, field_name=None):
    """Get the file infos associated to the uploads sent to this handler.

    The request parameters are scanned once; the per-field result is cached
    on the handler and reused by later calls.

    Args:
      field_name: Only select uploads that were sent as a specific field.
        Specify None to select all the uploads.

    Returns:
      A list of FileInfo records corresponding to each upload.
      Empty list if there are no FileInfo records for field_name.
    """
    if self.__file_infos is None:
      self.__file_infos = collections.defaultdict(list)
      for key, value in self.request.params.items():
        # Only FieldStorage parameters tagged with a blob-key are uploads.
        if isinstance(value, cgi.FieldStorage):
          if 'blob-key' in value.type_options:
            self.__file_infos[key].append(blobstore.parse_file_info(value))

    if field_name:
      # Return a copy so callers cannot mutate the cached list; .get() avoids
      # creating an empty entry in the defaultdict.
      return list(self.__file_infos.get(field_name, []))

    # values() works on Python 2 and 3 alike (itervalues() is Python 2 only).
    return [info for infos in self.__file_infos.values() for info in infos]
request.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def transcode_fs(self, fs, content_type):
        """Re-encode the fields of a FieldStorage through ``_encode_multipart``.

        Field names, filenames and plain values are decoded with
        ``self.charset`` / ``self.errors`` (passed through unchanged when
        ``PY3`` is set); ``field.name`` and ``field.filename`` are updated in
        place. Returns the output object handed back by ``_encode_multipart``.
        """
        def to_text(raw):
            if PY3: # pragma: no cover
                return raw
            return raw.decode(self.charset, self.errors)

        pairs = []
        for part in fs.list or ():
            part.name = to_text(part.name)
            if not part.filename:
                # Plain form value: store the decoded value itself.
                pairs.append((part.name, to_text(part.value)))
                continue
            # File upload: keep the field object so its content can be re-read.
            part.filename = to_text(part.filename)
            pairs.append((part.name, part))

        # TODO: transcode big requests to temp file
        _, body = _encode_multipart(pairs, content_type, fout=io.BytesIO())
        return body

# TODO: remove in 1.4
test_form.py 文件源码 项目:mechanize 作者: python-mechanize 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_file_request(self):
        # A file upload added without a filename must produce a multipart
        # request that the cgi module parses back to the same field values.
        import cgi

        # fill in a file upload form...
        form = self.make_form()
        form["user"] = "john"
        data_control = form.find_control("data")
        data = "blah\nbaz\n"
        data_control.add_file(StringIO(data))
        req = form.click()
        self.assertTrue(
            get_header(req, "Content-type").startswith(
                "multipart/form-data; boundary="))

        # ...and check the resulting request is understood by cgi module
        fs = cgi.FieldStorage(
            StringIO(req.get_data()),
            CaseInsensitiveDict(header_items(req)),
            environ={"REQUEST_METHOD": "POST"})
        # assertEqual replaces the deprecated assert_/assertEquals aliases and
        # reports both operands when a comparison fails.
        self.assertEqual(fs["user"].value, "john")
        self.assertEqual(fs["data"].value, data)
        self.assertEqual(fs["data"].filename, "")
test_form.py 文件源码 项目:mechanize 作者: python-mechanize 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_file_request_with_filename(self):
        # A file upload added with an explicit filename must produce a
        # multipart request that the cgi module parses back, filename included.
        import cgi

        # fill in a file upload form...
        form = self.make_form()
        form["user"] = "john"
        data_control = form.find_control("data")
        data = "blah\nbaz\n"
        data_control.add_file(StringIO(data), filename="afilename")
        req = form.click()
        self.assertTrue(
            get_header(req, "Content-type").startswith(
                "multipart/form-data; boundary="))

        # ...and check the resulting request is understood by cgi module
        fs = cgi.FieldStorage(
            StringIO(req.get_data()),
            CaseInsensitiveDict(header_items(req)),
            environ={"REQUEST_METHOD": "POST"})
        # assertEqual replaces the deprecated assert_ alias and reports both
        # operands when a comparison fails.
        self.assertEqual(fs["user"].value, "john")
        self.assertEqual(fs["data"].value, data)
        self.assertEqual(fs["data"].filename, "afilename")
scgi_fork.py 文件源码 项目:tissuelab 作者: VirtualPlants 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_app(environ, start_response):
        """Probably not the most efficient example.

        Demo WSGI application: yields an HTML page with one table row per
        ``environ`` entry (sorted by key), followed by one row per submitted
        form field when the request carries form data.
        """
        import cgi
        try:
            from html import escape  # Python 3
        except ImportError:
            from cgi import escape  # Python 2

        def cell(obj):
            # Format exactly as '%s' would, then neutralise HTML metacharacters.
            # quote=False keeps the cgi.escape default this code relied on.
            return escape('%s' % (obj,), False)

        start_response('200 OK', [('Content-Type', 'text/html')])
        yield '<html><head><title>Hello World!</title></head>\n' \
              '<body>\n' \
              '<p>Hello World!</p>\n' \
              '<table border="1">'
        # sorted() and repr() replace the Python 2-only keys().sort() and
        # backtick syntax without changing the rendered output.
        for name in sorted(environ.keys()):
            yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
                cell(name), escape(repr(environ[name]), False))

        form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
                                keep_blank_values=1)
        # form.list is None when the request body is not form data; iterating
        # it unguarded raised TypeError.
        fields = form.list or []
        if fields:
            yield '<tr><th colspan="2">Form data</th></tr>'

        for field in fields:
            # Field names and values come from the client: escape them.
            yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
                cell(field.name), cell(field.value))

        yield '</table>\n' \
              '</body></html>\n'
ajp_fork.py 文件源码 项目:tissuelab 作者: VirtualPlants 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_app(environ, start_response):
        """Probably not the most efficient example.

        Demo WSGI application: yields an HTML page with one table row per
        ``environ`` entry (sorted by key), followed by one row per submitted
        form field when the request carries form data.
        """
        import cgi
        try:
            from html import escape  # Python 3
        except ImportError:
            from cgi import escape  # Python 2

        def cell(obj):
            # Format exactly as '%s' would, then neutralise HTML metacharacters.
            # quote=False keeps the cgi.escape default this code relied on.
            return escape('%s' % (obj,), False)

        start_response('200 OK', [('Content-Type', 'text/html')])
        yield '<html><head><title>Hello World!</title></head>\n' \
              '<body>\n' \
              '<p>Hello World!</p>\n' \
              '<table border="1">'
        # sorted() and repr() replace the Python 2-only keys().sort() and
        # backtick syntax without changing the rendered output.
        for name in sorted(environ.keys()):
            yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
                cell(name), escape(repr(environ[name]), False))

        form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
                                keep_blank_values=1)
        # form.list is None when the request body is not form data; iterating
        # it unguarded raised TypeError.
        fields = form.list or []
        if fields:
            yield '<tr><th colspan="2">Form data</th></tr>'

        for field in fields:
            # Field names and values come from the client: escape them.
            yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
                cell(field.name), cell(field.value))

        yield '</table>\n' \
              '</body></html>\n'


问题


面经


文章

微信
公众号

扫码关注公众号