python类connect()的实例源码

router.py 文件源码 项目:dcmha 作者: wwwbjqcom 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def CheckConn(self,port):
        retry_num = 0
        while True:
            try:
                local_conn = MySQLdb.connect(host='127.0.0.1', user=mysql_user, passwd=mysql_password, port=int(port), db='',charset="utf8")
                local_conn.cursor()
                local_conn.close()
                state = True
                break
            except MySQLdb.Error,e:
                logging.error(e)
                state = None
            retry_num += 1
            time.sleep(1)
            if retry_num >= 3:
                break
        return  state
sabo_database.py 文件源码 项目:sabo 作者: tokers 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def sabo_db_connect(db_conf):
    ip      = db_conf["db_addr"]
    port    = db_conf["db_port"]
    user    = db_conf["db_user"]
    passwd  = db_conf["db_passwd"]
    db      = db_conf["db_name"]
    charset = "utf8"

    for i in range(db_conf["db_retry"]):
        try:
            mysql_db = MySQLdb.connect(host=ip, port=port, user=user, passwd=passwd, db=db, charset=charset)
        except Exception as e:
            sabo_error_log("error", "Connect to mysql failed: {0}".format(e))
        else:
            return mysql_db


# if there are some tasks didn't be judged,
# then the process will be blocked until the queue is empty.
inception_util.py 文件源码 项目:mysql_audit 作者: ycg 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def execute_sql(sql):
    connection, cursor = None, None
    try:
        connection = MySQLdb.connect(host=settings.inception_host,
                                     user=settings.inception_user,
                                     passwd=settings.inception_password,
                                     port=settings.inception_port,
                                     use_unicode=True, charset="utf8", connect_timeout=2)
        cursor = connection.cursor()
        cursor.execute(sql)
        return cursor.fetchall()
    finally:
        if (cursor is not None):
            cursor.close()
        if (connection is not None):
            connection.close()
    return []
initmysql.py 文件源码 项目:myautotest 作者: auuppp 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def sql_init(ip,username,password,cmd):
    try:
        client = paramiko.SSHClient()
        # Default accept unknown keys
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # Connect
        client.connect(ip, port=22, username=username, password=password,timeout="20s")
        # Execute shell remotely
        stdin, stdout, stderr = client.exec_command(cmd)
        stdout.read()
        print stdout
        client.close()
    #sftp = client.open_sftp()
    # Make a dir
    #sftp.mkdir('/home/testssh')
    # Down file from remote to local ????????????????
    #sftp.get('firewall.sh', '/tmp/firewall.sh')
    # Upload file from local to remote ????????
    #sftp.put('D:/iso/auto.sh', '/home/testssh/auto.sh')
    except Exception, e:
        print 'authentication failed or execute commands failed:',e
#dropdatabase("172.30.121.54","webui","123456","drop database frontend")
#sql_init("172.30.121.54", "root", "teamsun", "mysql -uwebui -p123456 < /etc/frontend/frontend.sql")
DepositDB.py 文件源码 项目:amplicon_sequencing_pipeline 作者: thomasgurry 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def DepositSampleInfo(self):

        ######################## 
        # Sample_info database #
        ########################

        # Insert into DB
        db = MySQLdb.connect(host="mbitdb1.cwnerb5rh4qg.us-east-1.rds.amazonaws.com", # your host, usually localhost
                     user="cmitaws", # your username
                     passwd="microbiome", # your password
                     db="Sample_info") # name of the data base
        insert_OTU = ("INSERT INTO turnbaugh_mouse_diet_2015 "
                      "(sample_ID, disease_labels, keywords)" 
                      "VALUES (%(sample_ID)s, %(disease_labels)s, %(keywords)s)")
        cursor = db.cursor()


        # CREATE sample info table
        mysql_cmd1 = "create table " + data_summary.datasetID + " (sampleID VARCHAR(20), disease_labels VARCHAR(30), keywords VARCHAR(200));"

        # Deposit sample info
mysql.py 文件源码 项目:chihu 作者: yelongyu 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def connect(self):
        if self.app.config['MYSQL_DATABASE_HOST']:
            self.connect_args['host'] = self.app.config['MYSQL_DATABASE_HOST']
        if self.app.config['MYSQL_DATABASE_PORT']:
            self.connect_args['port'] = self.app.config['MYSQL_DATABASE_PORT']
        if self.app.config['MYSQL_DATABASE_USER']:
            self.connect_args['user'] = self.app.config['MYSQL_DATABASE_USER']
        if self.app.config['MYSQL_DATABASE_PASSWORD']:
            self.connect_args['passwd'] = self.app.config['MYSQL_DATABASE_PASSWORD']
        if self.app.config['MYSQL_DATABASE_DB']:
            self.connect_args['db'] = self.app.config['MYSQL_DATABASE_DB']
        if self.app.config['MYSQL_DATABASE_CHARSET']:
            self.connect_args['charset'] = self.app.config['MYSQL_DATABASE_CHARSET']
        if self.app.config['MYSQL_USE_UNICODE']:
            self.connect_args['use_unicode'] = self.app.config['MYSQL_USE_UNICODE']
        return MySQLdb.connect(**self.connect_args)
mysqldb_logger.py 文件源码 项目:honeyd-python 作者: sookyp 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _connect(self):
        """Function connects to the database"""
        logger.debug('Connecting to MySQL database.')
        try:
            if str(self.logsocket).lower() == 'tcp':
                self.connection = MySQLdb.connect(
                    host=self.host,
                    port=self.port,
                    user=self.username,
                    passwd=self.passphrase,
                    db=self.db)
            elif str(self.logsocket).lower() == 'dev':
                self.connection = MySQLdb.connect(
                    unix_socket=self.logdevice,
                    user=self.username,
                    passwd=self.passphrase,
                    db=self.db)
            self._create_database()
        except (AttributeError, MySQLdb.OperationalError):
            logger.exception('Exception: Cannot connect to database.')
db_connect_warp.py 文件源码 项目:iOS-private-api-checker 作者: NetEaseGame 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def require_db_connection(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        ####?????
        if hasattr(g, 'conn') and g.conn != None and hasattr(g, 'cursor') and g.cursor != None:
            print 'has db connect, do nothing'
        else:
            (g.conn, g.cursor) = _connect_db()
            print 'create new db connect'

        #????
        func = f(*args, **kwargs)

        ###???????
        if hasattr(g, 'conn') and g.conn != None and hasattr(g, 'cursor') and g.cursor != None:
            g.cursor.close()
            g.cursor = None 
            g.conn.close()
            g.conn = None
            print 'close db connect'
        else:
            print 'no db connect, no need to close...'

        return func
    return decorated_function
dblib.py 文件源码 项目:weibo_scrawler_app 作者: coolspiderghy 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __connect(self, db=None):
        if db or self.db:
            try:
                if db:
                    self.conn = mdb.connect(self.host, self.user, self.passwd, db)
                else:
                    self.conn = mdb.connect(self.host, self.user, self.passwd, self.db, charset='utf8')
            except Exception, e:
                print e
                return
        else:
            try:
                self.conn = mdb.connect(self.host, self.user, self.passwd)
            except Exception, e:
                print e
                return
        self.cursor = self.conn.cursor()

    #self.cursor = mdb.cursors.DictCursor(self.conn)
main.py 文件源码 项目:autopen 作者: autopen 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def checkTrafficTable(self,table):

        found = 0

        db = MySQLdb.connect("localhost","root","toor", self.VEHICLE_NAME )
        # prepare a cursor object using cursor() method
        cursor = db.cursor()
        try:
            # Check if the tags database exists
            cursor.execute("SHOW TABLES LIKE '%s';" % table)
            results = cursor.fetchall()

            for row in results:
                if(row[0] == table):
                    found = 1

            # Commit your changes in the database
            db.commit()
        except:
            # Rollback in case there is any error
            db.rollback()

        db.close()

        return(found)
pipelines.py 文件源码 项目:ArticleSpider 作者: mtianyan 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self):
        self.conn = MySQLdb.connect('127.0.0.1', 'root', 'ty158917', 'article_spider', charset="utf8", use_unicode=True)
        self.cursor = self.conn.cursor()
dbutil.py 文件源码 项目:uicourses_v2 作者: sumerinlan 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def connect():
    try:
        log('I', 'db', 'Establishing connection...')
        db = dblib.connect(host='localhost',
                           user=DB['username'],
                           passwd=DB['password'],
                           db=DB['database'],
                           use_unicode=1,
                           charset='utf8')
        return db
    except:
        log('E', 'db', 'Could not establish connection')
        print_tb()
dbutil.py 文件源码 项目:uicourses_v2 作者: sumerinlan 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def connect():
    try:
        log('I', 'db', 'Establishing connection...')
        db = dblib.connect(host='localhost',
                           user=DB['username'],
                           passwd=DB['password'],
                           db=DB['database'],
                           use_unicode=1,
                           charset='utf8')
        return db
    except:
        log('E', 'db', 'Could not establish connection')
        print_tb()
Database.py 文件源码 项目:HTP 作者: nklose 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self):
        self.u = os.environ['htpuser']
        self.p = os.environ['htppass']

        self.d = "htp"
        self.con = MySQLdb.connect(host = 'localhost',
                          user = self.u,
                          passwd = self.p,
                          db = self.d)
        self.cursor = self.con.cursor()

    # search database and return search values
nbnetdb.py 文件源码 项目:Starfish 作者: BillWang139967 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self):
        self.conn = mysql.connect(
                        host=db_conf['host'], 
                        port=int(db_conf['port']),
                        user=db_conf['user'],
                        passwd=db_conf['passwd'],
                        db=db_conf['db'],
                        charset=db_conf['charset']
                        )

        self.conn.autocommit(True)
        self.cursor = self.conn.cursor()
nbnetdb.py 文件源码 项目:Starfish 作者: BillWang139967 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self):
        self.conn = mysql.connect(
                        host=db_conf['host'], 
                        port=int(db_conf['port']),
                        user=db_conf['user'],
                        passwd=db_conf['passwd'],
                        db=db_conf['db'],
                        charset=db_conf['charset']
                        )

        self.conn.autocommit(True)
        self.cursor = self.conn.cursor()
dealWithDBResults.py 文件源码 项目:Causality 作者: vcla 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def getDB(connType):
    if connType == DBTYPE_MYSQL:
        conn = MySQLdb.connect (host = DBHOST, user = DBUSER, passwd = DBPASS, db = DBNAME)
    else:
        conn = sqlite3.connect ("{}.db".format(DBNAME))
    return conn
store.py 文件源码 项目:kafka_store 作者: smyte 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def mysql_from_url(url):
        parsed = urlparse(url)
        assert parsed.scheme == 'mysql', (
            'MySQL url must be of the form mysql://hostname:port/db'
        )
        return MySQLdb.connect(
            user=parsed.username or 'root',
            passwd=parsed.password or '',
            host=parsed.hostname,
            port=parsed.port or 3306,
            db=parsed.path.strip('/'),
        )
database.py 文件源码 项目:microurl 作者: francium 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def __enter__(self):
        self.connect()
        return self
database.py 文件源码 项目:microurl 作者: francium 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def connect(self):
        self.db = sqldb.connect(db=self.db_name, user=self.user,
                host=self.host, passwd=self.passwd)
        self.cur = self.db.cursor()
wechat_sms.py 文件源码 项目:BackManager 作者: linuxyan 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def getConnection(self):
        return MySQLdb.Connect(
                           host=self.DB_HOST,
                           port=self.DB_PORT,
                           user=self.DB_USER,
                           passwd=self.DB_PWD,
                           db=self.DB_NAME,
                           charset='utf8'
                           )
count_data.py 文件源码 项目:BackManager 作者: linuxyan 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def getConnection(self):
        return MySQLdb.Connect(
                           host=self.DB_HOST,
                           port=self.DB_PORT,
                           user=self.DB_USER,
                           passwd=self.DB_PWD,
                           db=self.DB_NAME,
                           charset='utf8'
                           )
ServerDatabase.py 文件源码 项目:cscoins 作者: csgames 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def connect(self):
        conn = None
        try:
            conn = MySQLdb.Connect(
                host=self.hostname,
                port=self.port,
                user=self.username,
                password=self.password,
                db=self.db)
        except Exception as e:
            print("MySQL Connection error : {0}".format(e))

        return conn
ServerDatabase.py 文件源码 项目:cscoins 作者: BrandonWade 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def connect(self):
        conn = None
        try:
            conn = MySQLdb.Connect(host=self.hostname, port=self.port, user=self.username, password=self.password, db=self.db)
        except Exception as e:
            print("MySQL Connection error : {0}".format(e))

        return conn
mysql_curd.py 文件源码 项目:python 作者: hienha 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def getConn(conninfo):
    host = conninfo['host']
    user = conninfo['user']
    password = conninfo['password']
    port = conninfo['port']
    sock = conninfo['sock']
    db = conninfo['db']
    # print host, user, password, port, db
    try:
        conn = MySQLdb.Connect(host=host, user=user, passwd=password, port=port, unix_socket=sock, db=db, charset='utf8')
    except Exception as e:
        print e
        sys.exit()

    return conn
mysqk.py 文件源码 项目:myquerykill 作者: seanlook 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def db_reconnect(db_user, db_id):
    db_pass = settings.DB_AUTH[db_user]
    pc = prpcryptec.prpcrypt(KEY_DB_AUTH)

    db_instance = get_setttings("db_info", db_id)
    db_host, db_port = db_instance.replace(' ', '').split(':')

    db_conn = None

    while not db_conn:
        try:
            logger.warn("Reconnect Database %s: host='%s', user='%s, port=%s",
                        db_id, db_host, db_user, db_port)
            db_conn = MySQLdb.Connect(host=db_host, user=db_user, passwd=pc.decrypt(db_pass), port=int(db_port),
                                      connect_timeout=5, use_unicode=False)

        except MySQLdb.Error, e:

            if e.args[0] in (2013, 2003):
                logger.critical('Error %d: %s', e.args[0], e.args[1])
                logger.warn("Reconnect Database %s: host='%s', user='%s, port=%s",
                            db_id, db_host, db_user, db_port)
                db_conn = MySQLdb.Connect(host=db_host, user=db_user, passwd=pc.decrypt(db_pass), port=int(db_port),
                                          connect_timeout=5, use_unicode=False)

        except Exception as inst:
            print "Error %s %s" % type(inst), inst.args.__str__()

        time.sleep(10)

    return db_conn


# judge this thread meet kill_opt or not
databaseMysql.py 文件源码 项目:Snakepit 作者: K4lium 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self):
        self.__cfgReporting = Config(os.path.join(RAGPICKER_ROOT, 'config', 'reporting.conf'))
        self.__mysqlEnabled = self.__cfgReporting.getOption("mysql", "enabled")

        if self.__mysqlEnabled:
            #Anbindung an Datenbank MySQL herstellen
            try:
                mysqlHost = self.__cfgReporting.getOption("mysql", "host")
                mysqlPort = self.__cfgReporting.getOption("mysql", "port")
                mysqlDatabase = self.__cfgReporting.getOption("mysql", "database")
                mysqlUser = self.__cfgReporting.getOption("mysql", "user")
                mysqlPassword = self.__cfgReporting.getOption("mysql", "password")
                self.__mysqlConnection = MySQLdb.Connect(host=mysqlHost, port=mysqlPort, db=mysqlDatabase, user=mysqlUser, passwd=mysqlPassword)
            except (Exception) as e:
                raise Exception("Cannot connect to MySQL (ragpicker): %s" % e)
db.py 文件源码 项目:OnlineSchemaChange 作者: facebookincubator 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def default_get_mysql_connection(
        user_name, user_pass, socket, dbname='',
        timeout=60, connect_timeout=10, charset=None):
    """
    Default method for connection to a MySQL instance.
    You can override this behaviour by define/import in cli.py and pass it to
    Payload at instantiation time.
    The function should return a valid Connection object just as
    MySQLdb.Connect does.
    """
    connection_config = {
        'user': user_name,
        'passwd': user_pass,
        'unix_socket': socket,
        'db': dbname,
        'use_unicode': True,
        'connect_timeout': connect_timeout
    }
    if charset:
        connection_config['charset'] = charset
    dbh = MySQLdb.Connect(**connection_config)
    dbh.autocommit(True)
    if timeout:
        cursor = dbh.cursor()
        cursor.execute("SET SESSION WAIT_TIMEOUT = %s", (timeout,))
    return dbh
dbhelper.py 文件源码 项目:NewBisMonitor 作者: cCloudSky 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def c2db(self):
        self.conn = MySQLdb.Connect(
            host = self.host,
            port = self.port,
            user = self.user,
            passwd = self.passwd,
            db = self.db,
            )
        self.cur = self.conn.cursor()
mysql_schema_info.py 文件源码 项目:DBschema_gather 作者: seanlook 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def query_table_info(*dbinfo):

    sql_str = """
    SELECT
        IFNULL(@@hostname, @@server_id) SERVER_NAME,
        %s as HOST,
        t.TABLE_SCHEMA,
        t.TABLE_NAME,
        t.TABLE_ROWS,
        t.DATA_LENGTH,
        t.INDEX_LENGTH,
        t.AUTO_INCREMENT,
      c.COLUMN_NAME,
      c.DATA_TYPE,
      LOCATE('unsigned', c.COLUMN_TYPE) COL_UNSIGNED
      # CONCAT(c.DATA_TYPE, IF(LOCATE('unsigned', c.COLUMN_TYPE)=0, '', '_unsigned'))
    FROM
        information_schema.`TABLES` t
    LEFT JOIN information_schema.`COLUMNS` c ON t.TABLE_SCHEMA = c.TABLE_SCHEMA
    AND t.TABLE_NAME = c.TABLE_NAME
    AND c.EXTRA = 'auto_increment'
    WHERE
        t.TABLE_SCHEMA NOT IN (
            'mysql',
            'information_schema',
            'performance_schema',
            'sys'
        )
    AND t.TABLE_TYPE = 'BASE TABLE'
    """
    dbinfo_str = "%s:%d" % (dbinfo[0], dbinfo[1])
    rs = ({},)
    try:
        db_conn = MySQLdb.Connect(host=dbinfo[0], port=dbinfo[1], user=dbinfo[2], passwd=dbinfo[3],
                                  connect_timeout=5)
        cur = db_conn.cursor(MySQLdb.cursors.DictCursor)
        param = (dbinfo_str,)

        print "\n[%s] Get schema info from db: '%s'..." % (datetime.today(), dbinfo_str)
        cur.execute(sql_str, param)

        rs = cur.fetchall()

    except MySQLdb.Error, e:
        print "Error[%d]: %s (%s)" % (e.args[0], e.args[1], dbinfo_str)
        exit(-1)

    if InfluxDB_INFO is not None:
        print "Write '%s' schema table info to influxdb ..." % dbinfo_str
        write_influxdb(*rs)
    else:
        print rs


问题


面经


文章

微信
公众号

扫码关注公众号