async def authenticate(*, email, passwd):
    """Check a user's credentials and answer with a session cookie.

    The password check hashes ``"<user.id>:<passwd>"`` with SHA-1 and compares
    the hex digest with the stored ``user.passwd``.

    Args:
        email: Address used to look the user up through ``User.findAll``.
        passwd: Password value submitted by the client.

    Returns:
        A ``web.Response`` with a JSON body holding the user (``passwd``
        masked as ``'******'``) and the ``COOKIE_NAME`` cookie set for
        86400 seconds.

    Raises:
        APIValueError: If ``email`` or ``passwd`` is empty, no user has that
            email, or the digest does not match.
    """
    # Declared ``async`` because the body awaits the ORM lookup.
    if not email:
        raise APIValueError('email', 'Invalid email')
    if not passwd:
        raise APIValueError('passwd', 'Invalid passwd')
    users = await User.findAll('email=?', [email])
    if not users:
        raise APIValueError('email', 'Email not exist.')
    user = users[0]
    # Check the password: digest of "<id>:<passwd>".
    sha1 = hashlib.sha1()
    sha1.update(user.id.encode('utf-8'))
    sha1.update(b':')
    sha1.update(passwd.encode('utf-8'))
    if user.passwd != sha1.hexdigest():
        raise APIValueError('passwd', 'passwd error')
    # Authentication succeeded: set the cookie from the unmasked user first.
    r = web.Response()
    r.set_cookie(COOKIE_NAME, user2cookie(user, 86400), max_age=86400, httponly=True)
    # Mask the stored digest so it is not serialized into the response body.
    user.passwd = '******'
    r.content_type = 'application/json'
    r.body = json.dumps(user, ensure_ascii=False).encode('utf-8')
    return r
# ??
# Example source code for Python's findAll() (collected snippets).
def index(*, page='1'):
    """Build the template context for one page of the blog list.

    Generator-based handler: ORM calls are delegated with ``yield from``.

    Args:
        page: Requested page, passed through ``get_page_index``.

    Returns:
        A dict with the ``blogs.html`` template name, the ``Page`` object and
        the blogs for that page (an empty list when there are no blogs).
    """
    current = get_page_index(page)
    total = yield from Blog.findNumber('count(id)')
    pager = Page(total, current)
    blogs = []
    if total != 0:
        # Newest first, restricted to the window described by the Page object.
        blogs = yield from Blog.findAll(
            orderBy='created_at desc',
            limit=(pager.offset, pager.limit),
        )
    # The dict carries both the template name and the data it is given.
    return {
        '__template__': 'blogs.html',
        'page': pager,
        'blogs': blogs
    }
# day11??
# ????????
def get_blog(id, request):
    """Build the template context for a single blog and its comments.

    Generator-based handler: ORM calls are delegated with ``yield from``.

    Args:
        id: Primary key handed to ``Blog.find`` and used as the ``blog_id``
            filter for the comments.
        request: Request object; its ``__user__`` attribute is forwarded.

    Returns:
        A dict with the ``blog.html`` template name, the blog, the current
        user and the comments (newest first).
    """
    post = yield from Blog.find(id)
    replies = yield from Comment.findAll(
        'blog_id=?', [id], orderBy='created_at desc'
    )
    # Attach an HTML version of every comment body.
    for reply in replies:
        reply.html_content = text2html(reply.content)
    # The blog body goes through markdown2 to produce its HTML version.
    post.html_content = markdown2.markdown(post.content)
    return {
        '__template__': 'blog.html',
        'blog': post,
        '__user__': request.__user__,
        'comments': replies
    }
# day10???
# ???????
def authenticate(*, email, passwd):
    """Validate login credentials and build the signed-in response.

    Generator-based handler: the user lookup is delegated with ``yield from``.

    Args:
        email: Address used to look the user up through ``User.findAll``.
        passwd: Password value submitted by the client.

    Returns:
        A ``web.Response`` with a JSON body holding the user (``passwd``
        masked as ``'******'``) and the ``COOKIE_NAME`` cookie set for
        86400 seconds.

    Raises:
        APIValueError: If a field is empty, the email is unknown, or the
            SHA-1 digest of ``"<user.id>:<passwd>"`` differs from the stored
            ``user.passwd``.
    """
    if not email:
        raise APIValueError('email', 'Invalid email.')
    if not passwd:
        raise APIValueError('passwd', 'Invalid password.')
    matches = yield from User.findAll('email=?', [email])
    if len(matches) == 0:
        raise APIValueError('email', 'Email not exist.')
    user = matches[0]
    # Digest of "<user.id>:<passwd>", hashed in a single call.
    salted = user.id.encode('utf-8') + b':' + passwd.encode('utf-8')
    if user.passwd != hashlib.sha1(salted).hexdigest():
        raise APIValueError('passwd', 'Invalid password.')
    # Credentials accepted: issue the cookie from the unmasked user first.
    resp = web.Response()
    resp.set_cookie(
        COOKIE_NAME,
        user2cookie(user, 86400),
        max_age=86400,
        httponly=True,
    )
    # Mask the digest before the user is serialized into the body.
    user.passwd = '******'
    resp.content_type = 'application/json'
    payload = json.dumps(user, ensure_ascii=False)
    resp.body = payload.encode('utf-8')
    return resp
def index(*, page='1'):
    """Return the context used to render one page of blogs.

    Generator-based handler: ORM calls are delegated with ``yield from``.

    Args:
        page: Requested page, converted by ``get_page_index``.

    Returns:
        A dict naming the ``blogs.html`` template together with the ``Page``
        object and the list of blogs (empty when the blog count is 0).
    """
    wanted = get_page_index(page)
    blog_count = yield from Blog.findNumber('count(id)')
    paging = Page(blog_count, wanted)
    # Skip the query entirely when there is nothing to list.
    entries = [] if blog_count == 0 else (
        yield from Blog.findAll(
            orderBy='created_at desc',
            limit=(paging.offset, paging.limit),
        )
    )
    return {
        '__template__': 'blogs.html',
        'page': paging,
        'blogs': entries
    }
# day11??
# ????????
def get_blog(id, request):
    """Return the context used to render one blog with its comments.

    Generator-based handler: ORM calls are delegated with ``yield from``.

    Args:
        id: Key passed to ``Blog.find`` and used to filter comments by
            ``blog_id``.
        request: Request object whose ``__user__`` attribute is forwarded.

    Returns:
        A dict naming the ``blog.html`` template together with the blog, the
        current user and the comments ordered by ``created_at desc``.
    """
    entry = yield from Blog.find(id)
    feedback = yield from Comment.findAll(
        'blog_id=?',
        [id],
        orderBy='created_at desc',
    )
    # Each comment gets an HTML version of its text.
    for item in feedback:
        item.html_content = text2html(item.content)
    # The blog content goes through markdown2 for its HTML version.
    rendered = markdown2.markdown(entry.content)
    entry.html_content = rendered
    return {
        '__template__': 'blog.html',
        'blog': entry,
        '__user__': request.__user__,
        'comments': feedback
    }
# day10???
# ???????
async def index(request, *, page='1'):
    """Build the template context for one page of the blog list.

    Args:
        request: Request object; its ``__user__`` attribute is forwarded.
        page: Requested page, converted by ``get_page_index``.

    Returns:
        A dict with the ``blogs.html`` template name, the ``Page`` object, the
        blogs for that page (empty list when there are none) and the user.
    """
    # Declared ``async`` because the body awaits the ORM.
    page_index = get_page_index(page)
    num = await Blog.findNumber('count(id)')
    page = Page(num, page_index)
    # Test the row count: the original compared the Page object with 0, which
    # left the empty branch unreachable.
    if num == 0:
        blogs = []
    else:
        blogs = await Blog.findAll(
            orderBy='created_at desc',
            limit=(page.offset, page.limit),
        )
    return {
        '__template__': 'blogs.html',
        'page': page,
        'blogs': blogs,
        '__user__': request.__user__
    }
async def get_blog(id, request):
    """Build the template context for a single blog and its comments.

    Args:
        id: Key passed to ``Blog.find`` and used to filter comments by
            ``blog_id``.
        request: Request object; its ``__user__`` attribute is forwarded.

    Returns:
        A dict with the ``blog.html`` template name, the blog, its comments
        (ordered by ``created_at desc``) and the current user.
    """
    # Declared ``async`` because the body awaits the ORM.
    blog = await Blog.find(id)
    comments = await Comment.findAll('blog_id=?', [id], orderBy='created_at desc')
    # Attach an HTML version of every comment body.
    for c in comments:
        c.html_content = text2html(c.content)
    # The blog body goes through markdown2 for its HTML version.
    blog.html_content = markdown2.markdown(blog.content)
    return {
        '__template__': 'blog.html',
        'blog': blog,
        'comments': comments,
        '__user__': request.__user__
    }
# ????
async def api_comments(*, page='1'):
    """Return one page of comments, newest first.

    Args:
        page: Requested page, converted by ``get_page_index``.

    Returns:
        A dict with ``page`` (the ``Page`` object) and ``comments`` (an empty
        tuple when the comment count is 0).
    """
    # Declared ``async`` because the body awaits the ORM.
    page_index = get_page_index(page)
    num = await Comment.findNumber('count(id)')
    p = Page(num, page_index)
    if num == 0:
        return dict(page=p, comments=())
    comments = await Comment.findAll(
        orderBy='created_at desc',
        limit=(p.offset, p.limit),
    )
    return dict(page=p, comments=comments)
async def api_get_users(*, page='1'):
    """Return one page of users, newest first, with passwords masked.

    Args:
        page: Requested page, converted by ``get_page_index``.

    Returns:
        A dict with ``page`` (the ``Page`` object) and ``users`` (an empty
        tuple when the user count is 0).
    """
    # Declared ``async`` because the body awaits the ORM.
    page_index = get_page_index(page)
    num = await User.findNumber('count(id)')
    p = Page(num, page_index)
    if num == 0:
        return dict(page=p, users=())
    users = await User.findAll(
        orderBy='created_at desc',
        limit=(p.offset, p.limit),
    )
    # Replace the stored digest on every user before returning it.
    for u in users:
        u.passwd = '******'
    return dict(page=p, users=users)
async def api_blogs(*, page='1'):
    """Return one page of blogs, newest first.

    Args:
        page: Requested page, converted by ``get_page_index``.

    Returns:
        A dict with ``page`` (the ``Page`` object) and ``blogs`` (an empty
        tuple when the blog count is 0).
    """
    # Declared ``async`` because the body awaits the ORM.
    page_index = get_page_index(page)
    num = await Blog.findNumber('count(id)')
    p = Page(num, page_index)
    if num == 0:
        # Same shape as the non-empty answer: a Page object and a sequence
        # (the original returned ``page=0`` and ``blogs={}`` here).
        return dict(page=p, blogs=())
    blogs = await Blog.findAll(
        orderBy='created_at desc',
        limit=(p.offset, p.limit),
    )
    return dict(page=p, blogs=blogs)
def authenticate(*, email, passwd):
    """Verify an email/password pair and return the login response.

    Generator-based handler: the user lookup is delegated with ``yield from``.

    Args:
        email: Address used to look the user up through ``User.findAll``.
        passwd: Password value submitted by the client.

    Returns:
        A ``web.Response`` with a JSON body holding the user (``passwd``
        masked as ``'******'``) and the ``COOKIE_NAME`` cookie set for
        86400 seconds.

    Raises:
        APIValueError: If a field is empty, the email is unknown, or the
            password digest does not match.
    """
    # Both fields are required.
    if not email:
        raise APIValueError('email', 'Invalid email.')
    if not passwd:
        raise APIValueError('passwd', 'Invalid password.')
    # The lookup returns a list; an empty list means the email is unknown.
    found = yield from User.findAll('email=?', [email])
    if len(found) == 0:
        raise APIValueError('email', 'Email not exist.')
    user = found[0]
    # The stored value is compared against the SHA-1 of "<id>:<passwd>";
    # feed the three pieces into the hash in that order.
    hasher = hashlib.sha1()
    pieces = (user.id.encode('utf-8'), b':', passwd.encode('utf-8'))
    for piece in pieces:
        hasher.update(piece)
    if user.passwd != hasher.hexdigest():
        raise APIValueError('passwd', 'Invalid password.')
    # Password accepted: set the cookie from the unmasked user, then mask the
    # digest before the user is serialized.
    reply = web.Response()
    reply.set_cookie(
        COOKIE_NAME, user2cookie(user, 86400), max_age=86400, httponly=True
    )
    user.passwd = '******'
    reply.content_type = 'application/json'
    reply.body = json.dumps(user, ensure_ascii=False).encode('utf-8')
    return reply
# ??????
def api_comments(*, page='1'):
    """Return one page of comments, newest first.

    Generator-based handler: ORM calls are delegated with ``yield from``.

    Args:
        page: Requested page, converted by ``get_page_index``.

    Returns:
        A dict with ``page`` (the ``Page`` object) and ``comments`` (an empty
        tuple when the comment count is 0).
    """
    requested = get_page_index(page)
    total = yield from Comment.findNumber('count(id)')  # total comment count
    pager = Page(total, requested)
    if total == 0:
        rows = ()
    else:
        # Fetch only the window described by the Page object.
        rows = yield from Comment.findAll(
            orderBy='created_at desc',
            limit=(pager.offset, pager.limit),
        )
    return {'page': pager, 'comments': rows}
# day14??
# API?????
async def find():
    """Exercise the ORM read helpers against the ``awesome`` database.

    Creates the connection pool, prints the result of ``User.findAll``,
    ``User.find`` (for one fixed id) and ``User.findNumber('email')``, then
    tears the pool down.
    """
    # Declared ``async`` because the body awaits the ORM.
    # NOTE(review): ``loop`` is not defined in this block; it is expected to
    # be a module-level name. Credentials are hard-coded.
    await orm.create_pool(loop, user='root', password='password', db='awesome')
    try:
        users = await User.findAll()
        print(users)
        pk = await User.find('00149276202953187d8d3176f894f1fa82d9caa7d36775a000')
        print(pk)
        num = await User.findNumber('email')
        print(num)
    finally:
        # Release the pool even if one of the queries fails.
        # NOTE(review): 'destory_pool' is kept exactly as the original call
        # spells it; confirm it matches the orm module.
        await orm.destory_pool()
def index(*, page='1'):
    """Build the template context for one page of the blog list.

    Generator-based handler: ORM calls are delegated with ``yield from``.

    Args:
        page: Requested page, converted by ``get_page_index``.

    Returns:
        A dict with the ``blogs.html`` template name, the ``Page`` object and
        the blogs for that page (empty list when the blog count is 0).
    """
    page_index = get_page_index(page)
    num = yield from Blog.findNumber('count(id)')
    # Pass the requested index on: the original built ``Page(num)`` and so
    # never used the ``page`` argument.
    page = Page(num, page_index)
    if num == 0:
        blogs = []
    else:
        blogs = yield from Blog.findAll(
            orderBy='created_at desc',
            limit=(page.offset, page.limit),
        )
    return {
        '__template__': 'blogs.html',
        'page': page,
        'blogs': blogs
    }
def get_blog(id):
    """Return the context used to render one blog with its comments.

    Generator-based handler: ORM calls are delegated with ``yield from``.

    Args:
        id: Key passed to ``Blog.find`` and used to filter comments by
            ``blog_id``.

    Returns:
        A dict with the ``blog.html`` template name, the blog and its
        comments ordered by ``created_at desc``.
    """
    article = yield from Blog.find(id)
    remarks = yield from Comment.findAll(
        'blog_id=?', [id], orderBy='created_at desc'
    )
    # Attach an HTML version of every comment body.
    for remark in remarks:
        remark.html_content = text2html(remark.content)
    # The blog body goes through markdown2 for its HTML version.
    article.html_content = markdown2.markdown(article.content)
    return {
        '__template__': 'blog.html',
        'blog': article,
        'comments': remarks
    }
def api_comments(*, page='1'):
    """Return one page of comments, newest first.

    Generator-based handler: ORM calls are delegated with ``yield from``.

    Args:
        page: Requested page, converted by ``get_page_index``.

    Returns:
        A dict with ``page`` (the ``Page`` object) and ``comments`` (an empty
        tuple when the comment count is 0).
    """
    wanted = get_page_index(page)
    count = yield from Comment.findNumber('count(id)')
    paging = Page(count, wanted)
    found = ()
    if count != 0:
        found = yield from Comment.findAll(
            orderBy='created_at desc',
            limit=(paging.offset, paging.limit),
        )
    return {'page': paging, 'comments': found}
def api_get_users(*, page='1'):
    """Return one page of users, newest first, with passwords masked.

    Generator-based handler: ORM calls are delegated with ``yield from``.

    Args:
        page: Requested page, converted by ``get_page_index``.

    Returns:
        A dict with ``page`` (the ``Page`` object) and ``users`` (an empty
        tuple when the user count is 0).
    """
    wanted = get_page_index(page)
    count = yield from User.findNumber('count(id)')
    paging = Page(count, wanted)
    if count == 0:
        return {'page': paging, 'users': ()}
    members = yield from User.findAll(
        orderBy='created_at desc',
        limit=(paging.offset, paging.limit),
    )
    # Replace the stored digest on every user before returning it.
    for member in members:
        member.passwd = '******'
    return {'page': paging, 'users': members}
def api_blogs(*, page='1'):
    """Return one page of blogs, newest first.

    Generator-based handler: ORM calls are delegated with ``yield from``.

    Args:
        page: Requested page, converted by ``get_page_index``.

    Returns:
        A dict with ``page`` (the ``Page`` object) and ``blogs`` (an empty
        tuple when the blog count is 0).
    """
    wanted = get_page_index(page)
    count = yield from Blog.findNumber('count(id)')
    paging = Page(count, wanted)
    if count == 0:
        posts = ()
    else:
        posts = yield from Blog.findAll(
            orderBy='created_at desc',
            limit=(paging.offset, paging.limit),
        )
    return {'page': paging, 'blogs': posts}
async def cookie2user(cookie_str):
    """Parse a session cookie and load its user if the cookie is valid.

    The cookie is expected as ``"<uid>-<expires>-<sha1>"``. It is accepted
    only if it has not expired and ``<sha1>`` equals the SHA-1 hex digest of
    ``"<uid>-<user.passwd>-<expires>-<_COOKIE_KEY>"``.

    Args:
        cookie_str: Raw cookie value; may be empty or ``None``.

    Returns:
        The user with ``passwd`` masked as ``'******'``, or ``None`` when the
        cookie is missing, malformed, expired, forged, or refers to no user.
        Any exception raised while parsing is logged and also yields ``None``.
    """
    # Declared ``async`` because the body awaits the ORM lookup.
    if not cookie_str:
        return None
    try:
        parts = cookie_str.split('-')
        if len(parts) != 3:
            return None
        uid, expires, sha1 = parts
        if int(expires) < time.time():
            return None
        user = await User.find(uid)
        if user is None:
            return None
        # Recompute the signature from the stored digest and the server key.
        s = '%s-%s-%s-%s' % (uid, user.passwd, expires, _COOKIE_KEY)
        if sha1 != hashlib.sha1(s.encode('utf-8')).hexdigest():
            logging.info('invalid sha1')
            return None
        # Mask the digest before the user object leaves this function.
        user.passwd = '******'
        return user
    except Exception as e:
        # Best effort: a bad cookie must never break the request.
        logging.exception(e)
        return None
# @get('/')
# async def index(request):
# users = await User.findAll()
# return {
# '__template__': 'test.html',
# 'users': users
# }
async def index(request):
    """Build the template context listing every blog.

    Args:
        request: Request object (not used by the body).

    Returns:
        A dict with the ``blogs.html`` template name and the result of
        ``Blog.findAll()``.
    """
    # Declared ``async`` because the body awaits the ORM. The unused
    # ``summary`` placeholder and the commented-out sample blogs were dropped.
    blogs = await Blog.findAll()
    return {
        '__template__': 'blogs.html',
        'blogs': blogs
    }
async def get_blog(id):
    """Build the template context for a single blog and its comments.

    Args:
        id: Key passed to ``Blog.find`` and used to filter comments by
            ``blog_id``.

    Returns:
        A dict with the ``blog.html`` template name, the blog and its
        comments ordered by ``created_at desc``.
    """
    # Declared ``async`` because the body awaits the ORM.
    blog = await Blog.find(id)
    comments = await Comment.findAll('blog_id=?', [id], orderBy='created_at desc')
    # Attach an HTML version of every comment body.
    for c in comments:
        c.html_content = text2html(c.content)
    # The blog body goes through markdown2 for its HTML version.
    blog.html_content = markdown2.markdown(blog.content)
    return {
        '__template__': 'blog.html',
        'blog': blog,
        'comments': comments
    }
async def api_get_users(*, page='1'):
    """Return one page of users, newest first, with passwords masked.

    Args:
        page: Requested page, converted by ``get_page_index``.

    Returns:
        A dict with ``page`` (the ``Page`` object) and ``users`` (an empty
        tuple when the user count is 0).
    """
    # Declared ``async`` because the body awaits the ORM.
    page_index = get_page_index(page)
    num = await User.findNumber('count(id)')
    p = Page(num, page_index)
    if num == 0:
        return dict(page=p, users=())
    users = await User.findAll(
        orderBy='created_at desc',
        limit=(p.offset, p.limit),
    )
    # Mask the ``passwd`` field, the name the rest of the file uses for the
    # stored digest; the original assigned to ``password`` and so left the
    # digest in the response.
    for u in users:
        u.passwd = '******'
    return dict(page=p, users=users)
async def api_register_user(*, email, name, passwd):
    """Register a new user and answer with a session cookie.

    The stored password is the SHA-1 hex digest of ``"<uid>:<passwd>"``.

    Args:
        email: Must match ``_RE_EMAIL`` and not belong to an existing user.
        name: Display name; surrounding whitespace is stripped before saving.
        passwd: Must match ``_RE_SHA1``.

    Returns:
        A ``web.Response`` with a JSON body holding the new user (``passwd``
        masked as ``'******'``) and the ``COOKIE_NAME`` cookie set for
        86400 seconds.

    Raises:
        APIValueError: If ``name``, ``email`` or ``passwd`` fails validation.
        APIError: If the email is already registered.
    """
    # Declared ``async`` because the body awaits the ORM.
    if not name or not name.strip():
        raise APIValueError('name', 'Invalid name.')
    if not email or not _RE_EMAIL.match(email):
        raise APIValueError('email', 'Invalid email.')
    if not passwd or not _RE_SHA1.match(passwd):
        raise APIValueError('passwd', 'Invalid password.')
    users = await User.findAll('email=?', [email])
    if users:
        raise APIError('register: failed', 'email', 'Email is already in use.')
    uid = next_id()
    # The new id is mixed into the digest, matching the "<id>:<passwd>" form.
    sha1_passwd = '%s:%s' % (uid, passwd)
    user = User(
        id=uid,
        name=name.strip(),
        email=email,
        passwd=hashlib.sha1(sha1_passwd.encode('utf-8')).hexdigest(),
        image='http://www.gravatar.com/avatar/%s?d=mm&s=120' % hashlib.md5(email.encode('utf-8')).hexdigest()
    )
    await user.save()
    # Make the session cookie from the unmasked user, then mask the digest
    # before the user is serialized into the body.
    r = web.Response()
    r.set_cookie(COOKIE_NAME, user2cookie(user, 86400), max_age=86400, httponly=True)
    user.passwd = '******'
    r.content_type = 'application/json'
    r.body = json.dumps(user, ensure_ascii=False).encode('utf-8')
    return r
async def api_blogs(*, page='1'):
    """Return one page of blogs, newest first.

    Args:
        page: Requested page, converted by ``get_page_index``.

    Returns:
        A dict with ``page`` (the ``Page`` object) and ``blogs`` (an empty
        tuple when the blog count is 0).
    """
    # Declared ``async`` because the body awaits the ORM.
    page_index = get_page_index(page)
    num = await Blog.findNumber('count(id)')
    p = Page(num, page_index)
    if num == 0:
        return dict(page=p, blogs=())
    blogs = await Blog.findAll(
        orderBy='created_at desc',
        limit=(p.offset, p.limit),
    )
    return dict(page=p, blogs=blogs)
async def api_comments(*, page='1'):
    """Return one page of comments, newest first.

    Args:
        page: Requested page, converted by ``get_page_index``.

    Returns:
        A dict with ``page`` (the ``Page`` object) and ``comments`` (an empty
        tuple when the comment count is 0).
    """
    # Declared ``async`` because the body awaits the ORM.
    page_index = get_page_index(page)
    num = await Comment.findNumber('count(id)')
    p = Page(num, page_index)
    if num == 0:
        return dict(page=p, comments=())
    comments = await Comment.findAll(
        orderBy='created_at desc',
        limit=(p.offset, p.limit),
    )
    return dict(page=p, comments=comments)
async def index(*, page='1'):
    """Build the template context for one page of the blog list.

    Args:
        page: Requested page, converted by ``get_page_index``.

    Returns:
        A dict with the ``blogs.html`` template name, the ``Page`` object and
        the blogs for that page (empty list when the blog count is 0).
    """
    # Declared ``async`` because the body awaits the ORM.
    page_index = get_page_index(page)
    num = await Blog.findNumber('count(id)')
    page = Page(num, page_index)
    if num == 0:
        blogs = []
    else:
        blogs = await Blog.findAll(
            orderBy='created_at desc',
            limit=(page.offset, page.limit),
        )
    return {
        '__template__': 'blogs.html',
        'page': page,
        'blogs': blogs
    }
async def get_blog(id):
    """Build the template context for a single blog and its comments.

    Args:
        id: Key passed to ``Blog.find`` and used to filter comments by
            ``blog_id``.

    Returns:
        A dict with the ``blog.html`` template name, the blog and its
        comments ordered by ``created_at desc``.
    """
    # Declared ``async`` because the body awaits the ORM.
    blog = await Blog.find(id)
    # ``orderBy`` is the keyword every other ``findAll`` call in this file
    # uses; the original passed ``orderby`` here.
    comments = await Comment.findAll('blog_id=?', [id], orderBy='created_at desc')
    # Attach an HTML version of every comment body.
    for c in comments:
        c.html_content = text2html(c.content)
    # The blog body goes through markdown2 for its HTML version.
    blog.html_content = markdown2.markdown(blog.content)
    return {
        '__template__': 'blog.html',
        'blog': blog,
        'comments': comments
    }
async def authenticate(*, email, passwd):
    """Check a user's credentials and answer with a session cookie.

    The password check hashes ``"<user.id>:<passwd>"`` with SHA-1 and compares
    the hex digest with the stored ``user.passwd``.

    Args:
        email: Address used to look the user up through ``User.findAll``.
        passwd: Password value submitted by the client.

    Returns:
        A ``web.Response`` with a JSON body holding the user (``passwd``
        masked as ``'******'``) and the ``COOKIE_NAME`` cookie set for
        86400 seconds.

    Raises:
        APIValueError: If ``email`` or ``passwd`` is empty, no user has that
            email, or the digest does not match.
    """
    # Declared ``async`` because the body awaits the ORM lookup.
    if not email:
        raise APIValueError('email', 'Invalid email')
    if not passwd:
        raise APIValueError('passwd', 'Invalid passwd.')
    users = await User.findAll('email=?', [email])
    if not users:
        raise APIValueError('email', 'Email not exist.')
    user = users[0]
    # Check the password. Repeated ``update`` calls are equivalent to hashing
    # the concatenation of their arguments, i.e. "<id>:<passwd>".
    sha1 = hashlib.sha1()
    sha1.update(user.id.encode('utf-8'))
    sha1.update(b':')
    sha1.update(passwd.encode('utf-8'))
    if user.passwd != sha1.hexdigest():
        raise APIValueError('passwd', 'Invalid password.')
    # Authentication succeeded: set the cookie from the unmasked user first.
    r = web.Response()
    r.set_cookie(COOKIE_NAME, user2cookie(user, 86400), max_age=86400, httponly=True)
    # Mask the stored digest so it is not serialized into the response body.
    user.passwd = '******'
    r.content_type = 'application/json'
    r.body = json.dumps(user, ensure_ascii=False).encode('utf-8')
    return r
async def api_comments(*, page='1'):
    """Return one page of comments, newest first.

    Args:
        page: Requested page, converted by ``get_page_index``.

    Returns:
        A dict with ``page`` (the ``Page`` object) and ``comments`` (an empty
        tuple when the comment count is 0).
    """
    # Declared ``async`` because the body awaits the ORM.
    page_index = get_page_index(page)
    num = await Comment.findNumber('count(id)')
    p = Page(num, page_index)
    if num == 0:
        return dict(page=p, comments=())
    comments = await Comment.findAll(
        orderBy='created_at desc',
        limit=(p.offset, p.limit),
    )
    return dict(page=p, comments=comments)