crud.py 文件源码

python
阅读 19 收藏 0 点赞 0 评论 0

项目:gino 作者: fantix 项目源码 文件源码
async def apply(self, bind=None, timeout=DEFAULT):
        """
        Execute ``self._clause`` with the pending values and refresh the
        instance from the returned row.

        Plain column values come from ``self._values``; values held in
        ``self._props`` are grouped by ``prop.column_name`` and merged into
        that column with ``column.concat(...)``.

        :param bind: Passed through as ``bind`` to
          ``cls.__metadata__.first``; ``None`` by default.

        :param timeout: When anything other than the ``DEFAULT`` sentinel,
          it is set as the ``timeout`` execution option of the statement.

        :return: ``self``, so that calls can be chained.

        :raises NoSuchRowError: if executing the statement yields no row.

        """
        cls = type(self._instance)
        # Work on a copy so the pending values on ``self`` are not mutated.
        values = self._values.copy()

        # handle JSON columns: collect the per-property updates, grouped by
        # the name of the column that stores them
        json_updates = {}
        for prop, value in self._props.items():
            value = prop.save(self._instance, value)
            updates = json_updates.setdefault(prop.column_name, {})
            if self._literal:
                # Literal mode: keep a plain dict of name -> value.
                updates[prop.name] = value
            else:
                # Non-literal mode: keys and values become SQL expressions
                # that are later fed to json(b)_build_object().
                # NOTE(review): bool is a subclass of int, so booleans also
                # take the BigInteger branch -- confirm this is intended.
                if isinstance(value, int):
                    value = sa.cast(value, sa.BigInteger)
                elif not isinstance(value, ClauseElement):
                    value = sa.cast(value, sa.Unicode)
                updates[sa.cast(prop.name, sa.Unicode)] = value
        for column_name, updates in json_updates.items():
            column = getattr(cls, column_name)
            if self._literal:
                values[column_name] = column.concat(updates)
            else:
                # Pick the builder function matching the column type.
                if isinstance(column.type, sa_pg.JSONB):
                    func = sa.func.jsonb_build_object
                else:
                    func = sa.func.json_build_object
                # Flatten {k1: v1, k2: v2} into (k1, v1, k2, v2) as the
                # positional arguments of the builder function.
                values[column_name] = column.concat(
                    func(*itertools.chain.from_iterable(updates.items())))

        opts = dict(return_model=False)
        if timeout is not DEFAULT:
            opts['timeout'] = timeout
        # Return exactly the columns that were written, so the instance can
        # be refreshed from the resulting row below.
        clause = self._clause.values(
            **values
        ).returning(
            *[getattr(cls, key) for key in values]
        ).execution_options(**opts)
        row = await cls.__metadata__.first(clause, bind=bind)
        if not row:
            raise NoSuchRowError()
        self._instance.__values__.update(row)
        for prop in self._props:
            prop.reload(self._instance)
        return self
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号