def get_main():
    """Render the ``index`` template with one entry per configured service.

    For every section in ``config`` the status reported by
    ``get_service_action(service, 'status')`` is mapped to a display class:

    * ``'not-found'``              -> ``'active'``
    * ``'inactive'`` / ``'failed'`` -> ``'danger'``
    * ``'active'``                 -> ``'success'``
    * anything else                -> ``'warning'``

    The display class in turn decides the ``disabled_start``,
    ``disabled_stop`` and ``disabled_restart`` flags of the entry.
    """
    class_by_status = {
        'not-found': 'active',
        'inactive': 'danger',
        'failed': 'danger',
        'active': 'success',
    }
    entries = []
    for service in config.sections():
        status = get_service_action(service, 'status')['status']
        cls = class_by_status.get(status, 'warning')
        # Stop and restart share the same rule; start differs.
        cannot_stop = cls in ('active', 'danger')
        entries.append({
            'class': cls,
            'disabled_start': cls in ('active', 'success'),
            'disabled_stop': cannot_stop,
            'disabled_restart': cannot_stop,
            'title': config.get(service, 'title'),
            'service': service,
        })
    return template('index', hostname=gethostname(), services=entries)
# Example source code for the Python class status()
def http(host, port):
    """Register two bottle routes and run the server on ``host``:``port``.

    ``/`` returns ``http_root_str``.  Any other path is handed to
    ``parse_input``; a ``ValueError`` from parsing is answered with status
    400 and the error text, otherwise the parsed value goes through
    ``convert`` and its result is returned.
    """
    from bottle import route, response, run

    @route('/')
    def root():
        return http_root_str

    @route('/<s:path>')
    def do_conversion(s):
        try:
            parsed = parse_input(s)
        except ValueError as err:
            response.status = 400
            return str(err)
        else:
            # Conversion runs outside the try body, so a ValueError raised
            # by convert() itself is not turned into a 400.
            return convert(parsed)

    run(host=host, port=port)
def dtos_get_films_with_actors():
    """Return the films of a release year, with related data, as JSON.

    The year is read from the ``releaseYear`` query parameter and used in a
    ``ReleaseYear='<year>'`` filter; ``FilmActors.Actor`` and
    ``FilmCategories`` are expanded.  Items are fetched through
    ``dataService.dataViewDto.getItems("Film", ...)`` and serialised with
    ``CustomEncoder`` (indent 2).

    On ``dalUtils.StatusCodeError`` the response status is set to the
    error's ``value`` and nothing is returned; any other failure aborts the
    request with 400.
    """
    try:
        # NOTE(review): the query-string value is interpolated into the
        # filter expression unescaped -- confirm QueryObject / the data
        # service sanitises it, otherwise a quote in the input can alter
        # the filter.
        queryObject = QueryObject(
            filter="ReleaseYear='{0}'".format(request.query['releaseYear']),
            expand=['FilmActors.Actor', 'FilmCategories']
        )
        resultSerialData = dataService.dataViewDto.getItems("Film", queryObject)
        return json.dumps(resultSerialData, cls=CustomEncoder, indent=2)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # Deliberately not a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt still propagate instead of becoming a 400.
        abort(400, 'Bad Request')
# POST: api/datasource/crud/operations/dtos/TestAction
# with: Content-Type: application/json and body - {"param1":1}
def post_batch_entityset(entitySetName):
    """Insert a batch of entities into ``entitySetName``; return JSON.

    ``request.json`` is forwarded to ``handleInsertEntityBatch`` and the
    result is serialised with ``CustomEncoder``.

    On ``dalUtils.StatusCodeError`` the response status is set to the
    error's ``value`` and nothing is returned; any other failure aborts the
    request with 400.
    """
    try:
        result = dataProviderDto.apiProvider.handleInsertEntityBatch(
            entitySetName, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # Deliberately not a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt still propagate instead of becoming a 400.
        abort(400, 'Bad Request')
## DELETE: api/datasource/crud/batch/:entitySetName
#@delete('/api/datasource/crud/batch/<entitySetName>')
#def delete_batch_entityset(entitySetName):
# try:
# result = dataProviderDto.apiProvider.handleDeleteEntityBatch(entitySetName, request.json, dataService)
# response.content_type = "application/json; charset=utf-8"
# return json.dumps(result, cls=CustomEncoder)
# except dalUtils.StatusCodeError as err:
# response.status = err.value
# except:
# abort(400, 'Bad Request')
# DELETE: api/datasource/crud/batch/:entitySetName?keys=key1:1,2,3,4;key2:4,5,6,7
def entities_get_films_with_actors():
    """Return the films of a release year, with related data, as JSON.

    The year is read from the ``releaseYear`` query parameter and used in a
    ``ReleaseYear='<year>'`` filter; ``FilmActors.Actor`` and
    ``FilmCategories`` are expanded.  Items are fetched through
    ``dataService.from_.remote.dtoView.Films.getItems(...)`` and serialised
    with ``CustomEncoder`` (indent 2).

    On ``dalUtils.StatusCodeError`` the response status is set to the
    error's ``value`` and nothing is returned; any other failure aborts the
    request with 400.
    """
    try:
        # NOTE(review): the query-string value is interpolated into the
        # filter expression unescaped -- confirm QueryObject / the data
        # service sanitises it, otherwise a quote in the input can alter
        # the filter.
        queryObject = QueryObject(
            filter="ReleaseYear='{0}'".format(request.query['releaseYear']),
            expand=['FilmActors.Actor', 'FilmCategories']
        )
        resultSerialData = dataService.from_.remote.dtoView.Films.getItems(queryObject)
        return json.dumps(resultSerialData, cls=CustomEncoder, indent=2)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # Deliberately not a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt still propagate instead of becoming a 400.
        abort(400, 'Bad Request')
# POST: api/datasource/crud/operations/entities/TestAction
# with: Content-Type: application/json and body - {"param1":1}
def _assert_admin(self, cert):
    """Ensure the certificate's common name is one of ``self.admin_hosts``.

    ``cert`` may be ``None``, raw certificate bytes (loaded through
    ``load_certificate``) or an ``x509.Certificate``.

    Raises:
        HTTPResponse: status 401 when ``cert`` is ``None``; status 403 when
            the certificate's common name is not in ``self.admin_hosts``.
        TypeError: when ``cert`` is neither bytes nor an
            ``x509.Certificate``.
    """
    if cert is None:
        logger.warning('Admin command received by unauthentified host.')
        raise HTTPResponse(
            status=401,
            body={'message': 'Authentication required'}
        )

    if isinstance(cert, bytes):
        cert = load_certificate(cert_bytes=cert)
    if not isinstance(cert, x509.Certificate):
        raise TypeError('cert must be a raw certificate in PEM or DER format or an x509.Certificate instance.')

    common_names = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
    host = common_names[0].value
    if host in self.admin_hosts:
        return

    logger.warning('Host %s unauthorized for admin commands.', host)
    raise HTTPResponse(
        status=403,
        body={'message': 'Host {} unauthorized for admin commands'.format(host)}
    )
def _error(error, code):
    """Return an error as json.

    Builds a single error entry from ``code``, ``errors[code]`` and
    ``error.body``, prints the error together with ``error.traceback``,
    sets the response ``Content-Type`` to ``application/vnd.api+json`` and
    returns ``response_object`` wrapping the entry.
    """
    entry = {
        "status": str(code),
        "title": errors[code],
        "detail": error.body
    }
    error_type = type(error)
    traceback.print_exception(error_type, error, error.traceback)
    response.headers["Content-Type"] = "application/vnd.api+json"
    return response_object(errors=[entry])
def get_metadata():
    """Return the stored metadata for the ``file_hash`` query parameter.

    Responses:
        400 -- the parameter is empty, or ``valid_hash`` rejects the
               cleaned hash.
        404 -- ``get_file_id`` returns ``None`` for the hash.
        otherwise -- ``dumps`` of the record read through
               ``MetaController``, after ``change_date_to_str``.
    """
    raw_hash = request.query.file_hash
    if raw_hash == '':
        response.status = 400
        return jsonize({'message': 'file_hash parameter is missing'})

    cleaned = clean_hash(raw_hash)
    if not valid_hash(cleaned):
        response.status = 400
        return jsonize({'message': 'Invalid hash format (use MD5, SHA1 or SHA2)'})

    file_id = get_file_id(cleaned)
    if file_id is None:
        response.status = 404
        return jsonize({'message': 'Metadata not found in the database'})

    res = MetaController().read(file_id)
    if res is None:
        # NOTE(review): a missing record is only logged; ``res`` (None) is
        # still passed to change_date_to_str below -- confirm intended.
        log_event("metadata", file_id)
    return dumps(change_date_to_str(res))
def last_uploaded():
    """Return ``PackageController.last_updated(n)`` as JSON.

    ``n`` is read from the query string.  Every validation failure is
    answered with status 400 and an ``error`` / ``error_message`` body:

    * error 1 -- ``n`` is missing
    * error 2 -- ``n`` is not made of digits
    * error 3 -- ``n`` is zero

    Each returned item is passed through ``change_date_to_str`` before
    serialisation.
    """
    number = request.query.get("n")
    if number is None:
        response.status = 400
        return jsonize({"error": 1, "error_message": "Parameter n is missing"})
    if not number.isdigit():
        response.status = 400
        return jsonize({"error": 2, "error_message": "Parameter n must be a number"})
    count = int(number)
    if count == 0:
        # This branch used to answer with the default status while the two
        # sibling validation errors answer 400; keep them consistent.
        response.status = 400
        return jsonize({"error": 3, "error_message": "Parameter n must be greater than zero."})
    lasts = PackageController().last_updated(count)
    for index, entry in enumerate(lasts):  # Convert datetime objects
        lasts[index] = change_date_to_str(entry)
    return jsonize(lasts)
def auth_server():
    """Answer a token request based on the ``username`` form field.

    ``auth_fail`` yields status 400 with an error body.  ``user_ok``,
    ``create_fail``, ``delete_fail`` and ``empty_page`` each yield a fixed
    ``access_token``.  Any other (or missing) username yields ``{}``.
    """
    username = request.forms.get('username')
    if username == 'auth_fail':
        response.status = 400
        return {'error': 'errored with HTTP 400 on request'}

    token_by_user = {
        'user_ok': 'my-nifty-access-token',
        'create_fail': 'the-token-with-which-create-will-fail',
        'delete_fail': 'the-token-with-which-delete-will-fail',
        'empty_page': 'the-token-which-causes-an-empty-page',
    }
    token = token_by_user.get(username)
    if token is None:
        return {}
    return {'access_token': token}
def create():
    """Answer a creation request based on the ``Authorization`` header.

    The token is the second whitespace-separated field of the header.
    The "nifty" token yields a success body; the two failure tokens yield
    status 403 with either an error dict or the string ``"empty"``.  Any
    other token falls through and returns ``None``.
    """
    header_fields = request.headers.get('Authorization').split()
    auth_token = header_fields[1]
    if auth_token == 'my-nifty-access-token':
        return {'response': 'Successful creation of user.'}

    forbidden_bodies = {
        'the-token-with-which-create-will-fail': {'error': 'Permission denied'},
        'the-token-which-causes-an-empty-page': "empty",
    }
    if auth_token in forbidden_bodies:
        response.status = 403
        return forbidden_bodies[auth_token]
def delete():
    """Answer a deletion request based on the ``Authorization`` header.

    The token is the second whitespace-separated field of the header.
    The "nifty" token yields a success body, the delete-fail token yields
    status 403 with an error body, and any other token returns ``None``.
    """
    header_fields = request.headers.get('Authorization').split()
    auth_token = header_fields[1]
    if auth_token == 'the-token-with-which-delete-will-fail':
        response.status = 403
        return {'error': 'Permission denied'}
    if auth_token == 'my-nifty-access-token':
        return {'response': 'Successful deletion of user.'}
def get_service_action(service, action):
    """Run ``action`` on the unit configured for ``service``.

    Returns:
        * ``{action: 'OK'}`` / ``{action: 'Fail'}`` for ``start``, ``stop``,
          ``restart``, ``reload`` and ``reloadorrestart``, depending on the
          truthiness of the matching bus call.
        * ``{'status': <state>}`` for ``status``; the state is
          ``'not-found'`` when the unit load state says so, otherwise the
          unit's active state as a string.
        * the result of ``get_service_journal(service, 100)`` for
          ``journal``.
        * a ``{'msg': ...}`` dict with response status 400 when the service
          is not a config section or the action is unknown.
    """
    if service not in config.sections():
        response.status = 400
        return {'msg': 'Sorry, but \'{}\' is not defined in config.'.format(service)}

    scope = config.get('DEFAULT', 'scope', fallback='system')
    sdbus = systemdBus(True) if scope == 'user' else systemdBus()
    unit = config.get(service, 'unit')

    # Actions that map one-to-one onto a bus method taking the unit name.
    bus_method_by_action = {
        'start': 'start_unit',
        'stop': 'stop_unit',
        'restart': 'restart_unit',
        'reload': 'reload_unit',
        'reloadorrestart': 'reload_or_restart_unit',
    }
    if action in bus_method_by_action:
        succeeded = getattr(sdbus, bus_method_by_action[action])(unit)
        return {action: 'OK'} if succeeded else {action: 'Fail'}

    if action == 'status':
        if sdbus.get_unit_load_state(unit) != 'not-found':
            return {action: str(sdbus.get_unit_active_state(unit))}
        return {action: 'not-found'}

    if action == 'journal':
        return get_service_journal(service, 100)

    response.status = 400
    return {'msg': 'Sorry, but cannot perform \'{}\' action.'.format(action)}
def get_service_journal(service, lines):
    """Return the last ``lines`` journal lines of the service's unit.

    Returns:
        * ``{'journal': <tail>}`` on success.
        * ``{'journal': 'not-found'}`` when the unit status is
          ``'not-found'``.
        * ``{'msg': ...}`` with status 500 when ``lines`` cannot be turned
          into an int.
        * ``{'msg': ...}`` with status 400 when the service is not a config
          section.
    """
    if service not in config.sections():
        response.status = 400
        return {'msg': 'Sorry, but \'{}\' is not defined in config.'.format(service)}

    unit_status = get_service_action(service, 'status')['status']
    if unit_status == 'not-found':
        return {'journal': 'not-found'}

    try:
        line_count = int(lines)
    except Exception as exc:
        response.status = 500
        return {'msg': '{}'.format(exc)}

    reader = Journal(config.get(service, 'unit'))
    return {'journal': reader.get_tail(line_count)}
def get_service_journal_page(service):
    """Render the ``journal`` template for ``service``.

    Aborts with 400 when the service is not a config section or when its
    unit status is ``'not-found'``.  Otherwise the last 100 journal lines
    are fetched through ``get_service_journal`` and handed to the template
    together with the host name and the service title.
    """
    if service not in config.sections():
        abort(400, 'Sorry, but \'{}\' is not defined in config.'.format(service))
    else:
        unit_status = get_service_action(service, 'status')['status']
        if unit_status == 'not-found':
            abort(400,'Sorry, but service \'{}\' unit not found in system.'.format(config.get(service, 'title')))
        tail = get_service_journal(service, 100)
        return template(
            'journal',
            hostname=gethostname(),
            service=config.get(service, 'title'),
            journal=tail['journal'],
        )
# Serve static content
def dtos_test_action():
    """Placeholder action that currently only reads ``param1`` from the body.

    On ``dalUtils.StatusCodeError`` the response status is set to the
    error's ``value``; any other failure (for example a JSON body without
    a ``param1`` key) aborts the request with 400.  Nothing is returned.
    """
    try:
        # Not used yet; the lookup still rejects bodies lacking ``param1``.
        param1 = request.json['param1']  # noqa: F841
        # TODO: Add some actions in here
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # Deliberately not a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt still propagate instead of becoming a 400.
        abort(400, 'Bad Request')
def put_entityset(entitySetName):
    """Update an entity of ``entitySetName`` and return the result as JSON.

    ``request.query`` and ``request.json`` are forwarded to
    ``handleUpdateEntity``; the result is serialised with ``CustomEncoder``.

    On ``dalUtils.StatusCodeError`` the response status is set to the
    error's ``value`` and nothing is returned; any other failure aborts the
    request with 400.
    """
    try:
        result = dataProviderDto.apiProvider.handleUpdateEntity(
            entitySetName, request.query, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # Deliberately not a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt still propagate instead of becoming a 400.
        abort(400, 'Bad Request')
# PATCH: api/datasource/crud/:entitySetName?keys=key1:{key1}
#@patch('/api/datasource/crud/<entitySetName>')
def patch_entityset(entitySetName):
    """Update an entity of ``entitySetName`` and return the result as JSON.

    ``request.query`` and ``request.json`` are forwarded to
    ``handleUpdateEntity``; the result is serialised with ``CustomEncoder``.

    On ``dalUtils.StatusCodeError`` the response status is set to the
    error's ``value`` and nothing is returned; any other failure aborts the
    request with 400.
    """
    try:
        result = dataProviderDto.apiProvider.handleUpdateEntity(
            entitySetName, request.query, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # Deliberately not a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt still propagate instead of becoming a 400.
        abort(400, 'Bad Request')
# POST: api/datasource/crud/:entitySetName
def post_entityset(entitySetName):
    """Insert an entity into ``entitySetName`` and return the result as JSON.

    ``request.json`` is forwarded to ``handleInsertEntity``; the result is
    serialised with ``CustomEncoder``.

    On ``dalUtils.StatusCodeError`` the response status is set to the
    error's ``value`` and nothing is returned; any other failure aborts the
    request with 400.
    """
    try:
        result = dataProviderDto.apiProvider.handleInsertEntity(
            entitySetName, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # Deliberately not a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt still propagate instead of becoming a 400.
        abort(400, 'Bad Request')
# DELETE: api/datasource/crud/:entitySetName?keys=key1:{key1}
def put_batch_entityset(entitySetName):
    """Update a batch of entities of ``entitySetName``; return JSON.

    ``request.json`` is forwarded to ``handleUpdateEntityBatch`` and the
    result is serialised with ``CustomEncoder``.

    On ``dalUtils.StatusCodeError`` the response status is set to the
    error's ``value`` and nothing is returned; any other failure aborts the
    request with 400.
    """
    try:
        result = dataProviderDto.apiProvider.handleUpdateEntityBatch(
            entitySetName, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # Deliberately not a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt still propagate instead of becoming a 400.
        abort(400, 'Bad Request')
# PATCH: api/datasource/crud/batch/:entitySetName
#@patch('/api/datasource/crud/batch/<entitySetName>')
def patch_batch_entityset(entitySetName):
    """Update a batch of entities of ``entitySetName``; return JSON.

    ``request.json`` is forwarded to ``handleUpdateEntityBatch`` and the
    result is serialised with ``CustomEncoder``.

    On ``dalUtils.StatusCodeError`` the response status is set to the
    error's ``value`` and nothing is returned; any other failure aborts the
    request with 400.
    """
    try:
        result = dataProviderDto.apiProvider.handleUpdateEntityBatch(
            entitySetName, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # Deliberately not a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt still propagate instead of becoming a 400.
        abort(400, 'Bad Request')
# POST: api/datasource/crud/batch/:entitySetName
def delete_batch_entityset(entitySetName):
    """Delete a batch of entities from ``entitySetName``; return JSON.

    ``request.query`` is forwarded to ``handleDeleteEntityBatch`` and the
    result is serialised with ``CustomEncoder``.

    On ``dalUtils.StatusCodeError`` the response status is set to the
    error's ``value`` and nothing is returned; any other failure aborts the
    request with 400.
    """
    try:
        result = dataProviderDto.apiProvider.handleDeleteEntityBatch(
            entitySetName, request.query, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # Deliberately not a bare ``except:`` so that SystemExit and
        # KeyboardInterrupt still propagate instead of becoming a 400.
        abort(400, 'Bad Request')
def _get_cert_config_if_allowed(self, domain, cert):
    """Return the certificate config for ``domain`` if the caller may use it.

    ``cert`` may be ``None``, raw certificate bytes (loaded through
    ``load_certificate``) or an ``x509.Certificate``.  The caller is
    identified by the certificate's common name and is accepted when that
    name is in ``self.admin_hosts`` or in the matched config's
    ``allowed_hosts``.

    Raises:
        HTTPResponse: status 401 when ``cert`` is ``None``; status 404 when
            no config matches ``domain``; status 403 when the host is not
            allowed for the matched config.
        TypeError: when ``cert`` is neither bytes nor an
            ``x509.Certificate``.
    """
    if cert is None:
        logger.warning('Request received for domain %s by unauthentified host.', domain)
        raise HTTPResponse(
            status=401,
            body={'message': 'Authentication required'}
        )

    if isinstance(cert, bytes):
        cert = load_certificate(cert_bytes=cert)
    if not isinstance(cert, x509.Certificate):
        raise TypeError('cert must be a raw certificate in PEM or DER format or an x509.Certificate instance.')
    common_names = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
    host = common_names[0].value

    certconfig = self.certificates_config.match(domain)
    if not certconfig:
        logger.warning('No config matching domain %s found.', domain)
        raise HTTPResponse(
            status=404,
            body={'message': 'No configuration found for domain {}'.format(domain)}
        )

    logger.debug('Domain %s matches pattern %s', domain, certconfig.pattern)
    if host in self.admin_hosts or host in certconfig.allowed_hosts:
        return certconfig

    logger.warning('Host %s unauthorized for domain %s.', host, domain)
    raise HTTPResponse(
        status=403,
        body={'message': 'Host {} unauthorized for domain {}'.format(host, domain)}
    )
def _handle_auth(self):
    """Handle a certificate signing request posted as JSON.

    The ``csr`` member of the JSON body is parsed as a PEM CSR.  Its common
    name identifies the host and selects ``<host>.csr`` under
    ``self.csr_path`` and ``<host>.crt`` under ``self.crt_path``.

    Returns:
        ``{'status': 'authorized', 'crt': <PEM>}`` when the certificate
        file exists and its public key matches the CSR's, or
        ``{'status': 'pending'}`` (response status 202) after the CSR has
        been written to disk because no certificate file exists yet.

    Raises:
        HTTPResponse: status 400 when the CSR signature is invalid or its
            common name is missing or unusable as a file name; status 409
            when the existing certificate's public key differs from the
            CSR's.
    """
    request_data = request.json
    csr = x509.load_pem_x509_csr(data=request_data['csr'].encode(), backend=default_backend())  # pylint: disable=unsubscriptable-object
    if not csr.is_signature_valid:
        raise HTTPResponse(
            status=400,
            body={'message': 'The certificate signing request signature is invalid.'}
        )

    common_names = csr.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
    host = common_names[0].value if common_names else ''
    # The common name comes from the client and ends up in file paths
    # below: a separator (or an absolute path, which os.path.join would
    # honour) must not let it escape csr_path / crt_path.
    if not host or any(char in host for char in ('/', '\\', '\x00')):
        raise HTTPResponse(
            status=400,
            body={'message': 'The certificate signing request common name is missing or invalid.'}
        )

    csr_file = os.path.join(self.csr_path, "%s.csr" % (host))
    crt_file = os.path.join(self.crt_path, "%s.crt" % (host))

    if os.path.isfile(crt_file):
        crt = load_certificate(crt_file)
        if crt.public_key().public_numbers() == csr.public_key().public_numbers():
            return {
                'status': 'authorized',
                'crt': dump_pem(crt).decode()
            }
        raise HTTPResponse(
            status=409,
            body={'message': 'Mismatch between the certificate signing request and the certificate.'}
        )

    # No certificate yet: save the CSR and report it as pending.
    with open(csr_file, 'w') as f:
        f.write(csr.public_bytes(serialization.Encoding.PEM).decode())
    response.status = 202
    return {
        'status': 'pending'
    }
def _handle_revoke_cert(self, domain):
    """Revoke the certificate of ``domain`` after the admin check passes.

    The caller's certificate is taken from
    ``request.environ['ssl_certificate']`` and checked with
    ``_assert_admin`` before ``self.acmeproxy.revoke_certificate`` is
    called.  Returns ``{'status': 'revoked'}``.
    """
    self._assert_admin(request.environ['ssl_certificate'])
    self.acmeproxy.revoke_certificate(domain)
    return {'status': 'revoked'}
def _handle_delete_cert(self, domain):
    """Delete the certificate of ``domain`` after the admin check passes.

    The caller's certificate is taken from
    ``request.environ['ssl_certificate']`` and checked with
    ``_assert_admin`` before ``self.acmeproxy.delete_certificate`` is
    called.  Returns ``{'status': 'deleted'}``.
    """
    self._assert_admin(request.environ['ssl_certificate'])
    self.acmeproxy.delete_certificate(domain)
    return {'status': 'deleted'}
def _handle_health_check(self):
    """Set the response status to 200 and return ``{'status': 'alive'}``."""
    body = {'status': 'alive'}
    response.status = 200
    return body
def delete_resources():
    """Delete all resources.

    Clears the collection returned by ``get_resources()`` and sets the
    response status to 204; nothing is returned.
    """
    get_resources().clear()
    response.status = 204
def jsonp(data, callback):
    """Wrap ``data`` in a JSONP call: ``callback([<json>]);``.

    The JSON payload is ``jsonize`` of ``{"status": "OK", "data": data}``.
    """
    # NOTE(review): ``callback`` is emitted verbatim -- confirm callers
    # validate it before it reaches this function.
    opening = callback + "(["
    payload = jsonize({"status": "OK", "data": data})
    return opening + payload + "]);"
def api_process_file():
    """Process the file named by the ``file_hash`` query parameter.

    Responses:
        400 -- the cleaned hash is not 40 characters long.
        404 -- ``process_file`` returns ``None``.
        otherwise -- the JSON string ``"File processed"``.
    """
    sha1 = clean_hash(request.query.file_hash)
    if len(sha1) != 40:
        response.status = 400
        return jsonize({'message': 'Invalid hash format (use sha1)'})

    if process_file(sha1, True) is None:
        response.status = 404
        return jsonize("File not found in the database")

    return jsonize("File processed")