def getdbconnection(configsection, as_dict=False, defaultargs={}):
""" Helper function that gets a database connection object from a config dictionary """
dbengine = configsection['dbengine'] if 'dbengine' in configsection else 'MySQLDB'
if dbengine == 'MySQLDB':
dbargs = defaultargs.copy()
# Convert the external configuration names to python PEP-249 config names
translate = {"host": "host",
"defaultsfile": "read_default_file",
"user": "user",
"pass": "passwd",
"port": "port"}
for confval, myval in translate.iteritems():
if confval in configsection:
dbargs[myval] = configsection[confval]
if as_dict:
dbargs['cursorclass'] = MySQLdb.cursors.DictCursor
return MySQLdb.connect(**dbargs)
else:
raise Exception("Unsupported database engine %s" % (dbengine))
python类cursors()的实例源码
def __init__(self):
env = C.get('sys', 'env')
host = C.get('mysql_' + env, 'host')
user = C.get('mysql_' + env, 'user')
passwd = C.get('mysql_' + env, 'password')
name = C.get('mysql_' + env, 'name')
self.db = MySQLdb.connect(
host = host,
port = 3306,
user = user,
passwd = passwd,
db = name,
charset='utf8',
cursorclass = MySQLdb.cursors.DictCursor)
self.cursor = self.db.cursor()
def __init__(self,db_name,host='',user='',passwd='',autocommit=True,loglevel='WARNING',use_tuples=False,default_cursor=None):
""" Initialization of MySQL connection """
self.call__init__(log.Logger,self.__class__.__name__,format='%(levelname)-8s %(asctime)s %(name)s: %(message)s')
self.setLogLevel(loglevel or 'WARNING')
#def __init__(self,api,db_name,user='',passwd='', host=''):
#if not api or not database:
#self.error('ArchivingAPI and database are required arguments for ArchivingDB initialization!')
#return
#self.api=api
self.db_name=db_name
self.host=host
self.use_tuples = use_tuples #It will control if data is returned in tuples or lists
self.setUser(user,passwd)
self.autocommit = autocommit
self.renewMySQLconnection()
self.default_cursor = default_cursor or MySQLdb.cursors.Cursor
self._cursor=None
self._recursion = 0
self.tables={}
def getCursor(self,renew=True,klass=None):
'''
returns the Cursor for the database
renew will force the creation of a new cursor object
klass may be any of MySQLdb.cursors classes (e.g. DictCursor)
MySQLdb.cursors.SSCursor allows to minimize mem usage in clients (although it relies memory cleanup to the server!)
'''
try:
if klass in ({},dict):
klass = MySQLdb.cursors.DictCursor
if (renew or klass) and self._cursor:
if not self._recursion:
self._cursor.close()
del self._cursor
if renew or klass or not self._cursor:
self._cursor = self.db.cursor(self.default_cursor) if klass is None else self.db.cursor(cursorclass=klass)
return self._cursor
except:
print traceback.format_exc()
self.renewMySQLconnection()
self._recursion += 1
return self.getCursor(renew=True,klass=klass)
def ConnecttoDB(self, params):
try:
conn = \
MySQLdb.connect(host = params[0], \
user = params[1], \
passwd = params[2], \
db = params[3], \
port = params[4], \
charset = params[5], \
cursorclass = MySQLdb.cursors.DictCursor)
return conn
except MySQLdb.Warning, w:
print "Warning:%s" % str(w)
return None
except MySQLdb.Error, e:
print "Error:%s" % str(e)
return None
def setup_mysqldb(self):
import MySQLdb
import MySQLdb.cursors
self.schema_supported['mysql'] = (
MySQLdb.connect,
{
'host': 'hostname',
'port': 'port',
'user': 'username',
'passwd': 'password',
'db': 'args',
'use_unicode': 'kwargs',
'cursorclass': 'kwargs',
'__kwargs__': 'kwargs'
},
{
'port': lambda port: port or 3306,
'db': lambda args: args[0],
'use_unicode': lambda kwargs: eval(kwargs.get('use_unicode', 'True')),
'cursorclass': lambda kwargs: getattr(MySQLdb.cursors, kwargs.get('cursorclass', 'Cursor')),
},
None,
)
def __init__(self, settings):
dbname = settings['MYSQL_DBNAME']
tbname = settings['TIEBA_NAME']
if not dbname.strip():
raise ValueError("No database name!")
if not tbname.strip():
raise ValueError("No tieba name!")
if isinstance(tbname, unicode):
settings['TIEBA_NAME'] = tbname.encode('utf8')
self.settings = settings
self.dbpool = adbapi.ConnectionPool('MySQLdb',
host=settings['MYSQL_HOST'],
db=settings['MYSQL_DBNAME'],
user=settings['MYSQL_USER'],
passwd=settings['MYSQL_PASSWD'],
charset='utf8mb4',
cursorclass = MySQLdb.cursors.DictCursor,
init_command = 'set foreign_key_checks=0' #??????
)
def from_settings(cls, settings):
dbparms = dict(
host = settings["MYSQL_HOST"],
db = settings["MYSQL_DBNAME"],
user = settings["MYSQL_USER"],
passwd = settings["MYSQL_PASSWORD"],
charset='utf8',
cursorclass=MySQLdb.cursors.DictCursor,
use_unicode=True,
)
dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
return cls(dbpool)
def from_settings(cls, settings):
dbparms = dict(
host=settings["MYSQL_HOST"],
db=settings["MYSQL_DBNAME"],
user=settings["MYSQL_USER"],
passwd=settings["MYSQL_PASSWORD"],
charset='utf8',
cursorclass=MySQLdb.cursors.DictCursor,
use_unicode=True,
)
dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
return cls(dbpool)
def from_settings(cls, settings):
dbparms = dict(
host=settings["MYSQL_HOST"],
db=settings["MYSQL_DBNAME"],
user=settings["MYSQL_USER"],
passwd=settings["MYSQL_PASSWORD"],
charset='utf8',
cursorclass=MySQLdb.cursors.DictCursor,
use_unicode=True,
)
dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
return cls(dbpool)
def from_settings(cls, settings):
dbparms = dict(
host=settings["MYSQL_HOST"],
db=settings["MYSQL_DBNAME"],
user=settings["MYSQL_USER"],
passwd=settings["MYSQL_PASSWORD"],
charset='utf8',
cursorclass=MySQLdb.cursors.DictCursor,
use_unicode=True,
)
dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
return cls(dbpool)
def connect(self):
try:
self.conn = MySQLdb.connect(host=self.__host, user=self.__username, passwd=self.__password,
port=self.__port, cursorclass = MySQLdb.cursors.DictCursor)
self.cursor = self.conn.cursor()
# res = cur.execute('select * from user');
# info = cur.fetchmany(res)
# for data in info:
# print data
# cur.close()
# conn.close()
except MySQLdb.Error, e:
Log.log_error('MySql Error %%s'% str(e))
def from_settings(cls, settings):
dbparms = dict(
host = settings["MYSQL_HOST"],
db = settings["MYSQL_DBNAME"],
user = settings["MYSQL_USER"],
password = settings["MYSQL_PASSWORD"],
charset = "utf8",
cursorclass = MySQLdb.cursors.DictCursor,
use_unicode = True,
)
dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
return cls(dbpool)
def connect(self):
if hasattr(self, 'conn') and self.conn.open:
return True
logger.debug('No open MySQL connection, setting up new')
self.conn = MySQLdb.connect(
host='localhost',
user=self.mycnf.get('user', 'root'),
passwd=self.mycnf.get('password', None),
cursorclass=MySQLdb.cursors.DictCursor
)
def __init__(self):
dbargs = dict(
host = '127.0.0.1',
db = 'test',
user = 'root',
passwd = '123456',
cursorclass = MySQLdb.cursors.DictCursor,
charset = 'utf8',
use_unicode = True
)
self.dbpool = adbapi.ConnectionPool('MySQLdb',**dbargs)
def config_conn(self, connection):
self.db_config = self.config['connections'].get(connection)
return self.manual_conn(host=self.db_config.get('host', None),
user=self.db_config.get('user', None),
passwd=self.db_config.get('pass', None),
db=self.db_config.get('db', None),
cursorclass=MySQLdb.cursors.DictCursor)
def manual_conn(self, host, user, passwd, db,
cursorclass=MySQLdb.cursors.DictCursor):
return MySQLdb.connect(host=host,
user=user,
passwd=passwd,
db=db,
cursorclass=cursorclass)
def from_settings(cls, settings):
dbparams = dict(
host=settings["MYSQL_HOST"],
port=settings["MYSQL_PORT"],
user=settings["MYSQL_USER"],
passwd=settings["MYSQL_PWD"],
db=settings["MYSQL_DB"],
charset=settings["MYSQL_CHARSET"],
use_unicode=settings["MYSQL_USER_UNICODE"],
cursorclass=MySQLdb.cursors.DictCursor,
)
# ????adbapi????dbpool????MyTwistedPipeline?????
dbpool = adbapi.ConnectionPool("MySQLdb", **dbparams)
return cls(dbpool)
pipelines.py 文件源码
项目:PythonCrawler-Scrapy-Mysql-File-Template
作者: lawlite19
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def __init__(self,dbpool):
self.dbpool=dbpool
''' ?????????????????????????settings????????????
self.dbpool=adbapi.ConnectionPool('MySQLdb',
host='127.0.0.1',
db='crawlpicturesdb',
user='root',
passwd='123456',
cursorclass=MySQLdb.cursors.DictCursor,
charset='utf8',
use_unicode=False)'''
def get_conn(self):
"""
Returns a mysql connection object
"""
conn = self.get_connection(self.mysql_conn_id)
conn_config = {
"user": conn.login,
"passwd": conn.password or '',
"host": conn.host or 'localhost',
"db": self.schema or conn.schema or ''
}
if not conn.port:
conn_config["port"] = 3306
else:
conn_config["port"] = int(conn.port)
if conn.extra_dejson.get('charset', False):
conn_config["charset"] = conn.extra_dejson["charset"]
if (conn_config["charset"]).lower() == 'utf8' or\
(conn_config["charset"]).lower() == 'utf-8':
conn_config["use_unicode"] = True
if conn.extra_dejson.get('cursor', False):
if (conn.extra_dejson["cursor"]).lower() == 'sscursor':
conn_config["cursorclass"] = MySQLdb.cursors.SSCursor
elif (conn.extra_dejson["cursor"]).lower() == 'dictcursor':
conn_config["cursorclass"] = MySQLdb.cursors.DictCursor
elif (conn.extra_dejson["cursor"]).lower() == 'ssdictcursor':
conn_config["cursorclass"] = MySQLdb.cursors.SSDictCursor
local_infile = conn.extra_dejson.get('local_infile',False)
if conn.extra_dejson.get('ssl', False):
conn_config['ssl'] = conn.extra_dejson['ssl']
if local_infile:
conn_config["local_infile"] = 1
conn = MySQLdb.connect(**conn_config)
return conn
def __init__(self):
self.dbpool = adbapi.ConnectionPool('MySQLdb',
db = 'guazi',
user = 'root',
passwd = '',
cursorclass = MySQLdb.cursors.DictCursor,
charset = 'utf8',
use_unicode = True
)
def __init__(self):
# host = config.DB_config.get('mysql').get('host')
# port = config.DB_config.get('mysql').get('port')
# user = config.DB_config.get('mysql').get('user')
# password = config.DB_config.get('mysql').get('password')
# db = config.DB_config.get('database')
# self.mysql_conn = MySQLdb.connect(host, user, password, db, charset="utf8", use_unicode=True)
dbparms = config.DB_config.get("mysql")
dbparms['db'] = config.database
dbparms['cursorclass'] = MySQLdb.cursors.DictCursor
dbparms['use_unicode'] = True
self.mysql_conn = MySQLdb.connect(**dbparms)
def from_settings(cls, settings):
dbparms = config.DB_config.get("mysql")
dbparms['db'] = config.database
dbparms['cursorclass'] = MySQLdb.cursors.DictCursor
dbparms['use_unicode'] = True
dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
return cls(dbpool)
def commit(query, params):
c = database.cursor(MySQLdb.cursors.Cursor)
try:
c.execute(query, params)
database.commit()
except Exception as ex:
database.rollback()
print ex
print c._last_executed
def from_settings(cls,settings):
dbparams=dict(
host=settings['MYSQL_HOST'],#??settings????
db=settings['MYSQL_DBNAME'],
user=settings['MYSQL_USER'],
passwd=settings['MYSQL_PASSWD'],
charset='utf8',#??????????????????
cursorclass=MySQLdb.cursors.DictCursor,
use_unicode=False,
)
dbpool=adbapi.ConnectionPool('MySQLdb',**dbparams)#**?????????????,???host=xxx,db=yyy....
return cls(dbpool)#???dbpool???????self?????
def from_settings(cls, settings):
dbparms = dict(
host = settings["MYSQL_HOST"],
db = settings["MYSQL_DBNAME"],
user = settings["MYSQL_USER"],
password = settings["MYSQL_PASSWORD"],
charset = 'utf8',
cursorclass = MySQLdb.cursors.DictCursor,
use_unicode = True,
)
dbpool = adbapi.ConnectionPool("MySQLdb",**dbparms)
return cls(dbpool)
def connect_mysql(self, dbhost, dbuser, dbpassword, dbname):
self.type = self.TYPE_MYSQL
self.db = MySQLdb.connect(host=dbhost, user=dbuser, passwd=dbpassword,
db=dbname, cursorclass=MySQLdb.cursors.DictCursor)
def from_settings(cls, settings):
dbparms = dict(
host = settings["MYSQL_HOST"],
db = settings["MYSQL_DBNAME"],
user = settings["MYSQL_USER"],
passwd = settings["MYSQL_PASSWORD"],
charset='utf8',
cursorclass=MySQLdb.cursors.DictCursor,
use_unicode=True,
)
dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
return cls(dbpool)
def from_settings(cls, settings):
dbparms = dict(
host=settings["MYSQL_HOST"],
db=settings["MYSQL_DBNAME"],
user=settings["MYSQL_USER"],
passwd=settings["MYSQL_PASSWORD"],
charset='utf8',
cursorclass=MySQLdb.cursors.DictCursor,
use_unicode=True,
)
dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
return cls(dbpool)
def __init__(self):
self.dbpool = adbapi.ConnectionPool(
dbapiName = 'MySQLdb',
host = 'localhost',
db = 'sina_spider_db',
user = 'root',
passwd = 'gongli',
cursorclass = MySQLdb.cursors.DictCursor,
charset = 'utf8mb4',
)