def request(query, verbose=False):
    """Run ``query`` on the local ``doctorat`` MySQL database and print the rows.

    Connection and query failures are reported on stdout instead of being
    raised.

    :param query: SQL statement, executed as-is.
    :param verbose: accepted for compatibility; not used by this function.
    :returns: None
    """
    try:
        db = pymysql.connect(host="localhost", user="yann", passwd="yann", db="doctorat")
    except Exception:
        print("Error in MySQL connexion")
        return

    try:
        cur = db.cursor()
        try:
            cur.execute(query)
        except Exception:
            print("Error with query: " + query)
        else:
            db.commit()
            result = cur.fetchall()
            print(result)
        finally:
            cur.close()
    finally:
        # Release the connection even when commit()/fetchall() raises.
        db.close()
python类connect()的实例源码
def catch_ptt_max_index(ptt_class_name, sql_name):
    """Return the page index stored in the newest row of table ``sql_name``.

    The newest row is the one with the highest ``id``. Its 11th column
    (``data[10]``) is read as a URL of the form
    ``http://www.ptt.cc/bbs/<ptt_class_name>/index<N>.html`` and ``N`` is
    returned as an int.

    :param ptt_class_name: board name used to strip the URL prefix.
    :param sql_name: table name; it is interpolated into the SQL text, so it
        must come from a trusted source.
    :raises TypeError: if the table is empty (``fetchone()`` yields ``None``).
    :raises ValueError: if the remaining text is not an integer.
    """
    conn = pymysql.connect(host='114.34.138.146',
                           port=3306,
                           user=user,
                           password=password,
                           database=database,
                           charset="utf8")
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('SELECT * FROM `' + sql_name + '` ORDER BY id DESC LIMIT 1;')
            data = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        # Close the connection even if the query fails.
        conn.close()

    index = data[10]
    index = index.replace('http://www.ptt.cc/bbs/' + ptt_class_name + '/index', '')
    index = index.replace('.html', '')
    return int(index)
#---------------------------------------------------------------------------------
#---------------------------------------------------------------------------------
# Read the newest row of the SQL table and return that row's date-time, i.e. the max data time stored in SQL
def catch_ptt_history_date_time(ptt_class_name, sql_name):
    """Return the date-time of the newest row of ``sql_name`` in numeric form.

    The newest row is the one with the highest ``id``; its second column
    (``data[1]``) is passed through ``date_to_numeric``.

    :param ptt_class_name: not used by this function.
    :param sql_name: table name; it is interpolated into the SQL text, so it
        must come from a trusted source.
    :raises TypeError: if the table is empty (``fetchone()`` yields ``None``).
    """
    conn = pymysql.connect(host='114.34.138.146',
                           port=3306,
                           user=user,
                           password=password,
                           database=database,
                           charset="utf8")
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('SELECT * FROM `' + sql_name + '` ORDER BY id DESC LIMIT 1;')
            data = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        # Close the connection even if the query fails.
        conn.close()

    return date_to_numeric(data[1])
#---------------------------------------------------------------------------------
# ?????? index, index ???????, ????????? index
def logged_in(self, response):
    """Yield one request per row of the ``section`` table.

    Each row's second column (``row[1]``) is stored as the item's
    ``section_url`` and joined onto ``response`` to build the request URL.
    The session's cookiejar is forwarded and ``parse_article_list`` handles
    the responses.
    """
    conn = pymysql.connect(**DB_CONFIG)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('select * from section')
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # The rows are fully fetched, so release the connection before yielding.
        conn.close()

    for row in rows:
        item = ByrbbsArticleItem()
        item['section_url'] = row[1]
        yield scrapy.Request(response.urljoin(row[1]),
                             meta={'cookiejar': response.meta['cookiejar'], 'item': item},
                             headers=HEADERS,
                             callback=self.parse_article_list)
# ???????????
# self.start_urls = ['https://bbs.byr.cn/board/BM_Market']
# item = ByrbbsArticleItem()
# item['section_url'] = 'board/BM_Market'
# return scrapy.Request(self.start_urls[0], meta={'cookiejar': response.meta['cookiejar'], 'item': item},
# headers=HEADERS, callback=self.parse_article_list)
# ??????????????????????
def logged_in(self, response):
    """Yield one request per row of the ``section`` table.

    Each row's second column (``row[1]``) is stored as the item's
    ``section_url`` and joined onto ``response`` to build the request URL.
    The session's cookiejar is forwarded and ``parse_article_list_pre``
    handles the responses.
    """
    conn = pymysql.connect(**DB_CONFIG)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('select * from section')
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # The rows are fully fetched, so release the connection before yielding.
        conn.close()

    for row in rows:
        item = ByrbbsArticleItem()
        item['section_url'] = row[1]
        yield scrapy.Request(response.urljoin(row[1]),
                             meta={'cookiejar': response.meta['cookiejar'], 'item': item},
                             headers=HEADERS,
                             callback=self.parse_article_list_pre)
# ???????????
# self.start_urls = ['https://bbs.byr.cn/board/BUPTPost']
# item = ByrbbsArticleItem()
# item['section_url'] = 'BUPTPost'
# return scrapy.Request(self.start_urls[0], meta={'cookiejar': response.meta['cookiejar'], 'item': item},
# headers=HEADERS, callback=self.parse_article_list)
# ?????????
def process_item(self, item, spider):
    """Insert the article metadata carried by ``item`` into ``articleinfo``.

    :param item: mapping providing ``section_url``, ``article_title``,
        ``article_url``, ``article_comment``, ``article_author``,
        ``article_createtime`` and ``updatetime``.
    :param spider: not used by this method.
    :returns: ``item``, unchanged.
    """
    sql = 'insert into articleinfo(section_url,article_title,' \
          'article_url,article_comment,article_author,article_createtime,updatetime) ' \
          'values(%s,%s,%s,%s,%s,%s,%s)'
    values = (item['section_url'], item['article_title'], item['article_url'], item['article_comment'],
              item['article_author'], item['article_createtime'], item['updatetime'])

    con = pymysql.connect(**DB_CONFIG)
    try:
        cur = con.cursor()
        try:
            cur.execute(sql, values)  # second parameter must be iterable
            # sql2 = 'insert into articlebody(article_url,article_content) values(%s,%s)'
            # values2 = (item['article_url'], item['article_content'])
            # cur.execute(sql2, values2)
            con.commit()
        finally:
            cur.close()
    finally:
        # Close the connection even when the insert or commit fails.
        con.close()
    return item
def __init__(self):
    """Build the main window, wire its buttons and load the attendance table.

    All rows of ``attendance`` are read through the module-level
    ``connection``. With no rows an informational dialog is shown; otherwise
    the rows fill a three-column table widget.
    """
    QtWidgets.QMainWindow.__init__(self)
    Ui_MainWindow.__init__(self)
    self.setupUi(self)
    self.__register__ = None
    self.__attendance____ = None

    # Button wiring.
    self.btn_Register.clicked.connect(self.Register)
    self.btn_Attendance.clicked.connect(self.Attendance)
    self.btnSearch.clicked.connect(self.Search)
    self.report_date.setDate(QtCore.QDate.currentDate())

    cursor = connection.cursor()
    cursor.execute("Select * from attendance")
    result = cursor.fetchall()
    rows = len(result)
    if rows <= 0:
        QMessageBox.about(self, "No Data", "No Attendance has been recorded yet")
        return

    table = self.tableWidget
    table.setRowCount(rows)
    table.setColumnCount(3)
    table.setHorizontalHeaderLabels(['Matric Number', 'Date', 'Status'])
    for position, record in enumerate(result):
        # Rows are indexed by column name; presumably a dict-style cursor.
        matric = str(record["matric_num"].encode('ascii', 'ignore'))
        table.setItem(position, 0, QTableWidgetItem(matric))
        table.setItem(position, 1, QTableWidgetItem(record["dte"].encode('ascii', 'ignore')))
        table.setItem(position, 2, QTableWidgetItem(record["status"].encode('ascii', 'ignore')))
def __init__(self,host,user,password,db,charset='utf8',logFile=None,color=True,debug=4):
    '''
    Open a MySQL connection and reset the success/fail/repeat counters.

    :param host: <class str|host name>
    :param user: <class str|user name>
    :param password: <class str|password>
    :param db: <class str|database name>
    :param charset: default='utf8' <class str>
    :param logFile: default=None <class str>
    :param color: default=True <class bool>
    :param debug: default=4 <class int|0 NONE,1 [Error],2 [Error][WARNING],3 [Error][WARNING][INFO],4 ALL>
    '''
    # Logging / output options.
    self.logFile, self.color, self.debug = logFile, color, debug
    # Counters, all starting at zero.
    self.success = self.fail = self.repeat = 0
    # Connection and its default cursor.
    connection = MYSQL.connect(host, user, password, db, charset=charset)
    self._conn = connection
    self._cursor = connection.cursor()
def connect(self, **kwargs):
    """
    Open a MySQL connection with an ``SSDictCursor`` cursor.

    :param kwargs: must contain ``host``, ``port``, ``user``, ``pwd`` and
        ``schema``; a missing key raises ``KeyError``.
    :return: True when both ``self.conn`` and ``self.cursor`` are set
    """
    self.conn = pymysql.connect(
        host=kwargs['host'],
        port=kwargs['port'],
        user=kwargs['user'],
        password=kwargs['pwd'],
        db=kwargs['schema'],
        charset='utf8mb4',
        cursorclass=pymysql.cursors.SSDictCursor,
    )
    self.cursor = self.conn.cursor()
    return not (self.conn is None or self.cursor is None)
def test_context(self):
    """A connection used as a context manager rolls back on error and commits on success."""
    with self.assertRaises(ValueError):
        conn = pymysql.connect(**self.databases[0])
        with conn as cursor:
            cursor.execute('create table test ( a int )')
            conn.begin()
            cursor.execute('insert into test values ((1))')
            raise ValueError('pseudo abort')
            conn.commit()  # never reached: the raise above aborts the block

    conn = pymysql.connect(**self.databases[0])
    # The aborted insert must not be visible.
    with conn as cursor:
        cursor.execute('select count(*) from test')
        self.assertEqual(0, cursor.fetchone()[0])
        cursor.execute('insert into test values ((1))')
    # The insert from the clean block above must be visible.
    with conn as cursor:
        cursor.execute('select count(*) from test')
        self.assertEqual(1, cursor.fetchone()[0])
        cursor.execute('drop table test')
def test_defer_connect(self):
    """A connection created with ``defer_connect=True`` stays closed until given a socket."""
    import socket

    for db in self.databases:
        params = db.copy()
        try:
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            sock.connect(params['unix_socket'])
        except KeyError:
            # No unix socket configured: fall back to TCP.
            address = (params.get('host', 'localhost'), params.get('port', 3306))
            sock = socket.create_connection(address)
        # The socket is supplied by hand, so drop the addressing options.
        for key in ('unix_socket', 'host', 'port'):
            params.pop(key, None)
        conn = pymysql.connect(defer_connect=True, **params)
        self.assertFalse(conn.open)
        conn.connect(sock)
        conn.close()
        sock.close()
def test_issue_17(self):
    """Could not connect to MySQL using a password."""
    admin_conn = self.connections[0]
    settings = self.databases[0]
    host = settings["host"]
    db = settings["db"]
    admin = admin_conn.cursor()
    # grant access to a table to a user with a password
    try:
        with warnings.catch_warnings():
            # Dropping a missing table only warns; silence that.
            warnings.filterwarnings("ignore")
            admin.execute("drop table if exists issue17")
        admin.execute("create table issue17 (x varchar(32) primary key)")
        admin.execute("insert into issue17 (x) values ('hello, world!')")
        admin.execute("grant all privileges on %s.issue17 to 'issue17user'@'%%' identified by '1234'" % db)
        admin_conn.commit()

        user_conn = pymysql.connect(host=host, user="issue17user", passwd="1234", db=db)
        user_cursor = user_conn.cursor()
        user_cursor.execute("select x from issue17")
        self.assertEqual("hello, world!", user_cursor.fetchone()[0])
    finally:
        admin.execute("drop table issue17")
def test_issue_114(self):
    """Autocommit is not set after reconnecting with ping()."""

    def autocommit_value(cursor):
        # Ask the server for the session's autocommit flag.
        cursor.execute("""select @@autocommit;""")
        return cursor.fetchone()[0]

    conn = pymysql.connect(charset="utf8", **self.databases[0])
    conn.autocommit(False)
    cur = conn.cursor()
    self.assertFalse(autocommit_value(cur))
    conn.close()
    conn.ping()
    self.assertFalse(autocommit_value(cur))
    conn.close()

    # Ensure autocommit() is still working
    conn = pymysql.connect(charset="utf8", **self.databases[0])
    cur = conn.cursor()
    self.assertFalse(autocommit_value(cur))
    conn.close()
    conn.ping()
    conn.autocommit(True)
    self.assertTrue(autocommit_value(cur))
    conn.close()
def test_issue_491(self):
    """Test warning propagation."""
    conn = pymysql.connect(charset="utf8", **self.databases[0])

    with warnings.catch_warnings():
        # Ignore all warnings other than pymysql generated ones
        warnings.simplefilter("ignore")
        warnings.simplefilter("error", category=pymysql.Warning)

        # verify for both buffered and unbuffered cursor types
        for cursor_class in (cursors.Cursor, cursors.SSCursor):
            cur = conn.cursor(cursor_class)
            try:
                cur.execute("SELECT CAST('124b' AS SIGNED)")
                cur.fetchall()
            except pymysql.Warning as warning:
                # Warnings should have errorcode and string message, just like exceptions
                self.assertEqual(len(warning.args), 2)
                code, message = warning.args
                self.assertEqual(code, 1292)
                self.assertTrue(isinstance(message, text_type))
            else:
                self.fail("Should raise Warning")
            finally:
                cur.close()
def test_context(self):
    """A connection used as a context manager rolls back on error and commits on success."""
    with self.assertRaises(ValueError):
        conn = pymysql.connect(**self.databases[0])
        with conn as cursor:
            cursor.execute('create table test ( a int )')
            conn.begin()
            cursor.execute('insert into test values ((1))')
            raise ValueError('pseudo abort')
            conn.commit()  # never reached: the raise above aborts the block

    conn = pymysql.connect(**self.databases[0])
    # The aborted insert must not be visible.
    with conn as cursor:
        cursor.execute('select count(*) from test')
        self.assertEqual(0, cursor.fetchone()[0])
        cursor.execute('insert into test values ((1))')
    # The insert from the clean block above must be visible.
    with conn as cursor:
        cursor.execute('select count(*) from test')
        self.assertEqual(1, cursor.fetchone()[0])
        cursor.execute('drop table test')
def test_example(self):
    """The connected user appears in at least one row of ``mysql.user``."""
    conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='mysql')
    cur = conn.cursor()
    cur.execute("SELECT Host,User FROM user")

    # print cur.description
    # r = cur.fetchall()
    # print r
    # ...or...
    found = any(conn.user in row for row in cur.fetchall())
    self.assertTrue(found)

    cur.close()
    conn.close()
def test_issue_17(self):
    """Could not connect to MySQL using a password."""
    admin_conn = self.connections[0]
    settings = self.databases[0]
    host = settings["host"]
    db = settings["db"]
    admin = admin_conn.cursor()
    # grant access to a table to a user with a password
    try:
        with warnings.catch_warnings():
            # Dropping a missing table only warns; silence that.
            warnings.filterwarnings("ignore")
            admin.execute("drop table if exists issue17")
        admin.execute("create table issue17 (x varchar(32) primary key)")
        admin.execute("insert into issue17 (x) values ('hello, world!')")
        admin.execute("grant all privileges on %s.issue17 to 'issue17user'@'%%' identified by '1234'" % db)
        admin_conn.commit()

        user_conn = pymysql.connect(host=host, user="issue17user", passwd="1234", db=db)
        user_cursor = user_conn.cursor()
        user_cursor.execute("select x from issue17")
        self.assertEqual("hello, world!", user_cursor.fetchone()[0])
    finally:
        admin.execute("drop table issue17")
def test_issue_114(self):
    """Autocommit is not set after reconnecting with ping()."""

    def autocommit_value(cursor):
        # Ask the server for the session's autocommit flag.
        cursor.execute("""select @@autocommit;""")
        return cursor.fetchone()[0]

    conn = pymysql.connect(charset="utf8", **self.databases[0])
    conn.autocommit(False)
    cur = conn.cursor()
    self.assertFalse(autocommit_value(cur))
    conn.close()
    conn.ping()
    self.assertFalse(autocommit_value(cur))
    conn.close()

    # Ensure autocommit() is still working
    conn = pymysql.connect(charset="utf8", **self.databases[0])
    cur = conn.cursor()
    self.assertFalse(autocommit_value(cur))
    conn.close()
    conn.ping()
    conn.autocommit(True)
    self.assertTrue(autocommit_value(cur))
    conn.close()
def test_issue_491(self):
    """Test warning propagation."""
    conn = pymysql.connect(charset="utf8", **self.databases[0])

    with warnings.catch_warnings():
        # Ignore all warnings other than pymysql generated ones
        warnings.simplefilter("ignore")
        warnings.simplefilter("error", category=pymysql.Warning)

        # verify for both buffered and unbuffered cursor types
        for cursor_class in (cursors.Cursor, cursors.SSCursor):
            cur = conn.cursor(cursor_class)
            try:
                cur.execute("SELECT CAST('124b' AS SIGNED)")
                cur.fetchall()
            except pymysql.Warning as warning:
                # Warnings should have errorcode and string message, just like exceptions
                self.assertEqual(len(warning.args), 2)
                code, message = warning.args
                self.assertEqual(code, 1292)
                self.assertTrue(isinstance(message, text_type))
            else:
                self.fail("Should raise Warning")
            finally:
                cur.close()
def __init__(self,dbname,key,city):
    """Connect to the local MySQL server and derive a timestamped table name.

    :param dbname: database to connect to; stored as ``self.dbname``.
    :param key: second component of the table name.
    :param city: third component of the table name.
    """
    self.dbname = dbname
    # Creation time as YYYYmmddHHMM.
    self.T = datetime.datetime.now().strftime("%Y%m%d%H%M")
    # Table name: <timestamp>_<key>_<city>.
    self.table_name = "{}_{}_{}".format(self.T, key, city)
    # Connection settings.
    settings = dict(
        host="localhost",
        port=3306,
        user='root',
        password='python',
        db=self.dbname,
        charset='utf8',
    )
    self.conn = pymysql.connect(**settings)
    # Default cursor on that connection.
    self.cursor = self.conn.cursor()
def load_db_sqlite(dicts, dbname, table):
    '''
    Insert one article record into a SQLite table, creating the table if needed.

    dicts: a dictionary with the keys ``time``, ``author``, ``url``, ``title``
        and ``content``; a 5-item sequence in that column order also works
    dbname: sqlite3 database file name
    table: name of the table to create (when missing) and insert into

    Raises KeyError when a dictionary lacks one of the five keys and
    sqlite3.Error on database failures.
    '''
    import sqlite3

    columns = ('time', 'author', 'url', 'title', 'content')
    if isinstance(dicts, dict):
        # qmark placeholders need positional values, so order them by column.
        values = tuple(dicts[name] for name in columns)
    else:
        values = tuple(dicts)

    # Identifiers cannot be bound as SQL parameters; quote the name instead.
    quoted_table = '"' + str(table).replace('"', '""') + '"'

    conn = sqlite3.connect(dbname)
    try:
        curs = conn.cursor()
        curs.execute('''
            create table IF NOT EXISTS {}
            (time char(100), author char(10), url char(100), title char(150), content char(10000))
        '''.format(quoted_table))
        curs.execute('''
            INSERT INTO {} (`time`, `author`, `url`, `title`, `content`)
            VALUES (?,?,?,?,?)
        '''.format(quoted_table), values)
        conn.commit()
    finally:
        # Close the connection even when a statement fails.
        conn.close()
init_db.py 文件源码
项目:oss-github-analysis-project
作者: itu-oss-project-team
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def main():
    """Drop and recreate every table after an explicit confirmation.

    Connects with the ``mysql`` section of ``secret_config``, asks the user
    to confirm, then runs ``clear_db`` followed by ``init_db`` on that
    connection. Any answer other than ``y``/``Y`` returns without changes.
    """
    cfg = secret_config['mysql']
    db = pymysql.connect(host=cfg['host'], port=cfg['port'], db=cfg['db'],
                         user=cfg['user'],
                         passwd=cfg['passwd'])

    msg = "CAUTION! This will drop all tables and probably going to cause loss of data. Are you sure want to continue?"
    answer = input("%s (y/N) " % msg)
    if answer.lower() != 'y':
        return

    print('-->Dropping tables from "' + cfg['db'] + '"...')
    clear_db(db)
    print("-->Tables dropped")

    print('-->Creating tables on "' + cfg['db'] + '"...')
    init_db(db)
    print("-->Tables created.")
def connect(self, **kwargs):
    """
    Open a MySQL connection with a ``DictCursor`` cursor.

    :param kwargs: must contain ``host``, ``port``, ``user``, ``pwd`` and
        ``schema``; a missing key raises ``KeyError``.
    :return: True when both ``self.conn`` and ``self.cursor`` are set
    """
    self.conn = pymysql.connect(
        host=kwargs['host'],
        port=kwargs['port'],
        user=kwargs['user'],
        password=kwargs['pwd'],
        db=kwargs['schema'],
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor,
    )
    self.cursor = self.conn.cursor()
    return not (self.conn is None or self.cursor is None)
def connect(self,tryTimes=1):
    """Try to open the MySQL connection, retrying after failures.

    Each failed attempt is logged with its traceback. The loop ends at the
    first success or once ``tryTimes`` attempts have failed.

    :param tryTimes: maximum number of attempts.
    :return: ``self.conn`` as it stands when the loop ends.
    """
    failures = 0
    while failures < tryTimes:
        try:
            self.conn = pymysql.connect(host=self.host, user=self.user, port=self.port,
                                        passwd=self.passwd, db=self.db, charset="utf8")
            self.logger.debug("connect ok!current retry : {0}".format(failures))
        except Exception:
            failures += 1
            self.logger.error(traceback.format_exc())
            continue
        break
    return self.conn
def create_connection(arguments):
    """Create a connection to the SQL database using arguments.

    :param arguments: object whose attributes ``host``, ``user``,
        ``password`` and ``db`` describe the connection (e.g. an argparse
        namespace). When ``on_forge()`` is true, these are overridden with
        the tools-db host, the heritage database and the credentials from
        ``get_db_credentials()``.
    :returns: the open pymysql connection.
    """
    # vars() returns the object's own __dict__; copy it so the overrides
    # below do not leak back into the caller's namespace.
    arguments = dict(vars(arguments))
    if on_forge():
        arguments["host"] = "tools-db"
        arguments["db"] = "s51138__heritage_p"
        credentials = get_db_credentials()
        arguments["user"] = credentials["user"]
        arguments["password"] = credentials["password"]

    return pymysql.connect(
        host=arguments["host"],
        user=arguments["user"],
        password=arguments["password"],
        db=arguments["db"],
        charset="utf8")
def good_token(email:str, uri:str) -> bool:
    """
    Confirms whether an email and uri pair are valid.

    :param email the email which we are testing the uri for
    :param uri the identifier token which we generated and sent
    :returns True if the token is valid (i.e. sent by us to this email),
             False otherwise (including if a DB error occurred)
    """
    try:
        conn = sqlite3.connect(p.DBNAME)
        try:
            row = conn.execute("SELECT * FROM uris WHERE uri=?", (uri,)).fetchone()
        finally:
            # sqlite3's own context manager only ends the transaction; close
            # explicitly so the handle is not leaked.
            conn.close()
    except sqlite3.Error:
        # Honour the documented contract: a DB failure means "not valid".
        return False
    # The first column of the matched row is compared with the email.
    return bool(row) and row[0] == email
def Open(self, p_autocommit=True):
    """Open the PostgreSQL connection and cache the type-name lookup.

    Connects with a ``DictCursor`` factory, applies ``p_autocommit``, creates
    the cursor, and fills ``self.v_types`` with an oid -> typname mapping
    read from ``pg_type``.

    :param p_autocommit: value assigned to the connection's ``autocommit``.
    :raises Spartacus.Database.Exception: for any failure; other exceptions
        are wrapped with their message.
    """
    try:
        self.v_con = psycopg2.connect(
            self.GetConnectionString(),
            cursor_factory=psycopg2.extras.DictCursor
        )
        self.v_con.autocommit = p_autocommit
        self.v_cur = self.v_con.cursor()
        self.v_start = True
        # PostgreSQL types
        self.v_cur.execute('select oid, typname from pg_type')
        self.v_types = {v_row['oid']: v_row['typname'] for v_row in self.v_cur.fetchall()}
        if not p_autocommit:
            self.v_con.commit()
        self.v_con.notices = DataList()
    except Spartacus.Database.Exception:
        # Already the right type: let it through untouched.
        raise
    except Exception as exc:
        # Covers psycopg2.Error too, which got the same wrapping.
        raise Spartacus.Database.Exception(str(exc))
def __init__(self, block=None, server_id=None, log_file=None,
             log_pos=None, host=None, user=None, passwd=None, rollback=None,
             port=None, gtid=None, _thread_id=None, stop_pos=None):
    """Store the read options, connect to MySQL and start reading packets.

    :param block: stored as ``self.block``; defaults to False.
    :param server_id: stored as ``self.server_id``; defaults to 133.
    :param log_file: stored as ``self._log_file``.
    :param log_pos: stored as ``self._log_pos``.
    :param host: MySQL host.
    :param user: MySQL user.
    :param passwd: MySQL password.
    :param rollback: when truthy, rollback mode is flagged on
        ``_remote_filed`` and a ``GetRollStatement`` helper is created.
    :param port: MySQL port; defaults to 3306.
    :param gtid: stored on ``_remote_filed._gtid``.
    :param _thread_id: stored on ``_remote_filed._thread_id``.
    :param stop_pos: stored as ``self._stop_pos``.
    """
    import pymysql

    _remote_filed._gtid = gtid
    _remote_filed._thread_id = _thread_id

    self._stop_pos = stop_pos
    self._log_file = log_file
    self._log_pos = log_pos
    # Identity checks against None, so falsy values such as 0 or False
    # passed by the caller are kept as given.
    self.block = block if block is not None else False
    self.server_id = server_id if server_id is not None else 133
    self.port = port if port is not None else 3306
    self.connection = pymysql.connect(host=host,
                                      user=user,
                                      password=passwd, port=self.port,
                                      db='',
                                      charset='utf8mb4',
                                      cursorclass=pymysql.cursors.DictCursor)
    if rollback:
        _remote_filed._rollback_status = True
        _rollback._myfunc = GetRollStatement(host=host, user=user, passwd=passwd, port=self.port)

    self.ReadPack()
def test_example(self):
    """The connected user appears in at least one row of ``mysql.user``."""
    conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='mysql')
    cur = conn.cursor()
    cur.execute("SELECT Host,User FROM user")

    # print cur.description
    # r = cur.fetchall()
    # print r
    # ...or...
    found = any(conn.user in row for row in cur.fetchall())
    self.assertTrue(found)

    cur.close()
    conn.close()
def test_issue_17(self):
    """Could not connect to MySQL using a password."""
    admin_conn = self.connections[0]
    settings = self.databases[0]
    host = settings["host"]
    db = settings["db"]
    admin = admin_conn.cursor()
    # grant access to a table to a user with a password
    try:
        admin.execute("create table issue17 (x varchar(32) primary key)")
        admin.execute("insert into issue17 (x) values ('hello, world!')")
        admin.execute("grant all privileges on %s.issue17 to 'issue17user'@'%%' identified by '1234'" % db)
        admin_conn.commit()

        user_conn = pymysql.connect(host=host, user="issue17user", passwd="1234", db=db)
        user_cursor = user_conn.cursor()
        user_cursor.execute("select x from issue17")
        self.assertEqual("hello, world!", user_cursor.fetchone()[0])
    finally:
        admin.execute("drop table issue17")