python类connect()的实例源码

SettingCheck.py 文件源码 项目:BDA 作者: kotobukki 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def checkUserDbGrants(self, username):
        if self.cursor is None:
            self.connect()
        self.cursor.execute("select Drop_priv, Grant_priv, References_priv,Create_tmp_table_priv,"
                            "Lock_tables_priv,Create_routine_priv,Alter_routine_priv,Execute_priv,"
                            "Event_priv,Trigger_priv from mysql.db "
                            "where User ='%s'" % username)
        res = self.cursor.fetchall()
        flag = 0
        for row in res:
            # print row
            for key, value in row.items():
                if value == 'Y':
                    flag = 1
                    LOg.log_warn('setting %s = %s is suggested!' % (key, 'N'))
        if (flag == 0):
            Log.log_pass('All the setting are approriate!')
recipe-576574.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def createDefCon(self):

                try:
                        host = "127.0.0.1"
                        port = 3306         ### default mysql port, change if you know better
                        user = "krisk"      ### def parameters
                        passwd = "kish"     ### def parameters
                        db = "loginfo"      ### connection.user_info contains the autho users

                        ### Create a connection object, use it to create a cursor

                        con = MySQLdb.connect(host = host  ,port = port , user = user,passwd = passwd ,db = db)
                        return con ### returns a connection object

                except: 
                        return 0;



####################################### Test connection #######################################################
recipe-578774.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def populate_store(self, store):

        try:
            connection = None
            connection = MySQLdb.connect('localhost', 'annon', 'pass')
            cursor = connection.cursor()
            cursor.execute("Select * From `INFORMATION_SCHEMA`.`SCHEMATA`")
            rows = cursor.fetchall()

            for row in rows:
                store.append([row[0], row[1], row[2], row[3]])

        except MySQLdb.Error, e:
            store.append([str(e.args[0]), e.args[1], '', ''])

        finally:
            if connection != None:
                connection.close()
scripthelpers.py 文件源码 项目:supremm 作者: ubccr 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def getdbconnection(configsection, as_dict=False, defaultargs={}):
    """ Helper function that gets a database connection object from a config dictionary """

    dbengine = configsection['dbengine'] if 'dbengine' in configsection else 'MySQLDB'

    if dbengine == 'MySQLDB':

        dbargs = defaultargs.copy()
        # Convert the external configuration names to python PEP-249 config names
        translate = {"host": "host", 
                     "defaultsfile": "read_default_file",
                     "user": "user",
                     "pass": "passwd",
                     "port": "port"}

        for confval, myval in translate.iteritems():
            if confval in configsection:
                dbargs[myval] = configsection[confval]

        if as_dict:
            dbargs['cursorclass'] = MySQLdb.cursors.DictCursor

        return MySQLdb.connect(**dbargs)
    else:
        raise Exception("Unsupported database engine %s" % (dbengine))
mysqlconnpool.py 文件源码 项目:pykit 作者: baishancloud 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def new_connection(conn_argkw, conv=None, options=None):

    # useful arg could be added in future.:
    # conn_argkw.init_command

    conv = conv or {}
    options = options or {}

    opt = {
        'autocommit': 1,
    }
    opt.update(options)

    conn = MySQLdb.connect(conv=conv, **conn_argkw)
    for k, v in opt.items():
        conn.query('set {k}={v}'.format(k=k, v=v))

    return conn
database.py 文件源码 项目:health-mosconi 作者: GNUHealth-Mosconi 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def cursor(self, autocommit=False, readonly=False):
        conv = MySQLdb.converters.conversions.copy()
        conv[float] = lambda value, _: repr(value)
        conv[MySQLdb.constants.FIELD_TYPE.TIME] = MySQLdb.times.Time_or_None
        args = {
            'db': self.database_name,
            'sql_mode': 'traditional,postgresql',
            'use_unicode': True,
            'charset': 'utf8',
            'conv': conv,
        }
        uri = parse_uri(config.get('database', 'uri'))
        assert uri.scheme == 'mysql'
        if uri.hostname:
            args['host'] = uri.hostname
        if uri.port:
            args['port'] = uri.port
        if uri.username:
            args['user'] = uri.username
        if uri.password:
            args['passwd'] = urllib.unquote_plus(uri.password)
        conn = MySQLdb.connect(**args)
        cursor = Cursor(conn, self.database_name)
        cursor.execute('SET time_zone = "+00:00"')
        return cursor
database.py 文件源码 项目:health-mosconi 作者: GNUHealth-Mosconi 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def list(cursor):
        now = time.time()
        timeout = config.getint('session', 'timeout')
        res = Database._list_cache
        if res and abs(Database._list_cache_timestamp - now) < timeout:
            return res
        cursor.execute('SHOW DATABASES')
        res = []
        for db_name, in cursor.fetchall():
            try:
                database = Database(db_name).connect()
            except Exception:
                continue
            cursor2 = database.cursor()
            if cursor2.test():
                res.append(db_name)
                cursor2.close(close=True)
            else:
                cursor2.close()
                database.close()
        Database._list_cache = res
        Database._list_cache_timestamp = now
        return res
database_helpers.py 文件源码 项目:envoy-perf 作者: envoyproxy 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def AuthorizeMachineAndConnectDB(db_instance_name, project, username, database,
                                 logfile):
  """This function authorizes a machine to connect to a DB.

  Args:
    db_instance_name: name of the DB instance.
    project: name of the project in which the db_instance belongs to.
    username: username that will be used to log in to the DB
    database: name of the database to connect to
    logfile: logfile for the gcloud command in shell
  Returns:
    Returns the connection after being connected to the DB.
  """
  hostname = GetInstanceIP(db_instance_name, project)

  password = utils.GetRandomPassword()
  sh_utils.RunGCloudService(["users", "set-password", username, "%",
                             "--instance", db_instance_name, "--password",
                             password], project=project,
                            service="sql", logfile=logfile)
  print "DB Usernames and passwords are set."
  connection = MySQLdb.connect(host=hostname, user=username,
                               passwd=password, db=database)
  return connection
image_info.py 文件源码 项目:DockerSecurityResearch 作者: puyangsky 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def getImageName(_index):
    global MAX
    if _index > MAX:
        return "-1"

    conn = MySQLdb.connect(IP, DBUSERNAME, DBPWD, DBNAME)
    cursor = conn.cursor()
    sql = "select url from image where id = %d" % _index
    cursor.execute(sql)
    url = cursor.fetchone()
    print url
    if url != None:
        url = str(url[0])
        items = url.split('/')
        if len(items) != 7:
            return "#error#"
        name = items[-3] + '/' + items[-2]
        return name
    else:
        print "+++None return"
        return "#error#"

    conn.close()
insertmysql.py 文件源码 项目:dpspider 作者: doupengs 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def __init__(self,host,user,password,db,charset='utf8',logFile=None,color=True,debug=4):
        '''
        :param host: <class str|host name>
        :param user: <class str|user name>
        :param password: <class str|password>
        :param db: <class str|database name>
        :param charset: default='utf8' <class str>
        :param logFile: default=None <class str>
        :param color: default=True <class bool>
        :param debug: default=4 <class int|0 NONE,1 [Error],2 [Error][WARING],3 [Error][WARING][INFO],4 ALL>
        '''
        self.logFile = logFile
        self.color = color
        self.debug = debug
        self.success = 0
        self.fail = 0
        self.repeat = 0
        self._conn = MYSQL.connect(host,user,password,db,charset=charset)
        self._cursor = self._conn.cursor()
migrate_mysql_to_influxdb.py 文件源码 项目:infra_helpers 作者: ayush-sharma 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_data_from_mysql(host: str, username: str, password: str, db: str, sql: str):
    """ Run SQL query and get data from MySQL table. """

    db = MySQLdb.connect(host, username, password, db)

    cursor = db.cursor(MySQLdb.cursors.DictCursor)

    try:
        cursor.execute(sql)
        data = cursor.fetchall()
    except Exception as e:
        print("MySQL error %s: %s" % (e.args[0], e.args[1]))
        data = None

    db.close()

    return data
GUI_MySQL_class.py 文件源码 项目:Python-GUI-Programming-Cookbook-Second-Edition 作者: PacktPublishing 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def insertBooks(self, title, page, bookQuote):
        # connect to MySQL
        conn, cursor = self.connect()

        self.useGuiDB(cursor)

        # insert data
        cursor.execute("INSERT INTO books (Book_Title, Book_Page) VALUES (%s,%s)", (title, page))

        # last inserted auto increment value   
        keyID = cursor.lastrowid 
        # print(keyID)

        cursor.execute("INSERT INTO quotations (Quotation, Books_Book_ID) VALUES (%s, %s)", \
                       (bookQuote, keyID))

        # commit transaction
        conn.commit ()

        # close cursor and connection
        self.close(cursor, conn)

    #------------------------------------------------------
GUI_MySQL_class.py 文件源码 项目:Python-GUI-Programming-Cookbook-Second-Edition 作者: PacktPublishing 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def insertBooksExample(self):
        # connect to MySQL
        conn, cursor = self.connect()

        self.useGuiDB(cursor)

        # insert hard-coded data
        cursor.execute("INSERT INTO books (Book_Title, Book_Page) VALUES ('Design Patterns', 17)")

        # last inserted auto increment value   
        keyID = cursor.lastrowid 
        print(keyID)

        cursor.execute("INSERT INTO quotations (Quotation, Books_Book_ID) VALUES (%s, %s)", \
                       ('Programming to an Interface, not an Implementation', keyID))

        # commit transaction
        conn.commit ()

        # close cursor and connection
        self.close(cursor, conn)

    #------------------------------------------------------
GUI_MySQL_class.py 文件源码 项目:Python-GUI-Programming-Cookbook-Second-Edition 作者: PacktPublishing 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def showBooks(self):
        # connect to MySQL
        conn, cursor = self.connect()    

        self.useGuiDB(cursor)    

        # print results
        cursor.execute("SELECT * FROM Books")
        allBooks = cursor.fetchall()
        print(allBooks)

        # close cursor and connection
        self.close(cursor, conn)   

        return allBooks     

    #------------------------------------------------------
GUI_MySQL_class.py 文件源码 项目:Python-GUI-Programming-Cookbook-Second-Edition 作者: PacktPublishing 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def showData(self):
        # connect to MySQL
        conn, cursor = self.connect()   

        self.useGuiDB(cursor)      

        # execute command
        cursor.execute("SELECT * FROM books")
        print(cursor.fetchall())

        cursor.execute("SELECT * FROM quotations")
        print(cursor.fetchall())

        # close cursor and connection
        self.close(cursor, conn) 

    #------------------------------------------------------
GUI_MySQL_class.py 文件源码 项目:Python-GUI-Programming-Cookbook-Second-Edition 作者: PacktPublishing 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def showDataWithReturn(self):
        # connect to MySQL
        conn, cursor = self.connect()   

        self.useGuiDB(cursor)      

        # execute command
        cursor.execute("SELECT * FROM books")
        booksData = cursor.fetchall()

        cursor.execute("SELECT * FROM quotations")
        quoteData = cursor.fetchall()

        # close cursor and connection
        self.close(cursor, conn) 

        # print(booksData, quoteData)
        for record in quoteData:
            print(record)

        return booksData, quoteData

    #------------------------------------------------------
GUI_MySQL_class.py 文件源码 项目:Python-GUI-Programming-Cookbook-Second-Edition 作者: PacktPublishing 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def updateGOF(self):
        # connect to MySQL
        conn, cursor = self.connect()   

        self.useGuiDB(cursor)      

        # execute command
        cursor.execute("SELECT Book_ID FROM books WHERE Book_Title = 'Design Patterns'")
        primKey = cursor.fetchall()[0][0]
        print("Primary key=" + str(primKey))

        cursor.execute("SELECT * FROM quotations WHERE Books_Book_ID = (%s)", (primKey,))
        print(cursor.fetchall())

        # close cursor and connection
        self.close(cursor, conn) 

    #------------------------------------------------------
GUI_MySQL_class.py 文件源码 项目:Python-GUI-Programming-Cookbook-Second-Edition 作者: PacktPublishing 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def deleteRecord(self):
        # connect to MySQL
        conn, cursor = self.connect()   

        self.useGuiDB(cursor)      

        try: 
            # execute command
            cursor.execute("SELECT Book_ID FROM books WHERE Book_Title = 'Design Patterns'")
            primKey = cursor.fetchall()[0][0]
            # print(primKey)

            cursor.execute("DELETE FROM books WHERE Book_ID = (%s)", (primKey,))

            # commit transaction
            conn.commit ()
        except:
            pass

        # close cursor and connection
        self.close(cursor, conn)     


#==========================================================
ost_perf_history_table_handler.py 文件源码 项目:lustre_task_driven_monitoring_framework 作者: GSI-HPC 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def create_table(self):

        sql = """
CREATE TABLE """ + self._table_name + """ (
   id BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,
   read_timestamp  TIMESTAMP NOT NULL DEFAULT "0000-00-00 00:00:00",
   write_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
   ost CHAR(7) NOT NULL,
   ip CHAR(15) NOT NULL,
   size BIGINT(20) UNSIGNED NOT NULL,
   read_throughput BIGINT(20) SIGNED NOT NULL,
   write_throughput BIGINT(20) SIGNED NOT NULL,
   read_duration INT(10) SIGNED NOT NULL,
   write_duration INT(10) SIGNED NOT NULL,
   PRIMARY KEY (id)
) ENGINE=MyISAM DEFAULT CHARSET=latin1
"""

        logging.debug("Creating database table:\n" + sql)

        with closing(MySQLdb.connect(host=self._host, user=self._user, passwd=self._passwd, db=self._db)) as conn:
            with closing(conn.cursor()) as cur:
                cur.execute(sql)
mysql_util.py 文件源码 项目:python-mysql-pool 作者: LuciferJack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def query_single(_db_config, _sql, _args):
    config = _db_config
    conn = MySQLdb.connect(host=config['host'], port=config['port'], user=config['user'], passwd=config['passwd'],
                           db=config['db'], charset=config['charset'], use_unicode=True)
    cursor = conn.cursor(MySQLdb.cursors.DictCursor)
    result = ()
    try:
        cursor.execute(_sql, _args)
        result = cursor.fetchall()
    except:
        pass
        rootLogger.error("query exception sql is %s ,_args is %s,stacks is %s", _sql, _args, get_caller_info_total())
        rootLogger.exception("message")
    finally:
        cursor.close()
        conn.close()
    return result


# ===============================================
# FUNCTION  ??????
# ===============================================
mysql_util.py 文件源码 项目:python-mysql-pool 作者: LuciferJack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def insertOrUpdate_getId(_db_config, _sql, _args):
    result = 0
    id = 0
    config = _db_config
    conn = MySQLdb.connect(host=config['host'], port=config['port'], user=config['user'], passwd=config['passwd'],
                           db=config['db'], charset=config['charset'], use_unicode=True)
    cursor = conn.cursor(MySQLdb.cursors.DictCursor)
    try:
        cursor.execute(_sql, _args)
        id = conn.insert_id()
        conn.commit()
        result = cursor.rowcount
    except:
        pass
        rootLogger.error("exception sql is %s ,_args is %s", _sql, _args)
        rootLogger.exception("message")
        conn.rollback()
    finally:
        print("affected rows = {}".format(cursor.rowcount))
        cursor.close()
        conn.close()
    return result, id
db.py 文件源码 项目:entity-linker 作者: seucs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, db_name):
        if db_name == 'wiki':
            kb_name = 'wiki_kb'
        elif db_name == 'baidu':
            kb_name = 'kb'
        elif db_name == 'hudong':
            kb_name = 'hudong_kb'

        self.db = MySQLdb.connect(host='192.168.1.104',user="root",passwd="",db=kb_name ,charset="utf8", cursorclass=cursors.SSCursor)  
        self.cur = self.db.cursor()
        self.db_titles = []
        self.db_disambiguations = []

        print 'db is loading......'
        if os.path.exists('title_disambiguation.data'):
            with open('title_disambiguation.data','r') as f:
                self.db_titles, self.db_disambiguations = pickle.load(f)
        else:
            self.load_save_db()
        print 'db has been loaded !'
db.py 文件源码 项目:entity-linker 作者: seucs 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__(self, db_name):
        if db_name == 'wiki':
            kb_name = 'wiki_kb'
        elif db_name == 'baidu':
            kb_name = 'kb'
        elif db_name == 'hudong':
            kb_name = 'hudong_kb'

        self.db = MySQLdb.connect(host='192.168.1.104',user="root",passwd="",db=kb_name ,charset="utf8", cursorclass=cursors.SSCursor)  
        self.cur = self.db.cursor()
        self.db_titles = []
        self.db_disambiguations = []

        print 'db is loading......'
        if os.path.exists('title_disambiguation.data'):
            with open('title_disambiguation.data','r') as f:
                self.db_titles, self.db_disambiguations = pickle.load(f)
        else:
            self.load_save_db()
        print 'db has been loaded !'
cook_device.py 文件源码 项目:cookdevice 作者: Simone-Della 项目源码 文件源码 阅读 121 收藏 0 点赞 0 评论 0
def search_ID_in_list_and_connect(search_ID):

  CUR.execute("SELECT Id, Ip, DeviceName, Session FROM %s" % line[7][:-1])
  row = CUR.fetchone()

  while row is not None:
    rows = {row[0] : row[1]}
    type_protocol = row[3]
    row = CUR.fetchone()
    for key in rows.keys():
      if search_ID == key:
        rb1 = str(rows.values())
        ip_device = rb1[2:-2]
        # verify where you try to connect
        print 'Connect to: ' + ip_device
        # condition which protocol use, view in coloumn session in db
        if 'ssh' in type_protocol:
          proto_connections.ssh_connections(ip_device)
        else:
          proto_connections.telnet_connections(ip_device)

  CUR.close()
  CONN.close()

# main function
cook_device.py 文件源码 项目:cookdevice 作者: Simone-Della 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def main():

  banner()
  # search device but is empty return device not found!
  try:
    userdate = raw_input('Search: ')
    while len(userdate) <= 0:
      userdate = raw_input('Search: ')
    search(userdate)
    # search ID for connect but if empty exit
    global_ItemFound
    search_ID = input('Connect to ID: ')
    search_ID_in_list_and_connect(search_ID)
  # manage exception KeyboardError ctrl+c and NameError
  except KeyboardInterrupt, e:
      print "\nexit"
  except NameError, e:
      print "%s, not found !" % userdate
  except EOFError, e:
      print "\nEOFError, exit"
  except SyntaxError, e:
      print "\nSyntaxError, exit"
mysqlConnector.py 文件源码 项目:pyExamples 作者: mike-zhang 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def execute(self,query,doCommit=False):
        '''exec query insert or delete'''
        retCount = 0
        #self.logger.debug(query)
        try:
            cursor = self.conn.cursor()
            retCount = cursor.execute(query)
        except Exception:
            #self.logger.debug("query : {0}".format(query))
            self.connect()
            if self.conn :
                cursor = self.conn.cursor()
                retCount = cursor.execute(query)
        if doCommit :
            #self.logger.debug("before commit")
            self.commit()
            #self.logger.debug("after commit")
        return retCount
mysqlConnector.py 文件源码 项目:pyExamples 作者: mike-zhang 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def execWithRet(self,query):
        '''exec query '''
        retList = []

        try :
            cursor = self.conn.cursor()
            count = cursor.execute(query)
            retList = cursor.fetchmany(count)
        except Exception:
            #self.logger.debug("query : {0}".format(query))
            self.connect()
            if self.conn :
                cursor = self.conn.cursor()
                count = cursor.execute(query)
                retList = cursor.fetchmany(count)
        return retList
mysql_conn.py 文件源码 项目:MyPythonLib 作者: BillWang139967 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self):
        self.con = mdb.connect(host=config.mysql_host, user=config.mysql_user, passwd=config.mysql_passwd, db=config.mysql_db, charset=config.mysql_charset)
        self.cur = self.con.cursor(mdb.cursors.DictCursor)
        self.fromlist = []
        self.wherelist = []
        self.deletelist = []
        self.selectlist = []
        self.orderlist = []
        self.limitlist = []
        self.updatelist = []
        self.inslist = []
        self.leftjoinlist = []
        self.selectas = ''
        self.selectas_b = ''
        self.sql = ''
        self.withtag = False
DB.py 文件源码 项目:CTPOrderService 作者: lllzzz 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self):

        env = C.get('sys', 'env')
        host = C.get('mysql_' + env, 'host')
        user = C.get('mysql_' + env, 'user')
        passwd = C.get('mysql_' + env, 'password')
        name = C.get('mysql_' + env, 'name')

        self.db = MySQLdb.connect(
            host = host,
            port = 3306,
            user = user,
            passwd = passwd,
            db = name,
            charset='utf8',
            cursorclass = MySQLdb.cursors.DictCursor)
        self.cursor = self.db.cursor()
camisade.py 文件源码 项目:camisade 作者: tomride 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def bannerread(host,port):
    try:
        if port==80 or port==8443 or port==2089 or port==10000:
            con = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
            con.connect((host,port))
            con.send("GET / HTTP/1.1\r\n\r\n\r\n")
            data = (con.recv(200))
            dbcon(host,port,data)
            return(data)
        elif port==53:
            pkt = IP(dst=host)/UDP(dport=port,sport=RandShort())/DNS(aa=0L, qr=0L, qd=DNSQR(qclass=3, qtype=16, qname='version.bind.'))
            x = sr1(pkt)
            ban = x[DNS].summary()
            dbcon(host,port,data)
            return(ban)
        else:
            conexion = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
            conexion.settimeout(1.0) #Timeout - socket non-blocking
            conexion.connect((host,port))
            banner = conexion.recv(1024)
            dbcon(host,port,banner)
            return(banner)
    except:
        return("No Banner")


问题


面经


文章

微信
公众号

扫码关注公众号