test_bigmessage.py 文件源码

python
阅读 19 收藏 0 点赞 0 评论 0

项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码
def test_1_basic(self):
        """Round-trip one BigMessage through a new SQS queue and S3 bucket.

        The test creates a queue and a bucket that share a timestamp-based
        name, writes a message whose body is handed to ``new_message`` with
        an ``s3_url`` pointing at that bucket, and then checks that:

        * an object named after the last path segment of ``message.s3_url``
          exists in the bucket after the write,
        * the message read back from the queue carries the original body,
        * the object is no longer in the bucket after the message is deleted.

        Queue and bucket removal are registered with ``addCleanup`` so they
        run even when an assertion fails.
        """
        c = boto.connect_sqs()

        # create a queue so we can test BigMessage
        queue_name = 'test%d' % int(time.time())
        timeout = 60
        queue = c.create_queue(queue_name, timeout)
        self.addCleanup(c.delete_queue, queue, True)
        queue.set_message_class(BigMessage)

        # create a bucket with the same name to store the message in
        s3 = boto.connect_s3()
        bucket = s3.create_bucket(queue_name)
        self.addCleanup(s3.delete_bucket, queue_name)
        # NOTE(review): presumably waits for the new queue/bucket to become
        # usable on the service side -- confirm before shortening.
        time.sleep(30)

        # now add a message
        msg_body = 'This is a test of the big message'
        fp = StringIO(msg_body)
        s3_url = 's3://%s' % queue_name
        message = queue.new_message(fp, s3_url=s3_url)

        queue.write(message)
        time.sleep(30)

        # The stored object's name is the final path segment of the URL
        # recorded on the message.
        s3_object_name = message.s3_url.split('/')[-1]

        # Make sure msg body is in bucket
        self.assertTrue(bucket.lookup(s3_object_name))

        m = queue.read()
        # Fail with a clear assertion instead of an AttributeError on
        # ``None.get_body()`` when no message was received.
        self.assertIsNotNone(m)
        self.assertEqual(m.get_body().decode('utf-8'), msg_body)

        m.delete()
        time.sleep(30)

        # Make sure msg is deleted from bucket
        self.assertIsNone(bucket.lookup(s3_object_name))
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号