def CheckConn(self, port):
    """Probe the local MySQL instance listening on *port*.

    Makes up to three attempts to connect to 127.0.0.1 using the
    module-level ``mysql_user`` / ``mysql_password`` credentials, waiting
    one second between failed attempts.  Every ``MySQLdb.Error`` is logged.

    Args:
        port: TCP port of the instance; converted with ``int()``.

    Returns:
        True as soon as one attempt succeeds, otherwise None.
    """
    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        try:
            local_conn = MySQLdb.connect(host='127.0.0.1', user=mysql_user,
                                         passwd=mysql_password, port=int(port),
                                         db='', charset="utf8")
            try:
                local_conn.cursor()
            finally:
                # Release the connection even when opening a cursor fails.
                local_conn.close()
            return True
        except MySQLdb.Error as e:
            logging.error(e)
            if attempt < max_attempts:
                # Only back off when another attempt will actually follow.
                time.sleep(1)
    return None
# Example source code for Python's connect()
def sabo_db_connect(db_conf):
    """Open a MySQL connection described by *db_conf*, retrying on failure.

    Args:
        db_conf: mapping providing ``db_addr``, ``db_port``, ``db_user``,
            ``db_passwd``, ``db_name`` and ``db_retry`` (number of attempts).

    Returns:
        The connection from the first successful attempt, or None when
        every attempt failed (each failure is reported through
        ``sabo_error_log``).
    """
    connect_kwargs = {
        "host": db_conf["db_addr"],
        "port": db_conf["db_port"],
        "user": db_conf["db_user"],
        "passwd": db_conf["db_passwd"],
        "db": db_conf["db_name"],
        "charset": "utf8",
    }
    for _attempt in range(db_conf["db_retry"]):
        try:
            return MySQLdb.connect(**connect_kwargs)
        except Exception as e:
            sabo_error_log("error", "Connect to mysql failed: {0}".format(e))
    return None
# If some tasks have not been judged yet,
# the process blocks until the queue is empty.
def execute_sql(sql):
    """Run *sql* on the server configured by ``settings.inception_*``.

    Args:
        sql: statement text passed unchanged to ``cursor.execute``.

    Returns:
        All rows produced by the statement (``cursor.fetchall()``).

    Raises:
        Any exception raised while connecting or executing propagates to
        the caller; the cursor and connection are closed first.
    """
    connection = MySQLdb.connect(host=settings.inception_host,
                                 user=settings.inception_user,
                                 passwd=settings.inception_password,
                                 port=settings.inception_port,
                                 use_unicode=True, charset="utf8", connect_timeout=2)
    try:
        cursor = connection.cursor()
        try:
            cursor.execute(sql)
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Nested so the connection is released even if cursor.close() fails.
        connection.close()
def sql_init(ip, username, password, cmd):
    """Run *cmd* on a remote host over SSH and print its standard output.

    Args:
        ip: address of the remote host (port 22 is used).
        username: SSH login name.
        password: SSH password.
        cmd: shell command line to execute remotely.

    Any failure (connection, authentication, execution) is reported on
    stdout and swallowed; nothing is returned.
    """
    client = None
    try:
        client = paramiko.SSHClient()
        # Accept unknown host keys by default.
        # NOTE(review): AutoAddPolicy disables host-key verification.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # The timeout is a number of seconds, not a string such as "20s".
        client.connect(ip, port=22, username=username, password=password, timeout=20)
        # Execute the shell command remotely.
        stdin, stdout, stderr = client.exec_command(cmd)
        # Print what the command wrote, not the file-like object itself.
        print(stdout.read())
        # Example SFTP usage kept for reference:
        # sftp = client.open_sftp()
        # sftp.mkdir('/home/testssh')                           # make a dir
        # sftp.get('firewall.sh', '/tmp/firewall.sh')           # remote -> local
        # sftp.put('D:/iso/auto.sh', '/home/testssh/auto.sh')   # local -> remote
    except Exception as e:
        print('authentication failed or execute commands failed: %s' % (e,))
    finally:
        # Close the session on both the success and the failure path.
        if client is not None:
            client.close()
#dropdatabase("172.30.121.54","webui","123456","drop database frontend")
#sql_init("172.30.121.54", "root", "teamsun", "mysql -uwebui -p123456 < /etc/frontend/frontend.sql")
def DepositSampleInfo(self):
    """Connect to the ``Sample_info`` database and prepare its statements.

    NOTE(review): only the connection, a cursor and two statement strings
    are set up here; nothing is executed or committed in the visible
    code -- confirm whether the rest of the routine lives elsewhere.
    """
    # Open the connection to the sample-information database.
    db = MySQLdb.connect(
        host="mbitdb1.cwnerb5rh4qg.us-east-1.rds.amazonaws.com",
        user="cmitaws",
        passwd="microbiome",
        db="Sample_info",
    )
    cursor = db.cursor()

    # INSERT statement using named (pyformat) placeholders.
    insert_OTU = ("INSERT INTO turnbaugh_mouse_diet_2015 "
                  "(sample_ID, disease_labels, keywords)"
                  "VALUES (%(sample_ID)s, %(disease_labels)s, %(keywords)s)")

    # CREATE TABLE statement named after the current dataset ID.
    mysql_cmd1 = "create table " + data_summary.datasetID + " (sampleID VARCHAR(20), disease_labels VARCHAR(30), keywords VARCHAR(200));"
    # Deposit sample info
def connect(self):
    """Build the connection arguments from the app config and connect.

    Each ``MYSQL_*`` option with a truthy value is copied into
    ``self.connect_args`` under the matching ``MySQLdb.connect`` keyword.

    Returns:
        The connection returned by ``MySQLdb.connect(**self.connect_args)``.
    """
    option_to_keyword = (
        ('MYSQL_DATABASE_HOST', 'host'),
        ('MYSQL_DATABASE_PORT', 'port'),
        ('MYSQL_DATABASE_USER', 'user'),
        ('MYSQL_DATABASE_PASSWORD', 'passwd'),
        ('MYSQL_DATABASE_DB', 'db'),
        ('MYSQL_DATABASE_CHARSET', 'charset'),
        ('MYSQL_USE_UNICODE', 'use_unicode'),
    )
    for option, keyword in option_to_keyword:
        value = self.app.config[option]
        if value:
            self.connect_args[keyword] = value
    return MySQLdb.connect(**self.connect_args)
def _connect(self):
    """Open the MySQL connection selected by ``self.logsocket``.

    ``'tcp'`` connects through ``self.host`` / ``self.port``; ``'dev'``
    connects through the Unix socket ``self.logdevice``.  The comparison
    is case-insensitive.  ``self._create_database()`` is called afterwards.
    ``AttributeError`` and ``MySQLdb.OperationalError`` are logged and
    suppressed.
    """
    logger.debug('Connecting to MySQL database.')
    try:
        transport = str(self.logsocket).lower()
        if transport == 'tcp':
            endpoint = {'host': self.host, 'port': self.port}
        elif transport == 'dev':
            endpoint = {'unix_socket': self.logdevice}
        else:
            # Unknown transport: no connection is opened.
            endpoint = None
        if endpoint is not None:
            self.connection = MySQLdb.connect(
                user=self.username,
                passwd=self.passphrase,
                db=self.db,
                **endpoint)
        self._create_database()
    except (AttributeError, MySQLdb.OperationalError):
        logger.exception('Exception: Cannot connect to database.')
def require_db_connection(f):
    """Decorator giving *f* a database connection on ``g`` while it runs.

    Before the call, ``g.conn`` / ``g.cursor`` are created through
    ``_connect_db()`` unless both are already set.  After the call --
    whether *f* returned or raised -- the cursor and connection are closed
    and reset to None.

    Returns:
        The wrapped function, which returns whatever *f* returns.
    """
    def _has_connection():
        # True only when both the connection and the cursor are set on g.
        return (getattr(g, 'conn', None) is not None
                and getattr(g, 'cursor', None) is not None)

    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Set up the connection if the request does not have one yet.
        if _has_connection():
            print('has db connect, do nothing')
        else:
            (g.conn, g.cursor) = _connect_db()
            print('create new db connect')
        try:
            # Run the wrapped function.
            return f(*args, **kwargs)
        finally:
            # Always tear down, so an exception in f cannot leak the
            # connection.
            if _has_connection():
                g.cursor.close()
                g.cursor = None
                g.conn.close()
                g.conn = None
                print('close db connect')
            else:
                print('no db connect, no need to close...')
    return decorated_function
def __connect(self, db=None):
    """Connect to the server and store the connection and a cursor.

    Args:
        db: database to select.  When falsy, ``self.db`` is used (with the
            ``utf8`` charset); when that is falsy too, no database is
            selected.

    On a connection error the exception is printed and the method returns
    without setting ``self.cursor``.
    """
    if db:
        database, options = db, {}
    elif self.db:
        database, options = self.db, {'charset': 'utf8'}
    else:
        database, options = None, {}
    try:
        positional = [self.host, self.user, self.passwd]
        if database is not None:
            positional.append(database)
        self.conn = mdb.connect(*positional, **options)
    except Exception as e:
        print(e)
        return
    self.cursor = self.conn.cursor()
    #self.cursor = mdb.cursors.DictCursor(self.conn)
def checkTrafficTable(self, table):
    """Report whether *table* exists in the ``self.VEHICLE_NAME`` database.

    Args:
        table: exact table name to look for.

    Returns:
        1 when a table with exactly that name is listed, otherwise 0
        (also 0 when the lookup itself fails).
    """
    found = 0
    db = MySQLdb.connect("localhost", "root", "toor", self.VEHICLE_NAME)
    try:
        cursor = db.cursor()
        try:
            # Pass the name as a parameter so the driver escapes it,
            # instead of interpolating it into the statement text.
            cursor.execute("SHOW TABLES LIKE %s;", (table,))
            for row in cursor.fetchall():
                # LIKE treats '_' and '%' as wildcards; require an exact match.
                if row[0] == table:
                    found = 1
            db.commit()
        except Exception:
            # Roll back in case there is any error.
            db.rollback()
    finally:
        # Release the connection on every path.
        db.close()
    return found
def __init__(self):
    """Connect to the local ``article_spider`` database and open a cursor."""
    connection = MySQLdb.connect(
        host='127.0.0.1',
        user='root',
        passwd='ty158917',
        db='article_spider',
        charset="utf8",
        use_unicode=True,
    )
    self.conn = connection
    self.cursor = connection.cursor()
def connect():
    """Open a connection to the database described by ``DB`` on localhost.

    Returns:
        The connection on success; None when connecting fails (the failure
        is logged and ``print_tb()`` is called).
    """
    try:
        log('I', 'db', 'Establishing connection...')
        db = dblib.connect(host='localhost',
                           user=DB['username'],
                           passwd=DB['password'],
                           db=DB['database'],
                           use_unicode=1,
                           charset='utf8')
        return db
    except Exception:
        # Not a bare ``except``: KeyboardInterrupt and SystemExit must
        # still propagate.
        log('E', 'db', 'Could not establish connection')
        print_tb()
def connect():
    """Open a connection to the database described by ``DB`` on localhost.

    Returns:
        The connection on success; None when connecting fails (the failure
        is logged and ``print_tb()`` is called).
    """
    try:
        log('I', 'db', 'Establishing connection...')
        db = dblib.connect(host='localhost',
                           user=DB['username'],
                           passwd=DB['password'],
                           db=DB['database'],
                           use_unicode=1,
                           charset='utf8')
        return db
    except Exception:
        # Not a bare ``except``: KeyboardInterrupt and SystemExit must
        # still propagate.
        log('E', 'db', 'Could not establish connection')
        print_tb()
def __init__(self):
    """Connect to the local ``htp`` database and open a cursor.

    The user name and password are read from the ``htpuser`` and
    ``htppass`` environment variables.
    """
    environment = os.environ
    self.u = environment['htpuser']
    self.p = environment['htppass']
    self.d = "htp"
    connect_kwargs = dict(host='localhost', user=self.u, passwd=self.p, db=self.d)
    self.con = MySQLdb.connect(**connect_kwargs)
    self.cursor = self.con.cursor()
# search database and return search values
def __init__(self):
    """Connect using ``db_conf``, enable autocommit and open a cursor."""
    connect_kwargs = {
        'host': db_conf['host'],
        'port': int(db_conf['port']),
        'user': db_conf['user'],
        'passwd': db_conf['passwd'],
        'db': db_conf['db'],
        'charset': db_conf['charset'],
    }
    self.conn = mysql.connect(**connect_kwargs)
    self.conn.autocommit(True)
    self.cursor = self.conn.cursor()
def __init__(self):
    """Connect using ``db_conf``, enable autocommit and open a cursor."""
    connect_kwargs = {
        'host': db_conf['host'],
        'port': int(db_conf['port']),
        'user': db_conf['user'],
        'passwd': db_conf['passwd'],
        'db': db_conf['db'],
        'charset': db_conf['charset'],
    }
    self.conn = mysql.connect(**connect_kwargs)
    self.conn.autocommit(True)
    self.cursor = self.conn.cursor()
def getDB(connType):
    """Return a database connection of the requested kind.

    Args:
        connType: ``DBTYPE_MYSQL`` selects MySQL; any other value selects
            the SQLite file ``<DBNAME>.db``.
    """
    if connType == DBTYPE_MYSQL:
        return MySQLdb.connect(host=DBHOST, user=DBUSER, passwd=DBPASS, db=DBNAME)
    sqlite_path = "{}.db".format(DBNAME)
    return sqlite3.connect(sqlite_path)
def mysql_from_url(url):
    """Open a MySQL connection described by a ``mysql://`` URL.

    Missing parts fall back to user ``root``, an empty password and port
    3306; the database name is the URL path without slashes.

    Raises:
        AssertionError: if the URL scheme is not ``mysql``.
    """
    parts = urlparse(url)
    assert parts.scheme == 'mysql', (
        'MySQL url must be of the form mysql://hostname:port/db'
    )
    connect_kwargs = dict(
        user=parts.username or 'root',
        passwd=parts.password or '',
        host=parts.hostname,
        port=parts.port or 3306,
        db=parts.path.strip('/'),
    )
    return MySQLdb.connect(**connect_kwargs)
def __enter__(self):
    """Connect on entering the ``with`` block and return this object."""
    self.connect()
    return self
def connect(self):
    """Open the connection as ``self.db`` and a cursor as ``self.cur``."""
    connect_kwargs = dict(db=self.db_name, user=self.user,
                          host=self.host, passwd=self.passwd)
    self.db = sqldb.connect(**connect_kwargs)
    self.cur = self.db.cursor()
def getConnection(self):
    """Return a new utf8 connection built from the ``DB_*`` attributes."""
    connect_kwargs = dict(
        host=self.DB_HOST,
        port=self.DB_PORT,
        user=self.DB_USER,
        passwd=self.DB_PWD,
        db=self.DB_NAME,
        charset='utf8',
    )
    return MySQLdb.Connect(**connect_kwargs)
def getConnection(self):
    """Return a new utf8 connection built from the ``DB_*`` attributes."""
    connect_kwargs = dict(
        host=self.DB_HOST,
        port=self.DB_PORT,
        user=self.DB_USER,
        passwd=self.DB_PWD,
        db=self.DB_NAME,
        charset='utf8',
    )
    return MySQLdb.Connect(**connect_kwargs)
def connect(self):
    """Try to open a MySQL connection from this object's settings.

    Returns:
        The connection, or None when connecting raised (the error is
        printed).
    """
    try:
        return MySQLdb.Connect(
            host=self.hostname,
            port=self.port,
            user=self.username,
            password=self.password,
            db=self.db)
    except Exception as e:
        print("MySQL Connection error : {0}".format(e))
        return None
def connect(self):
    """Try to open a MySQL connection from this object's settings.

    Returns:
        The connection, or None when connecting raised (the error is
        printed).
    """
    try:
        return MySQLdb.Connect(
            host=self.hostname,
            port=self.port,
            user=self.username,
            password=self.password,
            db=self.db)
    except Exception as e:
        print("MySQL Connection error : {0}".format(e))
        return None
def getConn(conninfo):
    """Open a utf8 MySQL connection described by *conninfo*.

    Args:
        conninfo: mapping providing ``host``, ``user``, ``password``,
            ``port``, ``sock`` (Unix socket path) and ``db``.

    Returns:
        The open connection.

    Raises:
        SystemExit: with status 1 when the connection cannot be opened
            (the error is printed first).
    """
    host = conninfo['host']
    user = conninfo['user']
    password = conninfo['password']
    port = conninfo['port']
    sock = conninfo['sock']
    db = conninfo['db']
    try:
        conn = MySQLdb.Connect(host=host, user=user, passwd=password, port=port,
                               unix_socket=sock, db=db, charset='utf8')
    except Exception as e:
        print(e)
        # Exit non-zero so the calling shell can see the failure.
        sys.exit(1)
    return conn
def db_reconnect(db_user, db_id):
    """Keep trying to connect to database *db_id* until it succeeds.

    The password for *db_user* comes from ``settings.DB_AUTH`` and is
    decrypted with ``KEY_DB_AUTH``; ``host:port`` comes from the
    ``db_info`` settings entry for *db_id*.  Failed attempts are reported
    and retried after a 10 second pause.

    Args:
        db_user: account name, also the key into ``settings.DB_AUTH``.
        db_id: identifier of the database instance in the settings.

    Returns:
        The established connection.
    """
    db_pass = settings.DB_AUTH[db_user]
    pc = prpcryptec.prpcrypt(KEY_DB_AUTH)
    db_instance = get_setttings("db_info", db_id)
    db_host, db_port = db_instance.replace(' ', '').split(':')
    db_conn = None
    while not db_conn:
        try:
            logger.warn("Reconnect Database %s: host='%s', user='%s, port=%s",
                        db_id, db_host, db_user, db_port)
            db_conn = MySQLdb.Connect(host=db_host, user=db_user, passwd=pc.decrypt(db_pass), port=int(db_port),
                                      connect_timeout=5, use_unicode=False)
        except MySQLdb.Error as e:
            # 2003: cannot connect to server, 2013: lost connection.
            # The loop performs the retry; reconnecting inside this
            # handler could raise out of the function.
            if e.args[0] in (2013, 2003):
                logger.critical('Error %d: %s', e.args[0], e.args[1])
            else:
                # Report other MySQL errors instead of looping silently.
                logger.error('Error: %s', e)
        except Exception as inst:
            # Both values must be in one tuple for the two-slot format.
            print("Error %s %s" % (type(inst), inst.args.__str__()))
        if not db_conn:
            # Pause only when another attempt is needed.
            time.sleep(10)
    return db_conn
# Decide whether this thread matches kill_opt.
def __init__(self):
    """Load ``config/reporting.conf`` and connect to MySQL when enabled.

    Raises:
        Exception: wrapping any error raised while reading the MySQL
            options or connecting.
    """
    conf_path = os.path.join(RAGPICKER_ROOT, 'config', 'reporting.conf')
    self.__cfgReporting = Config(conf_path)
    self.__mysqlEnabled = self.__cfgReporting.getOption("mysql", "enabled")
    if not self.__mysqlEnabled:
        return
    # Establish the connection to the MySQL database.
    try:
        self.__mysqlConnection = MySQLdb.Connect(
            host=self.__cfgReporting.getOption("mysql", "host"),
            port=self.__cfgReporting.getOption("mysql", "port"),
            db=self.__cfgReporting.getOption("mysql", "database"),
            user=self.__cfgReporting.getOption("mysql", "user"),
            passwd=self.__cfgReporting.getOption("mysql", "password"))
    except (Exception) as e:
        raise Exception("Cannot connect to MySQL (ragpicker): %s" % e)
def default_get_mysql_connection(
        user_name, user_pass, socket, dbname='',
        timeout=60, connect_timeout=10, charset=None):
    """
    Connect to a MySQL instance through a Unix socket (default method).

    This behaviour can be overridden by defining/importing a replacement
    in cli.py and passing it to Payload at instantiation time.  A
    replacement should return a valid Connection object just as
    MySQLdb.Connect does.

    Autocommit is enabled on the connection, and when *timeout* is truthy
    the session WAIT_TIMEOUT is set to it.  *charset* is passed to the
    driver only when truthy.
    """
    connect_kwargs = dict(
        user=user_name,
        passwd=user_pass,
        unix_socket=socket,
        db=dbname,
        use_unicode=True,
        connect_timeout=connect_timeout,
    )
    if charset:
        connect_kwargs['charset'] = charset
    dbh = MySQLdb.Connect(**connect_kwargs)
    dbh.autocommit(True)
    if not timeout:
        return dbh
    cursor = dbh.cursor()
    cursor.execute("SET SESSION WAIT_TIMEOUT = %s", (timeout,))
    return dbh
def c2db(self):
    """Open the connection as ``self.conn`` and a cursor as ``self.cur``."""
    connect_kwargs = dict(
        host=self.host,
        port=self.port,
        user=self.user,
        passwd=self.passwd,
        db=self.db,
    )
    self.conn = MySQLdb.Connect(**connect_kwargs)
    self.cur = self.conn.cursor()
def query_table_info(*dbinfo):
    """Collect size and auto-increment info for every user table.

    Args:
        *dbinfo: positional ``(host, port, user, password)``; *port* must
            be an integer (it is formatted with ``%d``).

    Queries ``information_schema`` for all base tables outside the system
    schemas.  The rows are passed to ``write_influxdb`` when
    ``InfluxDB_INFO`` is set, otherwise printed.  On a MySQL error the
    error is printed and the process exits with status -1.
    """
    sql_str = """
SELECT
IFNULL(@@hostname, @@server_id) SERVER_NAME,
%s as HOST,
t.TABLE_SCHEMA,
t.TABLE_NAME,
t.TABLE_ROWS,
t.DATA_LENGTH,
t.INDEX_LENGTH,
t.AUTO_INCREMENT,
c.COLUMN_NAME,
c.DATA_TYPE,
LOCATE('unsigned', c.COLUMN_TYPE) COL_UNSIGNED
# CONCAT(c.DATA_TYPE, IF(LOCATE('unsigned', c.COLUMN_TYPE)=0, '', '_unsigned'))
FROM
information_schema.`TABLES` t
LEFT JOIN information_schema.`COLUMNS` c ON t.TABLE_SCHEMA = c.TABLE_SCHEMA
AND t.TABLE_NAME = c.TABLE_NAME
AND c.EXTRA = 'auto_increment'
WHERE
t.TABLE_SCHEMA NOT IN (
'mysql',
'information_schema',
'performance_schema',
'sys'
)
AND t.TABLE_TYPE = 'BASE TABLE'
"""
    dbinfo_str = "%s:%d" % (dbinfo[0], dbinfo[1])
    rs = ({},)
    db_conn = None
    try:
        db_conn = MySQLdb.Connect(host=dbinfo[0], port=dbinfo[1], user=dbinfo[2], passwd=dbinfo[3],
                                  connect_timeout=5)
        cur = db_conn.cursor(MySQLdb.cursors.DictCursor)
        # The "host:port" label is bound to the single %s placeholder.
        param = (dbinfo_str,)
        print("\n[%s] Get schema info from db: '%s'..." % (datetime.today(), dbinfo_str))
        cur.execute(sql_str, param)
        rs = cur.fetchall()
    except MySQLdb.Error as e:
        print("Error[%d]: %s (%s)" % (e.args[0], e.args[1], dbinfo_str))
        exit(-1)
    finally:
        # Close the connection on both the success and the error path.
        if db_conn is not None:
            db_conn.close()
    if InfluxDB_INFO is not None:
        print("Write '%s' schema table info to influxdb ..." % dbinfo_str)
        write_influxdb(*rs)
    else:
        print(rs)