def testPost(self):
exampleUser = copy.deepcopy(EXAMPLE_USER)
exampleSchool = copy.deepcopy(EXAMPLE_SCHOOL)
assert self.db.user.find_one(exampleUser) is None
# Try posting without having logged in via github
args = {"schoolID": exampleSchool["_id"], "userID": exampleUser["_id"]}
req = self.app.post("/users", data=json.dumps(args), content_type="application/json")
assert req.status_code == 400
# Fake github login
self.db.tempUser.insert_one(exampleUser)
self.db.school.insert_one(exampleSchool)
req = self.app.post("/users", data=json.dumps(args), content_type="application/json")
assert req.status_code == 201
returnedUser = json.loads(req.data.decode("utf-8"))
assert returnedUser["schoolID"] == exampleSchool["_id"]
returnedUser.pop("schoolID")
assert areDicsEqual(exampleUser, returnedUser)
assert self.db.user.find_one(exampleUser) is not None
python类db()的实例源码
def add_sample_search(self):
uchicago_lat = 41.7886
uchicago_long = -87.5987
radius = 1000
url = "https://data.cityofchicago.org/resource/6zsd-86xi.json"
data_source1 = DataSource("Crimes 2001 - Present", url, [])
filter1 = Filter("primary_type", "THEFT")
data_search1 = DataSearch(data_source1, [])
data_search2 = DataSearch(data_source1, [filter1])
search = Search([data_search1, data_search2], uchicago_lat, uchicago_long, radius, "sample")
main.db.session.add(search)
main.db.session.flush()
return search.id
# Make sure that correctly constructed searches can retrieve the data
# from their data sources
def register():
username = loggedIn(session, LoggedIn)
if username != False:
return render_template('index.html', username=username)
form = RegisterForm()
if form.validate_on_submit():
hashedPwd = hashpw(str(request.form['password']).encode('utf-8'), gensalt()) # encrypt user's password
user = User(username=request.form['username'], password=hashedPwd) # create user
db.session.add(user)
db.session.commit() # save new user in User table
new_user = User.query.filter_by(username=request.form['username']).first() # new profile
user_profile = Profile(user_id=new_user.id, name="no-name", surname="no-surname", avatar="saitama-batman.jpg", description="no-description", skills="no-skills,")
db.session.add(user_profile)
db.session.commit() # save new profile in Profile table
return render_template('registration_success.html', username=request.form['username'])
return render_template('register.html', form=form)
def setUp(self):
main.db = MongoClient().test
self.db = main.db
self.app = main.app.test_client()
def testGetAll(self):
assert b'[]' in self.app.get("/users").data
exampleUser = copy.deepcopy(EXAMPLE_USER)
self.db.user.insert_one(exampleUser)
newUser = json.loads(self.app.get("/users").data.decode("utf-8"))[0]
assert areDicsEqual(exampleUser, newUser)
def testGet(self):
assert self.app.get("/users/1").status_code == 404
exampleUser = copy.deepcopy(EXAMPLE_USER)
self.db.user.insert_one(exampleUser)
newUser = json.loads(self.app.get("/users/"+str(exampleUser['_id'])).data.decode("utf-8"))
assert areDicsEqual(exampleUser, newUser)
def generateExampleEvent(db):
exampleUser = copy.deepcopy(EXAMPLE_USER)
db.user.insert_one(exampleUser)
return {"userID": str(exampleUser["_id"]), "title": "New submission", "description": "Pushed a submission with a score of 14"}
def testGet(self):
assert self.app.get("/events/1").status_code == 404
exampleEvent = generateExampleEvent(self.db)
self.db.event.insert_one(exampleEvent)
newEvent = json.loads(self.app.get("/events/"+str(exampleEvent['_id'])).data.decode("utf-8"))
assert areDicsEqual(exampleEvent, newEvent)
def testPost(self):
exampleEvent = generateExampleEvent(self.db)
assert self.db.event.find_one(exampleEvent) is None
req = self.app.post("/events", data=json.dumps(exampleEvent), content_type="application/json")
assert req.status_code == 201
returnedEvent = json.loads(req.data.decode("utf-8"))
assert "_id" in returnedEvent
returnedEvent.pop("_id")
assert areDicsEqual(exampleEvent, returnedEvent)
assert self.db.event.find_one(exampleEvent) is not None
def testGetAll(self):
assert b'[]' in self.app.get("/problems").data
exampleProblem = copy.deepcopy(EXAMPLE_PROBLEM)
self.db.problem.insert_one(exampleProblem)
newProblem = json.loads(self.app.get("/problems").data.decode("utf-8"))[0]
assert areDicsEqual(exampleProblem, newProblem)
def testGet(self):
assert self.app.get("/problems/1").status_code == 404
exampleProblem = copy.deepcopy(EXAMPLE_PROBLEM)
self.db.problem.insert_one(exampleProblem)
newProblem = json.loads(self.app.get("/problems/"+str(exampleProblem['_id'])).data.decode("utf-8"))
assert areDicsEqual(exampleProblem, newProblem)
def generateExampleEntry(db, exampleProblem=EXAMPLE_PROBLEM, exampleUser=EXAMPLE_USER):
exampleUser = copy.deepcopy(exampleUser)
exampleProblem = copy.deepcopy(exampleProblem)
if db.user.find_one(exampleUser) is None:
db.user.insert_one(exampleUser)
if db.problem.find_one(exampleProblem) is None:
db.problem.insert_one(exampleProblem)
return {"problemID": str(exampleProblem["_id"]), "userID": str(exampleUser["_id"]), "score": "12"}
def testGetProblem(self):
assert b'[]' in self.app.get("/entries").data
exampleEntry1 = generateExampleEntry(self.db)
exampleProblem2 = copy.deepcopy(EXAMPLE_PROBLEM)
exampleProblem2["name"] = "Other Problem"
exampleEntry2 = generateExampleEntry(self.db, exampleProblem=exampleProblem2)
self.db.entry.insert_one(exampleEntry1)
self.db.entry.insert_one(exampleEntry2)
returnedEntries = json.loads(self.app.get("/entries", query_string={"problemID": exampleEntry1["problemID"]}).data.decode("utf-8"))
assert areDicsEqual(exampleEntry1, returnedEntries[0])
assert len(returnedEntries) == 1
def testGet(self):
assert self.app.get("/entries/1").status_code == 404
exampleEntry = generateExampleEntry(self.db)
self.db.entry.insert_one(exampleEntry)
newEntry = json.loads(self.app.get("/entries/"+str(exampleEntry['_id'])).data.decode("utf-8"))
assert areDicsEqual(exampleEntry, newEntry)
def testGetAll(self):
assert b'[]' in self.app.get("/blogs").data
exampleBlog = copy.deepcopy(EXAMPLE_BLOG)
self.db.blog.insert_one(exampleBlog)
newBlog = json.loads(self.app.get("/blogs").data.decode("utf-8"))[0]
assert areDicsEqual(exampleBlog, newBlog)
def testGet(self):
assert self.app.get("/blogs/1").status_code == 404
exampleBlog = copy.deepcopy(EXAMPLE_BLOG)
self.db.blog.insert_one(exampleBlog)
newBlog = json.loads(self.app.get("/blogs/"+str(exampleBlog['_id'])).data.decode("utf-8"))
assert areDicsEqual(exampleBlog, newBlog)
def testGet(self):
assert b'{"results": {}}' == self.app.get("/search", query_string={"query": "thisshouldbeinnothing"}).data
exampleUser = copy.deepcopy(EXAMPLE_USER)
self.db.user.insert_one(exampleUser)
req = self.app.get("/search", query_string={"query": exampleUser['name']})
returnedResults = json.loads(req.data.decode("utf-8"))
correctResult = {"results": {"user": {"name": "User", "results": [{"title": exampleUser["name"], "url": "/users/?"+str(exampleUser["_id"])}]}}}
assert correctResult == returnedResults
def make_shell_context():
return dict(
app=app,
db=db,
User=User,
Post=Post,
Tag=Tag,
Comment=Comment
)
def make_shell_context():
return dict(
app=app,
db=db,
User=User,
Post=Post,
Tag=Tag,
Comment=Comment
)
def make_shell_context():
return dict(app=app, db=db, User=User, Post=Post, Tag=Tag)
def setUp(self):
main.app.config['TESTING'] = True
main.app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////tmp/test_testing.db'
main.app.config['WTF_CSRF_ENABLED'] = False
main.db.init_app(main.app)
with main.app.test_request_context():
main.db.create_all()
main.load_meta()
self.db = main.db
self.app = main.app.test_client()
def tearDown(self):
main.app.config['TESTING'] = False
with main.app.test_request_context():
self.db.session.remove()
self.db.drop_all()
def del_course(id):
username = loggedIn(session, LoggedIn)
if username == False:
form = LoginForm()
return render_template('login.html', form=form)
# delete thumbnail
course = Course.query.filter_by(id=id).first()
target = path.join(APP_ROOT, 'static/courses/')
file_path = path.join(target, course.thumbnail)
remove(file_path)
# delete course
db.session.delete(course)
db.session.commit()
# render my courses page
user = User.query.filter_by(username=username).first()
courses = Course.query.filter_by(user_id=user.id).all()
return render_template('my_courses.html', username=username, courses=courses)
def add_descr():
username = loggedIn(session, LoggedIn)
if username == False:
form = LoginForm()
return render_template('login.html', form=form)
form = EditProfileForm()
a_skills = AddSkillsForm()
e_skills = EditSkillsForm()
e_avatar = EditAvatarForm()
e_descr = EditDescription()
e_flname = EditFullNameForm()
a_descr = AddDescription()
if a_descr.validate_on_submit():
user = User.query.filter_by(username=username).first()
user_profile = Profile.query.filter_by(user_id=user.id).first()
# add and save description
if user_profile.description == 'no-description':
user_profile.description = request.form['extra_descr']
else:
user_profile.description += request.form['extra_descr']
db.session.commit()
# render profile page
user_skills = user_profile.skills.split(',')
return render_template('profile.html', username=username, user_profile=user_profile, user_skills=user_skills)
# render edit profile pages
return render_template('edit_profile.html', form=form, a_skills=a_skills, e_skills=e_skills, e_avatar=e_avatar, e_flname=e_flname, e_descr=e_descr, a_descr=a_descr)
def edit_descr():
username = loggedIn(session, LoggedIn)
if username == False:
form = LoginForm()
return render_template('login.html', form=form)
form = EditProfileForm()
a_skills = AddSkillsForm()
e_skills = EditSkillsForm()
e_avatar = EditAvatarForm()
a_descr = AddDescription()
e_flname = EditFullNameForm()
e_descr = EditDescription()
if e_descr.validate_on_submit():
user = User.query.filter_by(username=username).first()
user_profile = Profile.query.filter_by(user_id=user.id).first()
# edit and save description
user_profile.description = request.form['new_descr']
db.session.commit()
# render profile page
user_skills = user_profile.skills.split(',')
return render_template('profile.html', username=username, user_profile=user_profile, user_skills=user_skills)
# render edit profile pages
return render_template('edit_profile.html', form=form, a_skills=a_skills, e_skills=e_skills, e_avatar=e_avatar, e_flname=e_flname, e_descr=e_descr, a_descr=a_descr)
def edit_flname():
username = loggedIn(session, LoggedIn)
if username == False:
form = LoginForm()
return render_template('login.html', form=form)
form = EditProfileForm()
a_skills = AddSkillsForm()
e_skills = EditSkillsForm()
e_avatar = EditAvatarForm()
e_descr = EditDescription()
a_descr = AddDescription()
e_flname = EditFullNameForm()
if e_flname.validate_on_submit():
user = User.query.filter_by(username=username).first()
user_profile = Profile.query.filter_by(user_id=user.id).first()
# change skills
user_profile.name = request.form['new_name']
user_profile.surname = request.form['new_surname']
db.session.commit()
# render profile page
user_skills = user_profile.skills.split(',')
return render_template('profile.html', username=username, user_profile=user_profile, user_skills=user_skills)
# render edit profile pages
return render_template('edit_profile.html', form=form, a_skills=a_skills, e_skills=e_skills, e_avatar=e_avatar, e_flname=e_flname, e_descr=e_descr, a_descr=a_descr)
def add_skills():
username = loggedIn(session, LoggedIn)
if username == False:
form = LoginForm()
return render_template('login.html', form=form)
form = EditProfileForm()
e_skills = EditSkillsForm()
e_avatar = EditAvatarForm()
e_flname = EditFullNameForm()
e_descr = EditDescription()
a_descr = AddDescription()
a_skills = AddSkillsForm()
if a_skills.validate_on_submit():
user = User.query.filter_by(username=username).first()
user_profile = Profile.query.filter_by(user_id=user.id).first()
# change profile and save
if user_profile.skills == 'no-skills':
user_profile.skills = request.form['new_skills']
else:
user_profile.skills += request.form['new_skills']
db.session.commit()
# render profile page
user_skills = user_profile.skills.split(',')
return render_template('profile.html', username=username, user_profile=user_profile, user_skills=user_skills)
# render edit profile page
return render_template('edit_profile.html', form=form, a_skills=a_skills, e_skills=e_skills, e_avatar=e_avatar, e_flname=e_flname, e_descr=e_descr, a_descr=a_descr)
def course():
username = loggedIn(session, LoggedIn)
if username == False:
form = LoginForm()
return render_template('login.html', form=form)
form = CourseForm()
if form.validate_on_submit():
user = User.query.filter_by(username=username).first()
# save thumbnail
thumbnail = form.thumbnail.data
filename = secure_filename(thumbnail.filename)
rand_ID = str(randId())
while True:
result = Course.query.filter_by(thumbnail=rand_ID).first()
if result:
rand_ID = str(randId())
else:
break
filename = ''+rand_ID+'.jpg'
target = path.join(APP_ROOT, 'static/courses/')
if not path.isdir(target):
mkdir(target)
thumbnail.save(path.join(target, filename))
# save course
title = request.form['title']
description = request.form['description']
required_skills = request.form['required_skills']
course = Course(user_id=user.id, title=title, description=description, thumbnail=filename, required_skills=required_skills)
db.session.add(course)
db.session.commit()
return render_template('index.html', username=username)
return render_template('new_course.html', form=form)
def login():
username = loggedIn(session, LoggedIn)
if username != False:
return render_template('index.html', username=username)
form = LoginForm()
if form.validate_on_submit():
pwd = request.form['password']
existing_user = User.query.filter_by(username=request.form['username']).first()
if not existing_user:
error = 'Username or password are incorrect.'
return render_template('login.html', form=form, loggedInError=error)
hash1 = hashpw(str(pwd).encode('utf-8'), str(existing_user.password).encode('utf-8'))
hash2 = existing_user.password
if hash1 != hash2:
error = 'Username or password are incorrect.'
return render_template('login.html', form=form, loggedInError=error)
rand_ID = str(randId())
while True:
user = LoggedIn.query.filter_by(rand_id=rand_ID).first()
if user:
rand_ID = str(randId())
else:
break
userLoggedIn = LoggedIn(username=request.form['username'], rand_id=rand_ID)
db.session.add(userLoggedIn)
db.session.commit()
session['user'] = rand_ID
return render_template('index.html', username=userLoggedIn.username)
return render_template('login.html', form=form)
def edit_avatar():
username = loggedIn(session, LoggedIn)
if username == False:
form = LoginForm()
return render_template('login.html', form=form)
form = EditProfileForm()
a_skills = AddSkillsForm()
e_skills = EditSkillsForm()
e_flname = EditFullNameForm()
e_descr = EditDescription()
a_descr = AddDescription()
e_avatar = EditAvatarForm()
if e_avatar.validate_on_submit():
user = User.query.filter_by(username=username).first()
user_profile = Profile.query.filter_by(user_id=user.id).first()
# update avatar
new_avatar = e_avatar.new_avatar.data
filename = secure_filename(new_avatar.filename)
rand_ID = str(randId())
while True:
result = Profile.query.filter_by(avatar=rand_ID).first()
if result:
rand_ID = str(randId())
else:
break
filename = ''+rand_ID+'.jpg' # ex: filename = ekjenrfueorf.jpg
target = path.join(APP_ROOT, 'static/avatars/')
if not path.isdir(target):
mkdir(target)
old_path = path.join(target, user_profile.avatar)
new_path = path.join(target, filename)
if user_profile.avatar != "saitama-batman.jpg":
remove(old_path)
new_avatar.save(new_path)
user_profile.avatar = filename
db.session.commit()
# render profile page
user_skills = user_profile.skills.split(',')
return render_template('profile.html', username=username, user_profile=user_profile, user_skills=user_skills)
# render edit profile pages
return render_template('edit_profile.html', form=form, a_skills=a_skills, e_skills=e_skills, e_avatar=e_avatar, e_flname=e_flname, e_descr=e_descr, a_descr=a_descr)