test_api_v2.py 文件源码

python
阅读 20 收藏 0 点赞 0 评论 0

项目:CommunityCellularManager 作者: facebookincubator 项目源码 文件源码
def test_deactivate_subscriber(self):
        """A DELETE on the subscriber endpoint deactivates the Subscriber.

        The request is sent with the network's API token while
        'endagaweb.celery.app.send_task' is patched out.  The test then
        checks that:

        * the response status is 200 and no Subscriber row with the IMSI
          remains;
        * the first number is 'available' with no network or subscriber,
          and the second number is 'available';
        * the PendingCreditUpdate fixture no longer exists;
        * the patched task was called for 'endagaweb.tasks.async_post'
          with the BTS's deactivate_subscriber URL and a 'jwt' payload
          that, once loaded with a serializer keyed on the BTS secret,
          carries the subscriber's IMSI;
        * exactly one 'delete_imsi' UsageEvent exists for the IMSI.
        """
        imsi = self.sub.imsi
        api_token = self.user_profile.network.api_token
        auth = {'HTTP_AUTHORIZATION': 'Token %s' % api_token}
        # Patch the task dispatcher so nothing is really sent; the mock
        # records the call so it can be inspected further down.
        with mock.patch('endagaweb.celery.app.send_task') as send_task:
            response = self.client.delete(
                '/api/v2/subscribers/%s' % imsi, **auth)
        self.assertEqual(200, response.status_code)

        # No Subscriber row with this IMSI may remain.
        remaining_subs = models.Subscriber.objects.filter(imsi=imsi)
        self.assertEqual(0, remaining_subs.count())

        # Re-fetch the numbers so the persisted state is checked rather
        # than whatever the fixture objects still hold in memory.
        first_number = models.Number.objects.get(id=self.number.id)
        self.assertEqual('available', first_number.state)
        self.assertEqual(None, first_number.network)
        self.assertEqual(None, first_number.subscriber)
        second_number = models.Number.objects.get(id=self.number2.id)
        self.assertEqual('available', second_number.state)

        # The PendingCreditUpdate fixture must have been removed as well.
        remaining_pcus = models.PendingCreditUpdate.objects.filter(
            pk=self.pcu.pk)
        self.assertEqual(0, remaining_pcus.count())

        # The patched task must have been invoked with exactly two
        # positional arguments: the task name and an (endpoint, data) pair.
        self.assertTrue(send_task.called)
        positional = send_task.call_args[0]
        task_name, (task_endpoint, signed_payload) = positional
        self.assertEqual('endagaweb.tasks.async_post', task_name)
        self.assertEqual(
            '%s/config/deactivate_subscriber' % self.bts.inbound_url,
            task_endpoint)

        # Load the 'jwt' entry with a serializer keyed on the BTS secret
        # and confirm it names the subscriber that was deactivated.
        signer = itsdangerous.JSONWebSignatureSerializer(self.bts.secret)
        decoded = signer.loads(signed_payload['jwt'])
        self.assertEqual(imsi, decoded['imsi'])

        # Exactly one 'delete_imsi' UsageEvent should exist for the IMSI.
        deletion_events = models.UsageEvent.objects.filter(
            subscriber_imsi=imsi, kind='delete_imsi')
        self.assertEqual(1, deletion_events.count())
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号