def bulk_insert(self, table, items):
    parsed_items = []
    for item in items:
        dfields = dict((f.name, self.represent(v, f.type)) for f, v in item)
        parsed_items.append(table._tableobj(**dfields))
    if self.use_ndb:
        ndb.put_multi(parsed_items)
    else:
        gae.put(parsed_items)
    return True
Python put_multi() — example source code
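Both branches in bulk_insert above perform a batched write: ndb.put_multi (and the legacy db.put on a list) sends all entities to the datastore in one RPC instead of one call per entity. A minimal sketch of the difference, using a hypothetical Note model that is not part of the examples on this page:

from google.appengine.ext import ndb

class Note(ndb.Model):
    # Hypothetical model used only for this illustration.
    text = ndb.StringProperty()

notes = [Note(text='note %d' % i) for i in range(100)]

# One datastore RPC per entity (slow):
# for note in notes:
#     note.put()

# One batched RPC for the whole list; returns the written keys.
keys = ndb.put_multi(notes)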
def _update_employees(employee_dicts):
    """Given a list of employee dicts in the format [{employee info 1}, {employee info 2}, ...],
    create new employee records and update existing records as necessary.

    Then determine whether any employees have been terminated since the last update,
    and mark these employees as such.
    """
    logging.info('Updating employees...')

    all_employees, new_employees = [], []
    current_usernames = set()
    for d in employee_dicts:
        existing_employee = Employee.query(Employee.username == d['username']).get()

        if existing_employee is None:
            new_employee = Employee.create_from_dict(d, persist=False)
            all_employees.append(new_employee)
            new_employees.append(new_employee)
        else:
            existing_employee.update_from_dict(d)
            # If the user is in the S3 dump, then the user is no longer
            # terminated.
            existing_employee.terminated = False
            all_employees.append(existing_employee)

        current_usernames.add(d['username'])

    ndb.put_multi(all_employees)

    # Figure out if there are any employees in the DB that aren't in the S3
    # dump. These are terminated employees, and we need to mark them as such.
    usernames_to_employees = dict(
        (employee.username, employee)
        for employee
        in Employee.query()
    )
    db_usernames = set(usernames_to_employees.keys())

    terminated_usernames = db_usernames - current_usernames
    terminated_employees = []
    for u in terminated_usernames:
        employee = usernames_to_employees[u]
        employee.terminated = True
        terminated_employees.append(employee)
    ndb.put_multi(terminated_employees)

    logging.info('Done.')
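The function above illustrates a common pattern: mutate entities in memory, collect them in a list, and flush them with a single ndb.put_multi call. A reduced sketch of just the mark-as-terminated step, assuming a simplified stand-in model (SimpleEmployee is hypothetical, not the Employee model used above):

from google.appengine.ext import ndb

class SimpleEmployee(ndb.Model):
    # Simplified stand-in for the Employee model referenced above.
    username = ndb.StringProperty()
    terminated = ndb.BooleanProperty(default=False)

def mark_terminated(usernames):
    """Flag the given usernames as terminated in one batched write."""
    if not usernames:
        return
    employees = SimpleEmployee.query(
        SimpleEmployee.username.IN(list(usernames))).fetch()
    for employee in employees:
        employee.terminated = True
    # Write every mutated entity back in a single batched datastore call.
    ndb.put_multi(employees)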
def update_schema_task(cursor=None, num_updated=0, batch_size=100):
    """Task that handles updating the models' schema.

    This is started by UpdateSchemaHandler. It scans every entity in the
    datastore for the Picture model and re-saves it so that it has the new
    schema fields.
    """
    # Force ndb to use v2 of the model by re-loading it.
    reload(models_v2)

    # Get all of the entities for this Model.
    query = models_v2.Picture.query()
    pictures, next_cursor, more = query.fetch_page(
        batch_size, start_cursor=cursor)

    to_put = []
    for picture in pictures:
        # Give the new fields default values.
        # If you added new fields and were okay with the default values, you
        # would not need to do this.
        picture.num_votes = 1
        picture.avg_rating = 5
        to_put.append(picture)

    # Save the updated entities.
    if to_put:
        ndb.put_multi(to_put)
        num_updated += len(to_put)
        logging.info(
            'Put {} entities to Datastore for a total of {}'.format(
                len(to_put), num_updated))

    # If there are more entities, re-queue this task for the next page.
    if more:
        deferred.defer(
            update_schema_task, cursor=next_cursor, num_updated=num_updated)
    else:
        logging.debug(
            'update_schema_task complete with {0} updates!'.format(
                num_updated))
# [END update_schema]
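The docstring above refers to UpdateSchemaHandler, which is not shown on this page. A minimal sketch of what such a handler might look like, assuming a webapp2 application and the standard deferred library (the route and response text are assumptions):

import webapp2
from google.appengine.ext import deferred

class UpdateSchemaHandler(webapp2.RequestHandler):
    """Kicks off the schema migration on the task queue."""
    def post(self):
        # Queue the first page of the migration; the task re-queues itself
        # until every Picture entity has been re-saved.
        deferred.defer(update_schema_task)
        self.response.write('Schema update started.')

app = webapp2.WSGIApplication([('/update_schema', UpdateSchemaHandler)])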
def get(self, d):
    hack_id = self.request.get('hack_id')
    res = {}
    if hack_id == 'index_quotes_readables':
        page = self.request.get_range('page')
        PAGE_SIZE = 50
        index_lookup = {}  # index_name -> (index, list of items)
        for q in Quote.query().fetch(limit=PAGE_SIZE, offset=page * PAGE_SIZE):
            sd, index = q.update_sd(index_put=False)
            if index and index.name not in index_lookup:
                index_lookup[index.name] = (index, [sd])
            else:
                index_lookup[index.name][1].append(sd)
        for r in Readable.query().fetch(limit=PAGE_SIZE, offset=page * PAGE_SIZE):
            sd, index = r.update_sd(index_put=False)
            if index and index.name not in index_lookup:
                index_lookup[index.name] = (index, [sd])
            else:
                index_lookup[index.name][1].append(sd)
        if index_lookup:
            n = 0
            for index_tuple in index_lookup.values():
                index, items = index_tuple
                index.put(items)
                n += len(items)
            res['result'] = "Put %d items in %d indexes" % (n, len(index_lookup))
        res['page'] = page
    elif hack_id == 'normalize_key_props':
        dbp = []
        for hd in HabitDay.query().iter():
            habit_key = hd.habit
            if habit_key.parent() is None:
                # Need to update
                hd.habit = ndb.Key('User', hd.key.parent().id(), 'Habit', int(habit_key.id()))
                dbp.append(hd)
        res['habitdays'] = len(dbp)
        ndb.put_multi(dbp)
        dbp = []
        for jrnl in MiniJournal.query().iter():
            changes = False
            for i, tag_key in enumerate(jrnl.tags):
                if tag_key.parent() is None:
                    # Need to update
                    jrnl.tags[i] = ndb.Key('User', jrnl.key.parent().id(), 'JournalTag', tag_key.id())
                    changes = True
            if changes:
                dbp.append(jrnl)
        res['journals'] = len(dbp)
        ndb.put_multi(dbp)
    else:
        res['result'] = 'hack_id not found'
    self.json_out(res)
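The normalize_key_props branch above collects every entity that needs fixing and writes them with a single ndb.put_multi call. For very large backfills, keeping batch sizes bounded helps limit memory use and RPC size. A small helper sketch (not part of the handler above; the chunk size of 500 is a conventional batch size, not something taken from this code):

from google.appengine.ext import ndb

def put_in_chunks(entities, chunk_size=500):
    # Write a large list of entities in fixed-size batches.
    for i in range(0, len(entities), chunk_size):
        ndb.put_multi(entities[i:i + chunk_size])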
def get_books_on_shelf(user, shelf='currently-reading'):
    '''
    Return JSON array {title, author, isbn, image}
    '''
    user_id = user.get_integration_prop('goodreads_user_id')
    readables = []
    success = False
    if user_id:
        data = urllib.urlencode({
            'shelf': shelf,
            'key': GR_API_KEY,
            'v': 2
        })
        params = data
        url = "https://www.goodreads.com/review/list/%s.xml?%s" % (user_id, params)
        logging.debug("Fetching %s for %s" % (url, user))
        res = urlfetch.fetch(
            url=url,
            method=urlfetch.GET,
            validate_certificate=True)
        logging.debug(res.status_code)
        if res.status_code == 200:
            xml = res.content
            data = etree.parse(StringIO(xml))
            for r in data.getroot().find('reviews').findall('review'):
                book = r.find('book')
                isbn = book.find('isbn13').text
                image_url = book.find('image_url').text
                title = book.find('title').text
                authors = book.find('authors')
                link = book.find('link').text
                first_author = authors.find('author')
                if first_author is not None:
                    name = first_author.find('name')
                    if name is not None:
                        author = name.text
                        r = Readable.CreateOrUpdate(user, isbn, title=title,
                                                    url=link, source='goodreads',
                                                    image_url=image_url, author=author,
                                                    type=READABLE.BOOK,
                                                    read=False)
                        readables.append(r)
            success = True
        logging.debug("Putting %d readable(s)" % len(readables))
        ndb.put_multi(readables)
        Readable.put_sd_batch(readables)
    return (success, readables)
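A hypothetical call site for the function above, assuming user is an existing User entity with a goodreads_user_id integration property configured:

success, readables = get_books_on_shelf(user, shelf='currently-reading')
if success:
    logging.info("Synced %d book(s) from Goodreads", len(readables))
else:
    logging.warning("Goodreads sync failed or no goodreads_user_id is set")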