python类where()的实例源码

test_utils.py 文件源码 项目:ESPEI 作者: PhasesResearchLab 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_adding_bibtex_entries_to_bibliography_db(datasets_db):
    """BibTeX entries can be added to a database and the result is searchable."""
    # Two entries of different types: one @article and one @phdthesis.
    bibtex_text = """@article{Roe1952gamma,
author = {Roe, W. P. and Fishel, W. P.},
journal = {Trans. Am. Soc. Met.},
keywords = {Fe-Cr,Fe-Ti,Fe-Ti-Cr},
pages = {1030--1041},
title = {{Gamma Loop Studies in the Fe-Ti, Fe-Cr, and Fe-Ti-Cr Systems}},
volume = {44},
year = {1952}
}

@phdthesis{shin2007thesis,
author = {Shin, D},
keywords = {Al-Cu,Al-Cu-Mg,Al-Cu-Si,Al-Mg,Al-Mg-Si,Al-Si,Cu-Mg,Mg-Si,SQS},
number = {May},
school = {The Pennsylvania State University},
title = {{Thermodynamic properties of solid solutions from special quasirandom structures and CALPHAD modeling: Application to aluminum-copper-magnesium-silicon and hafnium-silicon-oxygen}},
year = {2007}
}"""
    bib_db = add_bibtex_to_bib_database(bibtex_text, datasets_db)

    # Looking up one citation key finds exactly that entry ...
    matches = bib_db.search(where('ID') == 'Roe1952gamma')
    assert len(matches) == 1
    # ... and the database holds both entries.
    assert len(bib_db.all()) == 2
cmd.py 文件源码 项目:mpsign 作者: abrasumente233 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def info(name=None):
    """Print a table of stored users: name, EXP and whether the BDUSS is valid.

    With ``name`` omitted, every row of ``user_table`` is listed; otherwise
    only the row whose ``name`` field equals ``name``.

    Raises:
        UserNotFound: if ``name`` was given but no row matches it.
    """
    if name is not None:
        rows = [user_table.get(where('name') == name)]
    else:
        rows = user_table.all()

    if not rows:
        print('No user yet.')
        return

    # ``get`` returned None, i.e. nothing matched the requested name.
    if rows[0] is None:
        raise UserNotFound

    template = '{:>15}{:>15}{:>20}'
    print(template.format('Name', 'EXP', 'is BDUSS valid'))

    for row in rows:
        name_cell = row['name']
        exp_cell = row['exp']
        valid_cell = str(User(row['bduss']).validation)
        print(template.format(name_cell, exp_cell, valid_cell))
test_storages.py 文件源码 项目:annotated-py-tinydb 作者: hhstore 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_json_readwrite(tmpdir):
    """
    Regression test for issue #1

    Inserts a long entry, removes it, then does the same with a shorter
    entry, checking after every step what a lookup by name returns.
    """
    db_path = str(tmpdir.join('test.db'))

    # Create TinyDB instance
    db = TinyDB(db_path, storage=JSONStorage)

    def fetch(name):
        # Stored document whose 'name' equals ``name``, or None.
        return db.get(where('name') == name)

    long_entry = {'name': 'A very long entry'}
    short_entry = {'name': 'A short one'}

    for entry in (long_entry, short_entry):
        entry_name = entry['name']

        db.insert(entry)
        assert fetch(entry_name) == entry

        db.remove(where('name') == entry_name)
        assert fetch(entry_name) is None
test_tables.py 文件源码 项目:annotated-py-tinydb 作者: hhstore 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_multiple_tables(db):
    """Three tables each find their own document; purge_tables() empties all."""
    table_names = ('table1', 'table2', 'table3')
    chars = ('a', 'b', 'c')

    tables = [db.table(table_name) for table_name in table_names]

    # One document per table, differing only in 'char'.
    for table, char in zip(tables, chars):
        table.insert({'int': 1, 'char': char})

    for table, char in zip(tables, chars):
        assert table.count(where('char') == char) == 1

    db.purge_tables()

    for table in tables:
        assert len(table) == 0
test_tables.py 文件源码 项目:annotated-py-tinydb 作者: hhstore 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_lru_cache(db):
    """Exercise the query cache of a table created with ``cache_size=2``."""
    # Test integration into TinyDB
    cached_table = db.table('table3', cache_size=2)
    first_query = where('int') == 1

    # Three distinct searches against a cache of size two: the first query
    # must no longer be cached afterwards.
    cached_table.search(first_query)
    for value in (2, 3):
        cached_table.search(where('int') == value)
    assert first_query not in cached_table._query_cache

    # After a remove() the cache's lru list is expected to be empty.
    cached_table.remove(where('int') == 1)
    assert not cached_table._query_cache.lru

    # A single search repopulates the cache with one entry ...
    cached_table.search(first_query)
    assert len(cached_table._query_cache) == 1

    # ... and clear_cache() drops it again.
    cached_table.clear_cache()
    assert len(cached_table._query_cache) == 0
sqs_db.py 文件源码 项目:prlworkflows 作者: PhasesResearchLab 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _parse_atat_lattice(lattice_in):
    """Parse an ATAT-style `lat.in` string.

    The grammar consists of three top-level groups, in this order:
    (coordinate system) (lattice) (atoms). The atoms group holds one subgroup
    per atom, each made of a position vector followed by a group of one or
    more names built from letters and underscores.

    Returns the result of calling ``parseString`` on ``lattice_in``.
    """
    def _as_float(tokens):
        # Replace the matched text by its float value.
        return [float(tokens[0])]

    number = Regex(r'[-+]?[0-9]*\.?[0-9]+([eE][-+]?[0-9]+)?').setParseAction(_as_float)
    triple = Group(number + number + number)
    triple_line = triple + Suppress(LineEnd())

    # Coordinate system: either three triples on separate lines, or a single
    # line carrying two triples (the second one being the angles).
    three_lines = triple_line + triple_line + triple_line
    single_line = triple + triple + Suppress(LineEnd())
    coord_sys = Group(three_lines | single_line)

    lattice = Group(triple + triple + triple)
    atom = Group(triple + Group(OneOrMore(Word(alphas + '_'))))

    grammar = coord_sys + lattice + Group(OneOrMore(atom))
    return grammar.parseString(lattice_in)
__init__.py 文件源码 项目:genorch 作者: genorch 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def create_service(self, opts):
        """Create a service on every target in ``self.targets``.

        For each target a ``docker.DockerClient`` is built for
        ``tcp://<common.translate_id(target)[0]>:<cfg.docker['API_PORT']>``.

        * If ``opts`` has a ``'sub_driver'`` key, that attribute is looked up on
          the client. Only the ``'swarm'`` sub-driver is acted on, using
          ``opts['opts']``:
            - ``type == 'manager'``: initialise the swarm and store its join
              tokens on the ``db.vms`` row whose name equals the target;
            - ``type == 'worker'``: join the swarm of the first entry in
              ``managers`` using the 'Worker' token stored for that manager.
        * Otherwise ``opts`` is passed as keyword arguments to
          ``containers.run`` with ``detach=True``.

        ``opts`` is not modified by this method.
        """
        # Decide once, before the loop. Deleting the key per target (as was done
        # before) made every target after the first one take the
        # ``containers.run`` branch with the swarm options as keyword arguments.
        has_sub_driver = "sub_driver" in opts
        sub_driver_name = opts['sub_driver'] if has_sub_driver else None

        for target in self.targets:
            docker_client = docker.DockerClient('tcp://' + common.translate_id(target)[0] +
                                                ':' + cfg.docker['API_PORT'])
            if not has_sub_driver:
                docker_client.containers.run(**opts, detach=True)
                continue

            sub_driver = getattr(docker_client, sub_driver_name)

            # Sub-drivers other than 'swarm' are looked up but not acted upon.
            if sub_driver_name != 'swarm':
                continue

            sub_driver_opts = opts['opts']
            node_type = sub_driver_opts['type']

            if node_type == 'manager':
                sub_driver.init('eth0:' + cfg.docker['SWARM_PORT'], '0.0.0.0:' + cfg.docker['SWARM_PORT'])
                db.vms.update(insert_join_token(sub_driver.attrs['JoinTokens']), where('name') == target)
            elif node_type == 'worker':
                first_manager = sub_driver_opts['managers'][0]
                manager = db.vms.get(where('name') == first_manager)
                sub_driver.join([common.id_to_swarm(first_manager)],
                                manager['docker']['join_tokens']['Worker'],
                                '0.0.0.0:' + cfg.docker['SWARM_PORT'])
swarm.py 文件源码 项目:genorch 作者: genorch 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def create_cluster(self):
        """Initialise a swarm on the first manager VM, then add the other VMs.

        The first VM in ``self.vms`` whose role is 'manager' gets the swarm
        initialised on it, has its join tokens stored on the ``db.vms`` row
        named after its id, is removed from ``self.vms`` and is remembered as
        ``self.init``. Every VM still in ``self.vms`` is then handed to
        ``add_manager`` or ``add_worker`` according to its role.
        """
        self.init = {}
        for vm in self.vms:
            api_url = 'tcp://' + common.translate_id(vm['id'])[0] + ':' + cfg.docker['API_PORT']
            swarm_client = docker.DockerClient(api_url).swarm
            if vm['role'] != 'manager':
                continue

            swarm_port = cfg.docker['SWARM_PORT']
            swarm_client.init('eth0:' + swarm_port, '0.0.0.0:' + swarm_port)

            join_tokens = swarm_client.attrs['JoinTokens']
            db.vms.update(insert_join_token(join_tokens), where('name') == vm['id'])

            # Removing from the list being iterated is fine here only because
            # the loop is left immediately afterwards.
            self.vms.remove(vm)
            self.init = vm
            break

        for vm in self.vms:
            role = vm['role']
            if role == 'manager':
                self.add_manager(vm)
            elif role == 'worker':
                self.add_worker(vm)
bot.py 文件源码 项目:blacklion 作者: Foohx 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _fncChatBotAlly_cmdWhoPlays(self, m):
        self.log("+", "\t\tReceive command !who_plays")
        table = self.db['ranks'].table('server_'+str(self.account.account['serveur']))

        # scan_date = table.get(where('id'), len(table))
        # scan_date = scan_date['scan_date']

        ranks_old = table.all()
        ranks_now = self.account.getRanking()
        self.log("?", "\t\tSearching for actives users..")
        rapport = "Active players for last 5 days :\n\n"
        count = 0
        for rn in ranks_now:
            for ro in ranks_old:
                if rn['user'] == ro['user']:
                    if (ro['points'] - rn['points']) != 0:
                        count = count +1
                        rapport = rapport + "- @" + rn['user'] + " ("+str((rn['points'] - ro['points']))+" pts)\n"
                    if count >= 7:
                        count = 0
                        self.account.rSendMessageToAlliance(rapport)
                        rapport = ""
                    break
        rapport = rapport + "OVER !"
        self.account.rSendMessageToAlliance(rapport)
test_storages.py 文件源码 项目:PokeBot 作者: bcc5160 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_json_readwrite(tmpdir):
    """
    Regression test for issue #1

    Inserts a long entry, removes it, then does the same with a shorter
    entry, checking after every step what a lookup by name returns.
    """
    db_path = str(tmpdir.join('test.db'))

    # Create TinyDB instance
    db = TinyDB(db_path, storage=JSONStorage)

    def fetch(name):
        # Stored document whose 'name' equals ``name``, or None.
        return db.get(where('name') == name)

    long_entry = {'name': 'A very long entry'}
    short_entry = {'name': 'A short one'}

    for entry in (long_entry, short_entry):
        entry_name = entry['name']

        db.insert(entry)
        assert fetch(entry_name) == entry

        db.remove(where('name') == entry_name)
        assert fetch(entry_name) is None
test_tables.py 文件源码 项目:PokeBot 作者: bcc5160 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_multiple_tables(db):
    """Three tables each find their own document; purge_tables() empties all."""
    table_names = ('table1', 'table2', 'table3')
    chars = ('a', 'b', 'c')

    tables = [db.table(table_name) for table_name in table_names]

    # One document per table, differing only in 'char'.
    for table, char in zip(tables, chars):
        table.insert({'int': 1, 'char': char})

    for table, char in zip(tables, chars):
        assert table.count(where('char') == char) == 1

    db.purge_tables()

    for table in tables:
        assert len(table) == 0
test_tables.py 文件源码 项目:PokeBot 作者: bcc5160 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_lru_cache(db):
    """Exercise the query cache of a table created with ``cache_size=2``."""
    # Test integration into TinyDB
    cached_table = db.table('table3', cache_size=2)
    first_query = where('int') == 1

    # Three distinct searches against a cache of size two: the first query
    # must no longer be cached afterwards.
    cached_table.search(first_query)
    for value in (2, 3):
        cached_table.search(where('int') == value)
    assert first_query not in cached_table._query_cache

    # After a remove() the cache's lru list is expected to be empty.
    cached_table.remove(where('int') == 1)
    assert not cached_table._query_cache.lru

    # A single search repopulates the cache with one entry ...
    cached_table.search(first_query)
    assert len(cached_table._query_cache) == 1

    # ... and clear_cache() drops it again.
    cached_table.clear_cache()
    assert len(cached_table._query_cache) == 0
test_storages.py 文件源码 项目:minihydra 作者: VillanCh 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_json_readwrite(tmpdir):
    """
    Regression test for issue #1

    Inserts a long entry, removes it, then does the same with a shorter
    entry, checking after every step what a lookup by name returns.
    """
    db_path = str(tmpdir.join('test.db'))

    # Create TinyDB instance
    db = TinyDB(db_path, storage=JSONStorage)

    def fetch(name):
        # Stored document whose 'name' equals ``name``, or None.
        return db.get(where('name') == name)

    long_entry = {'name': 'A very long entry'}
    short_entry = {'name': 'A short one'}

    for entry in (long_entry, short_entry):
        entry_name = entry['name']

        db.insert(entry)
        assert fetch(entry_name) == entry

        db.remove(where('name') == entry_name)
        assert fetch(entry_name) is None
test_tables.py 文件源码 项目:minihydra 作者: VillanCh 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def test_multiple_tables(db):
    """Three tables each find their own document; purge_tables() empties all."""
    table_names = ('table1', 'table2', 'table3')
    chars = ('a', 'b', 'c')

    tables = [db.table(table_name) for table_name in table_names]

    # One document per table, differing only in 'char'.
    for table, char in zip(tables, chars):
        table.insert({'int': 1, 'char': char})

    for table, char in zip(tables, chars):
        assert table.count(where('char') == char) == 1

    db.purge_tables()

    for table in tables:
        assert len(table) == 0
test_tables.py 文件源码 项目:minihydra 作者: VillanCh 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_lru_cache(db):
    """Exercise the query cache of a table created with ``cache_size=2``."""
    # Test integration into TinyDB
    cached_table = db.table('table3', cache_size=2)
    first_query = where('int') == 1

    # Three distinct searches against a cache of size two: the first query
    # must no longer be cached afterwards.
    cached_table.search(first_query)
    for value in (2, 3):
        cached_table.search(where('int') == value)
    assert first_query not in cached_table._query_cache

    # After a remove() the cache's lru list is expected to be empty.
    cached_table.remove(where('int') == 1)
    assert not cached_table._query_cache.lru

    # A single search repopulates the cache with one entry ...
    cached_table.search(first_query)
    assert len(cached_table._query_cache) == 1

    # ... and clear_cache() drops it again.
    cached_table.clear_cache()
    assert len(cached_table._query_cache) == 0
db.py 文件源码 项目:tinydb-jsonorm 作者: abnerjacobsen 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, db='nonedb.json'):
        """Open the TinyDB database ``db`` and expose ``table`` / ``where`` shortcuts.

        The database uses JSON storage wrapped in a serialization middleware
        with a ``DateTimeSerializer`` registered as 'TinyDateTime', and
        ``SmartCacheTable`` as its table class.
        """
        # Storage and serialization
        storage = SerializationMiddleware(tinydb.storages.JSONStorage)
        storage.register_serializer(DateTimeSerializer(), 'TinyDateTime')

        # The actual database object, with SmartCache activated.
        connection = tinydb.TinyDB(db, storage=storage)
        connection.table_class = SmartCacheTable
        self._conn = connection

        # Shortcut to the ``tinydb.TinyDB.table`` method of this connection.
        # See http://tinydb.readthedocs.org/en/latest/usage.html#tables
        self.table = connection.table

        # Shortcut to the ``tinydb.where`` object.
        # See http://tinydb.readthedocs.org/en/latest/usage.html#queries
        self.where = tinydb.where
tinydb.py 文件源码 项目:telebot 作者: DronMDF 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def set(self, name, value):
        """Store ``value`` under ``name``, replacing an existing entry of that name.

        The database at ``self.filename`` is opened for this call only and is
        closed again before returning.
        """
        # Using TinyDB as a context manager closes the storage on exit, so the
        # underlying file handle is released instead of lingering per call.
        with TinyDB(self.filename) as db:
            db.upsert({'name': name, 'value': value}, where('name') == name)
tinydb.py 文件源码 项目:telebot 作者: DronMDF 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get(self, name, default=None):
        """Return the value stored under ``name``.

        ``default`` is returned when no entry has that name, or when the entry
        has no 'value' key. The database at ``self.filename`` is opened for this
        call only and is closed again before returning.
        """
        # Using TinyDB as a context manager closes the storage on exit, so the
        # underlying file handle is released instead of lingering per call.
        with TinyDB(self.filename) as db:
            item = db.get(where('name') == name)
        if item is not None:
            return item.get('value', default)
        return default
core_utils.py 文件源码 项目:ESPEI 作者: PhasesResearchLab 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_data(comps, phase_name, configuration, symmetry, datasets, prop):
    """
    Return copies of the matching datasets, filtered to one sublattice
    configuration and to temperatures of at least 298.15 K.

    Parameters
    ----------
    comps : collection of components; a dataset matches when the set of its
            'components' is a subset of it.
    phase_name : a dataset matches when its 'phases' equals ``[phase_name]``.
    configuration : the sublattice configuration of interest.
    symmetry : symmetry description handed to ``canonicalize`` and (converted
               with ``list_to_tuple`` when truthy) to ``symmetry_filter``.
    datasets : object whose ``search`` method accepts a tinydb query.
    prop : a dataset matches when its 'output' is ``in`` prop.

    Returns
    -------
    Deep copies of the matching datasets. In each one, 'values' and the
    'solver' configuration entries keep only the configurations equivalent to
    ``configuration``, and 'conditions'['T'] / 'values' keep only the
    temperatures >= 298.15.

    """
    desired_data = datasets.search((tinydb.where('output').test(lambda x: x in prop)) &
                                   (tinydb.where('components').test(lambda x: set(x).issubset(comps))) &
                                   (tinydb.where('solver').test(symmetry_filter, configuration, list_to_tuple(symmetry) if symmetry else symmetry)) &
                                   (tinydb.where('phases') == [phase_name]))
    # Work on a deep copy so the rewriting below never touches the entries
    # held by 'datasets'.
    desired_data = copy.deepcopy(desired_data)

    for idx, data in enumerate(desired_data):
        # Filter output values to only contain data for matching sublattice configurations
        matching_configs = np.array([(canonicalize(sblconf, symmetry) == canonicalize(configuration, symmetry))
                                     for sblconf in data['solver']['sublattice_configurations']])
        # Turn the boolean mask into the integer indices of the matches.
        matching_configs = np.arange(len(data['solver']['sublattice_configurations']))[matching_configs]
        # Rewrite output values with filtered data.
        # The builtin float/object are used as dtypes: the np.float and
        # np.object aliases no longer exist in current NumPy releases.
        desired_data[idx]['values'] = np.array(data['values'], dtype=float)[..., matching_configs]
        desired_data[idx]['solver']['sublattice_configurations'] = list_to_tuple(np.array(data['solver']['sublattice_configurations'],
                                                                                          dtype=object)[matching_configs].tolist())
        try:
            desired_data[idx]['solver']['sublattice_occupancies'] = np.array(data['solver']['sublattice_occupancies'],
                                                                             dtype=object)[matching_configs].tolist()
        except KeyError:
            # 'sublattice_occupancies' is optional; nothing to filter.
            pass
        # Filter out temperatures below 298.15 K (for now, until better refstates exist)
        temp_filter = np.atleast_1d(data['conditions']['T']) >= 298.15
        desired_data[idx]['conditions']['T'] = np.atleast_1d(data['conditions']['T'])[temp_filter]
        # 'values' was rewritten above, so this filters the already
        # configuration-filtered array along its second-to-last axis.
        desired_data[idx]['values'] = desired_data[idx]['values'][..., temp_filter, :]
    return desired_data
core_utils.py 文件源码 项目:ESPEI 作者: PhasesResearchLab 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def symmetry_filter(x, config, symmetry):
    """
    Return True if any sublattice configuration of the candidate dataset is
    equal to the configuration of interest after both are canonicalized with
    the given symmetry.

    Only candidates whose 'mode' is 'manual' can match, and only when the
    first candidate configuration has the same length as ``config``.

    Parameters
    ----------
    x : the candidate dataset 'solver' dict. Must contain the "mode" key and,
        in manual mode, a non-empty "sublattice_configurations" list.
    config : the configuration of interest: e.g. ['AL', ['AL', 'NI'], 'VA']
    symmetry : tuple of tuples where each inner tuple is a group of equivalent
               sublattices. A value of ((0, 1), (2, 3, 4)) means that sublattices
               at indices 0 and 1 are symmetrically equivalent to each other and
               sublattices at indices 2, 3, and 4 are symmetrically equivalent to
               each other.

    Returns
    -------
    bool

    """
    if x['mode'] != 'manual':
        return False

    data_configs = x['sublattice_configurations']
    if len(config) != len(data_configs[0]):
        return False

    # The canonical form of ``config`` is the same for every candidate, so it
    # is computed once rather than once per comparison.
    canonical_config = canonicalize(config, symmetry)

    # If even one matches, it's a match
    # We do more filtering downstream
    return any(canonical_config == canonicalize(data_config, symmetry)
               for data_config in data_configs)
test_utils.py 文件源码 项目:ESPEI 作者: PhasesResearchLab 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_pickelable_tinydb_can_be_pickled_and_unpickled():
    """A PickleableTinyDB keeps its contents across a pickle round trip."""
    document = {'test_key': ['test', 'values']}
    original_db = PickleableTinyDB(storage=MemoryStorage)
    original_db.insert(document)

    # Serialize and restore the database object.
    pickled = pickle.dumps(original_db)
    restored_db = pickle.loads(pickled)

    found = restored_db.search(where('test_key').exists())
    assert found[0] == document
public_sharing_manager.py 文件源码 项目:EasyStorj 作者: lakewik 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def is_file_public(self, bucket_id, file_id):
        """Return True if ``self.table`` has a row matching both ids.

        Both ids are converted with ``str`` before being compared.
        """
        same_bucket = where('bucket_id') == str(bucket_id)
        same_file = where('file_id') == str(file_id)
        matches = self.table.search(same_bucket & same_file)
        return len(matches) > 0
public_sharing_manager.py 文件源码 项目:EasyStorj 作者: lakewik 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_public_file_hash(self, bucket_id, file_id):
        """Return the "public_download_hash" of the first row matching both ids.

        Both ids are converted with ``str`` before being compared. Indexing the
        first result fails if no row matches.
        """
        same_bucket = where('bucket_id') == str(bucket_id)
        same_file = where('file_id') == str(file_id)
        rows = self.table.search(same_bucket & same_file)
        first_row = rows[0]
        return first_row["public_download_hash"]
playlist_manager.py 文件源码 项目:EasyStorj 作者: lakewik 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_playlist_tracks_list(self, playlist_id):
        """Return the rows of ``self.tracks_table`` whose 'playlist_id' equals ``str(playlist_id)``."""
        in_playlist = where('playlist_id') == str(playlist_id)
        return self.tracks_table.search(in_playlist)
playlist_manager.py 文件源码 项目:EasyStorj 作者: lakewik 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def count_tracks_in_playlist(self, playlist_id):
        """Return how many rows of ``self.tracks_table`` have 'playlist_id' equal to ``str(playlist_id)``."""
        in_playlist = where('playlist_id') == str(playlist_id)
        return len(self.tracks_table.search(in_playlist))
playlist_manager.py 文件源码 项目:EasyStorj 作者: lakewik 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def is_file_in_playlist(self, local_file_id):
        """Return True if any row of ``self.tracks_table`` has 'local_file_id' equal to ``str(local_file_id)``."""
        same_file = where('local_file_id') == str(local_file_id)
        matches = self.tracks_table.search(same_file)
        return len(matches) > 0
cmd.py 文件源码 项目:mpsign 作者: abrasumente233 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, name):
        """Load the stored user called ``name``.

        Copies the row's eid, name and exp onto the instance and wraps its
        'bduss' value in a ``User`` stored as ``self.obj``.

        Raises:
            UserNotFound: when ``is_user_existent(name)`` is false.
        """
        if is_user_existent(name):
            row = user_table.get(where('name') == name)
        else:
            raise UserNotFound()

        self.eid = row.eid
        self.name = row['name']
        self.exp = row['exp']
        self.obj = User(row['bduss'])
cmd.py 文件源码 项目:mpsign 作者: abrasumente233 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def is_user_existent(name):
    """Return True if exactly one row of ``user_table`` has the name ``name``.

    Returns False when no row has a 'name' field at all, when no row matches,
    and also when more than one row matches.
    """
    # Bail out early when no row carries a 'name' field.
    if not user_table.search(where('name').exists()):
        return False

    matches = user_table.search(where('name') == name)
    # Compare by value. ``is 1`` tests object identity, which only happens to
    # work through CPython's small-integer cache and raises a SyntaxWarning on
    # Python 3.8+.
    return len(matches) == 1
cmd.py 文件源码 项目:mpsign 作者: abrasumente233 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def delete(user):
    """Remove ``user`` and the bars belonging to it, then print a confirmation.

    Rows of ``user_table`` are matched by the user's name, rows of
    ``bar_table`` by the user's eid.
    """
    own_row = where('name') == user.name
    user_table.remove(own_row)

    owned_bars = where('user') == user.eid
    bar_table.remove(owned_bars)

    message = 'finished deleting {0}'.format(user.name)
    print(message)
cmd.py 文件源码 项目:mpsign 作者: abrasumente233 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def update(user):
    """Replace the stored bars of ``user`` with a freshly fetched list.

    Fetches the bars through a new ``User`` built from the user's bduss,
    prints each one, swaps the user's rows in ``bar_table`` for the new ones
    and returns the number of bars found.
    """
    bars = User(user.obj.bduss).bars
    bar_rows = []

    # Turn every bar into a {'kw', 'fid', 'user'} dict, 'user' being the
    # owner's eid, and collect them in a list.
    for bar in bars:
        print("found {name}'s bar {bar}".format(bar=bar.kw, name=user.name))
        bar_rows.append(dict(kw=bar.kw, fid=bar.fid, user=user.eid))

    print('{name} has {count} bars.'.format(name=user.name, count=len(bars)))

    # Drop the rows stored for this user before inserting the new ones.
    bar_table.remove(where('user') == user.eid)
    bar_table.insert_multiple(bar_rows)
    return len(bars)


问题


面经


文章

微信
公众号

扫码关注公众号