python类connect()的实例源码

db.py 文件源码 项目:bark 作者: kylerbrown 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def add_root(root, db_connection_string, events=False):
    db = dataset.connect(db_connection_string)
    root_table = db['root']
    entry_table = db['entry']
    dataset_table = db['dataset']
    column_table = db['column']
    if events:
        event_table = db['event']
    root_table.upsert(root_to_record(root), ['path'])
    for entrykey in root.entries:
        entry = root[entrykey]
        entry_table.upsert(entry_to_record(entry, root), ['path'])
        for dsetkey in entry.datasets:
            dset = entry[dsetkey]
            dataset_table.upsert(dset_to_record(dset, entry), ['path'])
            for column in dset_columns_to_records(dset):
                column_table.upsert(column, ['path', 'name'])
            if events and isinstance(dset, bark.EventData):
                for event in events_to_records(dset):
                    event_table.upsert(event, ['path', 'index'])
createDataBase.py 文件源码 项目:MonsterMizer 作者: oliviermirat 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def createTable(tab):

    tableName=tab.tableName
    columns=tab.m  

    db = dataset.connect('sqlite:///olidata.db')

    db.query('DROP table IF EXISTS '+tableName)

    table = db.create_table(tableName)

    for row in columns:
        if (row[2]=='int'):
            table.create_column(row[0], sqlalchemy.Integer)
        elif (row[2]=='varchar'):
            table.create_column(row[0], sqlalchemy.VARCHAR(255))

    cols=', '.join(columns[:,0])
    db.query('DROP INDEX IF EXISTS unique_name')
    db.query('create unique index unique_name on '+tableName+'('+cols+')')

    print(db[tableName].columns)
collect_rdataframe.py 文件源码 项目:bioimg-sciluigi-casestudy 作者: pharmbio 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def parse_audit_files():
    audit_paths = glob.glob('audit/workflow_mmlinear_started_20151026_17*')
    audit_data = []
    for path in audit_paths:
        with open(path) as fh:
            cp = ConfigParser()
            cp.readfp(fh)
            for sec in cp.sections():
                dat = dict(cp.items(sec))
                if 'train_lin' in dat['instance_name']:
                    ms = re.match('train_lin_trn([0-9]+|rest)_tst([0-9]+)_c([0-9\.]+)', dat['instance_name'])
                    if ms is None:
                        raise Exception('No match in name: ' + dat['instance_name'])
                    m = ms.groups()
                    dat['training_size'] = m[0]
                    dat['test_size'] = m[1]
                    dat['cost'] = m[2]
                    audit_data.append(dat)
    db = dataset.connect('sqlite:///:memory:')
    tbl = db['audit']
    for d in audit_data:
        tbl.insert(d)
    return tbl
utils.py 文件源码 项目:collect-social 作者: Data4Democracy 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def setup_db(connection_string):
    db = dataset.connect(connection_string)

    connections = db['connection']
    users = db['user']
    tweets = db['tweet']
    medias = db['media']
    mentions = db['mention']
    urls = db['url']
    hashtags = db['hashtag']

    tweets.create_index(['tweet_id'])
    medias.create_index(['tweet_id'])
    mentions.create_index(['user_id'])
    mentions.create_index(['mentioned_user_id'])
    urls.create_index(['tweet_id'])
    hashtags.create_index(['tweet_id'])
    users.create_index(['user_id'])
    connections.create_index(['friend_id'])
    connections.create_index(['follower_id'])

    return db
utils.py 文件源码 项目:collect-social 作者: Data4Democracy 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setup_db(connection_string):
    db = dataset.connect(connection_string)

    pages = db['page']
    users = db['user']
    posts = db['post']
    comments = db['comment']
    interactions = db['interaction']

    users.create_index(['user_id'])
    posts.create_index(['post_id'])
    comments.create_index(['comment_id'])
    comments.create_index(['post_id'])
    interactions.create_index(['comment_id'])
    interactions.create_index(['post_id'])
    interactions.create_index(['user_id'])

    return db
promo_alert.py 文件源码 项目:ShiokBot 作者: kianhean 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def subscribe(user_id):
    """ subscribe to the database """
    db = dataset.connect(database_url)
    table = db['subscriptions']

    if table.find_one(id_user=user_id) is None:
        table.insert(dict(id_user=user_id))
        text_ = """This thread has succesfully subscribed to recieve New Uber Codes! \nI will send you the latest Uber Promo Codes when they get released so that you can apply them first!\n\n"""
        text_ += """These are the latest codes right now\n\n"""
        new_codes = get_code()

        for key in new_codes:
            text_ += "<b>" + key + "</b> | Expires - " + new_codes[key][0] + " | " + new_codes[key][1]
            text_ += "\n"
        return text_
    else:
        return "This thread is already subscribed to recieve New Uber Codes!"
promo_alert.py 文件源码 项目:ShiokBot 作者: kianhean 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_new_codes():
    """ Return New Codes and Refresh DB"""
    db = dataset.connect(database_url)
    new_codes = get_code()

    table = db['promo']

    """ Get New Codes"""
    new = {}

    for key, value in new_codes.items():

        if table.find_one(promo=key) is None:
            new[key] = [new_codes[key][0], new_codes[key][1]]
        else:
            pass

    """ Add to DB """
    for key in new:
        table.insert(dict(promo=key, desc=new_codes[key][1], exp=new_codes[key][0]))

    return new
train_alert.py 文件源码 项目:ShiokBot 作者: kianhean 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_new_breakdowns():
    """ Return Breakdown Notifications and Refresh DB"""
    db = dataset.connect(database_url)
    new_breakdown = all_breakdowns()

    table = db['train']

    """ Get New Breakdowns"""
    new = []
    for key, value in new_breakdown.items():

        if table.find_one(tweet_id=key) is None:
            new.append(key)
        else:
            pass

    """ Add to DB """
    for key in new:
        table.insert(dict(tweet_id=key,
                          tweet=new_breakdown[key]['tweet'],
                          created_at=new_breakdown[key]['created_at']))

    return new
jx_usingDataset.py 文件源码 项目:jx-sqlite 作者: mozilla 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self):
        self.db = dataset.connect('sqlite:///:memory:')
database.py 文件源码 项目:SiDBot 作者: TiagoDanin 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def incr_database(table=None, name=None):
    db = dataset.connect('sqlite:///db' + hash)
    if table and name:
        table = db[table]
        r = table.find_one(name=name)
        if r == None:
            table.insert(dict(name=name, value=0))
            r = table.find_one(name=name)

        new = r['value'] + 1
        a = dict(name=name, value=new)
        table.update(a, ['name'])
        return new
    return Falseacpi -V
languages.py 文件源码 项目:SiDBot 作者: TiagoDanin 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_user_lang(self):
    db = dataset.connect('sqlite:///db' + hash)
    table = db['user:' + str(self.bot_type) + str(self.chat_id)]
    r = table.find_one(info='lang')
    if r == None:
        table.insert(dict(info='lang', value=defaut_lang))
        return defaut_lang

    return r['value']
languages.py 文件源码 项目:SiDBot 作者: TiagoDanin 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def update_user_lang(self, new_lag):
    if get_user_lang(self) == new_lag:
        return new_lag
    else:
        db = dataset.connect('sqlite:///db' + hash)
        table = db['user:' + str(self.bot_type) + str(self.chat_id)]
        a = dict(info='lang', value=new_lag)
        table.update(a, ['info'])
        return new_lag
status.py 文件源码 项目:SiDBot 作者: TiagoDanin 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_status(input):
    db = dataset.connect('sqlite:///db' + hash)
    table = db['status']
    r = table.find_one(name=input)
    if r == None:
        return str(input + ': 0')
    i = r['value']
    output = str(input + ': ' + str(i))
    return output
moh_smell_category_borough_json.py 文件源码 项目:Smelly-London 作者: Smelly-London 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def connect_to_db():
    """Connecting to DB"""
    db = dataset.connect('sqlite:///../../database/smells.sqlite')
    return db
sqlite_utilities_v2.py 文件源码 项目:Smelly-London 作者: Smelly-London 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def connect_to_db():
    """ Connect to an SQLite database. Return a connection."""
    db = dataset.connect('sqlite:///../database/smells.sqlite')
    return db
sqlite_utilities_v2.py 文件源码 项目:Smelly-London 作者: Smelly-London 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def main():
    db = dataset.connect('sqlite:///../database/smells.sqlite')

    sqlite_file = '/home/jen/projects/smelly_london/git/smelly_london/database'
    column_names = ['category', 'location', 'number_of_smells', 'centroid_lat', 'centroid_lon', 'id', 'year', 'sentence']
    sql = 'select {column_names} from (select Category category, Borough location, Id id, Year year, Sentence sentence, count(*) number_of_smells from smells group by Category, Borough having Year = "1904") join locations on location = name;'

    conn, cur = connect_to_sqlite_db(sqlite_file)
    data = sql_get_data_colnames(cur, sql, column_names)

    close_sqlite_connection(conn)    

    return data
smell_datamine_multiprocessing.py 文件源码 项目:Smelly-London 作者: Smelly-London 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def save_to_database(self, results):
        """Save results to the database."""
        db = dataset.connect('sqlite:///../database/smells.sqlite')
        table = db['smells']
        for result in results:
            table.insert({'Category': result.category,
                          'Borough': result.borough,
                          'Year': result.year,
                          'Sentence': result.sentence,
                          'bID': result.bID,
                          'URL': result.url,
                          'MOH': result.mohRegion})
bearddbtable.py 文件源码 项目:skybeard-2 作者: LanceMaverick 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __enter__(self):
        self.db = dataset.connect(pyconfig.get('db_url'))
        self.db.__enter__()
        self.table = self.db.get_table(self.table_name, **self.kwargs)
        logger.debug(
            "BeardDBTable initalised with: self.table: {}, self.db: {}".format(
                self.table, self.db))
        return self
database.py 文件源码 项目:skybeard-2 作者: LanceMaverick 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_all_keys():
    with dataset.connect(config.db_name) as db:
        table = db['keys']
        return table.all()
database.py 文件源码 项目:skybeard-2 作者: LanceMaverick 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def make_key(chat_id):
    """Make key."""
    with dataset.connect(config.db_name) as db:
        table = db['keys']
        return table.insert(
            dict(
                chat_id=chat_id,
                key="".join(
                    (random.choice(string.ascii_letters) for x in range(20))
                )
            )
        )
database.py 文件源码 项目:skybeard-2 作者: LanceMaverick 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_key(chat_id):
    """Get key.

    If key exists, get key. If key does not exist, create key and get it.

    """
    with dataset.connect(config.db_name) as db:
        table = db['keys']
        existing_key = table.find_one(chat_id=chat_id)
        if existing_key:
            return existing_key
        else:
            make_key(chat_id)

    return get_key(chat_id)
database.py 文件源码 项目:skybeard-2 作者: LanceMaverick 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def is_key_match(key):
    logger.debug("Key is: {}".format(key))
    with dataset.connect(config.db_name) as db:
        table = db['keys']
        if table.find_one(key=key):
            return True
store.py 文件源码 项目:datasurvey 作者: occrp 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, db_path):
        if db_path is None:
            self.engine = dataset.connect('sqlite:///:memory:')
        else:
            self.engine = dataset.connect('sqlite:///%s' % db_path)
        self.table = self.engine['files']
        self.table.delete()
utilities.py 文件源码 项目:MonsterMizer 作者: oliviermirat 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def getDbStartEnd(start, end):
    db = dataset.connect('sqlite:///../2_createAndFillDatabase/olidata.db')

    startLin = str(toLinuxTime(start, '%Y/%m/%d %H:%M'))
    endLin = str(toLinuxTime(end, '%Y/%m/%d %H:%M'))

    return [db,startLin,endLin]
database_tools.py 文件源码 项目:mal 作者: chaosking121 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def openDB(databaseFilePath = None):
    if ((databaseFilePath == None) or (not (os.path.isfile(databaseFilePath)))):
        databaseFilePath = settings.getDBFilePath()

    databaseURL = 'sqlite:///{}'.format(databaseFilePath)

    return dataset.connect(databaseURL)
backend.py 文件源码 项目:smtk 作者: Data4Democracy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def setup_sqlite(db_url, collection):
    """
    create required tables & indexes
    """
    if collection == 'twitter':
        db = dataset.connect(db_url)

        connections = db['connection']
        users = db['user']
        tweets = db['tweet']
        medias = db['media']
        mentions = db['mention']
        urls = db['url']
        hashtags = db['hashtag']

        tweets.create_index(['tweet_id'])
        medias.create_index(['tweet_id'])
        mentions.create_index(['user_id'])
        mentions.create_index(['mentioned_user_id'])
        urls.create_index(['tweet_id'])
        hashtags.create_index(['tweet_id'])
        users.create_index(['user_id'])
        connections.create_index(['friend_id'])
        connections.create_index(['follower_id'])

    return db
royal_calendar.py 文件源码 项目:kungahusgranskning 作者: jplusplus 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, db_path):
        db = dataset.connect('sqlite:///%s' % db_path)
        self.tables = {
            "by_event": db["by_event"],
            "by_person": db["by_person"]
        }
sqlite.py 文件源码 项目:chocolate 作者: AIworx-Labs 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, url, result_table="results", complementary_table="complementary", space_table="space"):
        super(SQLiteConnection, self).__init__()
        if url.endswith("/"):
            raise RuntimeError("Empty database name {}".format(url))

        if url.endswith((" ", "\t")):
            raise RuntimeError("Database name ends with space {}".format(url))

        if not url.startswith("sqlite://"):
            raise RuntimeError("Missing 'sqlite:///' at the begin of url".format(url))

        if url == "sqlite://" or url == "sqlite:///:memory:":
            raise RuntimeError("Cannot use memory database as it exists only for the time of the connection")

        match = re.search("sqlite:///(.*)", url)
        if match is not None:
            db_path = match.group(1)
        else:
            raise RuntimeError("Cannot find sqlite db path in {}".format(url))

        self.url = url
        self.result_table_name = result_table
        self.complementary_table_name = complementary_table
        self.space_table_name = space_table

        self._lock = filelock.FileLock("{}.lock".format(db_path))
        self.hold_lock = False

        # with self.lock():
        #     db = dataset.connect(self.url)

        #     # Initialize a result table and ensure float for loss
        #     results = db[self.result_table_name]
        #     results.create_column("_loss", sqlalchemy.Float)
sqlite.py 文件源码 项目:chocolate 作者: AIworx-Labs 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def all_results(self):
        """Get a list of all entries of the result table. The order is
        undefined.
        """
        # Only way to ensure old db instances are closed is to force garbage collection
        # See dataset note : https://dataset.readthedocs.io/en/latest/api.html#notes
        gc.collect()
        db = dataset.connect(self.url)
        return list(db[self.result_table_name].all())
sqlite.py 文件源码 项目:chocolate 作者: AIworx-Labs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def find_results(self, filter):
        """Get a list of all results associated with *filter*. The order is
        undefined.
        """
        gc.collect()
        db = dataset.connect(self.url)
        return list(db[self.result_table_name].find(**filter))


问题


面经


文章

微信
公众号

扫码关注公众号