dbtools_test.py 文件源码

python
阅读 19 收藏 0 点赞 0 评论 0

项目:feedlark 作者: CPSSD 项目源码 文件源码
def test_upserter(self):
        ident = bson.objectid.ObjectId()
        req = {"database":"testing", "collection":"unit_tests", "data":{"_id":ident, "inserttime":time(), "test":"upserter", "dank":"memes"}}
        bson_req = bson.BSON.encode(req)
        raw_response = self.client.submit_job('db-add', str(bson_req))

        upsert_req = {"database":"testing", "collection":"unit_tests", "data":{"selector":{"_id":ident}, "updates":{"dank":"cave"}}}
        bson_req = bson.BSON.encode(upsert_req)
        raw_response = self.client.submit_job('db-upsert', str(bson_req))
        resp = bson.BSON.decode(bson.BSON(raw_response.result))

        self.assertTrue("status" in resp)
        self.assertTrue("new_doc" in resp)
        self.assertEquals(resp["status"], "ok")
        self.assertEquals(resp["new_doc"], False)

        get_req = {"database":"testing", "collection":"unit_tests", "query": {"_id": ident}, "projection": {"dank": 1}}
        bson_req = bson.BSON.encode(get_req)
        raw_response = self.client.submit_job('db-get', str(bson_req))
        resp = bson.BSON.decode(bson.BSON(raw_response.result))

        self.assertTrue("status" in resp)
        self.assertEquals(resp["status"], "ok")
        self.assertEquals(len(resp["docs"]), 1)
        self.assertEquals(resp["docs"][0]["dank"], "cave")
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号