python类InvalidRequestError()的实例源码

__init__.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def create(cls, message, object, stage=u'Fetch', line=None):
        '''
        Helper function to create an error object and save it.
        '''
        err = cls(message=message, object=object,
                  stage=stage, line=line)
        try:
            err.save()
        except InvalidRequestError, e:
            # Clear any in-progress sqlalchemy transactions
            try:
                Session.rollback()
            except:
                pass
            try:
                Session.remove()
            except:
                pass
            err.save()
        finally:
            log_message = '{0}, line {1}'.format(message, line) \
                          if line else message
            log.debug(log_message)
Spheniscidae.py 文件源码 项目:Spirit 作者: TunnelBlanket 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def connectionLost(self, reason):
        self.logger.info("Client disconnected")

        self.spirit.players.remove(self)

        try:
            self.session.commit()

            if hasattr(self, "room") and self.room is not None:
                self.room.remove(self)

            if hasattr(self, "user"):
                self.session.expunge(self.user)

        except InvalidRequestError:
            self.logger.info("There aren't any transactions in progress")

        finally:
            self.session.close()

        self.event.emit("disconnected", self, reason)
mc_integration.py 文件源码 项目:librarian 作者: HERA-Team 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def note_file_created(self, file_obj):
        """Tell M&C about a new Librarian file. M&C file records can also have null
        obsids, so we don't need to do anything special for maintenance files.

        """
        try:
            self.mc_session.add_lib_file(file_obj.name, file_obj.obsid,
                                         file_obj.create_time_astropy,
                                         file_obj.size / 1024**3)
        except InvalidRequestError as e:
            # This could happen if the file's obsid were not registered in the
            # M&C database. Which shouldn't happen, but ...
            self.error(SEVERE, 'couldn\'t register file %s (obsid %s) with M&C: %s',
                       file_obj.name, file_obj.obsid, e)

        try:
            self.mc_session.commit()
        except SQLAlchemyError as e:
            self.mc_session.rollback()
            self.error(SEVERE, 'could not commit file creation note to the M&C system: %s', e)
generics.py 文件源码 项目:django-rest-witchcraft 作者: shosca 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_model(cls):
        """
        Returns the model class
        """
        model = None

        with suppress(AttributeError, InvalidRequestError):
            model = cls.queryset._only_entity_zero().class_

        if model:
            return model

        with suppress(AttributeError):
            model = cls.serializer_class.Meta.model

        assert model is not None, (
            "Couldn't figure out the model for {viewset} attribute, either provide a"
            'queryset or a serializer with a Meta.model'.format(viewset=cls.__name__)
        )

        return model
test_select.py 文件源码 项目:clickhouse-sqlalchemy 作者: xzkostyan 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_group_by_query(self):
        table = self.create_table()

        query = session.query(table.c.x).group_by(table.c.x)
        self.assertEqual(
            self.compile(query),
            'SELECT x AS t1_x FROM t1 GROUP BY x'
        )

        query = session.query(table.c.x).group_by(table.c.x).with_totals()
        self.assertEqual(
            self.compile(query),
            'SELECT x AS t1_x FROM t1 GROUP BY x WITH TOTALS'
        )

        with self.assertRaises(exc.InvalidRequestError) as ex:
            session.query(table.c.x).with_totals()

        self.assertIn('with_totals', str(ex.exception))
base.py 文件源码 项目:ws-backend-community 作者: lavalamp- 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def tearDown(self):
        """
        Tear down this test case by closing any existing database connection and deleting all of the
        objects that were marked for deletion.
        :return: None
        """
        if len(self.to_delete) > 0:
            for cur_delete in self.to_delete:
                try:
                    self.db_session.delete(cur_delete)
                except InvalidRequestError:
                    continue
            self.db_session.commit()
        if self._db_session is not None:
            self._db_session.close()
        if self._transaction is not None:
            self._transaction.rollback()
        if self._connection is not None:
            self._connection.close()
        super(BaseSqlalchemyTestCase, self).tearDown()

    # Protected Methods

    # Private Methods
base.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def pre_exec(self):
        if self.isinsert:
            tbl = self.compiled.statement.table
            seq_column = tbl._autoincrement_column
            insert_has_sequence = seq_column is not None

            if insert_has_sequence:
                self._enable_identity_insert = \
                    seq_column.key in self.compiled_parameters[0]
            else:
                self._enable_identity_insert = False

            if self._enable_identity_insert:
                self.cursor.execute(
                    "SET IDENTITY_INSERT %s ON" %
                    self.dialect.identifier_preparer.format_table(tbl))

        if self.isddl:
            # TODO: to enhance this, we can detect "ddl in tran" on the
            # database settings.  this error message should be improved to
            # include a note about that.
            if not self.should_autocommit:
                raise exc.InvalidRequestError(
                    "The Sybase dialect only supports "
                    "DDL in 'autocommit' mode at this time.")

            self.root_connection.engine.logger.info(
                "AUTOCOMMIT (Assuming no Sybase 'ddl in tran')")

            self.set_ddl_autocommit(
                self.root_connection.connection.connection,
                True)
base.py 文件源码 项目:QXSConsolas 作者: qxsch 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def pre_exec(self):
        if self.isinsert:
            tbl = self.compiled.statement.table
            seq_column = tbl._autoincrement_column
            insert_has_sequence = seq_column is not None

            if insert_has_sequence:
                self._enable_identity_insert = \
                    seq_column.key in self.compiled_parameters[0]
            else:
                self._enable_identity_insert = False

            if self._enable_identity_insert:
                self.cursor.execute(
                    "SET IDENTITY_INSERT %s ON" %
                    self.dialect.identifier_preparer.format_table(tbl))

        if self.isddl:
            # TODO: to enhance this, we can detect "ddl in tran" on the
            # database settings.  this error message should be improved to
            # include a note about that.
            if not self.should_autocommit:
                raise exc.InvalidRequestError(
                    "The Sybase dialect only supports "
                    "DDL in 'autocommit' mode at this time.")

            self.root_connection.engine.logger.info(
                "AUTOCOMMIT (Assuming no Sybase 'ddl in tran')")

            self.set_ddl_autocommit(
                self.root_connection.connection.connection,
                True)
company.py 文件源码 项目:webspider 作者: GuozhuHe 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def add(cls, id, shortname, fullname, city_id, finance_stage=0, process_rate=0, features='', introduce='',
            address='', advantage='', size=0):
        company = cls(id=id, shortname=shortname, fullname=fullname, finance_stage=int(finance_stage),
                      city_id=int(city_id), process_rate=process_rate, features=features, introduce=introduce,
                      address=address, advantage=advantage, size=int(size))
        try:
            cls.session.merge(company)
            cls.session.flush()
        except InvalidRequestError as e:
            cls.session.rollback()
            raise e
job.py 文件源码 项目:webspider 作者: GuozhuHe 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def add(cls, id, company_id, city_id, title, work_year=0, department='', salary='', education=0, description='',
            advantage='', job_nature=0, created_at=0):
        job = cls(id=id, title=title, city_id=city_id, company_id=company_id, work_year=int(work_year),
                  department=department, salary=salary, education=int(education), description=description,
                  advantage=advantage, job_nature=int(job_nature), created_at=created_at)
        try:
            cls.session.merge(job)
            cls.session.flush()
        except InvalidRequestError as e:
            cls.session.rollback()
            raise e
base.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def pre_exec(self):
        if self.isinsert:
            tbl = self.compiled.statement.table
            seq_column = tbl._autoincrement_column
            insert_has_sequence = seq_column is not None

            if insert_has_sequence:
                self._enable_identity_insert = \
                    seq_column.key in self.compiled_parameters[0]
            else:
                self._enable_identity_insert = False

            if self._enable_identity_insert:
                self.cursor.execute(
                    "SET IDENTITY_INSERT %s ON" %
                    self.dialect.identifier_preparer.format_table(tbl))

        if self.isddl:
            # TODO: to enhance this, we can detect "ddl in tran" on the
            # database settings.  this error message should be improved to
            # include a note about that.
            if not self.should_autocommit:
                raise exc.InvalidRequestError(
                    "The Sybase dialect only supports "
                    "DDL in 'autocommit' mode at this time.")

            self.root_connection.engine.logger.info(
                "AUTOCOMMIT (Assuming no Sybase 'ddl in tran')")

            self.set_ddl_autocommit(
                self.root_connection.connection.connection,
                True)
utils.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def unregister(cls):
        try:
            db.event.remove(db.session, 'before_commit',
                            cls._unexpectedly_closed)
            db.event.remove(db.session, 'after_soft_rollback',
                            cls._unexpectedly_closed)
        except InvalidRequestError:  # ok, it is not registered
            pass
users.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def delete(self, user, force=False):
        """Release all user's resources and mark user as deleted.

        :param user: user id, username or kuberdock.users.models.User object
        :param force: if True, will not raise ResourceReleaseError
        :raises ResourceReleaseError: if couldn't release some resources
        :raises APIError: if user was not found
        """
        user = self._convert_user(user)
        self._is_deletable(user, raise_=True)
        uid = user.id
        user.logout(commit=False)

        pod_collection = PodCollection(user)
        for pod in pod_collection.get(as_json=False):
            pod_collection.delete(pod['id'])
        # Now, when we have deleted all pods, events will rape db session a
        # little bit.
        # Get new, clean user instance to prevent a lot of various SA errors
        user = User.get(uid)

        # Add some delay for deleting drives to allow kubernetes unmap drives
        # after a pod was deleted. Also in some cases this delay is not enough,
        # for example DBMS in container may save data to PD for a long time.
        # So there is a regular procedure to clean such undeleted drives
        # tasks.clean_drives_for_deleted_users.
        for pd in user.persistent_disks:
            PersistentStorage.end_stat(pd.name, user.id)
        delete_persistent_drives_task.apply_async(
            ([pd.id for pd in user.persistent_disks],),
            countdown=10
        )
        prefix = '__' + generate()
        user.username += prefix
        user.email += prefix
        user.deleted = True
        try:
            db.session.commit()
        except (IntegrityError, InvalidRequestError), e:
            db.session.rollback()
            raise APIError('Cannot delete a user: {0}'.format(str(e)), 500)
base.py 文件源码 项目:awvspy 作者: wcc526 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def visit_select_precolumns(self, select):
        """Access puts TOP, it's version of LIMIT here """
        s = select.distinct and "DISTINCT " or ""
        if select.limit:
            s += "TOP %s " % (select.limit)
        if select.offset:
            raise exc.InvalidRequestError(
                    'Access does not support LIMIT with an offset')
        return s
__init__.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def create(cls, message, job):
        '''
        Helper function to create an error object and save it.
        '''
        err = cls(message=message, job=job)
        try:
            err.save()
        except InvalidRequestError:
            Session.rollback()
            err.save()
        finally:
            # No need to alert administrator so don't log as an error
            log.info(message)
__init__.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def clean_harvest_log(condition):
    Session.query(HarvestLog).filter(HarvestLog.created <= condition)\
                                .delete(synchronize_session=False)
    try:
        Session.commit()
    except InvalidRequestError:
        Session.rollback()
        log.error('An error occurred while trying to clean-up the harvest log table')

    log.info('Harvest log table clean-up finished successfully')
base.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def pre_exec(self):
        if self.isinsert:
            tbl = self.compiled.statement.table
            seq_column = tbl._autoincrement_column
            insert_has_sequence = seq_column is not None

            if insert_has_sequence:
                self._enable_identity_insert = \
                    seq_column.key in self.compiled_parameters[0]
            else:
                self._enable_identity_insert = False

            if self._enable_identity_insert:
                self.cursor.execute(
                    "SET IDENTITY_INSERT %s ON" %
                    self.dialect.identifier_preparer.format_table(tbl))

        if self.isddl:
            # TODO: to enhance this, we can detect "ddl in tran" on the
            # database settings.  this error message should be improved to
            # include a note about that.
            if not self.should_autocommit:
                raise exc.InvalidRequestError(
                    "The Sybase dialect only supports "
                    "DDL in 'autocommit' mode at this time.")

            self.root_connection.engine.logger.info(
                "AUTOCOMMIT (Assuming no Sybase 'ddl in tran')")

            self.set_ddl_autocommit(
                self.root_connection.connection.connection,
                True)
core.py 文件源码 项目:ECache 作者: MrKiven 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def make_transient_to_detached(instance):
    '''
    Moved from sqlalchemy newer version
    '''
    state = attributes.instance_state(instance)
    if state.session_id or state.key:
        raise sa_exc.InvalidRequestError(
            "Given object must be transient")
    state.key = state.mapper._identity_key_from_state(state)
    if state.deleted:
        del state.deleted
    state._commit_all(state.dict)
    state._expire_attributes(state.dict, state.unloaded)
base.py 文件源码 项目:chihu 作者: yelongyu 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def pre_exec(self):
        if self.isinsert:
            tbl = self.compiled.statement.table
            seq_column = tbl._autoincrement_column
            insert_has_sequence = seq_column is not None

            if insert_has_sequence:
                self._enable_identity_insert = \
                    seq_column.key in self.compiled_parameters[0]
            else:
                self._enable_identity_insert = False

            if self._enable_identity_insert:
                self.cursor.execute(
                    "SET IDENTITY_INSERT %s ON" %
                    self.dialect.identifier_preparer.format_table(tbl))

        if self.isddl:
            # TODO: to enhance this, we can detect "ddl in tran" on the
            # database settings.  this error message should be improved to
            # include a note about that.
            if not self.should_autocommit:
                raise exc.InvalidRequestError(
                    "The Sybase dialect only supports "
                    "DDL in 'autocommit' mode at this time.")

            self.root_connection.engine.logger.info(
                "AUTOCOMMIT (Assuming no Sybase 'ddl in tran')")

            self.set_ddl_autocommit(
                self.root_connection.connection.connection,
                True)
base.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def pre_exec(self):
        if self.isinsert:
            tbl = self.compiled.statement.table
            seq_column = tbl._autoincrement_column
            insert_has_sequence = seq_column is not None

            if insert_has_sequence:
                self._enable_identity_insert = \
                    seq_column.key in self.compiled_parameters[0]
            else:
                self._enable_identity_insert = False

            if self._enable_identity_insert:
                self.cursor.execute(
                    "SET IDENTITY_INSERT %s ON" %
                    self.dialect.identifier_preparer.format_table(tbl))

        if self.isddl:
            # TODO: to enhance this, we can detect "ddl in tran" on the
            # database settings.  this error message should be improved to
            # include a note about that.
            if not self.should_autocommit:
                raise exc.InvalidRequestError(
                    "The Sybase dialect only supports "
                    "DDL in 'autocommit' mode at this time.")

            self.root_connection.engine.logger.info(
                "AUTOCOMMIT (Assuming no Sybase 'ddl in tran')")

            self.set_ddl_autocommit(
                self.root_connection.connection.connection,
                True)
base.py 文件源码 项目:pyetje 作者: rorlika 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def pre_exec(self):
        if self.isinsert:
            tbl = self.compiled.statement.table
            seq_column = tbl._autoincrement_column
            insert_has_sequence = seq_column is not None

            if insert_has_sequence:
                self._enable_identity_insert = \
                                seq_column.key in self.compiled_parameters[0]
            else:
                self._enable_identity_insert = False

            if self._enable_identity_insert:
                self.cursor.execute("SET IDENTITY_INSERT %s ON" %
                    self.dialect.identifier_preparer.format_table(tbl))

        if self.isddl:
            # TODO: to enhance this, we can detect "ddl in tran" on the
            # database settings.  this error message should be improved to
            # include a note about that.
            if not self.should_autocommit:
                raise exc.InvalidRequestError(
                        "The Sybase dialect only supports "
                        "DDL in 'autocommit' mode at this time.")

            self.root_connection.engine.logger.info(
                        "AUTOCOMMIT (Assuming no Sybase 'ddl in tran')")

            self.set_ddl_autocommit(
                        self.root_connection.connection.connection,
                        True)
base.py 文件源码 项目:Price-Comparator 作者: Thejas-1 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def pre_exec(self):
        if self.isinsert:
            tbl = self.compiled.statement.table
            seq_column = tbl._autoincrement_column
            insert_has_sequence = seq_column is not None

            if insert_has_sequence:
                self._enable_identity_insert = \
                    seq_column.key in self.compiled_parameters[0]
            else:
                self._enable_identity_insert = False

            if self._enable_identity_insert:
                self.cursor.execute(
                    "SET IDENTITY_INSERT %s ON" %
                    self.dialect.identifier_preparer.format_table(tbl))

        if self.isddl:
            # TODO: to enhance this, we can detect "ddl in tran" on the
            # database settings.  this error message should be improved to
            # include a note about that.
            if not self.should_autocommit:
                raise exc.InvalidRequestError(
                    "The Sybase dialect only supports "
                    "DDL in 'autocommit' mode at this time.")

            self.root_connection.engine.logger.info(
                "AUTOCOMMIT (Assuming no Sybase 'ddl in tran')")

            self.set_ddl_autocommit(
                self.root_connection.connection.connection,
                True)
base.py 文件源码 项目:Flask-NvRay-Blog 作者: rui7157 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def pre_exec(self):
        if self.isinsert:
            tbl = self.compiled.statement.table
            seq_column = tbl._autoincrement_column
            insert_has_sequence = seq_column is not None

            if insert_has_sequence:
                self._enable_identity_insert = \
                    seq_column.key in self.compiled_parameters[0]
            else:
                self._enable_identity_insert = False

            if self._enable_identity_insert:
                self.cursor.execute(
                    "SET IDENTITY_INSERT %s ON" %
                    self.dialect.identifier_preparer.format_table(tbl))

        if self.isddl:
            # TODO: to enhance this, we can detect "ddl in tran" on the
            # database settings.  this error message should be improved to
            # include a note about that.
            if not self.should_autocommit:
                raise exc.InvalidRequestError(
                    "The Sybase dialect only supports "
                    "DDL in 'autocommit' mode at this time.")

            self.root_connection.engine.logger.info(
                "AUTOCOMMIT (Assuming no Sybase 'ddl in tran')")

            self.set_ddl_autocommit(
                self.root_connection.connection.connection,
                True)
base.py 文件源码 项目:Callandtext 作者: iaora 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def pre_exec(self):
        if self.isinsert:
            tbl = self.compiled.statement.table
            seq_column = tbl._autoincrement_column
            insert_has_sequence = seq_column is not None

            if insert_has_sequence:
                self._enable_identity_insert = \
                    seq_column.key in self.compiled_parameters[0]
            else:
                self._enable_identity_insert = False

            if self._enable_identity_insert:
                self.cursor.execute(
                    "SET IDENTITY_INSERT %s ON" %
                    self.dialect.identifier_preparer.format_table(tbl))

        if self.isddl:
            # TODO: to enhance this, we can detect "ddl in tran" on the
            # database settings.  this error message should be improved to
            # include a note about that.
            if not self.should_autocommit:
                raise exc.InvalidRequestError(
                    "The Sybase dialect only supports "
                    "DDL in 'autocommit' mode at this time.")

            self.root_connection.engine.logger.info(
                "AUTOCOMMIT (Assuming no Sybase 'ddl in tran')")

            self.set_ddl_autocommit(
                self.root_connection.connection.connection,
                True)
base.py 文件源码 项目:python_ddd_flask 作者: igorvinnicius 项目源码 文件源码 阅读 75 收藏 0 点赞 0 评论 0
def pre_exec(self):
        if self.isinsert:
            tbl = self.compiled.statement.table
            seq_column = tbl._autoincrement_column
            insert_has_sequence = seq_column is not None

            if insert_has_sequence:
                self._enable_identity_insert = \
                    seq_column.key in self.compiled_parameters[0]
            else:
                self._enable_identity_insert = False

            if self._enable_identity_insert:
                self.cursor.execute(
                    "SET IDENTITY_INSERT %s ON" %
                    self.dialect.identifier_preparer.format_table(tbl))

        if self.isddl:
            # TODO: to enhance this, we can detect "ddl in tran" on the
            # database settings.  this error message should be improved to
            # include a note about that.
            if not self.should_autocommit:
                raise exc.InvalidRequestError(
                    "The Sybase dialect only supports "
                    "DDL in 'autocommit' mode at this time.")

            self.root_connection.engine.logger.info(
                "AUTOCOMMIT (Assuming no Sybase 'ddl in tran')")

            self.set_ddl_autocommit(
                self.root_connection.connection.connection,
                True)
backfill_ofx.py 文件源码 项目:biweeklybudget 作者: jantman 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _do_account_dir(self, acct_id, path):
        """
        Handle all OFX statements in a per-account directory.

        :param acct_id: account database ID
        :type acct_id: int
        :param path: absolute path to per-account directory
        :type path: str
        """
        logger.debug('Doing account %d directory (%s)', acct_id, path)
        files = {}
        for f in os.listdir(path):
            p = os.path.join(path, f)
            if not os.path.isfile(p):
                continue
            extension = p.split('.')[-1].lower()
            if extension not in ['ofx', 'qfx']:
                continue
            files[p] = os.path.getmtime(p)
        logger.debug('Found %d files for account %d', len(files), acct_id)
        # run through the files, oldest to newest
        success = 0
        already = 0
        for p in sorted(files, key=files.get):
            try:
                self._do_one_file(acct_id, p)
                success += 1
            except DuplicateFileException:
                already += 1
                logger.warning('OFX is already parsed for account; skipping')
            except (InvalidRequestError, IntegrityError, TypeError):
                raise
            except Exception:
                logger.error('Exception parsing and inserting file %s',
                             p, exc_info=True)
        logger.info('Successfully parsed and inserted %d of %d files for '
                    'account %d; %d files already in DB', success, len(files),
                    acct_id, already)
sqlalchemysupport.py 文件源码 项目:reahl 作者: reahl 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def instrument_declarative_classes(self, all_classes):
        registry = {}
        for cls in all_classes:
            try:
                if not hasattr(cls, '__mapper__'):
                    instrument_declarative(cls, registry, metadata)
                    logging.getLogger(__file__).info( 'Instrumented %s: __tablename__=%s [polymorphic_identity=%s]' % \
                                                          (cls, cls.table, cls.mapper.polymorphic_identity) )
            except InvalidRequestError:
                logging.info('skipping declarative instrumentation of %s' % cls)
test_invariants.py 文件源码 项目:temporal-sqlalchemy 作者: CloverHealth 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_add_clock_column_verification():
    with pytest.raises(exc.InvalidRequestError):
        @temporal.add_clock('prop_a', 'prop_b', 'prop_c')
        class TempFail(temporal.Clocked, models.ExpectedFailBase):
            __tablename__ = 'temp_fail'
            __table_args__ = {'schema': models.SCHEMA}

            id = models.auto_uuid()
            prop_a = sa.Column(sa.Integer)
            prop_b = sa.Column(sap.TEXT)
models.py 文件源码 项目:ctfscoreboard 作者: google 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_by_name(cls, name):
        try:
            return cls.query.filter_by(name=name).one()
        except exc.InvalidRequestError:
            return None
models.py 文件源码 项目:ctfscoreboard 作者: google 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_by_email(cls, email):
        try:
            return cls.query.filter_by(email=email).one()
        except exc.InvalidRequestError:
            return None


问题


面经


文章

微信
公众号

扫码关注公众号