def setUp(self):
self.resrc = SimpleResource()
self.resrc.putChild('', self.resrc)
self.site = server.Site(self.resrc)
self.site = server.Site(self.resrc)
self.site.logFile = log.logfile
# HELLLLLLLLLLP! This harness is Very Ugly.
self.channel = self.site.buildProtocol(None)
self.transport = http.StringTransport()
self.transport.close = lambda *a, **kw: None
self.transport.disconnecting = lambda *a, **kw: 0
self.transport.getPeer = lambda *a, **kw: "peer"
self.transport.getHost = lambda *a, **kw: "host"
self.channel.makeConnection(self.transport)
for l in ["GET / HTTP/1.1",
"Accept: text/html"]:
self.channel.lineReceived(l)
python类Site()的实例源码
def testDistrib(self):
# site1 is the publisher
r1 = resource.Resource()
r1.putChild("there", static.Data("root", "text/plain"))
site1 = server.Site(r1)
self.f1 = PBServerFactory(distrib.ResourcePublisher(site1))
self.port1 = reactor.listenTCP(0, self.f1)
self.sub = distrib.ResourceSubscription("127.0.0.1",
self.port1.getHost().port)
r2 = resource.Resource()
r2.putChild("here", self.sub)
f2 = MySite(r2)
self.port2 = reactor.listenTCP(0, f2)
d = client.getPage("http://127.0.0.1:%d/here/there" % \
self.port2.getHost().port)
d.addCallback(self.failUnlessEqual, 'root')
return d
def setUp(self):
name = str(id(self)) + "_webclient"
if not os.path.exists(name):
os.mkdir(name)
f = open(os.path.join(name, "file"), "wb")
f.write("0123456789")
f.close()
r = static.File(name)
r.putChild("redirect", util.Redirect("/file"))
r.putChild("wait", LongTimeTakingResource())
r.putChild("error", ErrorResource())
r.putChild("nolength", NoLengthResource())
r.putChild("host", HostHeaderResource())
r.putChild("payload", PayloadResource())
r.putChild("broken", BrokenDownloadResource())
site = server.Site(r, timeout=None)
self.port = self._listen(site)
self.portno = self.port.getHost().port
def setUp(self):
plainRoot = static.Data('not me', 'text/plain')
tlsRoot = static.Data('me neither', 'text/plain')
plainSite = server.Site(plainRoot, timeout=None)
tlsSite = server.Site(tlsRoot, timeout=None)
from twisted import test
self.tlsPort = reactor.listenSSL(0, tlsSite,
contextFactory=ssl.DefaultOpenSSLContextFactory(
sibpath(test.__file__, 'server.pem'),
sibpath(test.__file__, 'server.pem'),
),
interface="127.0.0.1")
self.plainPort = reactor.listenTCP(0, plainSite, interface="127.0.0.1")
self.plainPortno = self.plainPort.getHost().port
self.tlsPortno = self.tlsPort.getHost().port
plainRoot.putChild('one', util.Redirect(self.getHTTPS('two')))
tlsRoot.putChild('two', util.Redirect(self.getHTTP('three')))
plainRoot.putChild('three', util.Redirect(self.getHTTPS('four')))
tlsRoot.putChild('four', static.Data('FOUND IT!', 'text/plain'))
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
self = cls()
self.n = node.Node(factory, bitcoind, [], [], net)
yield self.n.start()
self.n.p2p_node = node.P2PNode(self.n, port=0, max_incoming_conns=1000000, addr_store={}, connect_addrs=[('127.0.0.1', peer_port) for peer_port in peer_ports])
self.n.p2p_node.start()
wb = work.WorkerBridge(node=self.n, my_pubkey_hash=random.randrange(2**160), donation_percentage=random.uniform(0, 10), merged_urls=merged_urls, worker_fee=3, args=math.Object(donation_percentage=random.uniform(0, 10), address='foo', worker_fee=3, timeaddresses=1000), pubkeys=main.keypool(), bitcoind=bitcoind)
self.wb = wb
web_root = resource.Resource()
worker_interface.WorkerInterface(wb).attach_to(web_root)
self.web_port = reactor.listenTCP(0, server.Site(web_root))
defer.returnValue(self)
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
self = cls()
self.n = node.Node(factory, bitcoind, [], [], net)
yield self.n.start()
self.n.p2p_node = node.P2PNode(self.n, port=0, max_incoming_conns=1000000, addr_store={}, connect_addrs=[('127.0.0.1', peer_port) for peer_port in peer_ports])
self.n.p2p_node.start()
wb = work.WorkerBridge(node=self.n, my_pubkey_hash=random.randrange(2**160), donation_percentage=random.uniform(0, 10), merged_urls=merged_urls, worker_fee=3)
self.wb = wb
web_root = resource.Resource()
worker_interface.WorkerInterface(wb).attach_to(web_root)
self.web_port = reactor.listenTCP(0, server.Site(web_root))
defer.returnValue(self)
def run(self):
"""setup the site, start listening on port, setup the looping call to
:py:meth:`~.update_active_node` every ``self.poll_interval`` seconds,
and start the Twisted reactor"""
# get the active node before we start anything...
self.active_node_ip_port = self.get_active_node()
if self.active_node_ip_port is None:
logger.critical("ERROR: Could not get active vault node from "
"Consul. Exiting.")
raise SystemExit(3)
logger.warning("Initial Vault active node: %s",
self.active_node_ip_port)
site = Site(VaultRedirectorSite(self))
# setup our HTTP(S) listener
if self.tls_factory is not None:
self.listentls(site)
else:
self.listentcp(site)
# setup the update_active_node poll every POLL_INTERVAL seconds
self.add_update_loop()
logger.warning('Starting Twisted reactor (event loop)')
self.run_reactor()
def main():
# Setup the blockchain
blockchain = LevelDBBlockchain(settings.LEVELDB_PATH)
Blockchain.RegisterBlockchain(blockchain)
dbloop = task.LoopingCall(Blockchain.Default().PersistBlocks)
dbloop.start(.1)
NodeLeader.Instance().Start()
# Disable smart contract events for external smart contracts
settings.set_log_smart_contract_events(False)
# Start a thread with custom code
d = threading.Thread(target=custom_background_code)
d.setDaemon(True) # daemonizing the thread will kill it when the main thread is quit
d.start()
# Hook up Klein API to Twisted reactor
endpoint_description = "tcp:port=%s:interface=localhost" % API_PORT
endpoint = endpoints.serverFromString(reactor, endpoint_description)
endpoint.listen(Site(app.resource()))
# Run all the things (blocking call)
logger.info("Everything setup and running. Waiting for events...")
reactor.run()
logger.info("Shutting down.")
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
self = cls()
self.n = node.Node(factory, bitcoind, [], [], net)
yield self.n.start()
self.n.p2p_node = node.P2PNode(self.n, port=0, max_incoming_conns=1000000, addr_store={}, connect_addrs=[('127.0.0.1', peer_port) for peer_port in peer_ports])
self.n.p2p_node.start()
wb = work.WorkerBridge(node=self.n, my_pubkey_hash=random.randrange(2**160), donation_percentage=random.uniform(0, 10), merged_urls=merged_urls, worker_fee=3, args=math.Object(donation_percentage=random.uniform(0, 10), address='foo', worker_fee=3, timeaddresses=1000), pubkeys=main.keypool(), bitcoind=bitcoind)
self.wb = wb
web_root = resource.Resource()
worker_interface.WorkerInterface(wb).attach_to(web_root)
self.web_port = reactor.listenTCP(0, server.Site(web_root))
defer.returnValue(self)
def setUp(self):
self.resrc = SimpleResource()
self.resrc.putChild('', self.resrc)
self.site = server.Site(self.resrc)
self.site = server.Site(self.resrc)
self.site.logFile = log.logfile
# HELLLLLLLLLLP! This harness is Very Ugly.
self.channel = self.site.buildProtocol(None)
self.transport = http.StringTransport()
self.transport.close = lambda *a, **kw: None
self.transport.disconnecting = lambda *a, **kw: 0
self.transport.getPeer = lambda *a, **kw: "peer"
self.transport.getHost = lambda *a, **kw: "host"
self.channel.makeConnection(self.transport)
for l in ["GET / HTTP/1.1",
"Accept: text/html"]:
self.channel.lineReceived(l)
def testDistrib(self):
# site1 is the publisher
r1 = resource.Resource()
r1.putChild("there", static.Data("root", "text/plain"))
site1 = server.Site(r1)
self.f1 = PBServerFactory(distrib.ResourcePublisher(site1))
self.port1 = reactor.listenTCP(0, self.f1)
self.sub = distrib.ResourceSubscription("127.0.0.1",
self.port1.getHost().port)
r2 = resource.Resource()
r2.putChild("here", self.sub)
f2 = MySite(r2)
self.port2 = reactor.listenTCP(0, f2)
d = client.getPage("http://127.0.0.1:%d/here/there" % \
self.port2.getHost().port)
d.addCallback(self.failUnlessEqual, 'root')
return d
def setUp(self):
name = str(id(self)) + "_webclient"
if not os.path.exists(name):
os.mkdir(name)
f = open(os.path.join(name, "file"), "wb")
f.write("0123456789")
f.close()
r = static.File(name)
r.putChild("redirect", util.Redirect("/file"))
r.putChild("wait", LongTimeTakingResource())
r.putChild("error", ErrorResource())
r.putChild("nolength", NoLengthResource())
r.putChild("host", HostHeaderResource())
r.putChild("payload", PayloadResource())
r.putChild("broken", BrokenDownloadResource())
site = server.Site(r, timeout=None)
self.port = self._listen(site)
self.portno = self.port.getHost().port
def setUp(self):
plainRoot = static.Data('not me', 'text/plain')
tlsRoot = static.Data('me neither', 'text/plain')
plainSite = server.Site(plainRoot, timeout=None)
tlsSite = server.Site(tlsRoot, timeout=None)
from twisted import test
self.tlsPort = reactor.listenSSL(0, tlsSite,
contextFactory=ssl.DefaultOpenSSLContextFactory(
sibpath(test.__file__, 'server.pem'),
sibpath(test.__file__, 'server.pem'),
),
interface="127.0.0.1")
self.plainPort = reactor.listenTCP(0, plainSite, interface="127.0.0.1")
self.plainPortno = self.plainPort.getHost().port
self.tlsPortno = self.tlsPort.getHost().port
plainRoot.putChild('one', util.Redirect(self.getHTTPS('two')))
tlsRoot.putChild('two', util.Redirect(self.getHTTP('three')))
plainRoot.putChild('three', util.Redirect(self.getHTTPS('four')))
tlsRoot.putChild('four', static.Data('FOUND IT!', 'text/plain'))
def main():
parser = argparse.ArgumentParser()
parser.add_argument('resource')
args = parser.parse_args()
module_name, name = args.resource.rsplit('.', 1)
sys.path.append('.')
resource = getattr(import_module(module_name), name)()
http_port = reactor.listenTCP(PORT, Site(resource))
def print_listening():
host = http_port.getHost()
print('Mock server {} running at http://{}:{}'.format(
resource, host.host, host.port))
reactor.callWhenRunning(print_listening)
reactor.run()
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
self = cls()
self.n = node.Node(factory, bitcoind, [], [], net)
yield self.n.start()
self.n.p2p_node = node.P2PNode(self.n, port=0, max_incoming_conns=1000000, addr_store={}, connect_addrs=[('127.0.0.1', peer_port) for peer_port in peer_ports])
self.n.p2p_node.start()
wb = work.WorkerBridge(node=self.n, my_pubkey_hash=random.randrange(2**160), donation_percentage=random.uniform(0, 10), merged_urls=merged_urls, worker_fee=3, args=math.Object(donation_percentage=random.uniform(0, 10), address='foo', worker_fee=3, timeaddresses=1000), pubkeys=main.keypool(), bitcoind=bitcoind)
self.wb = wb
web_root = resource.Resource()
worker_interface.WorkerInterface(wb).attach_to(web_root)
self.web_port = reactor.listenTCP(0, server.Site(web_root))
defer.returnValue(self)
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
self = cls()
self.n = node.Node(factory, bitcoind, [], [], net)
yield self.n.start()
self.n.p2p_node = node.P2PNode(self.n, port=0, max_incoming_conns=1000000, addr_store={}, connect_addrs=[('127.0.0.1', peer_port) for peer_port in peer_ports])
self.n.p2p_node.start()
wb = work.WorkerBridge(node=self.n, my_pubkey_hash=random.randrange(2**160), donation_percentage=random.uniform(0, 10), merged_urls=merged_urls, worker_fee=3)
self.wb = wb
web_root = resource.Resource()
worker_interface.WorkerInterface(wb).attach_to(web_root)
self.web_port = reactor.listenTCP(0, server.Site(web_root))
defer.returnValue(self)
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
self = cls()
self.n = node.Node(factory, bitcoind, [], [], net)
yield self.n.start()
self.n.p2p_node = node.P2PNode(self.n, port=0, max_incoming_conns=1000000, addr_store={}, connect_addrs=[('127.0.0.1', peer_port) for peer_port in peer_ports])
self.n.p2p_node.start()
wb = work.WorkerBridge(node=self.n, my_pubkey_hash=random.randrange(2**160), donation_percentage=random.uniform(0, 10), merged_urls=merged_urls, worker_fee=3)
self.wb = wb
web_root = resource.Resource()
worker_interface.WorkerInterface(wb).attach_to(web_root)
self.web_port = reactor.listenTCP(0, server.Site(web_root))
defer.returnValue(self)
def main():
parser = argparse.ArgumentParser(description='VMWare metrics exporter for Prometheus')
parser.add_argument('-c', '--config', dest='config_file',
default='config.yml', help="configuration file")
parser.add_argument('-p', '--port', dest='port', type=int,
default=9272, help="HTTP port to expose metrics")
args = parser.parse_args()
# Start up the server to expose the metrics.
root = Resource()
root.putChild(b'metrics', VMWareMetricsResource(args))
factory = Site(root)
print("Starting web server on port {}".format(args.port))
reactor.listenTCP(args.port, factory)
reactor.run()
def setUp(self):
yield super(TestBwscan, self).setUp()
yield setconf_fetch_all_descs(self.tor)
class DummyResource(Resource):
isLeaf = True
def render_GET(self, request):
size = request.uri.split('/')[-1]
if 'k' in size:
size = int(size[:-1])*(2**10)
elif 'M' in size:
size = int(size[:-1])*(2**20)
return 'a'*size
self.port = yield available_tcp_port(reactor)
self.test_service = yield reactor.listenTCP(
self.port, Site(DummyResource()))
def setUp(self):
yield super(TestStreamBandwidthListener, self).setUp()
self.fetch_size = 8*2**20 # 8MB
self.stream_bandwidth_listener = yield StreamBandwidthListener(self.tor)
class DummyResource(Resource):
isLeaf = True
def render_GET(self, request):
return 'a'*8*2**20
self.port = yield available_tcp_port(reactor)
self.site = Site(DummyResource())
self.test_service = yield reactor.listenTCP(self.port, self.site)
self.not_enough_measurements = NotEnoughMeasurements(
"Not enough measurements to calculate STREAM_BW samples.")
def startService(self):
"""
Start TCP listener on designated HTTP port,
serving ``HttpChannelContainer`` as root resource.
"""
# Don't start service twice
if self.running == 1:
return
self.running = 1
# Prepare startup
http_listen = self.settings.kotori.http_listen
http_port = int(self.settings.kotori.http_port)
log.info('Starting HTTP service on {http_listen}:{http_port}', http_listen=http_listen, http_port=http_port)
# Configure root Site object and start listening to requests.
# This must take place only once - can't bind to the same port multiple times!
factory = LocalSite(self.root)
reactor.listenTCP(http_port, factory, interface=http_listen)
def boot_frontend(config, debug=False):
"""
Boot a Pyramid WSGI application as Twisted component
"""
http_port = int(config.get('config-web', 'http_port'))
websocket_uri = unicode(config.get('wamp', 'listen'))
# https://stackoverflow.com/questions/13122519/serving-pyramid-application-using-twistd/13138610#13138610
config = resource_filename('kotori.frontend', 'development.ini')
application = get_app(config, 'main')
# https://twistedmatrix.com/documents/13.1.0/web/howto/web-in-60/wsgi.html
resource = WSGIResource(reactor, reactor.getThreadPool(), application)
reactor.listenTCP(http_port, Site(resource))
def test_sessionUIDGeneration(self):
"""
L{site.getSession} generates L{Session} objects with distinct UIDs from
a secure source of entropy.
"""
site = server.Site(resource.Resource())
# Ensure that we _would_ use the unpredictable random source if the
# test didn't stub it.
self.assertIdentical(site._entropy, os.urandom)
def predictableEntropy(n):
predictableEntropy.x += 1
return (unichr(predictableEntropy.x) * n).encode("charmap")
predictableEntropy.x = 0
self.patch(site, "_entropy", predictableEntropy)
a = self.getAutoExpiringSession(site)
b = self.getAutoExpiringSession(site)
self.assertEqual(a.uid, b"01" * 0x20)
self.assertEqual(b.uid, b"02" * 0x20)
# This functionality is silly (the value is no longer used in session
# generation), but 'counter' was a public attribute since time
# immemorial so we should make sure if anyone was using it to get site
# metrics or something it keeps working.
self.assertEqual(site.counter, 2)
def setUp(self):
self.resrc = SimpleResource()
self.resrc.putChild(b'', self.resrc)
self.resrc.putChild(b'with-content-type', SimpleResource(b'image/jpeg'))
self.site = server.Site(self.resrc)
self.site.startFactory()
self.addCleanup(self.site.stopFactory)
# HELLLLLLLLLLP! This harness is Very Ugly.
self.channel = self.site.buildProtocol(None)
self.transport = http.StringTransport()
self.transport.close = lambda *a, **kw: None
self.transport.disconnecting = lambda *a, **kw: 0
self.transport.getPeer = lambda *a, **kw: "peer"
self.transport.getHost = lambda *a, **kw: "host"
self.channel.makeConnection(self.transport)
def test_processingFailedNoTraceback(self):
"""
L{Request.processingFailed} when the site has C{displayTracebacks} set
to C{False} does not write out the failure, but give a generic error
message.
"""
d = DummyChannel()
request = server.Request(d, 1)
request.site = server.Site(resource.Resource())
request.site.displayTracebacks = False
fail = failure.Failure(Exception("Oh no!"))
request.processingFailed(fail)
self.assertNotIn(b"Oh no!", request.transport.written.getvalue())
self.assertIn(
b"Processing Failed", request.transport.written.getvalue()
)
# Since we didn't "handle" the exception, flush it to prevent a test
# failure
self.assertEqual(1, len(self.flushLoggedErrors()))
def test_processingFailedDisplayTracebackHandlesUnicode(self):
"""
L{Request.processingFailed} when the site has C{displayTracebacks} set
to C{True} writes out the failure, making UTF-8 items into HTML
entities.
"""
d = DummyChannel()
request = server.Request(d, 1)
request.site = server.Site(resource.Resource())
request.site.displayTracebacks = True
fail = failure.Failure(Exception(u"\u2603"))
request.processingFailed(fail)
self.assertIn(b"☃", request.transport.written.getvalue())
# On some platforms, we get a UnicodeError when trying to
# display the Failure with twisted.python.log because
# the default encoding cannot display u"\u2603". Windows for example
# uses a default encodig of cp437 which does not support u"\u2603".
self.flushLoggedErrors(UnicodeError)
# Since we didn't "handle" the exception, flush it to prevent a test
# failure
self.assertEqual(1, len(self.flushLoggedErrors()))
def test_sessionDifferentFromSecureSession(self):
"""
L{Request.session} and L{Request.secure_session} should be two separate
sessions with unique ids and different cookies.
"""
d = DummyChannel()
d.transport = DummyChannel.SSL()
request = server.Request(d, 1)
request.site = server.Site(resource.Resource())
request.sitepath = []
secureSession = request.getSession()
self.assertIsNotNone(secureSession)
self.addCleanup(secureSession.expire)
self.assertEqual(request.cookies[0].split(b"=")[0],
b"TWISTED_SECURE_SESSION")
session = request.getSession(forceNotSecure=True)
self.assertIsNotNone(session)
self.assertEqual(request.cookies[1].split(b"=")[0], b"TWISTED_SESSION")
self.addCleanup(session.expire)
self.assertNotEqual(session.uid, secureSession.uid)
def test_sessionAttribute(self):
"""
On a L{Request}, the C{session} attribute retrieves the associated
L{Session} only if it has been initialized. If the request is secure,
it retrieves the secure session.
"""
site = server.Site(resource.Resource())
d = DummyChannel()
d.transport = DummyChannel.SSL()
request = server.Request(d, 1)
request.site = site
request.sitepath = []
self.assertIs(request.session, None)
insecureSession = request.getSession(forceNotSecure=True)
self.addCleanup(insecureSession.expire)
self.assertIs(request.session, None)
secureSession = request.getSession()
self.addCleanup(secureSession.expire)
self.assertIsNot(secureSession, None)
self.assertIsNot(secureSession, insecureSession)
self.assertIs(request.session, secureSession)
def testDistrib(self):
# site1 is the publisher
r1 = resource.Resource()
r1.putChild("there", static.Data("root", "text/plain"))
site1 = server.Site(r1)
self.f1 = PBServerFactory(distrib.ResourcePublisher(site1))
self.port1 = reactor.listenTCP(0, self.f1)
self.sub = distrib.ResourceSubscription("127.0.0.1",
self.port1.getHost().port)
r2 = resource.Resource()
r2.putChild("here", self.sub)
f2 = MySite(r2)
self.port2 = reactor.listenTCP(0, f2)
agent = client.Agent(reactor)
d = agent.request(b"GET", "http://127.0.0.1:%d/here/there" % \
(self.port2.getHost().port,))
d.addCallback(client.readBody)
d.addCallback(self.assertEqual, 'root')
return d
def setUp(self):
plainRoot = Data(b'not me', 'text/plain')
tlsRoot = Data(b'me neither', 'text/plain')
plainSite = server.Site(plainRoot, timeout=None)
tlsSite = server.Site(tlsRoot, timeout=None)
self.tlsPort = reactor.listenSSL(
0, tlsSite,
contextFactory=ssl.DefaultOpenSSLContextFactory(
serverPEMPath, serverPEMPath),
interface="127.0.0.1")
self.plainPort = reactor.listenTCP(0, plainSite, interface="127.0.0.1")
self.plainPortno = self.plainPort.getHost().port
self.tlsPortno = self.tlsPort.getHost().port
plainRoot.putChild(b'one', Redirect(self.getHTTPS('two')))
tlsRoot.putChild(b'two', Redirect(self.getHTTP('three')))
plainRoot.putChild(b'three', Redirect(self.getHTTPS('four')))
tlsRoot.putChild(b'four', Data(b'FOUND IT!', 'text/plain'))