python类get()的实例源码

_google_appengine_ext_db.py 文件源码 项目:Tinychat-Bot--Discontinued 作者: Tinychat 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def getGAEObjects(context):
    """
    Returns a reference to the C{gae_objects} on the context. If it doesn't
    exist then it is created.

    The lookup is tested against C{None} rather than by truthiness so that
    a collection that is already registered but currently evaluates as
    false (for example because it is empty) is reused, not replaced.

    @param context: The context to load the C{gae_objects} index from. It
        is accessed with C{get(key, default)} and item assignment.
    @return: The C{gae_objects} index reference.
    @rtype: Instance of L{XDBReferenceCollection}
    @since: 0.4.1
    """
    ref_collection = context.get(XDB_CONTEXT_NAME, None)

    if ref_collection is None:
        # First access for this context: create the index and register it.
        ref_collection = context[XDB_CONTEXT_NAME] = XDBReferenceCollection()

    return ref_collection
_google_appengine_ext_db.py 文件源码 项目:Tinychat-Bot--Discontinued 作者: Tinychat 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def encode_xdb_key(key, encoder=None):
    """
    Encode the entity that a `db.Key` refers to, rather than the key itself.

    The entity is first looked up in the reference collection stored on the
    encoder's context. When that lookup raises `KeyError` the entity is
    fetched with `db.get` and the result is stored in the collection.

    @param key: The `db.Key` to resolve.
    @param encoder: The encoder that supplies `context.extra` and receives
        the output. Although it defaults to C{None}, it is dereferenced
        unconditionally.
    """
    references = getGAEObjects(encoder.context.extra)
    model_class = db.class_for_kind(key.kind())

    try:
        entity = references.get(model_class, key)
    except KeyError:
        entity = db.get(key)
        references.set(model_class, key, entity)

    if entity:
        encoder.writeObject(entity)
        return

    # No entity could be resolved for the key: write a null element.
    encoder.writeElement(None)
blog.py 文件源码 项目:Multi_User_Blog 作者: Nshmais 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get(self, post_id):
        """
        Render the permalink page for one post with its comments and likes.

        Responds with 404 when the post does not exist. The comment and like
        queries bind the post id as a GQL parameter instead of splicing the
        raw URL value into the query text.

        Args:
            post_id: Post id taken from the request path, as a string.
        """
        numeric_id = int(post_id)
        key = db.Key.from_path('Post', numeric_id, parent=blog_key())
        post = db.get(key)

        # Bail out before building any further queries for a missing post.
        if not post:
            self.error(404)
            return

        comments = db.GqlQuery("select * from Comment where post_id = :1 "
                               "order by created desc", numeric_id)
        likes = db.GqlQuery("select * from Like where post_id = :1",
                            numeric_id)

        error = self.request.get('error')

        self.render("permalink.html", post=post, noOfLikes=likes.count(),
                    comments=comments, error=error)
blog.py 文件源码 项目:Multi_User_Blog 作者: Nshmais 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get(self, post_id):
        """
        Delete a post, and the comments filed under it, for its owner.

        Anonymous visitors are redirected to the login page with an error,
        a missing post redirects to the login page, and users who do not own
        the post are redirected back to it with an error message.

        Args:
            post_id: Post id taken from the request path, as a string.
        """
        if not self.user:
            self.redirect("/login?error=You need to be logged, in order" +
                          " to delete your post!!")
            return

        post_key = db.Key.from_path('Post', int(post_id), parent=blog_key())
        post = db.get(post_key)
        if not post:
            # A post that is not in the datastore sends the user to login.
            return self.redirect('/login')

        if post.user_id != self.user.key().id():
            self.redirect("/blog/" + post_id + "?error=You don't have " +
                          "access to delete this record.")
            return

        post.delete()

        # Remove every comment that was attached to the deleted post.
        for comment in Comment.all().filter("post_id", int(post_id)):
            comment.delete()

        self.redirect("/?deleted_post_id="+post_id)
blog.py 文件源码 项目:Multi_User_Blog 作者: Nshmais 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get(self, post_id):
        """
        Show the edit form for a post when the logged-in user owns it.

        Anonymous visitors are redirected to the login page with an error,
        a missing post redirects to the login page, and users who do not own
        the post are redirected back to it with an error message.

        Args:
            post_id: Post id taken from the request path, as a string.
        """
        if not self.user:
            self.redirect("/login?error=You need to be logged, " +
                          "in order to edit your post!!")
            return

        post_key = db.Key.from_path('Post', int(post_id), parent=blog_key())
        post = db.get(post_key)
        if not post:
            # A post that is not in the datastore sends the user to login.
            return self.redirect('/login')

        owns_post = post.user_id == self.user.key().id()
        if not owns_post:
            self.redirect("/blog/" + post_id + "?error=You don't have " +
                          "access to edit this record.")
            return

        self.render("editpost.html", subject=post.subject,
                    content=post.content)
blog.py 文件源码 项目:Multi_User_Blog 作者: Nshmais 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get(self, post_id, comment_id):
        """
        Delete a comment when the logged-in user is its author.

        Anonymous visitors are redirected to the login page with an error,
        a missing comment redirects to the login page, and users who did not
        write the comment are redirected back to the post with an error.

        Args:
            post_id: Id of the post the comment page belongs to, as a string.
            comment_id: Id of the comment to delete, as a string.
        """
        if not self.user:
            # The two literals are joined with a space after the comma so the
            # message no longer reads "logged,in order".
            return self.redirect("/login?error=You need to be logged, " +
                                 "in order to delete your comment!!")

        key = db.Key.from_path('Comment', int(comment_id),
                               parent=blog_key())
        c = db.get(key)
        if not c:
            # if the comment does not exist, redirect to login page
            return self.redirect('/login')

        if c.user_id != self.user.key().id():
            return self.redirect("/blog/" + post_id +
                                 "?error=You don't have " +
                                 "access to delete this comment.")

        c.delete()
        return self.redirect("/blog/"+post_id+"?deleted_comment_id=" +
                             comment_id)
blog.py 文件源码 项目:Multi_User_Blog 作者: Nshmais 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get(self, post_id, comment_id):
        """
        Show the edit form for a comment when the logged-in user wrote it.

        Anonymous visitors are redirected to the login page with an error,
        a missing comment redirects to the login page, and users who did not
        write the comment are redirected back to the post with an error.

        Args:
            post_id: Id of the post the comment page belongs to, as a string.
            comment_id: Id of the comment to edit, as a string.
        """
        if not self.user:
            # Space added after the comma ("logged,in" before) and the
            # message now names the comment, which is what is being edited.
            return self.redirect("/login?error=You need to be logged, " +
                                 "in order to edit your comment!!")

        key = db.Key.from_path('Comment',
                               int(comment_id),
                               parent=blog_key())
        c = db.get(key)
        if not c:
            return self.redirect('/login')

        if c.user_id != self.user.key().id():
            # Space added between the literals ("accessto edit" before).
            return self.redirect("/blog/" + post_id +
                                 "?error=You don't have access " +
                                 "to edit this comment.")

        return self.render("editcomment.html", comment=c.comment)
deletepost.py 文件源码 项目:udacity-multi-user-blog 作者: rrjoson 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get(self, post_id, post_user_id):
        """
        Delete a post on behalf of its owner.

        Anonymous visitors are redirected to the login page and a missing
        post yields a 404. The post is deleted only when the logged-in user
        matches both the ``post_user_id`` from the URL and the ``user_id``
        stored on the post itself; the URL value alone is caller-controlled
        and cannot prove ownership. Anyone else sees the permalink page with
        a permission error.

        Args:
            post_id: Post id taken from the request path, as a string.
            post_user_id: Owner id taken from the request path, as a string.
        """
        if not self.user:
            self.redirect('/login')
            return

        key = db.Key.from_path('Post', int(post_id), parent=blog_key())
        post = db.get(key)
        if not post:
            self.error(404)
            return

        current_user_id = self.user.key().id()
        # NOTE(review): relies on Post.user_id holding the author's numeric
        # id, as the unlike handler of this project compares it the same way.
        if (current_user_id == int(post_user_id) and
                post.user_id == current_user_id):
            post.delete()
            self.redirect('/')
            return

        comments = db.GqlQuery(
            "select * from Comment where ancestor is :1 order by created desc limit 10", key)

        error = "You don't have permission to delete this post"
        self.render("permalink.html", post=post, comments=comments, error=error)
unlikepost.py 文件源码 项目:udacity-multi-user-blog 作者: rrjoson 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get(self, post_id):
        """
        Remove the logged-in user's like from a post.

        A missing post yields a 404, anonymous visitors are redirected to the
        login page, and the post's author gets a plain-text refusal. For
        everyone else the matching ``Like`` entity, if any, is deleted and
        the post's like counter is decremented before redirecting back to
        the post.

        Args:
            post_id: Post id taken from the request path, as a string.
        """
        key = db.Key.from_path('Post', int(post_id), parent=blog_key())
        post = db.get(key)
        if not post:
            # Without this guard the attribute reads below would raise.
            self.error(404)
            return

        if not self.user:
            self.redirect('/login')
            return

        user_id = self.user.key().id()
        if user_id == post.user_id:
            self.write("You cannot dislike your own post")
            return

        numeric_post_id = post.key().id()
        like = (Like.all()
                .filter('user_id =', user_id)
                .filter('post_id =', numeric_post_id)
                .get())

        if like:
            like.delete()
            post.likes -= 1
            post.put()

        # Same destination whether or not a like was found.
        self.redirect('/' + str(numeric_post_id))
userinfo_utility.py 文件源码 项目:ifttt-line 作者: MypaceEngine 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def getUserByDashId(dashId):
    """
    Look up the ``lineId`` stored for a given ``dashId``.

    Args:
        dashId: Value matched against the ``dashId`` property of UserData.

    Returns:
        The ``lineId`` of the first matching UserData entity, or None when
        nothing matches or the lookup raises.
    """
    logging.debug('ENTER getUser_DashButton\n')
    found_lineId = None

    # get lineId from dashId
    try:
        q = UserData.all()
        q.filter('dashId = ', dashId)
        for p in q.run(limit=1):
            found_lineId = p.lineId
            logging.debug('  found: ' + str(p) + '\n')
    except Exception:
        # Best-effort lookup: failures are logged (with traceback) and the
        # caller gets None. Only Exception is caught so interpreter-level
        # signals are no longer swallowed.
        # NOTE(review): the message text below appears mis-encoded in the
        # source; it is kept as-is because the original wording is unknown.
        logging.exception(u'dashId??lineId????????????????')

    return found_lineId
main.py 文件源码 项目:Utmost_Bot 作者: jiachen247 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def post(self):
        """
        Dispatch an incoming JSON update to the matching handler.

        The request body is parsed as JSON. A truthy ``message`` entry goes
        to ``handle_message``; otherwise a truthy ``callback_query`` entry
        goes to ``handle_callback_query``; anything else is only logged.
        """
        update = json.loads(self.request.body)
        logging.debug(self.request.body)

        message = update.get('message')
        if message:
            logging.info('Processing incoming message')
            self.handle_message(message)
            return

        callback_query = update.get('callback_query')
        if callback_query:
            logging.info('Processing incoming callback query')
            self.handle_callback_query(callback_query)
            return

        # Inline queries and every other update type are not handled.
        logging.info(LOG_TYPE_NON_MESSAGE)
main.py 文件源码 项目:Utmost_Bot 作者: jiachen247 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def post(self):
        """
        Send one queued message and abort with 502 when delivery fails.

        The request body is JSON with a ``msg_type`` and a ``data`` entry;
        ``data`` is itself a JSON string whose ``chat_id`` identifies the
        user. The request is aborted with 502 when ``telegram_post`` raises
        or when ``handle_response`` returns a value equal to False.
        """
        payload = json.loads(self.request.body)
        msg_type = payload.get('msg_type')
        data = payload.get('data')
        chat_id = json.loads(data).get('chat_id')
        uid = str(chat_id)
        user = get_user(uid)

        try:
            result = telegram_post(data, 4)
        except Exception as err:
            description = user.get_description()
            logging.warning(
                LOG_ERROR_SENDING.format(msg_type, uid, description, str(err)))
            logging.debug(data)
            self.abort(502)

        response = json.loads(result.content)
        outcome = handle_response(response, user, uid, msg_type)

        # Equality (not identity) with False is kept on purpose.
        if outcome == False:
            logging.debug(data)
            self.abort(502)
main.py 文件源码 项目:Utmost_Bot 作者: jiachen247 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get(self):
        """
        Flush memcache and re-fetch the devotionals for today and tomorrow.

        ``get_devo`` is called for every version at day offsets 0 and 1. A
        status line describing the outcome is sent to ``CREATOR_ID`` and
        "Success..." is written to the response whether or not the fetch
        raised.
        """
        logging.debug("Caching devos for today.")
        memcache.flush_all()

        status = "Status : "

        try:
            # Day offsets: 0 is today, 1 is tomorrow.
            for delta in range(2):
                for version in range(V.get_size()):
                    logging.debug("how many times {}".format(version))
                    letters = V.get_version_letters(version)
                    get_devo(delta=delta, version=letters)
        except Exception as err:
            status += "Cache Failed - " + str(err)
        else:
            status += "Cache Passed: "
        finally:
            send_message(CREATOR_ID, status)
            self.response.write("Success...")
main.py 文件源码 项目:FSND_Multi_User_Blog 作者: harrystaley 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def post(self, login_id):
        """
        Redirect to the comment edit page, or to signup on an auth error.

        When the computed ``auth_error`` flag is False, the cookie is set and
        both ``edit_comment_id`` and ``post_id`` are present in the request,
        the client is redirected to ``/editcomment``. When ``auth_error`` is
        True the client is redirected to ``/signup``. No response is written
        when ``auth_error`` is False but one of the other conditions fails.

        Args:
            login_id: Not read anywhere inside this method.
        """
        auth_error = True
        # NOTE(review): the outcome of this cookie check is overwritten by
        # the user_exists check below before it is ever read.
        if self.read_secure_cookie('usercookie'):
            auth_error = False
        else:
            auth_error = True
        username = self.read_secure_cookie('usercookie')
        # NOTE(review): auth_error is cleared when user_exists() is falsy,
        # which looks inverted -- confirm what user_exists() returns for the
        # cookie value before changing it.
        if not self.user_exists(username):
            auth_error = False
        else:
            auth_error = True

        if not auth_error:
            comment_id = self.request.get('edit_comment_id')
            post_id = self.request.get('post_id')
            # The cookie is re-checked here; this is the only place where its
            # presence actually gates the redirect.
            if self.read_secure_cookie('usercookie'):
                if comment_id and post_id:
                    self.redirect('/editcomment?comment_id=%s&post_id=%s' %
                                  (comment_id, post_id))
        else:
            self.redirect('/signup')
main.py 文件源码 项目:FSND_Multi_User_Blog 作者: harrystaley 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get(self):
        """
        Handle the GET request for welcome.html.

        Renders the welcome page with the user's name when the ``usercookie``
        cookie is present and resolves to a stored user. Redirects to the
        signup page when the cookie is missing or when no user entity exists
        for the id it carries.
        """
        # Gets the user id from the cookie (read once and reused).
        user_id = self.read_secure_cookie('usercookie')
        if not user_id:
            self.redirect('/signup')
            return

        # Builds the key for the User kind under the shared parent.
        key = db.Key.from_path('User',
                               int(user_id),
                               parent=user_key())
        user = db.get(key)
        if not user:
            # The cookie refers to an account that is no longer stored;
            # without this guard user.username would raise.
            self.redirect('/signup')
            return

        # renders the welcome page passing in the user name
        self.render("welcome.html",
                    username=user.username)
main.py 文件源码 项目:FSND_Multi_User_Blog 作者: harrystaley 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get(self):
        """
        Handle the GET request for editpost.html.

        Redirects to the signup page when the ``usercookie`` cookie is
        missing. Responds with 404 when ``post_id`` is missing, not numeric,
        or does not resolve to a stored post. Renders the edit form when the
        cookie's user id equals the post's ``author_id``; otherwise sends the
        client back to the referring page, or to ``/`` when there is none.
        """
        # Authenticate first so anonymous requests never touch the datastore.
        user_id = self.read_secure_cookie('usercookie')
        if not user_id:
            self.redirect('/signup')
            return

        post_id = self.request.get('post_id')
        if not post_id.isdigit():
            # int() would raise on an empty or malformed id.
            self.error(404)
            return

        key = db.Key.from_path('Post',
                               int(post_id),
                               parent=blog_key())
        post = db.get(key)
        if not post:
            self.error(404)
            return

        if user_id == post.author_id:
            self.render("editpost.html",
                        subject=post.subject,
                        content=post.content,
                        post_id=post_id)
            return

        # Not the author: go back where the request came from. redirect() is
        # used for the fallback because redirect_to() expects a route name,
        # not a URI such as '/'.
        referrer = self.request.headers.get('referer')
        return self.redirect(referrer or '/')
main.py 文件源码 项目:FSND_Multi_User_Blog 作者: harrystaley 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def post(self):
        """
        Delete the post named by the ``post_id`` request parameter.

        When the computed ``auth_error`` flag is False the post key is built
        from ``post_id``, deleted, and ``/postdeleted.html`` is rendered;
        otherwise the client is redirected to ``/signup``.

        NOTE(review): nothing here compares the requester with the post's
        author before deleting -- confirm whether that check happens
        elsewhere.
        """
        auth_error = True
        # NOTE(review): the outcome of this cookie check is overwritten by
        # the user_exists check below before it is ever read.
        if self.read_secure_cookie('usercookie'):
            auth_error = False
        else:
            auth_error = True
        username = self.read_secure_cookie('usercookie')
        # NOTE(review): auth_error is cleared when user_exists() is falsy,
        # which looks inverted -- confirm what user_exists() returns for the
        # cookie value before changing it.
        if not self.user_exists(username):
            auth_error = False
        else:
            auth_error = True

        if not auth_error:
            post_id = self.request.get('post_id')
            key = db.Key.from_path('Post',
                                   int(post_id),
                                   parent=blog_key())
            # Deletes by key directly; the entity itself is never fetched.
            db.delete(key)
            self.render('/postdeleted.html')
        else:
            self.redirect('/signup')
api.py 文件源码 项目:deprecated_thedap 作者: unitedvote 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def post(self):
        """
        Create a Proposal from the JSON request body and echo it back.

        The body must carry ``proposal_title``, ``proposal_text``, ``cost``
        and ``dap``. The stored proposal is returned as a JSON object; the
        ``user`` and ``dap`` keys are converted with ``str()`` because key
        objects are not JSON serializable.
        """
        data = json.loads(self.request.body)
        logging.info(data)

        p = Proposal(
                     proposal_title=data["proposal_title"],
                     proposal_text=data["proposal_text"],
                     cost=float(data["cost"]),
                     dap=db.get(data["dap"]),
                     user=str(self.get_current_user().key())
                    )
        p.put()

        r = {"proposal_title": p.proposal_title,
             "proposal_text": p.proposal_text,
             "cost": p.cost,
             "timestamp": p.created.strftime("%d/%m/%Y %H:%M:%S"),
             "user": str(p.user.key()),
             "dap": str(p.dap.key()),
             }
        self.set_status(200)
        self.write(json.dumps(r))
api.py 文件源码 项目:deprecated_thedap 作者: unitedvote 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get(self):
        """
        Authenticate a user from the ``email`` and ``password`` arguments.

        On success a secure ``user`` cookie holding the user's key is set and
        a 200 JSON response with ``success: true`` is written. A missing
        argument, an unknown email, or a wrong password yields a 400 JSON
        response with ``success: false``.

        NOTE(review): the original passed the bound method
        ``self.get_arguments`` to ``json.loads``, which cannot succeed; the
        credentials are assumed to arrive as request arguments -- confirm
        against the client.
        """
        email = self.get_argument("email", None)
        password = self.get_argument("password", None)
        # Only the email is logged; the password is deliberately left out.
        logging.info("login attempt for %s", email)

        user = self.from_email(email) if email else None

        if user and password is not None and user.check_password(password):
            #set secure cookie
            self.set_secure_cookie("user", str(user.key()))
            self.set_status(200)
            resp = {"success":True, "error":""}
        else:
            self.set_status(400)
            resp = {"success":False, "error":"Could not authenticate user"}

        self.write(json.dumps(resp))
fixed_mappers.py 文件源码 项目:dancedeets-monorepo 作者: mikelambert 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def _drop_gracefully(self):
        """
        Refuse to drop the worker task when its shard is still active.

        The shard id and mapreduce id are read from the task headers and both
        state entities are fetched. When the shard state exists and is
        active, two errors are logged and an exception is raised. No state
        is written here: the shard is not marked as failed.
        """
        headers = self.request.headers
        shard_id = headers[util._MR_SHARD_ID_TASK_HEADER]
        mr_id = headers[util._MR_ID_TASK_HEADER]
        state_keys = [
            model.ShardState.get_key_by_shard_id(shard_id),
            model.MapreduceState.get_key_by_job_id(mr_id),
        ]
        # The mapreduce state is fetched alongside but not used below.
        shard_state, mr_state = db.get(state_keys)

        if not shard_state or not shard_state.active:
            return

        logging.error('Would normally mark this shard for failure...and kill the entire mapreduce!')
        logging.error('But we ignore that and let this shard continue to run (and fail) instead.')
        # Marking the shard for failure and persisting it is intentionally
        # skipped; raising lets the shard keep running instead.
        raise Exception('Worker cannot run due to attempt to drop gracefully.')
metadata.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def key_for_entity(cls, entity_or_key):
    """Build the metadata key for the entity group holding entity_or_key.

    The returned key can be passed to get() to load the __entity_group__
    metadata entity of that entity group.

    Args:
      entity_or_key: a key or entity whose __entity_group__ key you want.

    Returns:
      The __entity_group__ key for the entity group containing entity_or_key.
    """
    key = (entity_or_key.key() if isinstance(entity_or_key, db.Model)
           else entity_or_key)

    # Walk up the ancestor chain until the root key is reached.
    ancestor = key.parent()
    while ancestor:
      key = ancestor
      ancestor = key.parent()

    return db.Key.from_path(cls.KIND_NAME, cls.ID, parent=key)
metadata.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_entity_group_version(entity_or_key):
  """Look up the version of the entity group containing entity_or_key.

  Args:
    entity_or_key: a key or entity whose version you want.

  Returns: The version of the entity group containing entity_or_key. This
    version is guaranteed to increase on every change to the entity
    group. The version may increase even in the absence of user-visible
    changes to the entity group. May return None if the entity group was
    never written to.

    On non-HR datastores, this function returns None.
  """
  metadata_key = EntityGroup.key_for_entity(entity_or_key)
  entity_group = db.get(metadata_key)
  return entity_group.version if entity_group else None
backup_handler.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def Render(cls, handler):
    """Render the delete-backup confirmation page; callable from main.py.

    The backups named by the 'backup_id' request values are loaded, missing
    ones are skipped, and a warning flag is set when any of them uses the
    FILES_API_GS_FILESYSTEM filesystem.

    Args:
      handler: the webapp.RequestHandler invoking the method
    """
    request = handler.request
    requested_backup_ids = request.get_all('backup_id')

    backups = []
    if requested_backup_ids:
      backups = [found for found in db.get(requested_backup_ids) if found]

    # A list (not a generator) keeps every backup's filesystem evaluated.
    gs_warning = any(
        [found.filesystem == FILES_API_GS_FILESYSTEM for found in backups])

    template_params = {
        'form_target': DoBackupDeleteHandler.SUFFIX,
        'datastore_admin_home': utils.GenerateHomeUrl(request),
        'backups': backups,
        'xsrf_token': utils.CreateXsrfToken(XSRF_ACTION),
        'gs_warning': gs_warning,
        'run_as_a_service': request.get('run_as_a_service'),
    }
    utils.RenderToResponse(handler, 'confirm_delete_backup.html',
                           template_params)
backup_handler.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def Render(cls, handler):
    """Render the abort-backup confirmation page; callable from main.py.

    The backups named by the 'backup_id' request values are loaded and the
    ones that could not be found are skipped.

    Args:
      handler: the webapp.RequestHandler invoking the method
    """
    request = handler.request
    requested_backup_ids = request.get_all('backup_id')

    backups = []
    if requested_backup_ids:
      backups = [found for found in db.get(requested_backup_ids) if found]

    template_params = {
        'form_target': DoBackupAbortHandler.SUFFIX,
        'datastore_admin_home': utils.GenerateHomeUrl(request),
        'backups': backups,
        'xsrf_token': utils.CreateXsrfToken(XSRF_ACTION),
        'run_as_a_service': request.get('run_as_a_service'),
    }
    utils.RenderToResponse(handler, 'confirm_abort_backup.html',
                           template_params)
backup_handler.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get(self):
    """Handle GET requests to datastore_admin backup operations.

    The job, task and error values found on the request are passed to the
    page template so the status of executed jobs can be displayed.
    """
    request = self.request
    template_params = {
        'job_list': request.get_all('job'),
        'remote_job': request.get('remote_job'),
        'task_list': request.get_all('task'),
        'mapreduce_detail': self.MAPREDUCE_DETAIL,
        'error': request.get('error', ''),
        'xsrf_error': request.get('xsrf_error', ''),
        'datastore_admin_home': utils.GenerateHomeUrl(request),
    }
    utils.RenderToResponse(self, self._get_html_page, template_params)
backup_handler.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def post(self):
    """Handle POST requests to datastore_admin/backup.do.

    The XSRF token is validated, the request is processed, and the client is
    then redirected to the get handler with the resulting parameters. An
    invalid token or an exception during processing is reported through the
    redirect parameters instead.
    """
    token = self.request.get('xsrf_token')

    if utils.ValidateXsrfToken(token, XSRF_ACTION):
      try:
        parameters = self._ProcessPostRequest()
      except Exception as e:
        parameters = [('error', self._HandleException(e))]
    else:
      parameters = [('xsrf_error', '1')]

    self.SendRedirect(self._get_post_html_page, parameters)
backup_handler.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _run_map_jobs_deferred(backup_name, job_operation_key, backup_info_key,
                           kinds, job_name, backup_handler, input_reader,
                           output_writer, mapper_params, mapreduce_params,
                           queue):
  """Start the map jobs of a backup, cleaning up when they fail to start.

  Nothing is started when the backup information entity cannot be loaded.
  When starting the jobs raises, the failure is logged and the backup
  information is deleted; the exception is not re-raised.

  Args:
    backup_name: name of the backup, used in log messages only.
    backup_info_key: key used to load the BackupInformation entity.
    All remaining arguments are forwarded unchanged to _run_map_jobs.
  """
  backup_info = BackupInformation.get(backup_info_key)
  if not backup_info:
    logging.info('Missing backup info, can not start backup jobs for "%s"',
                 backup_name)
    return

  try:
    _run_map_jobs(job_operation_key, backup_info_key, kinds, job_name,
                  backup_handler, input_reader, output_writer, mapper_params,
                  mapreduce_params, queue)
  except BaseException:
    # Deliberately broad: any failure removes the half-started backup.
    logging.exception('Failed to start a datastore backup job[s] for "%s".',
                      backup_name)
    delete_backup_info(backup_info)
backup_handler.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def post(self):
    """Handler for post requests to datastore_admin/backup_delete.do.

    Deletes are executed and user is redirected to the base-path handler.
    Nothing is deleted unless backup ids are present and the XSRF token is
    valid. A failure is logged and reported through the 'error' redirect
    parameter.
    """
    backup_ids = self.request.get_all('backup_id')
    token = self.request.get('xsrf_token')
    params = ()
    if backup_ids and utils.ValidateXsrfToken(token, XSRF_ACTION):
      try:
        for backup_info in db.get(backup_ids):
          if backup_info:
            delete_backup_info(backup_info)
      except Exception as e:
        logging.exception('Failed to delete datastore backup.')
        # e.message is empty for multi-argument exceptions and absent on
        # newer interpreters; fall back to str(e) so the error is never lost.
        params = [('error', getattr(e, 'message', '') or str(e))]

    self.SendRedirect(params=params)
backup_handler.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def post(self):
    """Handler for post requests to datastore_admin/backup_abort.do.

    Abort is executed and user is redirected to the base-path handler.
    Nothing is aborted unless backup ids are present and the XSRF token is
    valid. A failure is logged and reported through the 'error' redirect
    parameter.
    """
    backup_ids = self.request.get_all('backup_id')
    token = self.request.get('xsrf_token')
    params = ()
    if backup_ids and utils.ValidateXsrfToken(token, XSRF_ACTION):
      try:
        for backup_info in db.get(backup_ids):
          if backup_info:
            operation = backup_info.parent()
            if operation.parent_key():
              # The operation has a parent key: abort through the admin
              # service, using that key as the job id.
              job_id = str(operation.parent_key())
              datastore_admin_service = services_client.DatastoreAdminClient()
              datastore_admin_service.abort_backup(job_id)
            else:
              utils.AbortAdminOperation(operation.key())
            delete_backup_info(backup_info)
      except Exception as e:
        logging.exception('Failed to abort pending datastore backup.')
        # e.message is empty for multi-argument exceptions and absent on
        # newer interpreters; fall back to str(e) so the error is never lost.
        params = [('error', getattr(e, 'message', '') or str(e))]

    self.SendRedirect(params=params)
backup_handler.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _perform_backup_complete(
    operation, job_id, kind, backup_info_pk, gcs_path_prefix, filenames, queue):
  """Record a finished backup job and its output files on the backup info.

  The job is moved from the active to the completed list, the output file
  names are merged into the per-kind file list (created when missing), and
  both entities are written. When the operation has reached the completed
  status, finalization of the backup info is deferred on the given queue.
  Only a warning is logged when the backup info cannot be loaded.

  Args:
    operation: the admin operation whose status is inspected.
    job_id: id of the job that completed.
    kind: kind whose backup files are being recorded.
    backup_info_pk: key used to load the BackupInformation entity.
    gcs_path_prefix: forwarded to finalize_backup_info.
    filenames: output file names produced by the job.
    queue: queue name used for the deferred finalization.
  """
  backup_info = BackupInformation.get(backup_info_pk)
  if not backup_info:
    logging.warn('BackupInfo was not found for %s', backup_info_pk)
    return

  if job_id in backup_info.active_jobs:
    backup_info.active_jobs.remove(job_id)
    backup_info.completed_jobs = list(
        set(backup_info.completed_jobs + [job_id]))

  gs_filenames = [GCSUtil.add_gs_prefix_if_missing(name) for name in filenames]
  kind_backup_files = backup_info.get_kind_backup_files([kind])[0]
  if kind_backup_files:
    kind_backup_files.files = list(set(kind_backup_files.files + gs_filenames))
  else:
    kind_backup_files = backup_info.create_kind_backup_files(kind, gs_filenames)

  db.put((backup_info, kind_backup_files), force_writes=True)

  if operation.status == utils.DatastoreAdminOperation.STATUS_COMPLETED:
    deferred.defer(finalize_backup_info, backup_info.key(),
                   gcs_path_prefix,
                   _url=config.DEFERRED_PATH,
                   _queue=queue,
                   _transactional=True)


问题


面经


文章

微信
公众号

扫码关注公众号