python类SkipTest()的实例源码

common_io_test.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def download_test_files_if_not_present(self):
        """Download the test files for this IO class from G-node.

        The remote location is ``url_for_tests + self.shortname``;
        ``url_for_tests`` is a module-level global.  The files listed in
        ``self.files_to_download`` are fetched into ``self.local_test_dir``.

        Raises:
            unittest.SkipTest: if ``self.use_network`` is falsy, or if
                creating the directories or downloading raises ``IOError``.
        """
        # NOTE: the previous version built its "docstring" with the ``%``
        # operator, which made it a plain expression statement instead of a
        # docstring (``__doc__`` was None).
        if not self.use_network:
            raise unittest.SkipTest("Requires download of data from the web")

        url = url_for_tests+self.shortname
        try:
            make_all_directories(self.files_to_download, self.local_test_dir)
            download_test_file(self.files_to_download,
                               self.local_test_dir, url)
        except IOError as exc:
            # A failed download is reported as a skip, not as a test failure.
            raise unittest.SkipTest(exc)
test_queryset.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_paged_result_handling(self):
        """Create two rows and check both are read back with a fetch size of 1.

        Skipped when ``PROTOCOL_VERSION`` is below 2.
        """
        if PROTOCOL_VERSION < 2:
            skip_reason = "Paging requires native protocol 2+, currently using: {0}".format(PROTOCOL_VERSION)
            raise unittest.SkipTest(skip_reason)

        # addresses #225
        class PagingTest(Model):
            id = columns.Integer(primary_key=True)
            val = columns.Integer()

        sync_table(PagingTest)

        # Two rows whose value equals their primary key.
        for key in (1, 2):
            PagingTest.create(id=key, val=key)

        cql_session = get_session()
        # default_fetch_size is patched to 1 only while the query runs
        # (presumably so that each row arrives in its own page).
        with mock.patch.object(cql_session, 'default_fetch_size', 1):
            fetched = PagingTest.objects()[:]

        assert len(fetched) == 2
test_failure_types.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def setUp(self):
        """
        Connect to the cluster and record the state used by the tests.

        The test is skipped if run with native protocol version < 4.
        A connection is first attempted with ProtocolVersion.MAX_SUPPORTED
        (beta protocol versions allowed); if that raises NoHostAvailable a
        second connection is made with PROTOCOL_VERSION and
        ``self.support_v5`` is set to False.
        """
        # Set before the version check, so the attribute exists even when
        # the test is skipped below.
        self.support_v5 = True
        if PROTOCOL_VERSION < 4:
            raise unittest.SkipTest(
                "Native protocol 4,0+ is required for custom payloads, currently using %r"
                % (PROTOCOL_VERSION,))
        try:
            self.cluster = Cluster(protocol_version=ProtocolVersion.MAX_SUPPORTED, allow_beta_protocol_version=True)
            self.session = self.cluster.connect()
        except NoHostAvailable:
            # Fall back to the configured protocol version.
            log.info("Protocol Version 5 not supported,")
            self.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
            self.session = self.cluster.connect()
            self.support_v5 = False

        self.nodes_currently_failing = []
        # Unpacking into three names requires get_cluster() to report
        # exactly three nodes.
        self.node1, self.node2, self.node3 = get_cluster().nodes.values()
test_query.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def setUp(self):
        """Connect to the cluster and create the ``test3rf.lwt`` table.

        Skipped when ``PROTOCOL_VERSION`` is below 2.
        """
        if PROTOCOL_VERSION < 2:
            skip_reason = (
                "Protocol 2.0+ is required for Lightweight transactions, currently testing against %r"
                % (PROTOCOL_VERSION,))
            raise unittest.SkipTest(skip_reason)

        self.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
        self.session = self.cluster.connect()

        # Table with an int key ``k`` and an int value ``v``.
        create_lwt_table = '''
            CREATE TABLE test3rf.lwt (
                k int PRIMARY KEY,
                v int )'''
        self.session.execute(create_lwt_table)
test_metadata.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_cql_compatibility(self):
        """Exercise ``is_cql_compatible`` on a table created with ``compact=True``.

        Skipped when ``CASS_SERVER_VERSION`` is (3, 0) or later.
        """
        if CASS_SERVER_VERSION >= (3, 0):
            raise unittest.SkipTest("cql compatibility does not apply Cassandra 3.0+")

        # Several non-PK columns are fine as long as the table has no
        # clustering columns.
        ddl = self.make_create_statement(["a"], [], ["b", "c", "d"], compact=True)
        self.session.execute(ddl)
        meta = self.get_table_metadata()

        partition_names = [c.name for c in meta.partition_key]
        self.assertEqual([u'a'], partition_names)
        self.assertEqual([], meta.clustering_key)
        column_names = sorted(meta.columns.keys())
        self.assertEqual([u'a', u'b', u'c', u'd'], column_names)

        self.assertTrue(meta.is_cql_compatible)

        # With clustering columns the table is no longer CQL compatible.
        # They are faked here by editing the metadata object directly.
        meta.clustering_key = ["foo", "bar"]
        for fake_column in ("foo", "bar"):
            meta.columns[fake_column] = None
        self.assertFalse(meta.is_cql_compatible)
test_metadata.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_replicas(self):
        """
        Ensure cluster.metadata.get_replicas return correctly when not attached to keyspace

        Before connecting, get_replicas must return an empty list; after
        connecting to 'test3rf' it must return at least one host, and the
        first host must be in datacenter 'dc1', rack 'r1'.

        Skipped when the murmur3 extension is not available.
        """
        if murmur3 is None:
            raise unittest.SkipTest('the murmur3 extension is not available')

        cluster = Cluster(protocol_version=PROTOCOL_VERSION)
        # Shut the cluster down on every exit path; previously a failing
        # assertion skipped the shutdown and leaked the cluster object.
        try:
            self.assertEqual(cluster.metadata.get_replicas('test3rf', 'key'), [])

            cluster.connect('test3rf')

            # Materialise the replicas once and reuse the list for both checks.
            replicas = list(cluster.metadata.get_replicas('test3rf', six.b('key')))
            self.assertNotEqual(replicas, [])
            host = replicas[0]
            self.assertEqual(host.datacenter, 'dc1')
            self.assertEqual(host.rack, 'r1')
        finally:
            cluster.shutdown()
test_queryset.py 文件源码 项目:cqlmapper 作者: reddit 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_paged_result_handling(self):
        """Create two rows and check both are read back with a fetch size of 1.

        Skipped when ``PROTOCOL_VERSION`` is below 2.  The table is dropped
        on every exit path once it has been created.
        """
        if PROTOCOL_VERSION < 2:
            raise unittest.SkipTest(
                "Paging requires native protocol 2+, "
                "currently using: {0}".format(PROTOCOL_VERSION)
            )

        # addresses #225
        class PagingTest(Model):
            id = columns.Integer(primary_key=True)
            val = columns.Integer()
        sync_table(self.conn, PagingTest)

        # Previously a failed create or assertion skipped drop_table and
        # left the table behind; the finally clause now always drops it.
        try:
            PagingTest.create(self.conn, id=1, val=1)
            PagingTest.create(self.conn, id=2, val=2)

            # default_fetch_size is patched to 1 only while the query runs.
            with mock.patch.object(self.conn.session, 'default_fetch_size', 1):
                results = PagingTest.objects().find_all(self.conn)

            assert len(results) == 2
        finally:
            drop_table(self.conn, PagingTest)
test_setups.py 文件源码 项目:devsecops-example-helloworld 作者: boozallen 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_skiptest_in_setupclass(self):
        """SkipTest raised in setUpClass yields one skip, no errors, no tests run."""
        class Test(unittest2.TestCase):
            @classmethod
            def setUpClass(cls):
                raise unittest2.SkipTest('foo')

            def test_one(self):
                pass

            def test_two(self):
                pass

        outcome = self.runTests(Test)
        self.assertEqual(outcome.testsRun, 0)
        self.assertEqual(len(outcome.errors), 0)
        self.assertEqual(len(outcome.skipped), 1)

        # The first element of the skip entry is the object standing in for
        # the skipped setUpClass; its str() names the class.
        skipped_holder = outcome.skipped[0][0]
        class_name = getattr(Test, '__qualname__', Test.__name__)
        expected_label = 'setUpClass (%s.%s)' % (__name__, class_name)
        self.assertEqual(str(skipped_holder), expected_label)
test_setups.py 文件源码 项目:devsecops-example-helloworld 作者: boozallen 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_skiptest_in_setupmodule(self):
        """SkipTest raised in setUpModule yields one skip, no errors, no tests run."""
        class Test(unittest2.TestCase):
            def test_one(self):
                pass

            def test_two(self):
                pass

        class Module(object):
            @staticmethod
            def setUpModule():
                raise unittest2.SkipTest('foo')

        # Register the stand-in module under the name 'Module' and make the
        # test class claim to live in it.
        sys.modules['Module'] = Module
        Test.__module__ = 'Module'

        outcome = self.runTests(Test)
        self.assertEqual(outcome.testsRun, 0)
        self.assertEqual(len(outcome.errors), 0)
        self.assertEqual(len(outcome.skipped), 1)

        skipped_holder = outcome.skipped[0][0]
        self.assertEqual(str(skipped_holder), 'setUpModule (Module)')
test_discovery.py 文件源码 项目:devsecops-example-helloworld 作者: boozallen 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_discover_with_init_module_that_raises_SkipTest_on_import(self):
        """Discovery turns a package whose import raises SkipTest into one skipped test."""
        top_dir = abspath('/foo')
        # Fake file system: directory path -> entries it contains.
        fake_tree = {
            top_dir: ['my_package'],
            abspath('/foo/my_package'): ['__init__.py', 'test_module.py'],
        }
        self.setup_import_issue_package_tests(fake_tree)

        imported_names = []

        def _get_module_from_name(name):
            # Record the import attempt, then refuse it with a skip.
            imported_names.append(name)
            raise unittest.SkipTest('skipperoo')

        loader = unittest.TestLoader()
        loader._get_module_from_name = _get_module_from_name
        discovered = loader.discover(top_dir)

        self.assertIn(top_dir, sys.path)
        self.assertEqual(discovered.countTestCases(), 1)

        outcome = unittest.TestResult()
        discovered.run(outcome)
        self.assertEqual(len(outcome.skipped), 1)
        self.assertEqual(outcome.testsRun, 1)
        self.assertEqual(imported_names, ['my_package'])

        # Check picklability
        for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
            pickle.loads(pickle.dumps(discovered, protocol))
test_suite.py 文件源码 项目:devsecops-example-helloworld 作者: boozallen 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def assert_garbage_collect_test_after_run(self, TestSuiteClass):
        """Check that a suite no longer keeps a test alive after running it.

        A test case is created, handed to ``TestSuiteClass`` and run.  Once
        the local reference is deleted, ``suite._tests`` must equal
        ``[None]`` and the weak reference to the test must be dead.

        Skipped when ``unittest.BaseTestSuite._cleanup`` is falsy.
        """
        if not unittest.BaseTestSuite._cleanup:
            raise unittest.SkipTest("Suite cleanup is disabled")

        class Foo(unittest.TestCase):
            def test_nothing(self):
                pass

        test = Foo('test_nothing')
        # Weak reference used at the end to observe whether the test object
        # is still alive.
        wref = weakref.ref(test)

        # wref() dereferences to the live test object at this point.
        suite = TestSuiteClass([wref()])
        suite.run(unittest.TestResult())

        # Remove the local strong reference to the test.
        del test

        # for the benefit of non-reference counting implementations
        gc.collect()

        self.assertEqual(suite._tests, [None])
        self.assertIsNone(wref())
test_connection.py 文件源码 项目:watchmen 作者: lycclsltt 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testSocketAuthInstallPlugin(self):
        """Run ``realtestSocketAuth`` after installing a socket-auth plugin.

        Two install statements are tried in turn; if both raise
        ``pymysql.err.InternalError`` the test is skipped.  Whether a plugin
        was installed is recorded on ``TestAuthentication.socket_found``,
        and the ``finally`` clause uninstalls it when that flag is set.
        """
        # needs plugin. lets install it.
        cur = self.connections[0].cursor()
        try:
            cur.execute("install plugin auth_socket soname 'auth_socket.so'")
            TestAuthentication.socket_found = True
            self.socket_plugin_name = 'auth_socket'
            self.realtestSocketAuth()
        except pymysql.err.InternalError:
            # Also reached if realtestSocketAuth() itself raised
            # InternalError.  Fall back to the second install statement,
            # which records the plugin under the name 'unix_socket'.
            try:
                cur.execute("install soname 'auth_socket'")
                TestAuthentication.socket_found = True
                self.socket_plugin_name = 'unix_socket'
                self.realtestSocketAuth()
            except pymysql.err.InternalError:
                TestAuthentication.socket_found = False
                raise unittest2.SkipTest('we couldn\'t install the socket plugin')
        finally:
            # Runs on every exit path, including the skip above.
            if TestAuthentication.socket_found:
                cur.execute("uninstall plugin %s" % self.socket_plugin_name)
test_basic.py 文件源码 项目:watchmen 作者: lycclsltt 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_json(self):
        """Round-trip a JSON document through a JSON column and ``CAST(... AS JSON)``.

        A dedicated connection with charset ``utf8mb4`` is opened for the
        test.  Skipped when ``self.mysql_server_is(conn, (5, 7, 0))`` is
        false.
        """
        args = self.databases[0].copy()
        args["charset"] = "utf8mb4"
        conn = pymysql.connect(**args)
        # This connection was previously never closed.  Registering the close
        # as a cleanup releases it on the skip, failure and success paths,
        # and cleanups registered later (if any) still run before it.
        self.addCleanup(conn.close)
        if not self.mysql_server_is(conn, (5, 7, 0)):
            raise SkipTest("JSON type is not supported on MySQL <= 5.6")

        self.safe_create_table(conn, "test_json", """\
create table test_json (
    id int not null,
    json JSON not null,
    primary key (id)
);""")
        cur = conn.cursor()

        # NOTE(review): the payload value is literal question marks; confirm
        # this is intended and not a non-ASCII string lost in transit.
        json_str = u'{"hello": "?????"}'
        cur.execute("INSERT INTO test_json (id, `json`) values (42, %s)", (json_str,))
        cur.execute("SELECT `json` from `test_json` WHERE `id`=42")
        res = cur.fetchone()[0]
        # Compare parsed documents, not raw text.
        self.assertEqual(json.loads(res), json.loads(json_str))

        cur.execute("SELECT CAST(%s AS JSON) AS x", (json_str,))
        res = cur.fetchone()[0]
        self.assertEqual(json.loads(res), json.loads(json_str))
test_connection.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testSocketAuthInstallPlugin(self):
        """Run ``realtestSocketAuth`` after installing a socket-auth plugin.

        Two install statements are tried in turn; if both raise
        ``pymysql.err.InternalError`` the test is skipped.  Whether a plugin
        was installed is recorded on ``TestAuthentication.socket_found``,
        and the ``finally`` clause uninstalls it when that flag is set.
        """
        # needs plugin. lets install it.
        cur = self.connections[0].cursor()
        try:
            cur.execute("install plugin auth_socket soname 'auth_socket.so'")
            TestAuthentication.socket_found = True
            self.socket_plugin_name = 'auth_socket'
            self.realtestSocketAuth()
        except pymysql.err.InternalError:
            # Also reached if realtestSocketAuth() itself raised
            # InternalError.  Fall back to the second install statement,
            # which records the plugin under the name 'unix_socket'.
            try:
                cur.execute("install soname 'auth_socket'")
                TestAuthentication.socket_found = True
                self.socket_plugin_name = 'unix_socket'
                self.realtestSocketAuth()
            except pymysql.err.InternalError:
                TestAuthentication.socket_found = False
                raise unittest2.SkipTest('we couldn\'t install the socket plugin')
        finally:
            # Runs on every exit path, including the skip above.
            if TestAuthentication.socket_found:
                cur.execute("uninstall plugin %s" % self.socket_plugin_name)
test_basic.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_json(self):
        """Round-trip a JSON document through a JSON column and ``CAST(... AS JSON)``.

        A dedicated connection with charset ``utf8mb4`` is opened for the
        test.  Skipped when ``self.mysql_server_is(conn, (5, 7, 0))`` is
        false.
        """
        args = self.databases[0].copy()
        args["charset"] = "utf8mb4"
        conn = pymysql.connect(**args)
        # This connection was previously never closed.  Registering the close
        # as a cleanup releases it on the skip, failure and success paths,
        # and cleanups registered later (if any) still run before it.
        self.addCleanup(conn.close)
        if not self.mysql_server_is(conn, (5, 7, 0)):
            raise SkipTest("JSON type is not supported on MySQL <= 5.6")

        self.safe_create_table(conn, "test_json", """\
create table test_json (
    id int not null,
    json JSON not null,
    primary key (id)
);""")
        cur = conn.cursor()

        # NOTE(review): the payload value is literal question marks; confirm
        # this is intended and not a non-ASCII string lost in transit.
        json_str = u'{"hello": "?????"}'
        cur.execute("INSERT INTO test_json (id, `json`) values (42, %s)", (json_str,))
        cur.execute("SELECT `json` from `test_json` WHERE `id`=42")
        res = cur.fetchone()[0]
        # Compare parsed documents, not raw text.
        self.assertEqual(json.loads(res), json.loads(json_str))

        cur.execute("SELECT CAST(%s AS JSON) AS x", (json_str,))
        res = cur.fetchone()[0]
        self.assertEqual(json.loads(res), json.loads(json_str))
support.py 文件源码 项目:hakkuframework 作者: 4shadoww 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def requires(resource, msg=None):
    """Refuse to continue when the named test resource is unavailable.

    Checks are made in this order:

    * ``resource == 'gui'`` and ``_is_gui_available()`` is false:
      raise ``unittest.SkipTest``.
    * The caller's module is ``__main__``: return without further checks,
      treating the resource as enabled.
    * ``is_resource_enabled(resource)`` is false: raise ``ResourceDenied``
      with ``msg``, or with a generated message when ``msg`` is None.

    The function returns None whenever it does not raise.
    """
    wants_gui = resource == 'gui'
    if wants_gui and not _is_gui_available():
        raise unittest.SkipTest("Cannot use the 'gui' resource")

    # Frame 1 is the direct caller; a helper must not be inserted between
    # this function and the frame lookup.
    caller_globals = sys._getframe(1).f_globals
    if caller_globals.get("__name__") == "__main__":
        return

    if is_resource_enabled(resource):
        return

    if msg is None:
        msg = "Use of the %r resource not enabled" % resource
    raise ResourceDenied(msg)
support.py 文件源码 项目:hakkuframework 作者: 4shadoww 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The decorated test runs only when ``max_memuse`` is at least
    ``MAX_Py_ssize_t``; otherwise ``unittest.SkipTest`` is raised with a
    message chosen from the two limits.  The wrapper keeps the decorated
    function's name and docstring so test reports still identify the test.
    """
    import functools

    @functools.wraps(f)
    def wrapper(self):
        if max_memuse < MAX_Py_ssize_t:
            if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
                raise unittest.SkipTest(
                    "not enough memory: try a 32-bit build instead")
            else:
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (MAX_Py_ssize_t / (1024 ** 3)))
        else:
            return f(self)
    return wrapper

#=======================================================================
# unittest integration.
test_connection.py 文件源码 项目:bawk 作者: jttwnsnd 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def testSocketAuthInstallPlugin(self):
        """Run ``realtestSocketAuth`` after installing a socket-auth plugin.

        Two install statements are tried in turn; if both raise
        ``pymysql.err.InternalError`` the test is skipped.  Whether a plugin
        was installed is recorded on ``TestAuthentication.socket_found``,
        and the ``finally`` clause uninstalls it when that flag is set.
        """
        # needs plugin. lets install it.
        cur = self.connections[0].cursor()
        try:
            cur.execute("install plugin auth_socket soname 'auth_socket.so'")
            TestAuthentication.socket_found = True
            self.socket_plugin_name = 'auth_socket'
            self.realtestSocketAuth()
        except pymysql.err.InternalError:
            # Also reached if realtestSocketAuth() itself raised
            # InternalError.  Fall back to the second install statement,
            # which records the plugin under the name 'unix_socket'.
            try:
                cur.execute("install soname 'auth_socket'")
                TestAuthentication.socket_found = True
                self.socket_plugin_name = 'unix_socket'
                self.realtestSocketAuth()
            except pymysql.err.InternalError:
                TestAuthentication.socket_found = False
                raise unittest2.SkipTest('we couldn\'t install the socket plugin')
        finally:
            # Runs on every exit path, including the skip above.
            if TestAuthentication.socket_found:
                cur.execute("uninstall plugin %s" % self.socket_plugin_name)
test_basic.py 文件源码 项目:bawk 作者: jttwnsnd 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_json(self):
        """Round-trip a JSON document through a JSON column and ``CAST(... AS JSON)``.

        A dedicated connection with charset ``utf8mb4`` is opened for the
        test.  Skipped when ``self.mysql_server_is(conn, (5, 7, 0))`` is
        false.
        """
        args = self.databases[0].copy()
        args["charset"] = "utf8mb4"
        conn = pymysql.connect(**args)
        # This connection was previously never closed.  Registering the close
        # as a cleanup releases it on the skip, failure and success paths,
        # and cleanups registered later (if any) still run before it.
        self.addCleanup(conn.close)
        if not self.mysql_server_is(conn, (5, 7, 0)):
            raise SkipTest("JSON type is not supported on MySQL <= 5.6")

        self.safe_create_table(conn, "test_json", """\
create table test_json (
    id int not null,
    json JSON not null,
    primary key (id)
);""")
        cur = conn.cursor()

        # NOTE(review): the payload value is literal question marks; confirm
        # this is intended and not a non-ASCII string lost in transit.
        json_str = u'{"hello": "?????"}'
        cur.execute("INSERT INTO test_json (id, `json`) values (42, %s)", (json_str,))
        cur.execute("SELECT `json` from `test_json` WHERE `id`=42")
        res = cur.fetchone()[0]
        # Compare parsed documents, not raw text.
        self.assertEqual(json.loads(res), json.loads(json_str))

        cur.execute("SELECT CAST(%s AS JSON) AS x", (json_str,))
        res = cur.fetchone()[0]
        self.assertEqual(json.loads(res), json.loads(json_str))
support.py 文件源码 项目:packaging 作者: blockstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def requires(resource, msg=None):
    """Refuse to continue when the named test resource is unavailable.

    Checks are made in this order:

    * ``resource == 'gui'`` and ``_is_gui_available()`` is false:
      raise ``unittest.SkipTest``.
    * The caller's module is ``__main__``: return without further checks,
      treating the resource as enabled.
    * ``is_resource_enabled(resource)`` is false: raise ``ResourceDenied``
      with ``msg``, or with a generated message when ``msg`` is None.

    The function returns None whenever it does not raise.
    """
    wants_gui = resource == 'gui'
    if wants_gui and not _is_gui_available():
        raise unittest.SkipTest("Cannot use the 'gui' resource")

    # Frame 1 is the direct caller; a helper must not be inserted between
    # this function and the frame lookup.
    caller_globals = sys._getframe(1).f_globals
    if caller_globals.get("__name__") == "__main__":
        return

    if is_resource_enabled(resource):
        return

    if msg is None:
        msg = "Use of the %r resource not enabled" % resource
    raise ResourceDenied(msg)
support.py 文件源码 项目:packaging 作者: blockstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The decorated test runs only when ``max_memuse`` is at least
    ``MAX_Py_ssize_t``; otherwise ``unittest.SkipTest`` is raised with a
    message chosen from the two limits.  The wrapper keeps the decorated
    function's name and docstring so test reports still identify the test.
    """
    import functools

    @functools.wraps(f)
    def wrapper(self):
        if max_memuse < MAX_Py_ssize_t:
            if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
                raise unittest.SkipTest(
                    "not enough memory: try a 32-bit build instead")
            else:
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (MAX_Py_ssize_t / (1024 ** 3)))
        else:
            return f(self)
    return wrapper

#=======================================================================
# unittest integration.
support.py 文件源码 项目:islam-buddy 作者: hamir 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def requires(resource, msg=None):
    """Refuse to continue when the named test resource is unavailable.

    Checks are made in this order:

    * ``resource == 'gui'`` and ``_is_gui_available()`` is false:
      raise ``unittest.SkipTest``.
    * The caller's module is ``__main__``: return without further checks,
      treating the resource as enabled.
    * ``is_resource_enabled(resource)`` is false: raise ``ResourceDenied``
      with ``msg``, or with a generated message when ``msg`` is None.

    The function returns None whenever it does not raise.
    """
    wants_gui = resource == 'gui'
    if wants_gui and not _is_gui_available():
        raise unittest.SkipTest("Cannot use the 'gui' resource")

    # Frame 1 is the direct caller; a helper must not be inserted between
    # this function and the frame lookup.
    caller_globals = sys._getframe(1).f_globals
    if caller_globals.get("__name__") == "__main__":
        return

    if is_resource_enabled(resource):
        return

    if msg is None:
        msg = "Use of the %r resource not enabled" % resource
    raise ResourceDenied(msg)
support.py 文件源码 项目:islam-buddy 作者: hamir 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The decorated test runs only when ``max_memuse`` is at least
    ``MAX_Py_ssize_t``; otherwise ``unittest.SkipTest`` is raised with a
    message chosen from the two limits.  The wrapper keeps the decorated
    function's name and docstring so test reports still identify the test.
    """
    import functools

    @functools.wraps(f)
    def wrapper(self):
        if max_memuse < MAX_Py_ssize_t:
            if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
                raise unittest.SkipTest(
                    "not enough memory: try a 32-bit build instead")
            else:
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (MAX_Py_ssize_t / (1024 ** 3)))
        else:
            return f(self)
    return wrapper

#=======================================================================
# unittest integration.
test.py 文件源码 项目:PDF_text_extract 作者: theemadnes 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_test_client(nowait=False, **kwargs):
    """Build an Elasticsearch client and wait for at least yellow status.

    Client options start as ``{'timeout': 30}``.  If the environment
    variable ``TEST_ES_CONNECTION`` is set, the attribute of that name
    from ``elasticsearch.connection`` is used as ``connection_class``.
    ``kwargs`` override both.  The server is read from ``TEST_ES_SERVER``
    (default ``{}``).

    The health check is attempted 100 times (once when ``nowait`` is
    true), sleeping 0.1 s after each ``ConnectionError``.

    Returns the client once a health check succeeds; raises ``SkipTest``
    when every attempt fails.
    """
    # construct kwargs from the environment
    client_options = {'timeout': 30}
    if 'TEST_ES_CONNECTION' in os.environ:
        from elasticsearch import connection
        client_options['connection_class'] = getattr(connection, os.environ['TEST_ES_CONNECTION'])
    client_options.update(kwargs)

    server = os.environ.get('TEST_ES_SERVER', {})
    client = Elasticsearch([server], **client_options)

    # wait for yellow status
    attempts = 1 if nowait else 100
    for _ in range(attempts):
        try:
            client.cluster.health(wait_for_status='yellow')
        except ConnectionError:
            time.sleep(.1)
        else:
            return client

    # timeout: the loop only ends without returning when all attempts failed
    raise SkipTest("Elasticsearch failed to start.")
test_connection.py 文件源码 项目:Flask-NvRay-Blog 作者: rui7157 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def testSocketAuthInstallPlugin(self):
        """Run ``realtestSocketAuth`` after installing a socket-auth plugin.

        Two install statements are tried in turn; if both raise
        ``pymysql.err.InternalError`` the test is skipped.  Whether a plugin
        was installed is recorded on ``TestAuthentication.socket_found``,
        and the ``finally`` clause uninstalls it when that flag is set.
        """
        # needs plugin. lets install it.
        cur = self.connections[0].cursor()
        try:
            cur.execute("install plugin auth_socket soname 'auth_socket.so'")
            TestAuthentication.socket_found = True
            self.socket_plugin_name = 'auth_socket'
            self.realtestSocketAuth()
        except pymysql.err.InternalError:
            # Also reached if realtestSocketAuth() itself raised
            # InternalError.  Fall back to the second install statement,
            # which records the plugin under the name 'unix_socket'.
            try:
                cur.execute("install soname 'auth_socket'")
                TestAuthentication.socket_found = True
                self.socket_plugin_name = 'unix_socket'
                self.realtestSocketAuth()
            except pymysql.err.InternalError:
                TestAuthentication.socket_found = False
                raise unittest2.SkipTest('we couldn\'t install the socket plugin')
        finally:
            # Runs on every exit path, including the skip above.
            if TestAuthentication.socket_found:
                cur.execute("uninstall plugin %s" % self.socket_plugin_name)
test_connection.py 文件源码 项目:Flask-NvRay-Blog 作者: rui7157 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testSocketAuthInstallPlugin(self):
        """Run ``realtestSocketAuth`` after installing a socket-auth plugin.

        Two install statements are tried in turn; if both raise
        ``pymysql.err.InternalError`` the test is skipped.  Whether a plugin
        was installed is recorded on ``TestAuthentication.socket_found``,
        and the ``finally`` clause uninstalls it when that flag is set.
        """
        # needs plugin. lets install it.
        cur = self.connections[0].cursor()
        try:
            cur.execute("install plugin auth_socket soname 'auth_socket.so'")
            TestAuthentication.socket_found = True
            self.socket_plugin_name = 'auth_socket'
            self.realtestSocketAuth()
        except pymysql.err.InternalError:
            # Also reached if realtestSocketAuth() itself raised
            # InternalError.  Fall back to the second install statement,
            # which records the plugin under the name 'unix_socket'.
            try:
                cur.execute("install soname 'auth_socket'")
                TestAuthentication.socket_found = True
                self.socket_plugin_name = 'unix_socket'
                self.realtestSocketAuth()
            except pymysql.err.InternalError:
                TestAuthentication.socket_found = False
                raise unittest2.SkipTest('we couldn\'t install the socket plugin')
        finally:
            # Runs on every exit path, including the skip above.
            if TestAuthentication.socket_found:
                cur.execute("uninstall plugin %s" % self.socket_plugin_name)
test_connection.py 文件源码 项目:rekall-agent-server 作者: rekall-innovations 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testSocketAuthInstallPlugin(self):
        """Run ``realtestSocketAuth`` after installing a socket-auth plugin.

        Two install statements are tried in turn; if both raise
        ``pymysql.err.InternalError`` the test is skipped.  Whether a plugin
        was installed is recorded on ``TestAuthentication.socket_found``,
        and the ``finally`` clause uninstalls it when that flag is set.
        """
        # needs plugin. lets install it.
        cur = self.connections[0].cursor()
        try:
            cur.execute("install plugin auth_socket soname 'auth_socket.so'")
            TestAuthentication.socket_found = True
            self.socket_plugin_name = 'auth_socket'
            self.realtestSocketAuth()
        except pymysql.err.InternalError:
            # Also reached if realtestSocketAuth() itself raised
            # InternalError.  Fall back to the second install statement,
            # which records the plugin under the name 'unix_socket'.
            try:
                cur.execute("install soname 'auth_socket'")
                TestAuthentication.socket_found = True
                self.socket_plugin_name = 'unix_socket'
                self.realtestSocketAuth()
            except pymysql.err.InternalError:
                TestAuthentication.socket_found = False
                raise unittest2.SkipTest('we couldn\'t install the socket plugin')
        finally:
            # Runs on every exit path, including the skip above.
            if TestAuthentication.socket_found:
                cur.execute("uninstall plugin %s" % self.socket_plugin_name)
test_basic.py 文件源码 项目:rekall-agent-server 作者: rekall-innovations 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_json(self):
        """Round-trip a JSON document through a JSON column and ``CAST(... AS JSON)``.

        A dedicated connection with charset ``utf8mb4`` is opened for the
        test.  Skipped when ``self.mysql_server_is(conn, (5, 7, 0))`` is
        false.
        """
        args = self.databases[0].copy()
        args["charset"] = "utf8mb4"
        conn = pymysql.connect(**args)
        # This connection was previously never closed.  Registering the close
        # as a cleanup releases it on the skip, failure and success paths,
        # and cleanups registered later (if any) still run before it.
        self.addCleanup(conn.close)
        if not self.mysql_server_is(conn, (5, 7, 0)):
            raise SkipTest("JSON type is not supported on MySQL <= 5.6")

        self.safe_create_table(conn, "test_json", """\
create table test_json (
    id int not null,
    json JSON not null,
    primary key (id)
);""")
        cur = conn.cursor()

        # NOTE(review): the payload value is literal question marks; confirm
        # this is intended and not a non-ASCII string lost in transit.
        json_str = u'{"hello": "?????"}'
        cur.execute("INSERT INTO test_json (id, `json`) values (42, %s)", (json_str,))
        cur.execute("SELECT `json` from `test_json` WHERE `id`=42")
        res = cur.fetchone()[0]
        # Compare parsed documents, not raw text.
        self.assertEqual(json.loads(res), json.loads(json_str))

        cur.execute("SELECT CAST(%s AS JSON) AS x", (json_str,))
        res = cur.fetchone()[0]
        self.assertEqual(json.loads(res), json.loads(json_str))
support.py 文件源码 项目:FightstickDisplay 作者: calexil 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def requires(resource, msg=None):
    """Refuse to continue when the named test resource is unavailable.

    Checks are made in this order:

    * ``resource == 'gui'`` and ``_is_gui_available()`` is false:
      raise ``unittest.SkipTest``.
    * The caller's module is ``__main__``: return without further checks,
      treating the resource as enabled.
    * ``is_resource_enabled(resource)`` is false: raise ``ResourceDenied``
      with ``msg``, or with a generated message when ``msg`` is None.

    The function returns None whenever it does not raise.
    """
    wants_gui = resource == 'gui'
    if wants_gui and not _is_gui_available():
        raise unittest.SkipTest("Cannot use the 'gui' resource")

    # Frame 1 is the direct caller; a helper must not be inserted between
    # this function and the frame lookup.
    caller_globals = sys._getframe(1).f_globals
    if caller_globals.get("__name__") == "__main__":
        return

    if is_resource_enabled(resource):
        return

    if msg is None:
        msg = "Use of the %r resource not enabled" % resource
    raise ResourceDenied(msg)
support.py 文件源码 项目:FightstickDisplay 作者: calexil 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The decorated test runs only when ``max_memuse`` is at least
    ``MAX_Py_ssize_t``; otherwise ``unittest.SkipTest`` is raised with a
    message chosen from the two limits.  The wrapper keeps the decorated
    function's name and docstring so test reports still identify the test.
    """
    import functools

    @functools.wraps(f)
    def wrapper(self):
        if max_memuse < MAX_Py_ssize_t:
            if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
                raise unittest.SkipTest(
                    "not enough memory: try a 32-bit build instead")
            else:
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (MAX_Py_ssize_t / (1024 ** 3)))
        else:
            return f(self)
    return wrapper

#=======================================================================
# unittest integration.


问题


面经


文章

微信
公众号

扫码关注公众号