python类select()的实例源码

dal.py 文件源码 项目:mybookshelf2 作者: izderadicka 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
async def find_genre(gnr):
    """Find a stored genre matching ``gnr['name']``, trying a synonym as fallback.

    The name is compared case-insensitively against ``Genre.name``.  When no
    row matches, ``find_synonym`` is asked for an alternative name and the
    lookup is repeated with it.

    Must be a coroutine: the body uses ``async with`` and ``await``, which
    are a SyntaxError inside a plain ``def``.

    :param gnr: mapping with at least a ``'name'`` key.
    :return: ``{'id': ..., 'name': ...}`` for the matching genre, or ``None``.
    """
    async with engine.acquire() as conn:
        genre = model.Genre.__table__

        async def fg(name):
            # Case-insensitive match: both sides are lower-cased.
            query = select([genre.c.id, genre.c.name]).where(
                func.lower(genre.c.name) == name.lower())
            res = await conn.execute(query)
            g = await res.fetchone()
            if g:
                return {'id': g[0], 'name': g[1]}
            return None

        ng = await fg(gnr['name'])
        if not ng:
            # No direct hit; retry with the synonym, if one is known.
            name = await find_synonym(gnr['name'], model.Synonym.GENRE)
            if name:
                ng = await fg(name)

        return ng
dal.py 文件源码 项目:mybookshelf2 作者: izderadicka 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
async def get_ebooks_ids_for_object(object_name, id):
    """Return the ids of ebooks related to an author, series or bookshelf.

    Must be a coroutine: the body uses ``async with`` and ``await``, which
    are a SyntaxError inside a plain ``def``.

    :param object_name: ``'author'``, ``'series'`` or ``'bookshelf'``
        (compared case-insensitively).
    :param id: id of the author / series / bookshelf.
    :return: list of ebook ids (first column of every result row).
    :raises ValueError: if ``object_name`` is none of the supported kinds.
    """
    async with engine.acquire() as conn:
        kind = object_name.lower()
        if kind == 'author':
            q = select([model.ebook_authors.c.ebook_id]).where(model.ebook_authors.c.author_id == id)
        elif kind == 'series':
            ebook = model.Ebook.__table__
            q = select([ebook.c.id]).where(ebook.c.series_id == id)
        elif kind == 'bookshelf':
            bookshelf_item = model.BookshelfItem.__table__
            # `!= None` is intentional: on a SQLAlchemy column it renders
            # as IS NOT NULL (a Python `is not None` would not build SQL).
            q = select([bookshelf_item.c.ebook_id]).where(and_(bookshelf_item.c.ebook_id != None,  # noqa: E711
                                                           bookshelf_item.c.bookshelf_id == id)).distinct()
        else:
            raise ValueError('Invalid object_name')

        res = await conn.execute(q)
        rows = await res.fetchall()

        return [row[0] for row in rows]
dal.py 文件源码 项目:mybookshelf2 作者: izderadicka 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
async def get_conversion_candidate(ebook_id, to_format):
    """Select a source of an ebook with respect to the format ``to_format``.

    Must be a coroutine: the body uses ``await``, ``async with`` and
    ``async for``, which are a SyntaxError inside a plain ``def``.

    Selection order:
      1. a source of the ebook whose format id equals the id resolved by
         ``get_format_id(to_format)``, best quality first;
      2. otherwise the best-quality source whose extension is listed in
         ``settings.CONVERTABLE_TYPES``.

    :param ebook_id: id of the ebook whose sources are searched.
    :param to_format: format passed to ``get_format_id``.
    :return: ``(source_id, extension)``, or ``(None, None)`` if nothing fits.
    """
    to_format_id = await get_format_id(to_format)
    async with engine.acquire() as conn:
        source = model.Source.__table__
        # Named format_table so the builtin `format` is not shadowed.
        format_table = model.Format.__table__
        # Highest quality first; rows with NULL quality go last.
        res = await conn.execute(
            select([source.c.id, format_table.c.extension])
            .where(and_(source.c.ebook_id == ebook_id,
                        source.c.format_id == to_format_id,
                        source.c.format_id == format_table.c.id))
            .order_by(nullslast(desc(source.c.quality))))
        res = await res.first()
        if res:
            return res.as_tuple()

        # TODO: Consider optimal selection of the source
        # in previous version we first selected format (from available convertable in ebook)
        # and then one with best quality -   so actually the other way around
        q = select([source.c.id, format_table.c.extension])\
            .where(and_(source.c.format_id == format_table.c.id, source.c.ebook_id == ebook_id))\
            .order_by(nullslast(desc(source.c.quality)))
        async for row in conn.execute(q):
            if row.extension in settings.CONVERTABLE_TYPES:
                return row.id, row.extension

        return None, None
sql.py 文件源码 项目:livebridge 作者: dpa-newslab 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
async def get_last_updated(self, source_id):
        """Return the ``updated`` value of the newest row for ``source_id``.

        Must be a coroutine: the body uses ``await``, which is a SyntaxError
        inside a plain ``def``.

        :param source_id: value matched against the table's ``source_id`` column.
        :return: the ``updated`` field of the most recently updated row, or
            ``None`` when there is no such row or the query failed (failures
            are logged, not raised).
        """
        try:
            db = await self.db
            table = self._get_table()
            # Newest first, only one row needed.
            result = await db.execute(
                table.select().where(
                    table.c.source_id == source_id
                ).order_by(
                    table.c.updated.desc()
                ).limit(1))
            item = await result.first()
            tstamp = item["updated"] if item else None
            return tstamp
        except Exception as exc:
            # Best effort by design: log and fall through to the None result.
            logger.error("[DB] Error when querying for last updated item on {}".format(source_id))
            logger.exception(exc)
        return None
sql.py 文件源码 项目:livebridge 作者: dpa-newslab 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
async def get_control(self, updated=None):
        """Fetch the control row and return it as a dict with decoded ``data``.

        Must be a coroutine: the body uses ``await``, which is a SyntaxError
        inside a plain ``def``.

        :param updated: optional timestamp; when truthy, only a row whose
            ``updated`` column differs from it is selected.
        :return: the row as a ``dict`` whose ``"data"`` entry is the parsed
            JSON (``{}`` when the stored value is the empty string), or
            ``False`` when no row matched or the query failed (failures are
            logged, not raised).
        """
        try:
            db = await self.db
            table = self._get_control_table()
            sql = table.select().where(table.c.type == "control")
            if updated:  # check for updated timestamp
                sql = sql.where(table.c.updated != updated)
            sql = sql.limit(1)
            result = await db.execute(sql)
            item = await result.first()
            if item:
                item = dict(item)
                # An empty string means "no data"; anything else must be JSON.
                item["data"] = json.loads(item["data"]) if item.get("data") != "" else {}
                return item
        except Exception as exc:
            # Best effort by design: log and fall through to the False result.
            logger.error("[DB] Error when querying for a control data on {}".format(self.control_table_name))
            logger.error(exc)
        return False
test_migrations.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_upgrade_ip_policy_cidr_inside(self):
        """A policy CIDR lying inside its subnet survives the upgrade unchanged."""
        created = datetime.datetime(1970, 1, 1)
        subnet_row = dict(id="000", _cidr="192.168.10.0/24", ip_policy_id="111")
        policy_row = dict(id="222", created_at=created,
                          ip_policy_id="111", cidr="192.168.10.0/32")
        self.connection.execute(self.subnets.insert(), subnet_row)
        self.connection.execute(self.ip_policy_cidrs.insert(), policy_row)

        alembic_command.upgrade(self.config, '2748e48cee3a')

        query = select([self.ip_policy_cidrs])
        rows = self.connection.execute(query).fetchall()
        self.assertEqual(len(rows), 1)
        row = rows[0]
        # The single row must be exactly the one inserted above.
        expectations = (
            ("id", "222"),
            ("created_at", created),
            ("ip_policy_id", "111"),
            ("cidr", "192.168.10.0/32"),
        )
        for column, expected in expectations:
            self.assertEqual(row[column], expected)
test_migrations.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_upgrade_ip_policy_cidr_overlaps(self):
        """A policy CIDR wider than its subnet ends up as one row equal to the subnet.

        The resulting row is expected to carry the id and timestamp produced
        by the patched ``oslo_utils`` helpers.
        """
        subnet_row = dict(id="000", _cidr="192.168.10.0/24", ip_policy_id="111")
        policy_row = dict(id="222", created_at=datetime.date(1970, 1, 1),
                          ip_policy_id="111", cidr="192.168.10.0/16")
        self.connection.execute(self.subnets.insert(), subnet_row)
        self.connection.execute(self.ip_policy_cidrs.insert(), policy_row)

        uuid_patch = mock.patch("oslo_utils.uuidutils")
        time_patch = mock.patch("oslo_utils.timeutils")
        with uuid_patch as uuid, time_patch as tu:
            fake_now = datetime.datetime(2004, 2, 14)
            fake_id = "foo"
            tu.utcnow.return_value = fake_now
            uuid.generate_uuid.return_value = fake_id

            alembic_command.upgrade(self.config, '2748e48cee3a')

            query = select([self.ip_policy_cidrs])
            rows = self.connection.execute(query).fetchall()
            self.assertEqual(len(rows), 1)
            row = rows[0]
            self.assertEqual(row["id"], fake_id)
            self.assertEqual(row["created_at"], fake_now)
            self.assertEqual(row["ip_policy_id"], "111")
            self.assertEqual(row["cidr"], "192.168.10.0/24")
test_migrations.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_upgrade_ip_policy_cidr_overlaps_v6(self):
        """IPv6 variant: a policy CIDR wider than its subnet ends up equal to the subnet.

        The resulting row is expected to carry the id and timestamp produced
        by the patched ``oslo_utils`` helpers.
        """
        subnet_row = dict(id="000", _cidr="fd00::/8", ip_policy_id="111")
        policy_row = dict(id="222", created_at=datetime.date(1970, 1, 1),
                          ip_policy_id="111", cidr="fd00::/7")
        self.connection.execute(self.subnets.insert(), subnet_row)
        self.connection.execute(self.ip_policy_cidrs.insert(), policy_row)

        uuid_patch = mock.patch("oslo_utils.uuidutils")
        time_patch = mock.patch("oslo_utils.timeutils")
        with uuid_patch as uuid, time_patch as tu:
            fake_now = datetime.datetime(2004, 2, 14)
            fake_id = "foo"
            tu.utcnow.return_value = fake_now
            uuid.generate_uuid.return_value = fake_id

            alembic_command.upgrade(self.config, '2748e48cee3a')

            query = select([self.ip_policy_cidrs])
            rows = self.connection.execute(query).fetchall()
            self.assertEqual(len(rows), 1)
            row = rows[0]
            self.assertEqual(row["id"], fake_id)
            self.assertEqual(row["created_at"], fake_now)
            self.assertEqual(row["ip_policy_id"], "111")
            self.assertEqual(row["cidr"], "fd00::/8")
test_migrations.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_upgrade_with_subnets_default_ip_policy_cidrs(self):
        """Two policy CIDRs already present for a subnet are kept by the upgrade.

        Inserts the first and last address of 192.168.10.0/24 as /32 policy
        CIDRs, upgrades to revision ``45a07fac3d38`` and checks that exactly
        those two rows (ids, CIDRs and creation time) are still there.
        """
        self.connection.execute(
            self.subnets.insert(),
            dict(id="000", _cidr="192.168.10.0/24", ip_policy_id="111"))
        dt = datetime.datetime(1970, 1, 1)
        self.connection.execute(
            self.ip_policy_cidrs.insert(),
            dict(id="222", created_at=dt,
                 ip_policy_id="111", cidr="192.168.10.0/32"),
            dict(id="223", created_at=dt,
                 ip_policy_id="111", cidr="192.168.10.255/32"))
        alembic_command.upgrade(self.config, '45a07fac3d38')
        results = self.connection.execute(
            select([self.ip_policy_cidrs])).fetchall()
        self.assertEqual(len(results), 2)
        default_cidrs = ["192.168.10.0/32", "192.168.10.255/32"]
        expected_ids = ("222", "223")
        # Row order is not guaranteed, so each row is checked by membership.
        # assertIn (rather than assertTrue(a == x or a == y)) reports the
        # offending value when the check fails.
        for row in results:
            self.assertIn(row["cidr"], default_cidrs)
            self.assertIn(row["id"], expected_ids)
            self.assertEqual(row["created_at"], dt)
test_migrations.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_upgrade_bulk(self):
        """After upgrading to ``28e55acaf366`` every policy has its ``size`` filled in.

        Policy "1" owns a /32 and a /31 (expected size 3); policy "2" owns an
        IPv6 /64 (expected size 2 ** 64).
        """
        policies = (dict(id="1", size=None), dict(id="2", size=None))
        cidrs = (
            dict(id="2", ip_policy_id="1", cidr="192.168.10.13/32"),
            dict(id="3", ip_policy_id="1", cidr="192.168.10.16/31"),
            dict(id="4", ip_policy_id="2", cidr="fd00::/64"),
        )
        self.connection.execute(self.ip_policy.insert(), *policies)
        self.connection.execute(self.ip_policy_cidrs.insert(), *cidrs)

        alembic_command.upgrade(self.config, '28e55acaf366')

        rows = self.connection.execute(select([self.ip_policy])).fetchall()
        self.assertEqual(len(rows), 2)
        expected_size = {"1": 3, "2": 2 ** 64}
        for row in rows:
            policy_id = row["id"]
            # Membership is asserted first, so the lookup below cannot miss.
            self.assertIn(policy_id, ("1", "2"))
            self.assertEqual(row["size"], expected_size[policy_id])
test_migrations.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_upgrade_bulk(self):
        """After upgrading to ``1664300cb03a`` each CIDR row has ``first_ip``/``last_ip``.

        The IPv4 network is compared through its IPv6 representation
        (``netv4.ipv6()``); the IPv6 network is compared directly.
        """
        netv4 = netaddr.IPNetwork("192.168.10.13/31")
        netv6 = netaddr.IPNetwork("fd00::/64")
        seed_rows = (
            dict(id="1", ip_policy_id="1", cidr=str(netv4)),
            dict(id="2", ip_policy_id="2", cidr=str(netv6)),
        )
        self.connection.execute(self.ip_policy_cidrs.insert(), *seed_rows)

        alembic_command.upgrade(self.config, '1664300cb03a')

        query = select([self.ip_policy_cidrs])
        rows = self.connection.execute(query).fetchall()
        self.assertEqual(len(rows), 2)
        for row in rows:
            self.assertIn(row["cidr"], (str(netv4), str(netv6)))
            # Pick the network whose bounds this row has to match.
            if row["cidr"] == "192.168.10.13/31":
                expected_net = netv4.ipv6()
            else:
                expected_net = netv6
            self.assertEqual(row["first_ip"], expected_net.first)
            self.assertEqual(row["last_ip"], expected_net.last)
base.py 文件源码 项目:Flask-NvRay-Blog 作者: rui7157 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def for_update_clause(self, select, **kw):
        """Render the ``FOR UPDATE`` suffix for ``select``.

        Returns an empty string when ``self.is_subquery()`` is true.
        Otherwise returns ``' FOR UPDATE'``, followed by ``' OF a, b, ...'``
        when ``select._for_update_arg.of`` is non-empty (each element rendered
        through ``self.process``) and by ``' NOWAIT'`` when
        ``select._for_update_arg.nowait`` is set.
        """
        if self.is_subquery():
            return ""

        lock_arg = select._for_update_arg
        pieces = [' FOR UPDATE']

        if lock_arg.of:
            rendered = [self.process(target, **kw) for target in lock_arg.of]
            pieces.append(' OF ' + ', '.join(rendered))

        if lock_arg.nowait:
            pieces.append(" NOWAIT")

        return ''.join(pieces)
base.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _TODO_visit_compound_select(self, select):
        """Placeholder; intentionally does nothing yet.

        Need to determine how to get ``LIMIT``/``OFFSET`` into a
        ``UNION`` for Oracle.
        """
        pass
base.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def limit_clause(self, select, **kw):
        """Return an empty string: no limit clause text is produced here.

        ``select`` and ``kw`` are accepted but not used.
        """
        no_clause = ""
        return no_clause
indicator_dao.py 文件源码 项目:lama 作者: CSE-POST 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def read(uid):
        """Load the indicator whose ``_uid`` equals ``uid``.

        :param uid: value matched against ``Lamadb.indicator.c._uid``.
        :return: the object built by ``IndicatorDAO.make_from_row``, or
            ``None`` (after printing an error) unless exactly one row matches.
        """
        s = select([Lamadb.indicator]).where(Lamadb.indicator.c._uid == uid)
        result = Lamadb.execute(s)
        # ``rowcount`` is only dependable for UPDATE/DELETE; for a SELECT it
        # is driver-specific (e.g. -1 on SQLite), so count the fetched rows.
        # Assumes Lamadb.execute returns a SQLAlchemy result, as the original
        # rowcount/fetchone usage implies.
        rows = result.fetchall()
        if len(rows) != 1:
            print("Error read indicator DAO")
            return None
        return IndicatorDAO.make_from_row(rows[0])
indicator_dao.py 文件源码 项目:lama 作者: CSE-POST 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def find_by_module_uid(module_uid):
        """Return every indicator whose ``_module_uid`` equals ``module_uid``.

        :return: list of objects built by ``IndicatorDAO.make_from_row``
            (empty when nothing matches).
        """
        indicator = Lamadb.indicator
        query = select([indicator]).where(indicator.c._module_uid == module_uid)
        return [IndicatorDAO.make_from_row(row) for row in Lamadb.execute(query)]
module_status_dao.py 文件源码 项目:lama 作者: CSE-POST 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def read(uid):
        """Load the module status whose ``_uid`` equals ``uid``.

        :param uid: value matched against ``Lamadb.module_status.c._uid``.
        :return: the object built by ``ModuleStatusDAO.make_from_row``, or
            ``None`` (after printing an error) unless exactly one row matches.
        """
        s = select([Lamadb.module_status])\
            .where(Lamadb.module_status.c._uid == uid)
        result = Lamadb.execute(s)
        # ``rowcount`` is only dependable for UPDATE/DELETE; for a SELECT it
        # is driver-specific (e.g. -1 on SQLite), so count the fetched rows.
        # Assumes Lamadb.execute returns a SQLAlchemy result, as the original
        # rowcount/fetchone usage implies.
        rows = result.fetchall()
        if len(rows) != 1:
            print("Error read module Status DAO")
            return None
        return ModuleStatusDAO.make_from_row(rows[0])
module_status_dao.py 文件源码 项目:lama 作者: CSE-POST 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def find_by_malware_uid(malware_uid):
        """Return every module status whose ``_malware_uid`` equals ``malware_uid``.

        :return: list of objects built by ``ModuleStatusDAO.make_from_row``
            (empty when nothing matches).
        """
        status = Lamadb.module_status
        query = select([status]).where(status.c._malware_uid == malware_uid)
        return [ModuleStatusDAO.make_from_row(row) for row in Lamadb.execute(query)]
analysis_dao.py 文件源码 项目:lama 作者: CSE-POST 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def read(uid):
        """Load the analysis whose ``_uid`` equals ``uid``, with its malwares.

        :param uid: value matched against ``Lamadb.analysis.c._uid``.
        :return: the object built by ``AnalysisDAO.make_from_row`` with
            ``_malwares`` set from ``MalwareDAO.find_by_analysis_uid``, or
            ``None`` (after printing an error) unless exactly one row matches.
        """
        s = select([Lamadb.analysis]).where(Lamadb.analysis.c._uid == uid)
        result = Lamadb.execute(s)
        # ``rowcount`` is only dependable for UPDATE/DELETE; for a SELECT it
        # is driver-specific (e.g. -1 on SQLite), so count the fetched rows.
        # Assumes Lamadb.execute returns a SQLAlchemy result, as the original
        # rowcount/fetchone usage implies.
        rows = result.fetchall()
        if len(rows) != 1:
            print("Error read analysis DAO")
            return None
        analysis = AnalysisDAO.make_from_row(rows[0])
        analysis._malwares = MalwareDAO.find_by_analysis_uid(analysis.uid)
        return analysis
malware_dao.py 文件源码 项目:lama 作者: CSE-POST 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def read(uid):
        """Load the malware whose ``_uid`` equals ``uid``.

        :param uid: value matched against ``Lamadb.malware.c._uid``.
        :return: the object built by ``MalwareDAO.make_from_row``, or
            ``None`` (after printing an error) unless exactly one row matches.
        """
        s = select([Lamadb.malware]).where(Lamadb.malware.c._uid == uid)
        result = Lamadb.execute(s)
        # ``rowcount`` is only dependable for UPDATE/DELETE; for a SELECT it
        # is driver-specific (e.g. -1 on SQLite), so count the fetched rows.
        # Assumes Lamadb.execute returns a SQLAlchemy result, as the original
        # rowcount/fetchone usage implies.
        rows = result.fetchall()
        if len(rows) != 1:
            print("Error read malware DAO")
            return None
        return MalwareDAO.make_from_row(rows[0])


问题


面经


文章

微信
公众号

扫码关注公众号