python类Query()的实例源码

base.py 文件源码 项目:simple-environment-monitor-system 作者: diegorubin 项目源码 文件源码 阅读 54 收藏 0 点赞 0 评论 0
def load(self, label):
        query = Query()
        self.__dict__.update(self.table.search(query.label == label)[0])
base.py 文件源码 项目:simple-environment-monitor-system 作者: diegorubin 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def save(self):
        attributes = self.get_attributes()
        if self.__new_record__():
            self.table.insert(attributes)
        else:
            query = Query()
            self.table.update(attributes, query.label == self.label)
        return True
base.py 文件源码 项目:simple-environment-monitor-system 作者: diegorubin 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def destroy(self):
        query = Query()
        self.table.remove(eids=[self.table.search(query.label == self.label)[0].eid])
base.py 文件源码 项目:simple-environment-monitor-system 作者: diegorubin 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __new_record__(self):
        query = Query()
        return len(self.table.search(query.label == self.label)) == 0
firewalls.py 文件源码 项目:G-Scout 作者: nccgroup 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def add_network_rules(projectId, db):
    for firewall in db.table('Firewall').all():
        if not firewall.get('sourceRanges'):
            firewall['sourceRanges'] = firewall['sourceTags']
        db.table('Network').update(
                    add_rule({
                        "name":firewall['name'], 
                        "allowed":firewall['allowed'],
                        "sourceRanges":firewall['sourceRanges'],
                        "tags":firewall.get('targetTags')
                        }),
                    eids=[db.table('Network').get(
                        Query().selfLink==firewall['network']
                        ).eid])
firewalls.py 文件源码 项目:G-Scout 作者: nccgroup 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def add_affected_instances(projectId, db):
    for firewall in db.table('Firewall').all():
        try:
            for instance in db.table('Network').get(Query().selfLink==firewall['network'])['members']:
                try:
                    if not firewall.get('targetTags'):
                        db.table('Firewall').update(
                        add_instance({
                        "kind":instance['kind'],
                        "selfLink":instance['selfLink'],
                        "tags":instance.get('tags'),
                        "name":instance['name']
                        }),eids=[firewall.eid])
                    try:
                        for tag in instance.get('tags'):
                            if tag in firewall.get('targetTags'):
                                db.table('Firewall').update(
                            add_instance({
                            "kind":instance['kind'],
                            "selfLink":instance['selfLink'],
                            "tags":instance.get('tags'),
                            "name":instance['name']
                            }),eids=[firewall.eid])
                    except TypeError:
                        continue
                except KeyError:
                    continue
        except KeyError:
            continue

# Function to pass Tinydb for the update query
add_finding.py 文件源码 项目:G-Scout 作者: nccgroup 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def add_finding(db,entity_table, entity_id, rule_title):
    finding_table = db.table('Finding')
    rule_table = db.table('Rule')
    finding_table.insert({
                "entity": {"table":entity_table,"id":entity_id} ,
                "rule": {"table":"rule","id":rule_table.search(Query().title==rule_title)[0].eid}
                })
pricing.py 文件源码 项目:aws-pricing-tools 作者: concurrencylabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def calculate(pdim):

  log.info("Calculating DynamoDB pricing with the following inputs: {}".format(str(pdim.__dict__)))

  ts = phelper.Timestamp()
  ts.start('totalCalculationKinesis')

  dbs, indexMetadata = phelper.loadDBs(consts.SERVICE_KINESIS, phelper.get_partition_keys(pdim.region,consts.SCRIPT_TERM_TYPE_ON_DEMAND))

  cost = 0
  pricing_records = []

  awsPriceListApiVersion = indexMetadata['Version']
  priceQuery = tinydb.Query()

  kinesisDb = dbs[phelper.create_file_key([consts.REGION_MAP[pdim.region], consts.TERM_TYPE_MAP[pdim.termType], consts.PRODUCT_FAMILY_KINESIS_STREAMS])]

  #Shard Hours
  query = ((priceQuery['Group'] == 'Provisioned shard hour'))
  pricing_records, cost = phelper.calculate_price(consts.SERVICE_KINESIS, kinesisDb, query, pdim.shardHours, pricing_records, cost)

  #PUT Payload Units
  query = ((priceQuery['Group'] == 'Payload Units'))
  pricing_records, cost = phelper.calculate_price(consts.SERVICE_KINESIS, kinesisDb, query, pdim.putPayloadUnits, pricing_records, cost)

  #Extended Retention Hours
  query = ((priceQuery['Group'] == 'Addon shard hour'))
  pricing_records, cost = phelper.calculate_price(consts.SERVICE_KINESIS, kinesisDb, query, pdim.extendedDataRetentionHours, pricing_records, cost)

  #TODO: add Enhanced (shard-level) metrics

  #Data Transfer - N/A
  #Note there is no charge for data transfer in Kinesis as per https://aws.amazon.com/kinesis/streams/pricing/

  pricing_result = PricingResult(awsPriceListApiVersion, pdim.region, cost, pricing_records)
  log.debug(json.dumps(vars(pricing_result),sort_keys=False,indent=4))

  log.debug("Total time to compute: [{}]".format(ts.finish('totalCalculationKinesis')))
  return pricing_result.__dict__
pricing.py 文件源码 项目:aws-pricing-tools 作者: concurrencylabs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def calculate(pdim):

  log.info("Calculating DynamoDB pricing with the following inputs: {}".format(str(pdim.__dict__)))

  ts = phelper.Timestamp()
  ts.start('totalCalculationDynamoDB')

  #Load On-Demand DBs
  dbs, indexMetadata = phelper.loadDBs(consts.SERVICE_DYNAMODB, phelper.get_partition_keys(pdim.region,consts.SCRIPT_TERM_TYPE_ON_DEMAND))

  cost = 0
  pricing_records = []

  awsPriceListApiVersion = indexMetadata['Version']
  priceQuery = tinydb.Query()

  #TODO:add support for free-tier flag (include or exclude from calculation)

  iopsDb = dbs[phelper.create_file_key([consts.REGION_MAP[pdim.region], consts.TERM_TYPE_MAP[pdim.termType], consts.PRODUCT_FAMILY_DB_PIOPS])]

  #Read Capacity Units
  query = ((priceQuery['Group'] == 'DDB-ReadUnits'))
  pricing_records, cost = phelper.calculate_price(consts.SERVICE_DYNAMODB, iopsDb, query, pdim.readCapacityUnitHours, pricing_records, cost)

  #Write Capacity Units
  query = ((priceQuery['Group'] == 'DDB-WriteUnits'))
  pricing_records, cost = phelper.calculate_price(consts.SERVICE_DYNAMODB, iopsDb, query, pdim.writeCapacityUnitHours, pricing_records, cost)

  #DB Storage (TODO)

  #Data Transfer (TODO)
  #there is no additional charge for data transferred between Amazon DynamoDB and other Amazon Web Services within the same Region
  #data transferred across Regions (e.g., between Amazon DynamoDB in the US East (Northern Virginia) Region and Amazon EC2 in the EU (Ireland) Region), will be charged on both sides of the transfer.

  #API Requests (only applies for DDB Streams)(TODO)

  pricing_result = PricingResult(awsPriceListApiVersion, pdim.region, cost, pricing_records)
  log.debug(json.dumps(vars(pricing_result),sort_keys=False,indent=4))

  log.debug("Total time to compute: [{}]".format(ts.finish('totalCalculationDynamoDB')))
  return pricing_result.__dict__
househunt.py 文件源码 项目:househunt 作者: althor880 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def listing_in_cache(self, listing):
        lquery = Query()
        return self.db.contains(lquery.hsh == listing.hsh)
househunt.py 文件源码 项目:househunt 作者: althor880 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def retrieve_listing(self, listing):
        lquery = Query()
        list_dict = self.db.get(lquery.hsh == listing.hsh)
        return Listing.from_dict(list_dict)
househunt.py 文件源码 项目:househunt 作者: althor880 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def remove_listing(self, listing):
        lquery = Query()
        self.db.remove(lquery.hsh == listing.hsh)
househunt.py 文件源码 项目:househunt 作者: althor880 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def update_listing(self, listing):
        lquery = Query()
        if self.listing_in_cache(listing):
            self.remove_listing(listing)
        self.insert_listing(listing)
message_store.py 文件源码 项目:aDTN-python 作者: megfault 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def add_object(self, data):
        """
        Attempt to insert a data object into the store. If it does not exist, it gets initialized. Otherwise the
        statistics are updated by increasing the receive count and the time of the last reception if the message has
        not been flagged as deleted.
        :param data: data object to store
        """
        idx = hash_string(data)
        now = int(time.time())
        with self.lock:
            Stats = Query()
            res = self.stats.search(Stats.idx == idx)
            if len(res) == 0:
                self.data.insert({'idx': idx, 'content': data})
                self.stats.insert({'idx': idx,
                                   'first_seen': now,
                                   'receive_count': 0,
                                   'send_count': 0,
                                   'last_received': None,
                                   'last_sent': None,
                                   'deleted': False})
                log_debug("Data object created: {}".format(data))
            else:
                deleted = res[0]['deleted']
                if deleted:
                    log_debug("Received deleted data object: {}".format(data))
                self.stats.update({'last_received': now}, Stats.idx == idx)
                self.stats.update(increment('receive_count'), Stats.idx == idx)
                log_debug("Data object updated: {}".format(data))
vanity.py 文件源码 项目:sodogetip 作者: just-an-dev 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def update_data(self):
        db = TinyDB(config.vanitygen)
        vanity_db = Query()

        db.update(set("finish", True), vanity_db.id == self.id)
        db.update(set("difficulty", self.difficulty), vanity_db.id == self.id)
        db.update(set("duration", self.duration), vanity_db.id == self.id)
user_function.py 文件源码 项目:sodogetip 作者: just-an-dev 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def remove_pending_tip(id_tip):
    db = TinyDB(config.unregistered_tip_user)
    tip = Query()
    db.remove(tip.id == id_tip)
    db.close()
filestorage.py 文件源码 项目:tus-py-client 作者: tus 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, fp):
        self._db = TinyDB(fp)
        self._urls = Query()
database.py 文件源码 项目:lainonlife 作者: barrucadu 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_dj_info(username):
    check_query = Query()
    check_if_user_exists = THE_DB.search(check_query.id == username)
    if len(check_if_user_exists) == 0:
        return
    return check_if_user_exists[0]
database.py 文件源码 项目:lainonlife 作者: barrucadu 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def update_dj_info(username, form_dict):
    check_query = Query()
    check_if_user_exists = THE_DB.search(check_query.id == username)
    if len(check_if_user_exists) == 0:
        return False
    # trust no one, even if someone modified their response we don't want them to
    # most of these require different levels of permission
    dont_touch = ['admin', 'banned', 'id', 'password']
    for k in dont_touch:
        if k in form_dict:
            del form_dict[k]
    THE_DB.update(form_dict, check_query.id == username)
    return True
database.py 文件源码 项目:lainonlife 作者: barrucadu 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def update_dj_status(username, status_key, new_status):
    check_query = Query()
    check_if_user_exists = THE_DB.search(check_query.id == username)
    if len(check_if_user_exists) == 0:
        return
    THE_DB.update({status_key: new_status}, check_query.id == username)
    return new_status


问题


面经


文章

微信
公众号

扫码关注公众号