def index():
    """Render the main page, the search results, or the first-run page.

    Shows are fetched through ``fe.get_shows_for_display``, which receives the
    ``q`` query-string value (``None`` when absent), e.g.
    ``http://localhost?q=gabriel+dropout``. When the request path is
    ``search``, e.g. ``http://localhost/search``, ``standalone`` is passed to
    the template as ``True`` so the navigation menu is not rendered.

    Should every category come back empty, ``fe.do_first_time_setup`` is
    called to scrape shows from the source.

    Returns:
        A rendered template, either ``first_time.html`` for the first run or
        ``default.html`` otherwise.
    """
    log.debug("Entering index, attempting to get shows.")
    search_term = request.args.get('q')
    watching, airing, specials, movies = fe.get_shows_for_display(search_term)
    standalone = request.path.strip('/') == 'search'

    # Only consult the backend when the session actually carries a login ID.
    if 'logged_in' in session:
        logged_in = fe.check_login_id(escape(session['logged_in']))
    else:
        logged_in = False

    if watching or airing or specials or movies:
        return render_template(
            'default.html',
            watching=watching,
            airing=airing,
            specials=specials,
            movies=movies,
            standalone=standalone,
            logged_in=logged_in,
        )

    log.debug("No shows found in any category. Starting first time startup.")
    fe.do_first_time_setup()
    return render_template('first_time.html', logged_in=logged_in)
# Example source code for Python's escape()
def star():
    """Starring/Highlighting handler.

    Attempts to toggle a star/highlight on a particular show. The show ID must
    be passed in the ``id`` query string. If the user is unauthenticated, the
    function is aborted with a ``404`` message to hide the page.

    Returns:
        JSON formatted output describing success and the ID of the show starred.
    """
    log.debug("Entering star, trying to toggle star.")
    # A client without a login has no 'logged_in' key in its session; check
    # for the key first so that case reaches the 404 below instead of
    # failing with a KeyError.
    if 'logged_in' in session and fe.check_login_id(escape(session['logged_in'])):
        show_id = request.args['id']
        log.debug("Sending show ID {0} to function".format(show_id))
        fe.star_show(show_id)
        log.debug("Returning to user.")
        return jsonify({"star": "success", "id": show_id})
    log.debug("User cannot be authenticated, send 404 to hide page.")
    abort(404)
def drop_show():
    """Show removal handler.

    Attempts to remove a show from the backend system. The show ID must
    be passed in the ``id`` query string. If the user is unauthenticated, the
    function is aborted with a ``404`` message to hide the page.

    Returns:
        An HTTP redirect to the home page, to refresh.
    """
    log.debug("Entering drop_show, trying to remove show from list.")
    # A client without a login has no 'logged_in' key in its session; check
    # for the key first so that case reaches the 404 below instead of
    # failing with a KeyError.
    if 'logged_in' in session and fe.check_login_id(escape(session['logged_in'])):
        show_id = request.args['id']
        log.debug("Sending show ID {0} to function".format(show_id))
        fe.remove_show(show_id)
        log.debug("Refreshing user's page.")
        return redirect('/')
    log.debug("User cannot be authenticated, send 404 to hide page.")
    abort(404)
def login():
    """Log a user in from a JSON body carrying ``username`` and ``password``.

    If the session already holds a ``username``, a plain-text notice is
    returned and no credentials are checked. Otherwise the user record is
    looked up by ``username`` in the ``user`` table obtained from
    ``dynamodb``, and ``hash(password)`` is compared with the stored
    ``password`` attribute. On a match the username is stored in the session.

    Returns:
        ``'Logged in as <username>'`` when already logged in, or
        ``'Login Succeed'`` after a successful credential check.

    Aborts:
        400 if the body is not a JSON object or lacks ``username`` or
        ``password``; 401 if no such user exists or the password differs.
    """
    if 'username' in session:
        return 'Logged in as %s' % escape(session['username'])

    table = dynamodb.Table('user')
    params = request.get_json()
    # get_json() may yield None or a non-object JSON value (list, string...);
    # reject those as a bad request rather than failing on the lookups below.
    if not isinstance(params, dict) or 'username' not in params or 'password' not in params:
        abort(400)

    username = params['username']
    password = params['password']
    response = table.get_item(Key={'username': username})
    if 'Item' in response:
        db_password = response['Item']['password']
        # NOTE(review): ``hash`` is whatever this module binds to that name.
        # If it is the builtin, string hashes vary between processes and are
        # unsuitable for password checks - confirm a real password hash is used.
        if hash(password) == db_password:
            session['username'] = username
            return 'Login Succeed'
    abort(401)
def logout():
    """Logout handler.

    Ends the client session and deletes the session ID from the database.
    Calling this without an active login is harmless: there is nothing to
    delete and the same success payload is returned.

    Returns:
        JSON formatted output describing success.
    """
    log.debug("Entering logout, attempting to end session.")
    # Only ask the backend to delete a login ID when the session holds one;
    # an already logged-out client would otherwise trigger a KeyError.
    if 'logged_in' in session:
        fe.delete_login_id(escape(session['logged_in']))
    session.pop('logged_in', None)
    log.debug("Returning to user.")
    return jsonify({"logout": "success"})
def test_publication_titles(publication_response, publication_data, category):
    """Check that every publication title of the category is in the response.

    Each title is HTML-escaped and UTF-8 encoded before being searched for in
    the response body.
    """
    expected_titles = (
        escape(entry["title"]).encode('utf-8')
        for entry in publication_data[category]
    )
    for expected in expected_titles:
        assert expected in publication_response.data
def test_publication_links(publication_response, publication_data, category):
    """Check that every publication's adsabs links are in the response.

    Two anchors are expected per publication: the "read more" link and the
    link wrapping the title, both pointing at the escaped ``adsabs`` URL.
    """
    for entry in publication_data[category]:
        link = escape(entry["adsabs"])
        expected_snippets = (
            '...<a href="{0}" target="_blank"> read more</a>'.format(link),
            '<a href="{0}" target="_blank">{1}</a>'.format(link, entry["title"]),
        )
        for snippet in expected_snippets:
            assert snippet.encode('utf-8') in publication_response.data
def test_publication_authors(publication_response, publication_data, category):
    """Check that every publication's author string is in the response.

    Each author string is HTML-escaped and UTF-8 encoded before being
    searched for in the response body.
    """
    expected_authors = (
        escape(entry["authors"]).encode('utf-8')
        for entry in publication_data[category]
    )
    for expected in expected_authors:
        assert expected in publication_response.data
def view_log() -> str:
    """Return the whole of ``einstein.log`` as HTML-escaped text.

    Escaping replaces characters such as ``<`` and ``>`` so the raw log text
    cannot break the page when it is rendered.
    """
    with open('einstein.log') as log_file:
        raw_text = log_file.read()  # read the entire file in one go
    return escape(raw_text)
def view_log() -> 'html':
    """Render ``einstein.log`` as an HTML table via ``log.html``.

    Every line of the log becomes one row: the line is split on ``|`` and
    each resulting field is HTML-escaped.
    """
    with open('einstein.log') as log_file:
        # One inner list per log line, holding that line's escaped fields.
        rows = [
            [escape(field) for field in entry.split('|')]
            for entry in log_file
        ]
    # Column headings for the log table.
    headings = ('Form Data', 'Remote_addr', 'User_agent', 'Results')
    return render_template(
        'log.html',
        the_title='Calculation Logs',
        row_titles=headings,
        log_data=rows,
    )
def index():
    """Report, as plain text, whether the session holds a logged-in user."""
    if 'username' not in session:
        return 'You are not logged in'
    current_user = escape(session['username'])
    return 'Logged in as %s' % current_user
def pending_data(status, event_id):
    """Return server side data.

    Builds the column list and the query for an indicator table according to
    ``status``, runs them through ``DataTables`` together with the request's
    query arguments, and returns the result as JSON with every cell escaped.

    Args:
        status: Selects the view. ``'pending'`` and ``'search'`` join events
            and filter on ``Indicator.pending`` being true / false
            respectively; ``'approved'`` restricts to ``event_id`` and
            non-pending indicators; any other value lists pending indicators.
        event_id: Event whose indicators are listed; only read when
            ``status`` is ``'approved'``.

    Returns:
        The JSON response built from the escaped ``DataTables`` output.
    """
    # defining columns
    column_names = [
        'id',
        'ioc',
        'itype.name',
        'control.name',
        'comment',
        'enrich',
        'first_seen',
    ]
    base_query = db.session.query(Indicator).join(Control).join(Itype)

    # NOTE(review): the ``== True`` / ``== False`` comparisons are presumably
    # ORM column expressions, so they are kept rather than rewritten as
    # ``is`` / ``not`` - confirm.
    if status == 'pending':
        column_names += ['event_id', 'event.name']
        query = base_query.join(Event).filter(Indicator.pending == True)
    elif status == 'search':
        column_names += ['event_id', 'event.name']
        query = base_query.join(Event).filter(Indicator.pending == False)
    elif status == 'approved':
        column_names += ['last_seen', 'rel_list']
        query = base_query.filter(Indicator.event_id == event_id).filter(Indicator.pending == False)
    else:
        query = base_query.filter(Indicator.pending == True)

    columns = [ColumnDT(name) for name in column_names]
    rowTable = DataTables(request.args, Indicator, query, columns)

    # xss catch just to be safe
    res = rowTable.output_result()
    for item in res['data']:
        # ``items()`` works on both Python 2 and 3 (``iteritems()`` is
        # Python 2 only); the list() snapshot keeps the in-place update safe.
        for key, value in list(item.items()):
            item[key] = escape(value)
    return jsonify(res)
def event_data(status):
    """Return server side data.

    Builds the column list and the query for the event table, runs them
    through ``DataTables`` together with the request's query arguments, and
    returns the result as JSON with every cell escaped.

    Args:
        status: When one of ``'New'``, ``'Open'`` or ``'Resolved'``, only
            events whose ``Status.name`` equals it are returned; any other
            value returns events of every status.

    Returns:
        The JSON response built from the escaped ``DataTables`` output.
    """
    # defining columns
    column_names = [
        'id',
        'name',
        'status.name',
        'source.name',
        'tlp.name',
        'confidence',
        'created',
        'indicator_count',
    ]
    columns = [ColumnDT(name) for name in column_names]

    base_query = db.session.query(Event).join(Source).join(Tlp).join(Status)
    if status in ['New', 'Open', 'Resolved']:
        query = base_query.filter(Status.name == status)
    else:
        query = base_query

    rowTable = DataTables(request.args, Event, query, columns)

    # xss catch just to be safe
    res = rowTable.output_result()
    for item in res['data']:
        # ``items()`` works on both Python 2 and 3 (``iteritems()`` is
        # Python 2 only); the list() snapshot keeps the in-place update safe.
        for key, value in list(item.items()):
            item[key] = escape(value)
    return jsonify(res)
###
# API Calls
###
def test_configure(self, mock_getCircles, mock_HipchatApiHandler, mock_getInstallationFromJWT):
    """Exercise ``/configure.html``: page load, rejected token, accepted token.

    The installation starts without a Glassfrog token. A 401 answer from the
    mocked circles lookup must surface the 401 flash message; a 200 answer
    must surface the success flash message and send the success notice
    through the mocked HipChat handler.
    """
    installation = self.defaultInstallation(set_glassfrogToken=False)
    assert installation.glassfrogToken is None
    mock_getInstallationFromJWT.return_value = installation

    # Loading of page
    response = self.app.get(
        '/configure.html',
        follow_redirects=True,
        query_string=test_values.mock_jwt_data('bogus'),
    )
    assert b'Glassfrog Token' in response.data

    # Wrong token
    mock_getCircles.return_value = [401, test_values.mock_401_responsebody['message']]
    response = self.app.post(
        '/configure.html',
        follow_redirects=True,
        data={'glassfrogtoken': test_values.mock_glassfrogToken},
        query_string=test_values.mock_jwt_data('bogus'),
    )
    assert mock_getCircles.called
    expected_flash = escape(test_values.mock_401_flash_message)
    assert expected_flash in response.data.decode('utf-8')

    # Right token
    mock_getCircles.return_value = (200, test_values.mock_circles_message)
    response = self.app.post(
        '/configure.html',
        follow_redirects=True,
        data={'glassfrogtoken': test_values.mock_glassfrogToken},
        query_string=test_values.mock_jwt_data('bogus'),
    )
    assert mock_getCircles.called
    expected_flash = escape(strings.configured_successfully_flash)
    assert expected_flash in response.data.decode('utf-8')

    send_message = mock_HipchatApiHandler.return_value.sendMessage
    send_message.assert_called_with(
        color=strings.succes_color,
        message=strings.configured_successfully,
        installation=installation,
    )
def view_the_log() -> 'html':
    """Display the contents of the log file as a HTML table.

    Each line of ``vsearch.log`` is split on ``|`` and every field is
    HTML-escaped before the rows are handed to the ``viewlog.html`` template.
    """
    with open('vsearch.log') as log_file:
        # One inner list per log line, holding that line's escaped fields.
        rows = [
            [escape(field) for field in entry.split('|')]
            for entry in log_file
        ]
    column_headers = ('????', '???IP', '???', '????')
    return render_template(
        'viewlog.html',
        the_title='????',
        the_row_titles=column_headers,
        the_data=rows,
    )
def view_the_log() -> 'html':
    """Display the contents of the log file as a HTML table.

    Each line of ``vsearch.log`` is split on ``|`` and every field is
    HTML-escaped before the rows are handed to the ``viewlog.html`` template.
    """
    with open('vsearch.log') as log_file:
        # One inner list per log line, holding that line's escaped fields.
        rows = [
            [escape(field) for field in entry.split('|')]
            for entry in log_file
        ]
    column_headers = ('????', '???IP', '???', '????')
    return render_template(
        'viewlog.html',
        the_title='????',
        the_row_titles=column_headers,
        the_data=rows,
    )
def principal():
    """Render ``Index.html`` with the ten publications of highest ID.

    The query joins each publication with its author's user and user-data
    rows. ``sessionopen`` carries the escaped session username, or an empty
    string when nobody is logged in.
    """
    cursor = mysql.connection.cursor()
    cursor.execute("""SELECT idPublicacion, titulo, cuerpo, portada,Nombre, Ape_pat, user, fechaPublicacion
FROM Publicacion INNER JOIN momantaiter_blogflask.Usuario ON Usuario.idUsuario = Publicacion.Usuario_idUsuario
INNER JOIN momantaiter_blogflask.Usuario_datos ON Usuario_datos.idUsuario_datos = Usuario.Usuario_datos_idUsuario_datos
ORDER BY idPublicacion DESC limit 10""")
    rows = cursor.fetchall()
    # Check whether a user is logged in for this session.
    session_user = escape(session['username']) if 'username' in session else ""
    return render_template('Index.html', lista=rows, sessionopen=session_user)
# Route for opening a user's profile by username, in the form "+nameUser"
def categorias(categoria, subcategoria):
    """Render ``cat.html`` with the publications of a category/sub-category.

    Args:
        categoria: Value matched against the ``categoria`` column.
        subcategoria: Value matched against the ``subCategoria`` column.

    Returns:
        The rendered template; ``sessionopen`` is the escaped session
        username, or an empty string when nobody is logged in.
    """
    cursor = mysql.connection.cursor()
    query_args = [[categoria], [subcategoria]]
    cursor.execute("""SELECT idPublicacion, titulo, cuerpo, portada,Nombre, Ape_pat, user, fechaPublicacion
FROM Publicacion INNER JOIN momantaiter_blogflask.Usuario ON Usuario.idUsuario = Publicacion.Usuario_idUsuario
INNER JOIN momantaiter_blogflask.Usuario_datos ON Usuario_datos.idUsuario_datos = Usuario.Usuario_datos_idUsuario_datos
WHERE categoria = (%s) AND subCategoria = (%s) ORDER BY idPublicacion DESC""", query_args)
    rows = cursor.fetchall()
    # Check whether a user is logged in for this session.
    session_user = escape(session['username']) if 'username' in session else ""
    return render_template("cat.html", sessionopen=session_user, lista=rows)
def categoria(categoria):
    """Render ``cat.html`` with the publications of one category.

    Args:
        categoria: Value matched against the ``categoria`` column.

    Returns:
        The rendered template; ``sessionopen`` is the escaped session
        username, or an empty string when nobody is logged in.
    """
    cursor = mysql.connection.cursor()
    cursor.execute("""SELECT idPublicacion, titulo, cuerpo, portada,Nombre, Ape_pat, user, fechaPublicacion
FROM Publicacion INNER JOIN momantaiter_blogflask.Usuario ON Usuario.idUsuario = Publicacion.Usuario_idUsuario
INNER JOIN momantaiter_blogflask.Usuario_datos ON Usuario_datos.idUsuario_datos = Usuario.Usuario_datos_idUsuario_datos
WHERE categoria = (%s) ORDER BY idPublicacion DESC""", [categoria])
    rows = cursor.fetchall()
    # Check whether a user is logged in for this session.
    session_user = escape(session['username']) if 'username' in session else ""
    return render_template("cat.html", sessionopen=session_user, lista=rows)
def publlicacion():
    """Render ``Publicacion.html`` for the publication named by ``ID``.

    The publication ID is read from the ``ID`` query-string parameter (sent
    by the index page) and used to fetch the publication row and its
    comments.

    Returns:
        The rendered template with the publication rows (``resutado``), its
        comments (``comentarios``), the raw ID (``id``) and the escaped
        session username or an empty string (``sessionopen``).
    """
    cursor = mysql.connection.cursor()
    publication_id = request.args.get('ID')
    # The ID comes straight from the URL, so it is handed to the driver as a
    # bound parameter instead of being formatted into the SQL text.
    cursor.execute("SELECT * FROM Publicacion WHERE idPublicacion = (%s)", [publication_id])
    resultado = cursor.fetchall()
    cursor.execute("SELECT Nombre, user, comentario FROM Comentario INNER JOIN Usuario ON Usuario.idUsuario = Comentario.Usuario_idUsuarioC INNER JOIN Usuario_datos ON Usuario_datos.idUsuario_datos = Usuario.Usuario_datos_idUsuario_datos WHERE Publicacion_idPublicacionC = (%s)", [publication_id])
    comentarios = cursor.fetchall()
    session_user = ""
    if 'username' in session:
        session_user = escape(session['username'])
    return render_template(
        'Publicacion.html',
        resutado=resultado,
        comentarios=comentarios,
        id=publication_id,
        sessionopen=session_user,
    )
def publicar():
    """Show the publishing form to a logged-in user.

    Returns:
        The rendered ``publicar.html`` template, with ``sessionopen`` set to
        the escaped session username, when the session holds a ``username``;
        otherwise a redirect to the ``login`` endpoint.
    """
    # No database access is needed to show the form, so no cursor is opened.
    if 'username' not in session:
        return redirect(url_for("login"))
    session_user = escape(session['username'])
    return render_template('publicar.html', sessionopen=session_user)
def editP():
    """Update a publication owned by the session user from the submitted form.

    Reads ``titulo``, ``publicacion``, ``categoria`` and ``xpubxid`` from the
    form. For categories ``"1"`` to ``"5"`` the sub-category is taken from
    the matching ``cat<N>`` field; any other category gets an empty
    sub-category. If a cover image can be saved from the ``file`` upload,
    ``portada`` is updated too; otherwise the existing cover is left as is.
    The UPDATE only matches rows whose ``Usuario_idUsuario`` equals the
    session ``id``.

    Returns:
        A redirect to ``/publicacion?ID=<xpubxid>``.
    """
    cursor = mysql.connection.cursor()

    category = request.form["categoria"]
    if category in ("1", "2", "3", "4", "5"):
        sub_category = request.form["cat" + category]
    else:
        sub_category = ""
    user_id = escape(session['id'])

    # Only the upload is best-effort: a missing or rejected file means
    # "keep the current cover". Database errors must not be swallowed here,
    # so the UPDATE itself runs outside the try block.
    try:
        cover = photos.save(request.files['file'])
    except Exception:
        sql = """UPDATE Publicacion SET titulo = (%s), cuerpo = (%s), categoria = (%s),
subCategoria = (%s) WHERE idPublicacion = (%s) and Usuario_idUsuario = (%s)
"""
        params = [[request.form['titulo']], [request.form['publicacion']], [request.form['categoria']], [sub_category], [request.form['xpubxid']], [user_id]]
    else:
        sql = """UPDATE Publicacion SET titulo = (%s), cuerpo = (%s), categoria = (%s),
subCategoria = (%s), portada = (%s) WHERE idPublicacion = (%s) and Usuario_idUsuario = (%s)
"""
        params = [[request.form['titulo']], [request.form['publicacion']], [request.form['categoria']], [sub_category], [cover], [request.form['xpubxid']], [user_id]]

    cursor.execute(sql, params)
    mysql.connection.commit()
    return redirect("/publicacion?ID=" + request.form['xpubxid'])
def usuario(user):
    """Render ``user.html`` for the user whose username is ``user``.

    Args:
        user: Username matched against the ``user`` column.

    Returns:
        The rendered template with the user's profile rows (``datos``), the
        user's publications (``publicaciones``), the escaped session username
        or an empty string (``sessionopen``), and the session user's ID or
        ``None`` when nobody is logged in (``id``).
    """
    cursor = mysql.connection.cursor()
    # P.S. from the original author: the page does nothing yet because the
    # HTML is still being written, but the URL has been verified to work.
    cursor.execute("SELECT idUsuario,user,Nombre,Ape_pat,imagen_perfil,firma FROM Usuario INNER JOIN Usuario_datos ON Usuario_datos_idUsuario_datos = idUsuario_datos WHERE user =(%s);", [user])
    datos = cursor.fetchall()
    cursor.execute("SELECT idPublicacion,portada,titulo,cuerpo,fechaPublicacion,user,Nombre,Ape_pat FROM Usuario INNER JOIN Publicacion ON Usuario_idUsuario=idUsuario INNER JOIN Usuario_datos ON Usuario_datos_idUsuario_datos = idUsuario_datos WHERE user =(%s);", [user])
    publicaciones = cursor.fetchall()

    session_user = ""
    # Explicit default: without one, an anonymous visit would pass the
    # builtin ``id`` function to the template.
    session_user_id = None
    if 'username' in session:  # check whether a user is logged in
        session_user = escape(session['username'])
        session_user_id = session['id']
    return render_template(
        "user.html",
        sessionopen=session_user,
        datos=datos,
        publicaciones=publicaciones,
        id=session_user_id,
    )
def nl2br_filters(s):
    """Escape ``s`` for HTML and turn each newline into a line break.

    Args:
        s: Text to escape.

    Returns:
        The escaped text with every newline character replaced by ``<br>``.
    """
    # ``br`` is a void element with no end tag, so ``</br>`` is invalid
    # markup; emit ``<br>``. Wrapping it in Markup keeps the tag itself from
    # being escaped by the replace call.
    return escape(s).replace('\n', Markup('<br>'))
#?????,??????????
def update_show():
    """Show add and edit handler.

    Either displays a template allowing the user to edit or add a show, or attempts
    to edit or add the show, depending on if the method is GET or POST. The function
    is aborted with a ``404`` message to hide the page if the user is not authenticated.

    GET method:
        Requires ``id`` to be passed as the ID of the show in a query string. If
        the show can't be found, abort with a ``404`` message. Otherwise, lookup
        the show in the db, as well as sub groups subtitling the show and the selected
        sub group if there is one, and some fanart to render on the background.
        Without ``id`` the user is redirected to the home page.

    POST method:
        Requires ``dbid``, ``beid``, and ``subgroup`` to be passed as form parameters.
        ``dbid`` is the DB id of the show, ``beid`` is the backend ID of the show, and
        ``subgroup`` is the subgroup the user has selected. This will attempt to add
        the show to the backend.

    Returns:
        A rendered template with a form on the GET method.
        A redirect to the home as a refresh on the POST method.
    """
    log.debug("Entering update_show, trying to {0} show".format(request.path.strip('/')))
    # A client without a login has no 'logged_in' key in its session; check
    # for the key first so that case reaches the 404 at the end instead of
    # failing with a KeyError.
    logged_in = 'logged_in' in session and fe.check_login_id(escape(session['logged_in']))

    if logged_in and request.method == 'POST':
        log.debug("Request method is POST, so sending results to function.")
        subgroup = request.form['subgroup']
        dbid = request.form['dbid']
        beid = request.form['beid']
        log.debug("Got SG: {0} ID: {1} and BEID: {2} from form.".format(subgroup, dbid, beid))
        fe.add_update_show(dbid, beid, subgroup)
        log.debug("Refreshing user's page.")
        return redirect('/')

    if logged_in and request.method == 'GET':
        log.debug("Request method is GET, so showing page to user.")
        if 'id' not in request.args:
            log.debug("No ID sent with request, so just refresh user's page to the home.")
            return redirect('/')

        show_id = request.args['id']
        log.debug("Attempting to operate on id {0}".format(show_id))
        sonarr_show = fe.search_show_from_backend(show_id)
        if not sonarr_show:
            log.debug("Could not find show from backend with ID {0}".format(show_id))
            abort(404)
        db_show = fe.get_show_from_db(show_id)
        subgroups = fe.get_subgroups(show_id)
        selected_group = fe.get_selected_group(sonarr_show['beid'])
        fanart = fe.get_fanart(sonarr_show['beid'])
        log.debug("Rendering form for user")
        return render_template(
            "add.html",
            id=show_id,
            title=db_show['title'],
            subgroups=subgroups,
            selectedGroup=selected_group,
            sonarr=sonarr_show,
            logged_in=logged_in,
            fanart=fanart,
            action=request.path.strip('/'),
        )

    log.debug("User cannot be authenticated, send 404 to hide page.")
    abort(404)