def start_http_server(context):
    """Start a local Tornado server and wait briefly for it to respond.

    Stores the chosen port on ``context.http_port`` and the server object on
    ``context.server``. The server root URL is then polled every 0.1 seconds
    until a request succeeds or about 2 seconds have elapsed; in both cases
    httpretty is enabled before returning (the wait is best-effort and does
    not raise when the server never answers).
    """
    # Interception has to be off so the probe requests reach the real server.
    httpretty.disable()
    context.http_port = get_free_tcp_port()
    context.server = TornadoServer(context.http_port)
    context.server.start()

    probe_url = 'http://localhost:{0}/'.format(context.http_port)
    timeout = 2
    started_at = time.time()
    while True:
        httpretty.disable()
        time.sleep(.1)
        try:
            requests.get(probe_url)
        except Exception:
            # Previously a bare ``except:``, which also trapped
            # KeyboardInterrupt and SystemExit and made the wait
            # uninterruptible. Ordinary errors still mean "not ready yet".
            if time.time() - started_at >= timeout:
                break
        else:
            # The server answered; stop polling.
            break

    httpretty.enable()
python类disable()的实例源码
def teardown_httpretty():
    """Disable httpretty, then reset it."""
    httpretty.disable()
    httpretty.reset()
def test_request_timeout():
    """Check that ``PrestoRequest`` raises ``Timeout`` on a slow response.

    A callback body that sleeps for twice the configured timeout is
    registered for both POST and GET on the statement URL. The request is
    built with ``max_attempts=1`` and exercised with a scalar timeout and
    with a ``(timeout, timeout)`` tuple.
    """
    timeout = 0.1
    http_scheme = 'http'
    host = 'coordinator'
    port = 8080
    url = http_scheme + '://' + host + ':' + str(port) + constants.URL_STATEMENT_PATH

    def long_call(request, uri, headers):
        # Answer only after twice the client timeout has passed.
        time.sleep(timeout * 2)
        return (200, headers, "delayed success")

    httpretty.enable()
    try:
        for method in [httpretty.POST, httpretty.GET]:
            httpretty.register_uri(method, url, body=long_call)

        # timeout without retry
        for request_timeout in [timeout, (timeout, timeout)]:
            req = PrestoRequest(
                host=host,
                port=port,
                user='test',
                http_scheme=http_scheme,
                max_attempts=1,
                request_timeout=request_timeout,
            )

            with pytest.raises(requests.exceptions.Timeout):
                req.get(url)

            with pytest.raises(requests.exceptions.Timeout):
                req.post('select 1')
    finally:
        # Always undo the patching, even when an expectation above fails,
        # so a failure here cannot leave httpretty active for later tests.
        httpretty.disable()
        httpretty.reset()
def tearDown(self):
    """Remove the JSON files used by the test and shut httpretty down.

    Raises:
        OSError: if one of the files cannot be removed. httpretty is still
            disabled and reset in that case.
    """
    try:
        for filename in (
            "line_file.json",
            "array_file.json",
            "object_file.json",
            "block_file.json",
        ):
            os.remove(filename)
    finally:
        # A failed removal used to skip these calls and leave httpretty
        # enabled for whatever ran next.
        httpretty.disable()
        httpretty.reset()
def tearDown(self):
    """Disable httpretty, then reset it, after each test."""
    httpretty.disable()
    httpretty.reset()
def tearDown(self):
    """Remove ``test.csv`` and shut httpretty down.

    Raises:
        OSError: if ``test.csv`` cannot be removed. httpretty is still
            disabled and reset in that case.
    """
    try:
        os.remove("test.csv")
    finally:
        # Run even when the removal fails, so httpretty is never left
        # enabled after this test.
        httpretty.disable()
        httpretty.reset()
def cleanUp(self):
    """Disable httpretty and then reset it."""
    httpretty.disable()
    httpretty.reset()
def tearDown(self):
    """Reset httpretty, then disable it, after each test."""
    httpretty.reset()
    httpretty.disable()
def tearDown(self):
    """Reset httpretty and switch it off once the test has finished."""
    httpretty.reset()
    httpretty.disable()
def setUp(self):
    """Reset and disable httpretty, then create fixtures if not yet present.

    When the instance has no ``owner`` attribute, this creates an owner with
    two domains and a token (installed as the client's ``Authorization``
    header), plus a second user with one domain of their own, one domain
    created without an owner argument, and a token.
    """
    httpretty.reset()
    httpretty.disable()

    # Fixtures already exist on this instance; nothing more to do.
    if hasattr(self, 'owner'):
        return

    self.owner = utils.createUser()
    first_domain = utils.createDomain(self.owner)
    second_domain = utils.createDomain(self.owner)
    self.ownedDomains = [first_domain, second_domain]
    self.token = utils.createToken(user=self.owner)
    self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token)

    self.otherOwner = utils.createUser()
    other_owned = utils.createDomain(self.otherOwner)
    unowned = utils.createDomain()
    self.otherDomains = [other_owned, unowned]
    self.otherToken = utils.createToken(user=self.otherOwner)
def tearDown(self):
    """Reset httpretty, then disable it."""
    httpretty.reset()
    httpretty.disable()
def tearDown(self):
    """Per-test cleanup: reset httpretty and then disable it."""
    httpretty.reset()
    httpretty.disable()
def _patched_get_content_and_type(self, url, harvest_job, page=1, content_type=None):
    """Run ``original_get_content_and_type`` with httpretty enabled.

    httpretty is enabled only for the duration of the wrapped call and is
    disabled again afterwards, including when the call raises.

    Args:
        self, url, harvest_job, page, content_type: forwarded unchanged to
            ``original_get_content_and_type``.

    Returns:
        The two values returned by the wrapped call, as a 2-tuple.
    """
    httpretty.enable()
    try:
        value1, value2 = original_get_content_and_type(self, url, harvest_job, page, content_type)
    finally:
        # Previously skipped when the wrapped call raised, which left
        # httpretty enabled for unrelated code.
        httpretty.disable()
    return value1, value2
def tearDown(self):
    """Disable httpretty and reset it after the test."""
    httpretty.disable()
    httpretty.reset()
def tearDown(self):
    """Test cleanup: disable httpretty first, then reset it."""
    httpretty.disable()
    httpretty.reset()
def tearDown(self):
    """Switch httpretty off and reset it when a test ends."""
    httpretty.disable()
    httpretty.reset()
test_text_monitor.py 文件源码
项目:simple-environment-monitor-system
作者: diegorubin
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def tearDown(self):
    """Disable httpretty, then reset it, after each test case."""
    httpretty.disable()
    httpretty.reset()
test_socket_port_monitor.py 文件源码
项目:simple-environment-monitor-system
作者: diegorubin
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_alive(self):
    """Assert that ``SocketPortMonitor.alive()`` is truthy for the service.

    A GET on ``SERVICE_URL`` is registered with httpretty to return the body
    ``"LIVE"`` before the monitor is queried.
    """
    httpretty.enable()
    try:
        httpretty.register_uri(httpretty.GET, SERVICE_URL, body="LIVE")
        monitor = SocketPortMonitor(SERVICE_HOST, port=SERVICE_PORT)
        self.assertTrue(monitor.alive())
    finally:
        # Undo the patching even when the assertion fails.
        httpretty.disable()
        httpretty.reset()
test_socket_port_monitor.py 文件源码
项目:simple-environment-monitor-system
作者: diegorubin
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def test_check_alive(self):
    """Assert that ``check_alive`` is truthy for ``'SocketPortMonitor'``.

    A GET on ``SERVICE_URL`` is registered with httpretty to return the body
    ``"LIVE"``; ``check_alive`` is then called with ``SERVICE_HOST`` and
    ``port='8888'``.
    """
    httpretty.enable()
    try:
        httpretty.register_uri(httpretty.GET, SERVICE_URL, body="LIVE")
        self.assertTrue(check_alive('SocketPortMonitor', SERVICE_HOST, port='8888'))
    finally:
        # Undo the patching even when the assertion fails.
        httpretty.disable()
        httpretty.reset()
test_http_status_monitor.py 文件源码
项目:simple-environment-monitor-system
作者: diegorubin
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def tearDown(self):
    """After each test, disable httpretty and reset it."""
    httpretty.disable()
    httpretty.reset()
def teardown_function(function):
    """Disable httpretty; the ``function`` argument is not used."""
    httpretty.disable()
def httpretty():
    """Yield ``httpretty_module`` after resetting and enabling it.

    The module is disabled once the generator is resumed after the yield.
    """
    # Setup: reset first, then turn interception on.
    httpretty_module.reset()
    httpretty_module.enable()

    yield httpretty_module

    # Teardown: turn interception back off.
    httpretty_module.disable()
def step_impl(context):
    """Post a build status with bad credentials and expect HTTP 401.

    Reads the login from ``context.bad_login`` and the target from
    ``context.good_build_url``. httpretty serves a canned 401 error body for
    POSTs to that URL, and the step asserts that ``post_result`` returns a
    response with that status code.
    """
    d = context.bad_login
    assert d['source']['username'] != 'ValidUser'

    httpretty.enable()
    try:
        httpretty.register_uri(
            httpretty.POST,
            context.good_build_url,
            body='{"errors":[{"context":null,"message":"Authentication failed. Please check your credentials and try again.","exceptionName":"com.atlassian.bitbucket.auth.IncorrectPasswordAuthenticationException"}]}',
            status=401
        )

        from scripts.bitbucket import post_result
        result = post_result(context.good_build_url, d['source']['username'], d['source']['password'], False, good_status_dict(), True)
        assert result.status_code == 401
    finally:
        # Undo the patching even if the call raises or the assertion fails,
        # so later steps are not affected by leftover httpretty state.
        httpretty.disable()
        httpretty.reset()
def step_impl(context):
    """Post a build status to a forbidden URL and expect HTTP 403.

    Reads the login from ``context.good_login`` and the target from
    ``context.bad_build_url``. httpretty serves a canned 403 error body for
    POSTs to that URL, and the step asserts that ``post_result`` returns a
    response with that status code.
    """
    d = context.good_login
    assert d['source']['username'] == 'ValidUser'

    httpretty.enable()
    try:
        httpretty.register_uri(
            httpretty.POST,
            context.bad_build_url,
            body='{"errors":[{"context":null,"message":"You are not permitted to access this resource","exceptionName":"com.atlassian.bitbucket.AuthorisationException"}]}',
            status=403
        )

        from scripts.bitbucket import post_result
        result = post_result(context.bad_build_url, d['source']['username'], d['source']['password'], False, good_status_dict(), True)
        assert result.status_code == 403
    finally:
        # Undo the patching even if the call raises or the assertion fails,
        # so later steps are not affected by leftover httpretty state.
        httpretty.disable()
        httpretty.reset()
def tearDown(self):
    """Disable httpretty, then reset it, after each test."""
    # Disabling afterwards avoids problems in other code that uses the
    # socket module.
    httpretty.disable()
    httpretty.reset()
def tearDown(self):
    """Disable and reset httpretty so other code can use the socket module."""
    httpretty.disable()
    httpretty.reset()
# -----
# About
# -----
def cleanup_httpretty(self):
    """Reset httpretty and then disable it."""
    httpretty.reset()
    httpretty.disable()
def test_httpretty_bypasses_when_disabled(context):
    "httpretty should bypass all requests by disabling it"
    port = context.http_port
    bubbles_url = 'http://localhost:{0}/go-for-bubbles/'.format(port)
    come_again_url = 'http://localhost:{0}/come-again/'.format(port)

    def read_body(url):
        # Open the URL, read the whole response, close the handle.
        handle = urllib2.urlopen(url)
        payload = handle.read()
        handle.close()
        return payload

    httpretty.register_uri(httpretty.GET, bubbles_url, body="glub glub")

    # While disabled, the registered body must not be what comes back.
    httpretty.disable()
    expect(read_body(bubbles_url)).to.equal(
        b'. o O 0 O o . o O 0 O o . o O 0 O o . o O 0 O o . o O 0 O o .')
    expect(read_body(come_again_url)).to.equal(b'<- HELLO WORLD ->')

    # Once re-enabled, the registered body is served for the same URL.
    httpretty.enable()
    expect(read_body(bubbles_url)).to.equal(b'glub glub')

    core.POTENTIAL_HTTP_PORTS.remove(port)
def tearDown(self):
    """Disable httpretty and reset it at the end of each test."""
    httpretty.disable()
    httpretty.reset()
def tearDown(self):
httpretty.disable()