python类InterfaceError()的实例源码

postgresqlconn.py 文件源码 项目:heroku-python-boilerplate 作者: chavli 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def insert_one(self, table_name: str, row: dict, retcol: str):
        """Insert a single row into ``table_name`` and return one column of it.

        table_name -- name of the target table
        row -- dict mapping column names to the values to insert
        retcol -- column named in the ``returning`` clause

        returns: the first field of the row fetched after the insert, i.e. the
        value of ``retcol`` for the new row

        raises: psycopg2.InterfaceError when there is no connection or when
        ``row`` is empty. Any exception raised while running the statement has
        its traceback printed and is then re-raised unchanged.

        NOTE(review): table_name, the keys of ``row`` and retcol are formatted
        directly into the SQL text; only the values are bound as parameters.
        Callers must not pass untrusted identifiers.
        """
        if not self.__conn:
            raise psycopg2.InterfaceError("null connection")
        if not row:
            # Separate message so an empty row is not misreported as a
            # connection problem; the exception type is unchanged.
            raise psycopg2.InterfaceError("no row data to insert")

        try:
            columns = "(" + ",".join(row) + ")"                # "(col1,col2,...)"
            holders = "(" + ",".join(["%s"] * len(row)) + ")"  # "(%s,%s,...)"
            fillers = tuple(row.values())
            query = "insert into {table_name} {columnnames} values {values} returning {retcol}".format(
                table_name=table_name,
                columnnames=columns,
                values=holders,
                retcol=retcol,
            )

            with self.__conn.cursor() as curs:
                curs.execute(query, fillers)
                return curs.fetchone()[0]

        except Exception:
            tb.print_exc()
            # Bare raise re-raises the active exception with its traceback.
            raise
postgresqlconn.py 文件源码 项目:heroku-python-boilerplate 作者: chavli 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def insert(self, table_name: str, rows: list) -> int:
        """Insert the given rows into ``table_name``, one statement per row.

        For every row an ``insert into ... select %s,...`` statement is built
        and handed, together with the row's values, to
        ``self.execute_write_query``. The values that call returns are summed
        (presumably the affected-row count per statement -- confirm against
        execute_write_query).

        table_name -- name of the target table
        rows -- list of dicts, each dict representing a row to be inserted

        returns: the accumulated result of the per-row write queries

        raises: psycopg2.InterfaceError when there is no connection or when
        ``rows`` is empty. Any exception raised while inserting has its
        traceback printed and is then re-raised unchanged.

        NOTE(review): table_name and the row keys are formatted directly into
        the SQL text; only the values are bound as parameters.
        """
        if not self.__conn:
            raise psycopg2.InterfaceError("null connection")
        if not rows:
            # Separate message so an empty batch is not misreported as a
            # connection problem; the exception type is unchanged.
            raise psycopg2.InterfaceError("no rows to insert")

        n_inserted = 0
        try:
            for row in rows:
                columns = "(" + ",".join(row) + ")"    # "(col1,col2,col3...)"
                params = ",".join(["%s"] * len(row))   # "%s,%s,%s..."

                query = "insert into {table_name} {columns} select {params}".format(
                    table_name=table_name,
                    columns=columns,
                    params=params,
                )

                n_inserted += self.execute_write_query(query, tuple(row.values()))
            return n_inserted
        except Exception:
            tb.print_exc()
            # Bare raise re-raises the active exception with its traceback.
            raise
test_connection.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_isolation_level_closed(self):
        """set_isolation_level() on a closed connection raises InterfaceError."""
        closed = self.connect()
        closed.close()
        # Levels 0 and 1 are checked, in that order.
        for level in (0, 1):
            self.assertRaises(psycopg2.InterfaceError,
                closed.set_isolation_level, level)
test_connection.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_closed(self):
        """set_session() on a closed connection raises InterfaceError."""
        self.conn.close()
        # Resolve the callable and its argument before entering the guard, so
        # only the call itself is expected to raise.
        set_session = self.conn.set_session
        level = ext.ISOLATION_LEVEL_SERIALIZABLE
        with self.assertRaises(psycopg2.InterfaceError):
            set_session(level)
test_connection.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def test_closed(self):
        """Setting autocommit on a closed connection raises InterfaceError."""
        self.conn.close()
        closed = self.conn
        with self.assertRaises(psycopg2.InterfaceError):
            setattr(closed, 'autocommit', True)

        # Reading the attribute has no closed-connection guard at present.
        # That may change later for consistency with other methods, so both
        # outcomes are accepted: a boolean value or an InterfaceError.
        try:
            current = self.conn.autocommit
            self.assertTrue(current in (True, False))
        except psycopg2.InterfaceError:
            pass
test_with.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_with_closed(self):
        """Using a closed connection as a context manager raises InterfaceError."""
        self.conn.close()
        with self.assertRaises(psycopg2.InterfaceError):
            with self.conn:
                pass
test_types_extras.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def test_parse(self):
        """Check HstoreAdapter.parse() on accepted and rejected hstore text."""
        from psycopg2.extras import HstoreAdapter

        # (hstore text, expected result of parse)
        accepted = [
            (None, None),
            ('', {}),
            ('"a"=>"1", "b"=>"2"', {'a': '1', 'b': '2'}),
            ('"a"  => "1" , "b"  =>  "2"', {'a': '1', 'b': '2'}),
            ('"a"=>NULL, "b"=>"2"', {'a': None, 'b': '2'}),
            (r'"a"=>"\"", "\""=>"2"', {'a': '"', '"': '2'}),
            ('"a"=>"\'", "\'"=>"2"', {'a': "'", "'": '2'}),
            ('"a"=>"1", "b"=>NULL', {'a': '1', 'b': None}),
            (r'"a\\"=>"1"', {'a\\': '1'}),
            (r'"a\""=>"1"', {'a"': '1'}),
            (r'"a\\\""=>"1"', {r'a\"': '1'}),
            (r'"a\\\\\""=>"1"', {r'a\\"': '1'}),
        ]
        for text, expected in accepted:
            self.assertEqual(HstoreAdapter.parse(text, None), expected)

        # Inputs that parse() must refuse with InterfaceError.
        rejected = [
            'a',
            '"a"',
            r'"a\\""=>"1"',
            r'"a\\\\""=>"1"',
            '"a=>"1"',
            '"a"=>"1", "b"=>NUL',
        ]
        for text in rejected:
            self.assertRaises(psycopg2.InterfaceError,
                HstoreAdapter.parse, text, None)
test_async.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_async_cursor_gone(self):
        """Waiting after the executing cursor was collected raises InterfaceError.

        Afterwards the same connection must still run a query normally.
        """
        import gc
        doomed = self.conn.cursor()
        doomed.execute("select 42;")
        # Drop the cursor and force a collection before waiting.
        del doomed
        gc.collect()
        wait, conn = self.wait, self.conn
        with self.assertRaises(psycopg2.InterfaceError):
            wait(conn)

        # The connection is still usable
        fresh = self.conn.cursor()
        fresh.execute("select 42;")
        self.wait(self.conn)
        self.assertEqual(fresh.fetchone(), (42,))
test_lobject.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def test_write_after_close(self):
        """write() on a closed large object raises InterfaceError."""
        closed_lob = self.conn.lobject()
        closed_lob.close()
        write = closed_lob.write
        with self.assertRaises(psycopg2.InterfaceError):
            write(b"some data")
test_lobject.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def test_read_after_close(self):
        """read() on a closed large object raises InterfaceError."""
        closed_lob = self.conn.lobject()
        closed_lob.close()
        read = closed_lob.read
        with self.assertRaises(psycopg2.InterfaceError):
            read(5)
test_lobject.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_seek_after_close(self):
        """seek() on a closed large object raises InterfaceError."""
        closed_lob = self.conn.lobject()
        closed_lob.close()
        seek = closed_lob.seek
        with self.assertRaises(psycopg2.InterfaceError):
            seek(0)
test_lobject.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_tell_after_close(self):
        """tell() on a closed large object raises InterfaceError."""
        closed_lob = self.conn.lobject()
        closed_lob.close()
        tell = closed_lob.tell
        with self.assertRaises(psycopg2.InterfaceError):
            tell()
test_lobject.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_seek_larger_than_2gb(self):
        """seek() to a 4gb offset is rejected with one of the accepted errors."""
        big_lob = self.conn.lobject()
        four_gb = 2 ** 32
        accepted_errors = (
            OverflowError, psycopg2.InterfaceError, psycopg2.NotSupportedError)
        seek = big_lob.seek
        with self.assertRaises(accepted_errors):
            seek(four_gb, 0)
test_lobject.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_truncate_larger_than_2gb(self):
        """truncate() to a 4gb length is rejected with one of the accepted errors."""
        big_lob = self.conn.lobject()
        four_gb = 2 ** 32
        accepted_errors = (
            OverflowError, psycopg2.InterfaceError, psycopg2.NotSupportedError)
        truncate = big_lob.truncate
        with self.assertRaises(accepted_errors):
            truncate(four_gb)
extras.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def parse(self, s, cur, _bsdec=_re.compile(r"\\(.)")):
        """Convert the text form of an hstore into a dict.

        The text looks like::

            "a"=>"1", "b"=>"2"

        Backslash escapes in keys and values are removed. A pair whose value
        group did not match is stored with the value None. ``cur`` is not
        used here. A ``None`` input returns ``None``.

        Raises psycopg2.InterfaceError when the pairs found by
        ``self._re_hstore`` are not contiguous from the start of the string,
        or when text is left over after the last pair.
        """
        if s is None:
            return None

        def unescape(text):
            # Replace each backslash-escaped character with the character itself.
            return _bsdec.sub(r'\1', text)

        pairs = {}
        pos = 0
        for match in self._re_hstore.finditer(s):
            # Every match has to begin exactly where the previous one ended.
            if match is None or match.start() != pos:
                raise psycopg2.InterfaceError(
                    "error parsing hstore pair at char %d" % pos)
            raw_key, raw_value = match.group(1, 2)
            key = unescape(raw_key)
            pairs[key] = None if raw_value is None else unescape(raw_value)
            pos = match.end()

        if pos < len(s):
            raise psycopg2.InterfaceError(
                "error parsing hstore: unparsed data after char %d" % pos)

        return pairs
extras.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def tokenize(self, s):
        """Split ``s`` into a list of tokens using ``self._re_tokenize``.

        For each match: if group 1 matched the token is None; otherwise if
        group 2 matched the token is that text with ``self._re_undouble``
        substitutions applied; otherwise the token is group 3 as-is.
        """
        tokens = []
        for match in self._re_tokenize.finditer(s):
            if match is None:
                raise psycopg2.InterfaceError("can't parse type: %r" % s)
            empty, quoted, bare = match.group(1, 2, 3)
            if empty is not None:
                token = None
            elif quoted is not None:
                token = self._re_undouble.sub(r"\1", quoted)
            else:
                token = bare
            tokens.append(token)

        return tokens
test_connection.py 文件源码 项目:userbase-sns-lambda 作者: fartashh 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_isolation_level_closed(self):
        """Isolation-level access on a closed connection raises InterfaceError."""
        closed = self.connect()
        closed.close()
        # Reading the attribute fails ...
        self.assertRaises(psycopg2.InterfaceError, getattr,
            closed, 'isolation_level')
        # ... and so does setting level 0, then level 1.
        for level in (0, 1):
            self.assertRaises(psycopg2.InterfaceError,
                closed.set_isolation_level, level)
test_connection.py 文件源码 项目:userbase-sns-lambda 作者: fartashh 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_closed(self):
        """set_session() on a closed connection raises InterfaceError."""
        self.conn.close()
        # Resolve the callable and its argument before entering the guard, so
        # only the call itself is expected to raise.
        set_session = self.conn.set_session
        level = psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
        with self.assertRaises(psycopg2.InterfaceError):
            set_session(level)
test_connection.py 文件源码 项目:userbase-sns-lambda 作者: fartashh 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def test_closed(self):
        """Setting autocommit on a closed connection raises InterfaceError."""
        self.conn.close()
        self.assertRaises(psycopg2.InterfaceError,
            setattr, self.conn, 'autocommit', True)

        # The getter doesn't have a guard. We may change this in future
        # to make it consistent with other methods; meanwhile let's just check
        # it doesn't explode.
        try:
            # assertTrue replaces the deprecated assert_ alias, which newer
            # unittest versions no longer provide.
            self.assertTrue(self.conn.autocommit in (True, False))
        except psycopg2.InterfaceError:
            pass
test_with.py 文件源码 项目:userbase-sns-lambda 作者: fartashh 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_with_closed(self):
        """Using a closed connection as a context manager raises InterfaceError."""
        self.conn.close()
        with self.assertRaises(psycopg2.InterfaceError):
            with self.conn:
                pass


问题


面经


文章

微信
公众号

扫码关注公众号