python类delete()的实例源码

test_appengine.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_delete_ndb(self):
        # Exercises StorageByKeyName.delete() on the NDB credentials model:
        # get() must return None before put() and again after delete().
        # Start empty
        storage = appengine.StorageByKeyName(
            appengine.CredentialsNDBModel, 'foo', 'credentials')
        self.assertEqual(None, storage.get())

        # Add credentials to model with storage, and check equivalent
        # w/o storage
        storage.put(self.credentials)
        # Read the entity directly by its id ('foo'), bypassing the storage
        # wrapper, and compare the serialized credentials.
        credmodel = appengine.CredentialsNDBModel.get_by_id('foo')
        self.assertEqual(credmodel.credentials.to_json(),
                         self.credentials.to_json())

        # Delete and make sure empty
        storage.delete()
        self.assertEqual(None, storage.get())
test_appengine.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_delete_db_ndb_mixed(self):
        # Checks that two StorageByKeyName instances built on different model
        # classes (CredentialsNDBModel and CredentialsModel) but sharing the
        # key name 'foo' and property 'credentials' can each delete what the
        # other one stored.
        # Start empty
        storage_ndb = appengine.StorageByKeyName(
            appengine.CredentialsNDBModel, 'foo', 'credentials')
        storage = appengine.StorageByKeyName(
            appengine.CredentialsModel, 'foo', 'credentials')

        # First DB, then NDB
        self.assertEqual(None, storage.get())
        storage.put(self.credentials)
        self.assertNotEqual(None, storage.get())

        storage_ndb.delete()
        self.assertEqual(None, storage.get())

        # First NDB, then DB
        self.assertEqual(None, storage_ndb.get())
        storage_ndb.put(self.credentials)

        storage.delete()
        # Right after the DB-side delete the NDB-backed storage still returns
        # a value; it only reads as empty once the caches below are cleared.
        self.assertNotEqual(None, storage_ndb.get())
        # NDB uses memcache and an instance cache (Context)
        ndb.get_context().clear_cache()
        memcache.flush_all()
        self.assertEqual(None, storage_ndb.get())
test_appengine.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_build_and_parse_state(self):
        # Despite its name, the body shown here only exercises
        # appengine.xsrf_secret_key(): the secret must stay the same across
        # calls and after the memcache entry is deleted, and must change once
        # the SiteXsrfSecretKey entity has been deleted as well.
        secret = appengine.xsrf_secret_key()

        # Secret shouldn't change from call to call.
        secret2 = appengine.xsrf_secret_key()
        self.assertEqual(secret, secret2)

        # Secret shouldn't change if memcache goes away.
        memcache.delete(appengine.XSRF_MEMCACHE_ID,
                        namespace=appengine.OAUTH2CLIENT_NAMESPACE)
        secret3 = appengine.xsrf_secret_key()
        self.assertEqual(secret2, secret3)

        # Secret should change if both memcache and the model goes away.
        memcache.delete(appengine.XSRF_MEMCACHE_ID,
                        namespace=appengine.OAUTH2CLIENT_NAMESPACE)
        # Obtain the 'site' entity via get_or_insert() so it can be deleted.
        model = appengine.SiteXsrfSecretKey.get_or_insert('site')
        model.delete()

        secret4 = appengine.xsrf_secret_key()
        self.assertNotEqual(secret3, secret4)
test_appengine.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_delete_ndb(self):
        # Exercises StorageByKeyName.delete() on the NDB credentials model:
        # get() must return None before put() and again after delete().
        # Start empty
        storage = appengine.StorageByKeyName(
            appengine.CredentialsNDBModel, 'foo', 'credentials')
        self.assertEqual(None, storage.get())

        # Add credentials to model with storage, and check equivalent
        # w/o storage
        storage.put(self.credentials)
        # Read the entity directly by its id ('foo'), bypassing the storage
        # wrapper, and compare the serialized credentials.
        credmodel = appengine.CredentialsNDBModel.get_by_id('foo')
        self.assertEqual(credmodel.credentials.to_json(),
                         self.credentials.to_json())

        # Delete and make sure empty
        storage.delete()
        self.assertEqual(None, storage.get())
test_appengine.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_delete_db_ndb_mixed(self):
        # Checks that two StorageByKeyName instances built on different model
        # classes (CredentialsNDBModel and CredentialsModel) but sharing the
        # key name 'foo' and property 'credentials' can each delete what the
        # other one stored.
        # Start empty
        storage_ndb = appengine.StorageByKeyName(
            appengine.CredentialsNDBModel, 'foo', 'credentials')
        storage = appengine.StorageByKeyName(
            appengine.CredentialsModel, 'foo', 'credentials')

        # First DB, then NDB
        self.assertEqual(None, storage.get())
        storage.put(self.credentials)
        self.assertNotEqual(None, storage.get())

        storage_ndb.delete()
        self.assertEqual(None, storage.get())

        # First NDB, then DB
        self.assertEqual(None, storage_ndb.get())
        storage_ndb.put(self.credentials)

        storage.delete()
        # Right after the DB-side delete the NDB-backed storage still returns
        # a value; it only reads as empty once the caches below are cleared.
        self.assertNotEqual(None, storage_ndb.get())
        # NDB uses memcache and an instance cache (Context)
        ndb.get_context().clear_cache()
        memcache.flush_all()
        self.assertEqual(None, storage_ndb.get())
test_appengine.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_build_and_parse_state(self):
        # Despite its name, the body shown here only exercises
        # appengine.xsrf_secret_key(): the secret must stay the same across
        # calls and after the memcache entry is deleted, and must change once
        # the SiteXsrfSecretKey entity has been deleted as well.
        secret = appengine.xsrf_secret_key()

        # Secret shouldn't change from call to call.
        secret2 = appengine.xsrf_secret_key()
        self.assertEqual(secret, secret2)

        # Secret shouldn't change if memcache goes away.
        memcache.delete(appengine.XSRF_MEMCACHE_ID,
                        namespace=appengine.OAUTH2CLIENT_NAMESPACE)
        secret3 = appengine.xsrf_secret_key()
        self.assertEqual(secret2, secret3)

        # Secret should change if both memcache and the model goes away.
        memcache.delete(appengine.XSRF_MEMCACHE_ID,
                        namespace=appengine.OAUTH2CLIENT_NAMESPACE)
        # Obtain the 'site' entity via get_or_insert() so it can be deleted.
        model = appengine.SiteXsrfSecretKey.get_or_insert('site')
        model.delete()

        secret4 = appengine.xsrf_secret_key()
        self.assertNotEqual(secret3, secret4)
github_utils.py 文件源码 项目:t2-crash-reporter 作者: tessel 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def add_comment_job(cls, fingerprint):
        """
        Handles the create comment job.

        Looks up the crash report for ``fingerprint`` and, when one is found,
        asks a ``GithubClient`` to create a comment for it. Exceptions raised
        inside the ``try`` block are logged and not re-raised. The backoff
        cache key is removed from memcache in every case.

        :param fingerprint: fingerprint passed to ``CrashReport.get_crash``.
        """
        crash_report = None
        try:
            github_client = GithubClient()
            crash_report = CrashReport.get_crash(fingerprint)
            if crash_report is not None:
                github_client.create_comment(crash_report)
        except Exception as e:
            # ``except ... as e`` parses on Python 2.6+ and Python 3; the older
            # comma form is a syntax error on Python 3.
            logging.error('Error creating comment for fingerprint ({0}) [{1}]'.format(fingerprint, str(e)))
        finally:
            # remove the backoff cache key, so future jobs may be enqueued
            # NOTE(review): crash_report is still None here when the lookup
            # failed or found nothing -- confirm the key helper accepts None.
            backoff_cache_key = cls.backoff_crash_key_new_comment(crash_report)
            memcache.delete(backoff_cache_key)
db.py 文件源码 项目:server 作者: viur-framework 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def PutAsync( entities, **kwargs ):
    """
        Asynchronously store one or more entities in the data store.

        This function is identical to :func:`server.db.Put`, except that it
        returns an asynchronous object. Call ``get_result()`` on the return value to
        block on the call and get the results.

        :param entities: Entity or list of entities to be stored.
        :type entities: :class:`server.db.Entity` | list of :class:`server.db.Entity`

        :param kwargs: Forwarded unchanged to ``datastore.PutAsync``.

        :returns: The value returned by ``datastore.PutAsync``.
    """
    if isinstance( entities, Entity ):
        entities._fixUnindexedProperties()
    elif isinstance( entities, list ): # builtin ``list``, same check as the caching branch below
        for entity in entities:
            assert isinstance( entity, Entity )
            entity._fixUnindexedProperties()
    if conf["viur.db.caching" ]>0:
        # Drop the cached copy of every entity that already exists in the
        # datastore, since this put is an update for it.
        if isinstance( entities, Entity ): #Just one:
            if entities.is_saved(): #Its an update
                memcache.delete( str( entities.key() ), namespace=__CacheKeyPrefix__, seconds=__cacheLockTime__  )
        elif isinstance( entities, list ):
            for entity in entities:
                assert isinstance( entity, Entity )
                if entity.is_saved(): #Its an update
                    memcache.delete( str( entity.key() ), namespace=__CacheKeyPrefix__, seconds=__cacheLockTime__  )
    return( datastore.PutAsync( entities, **kwargs ) )
db.py 文件源码 项目:server 作者: viur-framework 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def DeleteAsync(keys, **kwargs):
    """
        Asynchronously deletes one or more entities from the data store.

        This function is identical to :func:`server.db.Delete`, except that it
        returns an asynchronous object. Call ``get_result()`` on the return value to
        block on the call and get the results.

        :param keys: Key, str or list of keys or strings to be deleted.
        :type keys: Key | str | list of Key | list of str

        :param kwargs: Forwarded unchanged to ``datastore.DeleteAsync``.

        :returns: The value returned by ``datastore.DeleteAsync``.
    """
    if conf["viur.db.caching" ]>0:
        # A single key may also arrive as a string (the list branch already
        # allows that); invalidate its cache entry too.
        if isinstance( keys, datastore_types.Key ) or isinstance( keys, basestring ): #Just one:
            memcache.delete( str( keys ), namespace=__CacheKeyPrefix__, seconds=__cacheLockTime__  )
        elif isinstance( keys, list ):
            for key in keys:
                assert isinstance( key, datastore_types.Key ) or isinstance( key, basestring )
                memcache.delete( str( key ), namespace=__CacheKeyPrefix__, seconds=__cacheLockTime__  )
    return( datastore.DeleteAsync( keys, **kwargs ) )
main.py 文件源码 项目:appbackendapi 作者: codesdk 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def post(self):
        """Store a new Greeting and redirect back to its guestbook page.

        The request supplies 'guestbook_name' and 'content'. After the
        greeting is stored, the '<guestbook_name>:greetings' memcache entry
        is deleted.
        """
        # We set the same parent key on the 'Greeting' to ensure each greeting
        # is in the same entity group. Queries across the single entity group
        # are strongly consistent. However, the write rate to a single entity
        # group is limited to ~1/second.
        guestbook_name = self.request.get('guestbook_name')
        entry = Greeting(parent=guestbook_key(guestbook_name))

        # Only signed-in users get an author recorded on the greeting.
        if users.get_current_user():
            entry.author = users.get_current_user().nickname()

        entry.content = self.request.get('content')
        entry.put()

        cache_key = '{}:greetings'.format(guestbook_name)
        memcache.delete(cache_key)

        query_string = urllib.urlencode({'guestbook_name': guestbook_name})
        self.redirect('/?' + query_string)
recording.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def end_recording(status, firepython_set_extension_data=None):
  """Finish RPC trace recording for the current request.

  Fetches the recorder registered for this request in 'recorder_proxy' and
  clears that registration. If a recorder was present, the HTTP status is
  recorded on it and it is saved; the memcache entry under lock_key() is
  then deleted, even when recording or saving raises.

  Args:
    status: HTTP Status, a 3-digit integer.
    firepython_set_extension_data: Ignored apart from triggering a
      'Firepython is no longer supported' warning when it is not None.
  """
  if firepython_set_extension_data is not None:
    warnings.warn('Firepython is no longer supported')

  recorder = recorder_proxy.get_for_current_request()
  recorder_proxy.clear_for_current_request()
  if config.DEBUG:
    logging.debug('Cleared recorder')

  # Nothing was being recorded for this request.
  if recorder is None:
    return

  try:
    recorder.record_http_status(status)
    recorder.save()
  finally:
    memcache.delete(lock_key(), namespace=config.KEY_NAMESPACE)
__init__.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def post(self):
    """Handle POST: delete the submitted documents from a search index.

    Reads 'index', 'namespace' and 'numdocs' from the request, collects the
    non-empty values of 'doc1' .. 'doc<numdocs>', deletes them from the
    index and redirects to the request's 'next' value.
    """
    index_name = self.request.get('index')
    namespace = self.request.get('namespace')
    num_docs = int(self.request.get('numdocs'))

    # Form fields are numbered from 1; empty fields are skipped.
    submitted = (self.request.get('doc%d' % n) for n in xrange(1, num_docs + 1))
    doc_ids = [doc_id for doc_id in submitted if doc_id]

    target = search.Index(name=index_name, namespace=namespace)
    target.delete(doc_ids)
    self.redirect(self.request.get('next'))
query_cache.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def purge_keys(keys):
    """Delete the guard entry and the value entry in memcache for each key.

    For every item of ``keys`` the entries named ``MC_GUARD % item`` and
    ``MC_KEY % item`` are deleted, in that order.
    """
    for cache_id in keys:
        for template in (MC_GUARD, MC_KEY):
            memcache.delete(template % cache_id)
app_engine.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def set_token(unique_key, token_str):
  """Stores a serialized auth token in memcache and in the datastore.

  Memcache is written first so later reads can be served from the cache.
  If that write fails, the cached entry is deleted instead so a stale
  value is not left behind; if the delete reports 0, nothing is written
  to the datastore.

  Args:
    unique_key: The unique name for this token as a string. The caller is
        responsible for keeping it unique within the application; an
        existing value under the same name is silently overwritten.
    token_str: A serialized auth token as a string, expected to come from
        gdata.gauth.token_to_blob.

  Returns:
    True if the datastore put returned a truthy value, False if the cache
    could neither be set nor cleared (the datastore is then left
    untouched), and None if the datastore put returned a falsy value.

  Raises:
    Datastore exceptions may be raised from the App Engine SDK in the event
    of failure.
  """
  cached = memcache.set(unique_key, token_str)
  # Fall back to clearing the entry when it could not be set; a result of 0
  # from delete means the old value could not be cleared either.
  if not cached and memcache.delete(unique_key) == 0:
    return False
  stored = Token(key_name=unique_key, t=token_str).put()
  return True if stored else None
app_engine.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def delete_token(unique_key):
  """Removes the token stored under unique_key from memcache and the datastore.

  Args:
    unique_key: The name the token was stored under, as a string.
  """
  # Cache first, then the datastore entity with the same key name.
  memcache.delete(unique_key)
  stored_token = Token(key_name=unique_key)
  stored_token.delete()
memcache.py 文件源码 项目:isthislegit 作者: duo-labs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def delete(key=None):
    """Delete a value from memcache together with its numbered part entries.

    The entry '<key>_parts' holds the number of parts; when it is set, the
    entries '<key>0' .. '<key>N-1' are deleted first. The entry under
    ``key`` and the '<key>_parts' counter are then deleted in every case.
    """
    parts_key = '%s_parts' % key
    parts_count = memcache.get(key=parts_key)

    # A missing or zero counter means there are no part entries to remove.
    for part_index in xrange(parts_count or 0):
        memcache.delete(key='%s%d' % (key, part_index))

    memcache.delete(key=key)
    memcache.delete(key=parts_key)
test_appengine.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_get_and_put_cached(self):
        # Round-trips credentials through a StorageByKeyName that is given
        # memcache as its cache, checking the 'foo' cache entry at each step.
        storage = appengine.StorageByKeyName(
            appengine.CredentialsModel, 'foo', 'credentials', cache=memcache)

        self.assertEqual(None, storage.get())
        self.credentials.set_store(storage)

        # Refresh against the mock HTTP object; the assertions below expect
        # the refreshed token to be readable from the 'foo' model entity.
        http = http_mock.HttpMock(data=BASIC_RESP)
        self.credentials._refresh(http)
        credmodel = appengine.CredentialsModel.get_by_key_name('foo')
        self.assertEqual(BASIC_TOKEN, credmodel.credentials.access_token)

        # Now remove the item from the cache.
        memcache.delete('foo')

        # Check that getting refreshes the cache.
        credentials = storage.get()
        self.assertEqual(BASIC_TOKEN, credentials.access_token)
        self.assertNotEqual(None, memcache.get('foo'))

        # Deleting should clear the cache.
        storage.delete()
        credentials = storage.get()
        self.assertEqual(None, credentials)
        self.assertEqual(None, memcache.get('foo'))

        # Verify mock.
        self._verify_basic_refresh(http)
test_appengine.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_storage_delete(self, new_http):
        # Walks the oauth_required flow end to end and checks that deleting
        # the stored credentials makes the decorated path redirect again.
        # NOTE(review): new_http is presumably injected by a mock.patch
        # decorator outside this view -- confirm.
        new_http.return_value = http_mock.HttpMock(data=DEFAULT_RESP)
        # An initial request to an oauth_required decorated path should be a
        # redirect to start the OAuth dance.
        response = self.app.get('/foo_path')
        self.assertTrue(response.status.startswith('302'))

        # _parse_state_value is patched to return 'foo_path' for the duration
        # of the with-block.
        with mock.patch.object(appengine, '_parse_state_value',
                               return_value='foo_path',
                               autospec=True) as parse_state_value:
            # Now simulate the callback to /oauth2callback.
            response = self.app.get('/oauth2callback', {
                'code': 'foo_access_code',
                'state': 'foo_path:xsrfkey123',
            })
            self.assertEqual('http://localhost/foo_path',
                             response.headers['Location'])
            self.assertEqual(None, self.decorator.credentials)

            # Now requesting the decorated path should work.
            response = self.app.get('/foo_path')

            # NOTE(review): had_credentials / found_credentials are set
            # outside this method, presumably by the handler under test.
            self.assertTrue(self.had_credentials)

            # Credentials should be cleared after each call.
            self.assertEqual(None, self.decorator.credentials)

            # Invalidate the stored Credentials.
            self.found_credentials.store.delete()

            # Invalid Credentials should start the OAuth dance again.
            response = self.app.get('/foo_path')
            self.assertTrue(response.status.startswith('302'))

            parse_state_value.assert_called_once_with(
                'foo_path:xsrfkey123', self.current_user)

        # Check the mocks were called.
        new_http.assert_called_once_with()
test_appengine.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_get_and_put_cached(self):
        # Round-trips credentials through a StorageByKeyName that is given
        # memcache as its cache, checking the 'foo' cache entry at each step.
        storage = appengine.StorageByKeyName(
            appengine.CredentialsModel, 'foo', 'credentials', cache=memcache)

        self.assertEqual(None, storage.get())
        self.credentials.set_store(storage)

        # Refresh against the mock HTTP object; the assertions below expect
        # the refreshed token to be readable from the 'foo' model entity.
        http = http_mock.HttpMock(data=BASIC_RESP)
        self.credentials._refresh(http)
        credmodel = appengine.CredentialsModel.get_by_key_name('foo')
        self.assertEqual(BASIC_TOKEN, credmodel.credentials.access_token)

        # Now remove the item from the cache.
        memcache.delete('foo')

        # Check that getting refreshes the cache.
        credentials = storage.get()
        self.assertEqual(BASIC_TOKEN, credentials.access_token)
        self.assertNotEqual(None, memcache.get('foo'))

        # Deleting should clear the cache.
        storage.delete()
        credentials = storage.get()
        self.assertEqual(None, credentials)
        self.assertEqual(None, memcache.get('foo'))

        # Verify mock.
        self._verify_basic_refresh(http)
test_appengine.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_storage_delete(self, new_http):
        # Walks the oauth_required flow end to end and checks that deleting
        # the stored credentials makes the decorated path redirect again.
        # NOTE(review): new_http is presumably injected by a mock.patch
        # decorator outside this view -- confirm.
        new_http.return_value = http_mock.HttpMock(data=DEFAULT_RESP)
        # An initial request to an oauth_required decorated path should be a
        # redirect to start the OAuth dance.
        response = self.app.get('/foo_path')
        self.assertTrue(response.status.startswith('302'))

        # _parse_state_value is patched to return 'foo_path' for the duration
        # of the with-block.
        with mock.patch.object(appengine, '_parse_state_value',
                               return_value='foo_path',
                               autospec=True) as parse_state_value:
            # Now simulate the callback to /oauth2callback.
            response = self.app.get('/oauth2callback', {
                'code': 'foo_access_code',
                'state': 'foo_path:xsrfkey123',
            })
            self.assertEqual('http://localhost/foo_path',
                             response.headers['Location'])
            self.assertEqual(None, self.decorator.credentials)

            # Now requesting the decorated path should work.
            response = self.app.get('/foo_path')

            # NOTE(review): had_credentials / found_credentials are set
            # outside this method, presumably by the handler under test.
            self.assertTrue(self.had_credentials)

            # Credentials should be cleared after each call.
            self.assertEqual(None, self.decorator.credentials)

            # Invalidate the stored Credentials.
            self.found_credentials.store.delete()

            # Invalid Credentials should start the OAuth dance again.
            response = self.app.get('/foo_path')
            self.assertTrue(response.status.startswith('302'))

            parse_state_value.assert_called_once_with(
                'foo_path:xsrfkey123', self.current_user)

        # Check the mocks were called.
        new_http.assert_called_once_with()
gae.py 文件源码 项目:vishnu 作者: anomaly 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def clear(self):
        """Run the parent class's clear(), then drop this session's memcache entry.

        The memcache delete is skipped when ``self._sid`` is falsy.
        """
        super(Client, self).clear()
        if not self._sid:
            return
        memcache.delete(self._sid, namespace=NAMESPACE)
github_utils.py 文件源码 项目:t2-crash-reporter 作者: tessel 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def create_issue_job(cls, fingerprint):
        """
        Handles the create issue job.

        Looks up the crash report for ``fingerprint``; when one is found, a
        GitHub issue is created for it and the issue number is written back
        to the crash report. Exceptions raised inside the ``try`` block are
        logged and not re-raised. The backoff cache key is removed from
        memcache in every case.

        :param fingerprint: fingerprint passed to ``CrashReport.get_crash``.
        """
        crash_report = None
        try:
            github_client = GithubClient()
            crash_report = CrashReport.get_crash(fingerprint)
            if crash_report is not None:
                # create the github issue
                issue = github_client.create_issue(crash_report)
                logging.info(
                    'Created GitHub Issue No({0}) for crash ({1})'.format(issue.number, crash_report.fingerprint))
                # update the crash report with the issue id
                updated_report = CrashReports.update_crash_report(crash_report.fingerprint, {
                    # the issue number is stored as a string
                    'issue': str(issue.number)
                })
                logging.info(
                    'Updating crash report with fingerprint ({0}) complete.'.format(updated_report.fingerprint))
        except Exception as e:
            # ``except ... as e`` parses on Python 2.6+ and Python 3; the older
            # comma form is a syntax error on Python 3.
            logging.error('Error creating issue for fingerprint ({0}) [{1}]'.format(fingerprint, str(e)))
        finally:
            # remove the backoff cache key, so future jobs may be enqueued
            # NOTE(review): crash_report is still None here when the lookup
            # failed or found nothing -- confirm the key helper accepts None.
            backoff_cache_key = cls.backoff_crash_key_new_crash(crash_report)
            memcache.delete(backoff_cache_key)
userinfo_utility.py 文件源码 项目:ifttt-line 作者: MypaceEngine 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def deleteUserData(id):
    """Delete the 'UserData' datastore entity addressed by ``id``.

    The key is built with ``Key.from_path('UserData', id)``. The delete is
    best effort: ordinary exceptions are logged at debug level and
    swallowed, while interpreter-level exceptions that do not derive from
    ``Exception`` (e.g. ``SystemExit``) are allowed to propagate.

    :param id: value passed to ``Key.from_path`` to identify the entity.
    """
    # memcache.delete(key="MakerSecret-"+id)
    # memcache.delete(key="USER-"+id)
    # dashid=memcache.get(key="User-dash-"+id)
    # memcache.delete(key = "User-dash-"+id)
    # memcache.delete(key = "Dash-user-"+str(dashid))

    try:
        key = Key.from_path('UserData', id)
        datastore.Delete(key)
    except Exception:
        logging.debug(id + u"???????????")

### Private methods
userinfo_utility.py 文件源码 项目:ifttt-line 作者: MypaceEngine 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _clearCurrentUser():
    """Delete the "CURRENTUSR" memcache entry and return memcache.delete's result."""
    outcome = memcache.delete(key="CURRENTUSR")
    return outcome


### Tests
db.py 文件源码 项目:server 作者: viur-framework 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def Put( entities, **kwargs ):
    """
        Store one or more entities in the data store.

        Both new and already stored entities are accepted. Before the write,
        ``_fixUnindexedProperties()`` is called on every entity and, when
        ``conf["viur.db.caching"]`` is greater than zero, the memcache entry
        of every entity that is already saved is deleted.

        :param entities: Entity or list of entities to be stored.
        :type entities: :class:`server.db.Entity` | list of :class:`server.db.Entity`

        :param config: Optional configuration to use for this request. This must be specified\
        as a keyword argument.
        :type config: dict

        :returns: If the argument ``entities`` is a single :class:`server.db.Entity`, \
        a single Key is returned. If the argument is a list of :class:`server.db.Entity`, \
        a list of Keys will be returned.
        :rtype: Key | list of keys

        :raises: :exc:`TransactionFailedError`, if the action could not be committed.
    """
    # Normalise the argument so one loop serves the single and list cases;
    # any other type is handed to the datastore without preparation.
    if isinstance( entities, Entity ):
        batch = [ entities ]
    elif isinstance( entities, list ):
        batch = entities
    else:
        batch = []

    for item in batch:
        assert isinstance( item, Entity )
        item._fixUnindexedProperties()

    if conf["viur.db.caching" ]>0:
        for item in batch:
            assert isinstance( item, Entity )
            if item.is_saved(): # An update: the cached copy is about to go stale
                memcache.delete( str( item.key() ), namespace=__CacheKeyPrefix__, seconds=__cacheLockTime__  )

    return( datastore.Put( entities, **kwargs ) )
db.py 文件源码 项目:server 作者: viur-framework 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def Delete(keys, **kwargs):
    """
        Deletes one or more entities from the data store.

        :warning: Permanently deletes entities, use with care!

        Only entities of your own app can be deleted. When
        ``conf["viur.db.caching"]`` is greater than zero, the memcache entry
        of every given key is deleted before the datastore call. On error a
        subclass of :exc:`datastore_errors.Error` is raised.

        :param keys: Key, str or list of keys or strings to be deleted.
        :type keys: Key | str | list of Key | list of str

        :param config: Optional configuration to use for this request. This must be specified\
        as a keyword argument.
        :type config: dict

        :raises: :exc:`TransactionFailedError`, if the deletion could not be committed.
    """
    if conf["viur.db.caching" ]>0:
        acceptedTypes = ( datastore_types.Key, basestring )
        # Normalise to a list so one loop serves the single and list cases.
        if isinstance( keys, acceptedTypes ):
            staleKeys = [ keys ]
        elif isinstance( keys, list ):
            staleKeys = keys
        else:
            staleKeys = []
        for staleKey in staleKeys:
            assert isinstance( staleKey, acceptedTypes )
            memcache.delete( str( staleKey ), namespace=__CacheKeyPrefix__, seconds=__cacheLockTime__  )
    return( datastore.Delete( keys, **kwargs ) )
app_engine.py 文件源码 项目:gdata-python3 作者: dvska 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def set_token(unique_key, token_str):
    """Stores a serialized auth token in memcache and in the datastore.

    Memcache is written first so later reads can be served from the cache.
    If that write fails, the cached entry is deleted instead so a stale
    value is not left behind; if the delete reports 0, nothing is written
    to the datastore.

    Args:
      unique_key: The unique name for this token as a string. The caller is
          responsible for keeping it unique within the application; an
          existing value under the same name is silently overwritten.
      token_str: A serialized auth token as a string, expected to come from
          gdata.gauth.token_to_blob.

    Returns:
      True if the datastore put returned a truthy value, False if the cache
      could neither be set nor cleared (the datastore is then left
      untouched), and None if the datastore put returned a falsy value.

    Raises:
      Datastore exceptions may be raised from the App Engine SDK in the event
      of failure.
    """
    cached = memcache.set(unique_key, token_str)
    # Fall back to clearing the entry when it could not be set; a result of 0
    # from delete means the old value could not be cleared either.
    if not cached and memcache.delete(unique_key) == 0:
        return False
    stored = Token(key_name=unique_key, t=token_str).put()
    return True if stored else None
app_engine.py 文件源码 项目:gdata-python3 作者: dvska 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def delete_token(unique_key):
    """Removes the token stored under unique_key from memcache and the datastore.

    Args:
      unique_key: The name the token was stored under, as a string.
    """
    # Cache first, then the datastore entity with the same key name.
    memcache.delete(unique_key)
    stored_token = Token(key_name=unique_key)
    stored_token.delete()
snippets_test.py 文件源码 项目:appbackendapi 作者: codesdk 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_get_data_not_present(query_fn, testbed):
    """snippets.get_data() must call query_fn exactly once and return 'data'."""
    fetched = snippets.get_data()
    query_fn.assert_called_once_with()
    assert fetched == 'data'
    # Remove the 'key' memcache entry again.
    memcache.delete('key')
snippets_test.py 文件源码 项目:appbackendapi 作者: codesdk 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_get_data_present(query_fn, testbed):
    """With 'key' already in memcache, get_data() must not call query_fn."""
    # Seed the cache with the value get_data() is expected to return.
    memcache.add('key', 'data', 9000)
    fetched = snippets.get_data()
    query_fn.assert_not_called()
    assert fetched == 'data'
    # Remove the seeded 'key' entry again.
    memcache.delete('key')


问题


面经


文章

微信
公众号

扫码关注公众号