def get(self):
    """Complete a GitHub OAuth login for the ``code`` supplied in the JSON body.

    The code is exchanged for an access token, the token is used to fetch the
    GitHub user, and the user is looked up in ``db.user`` by GitHub id.

    Returns:
        A redirect to the sign-up page (with the GitHub id in the URL
        fragment) when the user is unknown, after storing a provisional
        record in ``db.tempUser``; otherwise a JSON body with ``loggedIn``
        and the stored user, after recording the id in the session.

    Raises:
        KeyError: if GitHub's answers lack ``access_token``, ``id`` or
            ``login`` (for example when the code is invalid or expired).
    """
    parser = reqparse.RequestParser()
    parser.add_argument("code", type=str, required=True, location="json")
    code = parser.parse_args()["code"]

    # GitHub replies form-encoded unless JSON is requested explicitly.
    token_reply = requests.post(
        "https://github.com/login/oauth/access_token",
        json={
            "code": code,
            "client_id": GITHUB_CLIENT_ID,
            "client_secret": GITHUB_CLIENT_SECRET,
        },
        headers={"Accept": "application/json"},
    )
    access_token = json.loads(token_reply.text)["access_token"]

    # The API expects the token in the Authorization header, not in a GET body.
    user_reply = requests.get(
        "https://api.github.com/user",
        headers={"Authorization": "token " + access_token},
    )
    github_user = json.loads(user_reply.text)

    db_user = db.user.find_one({"_id": github_user["id"]})
    if db_user is None:
        new_user = {
            "_id": github_user["id"],
            # GitHub exposes the account handle as "login".
            "name": github_user["login"],
            "joinDate": datetime.datetime.today().strftime('%Y-%m-%d'),
        }
        db.tempUser.insert_one(new_user)
        # The id is numeric, so it must be converted before concatenation.
        return redirect(WEBSITE_DOMAIN + "/signup.html#" + str(new_user["_id"]))

    session['userID'] = db_user["_id"]
    return jsonify({"loggedIn": True, "user": db_user})
# Example source code for the Python class RequestParser()
def post(self):
    """Finish sign-up by attaching a school to a provisional user.

    Reads ``schoolID`` and ``userID`` from the JSON body, copies the matching
    ``db.tempUser`` record into ``db.user`` with the school attached, and
    stores the user id in the session.

    Returns:
        The stored user as JSON with HTTP status 201.

    Aborts with 400 when either the provisional user or the school is unknown.
    """
    parser = reqparse.RequestParser()
    parser.add_argument("schoolID", type=str, required=True, location="json")
    parser.add_argument("userID", type=str, required=True, location="json")
    args = parser.parse_args()

    # NOTE(review): userID is parsed as str; confirm tempUser ``_id`` values
    # are stored as strings, otherwise this lookup can never match.
    user = db.tempUser.find_one({"_id": args["userID"]})
    if user is None:
        abort(400)
    if db.school.find_one({"_id": args["schoolID"]}) is None:
        abort(400)

    user["schoolID"] = args["schoolID"]
    db.user.insert_one(user)
    session['userID'] = user["_id"]

    # jsonify() has no status parameter; set the code on the response object.
    response = jsonify(user)
    response.status_code = 201
    return response
def get_request_params(params):
    """
    Build a request parser from argument specs and parse the current request.

    :param params: argument specs, each indexable as
        ``(name, type, required, location)``
    :type params: list
    :return: the parsed arguments, as returned by ``parse_args()``

    Example::

        params = [
            ("name", str, True, ["headers", "cookies"]),
            ("password", str, True, ["args", "form"])
        ]
    """
    request_parser = reqparse.RequestParser()
    for spec in params:
        request_parser.add_argument(
            spec[0],
            type=spec[1],
            required=spec[2],
            location=spec[3],
        )
    return request_parser.parse_args()
def __init__(self, select_list_options, data_list_options,
             radio_buttons_options, **kwargs):
    """Parse the form inputs of the current request and store widget options.

    ``self.user_inputs`` holds every parsed argument whose value is not
    ``None``. The three option arguments are stored unchanged; extra keyword
    arguments are accepted but not used here.
    """
    # (argument name, add_argument options), registered in this order.
    argument_specs = (
        ('text_field', {}),
        ('number_field', {'type': int}),
        ('date_field', {
            'type': lambda x: dt.datetime.strptime(x, "%Y-%m-%d").date(),
        }),
        ('select_list', {}),
        ('data_list', {}),
        # An HTML checkbox submits 'on' when ticked.
        ('checkbox', {'default': False, 'type': lambda x: x == 'on'}),
        ('radio_button', {}),
        ('slider', {'type': int}),
        ('comment_field', {}),
        ('email', {}),
        # The password is hashed while it is being parsed.
        ('password', {'type': pbkdf2_sha256.hash}),
    )

    self.parser = reqparse.RequestParser()
    for arg_name, options in argument_specs:
        self.parser.add_argument(arg_name, **options)

    provided = {}
    for key, value in self.parser.parse_args().items():
        if value is not None:
            provided[key] = value
    self.user_inputs = provided

    self.select_list = select_list_options
    self.data_list = data_list_options
    self.radio_buttons = radio_buttons_options
    super().__init__()
def __init__(self, model, features, confusion_matrix, features_imp, **kwargs):
    """Parse the flower measurements of the current request and keep the model.

    ``self.user_inputs`` holds the measurement arguments that were supplied
    (value not ``None``); ``return_html`` is kept separately in
    ``self.return_html``. The remaining parameters are stored unchanged and
    extra keyword arguments are accepted but not used here.
    """
    self.parser = reqparse.RequestParser()
    self.parser.add_argument('petal_length_cm', type=float)
    self.parser.add_argument('petal_width_cm', type=float)
    self.parser.add_argument('sepal_length_cm', type=float)
    self.parser.add_argument('sepal_width_cm', type=float)
    # NOTE(review): bool() is True for any non-empty string, including
    # "false" - confirm this is the intended parsing of return_html.
    self.parser.add_argument('return_html', type=bool, default=False)
    args = self.parser.parse_args()

    self.return_html = args['return_html']
    # Compare the key by value; identity comparison with a string literal
    # depends on interning and is not guaranteed to work.
    self.user_inputs = {k: v
                        for k, v in args.items()
                        if v is not None and k != 'return_html'}

    self.model = model
    self.features = features
    self.confusion_matrix = confusion_matrix
    self.features_imp = features_imp
    super().__init__()
def post(self):
    ''' process threat-intel and conditionally alert user as threat notification

    Validates the optional JSON fields ``score`` (int) and ``threat_data``
    and answers with a fixed success payload.

    Returns:
        A JSON success response, or ``({'status': <error text>}, <code>)``
        when anything fails. The code is the exception's integer ``code``
        attribute when it has one, otherwise 500, so failures are no longer
        reported with HTTP 200.
    '''
    try:
        parser = reqparse.RequestParser()
        parser.add_argument('score', type=int, location='json')
        parser.add_argument('threat_data', type=json_encode, location='json')
        # TODO: the parsed values are validated here but not yet used.
        parser.parse_args()
        return jsonify(
            status=200,
            message='Email delivery success'
        )
    except Exception as e:
        status_code = getattr(e, 'code', None)
        if not isinstance(status_code, int):
            status_code = 500
        return {'status': str(e)}, status_code
def post(self):
    """Analyse the emotions of the required form field ``text``.

    Returns:
        A dict with ``sentic_values`` (names of the non-``None`` sentic
        values) and ``compound_emotions`` (pairs of names).
    """
    parser = reqparse.RequestParser()
    parser.add_argument('text', location='form', required=True)
    text = parser.parse_args()['text']

    analysis = comment_emotions.emotions(text, g)
    raw_sentics = analysis.get_all_sentic_values()
    raw_compounds = analysis.get_compound_emotion()

    sentic_names = [item.name for item in raw_sentics if item is not None]
    compound_names = [(first.name, second.name)
                      for first, second in raw_compounds]
    return {
        'sentic_values': sentic_names,
        'compound_emotions': compound_names
    }
def get(self, zip_code=None):
    """List zip codes up to ``limit`` or return a single zip code.

    The optional integer query argument ``limit`` takes precedence over the
    ``zip_code`` parameter. When neither is given the request is aborted
    with status 400.
    """
    parser = reqparse.RequestParser()
    parser.add_argument('limit', type=int)
    limit = parser.parse_args(strict=True).get('limit')

    if limit is not None:
        found = Zipcode.limit(limit)
        found_count = len(found)
        if found_count < limit:
            app.info_logger.info("Received option to list {limit}"
                                 " zipcodes but only {quantity} were found".format(limit=limit,
                                                                                   quantity=found_count))
        app.info_logger.info("Listing {quantity} zip_codes".format(quantity=found_count))
        return [entry.to_dict() for entry in found], 200

    if zip_code is not None:
        not_found_message = "zip code {zip_code} not found.".format(zip_code=zip_code)
        document = Zipcode.get_or_404(zip_code=zip_code, message=not_found_message)
        app.info_logger.info("Get zip_code: {zip_code}".format(zip_code=zip_code))
        return document.to_dict(), 200

    app.error_logger.error("No limit or zip_code option were found in the request")
    abort(400, message="You must provide the limit or zip_code.")
def __init__(self):
    """Create the Flask app, its API wrapper, a request parser and SSL settings.

    The SSL context depends on ``conf.ENABLE_REST_SSL``:

    * ``0``: no SSL (context is ``None``)
    * ``1``: a ``(certificate path, key path)`` tuple
    * anything else: an ``ssl.SSLContext`` that requires a peer certificate
      verified against the configured trust certificate.
    """
    self.__app = Flask(__name__)
    self.__api = Api(self.__app)
    self.__parser = reqparse.RequestParser()
    self.__stub_to_peer_service = None

    # Choose the SSL context according to the configured SSL mode.
    if conf.ENABLE_REST_SSL == 0:
        self.__ssl_context = None
    elif conf.ENABLE_REST_SSL == 1:
        self.__ssl_context = (conf.DEFAULT_SSL_CERT_PATH, conf.DEFAULT_SSL_KEY_PATH)
    else:
        # Take the protocol constant from the public ssl module rather than
        # the private _ssl extension module; the value is the same.
        self.__ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        self.__ssl_context.verify_mode = ssl.CERT_REQUIRED
        self.__ssl_context.check_hostname = False
        self.__ssl_context.load_verify_locations(cafile=conf.DEFAULT_SSL_TRUST_CERT_PATH)
        self.__ssl_context.load_cert_chain(conf.DEFAULT_SSL_CERT_PATH, conf.DEFAULT_SSL_KEY_PATH)
def post(self):
    """Parse the submitted strings with the module-level ``parse_handler``.

    Returns:
        The handler's result for ``strings`` (with the ``tree`` option), or a
        ``fail`` payload with status 400 when the handler has not been
        initialised or parsing raises.
    """
    args = self.reqparse.parse_args()
    # Identity test: ``== None`` would go through the handler's own __eq__.
    if parse_handler is None:
        return {"result": "fail", "reason": "Please initialize the model first!"}, 400
    #parse = parser.SyntaxnetParser(segmenter_model,parser_model,folder=args['syntax_folder'])
    try:
        return parse_handler.parse_multi_string(args['strings'], tree=args['tree'])
    except Exception as e:
        return {'result': 'fail', "reason": str(e)}, 400
# class SyntaxModelQuery(Resource):
# def __init__(self):
# self.reqparse = reqparse.RequestParser()
# self.reqparse.add_argument('strings', required=True, type=list, help='string is required field, you should input a list of strings', location='json')
# self.reqparse.add_argument('syntax_folder', required=False, type=str, default=config.syntaxnetFolder,
# location='json')
# super(SyntaxModelQuery, self).__init__()
#
# def post(self,folder):
# args = self.reqparse.parse_args()
# parse = parser.SyntaxnetParser(folder=args['syntax_folder'])
# try:
# return parse.parse_multi_string_custom(args['strings'],folder=folder)
# except Exception, e:
# return {'result': 'fail', "reason": e}, 400
def get(self):
    """Run ``owl_quality`` for ``url`` on the semiotic layers not excluded.

    Query arguments: ``url`` (required), ``exclude_semiotic_layer``
    (repeatable), ``domain``, ``already_converted`` and ``debug`` (booleans,
    default ``False``).

    Raises:
        ValueError: if an excluded layer is not one of the known layers.
    """
    parser = reqparse.RequestParser()
    parser.add_argument('url', required=True, help='url cannot be blank!')
    parser.add_argument('exclude_semiotic_layer', action='append')
    parser.add_argument('domain')
    parser.add_argument('already_converted', type=inputs.boolean, default=False)
    parser.add_argument('debug', type=inputs.boolean, default=False)  # remove this when in production
    args = parser.parse_args()

    semiotic_quality_flags = {'syntactic', 'semantic', 'pragmatic', 'social'}
    exclude_semiotic_layer = set(args.exclude_semiotic_layer or ())

    if not exclude_semiotic_layer <= semiotic_quality_flags:
        # Sort the names so the message is the same on every run.
        raise ValueError(
            'Invalid semiotic layer. Must be one of: {}.'.format(
                ', '.join(sorted(semiotic_quality_flags))))

    return owl_quality(args.url, semiotic_quality_flags - exclude_semiotic_layer,
                       args.domain, already_converted=args.already_converted, debug=args.debug)
def get(self, task_id):
    ''' Endpoint for getting information about current state of task

    The optional query argument ``order`` selects how the answers are
    sorted: ``latest`` (default, newest first) or ``shortest`` (by length).
    Any other value is rejected by the parser instead of raising KeyError.

    Returns:
        JSON with the task and its answers, or a 400 response when the task
        does not exist.
    '''
    orderings = {
        'latest': Answer.created_at.desc(),
        'shortest': Answer.length,
    }

    parser = reqparse.RequestParser()
    parser.add_argument('order', location='args', default='latest',
                        choices=tuple(orderings))
    args = parser.parse_args()

    task = manager.get_task(task_id)
    if not task:
        return make_response("Tried to query an invalid task.", 400)

    query = Answer.query.filter_by(task_id=task_id).order_by(orderings[args.order])
    answers = [answer.to_dict() for answer in query.all()]
    return jsonify({
        'task': task,
        'answers': answers
    })
def post(self):
    '''Endpoint for getting auth token for existing user

    Looks the user up by email and SHA-256 password hash. On success a
    random token is registered in ``tokens`` and the session, then the
    client is redirected to ``/``; otherwise it is redirected with an error
    message.
    '''
    parser = reqparse.RequestParser()
    for required_field in ('email', 'password'):
        parser.add_argument(required_field, required=True)
    args = parser.parse_args()

    # NOTE(review): the hash is a plain, unsalted SHA-256 digest.
    password_hash = hashlib.sha256(args.password.encode('utf-8')).hexdigest()

    credentials_match = (User.email == args.email) & (User.password_hash == password_hash)
    user = db.session.query(User).filter(credentials_match).first()
    if user is None:
        return redirect('/?message=%s' % 'Could not find account.')

    # Allocate the session token and remember which email it belongs to.
    token = os.urandom(256)
    tokens[token] = args.email
    session['token'] = token
    session['username'] = user.username
    return redirect('/')
def post(self):
    """Parse air-conditioner settings and publish them with ``mqtt_pub_air_con``.

    All six arguments are optional integers. The parsed arguments are
    printed, published and returned.
    """
    # 'wind_directtion' (sic) is the argument name as written; changing the
    # spelling would change the request contract.
    integer_fields = (
        'node_select',
        'wind_directtion',
        'wind_speed',
        'working_model',
        'temp_setting',
        'switch',
    )
    parser = reqparse.RequestParser()
    for field in integer_fields:
        parser.add_argument(field, type=int)

    settings = parser.parse_args()
    print(settings)
    mqtt_pub_air_con(settings)
    return settings
def post(self):
    """Switch a unit on or off over RS-485.

    ``tianshuoSwitch`` selects the action ('1' = on, '0' = off) for unit
    ``tianshuoNo``. Any other switch value sends nothing. The method returns
    ``None`` in every case.
    """
    parser = reqparse.RequestParser()
    parser.add_argument('tianshuoSwitch', type=str)
    parser.add_argument('tianshuoNo', type=int)
    args = parser.parse_args()
    print(args)

    unit_no = args['tianshuoNo']
    switch_value = args['tianshuoSwitch']

    # switch value -> (last argument of the frame builder, log template)
    actions = {
        '1': (9, "{0} switch on!"),
        '0': (1, "{0} switch off!"),
    }
    if switch_value in actions:
        frame_value, log_template = actions[switch_value]
        output_hex = calc_modus_hex_str_to_send(unit_no, 6, 0, 0, 0, frame_value)
        rs485_socket_send(output_hex)
        print(log_template.format(unit_no))
def get(self):
    """Return the node addresses of the barn given by ``barnNo``.

    ``airconSwitch`` is accepted and printed with the other arguments but
    does not affect the result.

    Returns:
        The query rows of ``LoraNode.node_addr`` for the barn, in ascending
        address order.
    """
    parser = reqparse.RequestParser()
    parser.add_argument('airconSwitch', type=str)
    parser.add_argument('barnNo', type=str)
    args = parser.parse_args()
    print(args)

    barnNo = args['barnNo']
    nodes = (
        db.session.query(LoraNode.node_addr)
        .join(GrainBarn, GrainBarn.id == LoraNode.grain_barn_id)
        .filter(GrainBarn.barn_no == barnNo)
        .order_by(LoraNode.node_addr.asc())
        .all()
    )
    print("nodes are:", nodes)
    return nodes
def get(self):
    """Return the node addresses of the barn given by ``barnNo``.

    ``airconSwitch`` values '1' and '0' are accepted without any action; any
    other value is only printed.

    Returns:
        The query rows of ``LoraNode.node_addr`` for the barn, in ascending
        address order.
    """
    parser = reqparse.RequestParser()
    parser.add_argument('airconSwitch', type=str)
    parser.add_argument('barnNo', type=str)
    args = parser.parse_args()
    print(args)

    barn_no = args['barnNo']
    switch_value = args['airconSwitch']

    node_query = db.session.query(LoraNode.node_addr)
    node_query = node_query.join(GrainBarn, GrainBarn.id == LoraNode.grain_barn_id)
    node_query = node_query.filter(GrainBarn.barn_no == barn_no)
    nodes = node_query.order_by(LoraNode.node_addr.asc()).all()
    print("nodes are:", nodes)

    # '1' and '0' are placeholders without behaviour; report anything else.
    if switch_value not in ('1', '0'):
        print('airconSwitch is :', switch_value)
    return nodes
def post(self):
    """Send an alarm email built from the request arguments.

    Reads ``user_email``, ``subject``, ``user_name`` and ``alarm_msg`` (all
    optional strings), prints them between banner lines, and passes them to
    ``send_email`` with the ``mail/email_alarm`` template.

    Returns:
        A fixed confirmation string.
    """
    parser = reqparse.RequestParser()
    for field in ('user_email', 'subject', 'user_name', 'alarm_msg'):
        parser.add_argument(field, type=str)
    args = parser.parse_args()

    spacer = '\n' * 5
    banner = '**email**' * 5
    print(spacer)
    print(banner)
    print(args)
    print(banner)
    print(spacer)

    send_email(
        args['user_email'],
        args['subject'],
        'mail/email_alarm',
        user_name=args['user_name'],
        alarm_msg=args['alarm_msg'],
    )
    return 'alarm email has sended successfuly!'
def get(self):
    """API endpoint to retrieve a list of links to transaction outputs.

    Query arguments: ``public_key`` (required, validated as ed25519) and the
    optional ``spent`` flag. Unknown arguments are rejected.

    Returns:
        A :obj:`list` of dicts, each with the ``transaction_id`` and
        ``output_index`` of one output.
    """
    parser = reqparse.RequestParser()
    parser.add_argument('public_key', type=parameters.valid_ed25519,
                        required=True)
    parser.add_argument('spent', type=parameters.valid_bool)
    args = parser.parse_args(strict=True)
    public_key, spent = args['public_key'], args['spent']

    pool = current_app.config['bigchain_pool']
    with pool() as bigchain:
        links = []
        for output in bigchain.get_outputs_filtered(public_key, spent):
            links.append({
                'transaction_id': output.txid,
                'output_index': output.output,
            })
        return links
def get(self):
    """API endpoint to get the related blocks for a transaction.

    Query arguments: ``transaction_id`` (required) and an optional,
    case-insensitive ``status`` restricted to the valid, invalid and
    undecided block statuses. Unknown arguments are rejected.

    Return:
        A ``list`` of ``block_id``s that contain the given transaction,
        limited to blocks with the requested status when one is given.
    """
    parser = reqparse.RequestParser()
    parser.add_argument('transaction_id', type=str, required=True)
    parser.add_argument('status', type=str, case_sensitive=False,
                        choices=[Bigchain.BLOCK_VALID, Bigchain.BLOCK_INVALID, Bigchain.BLOCK_UNDECIDED])
    args = parser.parse_args(strict=True)
    wanted_status = args['status']

    pool = current_app.config['bigchain_pool']
    with pool() as bigchain:
        statuses = bigchain.get_blocks_status_containing_tx(args['transaction_id'])
        matching = []
        for block_id, block_status in statuses.items():
            # Without a requested status every block is kept.
            if wanted_status and block_status != wanted_status:
                continue
            matching.append(block_id)
    return matching
def get_arguments(self):
    """Parse the three required password-change fields.

    Returns:
        A tuple ``(old_password, password, password_2)``.
    """
    # (argument name, message shown when it is missing)
    required_fields = (
        ("old_password", "Old password is missing."),
        ("password", "New password is missing."),
        ("password_2", "New password confirmation is missing."),
    )
    parser = reqparse.RequestParser()
    for field, missing_message in required_fields:
        parser.add_argument(field, required=True, help=missing_message)

    args = parser.parse_args()
    return tuple(args[field] for field, _ in required_fields)
def get_arguments(self):
    """Parse the request options, falling back to defaults.

    The default software is "3ds Max" and the default output type is
    "geometry"; both are fetched or created through ``files_service``.

    Returns:
        A tuple ``(mode, software_id, output_type_id, name, sep)``.
    """
    parser = reqparse.RequestParser()
    default_output_type = files_service.get_or_create_output_type("geometry")
    default_software = files_service.get_or_create_software("3ds Max", "max", ".max")

    # (argument name, default value), registered in this order.
    defaults = (
        ("mode", "working"),
        ("sep", "/"),
        ("software_id", default_software["id"]),
        ("output_type_id", default_output_type["id"]),
        ("name", "name"),
    )
    for arg_name, default_value in defaults:
        parser.add_argument(arg_name, default=default_value)

    args = parser.parse_args()
    return_order = ("mode", "software_id", "output_type_id", "name", "sep")
    return tuple(args[key] for key in return_order)
def get_arguments(self):
    """Parse the request options, falling back to defaults.

    The default software is "3ds Max" and the default output type is
    "geometry"; both are fetched or created through ``files_service``.
    ``version`` is parsed as an integer so its type no longer depends on
    whether the client supplied it (the default is the int ``0``).

    Returns:
        A tuple ``(mode, version, comment, software_id, output_type_id,
        name, sep)``.
    """
    geometry_type = files_service.get_or_create_output_type("geometry")
    maxsoft = files_service.get_or_create_software("3ds Max", "max", ".max")

    parser = reqparse.RequestParser()
    parser.add_argument("mode", default="working")
    parser.add_argument("comment", default="")
    parser.add_argument("version", default=0, type=int)
    parser.add_argument("software_id", default=maxsoft["id"])
    parser.add_argument("output_type_id", default=geometry_type["id"])
    parser.add_argument("name", default="")
    parser.add_argument("sep", default="/")
    args = parser.parse_args()

    return (
        args["mode"],
        args["version"],
        args["comment"],
        args["software_id"],
        args["output_type_id"],
        args["name"],
        args["sep"]
    )
def get_arguments(self):
    """Parse the request options, falling back to defaults.

    The default output type is "Geometry", fetched or created through
    ``files_service``; ``revision`` is parsed as an integer.

    Returns:
        A tuple ``(comment, person_id, output_type_id, revision, separator,
        extension)``.
    """
    parser = reqparse.RequestParser()
    default_output_type = files_service.get_or_create_output_type("Geometry")

    # (argument name, add_argument options), registered in this order.
    argument_specs = (
        ("output_type_id", {"default": default_output_type["id"]}),
        ("person_id", {"default": ""}),
        ("comment", {"default": ""}),
        ("revision", {"default": 0, "type": int}),
        ("separator", {"default": "/"}),
        ("extension", {"default": ""}),
    )
    for arg_name, options in argument_specs:
        parser.add_argument(arg_name, **options)

    args = parser.parse_args()
    return_order = (
        "comment",
        "person_id",
        "output_type_id",
        "revision",
        "separator",
        "extension",
    )
    return tuple(args[key] for key in return_order)
def get_arguments(self):
    """Parse the person fields of the request.

    ``email``, ``first_name`` and ``last_name`` are required; ``phone``
    defaults to an empty string.

    Returns:
        The parsed arguments as returned by ``parse_args()``.
    """
    # (argument name, message shown when it is missing)
    required_fields = (
        ("email", "The email is required."),
        ("first_name", "The first name is required."),
        ("last_name", "The last name is required."),
    )
    parser = reqparse.RequestParser()
    for field, missing_message in required_fields:
        parser.add_argument(field, help=missing_message, required=True)
    parser.add_argument("phone", default="")
    return parser.parse_args()
def __init__(self, **kwargs):
    """Store the API and server handles, output fields and parsed arguments.

    Requires the keyword arguments ``restapi`` and ``server`` (``KeyError``
    otherwise). The request arguments are parsed immediately, with errors
    bundled, and kept in ``self.args``.
    """
    self.restapi = kwargs['restapi']
    self.server = kwargs['server']

    iso_datetime = 'iso8601'
    self.fields = {
        'port': fields.Integer,
        'name': fields.String,
        'type': fields.String,
        'created': fields.DateTime(dt_format=iso_datetime),
        'updated': fields.DateTime(dt_format=iso_datetime)
    }

    self.parser = reqparse.RequestParser(bundle_errors=True)
    for arg_name, arg_type in (('appeui', int), ('port', int),
                               ('name', str), ('type', str)):
        self.parser.add_argument(arg_name, type=arg_type)
    self.args = self.parser.parse_args()
def __init__(self, **kwargs):
    """Store the API and server handles, output fields and parsed arguments.

    Requires the keyword arguments ``restapi`` and ``server`` (``KeyError``
    otherwise). The request arguments are parsed immediately, with errors
    bundled, and kept in ``self.args``.
    """
    self.restapi = kwargs['restapi']
    self.server = kwargs['server']
    self.fields = {
        'appif': fields.Integer,
        'name': fields.String,
    }

    # (argument name, type), registered in this order.
    argument_types = (
        ('type', str),
        ('name', str),
        ('protocol', str),
        ('iothost', str),
        ('keyname', str),
        ('keyvalue', str),
        ('pollinterval', int),
    )
    self.parser = reqparse.RequestParser(bundle_errors=True)
    for arg_name, arg_type in argument_types:
        self.parser.add_argument(arg_name, type=arg_type)
    self.args = self.parser.parse_args()
def test_get(self):
    """Test get method: a failed lookup and a successful lookup.

    NOTE(review): the test yields on the resource calls, so it presumably
    runs under a coroutine-style decorator that is not visible here.
    """
    application = self._test_application()
    mockDBObject.return_value = application

    with patch.object(reqparse.RequestParser, 'parse_args'):
        rest_resource = RestApplication(restapi=self.restapi, server=self.server)

        # Lookup fails: raises 404 NotFound
        failing_find = classmethod(mockDBObject.findFail)
        with patch.object(Application, 'find', failing_find):
            yield self.assertFailure(rest_resource.get(application.appeui), e.NotFound)

        # Lookup succeeds: returns a dict of field values
        successful_find = classmethod(mockDBObject.findSuccess)
        with patch.object(Application, 'find', successful_find):
            rest_resource.getProperties = MagicMock()
            result = yield rest_resource.get(application.appeui)
            self.assertEqual(application.appeui, result['appeui'])
def test_put(self):
    """Test put method: failed lookup, failed validation, then success.

    NOTE(review): the test yields on the resource calls, so it presumably
    runs under a coroutine-style decorator that is not visible here.
    """
    application = self._test_application()
    mockDBObject.return_value = application
    expected = ({}, 200)

    with patch.object(reqparse.RequestParser, 'parse_args'):
        rest_resource = RestApplication(restapi=self.restapi, server=self.server)

        # Lookup fails: raises 404 NotFound
        failing_find = classmethod(mockDBObject.findFail)
        with patch.object(Application, 'find', failing_find):
            yield self.assertFailure(rest_resource.put(application.appeui), e.NotFound)

        successful_find = classmethod(mockDBObject.findSuccess)
        with patch.object(Application, 'find', successful_find):
            # Lookup succeeds but the validity check fails: raises 400 BadRequest
            application.valid = MagicMock(return_value=(False, {}))
            yield self.assertFailure(rest_resource.put(application.appeui), e.BadRequest)

            # Validity check passes: returns 200
            application.valid = MagicMock(return_value=(True, {}))
            application.update = MagicMock()
            result = yield rest_resource.put(application.appeui)
            self.assertEqual(expected, result)
def test_delete(self):
    """Test delete method: device still attached, failed lookup, then success.

    NOTE(review): the test yields on the resource calls, so it presumably
    runs under a coroutine-style decorator that is not visible here.
    """
    application = self._test_application()
    mockDBObject.return_value = application
    expected = ({}, 200)

    with patch.object(reqparse.RequestParser, 'parse_args'):
        rest_resource = RestApplication(restapi=self.restapi, server=self.server)

        # A device exists with this AppEUI: raises 400 error
        with patch.object(Device, 'find', classmethod(mockDBObject.findSuccess)):
            yield self.assertFailure(rest_resource.delete(application.appeui), e.BadRequest)

        # No device, but the application is not found: raises 404 NotFound
        with patch.object(Device, 'find', classmethod(mockDBObject.findFail)), \
                patch.object(Application, 'find', classmethod(mockDBObject.findFail)):
            yield self.assertFailure(rest_resource.delete(application.appeui), e.NotFound)

        # No device and the application is found: deletes it, returns 200
        with patch.object(Device, 'find', classmethod(mockDBObject.findFail)), \
                patch.object(Application, 'find', classmethod(mockDBObject.findSuccess)):
            application.delete = MagicMock()
            result = yield rest_resource.delete(application.appeui)
            self.assertEqual(expected, result)