def setUpClass(cls):
    """Install a mock subscriber and seed ``gprs_records`` with fixture rows.

    Replaces ``utilities.subscriber`` with a ``mocks.MockSubscriber`` (the
    original is kept on ``cls.original_subscriber``), creates two
    subscribers, opens the GPRSDB and EventStore handles, and inserts eight
    rows into the ``gprs_records`` table: five for IMSI ...084 and three for
    IMSI ...082, timestamped relative to ``cls.now``.

    NOTE(review): the enclosing class and any ``@classmethod`` decorator are
    outside this excerpt -- confirm the decorator precedes this definition.
    """
    # Monkeypatch Subscriber so sub balance lookups succeed.
    cls.original_subscriber = utilities.subscriber
    cls.mock_subscriber = mocks.MockSubscriber()
    utilities.subscriber = cls.mock_subscriber
    subscriber.create_subscriber('IMSI901550000000084', '5551234')
    subscriber.create_subscriber('IMSI901550000000082', '5551235')

    # Connect to the GPRSDB and EventStore.
    cls.gprs_db = gprs_database.GPRSDB()
    cls.event_store = events.EventStore()

    # Add some records to the GPRSDB. The method we're testing should
    # extract these records and create events in the EventStore.
    cls.now = time.time()
    # Each tuple follows the order of `columns` below.
    records = [
        (psycopg2.TimestampFromTicks(cls.now - 120), 'IMSI901550000000084',
         '192.168.99.1', 50, 80, 50, 80),
        (psycopg2.TimestampFromTicks(cls.now - 60), 'IMSI901550000000084',
         '192.168.99.1', 50, 80, 0, 0),
        (psycopg2.TimestampFromTicks(cls.now - 30), 'IMSI901550000000084',
         '192.168.99.1', 300, 500, 250, 420),
        (psycopg2.TimestampFromTicks(cls.now - 10), 'IMSI901550000000084',
         '192.168.99.1', 700, 600, 400, 100),
        (psycopg2.TimestampFromTicks(cls.now - 5), 'IMSI901550000000084',
         '192.168.99.1', 750, 625, 50, 25),
        # Create events for a different IMSI.
        (psycopg2.TimestampFromTicks(cls.now - 60), 'IMSI901550000000082',
         '192.168.99.2', 50, 80, 0, 0),
        (psycopg2.TimestampFromTicks(cls.now - 10), 'IMSI901550000000082',
         '192.168.99.2', 400, 300, 350, 220),
        (psycopg2.TimestampFromTicks(cls.now - 5), 'IMSI901550000000082',
         '192.168.99.2', 450, 325, 50, 25),
    ]
    columns = (
        'record_timestamp',
        'imsi',
        'ipaddr',
        'uploaded_bytes',
        'downloaded_bytes',
        'uploaded_bytes_delta',
        'downloaded_bytes_delta',
    )
    # Only the (fixed) column names are interpolated into the statement;
    # the values are bound by the driver so quoting is handled for us.
    command = 'insert into gprs_records (%s) values (%s)' % (
        ', '.join(columns), ', '.join(['%s'] * len(columns)))

    connection = psycopg2.connect(host='localhost', database='endaga',
                                  user=PG_USER, password=PG_PASSWORD)
    try:
        with connection.cursor() as cursor:
            cursor.executemany(command, records)
        connection.commit()
    finally:
        # Always release the connection, even if an insert fails.
        connection.close()
gprs_tests.py 文件源码
python
阅读 35
收藏 0
点赞 0
评论 0
评论列表
文章目录