python类SkipTest()的实例源码

util.py 文件源码 项目:MCSManager-fsmodule 作者: Suwings 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def skip_if(predicate, reason=None):
    """Skip a test if predicate is true."""
    reason = reason or predicate.__name__

    from nose import SkipTest

    def decorate(fn):
        fn_name = fn.__name__

        def maybe(*args, **kw):
            if predicate():
                msg = "'%s' skipped: %s" % (
                    fn_name, reason)
                raise SkipTest(msg)
            else:
                return fn(*args, **kw)
        return function_named(maybe, fn_name)
    return decorate
test_delete.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def setup_class(cls):
        if not tests.is_datastore_supported():
            raise nose.SkipTest("Datastore not supported")
        p.load('datastore')
        ctd.CreateTestData.create()
        cls.sysadmin_user = model.User.get('testsysadmin')
        cls.normal_user = model.User.get('annafan')
        resource = model.Package.get('annakarenina').resources[0]
        cls.data = {
            'resource_id': resource.id,
            'aliases': u'b\xfck2',
            'fields': [{'id': 'book', 'type': 'text'},
                       {'id': 'author', 'type': 'text'},
                       {'id': 'rating with %', 'type': 'text'}],
            'records': [{'book': 'annakarenina', 'author': 'tolstoy',
                         'rating with %': '90%'},
                        {'book': 'warandpeace', 'author': 'tolstoy',
                         'rating with %': '42%'}]
        }

        engine = db._get_engine(
            {'connection_url': config['ckan.datastore.write_url']})
        cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
        set_url_type(
            model.Package.get('annakarenina').resources, cls.sysadmin_user)
test_interfaces.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def setup_class(cls):
        wsgiapp = middleware.make_app(config['global_conf'], **config)
        cls.app = paste.fixture.TestApp(wsgiapp)
        if not tests.is_datastore_supported():
            raise nose.SkipTest("Datastore not supported")
        p.load('datastore')
        p.load('datapusher')
        p.load('test_datapusher_plugin')

        resource = factories.Resource(url_type='datastore')
        cls.dataset = factories.Dataset(resources=[resource])

        cls.sysadmin_user = factories.User(name='testsysadmin', sysadmin=True)
        cls.normal_user = factories.User(name='annafan')
        engine = db._get_engine(
            {'connection_url': config['ckan.datastore.write_url']})
        cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
test_interrupt.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_interrupted_systemcall(self):
        '''
        Make sure interrupted system calls don't break the world, since we
        can't control what all signals our connection thread will get
        '''
        if 'linux' not in platform:
            raise SkipTest('Unable to reproduce error case on'
                           ' non-linux platforms')

        path = 'interrupt_test'
        value = b"1"
        self.client.create(path, value)

        # set the euid to the current process' euid.
        # glibc sends SIGRT to all children, which will interrupt the
        # system call
        os.seteuid(os.geteuid())

        # basic sanity test that it worked alright
        assert self.client.get(path)[0] == value
test_helpers.py 文件源码 项目:ckan-timeseries 作者: namgk 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def setup_class(cls):

        if not pylons.config.get('ckan.datastore.read_url'):
            raise nose.SkipTest('Datastore runs on legacy mode, skipping...')

        engine = db._get_engine(
            {'connection_url': pylons.config['ckan.datastore.write_url']}
        )
        cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))

        datastore_test_helpers.clear_db(cls.Session)

        create_tables = [
            u'CREATE TABLE test_a (id_a text)',
            u'CREATE TABLE test_b (id_b text)',
            u'CREATE TABLE "TEST_C" (id_c text)',
            u'CREATE TABLE test_d ("?/?" integer)',
        ]
        for create_table_sql in create_tables:
            cls.Session.execute(create_table_sql)
decorators.py 文件源码 项目:leetcode 作者: thomasyimgit 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def skip(msg=None):
    """Decorator factory - mark a test function for skipping from test suite.

    Parameters
    ----------
      msg : string
        Optional message to be added.

    Returns
    -------
       decorator : function
         Decorator, which, when applied to a function, causes SkipTest
         to be raised, with the optional message added.
      """

    return skipif(True,msg)
test_run.py 文件源码 项目:leetcode 作者: thomasyimgit 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_obj_del(self):
        """Test that object's __del__ methods are called on exit."""
        if sys.platform == 'win32':
            try:
                import win32api
            except ImportError:
                raise SkipTest("Test requires pywin32")
        src = ("class A(object):\n"
               "    def __del__(self):\n"
               "        print 'object A deleted'\n"
               "a = A()\n")
        self.mktmp(py3compat.doctest_refactor_print(src))
        if dec.module_not_available('sqlite3'):
            err = 'WARNING: IPython History requires SQLite, your history will not be saved\n'
        else:
            err = None
        tt.ipexec_validate(self.fname, 'object A deleted', err)
test_path.py 文件源码 项目:leetcode 作者: thomasyimgit 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_not_writable_ipdir():
    tmpdir = tempfile.mkdtemp()
    os.name = "posix"
    env.pop('IPYTHON_DIR', None)
    env.pop('IPYTHONDIR', None)
    env.pop('XDG_CONFIG_HOME', None)
    env['HOME'] = tmpdir
    ipdir = os.path.join(tmpdir, '.ipython')
    os.mkdir(ipdir, 0o555)
    try:
        open(os.path.join(ipdir, "_foo_"), 'w').close()
    except IOError:
        pass
    else:
        # I can still write to an unwritable dir,
        # assume I'm root and skip the test
        raise SkipTest("I can't create directories that I can't write to")
    with AssertPrints('is not a writable location', channel='stderr'):
        ipdir = paths.get_ipython_dir()
    env.pop('IPYTHON_DIR', None)
iptest.py 文件源码 项目:leetcode 作者: thomasyimgit 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def monkeypatch_xunit():
    try:
        knownfailureif(True)(lambda: None)()
    except Exception as e:
        KnownFailureTest = type(e)

    def addError(self, test, err, capt=None):
        if issubclass(err[0], KnownFailureTest):
            err = (SkipTest,) + err[1:]
        return self.orig_addError(test, err, capt)

    Xunit.orig_addError = Xunit.addError
    Xunit.addError = addError

#-----------------------------------------------------------------------------
# Check which dependencies are installed and greater than minimum version.
#-----------------------------------------------------------------------------
exclusions.py 文件源码 项目:pyetje 作者: rorlika 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __call__(self, fn):
        @decorator
        def decorate(fn, *args, **kw):
            if self.predicate():
                if self.reason:
                    msg = "'%s' : %s" % (
                            fn.__name__,
                            self.reason
                        )
                else:
                    msg = "'%s': %s" % (
                            fn.__name__, self.predicate
                        )
                raise SkipTest(msg)
            else:
                if self._fails_on:
                    with self._fails_on.fail_if(name=fn.__name__):
                        return fn(*args, **kw)
                else:
                    return fn(*args, **kw)
        return decorate(fn)
test_nifpga.py 文件源码 项目:nifpga-python 作者: ni 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_status_exceptions_can_be_pickled_across_processes(self):
        try:
            import jobrunner
        except ImportError:
            raise SkipTest("jobrunner not installed, skipping")
        runner = jobrunner.JobRunner(jobrunner.JobRunner.RUN_MODE_MULTIPROCESS,
                                     runnables=[raise_an_exception],
                                     auto_assert=False)
        result = runner.run()[0]
        self.assertTrue(result.exception_occured())
        self.assertEqual(str(result.err_type), str(nifpga.FifoTimeoutError))
        self.assertIn("session: 0xbeef", result.err_class)
        if python_version == 2:
            self.assertIn("a bogus string argument: 'I am a string'", result.err_class)
        else:
            self.assertIn("a bogus string argument: b'I am a string'", result.err_class)
util.py 文件源码 项目:python-group-proj 作者: Sharcee 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def skip_if(predicate, reason=None):
    """Skip a test if predicate is true."""
    reason = reason or predicate.__name__

    from nose import SkipTest

    def decorate(fn):
        fn_name = fn.__name__

        def maybe(*args, **kw):
            if predicate():
                msg = "'%s' skipped: %s" % (
                    fn_name, reason)
                raise SkipTest(msg)
            else:
                return fn(*args, **kw)
        return function_named(maybe, fn_name)
    return decorate
test_trace_call.py 文件源码 项目:logfury 作者: ppolewicz 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_complex_signature_py3(self):
        if six.PY2:
            raise SkipTest()
        with LogCapture() as l:
            # without this exec Python 2 and pyflakes complain about syntax errors etc
            exec (
                """@trace_call(self.logger)
def foo(a, b, c, d, e, *varargs_, f=None, g='G', h='H', i='ii', j='jj', **varkwargs_: None):
    pass
foo('a', 'b', *['c', 'd'], e='E', f='F', Z='Z', **{'g':'g', 'h':'h'})
""", locals(), globals()
            )
            l.check(
                (
                    'test.v0_1.test_base',
                    'DEBUG',
                    "calling foo(a='a', b='b', c='c', d='d', e='E', f='F', "
                    "g='g', h='h', varkwargs_={'Z': 'Z'}, varargs_=<class '%s._empty'>, "
                    "i='ii', j='jj')" % (INSPECT_MODULE_NAME,)  # prefix does not work because of the eval, inspect module is for pypy3
                ),
            )
test_ols.py 文件源码 项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda 作者: SignalMedia 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testWLS(self):
        # WLS centered SS changed (fixed) in 0.5.0
        sm_version = sm.version.version
        if sm_version < LooseVersion('0.5.0'):
            raise nose.SkipTest("WLS centered SS not fixed in statsmodels"
                                " version {0}".format(sm_version))

        X = DataFrame(np.random.randn(30, 4), columns=['A', 'B', 'C', 'D'])
        Y = Series(np.random.randn(30))
        weights = X.std(1)

        self._check_wls(X, Y, weights)

        weights.ix[[5, 15]] = np.nan
        Y[[2, 21]] = np.nan
        self._check_wls(X, Y, weights)
test_compat.py 文件源码 项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda 作者: SignalMedia 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_compat():
    # test we have compat with our version of nu

    from pandas.computation import _NUMEXPR_INSTALLED
    try:
        import numexpr as ne
        ver = ne.__version__
        if ver == LooseVersion('2.4.4'):
            assert not _NUMEXPR_INSTALLED
        elif ver < LooseVersion('2.1'):
            with tm.assert_produces_warning(UserWarning,
                                            check_stacklevel=False):
                assert not _NUMEXPR_INSTALLED
        else:
            assert _NUMEXPR_INSTALLED

    except ImportError:
        raise nose.SkipTest("not testing numexpr version compat")
test_compat.py 文件源码 项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda 作者: SignalMedia 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def check_invalid_numexpr_version(engine, parser):
    def testit():
        a, b = 1, 2
        res = pd.eval('a + b', engine=engine, parser=parser)
        tm.assert_equal(res, 3)

    if engine == 'numexpr':
        try:
            import numexpr as ne
        except ImportError:
            raise nose.SkipTest("no numexpr")
        else:
            if ne.__version__ < LooseVersion('2.1'):
                with tm.assertRaisesRegexp(ImportError, "'numexpr' version is "
                                           ".+, must be >= 2.1"):
                    testit()
            elif ne.__version__ == LooseVersion('2.4.4'):
                raise nose.SkipTest("numexpr version==2.4.4")
            else:
                testit()
    else:
        testit()
test_eval.py 文件源码 项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda 作者: SignalMedia 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_expected_pow_result(self, lhs, rhs):
        try:
            expected = _eval_single_bin(lhs, '**', rhs, self.engine)
        except ValueError as e:
            msg = 'negative number cannot be raised to a fractional power'
            try:
                emsg = e.message
            except AttributeError:
                emsg = e

            emsg = u(emsg)

            if emsg == msg:
                if self.engine == 'python':
                    raise nose.SkipTest(emsg)
                else:
                    expected = np.nan
            else:
                raise
        return expected
test_multi.py 文件源码 项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda 作者: SignalMedia 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_legacy_pickle(self):
        if PY3:
            raise nose.SkipTest("testing for legacy pickles not "
                                "support on py3")

        path = tm.get_data_path('multiindex_v1.pickle')
        obj = pd.read_pickle(path)

        obj2 = MultiIndex.from_tuples(obj.values)
        self.assertTrue(obj.equals(obj2))

        res = obj.get_indexer(obj)
        exp = np.arange(len(obj))
        assert_almost_equal(res, exp)

        res = obj.get_indexer(obj2[::-1])
        exp = obj.get_indexer(obj[::-1])
        exp2 = obj2.get_indexer(obj2[::-1])
        assert_almost_equal(res, exp)
        assert_almost_equal(exp, exp2)
test_analytics.py 文件源码 项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda 作者: SignalMedia 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_quantile_interpolation_np_lt_1p9(self):
        # GH #10174
        if not _np_version_under1p9:
            raise nose.SkipTest("Numpy version is greater than 1.9")

        from numpy import percentile

        # interpolation = linear (default case)
        q = self.ts.quantile(0.1, interpolation='linear')
        self.assertEqual(q, percentile(self.ts.valid(), 10))
        q1 = self.ts.quantile(0.1)
        self.assertEqual(q1, percentile(self.ts.valid(), 10))

        # interpolation other than linear
        expErrMsg = "Interpolation methods other than "
        with tm.assertRaisesRegexp(ValueError, expErrMsg):
            self.ts.quantile(0.9, interpolation='nearest')

        # object dtype
        with tm.assertRaisesRegexp(ValueError, expErrMsg):
            q = Series(self.ts, dtype=object).quantile(0.7,
                                                       interpolation='higher')
test_panel.py 文件源码 项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda 作者: SignalMedia 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_to_excel(self):
        try:
            import xlwt  # noqa
            import xlrd  # noqa
            import openpyxl  # noqa
            from pandas.io.excel import ExcelFile
        except ImportError:
            raise nose.SkipTest("need xlwt xlrd openpyxl")

        for ext in ['xls', 'xlsx']:
            path = '__tmp__.' + ext
            with ensure_clean(path) as path:
                self.panel.to_excel(path)
                try:
                    reader = ExcelFile(path)
                except ImportError:
                    raise nose.SkipTest("need xlwt xlrd openpyxl")

                for item, df in self.panel.iteritems():
                    recdf = reader.parse(str(item), index_col=0)
                    assert_frame_equal(df, recdf)
test_panel.py 文件源码 项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda 作者: SignalMedia 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_to_excel_xlsxwriter(self):
        try:
            import xlrd  # noqa
            import xlsxwriter  # noqa
            from pandas.io.excel import ExcelFile
        except ImportError:
            raise nose.SkipTest("Requires xlrd and xlsxwriter. Skipping test.")

        path = '__tmp__.xlsx'
        with ensure_clean(path) as path:
            self.panel.to_excel(path, engine='xlsxwriter')
            try:
                reader = ExcelFile(path)
            except ImportError as e:
                raise nose.SkipTest("cannot write excel file: %s" % e)

            for item, df in self.panel.iteritems():
                recdf = reader.parse(str(item), index_col=0)
                assert_frame_equal(df, recdf)
test_panel4d.py 文件源码 项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda 作者: SignalMedia 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_setitem_ndarray(self):
        raise nose.SkipTest("skipping for now")
    #    from pandas import DateRange, datetools

    #    timeidx = DateRange(start=datetime(2009,1,1),
    #                        end=datetime(2009,12,31),
    #                        offset=datetools.MonthEnd())
    #    lons_coarse = np.linspace(-177.5, 177.5, 72)
    #    lats_coarse = np.linspace(-87.5, 87.5, 36)
    #    P = Panel(items=timeidx, major_axis=lons_coarse,
    #              minor_axis=lats_coarse)
    #    data = np.random.randn(72*36).reshape((72,36))
    #    key = datetime(2009,2,28)
    #    P[key] = data#

    #    assert_almost_equal(P[key].values, data)
test_panel4d.py 文件源码 项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda 作者: SignalMedia 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_shift(self):
        raise nose.SkipTest("skipping for now")
    #    # major
    #    idx = self.panel.major_axis[0]
    #    idx_lag = self.panel.major_axis[1]

    #    shifted = self.panel.shift(1)

    #    assert_frame_equal(self.panel.major_xs(idx),
    #                       shifted.major_xs(idx_lag))

    #    # minor
    #    idx = self.panel.minor_axis[0]
    #    idx_lag = self.panel.minor_axis[1]

    #    shifted = self.panel.shift(1, axis='minor')

    #    assert_frame_equal(self.panel.minor_xs(idx),
    #                       shifted.minor_xs(idx_lag))

    #    self.assertRaises(Exception, self.panel.shift, 1, axis='items')
test_panel4d.py 文件源码 项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda 作者: SignalMedia 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_multiindex_get(self):
        raise nose.SkipTest("skipping for now")
    #    ind = MultiIndex.from_tuples([('a', 1), ('a', 2), ('b', 1), ('b',2)],
    #                                 names=['first', 'second'])
    #    wp = Panel(np.random.random((4,5,5)),
    #                                items=ind,
    #                                major_axis=np.arange(5),
    #                                minor_axis=np.arange(5))
    #    f1 = wp['a']
    #    f2 = wp.ix['a']
    #    assert_panel_equal(f1, f2)

    #    self.assertTrue((f1.items == [1, 2]).all())
    #    self.assertTrue((f2.items == [1, 2]).all())

    #    ind = MultiIndex.from_tuples([('a', 1), ('a', 2), ('b', 1)],
    #                                 names=['first', 'second'])
test_unpack.py 文件源码 项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda 作者: SignalMedia 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_unpacker_hook_refcnt(self):
        if not hasattr(sys, 'getrefcount'):
            raise nose.SkipTest('no sys.getrefcount()')
        result = []

        def hook(x):
            result.append(x)
            return x

        basecnt = sys.getrefcount(hook)

        up = Unpacker(object_hook=hook, list_hook=hook)

        assert sys.getrefcount(hook) >= basecnt + 2

        up.feed(packb([{}]))
        up.feed(packb([{}]))
        assert up.unpack() == [{}]
        assert up.unpack() == [{}]
        assert result == [{}, [{}], {}, [{}]]

        del up

        assert sys.getrefcount(hook) == basecnt
test_analytics.py 文件源码 项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda 作者: SignalMedia 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_built_in_round(self):
        if not compat.PY3:
            raise nose.SkipTest("build in round cannot be overriden "
                                "prior to Python 3")

        # GH11763
        # Here's the test frame we'll be working with
        df = DataFrame(
            {'col1': [1.123, 2.123, 3.123], 'col2': [1.234, 2.234, 3.234]})

        # Default round to integer (i.e. decimals=0)
        expected_rounded = DataFrame(
            {'col1': [1., 2., 3.], 'col2': [1., 2., 3.]})
        assert_frame_equal(round(df), expected_rounded)

    # Clip
test_internals.py 文件源码 项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda 作者: SignalMedia 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_split_block_at(self):

        # with dup column support this method was taken out
        # GH3679
        raise nose.SkipTest("skipping for now")

        bs = list(self.fblock.split_block_at('a'))
        self.assertEqual(len(bs), 1)
        self.assertTrue(np.array_equal(bs[0].items, ['c', 'e']))

        bs = list(self.fblock.split_block_at('c'))
        self.assertEqual(len(bs), 2)
        self.assertTrue(np.array_equal(bs[0].items, ['a']))
        self.assertTrue(np.array_equal(bs[1].items, ['e']))

        bs = list(self.fblock.split_block_at('e'))
        self.assertEqual(len(bs), 1)
        self.assertTrue(np.array_equal(bs[0].items, ['a', 'c']))

        # bblock = get_bool_ex(['f'])
        # bs = list(bblock.split_block_at('f'))
        # self.assertEqual(len(bs), 0)
test_multilevel.py 文件源码 项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda 作者: SignalMedia 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_partial_ix_missing(self):
        raise nose.SkipTest("skipping for now")

        result = self.ymd.ix[2000, 0]
        expected = self.ymd.ix[2000]['A']
        assert_series_equal(result, expected)

        # need to put in some work here

        # self.ymd.ix[2000, 0] = 0
        # self.assertTrue((self.ymd.ix[2000]['A'] == 0).all())

        # Pretty sure the second (and maybe even the first) is already wrong.
        self.assertRaises(Exception, self.ymd.ix.__getitem__, (2000, 6))
        self.assertRaises(Exception, self.ymd.ix.__getitem__, (2000, 6), 0)

    # ---------------------------------------------------------------------
test_cparser.py 文件源码 项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda 作者: SignalMedia 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_header_not_enough_lines_as_recarray(self):

        if compat.is_platform_windows():
            raise nose.SkipTest(
                "segfaults on win-64, only when all tests are run")

        data = ('skip this\n'
                'skip this\n'
                'a,b,c\n'
                '1,2,3\n'
                '4,5,6')

        reader = TextReader(StringIO(data), delimiter=',', header=2,
                            as_recarray=True)
        header = reader.header
        expected = [['a', 'b', 'c']]
        self.assertEqual(header, expected)

        recs = reader.read()
        expected = {'a': [1, 4], 'b': [2, 5], 'c': [3, 6]}
        assert_array_dicts_equal(expected, recs)

        # not enough rows
        self.assertRaises(parser.CParserError, TextReader, StringIO(data),
                          delimiter=',', header=5, as_recarray=True)
test_cparser.py 文件源码 项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda 作者: SignalMedia 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_numpy_string_dtype_as_recarray(self):
        data = """\
a,1
aa,2
aaa,3
aaaa,4
aaaaa,5"""

        if compat.is_platform_windows():
            raise nose.SkipTest(
                "segfaults on win-64, only when all tests are run")

        def _make_reader(**kwds):
            return TextReader(StringIO(data), delimiter=',', header=None,
                              **kwds)

        reader = _make_reader(dtype='S4', as_recarray=True)
        result = reader.read()
        self.assertEqual(result['0'].dtype, 'S4')
        ex_values = np.array(['a', 'aa', 'aaa', 'aaaa', 'aaaa'], dtype='S4')
        self.assertTrue((result['0'] == ex_values).all())
        self.assertEqual(result['1'].dtype, 'S4')


问题


面经


文章

微信
公众号

扫码关注公众号