def index(request):
if request.method == 'POST':
form = PathAnalysisForm(request.POST)
if form.is_valid():
query = form.cleaned_data['search']
print(query)
#here is where the magic happens!
#search in kegg
# data = kegg_rest_request('list/pathway/hsa')
# pathways = kegg_rest_request('find/pathway/%s' % (query))
pathways = Pathway.objects.filter(Q(name__icontains=query))
# print pathways
else:
form = PathAnalysisForm()
# pathways = kegg_rest_request('list/pathway/hsa')
pathways = Pathway.objects.all()
return render_to_response('pathway_analysis/index.html', {'form': form, 'pathways': pathways}, context_instance=RequestContext(request))
python类method()的实例源码
def call_method(self, method_name_or_object, params=None):
"""
Calls the ``method_name`` method from the given service and returns a
:py:class:`gemstone.client.structs.Result` instance.
:param method_name_or_object: The name of te called method or a ``MethodCall`` instance
:param params: A list of dict representing the parameters for the request
:return: a :py:class:`gemstone.client.structs.Result` instance.
"""
if isinstance(method_name_or_object, MethodCall):
req_obj = method_name_or_object
else:
req_obj = MethodCall(method_name_or_object, params)
raw_response = self.handle_single_request(req_obj)
response_obj = Result(result=raw_response["result"], error=raw_response['error'],
id=raw_response["id"], method_call=req_obj)
return response_obj
def call_method_async(self, method_name_or_object, params=None):
"""
Calls the ``method_name`` method from the given service asynchronously
and returns a :py:class:`gemstone.client.structs.AsyncMethodCall` instance.
:param method_name_or_object: The name of te called method or a ``MethodCall`` instance
:param params: A list of dict representing the parameters for the request
:return: a :py:class:`gemstone.client.structs.AsyncMethodCall` instance.
"""
thread_pool = self._get_thread_pool()
if isinstance(method_name_or_object, MethodCall):
req_obj = method_name_or_object
else:
req_obj = MethodCall(method_name_or_object, params)
async_result_mp = thread_pool.apply_async(self.handle_single_request, args=(req_obj,))
return AsyncMethodCall(req_obj=req_obj, async_resp_object=async_result_mp)
def notify(self, method_name_or_object, params=None):
"""
Sends a notification to the service by calling the ``method_name``
method with the ``params`` parameters. Does not wait for a response, even
if the response triggers an error.
:param method_name_or_object: the name of the method to be called or a ``Notification``
instance
:param params: a list of dict representing the parameters for the call
:return: None
"""
if isinstance(method_name_or_object, Notification):
req_obj = method_name_or_object
else:
req_obj = Notification(method_name_or_object, params)
self.handle_single_request(req_obj)
def setUp(self):
# Create a list of temporary files. Each item in the list is a file
# name (absolute path or relative to the current working directory).
# All files in this list will be deleted in the tearDown method. Note,
# this only helps to makes sure temporary files get deleted, but it
# does nothing about trying to close files that may still be open. It
# is the responsibility of the developer to properly close files even
# when exceptional conditions occur.
self.tempFiles = []
# Create a temporary file.
self.registerFileForCleanUp(support.TESTFN)
self.text = b'testing urllib.urlretrieve'
try:
FILE = open(support.TESTFN, 'wb')
FILE.write(self.text)
FILE.close()
finally:
try: FILE.close()
except: pass
def check_auth(request): # /auth
if request.method != 'POST':
return _error_response(request, err_exp.E_BAD_REQUEST, "must make POST request")
form = AuthForm(request.POST)
if not form.is_valid():
return _error_response(request, err_exp.E_FORM_INVALID, "user logout form not correctly filled out")
post_data = form.cleaned_data
post_encoded = urllib.parse.urlencode(post_data).encode('utf-8')
req = urllib.request.Request('http://models-api:8000/api/v1/user/auth/', data=post_encoded, method='POST')
resp_json = urllib.request.urlopen(req).read().decode('utf-8')
resp = json.loads(resp_json)
if not resp:
return _error_response(request, err_exp.E_LOGIN_FAILED, "no response from models API")
if resp['ok'] == False: # could be much more nuanced. makes web view handle errors
return _error_response(request, err_exp.E_LOGIN_FAILED, resp)
# if datetime.datetime.now() - resp['resp']['date_created'] > : ... expiration of auth token not implemented.
return _success_response(request, resp['resp'])
# Look into https://github.com/kencochrane/django-defender for blocking brute force login requests
def login(request): # /login
if request.method != 'POST':
return _error_response(request, err_exp.E_BAD_REQUEST, "must make POST request")
form = LoginForm(request.POST)
if not form.is_valid():
return _error_response(request, err_exp.E_FORM_INVALID, "user login form not correctly filled out")
post_data = form.cleaned_data
post_encoded = urllib.parse.urlencode(post_data).encode('utf-8')
req = urllib.request.Request('http://models-api:8000/api/v1/user/login/', data=post_encoded, method='POST')
resp_json = urllib.request.urlopen(req).read().decode('utf-8')
resp = json.loads(resp_json)
if not resp:
return _error_response(request, err_exp.E_LOGIN_FAILED, "no response from models API")
if resp['ok'] == False: # could be much more nuanced. makes web view handle errors
return _error_response(request, err_exp.E_LOGIN_FAILED, resp)
return _success_response(request, resp['resp'])
def logout(request): # /logout
if request.method != 'POST':
return _error_response(request, err_exp.E_BAD_REQUEST, "must make POST request")
form = AuthForm(request.POST)
if not form.is_valid():
return _error_response(request, err_exp.E_FORM_INVALID, "user logout form not correctly filled out")
post_data = form.cleaned_data
post_encoded = urllib.parse.urlencode(post_data).encode('utf-8')
req = urllib.request.Request('http://models-api:8000/api/v1/user/logout/', data=post_encoded, method='POST')
resp_json = urllib.request.urlopen(req).read().decode('utf-8')
resp = json.loads(resp_json)
if not resp:
return _error_response(request, err_exp.E_LOGIN_FAILED, "no response from models API")
if resp['ok'] == False: # could be much more nuanced. makes web view handle errors
return _error_response(request, err_exp.E_LOGIN_FAILED, resp)
#return _success_response(request, resp['resp'])
return _success_response(request, "logged out successfully")
def my_drones(request): # /my-drones
if request.method != 'POST':
return _error_response(request, err_exp.E_BAD_REQUEST, "must make POST request")
form = AuthForm(request.POST)
if not form.is_valid():
return _error_response(request, err_exp.E_FORM_INVALID, "user logout form not correctly filled out")
post_data = form.cleaned_data
post_encoded = urllib.parse.urlencode(post_data).encode('utf-8')
req = urllib.request.Request('http://models-TODO:8000/api/v1/my-drones/', data=post_encoded, method='POST')
resp_json = urllib.request.urlopen(req).read().decode('utf-8')
resp = json.loads(resp_json)
if not resp:
return _error_response(request, err_exp.E_LOGIN_FAILED, "no response from models API")
if resp['ok'] == False: # could be much more nuanced. makes web view handle errors
return _error_response(request, err_exp.E_LOGIN_FAILED, resp)
# if datetime.datetime.now() - resp['resp']['date_created'] > : ... expiration of auth token not implemented.
return _success_response(request, resp['resp'])
def search(request): # /search
if request.method != 'POST':
return _error_response(request, err_exp.E_BAD_REQUEST, "must make POST request")
if not es.indices.exists(index='listing_index'):
return _error_response(request, 'listings not found')
resp = []
res = es.search(index='listing_index', body={'query': {'query_string': {'query': request.POST['query']}}, 'size': 10})
hits = res['hits']['hits']
if not hits:
return _error_response(request, res)
for hit in hits:
resp.append(hit['_source']) # parse es source
return _success_response(request, resp)
def setUp(self):
# Create a list of temporary files. Each item in the list is a file
# name (absolute path or relative to the current working directory).
# All files in this list will be deleted in the tearDown method. Note,
# this only helps to makes sure temporary files get deleted, but it
# does nothing about trying to close files that may still be open. It
# is the responsibility of the developer to properly close files even
# when exceptional conditions occur.
self.tempFiles = []
# Create a temporary file.
self.registerFileForCleanUp(support.TESTFN)
self.text = b'testing urllib.urlretrieve'
try:
FILE = open(support.TESTFN, 'wb')
FILE.write(self.text)
FILE.close()
finally:
try: FILE.close()
except: pass
def fetch_impl(self, request):
req = urllib.request.Request(
request.url, method=request.method,
data=request.body, headers=request.headers)
start_time = time.time()
try:
resp = urllib.request.urlopen(req, timeout=request.timeout)
except urllib.error.HTTPError as e:
resp = e
except socket.timeout as e:
return ToshiHTTPResponse(request, 599)
end_time = time.time()
code = resp.code
buffer = BytesIO(resp.read())
headers = dict(resp.info())
return ToshiHTTPResponse(request, code, headers=headers, buffer=buffer, request_time=end_time - start_time)
def setUp(self):
# Create a list of temporary files. Each item in the list is a file
# name (absolute path or relative to the current working directory).
# All files in this list will be deleted in the tearDown method. Note,
# this only helps to makes sure temporary files get deleted, but it
# does nothing about trying to close files that may still be open. It
# is the responsibility of the developer to properly close files even
# when exceptional conditions occur.
self.tempFiles = []
# Create a temporary file.
self.registerFileForCleanUp(support.TESTFN)
self.text = b'testing urllib.urlretrieve'
try:
FILE = open(support.TESTFN, 'wb')
FILE.write(self.text)
FILE.close()
finally:
try: FILE.close()
except: pass
def train(request):
# check to see if this is a GET request
if request.method == "GET":
# check to see if an image was uploaded
if request.GET.get("imageBase64", None) is not None and request.GET.get("user", None) is not None :
# grab the uploaded image
image = _grab_image(base64_string=request.GET.get("imageBase64", None))
image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
rects = detector.detectMultiScale(image, scaleFactor=1.1, minNeighbors=5,
minSize=(30, 30), flags=0)
# construct a list of bounding boxes from the detection
rects = [(int(x), int(y), int(x + w), int(y + h)) for (x, y, w, h) in rects]
if len(rects) == 0:
return JsonResponse({"error" : "No faces detected"})
else :
x, y, w, h = rects[0]
cv2.imwrite( TRAINED_FACES_PATH + "/" + str(request.GET.get("user", None)) + "/" + str(uuid.uuid4()) + ".jpg", image[y:h, x:w] );
return JsonResponse({"success" : True})
def view(request, pathway_id):
print(pathway_id)
# pathway_data = kegg_rest_request('get/hsa%s' % (pathway_id))
pathway = Pathway.objects.get(kegg=pathway_id)
pathway.genes = pathway.genes.split(',')
print(pathway.genes)
# genes = parse_genes(pathway_data)
# pathway = {}
# pathway['name'] = pathway_data.split('\n')[1].replace('NAME', '')
# #get gene_ids
# genelist = []
# for gene in genes:
#
# genelist.append('hsa:%s' % gene['id'])
## print gene['id']
# gene_url = '+'.join(genelist)
# url = '/conv/ncbi-geneid/%s' % (gene_url)
# results = kegg_rest_request(url)
#print results
#if request.method == 'GET':
return render_to_response('pathway_analysis/view.html', {'pathway':pathway}, context_instance=RequestContext(request))
def build_request_body(self, method_name, params, id=None):
request_body = {
"jsonrpc": "2.0",
"method": method_name,
"params": params
}
if id:
request_body['id'] = id
return request_body
def build_http_request_obj(self, request_body):
request = urllib.request.Request(self.url)
request.add_header("Content-Type", "application/json")
request.add_header("User-Agent", "gemstone-client")
request.data = json.dumps(request_body).encode()
request.method = "POST"
return request
def validate_captcha(view):
"""
Decorator to validate a captcha based on settings
from g_recaptcha.validate_recaptcha import validate_captcha
@validate_Captcha
def a_view():
...
"""
def wrap(request, *args, **kwargs):
def failure_http():
return render(request, 'captcha_fail.html',)
def failure_ajax():
return HttpResponse('There was a problem with the captcha, please try again')
if request.method == 'POST':
url = "https://www.google.com/recaptcha/api/siteverify"
values = {
'secret': settings.GOOGLE_RECAPTCHA_SECRET_KEY,
'response': request.POST.get('g-recaptcha-response', None),
'remoteip': request.META.get("REMOTE_ADDR", None),
}
data = urllib.parse.urlencode(values)
req = urllib.request.Request(url, data)
response = urllib.request.urlopen(req)
result = json.loads(response.read())
# result["success"] will be True on a success
if result["success"]:
return view(request, *args, **kwargs)
elif request.is_ajax():
return failure_ajax()
else:
return failure_http()
return view(request, *args, **kwargs)
return wrap
def test_close(self):
# Test close() by calling it here and then having it be called again
# by the tearDown() method for the test
self.returned_obj.close()
def help_inputtype(self, given, test_type):
"""Helper method for testing different input types.
'given' must lead to only the pairs:
* 1st, 1
* 2nd, 2
* 3rd, 3
Test cannot assume anything about order. Docs make no guarantee and
have possible dictionary input.
"""
expect_somewhere = ["1st=1", "2nd=2", "3rd=3"]
result = urllib.parse.urlencode(given)
for expected in expect_somewhere:
self.assertIn(expected, result,
"testing %s: %s not found in %s" %
(test_type, expected, result))
self.assertEqual(result.count('&'), 2,
"testing %s: expected 2 '&'s; got %s" %
(test_type, result.count('&')))
amp_location = result.index('&')
on_amp_left = result[amp_location - 1]
on_amp_right = result[amp_location + 1]
self.assertTrue(on_amp_left.isdigit() and on_amp_right.isdigit(),
"testing %s: '&' not located in proper place in %s" %
(test_type, result))
self.assertEqual(len(result), (5 * 3) + 2, #5 chars per thing and amps
"testing %s: "
"unexpected number of characters: %s != %s" %
(test_type, len(result), (5 * 3) + 2))
def test_with_method_arg(self):
Request = urllib.request.Request
request = Request("http://www.python.org", method='HEAD')
self.assertEqual(request.method, 'HEAD')
self.assertEqual(request.get_method(), 'HEAD')
request = Request("http://www.python.org", {}, method='HEAD')
self.assertEqual(request.method, 'HEAD')
self.assertEqual(request.get_method(), 'HEAD')
request = Request("http://www.python.org", method='GET')
self.assertEqual(request.get_method(), 'GET')
request.method = 'HEAD'
self.assertEqual(request.get_method(), 'HEAD')
def register(request): # /register
if request.method != 'POST':
return _error_response(request, err_exp.E_BAD_REQUEST, "must make POST request")
form = RegisterForm(request.POST)
if not form.is_valid():
return _error_response(request, err_exp.E_FORM_INVALID, "user registration form not correctly filled out")
post_data = form.cleaned_data
post_data['password'] = hashers.make_password(post_data['password1']) # get first validated password and hash it
post_data['date_joined'] = datetime.datetime.now()
post_data['is_active'] = True
post_data['email_address'] = post_data['email1']
post_encoded = urllib.parse.urlencode(post_data).encode('utf-8')
req = urllib.request.Request('http://models-api:8000/api/v1/user/create/', data=post_encoded, method='POST')
resp_json = urllib.request.urlopen(req).read().decode('utf-8')
resp = json.loads(resp_json)
if not resp:
return _error_response(request, err_exp.E_REGISTER_FAILED, "no response from models API")
if resp['ok'] == False: # could be much more nuanced. makes web view handle errors
return _error_response(request, err_exp.E_REGISTER_FAILED, resp)
return _success_response(request, resp['resp'])
# fields = ['username', 'password', 'email_address','date_joined','is_active','f_name','l_name', 'bio']
def productdetails(request, drone_id): # /product-details/(?P<drone_id>\d+)
if request.method != 'GET':
return _error_response(request, err_exp.E_BAD_REQUEST, "must make GET request with drone_id")
req = urllib.request.Request('http://models-api:8000/api/v1/drone/'+drone_id)
resp_json = urllib.request.urlopen(req).read().decode('utf-8')
resp = json.loads(resp_json)
return _success_response(request, resp)
def listing(request, listing_id):
if request.method != 'GET':
return _error_response(request, err_exp.E_BAD_REQUEST, "must make a GET request with listing_id")
req = urllib.request.Request('http://models-api:8000/api/v1/listing/'+listing_id)
resp_json = urllib.request.urlopen(req).read().decode('utf-8')
resp = json.loads(resp_json)
return _success_response(request, resp)
def featured_items(request): # /shop/
if request.method != 'GET':
return _error_response(request, err_exp.E_BAD_REQUEST, "must make GET request")
req = urllib.request.Request('http://models-api:8000/api/v1/shop/')
resp_json = urllib.request.urlopen(req).read().decode('utf-8')
resp = json.loads(resp_json)
return _success_response(request, resp)
def index(request):
location=0
if request.method == 'POST':
existsLocalization = Localizacion.objects.filter(lugar_id = request.POST['lugar'] )
if existsLocalization.count()==0:
location= Localizacion.objects.create(lugar_id=request.POST['lugar'] , lugar_ds=request.POST['lugar'] )
localizacion_id=location.id
if location!=0:
coordenadas = BuscarCoordenadas(location.lugar_ds)
try:
coordenadasl = Coordenadas.objects.filter(local_ref = location.id)
coordenadasl.delete()
except:
pass
location.coordenadas_set.create(latitud=coordenadas.lat, longitud=coordenadas.lon, descripcion=coordenadas.ds)
opcion= Option.objects.create(option_code=location.id ,option_type="lugares", option_text=coordenadas.ds )
opcionesTemperatura = Option.objects.filter(option_type = "temperaturas")
opcionesEcoRiverFlow = Option.objects.filter(option_type = "ecological river flow")
opcionesMaximumFlow = Option.objects.filter(option_type = "Maximun flow")
opcionesReservoirCapacity = Option.objects.filter(option_type = "Reservoir capacity")
opcionesLugares = Option.objects.filter(option_type = "lugares")
opcionesNetFalling = Option.objects.filter(option_type = "net falling height or head")
opcionesNumberOfTurbines = Option.objects.filter(option_type = "Number of turbines")
opcionesTypeOfTurbine = Option.objects.filter(option_type = "Type of turbines")
anios = Option.objects.filter(option_type = "year")
context = {
'opcionesTemperatura': opcionesTemperatura,
'opcionesEcoRiverFlow': opcionesEcoRiverFlow,
'opcionesMaximumFlow': opcionesMaximumFlow,
'opcionesReservoirCapacity': opcionesReservoirCapacity,
'opcionesNetFalling': opcionesNetFalling,
'opcionesNumberOfTurbines': opcionesNumberOfTurbines,
'opcionesTypeOfTurbine': opcionesTypeOfTurbine,
'opcionesLugares': opcionesLugares,
'anios': anios,
}
return render(request, 'options/index.html', context)
def test_close(self):
# Test close() by calling it here and then having it be called again
# by the tearDown() method for the test
self.returned_obj.close()
def help_inputtype(self, given, test_type):
"""Helper method for testing different input types.
'given' must lead to only the pairs:
* 1st, 1
* 2nd, 2
* 3rd, 3
Test cannot assume anything about order. Docs make no guarantee and
have possible dictionary input.
"""
expect_somewhere = ["1st=1", "2nd=2", "3rd=3"]
result = urllib.parse.urlencode(given)
for expected in expect_somewhere:
self.assertIn(expected, result,
"testing %s: %s not found in %s" %
(test_type, expected, result))
self.assertEqual(result.count('&'), 2,
"testing %s: expected 2 '&'s; got %s" %
(test_type, result.count('&')))
amp_location = result.index('&')
on_amp_left = result[amp_location - 1]
on_amp_right = result[amp_location + 1]
self.assertTrue(on_amp_left.isdigit() and on_amp_right.isdigit(),
"testing %s: '&' not located in proper place in %s" %
(test_type, result))
self.assertEqual(len(result), (5 * 3) + 2, #5 chars per thing and amps
"testing %s: "
"unexpected number of characters: %s != %s" %
(test_type, len(result), (5 * 3) + 2))
def test_with_method_arg(self):
Request = urllib.request.Request
request = Request("http://www.python.org", method='HEAD')
self.assertEqual(request.method, 'HEAD')
self.assertEqual(request.get_method(), 'HEAD')
request = Request("http://www.python.org", {}, method='HEAD')
self.assertEqual(request.method, 'HEAD')
self.assertEqual(request.get_method(), 'HEAD')
request = Request("http://www.python.org", method='GET')
self.assertEqual(request.get_method(), 'GET')
request.method = 'HEAD'
self.assertEqual(request.get_method(), 'HEAD')
def register(request):
if request.method == 'POST':
form = RegistrationForm(request.POST)
if form.is_valid():
user = User.objects.create_user(
username=form.cleaned_data['username'],
password=form.cleaned_data['password1'],
email=form.cleaned_data['email']
)
username = form.cleaned_data['username']
password = form.cleaned_data['password1']
new_user = authenticate(username=username, password=password)
if new_user is not None:
if new_user.is_active:
stripe_actions.customers.create(user=new_user)
login(request, new_user)
return HttpResponseRedirect('/scan/default/')
else:
# this would be weird to get here
return HttpResponseRedirect('/register/success/')
else:
return HttpResponseRedirect('/register/success/')
else:
form = RegistrationForm()
return render(
request,
'registration/register.html',
{ 'form': form },
)