python类connect()的实例源码

reproduce.py 文件源码 项目:ISM2017 作者: ybayle 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def request(query, verbose=False):
    """Run ``query`` against the local MySQL database and print the fetched rows.

    Connection and query failures are reported on stdout rather than raised.

    :param query: SQL statement to execute.
    :param verbose: accepted for interface compatibility; not used by the body.
    :returns: None; the rows are printed, not returned.
    """
    try:
        db = pymysql.connect(host="localhost",user="yann",passwd="yann",db="doctorat")
    except Exception:
        print("Error in MySQL connexion")
        return
    try:
        cur = db.cursor()
        try:
            cur.execute(query)
        except Exception:
            print("Error with query: " + query)
        else:
            db.commit()
            result = cur.fetchall()
            print(result)
    finally:
        # Release the connection even when cursor(), commit() or fetchall() raises.
        db.close()
craw_ptt.py 文件源码 项目:Crawler_and_Share 作者: f496328mm 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def catch_ptt_max_index(ptt_class_name,sql_name):
    """Return the PTT index number stored in the newest row of table ``sql_name``.

    The row with the highest ``id`` is read, and column 10 is reduced to an
    integer by stripping the ``http://www.ptt.cc/bbs/<ptt_class_name>/index``
    prefix and the ``.html`` suffix.

    :param ptt_class_name: board name used to build the URL prefix to strip.
    :param sql_name: table name; it is interpolated into the SQL text (an
        identifier cannot be sent as a bound parameter), so it must come
        from a trusted source.
    :returns: the index as ``int``.
    """
    conn = ( pymysql.connect(host = '114.34.138.146',
                             port = 3306,
                             user = user,
                             password = password,
                             database = database,
                             charset="utf8") )
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('SELECT * FROM `'+ sql_name +'` ORDER BY id DESC LIMIT 1;')
            data = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        # Close before parsing so a bad or missing row cannot leak the connection.
        conn.close()

    index = data[10]
    index = index.replace('http://www.ptt.cc/bbs/'+ ptt_class_name +'/index','')
    index = index.replace('.html','')
    index = int(index)

    return index
#---------------------------------------------------------------------------------  
#---------------------------------------------------------------------------------  
# ? SQL ? data ???, ?data?????? SQL ? max data time
craw_ptt.py 文件源码 项目:Crawler_and_Share 作者: f496328mm 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def catch_ptt_history_date_time(ptt_class_name,sql_name):
    """Return the date/time of the newest row in ``sql_name`` via ``date_to_numeric``.

    The row with the highest ``id`` is read and its column 1 is passed to
    ``date_to_numeric``.

    :param ptt_class_name: not used by the body; kept for interface compatibility.
    :param sql_name: table name; it is interpolated into the SQL text (an
        identifier cannot be sent as a bound parameter), so it must come
        from a trusted source.
    :returns: whatever ``date_to_numeric`` returns for that column.
    """
    conn = ( pymysql.connect(host = '114.34.138.146',
                             port = 3306,
                             user = user,
                             password = password,
                             database = database,
                             charset="utf8") )
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('SELECT * FROM `'+ sql_name +'` ORDER BY id DESC LIMIT 1;')
            data = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        # Close before converting so a bad or missing row cannot leak the connection.
        conn.close()

    date_time  = date_to_numeric( data[1] )

    return date_time
#---------------------------------------------------------------------------------            
# ?????? index, index ???????, ????????? index
byrbbs_article_hour.py 文件源码 项目:byrbbs-py3 作者: ryderchan 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def logged_in(self, response):
        """Yield one request per row of the ``section`` table.

        Column 1 of each row is stored as ``item['section_url']`` and joined
        onto ``response`` to build the request URL; the response's cookiejar
        is forwarded in ``meta``.

        :param response: response whose ``urljoin`` and ``meta['cookiejar']`` are used.
        :returns: generator of ``scrapy.Request`` with ``self.parse_article_list`` as callback.
        """
        conn = pymysql.connect(**DB_CONFIG)
        try:
            cursor = conn.cursor()
            try:
                sql = 'select * from section'
                cursor.execute(sql)
                rows = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            # All rows are already in memory, so release the connection
            # before handing control back to the caller through yield.
            conn.close()
        for row in rows:
            item = ByrbbsArticleItem()
            item['section_url'] = row[1]
            yield scrapy.Request(response.urljoin(row[1]), meta={'cookiejar': response.meta['cookiejar'], 'item': item}, headers=HEADERS,
                                 callback=self.parse_article_list)

    # ???????????
    #     self.start_urls = ['https://bbs.byr.cn/board/BM_Market']
    #     item = ByrbbsArticleItem()
    #     item['section_url'] = 'board/BM_Market'
    #     return scrapy.Request(self.start_urls[0], meta={'cookiejar': response.meta['cookiejar'], 'item': item},
    #                           headers=HEADERS, callback=self.parse_article_list)

    # ??????????????????????
byrbbs_article.py 文件源码 项目:byrbbs-py3 作者: ryderchan 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def logged_in(self, response):
        """Yield one request per row of the ``section`` table.

        Column 1 of each row is stored as ``item['section_url']`` and joined
        onto ``response`` to build the request URL; the response's cookiejar
        is forwarded in ``meta``.

        :param response: response whose ``urljoin`` and ``meta['cookiejar']`` are used.
        :returns: generator of ``scrapy.Request`` with ``self.parse_article_list_pre`` as callback.
        """
        conn = pymysql.connect(**DB_CONFIG)
        try:
            cursor = conn.cursor()
            try:
                sql = 'select * from section'
                cursor.execute(sql)
                rows = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            # All rows are already in memory, so release the connection
            # before handing control back to the caller through yield.
            conn.close()
        for row in rows:
            item = ByrbbsArticleItem()
            item['section_url'] = row[1]
            yield scrapy.Request(response.urljoin(row[1]), meta={'cookiejar': response.meta['cookiejar'], 'item': item}, headers=HEADERS,
                                 callback=self.parse_article_list_pre)

    # ???????????
    #     self.start_urls = ['https://bbs.byr.cn/board/BUPTPost']
    #     item = ByrbbsArticleItem()
    #     item['section_url'] = 'BUPTPost'
    #     return scrapy.Request(self.start_urls[0], meta={'cookiejar': response.meta['cookiejar'], 'item': item},
    #                           headers=HEADERS, callback=self.parse_article_list)

    # ?????????
pipelines.py 文件源码 项目:byrbbs-py3 作者: ryderchan 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def process_item(self, item, spider):
        """Insert one scraped article into the ``articleinfo`` table and return the item.

        :param item: mapping providing section_url, article_title, article_url,
            article_comment, article_author, article_createtime and updatetime.
        :param spider: not used by the body.
        :returns: ``item`` unchanged.
        """
        con = pymysql.connect(**DB_CONFIG)
        try:
            cur = con.cursor()
            try:
                sql = 'insert into articleinfo(section_url,article_title,' \
                      'article_url,article_comment,article_author,article_createtime,updatetime) ' \
                      'values(%s,%s,%s,%s,%s,%s,%s)'
                values = (item['section_url'], item['article_title'], item['article_url'], item['article_comment'],
                          item['article_author'], item['article_createtime'], item['updatetime'])
                cur.execute(sql, values)  # second parameter must be iterable
                con.commit()
            finally:
                cur.close()
        finally:
            # A connection is opened per item, so it must be released on failure too.
            con.close()
        return item
dashboard.py 文件源码 项目:face 作者: MOluwole 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self):
        """Build the main window, wire its buttons and fill the attendance table.

        Every row of the ``attendance`` table is read through the module-level
        ``connection``. When there are none, an information box is shown;
        otherwise the rows are rendered into ``self.tableWidget``.
        """
        QtWidgets.QMainWindow.__init__(self)
        Ui_MainWindow.__init__(self)
        self.setupUi(self)

        self.__register__ = None
        self.__attendance____ = None

        # Button handlers and the default report date.
        self.btn_Register.clicked.connect(self.Register)
        self.btn_Attendance.clicked.connect(self.Attendance)
        self.btnSearch.clicked.connect(self.Search)
        self.report_date.setDate(QtCore.QDate.currentDate())

        cursor = connection.cursor()
        sql = "Select * from attendance"
        cursor.execute(sql)

        result = cursor.fetchall()
        rows = len(result)
        if rows <= 0:
            QMessageBox.about(self, "No Data", "No Attendance has been recorded yet")
            return

        self.tableWidget.setRowCount(rows)
        self.tableWidget.setColumnCount(3)
        self.tableWidget.setHorizontalHeaderLabels(['Matric Number', 'Date', 'Status'])

        for row_index, record in enumerate(result):
            matric_item = QTableWidgetItem(str(record["matric_num"].encode('ascii', 'ignore')))
            self.tableWidget.setItem(row_index, 0, matric_item)
            date_item = QTableWidgetItem(record["dte"].encode('ascii', 'ignore'))
            self.tableWidget.setItem(row_index, 1, date_item)
            status_item = QTableWidgetItem(record["status"].encode('ascii', 'ignore'))
            self.tableWidget.setItem(row_index, 2, status_item)
insertmysql.py 文件源码 项目:dpspider 作者: doupengs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self,host,user,password,db,charset='utf8',logFile=None,color=True,debug=4):
        """
        Store the logging options, reset the counters and open the connection.

        :param host: <class str|host name>
        :param user: <class str|user name>
        :param password: <class str|password>
        :param db: <class str|database name>
        :param charset: default='utf8' <class str>
        :param logFile: default=None <class str>
        :param color: default=True <class bool>
        :param debug: default=4 <class int|0 NONE,1 [Error],2 [Error][WARING],3 [Error][WARING][INFO],4 ALL>
        """
        # Logging configuration.
        self.logFile = logFile
        self.color = color
        self.debug = debug

        # Counters, all starting at zero.
        self.success = 0
        self.fail = 0
        self.repeat = 0

        # Connection and the cursor created from it.
        connection = MYSQL.connect(host,user,password,db,charset=charset)
        self._conn = connection
        self._cursor = connection.cursor()
mysql_client.py 文件源码 项目:TickTickBacktest 作者: gavincyi 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def connect(self, **kwargs):
        """
        Open a MySQL connection and create a cursor on it.

        :param kwargs: must contain ``host``, ``port``, ``user``, ``pwd`` and
            ``schema``; a missing key raises ``KeyError``.
        :returns: True when both ``self.conn`` and ``self.cursor`` are set.
        """
        # Pull the required settings in a fixed order (first missing key raises).
        host, port, user, pwd, schema = [
            kwargs[key] for key in ('host', 'port', 'user', 'pwd', 'schema')
        ]
        self.conn = pymysql.connect(
            host=host,
            port=port,
            user=user,
            password=pwd,
            db=schema,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.SSDictCursor,
        )
        self.cursor = self.conn.cursor()
        return not (self.conn is None or self.cursor is None)
test_connection.py 文件源码 项目:watchmen 作者: lycclsltt 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_context(self):
        """Exercise a connection used as a context manager on a failing and a succeeding path."""
        # The ValueError raised inside the ``with c`` block is expected to reach assertRaises.
        with self.assertRaises(ValueError):
            c = pymysql.connect(**self.databases[0])
            with c as cur:
                cur.execute('create table test ( a int )')
                c.begin()
                cur.execute('insert into test values ((1))')
                raise ValueError('pseudo abort')
                # Never reached: the raise above always leaves the block first.
                c.commit()
        c = pymysql.connect(**self.databases[0])
        with c as cur:
            cur.execute('select count(*) from test')
            # The row inserted before the abort is asserted to be absent.
            self.assertEqual(0, cur.fetchone()[0])
            cur.execute('insert into test values ((1))')
        with c as cur:
            cur.execute('select count(*) from test')
            # The insert from the previous, cleanly exited block is asserted to be visible.
            self.assertEqual(1,cur.fetchone()[0])
            cur.execute('drop table test')
test_connection.py 文件源码 项目:watchmen 作者: lycclsltt 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_defer_connect(self):
        """Connect each configured database through a socket opened by the test itself."""
        import socket
        for db in self.databases:
            d = db.copy()
            try:
                # Try the unix socket first; a missing 'unix_socket' key raises KeyError.
                sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
                sock.connect(d['unix_socket'])
            except KeyError:
                # No unix socket configured: open a TCP connection instead.
                sock = socket.create_connection(
                                (d.get('host', 'localhost'), d.get('port', 3306)))
            # Drop the addressing keys from the copy before passing it to pymysql.connect().
            for k in ['unix_socket', 'host', 'port']:
                try:
                    del d[k]
                except KeyError:
                    pass

            c = pymysql.connect(defer_connect=True, **d)
            # With defer_connect=True the connection is asserted not to be open yet.
            self.assertFalse(c.open)
            c.connect(sock)
            c.close()
            sock.close()
test_issues.py 文件源码 项目:watchmen 作者: lycclsltt 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_issue_17(self):
        """Could not connect to MySQL using a password."""
        conn = self.connections[0]
        host = self.databases[0]["host"]
        db = self.databases[0]["db"]
        c = conn.cursor()

        # grant access to a table to a user with a password
        try:
            with warnings.catch_warnings():
                # Silence warnings only for the "drop table if exists" statement.
                warnings.filterwarnings("ignore")
                c.execute("drop table if exists issue17")
            c.execute("create table issue17 (x varchar(32) primary key)")
            c.execute("insert into issue17 (x) values ('hello, world!')")
            c.execute("grant all privileges on %s.issue17 to 'issue17user'@'%%' identified by '1234'" % db)
            conn.commit()

            # Second connection authenticates as the freshly granted user.
            conn2 = pymysql.connect(host=host, user="issue17user", passwd="1234", db=db)
            c2 = conn2.cursor()
            c2.execute("select x from issue17")
            self.assertEqual("hello, world!", c2.fetchone()[0])
        finally:
            # Always remove the table, whether or not the assertions passed.
            c.execute("drop table issue17")
test_issues.py 文件源码 项目:watchmen 作者: lycclsltt 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_issue_114(self):
        """ autocommit is not set after reconnecting with ping() """
        conn = pymysql.connect(charset="utf8", **self.databases[0])
        conn.autocommit(False)
        c = conn.cursor()
        c.execute("""select @@autocommit;""")
        self.assertFalse(c.fetchone()[0])
        # Close, then call ping(); the same cursor is reused for the next query.
        conn.close()
        conn.ping()
        c.execute("""select @@autocommit;""")
        # The setting chosen before close() is asserted to still be in effect.
        self.assertFalse(c.fetchone()[0])
        conn.close()

        # Ensure autocommit() is still working
        conn = pymysql.connect(charset="utf8", **self.databases[0])
        c = conn.cursor()
        c.execute("""select @@autocommit;""")
        self.assertFalse(c.fetchone()[0])
        conn.close()
        conn.ping()
        # Switch autocommit on after the ping() and check the server reports it.
        conn.autocommit(True)
        c.execute("""select @@autocommit;""")
        self.assertTrue(c.fetchone()[0])
        conn.close()
test_issues.py 文件源码 项目:watchmen 作者: lycclsltt 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_issue_491(self):
        """ Test warning propagation """
        conn = pymysql.connect(charset="utf8", **self.databases[0])

        with warnings.catch_warnings():
            # Ignore all warnings other than pymysql generated ones
            warnings.simplefilter("ignore")
            # pymysql.Warning is turned into an exception so the except clause below can catch it.
            warnings.simplefilter("error", category=pymysql.Warning)

            # verify for both buffered and unbuffered cursor types
            for cursor_class in (cursors.Cursor, cursors.SSCursor):
                c = conn.cursor(cursor_class)
                try:
                    c.execute("SELECT CAST('124b' AS SIGNED)")
                    c.fetchall()
                except pymysql.Warning as e:
                    # Warnings should have errorcode and string message, just like exceptions
                    self.assertEqual(len(e.args), 2)
                    self.assertEqual(e.args[0], 1292)
                    self.assertTrue(isinstance(e.args[1], text_type))
                else:
                    # No warning was raised by execute()/fetchall(): the test fails.
                    self.fail("Should raise Warning")
                finally:
                    c.close()
test_connection.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_context(self):
        """Exercise a connection used as a context manager on a failing and a succeeding path."""
        # The ValueError raised inside the ``with c`` block is expected to reach assertRaises.
        with self.assertRaises(ValueError):
            c = pymysql.connect(**self.databases[0])
            with c as cur:
                cur.execute('create table test ( a int )')
                c.begin()
                cur.execute('insert into test values ((1))')
                raise ValueError('pseudo abort')
                # Never reached: the raise above always leaves the block first.
                c.commit()
        c = pymysql.connect(**self.databases[0])
        with c as cur:
            cur.execute('select count(*) from test')
            # The row inserted before the abort is asserted to be absent.
            self.assertEqual(0, cur.fetchone()[0])
            cur.execute('insert into test values ((1))')
        with c as cur:
            cur.execute('select count(*) from test')
            # The insert from the previous, cleanly exited block is asserted to be visible.
            self.assertEqual(1,cur.fetchone()[0])
            cur.execute('drop table test')
test_example.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_example(self):
        """Check that the connected user appears in at least one row of ``mysql.user``."""
        conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='mysql')
        cur = conn.cursor()
        cur.execute("SELECT Host,User FROM user")

        # True as soon as one (Host, User) row contains the connection's user.
        rows = cur.fetchall()
        user_listed = any(conn.user in row for row in rows)
        self.assertTrue(user_listed)

        cur.close()
        conn.close()
test_issues.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_issue_17(self):
        """Could not connect to MySQL using a password."""
        conn = self.connections[0]
        host = self.databases[0]["host"]
        db = self.databases[0]["db"]
        c = conn.cursor()

        # grant access to a table to a user with a password
        try:
            with warnings.catch_warnings():
                # Silence warnings only for the "drop table if exists" statement.
                warnings.filterwarnings("ignore")
                c.execute("drop table if exists issue17")
            c.execute("create table issue17 (x varchar(32) primary key)")
            c.execute("insert into issue17 (x) values ('hello, world!')")
            c.execute("grant all privileges on %s.issue17 to 'issue17user'@'%%' identified by '1234'" % db)
            conn.commit()

            # Second connection authenticates as the freshly granted user.
            conn2 = pymysql.connect(host=host, user="issue17user", passwd="1234", db=db)
            c2 = conn2.cursor()
            c2.execute("select x from issue17")
            self.assertEqual("hello, world!", c2.fetchone()[0])
        finally:
            # Always remove the table, whether or not the assertions passed.
            c.execute("drop table issue17")
test_issues.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_issue_114(self):
        """ autocommit is not set after reconnecting with ping() """
        conn = pymysql.connect(charset="utf8", **self.databases[0])
        conn.autocommit(False)
        c = conn.cursor()
        c.execute("""select @@autocommit;""")
        self.assertFalse(c.fetchone()[0])
        # Close, then call ping(); the same cursor is reused for the next query.
        conn.close()
        conn.ping()
        c.execute("""select @@autocommit;""")
        # The setting chosen before close() is asserted to still be in effect.
        self.assertFalse(c.fetchone()[0])
        conn.close()

        # Ensure autocommit() is still working
        conn = pymysql.connect(charset="utf8", **self.databases[0])
        c = conn.cursor()
        c.execute("""select @@autocommit;""")
        self.assertFalse(c.fetchone()[0])
        conn.close()
        conn.ping()
        # Switch autocommit on after the ping() and check the server reports it.
        conn.autocommit(True)
        c.execute("""select @@autocommit;""")
        self.assertTrue(c.fetchone()[0])
        conn.close()
test_issues.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_issue_491(self):
        """ Test warning propagation """
        conn = pymysql.connect(charset="utf8", **self.databases[0])

        with warnings.catch_warnings():
            # Ignore all warnings other than pymysql generated ones
            warnings.simplefilter("ignore")
            # pymysql.Warning is turned into an exception so the except clause below can catch it.
            warnings.simplefilter("error", category=pymysql.Warning)

            # verify for both buffered and unbuffered cursor types
            for cursor_class in (cursors.Cursor, cursors.SSCursor):
                c = conn.cursor(cursor_class)
                try:
                    c.execute("SELECT CAST('124b' AS SIGNED)")
                    c.fetchall()
                except pymysql.Warning as e:
                    # Warnings should have errorcode and string message, just like exceptions
                    self.assertEqual(len(e.args), 2)
                    self.assertEqual(e.args[0], 1292)
                    self.assertTrue(isinstance(e.args[1], text_type))
                else:
                    # No warning was raised by execute()/fetchall(): the test fails.
                    self.fail("Should raise Warning")
                finally:
                    c.close()
savedata.py 文件源码 项目:Jobs-search 作者: Hopetree 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self,dbname,key,city):
        """Record the target database, derive a timestamped table name and connect.

        :param dbname: database name passed to ``pymysql.connect`` as ``db``.
        :param key: second component of the generated table name.
        :param city: third component of the generated table name.
        """
        self.dbname = dbname
        # Minute-resolution timestamp, e.g. 201701312359.
        self.T = datetime.datetime.now().strftime("%Y%m%d%H%M")
        # Table name has the form <timestamp>_<key>_<city>.
        self.table_name = "{}_{}_{}".format(self.T,key,city)
        # Connection settings for the local server.
        settings = {
            'host': "localhost",
            'port': 3306,
            'user': 'root',
            'password': 'python',
            'db': self.dbname,
            'charset': 'utf8',
        }
        self.conn = pymysql.connect(**settings)
        # Cursor used for later statements.
        self.cursor = self.conn.cursor()
wxparser.py 文件源码 项目:zgtoolkits 作者: xuzhougeng 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def load_db_sqlite(dicts, dbname, table):
    '''
    Insert one article record into a SQLite table, creating the table if needed.

    dicts: a dictionary with the keys time, author, url, title and content
           (a 5-item sequence in that column order is also accepted)
    dbname: sqlite3 database file name
    table: name of the table to create (if missing) and insert into
    '''
    import sqlite3

    columns = ('time', 'author', 'url', 'title', 'content')
    if isinstance(dicts, dict):
        # Positional "?" placeholders need a sequence, so order the values by column.
        values = tuple(dicts[name] for name in columns)
    else:
        values = tuple(dicts)

    # An identifier cannot be bound with "?"; quote it instead, doubling any
    # embedded double quote so the name cannot break out of the quoting.
    quoted_table = '"{}"'.format(str(table).replace('"', '""'))

    conn = sqlite3.connect(dbname)
    try:
        curs = conn.cursor()
        curs.execute(
            'create table IF NOT EXISTS {} '
            '(time char(100), author char(10), url char(100), '
            'title char(150), content char(10000))'.format(quoted_table))
        curs.execute(
            'INSERT INTO {} (`time`, `author`, `url`, `title`, `content`) '
            'VALUES (?,?,?,?,?)'.format(quoted_table),
            values)
        conn.commit()
    finally:
        # Release the file handle even when one of the statements fails.
        conn.close()
init_db.py 文件源码 项目:oss-github-analysis-project 作者: itu-oss-project-team 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def main():
    """Drop and recreate the tables after an interactive confirmation.

    Connects with the ``mysql`` section of ``secret_config``, asks for a
    ``y`` answer on stdin, and returns without touching the database for
    any other answer.
    """
    mysql_config = secret_config['mysql']
    connect_args = {name: mysql_config[name] for name in ('host', 'port', 'db', 'user', 'passwd')}
    db = pymysql.connect(**connect_args)

    msg = "CAUTION! This will drop all tables and probably going to cause loss of data. Are you sure want to continue?"
    answer = input("%s (y/N) " % msg)
    if answer.lower() != 'y':
        # Anything other than an explicit "y" aborts.
        return

    db_name = mysql_config['db']

    print("-->Dropping tables from \"" + db_name + "\"...")
    clear_db(db)
    print("-->Tables dropped")

    print("-->Creating tables on \"" + db_name + "\"...")
    init_db(db)
    print("-->Tables created.")
mysql_client.py 文件源码 项目:BitcoinExchangeFH 作者: Aurora-Team 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def connect(self, **kwargs):
        """
        Open a MySQL connection and create a cursor on it.

        :param kwargs: must contain ``host``, ``port``, ``user``, ``pwd`` and
            ``schema``; a missing key raises ``KeyError``.
        :returns: True when both ``self.conn`` and ``self.cursor`` are set.
        """
        # Pull the required settings in a fixed order (first missing key raises).
        host, port, user, pwd, schema = [
            kwargs[key] for key in ('host', 'port', 'user', 'pwd', 'schema')
        ]
        self.conn = pymysql.connect(
            host=host,
            port=port,
            user=user,
            password=pwd,
            db=schema,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor,
        )
        self.cursor = self.conn.cursor()
        return not (self.conn is None or self.cursor is None)
mysqlConnector.py 文件源码 项目:pyExamples 作者: mike-zhang 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def connect(self,tryTimes=1):
        """Try to open the MySQL connection up to ``tryTimes`` times.

        Each failed attempt is logged with its traceback through
        ``self.logger.error``; the first success is logged through
        ``self.logger.debug`` and ends the loop.

        :param tryTimes: maximum number of attempts.
        :returns: ``self.conn`` as it stands after the loop.
        """
        attempt = 0
        while attempt < tryTimes:
            try:
                settings = dict(
                    host=self.host,
                    user=self.user,
                    port=self.port,
                    passwd=self.passwd,
                    db=self.db,
                    charset="utf8",
                )
                self.conn = pymysql.connect(**settings)
                self.logger.debug("connect ok!current retry : {0}".format(attempt))
                break
            except Exception:
                # Count the failure and record why it happened before retrying.
                attempt += 1
                self.logger.error(traceback.format_exc())
        return self.conn
wlmhelpers.py 文件源码 项目:COH-tools 作者: Vesihiisi 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def create_connection(arguments):
    """Open a pymysql connection described by the attributes of ``arguments``.

    When ``on_forge()`` is true, the host, database and credentials are
    replaced by the tool-server values before connecting. ``vars()`` exposes
    the object's own attribute dict, so those overrides are written back
    onto ``arguments`` itself.
    """
    arguments = vars(arguments)
    if on_forge():
        arguments.update(host="tools-db", db="s51138__heritage_p")
        credentials = get_db_credentials()
        arguments["user"] = credentials["user"]
        arguments["password"] = credentials["password"]

    settings = {key: arguments[key] for key in ("host", "user", "password", "db")}
    return pymysql.connect(charset="utf8", **settings)
register_tools.py 文件源码 项目:netsocadmin2 作者: UCCNetworkingSociety 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def good_token(email:str, uri:str) -> bool:
    """
    Confirms whether an email and uri pair are valid.

    :param email the email which we are testing the uri for
    :param uri the identifier token which we generated and sent
    :returns True if a row exists for this uri and its first column equals
        the email (i.e. the token was sent by us to this email), False otherwise
    :raises sqlite3.Error database errors are not caught here and propagate
        to the caller
    """
    from contextlib import closing

    # A sqlite3 connection used directly in ``with`` only commits or rolls
    # back; closing() is what actually releases the connection on exit.
    with closing(sqlite3.connect(p.DBNAME)) as conn:
        c = conn.cursor()
        c.execute("SELECT * FROM uris WHERE uri=?", (uri,))
        row = c.fetchone()
    return bool(row) and row[0] == email
Database.py 文件源码 项目:spartacus 作者: wind39 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def Open(self, p_autocommit=True):
        """Open the PostgreSQL connection and cache the type-name lookup.

        Connects with ``self.GetConnectionString()`` using dict-style cursors,
        stores the connection and cursor on ``self``, and loads the
        ``oid -> typname`` mapping from ``pg_type`` into ``self.v_types``.

        :param p_autocommit: value assigned to the connection's ``autocommit``;
            when false, the type query is committed explicitly.
        :raises Spartacus.Database.Exception: for any failure, carrying the
            original error message.
        """
        try:
            self.v_con = psycopg2.connect(
                self.GetConnectionString(),
                cursor_factory=psycopg2.extras.DictCursor
            )
            self.v_con.autocommit = p_autocommit
            self.v_cur = self.v_con.cursor()
            self.v_start = True

            # PostgreSQL types: map each oid to its type name.
            self.v_cur.execute('select oid, typname from pg_type')
            self.v_types = {v_row['oid']: v_row['typname'] for v_row in self.v_cur.fetchall()}
            if not p_autocommit:
                self.v_con.commit()

            self.v_con.notices = DataList()
        except Spartacus.Database.Exception as exc:
            raise exc
        except psycopg2.Error as exc:
            raise Spartacus.Database.Exception(str(exc))
        except Exception as exc:
            raise Spartacus.Database.Exception(str(exc))
Replication.py 文件源码 项目:dcmha 作者: wwwbjqcom 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, block=None, server_id=None, log_file=None,
                 log_pos=None, host=None, user=None, passwd=None, rollback=None,
                 port=None, gtid=None, _thread_id=None, stop_pos=None):
        """Store the replication settings, connect to MySQL and call ``ReadPack()``.

        :param block: stored as ``self.block``; ``None`` becomes ``False``.
        :param server_id: stored as ``self.server_id``; ``None`` becomes 133.
        :param log_file: stored as ``self._log_file``.
        :param log_pos: stored as ``self._log_pos``.
        :param host: MySQL host.
        :param user: MySQL user.
        :param passwd: MySQL password.
        :param rollback: when truthy, sets the rollback status flag and builds
            a ``GetRollStatement`` helper with the same connection settings.
        :param port: MySQL port; ``None`` becomes 3306.
        :param gtid: stored on ``_remote_filed._gtid``.
        :param _thread_id: stored on ``_remote_filed._thread_id``.
        :param stop_pos: stored as ``self._stop_pos``.
        """
        import pymysql
        _remote_filed._gtid = gtid
        _remote_filed._thread_id = _thread_id

        self._stop_pos = stop_pos
        self._log_file = log_file
        self._log_pos = log_pos
        # Identity checks against None, so falsy-but-valid values (0, '') are kept.
        self.block = block if block is not None else False
        self.server_id = server_id if server_id is not None else 133
        self.port = port if port is not None else 3306
        self.connection = pymysql.connect(host=host,
                                          user=user,
                                          password=passwd, port=self.port,
                                          db='',
                                          charset='utf8mb4',
                                          cursorclass=pymysql.cursors.DictCursor)
        if rollback:
            _remote_filed._rollback_status = True
            _rollback._myfunc = GetRollStatement(host=host, user=user, passwd=passwd, port=self.port)

        self.ReadPack()
test_example.py 文件源码 项目:true_review_web2py 作者: lucadealfaro 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_example(self):
        """Check that the connected user appears in at least one row of ``mysql.user``."""
        conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='mysql')
        cur = conn.cursor()
        cur.execute("SELECT Host,User FROM user")

        # True as soon as one (Host, User) row contains the connection's user.
        rows = cur.fetchall()
        user_listed = any(conn.user in row for row in rows)
        self.assertTrue(user_listed)

        cur.close()
        conn.close()
test_issues.py 文件源码 项目:true_review_web2py 作者: lucadealfaro 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_issue_17(self):
        """ Could not connect to MySQL using a password. """
        conn = self.connections[0]
        host = self.databases[0]["host"]
        db = self.databases[0]["db"]
        c = conn.cursor()
        # grant access to a table to a user with a password
        try:
            c.execute("create table issue17 (x varchar(32) primary key)")
            c.execute("insert into issue17 (x) values ('hello, world!')")
            c.execute("grant all privileges on %s.issue17 to 'issue17user'@'%%' identified by '1234'" % db)
            conn.commit()

            # Second connection authenticates as the freshly granted user.
            conn2 = pymysql.connect(host=host, user="issue17user", passwd="1234", db=db)
            c2 = conn2.cursor()
            c2.execute("select x from issue17")
            self.assertEqual("hello, world!", c2.fetchone()[0])
        finally:
            # Always remove the table, whether or not the assertions passed.
            c.execute("drop table issue17")


问题


面经


文章

微信
公众号

扫码关注公众号