python类Query()的实例源码

message_store.py 文件源码 项目:aDTN-python 作者: megfault 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_data(self):
        """
        Retrieve the data objects sorted by increasing popularity, namely in increasing receive_count, then send_count
        and finally the last time they were sent by the current aDTN node.
        :return: data objects sorted by increasing popularity.
        """
        with self.lock:
            Stats = Query()
            stats = self.stats.search(Stats.deleted == False)
            res = sorted(stats, key=lambda x: (x['receive_count'], x['send_count'], x['last_sent']))[:10]
            now = int(time.time())
            objects = []
            for r in res:
                idx = r['idx']
                Objects = Query()
                obj = self.data.search(Objects.idx == idx)[0]['content']
                objects.append(obj)
                self.stats.update({'last_sent': now}, Objects.idx == idx)
                self.stats.update(increment('send_count'), Objects.idx == idx)
        return objects
message_store.py 文件源码 项目:aDTN-python 作者: megfault 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def delete_data(self, object_id):
        """
        Delete a data object given its ID.
        :param object_id: ID of the data object to delete.
        """
        with self.lock:
            Stats = Query()
            Message = Query()
            res = self.stats.search(Stats.idx == object_id)
            self.stats.update({'deleted': True}, Stats.idx == object_id)
            record = self.data.get(Message.idx == object_id)
            if record is not None:
                self.data.remove(eids=[record.eid])
                log_debug("Deleted message: {}".format(object_id))
            else:
                log_debug("No data to delete: {}".format(object_id))
history.py 文件源码 项目:sodogetip 作者: just-an-dev 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def update_tip(user_history, tip):
        # convert object to string of name if necessary
        if type(user_history) is models.User:
            user_history = user_history.username

        # update only finish tips
        bot_logger.logger.info("update history for user=%s, tip.tx_id=%s" % (user_history, tip.tx_id))
        if tip.id is not None:
            bot_logger.logger.info("update history for user=%s, tip.id=%s" % (user_history, tip.id))

            db = TinyDB(config.history_path + user_history + '.json')
            tip_query = Query()
            db.update({'finish': tip.finish}, tip_query.id == tip.id)
            db.update({'tx_id': tip.tx_id}, tip_query.id == tip.id)
            db.update({'status': tip.status}, tip_query.id == tip.id)
            db.close()
        else:
            bot_logger.logger.warn("update history fail user=%s, tip.id=%s" % (user_history, tip.id))
history.py 文件源码 项目:sodogetip 作者: just-an-dev 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def update_withdraw(user_history, status, tx_id, tip_id):
        # convert object to string of name if necessary
        if type(user_history) is models.User:
            user_history = user_history.username

        # update only finish tips
        if tip_id is not None:
            bot_logger.logger.info("update history for user=%s, tip.id=%s" % (user_history, tip_id))

            db = TinyDB(config.history_path + user_history + '.json')
            tip_query = Query()
            db.update({'finish': status}, tip_query.id == tip_id)
            db.update({'tx_id': tx_id}, tip_query.id == tip_id)
            db.update({'status': "finish"}, tip_query.id == tip_id)
            db.close()
        else:
            bot_logger.logger.warn("update history fail user=%s, tip.id=%s" % (user_history, tip_id))
user.py 文件源码 项目:sodogetip 作者: just-an-dev 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def add_address(username, address, active=True):
        # sanitize (just lower)
        username = str(unicode(username).lower())

        db = TinyDB(config.user_file)
        table = db.table(username)

        # check if address not already exist
        user_db = Query()
        data = table.count(user_db.address == address)

        if data == 0:
            table.insert({"type": "simple", "address": address, "coin": "doge", "enable": False})

            if active is True:
                UserStorage.active_user_address(username, address)
        else:
            bot_logger.logger.error("address %s already registered for  %s " % (str(address), str(username)))
user.py 文件源码 项目:sodogetip 作者: just-an-dev 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_user_address(cls, username):
        # sanitize (just lower)
        username = str(unicode(username).lower())

        if UserStorage.exist(username):
            db = TinyDB(config.user_file)
            table = db.table(username)
            user_db = Query()
            data = table.search(user_db.enable == True)
            if len(data) > 0:
                return data[0].get('address')
            else:
                # username not found
                return None
        else:
            bot_logger.logger.error("get address of un-registered user  %s " % (str(username)))
database.py 文件源码 项目:lainonlife 作者: barrucadu 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def make_user(username, admin=False):
    check_query = Query()
    check_if_user_exists = THE_DB.search(check_query.id == username)
    if len(check_if_user_exists) > 0:
        return
    # generates a random 32 hex digit password
    password = "%032x" % random.getrandbits(128)
    new_user = {
        'id': username,
        'password': password,
        'banned': False,
        'admin': admin,
        'dj_name': username,
        'dj_pic': '',
        'stream_title': '',
        'stream_desc': '',
    }

    THE_DB.insert(new_user)
    return (username, password)
vote.py 文件源码 项目:democracybot 作者: LiaungYip 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def make_token(user_id: int, poll_tag: str):
    q = Query()
    while True:
        # generate a unique token, made of 8 characters a-z.
        token = "".join(
            [random.choice(string.ascii_lowercase) for i in range(8)])
        if not tokens_db.contains(q.token == token):
            break

    tokens_db.insert({
        "user_id": user_id,
        "poll_tag": poll_tag,
        "token": token
    })

    return token
ghost.py 文件源码 项目:ghost 作者: nir0s 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get(self, key_name):
        """Return a dictionary consisting of the key itself

        e.g.
        {u'created_at': u'2016-10-10 08:31:53',
         u'description': None,
         u'metadata': None,
         u'modified_at': u'2016-10-10 08:31:53',
         u'name': u'aws',
         u'uid': u'459f12c0-f341-413e-9d7e-7410f912fb74',
         u'value': u'the_value'}

        """
        result = self.db.search(Query().name == key_name)
        if not result:
            return {}
        return result[0]
ghost.py 文件源码 项目:ghost 作者: nir0s 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def list(self):
        """Return a list of all keys (not just key names, but rather the keys
        themselves).

        e.g.
         {u'created_at': u'2016-10-10 08:31:53',
          u'description': None,
          u'metadata': None,
          u'modified_at': u'2016-10-10 08:31:53',
          u'name': u'aws',
          u'uid': u'459f12c0-f341-413e-9d7e-7410f912fb74',
          u'value': u'the_value'},
         {u'created_at': u'2016-10-10 08:32:29',
          u'description': u'my gcp token',
          u'metadata': {u'owner': u'nir'},
          u'modified_at': u'2016-10-10 08:32:29',
          u'name': u'gcp',
          u'uid': u'a51a0043-f241-4d52-93c1-266a3c5de15e',
          u'value': u'the_value'}]

        """
        # TODO: Return only the key names from all storages
        return self.db.search(Query().name.matches('.*'))
db.py 文件源码 项目:Worksets 作者: DozyDolphin 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, publisher, settings):
        self.logger = logging.getLogger(' Db')
        try:
            if not os.path.exists(settings.db_dir):
                self.logger.info("db directory doesn't exist - creating...")
                os.makedirs(settings.db_dir)
        except IOError as e:
            self.logger.critical("Couldn't create directory " + settings.db_dir + " : " + str(e))
        self.db_file = 'db.json'
        db_path = settings.db_dir + '/' + self.db_file
        self.publisher = publisher
        try:
            if not os.path.isfile(db_path):
                self.logger.info("db file doesn't exist - creating...")
                self.db = TinyDB(db_path)
                self.db.table('worksets')
                self.db.purge_table('default')
        except IOError as e:
            self.logger.critical("Couldn't create db file: " + str(e))
        self.db = TinyDB(db_path)
        self.w_query = Query()
tinymongo.py 文件源码 项目:tinymongo 作者: schapman1974 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def parse_query(self, query):
        """
        Creates a tinydb Query() object from the query dict

        :param query: object containing the dictionary representation of the
        query
        :return: composite Query()
        """
        logger.debug(u'query to parse2: {}'.format(query))

        # this should find all records
        if query == {} or query is None:
            return Query()._id != u'-1'  # noqa

        q = None
        # find the final result of the generator
        for c in self.parse_condition(query):
            if q is None:
                q = c
            else:
                q = q & c

        logger.debug(u'new query item2: {}'.format(q))

        return q
storage.py 文件源码 项目:KeyCounter 作者: Microcore 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def save(self, day, count):
        '''
        Save count for given day.
        Args:
          - day: <datetime>
          - count: <int>
        Returns: None
        '''
        date = day.strftime(self.__date_format)
        count = int(count)
        Day = tinydb.Query()

        if self.__db.contains(Day.Date == date):
            self.__db.update({'Count': count}, Day.Date == date)
        else:
            self.__db.insert({'Date': date, 'Count': count})
populateTinyDB.py 文件源码 项目:NetSpark-Scripts 作者: admiralspark 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def populate():
    ''' Populates the DB with data. This is old code, needs cleanup '''
    with open(filename, mode='r') as f:
        reader = csv.DictReader(f)
    # Now iterate through every row in the CSVfile and set variables
        for row in reader:
            ip = row['IP_Address']
            hostname = row['SysName']
            device_type = row['device_type']
            department = row['Department']
            switch = {
                'ip': row['IP_Address'],
                'hostname': row['SysName'],
                'device_type': row['device_type'],
                'department': row['Department']
            }

            dbf = Query()
            resultf = db.search(dbf.ip == row['IP_Address'])
            if str(resultf) != "[]":
                print("Skipping " + row['IP_Address'] + " as it already exists.")
            else:
                db.insert(switch)
                print ("Added " + row['IP_Address'])
result_log.py 文件源码 项目:dontwi 作者: vocalodon 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def has_result_of_status(self, status, results):
        inbound_str = self.items["operation"]["inbound"]
        query = Query()
        result_q = reduce(or_, [
            query.result == a_result for a_result in results])
        querys = [query.inbound == inbound_str,
                  query.inbound_status_id == status.get_status_id(), result_q]
        combined_query = reduce(and_, querys)
        return self.search_db(combined_query)
result_log.py 文件源码 项目:dontwi 作者: vocalodon 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def get_result_summaries_by_status(self, status):
        inbound_str = self.items["operation"]["inbound"]
        query = Query()
        combined_query = (query.inbound == inbound_str) & (
            query.inbound_status_id == status.get_status_id())
        return self.search_db(combined_query)
result_log.py 文件源码 项目:dontwi 作者: vocalodon 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def get_result_summaries_by_results(self, results):
        query = Query()
        querys = [query.result == a_result for a_result in results]
        combined_query = reduce(or_, querys)
        return self.search_db(combined_query)
tynydb.py 文件源码 项目:sketal 作者: vk-brain 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_user(self, user_id):
        User = Query()

        user = self.tinydb.get(User.user_id == user_id)

        return user["data"] if user else {}
tynydb.py 文件源码 项目:sketal 作者: vk-brain 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def save_user(self, user_id, data):
        User = Query()

        if not self.tinydb.update({'user_id': user_id, 'data': data}, User.user_id == user_id):
            self.tinydb.insert({'user_id': user_id, 'data': data})
tynydb.py 文件源码 项目:sketal 作者: vk-brain 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def delete_user(self, user_id):
        User = Query()

        self.tinydb.remove(User.user_id == user_id)
dbworker.py 文件源码 项目:telegram-xkcd-password-generator 作者: MasterGroosha 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def user_exists(user_id):
    return True if len(db.search(Query().user_id == user_id)) > 0 else False
dbworker.py 文件源码 项目:telegram-xkcd-password-generator 作者: MasterGroosha 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_person(user_id):
    # Check if user exists
    S = Query()
    person = db.search(S.user_id == user_id)
    if len(person) is 0:
        usr = {"user_id": user_id,
               "word_count": DEFAULT_WORD_COUNT,
               "prefixes": DEFAULT_PREFIX_SUFFIX,
               "separators": DEFAULT_SEPARATOR}
        db.insert(usr)
        return usr
    return person[0]
dbworker.py 文件源码 项目:telegram-xkcd-password-generator 作者: MasterGroosha 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def change_word_count(user_id, increase):
    S = Query()
    if increase:
        db.update(increment("word_count"), S.user_id == user_id)
    else:
        db.update(decrement("word_count"), S.user_id == user_id)
    return db.search(S.user_id == user_id)[0]
dbworker.py 文件源码 项目:telegram-xkcd-password-generator 作者: MasterGroosha 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def change_prefixes(user_id, enable_prefixes):
    S = Query()
    if enable_prefixes:
        db.update({"prefixes": True}, S.user_id == user_id)
    else:
        db.update({"prefixes": False}, S.user_id == user_id)
    return db.search(S.user_id == user_id)[0]
dbworker.py 文件源码 项目:telegram-xkcd-password-generator 作者: MasterGroosha 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def change_separators(user_id, enable_separators):
    S = Query()
    if enable_separators:
        db.update({"separators": True}, S.user_id == user_id)
    else:
        db.update({"separators": False}, S.user_id == user_id)
    return db.search(S.user_id == user_id)[0]
get_json.py 文件源码 项目:demo-day-vikings 作者: Mester 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def update_score(db, json_object):
    """
    Method to update the score of the song
    """
    q = Query()
    j = db.search(q.url == json_object['url'])[0]
    logger.debug("Duplicate post found: {}".format(j['title'].encode('ascii', 'ignore')))
    logger.debug("Old Score: {}".format(j['score']))
    logger.debug("Now Score: {}".format(json_object['score']))
    if int(j['score']) != int(json_object['score']):
        logger.debug("Updating Score for {}".format(j['title'].encode('ascii', 'ignore')))
        db.update({'score':json_object['score']}, q.url == json_object['url'])
    else:
        logger.debug("The scores are still the same, not updating")
get_json.py 文件源码 项目:demo-day-vikings 作者: Mester 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def is_json_unique(db, json_object):
    """
    Checks if the url of this post is present in 
    the database already
    """
    q = Query()
    if len(db.search(q.url == json_object['url'])) == 0:
        return True
    return False
query.py 文件源码 项目:stf-selector 作者: RedQA 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def where(key):
    return Query([key])
public_sharing_manager.py 文件源码 项目:EasyStorj 作者: lakewik 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_public_download_indicators(self, public_download_hash_url):
        query = Query()
        public_download_indicators = self.table.search(query.public_download_hash_url.search(public_download_hash_url))

        return public_download_indicators
public_sharing_manager.py 文件源码 项目:EasyStorj 作者: lakewik 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_public_file_details_by_local_hash(self, local_file_hash):
        query = Query()
        public_download_indicators = self.table.search(query.public_download_hash.search(local_file_hash))

        return public_download_indicators


问题


面经


文章

微信
公众号

扫码关注公众号