Python 类 Key() 的实例源码

__init__.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def init_zoneinfo():
    """
    Store every member of the bundled ``zoneinfo.zip`` in the datastore.

    Each archive member becomes one ``Zoneinfo`` entity whose key name is the
    member's filename (in ``NDB_NAMESPACE``) and whose ``data`` is the raw
    member content. Entities with the same key are overwritten.

    This must be called before the AppengineTimezoneLoader will work.
    """
    import logging
    import os
    from zipfile import ZipFile

    # The archive is expected to sit next to this module.
    module_dir = os.path.dirname(__file__)
    archive_path = os.path.abspath(os.path.join(module_dir, 'zoneinfo.zip'))

    with ZipFile(archive_path) as archive:
        entities = [
            Zoneinfo(
                key=ndb.Key('Zoneinfo', member.filename,
                            namespace=NDB_NAMESPACE),
                data=archive.read(member))
            for member in archive.filelist
        ]

    logging.info(
        "Adding %d timezones to the pytz-appengine database" % len(entities))

    # One batched write for all zones.
    ndb.put_multi(entities)
__init__.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def open_resource(name):
    """Return a ``StringIO`` over the stored ``data`` of the zone *name*.

    If the zone entity is missing, the 'GMT' entity is used as a marker for
    "the zone database has been loaded": when it exists the lookup failure is
    logged and the ``AttributeError`` is re-raised; when it does not,
    ``init_zoneinfo()`` is run and the lookup is retried.
    """
    import logging
    from cStringIO import StringIO

    def _fetch(zone_name):
        return ndb.Key('Zoneinfo', zone_name, namespace=NDB_NAMESPACE).get()

    try:
        # A missing entity yields None, so ``.data`` raises AttributeError.
        payload = _fetch(name).data
    except AttributeError:
        if not _fetch('GMT'):
            # The database was never populated: load it and try again.
            init_zoneinfo()
            return open_resource(name)

        # The database is populated, so the requested zone does not exist.
        logging.exception("Requested zone '%s' is not in the database." %
                          name)
        raise

    return StringIO(payload)
_google_appengine_ext_ndb.py 文件源码 项目:Tinychat-Bot--Discontinued 作者: Tinychat 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def encode_ndb_key(key, encoder=None):
    """
    Encode the entity that an L{ndb.Key} refers to instead of the key itself.

    The model class registered for the key's kind is looked up in
    C{ndb.Model._kind_map} and handed to C{_get_by_class_key} together with
    the encoder and the key. A truthy result is written with
    C{encoder.writeObject}; otherwise C{None} is written with
    C{encoder.writeElement}.
    """
    model_class = ndb.Model._kind_map.get(key.kind())
    entity = _get_by_class_key(encoder, model_class, key)

    if entity:
        encoder.writeObject(entity)
    else:
        encoder.writeElement(None)
test_ndb.py 文件源码 项目:Tinychat-Bot--Discontinued 作者: Tinychat 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def test_key_no_entity(self):
    """
    An `ndb.Key` with no matching entity in the datastore must encode as
    `None` in both AMF0 and AMF3.
    """
    missing_key = ndb.Key('SimpleEntity', 'bar')

    self.assertIsNone(missing_key.get())

    # AMF0 expectation is a one-element tuple; AMF3 is a plain byte string.
    amf0_expected = (b'\x05',)
    self.assertEncodes(missing_key, amf0_expected, encoding=pyamf.AMF0)

    amf3_expected = b'\x01'
    self.assertEncodes(missing_key, amf3_expected, encoding=pyamf.AMF3)
test_ndb.py 文件源码 项目:Tinychat-Bot--Discontinued 作者: Tinychat 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def test_key_with_entity(self):
    """
    An `ndb.Key` that DOES have a matching entity in the datastore must
    encode as that entity, in both AMF0 and AMF3.
    """
    entity_key = ndb.Key('SimpleEntity', 'bar')
    models.SimpleEntity(key=entity_key).put()

    amf0_expected = (
        b'\x03', (
            b'\x00\x04_key\x02\x002agx0ZXN0YmVkLXRlc3RyFQsSDFNpbXBsZUVudGl'
            b'0eSIDYmFyDA'
        ),
        '\x00\x00\t'
    )
    self.assertEncodes(entity_key, amf0_expected, encoding=pyamf.AMF0)

    amf3_expected = (
        '\n\x0b\x01', (
            '\t_key\x06eagx0ZXN0YmVkLXRlc3RyFQsSDFNpbXBsZUVudGl0eSIDYmFyDA'
        ),
        '\x01'
    )
    self.assertEncodes(entity_key, amf3_expected, encoding=pyamf.AMF3)
main.py 文件源码 项目:arithmancer 作者: google 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def GetUserByAuth():
  """Renders the profile page of the currently signed-in user.

  Loads the 'Profile' entity keyed by the current user's id and, for every
  ledger in ``profile.user_ledger``, fills in ``value`` (absolute difference
  of the two contract positions at the current price) and
  ``prediction_statement``. A ledger whose price or prediction lookup fails
  is shown with value 404 and statement 'ERROR' instead of failing the page.

  Returns:
    The rendered 'profile.html' template.
  """
  user_id = users.get_current_user().user_id()
  user_key = ndb.Key('Profile', user_id)
  profile = user_key.get()
  for ledger in profile.user_ledger:
    try:
      price = GetPriceByPredictionId(ledger.prediction_id)
      ledger.value = math.fabs((price * ledger.contract_one) - (
          price * ledger.contract_two))
      ledger.prediction_statement = ndb.Key(
          urlsafe=ledger.prediction_id).get().statement
    except Exception:
      # Best-effort per ledger. Catch Exception rather than using a bare
      # except so SystemExit / KeyboardInterrupt are not swallowed.
      ledger.value = 404
      ledger.prediction_statement = 'ERROR'
  return render_template('profile.html', profile=profile)
main.py 文件源码 项目:arithmancer 作者: google 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def SellStake():
  """Sells the current user's stake in the prediction named by the form.

  Reads 'prediction_id' from the request form, looks up the user's portfolio
  for that prediction and submits a SELL trade for the held contract. When
  the portfolio has several ledgers, the last one determines the trade
  (unchanged from the previous behaviour).

  Returns:
    A redirect to '/users/me'.
  """
  user_id = users.get_current_user().user_id()
  user_key = ndb.Key('Profile', user_id)
  current_user = user_key.get()
  prediction_key = ndb.Key(urlsafe=request.form['prediction_id'])
  prediction = prediction_key.get()
  portfolio = GetUserPortfolioByAuth(request.form['prediction_id'])

  contract = None
  quantity = None
  for ledger in portfolio:
    if ledger.contract_one > 0:
      contract = 'CONTRACT_ONE'
      quantity = ledger.contract_one
    else:
      contract = 'CONTRACT_TWO'
      quantity = ledger.contract_two

  if contract is None:
    # Empty portfolio: there is nothing to sell. Previously this path
    # crashed with UnboundLocalError when building the Trade.
    return redirect('/users/me')

  trade = Trade(
      prediction_id=prediction_key,
      user_id=user_key,
      direction='SELL',
      contract=contract,
      quantity=float(quantity))
  err = CreateTradeAction(prediction, current_user, trade)
  if err != 'error':
    flash('You sold your stake!')
  return redirect('/users/me')
model_base.py 文件源码 项目:flask-gae-ndb-starter 作者: jideobs 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _SetAncestor(self, value):
    """Setter backing the public ancestor property on query info.

    Args:
      value: A potential value for an ancestor.

    Raises:
      AttributeError: if query on the object is already final.
      AttributeError: if the ancestor has already been set.
      TypeError: if the value to be set is not an instance of ndb.Key.
    """
    # The checks run in this order; the first failing one decides the error.
    if self._query_final is not None:
        raise AttributeError("Can't set ancestor. Query info is final.")
    if self._ancestor is not None:
        raise AttributeError("Ancestor can't be set twice.")
    if isinstance(value, ndb.Key):
        self._ancestor = value
        return
    raise TypeError('Ancestor must be an instance of ndb.Key.')
model_base.py 文件源码 项目:flask-gae-ndb-starter 作者: jideobs 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def from_filter_data(cls, filter_data):
    """Look up entities matching ``filter_data``.

    Two modes, selected by whether ``filter_data`` carries ``UNIQUE_ID``:

    * With ``UNIQUE_ID`` (a urlsafe key string): the entry is popped from
      ``filter_data`` (the caller's dict is modified, as before), the entity
      is fetched by key, and it is returned only if every remaining
      field/value pair equals the entity's attribute. Otherwise ``None``.
    * Without ``UNIQUE_ID``: a query on ``cls`` is built with one equality
      filter per field and the fetched list is returned.

    :param filter_data: dict of field name -> expected value.
    :return: a single entity or ``None`` (key mode), or a list (query mode).
    """
    url_string = filter_data.get(UNIQUE_ID)
    if not url_string:
        entity_query = cls.query()
        for field_name, value in filter_data.iteritems():
            value_property = _verify_property(cls, field_name)
            entity_query = entity_query.filter(value_property == value)
        return entity_query.fetch()

    # NOTE(review): ndb.Key(urlsafe=...) appears to either return a key
    # object (always truthy) or raise, so the old ``if entity_key`` / else
    # branch could never return None; it has been dropped.
    entity_key = ndb.Key(urlsafe=url_string)
    filter_data.pop(UNIQUE_ID)
    entity = entity_key.get()
    if entity is None:
        # The key does not resolve to a stored entity. Previously this fell
        # through to getattr(None, ...) and raised AttributeError.
        return None

    for field_name, value in filter_data.iteritems():
        if getattr(entity, field_name) != value:
            return None
    return entity
model_base.py 文件源码 项目:flask-gae-ndb-starter 作者: jideobs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def from_json(self, request_data):
    """
    Update this entity from a mapping of property name -> JSON value.

    Names that have no truthy entry in ``self._properties`` are ignored.
    Date/datetime/time properties are converted with
    ``utils.date_from_str``; key properties are built from a urlsafe key
    string; every other value is assigned unchanged.
    :param request_data: mapping providing ``iteritems()``.
    :return: None
    """
    for name, raw_value in request_data.iteritems():
        prop = self._properties.get(name)
        if not prop:
            continue

        if isinstance(prop, (ndb.DateProperty, ndb.DateTimeProperty, ndb.TimeProperty)):
            converted = utils.date_from_str(prop, raw_value)
        elif isinstance(prop, ndb.KeyProperty):
            converted = ndb.Key(urlsafe=raw_value)
        else:
            converted = raw_value
        setattr(self, name, converted)
model_base.py 文件源码 项目:flask-gae-ndb-starter 作者: jideobs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def transform_response(self, transform_fields=None):
    """
    Build a response dict from ``self._to_dict()``.

    ``ndb.Key`` values are replaced by the referenced entity's ``to_json()``
    (or by the falsy result of ``get()`` when nothing is found); all other
    values go through ``self.to_json_data``. The entity's own urlsafe key is
    added under ``'id'``.
    :param transform_fields: optional list or tuple meant to select returned
                properties for a ndb.Key property; not read by this body.
    :return: the transformed dict.
    """
    data = self._to_dict()
    for property_name, value in data.iteritems():
        if not isinstance(value, ndb.Key):
            data[property_name] = self.to_json_data(value)
            continue

        # One datastore get per key-valued property.
        referenced = value.get()
        data[property_name] = referenced.to_json() if referenced else referenced
    data['id'] = self.key.urlsafe()
    return data
preferences.py 文件源码 项目:beans 作者: Yelp 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def preferences_api_post(subscription):
    """Apply the posted preference selection for one subscription.

    The JSON body maps urlsafe keys to values; the keys are decoded to
    ``ndb.Key`` objects and passed, with the decoded ``subscription`` key, to
    ``remove_preferences`` and then ``add_preferences``. Both results are
    logged.

    Returns the string '400' when no user is available, otherwise 'OK'.
    """
    data = request.json
    user = get_user()
    if not user:
        # NOTE(review): this is a plain response body, not an HTTP status.
        return '400'

    subscription_key = ndb.Key(urlsafe=subscription)
    form_selection = {
        ndb.Key(urlsafe=encoded_key): selected
        for encoded_key, selected in data.items()
    }

    removed = remove_preferences(user, form_selection, subscription_key)
    logging.info('Removed')
    logging.info(removed)

    added = add_preferences(user, form_selection, subscription_key)
    logging.info('Added')
    logging.info(added)

    return 'OK'
meeting_requests.py 文件源码 项目:beans 作者: Yelp 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def create_delete_meeting_request():
    """Toggle the current user's meeting request for a meeting spec.

    Reads 'meeting_spec_key' and 'meeting_request_key' from the JSON body.
    A non-empty request key means "delete that request"; an empty one means
    "find or create a request for the given spec". Returns a JSON string
    ``{"key": ...}`` holding the request's urlsafe key, or '' after deletion.
    """
    user = get_user()
    data = request.json
    meeting_spec_key = data['meeting_spec_key']
    meeting_request_key = data['meeting_request_key']

    if meeting_request_key != '':
        # Delete path: the key is removed without loading the entity.
        ndb.Key(urlsafe=meeting_request_key).delete()
        return json.dumps({'key': ''})

    meeting_spec = ndb.Key(urlsafe=meeting_spec_key)
    if not meeting_spec:
        # NOTE(review): a bare int is returned here, not a response tuple;
        # confirm whether this branch is reachable and what it should send.
        return 400

    meeting_request = query_meeting_request(meeting_spec, user)
    if not meeting_request:
        meeting_request = MeetingRequest(meeting_spec=meeting_spec, user=user.key)
        meeting_request.put()

    return json.dumps({'key': meeting_request.key.urlsafe()})
subscription.py 文件源码 项目:beans 作者: Yelp 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def filter_subscriptions_by_user_data(subscriptions, user):
    """Return the subscriptions that ``user`` is approved for.

    For each subscription dict the stored entity's ``user_rules`` are loaded
    via its urlsafe ``'id'``. When ``'rule_logic'`` is 'any' or 'all', the
    rules are evaluated through ``apply_rules`` with the matching builtin;
    any other value approves the subscription as is. ``None`` results from
    ``apply_rules`` are dropped.
    """
    approved_subscriptions = []
    for subscription in subscriptions:
        subscription_rules = ndb.Key(urlsafe=subscription['id']).get().user_rules

        rule_logic = subscription.get('rule_logic')
        if rule_logic == 'any':
            combine = any
        elif rule_logic == 'all':
            combine = all
        else:
            combine = None

        if combine is None:
            approved = subscription
        else:
            assert subscription_rules, 'You created logic for rules but don\'t have any rules!'
            approved = apply_rules(user, subscription, subscription_rules, combine)

        if approved is not None:
            approved_subscriptions.append(approved)
    return approved_subscriptions
__init__.py 文件源码 项目:start 作者: argeweb 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def init_zoneinfo():
    """
    Load ``zoneinfo.zip`` (found next to this module) into the datastore.

    One ``Zoneinfo`` entity is written per archive member, keyed by the
    member's filename in ``NDB_NAMESPACE``. Existing zones are overwritten.

    This must be called before the AppengineTimezoneLoader will work.
    """
    import logging
    import os
    from zipfile import ZipFile

    here = os.path.dirname(__file__)
    zip_path = os.path.abspath(os.path.join(here, 'zoneinfo.zip'))

    def _to_entity(archive, info):
        # Build the key first, then read the member's bytes.
        zone_key = ndb.Key('Zoneinfo', info.filename, namespace=NDB_NAMESPACE)
        return Zoneinfo(key=zone_key, data=archive.read(info))

    with ZipFile(zip_path) as archive:
        zones = [_to_entity(archive, info) for info in archive.filelist]

    message = "Adding %d timezones to the pytz-appengine database" % len(zones)
    logging.info(message)

    ndb.put_multi(zones)
__init__.py 文件源码 项目:start 作者: argeweb 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def open_resource(name):
    """Load the zone *name* from the datastore as a ``StringIO``.

    A missing zone raises ``AttributeError`` internally. If the 'GMT' zone
    exists, the database is considered initialised, the failure is logged and
    the error is re-raised. Otherwise ``init_zoneinfo()`` populates the
    database and the lookup is retried.
    """
    import logging
    from cStringIO import StringIO
    try:
        entity = ndb.Key('Zoneinfo', name, namespace=NDB_NAMESPACE).get()
        data = entity.data
    except AttributeError:
        gmt_entity = ndb.Key('Zoneinfo', 'GMT', namespace=NDB_NAMESPACE).get()
        if gmt_entity:
            # The database is populated; the requested zone is unknown.
            message = "Requested zone '%s' is not in the database." % name
            logging.exception(message)
            raise

        # First use: fill the database, then retry the same lookup.
        init_zoneinfo()
        return open_resource(name)
    else:
        return StringIO(data)
models.py 文件源码 项目:tichu-tournament 作者: aragos 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def PutChangeLog(self, changed_by):
    ''' Create a change log for the current state of the hand.

    Stores a ChangeLog child entity of this hand containing the JSON-encoded
    calls, notes and both scores. The key name is the current UTC time as
    seconds since the Unix epoch, rendered with str().

    Args:
        changed_by: Integer. Pair number for the user requesting the change.
    '''
    change_dict = {
      "calls" : self.calls_dict(),
      "notes" : self.notes,
      "ns_score" : self.get_ns_score(),
      "ew_score" : self.get_ew_score(),
    }
    epoch = datetime.datetime.utcfromtimestamp(0)
    # The epoch above is a naive UTC datetime, so "now" must be naive UTC as
    # well; datetime.now() is local time and skews the key by the host's
    # UTC offset.
    nowtime = datetime.datetime.utcnow()
    change_log = ChangeLog(changed_by=changed_by, change=json.dumps(change_dict))
    change_log.key = ndb.Key("ChangeLog", str((nowtime - epoch).total_seconds()),
                             parent=self.key)
    change_log.put()
main.py 文件源码 项目:appbackendapi 作者: codesdk 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get(self):
        """Render the guestbook page for the requested guestbook.

        Writes up to 20 greetings of the book named by the 'guestbook_name'
        request parameter ("*notitle*" when empty), followed by the signing
        form and the guestbook-switch form.
        """
        self.response.out.write('<html><body>')
        guestbook_name = self.request.get('guestbook_name')
        ancestor_key = ndb.Key("Book", guestbook_name or "*notitle*")
        greetings = Greeting.query_book(ancestor_key).fetch(20)

        for greeting in greetings:
            self.response.out.write('<blockquote>%s</blockquote>' %
                                    cgi.escape(greeting.content))
# [END query]

        # guestbook_name is user input placed inside a double-quoted HTML
        # attribute, so double quotes must be escaped too (quote=True);
        # otherwise the value can break out of the attribute.
        self.response.out.write("""
          <form action="/sign?%s" method="post">
            <div><textarea name="content" rows="3" cols="60"></textarea></div>
            <div><input type="submit" value="Sign Guestbook"></div>
          </form>
          <hr>
          <form>Guestbook name: <input value="%s" name="guestbook_name">
          <input type="submit" value="switch"></form>
        </body>
      </html>""" % (urllib.urlencode({'guestbook_name': guestbook_name}),
                    cgi.escape(guestbook_name, quote=True)))


# [START submit]
guestbook_test.py 文件源码 项目:appbackendapi 作者: codesdk 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_main(app):
    """The main page lists only greetings of the requested guestbook."""
    # (content, guestbook) pairs: the first should be found, the second not.
    fixtures = (
        ('Hello world', 'brane3'),
        ('Flat sheet', 'brane2'),
    )
    for content, book_name in fixtures:
        guestbook.Greeting(
            content=content,
            parent=ndb.Key('Book', book_name)).put()

    response = app.get('/?guestbook_name=brane3')

    assert response.status_int == 200
    assert 'Hello world' in response.body
    assert 'Flat sheet' not in response.body
main.py 文件源码 项目:appbackendapi 作者: codesdk 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def add_note():
    """Insert a note under the requested page, picking one of two
    transactional strategies at random.

    Responds with an "Already there" link page when the note was not newly
    stored with the submitted text, otherwise redirects to the main page.
    """
    page_name = flask.request.args.get('page_name', 'default')
    note_title = flask.request.form['note_title']
    note_text = flask.request.form['note_text']

    parent = parent_key(page_name)

    choice = random.randint(0, 1)
    if choice == 0:
        # Use transactional function
        # [START calling]
        note_key = ndb.Key(Note, note_title, parent=parent)
        note = Note(key=note_key, content=note_text)
        # [END calling]
        already_there = pick_random_insert(note_key, note) is False
    elif choice == 1:
        # Use get_or_insert, which is transactional
        note = Note.get_or_insert(note_title, parent=parent, content=note_text)
        already_there = note.content != note_text
    else:
        return None

    main_url = flask.url_for('main_page', page_name=page_name)
    if already_there:
        return 'Already there<br><a href="%s">Return</a>' % main_url
    return flask.redirect(main_url)
relation_model_models_test.py 文件源码 项目:appbackendapi 作者: codesdk 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_relationship(testbed):
    """A contact linked to two companies reports both via ``companies``."""
    # Creates 1 contact and 2 companies
    addressbook_key = ndb.Key('AddressBook', 'tmatsuo')
    mary = models.Contact(parent=addressbook_key, name='Mary')
    mary.put()
    google = models.Company(name='Google')
    google.put()
    candit = models.Company(name='Candit')
    candit.put()

    # first google hires Mary, then 'candit' hires her too
    employments = (
        (google, 'engineer'),
        (candit, 'president'),
    )
    for company, title in employments:
        models.ContactCompany(parent=addressbook_key,
                              contact=mary.key,
                              company=company.key,
                              title=title).put()

    # get the list of companies that Mary belongs to
    assert len(mary.companies) == 2
datastore_test.py 文件源码 项目:appbackendapi 作者: codesdk 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testEventuallyConsistentGlobalQueryResult(self):
    """A global query misses freshly written entities, while an ancestor
    query under the same parent key sees them."""
    class TestModel(ndb.Model):
        pass

    user_key = ndb.Key('User', 'ryan')

    # Put two entities under the same parent
    ndb.put_multi([TestModel(parent=user_key) for _ in range(2)])

    # Global query doesn't see the data.
    global_count = TestModel.query().count(3)
    self.assertEqual(0, global_count)
    # Ancestor query does see the data.
    ancestor_count = TestModel.query(ancestor=user_key).count(3)
    self.assertEqual(2, ancestor_count)
# [END HRD_example_1]

    # [START HRD_example_2]
registration.py 文件源码 项目:ByDefaultProject 作者: zjresler 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def post(self):
    """Update a user's name and password on behalf of a validated admin.

    The session account must pass ``validate_user(..., SADMIN)``; otherwise
    the request is redirected to '/logout' with 'INVALID_LOGIN_STATE'.
    When the 'user_key' request parameter is non-empty, the referenced
    entity's firstname, lastname and password are overwritten from the
    'firstName', 'lastName' and 'passWord' parameters and saved. The request
    then redirects to '/registrationhomepage' with 'DSUBMIT_SUCCESS'.
    """
    accountname = self.session.get('account')
    if not self.validate_user(accountname, SADMIN):
        self.error_redirect('INVALID_LOGIN_STATE', '/logout')
        return

    # The previous version also queried User by username here, but never
    # used the result; that wasted datastore query has been removed.
    user_key = self.request.get('user_key')
    if user_key != '':
        fname = self.request.get('firstName')
        lname = self.request.get('lastName')
        pword = self.request.get('passWord')
        # NOTE(review): get() returns None for an unknown key, which makes
        # the assignments below raise AttributeError -- decide how this
        # should be reported to the user.
        user = ndb.Key(urlsafe=user_key).get()
        user.firstname = fname
        user.lastname = lname
        # NOTE(review): the password is stored exactly as submitted.
        user.password = pword
        user.put()
    self.success_redirect('DSUBMIT_SUCCESS', '/registrationhomepage')
popular_people.py 文件源码 项目:dancedeets-monorepo 作者: mikelambert 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_people_rankings_for_city_names_dev_remote(geoname_ids, attendees_only):
    """Fetch PRCityCategory rankings through the remote datastore client.

    For every entry of ``geoname_ids`` (used as the 'city' filter value) up
    to 100 'PRCityCategory' results are fetched, optionally restricted to
    person_type 'ATTENDEE', and copied into local ``PRCityCategory`` objects.
    Returns the list of those objects.
    """
    rankings = []
    client = datastore.Client('dancedeets-hrd')

    def _to_ranking(result):
        # Copy the remote result's fields onto a local model instance.
        ranking = PRCityCategory()
        ranking.key = ndb.Key('PRCityCategory', result.key.name)
        ranking.person_type = result['person_type']
        ranking.geoname_id = result['geoname_id']
        ranking.category = result['category']
        ranking.top_people_json = json.loads(result.get('top_people_json', '[]'))
        return ranking

    for city_name in geoname_ids:
        query = client.query(kind='PRCityCategory')
        query.add_filter('city', '=', city_name)
        if attendees_only:
            query.add_filter('person_type', '=', 'ATTENDEE')

        for result in query.fetch(100):
            rankings.append(_to_ranking(result))
    return rankings
access_tokens.py 文件源码 项目:dancedeets-monorepo 作者: mikelambert 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_multiple_tokens(token_count):
    """Return access tokens that are valid now and for the next seven days.

    Up to ``token_count`` users with key >= '701004' and a non-expired oauth
    token are fetched; their tokens are checked with
    ``fb_api.lookup_debug_tokens``. When more than one token survives, the
    token of the user with fb_uid '701004' is left out of the result.
    """
    good_users = users.User.query(
        users.User.key >= ndb.Key(users.User, '701004'),
        users.User.expired_oauth_token == False  # noqa
    ).fetch(token_count)

    # Token of the first user with the guaranteed uid, or None.
    guaranteed_user_token = next(
        (x.fb_access_token for x in good_users if x.fb_uid == '701004'), None)

    tokens = [x.fb_access_token for x in good_users]
    debug_token_infos = fb_api.lookup_debug_tokens(tokens)
    some_future_time = time.time() + 7 * (24 * 60 * 60)

    # Ensure our tokens are really still valid, and still valid a few days
    # from now, for long-running mapreduces
    good_tokens = []
    for token, info in zip(tokens, debug_token_infos):
        if info['empty']:
            logging.error('Trying to lookup invalid access token: %s', token)
            continue
        token_data = info['token']['data']
        if not token_data['is_valid']:
            continue
        expires_at = token_data['expires_at']
        # expires_at == 0 means an infinite token
        if expires_at == 0 or expires_at > some_future_time:
            good_tokens.append(token)

    # Only trim out the guaranteed-token if we have some to spare
    if len(good_tokens) > 1:
        good_tokens = [x for x in good_tokens if x != guaranteed_user_token]
    return good_tokens
appengine.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _delete_entity(self):
    """Delete the stored entity from the datastore.

    The key is built from ``self._model`` and ``self._key_name``; deletion is
    attempted whether or not such an entity exists. NDB models are deleted
    through ``ndb.Key``, everything else through the ``db`` API.
    """
    if not self._is_ndb():
        db.delete(db.Key.from_path(self._model.kind(), self._key_name))
        return
    ndb.Key(self._model, self._key_name).delete()
api.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_model(self):
    """Return the job entity referenced by the request's JSON 'key'.

    Raises HTTPSuccess when the JSON body is not a dict, when it has no
    (truthy) 'key', or when the key resolves to no entity; the last two
    cases are logged as warnings first.
    """
    if not isinstance(self.json, dict):
        raise HTTPSuccess()
    key = self.json.get('key', None)
    if not key:
        # logging.warn is a deprecated alias; logging.warning is the
        # supported spelling and emits the same record.
        logging.warning('send handler called without key')
        raise HTTPSuccess()
    key = ndb.Key(urlsafe=key)
    job = key.get()
    if job is None:
        logging.warning('job {} not found'.format(key))
        raise HTTPSuccess()
    return job
models.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get_key(cls, user_id):
    """Build the ``Account`` key for the given user id.

    The key always uses the ``Account`` kind, regardless of ``cls``.
    """
    account_key = ndb.Key(Account, user_id)
    return account_key
appengine.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _delete_entity(self):
    """Remove the entity identified by the stored key name.

    Deletion is attempted regardless of whether the key currently exists in
    the datastore. The ``ndb`` or ``db`` API is chosen via ``_is_ndb()``.
    """
    use_ndb = self._is_ndb()
    if use_ndb:
        ndb_key = ndb.Key(self._model, self._key_name)
        ndb_key.delete()
    else:
        kind = self._model.kind()
        db.delete(db.Key.from_path(kind, self._key_name))
_google_appengine_ext_ndb.py 文件源码 项目:Tinychat-Bot--Discontinued 作者: Tinychat 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def decode_key(self, key):
    """Turn a urlsafe key string into an ``ndb.Key``."""
    decoded = ndb.Key(urlsafe=key)
    return decoded


问题


面经


文章

微信
公众号

扫码关注公众号