def get_data(self):
    """
    Fetch the content of the ten least popular data objects that are not flagged as deleted.

    Popularity is ordered by receive_count, then send_count, then last_sent, all ascending.
    Every returned object is also recorded as sent: its last_sent is set to the current time
    and its send_count is incremented.
    :return: list with the content of the selected data objects, least popular first.
    """
    with self.lock:
        entry = Query()
        # "== False" builds a TinyDB query here; it is not a boolean test.
        alive = self.stats.search(entry.deleted == False)  # noqa: E712
        least_popular = sorted(
            alive,
            key=lambda stat: (stat['receive_count'], stat['send_count'], stat['last_sent']),
        )[:10]
        timestamp = int(time.time())
        contents = []
        for stat in least_popular:
            same_idx = entry.idx == stat['idx']
            contents.append(self.data.search(same_idx)[0]['content'])
            # Record the send: first the timestamp, then the counter.
            self.stats.update({'last_sent': timestamp}, same_idx)
            self.stats.update(increment('send_count'), same_idx)
    return contents
python类Query()的实例源码
def delete_data(self, object_id):
    """
    Delete a data object given its ID.

    The statistics entry is kept and flagged with ``deleted: True``; the record in the data
    table is removed when one exists.
    :param object_id: ID of the data object to delete.
    """
    with self.lock:
        same_idx = Query().idx == object_id
        # Flag the statistics entry rather than removing it (get_data filters on this flag).
        self.stats.update({'deleted': True}, same_idx)
        record = self.data.get(same_idx)
        if record is None:
            log_debug("No data to delete: {}".format(object_id))
            return
        self.data.remove(eids=[record.eid])
        log_debug("Deleted message: {}".format(object_id))
def update_tip(user_history, tip):
    """
    Copy the finish, tx_id and status fields of a tip into the user's history file.

    :param user_history: username, or a models.User whose username is used.
    :param tip: tip object; its id selects the history entry to update. Nothing is written
        when tip.id is None.
    """
    # convert object to string of name if necessary
    if isinstance(user_history, models.User):
        user_history = user_history.username

    bot_logger.logger.info("update history for user=%s, tip.tx_id=%s" % (user_history, tip.tx_id))

    # update only finish tips
    if tip.id is None:
        bot_logger.logger.warn("update history fail user=%s, tip.id=%s" % (user_history, tip.id))
        return

    bot_logger.logger.info("update history for user=%s, tip.id=%s" % (user_history, tip.id))
    db = TinyDB(config.history_path + user_history + '.json')
    try:
        # A single update call sets all three fields in one pass instead of three.
        db.update(
            {'finish': tip.finish, 'tx_id': tip.tx_id, 'status': tip.status},
            Query().id == tip.id,
        )
    finally:
        # Release the file handle even when the update raises.
        db.close()
def update_withdraw(user_history, status, tx_id, tip_id):
    """
    Mark a withdraw entry in the user's history file as finished.

    The entry selected by tip_id gets ``finish`` set to status, ``tx_id`` set to tx_id and
    ``status`` set to the literal "finish".
    :param user_history: username, or a models.User whose username is used.
    :param status: value stored in the ``finish`` field.
    :param tx_id: value stored in the ``tx_id`` field.
    :param tip_id: id of the history entry to update. Nothing is written when it is None.
    """
    # convert object to string of name if necessary
    if isinstance(user_history, models.User):
        user_history = user_history.username

    # update only finish tips
    if tip_id is None:
        bot_logger.logger.warn("update history fail user=%s, tip.id=%s" % (user_history, tip_id))
        return

    bot_logger.logger.info("update history for user=%s, tip.id=%s" % (user_history, tip_id))
    db = TinyDB(config.history_path + user_history + '.json')
    try:
        # A single update call sets all three fields in one pass instead of three.
        db.update(
            {'finish': status, 'tx_id': tx_id, 'status': "finish"},
            Query().id == tip_id,
        )
    finally:
        # Release the file handle even when the update raises.
        db.close()
def add_address(username, address, active=True):
    """
    Register an address for a user unless that address is already stored for them.

    A new address is inserted with ``enable`` set to False; when ``active`` is True it is then
    passed to UserStorage.active_user_address.
    :param username: user name; it is lower-cased before use and names the TinyDB table.
    :param address: address to register.
    :param active: when exactly True, activate the address after inserting it.
    """
    # sanitize (just lower)
    username = str(unicode(username).lower())

    db = TinyDB(config.user_file)
    try:
        table = db.table(username)
        # check if address not already exist
        already_registered = table.count(Query().address == address) != 0
        if not already_registered:
            table.insert({"type": "simple", "address": address, "coin": "doge", "enable": False})
    finally:
        # Close our handle before anything else touches the user file.
        db.close()

    if already_registered:
        bot_logger.logger.error("address %s already registered for %s " % (str(address), str(username)))
        return

    if active is True:
        UserStorage.active_user_address(username, address)
def get_user_address(cls, username):
    """
    Return the first enabled address stored for a user.

    :param username: user name; it is lower-cased before use and names the TinyDB table.
    :return: the ``address`` of the first record with ``enable == True``; None when the user
        has no enabled address or is not registered (the latter is also logged as an error).
    """
    # sanitize (just lower)
    username = str(unicode(username).lower())

    if not UserStorage.exist(username):
        bot_logger.logger.error("get address of un-registered user %s " % (str(username)))
        return None

    db = TinyDB(config.user_file)
    try:
        # "== True" builds a TinyDB query here; it is not a boolean test.
        enabled = db.table(username).search(Query().enable == True)  # noqa: E712
    finally:
        # The records are already in memory, so the file handle can be released.
        db.close()

    # no enabled address stored for this user -> None
    return enabled[0].get('address') if enabled else None
def make_user(username, admin=False):
    """
    Create a user record with a random password unless the username is already taken.

    :param username: stored as the record ``id`` and as the initial ``dj_name``.
    :param admin: value stored in the ``admin`` field.
    :return: ``(username, password)`` for a newly created user, None when a record with this
        id already exists.
    """
    if THE_DB.search(Query().id == username):
        return None

    # 32 hex digit password taken from the OS entropy source: the default generator of the
    # random module is predictable and not suitable for credentials.
    password = "%032x" % random.SystemRandom().getrandbits(128)

    new_user = {
        'id': username,
        'password': password,
        'banned': False,
        'admin': admin,
        'dj_name': username,
        'dj_pic': '',
        'stream_title': '',
        'stream_desc': '',
    }
    THE_DB.insert(new_user)
    return (username, password)
def make_token(user_id: int, poll_tag: str) -> str:
    """
    Create, store and return a token that is not yet present in tokens_db.

    :param user_id: stored next to the token.
    :param poll_tag: stored next to the token.
    :return: the new token, 8 characters out of a-z.
    """
    q = Query()
    # NOTE(review): tokens look like access credentials (confirm with callers), so they are
    # drawn from the OS entropy source instead of the predictable default generator.
    rng = random.SystemRandom()
    while True:
        # generate a candidate token, made of 8 characters a-z, until an unused one is found.
        token = "".join(rng.choice(string.ascii_lowercase) for _ in range(8))
        if not tokens_db.contains(q.token == token):
            break

    tokens_db.insert({
        "user_id": user_id,
        "poll_tag": poll_tag,
        "token": token,
    })
    return token
def get(self, key_name):
    """Look up a single key by its name.

    Returns the first stored record whose ``name`` equals ``key_name``, e.g.

    {u'created_at': u'2016-10-10 08:31:53',
     u'description': None,
     u'metadata': None,
     u'modified_at': u'2016-10-10 08:31:53',
     u'name': u'aws',
     u'uid': u'459f12c0-f341-413e-9d7e-7410f912fb74',
     u'value': u'the_value'}

    or an empty dict when no record matches.
    """
    matches = self.db.search(Query().name == key_name)
    return matches[0] if matches else {}
def list(self):
    """Return the stored keys themselves (full records, not just their names).

    The result holds every record whose ``name`` field matches the regex ``.*``, e.g.

    [{u'created_at': u'2016-10-10 08:31:53',
      u'description': None,
      u'metadata': None,
      u'modified_at': u'2016-10-10 08:31:53',
      u'name': u'aws',
      u'uid': u'459f12c0-f341-413e-9d7e-7410f912fb74',
      u'value': u'the_value'},
     {u'created_at': u'2016-10-10 08:32:29',
      u'description': u'my gcp token',
      u'metadata': {u'owner': u'nir'},
      u'modified_at': u'2016-10-10 08:32:29',
      u'name': u'gcp',
      u'uid': u'a51a0043-f241-4d52-93c1-266a3c5de15e',
      u'value': u'the_value'}]
    """
    # TODO: Return only the key names from all storages
    any_name = Query().name.matches('.*')
    return self.db.search(any_name)
def __init__(self, publisher, settings):
# Set up the logger, the database directory, the TinyDB handle and a reusable Query object.
# publisher: stored unchanged as self.publisher.
# settings: must provide db_dir, the directory that holds the database file.
self.logger = logging.getLogger(' Db')
# Create the database directory when it is missing; an IOError is logged as critical
# and not re-raised.
try:
if not os.path.exists(settings.db_dir):
self.logger.info("db directory doesn't exist - creating...")
os.makedirs(settings.db_dir)
except IOError as e:
self.logger.critical("Couldn't create directory " + settings.db_dir + " : " + str(e))
# The database file is always named db.json inside settings.db_dir.
self.db_file = 'db.json'
db_path = settings.db_dir + '/' + self.db_file
self.publisher = publisher
# Database file setup: the 'worksets' table is requested and the 'default' table purged.
# NOTE(review): presumably this only runs when the file does not exist yet - confirm.
try:
if not os.path.isfile(db_path):
self.logger.info("db file doesn't exist - creating...")
self.db = TinyDB(db_path)
self.db.table('worksets')
self.db.purge_table('default')
except IOError as e:
self.logger.critical("Couldn't create db file: " + str(e))
# Open the database; this assignment replaces any handle stored in self.db above.
self.db = TinyDB(db_path)
# Query object kept on the instance for later use.
self.w_query = Query()
def parse_query(self, query):
    """
    Build one composite tinydb query from the dictionary form of a query.

    :param query: dictionary representation of the query; for ``{}`` or ``None`` a query
        intended to match all records (``_id != u'-1'``) is returned
    :return: the conditions yielded by ``parse_condition`` joined with ``&``, or ``None``
        when no condition is yielded
    """
    logger.debug(u'query to parse2: {}'.format(query))

    # this should find all records
    if query == {} or query is None:
        return Query()._id != u'-1'  # noqa

    # fold the generated conditions into a single query
    combined = None
    for condition in self.parse_condition(query):
        combined = condition if combined is None else combined & condition

    logger.debug(u'new query item2: {}'.format(combined))
    return combined
def save(self, day, count):
    '''
    Store the count for a day, overwriting the count of an existing entry for that date.
    Args:
        - day: <datetime>, formatted with the instance's date format
        - count: <int>, converted with int()
    Returns: None
    '''
    record = {'Date': day.strftime(self.__date_format), 'Count': int(count)}
    same_date = tinydb.Query().Date == record['Date']

    if not self.__db.contains(same_date):
        self.__db.insert(record)
        return
    self.__db.update({'Count': record['Count']}, same_date)
def populate():
    '''
    Populate the DB with the switches listed in the CSV file ``filename``.

    The columns IP_Address, SysName, device_type and Department are read from every row.
    A row whose IP address is already stored is skipped; each decision is printed.
    '''
    ip_field = Query().ip
    with open(filename, mode='r') as f:
        # Iterate through every row in the CSV file
        for row in csv.DictReader(f):
            switch = {
                'ip': row['IP_Address'],
                'hostname': row['SysName'],
                'device_type': row['device_type'],
                'department': row['Department'],
            }
            # A non-empty search result means the address is already stored.
            if db.search(ip_field == switch['ip']):
                print("Skipping " + switch['ip'] + " as it already exists.")
                continue
            db.insert(switch)
            print("Added " + switch['ip'])
def has_result_of_status(self, status, results):
    """
    Search for records of the configured inbound that have the given status and any of the
    given results.

    :param status: object whose ``get_status_id()`` is compared with ``inbound_status_id``.
    :param results: non-empty iterable of accepted ``result`` values.
    :return: whatever ``self.search_db`` returns for the combined query.
    """
    inbound = self.items["operation"]["inbound"]
    field = Query()
    any_result = reduce(or_, (field.result == wanted for wanted in results))
    same_inbound = field.inbound == inbound
    same_status = field.inbound_status_id == status.get_status_id()
    return self.search_db(same_inbound & same_status & any_result)
def get_result_summaries_by_status(self, status):
    """
    Search for records of the configured inbound that have the given status.

    :param status: object whose ``get_status_id()`` is compared with ``inbound_status_id``.
    :return: whatever ``self.search_db`` returns for the combined query.
    """
    inbound = self.items["operation"]["inbound"]
    field = Query()
    same_inbound = field.inbound == inbound
    same_status = field.inbound_status_id == status.get_status_id()
    return self.search_db(same_inbound & same_status)
def get_result_summaries_by_results(self, results):
    """
    Search for records whose ``result`` equals any of the given values.

    :param results: non-empty iterable of accepted ``result`` values.
    :return: whatever ``self.search_db`` returns for the combined query.
    """
    field = Query()
    any_result = reduce(or_, (field.result == wanted for wanted in results))
    return self.search_db(any_result)
def get_user(self, user_id):
    """Return the ``data`` stored for ``user_id``, or an empty dict when no record is found."""
    record = self.tinydb.get(Query().user_id == user_id)
    if not record:
        return {}
    return record["data"]
def save_user(self, user_id, data):
    """Store ``data`` for ``user_id``: update the matching record, insert one if the update
    returns a falsy value."""
    record = {'user_id': user_id, 'data': data}
    updated = self.tinydb.update(record, Query().user_id == user_id)
    if not updated:
        self.tinydb.insert(record)
def delete_user(self, user_id):
    """Remove the records whose ``user_id`` equals the given one."""
    same_user = Query().user_id == user_id
    self.tinydb.remove(same_user)
dbworker.py 文件源码
项目:telegram-xkcd-password-generator
作者: MasterGroosha
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def user_exists(user_id):
    """Tell whether at least one record with this ``user_id`` is stored in ``db``."""
    matches = db.search(Query().user_id == user_id)
    return len(matches) > 0
dbworker.py 文件源码
项目:telegram-xkcd-password-generator
作者: MasterGroosha
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def get_person(user_id):
    """
    Return the settings record of a user, creating it with default values when missing.

    :param user_id: value of the ``user_id`` field to look up.
    :return: the first stored record for the user, or the newly inserted default record.
    """
    # Check if user exists
    found = db.search(Query().user_id == user_id)
    if found:
        return found[0]

    # Unknown user: store and return a record with the default settings.
    usr = {"user_id": user_id,
           "word_count": DEFAULT_WORD_COUNT,
           "prefixes": DEFAULT_PREFIX_SUFFIX,
           "separators": DEFAULT_SEPARATOR}
    db.insert(usr)
    return usr
dbworker.py 文件源码
项目:telegram-xkcd-password-generator
作者: MasterGroosha
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def change_word_count(user_id, increase):
    """
    Apply ``increment`` or ``decrement`` to the user's ``word_count`` field.

    :param user_id: value of the ``user_id`` field selecting the record.
    :param increase: truthy to increment, falsy to decrement.
    :return: the first record found for the user after the update.
    """
    same_user = Query().user_id == user_id
    operation = increment if increase else decrement
    db.update(operation("word_count"), same_user)
    return db.search(same_user)[0]
dbworker.py 文件源码
项目:telegram-xkcd-password-generator
作者: MasterGroosha
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def change_prefixes(user_id, enable_prefixes):
    """
    Set the user's ``prefixes`` flag.

    :param user_id: value of the ``user_id`` field selecting the record.
    :param enable_prefixes: truthy stores True, falsy stores False.
    :return: the first record found for the user after the update.
    """
    same_user = Query().user_id == user_id
    db.update({"prefixes": True if enable_prefixes else False}, same_user)
    return db.search(same_user)[0]
dbworker.py 文件源码
项目:telegram-xkcd-password-generator
作者: MasterGroosha
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def change_separators(user_id, enable_separators):
    """
    Set the user's ``separators`` flag.

    :param user_id: value of the ``user_id`` field selecting the record.
    :param enable_separators: truthy stores True, falsy stores False.
    :return: the first record found for the user after the update.
    """
    same_user = Query().user_id == user_id
    db.update({"separators": True if enable_separators else False}, same_user)
    return db.search(same_user)[0]
def update_score(db, json_object):
    """
    Update the stored score of the post that has the same url as ``json_object``.

    The stored score is only rewritten when it differs from the new one after both are
    converted with int(). A stored post with that url is expected to exist.
    """
    same_url = Query().url == json_object['url']
    stored = db.search(same_url)[0]
    title = stored['title'].encode('ascii', 'ignore')

    logger.debug("Duplicate post found: {}".format(title))
    logger.debug("Old Score: {}".format(stored['score']))
    logger.debug("Now Score: {}".format(json_object['score']))

    if int(stored['score']) == int(json_object['score']):
        logger.debug("The scores are still the same, not updating")
        return

    logger.debug("Updating Score for {}".format(title))
    db.update({'score': json_object['score']}, same_url)
def is_json_unique(db, json_object):
    """
    Tell whether the url of this post is absent from the database.

    Returns True when no stored record has the same url, False otherwise.
    """
    same_url_posts = db.search(Query().url == json_object['url'])
    return len(same_url_posts) == 0
def where(key):
    """Create a Query from a one-element list holding ``key``.

    NOTE(review): the list is presumably the query path - confirm against Query's constructor.
    """
    path = [key]
    return Query(path)
def get_public_download_indicators(self, public_download_hash_url):
    """
    Return the table records selected by the ``public_download_hash_url`` field.

    The argument is handed to the field's ``search`` test as given; presumably TinyDB treats
    it as a regular expression, so special characters are not escaped - confirm.
    """
    url_field = Query().public_download_hash_url
    return self.table.search(url_field.search(public_download_hash_url))
def get_public_file_details_by_local_hash(self, local_file_hash):
    """
    Return the table records selected by the ``public_download_hash`` field.

    The argument is handed to the field's ``search`` test as given; presumably TinyDB treats
    it as a regular expression, so special characters are not escaped - confirm.
    """
    hash_field = Query().public_download_hash
    return self.table.search(hash_field.search(local_file_hash))