def test_adding_bibtex_entries_to_bibliography_db(datasets_db):
    """Adding BibTeX entries to a database works and the database can be searched."""
    TEST_BIBTEX = """@article{Roe1952gamma,
    author = {Roe, W. P. and Fishel, W. P.},
    journal = {Trans. Am. Soc. Met.},
    keywords = {Fe-Cr,Fe-Ti,Fe-Ti-Cr},
    pages = {1030--1041},
    title = {{Gamma Loop Studies in the Fe-Ti, Fe-Cr, and Fe-Ti-Cr Systems}},
    volume = {44},
    year = {1952}
    }
    @phdthesis{shin2007thesis,
    author = {Shin, D},
    keywords = {Al-Cu,Al-Cu-Mg,Al-Cu-Si,Al-Mg,Al-Mg-Si,Al-Si,Cu-Mg,Mg-Si,SQS},
    number = {May},
    school = {The Pennsylvania State University},
    title = {{Thermodynamic properties of solid solutions from special quasirandom structures and CALPHAD modeling: Application to aluminum-copper-magnesium-silicon and hafnium-silicon-oxygen}},
    year = {2007}
    }"""
    db = add_bibtex_to_bib_database(TEST_BIBTEX, datasets_db)
    search_res = db.search(where('ID') == 'Roe1952gamma')
    assert len(search_res) == 1
    assert len(db.all()) == 2
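# For context, a minimal sketch of what add_bibtex_to_bib_database could look
# like, assuming the bibtexparser package; the helper's real implementation is
# not shown in this listing.
import bibtexparser

def add_bibtex_to_bib_database(bibtex_string, bib_db):
    """Parse a BibTeX string and insert every entry into a TinyDB instance."""
    # bibtexparser.loads() returns a BibDatabase whose .entries is a list of
    # dicts; each dict carries the citation key under 'ID'.
    parsed = bibtexparser.loads(bibtex_string)
    bib_db.insert_multiple(parsed.entries)
    return bib_db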
def info(name=None):
    if name is None:
        user_rows = user_table.all()
    else:
        user_rows = [user_table.get(where('name') == name)]
    if len(user_rows) == 0:
        print('No user yet.')
        return
    if user_rows[0] is None:
        raise UserNotFound
    row_format = '{:>15}{:>15}{:>20}'
    print(row_format.format('Name', 'EXP', 'is BDUSS valid'))
    for user_row in user_rows:
        print(row_format.format(user_row['name'],
                                user_row['exp'],
                                str(User(user_row['bduss']).validation)))
def test_json_readwrite(tmpdir):
    """
    Regression test for issue #1
    """
    path = str(tmpdir.join('test.db'))
    # Create TinyDB instance
    db = TinyDB(path, storage=JSONStorage)
    item = {'name': 'A very long entry'}
    item2 = {'name': 'A short one'}
    get = lambda s: db.get(where('name') == s)
    db.insert(item)
    assert get('A very long entry') == item
    db.remove(where('name') == 'A very long entry')
    assert get('A very long entry') is None
    db.insert(item2)
    assert get('A short one') == item2
    db.remove(where('name') == 'A short one')
    assert get('A short one') is None
def test_multiple_tables(db):
    table1 = db.table('table1')
    table2 = db.table('table2')
    table3 = db.table('table3')
    table1.insert({'int': 1, 'char': 'a'})
    table2.insert({'int': 1, 'char': 'b'})
    table3.insert({'int': 1, 'char': 'c'})
    assert table1.count(where('char') == 'a') == 1
    assert table2.count(where('char') == 'b') == 1
    assert table3.count(where('char') == 'c') == 1
    db.purge_tables()
    assert len(table1) == 0
    assert len(table2) == 0
    assert len(table3) == 0
def test_lru_cache(db):
    # Test integration into TinyDB
    table = db.table('table3', cache_size=2)
    query = where('int') == 1
    table.search(query)
    table.search(where('int') == 2)
    table.search(where('int') == 3)
    # With cache_size=2, the third search evicts the oldest cached query
    assert query not in table._query_cache
    table.remove(where('int') == 1)
    # Writes invalidate the whole query cache
    assert not table._query_cache.lru
    table.search(query)
    assert len(table._query_cache) == 1
    table.clear_cache()
    assert len(table._query_cache) == 0
def _parse_atat_lattice(lattice_in):
    """Parse an ATAT-style `lat.in` string.

    The parsed string will be in three groups: (coordinate system) (lattice) (atoms),
    where the atom group is split into subgroups, each describing a position and atom name.
    """
    float_number = Regex(r'[-+]?[0-9]*\.?[0-9]+([eE][-+]?[0-9]+)?').setParseAction(lambda t: [float(t[0])])
    vector = Group(float_number + float_number + float_number)
    angles = vector
    vector_line = vector + Suppress(LineEnd())
    coord_sys = Group((vector_line + vector_line + vector_line) | (vector + angles + Suppress(LineEnd())))
    lattice = Group(vector + vector + vector)
    atom = Group(vector + Group(OneOrMore(Word(alphas + '_'))))
    atat_lattice_grammar = coord_sys + lattice + Group(OneOrMore(atom))
    # parse the input string into (coordinate system, lattice, atoms) groups
    return atat_lattice_grammar.parseString(lattice_in)
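# A usage sketch with a hypothetical lat.in fragment (axis lengths and angles
# on one line, then three lattice vectors, then atom positions). Assumes the
# pyparsing names used above are imported from pyparsing.
from pyparsing import Regex, Group, Suppress, LineEnd, OneOrMore, Word, alphas

lat_in = """1 1 1 90 90 90
1 0 0
0 1 0
0 0 1
0 0 0 Al
0.5 0.5 0.5 Ni
"""
coord_sys, lattice, atoms = _parse_atat_lattice(lat_in)
# atoms[0] -> [[0.0, 0.0, 0.0], ['Al']]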
def create_service(self, opts):
    for target in self.targets:
        docker_client = docker.DockerClient('tcp://' + common.translate_id(target)[0] +
                                            ':' + cfg.docker['API_PORT'])
        if "sub_driver" in opts:
            sub_driver_name = opts['sub_driver']
            sub_driver = getattr(docker_client, sub_driver_name)
            del opts['sub_driver']
            if sub_driver_name == 'swarm':
                sub_driver_opts = opts['opts']
                node_type = sub_driver_opts['type']
                if node_type == 'manager':
                    sub_driver.init('eth0:' + cfg.docker['SWARM_PORT'], '0.0.0.0:' + cfg.docker['SWARM_PORT'])
                    db.vms.update(insert_join_token(sub_driver.attrs['JoinTokens']), where('name') == target)
                elif node_type == 'worker':
                    manager = db.vms.get(where('name') == sub_driver_opts['managers'][0])
                    sub_driver.join([common.id_to_swarm(sub_driver_opts['managers'][0])], manager['docker']['join_tokens']['Worker'], '0.0.0.0:' + cfg.docker['SWARM_PORT'])
        else:
            docker_client.containers.run(**opts, detach=True)
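# For reference, hypothetical opts payloads matching the branches above
# (keys inferred from the code; values purely illustrative):
opts_swarm = {
    'sub_driver': 'swarm',
    'opts': {'type': 'worker', 'managers': ['vm-manager-1']},
}
# Without 'sub_driver', opts is passed straight through to containers.run():
opts_container = {'image': 'nginx:latest', 'name': 'web'}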
def create_cluster(self):
    self.init = {}
    for vm in self.vms:
        docker_client = docker.DockerClient('tcp://' +
                                            common.translate_id(vm['id'])[0]
                                            + ':' + cfg.docker['API_PORT'])
        swarm_client = docker_client.swarm
        if vm['role'] == 'manager':
            swarm_client.init('eth0:' + cfg.docker['SWARM_PORT'],
                              '0.0.0.0:' + cfg.docker['SWARM_PORT'])
            db.vms.update(
                insert_join_token(
                    swarm_client.attrs['JoinTokens']
                ),
                where('name') == vm['id'])
            self.vms.remove(vm)
            self.init = vm
            break
    for vm in self.vms:
        if vm['role'] == 'manager':
            self.add_manager(vm)
        elif vm['role'] == 'worker':
            self.add_worker(vm)
def _fncChatBotAlly_cmdWhoPlays(self, m):
    self.log("+", "\t\tReceive command !who_plays")
    table = self.db['ranks'].table('server_' + str(self.account.account['serveur']))
    # scan_date = table.get(where('id'), len(table))
    # scan_date = scan_date['scan_date']
    ranks_old = table.all()
    ranks_now = self.account.getRanking()
    self.log("?", "\t\tSearching for active users..")
    rapport = "Active players for the last 5 days:\n\n"
    count = 0
    for rn in ranks_now:
        for ro in ranks_old:
            if rn['user'] == ro['user']:
                if (ro['points'] - rn['points']) != 0:
                    count += 1
                    rapport = rapport + "- @" + rn['user'] + " (" + str(rn['points'] - ro['points']) + " pts)\n"
                    if count >= 7:
                        # Flush the report in chunks so alliance messages stay short
                        count = 0
                        self.account.rSendMessageToAlliance(rapport)
                        rapport = ""
                break
    rapport = rapport + "OVER !"
    self.account.rSendMessageToAlliance(rapport)
def __init__(self, db='nonedb.json'):
    # Storage and serialization
    serializer = SerializationMiddleware(tinydb.storages.JSONStorage)
    serializer.register_serializer(DateTimeSerializer(), 'TinyDateTime')
    # A reference to the actual database object.
    self._conn = tinydb.TinyDB(db, storage=serializer)
    # Activate SmartCacheTable
    self._conn.table_class = SmartCacheTable
    # A shortcut to the ``tinydb.TinyDB.table`` method.
    # See http://tinydb.readthedocs.org/en/latest/usage.html#tables
    # for reference.
    self.table = self._conn.table
    # A shortcut to the ``tinydb.where`` object.
    # See http://tinydb.readthedocs.org/en/latest/usage.html#queries
    # for reference.
    self.where = tinydb.where
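# A usage sketch for this wrapper. The enclosing class name is not shown in
# the listing, so NoneDB is a stand-in; DateTimeSerializer comes from the
# tinydb-serialization package.
from datetime import datetime

store = NoneDB('example.json')  # hypothetical name for the class above
events = store.table('events')
# The registered DateTimeSerializer lets datetime values round-trip through JSON
events.insert({'at': datetime(2017, 5, 1, 12, 30), 'msg': 'hello'})
found = events.search(store.where('msg') == 'hello')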
def set(self, name, value):
    db = TinyDB(self.filename)
    db.upsert({'name': name, 'value': value}, where('name') == name)
def get(self, name, default=None):
    db = TinyDB(self.filename)
    item = db.get(where('name') == name)
    if item is not None:
        return item.get('value', default)
    return default
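# Together, set() and get() form a small file-backed key-value store (note
# that TinyDB.upsert requires TinyDB >= 3.6). A usage sketch, with Config as
# a hypothetical name for the enclosing class holding self.filename:
cfg = Config()
cfg.set('theme', 'dark')
assert cfg.get('theme') == 'dark'
assert cfg.get('missing', default='fallback') == 'fallback'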
def get_data(comps, phase_name, configuration, symmetry, datasets, prop):
    desired_data = datasets.search((tinydb.where('output').test(lambda x: x in prop)) &
                                   (tinydb.where('components').test(lambda x: set(x).issubset(comps))) &
                                   (tinydb.where('solver').test(symmetry_filter, configuration, list_to_tuple(symmetry) if symmetry else symmetry)) &
                                   (tinydb.where('phases') == [phase_name]))
    # This seems to be necessary because the 'values' member does not modify 'datasets',
    # but everything else does!
    desired_data = copy.deepcopy(desired_data)

    def recursive_zip(a, b):
        if isinstance(a, (list, tuple)) and isinstance(b, (list, tuple)):
            return list(recursive_zip(x, y) for x, y in zip(a, b))
        else:
            return list(zip(a, b))

    for idx, data in enumerate(desired_data):
        # Filter output values to only contain data for matching sublattice configurations
        matching_configs = np.array([(canonicalize(sblconf, symmetry) == canonicalize(configuration, symmetry))
                                     for sblconf in data['solver']['sublattice_configurations']])
        matching_configs = np.arange(len(data['solver']['sublattice_configurations']))[matching_configs]
        # Rewrite output values with filtered data
        desired_data[idx]['values'] = np.array(data['values'], dtype=float)[..., matching_configs]
        desired_data[idx]['solver']['sublattice_configurations'] = list_to_tuple(np.array(data['solver']['sublattice_configurations'],
                                                                                          dtype=object)[matching_configs].tolist())
        try:
            desired_data[idx]['solver']['sublattice_occupancies'] = np.array(data['solver']['sublattice_occupancies'],
                                                                             dtype=object)[matching_configs].tolist()
        except KeyError:
            pass
        # Filter out temperatures below 298.15 K (for now, until better refstates exist)
        temp_filter = np.atleast_1d(data['conditions']['T']) >= 298.15
        desired_data[idx]['conditions']['T'] = np.atleast_1d(data['conditions']['T'])[temp_filter]
        # Don't use data['values'] because we rewrote it above; not sure what 'data' references now
        desired_data[idx]['values'] = desired_data[idx]['values'][..., temp_filter, :]
    return desired_data
def symmetry_filter(x, config, symmetry):
    """
    Return True if the candidate sublattice configuration has any symmetry
    which matches the phase model symmetry.

    Parameters
    ----------
    x : the candidate dataset 'solver' dict. Must contain the "sublattice_configurations" key.
    config : the configuration of interest, e.g. ['AL', ['AL', 'NI'], 'VA']
    symmetry : tuple of tuples where each inner tuple is a group of equivalent
        sublattices. A value of ((0, 1), (2, 3, 4)) means that sublattices
        at indices 0 and 1 are symmetrically equivalent to each other and
        sublattices at indices 2, 3, and 4 are symmetrically equivalent to
        each other.

    Returns
    -------
    bool
    """
    if x['mode'] == 'manual':
        if len(config) != len(x['sublattice_configurations'][0]):
            return False
        # If even one matches, it's a match.
        # We do more filtering downstream.
        for data_config in x['sublattice_configurations']:
            if canonicalize(config, symmetry) == canonicalize(data_config, symmetry):
                return True
    return False
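# An illustrative call, assuming canonicalize() reorders symmetry-equivalent
# sublattices into one canonical form (so configurations differing only by a
# swap of equivalent sublattices compare equal):
solver = {'mode': 'manual',
          'sublattice_configurations': [['FE', 'CR', 'VA']]}
# Sublattices 0 and 1 are declared equivalent, so ['CR', 'FE', 'VA'] should
# canonicalize to the same form and the filter should accept it.
symmetry_filter(solver, ['CR', 'FE', 'VA'], ((0, 1),))  # expected: True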
def test_pickleable_tinydb_can_be_pickled_and_unpickled():
    """PickleableTinyDB should be able to be pickled and unpickled."""
    test_dict = {'test_key': ['test', 'values']}
    db = PickleableTinyDB(storage=MemoryStorage)
    db.insert(test_dict)
    db = pickle.loads(pickle.dumps(db))
    assert db.search(where('test_key').exists())[0] == test_dict
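# A minimal sketch of how a picklable TinyDB subclass can work (an assumption
# about PickleableTinyDB, not necessarily the project's implementation):
# serialize the documents, then rebuild an in-memory database on unpickle.
from tinydb import TinyDB
from tinydb.storages import MemoryStorage

class PickleableTinyDB(TinyDB):
    """A TinyDB that survives pickling by storing its documents."""
    def __getstate__(self):
        # Plain documents are picklable even when storage handles are not
        return {'contents': self.all()}

    def __setstate__(self, state):
        self.__init__(storage=MemoryStorage)
        self.insert_multiple(state['contents'])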
def is_file_public(self, bucket_id, file_id):
    return len(self.table.search((where('bucket_id') == str(bucket_id)) &
                                 (where('file_id') == str(file_id)))) > 0
def get_public_file_hash(self, bucket_id, file_id):
    public_file_data = self.table.search((where('bucket_id') == str(bucket_id)) &
                                         (where('file_id') == str(file_id)))
    return public_file_data[0]["public_download_hash"]
def get_playlist_tracks_list(self, playlist_id):
    playlist_tracks_list = self.tracks_table.search(where('playlist_id') == str(playlist_id))
    return playlist_tracks_list
def count_tracks_in_playlist(self, playlist_id):
    playlist_tracks_list = self.tracks_table.search(where('playlist_id') == str(playlist_id))
    return len(playlist_tracks_list)
def is_file_in_playlist(self, local_file_id):
    return len(self.tracks_table.search(where('local_file_id') == str(local_file_id))) > 0
def __init__(self, name):
    if not is_user_existent(name):
        raise UserNotFound()
    user_row = user_table.get(where('name') == name)
    self.eid = user_row.eid
    self.name = user_row['name']
    self.exp = user_row['exp']
    self.obj = User(user_row['bduss'])
def is_user_existent(name):
    field_existence = user_table.search(where('name').exists())
    if not field_existence:
        return False
    user_existence = user_table.search(where('name') == name)
    return len(user_existence) == 1
def delete(user):
    user_table.remove(where('name') == user.name)
    bar_table.remove(where('user') == user.eid)
    print('finished deleting {0}'.format(user.name))
def update(user):
    bars = User(user.obj.bduss).bars
    bars_as_list = []
    # Convert each Bar object into a {kw: str, fid: str, user: int} dict and collect them in a list
    for bar in bars:
        print('found {name}\'s bar {bar}'.format(bar=bar.kw, name=user.name))
        bars_as_list.append({'kw': bar.kw, 'fid': bar.fid, 'user': user.eid})
    print('{name} has {count} bars.'.format(name=user.name, count=len(bars)))
    bar_table.remove(where('user') == user.eid)  # drop the old records before re-inserting
    bar_table.insert_multiple(bars_as_list)
    return len(bars)