def get_feed_items(self, feed_url):
    """
    Fetch the items stored for the feed with the given url.

    A 'db-get' job is submitted for the 'feed' collection of the
    'feedlark' database, querying on the url and excluding '_id' from
    the projection. The 'items' entry of the first returned doc is
    returned; the response is not checked for status or for being empty.
    """
    query = {'url': feed_url}
    projection = {'_id': 0}
    request = bson.BSON.encode({
        'key': self.key,
        'database': 'feedlark',
        'collection': 'feed',
        'query': query,
        'projection': projection,
    })
    # submit_job as used here is blocking
    gm_job = self.gm_client.submit_job('db-get', str(request))
    response = bson.BSON(gm_job.result).decode()
    first_doc = response['docs'][0]
    return first_doc['items']
# Example source code using the Python BSON class
def bsonify_update_data(item_id, url, all_data):
    """
    Build the BSON-encoded request for a 'db-update' job.

    The request selects the doc in the 'feed' collection whose '_id' is
    item_id and sets its 'items' to all_data and its 'url' to url. The
    module-level `key` is sent along as the 'key' field.
    """
    selector = {"_id": item_id}
    updates = {
        "items": all_data,
        "url": url,
    }
    request = {
        "key": key,
        "database": "feedlark",
        "collection": "feed",
        "data": {
            "updates": updates,
            "selector": selector,
        },
    }
    return bson.BSON.encode(request)
def get_all_feed_docs():
    """
    Submit a 'db-get' job with an empty query on the 'feed' collection.

    The projection asks for '_id', 'url' and 'items'. Returns the 'docs'
    entry of the decoded response.
    """
    projection = {
        "_id": 1,
        "url": 1,
        "items": 1,
    }
    # format the request
    request = bson.BSON.encode({
        "key": key,
        "database": "feedlark",
        "collection": "feed",
        "query": {},
        "projection": projection,
    })
    job = gm_client.submit_job("db-get", str(request))
    response = bson.BSON(job.result).decode()
    return response["docs"]
def test_upserter(self):
    """Add a doc, upsert a change to it, then read the change back."""
    ident = bson.objectid.ObjectId()

    # Insert the doc that the upsert is expected to match.
    add_req = {
        "database": "testing",
        "collection": "unit_tests",
        "data": {
            "_id": ident,
            "inserttime": time(),
            "test": "upserter",
            "dank": "memes",
        },
    }
    raw_response = self.client.submit_job(
        'db-add', str(bson.BSON.encode(add_req)))
    resp = bson.BSON.decode(bson.BSON(raw_response.result))
    # Fail early with a clear message if the fixture insert did not work.
    self.assertIn("status", resp)
    self.assertEqual(resp["status"], "ok")

    # Upsert against the existing doc: it must not create a new doc.
    upsert_req = {
        "database": "testing",
        "collection": "unit_tests",
        "data": {
            "selector": {"_id": ident},
            "updates": {"dank": "cave"},
        },
    }
    raw_response = self.client.submit_job(
        'db-upsert', str(bson.BSON.encode(upsert_req)))
    resp = bson.BSON.decode(bson.BSON(raw_response.result))
    self.assertIn("status", resp)
    self.assertIn("new_doc", resp)
    self.assertEqual(resp["status"], "ok")
    self.assertEqual(resp["new_doc"], False)

    # Read the doc back and check that the update was applied.
    get_req = {
        "database": "testing",
        "collection": "unit_tests",
        "query": {"_id": ident},
        "projection": {"dank": 1},
    }
    raw_response = self.client.submit_job(
        'db-get', str(bson.BSON.encode(get_req)))
    resp = bson.BSON.decode(bson.BSON(raw_response.result))
    self.assertIn("status", resp)
    self.assertEqual(resp["status"], "ok")
    self.assertEqual(len(resp["docs"]), 1)
    self.assertEqual(resp["docs"][0]["dank"], "cave")
def bsonify_update_data(item_id, url, all_data, key=None):
    """
    Build the BSON-encoded request for a 'db-update' job.

    The request selects the doc in the 'feed' collection whose '_id' is
    item_id and sets its 'items' to all_data and its 'url' to url. `key`
    is sent as the 'key' field (None when not given).
    """
    selector = {"_id": item_id}
    updates = {
        "items": all_data,
        "url": url,
    }
    request = {
        "key": key,
        "database": "feedlark",
        "collection": "feed",
        "data": {
            "updates": updates,
            "selector": selector,
        },
    }
    return bson.BSON.encode(request)
def get_single_feed_db_data(url, key=None):
    """
    Submit a 'db-get' job for the single feed with the given url.

    The projection asks for '_id', 'url' and, for the items, the 'link',
    'pub_date' and 'article_text' fields. `key` is sent as the 'key'
    field of the request. Returns the 'docs' entry of the decoded
    response.
    """
    # The original literal listed "link" twice; a dict keeps only one
    # entry per key, so the encoded request is unchanged.
    # NOTE(review): the second "link" may have been meant to be another
    # field - confirm against the feed item schema.
    item_projection = {
        "link": 1,
        "pub_date": 1,
        "article_text": 1,
    }
    # format the request
    request = bson.BSON.encode({
        "key": key,
        "database": "feedlark",
        "collection": "feed",
        "query": {"url": url},
        "projection": {
            "_id": 1,
            "url": 1,
            "items": [item_projection],
        },
    })
    url_fields_gotten = gm_client.submit_job("db-get", str(request))
    bson_object = bson.BSON(url_fields_gotten.result).decode()
    return bson_object["docs"]
# updates all of the item fields for all the unique feeds in the feeds db
def update_user_data(username, updates):
    """
    Apply the given dict of updates to the user's doc in the database.

    Submits a 'db-update' job on the 'user' collection, selecting by
    username. A response whose status is not "ok" is logged at level 2;
    nothing is returned either way.
    """
    request = {
        "key": key,
        "database": "feedlark",
        "collection": "user",
        "data": {
            "selector": {"username": username},
            "updates": updates,
        },
    }
    job = gearman_client.submit_job(
        'db-update', str(bson.BSON.encode(request)))
    result = bson.BSON(job.result).decode()
    if result[u"status"] == u"ok":
        return
    log(2, "Error updating user data: " + str(result))
def get_feed_data(feed_url):
    """
    Return the first doc stored for the feed with the given url.

    Returns None (after logging) when the response status is not "ok"
    or when no docs come back.
    """
    request = {
        "key": key,
        "database": "feedlark",
        "collection": "feed",
        "query": {"url": feed_url},
        "projection": {},
    }
    job = gearman_client.submit_job('db-get', str(bson.BSON.encode(request)))
    result = bson.BSON(job.result).decode()

    if result[u"status"] != u"ok":
        log(2, "Error getting database entry for feed " + str(feed_url))
        return None

    has_docs = "docs" in result and len(result["docs"]) > 0
    if not has_docs:
        log(1, "No docs returned for feed " + str(feed_url))
        return None

    return result["docs"][0]
def get_users(self):
    """
    Return the 'docs' list from a 'db-get' over the whole user collection.

    The projection requests the 'username', 'subscribed_feeds' and
    'words' fields of each user doc.
    """
    projection = {
        'username': 1,
        'subscribed_feeds': 1,
        'words': 1,
    }
    request = bson.BSON.encode({
        'key': self.key,
        'database': 'feedlark',
        'collection': 'user',
        'query': {},
        'projection': projection,
    })
    # submit_job as used here is blocking
    gm_job = self.gm_client.submit_job('db-get', str(request))
    response = bson.BSON(gm_job.result).decode()
    return response['docs']
def get_user_data(username):
    """
    Return the first doc in the 'user' collection for the given username.

    Returns None (after logging) when the response status is not 'ok'
    or when no docs come back.
    """
    log(0, 'Getting db doc for user {}'.format(username))
    request = {
        "key": key,
        "database": "feedlark",
        "collection": "user",
        "query": {"username": username},
        "projection": {},
    }
    job = gearman_client.submit_job('db-get', str(bson.BSON.encode(request)))
    result = bson.BSON(job.result).decode()

    if result['status'] != 'ok':
        log(2, "Error getting db entry for user {}".format(username))
        log(2, result['description'])
        return None

    has_docs = "docs" in result and len(result['docs']) > 0
    if not has_docs:
        log(1, "No docs returned for user {}".format(username))
        return None

    return result['docs'][0]
def update_user_data(username, data):
    """
    Update the doc of the given user with the dict of updates in `data`.

    Submits a 'db-update' job on the 'user' collection, selecting by
    username. A response whose status is not 'ok' is logged together
    with its description; nothing is returned either way.
    """
    log(0, 'Updating db doc for user {}'.format(username))
    request = {
        "key": key,
        "database": "feedlark",
        "collection": "user",
        "data": {
            "updates": data,
            "selector": {"username": username},
        },
    }
    job = gearman_client.submit_job(
        'db-update', str(bson.BSON.encode(request)))
    result = bson.BSON(job.result).decode()
    if result['status'] == 'ok':
        return
    log(2, 'Error updating db entry for user {}'.format(username))
    log(2, result['description'])
def get_votes_for_user(username):
    """
    Return the docs in the 'vote' collection for the given username.

    Returns None when the response status is not 'ok', and an empty
    list when the response carries no docs.
    """
    log(0, 'Getting votes for user {}'.format(username))
    request = {
        "key": key,
        "database": "feedlark",
        "collection": "vote",
        "query": {"username": username},
        "projection": {},
    }
    job = gearman_client.submit_job('db-get', str(bson.BSON.encode(request)))
    result = bson.BSON(job.result).decode()

    if result['status'] != 'ok':
        log(2, "Error getting votes for user {}".format(username))
        log(2, result['description'])
        return None

    has_docs = 'docs' in result and len(result['docs']) > 0
    if not has_docs:
        log(1, "No docs returned for user {}".format(username))
        return []

    return result['docs']
def to_object(bson_bytes):
    """Decode BSON bytes using SON documents and tz-aware datetimes."""
    options = CodecOptions(document_class=SON, tz_aware=True)
    document = bson.BSON(bson_bytes)
    return document.decode(options)
def to_bson(obj):
    """Encode the given object with bson.BSON.encode and return the result."""
    encoded = bson.BSON.encode(obj)
    return encoded
def __init__(self, custom_codec_implementation=None):
    """
    Pick the loads/dumps pair used for BSON (de)serialization.

    When custom_codec_implementation is given, its `loads` and `dumps`
    are used. Otherwise the installed `bson` module is used: through
    the BSON class when it has one (pymongo), else through its
    module-level loads/dumps (pybson).
    """
    if custom_codec_implementation is not None:
        self._loads = custom_codec_implementation.loads
        self._dumps = custom_codec_implementation.dumps
        return

    # Use implementation from pymongo or from pybson
    import bson
    if not hasattr(bson, 'BSON'):
        # pybson
        self._loads = bson.loads
        self._dumps = bson.dumps
        return

    # pymongo
    self._loads = lambda raw: bson.BSON(raw).decode()
    self._dumps = lambda msg: bytes(bson.BSON.encode(msg))
def __init__(self, custom_codec_implementation=None):
    """
    Set up the `_loads` and `_dumps` callables for this instance.

    A supplied custom_codec_implementation provides them directly via
    its `loads` and `dumps` attributes. Without one, the `bson` module
    is imported and used either through its BSON class (pymongo) or
    through its module-level loads/dumps functions (pybson).
    """
    codec = custom_codec_implementation
    if codec is None:
        # Use implementation from pymongo or from pybson
        import bson
        if hasattr(bson, 'BSON'):
            # pymongo
            def loads(raw):
                return bson.BSON.decode(bson.BSON(raw))

            def dumps(msg):
                return bytes(bson.BSON.encode(msg))
        else:
            # pybson
            loads = bson.loads
            dumps = bson.dumps
    else:
        loads = codec.loads
        dumps = codec.dumps
    self._loads = loads
    self._dumps = dumps
def main():
    """
    Write the most subscribed feeds to 'most_popular_feeds.txt'.

    Takes one command line argument: how many feeds to list. All user
    docs are fetched through a 'db-get' gearman job (projection:
    'subscribed_feeds') and handed to make_list_feeds(). The output
    file holds the number of distinct feeds on the first line, followed
    by one "<feed> <count>" line per listed feed, highest count first.
    """
    if len(sys.argv) != 2:
        print('This tool takes 1 command line argument; the number of topics to output data on. See README.md')
        return
    num_requested_feeds = int(sys.argv[1])

    gearman_client = gearman.GearmanClient(['localhost:4730'])
    request = bson.BSON.encode({
        'key': os.getenv('SECRETKEY'),
        'database': 'feedlark',
        'collection': 'user',
        'query': {},
        'projection': {
            'subscribed_feeds': 1
        }
    })
    job = gearman_client.submit_job('db-get', str(request))
    result = bson.BSON(job.result).decode()

    if result[u'status'] != u'ok':
        print('Error getting user data from database')
        print(result['description'])
        return

    # make_list_feeds is indexed by feed below to obtain each count.
    feed_counts = make_list_feeds(result["docs"])

    # Rank by count. Sorting the keys alone (as before) ordered the
    # feeds reverse-alphabetically rather than by popularity.
    sorted_feed = sorted(
        feed_counts, key=lambda feed: feed_counts[feed], reverse=True)

    # Clamp to [0, number of feeds] so a negative argument lists nothing,
    # matching the previous xrange() behaviour.
    num_listed = max(0, min(num_requested_feeds, len(sorted_feed)))

    # get output ready
    output = [str(len(feed_counts))]
    for feed in sorted_feed[:num_listed]:
        output.append(feed + " " + str(feed_counts[feed]))

    # output to file; `with` closes the handle even if the write fails
    with open("most_popular_feeds.txt", "w") as f:
        f.write("\n".join(output))
def db_get(collection, query, projection):
    """
    Run a 'db-get' job against the 'feedlark' database.

    The SECRETKEY environment variable, when set, is included as the
    'key' field of the request. Returns the decoded response.
    """
    request = {
        'database': 'feedlark',
        'collection': collection,
        'query': query,
        'projection': projection
    }
    secret = os.getenv('SECRETKEY')
    if secret is not None:
        request['key'] = secret
    encoded = str(bson.BSON.encode(request))
    job = gearman_client.submit_job('db-get', encoded)
    return bson.BSON(job.result).decode()
def main():
if len(sys.argv) != 2:
print 'This tool takes 1 command line argument; the number of topics to output data on. See README.md'
return
num_requested_topics = int(sys.argv[1])
gearman_client = gearman.GearmanClient(['localhost:4730'])
result = bson.BSON.decode(bson.BSON(gearman_client.submit_job('db-get', str(bson.BSON.encode({
'key': getenv('SECRETKEY'),
'database':'feedlark',
'collection':'user',
'query':{},
'projection':{
'words':1
}
}))).result))
if result[u'status'] == u'ok':
users = result['docs']
print len(users)
topic_data = get_all_topic_data(users)
num_output_topics = min(num_requested_topics, len(topic_data))
print len(topic_data), num_output_topics
sorted_topics = sorted(topic_data, key=lambda x:len(topic_data[x]), reverse=True)
for i in xrange(num_output_topics):
sorted_values = sorted(topic_data[sorted_topics[i]])
mean_val = mean(sorted_values)
mode_val = mode(sorted_values)
median_val = median(sorted_values)
if mode_val is None:
mode_val = 'X'
print sorted_topics[i], len(sorted_values), mean_val, mode_val, median_val
else:
print('Error getting user data from database')
print(result['description'])
return
def update_all_feeds(worker, job):
    """
    Gearman job handler: refresh the items of every feed in the feed db.

    When the module-level `key` is set, the BSON request in job.data
    must carry the same 'key', otherwise an error response is returned.
    Every doc from get_all_feed_docs() is passed through
    gather_updates() and update_database().

    Returns a BSON-encoded response converted with str():
      - {'status': 'error', 'description': ...} on a key mismatch,
      - {'status': 'error', 'error-description': ...} when an update
        raises,
      - {'status': 'ok', 'updated_feeds': [<_id>, ...]} on success.
    """
    log(0, "'update-all-feeds' initiated")

    if key is not None:
        log(0, "Checking secret key")
        request = bson.BSON(job.data).decode()
        if 'key' not in request or request['key'] != key:
            log(2, "Secret key mismatch")
            response = bson.BSON.encode({
                'status': 'error',
                'description': 'Secret key mismatch',
            })
            return str(response)

    log(0, "Retrieving data from feed db")
    feed_db_data = get_all_feed_docs()

    try:
        for doc in feed_db_data:
            updated_feeds = gather_updates(doc)
            update_database(doc, updated_feeds)
    except Exception as e:
        # Report the failure to the caller instead of letting the
        # handler raise.
        log(2, "'update-all-feeds' failed")
        return str(bson.BSON.encode({
            "status": "error",
            "error-description": str(e)
        }))

    log(0, "'update-all-feeds' finished")
    # The extra `return` that used to follow this one was unreachable
    # and has been removed.
    return str(bson.BSON.encode({
        "status": "ok",
        "updated_feeds": [x['_id'] for x in feed_db_data],
    }))
# Get secret key, must be global.
def test_adder(self):
    """A 'db-add' job must answer with status "ok"."""
    req = {
        "database": "testing",
        "collection": "unit_tests",
        "data": {
            "inserttime": time(),
            "test": "adder",
        },
    }
    bson_req = bson.BSON.encode(req)
    raw_response = self.client.submit_job('db-add', str(bson_req))
    resp = bson.BSON.decode(bson.BSON(raw_response.result))
    # assertIn/assertEqual replace assertTrue(... in ...) and the
    # deprecated assertEquals alias.
    self.assertIn("status", resp)
    self.assertEqual(resp["status"], "ok")
def test_updater(self):
    """Add a doc, then update it through 'db-update' and expect "ok"."""
    add_req = {
        "database": "testing",
        "collection": "unit_tests",
        "data": {
            "inserttime": time(),
            "has_been_updated": False,
            "test": "updater",
        },
    }
    raw_response = self.client.submit_job(
        'db-add', str(bson.BSON.encode(add_req)))
    resp = bson.BSON.decode(bson.BSON(raw_response.result))
    # Give a readable failure rather than a KeyError if the add response
    # carries no id.
    self.assertIn("_id", resp)
    ident = resp["_id"]

    update_req = {
        "database": "testing",
        "collection": "unit_tests",
        "data": {
            "selector": {"_id": ident},
            "updates": {"has_been_updated": True},
        },
    }
    raw_response = self.client.submit_job(
        'db-update', str(bson.BSON.encode(update_req)))
    resp = bson.BSON.decode(bson.BSON(raw_response.result))
    self.assertIn("status", resp)
    self.assertEqual(resp["status"], "ok")
def add_update_to_db(data):
    """
    Add `data` as a new doc to the 'vote' collection via a 'db-add' job.

    The job's response is not inspected.
    """
    request = {
        "key": key,
        "database": "feedlark",
        "collection": "vote",
        "data": data,
    }
    encoded = bson.BSON.encode(request)
    gearman_client.submit_job('db-add', str(encoded))
def vote_already_exists(username, article_url):
    """
    Tell whether the given user already voted on the given article.

    Queries the 'vote' collection for docs matching both the article
    url and the username. Returns True when at least one doc comes
    back, and False otherwise, including when the response status is
    not 'ok'.
    """
    conditions = [
        {"article_url": article_url},
        {"username": username},
    ]
    request = {
        "key": key,
        "database": "feedlark",
        "collection": "vote",
        "query": {"$and": conditions},
        "projection": {},
    }
    job = gearman_client.submit_job('db-get', str(bson.BSON.encode(request)))
    result = bson.BSON(job.result).decode()
    if result['status'] != 'ok':
        log(2, 'Error getting votes for user {} for article {}'.format(username, article_url))
        return False
    if 'docs' not in result:
        return False
    return len(result['docs']) > 0
def test_put_g2g(self):
    """put_g2g must store a doc that a later 'db-get' on g2g returns."""
    gm_client = gearman.GearmanClient(['localhost:4730'])
    agg = Aggregator(gm_client, None)

    # Disabled fixture insert, kept for reference:
    # add_request = bson.BSON.encode({
    #     'database':'feedlark',
    #     'collection':'g2g',
    #     'data':{
    #         'username':'__test123__',
    #         'test_parameter':'NOLO'
    #     }
    # })
    # gm_client.submit_job('db-add',str(add_request))

    test_document = {
        'username': 'iandioch',
        'test_parameter': 'YOLO',
    }
    agg.put_g2g('iandioch', test_document)

    # Read the doc back straight from the database worker.
    get_request = bson.BSON.encode({
        'database': 'feedlark',
        'collection': 'g2g',
        'query': {'username': 'iandioch'},
        'projection': {'test_parameter': 1},
    })
    job = gm_client.submit_job('db-get', str(get_request))
    stored = bson.BSON(job.result).decode()['docs'][0]
    self.assertEqual(stored['test_parameter'], 'YOLO')
def get_score(self, topic, words):
    """
    Submit a 'score' job and return the 'score' of its response.

    `words` is sent as 'user_words' and `topic` as 'article_words'.
    When the response status is not 'ok' the failure and its
    description are logged and 0 is returned.
    """
    payload = {
        'key': self.key,
        'user_words': words,
        'article_words': topic,
    }
    gm_job = self.gm_client.submit_job('score', str(bson.BSON.encode(payload)))
    result = bson.BSON(gm_job.result).decode()
    if result['status'] == 'ok':
        return result['score']
    log("Scoring article failed", level=1)
    log('Description: ' + result['description'], level=1)
    return 0
def get_feed_items(feed_url, item_urls):
    """
    Return the items of the feed at feed_url whose 'link' is in item_urls.

    Only the first doc returned for the feed is considered; items
    without a 'link' are skipped. Returns None (after logging) when the
    response status is not 'ok' or when no docs come back.
    """
    log(0, 'Getting feed items for feed {}'.format(feed_url))
    request = {
        "key": key,
        "database": "feedlark",
        "collection": "feed",
        "query": {"url": feed_url},
        "projection": {"items": 1},
    }
    job = gearman_client.submit_job('db-get', str(bson.BSON.encode(request)))
    result = bson.BSON(job.result).decode()

    if result['status'] != 'ok':
        log(2, 'Error getting feed {}'.format(feed_url))
        log(2, result['description'])
        return None

    has_docs = 'docs' in result and len(result['docs']) > 0
    if not has_docs:
        log(1, 'No docs returned for feed {}'.format(feed_url))
        return None

    wanted_links = set(item_urls)
    matching = []
    for item in result['docs'][0]['items']:
        if 'link' in item and item['link'] in wanted_links:
            matching.append(item)
    return matching
def __nonzero__(self):
    """Report truthiness as the value of `has_apicalls`.

    Needed by the JSON reporting module, which would otherwise treat the
    on-demand generated list of API calls as empty.

    The result structure is kept between processing and reporting time.
    By reporting time, which is where this is actually needed,
    has_apicalls has already been set while the parse() function of the
    WindowsMonitor class iterated through the BSON logs. That knowledge
    is used here to pass along whether this log has API call events,
    and so whether it is "nonzero". The field has to be correct, as
    json.dump() will otherwise fail (probably due to buffering issues).
    """
    has_events = self.has_apicalls
    return has_events
def get_details(self, node):
    """
    GET the 'details' op of the node handler and return the decoded BSON.

    Asserts that the response is 200 OK with content type
    'application/bson' before decoding its content.
    """
    handler_url = reverse('node_handler', args=[node.system_id])
    params = {'op': 'details'}
    response = self.client.get(handler_url, params)
    self.assertEqual(http.client.OK, response.status_code)
    self.assertEqual('application/bson', response['content-type'])
    document = bson.BSON(response.content)
    return document.decode()
def process_response(response):
    """Decode the body of a response, which must be 200 OK.

    A body with content-type application/bson is decoded as a BSON
    document, and one with content-type application/json as a JSON
    document (using the declared charset, or utf-8 when none is
    declared). Any other content type yields the raw binary content.

    :param response: The result of MAASClient.get/post/etc.
    :type response: urllib.request.addinfourl (a file-like object that has a
        .code attribute.)
    :raises urllib.error.HTTPError: when the response code is not 200 OK.
    """
    status = response.code
    if status != http.client.OK:
        text_status = http.client.responses.get(status, '<unknown>')
        raise urllib.error.HTTPError(
            response.url, status, '%s, expected 200 OK' % text_status,
            response.headers, response.fp)

    content = response.read()
    headers = response.headers
    content_type = headers.get_content_type()

    if content_type == "application/bson":
        return bson.BSON(content).decode()
    if content_type != "application/json":
        return content

    charset = headers.get_content_charset()
    if charset is None:
        charset = "utf-8"
    return json.loads(content.decode(charset))