def get_http_headers(url):
"""
Get HTTP headers for given url
"""
if not url:
raise BadRequest('Missing url')
url = _get_deobfuscate_item(url)
try:
validate = URLValidator(schemes=('http', 'https', 'ftp', 'ftps', 'rtsp', 'rtmp'))
validate(url)
except ValidationError:
raise BadRequest('Not a valid URL')
try:
response = ImplementationFactory.instance.get_singleton_of(
'PhishingServiceBase'
).get_http_headers(url)
schema.valid_adapter_response('PhishingServiceBase', 'get_http_headers', response)
return response
except (PhishingServiceException, schema.InvalidFormatError, schema.SchemaNotFound) as ex:
raise InternalServerError(str(ex))
python类BadRequest()的实例源码
def parse_protobuf(self, proto_type):
"""Parse the data into an instance of proto_type."""
if 'protobuf' not in self.environ.get('CONTENT_TYPE', ''):
raise BadRequest('Not a Protobuf request')
obj = proto_type()
try:
obj.ParseFromString(self.data)
except Exception:
raise BadRequest("Unable to parse Protobuf request")
# Fail if not all required fields are set
if self.protobuf_check_initialization and not obj.IsInitialized():
raise BadRequest("Partial Protobuf request")
return obj
def test_trapping_of_bad_request_key_errors(self):
app = flask.Flask(__name__)
app.testing = True
@app.route('/fail')
def fail():
flask.request.form['missing_key']
c = app.test_client()
self.assert_equal(c.get('/fail').status_code, 400)
app.config['TRAP_BAD_REQUEST_ERRORS'] = True
c = app.test_client()
try:
c.get('/fail')
except KeyError as e:
self.assert_true(isinstance(e, BadRequest))
else:
self.fail('Expected exception')
def parse_protobuf(self, proto_type):
"""Parse the data into an instance of proto_type."""
if 'protobuf' not in self.environ.get('CONTENT_TYPE', ''):
raise BadRequest('Not a Protobuf request')
obj = proto_type()
try:
obj.ParseFromString(self.data)
except Exception:
raise BadRequest("Unable to parse Protobuf request")
# Fail if not all required fields are set
if self.protobuf_check_initialization and not obj.IsInitialized():
raise BadRequest("Partial Protobuf request")
return obj
def on_json_loading_failed(self, e):
"""Called if decoding of the JSON data failed. The return value of
this method is used by :meth:`get_json` when an error occurred. The
default implementation just raises a :class:`BadRequest` exception.
.. versionchanged:: 0.10
Removed buggy previous behavior of generating a random JSON
response. If you want that behavior back you can trivially
add it by subclassing.
.. versionadded:: 0.8
"""
ctx = _request_ctx_stack.top
if ctx is not None and ctx.app.config.get('DEBUG', False):
raise BadRequest('Failed to decode JSON object: {0}'.format(e))
raise BadRequest()
def parse_protobuf(self, proto_type):
"""Parse the data into an instance of proto_type."""
if 'protobuf' not in self.environ.get('CONTENT_TYPE', ''):
raise BadRequest('Not a Protobuf request')
obj = proto_type()
try:
obj.ParseFromString(self.data)
except Exception:
raise BadRequest("Unable to parse Protobuf request")
# Fail if not all required fields are set
if self.protobuf_check_initialization and not obj.IsInitialized():
raise BadRequest("Partial Protobuf request")
return obj
def test_trapping_of_bad_request_key_errors(self):
app = flask.Flask(__name__)
app.testing = True
@app.route('/fail')
def fail():
flask.request.form['missing_key']
c = app.test_client()
self.assert_equal(c.get('/fail').status_code, 400)
app.config['TRAP_BAD_REQUEST_ERRORS'] = True
c = app.test_client()
try:
c.get('/fail')
except KeyError as e:
self.assert_true(isinstance(e, BadRequest))
else:
self.fail('Expected exception')
def parse_protobuf(self, proto_type):
"""Parse the data into an instance of proto_type."""
if 'protobuf' not in self.environ.get('CONTENT_TYPE', ''):
raise BadRequest('Not a Protobuf request')
obj = proto_type()
try:
obj.ParseFromString(self.data)
except Exception:
raise BadRequest("Unable to parse Protobuf request")
# Fail if not all required fields are set
if self.protobuf_check_initialization and not obj.IsInitialized():
raise BadRequest("Partial Protobuf request")
return obj
def on_json_loading_failed(self, e):
"""Called if decoding of the JSON data failed. The return value of
this method is used by :meth:`get_json` when an error occurred. The
default implementation just raises a :class:`BadRequest` exception.
.. versionchanged:: 0.10
Removed buggy previous behavior of generating a random JSON
response. If you want that behavior back you can trivially
add it by subclassing.
.. versionadded:: 0.8
"""
ctx = _request_ctx_stack.top
if ctx is not None and ctx.app.config.get('DEBUG', False):
raise BadRequest('Failed to decode JSON object: {0}'.format(e))
raise BadRequest()
def parse_protobuf(self, proto_type):
"""Parse the data into an instance of proto_type."""
if 'protobuf' not in self.environ.get('CONTENT_TYPE', ''):
raise BadRequest('Not a Protobuf request')
obj = proto_type()
try:
obj.ParseFromString(self.data)
except Exception:
raise BadRequest("Unable to parse Protobuf request")
# Fail if not all required fields are set
if self.protobuf_check_initialization and not obj.IsInitialized():
raise BadRequest("Partial Protobuf request")
return obj
def parse_protobuf(self, proto_type):
"""Parse the data into an instance of proto_type."""
if 'protobuf' not in self.environ.get('CONTENT_TYPE', ''):
raise BadRequest('Not a Protobuf request')
obj = proto_type()
try:
obj.ParseFromString(self.data)
except Exception:
raise BadRequest("Unable to parse Protobuf request")
# Fail if not all required fields are set
if self.protobuf_check_initialization and not obj.IsInitialized():
raise BadRequest("Partial Protobuf request")
return obj
def parse_protobuf(self, proto_type):
"""Parse the data into an instance of proto_type."""
if 'protobuf' not in self.environ.get('CONTENT_TYPE', ''):
raise BadRequest('Not a Protobuf request')
obj = proto_type()
try:
obj.ParseFromString(self.data)
except Exception:
raise BadRequest("Unable to parse Protobuf request")
# Fail if not all required fields are set
if self.protobuf_check_initialization and not obj.IsInitialized():
raise BadRequest("Partial Protobuf request")
return obj
def parse_protobuf(self, proto_type):
"""Parse the data into an instance of proto_type."""
if 'protobuf' not in self.environ.get('CONTENT_TYPE', ''):
raise BadRequest('Not a Protobuf request')
obj = proto_type()
try:
obj.ParseFromString(self.data)
except Exception:
raise BadRequest("Unable to parse Protobuf request")
# Fail if not all required fields are set
if self.protobuf_check_initialization and not obj.IsInitialized():
raise BadRequest("Partial Protobuf request")
return obj
def test_trapping_of_bad_request_key_errors(self):
app = flask.Flask(__name__)
app.testing = True
@app.route('/fail')
def fail():
flask.request.form['missing_key']
c = app.test_client()
self.assert_equal(c.get('/fail').status_code, 400)
app.config['TRAP_BAD_REQUEST_ERRORS'] = True
c = app.test_client()
try:
c.get('/fail')
except KeyError as e:
self.assert_true(isinstance(e, BadRequest))
else:
self.fail('Expected exception')
def parse_protobuf(self, proto_type):
"""Parse the data into an instance of proto_type."""
if 'protobuf' not in self.environ.get('CONTENT_TYPE', ''):
raise BadRequest('Not a Protobuf request')
obj = proto_type()
try:
obj.ParseFromString(self.data)
except Exception:
raise BadRequest("Unable to parse Protobuf request")
# Fail if not all required fields are set
if self.protobuf_check_initialization and not obj.IsInitialized():
raise BadRequest("Partial Protobuf request")
return obj
def parse_protobuf(self, proto_type):
"""Parse the data into an instance of proto_type."""
if 'protobuf' not in self.environ.get('CONTENT_TYPE', ''):
raise BadRequest('Not a Protobuf request')
obj = proto_type()
try:
obj.ParseFromString(self.data)
except Exception:
raise BadRequest("Unable to parse Protobuf request")
# Fail if not all required fields are set
if self.protobuf_check_initialization and not obj.IsInitialized():
raise BadRequest("Partial Protobuf request")
return obj
def test_trapping_of_bad_request_key_errors(self):
app = flask.Flask(__name__)
app.testing = True
@app.route('/fail')
def fail():
flask.request.form['missing_key']
c = app.test_client()
self.assert_equal(c.get('/fail').status_code, 400)
app.config['TRAP_BAD_REQUEST_ERRORS'] = True
c = app.test_client()
try:
c.get('/fail')
except KeyError as e:
self.assert_true(isinstance(e, BadRequest))
else:
self.fail('Expected exception')
def create(body):
""" Create new tag
"""
try:
body.pop('id', None)
if body.get('tagType') not in TAG_TYPE:
raise BadRequest('Invalid or missing tag type')
existing = [tag.lower() for tag in Tag.objects.all().values_list('name', flat=True)]
if body['name'].lower().strip() in existing:
raise BadRequest('Tag already exists')
body['codename'] = body['name'].lower().replace(' ', '_')
tag = Tag.objects.get_or_create(**body)[0]
except (AttributeError, KeyError, FieldError, IntegrityError, ValueError):
raise BadRequest('Invalid fields in body')
return model_to_dict(tag)
def update(tag_id, body):
""" Update category
"""
try:
tag = Tag.objects.get(id=tag_id)
except (ObjectDoesNotExist, ValueError):
raise NotFound('Tag not found')
try:
body.pop('id', None)
existing = Tag.objects.exclude(id=tag.id).values_list('name', flat=True)
existing = [tg.lower() for tg in existing]
if body['name'].lower().strip() in existing:
raise BadRequest('Tag already exists')
Tag.objects.filter(pk=tag.pk).update(**body)
tag = Tag.objects.get(pk=tag.pk)
except (AttributeError, KeyError, FieldError, IntegrityError, ValueError, TypeError):
raise BadRequest('Invalid fields in body')
return model_to_dict(tag)
def get_ip_internal_reputation(ip_addr):
"""
Internal checks
"""
try:
validate_ipv4_address(ip_addr)
except ValidationError:
raise BadRequest('Not a valid IPV4')
results = []
if ImplementationFactory.instance.is_implemented('ReputationDaoBase'):
try:
results = ImplementationFactory.instance.get_singleton_of(
'ReputationDaoBase'
).get_ip_internal_reputations(ip_addr)
except ReputationDaoException:
pass
return results
def get_ip_external_reputation(ip_addr):
"""
External checks
"""
try:
validate_ipv4_address(ip_addr)
except ValidationError:
raise BadRequest('Not a valid IPV4')
results = []
if ImplementationFactory.instance.is_implemented('ReputationDaoBase'):
try:
results = ImplementationFactory.instance.get_singleton_of(
'ReputationDaoBase'
).get_ip_external_reputations(ip_addr)
except ReputationDaoException:
pass
return results
def get_ip_external_detail(ip_addr, source):
"""
Get documents matching ip_addr and source
"""
try:
validate_ipv4_address(ip_addr)
except ValidationError:
raise BadRequest('Not a valid IPV4')
results = []
if ImplementationFactory.instance.is_implemented('ReputationDaoBase'):
try:
results = ImplementationFactory.instance.get_singleton_of(
'ReputationDaoBase'
).get_ip_external_details(
ip_addr,
source
)
except ReputationDaoException:
pass
return results
def get_ip_tools(ip_addr):
"""
Generates link to online reputation tools
"""
try:
validate_ipv4_address(ip_addr)
except ValidationError:
raise BadRequest('Not a valid IPV4')
results = []
if ImplementationFactory.instance.is_implemented('ReputationDaoBase'):
try:
results = ImplementationFactory.instance.get_singleton_of(
'ReputationDaoBase'
).get_ip_tools(ip_addr)
except ReputationDaoException:
pass
return results
def update_order(user, body):
"""
Update groupId/orderId for preset display
"""
group_id = 0
try:
for group in body:
order_id = 0
for preset_dict in group['presets']:
preset = TicketWorkflowPreset.objects.get(id=preset_dict['id'])
preset.orderId = order_id
preset.groupId = group_id
preset.save()
order_id += 1
group_id += 1
except (AttributeError, KeyError, ObjectDoesNotExist, ValueError):
raise BadRequest('Bad Request')
return index(user)
def logout(request):
""" Logout a user
"""
try:
token = request.environ['HTTP_X_API_TOKEN']
except (KeyError, IndexError, TypeError):
raise BadRequest('Missing HTTP X-Api-Token header')
try:
data = jwt.decode(token, settings.SECRET_KEY)
data = json.loads(CRYPTO.decrypt(str(data['data'])))
user = User.objects.get(id=data['id'])
user.last_login = datetime.fromtimestamp(0)
user.save()
return {'message': 'Logged out'}
except (utils.CryptoException, KeyError, jwt.DecodeError,
jwt.ExpiredSignature, User.DoesNotExist):
raise BadRequest('Invalid token')
def update(news_id, body, user):
""" Update news
"""
try:
if user.is_superuser:
news = News.objects.get(id=news_id)
else:
news = News.objects.get(id=news_id, author__id=user.id)
except (ObjectDoesNotExist, ValueError):
return NotFound('News not found')
try:
body = {k: v for k, v in body.iteritems() if k not in ['author', 'date', 'tags']}
News.objects.filter(pk=news.pk).update(**body)
news = News.objects.get(pk=news.pk)
except (KeyError, FieldError, IntegrityError):
raise BadRequest('Invalid fields in body')
return model_to_dict(news)
def create(body, ticket_id=None, defendant_id=None, user_id=None):
""" Create a comment
"""
try:
content = body.pop('comment')
except KeyError:
raise BadRequest('Missing comment field in body')
comment = Comment.objects.create(comment=content, user_id=user_id)
if ticket_id:
TicketComment.objects.create(ticket_id=ticket_id, comment_id=comment.id)
user = User.objects.get(id=user_id)
ticket = Ticket.objects.get(id=ticket_id)
database.log_action_on_ticket(
ticket=ticket,
action='add_comment',
user=user
)
elif defendant_id:
DefendantComment.objects.create(defendant_id=defendant_id, comment_id=comment.id)
return show(comment.id)
def update(body, comment_id=None, ticket_id=None, user_id=None):
""" Update comment
"""
try:
comment = Comment.objects.get(id=comment_id)
content = body.pop('comment')
comment.comment = content
comment.save()
if ticket_id:
user = User.objects.get(id=user_id)
ticket = Ticket.objects.get(id=ticket_id)
database.log_action_on_ticket(
ticket=ticket,
action='update_comment',
user=user
)
except KeyError:
raise BadRequest('Missing comment field in body')
return show(comment_id)
def create(body, user):
"""
Create a new report item
"""
try:
resp = __get_item_infos(body, user)
item, created = ReportItem.objects.get_or_create(**resp)
if resp['report'].ticket:
database.log_action_on_ticket(
ticket=resp['report'].ticket,
action='add_item',
user=user
)
except (AttributeError, FieldError, IntegrityError, KeyError, ObjectDoesNotExist) as ex:
raise BadRequest(str(ex.message))
if not created:
raise BadRequest('Report items already exists')
return show(item.id)
def get_screenshot(item_id, report_id):
"""
Get screenshot for item
"""
try:
item = ReportItem.objects.get(id=item_id, report__id=report_id)
if item.itemType != 'URL':
raise BadRequest('Item is not an URL')
except (ObjectDoesNotExist, ValueError):
raise NotFound('Item not found')
try:
screenshots = ImplementationFactory.instance.get_singleton_of(
'PhishingServiceBase'
).get_screenshots(item.rawItem)
schema.valid_adapter_response('PhishingServiceBase', 'get_screenshots', screenshots)
results = {
'rawItem': item.rawItem,
'screenshots': screenshots,
}
return results
except (PhishingServiceException, schema.InvalidFormatError, schema.SchemaNotFound):
raise InternalServerError('Error while loading screenshots')