python类DatabaseError()的实例源码

schema.py 文件源码 项目:logo-gen 作者: jellene4eva 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def delete_model(self, model, **kwargs):
        from django.contrib.gis.db.models.fields import GeometryField
        # Drop spatial metadata (dropping the table does not automatically remove them)
        for field in model._meta.local_fields:
            if isinstance(field, GeometryField):
                self.remove_geometry_metadata(model, field)
        # Make sure all geom stuff is gone
        for geom_table in self.geometry_tables:
            try:
                self.execute(
                    self.sql_discard_geometry_columns % {
                        "geom_table": geom_table,
                        "table": self.quote_name(model._meta.db_table),
                    }
                )
            except DatabaseError:
                pass
        super(SpatialiteSchemaEditor, self).delete_model(model, **kwargs)
schema.py 文件源码 项目:liberator 作者: libscie 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def delete_model(self, model, **kwargs):
        from django.contrib.gis.db.models.fields import GeometryField
        # Drop spatial metadata (dropping the table does not automatically remove them)
        for field in model._meta.local_fields:
            if isinstance(field, GeometryField):
                self.remove_geometry_metadata(model, field)
        # Make sure all geom stuff is gone
        for geom_table in self.geometry_tables:
            try:
                self.execute(
                    self.sql_discard_geometry_columns % {
                        "geom_table": geom_table,
                        "table": self.quote_name(model._meta.db_table),
                    }
                )
            except DatabaseError:
                pass
        super(SpatialiteSchemaEditor, self).delete_model(model, **kwargs)
base.py 文件源码 项目:liberator 作者: libscie 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _nodb_connection(self):
        nodb_connection = super(DatabaseWrapper, self)._nodb_connection
        try:
            nodb_connection.ensure_connection()
        except (Database.DatabaseError, WrappedDatabaseError):
            warnings.warn(
                "Normally Django will use a connection to the 'postgres' database "
                "to avoid running initialization queries against the production "
                "database when it's not needed (for example, when running tests). "
                "Django was unable to create a connection to the 'postgres' database "
                "and will use the default database instead.",
                RuntimeWarning
            )
            settings_dict = self.settings_dict.copy()
            settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
            nodb_connection = self.__class__(
                self.settings_dict.copy(),
                alias=self.alias,
                allow_thread_sharing=False)
        return nodb_connection
creation.py 文件源码 项目:liberator 作者: libscie 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def _execute_allow_fail_statements(self, cursor, statements, parameters, verbosity, acceptable_ora_err):
        """
        Execute statements which are allowed to fail silently if the Oracle
        error code given by `acceptable_ora_err` is raised. Return True if the
        statements execute without an exception, or False otherwise.
        """
        try:
            # Statement can fail when acceptable_ora_err is not None
            allow_quiet_fail = acceptable_ora_err is not None and len(acceptable_ora_err) > 0
            self._execute_statements(cursor, statements, parameters, verbosity, allow_quiet_fail=allow_quiet_fail)
            return True
        except DatabaseError as err:
            description = str(err)
            if acceptable_ora_err is None or acceptable_ora_err not in description:
                raise
            return False
features.py 文件源码 项目:liberator 作者: libscie 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def supports_stddev(self):
        """Confirm support for STDDEV and related stats functions

        SQLite supports STDDEV as an extension package; so
        connection.ops.check_expression_support() can't unilaterally
        rule out support for STDDEV. We need to manually check
        whether the call works.
        """
        with self.connection.cursor() as cursor:
            cursor.execute('CREATE TABLE STDDEV_TEST (X INT)')
            try:
                cursor.execute('SELECT STDDEV(*) FROM STDDEV_TEST')
                has_support = True
            except utils.DatabaseError:
                has_support = False
            cursor.execute('DROP TABLE STDDEV_TEST')
        return has_support
base.py 文件源码 项目:liberator 作者: libscie 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def validate_thread_sharing(self):
        """
        Validates that the connection isn't accessed by another thread than the
        one which originally created it, unless the connection was explicitly
        authorized to be shared between threads (via the `allow_thread_sharing`
        property). Raises an exception if the validation fails.
        """
        if not (self.allow_thread_sharing or self._thread_ident == thread.get_ident()):
            raise DatabaseError(
                "DatabaseWrapper objects created in a "
                "thread can only be used in that same thread. The object "
                "with alias '%s' was created in thread id %s and this is "
                "thread id %s."
                % (self.alias, self._thread_ident, thread.get_ident())
            )

    # ##### Miscellaneous #####
compiler.py 文件源码 项目:liberator 作者: libscie 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_combinator_sql(self, combinator, all):
        features = self.connection.features
        compilers = [
            query.get_compiler(self.using, self.connection)
            for query in self.query.combined_queries
        ]
        if not features.supports_slicing_ordering_in_compound:
            for query, compiler in zip(self.query.combined_queries, compilers):
                if query.low_mark or query.high_mark:
                    raise DatabaseError('LIMIT/OFFSET not allowed in subqueries of compound statements.')
                if compiler.get_order_by():
                    raise DatabaseError('ORDER BY not allowed in subqueries of compound statements.')
        parts = (compiler.as_sql() for compiler in compilers)
        combinator_sql = self.connection.ops.set_operators[combinator]
        if all and combinator == 'union':
            combinator_sql += ' ALL'
        braces = '({})' if features.supports_slicing_ordering_in_compound else '{}'
        sql_parts, args_parts = zip(*((braces.format(sql), args) for sql, args in parts))
        result = [' {} '.format(combinator_sql).join(sql_parts)]
        params = []
        for part in args_parts:
            params.extend(part)
        return result, params
schema.py 文件源码 项目:gmail_scanner 作者: brandonhub 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def delete_model(self, model, **kwargs):
        from django.contrib.gis.db.models.fields import GeometryField
        # Drop spatial metadata (dropping the table does not automatically remove them)
        for field in model._meta.local_fields:
            if isinstance(field, GeometryField):
                self.remove_geometry_metadata(model, field)
        # Make sure all geom stuff is gone
        for geom_table in self.geometry_tables:
            try:
                self.execute(
                    self.sql_discard_geometry_columns % {
                        "geom_table": geom_table,
                        "table": self.quote_name(model._meta.db_table),
                    }
                )
            except DatabaseError:
                pass
        super(SpatialiteSchemaEditor, self).delete_model(model, **kwargs)
schema.py 文件源码 项目:djanoDoc 作者: JustinChavez 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def delete_model(self, model, **kwargs):
        from django.contrib.gis.db.models.fields import GeometryField
        # Drop spatial metadata (dropping the table does not automatically remove them)
        for field in model._meta.local_fields:
            if isinstance(field, GeometryField):
                self.remove_geometry_metadata(model, field)
        # Make sure all geom stuff is gone
        for geom_table in self.geometry_tables:
            try:
                self.execute(
                    self.sql_discard_geometry_columns % {
                        "geom_table": geom_table,
                        "table": self.quote_name(model._meta.db_table),
                    }
                )
            except DatabaseError:
                pass
        super(SpatialiteSchemaEditor, self).delete_model(model, **kwargs)
base.py 文件源码 项目:djanoDoc 作者: JustinChavez 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _nodb_connection(self):
        nodb_connection = super(DatabaseWrapper, self)._nodb_connection
        try:
            nodb_connection.ensure_connection()
        except (DatabaseError, WrappedDatabaseError):
            warnings.warn(
                "Normally Django will use a connection to the 'postgres' database "
                "to avoid running initialization queries against the production "
                "database when it's not needed (for example, when running tests). "
                "Django was unable to create a connection to the 'postgres' database "
                "and will use the default database instead.",
                RuntimeWarning
            )
            settings_dict = self.settings_dict.copy()
            settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
            nodb_connection = self.__class__(
                self.settings_dict.copy(),
                alias=self.alias,
                allow_thread_sharing=False)
        return nodb_connection
features.py 文件源码 项目:djanoDoc 作者: JustinChavez 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def supports_stddev(self):
        """Confirm support for STDDEV and related stats functions

        SQLite supports STDDEV as an extension package; so
        connection.ops.check_expression_support() can't unilaterally
        rule out support for STDDEV. We need to manually check
        whether the call works.
        """
        with self.connection.cursor() as cursor:
            cursor.execute('CREATE TABLE STDDEV_TEST (X INT)')
            try:
                cursor.execute('SELECT STDDEV(*) FROM STDDEV_TEST')
                has_support = True
            except utils.DatabaseError:
                has_support = False
            cursor.execute('DROP TABLE STDDEV_TEST')
        return has_support
base.py 文件源码 项目:djanoDoc 作者: JustinChavez 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def validate_thread_sharing(self):
        """
        Validates that the connection isn't accessed by another thread than the
        one which originally created it, unless the connection was explicitly
        authorized to be shared between threads (via the `allow_thread_sharing`
        property). Raises an exception if the validation fails.
        """
        if not (self.allow_thread_sharing
                or self._thread_ident == thread.get_ident()):
            raise DatabaseError("DatabaseWrapper objects created in a "
                "thread can only be used in that same thread. The object "
                "with alias '%s' was created in thread id %s and this is "
                "thread id %s."
                % (self.alias, self._thread_ident, thread.get_ident()))

    # ##### Miscellaneous #####
views.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def prefix_add_tags(request, prefix_id):
    """Adds usages to a prefix from post data"""
    prefix = Prefix.objects.get(pk=prefix_id)
    existing_usages = {u[0] for u in prefix.usages.values_list()}
    usages = set(request.POST.getlist('usages'))

    to_remove = list(existing_usages - usages)
    to_add = list(usages - existing_usages)

    PrefixUsage.objects.filter(prefix=prefix,
                               usage__in=to_remove).delete()
    for usage_key in to_add:
        usage = Usage.objects.get(pk=usage_key)
        try:
            PrefixUsage(prefix=prefix, usage=usage).save()
        except DatabaseError:
            pass

    return HttpResponse()
schema.py 文件源码 项目:CSCE482-WordcloudPlus 作者: ggaytan00 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def delete_model(self, model, **kwargs):
        from django.contrib.gis.db.models.fields import GeometryField
        # Drop spatial metadata (dropping the table does not automatically remove them)
        for field in model._meta.local_fields:
            if isinstance(field, GeometryField):
                self.remove_geometry_metadata(model, field)
        # Make sure all geom stuff is gone
        for geom_table in self.geometry_tables:
            try:
                self.execute(
                    self.sql_discard_geometry_columns % {
                        "geom_table": geom_table,
                        "table": self.quote_name(model._meta.db_table),
                    }
                )
            except DatabaseError:
                pass
        super(SpatialiteSchemaEditor, self).delete_model(model, **kwargs)
operations.py 文件源码 项目:tissuelab 作者: VirtualPlants 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def spatial_version(self):
        """Determine the version of the PostGIS library."""
        # Trying to get the PostGIS version because the function
        # signatures will depend on the version used.  The cost
        # here is a database query to determine the version, which
        # can be mitigated by setting `POSTGIS_VERSION` with a 3-tuple
        # comprising user-supplied values for the major, minor, and
        # subminor revision of PostGIS.
        if hasattr(settings, 'POSTGIS_VERSION'):
            version = settings.POSTGIS_VERSION
        else:
            try:
                vtup = self.postgis_version_tuple()
            except DatabaseError:
                raise ImproperlyConfigured(
                    'Cannot determine PostGIS version for database "%s". '
                    'GeoDjango requires at least PostGIS version 1.3. '
                    'Was the database created from a spatial database '
                    'template?' % self.connection.settings_dict['NAME']
                    )
            version = vtup[1:]
        return version
schema.py 文件源码 项目:producthunt 作者: davidgengler 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def delete_model(self, model, **kwargs):
        from django.contrib.gis.db.models.fields import GeometryField
        # Drop spatial metadata (dropping the table does not automatically remove them)
        for field in model._meta.local_fields:
            if isinstance(field, GeometryField):
                self.remove_geometry_metadata(model, field)
        # Make sure all geom stuff is gone
        for geom_table in self.geometry_tables:
            try:
                self.execute(
                    self.sql_discard_geometry_columns % {
                        "geom_table": geom_table,
                        "table": self.quote_name(model._meta.db_table),
                    }
                )
            except DatabaseError:
                pass
        super(SpatialiteSchemaEditor, self).delete_model(model, **kwargs)
schema.py 文件源码 项目:django-rtc 作者: scifiswapnil 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def delete_model(self, model, **kwargs):
        from django.contrib.gis.db.models.fields import GeometryField
        # Drop spatial metadata (dropping the table does not automatically remove them)
        for field in model._meta.local_fields:
            if isinstance(field, GeometryField):
                self.remove_geometry_metadata(model, field)
        # Make sure all geom stuff is gone
        for geom_table in self.geometry_tables:
            try:
                self.execute(
                    self.sql_discard_geometry_columns % {
                        "geom_table": geom_table,
                        "table": self.quote_name(model._meta.db_table),
                    }
                )
            except DatabaseError:
                pass
        super(SpatialiteSchemaEditor, self).delete_model(model, **kwargs)
test_core.py 文件源码 项目:django-hordak 作者: adamcharnock 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_postgres_trigger_sum_zero(self):
        """"
        Check the database enforces leg amounts summing to zero

        This is enforced by a postgres trigger applied in migration 0005.
        Note that this requires the test case extend TransactionTestCase,
        as the trigger is only run when changes are committed to the DB
        (which the normal TestCase will not do)
        """
        account = self.account()
        transaction = Transaction.objects.create()

        with self.assertRaises(DatabaseError):
            Leg.objects.create(transaction=transaction, account=account, amount=100)

        with self.assertRaises(DatabaseError), db_transaction.atomic():
            # Also ensure we distinguish between currencies
            Leg.objects.create(transaction=transaction, account=account, amount=Money(100, 'EUR'))
            Leg.objects.create(transaction=transaction, account=account, amount=Money(-100, 'GBP'))
schema.py 文件源码 项目:geekpoint 作者: Lujinghu 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def delete_model(self, model, **kwargs):
        from django.contrib.gis.db.models.fields import GeometryField
        # Drop spatial metadata (dropping the table does not automatically remove them)
        for field in model._meta.local_fields:
            if isinstance(field, GeometryField):
                self.remove_geometry_metadata(model, field)
        # Make sure all geom stuff is gone
        for geom_table in self.geometry_tables:
            try:
                self.execute(
                    self.sql_discard_geometry_columns % {
                        "geom_table": geom_table,
                        "table": self.quote_name(model._meta.db_table),
                    }
                )
            except DatabaseError:
                pass
        super(SpatialiteSchemaEditor, self).delete_model(model, **kwargs)
helpers.py 文件源码 项目:Sentry 作者: NetEaseGame 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def can_reconnect(exc):
    if isinstance(exc, psycopg2.InterfaceError):
        return True
    # elif isinstance(exc, psycopg2.OperationalError):
    #     exc_msg = str(exc)
    #     if "can't fetch default_isolation_level" in exc_msg:
    #         return True
    #     elif "can't set datestyle to ISO" in exc_msg:
    #         return True
    #     return True
    elif isinstance(exc, DatabaseError):
        exc_msg = str(exc)
        if 'server closed the connection unexpectedly' in exc_msg:
            return True
        elif 'client_idle_timeout' in exc_msg:
            return True
    return False


问题


面经


文章

微信
公众号

扫码关注公众号