def deployment():
    """Validate uploaded files, optionally create an environment, and deploy.

    Reads the uploaded ``file`` list together with the parallel ``File Type``
    and ``file_code`` form lists, plus the ``env`` and ``isNew`` request
    values.

    Returns:
        A ``(json_body, 500)`` tuple when a ``static`` or ``data`` file has no
        file code. Otherwise the result of ``deployFiles`` when the first
        uploaded entry is truthy, or a ``(json_body, 200)`` tuple.
    """
    uploads = request.files.getlist('file')
    # Guard the index: an empty upload list must not raise IndexError.
    hasFiles = bool(uploads and uploads[0])
    DATA = {'files': list(zip(uploads,
                              request.form.getlist('File Type'),
                              request.form.getlist('file_code')))}
    for _upload, fileType, fileCod in DATA['files']:
        if fileType in ('static', 'data') and not fileCod:
            return json.dumps('Error - You should specify a file code for statics and outputs'), 500

    env = request.values['env']
    if request.values['isNew'] == 'true':
        createEnv(env)
    if hasFiles:
        return deployFiles(env, DATA)
    return json.dumps("Environment created"), 200
# Python example source code for values()
def crawl():
    """Crawl the site named by the ``url`` request value and return JSON.

    Request values:
        depth: optional crawl depth limit; 1 is used when it is absent.
        url: required http/https start URL. Its network location becomes the
            only allowed domain.

    Returns:
        ``jsonify(**crawler.crawled)`` on success, or a ``(message, 400)``
        tuple when ``depth`` is not an integer, ``url`` is missing, its
        scheme is not http/https, or it has no network location.
    """
    try:
        depth_limit = int(request.values['depth'])
    except KeyError:
        # No depth supplied: fall back to the default.
        depth_limit = 1
    except ValueError:
        return "Depth parameter must be a number", 400

    if 'url' not in request.values:
        return "Missing url parameter", 400

    url = request.values['url']
    parsed_url = urlparse.urlsplit(url)
    if parsed_url.scheme not in ('http', 'https'):
        return "Only http and https protocols are supported", 400
    if parsed_url.netloc == '':
        return "Missing domain", 400

    crawler = Crawler([parsed_url.netloc], depth_limit)
    crawler.crawl(url)
    return jsonify(**crawler.crawled)
def verify_slack_username(http_method, api_method, method_acl_userlist):
    """Check the requesting user name against an ACL mapping.

    Args:
        http_method: 'GET' reads ``user_name`` from the query args (empty
            string when absent); 'POST' reads it from the request values.
        api_method: key looked up in ``method_acl_userlist``.
        method_acl_userlist: mapping of method name to allowed user names;
            the ``"*"`` entry lists users allowed for every method.

    Returns:
        None when the call is allowed, including when ``api_method`` has no
        entry in the mapping.

    Raises:
        Exception: for any other ``http_method``, or when the user is not in
            the list configured for ``api_method``.
    """
    if http_method == 'GET':
        caller = request.args.get('user_name', '')
    elif http_method == 'POST':
        caller = request.values['user_name']
    else:
        raise Exception("Error: unsupported http_method(%s)" % (http_method))

    # Users in the wildcard entry pass regardless of the method.
    if "*" in method_acl_userlist and caller in method_acl_userlist["*"]:
        return None

    # Methods without an entry are unrestricted.
    if api_method not in method_acl_userlist:
        return None
    if caller in method_acl_userlist[api_method]:
        return None

    raise Exception("Error: user(%s) not allowed to call api_method(%s)" %
                    (caller, api_method))
def join(id):
    """Join the requesting user to the event with the given id.

    The user is resolved from the ``user_param`` and ``user_param_value``
    request values.

    Returns:
        The result of ``not_found()`` when no event matches, the result of
        ``user_not_specified()`` when no user is resolved, otherwise a JSON
        body with ``joined`` set to True and the remaining flags False.
    """
    event = find_event(id)
    if event is None:
        return not_found()

    user = find_user(request.values.get('user_param'),
                     request.values.get('user_param_value'))
    if user is None:
        return user_not_specified()

    join_event(user, event)
    return jsonify({
        'joined': True,
        'rsvps_disabled': False,
        'event_archived': False,
        'over_capacity': False,
        'past_deadline': False,
    })
def chan_friends(channel):
    """
    If you GET this endpoint, go to /api/v1/channel/<channel>/friend
    with <channel> replaced for the channel of the friends you want to get

    <channel> can either be an int that matches the channel, or a string
    that matches the owner's username
    """
    model = "Friend"

    if channel.isdigit():
        fields = {"channelId": int(channel)}
    else:
        fields = {"owner": channel.lower()}

    # NOTE(review): this call used to pass ``data=results``, but no ``results``
    # name is bound in this function. It is omitted to match the sibling
    # channel endpoints - confirm no module-level ``results`` was intended.
    packet, code = generate_response(
        model,
        request.path,
        request.method,
        request.values,
        fields=fields
    )

    return make_response(jsonify(packet), code)
# There was an error!
# if not str(code).startswith("2"):
# return make_response(jsonify(packet), code)
# NOTE: Not needed currently, but this is how you would check
# TODO: Fix this endpoint to remove timing elements (friends are forever)
# TODO: Use Object.update(**changes) instead of Object(**updated_object).save()
def chan_messages(channel):
    """
    Endpoint behind /api/v1/channel/<channel>/messages.

    An all-digit <channel> is matched on ``channelId``; any other value is
    lower-cased and matched on ``owner``.
    """
    lookup = (
        {"channelId": int(channel)}
        if channel.isdigit()
        else {"owner": channel.lower()}
    )

    response = generate_response(
        "Message",
        request.path,
        request.method,
        request.values,
        fields=lookup
    )
    packet, code = response

    return make_response(jsonify(packet), code)
def user_quotes(channel):
    """
    Endpoint behind /api/v1/channel/<channel>/quote.

    An all-digit <channel> is matched on ``channelId``; any other value is
    lower-cased and matched on ``owner``. Only entries whose ``deleted``
    field is False are requested.
    """
    if channel.isdigit():
        key, value = "channelId", int(channel)
    else:
        key, value = "owner", channel.lower()
    lookup = {key: value, "deleted": False}

    response = generate_response(
        "quote",
        request.path,
        request.method,
        request.values,
        fields=lookup
    )
    packet, code = response

    return make_response(jsonify(packet), code)
def user_commands(channel):
    """
    Endpoint behind /api/v1/channel/<channel>/command.

    An all-digit <channel> is matched on ``channelId``; any other value is
    lower-cased and matched on ``channelName``. Only entries whose
    ``deleted`` field is False are requested.
    """
    if channel.isdigit():
        key, value = "channelId", int(channel)
    else:
        key, value = "channelName", channel.lower()
    lookup = {key: value, "deleted": False}

    response = generate_response(
        "Command",
        request.path,
        request.method,
        request.values,
        fields=lookup
    )
    packet, code = response

    return make_response(jsonify(packet), code)
def handle_states():
    '''Returns all the states from the database as JSON objects with a GET
    request, or adds a new state to the database with a POST request. Refer to
    exception rules of peewee `get()` method for additional explanation of
    how the POST request is handled:
    http://docs.peewee-orm.com/en/latest/peewee/api.html#SelectQuery.get

    POST responses: 400 when the `name` parameter is missing, 409 when a
    state with that name already exists, 201 with the new state otherwise.
    '''
    if request.method == 'GET':
        states = ListStyle().list(State.select(), request)
        return jsonify(states), 200

    elif request.method == 'POST':
        # Check that all the required parameters are present in the request.
        if 'name' not in request.values:
            return jsonify(msg="Missing parameter."), 400

        # Read the value from the same place its presence was checked.
        name = request.values['name']
        try:
            State.select().where(State.name == name).get()
        except State.DoesNotExist:
            state = State.create(name=name)
            return jsonify(state.to_dict()), 201
        return jsonify(code=10001, msg="State already exists"), 409
def handle_state_id(state_id):
    '''Select the state with the given id and either return it (GET) or
    remove it from the database (DELETE).

    NOTE(review): an earlier docstring also described a PUT update, but no
    PUT branch exists in this function.

    Keyword arguments:
    state_id: The id of the state from the database.

    Returns a 404 JSON message when no state has this id.
    '''
    try:
        state = State.select().where(State.id == state_id).get()
    except State.DoesNotExist:
        return jsonify(msg="There is no state with this id."), 404

    if request.method == 'GET':
        return jsonify(state.to_dict()), 200

    elif request.method == 'DELETE':
        # The lookup above already handled the missing-row case; building the
        # delete query does not raise DoesNotExist.
        State.delete().where(State.id == state_id).execute()
        return jsonify(msg="State deleted successfully."), 200
def handle_sms():
    """Handle an incoming Twilio SMS message.

    Looks the sender up by the ``From`` request value, creating a customer
    when none is found, passes the ``Body`` text to the messaging layer, and
    returns a jsonpickle-encoded success payload.
    """
    sender_number = request.values["From"]
    body_text = request.values["Body"]

    customer = Customer.get_customer_by_phone_number(sender_number)
    if not customer:
        # Unknown sender: build and save a new customer record.
        new_customer_fields = {CFields.PHONE_NUMBER: sender_number}
        customer = Customer.create_new(new_customer_fields)
        customer.create()
        customer_lc.on_new_customer()

    messaging.on_message_recieve(customer, body_text)

    reply = {"success": True}
    return jsonpickle.encode(reply)
def engine():
    """Render the engine page for the engine selected by ``uuid``.

    Redirects to ``/login`` when the session has no truthy ``login_in``, and
    answers 401 when it has no truthy ``username``. The engine uuid comes
    from the request values (and is then stored in the session) or, failing
    that, from the session. Any exception raised while loading the engine or
    querying it is returned as its message with status 503.
    """
    if not session.get('login_in', None):
        return redirect('/login')
    if not session.get('username', None):
        return "Error 401, Authentication failed", 401

    uuid = request.values.get('uuid', None)
    if uuid:
        session['uuid'] = uuid
    else:
        uuid = session['uuid']
    username = session['username']

    try:
        record = models.Engine.query.filter_by(uuid=uuid, user_name=username).first()
        endpoint = "tcp://" + record.host + ":" + record.port
        client = DOCKER(base_url=endpoint, timeout=5, version="auto")
        return render_template('engine.html',
                               host_info=client.get_info(),
                               usage=client.monitor())
    except Exception as msg:
        return str(msg), 503
def change_passwd():
    """Change the logged-in user's password.

    Redirects to ``/login`` unless the session has truthy ``login_in`` and
    ``username`` entries. Reads ``oldpassword`` and ``newpassword`` from the
    request values. The new hash is stored only when the old password
    matches the stored hash.

    Returns:
        A JSON ``result`` message. The DB session is rolled back on an error
        and always closed afterwards.
    """
    if not (session.get('login_in', None) and session.get('username', None)):
        return redirect('/login')

    oldpassword = request.values['oldpassword']
    newpassword = request.values['newpassword']
    try:
        user = models.User.query.filter_by(username=session['username']).first()
        if not check_password_hash(user.password, oldpassword):
            return jsonify(result="change failed")
        user.password = generate_password_hash(newpassword)
        db.session.add(user)
        db.session.commit()
        return jsonify(result="change sucessfull")
    except Exception:
        # Catch Exception rather than everything, so SystemExit and
        # KeyboardInterrupt are not swallowed.
        db.session.rollback()
        return jsonify(result="change failed")
    finally:
        db.session.close()
def readGcodeFilesForOrigin(origin):
    """Return the file list for ``origin`` as JSON.

    Answers 404 for an origin other than LOCAL or SDCARD. The ``recursive``
    and ``force`` request values are treated as booleans; ``force`` drops the
    cached entry for the origin before listing. For LOCAL the free and total
    disk usage of the uploads folder are included in the response.
    """
    known_origins = [FileDestinations.LOCAL, FileDestinations.SDCARD]
    if origin not in known_origins:
        return make_response("Unknown origin: %s" % origin, 404)

    def _flag(name):
        return request.values.get(name, "false") in valid_boolean_trues

    recursive = _flag("recursive")
    force = _flag("force")

    if force:
        with _file_cache_mutex:
            try:
                del _file_cache[origin]
            except KeyError:
                # Nothing cached for this origin.
                pass

    files = _getFileList(origin, recursive=recursive)

    if origin != FileDestinations.LOCAL:
        return jsonify(files=files)

    usage = psutil.disk_usage(settings().getBaseFolder("uploads"))
    return jsonify(files=files, free=usage.free, total=usage.total)
def _get_temperature_data(preprocessor):
    """Return the current temperature data, optionally with history.

    Args:
        preprocessor: callable applied to each history entry and to the final
            temperature dict.

    Request values:
        history: when in ``valid_boolean_trues``, a ``history`` list is added.
        limit: numeric maximum number of most recent history entries
            (default 300, capped at the history length).

    Returns:
        A 409 response when the printer is not operational, otherwise
        ``preprocessor(tempData)``.
    """
    if not printer.is_operational():
        return make_response("Printer is not operational", 409)

    tempData = printer.get_current_temperatures()

    if "history" in request.values and request.values["history"] in valid_boolean_trues:
        history = list(printer.get_temperature_history())

        limit = 300
        if "limit" in request.values and unicode(request.values["limit"]).isnumeric():
            limit = int(request.values["limit"])
        limit = min(limit, len(history))

        # history[-0:] is the whole list, so a zero limit needs its own case.
        recent = history[-limit:] if limit > 0 else []
        tempData.update({
            "history": [preprocessor(entry) for entry in recent]
        })

    return preprocessor(tempData)
def uploadLanguagePack():
    """Unpack an uploaded language pack archive into the translations folder.

    The upload's path and name are read from the request values
    ``file.<pathSuffix>`` and ``file.<nameSuffix>``, with both suffixes taken
    from the server upload settings.

    Returns:
        A 400 response when the upload is missing, has an unsupported
        extension, or is neither a tarball nor a zip file. Otherwise the
        result of ``getInstalledLanguagePacks()``.
    """
    input_name = "file"
    input_upload_path = input_name + "." + settings().get(["server", "uploads", "pathSuffix"])
    input_upload_name = input_name + "." + settings().get(["server", "uploads", "nameSuffix"])
    if input_upload_path not in request.values or input_upload_name not in request.values:
        return make_response("No file included", 400)

    upload_name = request.values[input_upload_name]
    upload_path = request.values[input_upload_path]

    # endswith accepts a tuple, so no intermediate filter()/len() is needed.
    if not upload_name.lower().endswith((".zip", ".tar.gz", ".tgz", ".tar")):
        return make_response("File doesn't have a valid extension for a language pack archive", 400)

    target_path = settings().getBaseFolder("translations")

    if tarfile.is_tarfile(upload_path):
        _unpack_uploaded_tarball(upload_path, target_path)
    elif zipfile.is_zipfile(upload_path):
        _unpack_uploaded_zipfile(upload_path, target_path)
    else:
        return make_response("Neither zip file nor tarball included", 400)

    return getInstalledLanguagePacks()
def _get_locale(self):
    """Choose the locale for the current request.

    Sources are tried in this order: the ``l10n`` request value, the
    ``X-Locale`` header, the identified user's interface language setting,
    the configured default language, and finally the best match of the
    request's accepted languages.
    """
    def _negotiate(preferred):
        return Locale.negotiate([preferred], LANGUAGES)

    if "l10n" in request.values:
        return _negotiate(request.values["l10n"])

    if "X-Locale" in request.headers:
        return _negotiate(request.headers["X-Locale"])

    if hasattr(g, "identity") and g.identity and userManager.enabled:
        userid = g.identity.id
        try:
            user_language = userManager.getUserSetting(userid, ("interface", "language"))
            if user_language is not None and user_language != "_default":
                return _negotiate(user_language)
        except octoprint.users.UnknownUser:
            # Unknown user: fall through to the configured default.
            pass

    default_language = self._settings.get(["appearance", "defaultLanguage"])
    has_usable_default = (
        default_language is not None
        and default_language != "_default"
        and default_language in LANGUAGES
    )
    if has_usable_default:
        return _negotiate(default_language)

    return Locale.parse(request.accept_languages.best_match(LANGUAGES))
def get_alerts_by_form_parameters():
    '''
    Get the alerts between the ``start`` and ``end`` parameters, each read
    from the query args first and the form data second.

    Returns a JSON body with ``success`` and ``alerts`` and status 200.
    '''
    def _param(name):
        return request.args.get(name) or request.form.get(name)

    start = _param('start')
    end = _param('end')

    _logger.info('{}: {} - {}'.format(request.method,
                                      request.path,
                                      request.values))

    # Convert to datetime objects
    window_start = _parse_date(start)
    window_end = _parse_date(end)

    alerts = s3_model.get_alerts_by_date(window_start, window_end)

    payload = {'success': True, 'alerts': alerts}
    return jsonify(payload), 200
def create():
    """
    Create and save a user from the ``username``, ``password``, ``email`` and
    ``gender`` request values. The password is stored as a hash and
    ``gender`` is converted to an int.

    :return: the result of ``success()``
    """
    username = request.values.get("username")
    password = request.values.get("password")
    email = request.values.get("email")
    gender = int(request.values.get("gender"))

    new_user = User(
        username=username,
        password_hash=generate_password_hash(password, method='pbkdf2:sha1', salt_length=8),
        email=email,
        gender=gender,
    )
    new_user.save()
    return success()
def session_test():
    """Exercise session handling with a ``username`` key.

    DELETE removes the stored username. Any other method greets the stored
    username, or stores the ``username`` request value when none is set.
    """
    if request.method == 'DELETE':
        # Delete the session entry; the default avoids a KeyError when no
        # username was stored.
        session.pop('username', None)
        return 'Session deleted!'

    if 'username' in session:
        # A username is already stored in the session.
        return 'Hello {0}'.format(session['username'])

    # Store the username in the session.
    session['username'] = request.values['username']
    return 'Session appended!'
def __init__(self, request, columns, collection):
    """Store the table configuration and run the queries immediately.

    Args:
        request: request object; its ``values`` are kept for later use.
        columns: stored as ``self.columns``.
        collection: stored as ``self.collection``.
    """
    self.columns = columns
    self.collection = collection
    # values specified by the datatable for filtering, sorting, paging
    self.request_values = request.values
    # results from the db
    self.result_data = None
    # total in the table after filtering
    self.cardinality_filtered = 0
    # total in the table unfiltered
    self.cardinality = 0
    # Misspelled name kept so any existing reader of it keeps working.
    self.cadinality = 0
    self.run_queries()
def profile_change():
    """Update the current user's profile from the request values.

    Nothing is changed unless ``password``, ``question`` and ``answer`` are
    all present. When the password passes ``is_valid_password`` the user's
    name, password, question and answer are saved and a success message is
    flashed; otherwise a complexity message is flashed.

    Returns:
        A redirect to the profile page in every case.
    """
    user = g.user
    if {'password', 'question', 'answer'}.issubset(request.values):
        password = request.values['password']
        if is_valid_password(password):
            # 'name' is not part of the required-field check above, so keep
            # the current name when it is not supplied.
            user.name = request.values.get('name', user.name)
            user.password = password
            user.question = request.values['question']
            user.answer = request.values['answer']
            db.session.add(user)
            db.session.commit()
            flash('Account information successfully changed.')
        else:
            flash('Password does not meet complexity requirements.')
    return redirect(url_for('ph_bp.profile'))
def guild_config_history(guild):
    """Return one page of the guild's config changes as JSON, newest first.

    The page number comes from the ``page`` request value (default 1) and
    each page holds 25 entries.
    """
    def serialize(gcc):
        return {
            'user': serialize_user(gcc.user_id),
            'before': unicode(gcc.before_raw),
            'after': unicode(gcc.after_raw),
            'created_at': gcc.created_at.isoformat(),
        }

    q = GuildConfigChange.select(GuildConfigChange, User).join(
        User, on=(User.user_id == GuildConfigChange.user_id),
    ).where(GuildConfigChange.guild_id == guild.guild_id).order_by(
        GuildConfigChange.created_at.desc()
    ).paginate(int(request.values.get('page', 1)), 25)

    # Build a real list so the payload does not depend on map() returning one.
    return jsonify([serialize(change) for change in q])
def update():
    """Match a logged-in user's feature vector against the feature matrix.

    On POST, reads ``user`` and a comma-separated ``vector`` from the request
    values, aborts with 404 when the user is not in ``logged_in``, and
    returns the match as JSON. Other methods fall through and return None.
    """
    if request.method == 'POST':
        user = request.values['user']
        raw_vector = request.values['vector']
        if user not in logged_in:
            abort(404)

        vector = np.fromstring(raw_vector, dtype=float, sep=',')

        # The in-process FeatureMatrix was the only reachable path; the
        # disabled alternative was querySpark.match(user, vector).
        nearest_neighbor = FeatureMatrix().match(user, vector)

        return jsonify(result='Success',
                       neighbor=nearest_neighbor)
def login():
    """Log a registered user in.

    On POST, reads ``user`` and ``password`` from the request values and
    checks them against ``registered_ids``. Unknown users and wrong passwords
    abort with 404. A successful check adds the user to ``logged_in`` when
    not already present and returns a JSON success result. Other methods
    fall through and return None.
    """
    if request.method == 'POST':
        remote_ip = request.remote_addr
        print("Login from client IP",remote_ip)
        user = request.values['user']
        pwd = request.values['password']

        # Always verify the credentials: an already-logged-in user name must
        # not be accepted without its password, and a wrong password must
        # produce an error rather than an empty response.
        if user not in registered_ids or registered_ids[user] != pwd:
            abort(404)

        if user not in logged_in:
            print("logging in",user)
            logged_in.append(user)
        return jsonify(result='Success')
def get_history(targets, start='-1h', end='now'):
    """Retrieve the time series data for one or more metrics.

    Args:
        targets (str|List[str]): Graphite path, or list of paths.
        start (Optional[str]): Start of date range. Defaults to one hour ago.
        end (Optional[str]): End of date range. Defaults to now.

    Returns:
        The ``datapoints`` list of the first series in the query result, with
        the second element of each datapoint rewritten as a
        ``YYYY-MM-DD HH:MM:SS`` string. An empty list is returned when the
        result cannot be parsed or has no such entry.
    """
    raw = query(targets, start, end)
    try:
        metrics = json.loads(raw)[0]['datapoints']
    except (TypeError, ValueError, LookupError):
        # Unparseable payload, empty result list, or missing 'datapoints'.
        return []

    # Convert unix timestamps to plot.ly's required date format
    for metric in metrics:
        timestamp = datetime.fromtimestamp(metric[1])
        metric[1] = timestamp.strftime('%Y-%m-%d %H:%M:%S')

    return metrics
def plotly(metrics=(), name='', color=None):
    """Convert a list of metric values to a Plot.ly object.

    Args:
        metrics: iterable of ``(value, timestamp)`` pairs; may be empty.
        name: trace name.
        color: optional marker color.

    Returns:
        A scatter trace dict whose ``x`` holds the timestamps and ``y`` the
        values, plus a ``marker`` entry when ``color`` is given.
    """
    # zip(*[]) yields nothing to unpack, so empty input needs its own case.
    values, timestamps = zip(*metrics) if metrics else ((), ())

    trace = {
        'type': 'scatter',
        'x': timestamps,
        'y': values,
        'name': name,
    }

    if color:
        # The key is the literal 'color', not the color value itself.
        trace['marker'] = {'color': color}

    return trace
def date_range():
    """Ensure the requested date range is in a format Graphite will accept.

    Reads ``from`` (default ``'-1h'``) and ``end`` (default ``'now'``) from
    the request values. A numeric bound is divided by 1000 and truncated to
    an int; any other bound is passed through unchanged.

    Returns:
        A ``(range_from, range_end)`` tuple.
    """
    def _coerce(bound):
        # Each bound is coerced on its own, so a relative string such as
        # '-1h' on one side does not stop the other side being converted.
        try:
            return int(float(bound) / 1000)
        except (TypeError, ValueError, OverflowError):
            # ... or pass string directly to Graphite
            return bound

    range_from = _coerce(request.values.get('from', '-1h'))
    range_end = _coerce(request.values.get('end', 'now'))

    return (range_from, range_end)
def get_efo_info_from_code(self, efo_codes, **kwargs):
    """Look up EFO documents by id.

    Args:
        efo_codes: a single code or a list of codes; a non-list value is
            wrapped in a list.
        **kwargs: passed to ``SearchParams``.

    Returns:
        A ``PaginatedResult`` over the search response, or None when the
        code list is empty.
    """
    params = SearchParams(**kwargs)

    codes = efo_codes if isinstance(efo_codes, list) else [efo_codes]

    query_body = addict.Dict()
    query_body.query.ids["values"] = codes
    query_body.size = params.size
    if params.facets == 'true':
        significant_terms = query_body.aggregations.significant_therapeutic_areas.significant_terms
        significant_terms.field = "therapeutic_labels.keyword"
        significant_terms.size = 25
        significant_terms.min_doc_count = 2
    if params.fields:
        query_body._source = params.fields

    # Nothing to search for.
    if not codes:
        return None

    res = self._cached_search(index=self._index_efo,
                              doc_type=self._docname_efo,
                              body=query_body.to_dict())
    return PaginatedResult(res, params)
def get_evidences_by_id(self, evidenceid, **kwargs):
    """Fetch evidence documents by id.

    Args:
        evidenceid: a single id string or a collection of ids; a ``str`` is
            wrapped in a list.
        **kwargs: passed to ``SearchParams``. A DEFAULT data structure is
            replaced by FULL.

    Returns:
        A ``SimpleResult`` whose data is the ``_source`` of every hit.
    """
    ids = [evidenceid] if isinstance(evidenceid, str) else evidenceid

    params = SearchParams(**kwargs)
    if params.datastructure == SourceDataStructureOptions.DEFAULT:
        params.datastructure = SourceDataStructureOptions.FULL

    # One result slot per requested id.
    search_body = {
        "query": {
            "ids": {"values": ids},
        },
        "size": len(ids),
    }
    res = self._cached_search(index=self._index_data, body=search_body)

    sources = [hit['_source'] for hit in res['hits']['hits']]
    return SimpleResult(res, params, data=sources)