def cookie2user(cookie_str):
'''Parse cookie and load user if cookie is valid'''
if not cookie_str:
return None
try:
L = cookie_str.split('-')
if len(L) != 3:
return None
uid,expires,sha1 = L
if int(expires) < time.time():
return None
user = await User.find(uid)
if user is None:
return None
s = '%s-%s-%s-%s' % (uid,user.passwd,expires,_COOKIE_KEY)
if sha1 != hashlib.sha1(s.encode('utf-8')).hexdigest():
logging.info('invalid sha1')
return None
user.passwd = '******'
return user
except Exception as e:
logging.exception(e)
return None
python类find()的实例源码
def api_edit_blog(request, *, id, name, summary, content):
check_admin(request)
if not name or not name.strip():
raise APIValueError('name', 'name can not be empty')
if not summary or not summary.strip():
raise APIValueError('summary', 'summary can not be empty')
if not content or not content.strip():
raise APIValueError('content', 'content can not be empty')
#blog = Blog(user_id = request.__user__.id, user_name= request.__user__.name, user_image = request.__user__.image, name = name.strip(), summary = summary.strip(), content = content.strip())
blog = yield from Blog.find(id)
if not blog:
raise APIValueError('id', 'request path error, id : {}'.format(id))
blog.name = name
blog.summary = summary
blog.content = content
yield from blog.update()
return blog
def get_blog(id, request):
blog = yield from Blog.find(id) # ??id???????????
# ????????blog??????????????????????
comments = yield from Comment.findAll('blog_id=?', [id], orderBy='created_at desc')
# ?????????html??
for c in comments:
c.html_content = text2html(c.content)
# blog??markdown????????html??
blog.html_content = markdown2.markdown(blog.content)
return {
'__template__': 'blog.html',
'blog': blog,
'__user__':request.__user__,
'comments': comments
}
# day10???
# ???????
def api_create_comment(id, request, *, content):
user = request.__user__
# ????
if user is None:
raise APIPermissionError('Please signin first.')
# ??????????
if not content or not content.strip():
raise APIValueError('content')
# ????????
blog = yield from Blog.find(id)
if blog is None:
raise APIResourceNotFoundError('Blog')
# ??????
comment = Comment(blog_id=blog.id, user_id=user.id, user_name=user.name, user_image=user.image, content=content.strip())
yield from comment.save() # ?????????
return comment # ????
# day14??
# API?????
def cookie2user(cookie_str):
'''
Parse cookie and load user if cookie is valid.
'''
if not cookie_str:
return None
try:
L = cookie_str.split('-')
if len(L) != 3:
return None
uid, expires, sha1 = L
if int(expires) < time.time():
return None
user = yield from User.find(uid)
if user is None:
return None
s = '%s-%s-%s-%s' % (uid, user.passwd, expires, _COOKIE_KEY)
if sha1 != hashlib.sha1(s.encode('utf-8')).hexdigest():
logging.info('invalid sha1')
return None
user.passwd = '******'
return user
except Exception as e:
logging.exception(e)
return None
def get_user_by_cookie(cookie):
'''
Parse cookie and load user if cookie is valid.
split the cookie and get the encrypted part to validate
'''
if not cookie:
return None
try:
lst = cookie.split('-')
if len(lst) != 3:
return None
uid, expiration, sha1 = lst
if int(expiration) < time.time():
return None
user = yield from User.find(uid)
if user is None:
return None
valid = '%s-%s-%s-%s' % (uid, user.password, expiration, _COOKIE_KEY)
if sha1 != hashlib.sha1(valid.encode('utf-8')).hexdigest():
logging.info('invalid sha1')
return None
user.password = '******'
return user
except Exception as e:
logging.exception(e)
def cookie2user(cookie_str):
'''
Parse cookie and load user if cookie is valid.
'''
if not cookie_str:
return None
try:
L = cookie_str.split('-')
if len(L) != 3:
return None
uid, expires, sha1 = L
if int(expires) < time.time():
return None
user = await User.find(uid)
if user is None:
return None
s = '%s-%s-%s-%s' % (uid, user.passwd, expires, _COOKIE_KEY)
if sha1 != hashlib.sha1(s.encode('utf-8')).hexdigest():
logging.info('invalid sha1')
return None
user.passwd = '******'
return user
except Exception as e:
logging.exception(e)
return None
def get_blog(id, request):
blog = yield from Blog.find(id) # ??id???????????
# ????????blog??????????????????????
comments = yield from Comment.findAll('blog_id=?', [id], orderBy='created_at desc')
# ?????????html??
for c in comments:
c.html_content = text2html(c.content)
# blog??markdown????????html??
blog.html_content = markdown2.markdown(blog.content)
return {
'__template__': 'blog.html',
'blog': blog,
'__user__':request.__user__,
'comments': comments
}
# day10???
# ???????
def api_create_comment(id, request, *, content):
user = request.__user__
# ????
if user is None:
raise APIPermissionError('Please signin first.')
# ??????????
if not content or not content.strip():
raise APIValueError('content')
# ????????
blog = yield from Blog.find(id)
if blog is None:
raise APIResourceNotFoundError('Blog')
# ??????
comment = Comment(blog_id=blog.id, user_id=user.id, user_name=user.name, user_image=user.image, content=content.strip())
yield from comment.save() # ?????????
return comment # ????
# day14??
# API?????
def get_blog(id,request):
blog = await Blog.find(id)
comments = await Comment.findAll('blog_id=?',[id],orderBy='created_at desc')
for c in comments:
c.html_content = text2html(c.content)
blog.html_content = markdown2.markdown(blog.content)
return {
'__template__' : 'blog.html',
'blog' : blog,
'comments' : comments,
'__user__' : request.__user__
}
# ????
def api_create_comments(id,request,*,content):
user = request.__user__
if user is None:
raise APIPermissionError('Please signin first')
if not content or not content.strip():
raise APIValueError('content')
blog = await Blog.find(id)
if blog is None:
raise APIResourceNotFoundError('Blog')
comment = Comment(blog_id=blog.id,user_id=user.id,user_name=user.name,user_image=user.image,content=content.strip())
await comment.save()
return comment
def api_delete_comments(id,request):
check_admin(request)
c = await Comment.find(id)
if c is None:
raise APIResourceNotFoundError('Comment')
await c.remove()
return dict(id=id)
def api_get_blog(*,id):
blog = await Blog.find(id)
return blog
def api_delete_blog(request,*,id):
check_admin(request)
blog = await Blog.find(id)
await blog.remove()
return dict(id=id)
def cookie2user(cookie_str):
'''
Parse cookie and load user if cookie is valid
:param cookie_str:
:return:
'''
if not cookie_str:
return None
try:
L = cookie_str.split('-')
if len(L) != 3:
return None
uid, expires, sha1 = L
if int(expires) < time.time():
return None
user = yield from User.find(uid)
if user is None:
return None
s = '{}-{}-{}-{}'.format(uid, user.password, expires, _COOKIE_KEY)
if sha1 != hashlib.sha1(s.encode('utf-8')).hexdigest():
logging.info('cookie:{} is invalid, invalid sha1')
return None
user.password = '*' * 8
return user
except Exception as e:
logging.exception(e)
return None
def get_blog(id):
blog = yield from Blog.find(id)
if not blog:
raise APIValueError('id', 'can not find blog id is :{}'.format(id))
comments = yield from Comment.find_all('blog_id=?', [id], order_by='created_at desc')
for c in comments:
c.html_content = text2html(c.content)
blog.html_content = markdown2.markdown(blog.content)
return {
'__template__': 'blog.html',
'blog': blog,
'comments': comments
}
def api_get_blog(*, id):
blog = yield from Blog.find(id)
return blog
def api_delete_blog(request, *, id):
logging.info('delete blog id: {}'.format(id))
check_admin(request)
blog = yield from Blog.find(id)
if blog:
yield from blog.remove()
return blog
raise APIValueError('id', 'id can not find...')
def api_delete_comments(request, *, comment_id):
'''delete comment.'''
check_admin(request)
comment = yield from Comment.find(comment_id)
if comment is None:
raise APIResourceNotFoundError('Comment', 'can not find comment, comment id: {}'.format(comment_id))
yield from comment.remove()
return comment
def cookie2user(cookie_str):
'''
Parse cookie and load user if cookie is valid.??cookie???????cookie???????
'''
if not cookie_str:
return None
try:
# ?????????????????“-”??cookie?????id?????????????
L = cookie_str.split('-') # ????str?list
if len(L) != 3: # cookie????????????????????????????
return None
uid, expires, sha1 = L
if int(expires) < time.time(): # ??????????cookie???
return None
user = yield from User.find(uid) # ???????????
if user is None: # ??????????????
return None
# ??sha1?????????cookie??sha1?????
s = '%s-%s-%s-%s' % (uid, user.passwd, expires, _COOKIE_KEY)
# ????????????
if sha1 != hashlib.sha1(s.encode('utf-8')).hexdigest():
logging.info('invalid sha1')
return None
user.passwd = '******'
# ??cookie??????????????????????????????
# ?? ????????
return user
except Exception as e:
logging.exception(e)
return None
# ----------------------------------?????--------------------------------
# day14???
# ?????
def api_get_blog(*, id):
blog = yield from Blog.find(id)
return blog
# day11??
# API?????????
def api_delete_comments(id, request):
check_admin(request) #???????????
c = yield from Comment.find(id) # ?????????
if c is None:
raise APIResourceNotFoundError('Comment')
yield from c.remove() # ????
return dict(id=id) # ????????id
# day14??
# API:????
def api_delete_blog(request, *, id):
check_admin(request)
blog = yield from Blog.find(id)
yield from blog.remove()
return dict(id=id)
def find():
await orm.create_pool(loop, user='root', password='password', db='awesome')
all = await User.findAll()
print(all)
pk = await User.find('00149276202953187d8d3176f894f1fa82d9caa7d36775a000')
print(pk)
num = await User.findNumber('email')
print(num)
await orm.destory_pool()
def get_blog(id):
blog = yield from Blog.find(id)
comments = yield from Comment.findAll('blog_id=?', [id], orderBy='created_at desc')
for c in comments:
c.html_content = text2html(c.content)
blog.html_content = markdown2.markdown(blog.content)
return {
'__template__': 'blog.html',
'blog': blog,
'comments': comments
}
def api_create_comment(id, request, *, content):
user = request.__user__
if user is None:
raise APIPermissionError('Please signin first.')
if not content or not content.strip():
raise APIValueError('content')
blog = yield from Blog.find(id)
if blog is None:
raise APIResourceNotFoundError('Blog')
comment = Comment(blog_id=blog.id, user_id=user.id, user_name=user.name, user_image=user.image, content=content.strip())
yield from comment.save()
return comment
def api_delete_comments(id, request):
check_admin(request)
c = yield from Comment.find(id)
if c is None:
raise APIResourceNotFoundError('Comment')
yield from c.remove()
return dict(id=id)
def api_get_blog(*, id):
blog = yield from Blog.find(id)
return blog
def api_delete_blog(request, *, id):
check_admin(request)
blog = yield from Blog.find(id)
yield from blog.remove()
return dict(id=id)
def cookie2user(cookie_str):
'''
Parse cookie and load user if cookie is valid
'''
if not cookie_str:
return None
try:
L = cookie_str.split('-')
if len(L) != 3:
return None
uid, expires, sha1 = L
if int(expires) < time.time():
return None
user = await User.find(uid)
if user is None:
return None
s = '%s-%s-%s-%s' % (uid, user.passwd, expires, _COOKIE_KEY)
if sha1 != hashlib.sha1(s.encode('utf-8')).hexdigest():
logging.info('invalid sha1')
return None
user.passwd = '******'
return user
except Exception as e:
logging.exception(e)
return None
# @get('/')
# async def index(request):
# users = await User.findAll()
# return {
# '__template__': 'test.html',
# 'users': users
# }