test_thread.py 文件源码

python
阅读 27 收藏 0 点赞 0 评论 0

项目:oil 作者: oilshell 项目源码 文件源码
def readerThread(self, d, readerNum):
        if sys.version_info[0] < 3 :
            name = currentThread().getName()
        else :
            name = currentThread().name

        c = d.cursor()
        count = 0
        rec = dbutils.DeadlockWrap(c.first, max_retries=10)
        while rec:
            count += 1
            key, data = rec
            self.assertEqual(self.makeData(key), data)
            rec = dbutils.DeadlockWrap(c.next, max_retries=10)
        if verbose:
            print "%s: found %d records" % (name, count)
        c.close()

        if verbose:
            print "%s: thread finished" % name
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号