python类connection()的实例源码

plugin.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _ensureConnection(self):
        # If connection is already made close it.
        from django.db import connection
        if connection.connection is not None:
            connection.close()

        # Loop forever until a connection can be made.
        while True:
            try:
                connection.ensure_connection()
            except Exception:
                log.err(_why=(
                    "Error starting: "
                    "Connection to database cannot be established."))
                time.sleep(1)
            else:
                # Connection made, now close it.
                connection.close()
                break
operations.py 文件源码 项目:django-mpathy 作者: craigds 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def post_migrate_mpathnode(model):
    # Note: model *isn't* a subclass of MPathNode, because django migrations are Weird.
    # if not issubclass(model, MPathNode):
    # Hence the following workaround:
    try:
        ltree_field = model._meta.get_field('ltree')
        if not isinstance(ltree_field, LTreeField):
            return
    except FieldDoesNotExist:
        return

    names = {
        "table": quote_ident(model._meta.db_table, connection.connection),
        "check_constraint": quote_ident('%s__check_ltree' % model._meta.db_table, connection.connection),
    }

    cur = connection.cursor()
    # Check that the ltree is always consistent with being a child of _parent
    cur.execute('''
        ALTER TABLE %(table)s ADD CONSTRAINT %(check_constraint)s CHECK (
            (parent_id IS NOT NULL AND ltree ~ (parent_id::text || '.*{1}')::lquery)
            OR (parent_id IS NULL AND ltree ~ '*{1}'::lquery)
        )
    ''' % names)
tasks.py 文件源码 项目:ansible-manager 作者: telminov 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def run_task_process(self, task):
        proc = Process(target=self.run_task, args=(task.id,))
        proc.start()
        pid = proc.pid

        from django.db import connection
        connection.connection.close()
        connection.connection = None

        task.pid = pid
        task.status = consts.IN_PROGRESS
        task.save()

        task.logs.create(
            status=consts.IN_PROGRESS,
            message='Start task with pid %s' % pid,
        )
db.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_named_cursor():
    """
    This function returns a named cursor, which speeds up queries
    returning large result sets immensely by caching them on the
    server side.

    This is not yet supported by Django itself.
    """
    # This is required to populate the connection object properly
    if connection.connection is None:
        connection.cursor()

    # Prefixing the name to ensure that it starts with a letter.
    # Needed for psycopg2.2 compatibility
    name = 'nav{0}'.format(str(uuid.uuid4()).replace('-', ''))

    cursor = connection.connection.cursor(name=name)
    return cursor
engine.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _notifysleep(self, delay):
        """Sleeps up to delay number of seconds, but will schedule an
        immediate new event queue check if an event notification is received
        from PostgreSQL.

        """
        conn = connection.connection
        if conn:
            try:
                select.select([conn], [], [], delay)
            except select.error as err:
                if err.args[0] != errno.EINTR:
                    raise
            try:
                conn.poll()
            except OperationalError:
                connection.connection = None
                self._listen()
                return
            if conn.notifies:
                self._logger.debug("got event notification from database")
                self._schedule_next_queuecheck()
                del conn.notifies[:]
        else:
            time.sleep(delay)
dblocks.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def is_locked(self):
        stmt = (
            "SELECT 1 FROM pg_locks, pg_database"
            " WHERE pg_locks.locktype = 'advisory'"
            "   AND pg_locks.classid = %s"
            "   AND pg_locks.objid = %s"
            # objsubid is 2 when using the 2-argument version of the
            # pg_advisory_* locking functions.
            "   AND pg_locks.objsubid = 2"
            "   AND pg_locks.granted"
            # Advisory locks are local to each database so we join to
            # pg_databases to discover the OID of the currrent database.
            "   AND pg_locks.database = pg_database.oid"
            "   AND pg_database.datname = current_database()"
        )
        with closing(connection.cursor()) as cursor:
            cursor.execute(stmt, self)
            return len(cursor.fetchall()) >= 1
queue.py 文件源码 项目:django-postgres-queue 作者: gavinwahl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def listen(self):
        with connection.cursor() as cur:
            cur.execute('LISTEN "{}";'.format(self.notify_channel))
queue.py 文件源码 项目:django-postgres-queue 作者: gavinwahl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def wait(self, timeout=30):
        connection.connection.poll()
        notifies = self.filter_notifies()
        if notifies:
            return notifies

        select.select([connection.connection], [], [], timeout)
        connection.connection.poll()
        return self.filter_notifies()
queue.py 文件源码 项目:django-postgres-queue 作者: gavinwahl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def filter_notifies(self):
        notifies = [
            i for i in connection.connection.notifies
            if i.channel == self.notify_channel
        ]
        connection.connection.notifies = [
            i for i in connection.connection.notifies
            if i.channel != self.notify_channel
        ]
        return notifies
queue.py 文件源码 项目:django-postgres-queue 作者: gavinwahl 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def notify(self, job):
        with connection.cursor() as cur:
            cur.execute('NOTIFY "{}", %s;'.format(self.notify_channel), [str(job.pk)])
queue.py 文件源码 项目:django-postgres-queue 作者: gavinwahl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run_once(self, exclude_ids=[]):
        assert not connection.in_atomic_block
        return self._run_once(exclude_ids=exclude_ids)
test_field.py 文件源码 项目:django-postgres-composite-types 作者: danni 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def does_type_exist(self, type_name):
        """
        Check if a composite type exists in the database
        """
        sql = 'select exists (select 1 from pg_type where typname = %s);'
        with connection.cursor() as cursor:
            cursor.execute(sql, [type_name])
            row = cursor.fetchone()
            return row[0]
test_field.py 文件源码 项目:django-postgres-composite-types 作者: danni 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def migrate(self, targets):
        """
        Migrate to a new state.

        MigrationExecutors can not be reloaded, as they cache the state of the
        migrations when created. Attempting to reuse one might make some
        migrations not run, as it thinks they have already been run.
        """
        executor = MigrationExecutor(connection)
        executor.migrate(targets)

        # Cant load state for apps in the initial empty state
        state_nodes = [node for node in targets if node[1] is not None]
        return executor.loader.project_state(state_nodes).apps
test_field.py 文件源码 项目:django-postgres-composite-types 作者: danni 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_migration(self):
        """Data data migration."""

        # The migrations have already been run, and the type already exists in
        # the database
        self.assertTrue(self.does_type_exist(SimpleType._meta.db_type))

        # Run the migration backwards to check the type is deleted
        self.migrate(self.migrate_from)

        # The type should now not exist
        self.assertFalse(self.does_type_exist(SimpleType._meta.db_type))

        # A signal is fired when the migration creates the type
        signal_func = mock.Mock()
        composite_type_created.connect(receiver=signal_func, sender=SimpleType)

        # Run the migration forwards to create the type again
        self.migrate(self.migrate_to)

        # The signal should have been sent
        self.assertEqual(signal_func.call_count, 1)
        self.assertEqual(signal_func.call_args, ((), {
            'sender': SimpleType,
            'signal': composite_type_created,
            'connection': connection}))

        # The type should now exist again
        self.assertTrue(self.does_type_exist(SimpleType._meta.db_type))
test_field.py 文件源码 项目:django-postgres-composite-types 作者: danni 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_field_save_and_load(self):
        """Save and load a test model."""
        # pylint:disable=invalid-name
        t = SimpleType(a=1, b="? ?", c=datetime.datetime(1985, 10, 26, 9, 0))
        m = SimpleModel(test_field=t)
        m.save()  # pylint:disable=no-member

        # Retrieve from DB
        m = SimpleModel.objects.get(id=1)
        self.assertIsNotNone(m.test_field)
        self.assertIsInstance(m.test_field, SimpleType)
        self.assertEqual(m.test_field.a, 1)
        self.assertEqual(m.test_field.b, "? ?")
        self.assertEqual(m.test_field.c, datetime.datetime(1985, 10, 26, 9, 0))

        cursor = connection.connection.cursor()
        cursor.execute("SELECT (test_field).a FROM %s" % (
            SimpleModel._meta.db_table,))
        result, = cursor.fetchone()

        self.assertEqual(result, 1)

        cursor = connection.connection.cursor()
        cursor.execute("SELECT (test_field).b FROM %s" % (
            SimpleModel._meta.db_table,))
        result, = cursor.fetchone()

        self.assertEqual(result, "? ?")
test_field.py 文件源码 项目:django-postgres-composite-types 作者: danni 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_adapted_sql(self):
        """
        Check that the value is serialised to the correct SQL string, including
        a type cast
        """
        value = SimpleType(a=1, b="b", c=datetime.datetime(1985, 10, 26, 9, 0))

        adapted = adapt(value)
        adapted.prepare(connection.connection)

        self.assertEqual(
            b"(1, 'b', '1985-10-26T09:00:00'::timestamp)::test_type",
            adapted.getquoted())
inception.py 文件源码 项目:db_platform 作者: speedocjx 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def make_sure_mysql_usable():
    # mysql is lazily connected to in django.
    # connection.connection is None means
    # you have not connected to mysql before
    if connection.connection and not connection.is_usable():
        # destroy the default mysql connection
        # after this line, when you use ORM methods
        # django will reconnect to the default mysql
        del connections._connections.default

#
# def get_item(data_dict,item):
#     try:
#        item_value = data_dict[item]
#        return item_value
#     except:
#        return '-1'
#
# def get_config(group,config_name):
#     config = ConfigParser.ConfigParser()
#     config.readfp(open('./myapp/etc/config.ini','r'))
#     config_value=config.get(group,config_name).strip(' ').strip('\'').strip('\"')
#     return config_value
#
# def filters(data):
#     return data.strip(' ').strip('\n').strip('\br')
#
# select_limit = int(get_config('settings','select_limit'))
# export_limit = int(get_config('settings','export_limit'))
# host = get_config('settings','host')
# port = get_config('settings','port')
# user = get_config('settings','user')
# passwd = get_config('settings','passwd')
# dbname = get_config('settings','dbname')
# wrong_msg = get_config('settings','wrong_msg')
# incp_host = get_config('settings','incp_host')
# incp_port = int(get_config('settings','incp_port'))
# incp_user = get_config('settings','incp_user')
# incp_passwd = get_config('settings','incp_passwd')
# public_user = get_config('settings','public_user')
__init__.py 文件源码 项目:helloscrapy 作者: clayandgithub 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def check_db_connection():
    from django.db import connection

    if connection.connection:
        #NOTE: (zacky, 2016.MAR.21st) IF CONNECTION IS CLOSED BY BACKEND, CLOSE IT AT DJANGO, WHICH WILL BE SETUP AFTERWARDS.
        if not connection.is_usable():
            connection.close()
engine.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def harakiri():
    """Kills the entire daemon when no database is available"""
    _logger.fatal("unable to establish database connection, qutting...")
    raise SystemExit(1)
engine.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _listen():
        """Ensures that we subscribe to new_event notifications on our
        PostgreSQL connection.

        """
        _logger.debug("registering event listener with PostgreSQL")
        cursor = connection.cursor()
        cursor.execute('LISTEN new_event')
dblocks.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __enter__(self):
        if connection.connection is None:
            raise DatabaseLockAttemptWithoutConnection(self)
        with closing(connection.cursor()) as cursor:
            query = "SELECT %s(%%s, %%s)" % self.lock
            cursor.execute(query, self)
            if cursor.fetchone() == (False,):
                raise DatabaseLockNotHeld(self)
dblocks.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __exit__(self, *exc_info):
        with closing(connection.cursor()) as cursor:
            query = "SELECT %s(%%s, %%s)" % self.unlock
            cursor.execute(query, self)
            if cursor.fetchone() != (True,):
                raise DatabaseLockNotHeld(self)
dblocks.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __enter__(self):
        """Obtain lock using pg_advisory_xact_lock()."""
        if not connection.in_atomic_block:
            raise DatabaseLockAttemptOutsideTransaction(self)
        with closing(connection.cursor()) as cursor:
            query = "SELECT %s(%%s, %%s)" % self.lock
            cursor.execute(query, self)
            if cursor.fetchone() == (False,):
                raise DatabaseLockNotHeld(self)
testcase.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def checkDatabaseUse(self):
        """Enforce `database_use_permitted`."""
        if self.database_use_possible and not self.database_use_permitted:
            from django.db import connection
            self.expectThat(
                connection.connection, testtools.matchers.Is(None),
                "Test policy forbids use of the database.")
            connection.close()


问题


面经


文章

微信
公众号

扫码关注公众号