python类get_multi()的实例源码

test_types_relay.py 文件源码 项目:graphene-gae 作者: graphql-python 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def transform_to_reader_edges(edges, args, context):
    """Swap each edge's node for the reader entity it refers to.

    Every incoming edge's ``node`` must expose a ``reader_key``; all reader
    keys are loaded in one ``ndb.get_multi`` batch. Surviving edges are
    mutated in place (``edge.node`` becomes the reader) and returned in their
    original relative order.

    ``args`` and ``context`` are part of the signature but are not used here.

    Edges are dropped when their reader could not be loaded (``get_multi``
    yields ``None`` for a key with no entity) or when the reader's
    ``is_alive`` is falsy.
    """
    reader_keys = [edge.node.reader_key for edge in edges]
    readers = ndb.get_multi(reader_keys)

    transformed_edges = []
    for edge, reader in zip(edges, readers):
        # A dangling reader_key comes back as None; treat it like a dead
        # reader instead of raising AttributeError on `.is_alive`.
        if reader is None or not reader.is_alive:
            continue
        edge.node = reader
        transformed_edges.append(edge)

    return transformed_edges
main.py 文件源码 项目:grafanalib 作者: weaveworks 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def schedule(test_run, shard_count, shard):
    """Return, as JSON, the tests assigned to ``shard`` for this test run.

    The request body must be JSON containing a ``tests`` list of test names.
    If a ``Schedule`` with id ``"<test_run>-<shard_count>"`` already exists,
    its stored assignment is returned. Otherwise tests are distributed over
    ``shard_count`` shards greedily: most expensive test first, each placed
    on the currently lightest shard (lowest shard index wins ties). The
    result is stored with ``Schedule.get_or_insert`` and the requested
    shard's list is returned.

    A test with no stored ``Test`` entity is given a cost of 1.
    """
    # read tests from body
    test_names = flask.request.get_json(force=True)['tests']

    # first see if we have a schedule already
    schedule_id = "%s-%d" % (test_run, shard_count)
    existing = Schedule.get_by_id(schedule_id)
    if existing is not None:
        return flask.json.jsonify(tests=existing.shards[str(shard)])

    # if not, do simple greedy algorithm
    tests = ndb.get_multi(
        ndb.Key(Test, test_name) for test_name in test_names)

    def avg(test):
        if test is not None:
            return test.cost()
        return 1

    test_times = [(test_name, avg(test))
                  for test_name, test in zip(test_names, tests)]
    test_times.sort(key=operator.itemgetter(1))

    shards = {i: [] for i in xrange(shard_count)}
    # Running total cost per shard, so that picking the lightest shard does
    # not re-sum every shard's contents for each test placed.
    shard_loads = [0] * shard_count

    # Walk the ascending-sorted list backwards: most expensive test first.
    for test_name, cost in reversed(test_times):
        # find shortest shard and put it in that
        s = min(xrange(shard_count), key=shard_loads.__getitem__)
        shards[s].append(test_name)
        shard_loads[s] += cost

    # atomically insert or retrieve existing schedule
    stored = Schedule.get_or_insert(schedule_id, shards=shards)
    return flask.json.jsonify(tests=stored.shards[str(shard)])
api.py 文件源码 项目:flow-dashboard 作者: onejgordon 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def random_batch(self, d):
    '''
    Respond with a random batch of up to 50 readables for the current user.

    Keys are requested from Readable.Fetch (with_notes=True, limit=500,
    keys_only=True); when more than BATCH_SIZE come back, BATCH_SIZE of
    them are sampled at random before the entities are loaded. The
    response payload is {'readables': [...]} built from each entity's
    json(). The `d` argument is not used here.
    '''
    BATCH_SIZE = 50
    sample_keys = Readable.Fetch(self.user, with_notes=True, limit=500, keys_only=True)
    if len(sample_keys) > BATCH_SIZE:
        sample_keys = random.sample(sample_keys, BATCH_SIZE)
    # get_multi yields None for a key whose entity no longer exists; skip
    # those rather than crash on None.json().
    readables = [r for r in ndb.get_multi(sample_keys) if r is not None]
    self.set_response({
        'readables': [r.json() for r in readables]
    }, success=True)
api.py 文件源码 项目:flow-dashboard 作者: onejgordon 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def random_batch(self, d):
    '''
    Respond with a random batch of up to 50 quotes for the current user.

    Keys are requested from Quote.Fetch (limit=500, keys_only=True); when
    more than BATCH_SIZE come back, BATCH_SIZE of them are sampled at
    random before the entities are loaded. The response payload is
    {'quotes': [...]} built from each entity's json(). The `d` argument
    is not used here.
    '''
    BATCH_SIZE = 50
    sample_keys = Quote.Fetch(self.user, limit=500, keys_only=True)
    if len(sample_keys) > BATCH_SIZE:
        sample_keys = random.sample(sample_keys, BATCH_SIZE)
    # get_multi yields None for a key whose entity no longer exists; skip
    # those rather than crash on None.json().
    quotes = [q for q in ndb.get_multi(sample_keys) if q is not None]
    self.set_response({
        'quotes': [q.json() for q in quotes]
    }, success=True)
models.py 文件源码 项目:flow-dashboard 作者: onejgordon 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def Fetch(user, start, end):
    """Load the user's MiniJournal entities for a date range.

    Starting from ``start``, the cursor is advanced one day at a time while
    it is still before ``end``, and each advanced date is used. The cursor
    moves *before* it is used, so ``start`` itself is not included and the
    last date is the first one that is not earlier than ``end``.

    Returns a tuple ``(journals, iso_dates)``: the truthy results of
    ``ndb.get_multi`` for those dates, and the ISO date string (as produced
    by ``tools.iso_date``) for every date visited, found or not.
    """
    iso_dates = []
    day = start
    while day < end:
        day += timedelta(days=1)
        iso_dates.append(tools.iso_date(day))

    # One key per visited date, all children of the user's key.
    journal_keys = [
        ndb.Key('MiniJournal', iso, parent=user.key) for iso in iso_dates
    ]
    fetched = ndb.get_multi(journal_keys)
    return ([journal for journal in fetched if journal], iso_dates)
models.py 文件源码 项目:flow-dashboard 作者: onejgordon 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def Current(user, which="all"):
    """Fetch the user's current Goal entities.

    ``which`` selects the goals to look up: ``"year"``, ``"month"``,
    ``"longterm"``, or ``"all"`` for all three (in that order). Any other
    value selects nothing and yields an empty list.

    Goal ids are derived from today's date converted with
    ``tools.local_time`` using the user's timezone: ``%Y`` for the annual
    goal, ``%Y-%m`` for the monthly goal, and the fixed id ``longterm``.

    Returns the ``ndb.get_multi`` results as a list, one entry per selected
    key; results are passed through unfiltered.
    """
    date = tools.local_time(user.get_timezone(), datetime.today())
    # (selector, strftime pattern used as the Goal id). "longterm" contains
    # no format codes, so that id does not vary with the date.
    periods = (
        ("year", "%Y"),
        ("month", "%Y-%m"),
        ("longterm", "longterm"),
    )
    keys = [
        ndb.Key('Goal', datetime.strftime(date, pattern), parent=user.key)
        for selector, pattern in periods
        if which in ("all", selector)
    ]
    return list(ndb.get_multi(keys))
agent.py 文件源码 项目:flow-dashboard 作者: onejgordon 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _habit_status(self):
    """Build a short text summary of today's habit progress for the user.

    Looks up one HabitDay per habit for ``datetime.today().date()`` and
    reports how many habits are done (with their names) and which
    committed habits are still undone.

    Returns a fixed hint string when the user has no habits.

    A HabitDay whose ``habit`` key no longer resolves is skipped entirely,
    so the reported count always matches the listed names.
    """
    habits = Habit.All(self.user)
    if not habits:
        return "You haven't added any habits yet. Try saying 'add habit run'"

    today = datetime.today().date()
    habitday_keys = [ndb.Key('HabitDay', HabitDay.ID(h, today), parent=self.user.key) for h in habits]

    habits_committed_undone = []
    habits_done = []
    for hd in ndb.get_multi(habitday_keys):
        if not hd:
            continue  # no HabitDay stored for this habit today
        if not (hd.done or hd.committed):
            continue  # nothing to report for this habit
        habit = hd.habit.get()
        if not habit:
            # Previously only the committed branch checked this; the done
            # branch would raise AttributeError on `habit.name`.
            continue
        if hd.done:
            habits_done.append(habit.name)
        else:
            # Reaching here means committed and not done.
            habits_committed_undone.append(habit.name)

    n_habits_done = len(habits_done)
    if n_habits_done:
        text = "Good work on doing %d %s (%s)!" % (
            n_habits_done,
            tools.pluralize('habit', n_habits_done),
            tools.english_list(habits_done)
        )
    else:
        text = "No habits done yet."
    if habits_committed_undone:
        text += " Don't forget you've committed to %s." % tools.english_list(habits_committed_undone)
    return text
user.py 文件源码 项目:electron-crash-reporter 作者: lipis 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def user_merge():
  """Show and process the form that merges several users into one.

  Request arguments:
    user_keys: comma-delimited urlsafe keys of the users to merge (required).
    user_key: optional urlsafe key of the user to merge into.

  Aborts with 400 unless at least two of the given keys resolve to existing
  users. By default the earliest-created user is the merge target; a user
  whose key matches ``user_key`` overrides that. The target receives the
  union of all permissions, ``admin``/``active`` if any user has them, and
  ``verified = False``.

  On a valid form submission the target is populated from the form, given
  the union of all auth ids, stored, and the remaining users are handed to
  ``merge_user_dbs``; the response redirects to ``user_update``. Otherwise
  the merge template is rendered.
  """
  args = parser.parse({
    'user_key': wf.Str(missing=None),
    'user_keys': wf.DelimitedList(wf.Str(), delimiter=',', required=True),
  })

  requested_keys = [ndb.Key(urlsafe=k) for k in args['user_keys']]
  # get_multi yields None for keys with no entity. Drop those up front so the
  # sort and attribute accesses below never see None, and so only existing
  # users are passed on as deprecated keys.
  user_dbs = [
    user_db for user_db in ndb.get_multi(requested_keys) if user_db is not None
  ]
  if len(user_dbs) < 2:
    flask.abort(400)
  # Taken before sorting, so the keys keep their request order.
  user_db_keys = [user_db.key for user_db in user_dbs]

  user_dbs.sort(key=lambda user_db: user_db.created)
  merged_user_db = user_dbs[0]
  auth_ids = []
  permissions = []
  is_admin = False
  is_active = False
  for user_db in user_dbs:
    auth_ids.extend(user_db.auth_ids)
    permissions.extend(user_db.permissions)
    is_admin = is_admin or user_db.admin
    is_active = is_active or user_db.active
    if user_db.key.urlsafe() == args['user_key']:
      merged_user_db = user_db

  auth_ids = sorted(set(auth_ids))
  permissions = sorted(set(permissions))
  merged_user_db.permissions = permissions
  merged_user_db.admin = is_admin
  merged_user_db.active = is_active
  merged_user_db.verified = False

  # The form is fed a copy so the extra form-only attributes are not set on
  # the entity itself.
  form_obj = copy.deepcopy(merged_user_db)
  form_obj.user_key = merged_user_db.key.urlsafe()
  form_obj.user_keys = ','.join(args['user_keys'])

  form = UserMergeForm(obj=form_obj)
  if form.validate_on_submit():
    form.populate_obj(merged_user_db)
    merged_user_db.auth_ids = auth_ids
    merged_user_db.put()

    deprecated_keys = [k for k in user_db_keys if k != merged_user_db.key]
    merge_user_dbs(merged_user_db, deprecated_keys)
    return flask.redirect(
      flask.url_for('user_update', user_id=merged_user_db.key.id()),
    )

  return flask.render_template(
    'user/user_merge.html',
    title='Merge Users',
    html_class='user-merge',
    user_dbs=user_dbs,
    merged_user_db=merged_user_db,
    form=form,
    auth_ids=auth_ids,
    api_url=flask.url_for('api.admin.user.list'),
  )
user.py 文件源码 项目:meet-notes 作者: lipis 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def user_merge():
  """Show and process the form that merges several users into one.

  Request arguments:
    user_keys: comma-delimited urlsafe keys of the users to merge (required).
    user_key: optional urlsafe key of the user to merge into.

  Aborts with 400 unless at least two of the given keys resolve to existing
  users. By default the earliest-created user is the merge target; a user
  whose key matches ``user_key`` overrides that. The target receives the
  union of all permissions, ``admin``/``active`` if any user has them, and
  ``verified = False``.

  On a valid form submission the target is populated from the form, given
  the union of all auth ids, stored, and the remaining users are handed to
  ``merge_user_dbs``; the response redirects to ``user_update``. Otherwise
  the merge template is rendered.
  """
  args = parser.parse({
    'user_key': wf.Str(missing=None),
    'user_keys': wf.DelimitedList(wf.Str(), delimiter=',', required=True),
  })

  requested_keys = [ndb.Key(urlsafe=k) for k in args['user_keys']]
  # get_multi yields None for keys with no entity. Drop those up front so the
  # sort and attribute accesses below never see None, and so only existing
  # users are passed on as deprecated keys.
  user_dbs = [
    user_db for user_db in ndb.get_multi(requested_keys) if user_db is not None
  ]
  if len(user_dbs) < 2:
    flask.abort(400)
  # Taken before sorting, so the keys keep their request order.
  user_db_keys = [user_db.key for user_db in user_dbs]

  user_dbs.sort(key=lambda user_db: user_db.created)
  merged_user_db = user_dbs[0]
  auth_ids = []
  permissions = []
  is_admin = False
  is_active = False
  for user_db in user_dbs:
    auth_ids.extend(user_db.auth_ids)
    permissions.extend(user_db.permissions)
    is_admin = is_admin or user_db.admin
    is_active = is_active or user_db.active
    if user_db.key.urlsafe() == args['user_key']:
      merged_user_db = user_db

  auth_ids = sorted(set(auth_ids))
  permissions = sorted(set(permissions))
  merged_user_db.permissions = permissions
  merged_user_db.admin = is_admin
  merged_user_db.active = is_active
  merged_user_db.verified = False

  # The form is fed a copy so the extra form-only attributes are not set on
  # the entity itself.
  form_obj = copy.deepcopy(merged_user_db)
  form_obj.user_key = merged_user_db.key.urlsafe()
  form_obj.user_keys = ','.join(args['user_keys'])

  form = UserMergeForm(obj=form_obj)
  if form.validate_on_submit():
    form.populate_obj(merged_user_db)
    merged_user_db.auth_ids = auth_ids
    merged_user_db.put()

    deprecated_keys = [k for k in user_db_keys if k != merged_user_db.key]
    merge_user_dbs(merged_user_db, deprecated_keys)
    return flask.redirect(
      flask.url_for('user_update', user_id=merged_user_db.key.id()),
    )

  return flask.render_template(
    'user/user_merge.html',
    title='Merge Users',
    html_class='user-merge',
    user_dbs=user_dbs,
    merged_user_db=merged_user_db,
    form=form,
    auth_ids=auth_ids,
    api_url=flask.url_for('api.admin.user.list'),
  )


问题


面经


文章

微信
公众号

扫码关注公众号