test_notify.py 文件源码

python
阅读 44 收藏 0 点赞 0 评论 0

项目:userbase-sns-lambda 作者: fartashh 项目源码 文件源码
def notify(self, name, sec=0, payload=None):
        """Send a notification to the database, eventually after some time."""
        if payload is None:
            payload = ''
        else:
            payload = ", %r" % payload

        script = ("""\
import time
time.sleep(%(sec)s)
import %(module)s as psycopg2
import %(module)s.extensions as ext
conn = psycopg2.connect(%(dsn)r)
conn.set_isolation_level(ext.ISOLATION_LEVEL_AUTOCOMMIT)
print conn.get_backend_pid()
curs = conn.cursor()
curs.execute("NOTIFY " %(name)r %(payload)r)
curs.close()
conn.close()
""" % {
        'module': psycopg2.__name__,
        'dsn': dsn, 'sec': sec, 'name': name, 'payload': payload})

        return Popen([sys.executable, '-c', script_to_py3(script)], stdout=PIPE)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号