python类connect()的实例源码

test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_tpc_commit_recovered(self):
        cnn = self.connect()
        xid = cnn.xid(1, "gtrid", "bqual")
        self.assertEqual(cnn.status, ext.STATUS_READY)

        cnn.tpc_begin(xid)
        self.assertEqual(cnn.status, ext.STATUS_BEGIN)

        cur = cnn.cursor()
        cur.execute("insert into test_tpc values ('test_tpc_commit_rec');")
        self.assertEqual(0, self.count_xacts())
        self.assertEqual(0, self.count_test_records())

        cnn.tpc_prepare()
        cnn.close()
        self.assertEqual(1, self.count_xacts())
        self.assertEqual(0, self.count_test_records())

        cnn = self.connect()
        xid = cnn.xid(1, "gtrid", "bqual")
        cnn.tpc_commit(xid)

        self.assertEqual(cnn.status, ext.STATUS_READY)
        self.assertEqual(0, self.count_xacts())
        self.assertEqual(1, self.count_test_records())
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_tpc_rollback(self):
        cnn = self.connect()
        xid = cnn.xid(1, "gtrid", "bqual")
        self.assertEqual(cnn.status, ext.STATUS_READY)

        cnn.tpc_begin(xid)
        self.assertEqual(cnn.status, ext.STATUS_BEGIN)

        cur = cnn.cursor()
        cur.execute("insert into test_tpc values ('test_tpc_rollback');")
        self.assertEqual(0, self.count_xacts())
        self.assertEqual(0, self.count_test_records())

        cnn.tpc_prepare()
        self.assertEqual(cnn.status, ext.STATUS_PREPARED)
        self.assertEqual(1, self.count_xacts())
        self.assertEqual(0, self.count_test_records())

        cnn.tpc_rollback()
        self.assertEqual(cnn.status, ext.STATUS_READY)
        self.assertEqual(0, self.count_xacts())
        self.assertEqual(0, self.count_test_records())
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_tpc_rollback_one_phase(self):
        cnn = self.connect()
        xid = cnn.xid(1, "gtrid", "bqual")
        self.assertEqual(cnn.status, ext.STATUS_READY)

        cnn.tpc_begin(xid)
        self.assertEqual(cnn.status, ext.STATUS_BEGIN)

        cur = cnn.cursor()
        cur.execute("insert into test_tpc values ('test_tpc_rollback_1p');")
        self.assertEqual(0, self.count_xacts())
        self.assertEqual(0, self.count_test_records())

        cnn.tpc_rollback()
        self.assertEqual(cnn.status, ext.STATUS_READY)
        self.assertEqual(0, self.count_xacts())
        self.assertEqual(0, self.count_test_records())
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_tpc_rollback_recovered(self):
        cnn = self.connect()
        xid = cnn.xid(1, "gtrid", "bqual")
        self.assertEqual(cnn.status, ext.STATUS_READY)

        cnn.tpc_begin(xid)
        self.assertEqual(cnn.status, ext.STATUS_BEGIN)

        cur = cnn.cursor()
        cur.execute("insert into test_tpc values ('test_tpc_commit_rec');")
        self.assertEqual(0, self.count_xacts())
        self.assertEqual(0, self.count_test_records())

        cnn.tpc_prepare()
        cnn.close()
        self.assertEqual(1, self.count_xacts())
        self.assertEqual(0, self.count_test_records())

        cnn = self.connect()
        xid = cnn.xid(1, "gtrid", "bqual")
        cnn.tpc_rollback(xid)

        self.assertEqual(cnn.status, ext.STATUS_READY)
        self.assertEqual(0, self.count_xacts())
        self.assertEqual(0, self.count_test_records())
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_xid_roundtrip(self):
        for fid, gtrid, bqual in [
            (0, "", ""),
            (42, "gtrid", "bqual"),
            (0x7fffffff, "x" * 64, "y" * 64),
        ]:
            cnn = self.connect()
            xid = cnn.xid(fid, gtrid, bqual)
            cnn.tpc_begin(xid)
            cnn.tpc_prepare()
            cnn.close()

            cnn = self.connect()
            xids = [x for x in cnn.tpc_recover() if x.database == dbname]
            self.assertEqual(1, len(xids))
            xid = xids[0]
            self.assertEqual(xid.format_id, fid)
            self.assertEqual(xid.gtrid, gtrid)
            self.assertEqual(xid.bqual, bqual)

            cnn.tpc_rollback(xid)
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_unparsed_roundtrip(self):
        for tid in [
            '',
            'hello, world!',
            'x' * 199,  # PostgreSQL's limit in transaction id length
        ]:
            cnn = self.connect()
            cnn.tpc_begin(tid)
            cnn.tpc_prepare()
            cnn.close()

            cnn = self.connect()
            xids = [x for x in cnn.tpc_recover() if x.database == dbname]
            self.assertEqual(1, len(xids))
            xid = xids[0]
            self.assertEqual(xid.format_id, None)
            self.assertEqual(xid.gtrid, tid)
            self.assertEqual(xid.bqual, None)

            cnn.tpc_rollback(xid)
test_copy.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_copy_from_segfault(self):
        # issue #219
        script = ("""\
import psycopg2
conn = psycopg2.connect(%(dsn)r)
curs = conn.cursor()
curs.execute("create table copy_segf (id int)")
try:
    curs.execute("copy copy_segf from stdin")
except psycopg2.ProgrammingError:
    pass
conn.close()
""" % {'dsn': dsn})

        proc = Popen([sys.executable, '-c', script_to_py3(script)])
        proc.communicate()
        self.assertEqual(0, proc.returncode)
test_copy.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_copy_to_segfault(self):
        # issue #219
        script = ("""\
import psycopg2
conn = psycopg2.connect(%(dsn)r)
curs = conn.cursor()
curs.execute("create table copy_segf (id int)")
try:
    curs.execute("copy copy_segf to stdout")
except psycopg2.ProgrammingError:
    pass
conn.close()
""" % {'dsn': dsn})

        proc = Popen([sys.executable, '-c', script_to_py3(script)], stdout=PIPE)
        proc.communicate()
        self.assertEqual(0, proc.returncode)
test_module.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_supported_keywords(self):
        psycopg2.connect(database='foo')
        self.assertEqual(self.args[0], 'dbname=foo')
        psycopg2.connect(user='postgres')
        self.assertEqual(self.args[0], 'user=postgres')
        psycopg2.connect(password='secret')
        self.assertEqual(self.args[0], 'password=secret')
        psycopg2.connect(port=5432)
        self.assertEqual(self.args[0], 'port=5432')
        psycopg2.connect(sslmode='require')
        self.assertEqual(self.args[0], 'sslmode=require')

        psycopg2.connect(database='foo',
            user='postgres', password='secret', port=5432)
        self.assert_('dbname=foo' in self.args[0])
        self.assert_('user=postgres' in self.args[0])
        self.assert_('password=secret' in self.args[0])
        self.assert_('port=5432' in self.args[0])
        self.assertEqual(len(self.args[0].split()), 4)
base.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def get_connection_params(self):
        settings_dict = self.settings_dict
        # None may be used to connect to the default 'postgres' db
        if settings_dict['NAME'] == '':
            raise ImproperlyConfigured(
                "settings.DATABASES is improperly configured. "
                "Please supply the NAME value.")
        conn_params = {
            'database': settings_dict['NAME'] or 'postgres',
        }
        conn_params.update(settings_dict['OPTIONS'])
        conn_params.pop('isolation_level', None)
        if settings_dict['USER']:
            conn_params['user'] = settings_dict['USER']
        if settings_dict['PASSWORD']:
            conn_params['password'] = force_str(settings_dict['PASSWORD'])
        if settings_dict['HOST']:
            conn_params['host'] = settings_dict['HOST']
        if settings_dict['PORT']:
            conn_params['port'] = settings_dict['PORT']
        return conn_params
base.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_new_connection(self, conn_params):
        connection = Database.connect(**conn_params)

        # self.isolation_level must be set:
        # - after connecting to the database in order to obtain the database's
        #   default when no value is explicitly specified in options.
        # - before calling _set_autocommit() because if autocommit is on, that
        #   will set connection.isolation_level to ISOLATION_LEVEL_AUTOCOMMIT.
        options = self.settings_dict['OPTIONS']
        try:
            self.isolation_level = options['isolation_level']
        except KeyError:
            self.isolation_level = connection.isolation_level
        else:
            # Set the isolation level to the value from OPTIONS.
            if self.isolation_level != connection.isolation_level:
                connection.set_session(isolation_level=self.isolation_level)

        return connection
db.py 文件源码 项目:allennlp 作者: allenai 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def _connect(self) -> None:
        logger.info("initializing database connection:")
        logger.info("host: %s", self.host)
        logger.info("port: %s", self.port)
        logger.info("dbname: %s", self.dbname)
        try:
            self.conn = psycopg2.connect(host=self.host,
                                         port=self.port,
                                         user=self.user,
                                         password=self.password,
                                         dbname=self.dbname,
                                         connect_timeout=5)
            self.conn.set_session(autocommit=True)
            logger.info("successfully initialized database connection")
        except psycopg2.Error as error:
            logger.exception("unable to connect to database")
            raise error
db.py 文件源码 项目:allennlp 作者: allenai 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def from_environment(cls) -> Optional['PostgresDemoDatabase']:
        host = os.environ.get("DEMO_POSTGRES_HOST")
        port = os.environ.get("DEMO_POSTGRES_PORT") or "5432"
        dbname = os.environ.get("DEMO_POSTGRES_DBNAME")
        user = os.environ.get("DEMO_POSTGRES_USER")
        password = os.environ.get("DEMO_POSTGRES_PASSWORD")

        if all([host, port, dbname, user, password]):
            try:
                logger.info("Initializing demo database connection using environment variables")
                return PostgresDemoDatabase(dbname=dbname, host=host, port=port, user=user, password=password)
            except psycopg2.Error:
                logger.exception("unable to connect to database, permalinks not enabled")
                return None
        else:
            logger.info("Relevant environment variables not found, so no demo database")
            return None
connector.py 文件源码 项目:AutoForm 作者: sourcepole 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def uriDatabaseConnect(self, uri):
        """Create a connection from a uri and return a cursor of it."""
        conninfo = uri.connectionInfo()
        conn = None
        cur = None
        ok = False
        while not conn:
            try:
                conn = psycopg2.connect(uri.connectionInfo())
                cur = conn.cursor()
            except psycopg2.OperationalError as e:
                (ok, user, passwd) = QgsCredentials.instance().get(conninfo, uri.username(), uri.password())
                if not ok:
                    break

        if not conn:
            QMessageBox.warning(self.iface.mainWindow(), "Connection Error", "Could not connect to PostgreSQL database - check connection info")

        if ok:
            QgsCredentials.instance().put(conninfo, user, passwd)

        return cur
bot.py 文件源码 项目:globibot 作者: best-coloc-ever 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, config, db_config, web, plugin_path):
        super().__init__()

        self.db = db_connect(
            host = db_config.get(c.DB_HOST_KEY),
            user = db_config.get(c.DB_USER_KEY)
        )

        self.config = config
        self.web = web
        self.plugin_collection = PluginCollection(self, plugin_path)

        self.token = self.config.get(c.GLOBIBOT_TOKEN_KEY)

        self.masters = [
            str(id) for id in
            self.config.get(c.MASTER_IDS_KEY, [])
        ]

        # self.enabled_servers = [
        #     str(id) for id in
        #     self.config.get(c.ENABLED_SERVERS_KEY, [])
        # ]

        self.web.add_routes('bot', *api.routes(self))
utils.py 文件源码 项目:census-loader 作者: minus34 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def run_sql_multiprocessing(args):
    the_sql = args[0]
    settings = args[1]
    pg_conn = psycopg2.connect(settings['pg_connect_string'])
    pg_conn.autocommit = True
    pg_cur = pg_conn.cursor()

    # # set raw gnaf database schema (it's needed for the primary and foreign key creation)
    # if settings['raw_gnaf_schema'] != "public":
    #     pg_cur.execute("SET search_path = {0}, public, pg_catalog".format(settings['raw_gnaf_schema'],))

    try:
        pg_cur.execute(the_sql)
        result = "SUCCESS"
    except Exception as ex:
        result = "SQL FAILED! : {0} : {1}".format(the_sql, ex)

    pg_cur.close()
    pg_conn.close()

    return result
utils.py 文件源码 项目:census-loader 作者: minus34 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def intermediate_shapefile_load_step(args):
    work_dict = args[0]
    settings = args[1]
    # logger = args[2]

    file_path = work_dict['file_path']
    pg_table = work_dict['pg_table']
    pg_schema = work_dict['pg_schema']
    delete_table = work_dict['delete_table']
    spatial = work_dict['spatial']

    pg_conn = psycopg2.connect(settings['pg_connect_string'])
    pg_conn.autocommit = True
    pg_cur = pg_conn.cursor()

    result = import_shapefile_to_postgres(pg_cur, file_path, pg_table, pg_schema, delete_table, spatial)

    return result


# imports a Shapefile into Postgres in 2 steps: SHP > SQL; SQL > Postgres
# overcomes issues trying to use psql with PGPASSWORD set at runtime
after_install.py 文件源码 项目:core-python 作者: yidao620c 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def clear_static_routes(host, port, user, passwd):
    # --------------????????----------------------
    con = None
    try:
        con = psycopg2.connect(database='cloud_controller', user=user,
                               password=passwd, host=host, port=port)
        cur = con.cursor()
        cur.execute('delete from static_routes')
        con.commit()

    except psycopg2.DatabaseError as e:
        if con:
            con.rollback()
        print('Error is %s' % e)
    finally:
        if con:
            con.close()
after_install.py 文件源码 项目:core-python 作者: yidao620c 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def update_redirect_url(host, port, user, passwd, domain_name):
    # --------------????????----------------------
    con = None
    try:
        con = psycopg2.connect(database='uaa', user=user,
                               password=passwd, host=host, port=port)
        cur = con.cursor()
        cur.execute("update oauth_client_details set"
                    " web_server_redirect_uri='http://uaa.cloudfoundry.com/redirect/vmc,"
                    "https://uaa.cloudfoundry.com/redirect/vmc,"
                    "http://uaa.%s/redirect/vmc,https://uaa.%s/redirect/vmc'"
                    " where client_id in ('simple', 'vmc')" % (domain_name, domain_name))
        con.commit()

    except psycopg2.DatabaseError as e:
        if con:
            con.rollback()
        print('Error is %s' % e)
    finally:
        if con:
            con.close()
admin.py 文件源码 项目:postpy 作者: portfoliome 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def install_extensions(extensions, **connection_parameters):
    """Install Postgres extension if available.

    Notes
    -----
    - superuser is generally required for installing extensions.
    - Currently does not support specific schema.
    """

    from postpy.connections import connect

    conn = connect(**connection_parameters)
    conn.autocommit = True

    for extension in extensions:
        install_extension(conn, extension)


问题


面经


文章

微信
公众号

扫码关注公众号